Kafka Streams 实时流处理:构建高效数据管道
别叫我大神,叫我 Alex 就好。流处理是现代数据架构的核心,Kafka Streams 让这一切变得简单优雅。
一、Kafka Streams 基础
1.1 核心概念
// 创建 Kafka Streams 应用
@Configuration
public class KafkaStreamsConfig {

    /**
     * Builds, starts and exposes the Streams runtime as a Spring bean.
     *
     * destroyMethod = "close" is essential: without it the Streams instance is
     * never shut down on context close, state stores are not flushed and the
     * instance does not leave the consumer group cleanly.
     */
    @Bean(destroyMethod = "close")
    public KafkaStreams kafkaStreams() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processing-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // EOS v2: exactly-once processing with lower overhead than the original EOS.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        StreamsBuilder builder = new StreamsBuilder();
        buildTopology(builder);

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        return streams;
    }

    /** Simple filter/enrich pipeline: "orders" topic -> "enriched-orders" topic. */
    private void buildTopology(StreamsBuilder builder) {
        KStream<String, String> orders = builder.stream("orders");
        orders
            .filter((key, value) -> value.contains("status:CREATED"))
            .mapValues(this::enrichOrder)
            .to("enriched-orders");
    }

    /** Appends a processing timestamp to the raw order payload. */
    private String enrichOrder(String order) {
        return order + ",processedAt:" + System.currentTimeMillis();
    }
}
1.2 状态存储
@Component
public class OrderAggregationService {

    /**
     * Aggregates per-user order statistics into the "user-order-stats" state
     * store (queryable via interactive queries) and republishes every update
     * to the topic of the same name.
     */
    public void buildAggregationTopology(StreamsBuilder builder) {
        KStream<String, Order> orderStream =
            builder.stream("orders", Consumed.with(Serdes.String(), new OrderSerde()));

        // Re-key by user id so all orders of one user land on the same partition.
        KGroupedStream<String, Order> ordersByUser = orderStream.groupBy(
            (key, order) -> order.getUserId(),
            Grouped.with(Serdes.String(), new OrderSerde()));

        KTable<String, UserOrderStats> statsTable = ordersByUser.aggregate(
            UserOrderStats::new,
            (userId, order, acc) -> acc.addOrder(order),
            Materialized.<String, UserOrderStats, KeyValueStore<Bytes, byte[]>>as("user-order-stats")
                .withKeySerde(Serdes.String())
                .withValueSerde(new UserOrderStatsSerde()));

        statsTable
            .toStream()
            .to("user-order-stats", Produced.with(Serdes.String(), new UserOrderStatsSerde()));
    }
}
// 状态类
/** Mutable accumulator holding one user's running order statistics. */
public class UserOrderStats {
    private String userId;
    private int totalOrders = 0;
    private BigDecimal totalAmount = BigDecimal.ZERO;

    /**
     * Folds one order into the running totals.
     * Returns {@code this} so it can be used directly as a Streams aggregator.
     */
    public UserOrderStats addOrder(Order order) {
        this.userId = order.getUserId();
        this.totalOrders++;
        this.totalAmount = this.totalAmount.add(order.getAmount());
        return this;
    }

    // Accessors are required for JSON serialization of the REST response and
    // are used by OrderStatsController, which sorts users by getTotalAmount().
    public String getUserId() { return userId; }
    public int getTotalOrders() { return totalOrders; }
    public BigDecimal getTotalAmount() { return totalAmount; }
}
二、窗口操作
2.1 时间窗口
@Component
public class WindowedAnalyticsService {

    /** Wires tumbling, hopping and session window aggregations over the orders stream. */
    public void buildWindowedTopology(StreamsBuilder builder) {
        KStream<String, Order> orders = builder.stream(
            "orders",
            Consumed.with(Serdes.String(), new OrderSerde())
        );

        // Tumbling window: revenue per key for every 5-minute bucket.
        // ofSizeWithNoGrace replaces the deprecated TimeWindows.of(...).
        orders
            .groupByKey(Grouped.with(Serdes.String(), new OrderSerde()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .aggregate(
                () -> BigDecimal.ZERO,
                (key, order, sum) -> sum.add(order.getAmount()),
                Materialized.with(Serdes.String(), new BigDecimalSerde())
            )
            // Flatten the windowed key to "key@windowStart" for the output topic.
            .toStream((windowedKey, value) ->
                windowedKey.key() + "@" + windowedKey.window().start())
            // Explicit serdes: the value is a BigDecimal, not the default String.
            .to("5min-order-stats", Produced.with(Serdes.String(), new BigDecimalSerde()));

        // Hopping window: 5-minute window advancing every minute.
        orders
            .groupByKey(Grouped.with(Serdes.String(), new OrderSerde()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
                .advanceBy(Duration.ofMinutes(1)))
            .count(Materialized.with(Serdes.String(), Serdes.Long()))
            .toStream((windowedKey, count) ->
                String.format("%s@%s:%d", windowedKey.key(),
                    windowedKey.window().start(), count))
            // Count values are Long; default String value serde would fail here.
            .to("sliding-window-stats", Produced.with(Serdes.String(), Serdes.Long()));

        // Session window: per-user activity sessions closed after 30 minutes of inactivity.
        orders
            .groupByKey(Grouped.with(Serdes.String(), new OrderSerde()))
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
            .aggregate(
                () -> new SessionStats(),
                (key, order, stats) -> stats.addOrder(order),
                (key, stats1, stats2) -> stats1.merge(stats2),
                Materialized.with(Serdes.String(), new SessionStatsSerde())
            )
            // Collapse Windowed<String> keys to plain Strings; the default String
            // key serde cannot serialize a Windowed key.
            .toStream((windowedKey, stats) ->
                windowedKey.key() + "@" + windowedKey.window().start())
            .to("session-stats", Produced.with(Serdes.String(), new SessionStatsSerde()));
    }
}
2.2 窗口连接
@Component
public class StreamJoinService {

    /** Correlates orders with payments inside bounded time windows. */
    public void buildJoinTopology(StreamsBuilder builder) {
        // Order stream
        KStream<String, Order> orders = builder.stream(
            "orders",
            Consumed.with(Serdes.String(), new OrderSerde())
        );
        // Payment stream
        KStream<String, Payment> payments = builder.stream(
            "payments",
            Consumed.with(Serdes.String(), new PaymentSerde())
        );

        // Inner join: emit a match only when an order and a payment with the
        // same key arrive within 30 minutes of each other.
        // ofTimeDifferenceWithNoGrace replaces the deprecated JoinWindows.of(...).
        orders
            .join(
                payments,
                (order, payment) -> new OrderPayment(order, payment),
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(30)),
                StreamJoined.with(Serdes.String(), new OrderSerde(), new PaymentSerde())
            )
            .to("order-payment-matches", Produced.with(Serdes.String(), new OrderPaymentSerde()));

        // Left outer join: a null payment side means no payment arrived within
        // the 24h window -> flag the order as unpaid. Matched pairs map to null
        // and are dropped by the filter below.
        orders
            .leftJoin(
                payments,
                (order, payment) -> payment == null
                    ? new UnpaidOrder(order, System.currentTimeMillis())
                    : null,
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(24)),
                StreamJoined.with(Serdes.String(), new OrderSerde(), new PaymentSerde())
            )
            .filter((key, value) -> value != null)
            .to("unpaid-orders", Produced.with(Serdes.String(), new UnpaidOrderSerde()));
    }
}
三、高级处理模式
3.1 分支处理
@Component
public class OrderRoutingService {

    /** Routes each order to a size-specific topic based on its amount. */
    public void buildRoutingTopology(StreamsBuilder builder) {
        KStream<String, Order> orders = builder.stream(
            "orders",
            Consumed.with(Serdes.String(), new OrderSerde())
        );

        BigDecimal smallLimit = new BigDecimal("1000");
        BigDecimal mediumLimit = new BigDecimal("10000");

        // Branch predicates are evaluated in declaration order, so "medium"
        // only sees orders of 1000 and above; map keys get the "order-" prefix.
        Map<String, KStream<String, Order>> branches = orders
            .split(Named.as("order-"))
            .branch((key, order) -> order.getAmount().compareTo(smallLimit) < 0,
                Branched.as("small"))
            .branch((key, order) -> order.getAmount().compareTo(mediumLimit) < 0,
                Branched.as("medium"))
            .defaultBranch(Branched.as("large"));

        // Small orders: fast-path processing.
        branches.get("order-small")
            .mapValues(this::processSmallOrder)
            .to("small-orders-processed");

        // Medium orders: standard processing.
        branches.get("order-medium")
            .mapValues(this::processMediumOrder)
            .to("medium-orders-processed");

        // Large orders: held for manual review.
        branches.get("order-large")
            .mapValues(this::flagForReview)
            .to("large-orders-pending");
    }

    /** Marks a small order as fully processed. */
    private Order processSmallOrder(Order order) {
        order.setStatus(OrderStatus.PROCESSED);
        return order;
    }

    /** Moves a medium order into the standard processing state (risk check follows). */
    private Order processMediumOrder(Order order) {
        order.setStatus(OrderStatus.PROCESSING);
        return order;
    }

    /** Flags a large order for manual review. */
    private Order flagForReview(Order order) {
        order.setStatus(OrderStatus.PENDING_REVIEW);
        return order;
    }
}
3.2 处理器 API
@Component
public class CustomProcessorService {

    /**
     * Wires a low-level Processor API pipeline:
     * orders -> validation -> enrichment -> processed-orders,
     * plus a dedicated sink for records the validator rejects.
     *
     * Fix: addSource/addProcessor/addSink are Topology methods, not
     * StreamsBuilder methods; builder.build() exposes the underlying Topology.
     */
    public void buildCustomTopology(StreamsBuilder builder) {
        Topology topology = builder.build();
        topology.addSource("order-source", "orders")
            .addProcessor("validation-processor",
                () -> new OrderValidationProcessor(),
                "order-source")
            .addProcessor("enrichment-processor",
                () -> new OrderEnrichmentProcessor(),
                "validation-processor")
            .addSink("order-sink", "processed-orders",
                "enrichment-processor")
            // Child node "invalid-orders" that OrderValidationProcessor forwards
            // rejected records to; without this sink those forwards would fail.
            .addSink("invalid-orders", "invalid-orders",
                "validation-processor");
    }
}
// 自定义处理器
/**
 * Validates incoming orders, stamps the valid flag, and routes rejected
 * records to the "invalid-orders" child node.
 */
public class OrderValidationProcessor implements Processor<String, Order, String, Order> {

    private ProcessorContext<String, Order> context;

    @Override
    public void init(ProcessorContext<String, Order> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, Order> record) {
        Order order = record.value();
        List<String> errors = validate(order);
        boolean valid = errors.isEmpty();
        order.setValid(valid);
        if (valid) {
            // Forward to all downstream children (enrichment).
            context.forward(record.withValue(order));
            return;
        }
        order.setValidationErrors(errors);
        // Route only to the dedicated invalid-orders child.
        context.forward(record.withValue(order), "invalid-orders");
    }

    /** Returns an empty list when the order passes all checks. */
    private List<String> validate(Order order) {
        List<String> errors = new ArrayList<>();
        if (order.getAmount() == null || order.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
            errors.add("Invalid amount");
        }
        String userId = order.getUserId();
        if (userId == null || userId.isEmpty()) {
            errors.add("Missing user ID");
        }
        return errors;
    }

    @Override
    public void close() {}
}
四、交互式查询
4.1 查询状态存储
/** REST facade over the "user-order-stats" interactive-query state store. */
@RestController
public class OrderStatsController {

    private final KafkaStreams kafkaStreams;

    // Constructor injection: makes the dependency explicit and testable
    // (preferred over field @Autowired).
    public OrderStatsController(KafkaStreams kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
    }

    /** Returns aggregated stats for one user, or 404 if the user has no orders. */
    @GetMapping("/stats/user/{userId}")
    public ResponseEntity<UserOrderStats> getUserStats(@PathVariable String userId) {
        UserOrderStats stats = statsStore().get(userId);
        if (stats == null) {
            return ResponseEntity.notFound().build();
        }
        return ResponseEntity.ok(stats);
    }

    /**
     * Returns the top users by total order amount.
     * NOTE: scans the entire local store into memory — acceptable for small
     * stores; consider a pre-sorted view for large ones.
     */
    @GetMapping("/stats/top-users")
    public ResponseEntity<List<UserOrderStats>> getTopUsers(
            @RequestParam(defaultValue = "10") int limit) {
        List<UserOrderStats> allStats = new ArrayList<>();
        // KeyValueIterator holds RocksDB resources; must be closed.
        try (KeyValueIterator<String, UserOrderStats> iterator = statsStore().all()) {
            while (iterator.hasNext()) {
                allStats.add(iterator.next().value);
            }
        }
        allStats.sort((a, b) -> b.getTotalAmount().compareTo(a.getTotalAmount()));
        return ResponseEntity.ok(allStats.subList(0, Math.min(limit, allStats.size())));
    }

    /**
     * Fetches a read-only view of the state store. May throw
     * InvalidStateStoreException while the application is rebalancing;
     * callers should surface that as a retryable 503 in production.
     */
    private ReadOnlyKeyValueStore<String, UserOrderStats> statsStore() {
        return kafkaStreams.store(
            StoreQueryParameters.fromNameAndType(
                "user-order-stats",
                QueryableStoreTypes.keyValueStore()
            )
        );
    }
}
五、生产环境配置
# application.yml — production Kafka Streams settings
spring:
  kafka:
    streams:
      application-id: order-processing-service
      bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
      properties:
        # Exactly-once semantics v2 (requires Kafka 2.5+ brokers)
        processing.guarantee: exactly_once_v2
        # Replicate internal changelog/repartition topics across 3 brokers
        replication.factor: 3
        min.insync.replicas: 2
        num.stream.threads: 4
        # Skip undeserializable records instead of crashing the application
        default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
        default.production.exception.handler: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
        commit.interval.ms: 10000
        cache.max.bytes.buffering: 10485760
        state.dir: /var/lib/kafka-streams
        # DEBUG-level metrics are verbose; prefer INFO in production
        metrics.recording.level: DEBUG
六、总结
Kafka Streams 提供了强大的流处理能力,关键点在于:
- 状态管理:合理使用状态存储
- 窗口操作:选择合适的窗口类型
- 性能优化:调整缓存和提交间隔
- 监控运维:关注延迟和吞吐量
流处理系统的设计需要在数据的时效性与一致性之间做好权衡,并据此选择合适的窗口、状态与容错策略。
参考资源:
- Kafka Streams Documentation
- Confluent Kafka Streams
© 版权声明
文章版权归作者所有,未经允许请勿转载。