ETL(提取、转换、加载)是数据工程中的一个管道过程,它涉及到大数据的创建。使用ETL工具来整合不同的数据源,包括数据库、电子表格、视频文件、音频等。在这个过程中,数据从数据源中提取出来,转换成可以分析的格式,并存储在大数据系统中,以便将来由机器学习工具进行建模。
ETL用于从多个数据源中合并数据,通常用于构建数据仓库。在这个过程中,数据从源系统中提取出来,转换成可以分析的格式,并存储在仓库或其他系统中。提取、加载、转换(ELT)是另一种相关的方法,旨在将处理推向数据库以提高性能。
ETL的使用始于20世纪70年代,当时公司开始使用多个存储库或数据库来存储不同类型的业务信息。数据整合的需求迅速增长。ETL成为了一个从不同来源提取数据,然后将其移动到数据仓库或大数据中的过程。
在20世纪90年代初,数据仓库爆炸式增长。作为一种不同的数据库模型,它们提供了从多个系统(包括大型机、小型机、个人计算机和电子表格)集成访问数据的能力。但不同的部门经常使用不同的ETL工具和不同的仓库。再加上合并和收购,许多公司最终拥有未集成的单独ETL解决方案。
目前,数据访问呈指数级增长,格式、来源和数据系统的数量几乎无限。ETL如今是企业中的关键过程。
许多企业采用ETL流程来获取推动最佳业务决策的数据。今天,从多个系统和来源整合数据仍然是大数据的命脉。
当与大数据一起使用时,ETL为公司提供了完整的历史背景。提供统一视图:
ETL使业务用户能够轻松分析和报告与他们计划相关的数据;
ETL已经发展到支持流数据等新兴集成需求;
组织需要ETL和ELT来合并数据,保持准确性,并提供存储数据的控制,创建报告和执行分析。
Airflow是由社区创建的平台,用于以编程方式编写、调度和监控工作流。
Airflow原则:
Airflow特点:
Airflow的安装和设置:
docker run -d -p 8080:8080 -v "$PWD/airflow/dags:/opt/airflow/dags/" --entrypoint=/bin/bash --name airflow apache/airflow:2.1.1-python3.8 -c '(airflow db init && airflow users create --username admin --password bootcamp --firstname Andre --lastname Lastname --role Admin --email [email protected]; airflow webserver & airflow scheduler)
安装环境所需的库:
docker container exec -it airflow bash
pip install pymysql xlrd openpyxl minio
如果没有错误,请访问Apache Airflow用户界面的地址(*等待大约5分钟后打开终端):
创建以下变量:
data_lake_server = 172.17.0.4:9001
data_lake_login = minioadmin
data_lake_password = minioadmin
database_server = 172.17.0.2
database_login = root
database_password = bootcamp
database_name = employees
from datetime import datetime, date, timedelta
import pandas as pd
from io import BytesIO
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.models import Variable
from minio import Minio
from sqlalchemy.engine import create_engine
DEFAULT_ARGS = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 13),
}
dag = DAG('etl_department_salary_left_att', default_args=DEFAULT_ARGS, schedule_interval="@once")
data_lake_server = Variable.get("data_lake_server")
data_lake_login = Variable.get("data_lake_login")
data_lake_password = Variable.get("data_lake_password")
database_server = Variable.get("database_server")
database_login = Variable.get("database_login")
database_password = Variable.get("database_password")
database_name = Variable.get("database_name")
url_connection = "mysql+pymysql://{}:{}@{}/{}".format(str(database_login), str(database_password), str(database_server), str(database_name))
engine = create_engine(url_connection)
client = Minio(data_lake_server, access_key=data_lake_login, secret_key=data_lake_password, secure=False)
def extract():
query = """SELECT emp.department as department,sal.salary as salary, emp.left FROM employees emp INNER JOIN salaries sal ON emp.emp_no = sal.emp_id;"""
df_ = pd.read_sql_query(query, engine)
df_.to_csv("/tmp/department_salary_left.csv", index=False)
def load():
df_ = pd.read_csv("/tmp/department_salary_left.csv")
df_.to_parquet("/tmp/department_salary_left.parquet", index=False)
client.fput_object("processing", "department_salary_left.parquet", "/tmp/department_salary_left.parquet")
extract_task = PythonOperator(task_id='extract_data_from_database', provide_context=True, python_callable=extract, dag=dag)
load_task = PythonOperator(task_id='load_file_to_data_lake', provide_context=True, python_callable=load, dag=dag)
clean_task = BashOperator(task_id="clean_files_on_staging", bash_command="rm -f /tmp/*.csv;rm -f /tmp/*.json;rm -f /tmp/*.parquet;", dag=dag)
extract_task >> load_task >> clean_task