一文告诉你Flink程序广播变量该这样用……
总是被项目经理问:为什么你的Flink代码占用我这么多的集群资源啊?集群受不了,优化一下吧,程序员一听到优化的痛疼症,你懂的……
image
今天我们就讲解一个比较基础,且容易被人忽略的基础优化Flink程序的方法。Flink和Spark一样,都有支持广播变量这定义。广播变量,可以理解成为日常的广播,是一个公共的变量。
广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点,可以直接在内存中拿数据,避免了大量的shuffle,导致集群性能下降。我们可以把一个dataset 或者不变的缓存对象(例如map list集合对象等)数据集广播出去,然后不同的任务在节点上都能够获取到,并在每个节点上只会存在一份,而不是在每个并发线程中存在。
如果不使用broadcast,则在每个节点中的每个任务中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。广播变量,可以借助下图辅助理解。
image
光说不用,假把式,广播变量实际如果使用呢,我们通过一个例子来体验吧,创建广播变量,我们使用withBroadcastSet来创建,而使用getRuntimeContext.getBroadcastVariable,实际操作中,我们一般把比较小的一个数据集设置为广播变量。
使用步骤,构建两个测试数据集,使用RichMapFuntion对成绩数据进行map转换,在数据集调用map方法后,会调用withBroadcastSet将学生数据集创建广播变量,广播出去。核心代码如下:
def main(args: Array[String]): Unit = {
//获取flink运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//分别创建两个数据集
val student = env.fromCollection(List((1,"大胖"),(2,"二狗子"),(3,"三狗子"),(4,"小贵子"),(5,"小丸子")))
val score = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86),(5, "物理", 86),(4, "化学", 86)))
val resultDatas = score.map(
new RichMapFunction[(Int, String, Int), (String, String, Int)] {
var bc_studentList: List[(Int, String)] = null
override def open(parameters: Configuration): Unit = {
import scala.collection.JavaConverters._
bc_studentList = getRuntimeContext.getBroadcastVariable[(Int, String)]("bc_student").asScala.toList
}
override def map(value: (Int, String, Int)): (String, String, Int) = {
val studentId = value._1
val tuples = bc_studentList.filter(x => x._1 == studentId)
(tuples(0)._2, value._2, value._3)
}
}
).withBroadcastSet(student, "bc_student")
resultDatas.print()
}
相关文章