Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL

2020-05-25 00:00:00 数据 语句 数据源 计算 演示

上周六在深圳分享了《Flink SQL 1.9.0 技术内幕和佳实践》,会后许多小伙伴对后演示环节的 Demo 代码非常感兴趣,迫不及待地想尝试下,所以写了这篇文章分享下这份代码。希望对于 Flink SQL 的初学者能有所帮助。完整分享可以观看 Meetup 视频回顾 :developer.aliyun.com/li

演示代码已经开源到了 GitHub 上:https://github.com/wuchong/flink-sql-submit

这份代码主要由两部分组成:1) 能用来提交 SQL 文件的 SqlSubmit 实现。2) 用于演示的 SQL 示例、Kafka 启动停止脚本、 一份测试数据集、Kafka 数据源生成器。

通过本实战,你将学到:

  1. 如何使用 Blink Planner
  2. 一个简单的 SqlSubmit 是如何实现的
  3. 如何用 DDL 创建一个 Kafka 源表和 MySQL 结果表
  4. 运行一个从 Kafka 读取数据,计算 PVUV,并写入 MySQL 的作业
  5. 设置调优参数,观察对作业的影响

SqlSubmit 的实现

笔者一开始是想用 SQL Client 来贯穿整个演示环节,但可惜 1.9 版本 SQL CLI 还不支持处理 CREATE TABLE 语句。所以笔者就只好自己写了个简单的提交脚本。后来想想,也挺好的,可以让听众同时了解如何通过 SQL 的方式,和编程的方式使用 Flink SQL。

SqlSubmit 的主要任务是执行和提交一个 SQL 文件,实现非常简单,就是通过正则表达式匹配每个语句块。如果是 CREATE TABLE 或 INSERT INTO 开头,则会调用 tEnv.sqlUpdate(...)。如果是 SET 开头,则会将配置设置到 TableConfig 上。其核心代码主要如下所示:

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
// 创建一个使用 Blink Planner 的 TableEnvironment, 并工作在流模式
TableEnvironment tEnv = TableEnvironment.create(settings);
// 读取 SQL 文件
List<String> sql = Files.readAllLines(path);
// 通过正则表达式匹配前缀,来区分不同的 SQL 语句
List<SqlCommandCall> calls = SqlCommandParser.parse(sql);
// 根据不同的 SQL 语句,调用 TableEnvironment 执行
for (SqlCommandCall call : calls) {
  switch (call.command) {
    case SET:
      String key = call.operands[0];
      String value = call.operands[1];
      // 设置参数
      tEnv.getConfig().getConfiguration().setString(key, value);
      break;
    case CREATE_TABLE:
      String ddl = call.operands[0];
      tEnv.sqlUpdate(ddl);
      break;
    case INSERT_INTO:
      String dml = call.operands[0];
      tEnv.sqlUpdate(dml);
      break;
    default:
      throw new RuntimeException("Unsupported command: " + call.command);
  }
}
// 提交作业
tEnv.execute("SQL Job");

相关文章