KafkaUtils.createDStream vs createDirectStream:Spark Streaming整合Kafka的两种方式

KafkaUtils.createDStream vs createDirectStream:Spark Streaming整合Kafka的两种方式

    • 引言
    • 一、核心区别概览
    • 二、Receiver方式 (createDStream) 详解
      • 2.1 工作原理图
      • 2.2 代码示例
      • 2.3 关键配置
    • 三、Direct方式 (createDirectStream) 详解
      • 3.1 工作原理图
      • 3.2 代码示例
    • 四、核心区别深度对比
      • 4.1 数据接收与存储
      • 4.2 并行度控制
      • 4.3 容错与语义保证
    • 五、优缺点对比
      • 5.1 Receiver方式
      • 5.2 Direct方式
    • 六、偏移量管理对比
      • 6.1 Receiver方式偏移量管理
      • 6.2 Direct方式偏移量管理
    • 七、性能对比
    • 八、选型建议
      • 8.1 决策树
      • 8.2 场景推荐
    • 九、生产环境配置示例
      • 9.1 Receiver方式配置
      • 9.2 Direct方式配置
    • 十、总结

🌺The Begin🌺点点关注,收藏不迷路🌺

引言

在Spark Streaming与Kafka整合的历史演进中,出现过两种主要的API:基于Receiver的createDStream和基于Direct的createDirectStream。理解这两者的区别,对于设计可靠、高效的实时数据处理系统至关重要。本文将深入对比两种方式的原理、优缺点及适用场景。

一、核心区别概览

对比维度 createDStream (Receiver方式) createDirectStream (Direct方式)
引入版本 Spark 1.0+ Spark 1.3+
数据接收 Receiver线程持续接收 Driver触发时直接拉取
存储方式 Executor内存 + WAL 无中间存储
偏移量管理 Zookeeper Spark Checkpoint/外部系统
语义保证 At-Least-Once Exactly-Once
并行度 由Receiver数量决定 与Kafka分区数一致

二、Receiver方式 (createDStream) 详解

2.1 工作原理图

Driver

Spark Executor 2

Spark Executor 1

Kafka集群

更新偏移量

更新偏移量

Partition 0

Partition 1

Partition 2

Receiver线程
消费Partition 0&1

BlockManager
内存缓存

WAL预写日志
HDFS

Receiver线程
消费Partition 2

BlockManager
内存缓存

WAL预写日志
HDFS

ReceiverTracker

Zookeeper
偏移量存储

2.2 代码示例

// Receiver方式(Kafka 0.8 API)
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel
// 启用WAL(可选)
ssc.checkpoint("hdfs://namenode/checkpoint")
System.setProperty("spark.streaming.receiver.writeAheadLog.enable", "true")
val kafkaStream = KafkaUtils.createStream(
  ssc,
  "zookeeper:2181",                    // Zookeeper连接
  "spark-streaming-group",              // Consumer组
  Map("topic1" -> 1, "topic2" -> 1),    // 主题和并行度
  StorageLevel.MEMORY_AND_DISK_SER_2    // 存储级别
)
// 处理数据
kafkaStream.map(record => (record._1, record._2))
           .print()

2.3 关键配置

// 启用WAL(写前日志)
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
// 存储级别(双副本+序列化+内存磁盘)
StorageLevel.MEMORY_AND_DISK_SER_2
// 背压控制
sparkConf.set("spark.streaming.backpressure.enabled", "true")
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1000")

三、Direct方式 (createDirectStream) 详解

3.1 工作原理图

Executor 2

Executor 1

Kafka集群

Driver

偏移量管理

1. 获取偏移量
1. 获取偏移量
1. 获取偏移量
2. 分配任务
2. 分配任务
2. 分配任务
3. 直接拉取
3. 直接拉取
3. 直接拉取
4. 处理完成
4. 处理完成
4. 处理完成
5. 提交偏移量

Driver

偏移量获取
查询Kafka

偏移量存储
Checkpoint/外部

Partition 0

Partition 1

Partition 2

Task 1
读取Partition 0

Task 2
读取Partition 1

Task 3
读取Partition 2

3.2 代码示例

// Direct方式(Kafka 0.10+ API)
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092,broker2:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-direct-demo",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)  // 手动提交
)
val topics = Array("topic1", "topic2")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,  // 均匀分布
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
// 处理数据
stream.foreachRDD { rdd =>
  // 获取偏移量
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // 业务处理
  rdd.map(record => (record.key(), record.value()))
     .foreach(processRecord)
  // 手动提交偏移量
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

四、核心区别深度对比

4.1 数据接收与存储

特性 Receiver方式 Direct方式
数据接收 Receiver线程持续接收 Task直接拉取
中间存储 Executor内存+序列化 无中间存储
数据备份 WAL + 双副本 不备份
内存压力

4.2 并行度控制

// Receiver方式:并行度由Receiver数量决定
val receiverStream = KafkaUtils.createStream(
  ssc,
  zkQuorum,
  groupId,
  Map("topic1" -> 3)  // 3个Receiver,3个并行度
)
// Direct方式:并行度由Kafka分区数决定
// 如果topic1有10个分区,自动有10个并行度
val directStream = KafkaUtils.createDirectStream(
  ssc,
  PreferConsistent,
  Subscribe(Array("topic1"), kafkaParams)
)

4.3 容错与语义保证

Direct方式 – Exactly-Once

从未提交偏移量开始

获取偏移量范围

处理数据

提交偏移量
同事务

故障恢复

Receiver方式 – At-Least-Once

重启后重读WAL

接收数据

写入WAL

更新Zookeeper偏移量

处理数据

Driver故障

五、优缺点对比

5.1 Receiver方式

优点 缺点
实现简单,API友好 需要额外内存存储数据
支持WAL保证数据不丢 At-Least-Once可能重复
与旧版Kafka兼容 Receiver可能成为瓶颈
内置背压支持 数据延迟较大

5.2 Direct方式

优点 缺点
Exactly-Once语义 需要手动管理偏移量
并行度与分区一致 不支持Kafka 0.8
无需额外内存存储 复杂度略高
低延迟 需要外部系统存储偏移量

六、偏移量管理对比

6.1 Receiver方式偏移量管理

// Receiver方式:偏移量存储在Zookeeper
// 自动提交,无需手动处理
val stream = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
// 偏移量存储在Zookeeper路径:
// /consumers/[groupId]/offsets/[topic]/[partition]

6.2 Direct方式偏移量管理

// Direct方式:需要手动管理偏移量
// 方案1:提交到Kafka
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // 业务处理
  // 提交到Kafka
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
// 方案2:存储到外部系统(MySQL/Redis/HBase)
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // 开启事务
  beginTransaction()
  try {
    processData(rdd)
    saveOffsetsToHBase(offsetRanges)  // 同事务
    commitTransaction()
  } catch {
    case e: Exception => rollbackTransaction()
  }
}
// 从HBase恢复偏移量
def getStoredOffsets(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
  // 从HBase读取已提交偏移量
}

七、性能对比

指标 Receiver方式 Direct方式
最大吞吐 10-50 MB/s/node 50-200 MB/s/node
处理延迟 几秒~几分钟 毫秒~秒级
资源消耗 高(内存+磁盘)
扩展性 受Receiver限制 随分区数扩展

八、选型建议

8.1 决策树

<0.8

>=0.10

Exactly-Once

At-Least-Once

内存充足

追求低延迟

选择Kafka整合方式

Kafka版本

Receiver方式

语义要求

Direct方式

资源情况

Receiver方式
启用WAL

Direct方式

8.2 场景推荐

场景 推荐方式 理由
金融交易 Direct 需要Exactly-Once
日志收集 Direct 高吞吐,精确一次
老系统升级 Receiver 兼容Kafka 0.8
实时风控 Direct 低延迟要求
数据湖摄入 Direct 避免数据重复

九、生产环境配置示例

9.1 Receiver方式配置

val conf = new SparkConf()
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  .set("spark.streaming.receiver.maxRate", "5000")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.initialRate", "1000")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
ssc.checkpoint("hdfs://namenode/checkpoint")
val stream = KafkaUtils.createStream(
  ssc,
  "zk1:2181,zk2:2181/consumer",
  "group1",
  Map("topic1" -> 3),
  StorageLevel.MEMORY_AND_DISK_SER_2
)

9.2 Direct方式配置

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092,broker2:9092",
  "group.id" -> "group1",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "auto.offset.reset" -> "earliest",
  "max.poll.records" -> "1000",
  "session.timeout.ms" -> "30000",
  "heartbeat.interval.ms" -> "3000"
)
val stream = KafkaUtils.createDirectStream(
  ssc,
  PreferConsistent,
  Subscribe(Array("topic1"), kafkaParams)
)
// Exactly-Once处理
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // 业务处理(确保幂等)
  rdd.foreachPartition { partition =>
    val connection = createHBaseConnection()
    partition.foreach { record =>
      connection.saveWithRowKey(record.key(), record.value())
    }
    connection.close()
  }
  // 提交偏移量
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

十、总结

维度 createDStream (Receiver) createDirectStream (Direct)
核心原理 Receiver接收+缓存+WAL Driver调度+Task直连
数据流 Kafka → Receiver → BlockManager → Task Kafka → Task
偏移量管理 Zookeeper Spark/外部系统
语义保证 At-Least-Once Exactly-Once
并行度 由Receiver数量决定 与Kafka分区数一致
适用版本 Kafka 0.8+ Kafka 0.10+
生产推荐 逐步淘汰 标准方案

一句话总结

  • Receiver方式:先收后处理,有WAL保障,但可能重复
  • Direct方式:直接拉取处理,精确一次,推荐使用

随着Kafka和Spark的版本演进,Direct方式已经成为事实标准,提供更好的性能、精确的语义保证和更简洁的架构。

在这里插入图片描述

🌺The End🌺点点关注,收藏不迷路🌺
© 版权声明

相关文章