After Airflow has been running in production for a while, the scheduled jobs insert large volumes of rows into metadata tables such as DagRun and TaskInstance. This gradually slows down Airflow's internal SQL queries and, in turn, the responsiveness of the web UI. Cleaning out historical data that is no longer needed, on a schedule, keeps the UI responsive.
Depending on the Airflow version, there are two variants of the cleanup DAG: one for 1.10 (V1) and one for 2.0 and later (V2). The code differs only slightly.
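Before deciding on a retention window, it can help to check how large the metadata tables have actually grown. The following is a minimal sketch (not part of the cleanup DAG below; it assumes it is run inside a working Airflow environment with access to the metadata DB) that counts rows in a few of the fastest-growing tables through the same ORM session the cleanup code uses:

from airflow import settings
from airflow.models import DagRun, Log, TaskInstance, XCom

session = settings.Session()
for model in (DagRun, TaskInstance, Log, XCom):
    # Plain COUNT(*) per table; on a very large metadata DB this query itself may take a while
    print(model.__tablename__, session.query(model).count())
session.close()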
V2
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Ref: https://docs.aws.amazon.com/mwaa/latest/userguide/samples-database-cleanup.html
"""
from airflow import DAG, settings
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.jobs.base_job import BaseJob
from airflow.models import DagModel, DagRun, ImportError, Log, SlaMiss, TaskFail, TaskInstance, \
    TaskReschedule, Variable, XCom, RenderedTaskInstanceFields
from datetime import timedelta
import os

DEFAULT_ARGS = {
    'owner': 'dataops',
    'depends_on_past': False,
    'email': ['your_email@domain.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

DEFAULT_MAX_AGE_IN_DAYS = 90

# Models to clean up, paired with the datetime column used to find old rows
OBJECTS_TO_CLEAN = [[BaseJob, BaseJob.latest_heartbeat],
                    [DagRun, DagRun.execution_date],
                    [ImportError, ImportError.timestamp],
                    [Log, Log.dttm],
                    [SlaMiss, SlaMiss.execution_date],
                    [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date],
                    [TaskFail, TaskFail.execution_date],
                    [TaskInstance, TaskInstance.execution_date],
                    [TaskReschedule, TaskReschedule.execution_date],
                    [XCom, XCom.execution_date],
                    ]

# Cleanup callable
def cleanup_db_fn(**kwargs):
    # Open an ORM session against the Airflow metadata DB
    session = settings.Session()
    print("session: ", str(session))
    # Compute the retention cutoff date
    oldest_date = days_ago(int(Variable.get("max_metadb_storage_days", default_var=DEFAULT_MAX_AGE_IN_DAYS)))
    print("oldest_date: ", oldest_date)
    # Walk through each model to be cleaned
    for x in OBJECTS_TO_CLEAN:
        # Select all rows older than the cutoff, delete them and commit
        query = session.query(x[0]).filter(x[1] <= oldest_date)
        query.delete(synchronize_session=False)
        session.commit()
        print(f'cleaned up {x[0]}')
    return "OK"

with DAG(
    dag_id=os.path.basename(__file__).replace(".py", ""),
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(minutes=10),
    start_date=days_ago(1),
    schedule_interval='30 8 * * *',  # UTC time
    max_active_runs=1
) as dag:

    cleanup_db = PythonOperator(
        task_id="cleanup_db",
        python_callable=cleanup_db_fn,
        provide_context=True
    )
V1
from airflow import DAG, settings
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.jobs import BaseJob
from airflow.models import DagModel, DagRun, ImportError, Log, SlaMiss, TaskFail, TaskInstance, TaskReschedule, Variable, XCom
from datetime import timedelta
import os

DEFAULT_ARGS = {
    'owner': 'dataops',
    'depends_on_past': False,
    'email': ['your_email@domain.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

DEFAULT_MAX_AGE_IN_DAYS = 90

# Models to clean up, paired with the datetime column used to find old rows
OBJECTS_TO_CLEAN = [[BaseJob, BaseJob.latest_heartbeat],
                    [DagModel, DagModel.last_scheduler_run],
                    [DagRun, DagRun.execution_date],
                    [ImportError, ImportError.timestamp],
                    [Log, Log.dttm],
                    [SlaMiss, SlaMiss.execution_date],
                    [TaskFail, TaskFail.execution_date],
                    [TaskInstance, TaskInstance.execution_date],
                    [TaskReschedule, TaskReschedule.execution_date],
                    [XCom, XCom.execution_date],
                    ]

def cleanup_db_fn(**kwargs):
    # Open an ORM session against the Airflow metadata DB
    session = settings.Session()
    print("session: ", str(session))
    # Compute the retention cutoff date
    oldest_date = days_ago(int(Variable.get("max_metadb_storage_days", default_var=DEFAULT_MAX_AGE_IN_DAYS)))
    print("oldest_date: ", oldest_date)
    # Delete and commit everything older than the cutoff, model by model
    for x in OBJECTS_TO_CLEAN:
        query = session.query(x[0]).filter(x[1] <= oldest_date)
        query.delete(synchronize_session=False)
        session.commit()
        print(f'cleaned up {x[0]}')
    return "OK"

with DAG(
    dag_id=os.path.basename(__file__).replace(".py", ""),
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    start_date=days_ago(1),
    schedule_interval='30 8 * * *',  # UTC time
    max_active_runs=1
) as dag:

    cleanup_db = PythonOperator(
        task_id="cleanup_db",
        python_callable=cleanup_db_fn,
        provide_context=True
    )
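Before enabling the schedule for either version, it may be worth estimating how many rows a cleanup pass would touch. The sketch below is illustrative only (preview_cleanup_fn is not part of the AWS sample); it reuses the OBJECTS_TO_CLEAN list defined above but only counts the rows that would be deleted, so it can be wired into a temporary PythonOperator or run ad hoc without modifying any data:

# Hypothetical dry-run helper: counts instead of deletes. It assumes it lives in the
# same file as the DAG above, so OBJECTS_TO_CLEAN, DEFAULT_MAX_AGE_IN_DAYS,
# settings, Variable and days_ago are already in scope.
def preview_cleanup_fn(**kwargs):
    session = settings.Session()
    oldest_date = days_ago(int(Variable.get("max_metadb_storage_days", default_var=DEFAULT_MAX_AGE_IN_DAYS)))
    for model, age_column in OBJECTS_TO_CLEAN:
        count = session.query(model).filter(age_column <= oldest_date).count()
        print(f"{model.__name__}: {count} rows older than {oldest_date}")
    return "OK"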
Usage:
1. Copy the code above into a new .py file named airflow_db_cleanup_dag.py and save it in the DAGs folder.
2. The metadata retention window can be configured by adding an Airflow Variable named max_metadb_storage_days; if the variable is not set, the default is 90 days (see the sketch after this list for setting it programmatically).
3. Change schedule_interval to adjust when the DAG runs. As written it runs once a day at 08:30 UTC, which is 16:30 Beijing time.
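If you prefer not to create the variable through the Admin -> Variables page, it can also be set from code; Variable.set is available in both 1.10 and 2.x. A small illustrative snippet (the 120-day value is just an example):

from airflow.models import Variable

# Keep 120 days of metadata instead of the 90-day default
Variable.set("max_metadb_storage_days", "120")
print(Variable.get("max_metadb_storage_days"))  # -> '120'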
Notes:
1. Estimate how much data will be deleted based on how long your Airflow deployment has been live. If the volume is large, the job may hang or the database may slow down, so run the cleanup during an off-peak window or delete the data in batches. To delete in batches, adjust max_metadb_storage_days to control the age range being removed: for example, first delete data older than one year, then data between six months and one year old, and finally data between three and six months old (see the sketch after this list).
2. If you have tasks that run once per quarter, increase max_metadb_storage_days to 120 days. Otherwise, once the most recent DagRun is cleaned up, the DAG may be triggered again: the scheduler keeps checking whether each DAG is due, and if it finds no DagRun records it assumes the DAG has never run and schedules it one more time.
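As a rough sketch of the staged approach in note 1 (illustrative only: staged_cleanup and the step values are not from the original sample, and it assumes it is run from the DAGs folder so that airflow_db_cleanup_dag.py from step 1 is importable), the retention window can be walked down in steps so each pass removes a bounded slice of history:

from airflow import settings
from airflow.utils.dates import days_ago
from airflow_db_cleanup_dag import OBJECTS_TO_CLEAN  # the DAG file from step 1

def staged_cleanup(steps=(365, 270, 180, 90)):
    session = settings.Session()
    for max_age_days in steps:
        cutoff = days_ago(max_age_days)
        # After the first pass, each subsequent pass only removes the slice
        # between the previous cutoff and the new one
        for model, age_column in OBJECTS_TO_CLEAN:
            session.query(model).filter(age_column <= cutoff).delete(synchronize_session=False)
            session.commit()
        print(f"deleted rows older than {cutoff}")

if __name__ == "__main__":
    staged_cleanup()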