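ClickHouse + Flink + DolphinScheduler: A Three-Component Stack for Offline + Real-Time Data Warehousing at Small and Mid-Sized Companies, No Hadoop Ecosystem Needed
Heads-up: this article won't walk through 100 Flink classes or pile on the Hadoop ecosystem. It uses just three components to build both an offline and a real-time data warehouse.
Who it's for: teams that want a data warehouse but don't want to run and maintain a Hadoop cluster.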
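1. Background: Does a Data Warehouse Really Need the "Big-Tech Full Stack"?
1.1 The Tough Question
Does any of this sound familiar?
- The boss says "let's build a data warehouse", and you immediately picture Hadoop + Hive + Spark + Kafka + Airflow + …
- The project budget covers only 3 servers, but the architecture diagram has 15 components
- The offline T+1 pipeline is finally running, and then the boss asks, "Can we see the data in real time?"
The core tension: small and mid-sized teams need something that works, is easy to use, and saves money, not something that is all-encompassing, expensive, and complex.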
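1.2 Our Choice
| Component | Role | Why, in one line |
|---|---|---|
| ClickHouse | Storage + analytics engine | Columnar storage, sub-second queries, built-in MySQL/MongoDB table engines |
| DolphinScheduler | Offline scheduling | Visual DAGs, a better fit than Airflow for teams in China |
| Flink CDC | Real-time sync | Config-driven and zero-code: sync hundreds of tables without writing a line of SQL |
Three components, each with its own job and no overlap.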
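1.3 Prerequisite Reading
This article wraps up the data warehouse series. We recommend reading the earlier posts first:
- ClickHouse + DolphinScheduler: Two Components for a Lightweight Offline Data Warehouse, Who Still Stacks the Hadoop Ecosystem? (offline warehouse setup)
- Flink 1.20 in Practice: Zero-Code, Config-Driven Real-Time Sync of 100+ MySQL Tables to ClickHouse (MySQL real-time sync)
- Flink 1.20 in Practice: One Config to Sync Multiple MongoDB Collections to ClickHouse in Real Time (MongoDB real-time sync)
All three posts include full step-by-step instructions. This article focuses on integrating the architecture and improving the code, so refer back to them for details.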
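2. Architecture: Offline + Real-Time, Each Playing to Its Strengths
2.1 Why Not Use Flink for Full Syncs?
First, a performance comparison:
ClickHouse table engine direct sync (offline/full)
| Source | Sync rate | Throughput per minute |
|---|---|---|
| MySQL → ClickHouse | ~350k rows/s | 20M+ rows/min |
| MongoDB → ClickHouse | ~120k rows/s | 7.2M+ rows/min |
Flink CDC sync (real-time incremental)
9.32 million rows synced in 20 minutes, i.e. about 466k rows/min (roughly 7.8k rows/s)
The conclusion is clear. Let the CK table engines handle full syncs and let Flink handle real-time increments, so each tool does the job it is best at.
2.2 Overall Data Flow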

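2.3 Deduplication: ReplacingMergeTree as the Safety Net
When two pipelines (offline + real-time) write to the same table, duplicates are inevitable. What should we do?
The answer is ClickHouse's ReplacingMergeTree engine: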
CREATE TABLE dw.ods_user_profile (
id UInt64 COMMENT 'Primary key ID',
user_code String COMMENT 'User code',
avatar_url Nullable(String) COMMENT 'Avatar URL',
login_key String COMMENT 'Login key',
phone Nullable(String) COMMENT 'Phone number',
user_type Int8 DEFAULT 1 COMMENT 'Type: 1-regular, 2-internal',
status Nullable(Int8) COMMENT 'Status: 0-disabled, 1-enabled',
nickname Nullable(String) COMMENT 'Nickname',
create_time DateTime DEFAULT now() COMMENT 'Creation time',
_version DateTime DEFAULT now() COMMENT 'Version column used for dedup'
)
ENGINE = ReplacingMergeTree(_version)
PARTITION BY toYYYYMM(create_time)
ORDER BY (user_code)
SETTINGS index_granularity = 256
COMMENT 'ODS layer - user profile table';
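How dedup works: during background merges, ReplacingMergeTree collapses rows that share the same ORDER BY key within the same partition and keeps the one with the largest _version. On a tie, it keeps the row inserted last. The offline job writes a row once and the real-time job writes it again, and after a merge only one copy remains. Because _version defaults to now(), the surviving row is the one written most recently, which is not necessarily the newest version in the source.
Note: dedup happens during asynchronous merges. If a query needs exact results, add the FINAL keyword, at a performance cost.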
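The merge rule is easier to see in a small example. Below is a toy Java model of what a merge does to a batch of rows. It is not ClickHouse code, and Row and merge() are illustrative names. The model assumes only the documented behavior: rows collapse per partition and per ORDER BY key, the highest version wins, and on a version tie the row inserted last wins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a ReplacingMergeTree merge (illustration only, not ClickHouse code). */
final class ReplacingMergeSim {

    static final class Row {
        final String partition; // e.g. toYYYYMM(create_time)
        final String sortKey;   // the ORDER BY key, here user_code
        final long version;     // the _version column
        final String payload;

        Row(String partition, String sortKey, long version, String payload) {
            this.partition = partition;
            this.sortKey = sortKey;
            this.version = version;
            this.payload = payload;
        }
    }

    /** Collapse rows sharing (partition, sortKey), keeping the highest version. */
    static List<Row> merge(List<Row> rowsInInsertOrder) {
        Map<List<String>, Row> winners = new LinkedHashMap<>();
        for (Row r : rowsInInsertOrder) {
            // Dedup never crosses partitions, so the partition is part of the key
            List<String> key = Arrays.asList(r.partition, r.sortKey);
            Row current = winners.get(key);
            // ">=" lets the later insert win when versions are equal
            if (current == null || r.version >= current.version) {
                winners.put(key, r);
            }
        }
        return new ArrayList<>(winners.values());
    }
}
```

The last input row in the test shows the partition caveat. The same user_code under a different toYYYYMM(create_time) partition is never merged with the others.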
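2.4 Division of Labor
| Dimension | Offline (CK table engines + DolphinScheduler) | Real-time (Flink CDC) |
|---|---|---|
| Data scope | Full history + T+1/H+1 increments | Real-time increments (binlog/Change Stream) |
| Sync speed | Very fast (direct connection, ~350k rows/s) | Seconds of latency |
| Use cases | Historical data initialization, scheduled reports | Real-time dashboards, ad hoc analysis |
| Load on the source database | Medium (full-table queries) | Low (reads only the change log) |
Flink handles only real-time increments and never touches historical data. History is the job of the CK table engines, so each tool sticks to what it does best.
3. Code Evolution: From "Two Redundant Copies" to "One Architecture"
3.1 The Original Problems
The previous two posts implemented real-time MySQL → CK and MongoDB → CK sync, each with its own class and its own config. It works, but: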
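- Duplicated code: both classes extend RichSinkFunction, both implement CheckpointedFunction, and their CK write logic is nearly identical
- Scattered config: there are two properties files, so changing the CK address means editing both
- Schema changes need a restart: if you add a column without restarting the Flink job, the job simply won't see it
- Hidden bugs: write errors on materialized columns, lost DECIMAL precision, and so on
In programmer terms: the code runs, but technical debt is quietly piling up.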
3.2 Refactoring Approach

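Five core improvements:
- Extract a common base class: all CK write logic (batching, HTTP streaming writes, checkpoint flushes, schema management) lives in AbstractClickHouseSink
- Automatic schema refresh: MySQL and CK schemas are reloaded every 10 minutes, so adding a column no longer requires a restart
- Bug fixes: materialized and alias columns are excluded automatically (no more write errors), and the DECIMAL scale is read dynamically from information_schema
- Leaner logging: normal writes log only at DEBUG level, and full row data is logged only on errors, so the disk no longer fills up with useless logs
- Merged config: MySQL and MongoDB settings live in one file and are managed together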
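4. MySQL Sync: 7 Scenarios Explained
MySQLToClickHouseRealtimeSyncV2 is built on Flink CDC 3.5 + Debezium and covers the following scenarios purely through configuration:
| # | Scenario | Implementation | Notes |
|---|---|---|---|
| 1 | Multi-table sync | Regex tb_.* or exact mapping a:ods_a,b:ods_b | One Flink job syncs N tables |
| 2 | Auto-discover new tables | scanNewlyAddedTableEnabled=true | Flink notices tables newly created in MySQL and syncs them in full |
| 3 | Auto-load table schemas | Query information_schema.COLUMNS | All column types are detected at startup, with no manual config |
| 4 | DECIMAL Base64 decoding | new BigDecimal(new BigInteger(bytes), scale) | Debezium encodes DECIMAL as Base64; restore it with the actual scale |
| 5 | BIGINT UNSIGNED decoding | new BigInteger(Base64.getDecoder().decode(...)) | Debezium encodes BIGINT UNSIGNED as Base64 as well |
| 6 | DATETIME/TIMESTAMP conversion | Millisecond timestamp → yyyy-MM-dd HH:mm:ss; ISO 8601 → time-zone conversion | Distinguishes the two encodings: DATETIME (milliseconds) and TIMESTAMP (ISO string) |
| 7 | DATE day-count conversion | LocalDate.ofEpochDay(days) | Debezium encodes DATE as the number of days since 1970-01-01 |
Each of these is a "handle it or get burned" case. For example, if DECIMAL isn't decoded, an amount of 99.99 shows up as a meaningless Base64 string (Jw8=). If DATETIME isn't converted, dates turn into huge numbers.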
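You can try the time and unsigned-integer cases from the table on their own. The sketch below mirrors the conversions in convertFields() (section 9.2) as small static methods; the class and method names are hypothetical. One assumption is worth checking against your setup. According to the Debezium MySQL connector documentation, under its default time.precision.mode, DATETIME(0-3) arrives as epoch milliseconds but DATETIME(4-6) arrives as epoch microseconds.

```java
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Base64;

/** Standalone versions of the Debezium decodes used by the MySQL sink (sketch). */
final class DebeziumTemporalDecode {
    private static final DateTimeFormatter DT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** DATETIME: epoch millis of the wall-clock value, so format it as UTC (no zone shift). */
    static String datetimeFromMillis(long millis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC).format(DT);
    }

    /** TIMESTAMP: an ISO-8601 string in UTC, shifted into the business time zone. */
    static String timestampFromIso(String iso, ZoneId zone) {
        return ZonedDateTime.parse(iso).withZoneSameInstant(zone).format(DT);
    }

    /** DATE: number of days since 1970-01-01, rendered as yyyy-MM-dd. */
    static String dateFromEpochDays(int days) {
        return LocalDate.ofEpochDay(days).toString();
    }

    /** BIGINT UNSIGNED: Base64 of big-endian two's-complement bytes. */
    static BigInteger unsignedFromBase64(String b64) {
        return new BigInteger(Base64.getDecoder().decode(b64));
    }
}
```

For example, timestampFromIso("2024-01-15T02:00:00Z", ZoneId.of("Asia/Shanghai")) gives "2024-01-15 10:00:00". A DATETIME value, by contrast, must not be shifted, because it was never a UTC instant to begin with.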
4.1 Configuration Example (MySQL Part)
# MySQL CDC settings
mysql.hostname=10.0.1.100
mysql.port=3306
mysql.database=biz_db
mysql.username=cdc_reader
mysql.password=***
# Option 1: regex match (recommended; new tables are picked up automatically)
# sync.table.pattern=tb_.*
# sync.table.prefix=ods_
# Option 2: explicit list
sync.tables=tb_user:ods_tb_user,\
tb_account:ods_tb_account,\
tb_product:ods_tb_product,\
tb_order:ods_tb_order,\
tb_payment:ods_tb_payment
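The sync.tables value above spans several lines with trailing backslashes. java.util.Properties joins such continuation lines and drops the leading whitespace of each continued line. The sketch below assumes that FlinkJobUtils.loadConfig() (not shown in this article) ultimately reads the file through java.util.Properties, as Flink's ParameterTool.fromPropertiesFile presumably does. TableMappingParse is a hypothetical name; parse() copies the rule used by parseTableMapping() in section 9.2.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/** Sketch: how a multi-line sync.tables value is loaded and turned into a mapping. */
final class TableMappingParse {

    /** Load properties text; a trailing backslash continues the value on the next line. */
    static Properties load(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    /** "src:dst,src2,..." -> {src=dst, src2=ods_src2}; mirrors parseTableMapping(). */
    static Map<String, String> parse(String spec) {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (String pair : spec.split(",")) {
            pair = pair.trim();
            if (pair.isEmpty()) continue;
            String[] parts = pair.split(":");
            if (parts.length == 2) mapping.put(parts[0].trim(), parts[1].trim());
            else if (parts.length == 1) mapping.put(parts[0].trim(), "ods_" + parts[0].trim());
        }
        return mapping;
    }
}
```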
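5. MongoDB Sync: 15 Scenarios Explained
MongoToClickHouseRealtimeSync adds extensive handling for MongoDB's "flexible schema":
| # | Scenario | Config key | Notes |
|---|---|---|---|
| 1 | Multi-collection sync | sync.collections | One job syncs N collections |
| 2 | Collection → table name mapping | trace_logs:ods_mongo_trace_logs | Maps Mongo collection names to CK table names |
| 3 | Field rename | sync.field.rename.{collection} | c_d:create_date turns abbreviated field names into readable ones |
| 4 | Field exclusion | sync.field.exclude.{collection} | Drops useless fields such as _class |
| 5 | camelCase → snake_case | sync.field.camel_to_snake.{collection}=true | agentId → agent_id, fully automatic |
| 6 | Flatten nested objects | sync.field.flatten.{collection} | Flattens {info: {name: "a"}} into info.name |
| 7 | Rename flattened subfields | sync.field.flatten.rename.{collection}.{field} | Subfields can still be renamed individually after flattening |
| 8 | Millisecond timestamp conversion | sync.field.timestamp_ms.{collection} | Milliseconds → yyyy-MM-dd HH:mm:ss.SSS |
| 9 | Second timestamp conversion | sync.field.timestamp_sec.{collection} | Seconds → yyyy-MM-dd HH:mm:ss |
| 10 | Multi-field merge (coalesce) | sync.field.coalesce.{collection} | create_time:c_t\|ct takes the first non-null value |
| 11 | $oid handling | Automatic | {"$oid": "65a..."} → "65a..." |
| 12 | $date handling | Automatic | Handles all three formats: ISO string, milliseconds, and $numberLong |
| 13 | $numberLong / $numberInt | Automatic | Extended JSON numeric types are restored automatically |
| 14 | $numberDecimal | Automatic | Restored exactly as BigDecimal |
| 15 | Smart array handling | Automatic | Arrays of objects → JSON string; arrays of primitives → CK Array |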
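The renaming and flattening rules in rows 5 and 6 are easy to get subtly wrong. Here is a minimal sketch of both, built on two assumptions. First, every uppercase letter starts a new snake_case word, so an acronym like APIKey becomes a_p_i_key. Second, nested objects are joined with "." as in the table's info.name example. The real implementation is not shown in this excerpt and may differ; for instance, it may handle the callInfo>call_info flatten syntax from section 5.1 differently. MongoFieldShape is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of camelCase -> snake_case and dot-joined flattening (assumed rules). */
final class MongoFieldShape {

    /** agentId -> agent_id: each uppercase letter becomes "_" + lowercase. */
    static String camelToSnake(String name) {
        StringBuilder sb = new StringBuilder(name.length() + 8);
        for (int i = 0; i < name.length(); i++) {
            char c = name.charAt(i);
            if (Character.isUpperCase(c)) {
                if (i > 0) sb.append('_');
                sb.append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    /** {info: {name: "a"}} -> {"info.name": "a"}, recursing into nested maps. */
    static Map<String, Object> flatten(Map<String, Object> doc) {
        Map<String, Object> out = new LinkedHashMap<>();
        flattenInto("", doc, out);
        return out;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(String prefix, Map<String, Object> node, Map<String, Object> out) {
        for (Map.Entry<String, Object> e : node.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            if (e.getValue() instanceof Map) {
                flattenInto(key, (Map<String, Object>) e.getValue(), out);
            } else {
                out.put(key, e.getValue()); // leaves (including arrays) are kept as-is
            }
        }
    }
}
```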
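MongoDB's Extended JSON is a big trap. An _id field is not a string but {"$oid": "65a..."}, and a timestamp is not a time but {"$date": {"$numberLong": "1705..."}}. If you don't handle these, CK writes will start failing almost immediately.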
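The unwrapping described above can be sketched as a recursive walk over the parsed document. Maps and lists stand in for the JSON tree, so the sketch needs no JSON library. It assumes the wrapper shapes named in the table ($oid, $date, $numberLong, $numberInt, $numberDecimal) and returns a java.time.Instant for $date, leaving formatting for CK to a later step. ExtendedJsonUnwrap is a hypothetical name, not the author's class.

```java
import java.math.BigDecimal;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch: strip MongoDB Extended JSON wrappers from a parsed document. */
final class ExtendedJsonUnwrap {

    @SuppressWarnings("unchecked")
    static Object unwrap(Object v) {
        if (v instanceof Map) {
            Map<String, Object> m = (Map<String, Object>) v;
            if (m.size() == 1) {
                String k = m.keySet().iterator().next();
                Object inner = m.get(k);
                switch (k) {
                    case "$oid": return String.valueOf(inner);
                    case "$numberLong": return Long.parseLong(String.valueOf(inner));
                    case "$numberInt": return Integer.parseInt(String.valueOf(inner));
                    case "$numberDecimal": return new BigDecimal(String.valueOf(inner));
                    case "$date": return toInstant(unwrap(inner)); // inner may itself be $numberLong
                    default: break; // an ordinary one-field object
                }
            }
            Map<String, Object> out = new LinkedHashMap<>();
            m.forEach((key, val) -> out.put(key, unwrap(val)));
            return out;
        }
        if (v instanceof List) {
            List<Object> out = new ArrayList<>();
            for (Object item : (List<Object>) v) out.add(unwrap(item));
            return out;
        }
        return v;
    }

    /** $date payload: epoch millis (number) or an ISO-8601 instant string. */
    private static Instant toInstant(Object d) {
        if (d instanceof Number) return Instant.ofEpochMilli(((Number) d).longValue());
        return Instant.parse(String.valueOf(d));
    }
}
```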
5.1 Configuration Example (MongoDB Part)
# MongoDB CDC settings
mongo.hosts=10.0.2.200:27017
mongo.database=app_data
mongo.username=cdc_reader
mongo.password=***
mongo.connection.options=authSource=admin&replicaSet=rs0
# Collection mapping
sync.collections=event_logs:ods_mongo_event_logs,\
session_record:ods_mongo_session_record,\
dialog:ods_mongo_dialog,\
ai_call_record:ods_mongo_ai_call_record
# Field renames
sync.field.rename.event_logs=c_d:create_date
sync.field.rename.session_record=c_t:create_time,u_t:update_time
# camelCase to snake_case
sync.field.camel_to_snake.ai_model_record=true
# Flatten nested objects
sync.field.flatten.ai_model_record=callInfo>call_info,reasonParams>reason_params
# Rename flattened subfields
sync.field.flatten.rename.ai_model_record.callInfo=apikey:api_key
# Timestamp conversion
sync.field.timestamp_ms.ai_call_record=createTime,inputTime
sync.field.timestamp_sec.dialog=ct,ut
# Field merge (coalesce)
sync.field.coalesce.chat_record=create_time:c_t|ct,update_time:ut|u_t
# Field exclusion
sync.field.exclude.event_logs=_class,c_t
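Here is how the rename, coalesce and exclude values above might be parsed and applied to a single document. This is a hypothetical sketch. MongoFieldRules is not the author's class, and the order of operations (exclude, then coalesce, then rename) is an assumption, because the actual Mongo sink code is not part of this excerpt.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Sketch: parse and apply rename / coalesce / exclude rules (assumed semantics). */
final class MongoFieldRules {

    /** "c_t:create_time,u_t:update_time" -> {c_t=create_time, u_t=update_time} */
    static Map<String, String> parseRename(String spec) {
        Map<String, String> m = new LinkedHashMap<>();
        for (String pair : spec.split(",")) {
            String[] kv = pair.trim().split(":", 2);
            if (kv.length == 2) m.put(kv[0].trim(), kv[1].trim());
        }
        return m;
    }

    /** "create_time:c_t|ct" -> {create_time=[c_t, ct]} */
    static Map<String, List<String>> parseCoalesce(String spec) {
        Map<String, List<String>> m = new LinkedHashMap<>();
        for (String pair : spec.split(",")) {
            String[] kv = pair.trim().split(":", 2);
            if (kv.length == 2) m.put(kv[0].trim(), Arrays.asList(kv[1].trim().split("\\|")));
        }
        return m;
    }

    /** Apply exclude, then coalesce, then rename (this order is an assumption). */
    static Map<String, Object> apply(Map<String, Object> doc, Set<String> exclude,
                                     Map<String, List<String>> coalesce, Map<String, String> rename) {
        Map<String, Object> out = new LinkedHashMap<>(doc);
        out.keySet().removeAll(exclude);
        for (Map.Entry<String, List<String>> c : coalesce.entrySet()) {
            Object first = null;
            for (String src : c.getValue()) {
                if (first == null) first = out.get(src); // first non-null source wins
                out.remove(src);                          // source fields are consumed
            }
            out.put(c.getKey(), first);
        }
        Map<String, Object> renamed = new LinkedHashMap<>();
        out.forEach((k, v) -> renamed.put(rename.getOrDefault(k, k), v));
        return renamed;
    }
}
```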
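6. Shared Capabilities: AbstractClickHouseSink
Both sinks share this base class, which provides the following capabilities:
6.1 Core Features
| Capability | Description |
|---|---|
| Batched writes | A flush is triggered by row count (default 2000) or time interval (default 5 s) |
| HTTP streaming writes | Uses chunked transfer + BufferedOutputStream to avoid the OOM risk of concatenating a large batch into one string |
| Scheduled CK schema refresh | Reloads from system.columns every 10 minutes, automatically excluding MATERIALIZED/ALIAS columns |
| Schema change detection | On each refresh, compares the old and new schemas and logs added/removed columns |
| Checkpoint support | Implements CheckpointedFunction; the buffer is flushed automatically when a checkpoint is triggered |
| Write-failure callback | onFlushTableError() can be overridden by subclasses (the Mongo sink uses it to extract the failing row number) |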
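You can isolate the flush trigger from the first row of the table by injecting the clock. This sketch mirrors shouldFlush() and flush() from section 9.1, with an in-memory list standing in for the HTTP write; BatchPolicy is a hypothetical name. It also makes one property of that design visible. The interval is checked only when a record arrives, so on a quiet table the buffered rows wait for the next record or the next checkpoint, not for a timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Size-or-time flush trigger with an injectable clock (sketch of the 9.1 logic). */
final class BatchPolicy {
    private final int batchSize;
    private final long intervalMs;
    private final LongSupplier clock;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>(); // stands in for the HTTP POST
    private long lastFlush;

    BatchPolicy(int batchSize, long intervalMs, LongSupplier clock) {
        this.batchSize = batchSize;
        this.intervalMs = intervalMs;
        this.clock = clock;
        this.lastFlush = clock.getAsLong();
    }

    /** Per-record path, like invoke(): both conditions are checked only here. */
    void add(String row) {
        pending.add(row);
        if (pending.size() >= batchSize || clock.getAsLong() - lastFlush >= intervalMs) {
            flush();
        }
    }

    /** Also what snapshotState() forces at every checkpoint. */
    void flush() {
        if (!pending.isEmpty()) flushed.add(new ArrayList<>(pending));
        pending.clear();
        lastFlush = clock.getAsLong();
    }

    List<List<String>> flushedBatches() { return flushed; }

    int pendingCount() { return pending.size(); }
}
```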
6.2 Code Structure
public abstract class AbstractClickHouseSink<T> extends RichSinkFunction<T>
implements CheckpointedFunction {
// ===== Shared fields =====
protected transient Map<String, List<String>> buffer; // table -> buffered JSON rows
protected transient volatile Map<String, List<String>> ckTableColumns; // cached CK schema
protected transient ScheduledExecutorService schemaRefreshExecutor;
// ===== Lifecycle =====
@Override
public void open(Configuration parameters) throws Exception {
// init buffer and auth header, load the CK schema, start the scheduled refresh
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
flush(); // force-flush the buffer on every checkpoint
}
// ===== Extension points (override in subclasses) =====
protected void refreshSchema() { /* default: refresh the CK schema only */ }
protected void onFlushTableError(String table, List<String> rows, Exception e) throws Exception { /* default: log and rethrow */ }
// ===== Core write path =====
protected void flush() throws Exception {
for (Map.Entry<String, List<String>> entry : buffer.entrySet()) {
String table = entry.getKey();
List<String> rows = entry.getValue();
List<String> ckCols = ckTableColumns != null ? ckTableColumns.get(table) : null; // explicit column list
String insertSql = buildInsertSql(table, ckCols);
executeHttpPostStreaming(buildInsertUrl(insertSql), rows);
}
buffer.clear();
}
// Load the CK schema, excluding MATERIALIZED and ALIAS columns
protected Map<String, List<String>> loadCKTableColumnsInternal() throws Exception {
String sql = "SELECT table, name FROM system.columns "
+ "WHERE database = '...' "
+ "AND default_kind NOT IN ('MATERIALIZED', 'ALIAS') "
+ "ORDER BY table, position";
// ...
}
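Why exclude MATERIALIZED/ALIAS columns? ClickHouse computes both kinds of columns itself, so naming them in an INSERT raises an error. This was a hidden bug in the previous version, and it only surfaced once a materialized column was added in production.
7. Deployment and Running
7.1 Key pom.xml Dependencies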
<properties>
<flink.version>1.20.1</flink.version>
</properties>
<dependencies>
<!-- Flink core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink CDC -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>3.5.0</version>
</dependency>
<!-- JDBC drivers -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.4.0</version>
</dependency>
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.8.5</version>
<classifier>all</classifier>
</dependency>
</dependencies>
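Build a fat jar with maven-shade-plugin; see the earlier posts for the full build configuration.
7.2 Unified Configuration File
MySQL and MongoDB settings are merged into a single application-{env}.properties file: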
# =================== ClickHouse ===================
clickhouse.url=http://10.0.3.100:8123
clickhouse.database=dw
clickhouse.username=default
clickhouse.password=***
# =================== Flink ===================
flink.parallelism=1
flink.batch.size=2000
flink.batch.interval=5000
# Checkpoint (optional; skip it if the cluster already configures checkpoints)
# flink.checkpoint.interval=60000
# flink.checkpoint.dir=s3://state/checkpoint
# =================== MySQL CDC ===================
mysql.hostname=10.0.1.100
mysql.port=3306
mysql.database=biz_db
mysql.username=cdc_reader
mysql.password=***
mysql.timezone=Asia/Shanghai
sync.scan.newly.added.table=true
sync.tables=tb_user:ods_tb_user,tb_order:ods_tb_order,...
# =================== MongoDB CDC ===================
mongo.hosts=10.0.2.200:27017
mongo.database=app_data
mongo.username=cdc_reader
mongo.password=***
mongo.connection.options=authSource=admin&replicaSet=rs0
sync.collections=event_logs:ods_mongo_event_logs,...
# (field transformation settings are the same as in section 5)
7.3 Launch Commands
# MySQL → ClickHouse (test environment)
flink run -c com.cloud.flink.MySQLToClickHouseRealtimeSyncV2 \
bi-flink.jar --env test
# MongoDB → ClickHouse (production environment)
flink run -c com.cloud.flink.MongoToClickHouseRealtimeSync \
bi-flink.jar --env prod
# Use an external config file
flink run -c com.cloud.flink.MySQLToClickHouseRealtimeSyncV2 \
bi-flink.jar --config /opt/flink/conf/sync.properties
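This article does not show FlinkJobUtils.loadConfig(), so the precedence between --env and --config is an assumption. The hypothetical sketch below encodes one reasonable rule: an explicit --config path wins; otherwise --env selects application-{env}.properties. The real job parses arguments with Flink's ParameterTool.fromArgs; parseArgs below is only a dependency-free stand-in.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical config-source resolution; not the author's FlinkJobUtils. */
final class ConfigSourceSketch {

    /** Stand-in for ParameterTool.fromArgs; assumes strict "--key value" pairs. */
    static Map<String, String> parseArgs(String[] args) {
        Map<String, String> m = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (args[i].startsWith("--")) {
                m.put(args[i].substring(2), args[i + 1]);
            }
        }
        return m;
    }

    /** Assumed precedence: --config path first, then application-{env}.properties. */
    static String resolve(String[] args) {
        Map<String, String> m = parseArgs(args);
        if (m.containsKey("config")) {
            return m.get("config");
        }
        if (m.containsKey("env")) {
            return "application-" + m.get("env") + ".properties";
        }
        throw new IllegalArgumentException("either --config or --env is required");
    }
}
```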
7.4 Final Result
Two Flink jobs run in parallel: one handles real-time MySQL sync and the other handles MongoDB:

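8. Caveats and Lessons Learned
8.1 ReplacingMergeTree Is Not Real-Time Dedup
ReplacingMergeTree deduplicates during background merges, not at write time. This means:
- Recently written duplicates may still show up in query results
- Queries that need exact dedup should add FINAL: SELECT * FROM ods_tb_user FINAL WHERE ...
- FINAL has a performance cost; reporting queries can usually tolerate short-lived duplicates
8.2 Flink Syncs Increments Only
The Flink CDC startup mode is StartupOptions.latest(), so only changes made after the job starts are synced. Historical data is handled by the CK table engines + DolphinScheduler. A job restored from a checkpoint or savepoint resumes from its saved position. A fresh start, however, skips whatever changed while the job was down, so the offline schedule has to cover that gap.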
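8.3 DECIMAL Precision
Debezium encodes a MySQL DECIMAL(10,4) value as a Base64 string, and decoding it requires knowing the scale (the number of decimal places).
- Old code: the scale was hard-coded to 2, so DECIMAL(10,4) values were decoded incorrectly (off by a factor of 100, e.g. 12.3456 became 1234.56)
- New code: the actual scale is read dynamically from information_schema.COLUMNS.NUMERIC_SCALE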
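The scale bug is easy to reproduce. The Base64 payload carries only the unscaled integer, so the same bytes decode to different numbers depending on the scale you supply. The sketch assumes Debezium's precise decimal handling (Base64 of the unscaled value's two's-complement bytes), which matches what decodeDecimal() in section 9.2 expects. DecimalScaleDemo is a hypothetical name.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

/** Sketch: the DECIMAL payload holds only the unscaled value, so the scale must come from the schema. */
final class DecimalScaleDemo {

    /** Mimic the Debezium payload: Base64 of the unscaled value's big-endian bytes. */
    static String encode(BigDecimal value) {
        return Base64.getEncoder().encodeToString(value.unscaledValue().toByteArray());
    }

    /** Decode with an explicit scale, as decodeDecimal() in section 9.2 does. */
    static BigDecimal decode(String base64, int scale) {
        return new BigDecimal(new BigInteger(Base64.getDecoder().decode(base64)), scale);
    }
}
```

For example, encode(new BigDecimal("99.99")) yields "Jw8=". A DECIMAL(10,4) value of 12.3456 decodes correctly with scale 4 but becomes 1234.56 with the old hard-coded scale of 2.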
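8.4 Write Errors on CK Materialized Columns
ClickHouse's MATERIALIZED and ALIAS columns cannot be inserted into explicitly, but Flink has no way of knowing which columns are materialized.
Solution: when loading the schema from system.columns, filter with default_kind NOT IN ('MATERIALIZED', 'ALIAS') and build INSERT statements with an explicit column list.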
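The same rule can be shown on plain data. The production code filters inside the SQL query. This sketch instead filters client-side over hypothetical (table, name, default_kind) tuples shaped like system.columns rows, then builds the explicit INSERT. DEFAULT columns such as _version stay in the list. Per ClickHouse's input_format_defaults_for_omitted_fields setting (on by default), a JSONEachRow row that omits such a column gets the column's DEFAULT expression. InsertColumnList is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Sketch: derive the writable column list and the explicit INSERT statement. */
final class InsertColumnList {
    private static final Set<String> COMPUTED = Set.of("MATERIALIZED", "ALIAS");

    /** rows: {table, name, default_kind}, in system.columns position order. */
    static Map<String, List<String>> writableColumns(List<String[]> rows) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        for (String[] r : rows) {
            if (COMPUTED.contains(r[2])) continue; // CK computes these; an INSERT must not name them
            out.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
        }
        return out;
    }

    /** Same shape as buildInsertSql() in section 9.1. */
    static String insertSql(String table, List<String> cols) {
        return "INSERT INTO " + table + " (" + String.join(", ", cols) + ") FORMAT JSONEachRow";
    }
}
```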
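8.5 Schema Changes Without Restarts
The MySQL and CK schema caches are refreshed automatically every 10 minutes. To add a column:
- Add the column in CK first
- Then add it in MySQL (this produces a binlog event)
- Wait up to 10 minutes for Flink to pick up the new column and start writing it. Rows written before that refresh still use the old column list, so the new column holds its default value in those rows until they are updated again
No Flink restart needed: the ultimate win for the lazy.
9. Full Code
9.1 AbstractClickHouseSink (Shared Base Class)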
package com.cloud.flink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
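* Abstract base class for the ClickHouse HTTP sink.
* Provides the shared capabilities: batched writes, scheduled CK schema refresh, checkpoint support.
*
* Subclasses must implement: invoke()
* Subclasses may override: refreshSchema(), onFlushTableError()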
*/
public abstract class AbstractClickHouseSink<T> extends RichSinkFunction<T>
implements CheckpointedFunction {
private static final Logger LOG = LoggerFactory.getLogger(AbstractClickHouseSink.class);
protected final String clickhouseUrl;
protected final String ckUsername;
protected final String ckPassword;
protected final String ckDatabase;
protected final int batchSize;
protected final long batchIntervalMs;
protected transient String authHeader;
protected transient Map<String, List<String>> buffer;
protected transient long lastFlushTime;
protected transient volatile Map<String, List<String>> ckTableColumns;
protected transient ScheduledExecutorService schemaRefreshExecutor;
protected AbstractClickHouseSink(String clickhouseUrl, String ckUsername, String ckPassword,
String ckDatabase, int batchSize, long batchIntervalMs) {
this.clickhouseUrl = clickhouseUrl;
this.ckUsername = ckUsername;
this.ckPassword = ckPassword;
this.ckDatabase = ckDatabase;
this.batchSize = batchSize;
this.batchIntervalMs = batchIntervalMs;
}
@Override
public void open(Configuration parameters) throws Exception {
authHeader = "Basic " + Base64.getEncoder().encodeToString(
(ckUsername + ":" + ckPassword).getBytes(StandardCharsets.UTF_8));
buffer = new HashMap<>();
lastFlushTime = System.currentTimeMillis();
ckTableColumns = loadCKTableColumnsInternal();
schemaRefreshExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "schema-refresh");
t.setDaemon(true);
return t;
});
schemaRefreshExecutor.scheduleAtFixedRate(this::refreshSchema, 10, 10, TimeUnit.MINUTES);
}
@Override
public void close() throws Exception {
if (schemaRefreshExecutor != null) {
schemaRefreshExecutor.shutdown();
try { schemaRefreshExecutor.awaitTermination(5, TimeUnit.SECONDS); }
catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
if (buffer != null && !buffer.isEmpty()) { flush(); }
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
LOG.debug("Checkpoint #{} triggered, flushing buffered data", context.getCheckpointId());
flush();
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
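// The CDC source restores its own offsets, and snapshotState() empties the buffer at every checkpoint, so the sink keeps no buffer state (at-least-once)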
}
/** Scheduled CK schema refresh; subclasses may override it to refresh the source-side schema as well */
protected void refreshSchema() {
try {
Map<String, List<String>> newCKSchema = loadCKTableColumnsInternal();
detectAndLogCKChanges(ckTableColumns, newCKSchema);
ckTableColumns = newCKSchema;
LOG.info("CK schema refreshed: {} tables", newCKSchema.size());
} catch (Exception e) {
LOG.warn("CK schema refresh failed, keeping the old schema: {}", e.getMessage());
}
}
/** Callback for a failed per-table write; subclasses may override it */
protected void onFlushTableError(String table, List<String> rows, Exception e) throws Exception {
LOG.error("Failed to write table {}.{}, {} rows, first row: {}",
ckDatabase, table, rows.size(),
rows.isEmpty() ? "" : (rows.get(0).length() > 500
? rows.get(0).substring(0, 500) + "..." : rows.get(0)));
throw e;
}
protected boolean shouldFlush() {
int totalSize = buffer.values().stream().mapToInt(List::size).sum();
return totalSize >= batchSize || System.currentTimeMillis() - lastFlushTime >= batchIntervalMs;
}
protected void flush() throws Exception {
if (buffer.isEmpty()) { lastFlushTime = System.currentTimeMillis(); return; }
for (Map.Entry<String, List<String>> entry : buffer.entrySet()) {
String table = entry.getKey();
List<String> rows = entry.getValue();
if (rows.isEmpty()) continue;
List<String> ckCols = ckTableColumns != null ? ckTableColumns.get(table) : null;
String insertSql = buildInsertSql(table, ckCols);
String insertUrl = buildInsertUrl(insertSql);
LOG.debug("Preparing to write to {}.{}: {} rows", ckDatabase, table, rows.size());
try {
executeHttpPostStreaming(insertUrl, rows);
LOG.debug("Finished writing to {}.{}: {} rows", ckDatabase, table, rows.size());
} catch (Exception e) {
onFlushTableError(table, rows, e);
}
}
buffer.clear();
lastFlushTime = System.currentTimeMillis();
}
protected String buildInsertSql(String table, List<String> ckCols) {
return (ckCols != null && !ckCols.isEmpty())
? "INSERT INTO " + table + " (" + String.join(", ", ckCols) + ") FORMAT JSONEachRow"
: "INSERT INTO " + table + " FORMAT JSONEachRow";
}
protected String buildInsertUrl(String insertSql) {
return String.format("%s/?database=%s&max_partitions_per_insert_block=0"
+ "&input_format_skip_unknown_fields=1&query=%s",
clickhouseUrl, ckDatabase,
java.net.URLEncoder.encode(insertSql, StandardCharsets.UTF_8));
}
/** Streaming write: chunked transfer avoids OOM on large batches */
protected void executeHttpPostStreaming(String urlStr, List<String> rows) throws Exception {
URL url = new URL(urlStr);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
try {
conn.setRequestMethod("POST");
conn.setRequestProperty("Authorization", authHeader);
conn.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
conn.setDoOutput(true);
conn.setChunkedStreamingMode(8192);
conn.setConnectTimeout(30000);
conn.setReadTimeout(120000);
try (BufferedOutputStream bos = new BufferedOutputStream(conn.getOutputStream(), 65536)) {
byte[] newline = "\n".getBytes(StandardCharsets.UTF_8);
for (int i = 0; i < rows.size(); i++) {
if (i > 0) bos.write(newline);
bos.write(rows.get(i).getBytes(StandardCharsets.UTF_8));
}
}
int code = conn.getResponseCode();
String responseBody = readResponse(conn, code != 200);
if (code != 200) {
LOG.error("ClickHouse write failed: code={}, response={}", code, responseBody);
throw new RuntimeException("Insert failed: " + responseBody);
}
} finally {
conn.disconnect();
}
}
protected String readResponse(HttpURLConnection conn, boolean isError) {
try {
InputStream is = isError ? conn.getErrorStream() : conn.getInputStream();
if (is == null) return "";
try (BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
return br.lines().collect(Collectors.joining("\n"));
}
} catch (Exception e) {
LOG.warn("Failed to read response: {}", e.getMessage());
return "";
}
}
/** Load the CK schema from system.columns (excluding MATERIALIZED/ALIAS columns) */
protected Map<String, List<String>> loadCKTableColumnsInternal() throws Exception {
String sql = "SELECT table, name FROM system.columns WHERE database = '" + ckDatabase
+ "' AND default_kind NOT IN ('MATERIALIZED', 'ALIAS') ORDER BY table, position";
String urlStr = clickhouseUrl + "/?query=" + java.net.URLEncoder.encode(sql, StandardCharsets.UTF_8);
URL url = new URL(urlStr);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
try {
conn.setRequestMethod("GET");
conn.setRequestProperty("Authorization", authHeader);
conn.setConnectTimeout(10000);
conn.setReadTimeout(10000);
int code = conn.getResponseCode();
String response = readResponse(conn, code != 200);
if (code != 200) throw new RuntimeException("Failed to query CK schema: " + response);
Map<String, List<String>> result = new LinkedHashMap<>();
for (String line : response.split("\n")) {
line = line.trim();
if (line.isEmpty()) continue;
String[] parts = line.split("\t");
if (parts.length == 2) {
result.computeIfAbsent(parts[0].trim(), k -> new ArrayList<>())
.add(parts[1].trim());
}
}
LOG.info("Loaded CK schema: {} tables", result.size());
return result;
} finally {
conn.disconnect();
}
}
/** Compare the old and new CK schemas and log the differences */
protected void detectAndLogCKChanges(Map<String, List<String>> oldSchema,
Map<String, List<String>> newSchema) {
if (oldSchema == null || newSchema == null) return;
for (Map.Entry<String, List<String>> entry : newSchema.entrySet()) {
String table = entry.getKey();
List<String> newCols = entry.getValue();
List<String> oldCols = oldSchema.get(table);
if (oldCols == null) {
LOG.info("CK detected new table: {}", table);
} else if (!new HashSet<>(oldCols).equals(new HashSet<>(newCols))) {
Set<String> added = new HashSet<>(newCols); added.removeAll(new HashSet<>(oldCols));
Set<String> removed = new HashSet<>(oldCols); removed.removeAll(new HashSet<>(newCols));
LOG.info("CK table {} schema changed: added columns={}, removed columns={}", table, added, removed);
}
}
for (String table : oldSchema.keySet()) {
if (!newSchema.containsKey(table)) LOG.warn("CK table dropped: {}", table);
}
}
}
9.2 MySQLToClickHouseRealtimeSyncV2 (MySQL Sync)
package com.cloud.flink;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.regex.Pattern;
public class MySQLToClickHouseRealtimeSyncV2 {
private static final Logger LOG = LoggerFactory.getLogger(MySQLToClickHouseRealtimeSyncV2.class);
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
ParameterTool config = FlinkJobUtils.loadConfig(params);
TableMappingConfig tableMappingConfig = parseTableMapping(config);
LOG.info("========== Configuration ==========");
LOG.info("MySQL: {}:{}/{}", config.get("mysql.hostname"), config.get("mysql.port"), config.get("mysql.database"));
LOG.info("ClickHouse: {}/{}", config.get("clickhouse.url"), config.get("clickhouse.database"));
LOG.info("Table mapping mode: {}", tableMappingConfig.isPatternMode() ? "regex" : "explicit");
LOG.info("Table list entries: {}", tableMappingConfig.getTableList().length);
LOG.info("==============================");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(config.getInt("flink.parallelism", 1));
env.getConfig().setGlobalJobParameters(config);
FlinkJobUtils.configureCheckpoint(env, config);
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname(config.get("mysql.hostname"))
.port(config.getInt("mysql.port"))
.databaseList(config.get("mysql.database"))
.tableList(tableMappingConfig.getTableList())
.username(config.get("mysql.username"))
.password(config.get("mysql.password"))
.serverTimeZone(config.get("mysql.timezone", "Asia/Shanghai"))
.serverId(config.get("mysql.server.id", "5401-5404"))
.startupOptions(StartupOptions.latest())
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(false)
.scanNewlyAddedTableEnabled(config.getBoolean("sync.scan.newly.added.table", true))
.build();
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
.addSink(new ClickHouseCheckpointedSink(
config.get("mysql.hostname"), config.getInt("mysql.port"),
config.get("mysql.database"), config.get("mysql.username"), config.get("mysql.password"),
config.get("clickhouse.url"), config.get("clickhouse.username"),
config.get("clickhouse.password"), config.get("clickhouse.database"),
tableMappingConfig,
config.getInt("flink.batch.size", 2000), config.getLong("flink.batch.interval", 5000L)
)).name("ClickHouse Checkpointed Sink");
env.execute("MySQL to ClickHouse Realtime Sync V2");
}
private static TableMappingConfig parseTableMapping(ParameterTool config) {
String database = config.get("mysql.database");
if (config.has("sync.table.pattern")) {
return new TableMappingConfig(database, config.get("sync.table.pattern"),
config.get("sync.table.prefix", "ods_"));
}
if (config.has("sync.tables")) {
Map<String, String> tableMapping = new LinkedHashMap<>();
for (String pair : config.get("sync.tables").split(",")) {
pair = pair.trim();
if (pair.isEmpty()) continue;
String[] parts = pair.split(":");
if (parts.length == 2) tableMapping.put(parts[0].trim(), parts[1].trim());
else if (parts.length == 1) tableMapping.put(parts[0].trim(), "ods_" + parts[0].trim());
}
return new TableMappingConfig(database, tableMapping);
}
throw new RuntimeException("No table mapping configured; set sync.table.pattern or sync.tables");
}
/** Table mapping config (regex mode or explicit mode) */
public static class TableMappingConfig implements java.io.Serializable {
private static final long serialVersionUID = 1L;
private final String database;
private final boolean patternMode;
private final String pattern;
private final String prefix;
private final Map<String, String> tableMapping;
public TableMappingConfig(String database, String pattern, String prefix) {
this.database = database; this.patternMode = true;
this.pattern = pattern; this.prefix = prefix; this.tableMapping = null;
}
public TableMappingConfig(String database, Map<String, String> tableMapping) {
this.database = database; this.patternMode = false;
this.pattern = null; this.prefix = null; this.tableMapping = tableMapping;
}
public boolean isPatternMode() { return patternMode; }
public String[] getTableList() {
return patternMode ? new String[]{database + "." + pattern}
: tableMapping.keySet().stream().map(t -> database + "." + t).toArray(String[]::new);
}
public String getClickHouseTable(String mysqlTable) {
return patternMode ? prefix + mysqlTable : tableMapping.get(mysqlTable);
}
public boolean shouldSync(String mysqlTable) {
return patternMode ? Pattern.matches(pattern, mysqlTable) : tableMapping.containsKey(mysqlTable);
}
}
/** MySQL sink: extends the shared base class, adding MySQL schema loading and Debezium type decoding */
public static class ClickHouseCheckpointedSink extends AbstractClickHouseSink<String> {
private static final Logger LOG = LoggerFactory.getLogger(ClickHouseCheckpointedSink.class);
private final String mysqlHost, mysqlDatabase, mysqlUsername, mysqlPassword;
private final int mysqlPort;
private final TableMappingConfig tableMappingConfig;
private transient ObjectMapper objectMapper;
private transient volatile Map<String, Map<String, ColumnInfo>> tableColumnTypes;
public enum ColumnType { DECIMAL, BIGINT_UNSIGNED, DATETIME, DATE, OTHER }
public static class ColumnInfo implements java.io.Serializable {
private static final long serialVersionUID = 1L;
final ColumnType type;
final int scale;
ColumnInfo(ColumnType type, int scale) { this.type = type; this.scale = scale; }
}
public ClickHouseCheckpointedSink(String mysqlHost, int mysqlPort, String mysqlDatabase,
String mysqlUsername, String mysqlPassword, String clickhouseUrl,
String ckUsername, String ckPassword, String ckDatabase,
TableMappingConfig tableMappingConfig, int batchSize, long batchIntervalMs) {
super(clickhouseUrl, ckUsername, ckPassword, ckDatabase, batchSize, batchIntervalMs);
this.mysqlHost = mysqlHost; this.mysqlPort = mysqlPort;
this.mysqlDatabase = mysqlDatabase; this.mysqlUsername = mysqlUsername;
this.mysqlPassword = mysqlPassword; this.tableMappingConfig = tableMappingConfig;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
objectMapper = new ObjectMapper();
tableColumnTypes = loadMySQLTableSchemaInternal();
LOG.info("MySQL sink initialized: MySQL {} tables, CK {} tables",
tableColumnTypes.size(), ckTableColumns.size());
}
@Override
protected void refreshSchema() {
try {
Map<String, Map<String, ColumnInfo>> newMySQL = loadMySQLTableSchemaInternal();
Map<String, List<String>> newCK = loadCKTableColumnsInternal();
detectAndLogCKChanges(ckTableColumns, newCK);
tableColumnTypes = newMySQL;
ckTableColumns = newCK;
LOG.info("Schema refreshed: MySQL {} tables, CK {} tables", newMySQL.size(), newCK.size());
} catch (Exception e) {
LOG.warn("Schema refresh failed: {}", e.getMessage());
}
}
@Override
public void invoke(String value, Context context) throws Exception {
String op = "", mysqlTable = "";
try {
JsonNode root = objectMapper.readTree(value);
op = root.path("op").asText();
mysqlTable = root.path("source").path("table").asText();
if (!tableMappingConfig.shouldSync(mysqlTable)) return;
String ckTable = tableMappingConfig.getClickHouseTable(mysqlTable);
if (ckTable == null) return;
// Deletes ("d") are not propagated: only creates, snapshot reads and updates are written
if ("c".equals(op) || "r".equals(op) || "u".equals(op)) {
JsonNode data = root.path("after");
if (data != null && !data.isMissingNode()) {
Map<String, ColumnInfo> colTypes = tableColumnTypes.get(mysqlTable);
String json = convertFields(data, colTypes);
buffer.computeIfAbsent(ckTable, k -> new ArrayList<>()).add(json);
}
}
if (shouldFlush()) flush();
} catch (Exception e) {
LOG.error("Failed to process record: table={}, op={}, value={}", mysqlTable, op, value, e);
throw e;
}
}
// ===== Core type conversion =====
private static final DateTimeFormatter DT_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final DateTimeFormatter DATE_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd");
private static final ZoneId ZONE_ID = ZoneId.of("Asia/Shanghai"); // hard-coded; keep in sync with mysql.timezone
private String convertFields(JsonNode data, Map<String, ColumnInfo> columnTypes) {
try {
Map<String, Object> map = objectMapper.convertValue(data, Map.class);
Map<String, Object> out = new LinkedHashMap<>();
for (Map.Entry<String, Object> e : map.entrySet()) {
String key = e.getKey(); Object val = e.getValue();
if (val == null) { out.put(key, null); continue; }
ColumnInfo ci = columnTypes != null ? columnTypes.get(key) : null;
if (ci == null) { out.put(key, val); continue; }
switch (ci.type) {
case BIGINT_UNSIGNED:
if (val instanceof String) {
try { out.put(key, new BigInteger(Base64.getDecoder().decode((String) val))); }
catch (Exception ex) { out.put(key, val); }
} else out.put(key, val);
break;
case DECIMAL:
if (val instanceof String) {
BigDecimal d = decodeDecimal((String) val, ci.scale);
out.put(key, d != null ? d : val);
} else out.put(key, val);
break;
case DATETIME:
if (val instanceof Number) {
out.put(key, LocalDateTime.ofInstant(Instant.ofEpochMilli(
((Number) val).longValue()), java.time.ZoneOffset.UTC).format(DT_FMT));
} else if (val instanceof String && ((String) val).contains("T")) {
try {
out.put(key, java.time.ZonedDateTime.parse((String) val)
.withZoneSameInstant(ZONE_ID).format(DT_FMT));
} catch (Exception ex) { out.put(key, val); }
} else out.put(key, val);
break;
case DATE:
if (val instanceof Number)
out.put(key, java.time.LocalDate.ofEpochDay(((Number) val).intValue()).format(DATE_FMT));
else out.put(key, val);
break;
default: out.put(key, val);
}
}
return objectMapper.writeValueAsString(out);
} catch (Exception e) {
LOG.warn("Conversion failed, falling back to raw data", e);
return data.toString();
}
}
private BigDecimal decodeDecimal(String base64, int scale) {
try { return new BigDecimal(new BigInteger(Base64.getDecoder().decode(base64)), scale); }
catch (Exception e) { return null; }
}
// ===== MySQL schema loading =====
private Map<String, Map<String, ColumnInfo>> loadMySQLTableSchemaInternal() {
String jdbcUrl = String.format("jdbc:mysql://%s:%d/%s?useSSL=false&serverTimezone=Asia/Shanghai",
mysqlHost, mysqlPort, mysqlDatabase);
String sql = "SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE, COLUMN_TYPE, NUMERIC_SCALE "
+ "FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = ? ORDER BY TABLE_NAME, ORDINAL_POSITION";
try { Class.forName("com.mysql.cj.jdbc.Driver"); }
catch (ClassNotFoundException e) { throw new RuntimeException("MySQL driver not found", e); }
Map<String, Map<String, ColumnInfo>> result = new HashMap<>();
try (java.sql.Connection conn = java.sql.DriverManager.getConnection(jdbcUrl, mysqlUsername, mysqlPassword);
java.sql.PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, mysqlDatabase);
try (java.sql.ResultSet rs = stmt.executeQuery()) {
while (rs.next()) {
String dataType = rs.getString("DATA_TYPE").toUpperCase();
String fullType = rs.getString("COLUMN_TYPE");
fullType = fullType == null ? "" : fullType.toLowerCase(); // e.g. "bigint unsigned"
int scale = rs.getInt("NUMERIC_SCALE"); // getInt returns 0 when NUMERIC_SCALE is NULL
ColumnType ct;
switch (dataType) {
case "BIGINT": ct = fullType.contains("unsigned")
? ColumnType.BIGINT_UNSIGNED : ColumnType.OTHER; break;
case "DECIMAL": case "NUMERIC": case "DEC": case "FIXED":
ct = ColumnType.DECIMAL; break;
case "DATETIME": case "TIMESTAMP": ct = ColumnType.DATETIME; break;
case "DATE": ct = ColumnType.DATE; break;
default: ct = ColumnType.OTHER;
}
result.computeIfAbsent(rs.getString("TABLE_NAME"), k -> new HashMap<>())
.put(rs.getString("COLUMN_NAME"), new ColumnInfo(ct, scale));
}
}
return result;
} catch (Exception e) {
LOG.error("Failed to load MySQL schema: {}", e.getMessage(), e);
throw new RuntimeException("Failed to load MySQL schema", e);
}
}
}
}
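The MySQL sink above depends on how the Debezium JSON payload encodes a few MySQL types. The standalone sketch below isolates those conversions so they can be checked without Flink or a database. It mirrors the sink's own assumptions: DECIMAL arrives as Base64 of the unscaled big-endian two's-complement integer (Debezium's precise decimal handling), with the scale taken from information_schema; DATETIME arrives as epoch milliseconds holding the wall-clock value as if it were UTC; DATE arrives as days since 1970-01-01. The class name DebeziumValueDecodeDemo is hypothetical.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Base64;

// Hypothetical standalone class mirroring the DECIMAL / DATETIME / DATE conversions of the MySQL sink.
public class DebeziumValueDecodeDemo {
    static final DateTimeFormatter DT_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    static final DateTimeFormatter DATE_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    // DECIMAL: Base64 of the unscaled value (big-endian two's complement); scale comes from the schema.
    static BigDecimal decodeDecimal(String base64, int scale) {
        return new BigDecimal(new BigInteger(Base64.getDecoder().decode(base64)), scale);
    }

    // DATETIME: epoch millis holding the wall-clock value as if it were UTC, so format it in UTC.
    static String epochMillisToDateTime(long millis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC).format(DT_FMT);
    }

    // DATE: number of days since 1970-01-01.
    static String epochDaysToDate(int days) {
        return LocalDate.ofEpochDay(days).format(DATE_FMT);
    }

    public static void main(String[] args) {
        // Encode the unscaled value 12345 the way the CDC payload would, then decode it with scale 2.
        String b64 = Base64.getEncoder().encodeToString(BigInteger.valueOf(12345).toByteArray());
        System.out.println(b64 + " -> " + decodeDecimal(b64, 2)); // MDk= -> 123.45
        System.out.println(epochMillisToDateTime(1704067200000L)); // 2024-01-01 00:00:00
        System.out.println(epochDaysToDate(19723));                // 2024-01-01
    }
}
```

Formatting DATETIME in UTC (rather than Asia/Shanghai) is what keeps the value unshifted: the millis already encode local wall-clock time, so applying an 8-hour offset would double-count the time zone.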
9.3 MongoToClickHouseRealtimeSync (MongoDB sync)
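Compared with the previous article, this version makes three changes:
- It extends AbstractClickHouseSink to reuse the shared sink logic.
- It overrides onFlushTableError() to pinpoint the exact row that failed.
- It loads its configuration through FlinkJobUtils.loadConfig().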
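The onFlushTableError() override works by pulling the 1-based row number out of the ClickHouse error text with the regex at row (\d+), then logging the failing row and its neighbors. Here is a Flink-free sketch of that logic; the class name FlushErrorRowLocator and the sample error message are made up for illustration, and the exact wording of real ClickHouse errors may vary by version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper mirroring onFlushTableError(): extract "at row N" and collect the context rows.
public class FlushErrorRowLocator {
    // Compiled once; the sink compiles it per call, which is acceptable on an error path.
    private static final Pattern ROW_PATTERN = Pattern.compile("at row (\\d+)");

    // Returns the 1-based row number found in the message, or -1 if there is none.
    static int extractRow(String errMsg) {
        Matcher m = ROW_PATTERN.matcher(errMsg == null ? "" : errMsg);
        return m.find() ? Integer.parseInt(m.group(1)) : -1;
    }

    // Returns the previous, failing and next rows that exist; empty if rowNum is outside the batch.
    static List<String> contextRows(List<String> rows, int rowNum) {
        List<String> out = new ArrayList<>();
        if (rowNum < 1 || rowNum > rows.size()) return out;
        for (int i = rowNum - 2; i <= rowNum; i++) { // 0-based indices of previous, failing, next
            if (i >= 0 && i < rows.size()) out.add(rows.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("{\"id\":1}", "{\"id\":\"oops\"}", "{\"id\":3}");
        String msg = "Cannot parse input (at row 2)"; // illustrative message, not verbatim ClickHouse output
        int row = extractRow(msg);
        System.out.println(row + " " + contextRows(batch, row)); // 2 [{"id":1}, {"id":"oops"}, {"id":3}]
    }
}
```

Every index is bounds-checked so that an unexpected row number cannot raise IndexOutOfBoundsException and hide the original ClickHouse exception that the sink rethrows.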
package com.cloud.flink;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class MongoToClickHouseRealtimeSync {
private static final Logger LOG = LoggerFactory.getLogger(MongoToClickHouseRealtimeSync.class);
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
ParameterTool config = FlinkJobUtils.loadConfig(params);
CollectionMappingConfig mappingConfig = parseCollectionMapping(config);
LOG.info("========== Configuration ==========");
LOG.info("MongoDB: {}/{}", config.get("mongo.hosts"), config.get("mongo.database"));
LOG.info("ClickHouse: {}/{}", config.get("clickhouse.url"), config.get("clickhouse.database"));
LOG.info("Collections to sync: {}", mappingConfig.getCollectionMapping().size());
LOG.info("==============================");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(config.getInt("flink.parallelism", 1));
env.getConfig().setGlobalJobParameters(config);
FlinkJobUtils.configureCheckpoint(env, config);
String database = config.get("mongo.database");
String[] collectionList = mappingConfig.getCollectionMapping().keySet().stream()
.map(c -> database + "." + c).toArray(String[]::new);
MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
.hosts(config.get("mongo.hosts"))
.username(config.get("mongo.username"))
.password(config.get("mongo.password"))
.connectionOptions(config.get("mongo.connection.options", "authSource=admin"))
.databaseList(database)
.collectionList(collectionList)
.deserializer(new JsonDebeziumDeserializationSchema())
.startupOptions(StartupOptions.latest())
.build();
env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB CDC Source")
.addSink(new ClickHouseMongoSink(
config.get("clickhouse.url"), config.get("clickhouse.username"),
config.get("clickhouse.password"), config.get("clickhouse.database"),
mappingConfig,
config.getInt("flink.batch.size", 2000), config.getLong("flink.batch.interval", 5000L)
)).name("ClickHouse Mongo Sink");
env.execute("MongoDB to ClickHouse Realtime Sync");
}
// ===== Collection mapping config parsing =====
private static CollectionMappingConfig parseCollectionMapping(ParameterTool config) {
if (!config.has("sync.collections"))
throw new RuntimeException("No collection mapping configured; please set sync.collections");
Map<String, String> collectionMapping = new LinkedHashMap<>();
for (String pair : config.get("sync.collections").split(",")) {
pair = pair.trim(); if (pair.isEmpty()) continue;
String[] parts = pair.split(":");
if (parts.length == 2) collectionMapping.put(parts[0].trim(), parts[1].trim());
else if (parts.length == 1) collectionMapping.put(parts[0].trim(), "ods_mongo_" + parts[0].trim());
}
// Field renames
Map<String, Map<String, String>> fieldRenames = new HashMap<>();
// Field exclusions
Map<String, Set<String>> fieldExcludes = new HashMap<>();
// Collections whose camelCase field names are converted to snake_case
Set<String> camelToSnakeCollections = new HashSet<>();
// Nested-object flattening (mongoField>ckPrefix)
Map<String, Map<String, String>> flattenConfig = new HashMap<>();
// Renames for flattened sub-fields
Map<String, Map<String, Map<String, String>>> flattenSubRenames = new HashMap<>();
// Timestamp fields (epoch milliseconds / epoch seconds)
Map<String, Set<String>> timestampMsFields = new HashMap<>();
Map<String, Set<String>> timestampSecFields = new HashMap<>();
// Field coalescing (target:src1|src2)
Map<String, Map<String, List<String>>> coalesceConfig = new HashMap<>();
for (String collection : collectionMapping.keySet()) {
// Field renames
String renameKey = "sync.field.rename." + collection;
if (config.has(renameKey)) {
Map<String, String> renames = new LinkedHashMap<>();
for (String rp : config.get(renameKey).split(",")) {
rp = rp.trim(); if (rp.isEmpty()) continue;
String[] sp = rp.split(":"); if (sp.length == 2) renames.put(sp[0].trim(), sp[1].trim());
}
if (!renames.isEmpty()) fieldRenames.put(collection, renames);
}
// Field exclusions
String excludeKey = "sync.field.exclude." + collection;
if (config.has(excludeKey)) {
Set<String> excludes = Arrays.stream(config.get(excludeKey).split(","))
.map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet());
if (!excludes.isEmpty()) fieldExcludes.put(collection, excludes);
}
// camelCase to snake_case
if ("true".equals(config.get("sync.field.camel_to_snake." + collection, "")))
camelToSnakeCollections.add(collection);
// Nested-object flattening
String flattenKey = "sync.field.flatten." + collection;
if (config.has(flattenKey)) {
Map<String, String> flatten = new LinkedHashMap<>();
for (String p : config.get(flattenKey).split(",")) {
p = p.trim(); if (p.isEmpty()) continue;
String[] fp = p.split(">"); if (fp.length == 2) flatten.put(fp[0].trim(), fp[1].trim());
}
if (!flatten.isEmpty()) flattenConfig.put(collection, flatten);
}
// Renames for flattened sub-fields
Map<String, String> collFlatten = flattenConfig.getOrDefault(collection, Collections.emptyMap());
Map<String, Map<String, String>> subRenameMap = new HashMap<>();
for (String mongoField : collFlatten.keySet()) {
String key = "sync.field.flatten.rename." + collection + "." + mongoField;
if (config.has(key)) {
Map<String, String> subRenames = new LinkedHashMap<>();
for (String rp : config.get(key).split(",")) {
rp = rp.trim(); if (rp.isEmpty()) continue;
String[] sp = rp.split(":"); if (sp.length == 2) subRenames.put(sp[0].trim(), sp[1].trim());
}
if (!subRenames.isEmpty()) subRenameMap.put(mongoField, subRenames);
}
}
if (!subRenameMap.isEmpty()) flattenSubRenames.put(collection, subRenameMap);
// Millisecond timestamps
String tsMsKey = "sync.field.timestamp_ms." + collection;
if (config.has(tsMsKey)) {
Set<String> fields = Arrays.stream(config.get(tsMsKey).split(","))
.map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet());
if (!fields.isEmpty()) timestampMsFields.put(collection, fields);
}
// Second timestamps
String tsSecKey = "sync.field.timestamp_sec." + collection;
if (config.has(tsSecKey)) {
Set<String> fields = Arrays.stream(config.get(tsSecKey).split(","))
.map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet());
if (!fields.isEmpty()) timestampSecFields.put(collection, fields);
}
// Field coalescing
String coalesceKey = "sync.field.coalesce." + collection;
if (config.has(coalesceKey)) {
Map<String, List<String>> coalesceMap = new LinkedHashMap<>();
for (String cp : config.get(coalesceKey).split(",")) {
cp = cp.trim(); if (cp.isEmpty()) continue;
String[] kv = cp.split(":");
if (kv.length == 2) {
List<String> sources = Arrays.stream(kv[1].split("\\|"))
.map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
if (!sources.isEmpty()) coalesceMap.put(kv[0].trim(), sources);
}
}
if (!coalesceMap.isEmpty()) coalesceConfig.put(collection, coalesceMap);
}
}
return new CollectionMappingConfig(collectionMapping, fieldRenames, fieldExcludes,
camelToSnakeCollections, flattenConfig, flattenSubRenames,
timestampMsFields, timestampSecFields, coalesceConfig);
}
// ===== Collection mapping config class =====
public static class CollectionMappingConfig implements java.io.Serializable {
private static final long serialVersionUID = 1L;
private final Map<String, String> collectionMapping;
private final Map<String, Map<String, String>> fieldRenames;
private final Map<String, Set<String>> fieldExcludes;
private final Set<String> camelToSnakeCollections;
private final Map<String, Map<String, String>> flattenConfig;
private final Map<String, Map<String, Map<String, String>>> flattenSubRenames;
private final Map<String, Set<String>> timestampMsFields;
private final Map<String, Set<String>> timestampSecFields;
private final Map<String, Map<String, List<String>>> coalesceConfig;
public CollectionMappingConfig(
Map<String, String> collectionMapping, Map<String, Map<String, String>> fieldRenames,
Map<String, Set<String>> fieldExcludes, Set<String> camelToSnakeCollections,
Map<String, Map<String, String>> flattenConfig,
Map<String, Map<String, Map<String, String>>> flattenSubRenames,
Map<String, Set<String>> timestampMsFields, Map<String, Set<String>> timestampSecFields,
Map<String, Map<String, List<String>>> coalesceConfig) {
this.collectionMapping = collectionMapping; this.fieldRenames = fieldRenames;
this.fieldExcludes = fieldExcludes; this.camelToSnakeCollections = camelToSnakeCollections;
this.flattenConfig = flattenConfig; this.flattenSubRenames = flattenSubRenames;
this.timestampMsFields = timestampMsFields; this.timestampSecFields = timestampSecFields;
this.coalesceConfig = coalesceConfig;
}
public Map<String, String> getCollectionMapping() { return collectionMapping; }
public String getClickHouseTable(String c) { return collectionMapping.get(c); }
public boolean shouldSync(String c) { return collectionMapping.containsKey(c); }
public Map<String, String> getFieldRenames(String c) { return fieldRenames.getOrDefault(c, Collections.emptyMap()); }
public Set<String> getFieldExcludes(String c) { return fieldExcludes.getOrDefault(c, Collections.emptySet()); }
public boolean isCamelToSnake(String c) { return camelToSnakeCollections.contains(c); }
public Map<String, String> getFlattenConfig(String c) { return flattenConfig.getOrDefault(c, Collections.emptyMap()); }
public Map<String, Map<String, String>> getFlattenSubRenames(String c) { return flattenSubRenames.getOrDefault(c, Collections.emptyMap()); }
public Set<String> getTimestampMsFields(String c) { return timestampMsFields.getOrDefault(c, Collections.emptySet()); }
public Set<String> getTimestampSecFields(String c) { return timestampSecFields.getOrDefault(c, Collections.emptySet()); }
public Map<String, List<String>> getCoalesceConfig(String c) { return coalesceConfig.getOrDefault(c, Collections.emptyMap()); }
}
// ===== MongoDB Sink =====
public static class ClickHouseMongoSink extends AbstractClickHouseSink<String> {
private static final Logger LOG = LoggerFactory.getLogger(ClickHouseMongoSink.class);
private final CollectionMappingConfig mappingConfig;
private transient ObjectMapper objectMapper;
private static final DateTimeFormatter DT_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final DateTimeFormatter DT_MS_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
private static final ZoneId ZONE_ID = ZoneId.of("Asia/Shanghai");
public ClickHouseMongoSink(String clickhouseUrl, String ckUsername, String ckPassword,
String ckDatabase, CollectionMappingConfig mappingConfig,
int batchSize, long batchIntervalMs) {
super(clickhouseUrl, ckUsername, ckPassword, ckDatabase, batchSize, batchIntervalMs);
this.mappingConfig = mappingConfig;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
objectMapper = new ObjectMapper();
LOG.info("Mongo sink initialized: {} ClickHouse tables", ckTableColumns.size());
}
/** On write failure, extract the failing row number from the error to locate the bad record */
@Override
protected void onFlushTableError(String table, List<String> rows, Exception e) throws Exception {
String errMsg = e.getMessage();
Matcher matcher = Pattern.compile("at row (\\d+)").matcher(errMsg != null ? errMsg : "");
if (matcher.find()) {
int rowNum = Integer.parseInt(matcher.group(1));
LOG.error("Write to {}.{} failed at row {}/{}", ckDatabase, table, rowNum, rows.size());
if (rowNum >= 1 && rowNum <= rows.size()) { // only log neighbors for an in-range row so a bad index cannot mask e
LOG.error("Failing row: {}", rows.get(rowNum - 1));
if (rowNum >= 2) LOG.error("Previous row: {}", rows.get(rowNum - 2));
if (rowNum < rows.size()) LOG.error("Next row: {}", rows.get(rowNum));
}
}
throw e;
}
@Override
public void invoke(String value, Context context) throws Exception {
String operationType = "", collection = "";
try {
JsonNode root = objectMapper.readTree(value);
operationType = root.path("operationType").asText();
JsonNode nsNode = root.path("ns");
if (nsNode.isObject()) collection = nsNode.path("coll").asText();
else if (nsNode.isTextual()) {
String ns = nsNode.asText();
int dot = ns.indexOf('.'); collection = dot >= 0 ? ns.substring(dot + 1) : ns;
}
if (!mappingConfig.shouldSync(collection)) return;
String ckTable = mappingConfig.getClickHouseTable(collection);
if (ckTable == null) return;
if ("insert".equals(operationType) || "update".equals(operationType) || "replace".equals(operationType)) {
JsonNode fullDocNode = root.path("fullDocument");
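// fullDocument can be null, e.g. for an update whose document was deleted before the lookup
if (fullDocNode != null && !fullDocNode.isMissingNode() && !fullDocNode.isNull()) {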
String docStr = fullDocNode.isTextual() ? fullDocNode.asText() : fullDocNode.toString();
JsonNode document = objectMapper.readTree(docStr);
String json = convertMongoDocument(document, collection);
buffer.computeIfAbsent(ckTable, k -> new ArrayList<>()).add(json);
}
}
if (shouldFlush()) flush();
} catch (Exception e) {
LOG.error("Failed to process record: collection={}, op={}, value={}", collection, operationType, value, e);
throw e;
}
}
// ===== MongoDB document conversion =====
private String convertMongoDocument(JsonNode document, String collection) {
Map<String, String> renames = mappingConfig.getFieldRenames(collection);
Set<String> excludes = mappingConfig.getFieldExcludes(collection);
boolean useCamelToSnake = mappingConfig.isCamelToSnake(collection);
Map<String, String> flattenFields = mappingConfig.getFlattenConfig(collection);
Set<String> tsMsFields = mappingConfig.getTimestampMsFields(collection);
Set<String> tsSecFields = mappingConfig.getTimestampSecFields(collection);
Map<String, List<String>> coalesceFields = mappingConfig.getCoalesceConfig(collection);
Map<String, Object> result = new LinkedHashMap<>();
// 1. Coalesced fields (take the first non-null source value)
Set<String> consumedByCoalesce = new HashSet<>();
for (Map.Entry<String, List<String>> entry : coalesceFields.entrySet()) {
consumedByCoalesce.addAll(entry.getValue());
Object value = null;
for (String srcField : entry.getValue()) {
JsonNode srcNode = document.get(srcField);
if (srcNode != null && !srcNode.isNull()) {
if (tsMsFields.contains(srcField)) value = convertTimestampMs(srcNode);
else if (tsSecFields.contains(srcField)) value = convertTimestampSec(srcNode);
else value = convertExtendedJsonValue(srcNode);
break;
}
}
if (value != null) result.put(entry.getKey(), value);
}
// 2. Regular fields
Iterator<Map.Entry<String, JsonNode>> fields = document.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
String fieldName = field.getKey();
JsonNode fieldValue = field.getValue();
if (consumedByCoalesce.contains(fieldName) || excludes.contains(fieldName)) continue;
// Flatten nested objects
if (flattenFields.containsKey(fieldName)) {
String ckPrefix = flattenFields.get(fieldName);
if (fieldValue.isObject() && !isExtendedJsonType(fieldValue)) {
Map<String, String> subRenames = mappingConfig.getFlattenSubRenames(collection)
.getOrDefault(fieldName, Collections.emptyMap());
Iterator<Map.Entry<String, JsonNode>> subFields = fieldValue.fields();
while (subFields.hasNext()) {
Map.Entry<String, JsonNode> sub = subFields.next();
String ckSubName = subRenames.containsKey(sub.getKey()) ? subRenames.get(sub.getKey())
: useCamelToSnake ? camelToSnake(sub.getKey()) : sub.getKey();
result.put(ckPrefix + "." + ckSubName, convertExtendedJsonValue(sub.getValue()));
}
}
continue;
}
// Value conversion
Object converted;
if (tsMsFields.contains(fieldName)) converted = convertTimestampMs(fieldValue);
else if (tsSecFields.contains(fieldName)) converted = convertTimestampSec(fieldValue);
else converted = convertExtendedJsonValue(fieldValue);
// Field renaming
String targetName = renames.containsKey(fieldName) ? renames.get(fieldName)
: useCamelToSnake ? camelToSnake(fieldName) : fieldName;
if (converted != null) result.put(targetName, converted);
}
try { return objectMapper.writeValueAsString(result); }
catch (Exception e) { LOG.warn("Serialization failed: {}", collection, e); return document.toString(); }
}
/** Convert a MongoDB Extended JSON value */
private Object convertExtendedJsonValue(JsonNode value) {
if (value == null || value.isNull()) return null;
if (value.isObject()) {
if (value.has("$oid")) return value.get("$oid").asText();
if (value.has("$date")) {
JsonNode dateNode = value.get("$date");
if (dateNode.isTextual()) {
try { return Instant.parse(dateNode.asText()).atZone(ZONE_ID).format(DT_FMT); }
catch (Exception e) { return dateNode.asText(); }
} else if (dateNode.isNumber()) {
return Instant.ofEpochMilli(dateNode.asLong()).atZone(ZONE_ID).format(DT_FMT);
} else if (dateNode.isObject() && dateNode.has("$numberLong")) {
return Instant.ofEpochMilli(Long.parseLong(dateNode.get("$numberLong").asText()))
.atZone(ZONE_ID).format(DT_FMT);
}
return dateNode.asText();
}
if (value.has("$numberLong")) return Long.parseLong(value.get("$numberLong").asText());
if (value.has("$numberInt")) return Integer.parseInt(value.get("$numberInt").asText());
if (value.has("$numberDouble")) return Double.parseDouble(value.get("$numberDouble").asText()); // canonical doubles, incl. NaN/Infinity
if (value.has("$numberDecimal")) return new BigDecimal(value.get("$numberDecimal").asText());
return value.toString(); // plain nested object -> JSON string
}
if (value.isArray()) {
boolean hasComplex = false;
for (JsonNode el : value) { if (el.isObject() && !isExtendedJsonType(el)) { hasComplex = true; break; } }
if (hasComplex) return value.toString(); // array of objects -> JSON string
List<Object> arr = new ArrayList<>();
for (JsonNode el : value) arr.add(convertExtendedJsonValue(el));
return arr; // array of primitives -> ClickHouse Array
}
if (value.isTextual()) return value.asText();
if (value.isBoolean()) return value.asBoolean();
if (value.isInt()) return value.asInt();
if (value.isLong()) return value.asLong();
if (value.isDouble() || value.isFloat()) {
double d = value.asDouble();
if (d == Math.floor(d) && !Double.isInfinite(d)) return (long) d;
return d;
}
return value.asText();
}
private Object convertTimestampMs(JsonNode value) {
long millis;
if (value.isNumber()) millis = value.asLong();
else if (value.isObject() && value.has("$numberLong"))
millis = Long.parseLong(value.get("$numberLong").asText());
else return convertExtendedJsonValue(value);
return Instant.ofEpochMilli(millis).atZone(ZONE_ID).format(DT_MS_FMT);
}
private Object convertTimestampSec(JsonNode value) {
long seconds;
if (value.isNumber()) seconds = value.asLong();
else if (value.isObject() && value.has("$numberLong"))
seconds = Long.parseLong(value.get("$numberLong").asText());
else if (value.isObject() && value.has("$numberInt"))
seconds = Integer.parseInt(value.get("$numberInt").asText());
else return convertExtendedJsonValue(value);
return Instant.ofEpochSecond(seconds).atZone(ZONE_ID).format(DT_FMT);
}
private static boolean isExtendedJsonType(JsonNode node) {
return node.has("$oid") || node.has("$date") || node.has("$numberLong")
|| node.has("$numberInt") || node.has("$numberDouble") || node.has("$numberDecimal");
}
private static String camelToSnake(String camelCase) {
return camelCase.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase();
}
}
}
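parseCollectionMapping() accepts a small comma-separated grammar per key. The plain-Java sketch below reimplements two of those formats: sync.collections (source[:target], where a bare name maps to ods_mongo_<name>) and sync.field.coalesce.<collection> (target:src1|src2). It also applies the coalesce rule used by convertMongoDocument(): the first non-null source wins and the source fields are dropped. MongoSyncConfigDemo and the sample values are hypothetical, and documents are plain maps rather than Jackson JsonNode.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical standalone sketch of the sync.collections / sync.field.coalesce.* formats.
public class MongoSyncConfigDemo {

    // "a:ck_a, b" -> {a=ck_a, b=ods_mongo_b}; a bare name gets the ods_mongo_ prefix.
    static Map<String, String> parseCollections(String value) {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (String pair : value.split(",")) {
            pair = pair.trim();
            if (pair.isEmpty()) continue;
            String[] parts = pair.split(":");
            if (parts.length == 2) mapping.put(parts[0].trim(), parts[1].trim());
            else if (parts.length == 1) mapping.put(parts[0].trim(), "ods_mongo_" + parts[0].trim());
        }
        return mapping;
    }

    // "phone:mobile|tel" -> {phone=[mobile, tel]}: the target column is fed by its source fields in order.
    static Map<String, List<String>> parseCoalesce(String value) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String cp : value.split(",")) {
            String[] kv = cp.trim().split(":");
            if (kv.length != 2) continue;
            List<String> sources = Arrays.stream(kv[1].split("\\|"))
                    .map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
            if (!sources.isEmpty()) result.put(kv[0].trim(), sources);
        }
        return result;
    }

    // First non-null source wins; source fields are consumed; other null values are skipped.
    static Map<String, Object> applyCoalesce(Map<String, Object> doc, Map<String, List<String>> rules) {
        Map<String, Object> out = new LinkedHashMap<>();
        Set<String> consumed = new HashSet<>();
        for (Map.Entry<String, List<String>> rule : rules.entrySet()) {
            consumed.addAll(rule.getValue());
            for (String src : rule.getValue()) {
                Object v = doc.get(src);
                if (v != null) { out.put(rule.getKey(), v); break; }
            }
        }
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            if (!consumed.contains(e.getKey()) && e.getValue() != null) out.put(e.getKey(), e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(parseCollections("user_profile:ods_user_profile, orders"));
        // {user_profile=ods_user_profile, orders=ods_mongo_orders}
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("_id", "65a1");
        doc.put("mobile", null);
        doc.put("tel", "13800000000");
        System.out.println(applyCoalesce(doc, parseCoalesce("phone:mobile|tel")));
        // {phone=13800000000, _id=65a1}
    }
}
```

Because every separator is split naively, collection, column, and field names cannot contain ",", ":" or "|"; that keeps the properties file readable at the cost of a few reserved characters.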
9.4 FlinkJobUtils (shared utility class)
package com.cloud.flink;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.util.Properties;
import java.util.stream.Collectors;
public class FlinkJobUtils {
private static final Logger LOG = LoggerFactory.getLogger(FlinkJobUtils.class);
private FlinkJobUtils() {}
/**
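* Load configuration: --config file path > --env environment name > default "test".
* Command-line arguments take the highest precedence and override file values.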
*/
public static ParameterTool loadConfig(ParameterTool params) throws Exception {
if (params.has("config")) {
return ParameterTool.fromPropertiesFile(params.get("config")).mergeWith(params);
}
String env = params.get("env", "test");
String resourcePath = "application-" + env + ".properties";
try (InputStream is = FlinkJobUtils.class.getClassLoader().getResourceAsStream(resourcePath)) {
if (is == null) throw new RuntimeException("Config file not found on classpath: " + resourcePath);
Properties props = new Properties();
props.load(is);
return ParameterTool.fromMap(props.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toString())))
.mergeWith(params);
}
}
/** Configure checkpointing (optional; cluster defaults apply if unset) */
public static void configureCheckpoint(StreamExecutionEnvironment env, ParameterTool config) {
if (config.has("flink.checkpoint.interval")) {
long interval = config.getLong("flink.checkpoint.interval");
env.enableCheckpointing(interval);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(interval / 2);
env.getCheckpointConfig().setCheckpointTimeout(interval * 2);
LOG.info("Checkpoint config: interval={}ms", interval);
}
String dir = config.get("flink.checkpoint.dir", "");
if (!dir.isEmpty()) {
env.getCheckpointConfig().setCheckpointStorage(dir);
LOG.info("Checkpoint directory: {}", dir);
}
}
}
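loadConfig() resolves settings in a fixed order: an explicit --config file if given, otherwise application-<env>.properties from the classpath (env defaults to test). In both cases command-line arguments override file values, because ParameterTool.mergeWith() lets the argument map overwrite existing keys. configureCheckpoint() then derives the minimum pause (interval / 2) and the timeout (interval * 2) from a single setting. The Flink-free sketch below reproduces those rules with plain maps; ConfigPrecedenceDemo and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Flink-free sketch of the rules in FlinkJobUtils.
public class ConfigPrecedenceDemo {
    // Same rule as ParameterTool.mergeWith(): entries of the second map overwrite the first.
    static Map<String, String> merge(Map<String, String> base, Map<String, String> overrides) {
        Map<String, String> merged = new HashMap<>(base);
        merged.putAll(overrides);
        return merged;
    }

    // Resource name chosen by loadConfig() when --config is absent.
    static String resourceName(Map<String, String> cliArgs) {
        return "application-" + cliArgs.getOrDefault("env", "test") + ".properties";
    }

    // {interval, minPause, timeout} as derived by configureCheckpoint().
    static long[] checkpointTimings(long intervalMs) {
        return new long[] {intervalMs, intervalMs / 2, intervalMs * 2};
    }

    public static void main(String[] args) {
        Map<String, String> cli = Map.of("env", "prod", "flink.parallelism", "4");
        Map<String, String> file = Map.of("flink.parallelism", "1", "flink.batch.size", "2000");
        System.out.println(resourceName(cli)); // application-prod.properties
        Map<String, String> cfg = merge(file, cli);
        System.out.println(cfg.get("flink.parallelism") + " " + cfg.get("flink.batch.size")); // 4 2000
        long[] t = checkpointTimings(60000L);
        System.out.println("minPause=" + t[1] + " timeout=" + t[2]); // minPause=30000 timeout=120000
    }
}
```

Because the timeout scales with the interval, a very short checkpoint interval also produces a very short timeout, which is worth keeping in mind when tuning flink.checkpoint.interval.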
Conclusion
To recap, we built a complete offline + real-time data warehouse with just three components:
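| Layer | Component | Role |
|---|---|---|
| Storage + analytics | ClickHouse | ODS/DW layers; ReplacingMergeTree deduplicates rows during background merges |
| Offline scheduling | DolphinScheduler | T+1/H+1 scheduled syncs via ClickHouse table engines connected directly to the sources |
| Real-time sync | Flink CDC | MySQL binlog + MongoDB Change Streams, landing data in the warehouse within seconds |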
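No Hadoop stack and no hundred Flink classes: everything is driven by configuration files. Once the target ClickHouse table exists, syncing a new table is a one-line config change (plus a job restart, since the mapping is read at startup).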
If this article helped, feel free to like and bookmark it, and share your own data warehouse stack in the comments.
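Series:
- ClickHouse + DolphinScheduler: A Lightweight Offline Data Warehouse with Just Two Components
- Flink 1.20 in Practice: Zero-Code, Config-Driven Real-Time Sync of 100+ MySQL Tables to ClickHouse
- Flink 1.20 in Practice: Real-Time Sync of Multiple MongoDB Collections to ClickHouse with One Config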