Apache Airflow变量与Python操作符指南

Apache Airflow是数据工程师必备的工具之一,它简化了工作流的创建和监控过程。当有多个工作流时,可能会多次使用相同的数据库和文件路径。使用变量是定义这些共享信息的最有效方式之一。本文将介绍Apache Airflow中的变量概念,并提供一个Python操作符的示例。

目录

Apache Airflow简介

Apache Airflow是一个工作流引擎,可以轻松地调度和运行复杂的数据管道。它确保数据管道中的每个任务都按照正确的顺序执行,并为每个任务提供所需的资源。Airflow提供了一个出色的用户界面,用于监控和修复可能出现的任何问题。

启动Airflow

在本系列的中已经讨论了安装步骤。要启动Airflow服务器,请打开终端并运行以下命令。默认端口是8080,如果使用的端口用于其他服务,则可以更改它。

airflow webserver -p 8080

现在,在不同的终端中使用以下命令启动Airflow调度器。它将监控所有工作流并按分配触发它们。

airflow scheduler

确保在Airflow目录中有一个名为dags的文件夹,将在此定义DAG,并在Web浏览器中打开,将看到类似这样的界面。

Apache Airflow中的Python操作符

让从导入所需的库开始。这次将使用PythonOperator。

from airflow.operators.python_operator import PythonOperator

定义DAG参数。对于每个DAG,需要传递一个参数字典。以下是一些可以传递的参数的描述:

  • owner:工作流所有者的名称,应该是字母数字的,可以包含下划线,但不能包含任何空格。
  • depends_on_past:如果每次运行工作流时数据都依赖于之前的运行,则将其标记为True,否则为False。
  • start_date:工作流的开始日期
  • email:电子邮件ID,以便在任何任务由于任何原因失败时接收电子邮件。
  • retry_delay:如果任何任务失败,它应该等待多长时间再重试。

现在,将定义一个Python函数,该函数将使用参数打印一个字符串,这个函数稍后将由PythonOperator使用。

def print_hello(**kwargs): print('Apache Airflow是数据工程师的必备工具')

现在,将创建一个DAG对象,并传递dag_id,这是DAG的名称,并确保之前没有创建过任何具有此名称的DAG。传递之前定义的参数,并添加描述和schedule_interval,它将在指定的时间间隔后运行DAG。

from airflow import DAG dag = DAG('example_dag', default_args=default_args, schedule_interval=timedelta(days=1))

工作流中只有一个任务:print。在任务中,将使用Python函数在终端上打印“Apache Airflow是数据工程师的必备工具”。

t1 = PythonOperator( task_id='print_hello', python_callable=print_hello, dag=dag)

现在,当刷新Airflow仪表板时,将看到新DAG在列表中。点击DAG并打开图形视图,将看到类似这样的界面。工作流中的每个步骤都将在单独的框中。在此工作流中,只有一个步骤,即print。运行工作流并等待其边框变为深绿色,这表示它已成功完成。

点击节点“print”以获取有关此步骤的更多详细信息,然后点击Logs,将看到这样的输出。

知道Airflow可以用来创建和管理复杂的工作流。可以同时运行多个工作流。大多数工作流可能使用相同的数据库或相同的文件路径。现在,如果进行任何更改,例如更改保存文件的目录路径或更改数据库的配置。在这种情况下,不希望单独更新每个DAG。

Airflow为此提供了解决方案,可以创建变量,以便在多个DAG中存储和检索运行时数据。因此,如果发生任何重大更改,只需编辑变量,工作流就可以继续进行。

如何创建变量?打开Airflow仪表板,点击顶部菜单中的Admin,然后点击Variables。现在,点击Create创建一个新变量,将打开一个窗口。添加键和值并提交。在这里,正在创建一个名为data_path的变量,值为任何随机文本文件的路径。

现在,将创建一个DAG,将找出此文件中文本数据的词数。当想要使用变量时,需要导入它。让看看如何做到这一点:

from airflow.models import Variable data_path = Variable.get('data_path') def count_words_in_file(file_path): with open(file_path, 'r') as file: text = file.read() words = text.split() return len(words)
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485