Flink零基础学习教程:map、filter和flatMap算子实例详解

2020-07-13 00:00:00 函数 元素 算子 表达式 数据流

本文将对Flink Transformation中各算子进行详细介绍,并使用大量例子展示具体使用方法。Transformation各算子可以对Flink数据流进行处理和转化,是Flink流处理非常核心的API。如之前文章所述,多个Transformation算子共同组成一个数据流图。

Flink的Transformation是对数据流进行操作,其中数据流涉及到的常用数据结构是DataStreamDataStream由多个相同的元素组成,每个元素是一个单独的事件。在Scala中,我们使用泛型DataStream[T]来定义这种组成关系,T是这个数据流中每个元素对应的数据类型。在之前的股票数据流处理的例子中,数据流中每个元素的类型是股票价格StockPrice,整个数据流的数据类型为DataStream[StockPrice]。在Java中,这种泛型对应的数据结构为DataStream<T>

在使用这些算子时,需要在算子上进行用户自定义操作,一般使用Lambda表达式或者继承模板类并重写函数两种方式完成这个用户自定义的过程。下文将用map算子来演示如何使用Lambda表达式或者重写函数的方式实现对算子的自定义。

读者可以使用Flink Scala Shell或者Intellij Idea来进行练习:

  • Flink Scala Shell使用教程
  • Intellij Idea开发环境搭建教程

Flink的Transformation转换主要包括四种:单数据流基本转换、基于Key的分组转换、多数据流转换和数据重分布转换。本文先介绍单数据流基本转换,完整的代码在github上:github.com/luweizheng/f

map

map算子对一个DataStream中的每个元素使用用户自定义的map函数进行处理,每个输入元素对应一个输出元素,终整个数据流被转换成一个新的DataStream。输出的数据流DataStream[OUT]类型可能和输入的数据流DataStream[IN]不同。

map算子示意图

我们可以重写MapFunctionRichMapFunction来自定义map函数,RichMapFunction的定义为:RichMapFunction[IN, OUT],其内部有一个map虚函数,我们需要对这个虚函数重写。

val dataStream: DataStream[Int] = senv.fromElements(1, 2, -3, , 5, -9, 8)

// 继承RichMapFunction
// 个泛型是输入类型,第二个参数是泛型类型
class DoubleMapFunction extends RichMapFunction[Int, String] {
  override def map(input: Int): String =
  ("overide map Input : " + input.toString + ", Output : " + (input * 2).toString)
}

val richFunctionDataStream = dataStream.map {new DoubleMapFunction()}

相关文章