Spark持久化机制详解:从persist()到存储级别选择
@TOC
|
🌺The Begin🌺点点关注,收藏不迷路🌺
|
引言
在Spark作业中,重复使用同一个RDD或DataFrame进行多次操作(如多次count、多次写入)时,如果不加优化,每次Action都会触发完整的计算链路,造成严重的资源浪费。Spark的持久化机制正是为解决这一问题而设计。本文将深入探讨persist()的原理、使用场景,并详细对比不同存储级别的优缺点。
一、重复计算问题的产生
1.1 问题场景复现
# 示例:未使用持久化的情况
rdd = sc.textFile("hdfs://logs/2024-01-01/*") # 100GB日志数据
filtered = rdd.filter(lambda line: "ERROR" in line) # 过滤出错误日志
# 第一次Action:触发完整计算
error_count = filtered.count()
# 耗时:5分钟,扫描100GB数据
# 第二次Action:再次触发完整计算
error_samples = filtered.take(10)
# 再次耗时:5分钟,又扫描100GB数据
# 第三次Action:又触发完整计算
error_levels = filtered.map(extract_level).countByValue()
# 再次耗时:5分钟,又一次扫描100GB数据
# 总耗时:15分钟,重复扫描3次
无持久化
读取HDFS
filter转换
count Action
take Action
countByValue Action
重复读取
100GB
1.2 重复计算的代价
| 维度 | 影响 | 具体表现 |
|---|---|---|
| 计算资源 | CPU空转 | 相同计算重复执行 |
| IO开销 | 重复读取数据 | HDFS带宽浪费 |
| 网络传输 | 重复Shuffle | 网络IO增加 |
| 执行时间 | 线性增长 | 作业耗时成倍增加 |
二、persist()持久化机制
2.1 基本原理
persist()将RDD/DataFrame的计算结果缓存到存储介质(内存/磁盘)中,后续操作直接读取缓存数据,避免重复计算。
# 使用持久化的版本
from pyspark import StorageLevel
# 持久化关键点
filtered = rdd.filter(lambda line: "ERROR" in line) \
.persist(StorageLevel.MEMORY_ONLY)
# 第一次Action:触发计算并缓存
error_count = filtered.count() # 执行计算,同时缓存结果
# 后续Action:直接使用缓存
error_samples = filtered.take(10) # 从缓存读取,不重新计算
error_levels = filtered.map(extract_level).countByValue() # 同样从缓存读取
# 总耗时:5分钟(第一次)+ 秒级(后续)
使用持久化
读取HDFS
filter转换
缓存到
内存/磁盘
count Action
take Action
countByValue Action
从缓存读取
无需重复计算
2.2 持久化的生命周期
# 1. 标记持久化
df.persist(StorageLevel.MEMORY_ONLY) # 只标记,还未缓存
# 2. 触发计算并缓存
df.count() # 实际执行计算,数据进入缓存
# 3. 使用缓存
df.filter(...).show() # 基于缓存数据操作
# 4. 手动清理(可选)
df.unpersist() # 立即清除缓存
# 或等待自动LRU淘汰
三、存储级别详解
3.1 Spark支持的存储级别
| 存储级别 | 使用内存 | 使用磁盘 | 是否序列化 | 是否复制 | 适用场景 |
|---|---|---|---|---|---|
| MEMORY_ONLY | ✅ | ❌ | ❌ | ❌ | 小数据集,内存充足 |
| MEMORY_ONLY_SER | ✅ | ❌ | ✅ | ❌ | 内存有限,CPU换空间 |
| MEMORY_AND_DISK | ✅ | ✅ | ❌ | ❌ | 大数据集,内存可能不足 |
| MEMORY_AND_DISK_SER | ✅ | ✅ | ✅ | ❌ | 大数据集,内存紧张 |
| DISK_ONLY | ❌ | ✅ | ✅ | ❌ | 超大数据集,重用但不求快 |
| OFF_HEAP | ✅(堆外) | ❌ | ✅ | ❌ | 避免GC影响 |
3.2 存储级别源码解析
// Spark StorageLevel 源码核心定义
class StorageLevel(
val useDisk: Boolean, // 是否使用磁盘
val useMemory: Boolean, // 是否使用内存
val useOffHeap: Boolean, // 是否使用堆外内存
val deserialized: Boolean, // 是否反序列化存储
val replication: Int // 副本数
)
// 预定义级别
val MEMORY_ONLY = new StorageLevel(false, true, false, true, 1)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false, 1)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true, 1)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false, 1)
val DISK_ONLY = new StorageLevel(true, false, false, false, 1)
四、MEMORY_ONLY vs MEMORY_AND_DISK 深度对比
4.1 核心差异对比表
| 对比维度 | MEMORY_ONLY | MEMORY_AND_DISK |
|---|---|---|
| 存储策略 | 只存内存,存不下就丢弃 | 优先内存,存不下溢写到磁盘 |
| 内存不足时 | 分区丢失,需要重算 | 分区写入磁盘,不丢失 |
| 读取速度 | 最快(纯内存) | 较快(内存快,磁盘次之) |
| 容错能力 | 弱(依赖血统重算) | 强(磁盘有备份) |
| GC压力 | 较大(大量对象) | 较小(部分数据在磁盘) |
| 适用数据量 | 小于内存 | 可大于内存 |
4.2 行为差异可视化
MEMORY_AND_DISK 行为
是
否
尝试缓存到内存
内存是否足够?
全部缓存内存
内存放不下的
溢写到磁盘
需要时从磁盘读取
无需重新计算
MEMORY_ONLY 行为
是
否
尝试全部缓存到内存
内存是否足够?
全部缓存内存
读取极快
部分分区丢失
需要时重新计算
性能下降
可能反复计算
4.3 代码示例对比
from pyspark import StorageLevel
# 场景:缓存10GB数据,Executor内存为8GB
# 方案1:MEMORY_ONLY
df1 = spark.range(1000000000).persist(StorageLevel.MEMORY_ONLY)
df1.count() # 第一次Action:尝试缓存10GB数据
# 结果:内存不足,部分分区无法缓存
# 后续:每次操作都会重新计算丢失的分区
# 方案2:MEMORY_AND_DISK
df2 = spark.range(1000000000).persist(StorageLevel.MEMORY_AND_DISK)
df2.count() # 第一次Action:缓存8GB到内存,2GB溢写到磁盘
# 结果:所有数据都被持久化(内存+磁盘)
# 后续:从内存+磁盘读取,无需重新计算
4.4 性能测试对比
// 性能测试代码
def compareStorageLevels(spark: SparkSession, dataSizeGB: Int): Unit = {
val df = spark.range(dataSizeGB * 10000000L).select(
rand() as "value",
floor(rand() * 1000) as "key"
)
// 测试MEMORY_ONLY
val dfMemoryOnly = df.persist(StorageLevel.MEMORY_ONLY)
val start1 = System.currentTimeMillis()
dfMemoryOnly.count()
val cacheTime1 = System.currentTimeMillis() - start1
val start2 = System.currentTimeMillis()
dfMemoryOnly.groupBy("key").count().collect()
val queryTime1 = System.currentTimeMillis() - start2
// 测试MEMORY_AND_DISK
val dfMemoryAndDisk = df.persist(StorageLevel.MEMORY_AND_DISK)
val start3 = System.currentTimeMillis()
dfMemoryAndDisk.count()
val cacheTime2 = System.currentTimeMillis() - start3
val start4 = System.currentTimeMillis()
dfMemoryAndDisk.groupBy("key").count().collect()
val queryTime2 = System.currentTimeMillis() - start4
println(s"""
|MEMORY_ONLY:
| 缓存耗时: ${cacheTime1}ms
| 查询耗时: ${queryTime1}ms
|
|MEMORY_AND_DISK:
| 缓存耗时: ${cacheTime2}ms
| 查询耗时: ${queryTime2}ms
""".stripMargin)
}
典型测试结果(数据量稍大于内存):
| 指标 | MEMORY_ONLY | MEMORY_AND_DISK | 差异 |
|---|---|---|---|
| 首次缓存耗时 | 120s | 135s | MEMORY_AND_DISK慢12% |
| 后续查询耗时 | 30-80s(波动) | 15s | MEMORY_AND_DISK快50%+ |
| CPU使用率 | 高(频繁重算) | 低 | MEMORY_AND_DISK更优 |
| 稳定性 | 差 | 好 | MEMORY_AND_DISK胜出 |
五、存储级别选择指南
5.1 选型决策树
是
否
是
否
是
否
是
否
需要持久化的数据集
数据是否能
完全装入内存?
是否关注GC影响?
MEMORY_ONLY
最佳性能
MEMORY_ONLY_SER
减少内存占用
是否容忍
部分重算?
MEMORY_ONLY
配合checkpoint
MEMORY_AND_DISK
安全可靠
CPU是否紧张?
MEMORY_AND_DISK_SER
节省空间
MEMORY_AND_DISK
避免序列化开销
5.2 业务场景推荐
| 业务场景 | 数据特征 | 推荐存储级别 | 理由 |
|---|---|---|---|
| 迭代算法(ML) | 多次重用,数据可完全装入内存 | MEMORY_ONLY | 追求极致性能 |
| ETL中间结果 | 数据量大,后续多次查询 | MEMORY_AND_DISK | 保证可用性,避免重算 |
| 实时查询服务 | 热数据,要求低延迟 | MEMORY_ONLY | 最快响应 |
| 数据探索分析 | 数据量大,但只需部分 | MEMORY_ONLY | 简单快速 |
| Shuffle中间结果 | 临时数据,可能溢出 | MEMORY_AND_DISK_SER | 节省内存,自动溢写 |
5.3 实际应用示例
# 示例1:机器学习特征工程
from pyspark import StorageLevel
from pyspark.ml.feature import VectorAssembler
# 原始特征数据(可完全装入内存)
features_df = spark.read.parquet("hdfs://features/")
# 特征工程需要多次使用
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
vector_df = assembler.transform(features_df)
# 选择MEMORY_ONLY:数据可完全装入内存,追求训练速度
vector_df.persist(StorageLevel.MEMORY_ONLY)
# 多次用于模型训练
for i in range(10):
model = algo.fit(vector_df) # 每次都使用缓存数据
# 示例2:数据仓库ETL(数据量大)
from pyspark import StorageLevel
# 读取原始日志,清洗后需要多次使用
raw_logs = spark.read.parquet("hdfs://logs/2024-01-01/")
cleaned_logs = raw_logs.filter("status != 'test'") \
.dropDuplicates(["user_id", "timestamp"])
# 数据量大约200GB,集群内存总共500GB,但有其他任务
# 选择MEMORY_AND_DISK:确保可用性,避免失败重算
cleaned_logs.persist(StorageLevel.MEMORY_AND_DISK)
# 多个下游任务
cleaned_logs.createOrReplaceTempView("logs")
spark.sql("SELECT date, COUNT(*) FROM logs GROUP BY date").write.saveAsTable("daily_stats")
spark.sql("SELECT city, SUM(amount) FROM logs GROUP BY city").write.saveAsTable("city_stats")
spark.sql("SELECT user_id, COUNT(*) FROM logs GROUP BY user_id HAVING count>10").write.saveAsTable("active_users")
六、高级技巧与最佳实践
6.1 缓存监控
# 查看缓存状态
spark.catalog.listTables()
spark.catalog.isCached("table_name")
spark.catalog.clearCache() # 清理所有缓存
# 通过Spark UI查看
# 访问: http://driver:4040/storage/
6.2 缓存策略优化
# 策略1:选择性缓存
# 只缓存需要多次使用的部分
essential_data = complex_transformations \
.select("key", "value") \
.filter("key IS NOT NULL") \
.persist(StorageLevel.MEMORY_AND_DISK)
# 策略2:缓存+检查点结合
# 对于超长血统,定期做checkpoint
df.persist(StorageLevel.MEMORY_AND_DISK)
df.checkpoint() # 截断血统链
# 策略3:动态缓存管理
def smart_cache(df, threshold_hours=2):
"""根据预计重用时间决定是否缓存"""
estimated_reuse_count = estimate_future_usage(df, threshold_hours)
if estimated_reuse_count > 1:
return df.persist(StorageLevel.MEMORY_AND_DISK)
return df
6.3 常见问题与解决方案
| 问题 | 现象 | 解决方案 |
|---|---|---|
| 内存占用过高 | GC频繁,任务变慢 | 改用MEMORY_ONLY_SER或MEMORY_AND_DISK_SER |
| 缓存未生效 | 仍然重复计算 | 检查是否在Action前调用persist() |
| 磁盘溢写过多 | 查询变慢 | 增加Executor内存,或改用MEMORY_ONLY |
| 数据倾斜导致缓存失效 | 部分分区丢失 | 结合salting技术,或改用MEMORY_AND_DISK |
七、总结
| 对比维度 | MEMORY_ONLY | MEMORY_AND_DISK |
|---|---|---|
| 核心优势 | 极致性能 | 稳定可靠 |
| 适用数据量 | 小于可用内存 | 可大于可用内存 |
| 容错能力 | 依赖血统重算 | 磁盘备份,无需重算 |
| GC压力 | 较大 | 较小 |
| 适用场景 | 小数据集、迭代算法 | 大数据集、ETL作业 |
选择口诀:
数据量小内存足,MEMORY_ONLY最快速
数据量大不确定,MEMORY_AND_DISK保底数
既要性能又省心,根据场景做评估
持久化是大杀器,重复计算全消除
合理使用persist()并选择合适的存储级别,可以有效减少重复计算,提升Spark作业性能。在实际应用中,建议根据数据特征和业务需求,结合监控数据做出最佳选择。

|
🌺The End🌺点点关注,收藏不迷路🌺
|