DolphinDB异常检测引擎教程
1. 概述
物联网设备(如机床、锅炉、电梯、水表、气表等等)无时无刻不在产生海量的设备状态数据和业务消息数据,这些数据的在采集、计算、分析过程中又常常涉及异常数据的检测。
DolphinDB作为一个高性能的分布式时序数据库 (time series database),内置了一个流数据框架,既能实时处理分析这些物联网数据,也能对历史数据进行计算分析,帮助用户利用、发挥这些数据的价值。DolphinDB内置的流数据框架支持流数据的发布、订阅、预处理、实时内存计算、复杂指标的滚动窗口计算等,是一个运行高效,使用便捷的流数据处理框架。具体介绍详见DolphinDB流数据教程。
针对异常数据检测的需求,DolphinDB提供基于流数据框架的异常检测引擎函数,用户只需指定异常指标,异常检测引擎就可以实时地进行异常数据检测。
2. 异常检测引擎框架
DolphinDB的异常检测引擎建立在流数据的发布-订阅模型之上。下例中,通过createAnomalyDetectionEngine函数创建异常检测引擎,并通过subscribeTable函数订阅流数据,每次有新数据流入就会按指定规则触发append!{engine},将流数据持续输入异常检测引擎中。异常检测引擎实时检测数据是否符合用户自定义的警报指标temp>65,如发现异常数据,将它们输出到表outputTable中。
share streamTable(1000:0, `time`device`temp, [TIMESTAMP, SYMBOL, DOUBLE]) as sensor
share streamTable(1000:0, `time`device`anomalyType`anomalyString, [TIMESTAMP, SYMBOL, INT, SYMBOL]) as outputTable
engine = createAnomalyDetectionEngine("engine1", <[temp > 65]>, sensor, outputTable, `time, `device, 10, 1)
subscribeTable(, "sensor", "sensorAnomalyDetection", 0, append!{engine}, true)
复制代码
这里对异常处理引擎涉及到的一些概念做简要介绍:
- 流数据表:DolphinDB为流式数据提供的一种特定的表对象,提供流式数据的发布功能。通过subscribeTable函数,其他的节点或应用可以订阅和消费流数据。
- 异常处理引擎数据源:为异常处理引擎提供"原料"的通道。createAnomalyDetectionEngine函数返回一个抽象表,向这个抽象表写入数据,就意味着数据进入异常处理引擎进行计算。
- 异常指标:以元代码的格式提供一组处理流数据的布尔表达式。其中可以包含聚合函数,以支持复杂的场景。
- 数据窗口:每次计算时截取的流数据窗口长度。数据窗口仅在指标中包含聚合函数时有意义。
- 输出表:异常检测引擎的输出表列必须是时间类型,用于存放检测到异常的时间戳,如果有指定分组列,那么第二列为分组列,之后的两列分别为int类型和string或symbol类型,用于记录异常的类型(异常指标的表达式在metrics中的下标)和异常的内容。
3. 异常指标
异常检测引擎中的指标均要求返回布尔值。一般是一个函数或一个表达式。当指标中包含聚合函数,必须指定窗口长度和计算的时间间隔,异常检测引擎每隔一段时间,在固定长度的移动窗口中计算指标。异常指标一般有以下三种类型:
- 只包含列名或非聚合函数,例如qty > 10, lt(qty, prev(qty))。对于这类指标,异常检测引擎会对每一条收到的数据进行计算,判断是否符合指标并决定是否输出。
- 所有出现的列名都在聚合函数的参数中,例如avg(qty - price) > 10, percentile(qty, 90) < 100, sum(qty) > prev(sum(qty))。对于这类指标,异常检测引擎只会在窗口发生移动时对数据进行聚合计算,和时间序列聚合引擎(Time Series Aggregator)类似。
- 出现的列名中,既有作为聚合函数的参数,也有不是聚合函数参数,例如avg(qty) > qty, le(med(qty), price)。对于这类指标,异常检测引擎会在在窗口发生移动时对聚合列进行聚合计算,并在有数据到达时对每一条数据进行计算,其中聚合函数的返回值使用近一个窗口的计算值。
4. 数据窗口
当异常指标中包含聚合函数时,用户必须指定数据窗口。流数据聚合计算是每隔一段时间,在固定长度的移动窗口中进行。窗口长度由参数windowSize设定;计算的时间间隔由参数step设定。
在有多组数据的情况下,若每组都根据各自条数据进入系统的时间来构造数据窗口的边界,则一般无法将各组的计算结果在相同数据窗口中进行对比。考虑到这一点,系统按照参数step值确定一个整型的规整尺度alignmentSize,以对各组个数据窗口的边界值进行规整处理。
(1)当数据时间类型为MONTH时,会以条数据对应年份的1月作为窗口的上边界。
(2)当数据的时间类型为DATE时,不对个数据窗口的边界值进行规整。
(2)当数据时间精度为秒或分钟时,如MINUTE, DATETIME或SECOND类型,alignmentSize取值规则如下表:
step alignmentSize
0~2 2
3~5 5
6~10 10
11~15 15
16~20 20
21~30 30
31~60 60
复制代码
(2)当数据时间精度为毫秒时,如TIMESTAMP或TIME类型,alignmentSize取值规则如下表:
step alignmentSize
0~2 2
3~5 5
6~10 10
11~20 20
21~25 25
26~50 50
51~100 100
101~200 200
201~250 250
251~500 500
501~1000 1000
复制代码
假设条数据时间的小精度值为x,那么个数据窗口的左边界小精度经过规整后为x/alignmentSize\*alignmentSize,其中/代表相除后取整。举例来说,若条数据时间为 2018.10.08T01:01:01.365,则x=365。若step=100,根据上表,alignmentSize=100,可得出规整后的个数据窗口左边界小精度为365\100*100=300,因此规整后的个数据窗口范围为2018.10.08T01:01:01.300至 2018.10.08T01:01:01.400。
5. 应用示例
5.1 应用场景
现模拟传感器设备采集温度。假设窗口长度为4ms,每隔2ms移动一次窗口,每隔1ms采集一次温度,规定以下异常指标:
- 单次采集的温度超过65;
- 单次采集的温度超过上一个窗口中75%的值;
- 窗口内平均温度和上一个窗口的平均温度相对误差大于1%。
5.2 系统设计
采集的数据存放到流数据表中,异常检测引擎通过订阅流数据表来获取实时数据,并进行异常检测,符合异常指标的数据输出到另外一个表中。
5.3 实现步骤
(1) 定义流数据表sensor来存放采集的数据:
share streamTable(1000:0, `time`temp, [TIMESTAMP, DOUBLE]) as sensor
复制代码
(2) 定义异常检测引擎和输出表outputTable,输出表也是流数据表:
share streamTable(1000:0, `time`anomalyType`anomalyString, [TIMESTAMP, INT, SYMBOL]) as outputTable
engine = createAnomalyDetectionEngine("engine1", <[temp > 65, temp > percentile(temp, 75), abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01]>, sensor, outputTable, `time, , 6, 3)
复制代码
(3) 异常检测引擎engine订阅流数据表sensor:
subscribeTable(, "sensor", "sensorAnomalyDetection", 0, append!{engine}, true)
复制代码
(4) 向流数据表sensor中写入10次数据模拟采集温度:
timev = 2018.10.08T01:01:01.001 + 1..10
tempv = 59 66 57 60 63 51 53 52 56 55
insert into sensor values(timev, tempv)
复制代码
查看流数据表sensor的内容:
time temp
2018.10.08T01:01:01.002 59
2018.10.08T01:01:01.003 66
2018.10.08T01:01:01.004 57
2018.10.08T01:01:01.005 60
2018.10.08T01:01:01.006 63
2018.10.08T01:01:01.007 51
2018.10.08T01:01:01.008 53
2018.10.08T01:01:01.009 52
2018.10.08T01:01:01.010 56
2018.10.08T01:01:01.011 55
复制代码
再查看结果表outputTable:
time anomalyType anomalyString
2018.10.08T01:01:01.003 0 temp > 65
2018.10.08T01:01:01.003 1 temp > percentile(temp, 75)
2018.10.08T01:01:01.005 1 temp > percentile(temp, 75)
2018.10.08T01:01:01.006 2 abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01
2018.10.08T01:01:01.006 1 temp > percentile(temp, 75)
2018.10.08T01:01:01.009 2 abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01
复制代码
下面详细解释异常检测引擎的计算过程。为方便阅读,对时间的描述中省略相同的2018.10.08T01:01:01部分,只列出毫秒部分。
(1)指标temp > 65只包含不作为函数参数的列temp,因此会在每条数据到达时计算。模拟数据中只有003时的温度满足检测异常的指标。
(2)指标temp > percentile(temp, 75)中,temp列既作为聚合函数percentile的参数,又单独出现,因此会在每条数据到达时,将其中的temp与上一个窗口计算得到的percentile(temp, 75)比较。个窗口基于行数据的时间002进行对齐,对齐后窗口起始边界为000,个窗口是从000到002,只包含002一条记录,计算percentile(temp, 75)的结果是59,数据003到005与这个值比较,满足条件的有003和005。第二个窗口是从002到005,计算percentile(temp, 75)的结果是60,数据006到008与这个值比较,满足条件的有006。第三个窗口是从003到008,计算percentile(temp, 75)的结果是63,数据009到011与这个值比较,其中没有满足条件的行。后一条数据011到达后,尚未触发新的窗口计算。
(3)指标abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01中,temp只作为聚合函数avg的参数出现,因此只会在每次窗口计算时检查。类似上一个指标的分析,前三个窗口计算得到的avg(temp)分别为59, 60.5, 58.33,满足abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01的时间为第二个窗口和第三个窗口的计算时间006和009。
5.4 监控异常检测引擎的状态
getAggregatorStat().AnomalDetectionAggregator
name user status lastErrMsg numGroups numRows numMetrics metrics
------- ----- ------ ---------- --------- ------- ---------- --------------------
engine1 guest OK 0 10 3 temp > 65, temp > percentile(temp, 75), abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01
复制代码
5.5 删除异常检测引擎
removeAggregator("engine1")
复制代码
6. createAnomalyEngine函数介绍
语法
createAnomalyDetectionEngine(name, metrics, dummyTable, outputTable, timeColumn, [keyColumn], [windowSize], [step], [garbageSize])
复制代码
返回对象
createAnomalyDetectionEngine函数的作用是返回一个表对象,向该表写入数据意味着这些数据进入异常检测引擎进行计算。
参数
- name: 一个字符串,表示异常检测引擎的名称,是异常检测引擎的标识。它可以包含字母,数字和下划线,但必须以字母开头。
- metrics: 元代码。它的返回值必须是bool类型。它可以是函数或表达式,如<[qty > 5, eq(qty, price)]>。可以在其中使用系统内置或用户自定义的聚合函数(使用defg关键字定义),如<[sum(qty) > 5, lt(avg(price), price)]>。详情可参考元编程。
- dummyTable: 表对象,它可以不包含数据,但它的结构必须与订阅的流数据表结构相同。
- outputTable: 表对象,用于保存计算结果。它的列必须是时间类型,用于存放检测到异常的时间戳,并且该列的数据类型要与dummyTable的时间列一致。如果keyColumn参数不为空,那么outputTable的第二列为keyColumn。之后的两列分别为int类型和string/symbol类型,用于记录异常的类型(在metrics中的下标)和异常的内容
- timeColumn: 字符串标量,表示输入流数据表的时间列名称。
- keyColumn: 字符串标量,表示分组列。异常检测引擎会按照keyColumn对输入数据分组,并在每组中进行聚合计算。它是可选参数。
- windowSize: 正整数。当metrics中包含聚合函数时,windowSize必须指定,表示用于聚合计算的数据窗口的长度。如果metrics中没有聚合函数,这个参数不起作用。
- step: 正整数。当metrics中包含聚合函数时,step必须指定,表示计算的时间间隔。windowSize必须是step的整数倍,否则会抛出异常。如果metrics中没有聚合函数,这个参数不起作用。
- garbageSize: 正整数。它是可选参数,默认值是50,000。如果没有指定keyColumn,当内存中历史数据的数量超过garbageSize时,系统会清理本次计算不需要的历史数据。如果指定了keyColumn,意味着需要分组计算时,内存清理是各分组独立进行的。当一个组的历史数据记录数超出garbageSize时,会清理该组不再需要的历史数据。若一个组的历史数据记录数未超出garbageSize,则该组数据不会被清理。如果metrics中没有聚合函数,这个参数不起作用。
7. 总结
DolphinDB提供的异常检测引擎是一个轻量、使用方便的流数据引擎,它通过与流数据表合作来完成流数据的实时检测任务,能够满足物联网实时监控和预警的需求。
相关文章