DolphinDB流计算引擎实现传感器数据异常检测

2022-03-28 00:00:00 数据 异常 引擎 检测 传感器

DolphinDB提供了流数据表(stream table)和流计算引擎用于实时数据处理,包括物联网中传感器数据的异常检测。内置的异常检测引擎(Anomaly Detection Engine)能满足大部分异常检测场景的需求。如果异常检测逻辑复杂且较为特殊,标准化的异常检测引擎不能满足要求,用户可以用自定义消息处理函数来实现。


1. 应用需求

一个监控系统,一秒钟采集一次数据。现有以下2个异常检测需求:

  • 每3分钟内,若传感器温度出现2次40摄氏度以上并且3次30摄氏度以上,系统报警。
  • 若传感器网络断开,5分钟内无数据,系统报警。

上述的报警是指若侦测到异常,向一个流数据表中写一条记录。


2. 设计思路

分布式时序数据库DolphinDB的流计算框架目前已支持时序聚合引擎、横截面聚合引擎、异常检测引擎和自定义流计算引擎:

  • 时序聚合引擎(Time-Series Aggregator):能对设备状态进行纵向聚合计算(按时间序列聚合),或者将多个设备状态横向聚合后再按时间聚合。时序聚合支持滑动窗口的流式计算。DolphinDB对内置的窗口聚合函数均进行了性能优化,单核CPU每秒可完成近百万状态的时序聚合。
  • 横截面聚合引擎(Cross Sectional Aggregator):是快照引擎的扩展,能对设备状态进行横向聚合计算,比如计算一批设备的温度均值。
  • 异常检测引擎(Anomaly Detection Engine):能实时检测数据是否符合用户自定义的警报指标,如发现异常数据,将它们输出到表中,满足物联网实时监控和预警的需求。
  • 自定义流计算引擎:当以上三种引擎都不能满足需求时,用户也可以使用DolphinDB脚本或API语言自定义消息处理函数。

对于个需求即3分钟内传感器温度出现异常系统即报警,异常检测引擎恰好适用。只需要简单的用DolphinDB脚本写一个表达式描述一下异常逻辑即可。但第2个需求不适用。异常检测引擎是按设备分组进行处理的。每次有新数据流入才触发计算,或每隔一段时间,在固定长度的移动窗口中才进行聚合计算。一个传感器若没有产生新数据,无法触发计算。解决办法是自定义一个消息处理函数(message handler)去计算和检测。具体实现思路是:用一个键值内存表记录每个传感器的新采集时间。消息以一定时间间隔(比如1秒)进入消息处理函数。消息处理函数首先更新键值内存表,然后检查这个表中每个设备记录的新采集时间是否超过5分钟,若有即报警。


3.详细实现步骤


3.1 定义输入输出流数据表

首先定义一个流数据表用于接收实时采集的传感器数据,并用enableTableShareAndPersistence函数把流数据表共享和持久化到硬盘上。cacheSize参数限制内存中保留的大数据量是100万行。虽然传感器设备有很多指标,因为本例只涉及温度指标,所以本例对表结构进行了简化,表结构仅包含三列,即传感器编号deviceID,时间ts和温度temperature。代码如下:

st=streamTable(1000000:0,`deviceID`ts`temperature,[INT,DATETIME,FLOAT])
enableTableShareAndPersistence(table=st,tableName=`sensor,asynWrite=false,compress=true, cacheSize=1000000)

相关文章