淘宝客APP数据湖架构:Iceberg + Flink实现的历史数据回溯与增量计算统一存储方案

淘宝客APP数据湖架构:Iceberg + Flink实现的历史数据回溯与增量计算统一存储方案

大家好,我是高佣返利省赚客APP研发者微赚! 在淘宝客业务中,订单数据的准确性直接关系到用户的钱袋子和平台的信誉。面对每日亿级的流水记录、频繁的订单状态变更(如下单、付款、结算、失效)以及复杂的佣金追溯需求,传统的Hive数仓在ACID事务支持和实时性上显得捉襟见肘,而单纯的Kafka流处理又难以满足大规模历史数据回溯分析的要求。为此,省赚客APP研发团队构建了基于Apache Iceberg和Flink的新一代数据湖架构,实现了流批一体的统一存储,完美解决了数据回溯难、增量计算慢、数据不一致三大痛点。

一、Iceberg表结构设计与ACID事务保障

Iceberg作为核心存储格式,提供了强大的隐藏分区、模式演进和行级更新能力。我们将订单明细表设计为Iceberg格式,利用其Snapshot机制管理数据版本,确保每一次状态变更都可追溯。

package cn.juwatech.iceberg.catalog;
import cn.juwatech.cn.model.OrderRecord;
import cn.juwatech.cn.config.IcebergConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.types.Types;
import org.apache.hadoop.conf.Configuration;
import static org.apache.iceberg.types.Types.NestedField.*;
public class OrderTableInitializer {
    private final Catalog catalog;
    private final Configuration hadoopConf;
    public OrderTableInitializer(Catalog catalog, Configuration conf) {
        this.catalog = catalog;
        this.hadoopConf = conf;
    }
    /**
     * 创建支持实时更新和时序分区的订单表
     */
    public void createOrderTable() {
        Schema schema = new Schema(
            required(1, "order_id", Types.StringType.get(), "订单ID"),
            required(2, "user_id", Types.StringType.get(), "用户ID"),
            required(3, "trade_status", Types.StringType.get(), "交易状态"),
            required(4, "commission_amount", Types.DoubleType.get(), "佣金金额"),
            required(5, "update_time", Types.TimestampType.withZone(), "更新时间"),
            required(6, "create_time", Types.TimestampType.withZone(), "创建时间")
        );
        // 按天分区,同时利用Iceberg的隐藏分区特性优化查询
        PartitionSpec spec = PartitionSpec.builderFor(schema)
            .days("create_time")
            .build();
        Namespace namespace = Namespace.of("shengzhuanke", "ods");
        if (!catalog.tableExists(cn.juwatech.cn.util.TableIdentifierUtil.of(namespace, "order_detail"))) {
            Table table = catalog.createTable(
                cn.juwatech.cn.util.TableIdentifierUtil.of(namespace, "order_detail"),
                schema,
                spec,
                cn.juwatech.cn.config.IcebergConfig.getTableProperties()
            );
            // 启用合并小文件和删除文件自动优化
            table.updateProperties()
                 .set("write.merge.target-file-size-bytes", "134217728")
                 .set("write.delete.enabled", "true")
                 .commit();
        }
    }
}

通过上述代码,我们定义了包含主键和时序字段的Schema,并开启了删除文件支持,为后续的Upsert操作奠定基础。

二、Flink CDC实时写入与Upsert逻辑

利用Flink CDC捕获MySQL Binlog,我们将实时的订单变更流直接写入Iceberg表。Flink的Checkpoint机制与Iceberg的Snapshot提交机制协同工作,保证了端到端的Exactly-Once语义。

package cn.juwatech.flink.sink.iceberg;
import cn.juwatech.cn.model.OrderChangeEvent;
import cn.juwatech.cn.serde.OrderChangeSerde;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
public class RealTimeOrderIngestion {
    public static void buildUpsertPipeline(StreamExecutionEnvironment env, Catalog icebergCatalog, String tableName) {
        DataStream<OrderChangeEvent> changeStream = env
            .addSource(cn.juwatech.cn.source.MySqlCdcSource.create())
            .name("MySQL CDC Source")
            .uid("cdc-source-uid");
        // 加载Iceberg表
        TableLoader tableLoader = TableLoader.fromCatalog(icebergCatalog, cn.juwatech.cn.util.TableIdentifierUtil.parse(tableName));
        Table table = icebergCatalog.loadTable(cn.juwatech.cn.util.TableIdentifierUtil.parse(tableName));
        // 构建Flink Sink,开启Upsert模式
        FlinkSink.forRow(changeStream, cn.juwatech.cn.schema.OrderChangeSchema.getFlinkSchema())
            .table(table)
            .tableLoader(tableLoader)
            .writeParallelism(10)
            .upsert(true) // 关键:开启Upsert,自动处理Insert/Update/Delete
            .distributionMode(org.apache.iceberg.DistributionMode.HASH) // 按主键哈希分布,优化合并性能
            .sinkParallelism(10)
            .appendToTable();
        // 设置Checkpoint以保障事务一致性
        env.enableCheckpointing(60000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
    }
}

此配置使得Flink能够自动识别CDC中的更新和删除操作,并在Iceberg中生成对应的Delete Files和Data Files,无需编写复杂的SQL逻辑即可实现实时数据的精确修正。

三、历史数据回溯与增量计算统一查询

在传统架构中,全量重跑和增量计算往往需要两套代码。而在Iceberg架构下,通过Time Travel(时间旅行)功能,我们可以轻松读取任意历史版本的数据,实现精准回溯。

package cn.juwatech.spark.query;
import cn.juwatech.cn.job.BackfillJob;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
public class UnifiedDataQuery {
    private final SparkSession spark;
    public UnifiedDataQuery(SparkSession session) {
        this.spark = session;
    }
    /**
     * 场景一:读取当前最新数据进行日常报表计算
     */
    public Dataset<Row> queryLatestDailyCommission(String date) {
        return spark.read()
            .format("iceberg")
            .load("shengzhuanke.ods.order_detail")
            .filter("date(create_time) = '" + date + "'");
    }
    /**
     * 场景二:历史数据回溯 - 读取特定Snapshot ID或时间戳的数据
     * 用于修复因上游系统Bug导致的历史佣金计算错误
     */
    public Dataset<Row> queryHistoricalSnapshot(String tableName, long timestampMs) {
        // 利用Iceberg的time-travel功能,读取故障发生前一秒的数据快照
        return spark.read()
            .format("iceberg")
            .option("as-of-timestamp", String.valueOf(timestampMs)) 
            .load(tableName);
    }
    /**
     * 场景三:增量计算 - 仅扫描自上次作业以来新增或变更的文件
     */
    public Dataset<Row> queryIncrementalChanges(String tableName, long fromTimestamp, long toTimestamp) {
        // 结合元数据扫描,只读取指定时间范围内发生变化的数据文件
        return spark.read()
            .format("iceberg")
            .option("start-timestamp", String.valueOf(fromTimestamp))
            .option("end-timestamp", String.valueOf(toTimestamp))
            .load(tableName);
    }
}

通过as-of-timestampstart-timestamp参数,同一套计算逻辑既能处理T+1的全量报表,也能执行分钟级的增量修正,极大地简化了ETL开发流程。

四、架构优势与未来展望

Iceberg + Flink的组合拳,让省赚客APP的数据湖具备了数据库般的ACID特性和大数据的海量存储能力。我们不再需要维护复杂的Lambda架构,流批真正走向统一。数据回溯从“天级”缩短至“秒级”,增量计算效率提升十倍不止。这套架构不仅保障了每一笔佣金的准确无误,更为未来的实时用户画像、动态风控模型提供了高质量的数据底座。我们将持续深化数据湖治理,探索更多实时应用场景,用技术驱动业务增长。

本文著作权归 省赚客app 研发团队,转载请注明出处!

© 版权声明

相关文章