How to restart a failed task on Airflow
PythonHadoopAirflowPython Problem Overview
I am using a LocalExecutor and my dag has 3 tasks where task(C) is dependant on task(A). Task(B) and task(A) can run in parallel something like below
A-->C
B
So task(A) has failed and but task(B) ran fine. Task(C) is yet to run as task(A) has failed.
My question is how do i re run Task(A) alone so Task(C) runs once Task(A) completes and Airflow UI marks them as success.
Python Solutions
Solution 1 - Python
In the UI:
- Go to the dag, and dag run of the run you want to change
- Click on GraphView
- Click on task A
- Click "Clear"
This will let task A run again, and if it succeeds, task C should run. This works because when you clear a task's status, the scheduler will treat it as if it hadn't run before for this dag run.
Solution 2 - Python
Here's an alternate solution where you can have it clear and retry certain tasks automatically. If you only want to clear a certain task, you would not use the -d (downstream) flag:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
def clear_upstream_task(context):
execution_date = context.get("execution_date")
clear_tasks = BashOperator(
task_id='clear_tasks',
bash_command=f'airflow tasks clear -s {execution_date} -t t1 -d -y clear_upstream_task'
)
return clear_tasks.execute(context=context)
# Default settings applied to all tasks
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(seconds=5)
}
with DAG('clear_upstream_task',
start_date=datetime(2021, 1, 1),
max_active_runs=3,
schedule_interval=timedelta(minutes=5),
default_args=default_args,
catchup=False
) as dag:
t0 = DummyOperator(
task_id='t0'
)
t1 = DummyOperator(
task_id='t1'
)
t2 = DummyOperator(
task_id='t2'
)
t3 = BashOperator(
task_id='t3',
bash_command='exit 123',
on_failure_callback=clear_upstream_task
)
t0 >> t1 >> t2 >> t3