如何在 Apache Flink 中使用 Python API?

2020-07-03 00:00:00 支持 架构 开发 方式 算子

本文根据 Apache Flink 系列直播课程整理而成,由 Apache Flink PMC,阿里巴巴技术专家 孙金城 分享。重点为大家介绍 Flink Python API 的现状及未来规划,主要内容包括:Apache Flink Python API 的前世今生和未来发展;Apache Flink Python API 架构及开发环境搭建;Apache Flink Python API 核心算子介绍及应用。

一.Apache Flink Python API 的前世今生和未来发展

1.Flink 为什么选择支持 Python

Apache Flink 是流批统一的开源大数据计算引擎,在 Flink 1.9.0 版本开启了新的 ML 接口和全新的Python API架构。那么为什么 Flink 要增加对 Python 的支持,下文将进行详细分析。

  • 流行的开发语言



Python 本身是非常的开发语言,据 RedMonk 数据统计,除 Java 和 JavaScript 之外,受欢迎度排名第三。

RedMonk 是的以开发人员为中心的行业分析公司,其更详细的分析信息,大家在拿到我的PPT之后,可以点击链接进行详细查阅。好了,那么Python的火热,与我们今天向大家分享的流批统一的大数据计算引擎,Apache Flink有什么关系呢?带着这个问题,我们大家想想目前与大数据相关的的开源组件有哪些呢?比如说早期的批处理框架Hadoop?流计算平台Storm,近异常火热的Spark?异或其他领域数仓的Hive,KV存储的HBase?这些都是非常的开源项目,那么这些项目都无一例外的进行了Python API的支持。

  • 众多开源项目支持



Python 的生态已相对完善,基于此,Apache Flink 在 1.9 版本中也投入了大量的精力,去推出了一个全新的 Pyflink。除大数据外,人工智能与Python也有十分密切的关系。

  • ML青睐的语言



从上图统计数据可以发现,Python API 本身已经占机器学习岗位需求语言的 0.129%。相对于 R 语言,Python 语言似乎更受青睐。Python 作为解释型语言,语法的设计哲学是”用一种方法并且只有一种方法来做一件事”。其简洁和易用性使其成为了世界上受欢迎的语言,在大数据计算领域都有着很好的生态建设,同时Python在机器学习 在机器学习方面也有很好的前景,所以我们在近期发布的Apache Flink 1.9 以全新的架构推出新的 Python API

Flink 是一款流批统一的计算引擎,社区非常重视和关注 Flink 用户,除 Java 语言或者 Scala 语言,社区希望提供多种入口,多种途径,让更多的用户更方便的使用 Flink,并收获 Flink 在大数据算力上带来的价值。因此 Flink 1.9 开始,Flink 社区以一个全新的技术体系来推出 Python API,并且已经支持了大部分常用的一些算子,比如如 JOIN,AGG,WINDOW 等。

2.Python API – RoadMap



在 Flink 1.9 中虽然 Python 可以使用 Java 的 User-defined Function,但是还缺乏 Python native 的 User-defined function 的定义,所以我们计划在 Flink 1.10 中进行支持 Python User-defined function 的支持。并技术增加对数据分析工具类库 Pandas 的支持,在 Flink 1.11 增加对 DataStream API 和 ML API 的支持。

二.Python API架构及开发环境搭建

1.Python Table API架构



新的 Python API 架构分为用户 API 部分,PythonVM 和 Java VM 的通讯部分,和终将作业提交到 Flink 集群进行运行的部分。那么 PythonVM 和 JavaVM 是怎样通讯的呢?我们在Python 端会会有一个 Python 的 Gateway 用于保持和 Java 通讯的链接,在 Java 部分有一个 GateWayServer 用于接收 Python 部分的调用请求。

关于 Python API 的架构部分,在 1.9 之前,Flink 的 DataSet 和 DataStream 已经有了对 Python API 的支持,但是拥有 DataSet API 和 DataStream API 两套不同的 API。对于 Flink 这样一个流批统一的流式计算引擎来讲,统一的架构至关重要。并且对于已有的 Python DataSet API 和 DataStream API 而言,采用了JPython 的技术体系架构,而 JPython 本身对目前 Python 的 3.X 系列无法很好的支持,所以 Flink 1.9 发布后,决定将原有的 Python API 体系架构废弃,以全新的技术架构出现。这套全新的 Python API 基于 Table API 之上。

Table API 和 Python API 之间的通讯采用了一种简单的办法,利用 Python VM 和 Java VM 进行通信。在 Python API 的书写或者调用过程中,以某种方式来与 Java API 进行通讯。操作 Python API 就像操作 Java 的 Table API一样。新架构中可以确保以下内容:

  • 不需要另外创建一套新的算子,可以轻松与 Java 的 Table API 的功能保持一致;
  • 得益于现有的 Java Table API 优化模型,Python 写出来的API,可以利用 Java API 优化模型进行优化,可以确保 Python 的 API 写出来的 Job 也能够具备性能。



如图,当 Python 发起对Java的对象请求时候,在 Java 段创建对象并保存在一个存储结构中,并分配一个 ID 给 Python 端,Python 端在拿到 Java 对象的 ID 后就可以对这个对象进行操作,也就是说 Python 端可以操作任何 Java 端的对象,这也就是为什么新的架构可以保证Python Table API 和 Java Table API功能一致,并且能过服用现有的优化模型。



在新的架构和通讯模型下,Python API 调用 Java API 只需要在持有 Java 对象的 ID,将调用方法的名字和参数传递给 Java VM,就能完成对 Java Table API 的调用,所以在这样的架构中开发 Python Table API 与开发 Java Table API 的方式完全一致,接下来我为大家详细介绍如何开发一个简单的 Python API 作业。

2.Python Table API – Job开发



通常来讲一个 Python Table Job 一般会分成四个部分,首先要根据目前的现状,要决定这个Job 是以批的方式运行,还是流的方式运行。当然后续版本用户可以不考虑,但当前 1.9 版本还是需要考虑。

在决定步以怎样的方式执行 Job 后,我们需要了解数据从哪里来,如何定义 Source、结构数据类型等信息。然后需要写计算逻辑,然后就是对数据进行计算操作,但终计算的结果需要持久化到某个系统。后定义 Sink,与 Source 类似,我们需要定义 Sink Schema,以及每一个字段类型。

下面将详细分享如何用 Python API 写每一步?首先,我们创建一个执行环境,对于执行环境本身来讲,首先需要一个 ExecutionEnvironment,根本上我们需要一个 TableEnvironment。那么在 TableEnvironment 中,有一个参数 Table Config,Table Config 中会有一些在执行过程中的配置参数,可以传递到 RunTime 层。除此之外,还提供了一些个性化的配置项,可以在实际业务开发中进行使用。



在拿到 Environment 后,需要对数据源表进行定义,以 CSV 格式文件为例,用"逗号"分隔,用 Field 来表明这个文件中有哪些字段。那么会看到,目前里面用逗号分隔,并且只有一个字段叫 word,类型是 String。



在定义并描述完数据源数据结构转换成 Table 数据结构后,也就是说转换到 Table API 层面之后是怎样的数据结构和数据类型?下面将通过 with_schema 添加字段及字段类型。这里只有一个字段,数据类型也是 String,终注册成一个表,注册到 catlog 中,就可以供后面的查询计算使用了。



创建结果表,当计算完成后需要将这些结果存储到持久化系统中,以 WordCount 为例,首先存储表会有一个 word 以及它的计数两个字段,一个是 String 类型的 word,另一个是 Bigint 的计数,然后把它注册成 Sink。



编写注册完 Table Sink 后,再来看如何编写逻辑。其实用 Python API 写 WordCount 和 Table API 一样非常简单。因为相对于 DataSream 而言 Python API 写一个 WordCount 只需要一行。比如 group by,先扫描Source表,然后 group by 一个 Word,再进行 Select word 并加上聚合统计Count ,终将数据结果插入到结果表里面中。

3.Python Table API – 环境搭建



那么WordCount 怎样才能真正的运行起来?首先需要搭建开发环境,不同的机器上可能安装的软件版本不一样,这里列出来了一些版本的需求和要求,其中括号中是示例机器上的版本。



第二步,构建一个 Java 的二进制发布包,以从源代码进行构建,那么这一页面就是从原代码获取我们的主干代码,并且拉取 1.9 的分支。当然大家可以用 Mater,但是 Master 不够稳定,还是建议大家在自己学习的过程中,好是用 1.9 的分支去做。接下来进行实战演练环节,首先验证 PPT 的正确性。首先编译代码,示例如下:

//下载源代码
git clone https://github.com/apache/flink.git
// 拉取1.9分支
cd flink; git fetch origin release-1.9
git checkout -b release-1.9 origin/release-1.9
//构建二进制发布包
mvn clean install -DskipTests -Dfast

相关文章