手把手教你使用 Timestream 实现物联网时序数据存储和分析!

2022-04-24 00:00:00 数据 分析 时序 联网 实时

Amazon Timestream 是一种快速、可扩展的无服务器时间序列数据库服务,适用于物联网和运营应用程序,使用该服务每天可以轻松存储和分析数万亿个事件,速度提高了 1000 倍,而成本仅为关系数据库的十分之一。通过将近期数据保留在内存中,并根据用户定义的策略将历史数据移至成本优化的存储层,Amazon Timestream 为客户节省了管理时间序列数据生命周期的时间和成本。Amazon Timestream 专门构建的查询引擎可用于访问和分析近期数据和历史数据,而无需在查询中显示指定数据是保存在内存中还是成本优化层中。Amazon Timestream 内置了时间序列分析函数,可以实现近乎实时地识别数据的趋势和模式。Amazon Timestream 是无服务器服务,可自动缩放以调整容量和性能,因此无需管理底层基础设施,可以专注于构建应用程序。

本文介绍通过Timestream、Kinesis Stream托管服务和Grafana 和Flink Connector开源软件实现物联网(以PM 2.5场景为示例)时序数据实时采集、存储和分析,其中包含部署架构、环境部署、数据采集、数据存储和分析,希望当您有类似物联网时序数据存储和分析需求的时候,能从中获得启发,助力业务发展。

架构

Amazon Timestream 能够使用内置的分析函数(如平滑、近似和插值)快速分析物联网应用程序生成的时间序列数据。例如,智能家居设备制造商可以使用 Amazon Timestream 从设备传感器收集运动或温度数据,进行插值以识别没有运动的时间范围,并提醒消费者采取措施(例如减少热量)以节约能源。

本文物联网(以PM 2.5场景为示例),实现PM2.5数据实时采集、时序数据存储和实时分析, 其中架构主要分成三大部分:

实时时序数据采集:通过Python数据采集程序结合Kinesis Stream和Kinesis Data Analytics for Apache Flink connector 模拟实现从PM 2.5监控设备, 将数据实时采集数据到Timestream。

时序数据存储:通过Amazon Timestream时序数据库实现时序数据存储,设定内存和磁性存储(成本优化层)存储时长,可以实现近期数据保留在内存中,并根据用户定义的策略将历史数据移至成本优化的存储层。

实时时序数据分析:通过Grafana (安装Timesteam For Grafana插件)实时访问Timestream数据,通过Grafana丰富的分析图表形式,结合Amazon Timestream 内置的时间序列分析函数,可以实现近乎实时地识别物联网数据的趋势和模式。

具体的架构图如下:

部署环境


1.1 创建Cloudformation

请使用自己帐号 (region请选择 us-east-1)

下载Cloudformation Yaml文件:

https://big; export TZ
Install python3

sudo yum install -y python3
Install python3 pip

sudo yum install -y python3-pip
pip3 install boto3

sudo pip3 install boto3
pip3 install numpy

sudo pip3 install numpy
install git

sudo yum install -y git
1.4 下载Github Timesteram Sample 程序库

git clone https://github.com/awslabs/amazon-timestream-tools amazon-timestream-tools
1.5 安装Grafana Server

连接到EC2堡垒机:

sudo vi /etc/yum.repos.d/grafana.repo
For OSS releases:(拷贝以下内容到grafana.repo)

[grafana]
name=grafana
baseurl=https://packages.grafana.com/oss/rpm
repo_gpgcheck=1
enabled=1
gpgcheck=1
gpgkey=https://packages.grafana.com/gpg.key
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
安装grafana server:

sudo yum install -y grafana
启动grafana server:

sudo service grafana-server start
sudo service grafana-server status
配置grafana server在操作系统启动时 自动启动:

sudo /sbin/chkconfig --add grafana-server
1.6 安装timestream Plugin

sudo grafana-cli plugins install grafana-timestream-datasource
重启grafana

sudo service grafana-server restart
1.7 配置Grafana 要访问Timesteam服务所用的IAM Role

获取IAM Role Name



选择IAM服务, 选择要修改的role, role name:

timestream-iot-grafanaEC2rolelabview-us-east-1

修改role trust relationship:



将Policy document 全部选中, 替换成以下内容:

{
"Version": "2012-10-17",
"Statement": [
{
"Sid":"",
"Effect": "Allow",
"Principal": {
"Service": "ec2.amazonaws.com"
},
"Action": "sts:AssumeRole"
},
{
"Sid":"",
"Effect": "Allow",
"Principal": {
"AWS": "[请替换成CloudFormation output中的role arn]"
},
"Action": "sts:AssumeRole"
}
]
}
修改后的trust relationship:



1.8 登录到Grafana server

次登录到Grafana Server:

1. 打开浏览器,访问 http://[Grafana server public ip]:3000

2. 缺省的Grafana Server 监听端口是:3000

如何获取Ec2 Public IP地址, 如下图所示, 访问Cloudformation output:



3. 在登陆界面, 输入 username: admin; password:admin.(输入用户名和密码都是admin)

4. 点击Log In.登陆成功后, 会收到提示修改密码

1.9 Grafana server中增加 Timestream 数据源

增加 Timestream 数据源



1.10 Grafana server中配置Timestream数据源

拷贝配置所需要role ARN信息 (从cloudformation output tab)Default Region: us-east-1







IoT数据存储



2.1 创建 Timestream 数据库iot





2.2 创建 Timestream 表 pm25





IoT数据导入



3.1安装Flink connector to Timestream

安装java8

sudo yum install -y java-1.8.0-openjdk*
java -version
安装debug info, otherwise jmap will throw exception

sudo yum --enablerepo='*-debug*' install -y java-1.8.0-openjdk-debuginfo
Install maven

sudo wget https://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo
sudo yum install -y apache-maven
mvn --version
change java version from 1.7 to 1.8

sudo update-alternatives --config java
sudo update-alternatives --config javac
安装Apache Flink

新的Apache Flink 版本支持Kinesis Data Analytics是1.8.2.

1. Create flink folder

cd
mkdir flink
cd flink
2. 下载Apache Flink version 1.8.2 源代码:

wget https://archive.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-src.tgz
3. 解压 Apache Flink 源代码:

tar -xvf flink-1.8.2-src.tgz
4. 进入到Apache Flink 源代码目录:

cd flink-1.8.2
5. Compile and install Apache Flink (这个编译时间比较长 需要大致20分钟):

mvn clean install -Pinclude-kinesis -DskipTests
3.2创建Kinesis Data Stream Timestreampm25Stream

aws kinesis create-stream --stream-name Timestreampm25Stream --shard-count 1
3.3 运行Flink Connector建立Kinesis连接到Timestream:

cd
cd amazon-timestream-tools/integrations/flink_connector
mvn clean compile
数据采集过程中 请持续运行以下命令:

mvn exec:java -Dexec.mainClass="com.amazonaws.services.kinesisanalytics.StreamingJob" -Dexec.args="--InputStreamName
Timestreampm25Stream --Region us-east-1 --TimestreamDbName iot --TimestreamTableName pm25"
3.4 准备PM2.5演示数据:

连接到EC2堡垒机

1. 下载5演示数据生成程序:

cd
mkdir pm25
cd pm25
wget https://big
and city='Beijing'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)
选择图形显示 select Gauge



Save Panel as Beijing PM2.5 analysis

Edit Panel Title:Beijing PM2.5 analysis



Save Dashboard PM2.5 analysis 1:



4.2.2 查询上海一天内各个监控站点PM2.5 平均值

New Panel

SELECT CASE WHEN location = 'songjiang' THEN avg_pm25 ELSE NULL END AS songjiang,
CASE WHEN location = 'fengxian' THEN avg_pm25 ELSE NULL END AS fengxian,
CASE WHEN location = 'no 15 factory' THEN avg_pm25 ELSE NULL END AS No15_factory,
CASE WHEN location = 'xujing' THEN avg_pm25 ELSE NULL END AS xujing,
CASE WHEN location = 'pujiang' THEN avg_pm25 ELSE NULL END AS pujiang,
CASE WHEN location = 'putuo' THEN avg_pm25 ELSE NULL END AS putuo,
CASE WHEN location = 'shangshida' THEN avg_pm25 ELSE NULL END AS shangshida,
CASE WHEN location = 'jingan' THEN avg_pm25 ELSE NULL END AS jingan,
CASE WHEN location = 'xianxia' THEN avg_pm25 ELSE NULL END AS xianxia,
CASE WHEN location = 'hongkou' THEN avg_pm25 ELSE NULL END AS hongkou,
CASE WHEN location = 'jiading' THEN avg_pm25 ELSE NULL END AS jiading,
CASE WHEN location = 'zhangjiang' THEN avg_pm25 ELSE NULL END AS zhangjiang,
CASE WHEN location = 'miaohang' THEN avg_pm25 ELSE NULL END AS miaohang,
CASE WHEN location = 'yangpu' THEN avg_pm25 ELSE NULL END AS yangpu,
CASE WHEN location = 'huinan' THEN avg_pm25 ELSE NULL END AS huinan,
CASE WHEN location = 'chongming' THEN avg_pm25 ELSE NULL END AS chongming
From(
SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25"
where measure_name='pm2.5'
and city='Shanghai'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)


Save Panel as Shanghai PM2.5 analysis

Edit Panel Title:Shanghai PM2.5 analysis

Save Dashboard PM2.5 analysis 1

4.2.3查询广州各个监控站点PM2.5 平均值

New Panel

SELECT CASE WHEN location = 'panyu' THEN avg_pm25 ELSE NULL END AS panyu,
CASE WHEN location = 'commercial school' THEN avg_pm25 ELSE NULL END AS commercial_school,
CASE WHEN location = 'No 5 middle school' THEN avg_pm25 ELSE NULL END AS No_5_middle_school,
CASE WHEN location = 'guangzhou monitor station' THEN avg_pm25 ELSE NULL END AS Guangzhou_monitor_station,
CASE WHEN location = 'nansha street' THEN avg_pm25 ELSE NULL END AS Nansha_street,
CASE WHEN location = 'No 86 middle school' THEN avg_pm25 ELSE NULL END AS No_86_middle_school,
CASE WHEN location = 'luhu' THEN avg_pm25 ELSE NULL END AS luhu,
CASE WHEN location = 'nansha' THEN avg_pm25 ELSE NULL END AS nansha,
CASE WHEN location = 'tiyu west' THEN avg_pm25 ELSE NULL END AS tiyu_west,
CASE WHEN location = 'jiulong town' THEN avg_pm25 ELSE NULL END AS jiulong_town,
CASE WHEN location = 'huangpu' THEN avg_pm25 ELSE NULL END AS Huangpu,
CASE WHEN location = 'baiyun' THEN avg_pm25 ELSE NULL END AS Baiyun,
CASE WHEN location = 'maofeng mountain' THEN avg_pm25 ELSE NULL END AS Maofeng_mountain,
CASE WHEN location = 'chong hua' THEN avg_pm25 ELSE NULL END AS Chonghua,
CASE WHEN location = 'huadu' THEN avg_pm25 ELSE NULL END AS huadu
from(
SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25"
where measure_name='pm2.5'
and city='Guangzhou'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)


Save Panel as Guangzhou PM2.5 analysis

Edit Panel Title:Guangzhou PM2.5 analysis

Save Dashboard PM2.5 analysis 1

4.2.4 查询深圳各个监控站点PM2.5 平均值

New Panel

SELECT CASE WHEN location = 'huaqiao city' THEN avg_pm25 ELSE NULL END AS Huaqiao_city,
CASE WHEN location = 'xixiang' THEN avg_pm25 ELSE NULL END AS xixiang,
CASE WHEN location = 'guanlan' THEN avg_pm25 ELSE NULL END AS guanlan,
CASE WHEN location = 'longgang' THEN avg_pm25 ELSE NULL END AS Longgang,
CASE WHEN location = 'honghu' THEN avg_pm25 ELSE NULL END AS Honghu,
CASE WHEN location = 'pingshan' THEN avg_pm25 ELSE NULL END AS Pingshan,
CASE WHEN location = 'henggang' THEN avg_pm25 ELSE NULL END AS Henggang,
CASE WHEN location = 'minzhi' THEN avg_pm25 ELSE NULL END AS Minzhi,
CASE WHEN location = 'lianhua' THEN avg_pm25 ELSE NULL END AS Lianhua,
CASE WHEN location = 'yantian' THEN avg_pm25 ELSE NULL END AS Yantian,
CASE WHEN location = 'nanou' THEN avg_pm25 ELSE NULL END AS Nanou,
CASE WHEN location = 'meisha' THEN avg_pm25 ELSE NULL END AS Meisha
From(
SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25"
where measure_name='pm2.5'
and city='Shenzhen'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)


Save Panel as Shenzhen PM2.5 analysis

Edit Panel Title:Shenzhen PM2.5 analysis

Save Dashboard PM2.5 analysis 1

4.2.5 深圳华侨城时间序列分析(近5分钟内PM2.5分析)

New Panel

select location, CREATE_TIME_SERIES(time, measure_value::bigint) as PM25 FROM iot.pm25
where measure_name='pm2.5'
and location='huaqiao city'
and time >= ago(5m)
GROUP BY location
选择图形显示 select Lines; Select Points:



Save Panel as Shen Zhen Huaqiao City PM2.5 analysis

Edit Panel Title:深圳华侨城近5分钟PM2.5分析

Save Dashboard PM2.5 analysis 1

4.2.6找出过去2小时内深圳华侨城以30秒为间隔的平均PM2.5值 (使用线性插值填充缺失的值)

New Panel

WITH binned_timeseries AS (
SELECT location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
FROM "iot".pm25
WHERE measure_name = 'pm2.5'
AND location='huaqiao city'
AND time > ago(2h)
GROUP BY location, BIN(time, 30s)
), interpolated_timeseries AS (
SELECT location,
INTERPOLATE_LINEAR(
CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) AS interpolated_avg_PM25
FROM binned_timeseries
GROUP BY location
)
SELECT time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25)
选择图形显示 select Lines:



Save Panel as Shen Zhen Huaqiao City PM2.5 analysis 1

Edit Panel Title:过去2小时深圳华侨城平均PM2.5值 (使用线性插值填充缺失值)

Save Dashboard PM2.5 analysis 1

4.2.7 过去5分钟内所有城市PM2.5平均值排名 (线性插值)

New Panel

SELECT CASE WHEN city = 'Shanghai' THEN inter_avg_PM25 ELSE NULL END AS Shanghai,
CASE WHEN city = 'Beijing' THEN inter_avg_PM25 ELSE NULL END AS Beijing,
CASE WHEN city = 'Guangzhou' THEN inter_avg_PM25 ELSE NULL END AS Guangzhou,
CASE WHEN city = 'Shenzhen' THEN inter_avg_PM25 ELSE NULL END AS Shenzhen,
CASE WHEN city = 'Hangzhou' THEN inter_avg_PM25 ELSE NULL END AS Hangzhou,
CASE WHEN city = 'Nanjing' THEN inter_avg_PM25 ELSE NULL END AS Nanjing,
CASE WHEN city = 'Chengdu' THEN inter_avg_PM25 ELSE NULL END AS Chengdu,
CASE WHEN city = 'Chongqing' THEN inter_avg_PM25 ELSE NULL END AS Chongqing,
CASE WHEN city = 'Tianjin' THEN inter_avg_PM25 ELSE NULL END AS Tianjin,
CASE WHEN city = 'Shenyang' THEN inter_avg_PM25 ELSE NULL END AS Shenyang,
CASE WHEN city = 'Sanya' THEN inter_avg_PM25 ELSE NULL END AS Sanya,
CASE WHEN city = 'Lasa' THEN inter_avg_PM25 ELSE NULL END AS Lasa
from(
WITH binned_timeseries AS (
SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
FROM "iot".pm25
WHERE measure_name = 'pm2.5'
AND time > ago(5m)
GROUP BY city,location, BIN(time, 30s)
), interpolated_timeseries AS (
SELECT city,location,
INTERPOLATE_LINEAR(
CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) AS interpolated_avg_PM25
FROM binned_timeseries
GROUP BY city,location
), all_location_interpolated as (
SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25))
select city,avg(interpolated_avg_PM25) AS inter_avg_PM25
from all_location_interpolated
group by city
order by avg(interpolated_avg_PM25) desc)
选择Panel图形类型:





Save Panel as all city analysis 1

Edit Panel Title:过去5分钟所有城市PM2.5平均值

Save Dashboard PM2.5 analysis 1

4.2.8 过去5分钟内 PM2.5高的十个采集点(线性插值)

New Panel

WITH binned_timeseries AS (
SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
FROM "iot".pm25
WHERE measure_name = 'pm2.5'
AND time > ago(5m)
GROUP BY city,location, BIN(time, 30s)
), interpolated_timeseries AS (
SELECT city,location,
INTERPOLATE_LINEAR(
CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s))
AS interpolated_avg_PM25
FROM binned_timeseries
GROUP BY city,location
), interpolated_cross_join as (
SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25))
select city,location, avg(interpolated_avg_PM25) as avg_PM25_loc
from interpolated_cross_join
group by city,location
order by avg_PM25_loc desc
limit 10
选择Table



Save Panel as all city analysis 2

Edit Panel Title:过去5分钟内 PM2.5高的十个采集点(线性插值)

Save Dashboard PM2.5 analysis 1

4.2.9 过去5分钟内 PM2.5低的十个采集点(线性插值)

New Panel

WITH binned_timeseries AS (
SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
FROM "iot".pm25
WHERE measure_name = 'pm2.5'
AND time > ago(5m)
GROUP BY city,location, BIN(time, 30s)
), interpolated_timeseries AS (
SELECT city,location,
INTERPOLATE_LINEAR(
CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s))
AS interpolated_avg_PM25
FROM binned_timeseries
GROUP BY city,location
), interpolated_cross_join as (
SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25))
select city,location, avg(interpolated_avg_PM25) as avg_PM25_loc
from interpolated_cross_join
group by city,location
order by avg_PM25_loc asc
limit 10
选择Table

Save Panel as all city analysis 3

Edit Panel Title:过去5分钟内 PM2.5低的十个采集点(线性插值)

Save Dashboard PM2.5 analysis 1

设置仪表板 每5秒钟刷新一次:

本blog着重介绍通过Timestream、Kinesis Stream托管服务和Grafana实现物联网(以PM 2.5场景为示例)时序数据实时采集、存储和分析,其中包含部署架构、环境部署、数据采集、数据存储和分析,希望当您有类似物联网时序数据存储和分析需求的时候,有所启发,实现海量物联网时序数据高效管理、挖掘物联网数据中蕴含的规律、模式和价值,助力业务发展。
————————————————
版权声明:本文为CSDN博主「亚马逊云开发者」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/awschina/article/details/118160981

相关文章