使用 Step Functions 编排从数据库到数据仓库的数据ETL

2021-12-15 00:00:00 创建 数据 数据库 作业 点击

数据仓库是信息的中央存储库。业务分析师、数据工程师、数据科学家和决策者通过商业智能 (BI) 工具、SQL 客户端和其他分析应用程序访问数据。数据和分析已然成为各大企业保持竞争力所不可或缺的部分。企业用户依靠报告、控制面板和分析工具从其数据中获得洞察力、监控企业绩效以及更明智地决策。

通常,数据定期从事务系统、关系数据库和其他来源流入数据仓库。这个流入的过程,被称作ETL(Extract-Transform-Load)。在数据爆炸的今天,开发者经常需要通过Hadoop/Spark 集群,配合一些开源组件,如sqoop, Hive, Airflow等实现对海量数据的处理和迁移。除了集群本身的维护,如虚机的配置,操作系统的升级,安全管理,存储的扩展等,还要考虑如性能监控,日志,错误处理等诸多支撑性功能的实现。

本文介绍一个通过亚马逊云Serverless(无服务器)服务提供ETL的方案。它包含了亚马逊云Step Function, Glue, Lambda等组件,实现从mysql 数据库到亚马逊云Redshift 数仓的数据迁移以及迁移后的处理功能。

架构设计

①  通过定期执行的源数据爬虫任务,读取业务数据库和数据仓库的源数据。

②  通过Glue服务的ETL Job完成从业务数据库到数据仓库的数据拉取,转化和加载。

③  调用Lambda函数,对数仓中的数据进行进一步的加工,满足企业对数据分层等进一步处理的要求。

这其中,第2步和第3步都是通过Step Function来调度执行,实现可视化的作业管理。

环境准备

  • 首先,在环境中分别创建一个mysql 实例和一个redshift 数仓实例来模拟企业的场景。这里我们设置redshift数据库为“dev”:
  • 另外,需要创建s3和redshift-data的两个Endpoint 服务,用于VPC内程序的访问
  • 之后,创建一个SNS topic,用于在出现问题时进行通知
  • 后,我们需要为redshift创建一个secret manager 密钥,用于安全访问redshift

在这些基础框架建设完毕后,可以分别连接到数仓和数据库中,通过https://github.com/sun-biao/step-function-etl-redshift/blob/main/sql.script 中的建表语句,分别创建两个表,table1和table2。 另外在资源中还有一个创建存储过程的语句,可以在redshift中执行,这个存储过程用来模拟企业内部数仓中的执行程序。

环境搭建

步: 源数据管理

一 在AWS Glue/数据库/连接 中,创建两个JDBC连接,分别指向数据库和数仓。

二 在AWS Glue/爬网程序 中点击“创建爬网程序”,分别为库和仓创建两个爬网程序

三 选中爬网程序,点击“运行爬网程序”。在真实场景中,可以设置为定期触发方式,这里我们手动执行。

四 执行完毕后,点击 AWS Glue/数据库/表 查看添加后的源数据信息。注意这里的表名会根据数据库的名称不同而不同。

第二步: 创建ETL Job

一 点击AWS Glue/ETL/作业,点击“添加作业”

二 在“配置作业属性”页面中,输入“名称”并选择一个“角色”。如果角色中为空,参照链接创建一个新的角色:https://docs.aws.amazon.com/zh_cn/glue/latest/dg/create-service-policy.html 并点击下一步

三 在“选择一个数据源”页面,选中mysql 数据库的table2,点击“下一步”

四 在“选择转换类型”页面,点击“下一步”

五 在“选择一个数据目标”页面,选中redshift数据库的table2,点击“下一步”

六 检查字段mapping后点击“保存作业并编辑脚本”。

七 在后作业脚本页面,点击“保存”并“运行作业”。如无异常,作业会执行,数据会进入redshift

八 在redshift中可以通过“编辑器”对table2进行查询

九 重复上面1-8,创建一个新的作业,选择table1作为源和目标。这样我们就有了两个作业,分别对应table1和table2

第三步: 创建lambda程序

一 在AWS Lambda中选择“创建函数”

二 函数名为callredshift, “运行时”选择python3.8。

三 点击“设置”,在“VPC”中选择redshift 所在VPC, 安全组选择可以访问Redshift的安全组

四在“配置/常规配置”中,将超时时间设置为25秒。

五 在“配置/权限”中,为当前角色附加 “AdministratorAccess”策略

六 将下列代码粘贴到lambda_function.py中

import json
import boto3
import time

client = boto3.client('redshift-data')
def lambda_handler(event, context):
print('start.....')
try:
response = client.execute_statement(
ClusterIdentifier='redshift-cluster-1',
Database='dev',
SecretArn=‘<redshit的Secret Manager ARN>',
Sql='call test_sp1(1000000)',
# Sql='select count(*) from table1',
StatementName='get result'
)
except Exception as e:
subject = "Error:" + ":" + str(e)
print(subject)
raise
query_id = response["Id"]
done = False
while not done:
time.sleep(1)
status = status_check(client, query_id)
if status in ("STARTED", "FAILED", "FINISHED"):
print("status is: {}".format(status))
break
print(response)
desc = client.describe_statement(Id=response["Id"])
result = client.get_statement_result(Id=response["Id"])
print(result)
return str(result)
def status_check(client, query_id):
desc = client.describe_statement(Id=query_id)
status = desc["Status"]
if status == "FAILED":
raise Exception('SQL query failed:' + query_id + ": " + desc["Error"])
return status.strip('"')

七 保存后点击“Test”,创建一个“测试事件”后,再次点击”Test“,查看输出结果

第四步: 创建Step Functions 状态机

一 选择“Step Functions/状态机”,点击“创建状态机”

二 使用默认选项,在下面定义中,删除原Json文件,拷贝如下内容:

{
"Comment": "This is your state machine",
"StartAt": "Parallel",
"States": {
"Parallel": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "Table1 Job",
"States": {
"Table1 Job": {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Parameters": {
"JobName": “<Table1 Job>"
},
"OutputPath": "$.JobRunState",
"End": true
}
}
},
{
"StartAt": "Table2 Job",
"States": {
"Table2 Job": {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Parameters": {
"JobName": "<Table2 Job>"
},
"OutputPath": "$.JobRunState",
"End": true
}
}
}
],
"Next": "Choice"
},
"Choice": {
"Type": "Choice",
"Choices": [
{
"And": [
{
"Variable": "$[]",
"StringMatches": "SUCCEEDED"
},
{
"Variable": "$[1]",
"StringMatches": "SUCCEEDED"
}
],
"Next": "Call redshift"
}
],
"Default": "SNS Publish"
},
"Call redshift": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:<Region的名字,如ap-northeast-1>:<12位账号>:function:callredshift:$LATEST"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"End": true
},
"SNS Publish": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"Message.$": "$",
"TopicArn": "arn:aws:sns:<Region的名字,如ap-northeast-1>:<12位账号>:<SNS 主题名>"
},
"End": true
}
}
}

三 保存后,修改当前状态机的权限为管理员,执行该状态机并查看状态。

四 点击“图表检查器”中后一步“Call redshift”,查看右侧步骤输出,确认redshift中的程序被正确调用。

总结

一 整个流程中没有除了数据仓库,没有使用任何需要维护的计算资源,实现了“零运维”。

二 Step Functions状态机的每次执行,都提供了完备的流程日志,每个步骤都有详细的输入输出信息,方便调试。

三 依照Step Functions提供逻辑处理功能,通过判断,循环等可以实现客户复杂的逻辑。

四 Step Functions提供强大的服务整合能力,通过整合其它服务,提供诸如报警,数据,计算等等功能。


相关文章