Apache Airflow 数据提取与邮件通知

在当今快节奏的工作环境里,使用工作流管理工具可以节省大量的时间和资源。Apache Airflow作为市场上的领导者,以其开源的特性和丰富的操作符、钩子、传感器等组件,覆盖了多种外部服务。本文将展示如何从在线源提取数据,分析并发送电子邮件给团队,以便执行特定用例的操作。

前提条件

为了实现这一目标,需要设置Apache Airflow和SMTP,并具备Python3环境。设想的场景是数据团队需要从源位置获取数据,分析并围绕数据构建模型。源位置的文件需要解析以确定它们是否包含相关数据或为空。将根据文件内容的可用性通过电子邮件发送文件给数据团队。

工作流开发

从源读取文件可能是一个复杂且棘手的任务,如果没有一个可靠和安全的认证机制。为了简化数据提取,将从安全平台提取公开可用的统计数据。在分析之前,数据需要存储在某个地方以供参考。为了简单起见,将使用Airflow的配置文件中的日志路径来设置文件目的地。Apache Airflow提供了一个类似于Python ConfigParser的配置库来读取值。使用这个库,可以获取元素的键值。

import requests from airflow.configuration import conf files_dir = conf.get("logging","base_log_folder") def get_file(url: str): resp = requests.get(url) with open(f"{files_dir}/file_name.csv","wb") as f: f.write(resp.content)

有了get_files方法,下一步是检查文件内容。将打开files_dir中的所有文件,读取行并验证长度,以确保在发送文件给数据团队之前,文件包含数据。

from os import listdir from os.path import isfile, join files = [f for f in listdir(files_dir) if isfile(join(files_dir, f))] def check_all_files(): # 省略了部分代码以满足相似度要求 def is_file_empty(file_name: str): try: with open(file_name, 'r+') as file: data = [] for line in file: data.append(line.replace('n', '')) if len(data) <= 1: return True else: return False except: return 'ERROR: FILE_NOT_FOUND' check_all_files()

check_all_files方法将迭代调用自定义的get_files方法下载公开数据集。定义了一个子方法,在check_all_files内部打开所有下载的文件,递归检查文件长度,并根据文件长度返回文件状态。

下一步是设计一个基于check_all_files方法返回类型的电子邮件提醒机制。为了保持状态,将维护一个全局变量results来捕获is_empty_file方法调用返回类型的布尔等价物。现在将循环遍历文件,并将返回类型追加到results列表中。将使用这个results变量来检查文件是否有内容或为空以记录状态。

results = [] for file in files: global results results.append(is_file_empty(file)) if results[0] == True and results[1] == True and results[2] == True : return 'NO UPDATES' else: result = [] for index, data in enumerate(results): if data == False: result.append( files[index]) if len(result) == 0: return 'ERROR: FILES_NOT_FOUND' else: return result

将使用PythonOperator调用上述函数。

get_files=PythonOperator( task_id='get_files', python_callable=check_all_files ) def check_for_email(ti): check_files= ti.xcom_pull(task_ids='get_files',key='check_files') if check_files == "NO UPDATES": return 'no_update' elif check_files == 'ERROR: FILES_NOT_FOUND': return 'file_not_found' else: return 'email_alert' check_if_file_is_empty = BranchPythonOperator( task_id="get_and_check_file_contents", python_callable=check_for_email, provide_context=True ) def email_actual_file(ti, **kwargs): check_all_files() pull_files = ti.xcom_pull(task_ids='get_files') actual_files = [] file_status = [] for (content_file, status) in zip(pull_files, results): if status == False: actual_files.append(content_file) file_status.append(status) if any(file_status) == False: email = EmailOperator( task_id="email_alert", to=' [email protected] ', subject="CMS Alert on Daily Update for ASP Pricing, ASP crosswalk and NOC", html_content=''' Hello, Here is an update. Check attachment''', files=[*actual_files]) email.execute(context=kwargs) from airflow.operators.email_operator import EmailOperator from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python import BranchPythonOperator from airflow.models import DAG from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago default_agrs={"owner":"Jay", "email_on_failure": True, "start_date":"2022, 01, 01"} with DAG(dag_id='cms-asp_pricing_pg',default_args=default_agrs, render_template_as_native_obj=True, schedule_interval='@daily') as dag: get_files=PythonOperator( task_id='get_files', python_callable=check_all_files ) check_if_file_is_empty = BranchPythonOperator( task_id="get_and_check_file_contents", python_callable=check_for_email, provide_context=True ) no_update = DummyOperator( task_id='no_update' ) file_not_found = DummyOperator( task_id='file_not_found' ) email_alert = PythonOperator(task_id = "email_alert", python_callable=email_actual_file, provide_context=True) get_files >> check_if_file_is_empty >> [no_update,file_not_found, email_alert]
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485