KafkaUtils.createDStream vs createDirectStream:Spark Streaming整合Kafka的两种方式
KafkaUtils.createDStream vs createDirectStream:Spark Streaming整合Kafka的两种方式
-
- 引言
- 一、核心区别概览
- 二、Receiver方式 (createDStream) 详解
-
- 2.1 工作原理图
- 2.2 代码示例
- 2.3 关键配置
- 三、Direct方式 (createDirectStream) 详解
-
- 3.1 工作原理图
- 3.2 代码示例
- 四、核心区别深度对比
-
- 4.1 数据接收与存储
- 4.2 并行度控制
- 4.3 容错与语义保证
- 五、优缺点对比
-
- 5.1 Receiver方式
- 5.2 Direct方式
- 六、偏移量管理对比
-
- 6.1 Receiver方式偏移量管理
- 6.2 Direct方式偏移量管理
- 七、性能对比
- 八、选型建议
-
- 8.1 决策树
- 8.2 场景推荐
- 九、生产环境配置示例
-
- 9.1 Receiver方式配置
- 9.2 Direct方式配置
- 十、总结
|
🌺The Begin🌺点点关注,收藏不迷路🌺
|
引言
在Spark Streaming与Kafka整合的历史演进中,出现过两种主要的API:基于Receiver的createDStream和基于Direct的createDirectStream。理解这两者的区别,对于设计可靠、高效的实时数据处理系统至关重要。本文将深入对比两种方式的原理、优缺点及适用场景。
一、核心区别概览
| 对比维度 | createDStream (Receiver方式) | createDirectStream (Direct方式) |
|---|---|---|
| 引入版本 | Spark 1.0+ | Spark 1.3+ |
| 数据接收 | Receiver线程持续接收 | Driver触发时直接拉取 |
| 存储方式 | Executor内存 + WAL | 无中间存储 |
| 偏移量管理 | Zookeeper | Spark Checkpoint/外部系统 |
| 语义保证 | At-Least-Once | Exactly-Once |
| 并行度 | 由Receiver数量决定 | 与Kafka分区数一致 |
二、Receiver方式 (createDStream) 详解
2.1 工作原理图
Driver
Spark Executor 2
Spark Executor 1
Kafka集群
更新偏移量
更新偏移量
Partition 0
Partition 1
Partition 2
Receiver线程
消费Partition 0&1
BlockManager
内存缓存
WAL预写日志
HDFS
Receiver线程
消费Partition 2
BlockManager
内存缓存
WAL预写日志
HDFS
ReceiverTracker
Zookeeper
偏移量存储
2.2 代码示例
// Receiver方式(Kafka 0.8 API)
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel
// 启用WAL(可选)
ssc.checkpoint("hdfs://namenode/checkpoint")
System.setProperty("spark.streaming.receiver.writeAheadLog.enable", "true")
val kafkaStream = KafkaUtils.createStream(
ssc,
"zookeeper:2181", // Zookeeper连接
"spark-streaming-group", // Consumer组
Map("topic1" -> 1, "topic2" -> 1), // 主题和并行度
StorageLevel.MEMORY_AND_DISK_SER_2 // 存储级别
)
// 处理数据
kafkaStream.map(record => (record._1, record._2))
.print()
2.3 关键配置
// 启用WAL(写前日志)
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
// 存储级别(双副本+序列化+内存磁盘)
StorageLevel.MEMORY_AND_DISK_SER_2
// 背压控制
sparkConf.set("spark.streaming.backpressure.enabled", "true")
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1000")
三、Direct方式 (createDirectStream) 详解
3.1 工作原理图
Executor 2
Executor 1
Kafka集群
Driver
偏移量管理
Driver
偏移量获取
查询Kafka
偏移量存储
Checkpoint/外部
Partition 0
Partition 1
Partition 2
Task 1
读取Partition 0
Task 2
读取Partition 1
Task 3
读取Partition 2
3.2 代码示例
// Direct方式(Kafka 0.10+ API)
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "broker1:9092,broker2:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "spark-direct-demo",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean) // 手动提交
)
val topics = Array("topic1", "topic2")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent, // 均匀分布
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
// 处理数据
stream.foreachRDD { rdd =>
// 获取偏移量
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 业务处理
rdd.map(record => (record.key(), record.value()))
.foreach(processRecord)
// 手动提交偏移量
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
四、核心区别深度对比
4.1 数据接收与存储
| 特性 | Receiver方式 | Direct方式 |
|---|---|---|
| 数据接收 | Receiver线程持续接收 | Task直接拉取 |
| 中间存储 | Executor内存+序列化 | 无中间存储 |
| 数据备份 | WAL + 双副本 | 不备份 |
| 内存压力 | 高 | 低 |
4.2 并行度控制
// Receiver方式:并行度由Receiver数量决定
val receiverStream = KafkaUtils.createStream(
ssc,
zkQuorum,
groupId,
Map("topic1" -> 3) // 3个Receiver,3个并行度
)
// Direct方式:并行度由Kafka分区数决定
// 如果topic1有10个分区,自动有10个并行度
val directStream = KafkaUtils.createDirectStream(
ssc,
PreferConsistent,
Subscribe(Array("topic1"), kafkaParams)
)
4.3 容错与语义保证
Direct方式 – Exactly-Once
从未提交偏移量开始
获取偏移量范围
处理数据
提交偏移量
同事务
故障恢复
Receiver方式 – At-Least-Once
重启后重读WAL
接收数据
写入WAL
更新Zookeeper偏移量
处理数据
Driver故障
五、优缺点对比
5.1 Receiver方式
| 优点 | 缺点 |
|---|---|
| 实现简单,API友好 | 需要额外内存存储数据 |
| 支持WAL保证数据不丢 | At-Least-Once可能重复 |
| 与旧版Kafka兼容 | Receiver可能成为瓶颈 |
| 内置背压支持 | 数据延迟较大 |
5.2 Direct方式
| 优点 | 缺点 |
|---|---|
| Exactly-Once语义 | 需要手动管理偏移量 |
| 并行度与分区一致 | 不支持Kafka 0.8 |
| 无需额外内存存储 | 复杂度略高 |
| 低延迟 | 需要外部系统存储偏移量 |
六、偏移量管理对比
6.1 Receiver方式偏移量管理
// Receiver方式:偏移量存储在Zookeeper
// 自动提交,无需手动处理
val stream = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
// 偏移量存储在Zookeeper路径:
// /consumers/[groupId]/offsets/[topic]/[partition]
6.2 Direct方式偏移量管理
// Direct方式:需要手动管理偏移量
// 方案1:提交到Kafka
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 业务处理
// 提交到Kafka
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
// 方案2:存储到外部系统(MySQL/Redis/HBase)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 开启事务
beginTransaction()
try {
processData(rdd)
saveOffsetsToHBase(offsetRanges) // 同事务
commitTransaction()
} catch {
case e: Exception => rollbackTransaction()
}
}
// 从HBase恢复偏移量
def getStoredOffsets(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
// 从HBase读取已提交偏移量
}
七、性能对比
| 指标 | Receiver方式 | Direct方式 |
|---|---|---|
| 最大吞吐 | 10-50 MB/s/node | 50-200 MB/s/node |
| 处理延迟 | 几秒~几分钟 | 毫秒~秒级 |
| 资源消耗 | 高(内存+磁盘) | 低 |
| 扩展性 | 受Receiver限制 | 随分区数扩展 |
八、选型建议
8.1 决策树
<0.8
Exactly-Once
At-Least-Once
内存充足
追求低延迟
选择Kafka整合方式
Kafka版本
Receiver方式
语义要求
Direct方式
资源情况
Receiver方式
启用WAL
Direct方式
8.2 场景推荐
| 场景 | 推荐方式 | 理由 |
|---|---|---|
| 金融交易 | Direct | 需要Exactly-Once |
| 日志收集 | Direct | 高吞吐,精确一次 |
| 老系统升级 | Receiver | 兼容Kafka 0.8 |
| 实时风控 | Direct | 低延迟要求 |
| 数据湖摄入 | Direct | 避免数据重复 |
九、生产环境配置示例
9.1 Receiver方式配置
val conf = new SparkConf()
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
.set("spark.streaming.receiver.maxRate", "5000")
.set("spark.streaming.backpressure.enabled", "true")
.set("spark.streaming.backpressure.initialRate", "1000")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
ssc.checkpoint("hdfs://namenode/checkpoint")
val stream = KafkaUtils.createStream(
ssc,
"zk1:2181,zk2:2181/consumer",
"group1",
Map("topic1" -> 3),
StorageLevel.MEMORY_AND_DISK_SER_2
)
9.2 Direct方式配置
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "broker1:9092,broker2:9092",
"group.id" -> "group1",
"enable.auto.commit" -> (false: java.lang.Boolean),
"auto.offset.reset" -> "earliest",
"max.poll.records" -> "1000",
"session.timeout.ms" -> "30000",
"heartbeat.interval.ms" -> "3000"
)
val stream = KafkaUtils.createDirectStream(
ssc,
PreferConsistent,
Subscribe(Array("topic1"), kafkaParams)
)
// Exactly-Once处理
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 业务处理(确保幂等)
rdd.foreachPartition { partition =>
val connection = createHBaseConnection()
partition.foreach { record =>
connection.saveWithRowKey(record.key(), record.value())
}
connection.close()
}
// 提交偏移量
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
十、总结
| 维度 | createDStream (Receiver) | createDirectStream (Direct) |
|---|---|---|
| 核心原理 | Receiver接收+缓存+WAL | Driver调度+Task直连 |
| 数据流 | Kafka → Receiver → BlockManager → Task | Kafka → Task |
| 偏移量管理 | Zookeeper | Spark/外部系统 |
| 语义保证 | At-Least-Once | Exactly-Once |
| 并行度 | 由Receiver数量决定 | 与Kafka分区数一致 |
| 适用版本 | Kafka 0.8+ | Kafka 0.10+ |
| 生产推荐 | 逐步淘汰 | 标准方案 |
一句话总结:
- Receiver方式:先收后处理,有WAL保障,但可能重复
- Direct方式:直接拉取处理,精确一次,推荐使用
随着Kafka和Spark的版本演进,Direct方式已经成为事实标准,提供更好的性能、精确的语义保证和更简洁的架构。

|
🌺The End🌺点点关注,收藏不迷路🌺
|