Swoole 方案:时序数据库 (TSDB) 写入缓冲层
composer require influxdata/influxdb-client-php
<?php
// Write buffer in front of InfluxDB: accepts single metrics over HTTP,
// accumulates them in memory and writes them to InfluxDB in batches.
//
// Install: composer require influxdata/influxdb-client-php
// Run:     php buffer.php

use InfluxDB2\Client;
use InfluxDB2\Model\WritePrecision;
use InfluxDB2\Point;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\Http\Server;
use Swoole\Timer;

require __DIR__ . '/vendor/autoload.php';

$client = new Client([
    'url' => 'http://127.0.0.1:8086',
    'token' => 'your-token',
    'bucket' => 'metrics',
    'org' => 'myorg',
    // Points below are stamped with time(), i.e. whole seconds. Without an
    // explicit precision InfluxDB interprets the number as nanoseconds and
    // every point ends up in January 1970.
    'precision' => WritePrecision::S,
]);

// Pending points. Each Swoole worker process ends up with its own copy.
$buffer = [];
// Flush as soon as this many points are pending.
$batchSize = 100;
// Periodic flush interval in milliseconds (5 seconds).
$flushInterval = 5000;
// Upper bound on points kept in memory while InfluxDB is unreachable;
// the oldest points are dropped first once the bound is exceeded.
$maxBufferSize = 10000;

/**
 * Writes all pending points to InfluxDB in a single call and empties the buffer.
 *
 * On failure the error is logged and the batch is put back in front of the
 * buffer (capped at $maxBufferSize) so that the next flush retries it.
 */
$flush = static function () use (&$buffer, $client, $maxBufferSize): void {
    if ($buffer === []) {
        return;
    }

    // Detach the batch before doing I/O so that points appended while the
    // write is in flight are not wiped out by the reset.
    $batch = $buffer;
    $buffer = [];

    try {
        $writeApi = $client->createWriteApi();
        try {
            // Passing the whole array serialises the batch into one payload
            // instead of issuing a separate write per point.
            $writeApi->write($batch);
        } finally {
            $writeApi->close();
        }
        echo "批量写入 " . count($batch) . " 条\n";
    } catch (\Throwable $e) {
        // Keep the data for a later retry, but never grow without bound.
        $buffer = array_slice(array_merge($batch, $buffer), -$maxBufferSize);
        error_log('InfluxDB flush failed, ' . count($batch) . ' points re-queued: ' . $e->getMessage());
    }
};

$server = new Server('0.0.0.0', 9090);

// Time-based flush: runs in every worker so small trickles of data still get written.
$server->on('workerStart', static function () use ($flush, $flushInterval): void {
    Timer::tick($flushInterval, static function () use ($flush): void {
        $flush();
    });
});

// Write out whatever is still pending when a worker shuts down.
$server->on('workerStop', static function () use ($flush): void {
    $flush();
});

// Each request contributes one point built from the POSTed form fields
// `metric`, `host` and `value`; missing fields fall back to defaults.
$server->on('request', static function (Request $req, Response $res) use (&$buffer, $batchSize, $flush): void {
    $data = $req->post ?? [];

    $buffer[] = Point::measurement($data['metric'] ?? 'unknown')
        ->addTag('host', $data['host'] ?? 'localhost')
        ->addField('value', (float) ($data['value'] ?? 0))
        ->time(time());

    // Size-based flush: write immediately once a full batch is pending.
    // NOTE(review): the InfluxDB client presumably performs blocking HTTP
    // calls; confirm whether Swoole runtime hooks should be enabled.
    if (count($buffer) >= $batchSize) {
        $flush();
    }

    $res->end('ok');
});

$server->start();
composer require guzzlehttp/guzzle
© 版权声明
文章版权归作者所有,未经允许请勿转载。