Apache Airflow

Airflow (hosted on GitHub) is a platform created by the community to programmatically author, schedule and monitor workflows.

Use Airflow to author workflows as Directed Acyclic Graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.

Directed acyclic graph

In mathematics, particularly graph theory, and computer science, a directed acyclic graph (DAG or dag) is a finite directed graph with no directed cycles. That is, it consists of finitely many vertices and edges (also called arcs), with each edge directed from one vertex to another, such that there is no way to start at any vertex v and follow a consistently-directed sequence of edges that eventually loops back to v again. Equivalently, a DAG is a directed graph that has a topological ordering, a sequence of the vertices such that every edge is directed from earlier to later in the sequence.

Principles

  • Dynamic: Airflow pipelines are configuration as code (Python), so the same code that defines a pipeline can also generate pipelines dynamically (see the sketch after this list).
  • Extensible: Easily define your own operators, executors and extend the library so that it fits the level of abstraction that suits your environment.
  • Elegant: Airflow pipelines are lean and explicit. Parameterizing your scripts is built into the core of Airflow using the powerful Jinja templating engine.
  • Scalable: Airflow has a modular architecture and uses a message queue to orchestrate an arbitrary number of workers. Airflow is ready to scale to infinity.
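
The Dynamic principle is easiest to see in code. Below is a minimal sketch of generating several similar tasks in a loop; the DAG id, table names and commands are made up for illustration, and the import paths follow the same 1.10-era layout as the tutorial later in this document.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# One DAG whose tasks are generated from plain Python data
with DAG('dynamic_export', start_date=days_ago(1), schedule_interval='@daily') as dag:
    for table in ['users', 'orders', 'payments']:  # hypothetical table names
        BashOperator(
            task_id='export_{}'.format(table),
            bash_command='echo exporting {}'.format(table),
        )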

Concepts

  • DAG: The work (tasks), and the order in which work should take place (dependencies), written in Python.
  • DAG Run: An instance of a DAG for a particular logical date and time.
  • Operator: A class that acts as a template for carrying out some work.
  • Task: Defines work by implementing an operator, written in Python.
  • Task Instance: A task that has been assigned to a DAG and has a state associated with a specific DAG Run (i.e. for a specific execution_date).
  • execution_date: The logical date and time for a DAG Run and its Task Instances.

By combining DAGs and Operators to create TaskInstances, you can build complex workflows.
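
As a rough sketch of how these terms fit together (the DAG id and command are made up; the import paths match the tutorial below): an Operator is a class, a Task is an instance of that class attached to a DAG, and a Task Instance is that task within one specific DAG Run.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

with DAG('concepts_demo', start_date=days_ago(1), schedule_interval='@daily') as dag:
    # BashOperator is the Operator (the template); say_hello is a Task in this DAG
    say_hello = BashOperator(task_id='say_hello', bash_command='echo hello')
# Each DAG Run (one per schedule interval) gives say_hello a Task Instance
# with its own state: queued, running, success, failed, ...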

Macros reference

Variables and macros can be used in templates (see the Jinja Templating section).

Jinja

Jinja is a fast, expressive, extensible templating engine. Special placeholders in the template allow writing code similar to Python syntax. Then the template is passed data to render the final document.
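
As a minimal illustration, here is plain Jinja used directly through the jinja2 package, outside Airflow (the template text and the value passed in are made up):

from jinja2 import Template

# {{ ds }} is a placeholder; it is filled in when data is passed at render time
template = Template('echo "{{ ds }}"')
print(template.render(ds='2015-06-01'))  # prints: echo "2015-06-01"

In Airflow the render step happens for you: templated fields such as bash_command are rendered with variables like ds and the macros module before the task runs, as the tutorial below shows.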

Installation

The easiest way to install the latest stable version of Airflow is with pip:

pip install apache-airflow

You can also install Airflow with support for extra features like gcp or postgres:

pip install apache-airflow[postgres,gcp]

Initializing the Airflow Database

Airflow requires a database to be initialized before you can run tasks. If you’re just experimenting and learning Airflow, you can stick with the default SQLite option. If you don’t want to use SQLite, then take a look at Initializing a Database Backend to set up a different database.

After configuration, you’ll need to initialize the database before you can run tasks:

airflow initdb

Example

Example Pipeline definition, “tutorial.py”

# ############## #
# Import Modules #
# ############## #
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
# ################# #
# Default Arguments #
# ################# #
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
# ################# #
# Instantiate a DAG #
# ################# #
dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)
# ##### #
# Tasks #
# ##### #
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)
# Use the module docstring (if one is defined) as DAG-level documentation in the UI
dag.doc_md = __doc__
# ################### #
# Tasks Documentation #
# ################### #
t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml`, which get
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
# ##################### #
# Templating with Jinja #
# ##################### #
templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""
# ############## #
# Templated Task #
# ############## #
t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)
# ####################### #
# Setting up Dependencies #
# ####################### #
# The bitshift operator (>>) can be
# used to chain tasks:
t1 >> [t2, t3]

# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
# t1.set_downstream([t2, t3])
# t1 >> [t2, t3]
# [t2, t3] << t1

Running the Script

Time to run some tests. First, let’s make sure the pipeline is parsed successfully.

Let’s assume we’re saving the code from the previous step in tutorial.py in the DAGs folder referenced in your airflow.cfg. The default location for your DAGs is ~/airflow/dags.

python ~/airflow/dags/tutorial.py

Command Line Metadata Validation

Run a few commands to validate this script further.

  • print the list of active DAGs: airflow list_dags
  • print the list of tasks in the “tutorial” DAG: airflow list_tasks tutorial
  • print the hierarchy of tasks in the “tutorial” DAG: airflow list_tasks tutorial --tree

Testing

command layout: command subcommand dag_id task_id date

# testing print_date
airflow test tutorial print_date 2015-06-01
# testing sleep
airflow test tutorial sleep 2015-06-01
# testing templated
airflow test tutorial templated 2015-06-01

Backfill

backfill respects your dependencies, emits logs into files and talks to the database to record status. If you want to track the progress visually, start a web server with airflow webserver and watch the backfill from the UI as it progresses.

# optional, start a web server in debug mode in the background
# airflow webserver --debug &

# start your backfill on a date range
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07