Flink深入浅出:Hive读取原理分析
随着Flink 1.10版本的发布,Table API&SQL得以完善,现在可以基于Flink SQL实现离线分析了。在离线分析的场景中,经常的使用场景是读取Hive做分析处理,这时就不得不研究下Flink读取Hive的原理了。在阅读本文之前,可以思考两个问题:
1 Flink是如何读取Hive数据?
2 Flink如何控制读取的并行度?
1 Flink如何识别Hive表
Flink读取Hive表跟其他的外部数据源不太一样,其他的组件是通过with属性中的type+SPI服务扫描对应的Factory类实现,而hive则是直接基于HiveCatalog创建。
创建TableFactory之后的套路,就都一样了。比如创建对应的HiveTableSource,在调用TableSource的getDataStream方法时,创建对应的HiveTableInpurtFormat,后基于这个inputFormat执行查询操作。
2 如何读取Hive中的数据
先来看看个问题,一个标准的Hive表创建后,可以在hive命令行中查看对应的创建信息,使用的命令为:hive> describe formatted table_name;
可以看到原生的hive表使用的是ParquetHiveSerDe作为序列化工具,MapredParquetInputFormat 作为查询格式。因此我们可以使用下面的代码直接读取:
看到这里应该就能猜想到Flink是如何读取数据的了吧:
1 获取HiveCatalog中的定义,拿到对应的目录地址、SerDe、InputFormat
2 通过反射创建对应的反序列化器和InputFormat,获得一条结果
3 调用recordReader.next()获取下一条数据.....
3 Hive表的并行度控制
在HiveTableSource调用getDataStream的步会初始化分区信息,返回当前sql匹配的分区。然后创建对应的InputFormat,当开启了并行度自动推断后,基于前面的分区信息,统计文件个数,并把对应的文件内容放入InputSplit[]中。这个数组有两个作用:
1 后面判断并行度时,根据该数组的长度与默认配置进行比较,确定终并行度
2 切分形成Task后,task会基于这个数组确定每个Slot读取的文件(流程类似前面分享的jdbc的分区抢占)
4 总结流程
通过前面两个问题,再来梳理下读取Hive的整体流程:
1 Flink基于calcite解析sql,确定表来源于hive,在HiveCatalog中创建HiveTableFactory
2 HiveTableFactory基于配置文件创建HiveTableSource
3 HiveTableSource在真正执行时,调用getDataStream方法
4 在getDataStream中先确定查询匹配的分区信息,然后创建表对应的InputFormat,并确定并行度
5 根据并行度确定slot分发HiveMapredSplitReader任务
6 在TaskManager端的Slot中,基于split确定读取的内容
7 基于Hive中定义的序列化工具、InpurtFormat执行读取返序列化,得到value值
8 循环执行reader.next获取value,基于HiveInspectors解析成Row
以上就是flink读取hive数据的大致流程,更多内容可以参考专辑列表。
更多内容参考公众号:
http://weixin.qq.com/r/-yjG3iTEbnkRrWC9933t (二维码自动识别)
相关文章