使用 kairosdb 存储自定义的数据类型

2022-04-24 00:00:00 数据 专区 类型 注册 离线

公司业务需要存储 GPS 坐标,而 KairosDB 现有的数据类型显然不适合存储这种包含多个字段的数据,所以通过 KairosDB 的插件机制自定义了一种数据类型。

1、定义存储类型,继承 DataPointHelper 类

package com.enerbos.cloud.kairosdb.plugin;

import org.json.JSONException;
import org.json.JSONWriter;
import org.kairosdb.core.datapoints.DataPointHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataOutput;
import java.io.IOException;

/**
 * KairosDB data point that carries a GPS fix plus a repaired/raw measurement pair.
 *
 * <p>Binary layout written by {@link #writeValueToBuffer(DataOutput)}, in order:
 * longitude, latitude, height, speed, accuracy (double each), direction, status (int each),
 * value, rvalue (double each), rstatus (int). {@code EnerbosDataPointFactory} reads the
 * fields back in the same order, so the two must be changed together.
 *
 * @author saperliu
 * @version 1.0.0
 * @date 2018-12-17 9:43
 */
public class StoreEnerbosDataPoint extends DataPointHelper {

    private static final Logger logger = LoggerFactory.getLogger(StoreEnerbosDataPoint.class);
    private static final String API_TYPE = "enerbos";

    /** Longitude. */
    private double longitude;

    /** Latitude. */
    private double latitude;

    /** Speed, in km/h. */
    private double speed;

    /** Heading in the range [0, 359]; 0 is due north, increasing clockwise. */
    private int direction;

    /** Positioning accuracy, in metres. */
    private double accuracy;

    /** Height, in metres. */
    private double height;

    /** Status after repair: 0 normal, -1 offline, 1 communication error, 2 data out of range. */
    private int status;

    /** Repaired value. */
    private double value;

    /** Raw (original) value. */
    private double rvalue;

    /** Raw (original) status: 0 normal, -1 offline, 1 communication error, 2 data out of range. */
    private int rstatus;

    /**
     * Creates a data point.
     *
     * @param timestamp data point timestamp, passed to {@link DataPointHelper}
     * @param longitude longitude
     * @param latitude  latitude
     * @param speed     speed in km/h
     * @param direction heading in [0, 359], 0 = due north, clockwise
     * @param accuracy  positioning accuracy in metres
     * @param height    height in metres
     * @param status    status after repair
     * @param value     repaired value
     * @param rvalue    raw value
     * @param rstatus   raw status
     */
    public StoreEnerbosDataPoint(long timestamp, double longitude, double latitude, double speed, int direction, double accuracy, double height, int status, double value, double rvalue, int rstatus) {
        super(timestamp);
        this.longitude = longitude;
        this.latitude = latitude;
        this.speed = speed;
        this.direction = direction;
        this.accuracy = accuracy;
        this.height = height;
        this.status = status;
        this.value = value;
        this.rvalue = rvalue;
        this.rstatus = rstatus;
    }

    /**
     * Serializes all fields to the datastore buffer in the fixed order documented on the class.
     *
     * @param dataOutput destination buffer
     * @throws IOException if writing to {@code dataOutput} fails
     */
    @Override
    public void writeValueToBuffer(DataOutput dataOutput) throws IOException {
        dataOutput.writeDouble(longitude);
        dataOutput.writeDouble(latitude);
        dataOutput.writeDouble(height);
        dataOutput.writeDouble(speed);
        dataOutput.writeDouble(accuracy);
        dataOutput.writeInt(direction);
        dataOutput.writeInt(status);
        dataOutput.writeDouble(value);
        dataOutput.writeDouble(rvalue);
        dataOutput.writeInt(rstatus);
        // The format string previously had a third "{}" with no matching argument,
        // which SLF4J prints verbatim; the placeholder count now matches the arguments.
        logger.info("--dataOutput--value :{} rvalue :{}---------------", value, rvalue);
    }

    /**
     * Writes all fields as one JSON object keyed by field name.
     *
     * @param jsonWriter destination JSON writer
     * @throws JSONException if the writer rejects a key or value
     */
    @Override
    public void writeValueToJson(JSONWriter jsonWriter) throws JSONException {
        jsonWriter.object();
        jsonWriter.key("longitude").value(longitude);
        jsonWriter.key("latitude").value(latitude);
        jsonWriter.key("height").value(height);
        jsonWriter.key("speed").value(speed);
        jsonWriter.key("accuracy").value(accuracy);
        jsonWriter.key("direction").value(direction);
        jsonWriter.key("status").value(status);
        jsonWriter.key("value").value(value);
        jsonWriter.key("rvalue").value(rvalue);
        jsonWriter.key("rstatus").value(rstatus);
        jsonWriter.endObject();
        // Same placeholder/argument mismatch fixed here as in writeValueToBuffer.
        logger.info("-----writeValueToJson--valuer :{} rvalue :{}---------------", value, rvalue);
    }

    /** @return the API type name of this data point ({@code "enerbos"}) */
    @Override
    public String getApiDataType() {
        return API_TYPE;
    }

    /** @return the datastore type name, taken from {@code EnerbosDataPointFactory.DST_COMPLEX} */
    @Override
    public String getDataStoreDataType() {
        return EnerbosDataPointFactory.DST_COMPLEX;
    }

    /** @return always {@code false}; this type does not expose a long value */
    @Override
    public boolean isLong() {
        return false;
    }

    /** @return always 0, since {@link #isLong()} is {@code false} */
    @Override
    public long getLongValue() {
        return 0;
    }

    /** @return always {@code false}; this type does not expose a double value */
    @Override
    public boolean isDouble() {
        return false;
    }

    /** @return always 0, since {@link #isDouble()} is {@code false} */
    @Override
    public double getDoubleValue() {
        return 0;
    }

    public double getLongitude() {
        return longitude;
    }

    public void setLongitude(double longitude) {
        this.longitude = longitude;
    }

    public double getLatitude() {
        return latitude;
    }

    public void setLatitude(double latitude) {
        this.latitude = latitude;
    }

    public double getSpeed() {
        return speed;
    }

    public void setSpeed(double speed) {
        this.speed = speed;
    }

    public int getDirection() {
        return direction;
    }

    public void setDirection(int direction) {
        this.direction = direction;
    }

    public double getAccuracy() {
        return accuracy;
    }

    public void setAccuracy(double accuracy) {
        this.accuracy = accuracy;
    }

    public double getHeight() {
        return height;
    }

    public void setHeight(double height) {
        this.height = height;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public double getRvalue() {
        return rvalue;
    }

    public void setRvalue(double rvalue) {
        this.rvalue = rvalue;
    }

    public int getRstatus() {
        return rstatus;
    }

    public void setRstatus(int rstatus) {
        this.rstatus = rstatus;
    }

    @Override
    public String toString() {
        return "StoreEnerbosDataPoint{" +
                "longitude=" + longitude +
                ", latitude=" + latitude +
                ", speed=" + speed +
                ", direction=" + direction +
                ", accuracy=" + accuracy +
                ", height=" + height +
                ", status=" + status +
                ", value=" + value +
                ", rvalue=" + rvalue +
                ", rstatus=" + rstatus +
                '}';
    }
}
2、定义Factory的实现

package com.enerbos.cloud.kairosdb.plugin;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.kairosdb.core.DataPoint;
import org.kairosdb.core.datapoints.DataPointFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInput;
import java.io.IOException;

/**
 * Factory that registers the "enerbos" data type with KairosDB and builds
 * {@link StoreEnerbosDataPoint} instances from either JSON or the binary datastore form.
 *
 * @author saperliu
 * @version 1.0.0
 * @date 2018-12-17 13:44
 */
public class EnerbosDataPointFactory implements DataPointFactory {

    private static final Logger logger = LoggerFactory.getLogger(EnerbosDataPointFactory.class);
    public static final String DST_COMPLEX = "kairos_enerbos";
    public static final String GROUP_TYPE = "enerbos";

    /** Sentinel stored in a field whose value was absent or unreadable. */
    private static final int MISSING = -9999;

    /** @return the datastore type name ({@code "kairos_enerbos"}) */
    @Override
    public String getDataStoreType() {
        return DST_COMPLEX;
    }

    /** @return the group type name ({@code "enerbos"}) */
    @Override
    public String getGroupType() {
        return GROUP_TYPE;
    }

    /**
     * Builds a data point from JSON submitted by a client.
     *
     * <p>{@code status} is required. Every other field is optional: when it is missing or
     * cannot be converted, the failure is logged and the field keeps the sentinel -9999
     * ({@code rstatus} falls back to {@code status} instead).
     *
     * @param timestamp   data point timestamp
     * @param jsonElement JSON value of the data point; must be a JSON object
     * @return the parsed data point
     * @throws IOException if {@code jsonElement} is not a JSON object, or if the required
     *                     {@code status} field is missing or JSON null
     */
    @Override
    public DataPoint getDataPoint(long timestamp, JsonElement jsonElement) throws IOException {
        if (!jsonElement.isJsonObject()) {
            throw new IOException("JSON object is not a valid enerbos data point");
        }
        JsonObject object = jsonElement.getAsJsonObject();

        double longitude = MISSING;
        double latitude = MISSING;
        double height = MISSING;
        double speed = MISSING;
        double accuracy = MISSING;
        int direction = MISSING;

        // Longitude and latitude share one try block: if longitude cannot be read,
        // latitude is not attempted and both keep the sentinel.
        try {
            longitude = object.get("longitude").getAsDouble();
            latitude = object.get("latitude").getAsDouble();
        } catch (Exception e) {
            logger.error("--latitude-getDataPoint-", e);
        }
        try {
            height = object.get("height").getAsDouble();
        } catch (Exception e) {
            logger.error("--height-getDataPoint-", e);
        }
        try {
            speed = object.get("speed").getAsDouble();
        } catch (Exception e) {
            logger.error("--speed-getDataPoint-", e);
        }
        try {
            accuracy = object.get("accuracy").getAsDouble();
        } catch (Exception e) {
            logger.error("--accuracy-getDataPoint-", e);
        }
        try {
            direction = object.get("direction").getAsInt();
        } catch (Exception e) {
            logger.error("--direction-getDataPoint-", e);
        }

        // JsonObject.get returns null for an absent member. Dereferencing it used to
        // surface as a bare NullPointerException; report it through the declared
        // IOException instead, like the non-object case above.
        JsonElement statusElement = object.get("status");
        if (statusElement == null || statusElement.isJsonNull()) {
            throw new IOException("JSON object is not a valid enerbos data point: missing required field \"status\"");
        }
        int status = statusElement.getAsInt();

        double value = MISSING;
        double rvalue = MISSING;
        int rstatus = status; // raw status defaults to the repaired status
        // Read in sequence inside one try block: the first failure stops the group,
        // leaving the remaining fields at their defaults.
        try {
            value = object.get("value").getAsDouble();
            rvalue = object.get("rvalue").getAsDouble();
            rstatus = object.get("rstatus").getAsInt();
        } catch (Exception e) {
            logger.error("--value-getDataPoint-", e);
        }
        logger.info("-----jsonElement--value: {} rvalue :{}------rstatus :{}---------", value, rvalue, rstatus);
        return new StoreEnerbosDataPoint(timestamp, longitude, latitude, speed, direction, accuracy, height, status, value, rvalue, rstatus);
    }

    /**
     * Builds a data point from its binary datastore form.
     *
     * <p>Fields are read in the order longitude, latitude, height, speed, accuracy,
     * direction, status, value, rvalue, rstatus. The first seven are mandatory; a failure
     * reading any of the last three is logged and leaves that field at -9999.
     *
     * @param timestamp data point timestamp
     * @param dataInput binary source positioned at the start of the value
     * @return the decoded data point
     * @throws IOException if one of the seven mandatory fields cannot be read
     */
    @Override
    public DataPoint getDataPoint(long timestamp, DataInput dataInput) throws IOException {
        double longitude = dataInput.readDouble();
        double latitude = dataInput.readDouble();
        double height = dataInput.readDouble();
        double speed = dataInput.readDouble();
        double accuracy = dataInput.readDouble();
        int direction = dataInput.readInt();
        int status = dataInput.readInt();

        double value = MISSING;
        double rvalue = MISSING;
        int rstatus = MISSING;
        // NOTE(review): the trailing fields are read best-effort, presumably so that records
        // stored before they were added can still be decoded - confirm. The broad catch is
        // kept on purpose: the concrete DataInput may signal underflow with a runtime
        // exception rather than EOFException.
        try {
            value = dataInput.readDouble();
        } catch (Exception eof) {
            logger.error("----value---", eof);
        }

        try {
            rvalue = dataInput.readDouble();
        } catch (Exception eof) {
            logger.error("--rvalue-----", eof);
        }
        try {
            rstatus = dataInput.readInt();
        } catch (Exception e) {
            logger.error("----rstatus---", e);
        }
        logger.info("---dataInput----value :{} rvalue :{}---------------", value, rvalue);
        return new StoreEnerbosDataPoint(timestamp, longitude, latitude, speed, direction, accuracy, height, status, value, rvalue, rstatus);
    }
}
3、绑定Module

package com.enerbos.cloud.kairosdb.plugin;

import com.google.inject.AbstractModule;
import com.google.inject.Singleton;

/**
 * Guice module that registers the enerbos data point factory with the KairosDB core.
 *
 * @author saperliu
 * @version 1.0.0
 * @date 2018-12-17 13:58
 */
public class EnerbosFactoryRegisterModule extends AbstractModule {

    /** Binds {@link EnerbosDataPointFactory} in singleton scope. */
    @Override
    protected void configure() {
        final Class<EnerbosDataPointFactory> factoryType = EnerbosDataPointFactory.class;
        bind(factoryType).in(Singleton.class);
    }
}
4、在配置文件中注册新类型:在 KairosDB 的安装目录下找到配置文件 kairosdb.properties,添加如下内容:

kairosdb.service.enerbos=com.enerbos.cloud.kairosdb.plugin.EnerbosFactoryRegisterModule
kairosdb.datapoints.factory.enerbos=com.enerbos.cloud.kairosdb.plugin.EnerbosDataPointFactory
5、将以上 3 个类打包成 jar 文件,放到 KairosDB 安装目录下的 lib 文件夹中。

6、重启kairosdb。

7、如果使用 Java client 包,写入数据之前需要先在 client 上注册这个类型,查询数据之前同样需要注册。

// "enerbos" must match the type name registered by the plugin classes above;
// StoreHisGPSVo only needs to have the same properties as the plugin's data point class.
client.registerCustomDataType("enerbos", StoreHisGPSVo.class);
以上就完成了数据类型的自定义。

相关文章