tongchenkeji 发表于:2023-7-3 15:16:000次点击 已关注取消关注 关注 私信 大佬们,Flink 中我mysql-cdc,想取当时数据在mysql中产生的时间,怎么取,ts_?[阿里云实时计算 Flink版] 暂停朗读为您朗读 大佬们,Flink 中我mysql-cdc,想取当时数据在mysql中产生的时间,怎么取,ts_ms 取的好像是cdc执行的时间? 「点点赞赏,手留余香」 赞赏 还没有人赞赏,快来当第一个赞赏的人吧! 海报 实时计算Flink版# MySQL1179# 云数据库 RDS MySQL 版1517# 关系型数据库2577# 实时计算 Flink版3179# 流计算2236
算精通AM 2023-11-27 18:23:04 1 在 MySQL 中使用 CDC(Change Data Capture)技术进行数据变更抓取时,可以通过获取 binlog 中的 timestamp 来获取数据变更的时间。在 Flink 中使用 MySQL-CDC 抓取到的数据中,可以通过以下两种方式获取数据变更的时间: 使用 Flink-Debezium 库:Flink-Debezium 库是 Flink 官方提供的一个 Debezium Connector,可以方便地将 CDC 数据源集成到 Flink 程序中。在 Flink-Debezium 库中,可以通过 Debezium 的 SourceRecord 中的 timestamp 字段获取数据变更的时间,该字段是 binlog 中的时间戳,表示数据变更的时间。 使用 CDC 数据中的时间戳字段:在 MySQL-CDC 抓取到的数据中,可以通过查看 CDC 数据的具体格式和字段定义,来确定数据变更的时间戳字段。通常情况下,CDC 数据中会包含一个时间戳字段,例如 binlog 中的 timestamp 字段或者 MySQL 5.6 以上版本中的 row_update_time 字段。可以通过读取该字段来获取数据变更的时间。
Star时光AM 2023-11-27 18:23:04 2 在 Flink 中使用 MySQL CDC,要获取数据在 MySQL 中产生的时间,可以使用 source 函数提供的元信息(metadata)来获取特定字段的值。 默认情况下,当使用 Flink CDC 连接到 MySQL 数据源时,会将每条记录的变更时间(cdc执行的时间)写入到字段 ts_ms 中。如果您想获取 MySQL 中实际产生数据的时间,需要在 MySQL 表中有相应的字段记录数据的生成时间,并确保该字段的值能够传递到 Flink 的数据流中。 假设 MySQL 表有一个名为 create_time 的字段来记录数据的生成时间,您可以通过以下示例代码来获取该字段的值: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 创建 MySQL CDC 数据源DataStream mysqlCdcStream = env.addSource(new MySqlCdcSource(...));// 将 create_time 字段作为事件时间戳DataStream withEventTimeStream = mysqlCdcStream.assignTimestampsAndWatermarks( WatermarkStrategy.forMonotonousTimestamps() .withTimestampAssigner((event, timestamp) -> { // 获取 create_time 字段的值作为事件时间戳 long eventTime = event.getField("create_time").toTimestamp().getTime(); return eventTime; }));// 在后续操作中可以使用事件时间戳withEventTimeStream .keyBy(...) .window(...) .process(...);env.execute("MySQL CDC"); 在上述示例中,通过 assignTimestampsAndWatermarks 方法设置了事件时间戳,并指定了 create_time 字段的值作为事件时间戳。然后,在后续操作中可以使用该事件时间戳进行窗口计算等操作。 需要根据实际情况修改代码,确保字段名称和数据类型与实际表结构一致。
在 MySQL 中使用 CDC(Change Data Capture)技术进行数据变更抓取时,可以通过获取 binlog 中的 timestamp 来获取数据变更的时间。在 Flink 中使用 MySQL-CDC 抓取到的数据中,可以通过以下两种方式获取数据变更的时间:
使用 Flink-Debezium 库:Flink-Debezium 库是 Flink 官方提供的一个 Debezium Connector,可以方便地将 CDC 数据源集成到 Flink 程序中。在 Flink-Debezium 库中,可以通过 Debezium 的 SourceRecord 中的 timestamp 字段获取数据变更的时间,该字段是 binlog 中的时间戳,表示数据变更的时间。
使用 CDC 数据中的时间戳字段:在 MySQL-CDC 抓取到的数据中,可以通过查看 CDC 数据的具体格式和字段定义,来确定数据变更的时间戳字段。通常情况下,CDC 数据中会包含一个时间戳字段,例如 binlog 中的 timestamp 字段或者 MySQL 5.6 以上版本中的 row_update_time 字段。可以通过读取该字段来获取数据变更的时间。
在 Flink 中使用 MySQL CDC,要获取数据在 MySQL 中产生的时间,可以使用
source
函数提供的元信息(metadata)来获取特定字段的值。默认情况下,当使用 Flink CDC 连接到 MySQL 数据源时,会将每条记录的变更时间(cdc执行的时间)写入到字段
ts_ms
中。如果您想获取 MySQL 中实际产生数据的时间,需要在 MySQL 表中有相应的字段记录数据的生成时间,并确保该字段的值能够传递到 Flink 的数据流中。假设 MySQL 表有一个名为
create_time
的字段来记录数据的生成时间,您可以通过以下示例代码来获取该字段的值:在上述示例中,通过
assignTimestampsAndWatermarks
方法设置了事件时间戳,并指定了create_time
字段的值作为事件时间戳。然后,在后续操作中可以使用该事件时间戳进行窗口计算等操作。需要根据实际情况修改代码,确保字段名称和数据类型与实际表结构一致。