ODS到DWD数据清洗实战:基于Spark的高效ETL实现

ODS到DWD数据清洗实战:基于Spark的高效ETL实现

    • 引言
    • 一、ODS与DWD层概述
      • 1.1 分层定义
      • 1.2 清洗流程概览
    • 二、核心清洗操作详解
      • 2.1 数据过滤
      • 2.2 数据去重
        • 2.2.1 去重策略选择
        • 2.2.2 去重实现
      • 2.3 空值处理
        • 2.3.1 空值处理策略
        • 2.3.2 空值处理实现
      • 2.4 格式标准化
        • 2.4.1 时间格式标准化
        • 2.4.2 字符串标准化
    • 三、完整清洗流程实现
      • 3.1 封装清洗函数
      • 3.2 执行清洗作业
    • 四、性能优化技巧
      • 4.1 分区策略优化
      • 4.2 内存与缓存优化
      • 4.3 并行度调优
    • 五、数据质量监控
      • 5.1 质量检查指标
      • 5.2 异常数据捕获
    • 六、总结

🌺The Begin🌺点点关注,收藏不迷路🌺

引言

在数据仓库分层架构中,ODS(操作数据存储)层到DWD(数据仓库明细)层的处理是数据治理的关键环节。这一阶段需要对原始数据进行清洗、去重、标准化,形成高质量、可用的明细数据。本文将深入探讨如何利用Spark实现高效、可扩展的ODS→DWD清洗流程。

一、ODS与DWD层概述

1.1 分层定义

层次 全称 数据特征 主要作用
ODS层 操作数据存储 原始数据,未经过处理,可能存在脏数据、重复数据 数据备份、追溯
DWD层 数据仓库明细 清洗后的明细数据,结构化、标准化 提供高质量数据供下游使用

1.2 清洗流程概览

DWD层

清洗过程

ODS层

业务库Binlog

ODS表

日志文件

外部数据

数据过滤

去重处理

空值处理

格式标准化

维度退化

DWD表

数据服务

数据分析

数据挖掘

二、核心清洗操作详解

2.1 数据过滤

数据过滤是清洗的第一步,旨在剔除无效、异常数据。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("ODS2DWD_Cleaning")
  .enableHiveSupport()
  .getOrCreate()
// 读取ODS层数据
val odsOrders = spark.table("ods.orders")
// 基础过滤规则
val filteredOrders = odsOrders
  // 过滤关键字段为null的记录
  .filter(col("order_id").isNotNull)
  .filter(col("order_time").isNotNull)
  .filter(col("city_id").isNotNull)
  // 过滤异常数据
  .filter(col("order_amount") > 0)  // 金额必须为正
  .filter(col("order_amount") < 1000000)  // 剔除异常大额
  // 过滤时间范围
  .filter(col("order_time") >= lit("2024-01-01"))
  .filter(col("order_time") <= lit("2024-12-31"))
println(s"过滤后数据量: ${filteredOrders.count()}")

2.2 数据去重

2.2.1 去重策略选择
去重级别 适用场景 代码示例 性能影响
完全去重 所有字段完全重复的记录 df.dropDuplicates() 高(需要全字段比较)
指定主键去重 业务主键重复(如订单号) df.dropDuplicates("order_id")
组合键去重 联合主键重复 df.dropDuplicates("order_id", "city_id")
时间窗口去重 保留最新记录 需配合窗口函数
2.2.2 去重实现
// 1. 简单去重(按指定列)
val dedupedOrders = filteredOrders
  .dropDuplicates("order_id", "city_id")  // 组合键去重
  .dropDuplicates("order_id")              // 也可以单独调用
// 2. 复杂去重:保留最新记录(使用窗口函数)
import org.apache.spark.sql.expressions.Window
val windowSpec = Window
  .partitionBy("order_id")  // 按订单号分组
  .orderBy(col("update_time").desc)  // 按更新时间降序
val latestOrders = filteredOrders
  .withColumn("rn", row_number().over(windowSpec))
  .filter(col("rn") === 1)
  .drop("rn")
// 3. 多条件去重:根据业务规则选择保留
val businessDedup = filteredOrders
  .dropDuplicates("order_id")  // 先简单去重
  .union(
    filteredOrders
      .filter(col("order_status") === "completed")
      .dropDuplicates("order_id", "city_id")
  )
  .dropDuplicates("order_id")  // 最终去重

2.3 空值处理

2.3.1 空值处理策略
字段类型 处理策略 示例 说明
关键字段 直接过滤 filter(col("id").isNotNull) 缺少关键信息,无法使用
数值字段 填充默认值 na.fill(0) 如金额、数量等
字符串字段 填充"unknown" na.fill("unknown") 如备注、描述等
时间字段 填充当前时间 when(col("ts").isNull, current_timestamp()) 更新时间
2.3.2 空值处理实现
// 1. 单字段空值填充
val cleanedOrders = dedupedOrders
  // 数值字段填充0
  .na.fill(0, Seq("order_amount", "discount_amount", "shipping_fee"))
  // 字符串字段填充默认值
  .na.fill("unknown", Seq("driver_id", "passenger_id", "remark"))
  // 使用条件填充
  .withColumn("payment_method",
    when(col("payment_method").isNull, lit("cash"))
    .otherwise(col("payment_method"))
  )
  // 时间字段特殊处理
  .withColumn("update_time",
    when(col("update_time").isNull, current_timestamp())
    .otherwise(col("update_time"))
  )
// 2. 批量填充不同类型字段
val fillMap = Map(
  "order_amount" -> 0.0,
  "discount_amount" -> 0.0,
  "driver_id" -> "unknown",
  "passenger_phone" -> "",
  "remark" -> ""
)
val batchFilled = cleanedOrders.na.fill(fillMap)
// 3. 复杂空值逻辑:业务规则填充
val businessFilled = batchFilled
  .withColumn("order_status",
    when(col("order_status").isNull, 
         when(col("pay_time").isNotNull, "paid")
         .when(col("complete_time").isNotNull, "completed")
         .otherwise("unknown"))
    .otherwise(col("order_status"))
  )

2.4 格式标准化

2.4.1 时间格式标准化
// 1. 统一时间格式为 yyyy-MM-dd HH:mm:ss
val timeStandardized = batchFilled
  // 字符串转时间戳
  .withColumn("order_time_std",
    when(col("order_time").cast("timestamp").isNotNull,
         col("order_time").cast("timestamp"))
    .otherwise(to_timestamp(col("order_time"), "yyyy/MM/dd HH:mm:ss"))
  )
  .withColumn("order_time_std",
    when(col("order_time_std").isNotNull, col("order_time_std"))
    .otherwise(to_timestamp(col("order_time"), "yyyy-MM-dd HH:mm:ss"))
  )
  // 提取日期维度
  .withColumn("order_date", to_date(col("order_time_std")))
  .withColumn("order_hour", hour(col("order_time_std")))
  .withColumn("order_week", weekofyear(col("order_time_std")))
  .withColumn("order_month", month(col("order_time_std")))
  // 删除原始时间列,使用标准化后的
  .drop("order_time")
  .withColumnRenamed("order_time_std", "order_time")
// 2. 处理时区问题
import java.util.TimeZone
spark.conf.set("spark.sql.session.timeZone", "Asia/Shanghai")
val timezoneFixed = timeStandardized
  .withColumn("order_time_utc", to_utc_timestamp(col("order_time"), "Asia/Shanghai"))
2.4.2 字符串标准化
// 1. 统一大小写、去除空格
val stringStandardized = timeStandardized
  .withColumn("city_name", upper(trim(col("city_name"))))
  .withColumn("province", initcap(trim(col("province"))))
  .withColumn("remark", regexp_replace(col("remark"), "\\s+", " "))
// 2. 手机号脱敏处理
val desensitized = stringStandardized
  .withColumn("passenger_phone",
    when(col("passenger_phone").isNotNull,
         concat(
           substring(col("passenger_phone"), 1, 3),
           lit("****"),
           substring(col("passenger_phone"), -4, 4)
         ))
    .otherwise(col("passenger_phone"))
  )
// 3. 枚举值标准化
val enumStandardized = desensitized
  .withColumn("order_type",
    when(lower(col("order_type")).contains("快车"), "express")
    .when(lower(col("order_type")).contains("专车"), "premium")
    .when(lower(col("order_type")).contains("拼车"), "carpool")
    .otherwise("other")
  )

三、完整清洗流程实现

3.1 封装清洗函数

// 订单数据清洗类
class OrderDataCleaner(spark: SparkSession) {
  import spark.implicits._
  /**
   * 完整的ODS→DWD清洗流程
   */
  def cleanOrders(odsTable: String, dwdTable: String, date: String): Unit = {
    val odsDF = spark.table(odsTable)
      .filter(col("dt") === date)  // 只处理指定日期
    // 步骤1:数据过滤
    val filtered = filterOrders(odsDF)
    // 步骤2:数据去重
    val deduped = deduplicateOrders(filtered)
    // 步骤3:空值处理
    val nullFilled = handleNulls(deduped)
    // 步骤4:格式标准化
    val standardized = standardizeFormats(nullFilled)
    // 步骤5:字段增强
    val enhanced = enhanceFields(standardized)
    // 写入DWD层
    enhanced.write
      .mode("overwrite")
      .partitionBy("dt")
      .format("parquet")
      .saveAsTable(dwdTable)
    println(s"成功清洗 $date 数据,共 ${enhanced.count()} 条")
  }
  /**
   * 数据过滤
   */
  private def filterOrders(df: DataFrame): DataFrame = {
    df.filter(col("order_id").isNotNull)
      .filter(col("order_time").isNotNull)
      .filter(col("city_id").isNotNull)
      .filter(col("order_amount") > 0)
      .filter(col("order_amount") < 1000000)
  }
  /**
   * 数据去重
   */
  private def deduplicateOrders(df: DataFrame): DataFrame = {
    // 按订单号去重,保留最新的
    val windowSpec = Window
      .partitionBy("order_id")
      .orderBy(col("update_time").desc)
    df.withColumn("rn", row_number().over(windowSpec))
      .filter(col("rn") === 1)
      .drop("rn")
  }
  /**
   * 空值处理
   */
  private def handleNulls(df: DataFrame): DataFrame = {
    df.na.fill(Map(
      "driver_id" -> "unknown",
      "passenger_name" -> "",
      "order_amount" -> 0.0,
      "discount_amount" -> 0.0,
      "remark" -> ""
    )).na.fill(0, Seq("order_amount", "discount_amount", "shipping_fee"))
  }
  /**
   * 格式标准化
   */
  private def standardizeFormats(df: DataFrame): DataFrame = {
    // 时间标准化
    val withTime = df
      .withColumn("order_time_std", 
        to_timestamp(col("order_time"), "yyyy-MM-dd HH:mm:ss"))
      .withColumn("order_date", to_date(col("order_time_std")))
      .withColumn("order_hour", hour(col("order_time_std")))
    // 字符串标准化
    val withString = withTime
      .withColumn("city_name", upper(trim(col("city_name"))))
      .withColumn("order_type", 
        when(lower(col("order_type")).contains("快车"), "express")
        .when(lower(col("order_type")).contains("专车"), "premium")
        .otherwise("other"))
    withString.drop("order_time").withColumnRenamed("order_time_std", "order_time")
  }
  /**
   * 字段增强
   */
  private def enhanceFields(df: DataFrame): DataFrame = {
    df.withColumn("amount_with_tax", col("order_amount") * 1.06)
      .withColumn("is_weekend", dayofweek(col("order_date")).isin(1, 7))
      .withColumn("processing_time", current_timestamp())
  }
}

3.2 执行清洗作业

object ODSToDWDJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ODS_To_DWD_Cleaning")
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .config("spark.sql.parquet.compression.codec", "snappy")
      .enableHiveSupport()
      .getOrCreate()
    try {
      val cleaner = new OrderDataCleaner(spark)
      // 处理历史数据
      val dates = Seq("2024-01-01", "2024-01-02", "2024-01-03")
      dates.foreach { date =>
        cleaner.cleanOrders("ods.orders", "dwd.orders", date)
      }
      // 处理当天数据(增量)
      val today = java.time.LocalDate.now.toString
      cleaner.cleanOrders("ods.orders", "dwd.orders", today)
    } finally {
      spark.stop()
    }
  }
}

四、性能优化技巧

4.1 分区策略优化

// 1. 合理设置分区键
val partitionedDF = cleanedDF
  .repartition(200, col("order_date"), col("city_id"))
// 2. 使用分区裁剪
val incrementalRead = spark.table("ods.orders")
  .filter(col("dt") === today)  // 只读当天分区
// 3. 动态分区写入
cleanedDF.write
  .mode("overwrite")
  .partitionBy("dt", "city_id")
  .format("parquet")
  .option("maxRecordsPerFile", "1000000")  // 控制文件大小
  .saveAsTable("dwd.orders")

4.2 内存与缓存优化

// 1. 缓存中间结果
val cleanedCache = cleanedDF.persist(StorageLevel.MEMORY_AND_DISK)
// 2. 检查点设置
spark.sparkContext.setCheckpointDir("hdfs://checkpoint/")
cleanedDF.checkpoint()
// 3. 广播小维表
val dimCity = spark.table("dim.city").filter("is_active=1")
val broadcastDim = broadcast(dimCity)
val enriched = cleanedDF.join(broadcastDim, Seq("city_id"), "left")

4.3 并行度调优

// 根据数据量动态调整分区数
val dataSize = spark.table("ods.orders").filter(col("dt") === today).count()
val targetPartitionSize = 128 * 1024 * 1024  // 128MB
val recommendedPartitions = (dataSize / targetPartitionSize).toInt.max(200)
spark.conf.set("spark.sql.shuffle.partitions", recommendedPartitions.toString)

五、数据质量监控

5.1 质量检查指标

// 清洗前后对比
def qualityCheck(odsDF: DataFrame, dwdDF: DataFrame, date: String): Unit = {
  val odsCount = odsDF.filter(col("dt") === date).count()
  val dwdCount = dwdDF.filter(col("dt") === date).count()
  println(s"""
    |========== 数据质量报告 [$date] ==========
    |ODS层数据量: $odsCount
    |DWD层数据量: $dwdCount
    |清洗率: ${(odsCount - dwdCount) * 100.0 / odsCount}% 被过滤
    |空值率 - driver_id: ${nullRate(dwdDF, "driver_id")}%
    |空值率 - passenger_id: ${nullRate(dwdDF, "passenger_id")}%
    |重复率: ${duplicateRate(dwdDF, "order_id")}%
    |========================================
  """.stripMargin)
}
def nullRate(df: DataFrame, column: String): Double = {
  val total = df.count()
  val nullCount = df.filter(col(column).isNull).count()
  nullCount * 100.0 / total
}
def duplicateRate(df: DataFrame, key: String): Double = {
  val total = df.count()
  val uniqueCount = df.select(key).distinct().count()
  (total - uniqueCount) * 100.0 / total
}

5.2 异常数据捕获

// 将异常数据写入异常表
val abnormalData = odsOrders
  .filter(col("order_amount") <= 0 || col("order_amount") >= 1000000)
abnormalData.write
  .mode("append")
  .partitionBy("dt")
  .saveAsTable("dwd.abnormal_orders")

六、总结

清洗阶段 主要操作 优化要点 质量指标
数据过滤 非空检查、范围过滤 分区裁剪、谓词下推 过滤率<20%
数据去重 dropDuplicates、窗口函数 选择合适的去重粒度 重复率<0.1%
空值处理 fill、when-otherwise 批量填充、业务规则 关键字段空值率=0
格式标准化 to_timestamp、regexp_replace 内置函数优化 格式统一率100%
字段增强 withColumn、join维表 广播小表、列剪枝 字段完整度提升

核心原则

  1. 可重复性:清洗逻辑要幂等,支持重跑
  2. 可观测性:埋点监控,及时发现质量问题
  3. 高性能:合理利用Spark优化技术
  4. 可维护性:清洗逻辑模块化,便于扩展

通过以上方法,可以构建一个高效、可靠、可扩展的ODS→DWD数据清洗流程,为数据分析和数据服务提供高质量的数据基础。

在这里插入图片描述

🌺The End🌺点点关注,收藏不迷路🌺
© 版权声明

相关文章