Hive 终于等来了 Flink

2020-07-01 00:00:00 数据 版本 编译 依赖 集成

文章整理:加米谷大数据

Apache Spark 什么时候开始支持集成 Hive 功能?笔者相信只要使用过 Spark 的读者,应该都会说这是很久以前的事情了。

那 Apache Flink 什么时候支持与 Hive 的集成呢?读者可能有些疑惑,还没有支持吧,没用过?或者说近版本才支持,但是功能还比较弱。

其实比较也没啥意义,不同社区发展的目标总是会有差异,而且 Flink 在真正的实时流计算方面投入的精力很多。不过笔者想表达的是,Apache Hive 已经成为数据仓库生态系统的焦点,它不仅是一个用于大数据分析和 ETL 的 SQL 引擎,也是一个数据管理平台,所以无论是 Spark,还是 Flink,或是 Impala、Presto 等,都会积极地支持集成 Hive 的功能。

的确,对真正需要使用 Flink 访问 Hive 进行数据读写的读者会发现,Apache Flink 1.9.0 版本才开始提供与 Hive 集成的功能。不过,值得欣慰的是,Flink 社区在集成 Hive 功能方面付出很多,目前进展也比较顺利,近 Flink 1.10.0 RC1 版本已经发布,感兴趣的读者可以进行调研和验证功能。

架构设计

首先,笔者基于社区公开的资料以及博客,概括性地讲解 Flink 集成 Hive 的架构设计。

Apache Flink 与 Hive 集成的目的,主要包含了元数据和实际表数据的访问。

元数据

为了访问外部系统的元数据,Flink 刚开始提供了 ExternalCatalog 的概念。但是 ExternalCatalog 的定义非常不完整,基本处于不可用的状态。Flink 1.10 版本正式删除了 ExternalCatalog API (FLINK-13697),这包括:

  • ExternalCatalog(以及所有依赖的类,比如 ExternalTable)
  • SchematicDescriptor、MetadataDescriptor 和 StatisticsDescriptor

针对 ExternalCatalog 的问题,Flink 社区提出了一套全新的 Catalog 接口(new Catalog API)来取代现有的 ExternalCatalog。新的 Catalog 实现的功能包括:

  • 能够支持数据库、表、分区等多种元数据对象
  • 允许在一个用户 Session 中维护多个 Catalog 实例,从而支持同时访问多个外部系统
  • Catalog 以可插拔的方式接入 Flink,允许用户提供自定义的实现

下图展示了新的 Catalog API 的总体架构:



创建 TableEnvironment 的时候会同时创建一个 CatalogManager,负责管理不同的 Catalog 实例。TableEnvironment 通过 Catalog 来为 Table API 和 SQL Client 用户提供元数据服务。




目前 Catalog 有两个实现,GenericInMemoryCatalog 和 HiveCatalog。其中 GenericInMemoryCatalog 保持了原有的 Flink 元数据管理机制,将所有元数据保存在内存中。而 HiveCatalog 会与一个 Hive Metastore 的实例连接,提供元数据持久化的能力。要使用 Flink 与 Hive 进行交互,用户需要配置一个 HiveCatalog,并通过 HiveCatalog 访问 Hive 中的元数据。

另一方面,HiveCatalog 也可以用来处理 Flink 自身的元数据,在这种场景下,HiveCatalog 仅将 Hive Metastore 作为持久化存储使用,写入 Hive Metastore 中的元数据并不一定是 Hive 所支持的格式。一个 HiveCatalog 实例可以同时支持这两种模式,用户无需为管理 Hive 和 Flink 的元数据创建不同的实例。

另外,通过设计 HiveShim 来支持不同版本的 Hive Metastore,具体支持的 Hive 版本列表,请参考官方文档。

表数据

Flink 提供了 Hive Data Connector 来读写 Hive 的表数据。Hive Data Connector 尽可能的复用了 Hive 本身的 Input/Output Format 和 SerDe 等类,这样做的好处一方面是减少了代码重复,更重要的是可以大程度的保持与 Hive 的兼容,即 Flink 写入的数据 Hive 可以正常读取,并且反之亦然。

集成 Hive 功能

Flink 与 Hive 集成的功能在 1.9.0 版本中作为试用功能发布,存在不少使用的局限性,但是不久将发布的 Flink 1.10 稳定版本会更加完善集成 Hive 的功能并应用到企业场景中。

为了让读者提前体验 Flink 1.10 集成 Hive 的功能,笔者会基于 Cloudera CDH 编译 Flink 1.10.0 RC1 版本并进行较为完整的测试。

环境信息

CDH 版本:cdh5.16.2Flink 版本:release-1.10.0-rc1

Flink 使用了 RC 版本,仅供测试,不建议用于生产环境。目前 Cloudera Data Platform 正式集成了 Flink 作为其流计算产品,非常方便用户使用。

CDH 环境开启了 Sentry 和 Kerberos。

下载并编译 Flink




不出意外的话,编译到 flink-hadoop-fs 模块时,会报如下错误:




编译中遇到 flink-shaded-hadoop-2 找不到的问题,其实查看 Maven 仓库会发现,根本原因是 CDH 的 flink-shaded-hadoop-2 的 jar 包在 Maven 中央仓库是没有对应的编译版本,所以需要先对 Flink 依赖的 flink-shaded-hadoop-2 进行打包,再进行编译。

■ 解决 flink-shaded-hadoop-2 问题

  • 获取 flink-shaded 源码




  • 切换依赖的版本分支

根据上面报错时提示缺少的版本切换对应的代码分支,即缺少的是 9.0 版本的 flink-shaded-hadoop-2:




  • 配置 CDH Repo 仓库

修改 flink-shaded 项目中的 pom.xml,添加 CDH maven 仓库,否则编译时找不到 CDH 相关的包。

在 ... 中添加如下内容:




  • 编译 flink-shaded

开始执行编译:

mvn clean install -DskipTests-Drat.skip=true-Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2

相关文章