TDengine在WebRTC日志上报中的实践

2022-05-17 00:00:00 数据 时间 日志 上报 座席

小 T 导读: 天润融通是一家云呼叫中心服务商,其中CTI-Cloud为大量头部客户提供高效、稳定的呼叫中心服务。现在,天润通过T-Phone SDK将CTI-Cloud的功能延伸到移动端,为客户提供移动端的呼叫服务。



场景

在天润的T-Phone SDK中,我们需要采集WebRTC信息来进行数据的分析并作出优化的建议,所以需要将SDK中采集到的相关日志进行上报;为了精简日志上报的数据,我们只针对其中的传输数据每隔5秒在双方接通后上传,针对传输中网络的抖动和链接状态可以数据化展示,提供对每一通话的数据分析,以便在后续SDK演进中提供数据支撑;另外我们对每一通电话做了操作日志,记录了接口被调用的操作和时间,为用户在某一通电话的操作记录做还原,分析可能的误操作等,为客户提供更好的交互体验。


因为现在仍处于项目初期,我们更关心用户在某一个时间段内的使用情况,在大量使用的场景中是否仍然能保证较高的通话质量,同时我们应该尽可能做到对每个座席都可以进行分析,做到每一个座席都应该有自己的数据表。


举个例子:如果我们要查询企业7000001的座席9001 2020年2月14日12:00-12:10分的一通通话的WebRTC日志,如果没有按照座席进行分表,SQL语句应该是这样:

select log_time, audio_bytes_sent,...  from aladdin.webrtc_log where device_id = '70000019001' and where log_time between 1581652800000 and 1581653400000;


如果要提升查询的速度,我们首先要对device_id和log_time字段建立索引,但是当数据量比较大的时候,索引的存储也会是问题,所以要考虑分表(我们之前使用的数据库是aws的rds,所以没有分库的概念)


分表的选择有两种,按照时间分表或者按照座席分表。为什么我们要按照座席分表?如果按照时间分表,这样就会出现不同表的数据量差异过大,甚至存在某个表里没有数据的情况,因为很少有人半夜做外呼。但是我们也不能这样武断的不为半夜的时间段建立表,万一人打的是国际长途呢(手动滑稽)?但是一个座席不可能存在不外呼的情况,而且对于移动端的应用,我们在排查问题时更多是通过某个座席向我们反馈发生的问题,我们再针对这个座席进行排查,所以在查询的时候device_id这个字段是必须要体现的,如果按照device_id进行分表,我们在查询的时候就不再需要对这个字段建立索引了。因而选择按照座席进行分表。


如果要使用传统的数据库做分表,我们在插入数据之前一定要先判断这张表是否存在,同时我们还需要提前创建好这些表。这种步骤在我看来就显得很鸡肋。如果能有数据库可以做到在插入数据时指定表名,如果存在则插入,如果不存在则自动创建表,这样就方便多了。


日志上报的整体处理流程

整个流程需要T-Phone SDK,CTI-Cloud的Interface模块(CTI-Cloud对客户开放的接口)和日志上报模块相互协作



设计

考虑到日志上报的频率较高,对IO吞吐的要求比较高。我们可以通过全异步的方式进行数据的采集。这次使用了Vert.x作为全异步项目开发的工具。


在数据存储上基于以下几点考虑我们选择了TDengine:


1. 不管是WebRTC日志还是操作日志,都是按照时间产生的数据流。而TDengine正好是一个专门为物联网结构化数据流设计的时序数据库。


2. WebRTC日志和操作日志存储的数据格式都是一致的,但是如果要做到都每个使用的座席都可以进行分析,好的方式是每个座席都能有一张自己的数据表。TDengine提供了超级表,在超级表中定义数据结构,并按照tag区分,只要在插入数据时指定表名即可做到分表。显然解决了上述的鸡肋问题。按照TDengine官网上的介绍:为充分利用其数据的时序性和其他数据特点,TDengine要求对每个数据采集点单独建表其实我们的座席就相当于是一个独立的数据采集点,TDengine在我们的场景中是很贴合业务的。


3. 时间。时间也是我们在查询中重点关注的部分,在传统的数据库中,我们需要通过对字段建立索引来提升查询速度,可是我们仍然不想建立索引,因为索引仍需要占用存储空间,我们是否可以通过类似分表的方式来取代索引呢?答案是肯定的:TDengine中写入的数据在硬盘上是按时间维度进行分片的。同一个vnode中的表在同一时间范围内的数据都存放在同一文件组中。这一数据分片方式可以大大简化数据在时间维度的查询,提高查询速度。在默认配置下,硬盘上的每个数据文件存放10天数据。用户可根据需要修改系统配置参数daysPerFile进行个性化配置


4. 插入和查询的速度要快,稳定。


在我们的开发服务器上尝试了一下TDengine。和官网上介绍的出入不大,查询和存储速度确实很快,而且也不依赖其他文件系统,所以就使用TDengine作为这个模块的存储引擎。由于TDengine中对列有长度限制,长4096,而且我们上报的字段比较多,所以尽量分配好每个字段的长度。


在数据的采集过程中,TPhone SDK不会直接和我们进行数据交互,而是会先将数据存储到SQS中,我们再从SQS中拉取数据,然后对数据处理后进行存储。


先来创建一个超级表,TDengine提供的超级表在我看来还是很方便的,我们可以直接利用超级表来做到自动的对数据进行分表存储。

create database aladdin;


use aladdin;


create table webrtc_log(
createTime timestamp,
deviceId binary(100),
audioBytesSent bigint,
audioBytesReceived bigint,
...
ssrcSendGoogCurrentDelayMs int,
ssrcSendGoogJitterBufferMs int
) tags (
deviceIdTag binary(100)
);


TDengine提供了非常多的连接方式,为了更好的配合Vertx进行异步存储,我们在这里使用了Rest方式进行数据库操作。


开始

在有了整体思路之后我们开始上手开发:


1. 应用配置:

{
"aws.region": "<your aws region>",
"aws.accessKey": "<your aws ak>",
"aws.secretAccessKey": "<your aws sk>",
"aladdin.maxPool": 100,
"aladdin.maxWaitQueue": 1500,
"aladdin.queue.name": ["queuename1","queuename2"],
"aladdin.cache.expireAfterWrite": 30,
"aladdin.cache.expireAfterAccess": 30,
"tdengine.host": "<your tdengine host>",
"tdengine.port": 6020,
"tdengine.user": "root",
"tdengine.password": "<your tdengine password>"
}


2. 重写Launcher

import com.tinet.twatch.aladdin.config.Configurer;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Launcher;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.SLF4JLogDelegateFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* @author qianwj
* @since v1.0
*/
public class AladdinLauncher extends Launcher {


private static Configurer configurer = new Configurer();


private Logger logger = LoggerFactory.getLogger(AladdinLauncher.class);


public static void main(String[] args) {
System.setProperty("vertx.logger-delegate-factory-class-name", SLF4JLogDelegateFactory.class.getName());
new AladdinLauncher().dispatch(args);
}


@Override
public void beforeDeployingVerticle(DeploymentOptions deploymentOptions) {
logger.info("Loading config starting...");
JsonObject config = configurer.load();
JsonObject local = deploymentOptions.getConfig();
if (!config.isEmpty()) { // 将consul配置注入到context中
local.mergeIn(config);
deploymentOptions.setConfig(local);
}
super.beforeDeployingVerticle(deploymentOptions);
logger.info("Loading config completed, config: {}", deploymentOptions.getConfig());
}


@Override
public void afterConfigParsed(JsonObject config) {
logger.info("Loading local config complete, local config: {}", config.encodePrettily());
}


@Override
public void handleDeployFailed(Vertx vertx, String mainVerticle, DeploymentOptions deploymentOptions, Throwable cause) {
logger.error("Deploy verticle occur exception: {}, App will be closed immediately!", cause.getLocalizedMessage(), cause);
vertx.close();
}
}


其实写完第二步就可以知道这个配置文件存在不是必要的,我们使用了Consul作为配置中心来进行集中配置,这一步主要是为了注入consul的配置以及加载日志。


3. 拉取SQS中的数据

import com.amazonaws.AmazonServiceException;
import com.tinet.ctilink.yun.entity.YunMessage;
import com.tinet.twatch.aladdin.service.AwsSQSService;
import com.tinet.twatch.aladdin.config.Configurer;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.util.List;


public class DataCollectVerticle extends AbstractVerticle {


private Logger logger = LoggerFactory.getLogger(DataCollectVerticle.class);


private volatile boolean shutdown = false;


@Override
public void start() throws Exception {
logger.info("DataCollectVerticle starting...");
AwsSQSService sqsService = Configurer.sqsService();
EventBus bus = vertx.eventBus();
vertx.setPeriodic(1000, id -> {
try {
if (shutdown) {
vertx.cancelTimer(id);
}
JsonArray array = config().getJsonArray(Configurer.QUEUE_URL);
List<YunMessage> msgs = sqsService.receiveMessageAndDelete(array.getString());
List<YunMessage> userActionMsgs = sqsService.receiveMessageAndDelete(array.getString(1));
bus.send(Configurer.CHANNEL_ADDRESS, Json.encode(msgs));
} catch (AmazonServiceException e) {
logger.warn("msgs received failed, cause: {}", e.getLocalizedMessage(), e);
}
});
}






@Override
public void stop() throws Exception {
shutdown = true;
logger.info("DataCollectVerticle closing...");
}
}


4. 数据存储到TDengine中

import com.github.benmanes.caffeine.cache.Cache;
import com.tinet.ctilink.yun.entity.YunMessage;
import com.tinet.twatch.aladdin.DataOperator;
import com.tinet.twatch.aladdin.config.Configurer;
import com.tinet.twatch.aladdin.model.WebRTCLog;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.util.*;


public class SaveVerticle extends AbstractVerticle {


private Logger logger = LoggerFactory.getLogger(SaveVerticle.class);


@Override
public void start() throws Exception {
logger.info("SaveVerticle starting....");


// 从event bus接收数据
EventBus bus = vertx.eventBus();
bus.consumer(Configurer.CHANNEL_ADDRESS, (Handler<Message<String>>) msg -> {
JsonArray coming = new JsonArray(msg.body());
if (coming != null)
save(coming);
});
}


private void save(JsonArray array) {
WebClient client = Configurer.tdClient();
List<WebRTCLog> data = new ArrayList<>();
Cache<String, WebRTCLog> cache = Configurer.cache();
if (array.size() > ) {
final WebRTCLog empty = new WebRTCLog();
for (int i = ; i < array.size(); i++) {
String message = array.getJsonObject(i).mapTo(YunMessage.class).getBody();
try {
JsonObject json = DataOperator.toJsonObject(message);
WebRTCLog log = json.mapTo(WebRTCLog.class);
String cacheKey = log.getDeviceId();
WebRTCLog org = cache.get(cacheKey, k -> empty);
if (!Objects.equals(org, empty)) { // 如果不是次插入
DataOperator.merge(log, org);
}
cache.put(cacheKey, log);
data.add(log);
} catch (Exception e) {
logger.error("log saved failed, cause: {}", e.getLocalizedMessage(), e);
}
}
client.post("/rest/sql")
.basicAuthentication(config().getString("tdengine.user"), config().getString("tdengine.password"))
.sendBuffer(insert(data), ar -> {
if (ar.succeeded()) {
HttpResponse<Buffer> response = ar.result();
if (response != null) {
JsonObject res = response.bodyAsJsonObject();
if (!"succ".equals(res.getString("status"))) {
logger.warn("data insert failed! data: {}, cause: {}", Json.encode(data), res.getString("desc"));
}
}
} else {
logger.error("data insert failed! {}", Json.encode(data), ar.cause());
}
});
}
}


private Buffer insert(WebRTCLog log) throws Exception {
String formatter = "INSERT INTO ALADDIN.WEBRTC_LOG_%s " +
" USING ALADDIN.WEBRTC_LOG TAGS(%s) " +
"VALUES(%s)";
String sql = String.format(formatter, log.getDeviceId(), log.getDeviceId(), DataOperator.valToSql(log));
return Buffer.buffer(sql);
}


private Buffer insert(List<WebRTCLog> data) throws IllegalAccessException {
StringBuilder sqlBuilder = new StringBuilder("INSERT INTO ");
String formatter = "ALADDIN.WEBRTC_LOG_%s USING ALADDIN.WEBRTC_LOG TAGS(%s) VALUES(%s) ";
for (WebRTCLog log : data) {
sqlBuilder.append(String.format(formatter, log.getDeviceId(), log.getDeviceId(), DataOperator.valToSql(log)));
}
return Buffer.buffer(sqlBuilder.toString());
}


@Override
public void stop() throws Exception {
logger.info("SaveVerticle closing....");
}
}


5. 部署Verticle

import com.tinet.twatch.aladdin.config.Configurer;
import com.tinet.twatch.aladdin.verticle.DataCollectVerticle;
import com.tinet.twatch.aladdin.verticle.SaveVerticle;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.web.client.WebClientOptions;


public class MainVerticle extends AbstractVerticle {


private Logger logger = LoggerFactory.getLogger(MainVerticle.class);


@Override
public void start(Promise<Void> startPromise) throws Exception {
logger.info("MainVerticle starting...");
// 初始化sqs
String region = config().getString("aws.region");
String accessKey = config().getString("aws.accessKey");
String secretKey = config().getString("aws.secretAccessKey");
Configurer.initSQSService(region, accessKey, secretKey, config().getJsonArray(Configurer.QUEUE_URL));


DeploymentOptions dataCollectDeploymentOptions = new DeploymentOptions();
dataCollectDeploymentOptions.setInstances(1);
dataCollectDeploymentOptions.setConfig(config());
dataCollectDeploymentOptions.setWorker(true);
Configurer.initCache(config().getInteger("aladdin.cache.expireAfterWrite"), config().getInteger("aladdin.cache.expireAfterAccess"));
vertx.deployVerticle(DataCollectVerticle.class.getName(), dataCollectDeploymentOptions, ar -> {
if (ar.succeeded()) {
logger.info("DataCollectVerticle started!");
} else {
logger.warn("DataCollectVerticle deploy failed! {}", ar.cause().getLocalizedMessage(), ar.cause());
}
});
// 初始化webclient
WebClientOptions options = new WebClientOptions();
options.setMaxWaitQueueSize(config().getInteger("aladdin.maxWaitQueue"));
options.setMaxPoolSize(config().getInteger("aladdin.maxPool"));
options.setDefaultHost(config().getString("tdengine.host"));
options.setDefaultPort(config().getInteger("tdengine.port"));
Configurer.initTDClient(vertx, options);

DeploymentOptions saveDeploymentOptions = new DeploymentOptions();
saveDeploymentOptions.setInstances(1);
saveDeploymentOptions.setConfig(config());
vertx.deployVerticle(SaveVerticle.class.getName(), saveDeploymentOptions, ar -> {
if (ar.succeeded()) {
logger.info("SaveVerticle started!");
} else {
logger.warn("SaveVerticle deploy failed!");
}
});
}
}


这样就快速实现了一个日志上报的模块,且多个实例部署时相互之间不会产生影响,当然在实际的生产环境中,我们需要考虑的会更多。


当然,日志上报只是开始。在之后的项目开发中,我还会继续向大家介绍TDengine在数据分析中的应用实践,感谢观看。

来源 https://www.modb.pro/db/168921

相关文章