5分钟Flink1.10 - 自定义Data Source源

2020-07-03 00:00:00 数据 并行 自定义 数据源 文件名


文章内容

自定义Flink Source,案例分别实现了继承于SourceFunction的四个案例,三个完全自定义的Source, 另外一个Source为常见的MySQL,通过这几个案例,启发我们进行实际案例的Source研发

代码版本

Flink : 1.10.0
Scala : 2.12.6

官网部分说明

这个是关于Interface中Souce中的信息以及链接,关于SourceFunction的说明,基本使用到的是实现了SourceFunction接口的类

Flink1.10:ci.apache.org/projects/


ALL Known Implementing Classes 就是SourceFunction以及实现于SourceFunction的各个类

自定义Source中,我们可以使用SourceFunction也可以使用它的实现类,看具体情况

可以通过-非并行Source实现SourceFunction,或者通过实现ParallelSourceFunction接口或为并行源扩展RichParallelSourceFunction来编写自己的自定义源

以下有四个案例,可以根据代码直接进行跑通实现
  1. 自定义Source,实现自定义&并行度为1的source
  2. 自定义Source,实现一个支持并行度的source
  3. 自定义Source,实现一个支持并行度的富类source
  4. 自定义Source,实现消费MySQL中的数据

1. 自定义Source,实现自定义&并行度为1的source

自定义source,实现SourceFunction接口,实现一个没有并行度的案例

功能:每隔 1s 进行自增加1

实现的方法:run(),作为数据源,所有数据的产生都在 run() 方法中实现

文件名:MyNoParallelFunction.scala

package com.tech.consumer

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
/**
  * 创建自定义并行度为1的source
  */
class MyNoParallelFunction extends SourceFunction[Long]{
  var count = 0L
  var isRunning = true

  override def run(ctx: SourceContext[Long]): Unit = {
    while( isRunning ) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

相关文章