Kafka Streams 实时流处理:构建高效数据管道

Kafka Streams 实时流处理:构建高效数据管道

别叫我大神,叫我 Alex 就好。流处理是现代数据架构的核心,Kafka Streams 让这一切变得简单优雅。

一、Kafka Streams 基础

1.1 核心概念

// Creates and starts the Kafka Streams application.
@Configuration
public class KafkaStreamsConfig {
    /**
     * Builds the topology, creates the {@link KafkaStreams} client and starts it.
     *
     * @return the already-started Kafka Streams client
     */
    @Bean
    public KafkaStreams kafkaStreams() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processing-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        StreamsBuilder builder = new StreamsBuilder();
        buildTopology(builder);
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        return streams;
    }

    /**
     * Registers a simple transformation: keeps records from "orders" whose value
     * contains "status:CREATED", enriches them and writes them to "enriched-orders".
     *
     * @param builder the builder the topology is added to
     */
    private void buildTopology(StreamsBuilder builder) {
        KStream<String, String> orders = builder.stream("orders");
        orders
            // A KStream may carry null values (e.g. tombstones); guard before calling
            // contains() so such a record is filtered out instead of throwing an NPE.
            .filter((key, value) -> value != null && value.contains("status:CREATED"))
            .mapValues(this::enrichOrder)
            .to("enriched-orders");
    }

    /**
     * Appends the processing time to the order payload.
     *
     * @param order the raw order string
     * @return the order string with a ",processedAt:&lt;epoch millis&gt;" suffix
     */
    private String enrichOrder(String order) {
        return order + ",processedAt:" + System.currentTimeMillis();
    }
}

1.2 状态存储

@Component
public class OrderAggregationService {
    private static final String ORDERS_TOPIC = "orders";
    private static final String USER_STATS = "user-order-stats";

    /**
     * Adds a topology that re-keys orders by user id and folds them into one
     * {@link UserOrderStats} per user. The aggregate is kept in the
     * "user-order-stats" state store and every update is written to the
     * "user-order-stats" topic.
     *
     * @param builder the builder the topology is added to
     */
    public void buildAggregationTopology(StreamsBuilder builder) {
        KStream<String, Order> orderStream =
            builder.stream(ORDERS_TOPIC, Consumed.with(Serdes.String(), new OrderSerde()));

        orderStream
            .groupBy(
                (ignoredKey, order) -> order.getUserId(),
                Grouped.with(Serdes.String(), new OrderSerde()))
            .aggregate(
                UserOrderStats::new,
                (userId, order, running) -> running.addOrder(order),
                userStatsStore())
            .toStream()
            .to(USER_STATS, Produced.with(Serdes.String(), new UserOrderStatsSerde()));
    }

    /** Describes the key-value store that holds the per-user aggregate. */
    private Materialized<String, UserOrderStats, KeyValueStore<Bytes, byte[]>> userStatsStore() {
        return Materialized.<String, UserOrderStats, KeyValueStore<Bytes, byte[]>>as(USER_STATS)
            .withKeySerde(Serdes.String())
            .withValueSerde(new UserOrderStatsSerde());
    }
}
// Aggregate state: running order count and total amount for one user.
public class UserOrderStats {
    private String userId;
    private int totalOrders = 0;
    private BigDecimal totalAmount = BigDecimal.ZERO;

    /**
     * Folds one order into this aggregate: records the order's user id,
     * increments the order count and adds the order amount to the total.
     *
     * @param order the order to add; its amount must not be null
     * @return this instance, so it can be returned directly from an aggregator
     */
    public UserOrderStats addOrder(Order order) {
        this.userId = order.getUserId();
        this.totalOrders++;
        this.totalAmount = this.totalAmount.add(order.getAmount());
        return this;
    }

    /** @return the user id of the most recently added order, or null if none was added */
    public String getUserId() {
        return userId;
    }

    /** @return the number of orders added so far */
    public int getTotalOrders() {
        return totalOrders;
    }

    /** @return the sum of the amounts of all orders added so far */
    public BigDecimal getTotalAmount() {
        return totalAmount;
    }
}

二、窗口操作

2.1 时间窗口

@Component
public class WindowedAnalyticsService {
    /**
     * Adds three windowed aggregations over the "orders" topic:
     * a 5-minute tumbling sum, a 5-minute hopping count advancing every minute,
     * and a session aggregation with a 30-minute inactivity gap.
     *
     * <p>Every sink declares its serdes explicitly. The values are not Strings,
     * so relying on default String serdes would fail when the record is serialized.
     * All windows are created without a grace period, so records that arrive
     * after their window has closed are dropped.
     *
     * @param builder the builder the topology is added to
     */
    public void buildWindowedTopology(StreamsBuilder builder) {
        KStream<String, Order> orders = builder.stream(
            "orders",
            Consumed.with(Serdes.String(), new OrderSerde())
        );
        // Tumbling window: total order amount per key for each 5-minute window.
        orders
            .groupByKey(Grouped.with(Serdes.String(), new OrderSerde()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .aggregate(
                () -> BigDecimal.ZERO,
                (key, order, sum) -> sum.add(order.getAmount()),
                Materialized.with(Serdes.String(), new BigDecimalSerde())
            )
            // Flatten the windowed key to "<key>@<windowStart>" so it can be written as a String.
            .toStream((windowedKey, value) ->
                windowedKey.key() + "@" + windowedKey.window().start())
            .to("5min-order-stats", Produced.with(Serdes.String(), new BigDecimalSerde()));
        // Hopping window: 5-minute windows that advance every minute, so windows overlap.
        orders
            .groupByKey(Grouped.with(Serdes.String(), new OrderSerde()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
                .advanceBy(Duration.ofMinutes(1)))
            .count(Materialized.with(Serdes.String(), Serdes.Long()))
            .toStream((windowedKey, count) ->
                String.format("%s@%s:%d", windowedKey.key(),
                    windowedKey.window().start(), count))
            .to("sliding-window-stats", Produced.with(Serdes.String(), Serdes.Long()));
        // Session window: groups a key's orders into sessions separated by 30 minutes of inactivity.
        orders
            .groupByKey(Grouped.with(Serdes.String(), new OrderSerde()))
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
            .aggregate(
                () -> new SessionStats(),
                (key, order, stats) -> stats.addOrder(order),
                (key, stats1, stats2) -> stats1.merge(stats2),
                Materialized.with(Serdes.String(), new SessionStatsSerde())
            )
            // A session is identified by key, start and end; encode all three in a String key
            // so the sink does not need a windowed serde.
            .toStream((windowedKey, stats) ->
                windowedKey.key() + "@" + windowedKey.window().start()
                    + "-" + windowedKey.window().end())
            .to("session-stats", Produced.with(Serdes.String(), new SessionStatsSerde()));
    }
}

2.2 窗口连接

@Component
public class StreamJoinService {
    /**
     * Adds two stream-stream joins between "orders" and "payments":
     * an inner join that pairs an order with a payment within 30 minutes, and
     * a left join that reports orders with no payment within 24 hours.
     *
     * <p>The windows are built with {@code JoinWindows.ofTimeDifferenceWithNoGrace}.
     * The deprecated {@code JoinWindows.of} keeps the legacy left-join behaviour,
     * which emits a null-payment result as soon as an order arrives before its
     * payment and would therefore flag paid orders as unpaid.
     *
     * @param builder the builder the topology is added to
     */
    public void buildJoinTopology(StreamsBuilder builder) {
        // Order stream
        KStream<String, Order> orders = builder.stream(
            "orders",
            Consumed.with(Serdes.String(), new OrderSerde())
        );
        // Payment stream
        KStream<String, Payment> payments = builder.stream(
            "payments",
            Consumed.with(Serdes.String(), new PaymentSerde())
        );
        // Inner join: match an order and a payment with the same key within 30 minutes.
        orders
            .join(
                payments,
                (order, payment) -> new OrderPayment(order, payment),
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(30)),
                StreamJoined.with(Serdes.String(), new OrderSerde(), new PaymentSerde())
            )
            .to("order-payment-matches", Produced.with(Serdes.String(), new OrderPaymentSerde()));
        // Left join: an order with no payment inside the 24-hour window yields an UnpaidOrder.
        orders
            .leftJoin(
                payments,
                (order, payment) -> {
                    if (payment == null) {
                        return new UnpaidOrder(order, System.currentTimeMillis());
                    }
                    // Matched orders are not of interest here; dropped by the filter below.
                    return null;
                },
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(24)),
                StreamJoined.with(Serdes.String(), new OrderSerde(), new PaymentSerde())
            )
            .filter((key, value) -> value != null)
            .to("unpaid-orders", Produced.with(Serdes.String(), new UnpaidOrderSerde()));
    }
}

三、高级处理模式

3.1 分支处理

@Component
public class OrderRoutingService {
    // Exclusive upper bounds of the "small" and "medium" branches.
    // Hoisted so they are not re-created for every record.
    private static final BigDecimal SMALL_ORDER_LIMIT = new BigDecimal("1000");
    private static final BigDecimal MEDIUM_ORDER_LIMIT = new BigDecimal("10000");

    /**
     * Adds a topology that splits "orders" by amount into small (&lt; 1000),
     * medium (&lt; 10000) and large (everything else) branches and writes each
     * branch to its own topic after setting the matching status.
     *
     * <p>Each sink declares {@code OrderSerde} explicitly, because the values
     * are {@code Order} objects and cannot be written with a String serde.
     *
     * @param builder the builder the topology is added to
     */
    public void buildRoutingTopology(StreamsBuilder builder) {
        KStream<String, Order> orders = builder.stream(
            "orders",
            Consumed.with(Serdes.String(), new OrderSerde())
        );
        // Branch names are the split prefix plus the Branched name, e.g. "order-small".
        Map<String, KStream<String, Order>> branches = orders
            .split(Named.as("order-"))
            .branch((key, order) -> order.getAmount().compareTo(SMALL_ORDER_LIMIT) < 0,
                Branched.as("small"))
            .branch((key, order) -> order.getAmount().compareTo(MEDIUM_ORDER_LIMIT) < 0,
                Branched.as("medium"))
            .defaultBranch(Branched.as("large"));
        // Small orders: fast path.
        branches.get("order-small")
            .mapValues(this::processSmallOrder)
            .to("small-orders-processed", Produced.with(Serdes.String(), new OrderSerde()));
        // Medium orders: standard processing.
        branches.get("order-medium")
            .mapValues(this::processMediumOrder)
            .to("medium-orders-processed", Produced.with(Serdes.String(), new OrderSerde()));
        // Large orders: flagged for manual review.
        branches.get("order-large")
            .mapValues(this::flagForReview)
            .to("large-orders-pending", Produced.with(Serdes.String(), new OrderSerde()));
    }

    /** Marks a small order as PROCESSED and returns the same instance. */
    private Order processSmallOrder(Order order) {
        order.setStatus(OrderStatus.PROCESSED);
        return order;
    }

    /** Marks a medium order as PROCESSING and returns the same instance. */
    private Order processMediumOrder(Order order) {
        order.setStatus(OrderStatus.PROCESSING);
        // Placeholder: the risk check would go here.
        return order;
    }

    /** Marks a large order as PENDING_REVIEW and returns the same instance. */
    private Order flagForReview(Order order) {
        order.setStatus(OrderStatus.PENDING_REVIEW);
        return order;
    }
}

3.2 处理器 API

@Component
public class CustomProcessorService {
    // Node names shared between the topology wiring and OrderValidationProcessor,
    // which forwards to its children by name.
    static final String VALIDATION_PROCESSOR = "validation-processor";
    static final String ENRICHMENT_PROCESSOR = "enrichment-processor";
    static final String INVALID_ORDERS_SINK = "invalid-orders";

    /**
     * Wires a Processor API pipeline:
     * "orders" -> validation -> enrichment -> "processed-orders",
     * with orders that fail validation routed to a separate sink.
     *
     * <p>Source, processor and sink nodes belong to the {@code Topology} API, not
     * to {@code StreamsBuilder}, so they are added to the topology obtained from
     * {@code builder.build()}.
     * NOTE(review): this relies on the builder handing out the same topology
     * instance on later {@code build()} calls — confirm for the Kafka version in use.
     *
     * @param builder the builder whose topology the nodes are added to
     */
    public void buildCustomTopology(StreamsBuilder builder) {
        builder.build()
            // Explicit (de)serializers: the values are Order objects, not Strings.
            .addSource("order-source",
                Serdes.String().deserializer(), new OrderSerde().deserializer(),
                "orders")
            .addProcessor(VALIDATION_PROCESSOR,
                () -> new OrderValidationProcessor(),
                "order-source")
            .addProcessor(ENRICHMENT_PROCESSOR,
                () -> new OrderEnrichmentProcessor(),
                VALIDATION_PROCESSOR)
            .addSink("order-sink", "processed-orders",
                Serdes.String().serializer(), new OrderSerde().serializer(),
                ENRICHMENT_PROCESSOR)
            // Child node the validation processor forwards rejected orders to.
            // The topic name is assumed to match the node name — TODO confirm.
            .addSink(INVALID_ORDERS_SINK, "invalid-orders",
                Serdes.String().serializer(), new OrderSerde().serializer(),
                VALIDATION_PROCESSOR);
    }
}
// Custom processor: validates each order and routes it to the matching child node.
public class OrderValidationProcessor implements Processor<String, Order, String, Order> {
    private ProcessorContext<String, Order> context;

    /** Stores the context so {@link #process} can forward records downstream. */
    @Override
    public void init(ProcessorContext<String, Order> context) {
        this.context = context;
    }

    /**
     * Validates the order. Valid orders are marked valid and forwarded to the
     * enrichment processor. Invalid orders are marked invalid, annotated with
     * the validation errors and forwarded to the invalid-orders sink.
     * Records with a null value are skipped.
     *
     * @param record the incoming order record
     */
    @Override
    public void process(Record<String, Order> record) {
        Order order = record.value();
        if (order == null) {
            // Nothing to validate (e.g. a tombstone); skip instead of failing with an NPE.
            return;
        }
        List<String> errors = validate(order);
        if (errors.isEmpty()) {
            order.setValid(true);
            // Forward by name: an unnamed forward would also reach the invalid-orders sink.
            context.forward(record.withValue(order), CustomProcessorService.ENRICHMENT_PROCESSOR);
        } else {
            order.setValid(false);
            order.setValidationErrors(errors);
            context.forward(record.withValue(order), CustomProcessorService.INVALID_ORDERS_SINK);
        }
    }

    /**
     * Checks that the amount is present and positive and that the user id is non-empty.
     *
     * @param order the order to check
     * @return the list of validation error messages; empty when the order is valid
     */
    private List<String> validate(Order order) {
        List<String> errors = new ArrayList<>();
        if (order.getAmount() == null || order.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
            errors.add("Invalid amount");
        }
        if (order.getUserId() == null || order.getUserId().isEmpty()) {
            errors.add("Missing user ID");
        }
        return errors;
    }

    /** No resources to release. */
    @Override
    public void close() {}
}

四、交互式查询

4.1 查询状态存储

@RestController
public class OrderStatsController {
    @Autowired
    private KafkaStreams kafkaStreams;
    @GetMapping("/stats/user/{userId}")
    public ResponseEntity<UserOrderStats> getUserStats(@PathVariable String userId) {
        // 获取状态存储
        ReadOnlyKeyValueStore<String, UserOrderStats> statsStore =
            kafkaStreams.store(
                StoreQueryParameters.fromNameAndType(
                    "user-order-stats",
                    QueryableStoreTypes.keyValueStore()
                )
            );
        UserOrderStats stats = statsStore.get(userId);
        if (stats == null) {
            return ResponseEntity.notFound().build();
        }
        return ResponseEntity.ok(stats);
    }
    @GetMapping("/stats/top-users")
    public ResponseEntity<List<UserOrderStats>> getTopUsers(
            @RequestParam(defaultValue = "10") int limit) {
        ReadOnlyKeyValueStore<String, UserOrderStats> statsStore =
            kafkaStreams.store(
                StoreQueryParameters.fromNameAndType(
                    "user-order-stats",
                    QueryableStoreTypes.keyValueStore()
                )
            );
        List<UserOrderStats> topUsers = new ArrayList<>();
        try (KeyValueIterator<String, UserOrderStats> iterator = statsStore.all()) {
            while (iterator.hasNext()) {
                topUsers.add(iterator.next().value);
            }
        }
        topUsers.sort((a, b) ->
            b.getTotalAmount().compareTo(a.getTotalAmount()));
        return ResponseEntity.ok(topUsers.subList(0, Math.min(limit, topUsers.size())));
    }
}

五、生产环境配置

# application.yml
spring:
  kafka:
    streams:
      application-id: order-processing-service
      bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
      properties:
        processing.guarantee: exactly_once_v2
        # Replication factor for the internal topics Kafka Streams creates.
        replication.factor: 3
        # Topic-level settings for those internal topics need the "topic." prefix;
        # a bare "min.insync.replicas" is not a Streams config and has no effect on them.
        topic.min.insync.replicas: 2
        num.stream.threads: 4
        default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
        default.production.exception.handler: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
        commit.interval.ms: 10000
        cache.max.bytes.buffering: 10485760
        state.dir: /var/lib/kafka-streams
        metrics.recording.level: DEBUG

六、总结

Kafka Streams 提供了强大的流处理能力,关键点在于:

  1. 状态管理:合理使用状态存储
  2. 窗口操作:选择合适的窗口类型
  3. 性能优化:调整缓存和提交间隔
  4. 监控运维:关注延迟和吞吐量

以上示例其实还可以写得更优雅一点。流处理的设计要同时考虑数据的时效性和一致性。


参考资源:

  • Kafka Streams Documentation
  • Confluent Kafka Streams
© 版权声明

相关文章