基于 Kafka 的医嘱事件架构

一:为什么要用事件总线

  • 解耦:把 HIS 的业务事实(医嘱、执行、记账)解耦为事件,病案、计费、质控、DRG 等系统通过消费同一条事实构建各自视图

  • 可回放 / 恢复:事件可以持久化并重放,用于补录、补算或回放历史

  • 高吞吐与扩展:Kafka 能天然横向扩展,适应门诊高峰场景

  • 最终一致性:通过 outbox/CDC 能把 DB 事务原子性地映射到事件,总体保证业务一致性

二:总体架构

HIS DB (业务事务)
   └─ Outbox 表(同事务写入)
       └─ Debezium / Kafka Connect
            └─ Kafka Cluster (Topics: order.created, execution.reported, charge.posted, ...)
                 ├─ 病案系统 Consumer (case)
                 ├─ 计费系统 Consumer (billing)
                 ├─ 执行系统 Consumer (execution)
                 ├─ DRG/上报 Consumer
                 └─ Kafka Streams / ksqlDB 作实时聚合 -> compacted order.state

关键点:HIS 写业务数据 + outbox(同一事务),Debezium 将 outbox 的变化写入 Kafka,消费者做幂等处理并写入各自系统。

三:核心设计原则

  • 事件为事实源头:事件是业务事实(事实不可变,状态由事件衍生)

  • 幂等消费:消费者需保证“至少一次”语义下的幂等或事务化处理

  • 保证就诊内顺序:partition key 使用 encounterId(或 orderId)保证同一次住院事件顺序

  • Schema 管理:使用 Schema Registry(Avro/Protobuf)并强制兼容策略

  • Outbox 模式:在 DB 事务内写 outbox,再由 CDC 写 Kafka,避免事件丢失

  • DLQ + 人工补偿:不可恢复失败进入 DLQ 并有运维/病案室修复界面

四:事件模型(最小字段 & 示例)

通用 header(每个事件必须包含):

eventId: UUID
eventType: string
schemaVersion: string
occurredAt: timestamp
source: string
patientId: string
encounterId: string
orderId: string (如适用)
traceId: string (可选)
payload: object

示例:OrderCreated(Avro/JSON 形式)

{
  "eventId":"uuid-xxxx",
  "eventType":"OrderCreated",
  "schemaVersion":"v1",
  "occurredAt":"2026-02-06T08:23:12Z",
  "source":"HIS-OrderService",
  "patientId":"P-10001",
  "encounterId":"E-20260206-001",
  "orderId":"O-20260206-0001",
  "payload":{
    "items":[{"itemCode":"MED-001","type":"drug","qty":2,"unit":"pills"}],
    "orderingPhysician":"dr001",
    "orderType":"inpatient"
  }
}

常见事件:OrderCreatedOrderUpdatedOrderCancelledExecutionReportedChargePostedChargeReversedEncounterOpenedEncounterClosed

五:Topic 命名与 Partition 策略

  • 命名规范:<env>.<domain>.<entity>.<event>,例如 prod.his.order.created

  • Partition key:使用 encounterId(或 orderId,保证相同住院/医嘱事件到同一 partition,从而保证顺序

  • Partition 数:初期 8–24(根据并发消费者数),生产环境按吞吐与消费者扩展

  • Retention 策略:

    • 事件 topic:保留 7–30 天(用于回放/补录)

    • 状态 topic(compacted):prod.his.order.state(compact,保留最新状态)

    • 审计/归档 topic:长保留或导出到对象存储(S3 / HDFS)

六:事务一致性:Outbox 模式

为什么用outbox?

直接在应用写 DB 后再发 Kafka 存在“写 DB 成功但发 Kafka 失败”的风险。outbox 把“写业务表 + 写 outbox 表”放在同一 DB 事务内,Debezium 将 outbox 的变更转成 Kafka 事件,从而实现“事务内写入 -> 事件最终到 Kafka”的原子性。

outbox 表示例 DDL(简化)

CREATE TABLE outbox (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  aggregate_id VARCHAR(64), -- e.g. encounterId
  event_type VARCHAR(64),
  payload TEXT,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  processed BOOLEAN DEFAULT FALSE
);

流程:

  1. 业务事务写业务表 + 插入 outbox 行(同事务 commit)

  2. Debezium/Connector 捕获 outbox 表增量 -> 写到 Kafka topic(并可删除或标记 processed)

  3. 下游消费者处理 Kafka 事件(幂等)

七:Producer / Consumer 实践(示例代码片段)

下面给出 Java Spring Boot + Spring Kafka 的关键实现要点(伪代码,去掉细节配置):

Consumer(幂等写 DB)

@KafkaListener(topics = "prod.his.order.created", groupId = "billing")
public void onOrderCreated(ConsumerRecord<String, OrderEvent> record) {
  OrderEvent event = record.value();
  String eventId = event.getEventId();
  if (processedEventRepository.exists(eventId)) {
    // 幂等:已经处理,直接返回
    return;
  }
  // 本地 DB 事务:写业务表 + processed_event 表
  txTemplate.execute(status -> {
     billingService.applyOrder(event.getPayload());
     processedEventRepository.insert(eventId, now());
     return null;
  });
}

关键点:

  • 在本地 DB 事务内写入 processed_event(eventId),保证幂等检查与写入原子性

  • 若需要发出衍生事件(例如 ChargePosted),建议写入本地 outbox,再由 Debezium 推送,避免跨系统事务难题

Producer(若直接用 producer)

启用幂等性与事务(若不使用 outbox):

enable.idempotence=true
transactional.id=his-producer-1
kafkaTemplate.executeInTransaction(kt -> {
  kt.send(topic, key, payload);
  // 本地 DB 写放在不同事务 -> 风险较大,不推荐此方式
  return true;
});

注意:直接 producer + 本地 DB 难以保证原子性,推荐 outbox + Debezium

八:CDC(Debezium / Kafka Connect)配置要点

  • 使用 Debezium Connector 捕获 outbox 或业务表增量

  • Debezium 写入 Kafka 时可设置转换(Kafka Connect SMT)把表 change event 转成领域事件(remove metadata,rename fields)

  • 性能注意:不要直接把高写表全部 CDC 到 Kafka,否则会产生高流量;只 CDC 关键表或 outbox

示例 SMT:io.debezium.transforms.ExtractNewRecordStatepayload.after 提取为 message value

九:流处理:Kafka Streams / ksqlDB 用例

常见场景:

  • OrderCreatedExecutionReported join 形成 order.state(compacted)快照,供下游快速查询

  • 实时聚合同一 encounter 的计费总额、执行次数,生成监控报警

ksql 伪示例:

CREATE STREAM order_created (...) WITH (kafka_topic='prod.his.order.created', value_format='AVRO');
CREATE STREAM exec_reported (...) WITH (kafka_topic='prod.his.execution.reported', value_format='AVRO');
CREATE TABLE order_state AS
  SELECT encounterId, LATEST_BY_OFFSET(payload) AS last_payload
  FROM order_created
  GROUP BY encounterId;

十:监控与报警

必监控项(分层):

  • Kafka Broker:UnderReplicatedPartitions, OfflinePartitionsCount, RequestHandlerAvgIdlePercent, DiskUsage

  • Topic / Consumer:consumer_lag(每 consumer group)、throughput (MB/s)、partition skew

  • 业务指标:事件延迟(produce->consume P95/P99)、DLQ 事件速率、事件处理失败率

  • Alert 建议consumer_lag > thresholdDLQ_rate > 0.5%under_replicated_partitions > 0

监控栈:Prometheus + Grafana + JMX Exporter + (Confluent Control Center / Kafka Manager)

十一:安全与合规

  • 传输加密:TLS(Kafka 集群 + clients)

  • 认证与授权:SASL(Kerberos 或 SCRAM)+ ACL(topic/consumer group 级权限)

  • 敏感数据处理:事件中尽量只传 patientId,PII 留在受控 DB;若必须携带敏感字段,使用字段级加密或脱敏

  • 审计:记录 produce/consume 操作日志、DLQ 人工修复记录、谁触发了补偿事件等

十二:错误处理与补偿策略

  1. 即时重试:消费失败短期内指数退避重试(供瞬时故障恢复)

  2. Retry topic:将失败事件路由到延时 retry topic(例如 retry-1m, retry-5m)再尝试

  3. DLQ:超次数失败进入 DLQ 并触发人工告警、监控看板与修复 UI(查看原始 payload、日志、手动修复/重放)

  4. 补偿事件:当需要回滚(例如记账出错)发布 ChargeReversal / OrderCompensated 事件。补偿必须也是幂等的并写入审计

十三:性能与容量建议

  • replication.factor = 3

  • 每 broker 支持上千分区(视硬件),但单 topic 分区不宜过多以便运维

  • 高流量 topic 初始 8–24 partitions(按消费并发再调)

  • 估算:单事件大小 1–4 KB,2000 events/s ≈ 2–8 MB/s(HTTP + Kafka 双向),留足 headroom

十四:常见坑与对策(实战经验)

  • 坑:事件丢失(应用写 DB 成功,但事件未进 Kafka)

    • 对策:Outbox + Debezium,保证原子性

  • 坑:消费乱序导致状态错误

    • 对策:partition key 设计为 encounterId;使用 compacted order.state topic 保存最新状态

  • 坑:schema 变动导致消费者宕机

    • 对策:严格 Schema Registry + 兼容策略(backward/forward)+ 版本化升级流程

  • 坑:DLQ 积压无人处理

    • 对策:建立运维流程与 UI,自动告警与负责人制度

  • 坑:敏感数据泄露

    • 对策:事件中不携带敏感 PII;必要时使用字段加密与访问控制

十五:落地实施步骤

  1. PoC 环境:搭 3 节点 Kafka + Schema Registry + Debezium,接 test HIS DB

  2. 实现 Outbox:在 HIS 事务中写 outbox;配置 Debezium 捕获并写入 prod.his.order.created

  3. 实现 Consumer 幂等processed_event 表 + 本地事务处理

  4. 实现 DLQ & 修复 UI:运维可查看并手动重放/修正事件

  5. 增加流处理:Kafka Streams/ksqlDB 生成 order.state compacted topic

  6. 灰度:先在小科室跑,观察延迟、DLQ、处理率,再逐步全院推广

十六:示例附录

A. 简化 Avro schema(OrderCreated)

{
  "namespace": "hospital.his",
  "type": "record",
  "name": "OrderCreated",
  "fields": [
    {"name":"eventId","type":"string"},
    {"name":"eventType","type":"string"},
    {"name":"schemaVersion","type":"string"},
    {"name":"occurredAt","type":"string"},
    {"name":"source","type":"string"},
    {"name":"patientId","type":"string"},
    {"name":"encounterId","type":"string"},
    {"name":"orderId","type":"string"},
    {"name":"payload","type":{
      "type":"record","name":"OrderPayload","fields":[
        {"name":"items","type":{"type":"array","items":{
          "type":"record","name":"Item","fields":[
            {"name":"itemCode","type":"string"},
            {"name":"type","type":"string"},
            {"name":"qty","type":"int"},
            {"name":"unit","type":"string"}
          ]
        }}},
        {"name":"orderingPhysician","type":["null","string"], "default": null},
        {"name":"orderType","type":"string"}
      ]
    }}
  ]
}

B. Debezium outbox SMT 配置(示例片段)

{
  "name": "outbox-connector",
  "config": {
    "connector.class" : "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max" : "1",
    "database.hostname" : "db-host",
    "database.port" : "3306",
    "database.user" : "debezium",
    "database.password" : "pwd",
    "database.server.id" : "184054",
    "database.server.name" : "dbserver1",
    "table.include.list": "hisdb.outbox",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}

总结

  • 核心建议:使用 Outbox + Debezium 保证事务一致性;以 encounterId 为 partition key 保证就诊内顺序;使用 Schema Registry 严格管理 schema;实现幂等消费与 DLQ+人工补偿机制。

  • 事件化不是把一切改成事件就完成,而是要在主数据治理、监控、补偿链路上花功夫,才能真正把 HIS 的医嘱/费用流稳定地流转到病案、计费、DRG 与外部上报系统。

© 版权声明

相关文章