数据工程与ETL流程详解

数据工程是一个快速发展的技术领域,专注于从多种数据源中提取数据,将其转换为特定格式,并加载到单一的数据源中进行分析。本文将通过实际例子,帮助学习如何使用Python进行网络爬虫和API数据提取,从而开始数据工程之旅。

为什么数据工程更可靠?

数据工程是目前最可靠且增长最快的技术职业之一,因为它更多地集中在网络爬虫和数据集的抓取上。通过ETL(提取、转换、加载)循环,可以将来自多个源的数据集成到单一的信息源中。

ETL循环过程

ETL是从一个多样化的来源和格式中提取大量数据,并将其转换为单一格式,然后放入数据库或目标文件中的过程。例如,数据可能存储在CSV文件中,而其他数据存储在JSON文件中。必须将所有这些信息汇总到一个文件中,以便AI可以读取。如果数据是以英制单位存储的,但AI需要公制单位,将需要进行转换。

ETL步骤详解

在开始ETL过程之前,需要打开笔记本并导入所需的函数和模块。将使用glob模块来查找文件,使用pandas来处理数据,使用xml.etree.ElementTree来解析XML文件,以及使用datetime来记录时间。

import glob import pandas as pd import xml.etree.ElementTree as ET from datetime import datetime

将使用名为“dealership_data”的文件,其中包含CSV、JSON和XML文件,用于二手车数据,这些数据包含车型、制造年份、价格和燃料类型等特征。将从原始数据中提取文件,并将其转换为目标文件并加载到输出中。

!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/datasource.zip nzip datasource.zip -d dealership_data

接下来,将设置目标文件的路径,并定义临时文件和日志文件。

tmpfile = "dealership_temp.tmp" # 存储所有提取的数据 logfile = "dealership_logfile.txt" # 所有事件日志将存储在这里 targetfile = "dealership_transformed_data.csv" # 转换后的数据存储在这里

提取函数将从多个源批量提取大量数据。通过添加此函数,它将发现并加载所有CSV文件名,并将CSV文件添加到数据帧中,每次迭代都会添加,首先是第一次迭代,然后是第二次迭代,从而得到一系列提取的数据。

def extract_from_csv(file_to_process): dataframe = pd.read_csv(file_to_process) return dataframe def extract_from_json(file_to_process): dataframe = pd.read_json(file_to_process, lines=True) return dataframe def extract_from_xml(file_to_process): dataframe = pd.DataFrame(columns=['car_model', 'year_of_manufacture', 'price', 'fuel']) tree = ET.parse(file_to_process) root = tree.getroot() for person in root: car_model = person.find("car_model").text year_of_manufacture = int(person.find("year_of_manufacture").text) price = float(person.find("price").text) fuel = person.find("fuel").text dataframe = dataframe.append({"car_model": car_model, "year_of_manufacture": year_of_manufacture, "price": price, "fuel": fuel}, ignore_index=True) return dataframe

现在,将调用提取函数,分别对CSV、JSON和XML文件进行操作。

def extract(): extracted_data = pd.DataFrame(columns=['car_model', 'year_of_manufacture', 'price', 'fuel']) for csvfile in glob.glob("dealership_data/*.csv"): extracted_data = extracted_data.append(extract_from_csv(csvfile), ignore_index=True) for jsonfile in glob.glob("dealership_data/*.json"): extracted_data = extracted_data.append(extract_from_json(jsonfile), ignore_index=True) for xmlfile in glob.glob("dealership_data/*.xml"): extracted_data = extracted_data.append(extract_from_xml(xmlfile), ignore_index=True) return extracted_data

在收集数据之后,将进入“转换”阶段。这个函数将把英寸的高度列转换为毫米,把磅的重量列转换为千克,并返回结果。

def transform(data): data['price'] = round(data.price, 2) return data

现在已经收集并指定了数据,是时候将其加载到目标文件中了。在这种情况下,将pandas数据帧保存为CSV。已经完成了从各种来源提取、转换和加载数据到单一目标文件的步骤。在结束工作之前,需要建立一个日志条目。将通过编写一个日志函数来实现这一点。

def load(targetfile, data_to_load): data_to_load.to_csv(targetfile) def log(message): timestamp_format = '%H:%M:%S-%h-%d-%Y' now = datetime.now() timestamp = now.strftime(timestamp_format) with open("dealership_logfile.txt", "a") as f: f.write(timestamp + ',' + message + '\n')

首先通过调用extract_data函数开始。从这一步收到的数据将被转移到数据转换的第二步。完成后,数据将被加载到目标文件中。注意,在每一步的开始和结束时,都添加了开始和完成的时间和日期。

log("ETL Job Started") log("Extract phase Started") extracted_data = extract() log("Extract phase Ended") log("Transform phase Started") transformed_data = transform(extracted_data) log("Transform phase Ended") log("Load phase Started") load(targetfile, transformed_data) log("Load phase Ended") log("ETL Job Ended")
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485