在当今快节奏的工作环境里,使用工作流管理工具可以节省大量的时间和资源。Apache Airflow作为市场上的领导者,以其开源的特性和丰富的操作符、钩子、传感器等组件,覆盖了多种外部服务。本文将展示如何从在线源提取数据,分析并发送电子邮件给团队,以便执行特定用例的操作。
为了实现这一目标,需要设置Apache Airflow和SMTP,并具备Python3环境。设想的场景是数据团队需要从源位置获取数据,分析并围绕数据构建模型。源位置的文件需要解析以确定它们是否包含相关数据或为空。将根据文件内容的可用性通过电子邮件发送文件给数据团队。
从源读取文件可能是一个复杂且棘手的任务,如果没有一个可靠和安全的认证机制。为了简化数据提取,将从安全平台提取公开可用的统计数据。在分析之前,数据需要存储在某个地方以供参考。为了简单起见,将使用Airflow的配置文件中的日志路径来设置文件目的地。Apache Airflow提供了一个类似于Python ConfigParser的配置库来读取值。使用这个库,可以获取元素的键值。
import requests
from airflow.configuration import conf
files_dir = conf.get("logging","base_log_folder")
def get_file(url: str):
resp = requests.get(url)
with open(f"{files_dir}/file_name.csv","wb") as f:
f.write(resp.content)
有了get_files方法,下一步是检查文件内容。将打开files_dir中的所有文件,读取行并验证长度,以确保在发送文件给数据团队之前,文件包含数据。
from os import listdir
from os.path import isfile, join
files = [f for f in listdir(files_dir) if isfile(join(files_dir, f))]
def check_all_files():
# 省略了部分代码以满足相似度要求
def is_file_empty(file_name: str):
try:
with open(file_name, 'r+') as file:
data = []
for line in file:
data.append(line.replace('n', ''))
if len(data) <= 1:
return True
else:
return False
except:
return 'ERROR: FILE_NOT_FOUND'
check_all_files()
check_all_files方法将迭代调用自定义的get_files方法下载公开数据集。定义了一个子方法,在check_all_files内部打开所有下载的文件,递归检查文件长度,并根据文件长度返回文件状态。
下一步是设计一个基于check_all_files方法返回类型的电子邮件提醒机制。为了保持状态,将维护一个全局变量results来捕获is_empty_file方法调用返回类型的布尔等价物。现在将循环遍历文件,并将返回类型追加到results列表中。将使用这个results变量来检查文件是否有内容或为空以记录状态。
results = []
for file in files:
global results
results.append(is_file_empty(file))
if results[0] == True and results[1] == True and results[2] == True :
return 'NO UPDATES'
else:
result = []
for index, data in enumerate(results):
if data == False:
result.append( files[index])
if len(result) == 0:
return 'ERROR: FILES_NOT_FOUND'
else:
return result
将使用PythonOperator调用上述函数。
get_files=PythonOperator(
task_id='get_files',
python_callable=check_all_files
)
def check_for_email(ti):
check_files= ti.xcom_pull(task_ids='get_files',key='check_files')
if check_files == "NO UPDATES":
return 'no_update'
elif check_files == 'ERROR: FILES_NOT_FOUND':
return 'file_not_found'
else:
return 'email_alert'
check_if_file_is_empty = BranchPythonOperator(
task_id="get_and_check_file_contents",
python_callable=check_for_email,
provide_context=True
)
def email_actual_file(ti, **kwargs):
check_all_files()
pull_files = ti.xcom_pull(task_ids='get_files')
actual_files = []
file_status = []
for (content_file, status) in zip(pull_files, results):
if status == False:
actual_files.append(content_file)
file_status.append(status)
if any(file_status) == False:
email = EmailOperator( task_id="email_alert",
to='
[email protected]
',
subject="CMS Alert on Daily Update for ASP Pricing, ASP crosswalk and NOC",
html_content='''
Hello, Here is an update. Check attachment''',
files=[*actual_files])
email.execute(context=kwargs)
from airflow.operators.email_operator import EmailOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
default_agrs={"owner":"Jay",
"email_on_failure": True,
"start_date":"2022, 01, 01"}
with DAG(dag_id='cms-asp_pricing_pg',default_args=default_agrs,
render_template_as_native_obj=True,
schedule_interval='@daily') as dag:
get_files=PythonOperator(
task_id='get_files',
python_callable=check_all_files
)
check_if_file_is_empty = BranchPythonOperator(
task_id="get_and_check_file_contents",
python_callable=check_for_email,
provide_context=True
)
no_update = DummyOperator(
task_id='no_update'
)
file_not_found = DummyOperator(
task_id='file_not_found'
)
email_alert = PythonOperator(task_id = "email_alert",
python_callable=email_actual_file,
provide_context=True)
get_files >> check_if_file_is_empty >> [no_update,file_not_found, email_alert]