In today's era of big data, extract, transform, and load (ETL) is an indispensable part of data analysis and processing. AWS Data Wrangler integrates with many of AWS's big data services (such as S3, the Glue Catalog, Athena, databases, and EMR), which makes it very convenient for engineers. It also works with packages such as Pandas and PyArrow to help write transformation logic. This article walks through a hypothetical use case: read data from a Glue Catalog table, obtain filter values for retrieving data from Redshift, create a Glue connection to Redshift, use AWS Data Wrangler with AWS Glue 2.0 to read the Glue Catalog table, retrieve the filtered data from the Redshift database, and write the resulting dataset to S3. Along the way, it also covers how to resolve a Glue network connectivity issue.
To connect to the Redshift database from a Glue job, you need to create a Glue connection. In the AWS Glue console, under Data catalog, find Connections and add a new connection. After creating the connection, test it. During the test, the following error occurred:
Reason: Could not find S3 endpoint or NAT gateway for subnetId: subnet-630140c22a02f8cc2 in Vpc vpc-xxxxxxxxxxx.
The reason for the connection failure is straightforward: the subnet used for the connection has no S3 endpoint or NAT gateway. To see more details, go to the VPC page in the AWS console and select the subnet.
Instances in a public subnet can send outbound traffic directly to the internet, whereas a private subnet can reach the internet only through a network address translation (NAT) gateway located in a public subnet. After making the change (adding an S3 endpoint or routing the subnet through a NAT gateway), test the connection again. The test takes a while this time, but it eventually succeeds.
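For reference, the missing S3 gateway endpoint can also be created programmatically. The boto3 sketch below is only illustrative; the VPC ID, route table ID, and Region are placeholders rather than values from this environment.

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

## add a gateway-type S3 endpoint and attach it to the route table of the Glue connection's subnet
response = ec2.create_vpc_endpoint(
    VpcEndpointType="Gateway",
    VpcId="vpc-xxxxxxxxxxx",                    ## placeholder VPC ID
    ServiceName="com.amazonaws.us-east-1.s3",
    RouteTableIds=["rtb-xxxxxxxxxxxxxxxxx"],    ## placeholder route table ID for the subnet
)
print("Created endpoint:", response["VpcEndpoint"]["VpcEndpointId"])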
Next, create a new Glue job. In the AWS console, go to AWS Glue > ETL > Jobs > Add job. For this article, "A new script to be authored by you" was selected, and the connection created earlier was supplied under the Connections option. You can optionally enable "Continuous logging" in the Monitoring options section. Further down the page there is an option to add job parameters. The AWS Data Wrangler development team has made package integration simple: when adding the job, just specify "--additional-python-modules" as the key and "awswrangler" as the value under Job parameters to make Data Wrangler available.
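The same job definition can also be created with boto3, which makes the "--additional-python-modules" argument explicit. This is a minimal sketch under assumed names: the job name, IAM role, script location, temp dir, and connection name are placeholders, not values from this setup.

import boto3

glue = boto3.client("glue", region_name="us-east-1")

glue.create_job(
    Name="redshift-incremental-load",        ## hypothetical job name
    Role="GlueServiceRole",                  ## placeholder IAM role for the Glue job
    GlueVersion="2.0",
    Command={
        "Name": "glueetl",
        "PythonVersion": "3",
        "ScriptLocation": "s3://my-glue-scripts/redshift_incremental_load.py",  ## placeholder script path
    },
    Connections={"Connections": ["redshift-connection"]},    ## the Glue connection created earlier
    DefaultArguments={
        "--additional-python-modules": "awswrangler",         ## makes AWS Data Wrangler importable in the job
        "--TempDir": "s3://my-glue-temp-dir/",                ## placeholder temp dir used by the Redshift connector
        "--enable-continuous-cloudwatch-log": "true",         ## continuous logging, as enabled in the console above
    },
)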
Here is an example of the job script:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
import awswrangler as wr
import pandas as pd
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
db_username = "admin"
db_password = "xxxxxxxxx"
def GetTableData():
    ## get total row count using aws data wrangler
    getTotalRowCount = "SELECT count(*) as totalRowCount from orders"
    df_count = wr.athena.read_sql_query(sql=getTotalRowCount, database="myredshift")
    totalRowCount = df_count.iloc[0, 0]
    print("Total row count from Glue Table: ", totalRowCount)
    ## get max order date using aws data wrangler
    maxOrderDateSql = "SELECT max(o_orderdate) as maxOrderDate FROM orders"
    df = wr.athena.read_sql_query(sql=maxOrderDateSql, database="myredshift")
    maxOrderDate = df.iloc[0, 0]
    print("MaxOrderdate from Glue Table: ", maxOrderDate)
    return maxOrderDate
print("Get the max order date from glue table myredshift.orders to create redsfhit query")
maxOrderDate = GetTableData()
query = "SELECT * from admin.orders where o_orderdate > '{}'".format(maxOrderDate)
print("Query to be executed in redshift: ", query)
## define the redshift connection options
connection_options = {
    "url": "jdbc:redshift://dwtest.paidfoobarrr.us-east-1.redshift.amazonaws.com:5439/dwtest",
    "query": query,
    "user": db_username,
    "password": db_password,
    "redshiftTmpDir": args["TempDir"],
    "aws_iam_role": "arn:aws:iam::xxxxxxxxxxxx:role/dwtest-s3-access"
}
## create glue dynamicframe
df = glueContext.create_dynamic_frame_from_options(connection_type="redshift", connection_options=connection_options)
## get row count
print("Record count from redshift: ", df.toDF().count())
## write the data to S3 location using glue catalog
sink = glueContext.write_dynamic_frame_from_catalog(frame=df, database="myredshift", table_name="orders")
print("Completed writing data to Glue table myredshift.orders")
print("Get the total row count and current max order from Glue table")
GetTableData()
job.commit()
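Once saved, the job can be run from the console or started programmatically. A minimal boto3 sketch follows, reusing the hypothetical job name from the earlier example; the temp dir is again a placeholder.

import boto3

glue = boto3.client("glue", region_name="us-east-1")

## start a run of the job; Arguments can override the defaults defined on the job
run = glue.start_job_run(
    JobName="redshift-incremental-load",                  ## hypothetical job name
    Arguments={"--TempDir": "s3://my-glue-temp-dir/"},    ## placeholder temp dir
)
print("Started job run:", run["JobRunId"])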