如何正确解锁TDengine中的订阅功能?

2022-05-17 00:00:00 查询 数据 订阅 参数 时间
什么是订阅

订阅,是一种数据查询方式,其特点为:客户端执行一个查询语句后,可以增量形式,不断收到新到达服务端的、符合查询条件的数据。订阅的实现模型有两种,一种是“推”,即服务器主动将数据发到客户端;另一种是“拉”,即客户端主动向服务器请求数据。两种方式各有优缺点,这里不做详细的对比,只是说明一下,TDengine使用的是“拉”模型。


什么时候需要使用订阅?

为了便于用户程序消费TDengine中的数据,TDengine实现了基于SQL的数据查询语法,并提供了丰富的聚合函数,这种方式的优势已在多个实际案例中得到了体现。但由于时序数据的特点,单纯的直接数据查询并不能满足用户程序的需求,比如:我们管理着一批温度测量设备,希望当某个设备检测到的温度超过限制(比如80°C)后能得到通知并进行一些处理时,肯定会先为所有的设备建立一张超级表:

create table devices (ts timestamp, temperature float) tags(id int);


并为每个设备创建一张子表:

create table device1 using devices tags(1);
create table device2 using devices tags(2);
...


这种设计满足了设备管理的需求,但如何满足温度监测的需求呢?如果仅使用普通的查询,有两种方法:一是分别对每张子表进行查询,每次查询后记录后一条数据的时间戳,后续只查询这个时间戳之后的数据:

select temperature from device1 where ts > last_timestamp1 and temperature > 80;
select temperature from device2 where ts > last_timestamp2 and temperature > 80;
...


这确实可行,但随着设备数量的增加,查询数量也会增加,客户端和服务端的性能都会受到影响,当设备数增长到一定的程度,系统就无法承受了。


另一种方法是对超级表进行查询。这样,无论有多少设备,都只需一次查询:

select * from devices where ts > last_timestamp and temperature > 80;


但是,如何选择 last_timestamp 就成了一个新的问题。因为,一方面数据的产生时间(也就是数据时间戳)和数据入库的时间一般并不相同,有时偏差还很大;另一方面,不同设备的数据到达TDengine的时间也会有差异。所以,如果我们在查询中使用慢的那台设备的数据的时间戳作为 last_timestamp,就可能重复读入其它设备的数据;如果使用快的设备的时间戳,其它设备的数据就可能被漏掉。


TDengine的订阅功能为上面这个问题提供了一个彻底的解决方案。


如何使用TDengine中的订阅功能?

TDengine的API中,与订阅相关的主要有以下三个:

  • taos_subscribe

  • taos_consume

  • taos_unsubscribe


这三个API的具体说明请见《C/C++数据订阅接口》,下面结合一个示例,介绍下其使用方法。《C/C++数据订阅接口》及完整的示例代码请点击「阅读原文」获取。


首先是创建订阅:

TAOS_SUB* tsub = NULL;
if (async) {
  // create an asynchronized subscription, the callback function will be called every 1s
  tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
} else {
  // create an synchronized subscription, need to call 'taos_consume' manually
  tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, );
}


TDengine中的订阅既可以是同步的,也可以是异步的,上面的代码会根据从命令行获取的参数 async 的值来决定使用哪种方式。这里,同步的意思是用户程序要直接调用 taos_consume 来拉取数据,而异步则由API在内部的另一个线程中调用taos_consume,然后把拉取到的数据交给回调函数 subscribe_callback 去处理。


参数 taos 是一个已经建立好的数据库连接,在同步模式下无特殊要求。但在异步模式下,需要注意它不会被其它线程使用,否则可能导致不可预计的错误,因为回调函数在API的内部线程中被调用,而TDengine的部分API不是线程安全的。


参数 sql 是查询语句,可以在其中使用 where 子句指定过滤条件,比如说,如果只想从一天前的数据开始订阅,而不需要更早的历史数据,可以这样写:

select * from devices where ts > now - 1d;


订阅的 topic 实际上是它的名字,因为订阅功能是在客户端API中实现的,所以没必要保证它全局,但需要它在一台客户端机器上。


如果名 topic 的订阅不存在,参数 restart 没有意义,但如果这个订阅已经存在了,restart 就会被用于决定是从头开始读取数据,还是接续上次的位置进行读取。举个例子,假如参数 sql 是这样一条查询语句:

select * from devices;


如果 restart 是 true(非零值),用户程序肯定会读到所有数据。但如果这个订阅之前就存在了,并且已经读取了一部分数据,且 restart 是 false(0),用户程序就不会读到之前已经读取的数据了。


taos_subscribe 的后一个参数是以毫秒为单位的轮询周期。在同步模式下,如过前后两次调用 taos_consume 的时间间隔小于此时间,taos_consume 会阻塞,直到间隔超过此时间。异步模式下,这个时间是两次调用回调函数的小时间间隔。


taos_subscribe 的倒数第二个参数用于用户程序向回调函数传递附加参数,订阅API不对其做任何处理,只原样传递给回调函数。此参数在同步模式下无意义。


订阅创建以后,就可以消费其数据了,同步模式下,示例代码是下面的 else 部分:

if (async) {
  getchar();
} else while(1) {
  TAOS_RES* res = taos_consume(tsub);
  if (res == NULL) {
    printf("failed to consume data.");
    break;
  } else {
    print_result(res, blockFetch);
    getchar();
  }
}


这里是一个 while 循环,用户每按一次回车键就调用一次 taos_consume,而 taos_consume 的返回值是查询到的结果集,与 taos_use_result 完全相同,例子中使用这个结果集的代码是函数 print_result

void print_result(TAOS_RES* res, int blockFetch) {
  TAOS_ROW row = NULL;
  int num_fields = taos_num_fields(res);
  TAOS_FIELD* fields = taos_fetch_fields(res);
  int nRows = ;
  if (blockFetch) {
    nRows = taos_fetch_block(res, &row);
    for (int i = ; i < nRows; i++) {
      char temp[256];
      taos_print_row(temp, row + i, fields, num_fields);
      puts(temp);
    }
  } else {
    while ((row = taos_fetch_row(res))) {
      char temp[256];
      taos_print_row(temp, row, fields, num_fields);puts(temp);
      nRows++;
    }
  }
  printf("%d rows consumed.\n", nRows);
}


而异步模式下,消费订阅到的数据则显得更为简单:

void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
  print_result(res, *(int*)param);
}


当要结束一次数据订阅时,需要调用 taos_unsubscribe

taos_unsubscribe(tsub, keep);


其第二个参数,用于决定是否在客户端保留订阅的进度信息,如果大家还记得前面说过“订阅功能是在客户端API中实现的”的话,应该可以猜到,如果这个参数是 false(0),那无论下次调用 taos_subscribe 时的 restart 参数是什么,订阅都只能重新开始了。另外,进度信息的保存位置是 {DataDir}/subscribe/,这个目录下,每个订阅有一个与其 topic 同名的文件,删掉某个文件,同样会导致下次创建其对应的订阅时只能重新开始。


用作消息队列

本文开头的例子,是用订阅实现了一个报警监控的功能,但其实订阅也可以用在其它场景中,比如:消息队列。


应用程序可以订阅数据库某些表的内容,同一个表也可以被多个应用订阅,一旦表有新的记录,应用将立即得到通知。这样,再把数据插入看做Publish操作,用户完全可以把TDengine作为一个消息队列中间件来使用。


所以,当下次面对需要使用Kafka的场景时,不妨先考虑下TDengine,因为TDengine除了安装包超小、运维超简单的优点外,还有一个Kafka不具备的功能——数据过滤:可以在查询语句中指定过滤条件,保证读到的数据都是有用的,不用再在代码中手写过滤逻辑了。


与InfluxDB的对比

概念上说,InfluxDB的订阅和TDengine的订阅区别很大,我们可以认为订阅在InfluxDB中更像一种数据同步机制,而TDengine中的订阅则是一种数据查询机制:

  • InfluxDB将收到的数据实时推送给其它节点,TDengine通过轮询的方式拉取数据,InfluxDB具有更好的实时性。

  • InfluxDB中只能订阅全部数据,TDengine中可以指定数据过滤条件。

  • InfluxDB中只能订阅当前时间之后的数据,TDengine中可以在订阅中读到历史数据。


所以,两相对比,InfluxDB的优势是实时性,而TDengine则以稍微牺牲实时性为代价提供了更强大的功能。


限制条件

下面是一些TDengine订阅功能的局限,大家需要在使用中注意。

  • 订阅的查询语句只能是 select 语句,只能查询原始数据(不支持聚合函数),只能按时间正序查询数据。

  • 在满足应用需求的情况下,请尽量将轮询周期设置的大一些,否则会对系统性能造成影响。

  • 暂不支持乱序数据,用户程序可能读不到使用 import 方式插入的数据。

  • 如果用户程序异常退出或没有正确调用 taos_unsubscribe,进度信息可能会有错误,这时,后续的同名订阅可能读到之前已经读过的数据。

来源 https://www.modb.pro/db/168923

相关文章