淘宝客APP数据湖架构:Iceberg + Flink实现的历史数据回溯与增量计算统一存储方案
淘宝客APP数据湖架构:Iceberg + Flink实现的历史数据回溯与增量计算统一存储方案
大家好,我是高佣返利省赚客APP研发者微赚! 在淘宝客业务中,订单数据的准确性直接关系到用户的钱袋子和平台的信誉。面对每日亿级的流水记录、频繁的订单状态变更(如下单、付款、结算、失效)以及复杂的佣金追溯需求,传统的Hive数仓在ACID事务支持和实时性上显得捉襟见肘,而单纯的Kafka流处理又难以满足大规模历史数据回溯分析的要求。为此,省赚客APP研发团队构建了基于Apache Iceberg和Flink的新一代数据湖架构,实现了流批一体的统一存储,完美解决了数据回溯难、增量计算慢、数据不一致三大痛点。
一、Iceberg表结构设计与ACID事务保障
Iceberg作为核心存储格式,提供了强大的隐藏分区、模式演进和行级更新能力。我们将订单明细表设计为Iceberg格式,利用其Snapshot机制管理数据版本,确保每一次状态变更都可追溯。
package cn.juwatech.iceberg.catalog;
import cn.juwatech.cn.model.OrderRecord;
import cn.juwatech.cn.config.IcebergConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.types.Types;
import org.apache.hadoop.conf.Configuration;
import static org.apache.iceberg.types.Types.NestedField.*;
public class OrderTableInitializer {
private final Catalog catalog;
private final Configuration hadoopConf;
public OrderTableInitializer(Catalog catalog, Configuration conf) {
this.catalog = catalog;
this.hadoopConf = conf;
}
/**
* 创建支持实时更新和时序分区的订单表
*/
public void createOrderTable() {
Schema schema = new Schema(
required(1, "order_id", Types.StringType.get(), "订单ID"),
required(2, "user_id", Types.StringType.get(), "用户ID"),
required(3, "trade_status", Types.StringType.get(), "交易状态"),
required(4, "commission_amount", Types.DoubleType.get(), "佣金金额"),
required(5, "update_time", Types.TimestampType.withZone(), "更新时间"),
required(6, "create_time", Types.TimestampType.withZone(), "创建时间")
);
// 按天分区,同时利用Iceberg的隐藏分区特性优化查询
PartitionSpec spec = PartitionSpec.builderFor(schema)
.days("create_time")
.build();
Namespace namespace = Namespace.of("shengzhuanke", "ods");
if (!catalog.tableExists(cn.juwatech.cn.util.TableIdentifierUtil.of(namespace, "order_detail"))) {
Table table = catalog.createTable(
cn.juwatech.cn.util.TableIdentifierUtil.of(namespace, "order_detail"),
schema,
spec,
cn.juwatech.cn.config.IcebergConfig.getTableProperties()
);
// 启用合并小文件和删除文件自动优化
table.updateProperties()
.set("write.merge.target-file-size-bytes", "134217728")
.set("write.delete.enabled", "true")
.commit();
}
}
}
通过上述代码,我们定义了包含主键和时序字段的Schema,并开启了删除文件支持,为后续的Upsert操作奠定基础。
二、Flink CDC实时写入与Upsert逻辑
利用Flink CDC捕获MySQL Binlog,我们将实时的订单变更流直接写入Iceberg表。Flink的Checkpoint机制与Iceberg的Snapshot提交机制协同工作,保证了端到端的Exactly-Once语义。
package cn.juwatech.flink.sink.iceberg;
import cn.juwatech.cn.model.OrderChangeEvent;
import cn.juwatech.cn.serde.OrderChangeSerde;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
public class RealTimeOrderIngestion {
public static void buildUpsertPipeline(StreamExecutionEnvironment env, Catalog icebergCatalog, String tableName) {
DataStream<OrderChangeEvent> changeStream = env
.addSource(cn.juwatech.cn.source.MySqlCdcSource.create())
.name("MySQL CDC Source")
.uid("cdc-source-uid");
// 加载Iceberg表
TableLoader tableLoader = TableLoader.fromCatalog(icebergCatalog, cn.juwatech.cn.util.TableIdentifierUtil.parse(tableName));
Table table = icebergCatalog.loadTable(cn.juwatech.cn.util.TableIdentifierUtil.parse(tableName));
// 构建Flink Sink,开启Upsert模式
FlinkSink.forRow(changeStream, cn.juwatech.cn.schema.OrderChangeSchema.getFlinkSchema())
.table(table)
.tableLoader(tableLoader)
.writeParallelism(10)
.upsert(true) // 关键:开启Upsert,自动处理Insert/Update/Delete
.distributionMode(org.apache.iceberg.DistributionMode.HASH) // 按主键哈希分布,优化合并性能
.sinkParallelism(10)
.appendToTable();
// 设置Checkpoint以保障事务一致性
env.enableCheckpointing(60000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
}
}
此配置使得Flink能够自动识别CDC中的更新和删除操作,并在Iceberg中生成对应的Delete Files和Data Files,无需编写复杂的SQL逻辑即可实现实时数据的精确修正。
三、历史数据回溯与增量计算统一查询
在传统架构中,全量重跑和增量计算往往需要两套代码。而在Iceberg架构下,通过Time Travel(时间旅行)功能,我们可以轻松读取任意历史版本的数据,实现精准回溯。
package cn.juwatech.spark.query;
import cn.juwatech.cn.job.BackfillJob;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
public class UnifiedDataQuery {
private final SparkSession spark;
public UnifiedDataQuery(SparkSession session) {
this.spark = session;
}
/**
* 场景一:读取当前最新数据进行日常报表计算
*/
public Dataset<Row> queryLatestDailyCommission(String date) {
return spark.read()
.format("iceberg")
.load("shengzhuanke.ods.order_detail")
.filter("date(create_time) = '" + date + "'");
}
/**
* 场景二:历史数据回溯 - 读取特定Snapshot ID或时间戳的数据
* 用于修复因上游系统Bug导致的历史佣金计算错误
*/
public Dataset<Row> queryHistoricalSnapshot(String tableName, long timestampMs) {
// 利用Iceberg的time-travel功能,读取故障发生前一秒的数据快照
return spark.read()
.format("iceberg")
.option("as-of-timestamp", String.valueOf(timestampMs))
.load(tableName);
}
/**
* 场景三:增量计算 - 仅扫描自上次作业以来新增或变更的文件
*/
public Dataset<Row> queryIncrementalChanges(String tableName, long fromTimestamp, long toTimestamp) {
// 结合元数据扫描,只读取指定时间范围内发生变化的数据文件
return spark.read()
.format("iceberg")
.option("start-timestamp", String.valueOf(fromTimestamp))
.option("end-timestamp", String.valueOf(toTimestamp))
.load(tableName);
}
}
通过as-of-timestamp和start-timestamp参数,同一套计算逻辑既能处理T+1的全量报表,也能执行分钟级的增量修正,极大地简化了ETL开发流程。
四、架构优势与未来展望
Iceberg + Flink的组合拳,让省赚客APP的数据湖具备了数据库般的ACID特性和大数据的海量存储能力。我们不再需要维护复杂的Lambda架构,流批真正走向统一。数据回溯从“天级”缩短至“秒级”,增量计算效率提升十倍不止。这套架构不仅保障了每一笔佣金的准确无误,更为未来的实时用户画像、动态风控模型提供了高质量的数据底座。我们将持续深化数据湖治理,探索更多实时应用场景,用技术驱动业务增长。
本文著作权归 省赚客app 研发团队,转载请注明出处!