Apache Airflow是数据工程师必备的工具之一,它简化了工作流的创建和监控过程。当有多个工作流时,可能会多次使用相同的数据库和文件路径。使用变量是定义这些共享信息的最有效方式之一。本文将介绍Apache Airflow中的变量概念,并提供一个Python操作符的示例。
Apache Airflow是一个工作流引擎,可以轻松地调度和运行复杂的数据管道。它确保数据管道中的每个任务都按照正确的顺序执行,并为每个任务提供所需的资源。Airflow提供了一个出色的用户界面,用于监控和修复可能出现的任何问题。
在本系列的中已经讨论了安装步骤。要启动Airflow服务器,请打开终端并运行以下命令。默认端口是8080,如果使用的端口用于其他服务,则可以更改它。
airflow webserver -p 8080
现在,在不同的终端中使用以下命令启动Airflow调度器。它将监控所有工作流并按分配触发它们。
airflow scheduler
确保在Airflow目录中有一个名为dags
的文件夹,将在此定义DAG,并在Web浏览器中打开,将看到类似这样的界面。
让从导入所需的库开始。这次将使用PythonOperator。
from airflow.operators.python_operator import PythonOperator
定义DAG参数。对于每个DAG,需要传递一个参数字典。以下是一些可以传递的参数的描述:
现在,将定义一个Python函数,该函数将使用参数打印一个字符串,这个函数稍后将由PythonOperator使用。
def print_hello(**kwargs):
print('Apache Airflow是数据工程师的必备工具')
现在,将创建一个DAG对象,并传递dag_id,这是DAG的名称,并确保之前没有创建过任何具有此名称的DAG。传递之前定义的参数,并添加描述和schedule_interval,它将在指定的时间间隔后运行DAG。
from airflow import DAG
dag = DAG('example_dag', default_args=default_args, schedule_interval=timedelta(days=1))
工作流中只有一个任务:print。在任务中,将使用Python函数在终端上打印“Apache Airflow是数据工程师的必备工具”。
t1 = PythonOperator(
task_id='print_hello',
python_callable=print_hello,
dag=dag)
现在,当刷新Airflow仪表板时,将看到新DAG在列表中。点击DAG并打开图形视图,将看到类似这样的界面。工作流中的每个步骤都将在单独的框中。在此工作流中,只有一个步骤,即print。运行工作流并等待其边框变为深绿色,这表示它已成功完成。
点击节点“print”以获取有关此步骤的更多详细信息,然后点击Logs,将看到这样的输出。
知道Airflow可以用来创建和管理复杂的工作流。可以同时运行多个工作流。大多数工作流可能使用相同的数据库或相同的文件路径。现在,如果进行任何更改,例如更改保存文件的目录路径或更改数据库的配置。在这种情况下,不希望单独更新每个DAG。
Airflow为此提供了解决方案,可以创建变量,以便在多个DAG中存储和检索运行时数据。因此,如果发生任何重大更改,只需编辑变量,工作流就可以继续进行。
如何创建变量?打开Airflow仪表板,点击顶部菜单中的Admin,然后点击Variables。现在,点击Create创建一个新变量,将打开一个窗口。添加键和值并提交。在这里,正在创建一个名为data_path的变量,值为任何随机文本文件的路径。
现在,将创建一个DAG,将找出此文件中文本数据的词数。当想要使用变量时,需要导入它。让看看如何做到这一点:
from airflow.models import Variable
data_path = Variable.get('data_path')
def count_words_in_file(file_path):
with open(file_path, 'r') as file:
text = file.read()
words = text.split()
return len(words)