使用 AWS Glue 编排基于 Amazon Redsh的 ETL 工作流

2021-12-21 00:00:00 选择 集群 作业 所示 工作流
Original URL: https://amazonaws-china.com/blogs/big-data/orchestrate-amazon-redshift-based-etl-workflows-with-aws-step-functions-and-aws-glue/

Amazon Redshift 是云中完全托管的、PB 级的数据仓库服务,可借助与您今天所用相同的基于 SQL 的工具和商业智能应用程序,提供快速查询性能。许多客户还喜欢将 Amazon Redshift 用作提取、转换和加载 (ETL) 引擎,以使用现有的 SQL 开发人员技能集,快速迁移预先存在的基于 SQL 的 ETL 脚本,并且由于 Amazon Redshift 完全兼容ACID,作为合并来自源数据系统的变更数据的有效机制。在本文中,我将展示如何使用 AWS Step Functions 和 AWS Glue Python Shell 以完全无服务器的方式为那些基于Amazon Redshift 的 ETL 工作流编排任务。AWS Glue Python Shell 是一个 Python 运行时环境,用于运行中小型 ETL 任务,例如提交 SQL 查询和等待响应。Step Functions 可让您将多个 AWS 服务协调到工作流中,从而可以轻松运行和监视一系列 ETL 任务。AWS Glue Python Shell 和 Step Functions 均无服务器,允许自动运行和扩展它们以响应定义的事件,而无需配置、扩展和管理服务器。尽管许多基于 SQL 的传统工作流都使用内部数据库结构(如触发器和存储过程),但将工作流编排、任务和计算引擎组件分离为独立的服务,使您可以独立开发、优化甚至重复使用每个组件。因此,尽管本博文以 Amazon Redshift 为例,但我的目的是更广泛地向您展示如何编排任何基于 SQL 的 ETL。

先决条件

如果您想使用自己的 AWS 帐户遵循本博文中的示例,则需要一个 Virtual Private Cloud (VPC),其至少具有两个到 S3 VPC 终端节点的路由的私有子网。

如果您没有 VPC,或者不确定您的 VPC 是否满足这些要求,我提供一个 AWS CloudFormation 模板堆栈,您可以通过选择以下按钮将其启动。在首页上提供堆栈名称,其他所有内容保留默认设置。等待堆栈显示创建完成(这只需要几分钟),然后转到其他部分。

场景

对于本文中的示例,我使用 Amazon Customer Reviews Dataset 数据集构建 ETL 工作流,该工作流完成了以下两个代表简单 ETL 过程的任务。

  • 任务 1:将包含 2015 年及以后的评论的数据集的副本从 S3 移动到 Amazon Redshift 表。
  • 任务 2:生成一组输出文件到另一个 Amazon S3 位置,该位置按市场和产品类别标识“有用”的评论,从而使分析团队可以收集有关高质量评论的信息。

该数据集可通过 Amazon Simple Storage Service (Amazon S3) 存储桶公开获得。完成以下任务以进行设置。

解决方案概览

下图突出显示了端到端的解决方案架构:

此过程中的步骤如下:

  1. 状态机启动一系列运行的 AWS Glue Python Shell 作业(有关如何以及为何我稍后在本博文中使用单个作业的更多信息!),并带有用于从 AWS Secrets Manager 检索数据库连接信息和从 S3 检索 .sql 文件的参数。
  2. AWS Glue Python Shell 作业每次运行时都使用数据库连接信息来连接到 Amazon Redshift 集群并提交.sql 文件中包含的查询。
    1. 对于任务 1:集群利用 Amazon Redshift Spectrum 从 S3 读取数据并将其加载到 Amazon Redshift 表中。Amazon Redshift Spectrum 通常用作将数据加载到 Amazon Redshift 的一种方式。(有关更多信息,请参阅 Amazon Redshift Spectrum 十二个佳实践 的步骤 7。)
    2. 对于任务 2:集群执行聚合查询,然后通过 UNLOAD 将结果导出到另一个 Amazon S3 位置。
  3. 如果管道发生故障,状态机会将通知发送到 Amazon Simple Notification Service (SNS) 主题。
  4. 用户可以从集群查询数据和/或直接从 S3 检索报告输出文件。

我包括一个 AWS CloudFormation 模板以快速启动 ETL 环境,这样我可以将本博文重点放在专门用于构建任务和编排流程组件的步骤上。模板启动下列资源:

  • Amazon Redshift 集群
  • 用于存储 Amazon Redshift 集群信息和凭证的 Secrets Manager 密钥
  • S3 存储桶已预加载 Python 脚本和 .sql 文件
  • AWS Glue Python Shell 作业的 Identity and Access Management (IAM) 角色

请参阅以下资源,了解如何手动完成这些步骤:

  • 在 VPC 中创建 Redshift 集群
  • 使用 AWS Secrets Manager 创建和管理密钥
  • Simple Storage Service (S3) 入门
  • 创建 IAM 角色

确保至少选择两个专用子网和相应的 VPC,如以下屏幕截图所示。如果您从上方使用 VPC 模板,则 VPC 显示为 10.71.0.0/16,子网名称为 A private 和 B private.。

堆栈应该需要 10-15 分钟才能启动。一旦显示 Create Complete,您可以继续进行下一部分。请确保注意 AWS CloudFormation 控制台中的 Resources 选项卡,如以下屏幕截图所示,因为我在整个帖子中都引用了这些资源。

使用 AWS Glue Python Shell 构建

首先,在 AWS 管理控制台中导航到 AWS Glue

建立连接

Amazon Redshift 集群位于 VPC 中,因此您首先需要使用 AWS Glue 创建连接。连接包含访问数据存储所需的属性,包括 VPC 网络信息。您终将此连接附加到 Glue Python Shell 作业,以便其可以到达 Amazon Redshift 集群。

从菜单栏中选择连接,然后选择添加连接。为您的连接指定一个名字(例如blog_rs_connection),选择 Amazon Redshift 作为 连接类型,然后选择下一步,如以下屏幕截图所示。

集群下,输入 AWS CloudFormation 模板启动的集群的名称,即 blogstack-redshiftcluster-####因为我为此博客提供的 Python 代码已处理凭证检索,所以您在此处输入的围绕数据库信息的其余值大部分为占位符。您与连接关联的关键信息与网络相关。

请注意,如果没有正确的集群信息,将无法测试连接如果您对此感兴趣,请注意,在选择了正确的集群之后,将自动填充数据库名称用户名,如以下屏幕截图所示。请遵循此处的说明 Secrets Manager 中检索密码信息,然后将其复制到密码字段中。

ETL 代码审查

了解此示例中使用的两个主要的 Python 脚本:

Pygresql_redshift_common.py 是一组功能,可以从 Secrets Manger 中检索群集连接信息和凭据,建立与集群的连接,并分别提交查询。通过传递的参数检索运行时集群信息,这些功能使作业可以连接到有权限的任何集群。您可以按照说明创建 python .egg 文件(已作为 AWS CloudFormation 模板启动的一部分完成)将这些函数打包到库中。请注意,AWS Glue Python Shell 本机原生支持多个 python 库。

import pg
import boto3
import base64
from botocore.exceptions import ClientError
import json

#uses session manager name to return connection and credential information
def connection_info(db):

session = boto3.session.Session()
client = session.client(
service_name='secretsmanager'
)

get_secret_value_response = client.get_secret_value(SecretId=db)

if 'SecretString' in get_secret_value_response:
secret = json.loads(get_secret_value_response['SecretString'])
else:
secret = json.loads(base64.b64decode(get_secret_value_response['SecretBinary']))

return secret


#creates a connection to the cluster
def get_connection(db,db_creds):

con_params = connection_info(db_creds)

rs_conn_string = "host=%s port=%s dbname=%s user=%s password=%s" % (con_params['host'], con_params['port'], db, con_params['username'], con_params['password'])
rs_conn = pg.connect(dbname=rs_conn_string)
rs_conn.query("set statement_timeout = 1200000")

return rs_conn


#submits a query to the cluster
def query(con,statement):
res = con.query(statement)
return res

调用时,AWS Glue Python Shell 作业运行 rs_query.py。它首先解析在调用时传递的作业参数。它使用其中一些参数从 S3 检索 .sql 文件,然后使用 pygresql_redshift_common.py 中的函数连接文件中的语句并将其提交给集群。因此,除了使用刚打包的 Python 库连接到任何集群之外,它还可以检索并运行任何 SQL 语句。这意味着您只需将参数传输到完成管道中每个任务应连接的位置和应提交的项目,即可为所有基于 Amazon Redshift 的 ETL 管理单个 AWS Glue Python Shell 作业。

from redshift_module import pygresql_redshift_common as rs_common
import sys
from awsglue.utils import getResolvedOptions
import boto3

#get job args
args = getResolvedOptions(sys.argv,['db','db_creds','bucket','file'])
db = args['db']
db_creds = args['db_creds']
bucket = args['bucket']
file = args['file']

#get sql statements
s3 = boto3.client('s3')
sqls = s3.get_object(Bucket=bucket, Key=file)['Body'].read().decode('utf-8')
sqls = sqls.split(';')

#get database connection
print('connecting...')
con = rs_common.get_connection(db,db_creds)

#run each sql statement
print("connected...running query...")
results = []
for sql in sqls[:-1]:
sql = sql + ';'
result = rs_common.query(con, sql)
print(result)
results.append(result)

print(results)

创建 Glue Python Shell 作业

接下来,将该代码付诸实践:

  1. 导航到 AWS Glue 控制台页面左侧菜单上的作业,然后从那里选择添加作业
  2. 为作业指定名称,例如 blog_rs_query
  3. 对于 IAM 角色,选择您先前在 AWS CloudFormation 控制台的资源部分中记下的相同 GlueExecutionRole
  4. 对于类型,选择 Python shell,将 Python version 保留为 Python 3 的默认值,并为此作业运行选择一个您提供的现有脚本
  5. 对于存储脚本的 S3 路径,请导航到 AWS CloudFormation模板创建的脚本存储桶(在资源中查找 ScriptBucket),然后选择 python/py 文件。
  6. 展开安全性配置、脚本库和作业参数部分,将带有 Amazon Redshift 连接库的 Python .egg 文件添加到 Python 库路径。它也位于下 python /redshift_module-0.1-py3.6.egg 下的脚本存储区中。

完成上述所有操作后,所有内容应看上去与以下屏幕截图中的相同:

选择下一步。通过选择选择将其移动到必需连接下,添加您创建的连接。(从建立连接部分开始回想,这使作业能够与 VPC 进行交互。) 选择保存作业并编辑脚本完成操作,如以下屏幕截图所示。

测试驱动 Python Shell 作业

创建作业后,会转到 AWS Glue Python Shell IDE。如果一切顺利,您应该看到 rs_query.py 代码。现在,Amazon Redshift 集群进入空闲状态,因此请使用 Python 代码运行以下 SQL 语句并使用表格进行填充。

  1. 创建一个外部数据库(amzreviews
  2. 创建一个外部表(评论,Amazon Redshift Spectrum 可以从该表中读取 S3 中的源数据(公共评论数据集)。该表按 product_category 进行分区,因为源文件是按类别进行组织的,但是通常您应该对经常筛选的列进行分区(请参阅 #4)。
  3. 将分区添加到外部表。
  4. 创建一个本地内部表(评论到 Amazon Redshift 集群。product_id 作为 DISTKEY 使用时效果很好,因为它具有高基数,分布均匀,并且很可能(尽管不是此博客场景的明确组成部分)有一列将用于与其他表联接。我选择 review_date 作为 SORTKEY,以有效过滤掉不属于我的目标查询的评论数据(2015 年之后)。通过阅读设计表文档,了解有关如何佳选择 DISTKEY/SORTKEY 以及其他表设计参数,进而优化性能的更多信息。CREATE EXTERNAL SCHEMA amzreviews 
    from data catalog
    database 'amzreviews'
    iam_role 'rolearn'
    CREATE EXTERNAL database IF NOT EXISTS;



    CREATE EXTERNAL TABLE amzreviews.reviews(
    marketplace varchar(10),
    customer_id varchar(15),
    review_id varchar(15),
    product_id varchar(25),
    product_parent varchar(15),
    product_title varchar(50),
    star_rating int,
    helpful_votes int,
    total_votes int,
    vine varchar(5),
    verified_purchase varchar(5),
    review_headline varchar(25),
    review_body varchar(1024),
    review_date date,
    year int)
    PARTITIONED BY (
    product_category varchar(25))
    ROW FORMAT SERDE
    'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS INPUTFORMAT
    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
    OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION
    's3://amazon-reviews-pds/parquet/';



    ALTER TABLE amzreviews.reviews ADD
    partition(product_category='Apparel')
    location 's3://amazon-reviews-pds/parquet/product_category=Apparel/'
    partition(product_category='Automotive')
    location 's3://amazon-reviews-pds/parquet/product_category=Automotive'
    partition(product_category='Baby')
    location 's3://amazon-reviews-pds/parquet/product_category=Baby'
    partition(product_category='Beauty')
    location 's3://amazon-reviews-pds/parquet/product_category=Beauty'
    partition(product_category='Books')
    location 's3://amazon-reviews-pds/parquet/product_category=Books'
    partition(product_category='Camera')
    location 's3://amazon-reviews-pds/parquet/product_category=Camera'
    partition(product_category='Grocery')
    location 's3://amazon-reviews-pds/parquet/product_category=Grocery'
    partition(product_category='Furniture')
    location 's3://amazon-reviews-pds/parquet/product_category=Furniture'
    partition(product_category='Watches')
    location 's3://amazon-reviews-pds/parquet/product_category=Watches'
    partition(product_category='Lawn_and_Garden')
    location 's3://amazon-reviews-pds/parquet/product_category=Lawn_and_Garden';


    CREATE TABLE reviews(
    marketplace varchar(10),
    customer_id varchar(15),
    review_id varchar(15),
    product_id varchar(25) DISTKEY,
    product_parent varchar(15),
    product_title varchar(50),
    star_rating int,
    helpful_votes int,
    total_votes int,
    vine varchar(5),
    verified_purchase varchar(5),
    review_date date,
    year int,
    product_category varchar(25))

    SORTKEY (
    review_date
    );

手动执行此项作业,以便您可以看到我所讨论的所有元素在哪里发挥作用。选择 IDE 屏幕顶部的运行作业。展开安全性配置、脚本库和作业参数部分。您可以在此处将参数作为键值对添加,如以下屏幕截图所示。

–dbreviews
–db_credsreviewssecret
–bucket<name of s3 script bucket>
–filesql/reviewsschema.sql


选择运行作业以将其启动。完成该作业需要几秒钟。您可以在 IDE 中的代码下方查找日志输出,以观察作业进度。

作业完成后,请导航至 AWS Glue 控制台中的数据库,然后查找 amzreviews 数据库和 reviews 表,如以下屏幕截图所示。如果它们在该位置,则一切会按计划运行! 您还可以使用 Redshift 查询编辑器或您自己的SQL 客户端工具连接到 Amazon Redshift 集群,并查找本地评论表。


Step Functions 编排

现在您已有机会手动执行作业,可以转到由 Step Functions 编排的更具编程性的内容。

启动模板

我还提供了第三个 AWS CloudFormation 模板来启动此过程。它可以创建一个 Step Functions 状态机,调用刚创建的 AWS Glue Python Shell 作业的两个实例,以完成我在本博文开头概述的两项任务。

对于 BucketName,粘贴在第二个 AWS CloudFormation 堆栈中创建的脚本存储桶的名称。对于 GlueJobName,输入刚创建作业的名称。其他信息保留为默认值,如以下屏幕截图所示。启动堆栈并等待其显示创建完成(这只需几分钟),然后再转到下一部分。

使用 Step Functions 状态机

状态机由一系列步骤组成,使您可以将服务拼接到稳健的 ETL 工作流中。您可以在执行过程中监控每个步骤,这意味着您可以在 ETL 工作流中快速发现并解决问题,终自动完成。

看一下刚启动的状态机以更深入地了解。导航到 AWS 控制台中的 Step Functions 并查找名称类似 GlueJobStateMachine-###### 的状态机。 选择编辑以查看状态机配置,如以下屏幕截图所示。

它的外观应与以下屏幕截图相同:


如您所见,状态机是使用由任务定义和工作流逻辑组成的 JSON 模板创建的。您可以运行并行任务,发现错误,甚至暂停工作流并等待手动回调继续。我提供的示例包含两个用于运行 SQL 语句的任务,这些任务可以完成我在博文开头介绍的目标:

  1. 使用 Redshift Spectrum 从 S3 加载数据
  2. 转换数据并将其写回到 S3

每个任务都包含基本的错误处理,如果发现这些错误,则会将工作流路由到错误通知任务。本示例是一个展示如何构建基本工作流简单示例,但是您可以参考 Step Functions 文档了解更复杂的工作流示例,以帮助构建稳健的 ETL 管道。Step Functions 还支持通过嵌套工作流重复使用模块化组件。

SQL 审查

状态机将检索并运行以下 SQL 语句:

INSERT INTO reviews
SELECT marketplace, customer_id, review_id, product_id, product_parent, product_title, star_rating, helpful_votes, total_votes, vine, verified_purchase, review_date, year, product_category
FROM amzreviews.reviews
WHERE year > 2015;

相关文章