execution_date in airflow: need to access as a variable

Airflow

Airflow Problem Overview


I am a newbie on this forum, but I have been playing with Airflow for some time for our company. Sorry if this question sounds really dumb.

I am writing a pipeline using a bunch of BashOperators. Basically, for each task, I want to simply call a REST API using 'curl'.

This is what my pipeline looks like (very simplified version):

from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime

datetime_obj = datetime.datetime
                                  
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
}


current_datetime = datetime_obj.now(tz=tz.tzlocal())

dag = DAG(
    'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))

curl_cmd = 'curl -XPOST "' + hostname + ':8000/run?st=' + str(current_datetime) + '"'


t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=curl_cmd,
    dag=dag)

If you notice, I am doing current_datetime = datetime_obj.now(tz=tz.tzlocal()). What I actually want here is the 'execution_date'.

How do I use 'execution_date' directly and assign it to a variable in my python file?

I have been having this general issue of accessing the runtime args. Any help will be genuinely appreciated.

Thanks

Airflow Solutions


Solution 1 - Airflow

The BashOperator's bash_command argument is a template. You can access execution_date in any template as a datetime object using the execution_date variable. In the template, you can use any jinja2 methods to manipulate it.

Using the following as your BashOperator bash_command string:

# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}

# last day of previous month
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}

If you just want the string equivalent of the execution date, ds will return a datestamp (YYYY-MM-DD), ds_nodash returns the same without dashes (YYYYMMDD), etc. More on macros is available in the API docs.


Your final operator would look like:

command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=command,
    dag=dag)
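If you would rather avoid the % locals() interpolation, one hedged variant (the host value is a placeholder) passes the hostname through the operator's params argument, which is also available inside the template:

# Sketch: pass the hostname via params instead of string interpolation.
t1 = BashOperator(
    task_id='rest-api-1',
    bash_command="curl -XPOST '{{ params.hostname }}:8000/run?st={{ ds }}'",
    params={'hostname': 'my-api-host'},  # placeholder host
    dag=dag)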

Solution 2 - Airflow

The PythonOperator constructor takes a 'provide_context' parameter (see https://pythonhosted.org/airflow/code.html). If it's True, then it passes a number of parameters into the python_callable via kwargs. kwargs['execution_date'] is what you want, I believe.

Something like this:

from airflow.models import Variable

def python_method(ds, **kwargs):
    # Store the execution date so it can be read elsewhere (e.g. via Variable.get).
    Variable.set('execution_date', kwargs['execution_date'])
    return

doit = PythonOperator(
    task_id='doit',
    provide_context=True,
    python_callable=python_method,
    dag=dag)

I'm not sure how to do it with the BashOperator, but you might start with this issue: https://github.com/airbnb/airflow/issues/775
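If the goal is just to get that value back into a BashOperator, one hedged option (the task id here is made up) is to read the Variable set above through the var template namespace:

# Sketch: read the Variable stored by python_method inside a templated bash_command.
use_it = BashOperator(
    task_id='use-variable',
    bash_command="echo {{ var.value.execution_date }}",
    dag=dag)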

Solution 3 - Airflow

I don't think you can assign variables values from the Airflow context outside of a task instance; they are only available at run-time. Basically, there are two different steps when a DAG is loaded and executed in Airflow:

  • First, your DAG file is interpreted and parsed. It has to compile and the task definitions must be correct (no syntax errors or anything). During this step, if you make function calls to fill in some values, those functions will not be able to access the Airflow context (the execution date, for example, especially if you are backfilling).

  • The second step is the execution of the DAG. It is only during this second step that the variables provided by Airflow (execution_date, ds, etc.) are available, as they are tied to a particular run of the DAG.

So you can't initialize global variables using the Airflow context; however, Airflow gives you several mechanisms to achieve the same effect:

  1. Using a Jinja template in your command (it can be a string in the code or a file; both will be processed). The list of available template variables is here: https://airflow.apache.org/macros.html#default-variables. Note that some functions are also available, particularly for computing day deltas and formatting dates.

  2. Using a PythonOperator to which you pass the context (with the provide_context argument). This lets you access the same variables with the syntax kwargs['<variable_name>']. If you need to, you can return a value from a PythonOperator; it will be stored in an XCom that you can use later in any template (see the sketch after this list). The syntax for accessing XCom variables is described here: https://airflow.apache.org/concepts.html#xcoms

  3. If you write your own operator, you can access the Airflow variables through the context dict passed to execute().
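As a rough sketch of option 2 (the task ids and the callable are made up for illustration, and dag is assumed to be defined as in the question), a value returned by a PythonOperator is pushed to XCom and can be pulled into a later template:

from airflow.operators import BashOperator, PythonOperator

def compute_start_time(**kwargs):
    # Returned values are pushed to XCom automatically.
    return str(kwargs['execution_date'])

push = PythonOperator(
    task_id='push_start_time',
    provide_context=True,
    python_callable=compute_start_time,
    dag=dag)

pull = BashOperator(
    task_id='pull_start_time',
    bash_command="echo {{ ti.xcom_pull(task_ids='push_start_time') }}",
    dag=dag)

pull.set_upstream(push)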

Solution 4 - Airflow

def execute(self, context):
    execution_date = context.get("execution_date")

This snippet goes inside the execute() method of your own operator.
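For context, a minimal sketch of such a custom operator (the class name and log message are illustrative only, assuming Airflow 1.10-style imports):

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class PrintExecutionDateOperator(BaseOperator):
    @apply_defaults
    def __init__(self, *args, **kwargs):
        super(PrintExecutionDateOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        # The full template context is passed in; execution_date is one of its keys.
        execution_date = context.get("execution_date")
        self.log.info("Running for execution_date %s", execution_date)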

Solution 5 - Airflow

To use the execution date inside the callable function of your PythonOperator, you can use the following in your Airflow script; it also derives start_time and end_time values:

def python_func(**kwargs):
    # execution_date arrives here as a Pendulum datetime (Airflow 1.10+), so .add() works
    ts = kwargs["execution_date"]
    end_time = str(ts)
    start_time = str(ts.add(minutes=-30))

I have converted the datetime values to strings because I need to pass them in a SQL query, but you can use the datetime objects directly as well.
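For completeness, the callable above would be wired into the DAG roughly like this (the task id is illustrative, and dag is assumed to be defined as in the question):

compute_window = PythonOperator(
    task_id='compute_window',
    provide_context=True,
    python_callable=python_func,
    dag=dag)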

Solution 6 - Airflow

You may consider the SimpleHttpOperator: https://airflow.apache.org/_api/airflow/operators/http_operator/index.html#airflow.operators.http_operator.SimpleHttpOperator. It makes HTTP requests simple, and you can pass the execution_date through the endpoint parameter via a template.
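A rough sketch of that approach, assuming a placeholder HTTP connection named 'my_api' that points at the host from the question:

from airflow.operators.http_operator import SimpleHttpOperator

t1 = SimpleHttpOperator(
    task_id='rest-api-1',
    http_conn_id='my_api',       # placeholder connection pointing at hostname:8000
    endpoint='run?st={{ ds }}',  # endpoint is a templated field, so macros work here
    method='POST',
    dag=dag)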

Solution 7 - Airflow

Here's another way that doesn't need the context. Using the DAG's last execution time can be very helpful in scheduled ETL jobs, such as a DAG that 'downloads all newly added files'. Instead of hardcoding a datetime.datetime, use the DAG's last execution date as your time filter.

Airflow DAGs actually have a DagRun model whose runs can be looked up like so: dag_runs = DagRun.find(dag_id=dag_id)

Here's an easy way to get the most recent run's execution time:

from airflow.models import DagRun

def get_most_recent_dag_run(dag_id):
    dag_runs = DagRun.find(dag_id=dag_id)
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    # dag_runs[0] is typically the run currently in progress, so take the one before it
    return dag_runs[1] if len(dag_runs) > 1 else None

Then, within your PythonOperator, you can dynamically access the DAG's last execution by calling the function you created above:

last_execution = get_most_recent_dag_run('svb_to_s3')

Now it's a variable!
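One caveat: when called at module level as above, the lookup runs every time the DAG file is parsed. If you would rather evaluate it at run time, a hedged alternative (the callable and task id are made up for illustration) is to call the helper from inside a PythonOperator callable:

def filter_new_files(**kwargs):
    last_run = get_most_recent_dag_run('svb_to_s3')
    if last_run:
        cutoff = last_run.execution_date
        # ... only process files newer than cutoff ...

filter_files = PythonOperator(
    task_id='filter_new_files',
    provide_context=True,
    python_callable=filter_new_files,
    dag=dag)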

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | Roger | View Question on Stackoverflow
Solution 1 - Airflow | Erik Schuchmann | View Answer on Stackoverflow
Solution 2 - Airflow | Ziggy Eunicien | View Answer on Stackoverflow
Solution 3 - Airflow | Babcool | View Answer on Stackoverflow
Solution 4 - Airflow | l0n3r4n83r | View Answer on Stackoverflow
Solution 5 - Airflow | Aditi Srivastava | View Answer on Stackoverflow
Solution 6 - Airflow | gigkokman | View Answer on Stackoverflow
Solution 7 - Airflow | KevinG | View Answer on Stackoverflow