数据工程是一个快速发展的技术领域,专注于从多种数据源中提取数据,将其转换为特定格式,并加载到单一的数据源中进行分析。本文将通过实际例子,帮助学习如何使用Python进行网络爬虫和API数据提取,从而开始数据工程之旅。
数据工程是目前最可靠且增长最快的技术职业之一,因为它更多地集中在网络爬虫和数据集的抓取上。通过ETL(提取、转换、加载)循环,可以将来自多个源的数据集成到单一的信息源中。
ETL是从一个多样化的来源和格式中提取大量数据,并将其转换为单一格式,然后放入数据库或目标文件中的过程。例如,数据可能存储在CSV文件中,而其他数据存储在JSON文件中。必须将所有这些信息汇总到一个文件中,以便AI可以读取。如果数据是以英制单位存储的,但AI需要公制单位,将需要进行转换。
在开始ETL过程之前,需要打开笔记本并导入所需的函数和模块。将使用glob模块来查找文件,使用pandas来处理数据,使用xml.etree.ElementTree来解析XML文件,以及使用datetime来记录时间。
import glob
import pandas as pd
import xml.etree.ElementTree as ET
from datetime import datetime
将使用名为“dealership_data”的文件,其中包含CSV、JSON和XML文件,用于二手车数据,这些数据包含车型、制造年份、价格和燃料类型等特征。将从原始数据中提取文件,并将其转换为目标文件并加载到输出中。
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/datasource.zip
nzip datasource.zip -d dealership_data
接下来,将设置目标文件的路径,并定义临时文件和日志文件。
tmpfile = "dealership_temp.tmp" # 存储所有提取的数据
logfile = "dealership_logfile.txt" # 所有事件日志将存储在这里
targetfile = "dealership_transformed_data.csv" # 转换后的数据存储在这里
提取函数将从多个源批量提取大量数据。通过添加此函数,它将发现并加载所有CSV文件名,并将CSV文件添加到数据帧中,每次迭代都会添加,首先是第一次迭代,然后是第二次迭代,从而得到一系列提取的数据。
def extract_from_csv(file_to_process):
dataframe = pd.read_csv(file_to_process)
return dataframe
def extract_from_json(file_to_process):
dataframe = pd.read_json(file_to_process, lines=True)
return dataframe
def extract_from_xml(file_to_process):
dataframe = pd.DataFrame(columns=['car_model', 'year_of_manufacture', 'price', 'fuel'])
tree = ET.parse(file_to_process)
root = tree.getroot()
for person in root:
car_model = person.find("car_model").text
year_of_manufacture = int(person.find("year_of_manufacture").text)
price = float(person.find("price").text)
fuel = person.find("fuel").text
dataframe = dataframe.append({"car_model": car_model, "year_of_manufacture": year_of_manufacture, "price": price, "fuel": fuel}, ignore_index=True)
return dataframe
现在,将调用提取函数,分别对CSV、JSON和XML文件进行操作。
def extract():
extracted_data = pd.DataFrame(columns=['car_model', 'year_of_manufacture', 'price', 'fuel'])
for csvfile in glob.glob("dealership_data/*.csv"):
extracted_data = extracted_data.append(extract_from_csv(csvfile), ignore_index=True)
for jsonfile in glob.glob("dealership_data/*.json"):
extracted_data = extracted_data.append(extract_from_json(jsonfile), ignore_index=True)
for xmlfile in glob.glob("dealership_data/*.xml"):
extracted_data = extracted_data.append(extract_from_xml(xmlfile), ignore_index=True)
return extracted_data
在收集数据之后,将进入“转换”阶段。这个函数将把英寸的高度列转换为毫米,把磅的重量列转换为千克,并返回结果。
def transform(data):
data['price'] = round(data.price, 2)
return data
现在已经收集并指定了数据,是时候将其加载到目标文件中了。在这种情况下,将pandas数据帧保存为CSV。已经完成了从各种来源提取、转换和加载数据到单一目标文件的步骤。在结束工作之前,需要建立一个日志条目。将通过编写一个日志函数来实现这一点。
def load(targetfile, data_to_load):
data_to_load.to_csv(targetfile)
def log(message):
timestamp_format = '%H:%M:%S-%h-%d-%Y'
now = datetime.now()
timestamp = now.strftime(timestamp_format)
with open("dealership_logfile.txt", "a") as f:
f.write(timestamp + ',' + message + '\n')
首先通过调用extract_data函数开始。从这一步收到的数据将被转移到数据转换的第二步。完成后,数据将被加载到目标文件中。注意,在每一步的开始和结束时,都添加了开始和完成的时间和日期。
log("ETL Job Started")
log("Extract phase Started")
extracted_data = extract()
log("Extract phase Ended")
log("Transform phase Started")
transformed_data = transform(extracted_data)
log("Transform phase Ended")
log("Load phase Started")
load(targetfile, transformed_data)
log("Load phase Ended")
log("ETL Job Ended")