KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统
针对智能家居云端数据处理模式的网络依赖、低延迟性差、隐私泄露三大痛点,基于 KaiwuDB(KWDB)多模时序数据库 + 华为 CodeArts 代码智能体的本地化数据处理解决方案。从环境搭建、KWDB 自动化部署,到系统全模块开发、接口测试实现全流程落地,打造零云端依赖、低延迟、高隐私的智能家居本地化数据处理系统,方案基于开源技术栈与自动化开发工具,降低技术门槛,适配新手开发者与实际家庭场景需求。

随着智能家居设备渗透率持续提升,家庭中温湿度传感器、智能灯、空调、门锁等设备呈规模化增长,设备运行产生的时序数据(温湿度、能耗、设备状态)与关系型数据(设备信息、规则配置)呈爆发式增长,对数据的存储、处理与利用提出更高要求。
本文选择KaiwuDB作为本地化数据存储与计算核心,华为 CodeArts 代码智能体作为自动化研发引擎,二者结合实现智能家居本地化数据处理系统的高效构建,核心优势如下:
1.1 KaiwuDB:适配 AIoT 场景的多模数据库基座
KaiwuDB(开源版本简称 KWDB)是开放原子开源基金会孵化的分布式多模数据库,专为 AIoT 场景设计,核心特性完美匹配智能家居本地化需求:
- 多模融合:支持时序库与关系库统一实例管理,无需多库部署,一站式处理智能家居时序数据与关系型数据,简化架构与运维成本;
- 极致性能:就地计算、智能预计算技术实现百万级数据秒级写入、亿级数据秒级读取,单机支持 1 秒 20 亿记录数据探索,满足设备高频采集与低延迟查询需求;
- 本地高可靠:支持离线运行、断网可用,数据全程本地留存,从根源保障隐私;特有压缩算法节省 90% 时序数据存储空间,降低本地硬件成本;
- 高兼容性:支持标准 SQL 语法与主流数据库工具,协议插件框架可快速适配 MQTT、Modbus 等智能家居设备协议,扩展性强。
1.2 华为 CodeArts 代码智能体:自动化研发效率引擎
华为云 CodeArts 代码智能体融合 AI 大模型与 IDE 工具,实现自然语言到代码的全流程转化,核心能力如下:
- 自然语言转开发:将业务需求直接转化为结构化开发步骤,自动完成代码生成、配置编写、接口开发,非专业开发者也可参与系统构建;
- 环境自动适配:自主调用 Shell 命令、代码搜索等工具,完成 KaiwuDB 部署、表结构设计、硬件环境适配,大幅减少手动操作;
- 代码质量保障:内置代码规范检查、漏洞扫描、单元测试生成,确保代码符合工业级标准,支持多人协同与快速迭代;
- 全流程自动化:从数据库部署到系统开发、测试,全程通过自然语言指令驱动,大幅缩短研发周期。
下面将详细介绍系统搭建的完整流程,从环境准备到系统测试一步到位
一、环境准备
1.1、配置vscode远程链接服务器
CodeArts代码智能体自动部署kaiwudb与自动化构建智能家居本地化数据处理系统所有操作在vscode下完成
1.1.1、首先在vscode下载安装 remote-ssh扩展,打开vscode,在扩展里搜索remote-ssh,然后点击安装

1.1.2、链接远程服务器
点击remote-ssh扩展,点击加号,然后按着提示输入:ssh root@192.168.150.111,这里服务器替换成自己的ip与用户,然后回车确认

选择ssh信息存放的配置文件

至此配置完成,点击登录

因为我们是linux的服务器,这里选择linux

输入密码然后回车完成

至此链接远程服务器成功

2.2、准备CodeArts代码智能体
CodeArts代码智能体是基于智能生成、智能问答两大核心能力构建起一套全方位、多层次的智能开发体系。在智能生成方面,它能够依据开发者输入的需求描述,准确且高效地生成高质量代码;
2.2.1、安装CodeArts扩展
在vscode下载安装 CodeArts代码智能体,在扩展里搜索CodeArts,然后点击安装

2.2.2、CodeArts登录
安装完成后点击CodeArts图标,然后点击登录

然后登录华为云账号进行登录

登录成功后如下

在CodeArts代码智能体对话框下侧点Agent模式,在弹出的菜单最后侧点设置按钮,进入授权所有自动化操作界面,分别勾选读取文件和目录、编辑文件、执行命令、更新代办、执行task工具、使用浏览器。


至此CodeArts代码智能体已经准备完成了,下面全部交给ai帮我们部署数据库,开发部署应用
二、数据库自动部署
借助 CodeArts 代码智能体的自动化能力,通过自然语言指令即可完成 KaiwuDB 的 Docker 化部署,全程无需手动编写部署脚本
在CodeArts代码智能体里输入提示词:帮我使用docker部署一个最新版的kwdb,然后等待数据库部署

等待十几分钟后我们来看成果,可以看到数据库已经部署成功了

进入容器查看kwdb的服务
docker exec -it kwdb-server bash

连接到kwdb的SQL客户端查看下数据库版本,及数据的基础功能是否正常,若终端显示 KaiwuDB 版本号(如 3.1.0),且容器状态为「Up」,则说明部署成功。
docker exec -it kwdb-server /kaiwudb/bin/kwbase sql --insecure


是不是很简单,不得不惊叹科技进步成这个样子了
部署后可通过以下命令管理 KWDB 容器,方便后续运维
# 查看容器状态
docker ps --filter "name=kwdb-server"
# 查看日志
docker logs -f kwdb-server
# 进入容器
docker exec -it kwdb-server bash
# 连接到SQL客户端
docker exec -it kwdb-server /kaiwudb/bin/kwbase sql --insecure
# 停止服务
docker-compose down
# 启动服务
docker-compose up -d
三、智能家居本地化数据处理系统开发
3.1 发送开发指令,启动自动化构建
基于已部署的 KaiwuDB,通过 CodeArts 智能体指令驱动,自动完成智能家居本地化数据处理系统的全模块开发,包括设备接入、数据处理、规则引擎、本地存储、Web API 等核心模块。
先构造好提示词如下,在 CodeArts 智能体模式下,输入以下 Prompt 指令,引导智能体完成应用开发::
使用上面部署的kwdb多模时序数据库请为我设计一套「智能家居本地化数据处理系统」,核心目标是:
1. 所有设备数据(如传感器数据、设备控制指令、用户行为数据)均在本地处理,不上传至云端,保障用户隐私;
2. 系统具备低延迟、高稳定性,支持断网情况下智能家居设备正常联动;
3. 适配常见智能家居设备(如温湿度传感器、人体红外传感器、智能灯、智能窗帘、智能空调、智能门锁);
4. 支持简单的自动化规则自定义(如“当检测到有人且光线低于阈值时自动开灯”)。
5. 核心功能模块设计:
- 设备接入模块:如何适配不同协议的设备,实现数据采集;
- 本地化数据处理模块:实时数据解析、清洗、转换的逻辑;
- 规则引擎模块:用户自定义自动化规则的存储与执行逻辑;
- 本地存储模块:数据存储策略(如哪些数据需要时序表、哪些数据持久化关系表、存储周期);
此时可以看到,智能体开始自动构建任务,生成开发方案、编写代码。

3.2、系统整体架构
系统采用分层架构设计,所有模块均基于本地运行,数据全量存储在 KaiwuDB,实现零云端依赖
┌─────────────────────────────────────────────────────────────┐
│ 智能家居本地化数据处理系统 │
├─────────────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 设备接入层 │ │ 数据处理层 │ │ 规则引擎层 │ │
│ │ - MQTT协议 │ │ - 数据解析 │ │ - 规则存储 │ │
│ │ - HTTP协议 │ │ - 数据清洗 │ │ - 规则匹配 │ │
│ │ - CoAP协议 │ │ - 数据转换 │ │ - 规则执行 │ │
│ │ - Zigbee网关 │ │ - 数据验证 │ │ - 动作触发 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ │ │
│ ┌────────▼────────┐ │
│ │ KaiwuDB数据库 │ │
│ │ - 时序数据表 │ │
│ │ - 设备信息表 │ │
│ │ - 规则配置表 │ │
│ │ - 事件日志表 │ │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
3.3、项目结构
CodeArts 智能体将生成规范的项目目录,所有模块解耦设计,便于后续扩展与维护,核心项目结构如下:
smarthome-system/
├── README.md
├── docker-compose.yml # 系统部署配置
├── database/
│ ├── schema.sql # 数据库初始化脚本
│ └── sample_data.sql # 示例数据
├── device-adapter/
│ ├── mqtt_adapter.py # MQTT协议适配器
│ ├── http_adapter.py # HTTP协议适配器
│ ├── coap_adapter.py # CoAP协议适配器
│ └── zigbee_adapter.py # Zigbee网关适配器
├── data-processor/
│ ├── parser.py # 数据解析器
│ ├── cleaner.py # 数据清洗器
│ ├── transformer.py # 数据转换器
│ └── validator.py # 数据验证器
├── rule-engine/
│ ├── rule_manager.py # 规则管理器
│ ├── rule_executor.py # 规则执行器
│ ├── condition_matcher.py # 条件匹配器
│ └── action_trigger.py # 动作触发器
├── storage/
│ ├── timeseries_store.py # 时序数据存储
│ ├── relational_store.py # 关系数据存储
│ └── data_retention.py # 数据保留策略
├── web-api/
│ ├── app.py # Flask应用
│ ├── device_api.py # 设备管理API
│ ├── rule_api.py # 规则管理API
│ └── data_api.py # 数据查询API
└── scripts/
├── init_system.py # 系统初始化脚本
└── test_system.py # 系统测试脚本
3.4、KaiwuDB 核心库表设计
智能家居本地化数据处理系统时序表设计:
-- 创建时序数据库
CREATE TS DATABASE IF NOT EXISTS tsdb;
USE tsdb;
-- ============================================
-- 时序表设计
-- ============================================
-- 1. 传感器时序数据表
CREATE TABLE IF NOT EXISTS home_sensor (
ts TIMESTAMP NOT NULL,
value FLOAT,
status VARCHAR(32),
file_path VARCHAR(128)
) ATTRIBUTES (
device_id VARCHAR(64) NOT NULL,
data_type VARCHAR(32) NOT NULL,
location VARCHAR(32),
device_model VARCHAR(64)
)
PRIMARY TAGS(device_id)
RETENTIONS 20D
ACTIVETIME 12h;
-- 2. 设备控制指令时序表
CREATE TABLE IF NOT EXISTS device_commands (
ts TIMESTAMP NOT NULL,
command_type VARCHAR(64),
parameters varchar(64),
status VARCHAR(32),
executed_at TIMESTAMP,
error_message VARCHAR(256)
) ATTRIBUTES (
device_id VARCHAR(64) NOT NULL,
command_id VARCHAR(128),
user_id VARCHAR(64),
source VARCHAR(32)
)
PRIMARY TAGS(device_id)
RETENTIONS 30D
ACTIVETIME 1h;
-- 3. 规则执行历史时序表
CREATE TABLE IF NOT EXISTS rule_execution (
ts TIMESTAMP NOT NULL,
trigger_event VARCHAR(32),
conditions_matched VARCHAR(32),
actions_executed VARCHAR(32),
status VARCHAR(32),
execution_time_ms INT,
error_message VARCHAR(256)
) ATTRIBUTES (
rule_id INT NOT NULL,
rule_name VARCHAR(128) NOT NULL,
priority INT,
triggered_by VARCHAR(64)
)
PRIMARY TAGS(rule_id)
RETENTIONS 60D
ACTIVETIME 6h;
-- 4. 系统事件时序表
CREATE TABLE IF NOT EXISTS system_events (
ts TIMESTAMP NOT NULL,
event_type VARCHAR(64),
event_level VARCHAR(32),
message VARCHAR(512),
details VARCHAR(64)
) ATTRIBUTES (
source VARCHAR(64) NOT NULL,
device_id VARCHAR(64)
)
PRIMARY TAGS(source)
RETENTIONS 30D
ACTIVETIME 1h;
-- 5. 用户行为时序表
CREATE TABLE IF NOT EXISTS user_activity (
ts TIMESTAMP NOT NULL,
action_type VARCHAR(64),
device_id VARCHAR(64),
action_details VARCHAR(64)
) ATTRIBUTES (
user_id VARCHAR(64) NOT NULL,
session_id VARCHAR(128),
ip_address VARCHAR(64),
user_agent VARCHAR(256)
)
PRIMARY TAGS(user_id)
RETENTIONS 90D
ACTIVETIME 24h;
智能家居本地化数据处理系统关系表设计:
USE defaultdb;
-- 1. 设备管理表(关系型表)
CREATE TABLE IF NOT EXISTS devices (
id INT PRIMARY KEY,
name STRING NOT NULL,
device_id STRING NOT NULL UNIQUE, -- 设备唯一标识
device_type STRING NOT NULL,
protocol STRING NOT NULL,
location STRING,
status STRING DEFAULT 'online',
config JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_devices_device_id ON devices(device_id);
CREATE INDEX IF NOT EXISTS idx_devices_type ON devices(device_type);
CREATE INDEX IF NOT EXISTS idx_devices_location ON devices(location);
CREATE INDEX IF NOT EXISTS idx_devices_status ON devices(status);
-- 2. 自动化规则表(关系型表)
CREATE TABLE IF NOT EXISTS automation_rules (
id INT PRIMARY KEY,
name STRING NOT NULL,
description STRING,
enabled BOOL DEFAULT TRUE,
priority INT DEFAULT 0,
conditions JSONB NOT NULL,
actions JSONB NOT NULL,
created_by STRING DEFAULT 'system',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_triggered_at TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_rules_enabled ON automation_rules(enabled);
CREATE INDEX IF NOT EXISTS idx_rules_priority ON automation_rules(priority DESC);
-- 3. 数据聚合统计表(关系型表)
CREATE TABLE IF NOT EXISTS data_statistics (
id INT PRIMARY KEY,
device_id STRING NOT NULL,
data_type STRING NOT NULL,
stat_type STRING NOT NULL,
time_window STRING NOT NULL,
stat_value DECIMAL(20,6),
stat_time TIMESTAMP NOT NULL,
calculated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_data_stats_device ON data_statistics(device_id, data_type, stat_type, time_window, stat_time);
-- 4. 设备状态快照表(关系型表)
CREATE TABLE IF NOT EXISTS device_snapshots (
id INT PRIMARY KEY,
device_id STRING NOT NULL,
snapshot_time TIMESTAMP NOT NULL,
current_state JSONB,
battery_level INT,
signal_strength INT,
last_seen TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_device_snapshots_device ON device_snapshots(device_id, snapshot_time DESC);
-- ============================================
-- 创建序列
-- ============================================
CREATE SEQUENCE IF NOT EXISTS devices_seq START 1;
CREATE SEQUENCE IF NOT EXISTS automation_rules_seq START 1;
CREATE SEQUENCE IF NOT EXISTS data_statistics_seq START 1;
CREATE SEQUENCE IF NOT EXISTS device_snapshots_seq START 1;
智能家居本地化数据处理系统视图设计:
-- 创建视图
-- ============================================
CREATE VIEW IF NOT EXISTS v_device_latest_status AS
SELECT
d.id,
d.name,
d.device_id,
d.device_type,
d.location,
d.status,
hs.ts as last_update,
hs.value,
hs.data_type,
hs.status as sensor_status
FROM devices d
LEFT JOIN (
SELECT device_id, data_type, ts, value, status,
ROW_NUMBER() OVER (PARTITION BY device_id ORDER BY ts DESC) as rn
FROM tsdb.home_sensor
) hs ON d.device_id = hs.device_id AND hs.rn = 1;
CREATE VIEW IF NOT EXISTS v_active_rules AS
SELECT
id,
name,
description,
priority,
conditions,
actions,
last_triggered_at
FROM automation_rules
WHERE enabled = TRUE
ORDER BY priority DESC;
智能家居本地化数据处理系统留的数据接口
POST /api/data/receive - 接收设备数据
POST /api/device/<id>/control - 控制设备
GET /api/devices - 获取设备列表
GET /api/sensor-data/latest - 获取最新传感器数据
GET /api/rules - 获取自动化规则
GET /api/statistics - 获取统计数据
发送传感器数据样例:
curl -X POST http://localhost:5001/api/data/receive
-H "Content-Type: application/json"
-d '{"device_id": "sensor_temp_livingroom", "value": 26.5, "data_type": "temperature", "unit": "°C"}'
控制设备样例:
curl -X POST http://localhost:5001/api/device/5/control
-H "Content-Type: application/json"
-d '{"command": "turn_on", "parameters": {"brightness": 80}}'
准备一些测试数据
INSERT INTO devices (id, name, device_id, device_type, protocol, location, status, config) VALUES
(1, '客厅温度传感器', 'sensor_temp_livingroom', 'temperature', 'mqtt', '客厅', 'online', '{"topic": "sensors/livingroom/temp", "interval": 60}'),
(2, '客厅湿度传感器', 'sensor_humidity_livingroom', 'humidity', 'mqtt', '客厅', 'online', '{"topic": "sensors/livingroom/humidity", "interval": 60}'),
(3, '客厅人体传感器', 'sensor_motion_livingroom', 'motion', 'mqtt', '客厅', 'online', '{"topic": "sensors/livingroom/motion", "interval": 5}'),
(4, '客厅光照传感器', 'sensor_light_livingroom', 'light', 'mqtt', '客厅', 'online', '{"topic": "sensors/livingroom/light", "interval": 30}'),
(5, '客厅智能灯', 'device_light_livingroom', 'smart_light', 'mqtt', '客厅', 'online', '{"topic": "devices/livingroom/light", "supported_commands": ["turn_on", "turn_off", "set_brightness", "set_color"]}'),
(6, '卧室温度传感器', 'sensor_temp_bedroom', 'temperature', 'mqtt', '卧室', 'online', '{"topic": "sensors/bedroom/temp", "interval": 60}'),
(7, '卧室智能灯', 'device_light_bedroom', 'smart_light', 'mqtt', '卧室', 'online', '{"topic": "devices/bedroom/light", "supported_commands": ["turn_on", "turn_off", "set_brightness"]}'),
(8, '智能门锁', 'device_lock_entrance', 'smart_lock', 'mqtt', '玄关', 'online', '{"topic": "devices/entrance/lock", "supported_commands": ["lock", "unlock", "get_status"]}'),
(9, '智能窗帘', 'device_curtain_livingroom', 'smart_curtain', 'mqtt', '客厅', 'online', '{"topic": "devices/livingroom/curtain", "supported_commands": ["open", "close", "set_position"]}'),
(10, '智能空调', 'device_ac_livingroom', 'smart_ac', 'mqtt', '客厅', 'online', '{"topic": "devices/livingroom/ac", "supported_commands": ["turn_on", "turn_off", "set_temperature", "set_mode"]}}');
-- ============================================
-- 插入示例自动化规则
-- ============================================
INSERT INTO automation_rules (id, name, description, enabled, priority, conditions, actions) VALUES
(1, '客厅有人自动开灯', '检测到客厅有人且光线低于阈值时自动开灯', TRUE, 10,
'{"conditions": [{"device_id": "sensor_motion_livingroom", "type": "motion", "operator": "=", "value": true}, {"device_id": "sensor_light_livingroom", "type": "light", "operator": "<", "value": 100}], "logic": "AND"}',
'{"actions": [{"device_id": "device_light_livingroom", "command": "turn_on", "parameters": {"brightness": 80}}]}'),
(2, '客厅无人自动关灯', '客厅无人超过10分钟自动关灯', TRUE, 5,
'{"conditions": [{"device_id": "sensor_motion_livingroom", "type": "motion", "operator": "=", "value": false, "duration": 600}], "logic": "AND"}',
'{"actions": [{"device_id": "device_light_livingroom", "command": "turn_off"}]}'),
(3, '温度过高自动开空调', '客厅温度超过28度自动开启空调', TRUE, 8,
'{"conditions": [{"device_id": "sensor_temp_livingroom", "type": "temperature", "operator": ">", "value": 28}], "logic": "AND"}',
'{"actions": [{"device_id": "device_ac_livingroom", "command": "turn_on", "parameters": {"temperature": 24, "mode": "cool"}}]}'),
(4, '离家模式', '离家时关闭所有灯光和空调', TRUE, 15,
'{"conditions": [{"device_id": "device_lock_entrance", "type": "door_lock", "operator": "=", "value": "locked"}], "logic": "AND"}',
'{"actions": [{"device_id": "device_light_livingroom", "command": "turn_off"}, {"device_id": "device_light_bedroom", "command": "turn_off"}, {"device_id": "device_ac_livingroom", "command": "turn_off"}]}');
整体项目结构其实挺复杂的,这里把核心代码贴上:
#!/usr/bin/env python3
"""
智能家居Web应用主程序
提供数据展示和控制界面
"""
import sys
import os
import json
import logging
from datetime import datetime, timedelta
from flask import Flask, render_template, jsonify, request
# 添加模块路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../data-processor'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../rule-engine'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../storage'))
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__, template_folder='templates', static_folder='static')
# 模拟数据库连接
class MockDB:
"""模拟数据库连接"""
def execute_query(self, query, params=None):
"""执行查询"""
import subprocess
try:
cmd = ['docker', 'exec', 'kwdb-server', '/kaiwudb/bin/kwbase', 'sql', '--insecure', '--execute', query]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
lines = result.stdout.strip().split('n')
if len(lines) > 1:
# 解析结果
headers = lines[0].split('t')
data = []
for line in lines[1:]:
values = line.split('t')
if len(values) == len(headers):
data.append(values)
return data
return []
except Exception as e:
logger.error(f"查询失败: {e}")
return []
db = MockDB()
@app.route('/')
def index():
"""首页"""
return render_template('index.html')
@app.route('/api/devices')
def get_devices():
"""获取设备列表"""
query = """
SELECT id, name, device_id, device_type, protocol, location, status
FROM devices
ORDER BY id
"""
results = db.execute_query(query)
devices = []
if results and len(results) > 1:
headers = results[0]
for row in results[1:]:
device = {
'id': row[0],
'name': row[1],
'device_id': row[2],
'device_type': row[3],
'protocol': row[4],
'location': row[5],
'status': row[6]
}
devices.append(device)
return jsonify(devices)
@app.route('/api/devices/<int:device_id>/data')
def get_device_data(device_id):
"""获取设备数据"""
hours = request.args.get('hours', 24, type=int)
start_time = (datetime.now() - timedelta(hours=hours)).isoformat()
# 获取设备信息
device_query = f"SELECT device_id FROM devices WHERE id = {device_id}"
device_result = db.execute_query(device_query)
if not device_result or len(device_result) < 2:
return jsonify({'error': '设备不存在'}), 404
device_id_str = device_result[1][0]
# 获取传感器数据
data_query = f"""
SELECT ts, value, data_type, status
FROM sensor_data
WHERE device_id = '{device_id_str}' AND ts >= '{start_time}'
ORDER BY ts DESC
LIMIT 100
"""
results = db.execute_query(data_query)
data_points = []
if results and len(results) > 1:
for row in results[1:]:
data_points.append({
'timestamp': row[0],
'value': float(row[1]) if row[1] else None,
'data_type': row[2],
'status': row[3]
})
return jsonify({
'device_id': device_id,
'device_id_str': device_id_str,
'data': data_points
})
@app.route('/api/sensor-data/latest')
def get_latest_sensor_data():
"""获取最新传感器数据"""
query = """
SELECT
d.id,
d.name,
d.device_type,
d.location,
d.status,
sd.ts as last_update,
sd.value,
sd.data_type,
sd.unit
FROM devices d
LEFT JOIN (
SELECT device_id, data_type, ts, value, unit,
ROW_NUMBER() OVER (PARTITION BY device_id ORDER BY ts DESC) as rn
FROM sensor_data
) sd ON d.device_id = sd.device_id AND sd.rn = 1
ORDER BY d.id
"""
results = db.execute_query(query)
sensor_data = []
if results and len(results) > 1:
for row in results[1:]:
sensor_data.append({
'id': row[0],
'name': row[1],
'device_type': row[2],
'location': row[3],
'status': row[4],
'last_update': row[5],
'value': float(row[6]) if row[6] else None,
'data_type': row[7],
'unit': row[8] if len(row) > 8 else None
})
return jsonify(sensor_data)
@app.route('/api/rules')
def get_rules():
"""获取自动化规则"""
query = """
SELECT id, name, description, enabled, priority, last_triggered_at
FROM automation_rules
ORDER BY priority DESC
"""
results = db.execute_query(query)
rules = []
if results and len(results) > 1:
for row in results[1:]:
rules.append({
'id': row[0],
'name': row[1],
'description': row[2],
'enabled': row[3] == 't',
'priority': int(row[4]),
'last_triggered_at': row[5]
})
return jsonify(rules)
@app.route('/api/statistics')
def get_statistics():
"""获取统计数据"""
stats = {}
# 设备统计
device_query = """
SELECT
device_type,
COUNT(*) as count,
COUNT(CASE WHEN status = 'online' THEN 1 END) as online_count
FROM devices
GROUP BY device_type
"""
device_results = db.execute_query(device_query)
stats['devices'] = {}
if device_results and len(device_results) > 1:
for row in device_results[1:]:
stats['devices'][row[0]] = {
'total': int(row[1]),
'online': int(row[2])
}
# 传感器数据统计
data_query = """
SELECT COUNT(*) as total_records
FROM sensor_data
"""
data_results = db.execute_query(data_query)
if data_results and len(data_results) > 1:
stats['sensor_data'] = {
'total_records': int(data_results[1][0])
}
# 规则统计
rule_query = """
SELECT
COUNT(*) as total,
COUNT(CASE WHEN enabled = TRUE THEN 1 END) as enabled
FROM automation_rules
"""
rule_results = db.execute_query(rule_query)
if rule_results and len(rule_results) > 1:
stats['rules'] = {
'total': int(rule_results[1][0]),
'enabled': int(rule_results[1][1])
}
return jsonify(stats)
@app.route('/api/data/receive', methods=['POST'])
def receive_data():
"""接收设备数据"""
try:
data = request.json
# 验证数据
if not data or 'device_id' not in data or 'value' not in data:
return jsonify({'error': '缺少必要字段'}), 400
# 插入数据
device_id = data['device_id']
value = data['value']
data_type = data.get('data_type', 'value')
status = data.get('status', 'good')
unit = data.get('unit', '')
query = f"""
INSERT INTO sensor_data (time, device_id, data_type, value, status, unit)
VALUES (NOW(), '{device_id}', '{data_type}', {value}, '{status}', '{unit}')
"""
db.execute_query(query)
return jsonify({'success': True, 'message': '数据接收成功'})
except Exception as e:
logger.error(f"接收数据失败: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/api/device/<int:device_id>/control', methods=['POST'])
def control_device(device_id):
"""控制设备"""
try:
data = request.json
if not data or 'command' not in data:
return jsonify({'error': '缺少命令参数'}), 400
command = data['command']
parameters = data.get('parameters', {})
# 获取设备信息
device_query = f"SELECT device_id, name FROM devices WHERE id = {device_id}"
device_result = db.execute_query(device_query)
if not device_result or len(device_result) < 2:
return jsonify({'error': '设备不存在'}), 404
device_id_str = device_result[1][0]
device_name = device_result[1][1]
# 记录控制指令
cmd_query = f"""
INSERT INTO device_commands (time, device_id, command_type, parameters, status)
VALUES (NOW(), '{device_id_str}', '{command}', '{json.dumps(parameters)}', 'success')
"""
db.execute_query(cmd_query)
logger.info(f"控制设备: {device_name} ({device_id_str}), 命令: {command}")
return jsonify({
'success': True,
'message': f'设备 {device_name} 控制指令已发送',
'command': command,
'parameters': parameters
})
except Exception as e:
logger.error(f"控制设备失败: {e}")
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5001, debug=True)
至此系统开发完成
四、系统运行与功能测试
启动系统 Web API 服务,并通过接口测试与页面验证,确认系统所有功能正常运行
1、启动系统 Web API 服务
在命令行输入启动命令并运行
cd /root/workspace/smarthome-system/web-api && python3 simple_app.py > /tmp/smarthome-web-5002.log 2>&1 &
root@kwdb:~/workspace/smarthome-system/web-api# python3 simple_app.py
启动智能家居Web服务...
* Serving Flask app 'simple_app'
* Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
* Running on all addresses (0.0.0.0)
* Running on http://127.0.0.1:5002
* Running on http://192.168.150.111:5002
Press CTRL+C to quit
127.0.0.1 - - [27/Feb/2026 13:06:51] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [27/Feb/2026 13:06:51] "GET /static/css/style.css HTTP/1.1" 200 -
127.0.0.1 - - [27/Feb/2026 13:06:51] "GET /static/js/app.js HTTP/1.1" 200 -
执行查询:
SELECT id, name, device_type, protocol, location, status
FROM devices
ORDER BY id
127.0.0.1 - - [27/Feb/2026 13:06:51] "GET /favicon.ico HTTP/1.1" 404 -
查询结果: [['1', '客厅温度传感器', 'temperature', 'mqtt', '客厅', 'online'], ['2', '客厅湿度传感器', 'humidity', 'mqtt', '客厅', 'online'], ['3', '客厅人体传感器', 'motion', 'mqtt', '客厅', 'online'], ['4', '客厅光照传感器', 'light', 'mqtt', '客厅', 'online'], ['5', '客厅智能灯', 'smart_light', 'mqtt', '客厅', 'online'], ['6', '卧室温度传感器', 'temperature', 'mqtt', '卧室', 'online'], ['7', '卧室智能灯', 'smart_light', 'mqtt', '卧室', 'online'], ['8', '智能门锁', 'smart_lock', 'mqtt', '玄关', 'online'], ['9', '智能窗帘', 'smart_curtain', 'mqtt', '客厅', 'online'], ['10', '智能空调', 'smart_ac', 'mqtt', '客厅', 'online']]
启动成功后,终端将显示:Running on http://[你的服务器IP]:5002,即服务启动完成,点击在浏览器中打开可以看到程序运行的界面
系统概览:设备总数、在线设备数、自动化规则数、数据记录数及设备列表

实时数据:温湿度、光照、人体感应等传感器的最新数据与更新时间;自动化规则:所有规则的名称、描述、启用状态、优先级;

系统状态:KaiwuDB 版本、服务运行状态、接口文档,这里为了方便把数据接口放在了最下面,便于查找调用

2、测试接口
通过数据接口插入客厅传感器数据
curl -X POST http://localhost:5002/api/data/receive -H "Content-Type: application/json" -d '{"device_id": "temperature", "value": 32, "data_type": "temperature", "unit": "°C"}'

将传感器实时数据上报至本地系统,数据自动存储至 KaiwuDB 时序表,这里可以看到客厅问题传感器已经标称32度

测试一下测试设备列表API
curl -s http://localhost:5002/api/devices
可以看到返回了列表数据

至此项目开发全部结束,希望对想研究ai及kwdb数据库的同学有一些帮助
五、总结
本文以 KaiwuDB 为数据基座,华为 CodeArts 代码智能体为自动化开发引擎,实现了智能家居本地化数据处理系统的快速构建,全程无需复杂的手动编码,新手也可按步骤复现。随着 AIoT 技术的发展,本地化数据处理将成为智能家居的主流趋势,KaiwuDB 多模数据库让开发者实现 “轻量、高效、安全” 的智能场景落地。