干货丨时序数据库DolphinDB横截面引擎教程

2022-05-23 00:00:00 数据 计算 引擎 触发 横截面

在处理实时流数据时,不仅需要按照时间做纵向聚合计算(时间序列聚合引擎),还需要对新的数据做横向比较和计算,如金融里对所有股票的新报价求百分位、工业物联网中计算一批设备的温度均值等。DolphinDB database 提供了横截面聚合引擎,可以对流数据中所有分组的新数据做聚合运算。

横截面引擎的主体分为两部分:横截面数据表和计算引擎。横截面数据是横截面引擎的内部表,保存了所有分组新的截面数据。计算引擎是一组聚合计算表达式以及触发器,系统会按照指定的方式触发聚合运算,计算结果会输出到另外一个表中。

1. 基本用法

在DolphinDB中,通过createCrossSectionalAggregator创建横截面聚合引擎。它返回一个横截面数据表,保存了所有分组新的截面数据,往这个表写入数据意味着这些数据进入横截面聚合引擎进行计算。具体用法如下:

createCrossSectionalAggregator(name, [metrics], dummyTable, [outputTable], keyColumn, [triggeringPattern="perBatch"], [triggeringInterval=1000])
复制代码
  • name是一个字符串,表示横截面聚合引擎的名称,是横截面聚合引擎的标识。它可以包含字母,数字和下划线,但必须以字母开头。

  • metrics是元代码。它可以是系统内置或用户自定义的函数,如<[sum(qty), avg(price)]>,可以对聚合结果使用表达式,如<[avg(price1)-avg(price2)]>,也可以对计算列进行聚合运算,如<[std(price1-price2)]>。详情可参考元编程。

  • dummyTable是表对象,它可以不包含数据,但它的结构必须与订阅的流数据表相同。

  • outputTable是表对象,用于保存计算结果。输出表的列数为metrics数量+1,列为TIMESTAMP类型,用于存放发生计算的时间戳,,其他列的数据类型必须与metrics返回结果的数据类型一致。

  • keyColumn是一个字符串,指定dummyTable的某列为横截面聚合引擎的key。keyColumn指定列中的每一个key对应表中的一行。

  • triggeringPattern是一个字符串,表示触发计算的方式。它可以是以下取值:

  • "perRow": 每插入一行数据触发一次计算

  • "perBatch": 每插入一次数据触发一次计算

  • "interval": 按一定的时间间隔触发计算

  • triggeringInterval是一个整数。只有当triggeringPattern的取值为interval时才生效,表示触发计算的时间间隔。默认值为1000毫秒。

2. 示例

下面通过一个例子说明横截面聚合引擎的应用。在金融交易中,往往需要实时了解所有股票新的报价均值、近一次成交量总和以及近一次交易的交易量。DolphinDB的横截面聚合引擎结合流数据订阅功能可以方便地完成这些工作。

(1)创建实时交易表

股票的实时交易表trades,包含以下主要字段:

sym:股票代码
time:时间
price:成交价
qty:成交量
复制代码

每当交易发生时,实时数据会写入trades表。创建trades表的脚本如下:

share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
复制代码

(2)创建横截面聚合引擎

tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", <[avg(price), sum(qty), sum(price*qty)]>, trades, outputTable, `sym, `perRow)
复制代码

tradesCrossAggregator是横截面数据表,它按股票代码分组,每个股票有且仅有一行。当数据进入该表时,会计算每个股票的avg(price), sum(qty)和sum(price*qty)。每插入一条数据触发一次计算。

(3)横截面数据表订阅实时交易表

subscribeTable(,"trades","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true)
复制代码

通过流数据订阅功能,把实时数据写入横截面数据表。

(4)模拟数据产生

def writeData(n){
   timev  = 2000.10.08T01:01:01.001 + timestamp(1..n)
   symv   = take(`A`B, n)
   pricev = take(102.1 33.4 73.6 223,n)
   qtyv   = take(60 74 82 59, n)
   insert into trades values(timev, symv, pricev,qtyv)
}
writeData(4);
复制代码

查看实时交易表,共有4条数据。

select * from trades
time                    sym price qty
----------------------- --- ----- ---
2000.10.08T01:01:01.002 A   102.1 60 
2000.10.08T01:01:01.003 B   33.4  74 
2000.10.08T01:01:01.004 A   73.6  82 
2000.10.08T01:01:01.005 B   223   59 
复制代码

查看横截面数据表,里面保存了A、B两只股票近的两笔交易记录。

select * from tradesCrossAggregator
time                    sym price qty
----------------------- --- ----- ---
2000.10.08T01:01:01.004 A   73.6  82 
2000.10.08T01:01:01.005 B   223   59 
复制代码

查看横截面引擎的输出表,由于横截面引擎采用了perRow每行触发计算的频率,所以每往横截面表写入一行数据,聚合引擎都会做一次计算,因此一共有4条记录。

select * from outputTable
time                    avgPrice sumqty Total  
----------------------- -------- ------ -------
2019.07.08T10:04:41.731 102.1    60     6126   
2019.07.08T10:04:41.732 67.75    134    8597.6 
2019.07.08T10:04:41.732 53.5     156    8506.8 
2019.07.08T10:04:41.732 148.3    141    19192.2
复制代码

通过getAggregatorStat函数查看横截面引擎的状态。

getAggregatorStat().CrossSectionalAggregator
name               user  status lastErrMsg numRows numMetrics metrics            triggeringPattern triggeringInterval
------------------ ----- ------ ---------- ------- ---------- ------------------ ----------------- ------------------
CrossSectionalDemo guest OK                2       3          [ avg(price), su...perRow            1000    
复制代码

通过removeAggregator函数删除横截面引擎。

removeAggregator("CrossSectionalDemo")
复制代码

3. 触发计算的几种方式

横截面引擎一共有三种触发计算的方式:perRow、perBatch和interval。上面的例子中采用的是每插入一行数据触发一次计算。下面介绍另外两种触发计算的方式。

  • perBatch

perBatch参数表示每追加一批数据就触发一次写入,下例按perBatch模式启用横截面引擎,脚本一共生成12条记录,分三批写入,输出表中预期有3条记录。

share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
outputTable = table(1:0, `time`avgPrice`sumqty`Total, [TIMESTAMP,DOUBLE,INT,DOUBLE])
tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", <[avg(price), sum(qty), sum(price*qty)]>, trades, outputTable, `sym, `perBatch)
subscribeTable(,"trades","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true)
def writeData(n){
   timev  = 2000.10.08T01:01:01.001 + timestamp(1..n)
   symv   = take(`A`B, n)
   pricev = take(102.1 33.4 73.6 223,n)
   qtyv   = take(60 74 82 59, n)
   insert into trades values(timev, symv, pricev,qtyv)
}
//写入三批数据,预期会触发三次计算,输出三次聚合结果。
writeData(4);
writeData(4);
writeData(4);
复制代码

查看横截面数据表。

select * from tradesCrossAggregator
time                    sym price qty
----------------------- --- ----- ---
2000.10.08T01:01:01.002 A   73.6  82 
2000.10.08T01:01:01.003 B   33.4  59 
复制代码

查看输出表。插入了三批数据,因此输出表中有3条记录。

select * from outputTable
time                    avgPrice sumqty Total  
----------------------- -------- ------ -------
2019.07.08T10:14:54.446 148.3    141    19192.2
2019.07.08T10:14:54.446 148.3    141    19192.2
2019.07.08T10:14:54.446 148.3    141    19192.2
复制代码
  • interval

当触发计算的方式为interval时,需要指定triggeringInterval,表示每隔triggeringInterval毫秒触发一次计算。下面的例子中,分6次写入12条记录,每次间隔500毫秒。设置横截面引擎每1000毫秒触发一次计算,预期终输出3条记录。

share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
outputTable = table(1:0, `time`avgPrice`sumqty`Total, [TIMESTAMP,DOUBLE,INT,DOUBLE])
tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", <[avg(price), sum(qty), sum(price*qty)]>, trades, outputTable, `sym, `interval,1000)
subscribeTable(,"trades","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true)
def writeData(n){
   timev  = 2000.10.08T01:01:01.001 + timestamp(1..n)
   symv   = take(`A`B, n)
   pricev = take(102.1 33.4 73.6 223,n)
   qtyv   = take(60 74 82 59, n)
   insert into trades values(timev, symv, pricev,qtyv)
}
a = now()
writeData(2);
sleep(500)
writeData(2);
sleep(500)
writeData(2);
sleep(500)
writeData(2);
sleep(500)
writeData(2);
sleep(500)
writeData(2);
sleep(500)
b = now()
select count(*) from outputTable

3
复制代码

如果再次执行select count(*) from outputTable,会发现随着时间的推移,输出表的记录数会不断增长。这是因为在interval模式下,计算是按照现实时间定时触发,并不依赖于是否有新的数据进来。

4. 横截面数据表的独立使用

从上面的例子中可以看出,横截面表虽然是为聚合计算提供的一个中间数据表,但其实在很多场合还是能独立发挥作用的。比如我们需要定时刷新某只股票的新交易价格,按照常规思路是从实时交易表中按代码筛选股票并拿出后一条记录,而交易表的数据量是随着时间快速增长的,如果频繁做这样的查询,无论从系统的资源消耗还是从查询的效能来看都不是很好的做法。而横截面表永远只保存所有股票的近一次交易数据,数据量是稳定的,对于这种定时轮询的场景非常合适。

如果要单独使用横截面表,需要在创建横截面引擎时,把metrics,outputTable这两个参数设置为空。

tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", , trades,, `sym, `perRow)
复制代码

相关链接:

流数据教程

时间序列引擎教程

异常检测引擎教程

相关文章