一文告诉你Flink程序广播变量该这样用……

2020-07-01 00:00:00 数据 集群 变量 节点 广播

总是被项目经理问:为什么你的Flink代码占用我这么多的集群资源啊?集群受不了,优化一下吧,程序员一听到优化的痛疼症,你懂的……



image

今天我们就讲解一个比较基础,且容易被人忽略的基础优化Flink程序的方法。Flink和Spark一样,都有支持广播变量这定义。广播变量,可以理解成为日常的广播,是一个公共的变量。

广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点,可以直接在内存中拿数据,避免了大量的shuffle,导致集群性能下降。我们可以把一个dataset 或者不变的缓存对象(例如map list集合对象等)数据集广播出去,然后不同的任务在节点上都能够获取到,并在每个节点上只会存在一份,而不是在每个并发线程中存在。

如果不使用broadcast,则在每个节点中的每个任务中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。广播变量,可以借助下图辅助理解。


image

光说不用,假把式,广播变量实际如果使用呢,我们通过一个例子来体验吧,创建广播变量,我们使用withBroadcastSet来创建,而使用getRuntimeContext.getBroadcastVariable,实际操作中,我们一般把比较小的一个数据集设置为广播变量。

使用步骤,构建两个测试数据集,使用RichMapFuntion对成绩数据进行map转换,在数据集调用map方法后,会调用withBroadcastSet将学生数据集创建广播变量,广播出去。核心代码如下:

def main(args: Array[String]): Unit = {

    //获取flink运行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //分别创建两个数据集
    val student = env.fromCollection(List((1,"大胖"),(2,"二狗子"),(3,"三狗子"),(4,"小贵子"),(5,"小丸子")))

    val score = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86),(5, "物理", 86),(4, "化学", 86)))

    val resultDatas = score.map(
      new RichMapFunction[(Int, String, Int), (String, String, Int)] {

        var bc_studentList: List[(Int, String)] = null

        override def open(parameters: Configuration): Unit = {

          import scala.collection.JavaConverters._
          bc_studentList = getRuntimeContext.getBroadcastVariable[(Int, String)]("bc_student").asScala.toList

        }

        override def map(value: (Int, String, Int)): (String, String, Int) = {

          val studentId = value._1

          val tuples = bc_studentList.filter(x => x._1 == studentId)

          (tuples(0)._2, value._2, value._3)

        }
      }
    ).withBroadcastSet(student, "bc_student")

    resultDatas.print()


  }

相关文章