在日常工作中,经常需要处理各种工作流程,比如从多个数据库中收集数据、预处理数据、上传数据和报告数据。如果这些日常任务能够自动触发并在预定时间内执行,那将大大提高工作效率。Apache Airflow就是这样一个可以帮助工具。无论是数据科学家、数据工程师还是软件工程师,都会发现这个工具非常有用。
Apache Airflow是一个工作流引擎,它可以轻松地调度和运行复杂的数据管道。它确保数据管道中的每个任务都按照正确的顺序执行,并且每个任务都能获得所需的资源。它提供了一个出色的用户界面,用于监控和修复可能出现的问题。
易于使用:如果有Python知识,可以轻松地在Airflow上部署。
开源:它是免费的开源软件,拥有许多活跃的用户。
强大的集成:它提供了现成的操作符,让可以与Google Cloud Platform、Amazon AWS、Microsoft Azure等合作。
使用标准Python编码:可以使用Python创建简单到复杂的工作流,具有完全的灵活性。
出色的用户界面:可以监控和管理工作流。它允许检查已完成和正在进行的任务的状态。
让开始安装Apache Airflow。如果系统中已经安装了pip,可以跳过第一个命令。要安装pip,请在终端中运行以下命令:
sudo apt-get install python3-pip
接下来,Airflow需要在本地系统上有一个家。默认情况下,~/airflow
是默认位置,但可以根据需要更改它。
export AIRFLOW_HOME=~/airflow
现在,使用pip安装apache airflow:
pip3 install apache-airflow
Airflow需要一个数据库后端来运行工作流并维护它们。要初始化数据库,请运行以下命令:
airflow initdb
已经讨论过Airflow有一个出色的用户界面。要启动web服务器,请在终端中运行以下命令。默认端口是8080,如果使用的端口用于其他事情,那么可以更改它。
airflow webserver -p 8080
现在,使用以下命令启动Airflow调度器。它将一直运行并监控所有工作流并触发它们,就像分配的那样。
airflow scheduler
现在,在Airflow目录中创建一个名为dags的文件夹,将在其中定义工作流或DAGs,并打开Web浏览器,转到http://localhost:8080/admin/,将看到类似这样的界面:
DAG:它是有向无环图——一个想要运行的所有任务的集合,它被组织起来并显示了不同任务之间的关系。它在Python脚本中定义。
Web服务器:它是构建在Flask上的用户界面。它允许监控DAGs的状态并触发它们。
元数据数据库:Airflow存储所有任务的状态在一个数据库中,并从这里执行工作流的所有读写操作。
调度器:顾名思义,这个组件负责调度DAGs的执行。它检索和更新数据库中的任务状态。
现在已经安装了Airflow,让快速了解一下用户界面的一些组件。
DAGS视图:这是用户界面的默认视图。这将列出系统中存在的所有DAGs。它将给一个DAGs的总结视图,比如一个特定的DAG运行成功了多少次,失败了多少次,最后一次执行时间,以及其他一些有用的链接。
图形视图:在图形视图中,可以可视化工作流的每一步及其依赖关系和当前状态。可以通过不同的颜色代码检查当前状态:
树视图:树视图也代表DAG。如果认为管道执行时间比预期的要长,那么可以检查哪一部分执行时间较长,然后可以在这方面工作。
任务持续时间:在这个视图中,可以比较在不同时间间隔运行的任务的持续时间。可以在这里优化算法并比较性能。
代码:在这个视图中,可以快速查看用于生成DAG的代码。
让开始定义第一个DAG。
在本节中,将创建一个工作流,其中第一步是在终端上打印“获取实时板球得分”,然后使用API在终端上打印实时得分。让先测试API,为此,需要安装cricket-cli
库,使用以下命令。
sudo pip3 install cricket-cli
现在,运行以下命令并获取得分。
cricket scores
这可能需要一些时间,取决于互联网连接,并将返回类似于这样的输出:
现在,将使用Apache Airflow创建相同的工作流。代码将完全用Python定义一个DAG。让从导入需要的库开始。只需要使用BashOperator,因为工作流只需要运行Bash操作。
# 查看代码在Gist上。
对于每个DAG,需要传递一个参数字典。以下是一些可以传递的参数的描述:
owner:工作流所有者的名称,应该是字母数字的,可以有下划线,但不能包含任何空格。
depends_on_past:如果每次运行工作流时,数据都依赖于过去的运行,则将其标记为True,否则标记为False。
start_date:工作流的开始日期
email:电子邮件ID,以便在任何任务由于任何原因失败时接收电子邮件。
retry_delay:如果任何任务失败,那么它应该等待多长时间再重试。
# 查看代码在Gist上。
现在,将创建一个DAG对象,并传递dag_id
,这是DAG的名称,它应该是唯一的。传递在上一步中定义的参数,并添加一个描述和schedule_interval
,它将在指定的时间间隔后运行DAG。
# 查看代码在Gist上。
工作流将有两个任务:
print:在第一个任务中,将使用echo命令在终端上打印“获取实时板球得分!”。
get_cricket_scores:在第二个任务中,将使用安装的库打印实时板球得分。
在定义任务时,首先需要为任务选择正确的操作符。这里两个命令都是基于终端的,所以将使用BashOperator。将传递task_id
,这是任务的唯一标识符,将在DAG的图形视图的节点上看到这个名字。传递想要运行的bash命令,最后传递想要链接这个任务的DAG对象。
最后,通过在任务之间添加“>>”操作符来创建管道。
# 查看代码在Gist上。