swoole方案 时序数据库 (TSDB) 写入缓冲层

AI8小时前发布 beixibaobao
2 0 0
composer require influxdata/influxdb-client-php
<?php
// composer require influxdata/influxdb-client-php
// 缓冲层:收单条指标,攒批后写 InfluxDB
// php buffer.php
use InfluxDB2Client;
use InfluxDB2Point;
require __DIR__ . '/vendor/autoload.php';
$client = new Client([
    'url' => 'http://127.0.0.1:8086',
    'token' => 'your-token',
    'bucket' => 'metrics',
    'org' => 'myorg',
]);
$buffer = [];
$batchSize = 100;
$flushInterval = 5000; // 5秒
$server = new SwooleHTTPServer('0.0.0.0', 9090);
$server->on('workerStart', function () use (&$buffer, $client, $flushInterval) {
    SwooleTimer::tick($flushInterval, function () use (&$buffer, $client) {
        if (empty($buffer)) return;
        $writeApi = $client->createWriteApi();
        foreach ($buffer as $p) {
            $writeApi->write($p);
        }
        $writeApi->close();
        echo "批量写入 " . count($buffer) . " 条n";
        $buffer = [];
    });
});
$server->on('request', function ($req, $res) use (&$buffer, $batchSize, $client) {
    $data = $req->post;
    $point = Point::measurement($data['metric'] ?? 'unknown')
        ->addTag('host', $data['host'] ?? 'localhost')
        ->addField('value', (float)($data['value'] ?? 0))
        ->time(time());
    $buffer[] = $point;
    // 攒够立即写
    if (count($buffer) >= $batchSize) {
        $writeApi = $client->createWriteApi();
        foreach ($buffer as $p) {
            $writeApi->write($p);
        }
        $writeApi->close();
        echo "批量写入 " . count($buffer) . " 条n";
        $buffer = [];
    }
    $res->end('ok');
});
$server->start();
composer require guzzlehttp/guzzle
© 版权声明

相关文章