大佬们,Flink 中我mysql-cdc,想取当时数据在mysql中产生的时间,怎么取,ts_?[阿里云实时计算 Flink版]

大佬们,Flink 中我mysql-cdc,想取当时数据在mysql中产生的时间,怎么取,ts_ms 取的好像是cdc执行的时间?

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
=====这是一个广告位,招租中,联系qq 78315851====
1 条回复 A 作者 M 管理员
  1. 在 MySQL 中使用 CDC(Change Data Capture)技术进行数据变更抓取时,可以通过获取 binlog 中的 timestamp 来获取数据变更的时间。在 Flink 中使用 MySQL-CDC 抓取到的数据中,可以通过以下两种方式获取数据变更的时间:

    使用 Flink-Debezium 库:Flink-Debezium 库是 Flink 官方提供的一个 Debezium Connector,可以方便地将 CDC 数据源集成到 Flink 程序中。在 Flink-Debezium 库中,可以通过 Debezium 的 SourceRecord 中的 timestamp 字段获取数据变更的时间,该字段是 binlog 中的时间戳,表示数据变更的时间。

    使用 CDC 数据中的时间戳字段:在 MySQL-CDC 抓取到的数据中,可以通过查看 CDC 数据的具体格式和字段定义,来确定数据变更的时间戳字段。通常情况下,CDC 数据中会包含一个时间戳字段,例如 binlog 中的 timestamp 字段或者 MySQL 5.6 以上版本中的 row_update_time 字段。可以通过读取该字段来获取数据变更的时间。

  2. 在 Flink 中使用 MySQL CDC,要获取数据在 MySQL 中产生的时间,可以使用 source 函数提供的元信息(metadata)来获取特定字段的值。

    默认情况下,当使用 Flink CDC 连接到 MySQL 数据源时,会将每条记录的变更时间(cdc执行的时间)写入到字段 ts_ms 中。如果您想获取 MySQL 中实际产生数据的时间,需要在 MySQL 表中有相应的字段记录数据的生成时间,并确保该字段的值能够传递到 Flink 的数据流中。

    假设 MySQL 表有一个名为 create_time 的字段来记录数据的生成时间,您可以通过以下示例代码来获取该字段的值:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 创建 MySQL CDC 数据源DataStream mysqlCdcStream = env.addSource(new MySqlCdcSource(...));// 将 create_time 字段作为事件时间戳DataStream withEventTimeStream = mysqlCdcStream.assignTimestampsAndWatermarks(  WatermarkStrategy.forMonotonousTimestamps()    .withTimestampAssigner((event, timestamp) -> {      // 获取 create_time 字段的值作为事件时间戳      long eventTime = event.getField("create_time").toTimestamp().getTime();      return eventTime;    }));// 在后续操作中可以使用事件时间戳withEventTimeStream  .keyBy(...)  .window(...)  .process(...);env.execute("MySQL CDC");

    在上述示例中,通过 assignTimestampsAndWatermarks 方法设置了事件时间戳,并指定了 create_time 字段的值作为事件时间戳。然后,在后续操作中可以使用该事件时间戳进行窗口计算等操作。

    需要根据实际情况修改代码,确保字段名称和数据类型与实际表结构一致。