KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

AI2天前发布 beixibaobao
2 0 0

        针对智能家居云端数据处理模式的网络依赖、低延迟性差、隐私泄露三大痛点,基于 KaiwuDB(KWDB)多模时序数据库 + 华为 CodeArts 代码智能体的本地化数据处理解决方案。从环境搭建、KWDB 自动化部署,到系统全模块开发、接口测试实现全流程落地,打造零云端依赖、低延迟、高隐私的智能家居本地化数据处理系统,方案基于开源技术栈与自动化开发工具,降低技术门槛,适配新手开发者与实际家庭场景需求。

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

        随着智能家居设备渗透率持续提升,家庭中温湿度传感器、智能灯、空调、门锁等设备呈规模化增长,设备运行产生的时序数据(温湿度、能耗、设备状态)与关系型数据(设备信息、规则配置)呈爆发式增长,对数据的存储、处理与利用提出更高要求。

本文选择KaiwuDB作为本地化数据存储与计算核心,华为 CodeArts 代码智能体作为自动化研发引擎,二者结合实现智能家居本地化数据处理系统的高效构建,核心优势如下:

1.1 KaiwuDB:适配 AIoT 场景的多模数据库基座

KaiwuDB(开源版本简称 KWDB)是开放原子开源基金会孵化的分布式多模数据库,专为 AIoT 场景设计,核心特性完美匹配智能家居本地化需求:

  • 多模融合:支持时序库与关系库统一实例管理,无需多库部署,一站式处理智能家居时序数据与关系型数据,简化架构与运维成本;
  • 极致性能:就地计算、智能预计算技术实现百万级数据秒级写入、亿级数据秒级读取,单机支持 1 秒 20 亿记录数据探索,满足设备高频采集与低延迟查询需求;
  • 本地高可靠:支持离线运行、断网可用,数据全程本地留存,从根源保障隐私;特有压缩算法节省 90% 时序数据存储空间,降低本地硬件成本;
  • 高兼容性:支持标准 SQL 语法与主流数据库工具,协议插件框架可快速适配 MQTT、Modbus 等智能家居设备协议,扩展性强。

1.2 华为 CodeArts 代码智能体:自动化研发效率引擎

华为云 CodeArts 代码智能体融合 AI 大模型与 IDE 工具,实现自然语言到代码的全流程转化,核心能力如下:

  • 自然语言转开发:将业务需求直接转化为结构化开发步骤,自动完成代码生成、配置编写、接口开发,非专业开发者也可参与系统构建;
  • 环境自动适配:自主调用 Shell 命令、代码搜索等工具,完成 KaiwuDB 部署、表结构设计、硬件环境适配,大幅减少手动操作;
  • 代码质量保障:内置代码规范检查、漏洞扫描、单元测试生成,确保代码符合工业级标准,支持多人协同与快速迭代;
  • 全流程自动化:从数据库部署到系统开发、测试,全程通过自然语言指令驱动,大幅缩短研发周期。

下面将详细介绍系统搭建的完整流程,从环境准备到系统测试一步到位

一、环境准备

1.1、配置vscode远程链接服务器

CodeArts代码智能体自动部署kaiwudb与自动化构建智能家居本地化数据处理系统所有操作在vscode下完成

1.1.1、首先在vscode下载安装 remote-ssh扩展,打开vscode,在扩展里搜索remote-ssh,然后点击安装

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

1.1.2、链接远程服务器

点击remote-ssh扩展,点击加号,然后按着提示输入:ssh root@192.168.150.111,这里服务器替换成自己的ip与用户,然后回车确认

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

选择ssh信息存放的配置文件

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

至此配置完成,点击登录

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

因为我们是linux的服务器,这里选择linux

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

输入密码然后回车完成

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

至此链接远程服务器成功

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

2.2、准备CodeArts代码智能体

CodeArts代码智能体是基于智能生成、智能问答两大核心能力构建起一套全方位、多层次的智能开发体系。在智能生成方面,它能够依据开发者输入的需求描述,准确且高效地生成高质量代码;

2.2.1、安装CodeArts扩展

在vscode下载安装 CodeArts代码智能体,在扩展里搜索CodeArts,然后点击安装

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

2.2.2、CodeArts登录

安装完成后点击CodeArts图标,然后点击登录

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

然后登录华为云账号进行登录

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

登录成功后如下

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

在CodeArts代码智能体对话框下侧点Agent模式,在弹出的菜单最后侧点设置按钮,进入授权所有自动化操作界面,分别勾选读取文件和目录编辑文件执行命令更新代办、执行task工具、使用浏览器。        

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

至此CodeArts代码智能体已经准备完成了,下面全部交给ai帮我们部署数据库,开发部署应用

二、数据库自动部署

借助 CodeArts 代码智能体的自动化能力,通过自然语言指令即可完成 KaiwuDB 的 Docker 化部署,全程无需手动编写部署脚本

在CodeArts代码智能体里输入提示词:帮我使用docker部署一个最新版的kwdb,然后等待数据库部署

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

等待十几分钟后我们来看成果,可以看到数据库已经部署成功了

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

进入容器查看kwdb的服务

docker exec -it kwdb-server bash

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

连接到kwdb的SQL客户端查看下数据库版本,及数据的基础功能是否正常,若终端显示 KaiwuDB 版本号(如 3.1.0),且容器状态为「Up」,则说明部署成功。

docker exec -it kwdb-server /kaiwudb/bin/kwbase sql --insecure

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

是不是很简单,不得不惊叹科技进步成这个样子了

部署后可通过以下命令管理 KWDB 容器,方便后续运维

# 查看容器状态
docker ps --filter "name=kwdb-server"
# 查看日志
docker logs -f kwdb-server
# 进入容器
docker exec -it kwdb-server bash
# 连接到SQL客户端
docker exec -it kwdb-server /kaiwudb/bin/kwbase sql --insecure
# 停止服务
docker-compose down
# 启动服务
docker-compose up -d

三、智能家居本地化数据处理系统开发

3.1 发送开发指令,启动自动化构建

基于已部署的 KaiwuDB,通过 CodeArts 智能体指令驱动,自动完成智能家居本地化数据处理系统的全模块开发,包括设备接入、数据处理、规则引擎、本地存储、Web API 等核心模块。

先构造好提示词如下,在 CodeArts 智能体模式下,输入以下 Prompt 指令,引导智能体完成应用开发::

使用上面部署的kwdb多模时序数据库请为我设计一套「智能家居本地化数据处理系统」,核心目标是:
1. 所有设备数据(如传感器数据、设备控制指令、用户行为数据)均在本地处理,不上传至云端,保障用户隐私;
2. 系统具备低延迟、高稳定性,支持断网情况下智能家居设备正常联动;
3. 适配常见智能家居设备(如温湿度传感器、人体红外传感器、智能灯、智能窗帘、智能空调、智能门锁);
4. 支持简单的自动化规则自定义(如“当检测到有人且光线低于阈值时自动开灯”)。
5. 核心功能模块设计:
   - 设备接入模块:如何适配不同协议的设备,实现数据采集;
   - 本地化数据处理模块:实时数据解析、清洗、转换的逻辑;
   - 规则引擎模块:用户自定义自动化规则的存储与执行逻辑;
   - 本地存储模块:数据存储策略(如哪些数据需要时序表、哪些数据持久化关系表、存储周期);

此时可以看到,智能体开始自动构建任务,生成开发方案、编写代码。

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

3.2、系统整体架构

系统采用分层架构设计,所有模块均基于本地运行,数据全量存储在 KaiwuDB,实现零云端依赖

┌─────────────────────────────────────────────────────────────┐
│                    智能家居本地化数据处理系统                    │
├─────────────────────────────────────────────────────────────┤
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       │
│  │  设备接入层   │  │  数据处理层   │  │  规则引擎层   │       │
│  │ - MQTT协议   │  │ - 数据解析    │  │ - 规则存储    │       │
│  │ - HTTP协议   │  │ - 数据清洗    │  │ - 规则匹配    │       │
│  │ - CoAP协议   │  │ - 数据转换    │  │ - 规则执行    │       │
│  │ - Zigbee网关 │  │ - 数据验证    │  │ - 动作触发    │       │
│  └──────────────┘  └──────────────┘  └──────────────┘       │
│         │                  │                  │              │
│         └──────────────────┼──────────────────┘              │
│                            │                                 │
│                   ┌────────▼────────┐                        │
│                   │   KaiwuDB数据库  │                        │
│                   │ - 时序数据表    │                        │
│                   │ - 设备信息表    │                        │
│                   │ - 规则配置表    │                        │
│                   │ - 事件日志表    │                        │
│                   └─────────────────┘                        │
└─────────────────────────────────────────────────────────────┘

3.3、项目结构

CodeArts 智能体将生成规范的项目目录,所有模块解耦设计,便于后续扩展与维护,核心项目结构如下:

smarthome-system/
├── README.md
├── docker-compose.yml          # 系统部署配置
├── database/
│   ├── schema.sql              # 数据库初始化脚本
│   └── sample_data.sql         # 示例数据
├── device-adapter/
│   ├── mqtt_adapter.py         # MQTT协议适配器
│   ├── http_adapter.py         # HTTP协议适配器
│   ├── coap_adapter.py         # CoAP协议适配器
│   └── zigbee_adapter.py       # Zigbee网关适配器
├── data-processor/
│   ├── parser.py               # 数据解析器
│   ├── cleaner.py              # 数据清洗器
│   ├── transformer.py          # 数据转换器
│   └── validator.py            # 数据验证器
├── rule-engine/
│   ├── rule_manager.py         # 规则管理器
│   ├── rule_executor.py        # 规则执行器
│   ├── condition_matcher.py    # 条件匹配器
│   └── action_trigger.py       # 动作触发器
├── storage/
│   ├── timeseries_store.py     # 时序数据存储
│   ├── relational_store.py     # 关系数据存储
│   └── data_retention.py       # 数据保留策略
├── web-api/
│   ├── app.py                  # Flask应用
│   ├── device_api.py           # 设备管理API
│   ├── rule_api.py             # 规则管理API
│   └── data_api.py             # 数据查询API
└── scripts/
    ├── init_system.py          # 系统初始化脚本
    └── test_system.py          # 系统测试脚本

3.4、KaiwuDB 核心库表设计

智能家居本地化数据处理系统时序表设计:

-- 创建时序数据库
CREATE TS DATABASE IF NOT EXISTS tsdb;
USE tsdb;
-- ============================================
-- 时序表设计
-- ============================================
-- 1. 传感器时序数据表
CREATE TABLE IF NOT EXISTS home_sensor (
  ts TIMESTAMP NOT NULL,
  value FLOAT,
  status VARCHAR(32),
  file_path VARCHAR(128)
) ATTRIBUTES (
  device_id VARCHAR(64) NOT NULL,
  data_type VARCHAR(32) NOT NULL,
  location VARCHAR(32),
  device_model VARCHAR(64)
)
PRIMARY TAGS(device_id)
RETENTIONS 20D
ACTIVETIME 12h;
-- 2. 设备控制指令时序表
CREATE TABLE IF NOT EXISTS device_commands (
  ts TIMESTAMP NOT NULL,
  command_type VARCHAR(64),
  parameters varchar(64),
  status VARCHAR(32),
  executed_at TIMESTAMP,
  error_message VARCHAR(256)
) ATTRIBUTES (
  device_id VARCHAR(64) NOT NULL,
  command_id VARCHAR(128),
  user_id VARCHAR(64),
  source VARCHAR(32)
)
PRIMARY TAGS(device_id)
RETENTIONS 30D
ACTIVETIME 1h;
-- 3. 规则执行历史时序表
CREATE TABLE IF NOT EXISTS rule_execution (
  ts TIMESTAMP NOT NULL,
  trigger_event  VARCHAR(32),
  conditions_matched  VARCHAR(32),
  actions_executed VARCHAR(32),
  status VARCHAR(32),
  execution_time_ms INT,
  error_message VARCHAR(256)
) ATTRIBUTES (
  rule_id INT NOT NULL,
  rule_name VARCHAR(128) NOT NULL,
  priority INT,
  triggered_by VARCHAR(64)
)
PRIMARY TAGS(rule_id)
RETENTIONS 60D
ACTIVETIME 6h;
-- 4. 系统事件时序表
CREATE TABLE IF NOT EXISTS system_events (
  ts TIMESTAMP NOT NULL,
  event_type VARCHAR(64),
  event_level VARCHAR(32),
  message VARCHAR(512),
  details  VARCHAR(64)
) ATTRIBUTES (
  source VARCHAR(64) NOT NULL,
  device_id VARCHAR(64)
)
PRIMARY TAGS(source)
RETENTIONS 30D
ACTIVETIME 1h;
-- 5. 用户行为时序表
CREATE TABLE IF NOT EXISTS user_activity (
  ts TIMESTAMP NOT NULL,
  action_type VARCHAR(64),
  device_id VARCHAR(64),
  action_details  VARCHAR(64)
) ATTRIBUTES (
    user_id VARCHAR(64) NOT NULL,
    session_id VARCHAR(128),
    ip_address VARCHAR(64),
    user_agent VARCHAR(256)
  )
  PRIMARY TAGS(user_id)
  RETENTIONS 90D
  ACTIVETIME 24h;

智能家居本地化数据处理系统关系表设计:

USE defaultdb;
-- 1. 设备管理表(关系型表)
CREATE TABLE IF NOT EXISTS devices (
  id INT PRIMARY KEY,
  name STRING NOT NULL,
  device_id STRING NOT NULL UNIQUE,  -- 设备唯一标识
  device_type STRING NOT NULL,
  protocol STRING NOT NULL,
  location STRING,
  status STRING DEFAULT 'online',
  config JSONB,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_devices_device_id ON devices(device_id);
CREATE INDEX IF NOT EXISTS idx_devices_type ON devices(device_type);
CREATE INDEX IF NOT EXISTS idx_devices_location ON devices(location);
CREATE INDEX IF NOT EXISTS idx_devices_status ON devices(status);
-- 2. 自动化规则表(关系型表)
CREATE TABLE IF NOT EXISTS automation_rules (
  id INT PRIMARY KEY,
  name STRING NOT NULL,
  description STRING,
  enabled BOOL DEFAULT TRUE,
  priority INT DEFAULT 0,
  conditions JSONB NOT NULL,
  actions JSONB NOT NULL,
  created_by STRING DEFAULT 'system',
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  last_triggered_at TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_rules_enabled ON automation_rules(enabled);
CREATE INDEX IF NOT EXISTS idx_rules_priority ON automation_rules(priority DESC);
-- 3. 数据聚合统计表(关系型表)
CREATE TABLE IF NOT EXISTS data_statistics (
  id INT PRIMARY KEY,
  device_id STRING NOT NULL,
  data_type STRING NOT NULL,
  stat_type STRING NOT NULL,
  time_window STRING NOT NULL,
  stat_value DECIMAL(20,6),
  stat_time TIMESTAMP NOT NULL,
  calculated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_data_stats_device ON data_statistics(device_id, data_type, stat_type, time_window, stat_time);
-- 4. 设备状态快照表(关系型表)
CREATE TABLE IF NOT EXISTS device_snapshots (
  id INT PRIMARY KEY,
  device_id STRING NOT NULL,
  snapshot_time TIMESTAMP NOT NULL,
  current_state JSONB,
  battery_level INT,
  signal_strength INT,
  last_seen TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_device_snapshots_device ON device_snapshots(device_id, snapshot_time DESC);
-- ============================================
-- 创建序列
-- ============================================
CREATE SEQUENCE IF NOT EXISTS devices_seq START 1;
CREATE SEQUENCE IF NOT EXISTS automation_rules_seq START 1;
CREATE SEQUENCE IF NOT EXISTS data_statistics_seq START 1;
CREATE SEQUENCE IF NOT EXISTS device_snapshots_seq START 1;

智能家居本地化数据处理系统视图设计:

-- 创建视图
-- ============================================
CREATE VIEW IF NOT EXISTS v_device_latest_status AS
SELECT
    d.id,
    d.name,
    d.device_id,
    d.device_type,
    d.location,
    d.status,
    hs.ts as last_update,
    hs.value,
    hs.data_type,
    hs.status as sensor_status
FROM devices d
LEFT JOIN (
    SELECT device_id, data_type, ts, value, status,
           ROW_NUMBER() OVER (PARTITION BY device_id ORDER BY ts DESC) as rn
    FROM tsdb.home_sensor
) hs ON d.device_id = hs.device_id AND hs.rn = 1;
CREATE VIEW IF NOT EXISTS v_active_rules AS
SELECT
    id,
    name,
    description,
    priority,
    conditions,
    actions,
    last_triggered_at
FROM automation_rules
WHERE enabled = TRUE
ORDER BY priority DESC;

智能家居本地化数据处理系统留的数据接口

POST /api/data/receive - 接收设备数据
POST /api/device/<id>/control - 控制设备
GET /api/devices - 获取设备列表
GET /api/sensor-data/latest - 获取最新传感器数据
GET /api/rules - 获取自动化规则
GET /api/statistics - 获取统计数据

发送传感器数据样例:

curl -X POST http://localhost:5001/api/data/receive 
  -H "Content-Type: application/json" 
  -d '{"device_id": "sensor_temp_livingroom", "value": 26.5, "data_type": "temperature", "unit": "°C"}'

控制设备样例:

curl -X POST http://localhost:5001/api/device/5/control 
  -H "Content-Type: application/json" 
  -d '{"command": "turn_on", "parameters": {"brightness": 80}}'

准备一些测试数据


INSERT INTO devices (id, name, device_id, device_type, protocol, location, status, config) VALUES
(1, '客厅温度传感器', 'sensor_temp_livingroom', 'temperature', 'mqtt', '客厅', 'online', '{"topic": "sensors/livingroom/temp", "interval": 60}'),
(2, '客厅湿度传感器', 'sensor_humidity_livingroom', 'humidity', 'mqtt', '客厅', 'online', '{"topic": "sensors/livingroom/humidity", "interval": 60}'),
(3, '客厅人体传感器', 'sensor_motion_livingroom', 'motion', 'mqtt', '客厅', 'online', '{"topic": "sensors/livingroom/motion", "interval": 5}'),
(4, '客厅光照传感器', 'sensor_light_livingroom', 'light', 'mqtt', '客厅', 'online', '{"topic": "sensors/livingroom/light", "interval": 30}'),
(5, '客厅智能灯', 'device_light_livingroom', 'smart_light', 'mqtt', '客厅', 'online', '{"topic": "devices/livingroom/light", "supported_commands": ["turn_on", "turn_off", "set_brightness", "set_color"]}'),
(6, '卧室温度传感器', 'sensor_temp_bedroom', 'temperature', 'mqtt', '卧室', 'online', '{"topic": "sensors/bedroom/temp", "interval": 60}'),
(7, '卧室智能灯', 'device_light_bedroom', 'smart_light', 'mqtt', '卧室', 'online', '{"topic": "devices/bedroom/light", "supported_commands": ["turn_on", "turn_off", "set_brightness"]}'),
(8, '智能门锁', 'device_lock_entrance', 'smart_lock', 'mqtt', '玄关', 'online', '{"topic": "devices/entrance/lock", "supported_commands": ["lock", "unlock", "get_status"]}'),
(9, '智能窗帘', 'device_curtain_livingroom', 'smart_curtain', 'mqtt', '客厅', 'online', '{"topic": "devices/livingroom/curtain", "supported_commands": ["open", "close", "set_position"]}'),
(10, '智能空调', 'device_ac_livingroom', 'smart_ac', 'mqtt', '客厅', 'online', '{"topic": "devices/livingroom/ac", "supported_commands": ["turn_on", "turn_off", "set_temperature", "set_mode"]}}');
-- ============================================
-- 插入示例自动化规则
-- ============================================
INSERT INTO automation_rules (id, name, description, enabled, priority, conditions, actions) VALUES
(1, '客厅有人自动开灯', '检测到客厅有人且光线低于阈值时自动开灯', TRUE, 10,
 '{"conditions": [{"device_id": "sensor_motion_livingroom", "type": "motion", "operator": "=", "value": true}, {"device_id": "sensor_light_livingroom", "type": "light", "operator": "<", "value": 100}], "logic": "AND"}',
 '{"actions": [{"device_id": "device_light_livingroom", "command": "turn_on", "parameters": {"brightness": 80}}]}'),
(2, '客厅无人自动关灯', '客厅无人超过10分钟自动关灯', TRUE, 5,
 '{"conditions": [{"device_id": "sensor_motion_livingroom", "type": "motion", "operator": "=", "value": false, "duration": 600}], "logic": "AND"}',
 '{"actions": [{"device_id": "device_light_livingroom", "command": "turn_off"}]}'),
(3, '温度过高自动开空调', '客厅温度超过28度自动开启空调', TRUE, 8,
 '{"conditions": [{"device_id": "sensor_temp_livingroom", "type": "temperature", "operator": ">", "value": 28}], "logic": "AND"}',
 '{"actions": [{"device_id": "device_ac_livingroom", "command": "turn_on", "parameters": {"temperature": 24, "mode": "cool"}}]}'),
(4, '离家模式', '离家时关闭所有灯光和空调', TRUE, 15,
 '{"conditions": [{"device_id": "device_lock_entrance", "type": "door_lock", "operator": "=", "value": "locked"}], "logic": "AND"}',
 '{"actions": [{"device_id": "device_light_livingroom", "command": "turn_off"}, {"device_id": "device_light_bedroom", "command": "turn_off"}, {"device_id": "device_ac_livingroom", "command": "turn_off"}]}');

整体项目结构其实挺复杂的,这里把核心代码贴上:

#!/usr/bin/env python3
"""
智能家居Web应用主程序
提供数据展示和控制界面
"""
import sys
import os
import json
import logging
from datetime import datetime, timedelta
from flask import Flask, render_template, jsonify, request
# 添加模块路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../data-processor'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../rule-engine'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../storage'))
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__, template_folder='templates', static_folder='static')
# 模拟数据库连接
class MockDB:
    """模拟数据库连接"""
    def execute_query(self, query, params=None):
        """执行查询"""
        import subprocess
        try:
            cmd = ['docker', 'exec', 'kwdb-server', '/kaiwudb/bin/kwbase', 'sql', '--insecure', '--execute', query]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                lines = result.stdout.strip().split('n')
                if len(lines) > 1:
                    # 解析结果
                    headers = lines[0].split('t')
                    data = []
                    for line in lines[1:]:
                        values = line.split('t')
                        if len(values) == len(headers):
                            data.append(values)
                    return data
            return []
        except Exception as e:
            logger.error(f"查询失败: {e}")
            return []
db = MockDB()
@app.route('/')
def index():
    """首页"""
    return render_template('index.html')
@app.route('/api/devices')
def get_devices():
    """获取设备列表"""
    query = """
    SELECT id, name, device_id, device_type, protocol, location, status
    FROM devices
    ORDER BY id
    """
    results = db.execute_query(query)
    devices = []
    if results and len(results) > 1:
        headers = results[0]
        for row in results[1:]:
            device = {
                'id': row[0],
                'name': row[1],
                'device_id': row[2],
                'device_type': row[3],
                'protocol': row[4],
                'location': row[5],
                'status': row[6]
            }
            devices.append(device)
    return jsonify(devices)
@app.route('/api/devices/<int:device_id>/data')
def get_device_data(device_id):
    """获取设备数据"""
    hours = request.args.get('hours', 24, type=int)
    start_time = (datetime.now() - timedelta(hours=hours)).isoformat()
    # 获取设备信息
    device_query = f"SELECT device_id FROM devices WHERE id = {device_id}"
    device_result = db.execute_query(device_query)
    if not device_result or len(device_result) < 2:
        return jsonify({'error': '设备不存在'}), 404
    device_id_str = device_result[1][0]
    # 获取传感器数据
    data_query = f"""
    SELECT ts, value, data_type, status
    FROM sensor_data
    WHERE device_id = '{device_id_str}' AND ts >= '{start_time}'
    ORDER BY ts DESC
    LIMIT 100
    """
    results = db.execute_query(data_query)
    data_points = []
    if results and len(results) > 1:
        for row in results[1:]:
            data_points.append({
                'timestamp': row[0],
                'value': float(row[1]) if row[1] else None,
                'data_type': row[2],
                'status': row[3]
            })
    return jsonify({
        'device_id': device_id,
        'device_id_str': device_id_str,
        'data': data_points
    })
@app.route('/api/sensor-data/latest')
def get_latest_sensor_data():
    """获取最新传感器数据"""
    query = """
    SELECT
        d.id,
        d.name,
        d.device_type,
        d.location,
        d.status,
        sd.ts as last_update,
        sd.value,
        sd.data_type,
        sd.unit
    FROM devices d
    LEFT JOIN (
        SELECT device_id, data_type, ts, value, unit,
               ROW_NUMBER() OVER (PARTITION BY device_id ORDER BY ts DESC) as rn
        FROM sensor_data
    ) sd ON d.device_id = sd.device_id AND sd.rn = 1
    ORDER BY d.id
    """
    results = db.execute_query(query)
    sensor_data = []
    if results and len(results) > 1:
        for row in results[1:]:
            sensor_data.append({
                'id': row[0],
                'name': row[1],
                'device_type': row[2],
                'location': row[3],
                'status': row[4],
                'last_update': row[5],
                'value': float(row[6]) if row[6] else None,
                'data_type': row[7],
                'unit': row[8] if len(row) > 8 else None
            })
    return jsonify(sensor_data)
@app.route('/api/rules')
def get_rules():
    """获取自动化规则"""
    query = """
    SELECT id, name, description, enabled, priority, last_triggered_at
    FROM automation_rules
    ORDER BY priority DESC
    """
    results = db.execute_query(query)
    rules = []
    if results and len(results) > 1:
        for row in results[1:]:
            rules.append({
                'id': row[0],
                'name': row[1],
                'description': row[2],
                'enabled': row[3] == 't',
                'priority': int(row[4]),
                'last_triggered_at': row[5]
            })
    return jsonify(rules)
@app.route('/api/statistics')
def get_statistics():
    """获取统计数据"""
    stats = {}
    # 设备统计
    device_query = """
    SELECT
        device_type,
        COUNT(*) as count,
        COUNT(CASE WHEN status = 'online' THEN 1 END) as online_count
    FROM devices
    GROUP BY device_type
    """
    device_results = db.execute_query(device_query)
    stats['devices'] = {}
    if device_results and len(device_results) > 1:
        for row in device_results[1:]:
            stats['devices'][row[0]] = {
                'total': int(row[1]),
                'online': int(row[2])
            }
    # 传感器数据统计
    data_query = """
    SELECT COUNT(*) as total_records
    FROM sensor_data
    """
    data_results = db.execute_query(data_query)
    if data_results and len(data_results) > 1:
        stats['sensor_data'] = {
            'total_records': int(data_results[1][0])
        }
    # 规则统计
    rule_query = """
    SELECT
        COUNT(*) as total,
        COUNT(CASE WHEN enabled = TRUE THEN 1 END) as enabled
    FROM automation_rules
    """
    rule_results = db.execute_query(rule_query)
    if rule_results and len(rule_results) > 1:
        stats['rules'] = {
            'total': int(rule_results[1][0]),
            'enabled': int(rule_results[1][1])
        }
    return jsonify(stats)
@app.route('/api/data/receive', methods=['POST'])
def receive_data():
    """接收设备数据"""
    try:
        data = request.json
        # 验证数据
        if not data or 'device_id' not in data or 'value' not in data:
            return jsonify({'error': '缺少必要字段'}), 400
        # 插入数据
        device_id = data['device_id']
        value = data['value']
        data_type = data.get('data_type', 'value')
        status = data.get('status', 'good')
        unit = data.get('unit', '')
        query = f"""
        INSERT INTO sensor_data (time, device_id, data_type, value, status, unit)
        VALUES (NOW(), '{device_id}', '{data_type}', {value}, '{status}', '{unit}')
        """
        db.execute_query(query)
        return jsonify({'success': True, 'message': '数据接收成功'})
    except Exception as e:
        logger.error(f"接收数据失败: {e}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/device/<int:device_id>/control', methods=['POST'])
def control_device(device_id):
    """控制设备"""
    try:
        data = request.json
        if not data or 'command' not in data:
            return jsonify({'error': '缺少命令参数'}), 400
        command = data['command']
        parameters = data.get('parameters', {})
        # 获取设备信息
        device_query = f"SELECT device_id, name FROM devices WHERE id = {device_id}"
        device_result = db.execute_query(device_query)
        if not device_result or len(device_result) < 2:
            return jsonify({'error': '设备不存在'}), 404
        device_id_str = device_result[1][0]
        device_name = device_result[1][1]
        # 记录控制指令
        cmd_query = f"""
        INSERT INTO device_commands (time, device_id, command_type, parameters, status)
        VALUES (NOW(), '{device_id_str}', '{command}', '{json.dumps(parameters)}', 'success')
        """
        db.execute_query(cmd_query)
        logger.info(f"控制设备: {device_name} ({device_id_str}), 命令: {command}")
        return jsonify({
            'success': True,
            'message': f'设备 {device_name} 控制指令已发送',
            'command': command,
            'parameters': parameters
        })
    except Exception as e:
        logger.error(f"控制设备失败: {e}")
        return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5001, debug=True)

至此系统开发完成

四、系统运行与功能测试

启动系统 Web API 服务,并通过接口测试页面验证,确认系统所有功能正常运行

1、启动系统 Web API 服务

在命令行输入启动命令并运行

cd /root/workspace/smarthome-system/web-api && python3 simple_app.py > /tmp/smarthome-web-5002.log 2>&1 &
root@kwdb:~/workspace/smarthome-system/web-api# python3 simple_app.py
启动智能家居Web服务...
 * Serving Flask app 'simple_app'
 * Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5002
 * Running on http://192.168.150.111:5002
Press CTRL+C to quit
127.0.0.1 - - [27/Feb/2026 13:06:51] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [27/Feb/2026 13:06:51] "GET /static/css/style.css HTTP/1.1" 200 -
127.0.0.1 - - [27/Feb/2026 13:06:51] "GET /static/js/app.js HTTP/1.1" 200 -
执行查询: 
    SELECT id, name, device_type, protocol, location, status
    FROM devices
    ORDER BY id
127.0.0.1 - - [27/Feb/2026 13:06:51] "GET /favicon.ico HTTP/1.1" 404 -
查询结果: [['1', '客厅温度传感器', 'temperature', 'mqtt', '客厅', 'online'], ['2', '客厅湿度传感器', 'humidity', 'mqtt', '客厅', 'online'], ['3', '客厅人体传感器', 'motion', 'mqtt', '客厅', 'online'], ['4', '客厅光照传感器', 'light', 'mqtt', '客厅', 'online'], ['5', '客厅智能灯', 'smart_light', 'mqtt', '客厅', 'online'], ['6', '卧室温度传感器', 'temperature', 'mqtt', '卧室', 'online'], ['7', '卧室智能灯', 'smart_light', 'mqtt', '卧室', 'online'], ['8', '智能门锁', 'smart_lock', 'mqtt', '玄关', 'online'], ['9', '智能窗帘', 'smart_curtain', 'mqtt', '客厅', 'online'], ['10', '智能空调', 'smart_ac', 'mqtt', '客厅', 'online']]

启动成功后,终端将显示:Running on http://[你的服务器IP]:5002,即服务启动完成,点击在浏览器中打开可以看到程序运行的界面

系统概览:设备总数、在线设备数、自动化规则数、数据记录数及设备列表

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

实时数据:温湿度、光照、人体感应等传感器的最新数据与更新时间;自动化规则:所有规则的名称、描述、启用状态、优先级;

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

系统状态:KaiwuDB 版本、服务运行状态、接口文档,这里为了方便把数据接口放在了最下面,便于查找调用

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

2、测试接口

通过数据接口插入客厅传感器数据

curl -X POST http://localhost:5002/api/data/receive   -H "Content-Type: application/json"   -d '{"device_id": "temperature", "value": 32, "data_type": "temperature", "unit": "°C"}'

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

将传感器实时数据上报至本地系统,数据自动存储至 KaiwuDB 时序表,这里可以看到客厅问题传感器已经标称32度

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

测试一下测试设备列表API

curl -s http://localhost:5002/api/devices

可以看到返回了列表数据

KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统

至此项目开发全部结束,希望对想研究ai及kwdb数据库的同学有一些帮助

五、总结

本文以 KaiwuDB 为数据基座,华为 CodeArts 代码智能体为自动化开发引擎,实现了智能家居本地化数据处理系统的快速构建,全程无需复杂的手动编码,新手也可按步骤复现。随着 AIoT 技术的发展,本地化数据处理将成为智能家居的主流趋势,KaiwuDB 多模数据库让开发者实现 “轻量、高效、安全” 的智能场景落地。

© 版权声明

相关文章