Flink如何读取hudi在changelog下的insert呢[阿里云实时计算 Flink版]

Flink如何读取hudi在changelog下的insert呢

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
=====这是一个广告位,招租中,联系qq 78315851====
14 条回复 A 作者 M 管理员
  1. Flink可以通过读取Hudi表中的changelog文件来获取插入、更新和删除操作的信息。

    使用 Flink 流式计算进行 changelog 文件处理时,需要按照以下步骤进行:

    1. 构造 Hudi 表的 Schema 对象。

    读取 Hudi changelog 前,需要构造 Hudi 表的字段信息和Schema,可以使用:org.apache.hudi.keygen.SimpleKeyGenerator。

    例如,对下面的 Stripe 表,构造Schema:

    hudi-hive-sync-demo_hudi_trips_cow:{trip_id: long,ts: long,uuid: string,rider: string,driver: string,begin_lat: double,begin_lon: double,end_lat: double,end_lon: double,fare: double,partitionpath: string}

    代码示例:

    String tableSchema = "{"        + ""trip_id": "long","        + ""ts": "long","        + ""uuid": "string","        + ""rider": "string","        + ""driver": "string","        + ""begin_lat": "double","        + ""begin_lon": "double","        + ""end_lat": "double","        + ""end_lon": "double","        + ""fare": "double","        + ""partitionpath": "string""        + "}";Schema schema = new Schema.Parser().parse(tableSchema);
    1. 从 Hudi 表的 changelog 中获取插入、更新或删除的数据。

    可以像读取普通的 Flink 流数据一样,利用 Flink 的 FileInputFormat 读取 Hudi 表的 changelog,并将数据解析为 POJO。

    代码示例:

    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jdbcUrl, selectedTableName);String tablePath = metaClient.getBasePath();String changeLogFolder = tablePath + "/.hoodie/.temp/" + writeToken + "/changelog";// 构造 FileInputFormatChangelogFileInputFormat source = new ChangelogFileInputFormat(    new Path(changeLogFolder), schema, writeToken, operationTypes);//source.setNestedFileEnumeration(true);source.setFilesFilter(FilePathFilter.createDefaultFilter());

    在上述代码中,变量 writeToken 表示Hudi表的写入令牌、operationTypes 表示需要获取的操作类型,如 INSERT、UPSET、DELETE等。

    1. 在 Flink 中对数据进行处理。

    通过读取 Hudi 表的 changelog 文件,可以获取到插入、更新或删除的数据等信息,可以在 Flink 中对这些数据进行处理,例如:

    // 将读取到的每一条数据封装成 POJOpublic static class CdcRecord {    public String operation;    public String path;    public long timestamp;    public long id;    public String uuid;    public String rider;    public String driver;    public double beginLat;    public double beginLon;    public double endLat;    public double endLon;    public double fare;    public String partitionPath;    public CdcRecord(ObjectNode json) {        operation = json.get("meta").get("action").asText();        path = json.get("meta").get("partition").get("fileId").asText();        timestamp = json.get("meta").get("timestamp").asLong();        id = json.get("data").get("trip_id").asLong();        uuid = json.get("data").get("uuid").asText();        rider = json.get("data").get("rider").asText();        driver = json.get("data").get("driver").asText();        beginLat = json.get("data").get("begin_lat").asDouble();        beginLon = json.get("data").get("begin_lon").asDouble();        endLat = json.get("data").get("end_lat").asDouble();        endLon = json.get("data").get("end_lon").asDouble();        fare = json.get("data").get("fare").asDouble();        partitionPath = json.get("partitionpath").asText();    }}

    在 Hudi changelog 的消息中,可以获取到更多的信息,如 hudifile、commitTime、recordKey等。

    以上是使用 Flink 读取 Hudi changelog 的过程,最后可以将数据写入到 MySQL 或其他数据存储中。

  2. 要读取Hudi在changelog下的insert操作,可以使用Flink的HudiInputFormat。HudiInputFormat是Flink提供的一种用于读取Hudi数据的输入格式,它可以读取Hudi 的数据文件和changelog文件,并将其转换为Flink的DataStream。

  3. 在Hudi中,Changelog是用来记录数据变更的日志,它写入到对应的Hudi表的指定路径下的.hoodie目录中,Changelog文件名以_changelog结尾。Changelog中记录了Hudi表的所有变更操作,包括insert、update、delete等。

    如果需要在Flink中读取Hudi表Changelog中的insert操作,可以使用Flink的HoodieStreamTableSource类来实现。这个类可以将Hudi表作为输入流,可以读取Hudi表的Changelog并将其解析为Flink的数据流。

    以下是一个示例代码:

    String tablePath = "/path/to/hudi/table";StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建 HoodieStreamTableSourceHoodieStreamTableSource hoodieTableSource = HoodieStreamTableSource.builder()    .path(tablePath)    .recordKeyField("id")    .build();// 读取 Changelog 中的 insert 操作DataStream<Row> stream = env.fromTableSource(hoodieTableSource)    .filter(row -> row.getField(2).equals("_I"));// 对于 insert 操作的处理逻辑// ...env.execute("Read Hudi Changelog insert operations");

    在这个代码中,首先使用HoodieStreamTableSource类创建一个Hudi表的输入源,设置表路径和记录键字段名称。然后使用fromTableSource方法将输入源转换为数据流,并使用filter方法筛选出Changelog中的insert操作。最后,可以在数据流中编写逻辑来处理insert操作。

    需要注意的是,HoodieStreamTableSource目前只能以流方式处理Changelog,如果需要读取历史版本或快照数据,可以参考其他方法,例如使用HoodieTableSource类或Flink的JDBCInputFormat

  4. Flink 通过 Hudi 提供的 HoodieDeltaStreamer 工具可以读取 Hudi 在 changelog 下的 insert,并将其转化为流式数据处理。以下是步骤:

    在 Flink 程序中引入 Hudi 相关依赖,例如:

    org.apache.hudi hudi-flink_2.11 0.7.0

    在 Flink 程序中使用 HudiSourceFunction 创建一个 source,例如:

    HudiSourceConfig hudiSourceConfig = new HudiSourceConfig.Builder() .withTableName(“

    “) .withChangelogEnabled(true) .withBootstrap(false) .build(); HudiSourceFunction source = new HudiSourceFunction(hudiSourceConfig);

    其中,

    是要读取的 Hudi 表名称。

    需要注意的是,withChangelogEnabled(true) 参数启用了 changelog 模式,用于读取插入和更新操作,withBootstrap(false) 设置为不使用 Bootstrap 模式。

    使用 Flink 的 DataStream API 处理 Hudi changelog 中的 insert 操作,例如:

    DataStream insertStream = env.addSource(source) .filter(new FilterFunction() { @Override public boolean filter(BaseRow value) throws Exception { // 过滤出 insert 操作 return value.getRowKind() == RowKind.INSERT; } }) .map(new MapFunction() { @Override public String map(BaseRow value) throws Exception { // 将 BaseRow 转化为 String return value.toString(); } });

    使用 insertStream 进行下游处理,例如:

    insertStream.print();

  5. 在 Flink 中读取 Hudi 的 Changelog 数据,需要使用 Flink 的 Hudi Connector。具体步骤如下:

    1. 在 Flink 中添加 Hudi Connector 的依赖,可以通过 Maven 或 Gradle 进行添加。

    2. 在 Flink 的程序中,使用 HudiSourceHudiInputFormat 类来读取 Hudi 的 Changelog 数据。这两个类的使用方式类似,下面以 HudiSource 为例进行说明。

    3. 在使用 HudiSource 时,需要指定 Hudi 的配置信息。可以通过 HudiConf 类来构建配置信息,例如:

    HudiConf conf = new HudiConf()    .withPath("/path/to/hudi/table")    .withSchema("hudi schema")    .withPartitionFields("partition field")    .withRecordKeyFields("record key field")    .withKeyGeneratorClass(HoodieAvroKeyGenerator.class)    .withReadSchema("avro schema")    .withBasePath("/path/to/hudi/base")    .withChangelogEnabled(true)    .withChangelogMode(ChangelogMode.INCREMENTAL);

    其中,withPath 指定 Hudi 表的路径,withSchema 指定 Hudi 表的 Schema,withPartitionFields 指定 Hudi 表的分区字段,withRecordKeyFields 指定 Hudi 表的记录键字段,withKeyGeneratorClass 指定 Hudi 表的键生成器类,withReadSchema 指定 Hudi 表的读取 Schema,withBasePath 指定 Hudi 表的基本路径,withChangelogEnabled 指定是否启用 Changelog 模式,withChangelogMode 指定 Changelog 模式。

    1. 创建 HudiSource 对象,并将其作为数据源添加到 Flink 的 DataStream 中,例如:
    DataStream hudiStream = env.addSource(new HudiSource(conf));

    这样就可以读取 Hudi 的 Changelog 数据,并将其转换为 Flink 的 DataStream。 需要注意的是,在读取 Hudi 的 Changelog 数据时,需要保证 Hudi 表启用了 Changelog 模式,并且 Changelog 数据已经被写入到对应的目录中。如果 Hudi 表没有启用 Changelog 模式,或者 Changelog 数据还没有被写入到目录中,那么在读取时就无法获取到数据。

  6. Flink可以通过Hudi提供的HoodieIncrementalInputFormat读取Hudi表的changelog,其中包括insert、update和delete等操作。

  7. 楼主你好,其实Hudi集成Flink的读取方式分为:流读、增量读取、限流等三种方式,Flink读取Hudi下的insert,可通过Hudi的工具类HoodieFlinkStreamer来读取数据。

  8. 在 Flink 中读取 Hudi 的 Changelog 数据,可以使用 Hudi 提供的 HoodieChangelogStreamer 工具将 Changelog 数据导出到 Kafka 或 HBase 等支持 Flink 连接的数据源中,然后使用 Flink 的 FlinkKafkaConsumer 或 FlinkHBaseReader 等工具读取数据源中的数据。

    下面以使用 Kafka 作为数据源为例说明如何读取 Hudi 的 Changelog 数据:

    使用 HoodieChangelogStreamer 工具将 Changelog 数据导出到 Kafka 中:

    Copy code

    java -cp hudi-utilities-bundle.jar org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  --props /path/to/config/file.properties  --op UPSERT  --table-type MERGE_ON_READ  --source-class org.apache.hudi.DataSource  --source-ordering-field name  --target-base-path /path/to/hudi-table  --target-table hudi_table  --target-topic hudi_changelog_topic  --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer  --continuous

    其中 --target-topic 指定导出的 Changelog 数据要写入的 Kafka 主题。

    在 Flink 中使用 FlinkKafkaConsumer 读取 Kafka 中的数据:

    Copy code

    Properties kafkaProps = new Properties();kafkaProps.setProperty("bootstrap.servers", "localhost:9092");kafkaProps.setProperty("group.id", "flink_consumer");FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>("hudi_changelog_topic", new SimpleStringSchema(), kafkaProps);DataStream changelogStream = env.addSource(consumer);// 对 Changelog 数据进行处理changelogStream.map(new MapFunction() {    @Override    public MyData map(String value) throws Exception {        // 解析 Changelog 数据并转换为 MyData 对象        // ...        return myData;    }});

    在 FlinkKafkaConsumer 的构造函数中指定了要读取的 Kafka 主题名称,并使用 SimpleStringSchema 解析 Kafka 中的消息。然后可以对读取到的数据进行处理,例如解析 Changelog 数据并转换为自定义的数据类型。

    需要注意的是,由于 Hudi 的 Changelog 数据中包含了更新和删除操作,因此需要根据操作类型进行相应的处理,例如更新操作需要在 Flink 中执行相应的更新操作。

  9. Flink可以通过Hudi提供的HoodieIncrementalInputFormat读取Hudi表的changelog,其中包括insert、update和delete等操作。

    具体步骤如下:

    在Flink中定义HoodieIncrementalInputFormat,并设置需要读取的Hudi表的相关配置信息,例如: HoodieIncrementalInputFormat inputFormat = new HoodieIncrementalInputFormat<>( new HoodieTableMetaClient.Builder().setBasePath(“path/to/hudi/table”).build(), TypeInformation.of(RowData.class), “commitTime”, ConfigUtils.getDefaultHadoopConf() ); java 使用Flink的DataSource将HoodieIncrementalInputFormat转换为DataStream,例如: DataStream dataStream = env.createInput(inputFormat); java 对DataStream进行处理,例如: dataStream .filter(row -> row.getRowKind() == RowKind.INSERT) .map(row -> { // 处理insert操作的数据 return row; }); java 通过以上步骤,就可以读取Hudi表的changelog,并对其中的insert操作进行处理。

  10. 要读取 Hudi 在 changelog 模式下的 insert,需要使用 Flink 的 Hudi source 进行读取,Hudi source 可以根据 Hudi 数据集的类型自动选择对应的读取器。

    具体而言,可以按照以下步骤配置 Hudi source:

    导入相关依赖:在 Flink 项目中添加 Hudi 相关依赖,例如 flink-connector-hudi; 创建 Hudi source:使用 HoodieSource 类创建 Hudi source。该类需要传入三个参数:数据集类型(例如 COPY_ON_WRITE、MERGE_ON_READ)、数据集路径和表 schema; 配置 Hudi source:使用 HoodieSourceBuilder 类配置 Hudi source,设置必要的参数,例如 checkpoint 目录、读取起始位置等; 读取数据:使用 Flink 的 DataStream API 读取 Hudi 数据源。 示例代码如下:

    // 导入必要的依赖 import org.apache.hudi.connectors.flink.HoodieSource; import org.apache.hudi.connectors.flink.HoodieSourceBuilder; import org.apache.hudi.connectors.flink.util.SerializableConfiguration;

    // 创建 HoodieSource String basePath = “/path/to/hudi/dataset”; // Hudi 数据集路径 String tableName = “hudi_table”; // Hudi 表名 String datasetType = “MERGE_ON_READ”; // Hudi 数据集类型 SerializableConfiguration conf = new SerializableConfiguration(hadoopConf); // Hadoop 配置 String[] readCols = {“col1”, “col2”}; // 读取列名 HoodieSource hoodieSource = new HoodieSourceBuilder() .basePath(basePath) .tableName(tableName) .datasetType(datasetType) .conf(conf.get()) .readCols(readCols) .build();

    // 配置 HoodieSource // 设置 checkpoint 目录和读取起始位置等参数 DataStream> dataStream = env.addSource(hoodieSource);

    // 读取数据,进行下一步处理 需要注意的是,在读取 Hudi 数据时,Flink 会自动跟踪相关的元数据信息,并根据 Hudi 数据集类型选择相应的读取器。在读取过程中,Flink 还可以自动进行 Schema 的解析和匹配,并将 Hudi 数据转换为 Flink 内部的数据结构。

  11. Flink 可以通过 Hudi 提供的工具类 HoodieFlinkStreamer 来读取 Hudi 中的数据,包括 Changelog、Snapshot 等。

    在 Flink 中使用 HoodieFlinkStreamer 读取 Changelog 的 Insert 事件可以参考以下代码:

    // 引入依赖    org.apache.hudi    hudi-flink    ${hudi.version}// 创建 HoodieFlinkStreamer 实例HoodieFlinkStreamer streamer = new HoodieFlinkStreamer();// 设置 Hoodie 数据集配置项Properties props = new Properties();props.put("hoodie.deltastreamer.source.dfs.root", "/path/to/source/data");props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", "/path/to/source/schema/file");props.put("hoodie.deltastreamer.checkpoint.dir", "/path/to/checkpoint/dir");streamer.setProps(props);// 配置 Flink 数据集DataStream hoodieStream = streamer    .withEmbeddedTimelineServer()    .fromFolder("/path/to/hudi/table")    .withFileSystemViewConfig(HoodieFlinkFileSystemViewConfig.newBuilder().build())    .pollIntervalMillis(1000)    .buildInputFormat();    // 过滤 Insert 事件Stream insertStream = hoodieStream    .filter(new FilterFunction() {        @Override        public boolean filter(HoodieRecord hoodieRecord) throws Exception {            return hoodieRecord.getCurrentLocation().getInstantTime().isAfter("20210621155603");        }    })    .filter(new SerializableFunction() {        @Override        public Boolean apply(HoodieRecord hoodieRecord) {            return hoodieRecord.getOperation().equals(HoodieOperation.INSERT);        }    });

    其中,HoodieRecord 是 Hudi 中记录的数据类型,HoodieOperation.INSERT 表示 Insert 操作。通过 filterSerializableFunction 可以过滤出指定时间范围内的 Insert 事件。

  12. Flink 读取 HBase 中的 insert 操作可以通过 TableEnvironment 和 Table 类来实现。

    首先,需要在 Flink 项目中添加 HBase 相关的依赖,例如 Apache HBase 和 Flink 的 TableEnvironment。

    然后,可以使用 TableEnvironment 类来创建一个 Flink 表环境,并使用 Table 类来读取 HBase 中的 insert 操作。