DolphinDB节点启动时的流计算自动订阅教程

2022-03-28 00:00:00 数据 代码 节点 脚本 计算

DolphinDB 内置的流数据框架支持流数据的发布、订阅、预处理、实时内存计算、复杂指标的滚动窗口计算等,是一个运行高效、使用便捷的流数据处理框架。



本教程主要解决基于 DolphinDB 流数据处理框架完成业务代码开发后,如何部署节点启动时的流计算自动订阅问题。

1. 业务代码开发

业务描述

以金融交易实时 Level2 的股票快照数据为例,实现以下业务需求:

(1)Level2 的股票快照数据实时写入数据库持久化;

(2)根据原始数据的 LastPx、TotalVolumeTrade、TotalValueTrade 的值,处理出每一笔快照数据的一些中间值:如回报率(Return)、成交量(Volume)、成交金额(Amount);

(3)根据原始数据和处理的中间值生成一些分钟指标。

上述业务在 DolphinDB 中的处理流程如下图所示:



处理流程图说明:

(1)snapshotStream、snapshotStreamProcess、snapshotAggr1min 都是共享的异步持久化流数据表。snapshotStream 用于存储 Level2 的股票快照原始数据,snapshotStreamProcess 用于存储响应式状态引擎处理后的包含中间值的结果数据,snapshotAggr1min 用于存储时间序列引擎处理后的分钟指标数据。将内存表共享的目的是让当前节点所有其它会话对该表可见,API 写入时与 DolphinDB Server 的会话相对于定义这些表的会话不是同一个,所以需要共享。对流数据表的持久化的主要目的有两个,一是控制该表的大内存占用,通过设置 enableTableShareAndPersistence 函数中的 cacheSize 大小,控制该表在内存中保留的大记录条数进而控制该表的大内存占用;二是在节点异常关闭的情况下,从持久化目录恢复已经写入流数据表但是未消费的数据,保证流数据“至少消费一次”的需求。流数据表持久化采用异步的方式进行,可以有效提高流表写入的吞吐量。只有流数据表才可以被订阅消费,所以需要将这些表定义成流数据表。

(2)subExecutor 表示流数据处理线程,可以通过配置文件的 subExecutors 指定节点的大可用流数据处理线程数。通过设置 subscribeTable 函数中的 hash 参数,指定消费 topic 的流数据处理线程。比如 subExecutor 设置为 n,则 hash 可以从 0 至 n-1 进行指定,对应流数据处理线程 1 至 n。

(3)响应式状态引擎和时间序列引擎是 DolphinDB 的内置的流计算引擎,都已实现增量计算。响应式状态引擎处理了上述业务需求中的中间值,时间序列引擎用于计算生成分钟指标。

(4)loadTable("dfs://snapshot", "snapshot") 用于存储原始数据,做数据的持久化。

业务代码

本教程业务代码基于 1.30.15 版本开发,1.30 所有版本都可以运行本教程的示例代码,详细代码内容见附件。

2. DolphinDB 系统启动流程

DolphinDB 系统的启动流程如下图所示:



  • 系统初始化脚本(dolphindb.dos)
    系统初始化脚本是必需的,默认加载版本发布目录中的 dolphindb.dos,不建议做修改,因为版本升级的时候需要用新版本发布包中的系统初始化脚本覆盖。
  • 用户启动脚本(startup.dos)
    用户启动脚本是通过配置参数 startup 后才会执行,单机 single 模式在 dolphindb.cfg 中配置,集群模式在 cluster.cfg 中配置,可配置路径或相对路径。若配置了相对路径或者没有指定目录,系统会依次搜索本地节点的 home 目录、工作目录和可执行文件所在目录。
    配置举例如下:
startup=/opt/DolphinDB/server/startup.dos

相关文章