ETL与大数据集成实践

  • ETL是什么?
  • ETL的历史
  • ETL的优势
  • Apache Airflow简介
  • 如何安装Apache Airflow
  • Airflow配置
  • Airflow DAG
  • Airflow运行
  • Airflow的替代方案
  • 结论
  • 参考文献

ETL是什么?

ETL(提取、转换、加载)是数据工程中的一个管道过程,它涉及到大数据的创建。使用ETL工具来整合不同的数据源,包括数据库、电子表格、视频文件、音频等。在这个过程中,数据从数据源中提取出来,转换成可以分析的格式,并存储在大数据系统中,以便将来由机器学习工具进行建模。

ETL用于从多个数据源中合并数据,通常用于构建数据仓库。在这个过程中,数据从源系统中提取出来,转换成可以分析的格式,并存储在仓库或其他系统中。提取、加载、转换(ELT)是另一种相关的方法,旨在将处理推向数据库以提高性能。

ETL的使用始于20世纪70年代,当时公司开始使用多个存储库或数据库来存储不同类型的业务信息。数据整合的需求迅速增长。ETL成为了一个从不同来源提取数据,然后将其移动到数据仓库大数据中的过程。

在20世纪90年代初,数据仓库爆炸式增长。作为一种不同的数据库模型,它们提供了从多个系统(包括大型机、小型机、个人计算机和电子表格)集成访问数据的能力。但不同的部门经常使用不同的ETL工具和不同的仓库。再加上合并和收购,许多公司最终拥有未集成的单独ETL解决方案。

目前,数据访问呈指数级增长,格式、来源和数据系统的数量几乎无限。ETL如今是企业中的关键过程。

ETL的优势

许多企业采用ETL流程来获取推动最佳业务决策的数据。今天,从多个系统和来源整合数据仍然是大数据的命脉。

当与大数据一起使用时,ETL为公司提供了完整的历史背景。提供统一视图:

ETL使业务用户能够轻松分析和报告与他们计划相关的数据;

ETL已经发展到支持流数据等新兴集成需求;

组织需要ETL和ELT来合并数据,保持准确性,并提供存储数据的控制,创建报告和执行分析。

Apache Airflow:工作流管理平台

Airflow是由社区创建的平台,用于以编程方式编写、调度和监控工作流。

Airflow原则:

  • 可扩展性
  • 动态性
  • 广泛性
  • 优雅性

Airflow特点:

  • 纯Python
  • 有用的用户界面
  • 强大的集成
  • 开源

如何安装Apache Airflow

Airflow的安装和设置:

  1. 在示例目录中创建airflow目录。
  2. 导航到airflow目录并创建dags目录。
  3. 下载镜像并在Docker中运行Apache Airflow对象
  4. 如果使用的是Windows,请打开Shell终端并运行命令:
docker run -d -p 8080:8080 -v "$PWD/airflow/dags:/opt/airflow/dags/" --entrypoint=/bin/bash --name airflow apache/airflow:2.1.1-python3.8 -c '(airflow db init && airflow users create --username admin --password bootcamp --firstname Andre --lastname Lastname --role Admin --email [email protected]; airflow webserver & airflow scheduler)

安装环境所需的库:

docker container exec -it airflow bash pip install pymysql xlrd openpyxl minio

如果没有错误,请访问Apache Airflow用户界面的地址(*等待大约5分钟后打开终端):

Airflow配置

创建以下变量:

data_lake_server = 172.17.0.4:9001 data_lake_login = minioadmin data_lake_password = minioadmin database_server = 172.17.0.2 database_login = root database_password = bootcamp database_name = employees

Airflow DAG

from datetime import datetime, date, timedelta import pandas as pd from io import BytesIO from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.operators.bash import BashOperator from airflow.models import Variable from minio import Minio from sqlalchemy.engine import create_engine DEFAULT_ARGS = { 'owner': 'Airflow', 'depends_on_past': False, 'start_date': datetime(2021, 1, 13), } dag = DAG('etl_department_salary_left_att', default_args=DEFAULT_ARGS, schedule_interval="@once") data_lake_server = Variable.get("data_lake_server") data_lake_login = Variable.get("data_lake_login") data_lake_password = Variable.get("data_lake_password") database_server = Variable.get("database_server") database_login = Variable.get("database_login") database_password = Variable.get("database_password") database_name = Variable.get("database_name") url_connection = "mysql+pymysql://{}:{}@{}/{}".format(str(database_login), str(database_password), str(database_server), str(database_name)) engine = create_engine(url_connection) client = Minio(data_lake_server, access_key=data_lake_login, secret_key=data_lake_password, secure=False) def extract(): query = """SELECT emp.department as department,sal.salary as salary, emp.left FROM employees emp INNER JOIN salaries sal ON emp.emp_no = sal.emp_id;""" df_ = pd.read_sql_query(query, engine) df_.to_csv("/tmp/department_salary_left.csv", index=False) def load(): df_ = pd.read_csv("/tmp/department_salary_left.csv") df_.to_parquet("/tmp/department_salary_left.parquet", index=False) client.fput_object("processing", "department_salary_left.parquet", "/tmp/department_salary_left.parquet") extract_task = PythonOperator(task_id='extract_data_from_database', provide_context=True, python_callable=extract, dag=dag) load_task = PythonOperator(task_id='load_file_to_data_lake', provide_context=True, python_callable=load, dag=dag) clean_task = BashOperator(task_id="clean_files_on_staging", bash_command="rm -f /tmp/*.csv;rm -f /tmp/*.json;rm -f /tmp/*.parquet;", dag=dag) extract_task >> load_task >> clean_task
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485