ClickHouse + Flink + DolphinScheduler: A Three-Component Stack for Offline and Real-Time Data Warehousing at Small and Mid-Sized Companies, Without the Hadoop Suite

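Heads-up: this article does not write 100 Flink classes or pile up the whole Hadoop suite. It uses just three components to cover both offline and real-time data warehousing.

Intended audience: ops teams that want to build a data warehouse but do not want to run a Hadoop stack.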


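1. Background: does a data warehouse really need the "big-tech full stack"?

1.1 The hard question

Have you been through any of these?

  • The boss says "build a data warehouse", and what comes to mind is Hadoop + Hive + Spark + Kafka + Airflow + …
  • The approved budget covers 3 servers, but the architecture diagram has 15 components
  • The offline T+1 pipeline is finally running, and the boss asks "can we see the data in real time?"

The core tension: a small team needs something usable, easy to run, and cheap, not something big, all-encompassing, expensive, and complex.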

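1.2 Our choice

Component | Role | One-line rationale
ClickHouse | Storage + analytics engine | Columnar storage, sub-second queries, built-in MySQL/MongoDB table engines
DolphinScheduler | Offline scheduling | Visual DAGs; a better fit than Airflow for teams in China
Flink CDC | Real-time sync | Configuration-driven, zero code; syncs a hundred tables without writing a line of SQL

Three components, each with its own job and no overlap.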

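1.3 Prerequisite reading

This article is the final installment of the data warehouse series. It helps to read the earlier posts first:

  1. ClickHouse + DolphinScheduler: A Lightweight Offline Data Warehouse with Two Components, No Hadoop Suite Needed (building the offline warehouse)
  2. Flink 1.20 in Practice: Zero-Code, Config-Driven Real-Time Sync of a Hundred MySQL Tables to ClickHouse (MySQL real-time sync)
  3. Flink 1.20 in Practice: One Configuration for Real-Time Sync of Multiple MongoDB Collections to ClickHouse (MongoDB real-time sync)

All three contain complete step-by-step instructions. This article focuses on architecture integration and code improvements; refer back to them for details.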


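2. Architecture: offline plus real-time, each doing what it does best

2.1 Why not use Flink for full loads?

Start with a performance comparison.

Direct sync through ClickHouse table engines (offline / full load)

Source | Sync rate | Throughput per minute
MySQL → ClickHouse | ~350,000 rows/s | 20 million+ rows/min
MongoDB → ClickHouse | ~120,000 rows/s | 7.2 million+ rows/min

Flink CDC sync (real-time incremental)

9.32 million rows synced in 20 minutes, which works out to roughly 7,800 rows/s

The conclusion is clear: let the ClickHouse (CK) table engines handle full loads and let Flink handle real-time increments, so each does what it is good at.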

2.2 Overall data flow

[Figure: overall data flow diagram]

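2.3 Deduplication: ReplacingMergeTree as the safety net

Two paths (offline and real-time) write to the same table, so duplicates are inevitable. How do we deal with them?

The answer is ClickHouse's ReplacingMergeTree engine: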

CREATE TABLE dw.ods_user_profile (
    id UInt64 COMMENT 'Primary key ID',
    user_code String COMMENT 'User code',
    avatar_url Nullable(String) COMMENT 'Avatar URL',
    login_key String COMMENT 'Login key',
    phone Nullable(String) COMMENT 'Phone number',
    user_type Int8 DEFAULT 1 COMMENT 'Type: 1-regular, 2-internal',
    status Nullable(Int8) COMMENT 'Status: 0-disabled, 1-enabled',
    nickname Nullable(String) COMMENT 'Nickname',
    create_time DateTime DEFAULT now() COMMENT 'Creation time',
    _version DateTime DEFAULT now() COMMENT 'Version column used for deduplication'
)
ENGINE = ReplacingMergeTree(_version)
PARTITION BY toYYYYMM(create_time)
ORDER BY (user_code)
SETTINGS index_granularity = 256
COMMENT 'ODS layer - user profile table';

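How deduplication works: during background merges, ReplacingMergeTree collapses rows that share the same ORDER BY key and keeps the one with the largest _version. Rows are only collapsed within a partition, so this relies on create_time (the partition key) never changing for a given user_code. The offline path writes a row, the real-time path writes it again, and only the latest version survives.

Note: deduplication happens during asynchronous merges. Queries that need strong consistency can add the FINAL keyword, at a performance cost.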
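
To make the merge rule concrete, here is a small in-memory model in Java. It is only a sketch of the semantics, not ClickHouse code: the class and field names are hypothetical, the version is simplified to a long, and it assumes all rows fall into one partition. Rows with the same sorting key collapse to the one with the highest version, and on a version tie the row inserted later wins.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of what a ReplacingMergeTree merge keeps; not ClickHouse code. */
class ReplacingMergeModel {

    /** Simplified row: sorting key (user_code), version (_version), and one payload column. */
    static final class Row {
        final String userCode;
        final long version;
        final String nickname;

        Row(String userCode, long version, String nickname) {
            this.userCode = userCode;
            this.version = version;
            this.nickname = nickname;
        }
    }

    /**
     * Collapses rows that share a sorting key and keeps the highest version.
     * On a version tie the later-inserted row replaces the earlier one.
     */
    static List<Row> merge(List<Row> insertedInOrder) {
        Map<String, Row> survivors = new LinkedHashMap<>();
        for (Row row : insertedInOrder) {
            Row current = survivors.get(row.userCode);
            if (current == null || row.version >= current.version) {
                survivors.put(row.userCode, row);
            }
        }
        return new ArrayList<>(survivors.values());
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row("U001", 100L, "written by the offline load"));
        rows.add(new Row("U001", 200L, "written by Flink CDC"));
        rows.add(new Row("U002", 150L, "written once"));
        for (Row r : merge(rows)) {
            System.out.println(r.userCode + " v" + r.version + " " + r.nickname);
        }
    }
}
```

Until a merge has actually run, a plain SELECT sees all three input rows; FINAL applies the same collapse at query time.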

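2.4 Division of labor

Dimension | Offline (CK table engines + DolphinScheduler) | Real-time (Flink CDC)
Data scope | Full history + T+1/H+1 increments | Real-time increments (binlog / Change Stream)
Sync speed | Very fast (direct connection, ~350,000 rows/s) | Second-level latency
Use cases | Historical data initialization, scheduled reports | Real-time dashboards, ad hoc analysis
Load on source database | Medium (full-table queries) | Low (reads only the incremental log)

Flink handles real-time increments only and never touches historical data. History is the job of the CK table engines: each tool sticks to its specialty.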


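3. Code evolution: from two redundant copies to one architecture

3.1 The original problems

The two previous articles implemented MySQL → CK and MongoDB → CK real-time sync separately, each with its own class and its own configuration. It worked, but:

  • Redundant code: both classes extend RichSinkFunction and implement CheckpointedFunction, and the CK write logic is nearly identical
  • Scattered configuration: two properties files, so changing the CK address means editing both
  • Schema changes require a restart: a newly added column is not recognized until the Flink job restarts
  • Hidden bugs: writes failing on materialized columns, DECIMAL precision loss, and so on

In programmer terms: the code runs, but the technical debt is quietly piling up.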

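3.2 Refactoring approach

[Figure: refactoring approach diagram]

Five core improvements

  1. Extract a common base class: all CK write logic (batching, HTTP streaming writes, flush on checkpoint, schema management) moves into AbstractClickHouseSink
  2. Automatic schema refresh: MySQL and CK schemas are reloaded every 10 minutes, so adding a column needs no restart
  3. Bug fixes: MATERIALIZED/ALIAS columns are excluded automatically (no more write errors), and the DECIMAL scale is read dynamically from information_schema
  4. Leaner logging: normal writes log at DEBUG only, and the full payload is logged only on errors, so the disk no longer fills up with useless logs
  5. Merged configuration: the MySQL and MongoDB settings live in one file and are managed together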

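4. MySQL sync: seven scenarios explained

MySQLToClickHouseRealtimeSync (the class is named MySQLToClickHouseRealtimeSyncV2 in the code in section 9.2) is built on Flink CDC 3.5 + Debezium and handles the following scenarios through configuration:

# | Scenario | Implementation | Notes
1 | Multi-table sync | Regex match tb_.* or exact mapping a:ods_a,b:ods_b | One Flink job syncs N tables
2 | Dynamic discovery of new tables | scanNewlyAddedTableEnabled=true | When a table is created in MySQL, Flink detects it automatically and syncs it in full
3 | Automatic schema loading | Query information_schema.COLUMNS | All column types are detected at startup, with no manual configuration
4 | DECIMAL Base64 decoding | new BigDecimal(new BigInteger(bytes), scale) | Debezium encodes DECIMAL as Base64; restore it using the column's actual scale
5 | BIGINT UNSIGNED decoding | new BigInteger(Base64.decode(...)) | Debezium also encodes BIGINT UNSIGNED as Base64
6 | DATETIME/TIMESTAMP conversion | Millisecond timestamp → yyyy-MM-dd HH:mm:ss; ISO 8601 → time zone conversion | Distinguishes the two encodings: DATETIME (milliseconds) and TIMESTAMP (ISO string)
7 | DATE day-count conversion | LocalDate.ofEpochDay(days) | Debezium encodes DATE as the number of days since 1970-01-01

Every one of these is a "handle it or get burned" case. Skip DECIMAL decoding and an amount like 99.99 shows up as a garbled Base64 string; skip DATETIME conversion and the date becomes a huge integer.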
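
The decoding in scenarios 4 to 7 can be sketched with the JDK alone. This is a minimal illustration, not the article's actual sink code: the class and method names are hypothetical, and reading the DATETIME milliseconds as UTC wall-clock time is an assumption about how the value was encoded.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Base64;

/** Sketch of the value decoding in scenarios 4 to 7; all names are illustrative. */
class DebeziumValueDecoders {
    private static final DateTimeFormatter DT_FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** DECIMAL: Base64 -> two's-complement unscaled value -> apply the column's scale. */
    static BigDecimal decodeDecimal(String base64, int scale) {
        byte[] bytes = Base64.getDecoder().decode(base64);
        return new BigDecimal(new BigInteger(bytes), scale);
    }

    /** BIGINT UNSIGNED: same byte encoding, without fractional digits. */
    static BigInteger decodeUnsignedBigint(String base64) {
        return new BigInteger(Base64.getDecoder().decode(base64));
    }

    /** DATETIME: epoch milliseconds, read as UTC so the wall-clock value is not shifted (assumption). */
    static String decodeDatetime(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC)
                .format(DT_FMT);
    }

    /** TIMESTAMP: ISO 8601 UTC string -> local time in the target zone. */
    static String decodeTimestamp(String isoUtc, ZoneId zone) {
        return LocalDateTime.ofInstant(Instant.parse(isoUtc), zone).format(DT_FMT);
    }

    /** DATE: number of days since 1970-01-01. */
    static String decodeDate(int epochDays) {
        return LocalDate.ofEpochDay(epochDays).toString();
    }

    public static void main(String[] args) {
        // "Jw8=" is Base64 for the bytes 0x27 0x0F, i.e. the unscaled value 9999.
        System.out.println(decodeDecimal("Jw8=", 2));            // 99.99
        System.out.println(decodeDatetime(1705285800000L));      // 2024-01-15 02:30:00
        System.out.println(decodeTimestamp("2024-01-15T02:30:00Z",
                ZoneId.of("Asia/Shanghai")));                    // 2024-01-15 10:30:00
        System.out.println(decodeDate(19737));                   // 2024-01-15
    }
}
```

The scale passed to decodeDecimal has to come from the source column definition; section 8.3 describes what goes wrong when it is hard-coded.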

4.1 Configuration example (MySQL part)

# MySQL CDC settings
mysql.hostname=10.0.1.100
mysql.port=3306
mysql.database=biz_db
mysql.username=cdc_reader
mysql.password=***
# Option 1: regex match (recommended; new tables are discovered automatically)
# sync.table.pattern=tb_.*
# sync.table.prefix=ods_
# Option 2: explicit mapping
sync.tables=tb_user:ods_tb_user,\
  tb_account:ods_tb_account,\
  tb_product:ods_tb_product,\
  tb_order:ods_tb_order,\
  tb_payment:ods_tb_payment

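5. MongoDB sync: fifteen scenarios explained

MongoToClickHouseRealtimeSync includes a lot of adaptation for MongoDB's flexible schema:

# | Scenario | Config key | Notes
1 | Multi-collection sync | sync.collections | One job syncs N collections
2 | Collection → table name mapping | trace_logs:ods_mongo_trace_logs | Maps a Mongo collection name to a CK table name
3 | Field renaming | sync.field.rename.{collection} | c_d:create_date turns abbreviated field names into readable ones
4 | Field exclusion | sync.field.exclude.{collection} | Drops useless fields such as _class
5 | camelCase → snake_case | sync.field.camel_to_snake.{collection}=true | agentId → agent_id, fully automatic
6 | Nested object flattening | sync.field.flatten.{collection} | {info: {name: "a"}} is flattened to info.name
7 | Renaming flattened sub-fields | sync.field.flatten.rename.{collection}.{field} | Sub-fields can still be renamed individually after flattening
8 | Millisecond timestamp conversion | sync.field.timestamp_ms.{collection} | Milliseconds → yyyy-MM-dd HH:mm:ss.SSS
9 | Second timestamp conversion | sync.field.timestamp_sec.{collection} | Seconds → yyyy-MM-dd HH:mm:ss
10 | Multi-field merge (coalesce) | sync.field.coalesce.{collection} | create_time:c_t|ct takes the first non-null value
11 | $oid handling | automatic | {"$oid": "65a..."} → "65a..."
12 | $date handling | automatic | ISO string, milliseconds, and $numberLong formats are all supported
13 | $numberLong/$numberInt | automatic | Extended JSON numeric types are restored automatically
14 | $numberDecimal | automatic | Restored exactly as BigDecimal
15 | Smart array handling | automatic | Object arrays → JSON string; primitive arrays → CK Array

MongoDB's Extended JSON is a major pitfall: an _id field is not a string but {"$oid": "65a..."}, and a timestamp is not a time value but {"$date": {"$numberLong": "1705..."}}. Leave these unhandled and writes to CK fail immediately.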
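
As a rough sketch of what unwrapping these Extended JSON wrappers involves, the Java below operates on an already-parsed document represented as nested Maps. It is an illustration under assumptions, not the Mongo sink's real code: the class name is hypothetical, $date is normalized to epoch milliseconds as one possible target, and only ISO strings in the UTC "Z" form are handled.

```java
import java.math.BigDecimal;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;

/** Sketch: unwraps single-key Extended JSON wrappers from a parsed document. */
class ExtendedJsonUnwrapper {

    /** Returns the plain value for a known wrapper, or the input unchanged. */
    static Object unwrap(Object value) {
        if (!(value instanceof Map)) {
            return value;
        }
        Map<?, ?> map = (Map<?, ?>) value;
        if (map.size() != 1) {
            return value; // ordinary nested object, not a wrapper
        }
        Map.Entry<?, ?> entry = map.entrySet().iterator().next();
        String key = String.valueOf(entry.getKey());
        Object inner = entry.getValue();
        switch (key) {
            case "$oid":           return String.valueOf(inner);
            case "$numberLong":    return Long.parseLong(String.valueOf(inner));
            case "$numberInt":     return Integer.parseInt(String.valueOf(inner));
            case "$numberDecimal": return new BigDecimal(String.valueOf(inner));
            case "$date":          return toEpochMillis(inner);
            default:               return value;
        }
    }

    /** $date may hold an ISO 8601 string, a number of milliseconds, or a nested $numberLong. */
    private static Object toEpochMillis(Object inner) {
        Object v = unwrap(inner);
        if (v instanceof Number) {
            return ((Number) v).longValue();
        }
        return Instant.parse(String.valueOf(v)).toEpochMilli();
    }

    public static void main(String[] args) {
        Object id = unwrap(Collections.singletonMap("$oid", "65a1b2c3"));
        Object ts = unwrap(Collections.singletonMap("$date",
                Collections.singletonMap("$numberLong", "1705285800000")));
        System.out.println(id); // 65a1b2c3
        System.out.println(ts); // 1705285800000
    }
}
```

From the normalized milliseconds, the yyyy-MM-dd HH:mm:ss.SSS string that CK expects is a single formatting step.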

5.1 Configuration example (MongoDB part)

# MongoDB CDC settings
mongo.hosts=10.0.2.200:27017
mongo.database=app_data
mongo.username=cdc_reader
mongo.password=***
mongo.connection.options=authSource=admin&replicaSet=rs0
# Collection mapping
sync.collections=event_logs:ods_mongo_event_logs,\
  session_record:ods_mongo_session_record,\
  dialog:ods_mongo_dialog,\
  ai_call_record:ods_mongo_ai_call_record
# Field renaming
sync.field.rename.event_logs=c_d:create_date
sync.field.rename.session_record=c_t:create_time,u_t:update_time
# camelCase to snake_case
sync.field.camel_to_snake.ai_model_record=true
# Nested object flattening
sync.field.flatten.ai_model_record=callInfo>call_info,reasonParams>reason_params
# Renaming flattened sub-fields
sync.field.flatten.rename.ai_model_record.callInfo=apikey:api_key
# Timestamp conversion
sync.field.timestamp_ms.ai_call_record=createTime,inputTime
sync.field.timestamp_sec.dialog=ct,ut
# Field coalescing
sync.field.coalesce.chat_record=create_time:c_t|ct,update_time:ut|u_t
# Field exclusion
sync.field.exclude.event_logs=_class,c_t
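
For readers who want to see what three of these transformations amount to, here is a hedged Java sketch of camel_to_snake, coalesce, and flatten on a document held as a Map. The class and method names are hypothetical and the real sink may differ; in particular, the dot separator for flattened names is taken from the info.name example in the table above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of three field transformations from the configuration above; names are illustrative. */
class MongoFieldTransforms {

    /** camelCase -> snake_case, e.g. agentId -> agent_id (naive: every upper-case letter starts a word). */
    static String camelToSnake(String name) {
        StringBuilder sb = new StringBuilder(name.length() + 4);
        for (int i = 0; i < name.length(); i++) {
            char c = name.charAt(i);
            if (Character.isUpperCase(c)) {
                if (i > 0) {
                    sb.append('_');
                }
                sb.append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    /** Returns the first non-null value among the candidate source fields, or null if none is set. */
    static Object coalesce(Map<String, Object> doc, String... candidates) {
        for (String field : candidates) {
            Object v = doc.get(field);
            if (v != null) {
                return v;
            }
        }
        return null;
    }

    /** Moves the sub-fields of one nested object to the top level as "prefix.subField". */
    static Map<String, Object> flatten(Map<String, Object> doc, String field, String prefix) {
        Map<String, Object> out = new LinkedHashMap<>(doc);
        Object nested = out.remove(field);
        if (nested instanceof Map) {
            for (Map.Entry<?, ?> e : ((Map<?, ?>) nested).entrySet()) {
                out.put(prefix + "." + e.getKey(), e.getValue());
            }
        } else if (nested != null) {
            out.put(field, nested); // not an object: keep the original value
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(camelToSnake("agentId")); // agent_id

        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("ct", 1705285800L);
        System.out.println(coalesce(doc, "c_t", "ct")); // 1705285800
    }
}
```

With create_time:c_t|ct, coalesce(doc, "c_t", "ct") supplies the value written to create_time, whichever of the two abbreviated fields a given document happens to carry.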

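6. Shared capabilities: AbstractClickHouseSink

The base class shared by both sinks provides the following:

6.1 Core features

Capability | Description
Batched writes | Flush is triggered by row count (default 2000) or by time interval (default 5 s)
HTTP streaming writes | Uses chunked transfer + BufferedOutputStream to avoid OOM from concatenating a large batch into one string
Scheduled CK schema refresh | Reloads from system.columns every 10 minutes and automatically excludes MATERIALIZED/ALIAS columns
Schema change detection | On refresh, compares old and new schema and logs added/removed columns
Checkpoint support | Implements CheckpointedFunction; the buffer is flushed automatically when a checkpoint is triggered
Write failure callback | onFlushTableError() can be overridden by subclasses (the Mongo sink can extract the failing row number)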
6.2 Code structure

public abstract class AbstractClickHouseSink<T> extends RichSinkFunction<T>
        implements CheckpointedFunction {
    // ===== Shared fields =====
    protected transient Map<String, List<String>> buffer;         // table -> rows
    protected transient volatile Map<String, List<String>> ckTableColumns;  // CK schema cache
    protected transient ScheduledExecutorService schemaRefreshExecutor;
    // ===== Lifecycle =====
    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize the buffer and auth header, load the CK schema, start the scheduled refresh
    }
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        flush();  // Force a flush on checkpoint
    }
    // ===== Extension points (overridable by subclasses) =====
    protected void refreshSchema() { /* by default refreshes only the CK schema */ }
    protected void onFlushTableError(String table, List<String> rows, Exception e) throws Exception { /* by default logs and rethrows */ }
    // ===== Core write path =====
    protected void flush() throws Exception {
        for (Map.Entry<String, List<String>> entry : buffer.entrySet()) {
            String table = entry.getKey();
            List<String> rows = entry.getValue();
            List<String> ckCols = ckTableColumns.get(table);
            String insertSql = buildInsertSql(table, ckCols);
            executeHttpPostStreaming(buildInsertUrl(insertSql), rows);
        }
        buffer.clear();
    }
    // CK schema loading: exclude MATERIALIZED and ALIAS columns
    protected Map<String, List<String>> loadCKTableColumnsInternal() throws Exception {
        String sql = "SELECT table, name FROM system.columns "
                   + "WHERE database = '...' "
                   + "AND default_kind NOT IN ('MATERIALIZED', 'ALIAS') "
                   + "ORDER BY table, position";
        // ... (run the query and parse the result into table -> columns; full version in section 9.1)
    }
}

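Why exclude MATERIALIZED/ALIAS columns? Both are computed by CK itself, and inserting into them explicitly raises an error. This was a hidden bug in the previous version of the code, exposed only after a materialized column was added in production.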


7. Deployment and running

7.1 Key pom.xml dependencies

<properties>
    <flink.version>1.20.1</flink.version>
</properties>
<dependencies>
    <!-- Flink core -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink CDC -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>3.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-mongodb-cdc</artifactId>
        <version>3.5.0</version>
    </dependency>
    <!-- JDBC drivers -->
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <version>8.4.0</version>
    </dependency>
    <dependency>
        <groupId>com.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.8.5</version>
        <classifier>all</classifier>
    </dependency>
</dependencies>

Build the fat jar with maven-shade-plugin; the detailed build configuration is in the earlier articles.

7.2 Unified configuration file

The MySQL and MongoDB settings are merged into a single application-{env}.properties:

# =================== ClickHouse ===================
clickhouse.url=http://10.0.3.100:8123
clickhouse.database=dw
clickhouse.username=default
clickhouse.password=***
# =================== Flink ===================
flink.parallelism=1
flink.batch.size=2000
flink.batch.interval=5000
# Checkpoint (optional; not needed if already configured at the cluster level)
# flink.checkpoint.interval=60000
# flink.checkpoint.dir=s3://state/checkpoint
# =================== MySQL CDC ===================
mysql.hostname=10.0.1.100
mysql.port=3306
mysql.database=biz_db
mysql.username=cdc_reader
mysql.password=***
mysql.timezone=Asia/Shanghai
sync.scan.newly.added.table=true
sync.tables=tb_user:ods_tb_user,tb_order:ods_tb_order,...
# =================== MongoDB CDC ===================
mongo.hosts=10.0.2.200:27017
mongo.database=app_data
mongo.username=cdc_reader
mongo.password=***
mongo.connection.options=authSource=admin&replicaSet=rs0
sync.collections=event_logs:ods_mongo_event_logs,...
# (field transformation settings are the same as in section 5)

7.3 Launch commands

# MySQL → ClickHouse (test environment)
flink run -c com.cloud.flink.MySQLToClickHouseRealtimeSyncV2 \
  bi-flink.jar --env test
# MongoDB → ClickHouse (production environment)
flink run -c com.cloud.flink.MongoToClickHouseRealtimeSync \
  bi-flink.jar --env prod
# Use an external configuration file
flink run -c com.cloud.flink.MySQLToClickHouseRealtimeSyncV2 \
  bi-flink.jar --config /opt/flink/conf/sync.properties

7.4 End result

Two Flink jobs run in parallel, one handling real-time sync for MySQL and the other for MongoDB.


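8. Caveats and lessons learned

8.1 ReplacingMergeTree does not deduplicate in real time

ReplacingMergeTree deduplicates during background merges, not at write time. This means:

  • Duplicates that were just written may still be visible to queries
  • Queries that need exact deduplication should add FINAL: SELECT * FROM ods_tb_user FINAL WHERE ...
  • FINAL has a performance cost; report queries can usually tolerate short-lived duplicates

8.2 Flink syncs increments only

Flink CDC starts in StartupOptions.latest() mode, so it syncs only changes made after startup. Historical data is handled by the CK table engines + DolphinScheduler.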

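8.3 DECIMAL precision

Debezium encodes a MySQL DECIMAL(10,4) value as a Base64 string, and decoding it requires knowing the scale (the number of decimal places).

  • Old code: scale was hard-coded to 2, so DECIMAL(10,4) values were decoded incorrectly
  • New code: the actual scale is read dynamically from information_schema.COLUMNS.NUMERIC_SCALE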
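
The effect of a wrong scale is easy to reproduce with the JDK. The sketch below uses hypothetical helper names and builds the payload locally rather than reading it from Debezium: it encodes 12.3456 as the Base64 of its unscaled value, then decodes the same bytes with scale 4 and with the hard-coded scale 2.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

/** Sketch: the same DECIMAL payload decoded with the right and the wrong scale. */
class DecimalScaleDemo {

    /** Encodes only the unscaled value; the scale itself is not part of the payload. */
    static String encode(BigDecimal value) {
        return Base64.getEncoder().encodeToString(value.unscaledValue().toByteArray());
    }

    /** Decodes the payload using a scale supplied by the caller. */
    static BigDecimal decode(String base64, int scale) {
        return new BigDecimal(new BigInteger(Base64.getDecoder().decode(base64)), scale);
    }

    public static void main(String[] args) {
        String payload = encode(new BigDecimal("12.3456")); // unscaled value 123456
        System.out.println(decode(payload, 4)); // 12.3456 (scale from the column definition)
        System.out.println(decode(payload, 2)); // 1234.56 (hard-coded scale: off by a factor of 100)
    }
}
```

Because the scale is not carried in the bytes, it has to come from the column metadata, which is why the new code looks it up per column.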

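8.4 Write errors on CK materialized columns

ClickHouse MATERIALIZED and ALIAS columns cannot be inserted into explicitly, but Flink does not know which columns those are.

Solution: when loading the schema from system.columns, filter with default_kind NOT IN ('MATERIALIZED', 'ALIAS') and build INSERT statements with an explicit column list.

8.5 Schema changes without a restart

The MySQL and CK schema caches are refreshed automatically every 10 minutes. To add a column:

  1. Add the column in CK first
  2. Then add the column in MySQL (which produces binlog events)
  3. Wait up to 10 minutes for Flink to pick up the new column and start writing it

No Flink job restart is needed: the ultimate win for lazy engineers.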


9. Full code

9.1 AbstractClickHouseSink (common base class)

package com.cloud.flink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
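/**
 * Abstract base class for the ClickHouse HTTP sink.
 * Provides shared capabilities: batched writes, scheduled CK schema refresh, and checkpoint support.
 *
 * Subclasses must implement: invoke()
 * Subclasses may override: refreshSchema(), onFlushTableError()
 */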
public abstract class AbstractClickHouseSink<T> extends RichSinkFunction<T>
        implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractClickHouseSink.class);
    protected final String clickhouseUrl;
    protected final String ckUsername;
    protected final String ckPassword;
    protected final String ckDatabase;
    protected final int batchSize;
    protected final long batchIntervalMs;
    protected transient String authHeader;
    protected transient Map<String, List<String>> buffer;
    protected transient long lastFlushTime;
    protected transient volatile Map<String, List<String>> ckTableColumns;
    protected transient ScheduledExecutorService schemaRefreshExecutor;
    protected AbstractClickHouseSink(String clickhouseUrl, String ckUsername, String ckPassword,
                                     String ckDatabase, int batchSize, long batchIntervalMs) {
        this.clickhouseUrl = clickhouseUrl;
        this.ckUsername    = ckUsername;
        this.ckPassword    = ckPassword;
        this.ckDatabase    = ckDatabase;
        this.batchSize     = batchSize;
        this.batchIntervalMs = batchIntervalMs;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        authHeader = "Basic " + Base64.getEncoder().encodeToString(
                (ckUsername + ":" + ckPassword).getBytes(StandardCharsets.UTF_8));
        buffer        = new HashMap<>();
        lastFlushTime = System.currentTimeMillis();
        ckTableColumns = loadCKTableColumnsInternal();
        schemaRefreshExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "schema-refresh");
            t.setDaemon(true);
            return t;
        });
        schemaRefreshExecutor.scheduleAtFixedRate(this::refreshSchema, 10, 10, TimeUnit.MINUTES);
    }
    @Override
    public void close() throws Exception {
        if (schemaRefreshExecutor != null) {
            schemaRefreshExecutor.shutdown();
            try { schemaRefreshExecutor.awaitTermination(5, TimeUnit.SECONDS); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        if (buffer != null && !buffer.isEmpty()) { flush(); }
    }
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        LOG.debug("Checkpoint #{} triggered, flushing buffered data", context.getCheckpointId());
        flush();
    }
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // The CDC source restores its own offsets, so the sink does not need to persist the buffer
    }
    /** Periodically refreshes the CK schema; subclasses may override to also refresh the source schema */
    protected void refreshSchema() {
        try {
            Map<String, List<String>> newCKSchema = loadCKTableColumnsInternal();
            detectAndLogCKChanges(ckTableColumns, newCKSchema);
            ckTableColumns = newCKSchema;
            LOG.info("CK schema refreshed: {} tables", newCKSchema.size());
        } catch (Exception e) {
            LOG.warn("CK schema refresh failed, keeping the old schema: {}", e.getMessage());
        }
    }
    /** Per-table write failure callback; subclasses may override */
    protected void onFlushTableError(String table, List<String> rows, Exception e) throws Exception {
        LOG.error("Write to {}.{} failed, {} rows, first row: {}",
                ckDatabase, table, rows.size(),
                rows.isEmpty() ? "" : (rows.get(0).length() > 500
                        ? rows.get(0).substring(0, 500) + "..." : rows.get(0)));
        throw e;
    }
    protected boolean shouldFlush() {
        int totalSize = buffer.values().stream().mapToInt(List::size).sum();
        return totalSize >= batchSize || System.currentTimeMillis() - lastFlushTime >= batchIntervalMs;
    }
    protected void flush() throws Exception {
        if (buffer.isEmpty()) { lastFlushTime = System.currentTimeMillis(); return; }
        for (Map.Entry<String, List<String>> entry : buffer.entrySet()) {
            String table = entry.getKey();
            List<String> rows = entry.getValue();
            if (rows.isEmpty()) continue;
            List<String> ckCols = ckTableColumns != null ? ckTableColumns.get(table) : null;
            String insertSql = buildInsertSql(table, ckCols);
            String insertUrl = buildInsertUrl(insertSql);
            LOG.debug("About to write to {}.{}: {} rows", ckDatabase, table, rows.size());
            try {
                executeHttpPostStreaming(insertUrl, rows);
                LOG.debug("Finished writing to {}.{}: {} rows", ckDatabase, table, rows.size());
            } catch (Exception e) {
                onFlushTableError(table, rows, e);
            }
        }
        buffer.clear();
        lastFlushTime = System.currentTimeMillis();
    }
    protected String buildInsertSql(String table, List<String> ckCols) {
        return (ckCols != null && !ckCols.isEmpty())
                ? "INSERT INTO " + table + " (" + String.join(", ", ckCols) + ") FORMAT JSONEachRow"
                : "INSERT INTO " + table + " FORMAT JSONEachRow";
    }
    protected String buildInsertUrl(String insertSql) {
        return String.format("%s/?database=%s&max_partitions_per_insert_block=0"
                        + "&input_format_skip_unknown_fields=1&query=%s",
                clickhouseUrl, ckDatabase,
                java.net.URLEncoder.encode(insertSql, StandardCharsets.UTF_8));
    }
    /** Streaming write: chunked transfer avoids OOM on large batches */
    protected void executeHttpPostStreaming(String urlStr, List<String> rows) throws Exception {
        URL url = new URL(urlStr);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Authorization", authHeader);
            conn.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
            conn.setDoOutput(true);
            conn.setChunkedStreamingMode(8192);
            conn.setConnectTimeout(30000);
            conn.setReadTimeout(120000);
            try (BufferedOutputStream bos = new BufferedOutputStream(conn.getOutputStream(), 65536)) {
                byte[] newline = "\n".getBytes(StandardCharsets.UTF_8);
                for (int i = 0; i < rows.size(); i++) {
                    if (i > 0) bos.write(newline);
                    bos.write(rows.get(i).getBytes(StandardCharsets.UTF_8));
                }
            }
            int code = conn.getResponseCode();
            String responseBody = readResponse(conn, code != 200);
            if (code != 200) {
                LOG.error("ClickHouse write failed: code={}, response={}", code, responseBody);
                throw new RuntimeException("Insert failed: " + responseBody);
            }
        } finally {
            conn.disconnect();
        }
    }
    protected String readResponse(HttpURLConnection conn, boolean isError) {
        try {
            InputStream is = isError ? conn.getErrorStream() : conn.getInputStream();
            if (is == null) return "";
            try (BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
                return br.lines().collect(Collectors.joining("\n"));
            }
        } catch (Exception e) {
            LOG.warn("Failed to read response: {}", e.getMessage());
            return "";
        }
    }
    /** Loads the CK schema from system.columns (excluding MATERIALIZED/ALIAS columns) */
    protected Map<String, List<String>> loadCKTableColumnsInternal() throws Exception {
        String sql = "SELECT table, name FROM system.columns WHERE database = '" + ckDatabase
                + "' AND default_kind NOT IN ('MATERIALIZED', 'ALIAS') ORDER BY table, position";
        String urlStr = clickhouseUrl + "/?query=" + java.net.URLEncoder.encode(sql, StandardCharsets.UTF_8);
        URL url = new URL(urlStr);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try {
            conn.setRequestMethod("GET");
            conn.setRequestProperty("Authorization", authHeader);
            conn.setConnectTimeout(10000);
            conn.setReadTimeout(10000);
            int code = conn.getResponseCode();
            String response = readResponse(conn, code != 200);
            if (code != 200) throw new RuntimeException("Failed to query CK schema: " + response);
            Map<String, List<String>> result = new LinkedHashMap<>();
            for (String line : response.split("\n")) {
                line = line.trim();
                if (line.isEmpty()) continue;
                String[] parts = line.split("\t");
                if (parts.length == 2) {
                    result.computeIfAbsent(parts[0].trim(), k -> new ArrayList<>())
                          .add(parts[1].trim());
                }
            }
            LOG.info("Loaded CK schema: {} tables", result.size());
            return result;
        } finally {
            conn.disconnect();
        }
    }
    /** Compares the old and new CK schema and logs the changes */
    protected void detectAndLogCKChanges(Map<String, List<String>> oldSchema,
                                         Map<String, List<String>> newSchema) {
        if (oldSchema == null || newSchema == null) return;
        for (Map.Entry<String, List<String>> entry : newSchema.entrySet()) {
            String table = entry.getKey();
            List<String> newCols = entry.getValue();
            List<String> oldCols = oldSchema.get(table);
            if (oldCols == null) {
                LOG.info("CK new table detected: {}", table);
            } else if (!new HashSet<>(oldCols).equals(new HashSet<>(newCols))) {
                Set<String> added = new HashSet<>(newCols); added.removeAll(new HashSet<>(oldCols));
                Set<String> removed = new HashSet<>(oldCols); removed.removeAll(new HashSet<>(newCols));
                LOG.info("CK table {} schema changed: added columns={}, removed columns={}", table, added, removed);
            }
        }
        for (String table : oldSchema.keySet()) {
            if (!newSchema.containsKey(table)) LOG.warn("CK table dropped: {}", table);
        }
    }
}

9.2 MySQLToClickHouseRealtimeSyncV2 (MySQL sync)

package com.cloud.flink;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.regex.Pattern;
public class MySQLToClickHouseRealtimeSyncV2 {
    private static final Logger LOG = LoggerFactory.getLogger(MySQLToClickHouseRealtimeSyncV2.class);
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        ParameterTool config = FlinkJobUtils.loadConfig(params);
        TableMappingConfig tableMappingConfig = parseTableMapping(config);
        LOG.info("========== Configuration ==========");
        LOG.info("MySQL: {}:{}/{}", config.get("mysql.hostname"), config.get("mysql.port"), config.get("mysql.database"));
        LOG.info("ClickHouse: {}/{}", config.get("clickhouse.url"), config.get("clickhouse.database"));
        LOG.info("Table mapping mode: {}", tableMappingConfig.isPatternMode() ? "regex match" : "explicit mapping");
        LOG.info("Tables to sync: {}", tableMappingConfig.getTableList().length);
        LOG.info("==============================");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(config.getInt("flink.parallelism", 1));
        env.getConfig().setGlobalJobParameters(config);
        FlinkJobUtils.configureCheckpoint(env, config);
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(config.get("mysql.hostname"))
                .port(config.getInt("mysql.port"))
                .databaseList(config.get("mysql.database"))
                .tableList(tableMappingConfig.getTableList())
                .username(config.get("mysql.username"))
                .password(config.get("mysql.password"))
                .serverTimeZone(config.get("mysql.timezone", "Asia/Shanghai"))
                .serverId(config.get("mysql.server.id", "5401-5404"))
                .startupOptions(StartupOptions.latest())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .includeSchemaChanges(false)
                .scanNewlyAddedTableEnabled(config.getBoolean("sync.scan.newly.added.table", true))
                .build();
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
           .addSink(new ClickHouseCheckpointedSink(
                   config.get("mysql.hostname"), config.getInt("mysql.port"),
                   config.get("mysql.database"), config.get("mysql.username"), config.get("mysql.password"),
                   config.get("clickhouse.url"), config.get("clickhouse.username"),
                   config.get("clickhouse.password"), config.get("clickhouse.database"),
                   tableMappingConfig,
                   config.getInt("flink.batch.size", 2000), config.getLong("flink.batch.interval", 5000L)
           )).name("ClickHouse Checkpointed Sink");
        env.execute("MySQL to ClickHouse Realtime Sync V2");
    }
    private static TableMappingConfig parseTableMapping(ParameterTool config) {
        String database = config.get("mysql.database");
        if (config.has("sync.table.pattern")) {
            return new TableMappingConfig(database, config.get("sync.table.pattern"),
                    config.get("sync.table.prefix", "ods_"));
        }
        if (config.has("sync.tables")) {
            Map<String, String> tableMapping = new LinkedHashMap<>();
            for (String pair : config.get("sync.tables").split(",")) {
                pair = pair.trim();
                if (pair.isEmpty()) continue;
                String[] parts = pair.split(":");
                if (parts.length == 2) tableMapping.put(parts[0].trim(), parts[1].trim());
                else if (parts.length == 1) tableMapping.put(parts[0].trim(), "ods_" + parts[0].trim());
            }
            return new TableMappingConfig(database, tableMapping);
        }
        throw new RuntimeException("No table mapping configured; set sync.table.pattern or sync.tables");
    }
    /** Table mapping configuration (supports regex mode and explicit mapping mode) */
    public static class TableMappingConfig implements java.io.Serializable {
        private static final long serialVersionUID = 1L;
        private final String database;
        private final boolean patternMode;
        private final String pattern;
        private final String prefix;
        private final Map<String, String> tableMapping;
        public TableMappingConfig(String database, String pattern, String prefix) {
            this.database = database; this.patternMode = true;
            this.pattern = pattern; this.prefix = prefix; this.tableMapping = null;
        }
        public TableMappingConfig(String database, Map<String, String> tableMapping) {
            this.database = database; this.patternMode = false;
            this.pattern = null; this.prefix = null; this.tableMapping = tableMapping;
        }
        public boolean isPatternMode() { return patternMode; }
        public String[] getTableList() {
            return patternMode ? new String[]{database + "." + pattern}
                    : tableMapping.keySet().stream().map(t -> database + "." + t).toArray(String[]::new);
        }
        public String getClickHouseTable(String mysqlTable) {
            return patternMode ? prefix + mysqlTable : tableMapping.get(mysqlTable);
        }
        public boolean shouldSync(String mysqlTable) {
            return patternMode ? Pattern.matches(pattern, mysqlTable) : tableMapping.containsKey(mysqlTable);
        }
    }
    /** MySQL sink: extends the common base class and adds MySQL schema loading and Debezium type decoding */
    public static class ClickHouseCheckpointedSink extends AbstractClickHouseSink<String> {
        private static final Logger LOG = LoggerFactory.getLogger(ClickHouseCheckpointedSink.class);
        private final String mysqlHost, mysqlDatabase, mysqlUsername, mysqlPassword;
        private final int mysqlPort;
        private final TableMappingConfig tableMappingConfig;
        private transient ObjectMapper objectMapper;
        private transient volatile Map<String, Map<String, ColumnInfo>> tableColumnTypes;
        public enum ColumnType { DECIMAL, BIGINT_UNSIGNED, DATETIME, DATE, OTHER }
        public static class ColumnInfo implements java.io.Serializable {
            private static final long serialVersionUID = 1L;
            final ColumnType type;
            final int scale;
            ColumnInfo(ColumnType type, int scale) { this.type = type; this.scale = scale; }
        }
        public ClickHouseCheckpointedSink(String mysqlHost, int mysqlPort, String mysqlDatabase,
                String mysqlUsername, String mysqlPassword, String clickhouseUrl,
                String ckUsername, String ckPassword, String ckDatabase,
                TableMappingConfig tableMappingConfig, int batchSize, long batchIntervalMs) {
            super(clickhouseUrl, ckUsername, ckPassword, ckDatabase, batchSize, batchIntervalMs);
            this.mysqlHost = mysqlHost; this.mysqlPort = mysqlPort;
            this.mysqlDatabase = mysqlDatabase; this.mysqlUsername = mysqlUsername;
            this.mysqlPassword = mysqlPassword; this.tableMappingConfig = tableMappingConfig;
        }
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            objectMapper = new ObjectMapper();
            tableColumnTypes = loadMySQLTableSchemaInternal();
            LOG.info("MySQL sink initialized: {} MySQL tables, {} CK tables",
                    tableColumnTypes.size(), ckTableColumns.size());
        }
        @Override
        protected void refreshSchema() {
            try {
                Map<String, Map<String, ColumnInfo>> newMySQL = loadMySQLTableSchemaInternal();
                Map<String, List<String>> newCK = loadCKTableColumnsInternal();
                detectAndLogCKChanges(ckTableColumns, newCK);
                tableColumnTypes = newMySQL;
                ckTableColumns = newCK;
                LOG.info("Schema refreshed: {} MySQL tables, {} CK tables", newMySQL.size(), newCK.size());
            } catch (Exception e) {
                LOG.warn("Schema refresh failed: {}", e.getMessage());
            }
        }
        @Override
        public void invoke(String value, Context context) throws Exception {
            String op = "", mysqlTable = "";
            try {
                JsonNode root = objectMapper.readTree(value);
                op = root.path("op").asText();
                mysqlTable = root.path("source").path("table").asText();
                if (!tableMappingConfig.shouldSync(mysqlTable)) return;
                String ckTable = tableMappingConfig.getClickHouseTable(mysqlTable);
                if (ckTable == null) return;
                if ("c".equals(op) || "r".equals(op) || "u".equals(op)) {
                    JsonNode data = root.path("after");
                    if (data != null && !data.isMissingNode()) {
                        Map<String, ColumnInfo> colTypes = tableColumnTypes.get(mysqlTable);
                        String json = convertFields(data, colTypes);
                        buffer.computeIfAbsent(ckTable, k -> new ArrayList<>()).add(json);
                    }
                }
                if (shouldFlush()) flush();
            } catch (Exception e) {
                LOG.error("Failed to process record: table={}, op={}, value={}", mysqlTable, op, value, e);
                throw e;
            }
        }
        // ===== Core type-conversion logic =====
        private static final DateTimeFormatter DT_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        private static final DateTimeFormatter DATE_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        private static final ZoneId ZONE_ID = ZoneId.of("Asia/Shanghai");
        private String convertFields(JsonNode data, Map<String, ColumnInfo> columnTypes) {
            try {
                Map<String, Object> map = objectMapper.convertValue(data, Map.class);
                Map<String, Object> out = new LinkedHashMap<>();
                for (Map.Entry<String, Object> e : map.entrySet()) {
                    String key = e.getKey(); Object val = e.getValue();
                    if (val == null) { out.put(key, null); continue; }
                    ColumnInfo ci = columnTypes != null ? columnTypes.get(key) : null;
                    if (ci == null) { out.put(key, val); continue; }
                    switch (ci.type) {
                        case BIGINT_UNSIGNED:
                            if (val instanceof String) {
                                try { out.put(key, new BigInteger(Base64.getDecoder().decode((String) val))); }
                                catch (Exception ex) { out.put(key, val); }
                            } else out.put(key, val);
                            break;
                        case DECIMAL:
                            if (val instanceof String) {
                                BigDecimal d = decodeDecimal((String) val, ci.scale);
                                out.put(key, d != null ? d : val);
                            } else out.put(key, val);
                            break;
                        case DATETIME:
                            if (val instanceof Number) {
                                out.put(key, LocalDateTime.ofInstant(Instant.ofEpochMilli(
                                        ((Number) val).longValue()), java.time.ZoneOffset.UTC).format(DT_FMT));
                            } else if (val instanceof String && ((String) val).contains("T")) {
                                try {
                                    out.put(key, java.time.ZonedDateTime.parse((String) val)
                                            .withZoneSameInstant(ZONE_ID).format(DT_FMT));
                                } catch (Exception ex) { out.put(key, val); }
                            } else out.put(key, val);
                            break;
                        case DATE:
                            if (val instanceof Number)
                                out.put(key, java.time.LocalDate.ofEpochDay(((Number) val).intValue()).format(DATE_FMT));
                            else out.put(key, val);
                            break;
                        default: out.put(key, val);
                    }
                }
                return objectMapper.writeValueAsString(out);
            } catch (Exception e) {
                LOG.warn("Conversion failed; falling back to the raw record", e);
                return data.toString();
            }
        }
        private BigDecimal decodeDecimal(String base64, int scale) {
            try { return new BigDecimal(new BigInteger(Base64.getDecoder().decode(base64)), scale); }
            catch (Exception e) { return null; }
        }
        // ===== MySQL schema loading =====
        private Map<String, Map<String, ColumnInfo>> loadMySQLTableSchemaInternal() {
            String jdbcUrl = String.format("jdbc:mysql://%s:%d/%s?useSSL=false&serverTimezone=Asia/Shanghai",
                    mysqlHost, mysqlPort, mysqlDatabase);
            String sql = "SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE, COLUMN_TYPE, NUMERIC_SCALE "
                    + "FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = ? ORDER BY TABLE_NAME, ORDINAL_POSITION";
            try { Class.forName("com.mysql.cj.jdbc.Driver"); }
            catch (ClassNotFoundException e) { throw new RuntimeException("MySQL driver not found", e); }
            Map<String, Map<String, ColumnInfo>> result = new HashMap<>();
            try (java.sql.Connection conn = java.sql.DriverManager.getConnection(jdbcUrl, mysqlUsername, mysqlPassword);
                 java.sql.PreparedStatement stmt = conn.prepareStatement(sql)) {
                stmt.setString(1, mysqlDatabase);
                try (java.sql.ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        String dataType = rs.getString("DATA_TYPE").toUpperCase();
                        String fullType = rs.getString("COLUMN_TYPE").toLowerCase();
                        int scale = rs.getInt("NUMERIC_SCALE");
                        ColumnType ct;
                        switch (dataType) {
                            case "BIGINT": ct = (fullType != null && fullType.contains("unsigned"))
                                    ? ColumnType.BIGINT_UNSIGNED : ColumnType.OTHER; break;
                            case "DECIMAL": case "NUMERIC": case "DEC": case "FIXED":
                                ct = ColumnType.DECIMAL; break;
                            case "DATETIME": case "TIMESTAMP": ct = ColumnType.DATETIME; break;
                            case "DATE": ct = ColumnType.DATE; break;
                            default: ct = ColumnType.OTHER;
                        }
                        result.computeIfAbsent(rs.getString("TABLE_NAME"), k -> new HashMap<>())
                              .put(rs.getString("COLUMN_NAME"), new ColumnInfo(ct, scale));
                    }
                }
                return result;
            } catch (Exception e) {
                LOG.error("Failed to load MySQL schema: {}", e.getMessage(), e);
                throw new RuntimeException("Failed to load MySQL schema", e);
            }
        }
    }
}
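
The DECIMAL and BIGINT UNSIGNED branches in convertFields() Base64-decode the incoming string before building a number. The sketch below isolates that step. It assumes the CDC source emits such columns as the Base64 form of the big-endian two's-complement unscaled value (which is what the decoding code above implies); the class name DecimalDecodeSketch and the encode() helper are hypothetical and exist only to produce sample input.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

// Illustrative sketch only; class and method names are hypothetical.
class DecimalDecodeSketch {
    /** Decodes a Base64 string holding a big-endian two's-complement unscaled value. */
    static BigDecimal decode(String base64, int scale) {
        byte[] unscaled = Base64.getDecoder().decode(base64);
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    /** Inverse direction, used here only to generate sample input. */
    static String encode(BigDecimal value) {
        return Base64.getEncoder().encodeToString(value.unscaledValue().toByteArray());
    }

    public static void main(String[] args) {
        System.out.println(decode("MDk=", 2));                 // 123.45 (unscaled 12345, scale 2)
        System.out.println(decode("/w==", 2));                 // -0.01  (unscaled -1, scale 2)
        System.out.println(encode(new BigDecimal("123.45")));  // MDk=
    }
}
```

The scale is not part of the encoded bytes, which is why the sink reads NUMERIC_SCALE from information_schema and passes it in separately.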

9.3 MongoToClickHouseRealtimeSync (MongoDB sync)

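Compared with the version in the earlier article, this one makes three changes: it extends AbstractClickHouseSink to reuse the shared capabilities, overrides onFlushTableError() to pinpoint the failing row, and loads configuration through FlinkJobUtils.loadConfig().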

package com.cloud.flink;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class MongoToClickHouseRealtimeSync {
    private static final Logger LOG = LoggerFactory.getLogger(MongoToClickHouseRealtimeSync.class);
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        ParameterTool config = FlinkJobUtils.loadConfig(params);
        CollectionMappingConfig mappingConfig = parseCollectionMapping(config);
        LOG.info("========== Configuration ==========");
        LOG.info("MongoDB: {}/{}", config.get("mongo.hosts"), config.get("mongo.database"));
        LOG.info("ClickHouse: {}/{}", config.get("clickhouse.url"), config.get("clickhouse.database"));
        LOG.info("Collections to sync: {}", mappingConfig.getCollectionMapping().size());
        LOG.info("==============================");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(config.getInt("flink.parallelism", 1));
        env.getConfig().setGlobalJobParameters(config);
        FlinkJobUtils.configureCheckpoint(env, config);
        String database = config.get("mongo.database");
        String[] collectionList = mappingConfig.getCollectionMapping().keySet().stream()
                .map(c -> database + "." + c).toArray(String[]::new);
        MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
                .hosts(config.get("mongo.hosts"))
                .username(config.get("mongo.username"))
                .password(config.get("mongo.password"))
                .connectionOptions(config.get("mongo.connection.options", "authSource=admin"))
                .databaseList(database)
                .collectionList(collectionList)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.latest())
                .build();
        env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB CDC Source")
           .addSink(new ClickHouseMongoSink(
                   config.get("clickhouse.url"), config.get("clickhouse.username"),
                   config.get("clickhouse.password"), config.get("clickhouse.database"),
                   mappingConfig,
                   config.getInt("flink.batch.size", 2000), config.getLong("flink.batch.interval", 5000L)
           )).name("ClickHouse Mongo Sink");
        env.execute("MongoDB to ClickHouse Realtime Sync");
    }
    // ===== Collection mapping config parsing =====
    private static CollectionMappingConfig parseCollectionMapping(ParameterTool config) {
        if (!config.has("sync.collections"))
            throw new RuntimeException("No collection mapping configured; please set sync.collections");
        Map<String, String> collectionMapping = new LinkedHashMap<>();
        for (String pair : config.get("sync.collections").split(",")) {
            pair = pair.trim(); if (pair.isEmpty()) continue;
            String[] parts = pair.split(":");
            if (parts.length == 2) collectionMapping.put(parts[0].trim(), parts[1].trim());
            else if (parts.length == 1) collectionMapping.put(parts[0].trim(), "ods_mongo_" + parts[0].trim());
        }
        // Field renames
        Map<String, Map<String, String>> fieldRenames = new HashMap<>();
        // Field exclusions
        Map<String, Set<String>> fieldExcludes = new HashMap<>();
        // Collections that use camelCase-to-snake_case conversion
        Set<String> camelToSnakeCollections = new HashSet<>();
        // Nested-object flattening
        Map<String, Map<String, String>> flattenConfig = new HashMap<>();
        // Renames for flattened sub-fields
        Map<String, Map<String, Map<String, String>>> flattenSubRenames = new HashMap<>();
        // Timestamp fields (milliseconds and seconds)
        Map<String, Set<String>> timestampMsFields = new HashMap<>();
        Map<String, Set<String>> timestampSecFields = new HashMap<>();
        // Field coalescing
        Map<String, Map<String, List<String>>> coalesceConfig = new HashMap<>();
        for (String collection : collectionMapping.keySet()) {
            // Field renames
            String renameKey = "sync.field.rename." + collection;
            if (config.has(renameKey)) {
                Map<String, String> renames = new LinkedHashMap<>();
                for (String rp : config.get(renameKey).split(",")) {
                    rp = rp.trim(); if (rp.isEmpty()) continue;
                    String[] sp = rp.split(":"); if (sp.length == 2) renames.put(sp[0].trim(), sp[1].trim());
                }
                if (!renames.isEmpty()) fieldRenames.put(collection, renames);
            }
            // Field exclusions
            String excludeKey = "sync.field.exclude." + collection;
            if (config.has(excludeKey)) {
                Set<String> excludes = Arrays.stream(config.get(excludeKey).split(","))
                        .map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet());
                if (!excludes.isEmpty()) fieldExcludes.put(collection, excludes);
            }
            // camelCase to snake_case
            if ("true".equals(config.get("sync.field.camel_to_snake." + collection, "")))
                camelToSnakeCollections.add(collection);
            // Nested-object flattening
            String flattenKey = "sync.field.flatten." + collection;
            if (config.has(flattenKey)) {
                Map<String, String> flatten = new LinkedHashMap<>();
                for (String p : config.get(flattenKey).split(",")) {
                    p = p.trim(); if (p.isEmpty()) continue;
                    String[] fp = p.split(">"); if (fp.length == 2) flatten.put(fp[0].trim(), fp[1].trim());
                }
                if (!flatten.isEmpty()) flattenConfig.put(collection, flatten);
            }
            // Renames for flattened sub-fields
            Map<String, String> collFlatten = flattenConfig.getOrDefault(collection, Collections.emptyMap());
            Map<String, Map<String, String>> subRenameMap = new HashMap<>();
            for (String mongoField : collFlatten.keySet()) {
                String key = "sync.field.flatten.rename." + collection + "." + mongoField;
                if (config.has(key)) {
                    Map<String, String> subRenames = new LinkedHashMap<>();
                    for (String rp : config.get(key).split(",")) {
                        rp = rp.trim(); if (rp.isEmpty()) continue;
                        String[] sp = rp.split(":"); if (sp.length == 2) subRenames.put(sp[0].trim(), sp[1].trim());
                    }
                    if (!subRenames.isEmpty()) subRenameMap.put(mongoField, subRenames);
                }
            }
            if (!subRenameMap.isEmpty()) flattenSubRenames.put(collection, subRenameMap);
            // Millisecond timestamps
            String tsMsKey = "sync.field.timestamp_ms." + collection;
            if (config.has(tsMsKey)) {
                Set<String> fields = Arrays.stream(config.get(tsMsKey).split(","))
                        .map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet());
                if (!fields.isEmpty()) timestampMsFields.put(collection, fields);
            }
            // Second timestamps
            String tsSecKey = "sync.field.timestamp_sec." + collection;
            if (config.has(tsSecKey)) {
                Set<String> fields = Arrays.stream(config.get(tsSecKey).split(","))
                        .map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet());
                if (!fields.isEmpty()) timestampSecFields.put(collection, fields);
            }
            // Field coalescing
            String coalesceKey = "sync.field.coalesce." + collection;
            if (config.has(coalesceKey)) {
                Map<String, List<String>> coalesceMap = new LinkedHashMap<>();
                for (String cp : config.get(coalesceKey).split(",")) {
                    cp = cp.trim(); if (cp.isEmpty()) continue;
                    String[] kv = cp.split(":");
                    if (kv.length == 2) {
                        List<String> sources = Arrays.stream(kv[1].split("\\|"))
                                .map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
                        if (!sources.isEmpty()) coalesceMap.put(kv[0].trim(), sources);
                    }
                }
                if (!coalesceMap.isEmpty()) coalesceConfig.put(collection, coalesceMap);
            }
        }
        return new CollectionMappingConfig(collectionMapping, fieldRenames, fieldExcludes,
                camelToSnakeCollections, flattenConfig, flattenSubRenames,
                timestampMsFields, timestampSecFields, coalesceConfig);
    }
    // ===== Collection mapping config class =====
    public static class CollectionMappingConfig implements java.io.Serializable {
        private static final long serialVersionUID = 1L;
        private final Map<String, String> collectionMapping;
        private final Map<String, Map<String, String>> fieldRenames;
        private final Map<String, Set<String>> fieldExcludes;
        private final Set<String> camelToSnakeCollections;
        private final Map<String, Map<String, String>> flattenConfig;
        private final Map<String, Map<String, Map<String, String>>> flattenSubRenames;
        private final Map<String, Set<String>> timestampMsFields;
        private final Map<String, Set<String>> timestampSecFields;
        private final Map<String, Map<String, List<String>>> coalesceConfig;
        public CollectionMappingConfig(
                Map<String, String> collectionMapping, Map<String, Map<String, String>> fieldRenames,
                Map<String, Set<String>> fieldExcludes, Set<String> camelToSnakeCollections,
                Map<String, Map<String, String>> flattenConfig,
                Map<String, Map<String, Map<String, String>>> flattenSubRenames,
                Map<String, Set<String>> timestampMsFields, Map<String, Set<String>> timestampSecFields,
                Map<String, Map<String, List<String>>> coalesceConfig) {
            this.collectionMapping = collectionMapping; this.fieldRenames = fieldRenames;
            this.fieldExcludes = fieldExcludes; this.camelToSnakeCollections = camelToSnakeCollections;
            this.flattenConfig = flattenConfig; this.flattenSubRenames = flattenSubRenames;
            this.timestampMsFields = timestampMsFields; this.timestampSecFields = timestampSecFields;
            this.coalesceConfig = coalesceConfig;
        }
        public Map<String, String> getCollectionMapping() { return collectionMapping; }
        public String getClickHouseTable(String c) { return collectionMapping.get(c); }
        public boolean shouldSync(String c) { return collectionMapping.containsKey(c); }
        public Map<String, String> getFieldRenames(String c) { return fieldRenames.getOrDefault(c, Collections.emptyMap()); }
        public Set<String> getFieldExcludes(String c) { return fieldExcludes.getOrDefault(c, Collections.emptySet()); }
        public boolean isCamelToSnake(String c) { return camelToSnakeCollections.contains(c); }
        public Map<String, String> getFlattenConfig(String c) { return flattenConfig.getOrDefault(c, Collections.emptyMap()); }
        public Map<String, Map<String, String>> getFlattenSubRenames(String c) { return flattenSubRenames.getOrDefault(c, Collections.emptyMap()); }
        public Set<String> getTimestampMsFields(String c) { return timestampMsFields.getOrDefault(c, Collections.emptySet()); }
        public Set<String> getTimestampSecFields(String c) { return timestampSecFields.getOrDefault(c, Collections.emptySet()); }
        public Map<String, List<String>> getCoalesceConfig(String c) { return coalesceConfig.getOrDefault(c, Collections.emptyMap()); }
    }
    // ===== MongoDB Sink =====
    public static class ClickHouseMongoSink extends AbstractClickHouseSink<String> {
        private static final Logger LOG = LoggerFactory.getLogger(ClickHouseMongoSink.class);
        private final CollectionMappingConfig mappingConfig;
        private transient ObjectMapper objectMapper;
        private static final DateTimeFormatter DT_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        private static final DateTimeFormatter DT_MS_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        private static final ZoneId ZONE_ID = ZoneId.of("Asia/Shanghai");
        public ClickHouseMongoSink(String clickhouseUrl, String ckUsername, String ckPassword,
                                   String ckDatabase, CollectionMappingConfig mappingConfig,
                                   int batchSize, long batchIntervalMs) {
            super(clickhouseUrl, ckUsername, ckPassword, ckDatabase, batchSize, batchIntervalMs);
            this.mappingConfig = mappingConfig;
        }
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            objectMapper = new ObjectMapper();
            LOG.info("Mongo Sink initialized: {} CK tables", ckTableColumns.size());
        }
        /** On a failed write, extract the failing row number from the error message to pinpoint the bad record. */
        @Override
        protected void onFlushTableError(String table, List<String> rows, Exception e) throws Exception {
            String errMsg = e.getMessage();
            Matcher matcher = Pattern.compile("at row (\\d+)").matcher(errMsg != null ? errMsg : "");
            if (matcher.find()) {
                int rowNum = Integer.parseInt(matcher.group(1));
                LOG.error("Write to {}.{} failed at row {}/{}", ckDatabase, table, rowNum, rows.size());
                // Log the neighbours only when the reported row falls inside this batch;
                // otherwise rows.get() could throw and mask the original exception.
                if (rowNum >= 1 && rowNum <= rows.size()) {
                    LOG.error("Failing row: {}", rows.get(rowNum - 1));
                    if (rowNum >= 2) LOG.error("Previous row: {}", rows.get(rowNum - 2));
                    if (rowNum < rows.size()) LOG.error("Next row: {}", rows.get(rowNum));
                }
            }
            throw e;
        }
        @Override
        public void invoke(String value, Context context) throws Exception {
            String operationType = "", collection = "";
            try {
                JsonNode root = objectMapper.readTree(value);
                operationType = root.path("operationType").asText();
                JsonNode nsNode = root.path("ns");
                if (nsNode.isObject()) collection = nsNode.path("coll").asText();
                else if (nsNode.isTextual()) {
                    String ns = nsNode.asText();
                    int dot = ns.indexOf('.'); collection = dot >= 0 ? ns.substring(dot + 1) : ns;
                }
                if (!mappingConfig.shouldSync(collection)) return;
                String ckTable = mappingConfig.getClickHouseTable(collection);
                if (ckTable == null) return;
                if ("insert".equals(operationType) || "update".equals(operationType) || "replace".equals(operationType)) {
                    JsonNode fullDocNode = root.path("fullDocument");
                    if (fullDocNode != null && !fullDocNode.isMissingNode()) {
                        String docStr = fullDocNode.isTextual() ? fullDocNode.asText() : fullDocNode.toString();
                        JsonNode document = objectMapper.readTree(docStr);
                        String json = convertMongoDocument(document, collection);
                        buffer.computeIfAbsent(ckTable, k -> new ArrayList<>()).add(json);
                    }
                }
                if (shouldFlush()) flush();
            } catch (Exception e) {
                LOG.error("Failed to process record: collection={}, op={}, value={}", collection, operationType, value, e);
                throw e;
            }
        }
        // ===== Core MongoDB document conversion logic =====
        private String convertMongoDocument(JsonNode document, String collection) {
            Map<String, String> renames = mappingConfig.getFieldRenames(collection);
            Set<String> excludes = mappingConfig.getFieldExcludes(collection);
            boolean useCamelToSnake = mappingConfig.isCamelToSnake(collection);
            Map<String, String> flattenFields = mappingConfig.getFlattenConfig(collection);
            Set<String> tsMsFields = mappingConfig.getTimestampMsFields(collection);
            Set<String> tsSecFields = mappingConfig.getTimestampSecFields(collection);
            Map<String, List<String>> coalesceFields = mappingConfig.getCoalesceConfig(collection);
            Map<String, Object> result = new LinkedHashMap<>();
            // 1. Coalesce fields (take the first non-null value among several source fields)
            Set<String> consumedByCoalesce = new HashSet<>();
            for (Map.Entry<String, List<String>> entry : coalesceFields.entrySet()) {
                consumedByCoalesce.addAll(entry.getValue());
                Object value = null;
                for (String srcField : entry.getValue()) {
                    JsonNode srcNode = document.get(srcField);
                    if (srcNode != null && !srcNode.isNull()) {
                        if (tsMsFields.contains(srcField)) value = convertTimestampMs(srcNode);
                        else if (tsSecFields.contains(srcField)) value = convertTimestampSec(srcNode);
                        else value = convertExtendedJsonValue(srcNode);
                        break;
                    }
                }
                if (value != null) result.put(entry.getKey(), value);
            }
            // 2. Regular fields
            Iterator<Map.Entry<String, JsonNode>> fields = document.fields();
            while (fields.hasNext()) {
                Map.Entry<String, JsonNode> field = fields.next();
                String fieldName = field.getKey();
                JsonNode fieldValue = field.getValue();
                if (consumedByCoalesce.contains(fieldName) || excludes.contains(fieldName)) continue;
                // Flatten nested objects
                if (flattenFields.containsKey(fieldName)) {
                    String ckPrefix = flattenFields.get(fieldName);
                    if (fieldValue.isObject() && !isExtendedJsonType(fieldValue)) {
                        Map<String, String> subRenames = mappingConfig.getFlattenSubRenames(collection)
                                .getOrDefault(fieldName, Collections.emptyMap());
                        Iterator<Map.Entry<String, JsonNode>> subFields = fieldValue.fields();
                        while (subFields.hasNext()) {
                            Map.Entry<String, JsonNode> sub = subFields.next();
                            String ckSubName = subRenames.containsKey(sub.getKey()) ? subRenames.get(sub.getKey())
                                    : useCamelToSnake ? camelToSnake(sub.getKey()) : sub.getKey();
                            result.put(ckPrefix + "." + ckSubName, convertExtendedJsonValue(sub.getValue()));
                        }
                    }
                    continue;
                }
                // Value conversion
                Object converted;
                if (tsMsFields.contains(fieldName)) converted = convertTimestampMs(fieldValue);
                else if (tsSecFields.contains(fieldName)) converted = convertTimestampSec(fieldValue);
                else converted = convertExtendedJsonValue(fieldValue);
                // Field renaming
                String targetName = renames.containsKey(fieldName) ? renames.get(fieldName)
                        : useCamelToSnake ? camelToSnake(fieldName) : fieldName;
                if (converted != null) result.put(targetName, converted);
            }
            try { return objectMapper.writeValueAsString(result); }
            catch (Exception e) { LOG.warn("Serialization failed: {}", collection, e); return document.toString(); }
        }
        /** Convert a MongoDB Extended JSON value */
        private Object convertExtendedJsonValue(JsonNode value) {
            if (value == null || value.isNull()) return null;
            if (value.isObject()) {
                if (value.has("$oid")) return value.get("$oid").asText();
                if (value.has("$date")) {
                    JsonNode dateNode = value.get("$date");
                    if (dateNode.isTextual()) {
                        try { return Instant.parse(dateNode.asText()).atZone(ZONE_ID).format(DT_FMT); }
                        catch (Exception e) { return dateNode.asText(); }
                    } else if (dateNode.isNumber()) {
                        return Instant.ofEpochMilli(dateNode.asLong()).atZone(ZONE_ID).format(DT_FMT);
                    } else if (dateNode.isObject() && dateNode.has("$numberLong")) {
                        return Instant.ofEpochMilli(Long.parseLong(dateNode.get("$numberLong").asText()))
                                .atZone(ZONE_ID).format(DT_FMT);
                    }
                    return dateNode.asText();
                }
                if (value.has("$numberLong")) return Long.parseLong(value.get("$numberLong").asText());
                if (value.has("$numberInt")) return Integer.parseInt(value.get("$numberInt").asText());
                if (value.has("$numberDecimal")) return new BigDecimal(value.get("$numberDecimal").asText());
                return value.toString();  // plain nested object → JSON string
            }
            if (value.isArray()) {
                boolean hasComplex = false;
                for (JsonNode el : value) { if (el.isObject() && !isExtendedJsonType(el)) { hasComplex = true; break; } }
                if (hasComplex) return value.toString();  // array of objects → JSON string
                List<Object> arr = new ArrayList<>();
                for (JsonNode el : value) arr.add(convertExtendedJsonValue(el));
                return arr;  // array of primitives → CK Array
            }
            if (value.isTextual()) return value.asText();
            if (value.isBoolean()) return value.asBoolean();
            if (value.isInt()) return value.asInt();
            if (value.isLong()) return value.asLong();
            if (value.isDouble() || value.isFloat()) {
                double d = value.asDouble();
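                // Whole numbers become long, but only within long range; a larger value would saturate on the cast.
                if (d == Math.floor(d) && Math.abs(d) < 9.0e18) return (long) d;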
                return d;
            }
            return value.asText();
        }
        private Object convertTimestampMs(JsonNode value) {
            long millis;
            if (value.isNumber()) millis = value.asLong();
            else if (value.isObject() && value.has("$numberLong"))
                millis = Long.parseLong(value.get("$numberLong").asText());
            else return convertExtendedJsonValue(value);
            return Instant.ofEpochMilli(millis).atZone(ZONE_ID).format(DT_MS_FMT);
        }
        private Object convertTimestampSec(JsonNode value) {
            long seconds;
            if (value.isNumber()) seconds = value.asLong();
            else if (value.isObject() && value.has("$numberLong"))
                seconds = Long.parseLong(value.get("$numberLong").asText());
            else if (value.isObject() && value.has("$numberInt"))
                seconds = Integer.parseInt(value.get("$numberInt").asText());
            else return convertExtendedJsonValue(value);
            return Instant.ofEpochSecond(seconds).atZone(ZONE_ID).format(DT_FMT);
        }
        private static boolean isExtendedJsonType(JsonNode node) {
            return node.has("$oid") || node.has("$date") || node.has("$numberLong")
                    || node.has("$numberInt") || node.has("$numberDecimal");
        }
        private static String camelToSnake(String camelCase) {
            return camelCase.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase();
        }
    }
}
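
The mapping keys parsed above use three different separators: `:` for source-to-target pairs, `|` for the candidate source fields of a coalesce rule, and `,` between entries. As a rough illustration, the stand-alone sketch below mirrors the parsing rules for sync.collections and sync.field.coalesce.* plus the camelToSnake() regex, using only the JDK. The class name MappingConfigSketch and every collection and field name in it (orders, users, paidAt, payTime) are made up for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; collection and field names are hypothetical.
class MappingConfigSketch {
    /** "orders:ods_orders, users" -> {orders=ods_orders, users=ods_mongo_users} */
    static Map<String, String> parseCollections(String raw) {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (String pair : raw.split(",")) {
            pair = pair.trim();
            if (pair.isEmpty()) continue;
            String[] parts = pair.split(":");
            if (parts.length == 2) mapping.put(parts[0].trim(), parts[1].trim());
            // No explicit target: fall back to the "ods_mongo_" prefix, as in the job above.
            else if (parts.length == 1) mapping.put(parts[0].trim(), "ods_mongo_" + parts[0].trim());
        }
        return mapping;
    }

    /** "pay_time:paidAt|payTime" -> {pay_time=[paidAt, payTime]} */
    static Map<String, List<String>> parseCoalesce(String raw) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String item : raw.split(",")) {
            String[] kv = item.trim().split(":");
            if (kv.length != 2) continue;
            List<String> sources = new ArrayList<>();
            for (String s : kv[1].split("\\|")) {
                if (!s.trim().isEmpty()) sources.add(s.trim());
            }
            if (!sources.isEmpty()) result.put(kv[0].trim(), sources);
        }
        return result;
    }

    /** Same regex as the sink: insert "_" between a lowercase letter or digit and an uppercase letter. */
    static String camelToSnake(String name) {
        return name.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase();
    }

    public static void main(String[] args) {
        System.out.println(parseCollections("orders:ods_orders, users")); // {orders=ods_orders, users=ods_mongo_users}
        System.out.println(parseCoalesce("pay_time:paidAt|payTime"));     // {pay_time=[paidAt, payTime]}
        System.out.println(camelToSnake("orderV2Id"));                    // order_v2_id
        System.out.println(camelToSnake("HTTPServer"));                   // httpserver
    }
}
```

Note the last line: a run of consecutive capitals is not split, so a field such as HTTPServer is better handled with an explicit sync.field.rename entry.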

9.4 FlinkJobUtils (shared utility class)

package com.cloud.flink;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.util.Properties;
import java.util.stream.Collectors;
public class FlinkJobUtils {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkJobUtils.class);
    private FlinkJobUtils() {}
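    /**
     * Load configuration from the file given by --config; otherwise from the bundled
     * application-{env}.properties selected by --env (default: test).
     * Command-line arguments override values from either file.
     */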
    public static ParameterTool loadConfig(ParameterTool params) throws Exception {
        if (params.has("config")) {
            return ParameterTool.fromPropertiesFile(params.get("config")).mergeWith(params);
        }
        String env = params.get("env", "test");
        String resourcePath = "application-" + env + ".properties";
        try (InputStream is = FlinkJobUtils.class.getClassLoader().getResourceAsStream(resourcePath)) {
            if (is == null) throw new RuntimeException("Config file not found: " + resourcePath);
            Properties props = new Properties();
            props.load(is);
            return ParameterTool.fromMap(props.entrySet().stream()
                    .collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toString())))
                    .mergeWith(params);
        }
    }
    /** Configure checkpointing (optional; cluster defaults apply when unset) */
    public static void configureCheckpoint(StreamExecutionEnvironment env, ParameterTool config) {
        if (config.has("flink.checkpoint.interval")) {
            long interval = config.getLong("flink.checkpoint.interval");
            env.enableCheckpointing(interval);
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(interval / 2);
            env.getCheckpointConfig().setCheckpointTimeout(interval * 2);
            LOG.info("Checkpoint config: interval={}ms", interval);
        }
        String dir = config.get("flink.checkpoint.dir", "");
        if (!dir.isEmpty()) {
            env.getCheckpointConfig().setCheckpointStorage(dir);
            LOG.info("Checkpoint directory: {}", dir);
        }
    }
}
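
To make the precedence and the derived checkpoint values concrete, here is a minimal sketch that mimics loadConfig() and configureCheckpoint() with plain maps instead of ParameterTool, so it runs without Flink on the classpath. It relies on the "later source wins" merge order that mergeWith(params) gives in the code above; the class name ConfigPrecedenceSketch and the sample values are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only; it imitates the precedence with plain maps, not Flink's ParameterTool.
class ConfigPrecedenceSketch {
    /** File values are loaded first, then command-line values are laid on top. */
    static Map<String, String> merge(Map<String, String> fromFile, Map<String, String> fromArgs) {
        Map<String, String> merged = new LinkedHashMap<>(fromFile);
        merged.putAll(fromArgs);
        return merged;
    }

    /** Values derived from the interval in configureCheckpoint: {minPauseMs, timeoutMs}. */
    static long[] derivedCheckpointSettings(long intervalMs) {
        return new long[] { intervalMs / 2, intervalMs * 2 };
    }

    public static void main(String[] args) {
        Map<String, String> file = new LinkedHashMap<>();
        file.put("flink.parallelism", "1");
        file.put("flink.checkpoint.interval", "60000");

        Map<String, String> cli = new LinkedHashMap<>();
        cli.put("flink.parallelism", "4");   // e.g. passed as --flink.parallelism 4

        Map<String, String> merged = merge(file, cli);
        System.out.println(merged.get("flink.parallelism"));   // 4 (command line wins)

        long[] derived = derivedCheckpointSettings(Long.parseLong(merged.get("flink.checkpoint.interval")));
        System.out.println(derived[0] + " / " + derived[1]);    // 30000 / 120000
    }
}
```

With a 60-second interval, the job therefore waits at least 30 seconds between checkpoints and abandons a checkpoint that takes longer than 120 seconds.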

Conclusion

To recap, we built a complete offline + real-time data warehouse from three components:

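Layer | Solution | Role
Storage + analytics | ClickHouse | ODS/DW layers; ReplacingMergeTree deduplicates during background merges
Offline scheduling | DolphinScheduler | T+1/H+1 scheduled sync (direct connection through CK table engines)
Real-time sync | Flink CDC | MySQL binlog + MongoDB Change Stream, landing in the warehouse within seconds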

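No Hadoop stack and no 100 Flink classes, just configuration files: to sync a new table, change one line of config.

If this article helped you, likes and bookmarks are welcome, and feel free to share your own data warehouse stack in the comments.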

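Articles in this series:

  • ClickHouse + DolphinScheduler: A Lightweight Offline Data Warehouse with Just Two Components
  • Flink 1.20 in Practice: Zero-Code, Config-Driven Real-Time Sync of 100 MySQL Tables to ClickHouse
  • Flink 1.20 in Practice: One Config for Real-Time Sync of Multiple MongoDB Collections to ClickHouse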