5分钟Flink1.10 - 自定义Data Source源
文章内容
自定义Flink Source,案例分别实现了继承于SourceFunction的四个案例,三个完全自定义的Source, 另外一个Source为常见的MySQL,通过这几个案例,启发我们进行实际案例的Source研发
代码版本
Flink : 1.10.0
Scala : 2.12.6
官网部分说明
这个是关于Interface中Souce中的信息以及链接,关于SourceFunction的说明,基本使用到的是实现了SourceFunction接口的类
Flink1.10:https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/source/SourceFunction.html
ALL Known Implementing Classes 就是SourceFunction以及实现于SourceFunction的各个类
自定义Source中,我们可以使用SourceFunction也可以使用它的实现类,看具体情况
可以通过-非并行Source实现SourceFunction,或者通过实现ParallelSourceFunction接口或为并行源扩展RichParallelSourceFunction来编写自己的自定义源
以下有四个案例,可以根据代码直接进行跑通实现
- 自定义Source,实现自定义&并行度为1的source
- 自定义Source,实现一个支持并行度的source
- 自定义Source,实现一个支持并行度的富类source
- 自定义Source,实现消费MySQL中的数据
1. 自定义Source,实现自定义&并行度为1的source
自定义source,实现SourceFunction接口,实现一个没有并行度的案例
功能:每隔 1s 进行自增加1
实现的方法:run(),作为数据源,所有数据的产生都在 run() 方法中实现
文件名:MyNoParallelFunction.scala
package com.tech.consumer
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
/**
* 创建自定义并行度为1的source
*/
class MyNoParallelFunction extends SourceFunction[Long]{
var count = 0L
var isRunning = true
override def run(ctx: SourceContext[Long]): Unit = {
while( isRunning ) {
ctx.collect(count)
count += 1
Thread.sleep(1000)
}
}
override def cancel(): Unit = {
isRunning = false
}
}
相关文章