flinkcdc streaming 采集mysql的坑(1)

背景

今天在调试程序,发现flinkcdc到kafka的时间字段有问题,date类型给做了计算变成了数值,datetime类型的变成了时间戳long。不能忍受,必须得改过来。做个记录,既然踩过坑了,也希望能帮助一些小伙伴避坑。

时间字段

在mysql好好的时间格式,比如 2026-04-14 12:30:33 到了kafka变成了long类型的时间戳,那肯定写入到es也就是long类型了。日期类型改成了int类型。

导致的问题就是flink消费kafka写入es以后字段的值都不对了,只能从cdc这里进行修改。索引通过搜索找到了一个转换器。https://github.com/holmofy/debezium-datetime-converter/releases

修改代码

读取mysql

按照上一篇文章的内容,在配置这里加入转换器的配置即可。

        // 1. 启用转换器,名字可随意(这里用 "datetime")
        debeziumProperties.setProperty("converters", "datetime");
// 2. 指定转换器类的全限定名(两种方案二选一,见下文)
        debeziumProperties.setProperty("datetime.type", "com.darcytech.debezium.converter.MySqlDateTimeConverter");
//// 3. 指定数据库类型(有些转换器需要)
//        debeziumProperties.setProperty("datetime.database.type", "mysql");
// 4. 配置输出格式(核心!)
        debeziumProperties.setProperty("datetime.format.date", "yyyy-MM-dd");           // DATE → 字符串
        debeziumProperties.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss"); // DATETIME → 字符串
        debeziumProperties.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");// TIMESTAMP → 字符串
// 5. 时区设置(避免差8小时问题)
        debeziumProperties.setProperty("datetime.format.timestamp.zone", "UTC+8");

kafka内容

截图如下,之前的record_day日期是一个int类型,现在加入了转换器终于可以正确显示了。

在这里插入图片描述


下一个问题是时间类型字符变成了null,怎么办?直接把源代码扔给ai让它帮忙分析,之后找到了是因为这个项目只针对增量的监控,如果初始化快照对应time类型是不能转换的。所以自己按照这个工程的转换器把缺少的部分补上。之后咱们再看结果。前后一对比,还真没毛病。有了AI还是提升不少工作效率啊!

在这里插入图片描述


在这里插入图片描述

部署

  1. 代码展示 MySqlDateTimeConverter 中处理时间的部分修改
 private String convertDateTime(Object input) {
        if (input == null) {
            return null;
        }
        try {
            // CDC 增量阶段:LocalDateTime
            if (input instanceof LocalDateTime) {
                return datetimeFormatter.format((LocalDateTime) input);
            }
            // 快照阶段:java.sql.Timestamp
            if (input instanceof java.sql.Timestamp) {
                LocalDateTime localDateTime = ((java.sql.Timestamp) input).toLocalDateTime();
                return datetimeFormatter.format(localDateTime);
            }
            // 兜底:Long 类型毫秒值
            if (input instanceof Long) {
                long millis = (Long) input;
                Instant instant = Instant.ofEpochMilli(millis);
                // 使用配置的时区,而不是系统默认
                LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, timestampZoneId);
                return datetimeFormatter.format(localDateTime);
            }
            // 其他可能的类型
            if (input instanceof java.util.Date) {
                java.util.Date date = (java.util.Date) input;
                Instant instant = date.toInstant();
                LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, timestampZoneId);
                return datetimeFormatter.format(localDateTime);
            }
            log.warn("Unsupported DATETIME type: {}, value: {}", input.getClass().getName(), input);
            return null;
        } catch (Exception e) {
            log.error("Failed to convert DATETIME value: {}", input, e);
            return null;
        }
    }
  1. kafka的数据能够保证正确显示了,接下来就是提交任务写入es的结果查看了。上一篇我们已经写了怎么写入es了,直接上写入的结果吧。

    在这里插入图片描述


    在这里插入图片描述

`

总结

还是要多做,才能发现问题,多去尝试才会有成长。

© 版权声明

相关文章