Apache Airflow入门指南

在日常工作中,经常需要处理各种工作流程,比如从多个数据库中收集数据、预处理数据、上传数据和报告数据。如果这些日常任务能够自动触发并在预定时间内执行,那将大大提高工作效率。Apache Airflow就是这样一个可以帮助工具。无论是数据科学家、数据工程师还是软件工程师,都会发现这个工具非常有用。

目录

  • 引言
  • Apache Airflow是什么?
  • Apache Airflow的特点
  • 安装步骤
  • Apache Airflow的组件
  • 用户界面
  • 定义第一个DAG
  • 结论

Apache Airflow是什么?

Apache Airflow是一个工作流引擎,它可以轻松地调度和运行复杂的数据管道。它确保数据管道中的每个任务都按照正确的顺序执行,并且每个任务都能获得所需的资源。它提供了一个出色的用户界面,用于监控和修复可能出现的问题。

Apache Airflow的特点

易于使用:如果有Python知识,可以轻松地在Airflow上部署。

开源:它是免费的开源软件,拥有许多活跃的用户。

强大的集成:它提供了现成的操作符,让可以与Google Cloud Platform、Amazon AWS、Microsoft Azure等合作。

使用标准Python编码:可以使用Python创建简单到复杂的工作流,具有完全的灵活性。

出色的用户界面:可以监控和管理工作流。它允许检查已完成和正在进行的任务的状态。

安装步骤

让开始安装Apache Airflow。如果系统中已经安装了pip,可以跳过第一个命令。要安装pip,请在终端中运行以下命令:

sudo apt-get install python3-pip

接下来,Airflow需要在本地系统上有一个家。默认情况下,~/airflow是默认位置,但可以根据需要更改它。

export AIRFLOW_HOME=~/airflow

现在,使用pip安装apache airflow:

pip3 install apache-airflow

Airflow需要一个数据库后端来运行工作流并维护它们。要初始化数据库,请运行以下命令:

airflow initdb

已经讨论过Airflow有一个出色的用户界面。要启动web服务器,请在终端中运行以下命令。默认端口是8080,如果使用的端口用于其他事情,那么可以更改它。

airflow webserver -p 8080

现在,使用以下命令启动Airflow调度器。它将一直运行并监控所有工作流并触发它们,就像分配的那样。

airflow scheduler

现在,在Airflow目录中创建一个名为dags的文件夹,将在其中定义工作流或DAGs,并打开Web浏览器,转到http://localhost:8080/admin/,将看到类似这样的界面:

Apache Airflow的组件

DAG:它是有向无环图——一个想要运行的所有任务的集合,它被组织起来并显示了不同任务之间的关系。它在Python脚本中定义。

Web服务器:它是构建在Flask上的用户界面。它允许监控DAGs的状态并触发它们。

元数据数据库:Airflow存储所有任务的状态在一个数据库中,并从这里执行工作流的所有读写操作。

调度器:顾名思义,这个组件负责调度DAGs的执行。它检索和更新数据库中的任务状态。

用户界面

现在已经安装了Airflow,让快速了解一下用户界面的一些组件。

DAGS视图:这是用户界面的默认视图。这将列出系统中存在的所有DAGs。它将给一个DAGs的总结视图,比如一个特定的DAG运行成功了多少次,失败了多少次,最后一次执行时间,以及其他一些有用的链接。

图形视图:在图形视图中,可以可视化工作流的每一步及其依赖关系和当前状态。可以通过不同的颜色代码检查当前状态:

树视图:树视图也代表DAG。如果认为管道执行时间比预期的要长,那么可以检查哪一部分执行时间较长,然后可以在这方面工作。

任务持续时间:在这个视图中,可以比较在不同时间间隔运行的任务的持续时间。可以在这里优化算法并比较性能。

代码:在这个视图中,可以快速查看用于生成DAG的代码。

定义第一个DAG

让开始定义第一个DAG。

在本节中,将创建一个工作流,其中第一步是在终端上打印“获取实时板球得分”,然后使用API在终端上打印实时得分。让先测试API,为此,需要安装cricket-cli库,使用以下命令。

sudo pip3 install cricket-cli

现在,运行以下命令并获取得分。

cricket scores

这可能需要一些时间,取决于互联网连接,并将返回类似于这样的输出:

导入库

现在,将使用Apache Airflow创建相同的工作流。代码将完全用Python定义一个DAG。让从导入需要的库开始。只需要使用BashOperator,因为工作流只需要运行Bash操作。

# 查看代码在Gist上。

定义DAG参数

对于每个DAG,需要传递一个参数字典。以下是一些可以传递的参数的描述:

owner:工作流所有者的名称,应该是字母数字的,可以有下划线,但不能包含任何空格。

depends_on_past:如果每次运行工作流时,数据都依赖于过去的运行,则将其标记为True,否则标记为False。

start_date:工作流的开始日期

email:电子邮件ID,以便在任何任务由于任何原因失败时接收电子邮件。

retry_delay:如果任何任务失败,那么它应该等待多长时间再重试。

# 查看代码在Gist上。

定义DAG

现在,将创建一个DAG对象,并传递dag_id,这是DAG的名称,它应该是唯一的。传递在上一步中定义的参数,并添加一个描述和schedule_interval,它将在指定的时间间隔后运行DAG。

# 查看代码在Gist上。

定义任务

工作流将有两个任务:

print:在第一个任务中,将使用echo命令在终端上打印“获取实时板球得分!”。

get_cricket_scores:在第二个任务中,将使用安装的库打印实时板球得分。

在定义任务时,首先需要为任务选择正确的操作符。这里两个命令都是基于终端的,所以将使用BashOperator。将传递task_id,这是任务的唯一标识符,将在DAG的图形视图的节点上看到这个名字。传递想要运行的bash命令,最后传递想要链接这个任务的DAG对象。

最后,通过在任务之间添加“>>”操作符来创建管道。

# 查看代码在Gist上。
  • Apache Hive入门——所有大数据和数据工程专业人员必须知道的工具
  • Hadoop生态系统入门——大数据和数据工程
  • Apache Hive中的表类型——快速概览
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485