AWS数据管道与Glue服务实践指南

在当今的大数据时代,数据的提取、转换和加载(ETL)是数据分析和处理中不可或缺的一环。AWSData Wrangler通过与AWS的多种大数据服务(如S3、Glue Catalog、Athena、数据库、EMR等)集成,为工程师们提供了极大的便利。此外,它还支持导入Pandas和PyArrow等包,以帮助编写转换逻辑。本文将通过一个假设的用例,指导如何从Glue目录表读取数据,获取过滤值以从Redshift检索数据,创建Glue与Redshift的连接,使用AWS Data Wrangler与AWS Glue 2.0读取Glue目录表中的数据,从Redshift数据库检索过滤后的数据,并将结果数据集写入S3。在此过程中,还将讨论如何解决Glue网络连接问题。

AWS Glue连接

要通过Glue作业连接到Redshift数据库,需要创建一个Glue连接。在AWS Glue的Data catalog下找到connections,然后添加新的连接。创建连接后,进行测试。在测试过程中,遇到了以下错误:

Reason: Could not find S3 endpoint or NAT gateway for subnetId: subnet-630140c22a02f8cc2 in Vpc vpc-xxxxxxxxxxx.

连接失败的原因很直接:用于连接的子网没有S3端点或NAT网关。要查看更多细节,请转到AWS控制台的VPC页面,选择子网。

公共与私有子网的区别

公共子网中的实例可以直接向互联网发送出站流量,而私有子网可以通过位于公共子网中的网络地址转换(NAT)网关访问互联网。在进行更改后,再次测试连接。这次测试连接将花费一些时间,但最终会成功。

添加AWSGlue作业

接下来,将创建一个新的Glue作业。在本文中,选择了“由编写的新脚本”,并提供了Connections选项。在AWS控制台中,转到AWS Glue >ETL> Jobs > Add job。可以选择在Monitoring options部分启用“Continuous logging”。继续前进,在页面下方有一个添加作业参数的选项。AWS Data Wrangler开发团队简化了包集成。在添加新作业时,只需在Job Parameters中指定“--additional-python-modules”作为键,将“awswrangler”作为值即可使用Data Wrangler。

编写脚本

以下是作业脚本的示例:

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.dynamicframe import DynamicFrame from awsglue.job import Job import awswrangler as wr import pandas as pd @params: [TempDir, JOB_NAME] args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) db_username = "admin" db_password = "xxxxxxxxx" def GetTableData(): ## get total row count using aws data wrangler getTotalRowCount = "SELECT count(*) as totalRowCount from orders" df_count = wr.athena.read_sql_query(sql=getTotalRowCount, database="myredshift") totalRowCount = df_count.iloc[0,0] print("Total row count from Glue Table: ", totalRowCount) ## get max Order Date using aws data wrangler maxOrderDateSql = "SELECT max(o_orderdate) as maxOrderDate FROM orders" df = wr.athena.read_sql_query(sql=maxOrderDateSql, database="myredshift") maxOrderDate = df.iloc[0,0] print("MaxOrderdate from Glue Table: ", maxOrderDate) return maxOrderDate print("Get the max order date from glue table myredshift.orders to create redsfhit query") maxOrderDate = GetTableData() query = "SELECT * from admin.orders where o_orderdate > '{}'".format(maxOrderDate) print("Query to be executed in redshift: ", query) ## define the redshift connection options connection_options = { "url": "jdbc:redshift://dwtest.paidfoobarrr.us-east-1.redshift.amazonaws.com:5439/dwtest", "query": query, "user": db_username, "password": db_password, "redshiftTmpDir": args["TempDir"], "aws_iam_role": "arn:aws:iam::xxxxxxxxxxxx:role/dwtest-s3-access" } ## create glue dynamicframe df=glueContext.create_dynamic_frame_from_options(connection_type="redshift", connection_options=connection_options) #get row count print("Record count from redshift: ", df.toDF().count()) ## write the data to S3 location using glue catalog sink = glueContext.write_dynamic_frame_from_catalog(frame=df, database="myredshift", table_name="orders") print("Completed writing data to Glue table myredshift.orders") print("Get the total row count and current max order from Glue table") GetTableData() job.commit()
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485