使用 Amazon Kinesis 和 Amazon EMR 构建数据批处理分析架构

2021-12-14 00:00:00 创建 数据 代码 配置 数据处理

这篇文章主要介绍如何使用Amazon Kinesis和Amazon EMR构建批量数据处理过程,实现Hour+1的数据批量处理分析,并汇入到数仓中。方案架构图如下图所示:

  

1 创建Kinesis Firehose

在上篇Blog里我们已经演示了创建Kinesis Data Stream,并在EC2上通过配置agent把数据导入到Kinesis Data Stream里,这里我们直接创建Kinesis Data Firehose,把Kinesis Data Stream里的数据通过Firehose导入到S3上。

1.1 创建新的传输流
     
1.2 选择默认值
  
1.3 选择目标S3,选择S3桶名,输入rawdata作为前缀,即为原始日志存放的路径,点击下一步。
  
1.4   配置设置部分所有内容都保持默认配置,点下一步,再点击创建传输流,这样用来传输原始日志的Firehose就建成功了。

1.5 运行一段时间,会发现Firehose自动在S3里把数据按照月/日/时的路径区分。
  

2 通过Cloudwatch定时触发Step Functions,调取EMR做数据ETL,并发送报警信息:

2.1 登录到Step Functions的控制台,创建状态机
  
2.2 选择使用代码段创作以及标准类型
  
把以下代码粘贴到代码区,关键点请查看备注,粘贴的时候请注意去掉备注部分

{

"Comment": "An example of the Amazon States Language for running jobs on Amazon EMR",

"StartAt": "Create an EMR cluster", — 从Create an EMR Cluster开始

"States": {

"Create an EMR cluster": { — 定义Create an EMR Cluster

"Type": "Task",

"Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",

"Parameters": {

"Name": "ExampleCluster",

"VisibleToAllUsers": true,

"ReleaseLabel": "emr-5.31.0", — 版本信息

"Applications": [

{ "Name": "Spark" }

],

"ServiceRole": "StepFunctionsSample-EMRJobManagemen-EMRServiceRole-7AN48YIUFZ4Y",

"JobFlowRole": "StepFunctionsSample-EMRJobManagementf9d53d8a-e756-480c-a44e-83285aaf241a-EMREc2InstanceProfile-5CT3AR0IIMEL",

"LogUri": "s3://stepfunctions-emrproject-049970088233-us-west-2/logs/",

"Instances": {

"KeepJobFlowAliveWhenNoSteps": true,

"InstanceFleets": [

{

"Name": "MyMasterFleet",

"InstanceFleetType": "MASTER", — Master节点配置

"TargetOnDemandCapacity": 1,

"InstanceTypeConfigs": [

{

"InstanceType": "m5.xlarge"

}

]

},

{

"Name": "MyCoreFleet",

"InstanceFleetType": "CORE", — Core节点配置,视情况调整数量,task节点同理配置

"TargetOnDemandCapacity": 2,

"InstanceTypeConfigs": [

{

"InstanceType": "m5.xlarge"

}

]

}

]

}

},

"ResultPath": "$.cluster",

"Next": "Run first step"

},

"Run first step": {

"Type": "Task",

"Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",

"Parameters": {

"ClusterId.$": "$.cluster.ClusterId",

"Step": {

"Name": "My first EMR step",

"ActionOnFailure": "CONTINUE",

"HadoopJarStep": {

"Jar": "command-runner.jar",

"Args": ["spark-submit","s3://gamedemo-s3/scripts/spark-etl.py"] — 执行代码部分,注意把代码上传到相应的S3路径,具体代码见下面

}

}

},

"Retry" : [

{

"ErrorEquals": [ "States.ALL" ],

"IntervalSeconds": 1,

"MaxAttempts": 3,

"BackoffRate": 2.0

}

],

"ResultPath": "$.firstStep",

"Next": "Run second step"

},

"Run second step": {

"Type": "Task",

"Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",

"Parameters": {

"ClusterId.$": "$.cluster.ClusterId",

"Step": {

"Name": "My second EMR step", — 这里second step是置空的,如果实际业务场景中有需要second EMR step,可以直接改下面的内容,添加second EMR step

"ActionOnFailure": "CONTINUE",

"HadoopJarStep": {

"Jar": "command-runner.jar",

"Args": ["bash", "-c", "ls"]

}

}

},

"Retry" : [

{

"ErrorEquals": [ "States.ALL" ],

"IntervalSeconds": 1,

"MaxAttempts": 3,

"BackoffRate": 2.0

}

],

"ResultPath": "$.secondStep",

"Next": "Terminate Cluster"

},

"Terminate Cluster": {

"Type": "Task",

"Resource": "arn:aws:states:::elasticmapreduce:terminateCluster",

"Parameters": {

"ClusterId.$": "$.cluster.ClusterId"

},

"End": true

}

}

}


2.3 粘贴完代码,右边即会显示执行逻辑,点击下一步
  
2.4 输入状态机的名字,选择创建新角色(Step Functions会根据代码里需要的资源创建相应的角色), 点击创建状态机
  
  
2.5 EMR上的ETL代码如下,读取前一个小时的原始日志的路径rawdata2020,对原始数据增加一列,输出为parquet列式格式,并以相应的月/日/时模式写入到ETL后的目录step-function-etl-data下

import sys

import datetime

from pyspark.sql import SparkSession

from pyspark.sql.functions import *

if __name__ == "__main__":

spark = SparkSession.builder.appName("gamedemo-spark-etl").getOrCreate()

DT = datetime.datetime.now()

LASTTIME = (DT + datetime.timedelta(hours=-1))

MONTH = LASTTIME.strftime("%m")

DAY = LASTTIME.strftime("%d")

LASTHOUR = LASTTIME.strftime("%H")

gameData = spark.read.json("s3://gamedemo-s3/rawdata2020/{}/{}/{}".format(MONTH,DAY,LASTHOUR))

updatedgameData = gameData.withColumn("c_date", lit(datetime.datetime.now().strftime("%Y-%m-%d-%H-%M")))

updatedgameData.write.mode("overwrite").parquet("s3://gamedemo-s3/step-function-etl-data/{}/{}/{}".format(MONTH,DAY,LASTHOUR))

至此 Step Functions调EMR执行ETL任务,执行完任务后会自动关闭集群的功能就实现了,这种模式成本要比long-running EMR和Glue的成本低很多。

2.6 进入Cloudwatch,创建一条定时触发的规则
  
2.7 按照下图,配置定时触发,即每小时第5分钟触发Step Functions
  
2.8 这里你还可以配置另外一条rule,指定当Step Functions发生变化的时候,发送SNS报警。
  
2.9 运行一段时间以后,可以检查Step Functions的状态,以及ETL后S3路径的数据输出
  
  

3 创建EC2,定时COPY数据到Redshift:

3.1 创建EC2 参看https://docs.aws.amazon.com/zh_cn/efs/latest/ug/gs-step-one-create-ec2-resources.html

3.2 登录到EC2上,安装psql

sudo yum install -y postgresql-server postgresql-devel

3.3 复制以下脚本,并修改相应参数,用以导入数据到Redshift

#!/bin/bash

export PGPASSWORD=xxxxx – 替换Redshift的密码

MONTH=`date "+%m"`

DAY=`date "+%d"`

LASTHOUR=`date -d '1 hour ago' "+%H"`

CREATE_TABLE="create table hour_table_${MONTH}_${DAY}_${LASTHOUR} (email varchar(1024),first_name varchar(1024),gender varchar(16),id varchar(64),ip_address varchar(1024),last_name varchar(1024),c_date varchar(1024));"

COPY_DATA="COPY hour_table_${MONTH}_${DAY}_${LASTHOUR} FROM 's3://gamedemo-s3/step-function-etl-data/${MONTH}/${DAY}/${LASTHOUR}/' IAM_ROLE 'arn:aws:iam::123456789:role/redshift-s3-readonly' FORMAT AS PARQUET;"
- 123456789替换自己的数字ID
aws s3 ls s3://gamedemo-s3/step-function-etl-data/$MONTH/$DAY/$LASTHOUR/_SUCCESS

if [ $? == ];then

psql -h redshift-cluster-1.cjjg7qqfm.us-west-2.redshift.amazonaws.com -U awsuser -d myredshift -p 5439 -c "$CREATE_TABLE"

psql -h redshift-cluster-1.cjjg7qqfm.us-west-2.redshift.amazonaws.com -U awsuser -d myredshift -p 5439 -c "$COPY_DATA"

fi


3.4 把以上脚本写入到crontab里定时执行,后在Redshift里看到的定时创建的表,且数据已经加载到表里了
  

总结:

通过以上实验,就完成了Hour+1 的批量数据处理,加载数据到Redshift数仓的工作,结合之前流式数据处理的架构,终得出这样的一个完整的架构图,这套架构利用Kinesis系列 + Step Functions + EMR 构建出了一套流式数据处理和Hour + 1的批数据处理分析的过程,并汇总所有结果到Redshift,供业务查询。整个架构搭建起来非常简单,维护工作量也不大,数据工程师可以轻松利用此架构实现流式数据分析和批数据处理分析的目的。
  

相关文章