Hudi 0.12.0
Spark 2.4.4,根据文章:Hudi Spark源码学习总结-df.write.format("hudi").save可知,save方法会走到DefaultSource.createRelation。,它会判断OPERATION是否为BOOTSTRAP_OPERATION_OPT_VAL,这里为true,所以会调用HoodieSparkSqlWriter.bootstrap。,这里首先获取一些参数,然后判断表是否存在,如果不存在证明是第一次写,需要设置写一些配置参数,然后进行初始化:HoodieTableMetaClient.initTable,接着调用writeClient.bootstrap。,这里的writeClient为SparkRDDWriteClient,然后调用HoodieTable的bootstrap,我们这里使用表类型为COW,所以为HoodieSparkCopyOnWriteTable。,这里首先通过listAndProcessSourcePartitions返回mode和对应的分区,其中mode有两种METADATA_ONLY和FULL_RECORD,然后对于METADATA_ONLY对应的分区路径执行metadataBootstrap,FULL_RECORD对应的分区路径执行fullBootstrap,从这里可以看出两点:1、通过listAndProcessSourcePartitions返回的mode值判断是进行METADATA_ONLY还是FULL_RECORD 2、具体的逻辑分别在metadataBootstrap,fullBootstrap。那么我们分别来看一下,首先看一下listAndProcessSourcePartitions是如何分会mode的。,这里的主要实现是selector.select,这里的select是通过MODE_SELECTOR_CLASS_NAME(hoodie.bootstrap.mode.selector)配置的,默认值为MetadataOnlyBootstrapModeSelector,我们的例子中FULL_RECORD设置的为FullRecordBootstrapModeSelector,让我们分别看一下他们具体的实现。,MetadataOnlyBootstrapModeSelector和FullRecordBootstrapModeSelector都是UniformBootstrapModeSelector的子类,区别是bootstrapMode不一样,它们的select方法是在父类UniformBootstrapModeSelector实现的。,很显然上面的mode的返回值和bootstrapMode是对应的,所以当MODE_SELECTOR_CLASS_NAME为MetadataOnlyBootstrapModeSelector和FullRecordBootstrapModeSelector时,他们的mode值是唯一的,要么执行metdata的逻辑要么执行full的逻辑,那么有没有两种模式同时会运行的情况呢,答案是有的。,BootstrapRegexModeSelector我们在之前的文章中讲过:首先有配置:hoodie.bootstrap.mode.selector.regex.mode 默认值METADATA_ONLY、hoodie.bootstrap.mode.selector.regex默认值.*但是如果不是默认值的话,比如上面的2020/08/2[0-9],假设我们有分区”2020/08/10,2020/08/10/11,2020/08/20,2020/08/21”,那么匹配成功的2020/08/20和2020/08/21对应的类型为METADATA_ONLY,匹配不成功的2020/08/10和2020/08/10/11则为FULL_RECORD。而至于我的为啥都是FULL_RECORD,原因是regex设置错误,我设置的是2022/10/0[0-9],但实际的分区值为2022-10-08和2022-10-09(分隔符不一样),而如果用默认的.*的话,则全部能匹配上,也就都是METADATA_ONLY(默认情况)。,关于BootstrapModeSelector的实现一共只有上面讲的这三种,下面让我们来看一下metadataBootstrap,fullBootstrap。,这里首先创建keyGenerator,然后获取bootstrapPaths,核心逻辑在于后面的getMetadataHandler(config, table, partitionFsPair.getRight().getRight()).runMetadataBootstrap,其中getMetadataHandler我们在之前的文章中讲过了,根据文件类型返回ParquetBootstrapMetadataHandler或者OrcBootstrapMetadataHandler,我们这里返回ParquetBootstrapMetadataHandler。,ParquetBootstrapMetadataHandler的runMetadataBootstrap是在其父类BaseBootstrapMetadataHandler中实现的,这里的核心逻辑在executeBootstrap。,executeBootstrap在ParquetBootstrapMetadataHandler,首先创建一个ParquetReader,然后将reader封装成ParquetReaderIterator,作为BoundedInMemoryExecutor的参数构造wrapper,然后执行wrapper.execute()。,这里是一个生产者-消费者模型,可以参考生产者-消费者模型在Hudi中的应用。,我们主要看一下producer.produce,这里的producer为Collections.singletonList(new IteratorBasedQueueProducer<>(new ParquetReaderIterator(reader))),所以这里的produce方法为IteratorBasedQueueProducer.produce。,其中queue.insertRecord的逻辑是是先执行transformFunction.apply返回payload,然后将payload放到队列里,关于transformFunction我们放到后面分析,先看inputIterator.hasNext()和inputIterator.next()这里的inputIterator为ParquetReaderIterator。,可以看到hasNext的逻辑是判断next是否为null,而next,则返回parquetReader.read(),parquetReader.read()在类ParquetReader中实现,这已经是parquet的源码了,我们就不往下分析了,总之生产者的逻辑是读取parquet的内容放到队列里供消费者使用,接下来看一下消费者。,这里主要是consumer.consume,这里的consumer为new BootstrapRecordConsumer(bootstrapHandle)。,它的consume方法是在父类BoundedInMemoryQueueConsumer中实现的,首先通过queue.iterator().next()从队列queue里获取数据,再调用consumeOneRecord。,这里的record和Payload是啥呢?这就要看我们上面提到的transformFunction了,他是在上面的方法executeBootstrap中定义的。,这里的inp应该为queue.iterator().next()返回的IndexedRecord,首先从IndexedRecord中获取recKey,然后将recKey作为hudi的主键元数据字段放到GenericRecord中,然后将gr作为BootstrapRecordPayload的record构造BootstrapRecordPayload,最后返回new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload),所以上面的record即为下面的rec,而上面的HoodieRecordPayload是通过record.getData()获取的,也就是HoodieAvroRecord中的data,即为下面的BootstrapRecordPayload。,这里的逻辑也很简单, 直接返回record,在上面的transformFunction中可知,record为GenericRecord,只有一个字段RECORD_KEY_METADATA_FIELD,这样我们知道了getInsertValue返回值,就可以继续看一下后面的write方法了。,HoodieBootstrapHandle的write方法是在父类HoodieCreateHandle中实现的,这里的核心逻辑是通过fileWriter将avroRecord写到对应的parquet中,具体的实现也是在parquet源码中,我们知道这里的Record信息只有主键信息和分区信息,这就是为什么metadata为啥只会生成主键、页脚的基本框架文件,不会重写全部数据的原因了,关于METADATA_ONLY我们就分析到这里了,接着看一下FULL_RECORD。,首先通过反射构造inputProvider,它是通过FULL_BOOTSTRAP_INPUT_PROVIDER_CLASS_NAME(hoodie.bootstrap.full.input.provider)配置的,默认值为SparkParquetBootstrapDataProvider,然后由inputProvider.generateInputRecords读取源表的parquet文件返回inputRecordsRDD,这里返回的是全部的内容,最后将inputRecordsRDD作为参数,由getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute(),最后由SparkBulkInsertHelper.bulkInsert按照普通的写Hudi的逻辑写到目标表中,也就是以Hudi表的形式执行数据的完整复制/重写。关于bulkInsert的源码逻辑,由于比较多,限于篇幅和精力以及能力的原因,本文就不深入介绍了,有机会的话我会再单独分享一篇的。,返回SparkBulkInsertCommitActionExecutor。,执行SparkBulkInsertHelper.bulkInsert。,最后由SparkBulkInsertHelper.bulkInsert按照普通的写Hudi的逻辑写到目标表中。,本文简单的对Hudi bootstrap的一些关键的源码逻辑进行了分析,希望能对大家有所帮助。限于精力及能力的原因,有些地方可能不够深入,或者不对的地方,还请大家多多指正,让我们共同进步。,
© 版权声明
文章版权归作者所有,未经允许请勿转载。