Moving and Transforming Data with Airflow


Daniel Moreno

Data Science & Analytics - Mercadoni

PyCon Colombia (2017-02-11)

Hi


I am Daniel.

I am a statistician.

@demorenoc
in/demorenoc

Currently:

Mercadoni

Previously:

Tappsi

Outline

  • Motivation

  • Airflow - Overview

  • Airflow - Components

    • API and Concepts
    • Scheduler
    • Web UI
    • CLI
  • Airflow - Deployment (brief)

  • Resources and References

Motivation

  • Modern companies operate on complex and diverse processes, powered by technology and producing vast and varied amounts of data


Motivation

  • Continuously available and usable data is key to a modern company's success.
  • "Clean" data is vital for:
    • Better decision making
    • Continuous improvement (Measure-Analyze-Improve)
    • Research and Development

Motivation

  • Moving and transforming data can get costly, especially when it is needed continuously:
    • Cleaning takes around 80% of the time in data analysis
    • Overlooked process in early stages
    • Large diversity of tools producing complex and specialized "stacks"
    • Task repetition and redundancy
    • Hidden complexities and needs

Wishlist

Wishlist

  • Compact, homogeneous code
  • Simplicity (defining workflows and collaborating)
  • Flexibility (modularity + extensibility)
  • Traceability/accountability and ownership
  • Operational metadata
  • Scalability
  • Automation (programmatic/dynamic)
  • Useful UI
  • Flat learning curve

Motivation

How does your company manage its data workflows?

Airflow - Overview


Apache Airflow (incubating at the Apache Incubator)

Originally created by Maxime Beauchemin (@mistercrunch) at Airbnb. Now used by more than 70 companies.

https://github.com/apache/incubator-airflow

https://airflow.incubator.apache.org/


pip install airflow
pip install "airflow[mysql]"
pip install "airflow[postgres]"
pip install "airflow[crypto, password]"

Airflow - Overview

Alternatives

For Hadoop

  • Azkaban by LinkedIn
  • Apache Oozie (originally by Yahoo!)

Airflow - Components

  • Executor ("backend" system; set via airflow.cfg, see below)
    • Sequential
    • Local
    • Celery (and Mesos)
  • Scheduler
      airflow scheduler -n 10
  • Web UI
      airflow webserver -p 80
  • CLI
      airflow --help
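
The executor is chosen in the Airflow config file; an illustrative fragment (LocalExecutor here is just an example choice, not a recommendation from the talk):

      # $AIRFLOW_HOME/airflow.cfg
      [core]
      executor = LocalExecutor   # SequentialExecutor, LocalExecutor or CeleryExecutor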

Airflow - API and Concepts

Airflow - API and Concepts

Workflow Building Blocks - DAG

  • Building a workflow — a set of tasks with a dependency structure — is the main goal.

  • Workflows are modeled as DAGs: Directed Acyclic Graphs.

from airflow import DAG
from datetime import datetime, timedelta

default_args = {'owner': 'daniel', 'email': ['daniel.moreno@mercadoni.com'],
                'start_date': datetime(2017, 2, 10), 'depends_on_past': False,
                'retries': 1, 'retry_delay': timedelta(seconds = 30)}

dag = DAG(dag_id              = 'model',
          schedule_interval   = timedelta(minutes = 10), # or the cron expression '*/10 * * * *'
          default_args        = default_args,
          dagrun_timeout      = timedelta(minutes = 3),
          template_searchpath = ['path/to/queries'])

Airflow - API and Concepts

Workflow Building Blocks - Operators and Tasks

  • Operators ⟶ Tasks ⟶ Task instances
  • Operators are Task "factories"
  • When a Task is executed, a Task instance is produced
from airflow.operators import BashOperator, MySqlOperator, PythonOperator

# Callable used by the PythonOperator below
def py_function(arg):
    print('Received arg = {}'.format(arg))

bash_task = BashOperator(task_id = 'bash_task',
                         bash_command = 'echo "Hello PyCon"',
                         dag = dag)

mysql_task = MySqlOperator(task_id = 'mysql_task',
                           mysql_conn_id = 'mysql_db_conn',
                           sql = 'query.sql',
                           dag = dag)

py_task = PythonOperator(task_id = 'py_task',
                         python_callable = py_function,
                         op_kwargs = {'arg' : 'value'},
                         dag = dag)

mysql_task.set_upstream(bash_task)

py_task.set_upstream(mysql_task)
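
If the installed Airflow version supports bitshift composition (an assumption; set_upstream/set_downstream always work), the same chain can be declared more compactly:

# bash_task runs first, then mysql_task, then py_task
bash_task >> mysql_task >> py_task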

Airflow - API and Concepts

Workflow Building Blocks - Operators

  • Local:

    • BashOperator
    • PythonOperator
    • DockerOperator
    • S3FileTransformOperator
  • Remote:

    • SimpleHttpOperator
    • [Hive|Pig|BigQuery]Operator
    • [MySql|Postgres|Sqlite|Jdbc|MsSql|Oracle]Operator
    • [PrestoCheck|PrestoValueCheck|PrestoIntervalCheck]Operator
  • Messaging:

    • EmailOperator
    • [SlackAPI|SlackAPIPost]Operator

Airflow - API and Concepts

Workflow Building Blocks - Operators

  • Transfer:

    • GenericTransfer
    • [MySqlToHive|S3ToHive|HiveToMySql]Transfer
    • [PrestoToMySql|HiveToDruid]Transfer
  • Flow:

    • [BranchPython|ShortCircuit]Operator
    • [TriggerDagRun|SubDag]Operator
    • DummyOperator
  • Sensors:

    • [ExternalTask|Hdfs|HivePartition|Http|MetastorePartition]Sensor
    • [S3Key|S3Prefix|Sql|TimeDelta|Time|WebHdfs]Sensor
  • Other (contributed):

    • Ssh, GCS, Spark SQL, Sqoop, Vertica, AWS EMR, Hipchat

Airflow - API and Concepts

A real-life DAG

Airflow - API and Concepts

Key Elements

  • Hooks: the building blocks of operators

    from airflow.hooks import MySqlHook

    mysql_db = MySqlHook(mysql_conn_id = "mysql_dwh")
    result = mysql_db.get_first(sql = "SELECT 'Airflow is cool' AS txt;")
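
Hooks are typically used inside operator callables; a minimal sketch, assuming the same hypothetical 'mysql_dwh' connection and a made-up query (dag as defined earlier):

    from airflow.operators import PythonOperator

    def count_orders():
        # 'mysql_dwh' is a hypothetical connection id configured under Admin > Connections
        hook = MySqlHook(mysql_conn_id = 'mysql_dwh')
        return hook.get_records(sql = 'SELECT COUNT(*) FROM orders;')

    count_task = PythonOperator(task_id = 'count_orders',
                                python_callable = count_orders,
                                dag = dag)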
    

Airflow - API and Concepts

Key Elements

  • Connections

(screenshot: the Connections admin UI)


Airflow - API and Concepts

Further Elements

  • Variables and XComs ⟶ Share "state"
  • Templates (Jinja) and Macros ⟶ Patterns and/or parametrization (see the sketch below)
  • Pools and queues ⟶ Resources
  • SubDAGs, Branches and trigger rules ⟶ Flow
  • SLAs and policies ⟶ Quality and accountability
  • Documentation and notes ⟶ Collaboration and maintainability
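
A minimal sketch of templating plus a Variable lookup; the variable name, command and task are assumptions, not from the talk (dag as defined earlier):

from airflow.models import Variable
from airflow.operators import BashOperator

# 'reports_bucket' is a hypothetical key set under Admin > Variables
bucket = Variable.get('reports_bucket')

# {{ ds }} is a built-in macro: the execution date as YYYY-MM-DD
export_task = BashOperator(task_id = 'daily_export',
                           bash_command = 'echo "exporting {{ ds }} to ' + bucket + '"',
                           dag = dag)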

Airflow - Scheduler


    airflow scheduler -n $NUM

  • Monitors the DAGs and triggers them, creating DAG Runs and handing task instances to the executor

  • The first DAG Run is created from the DAG's start_date

  • Subsequent runs are scheduled according to schedule_interval (a cron expression or a datetime.timedelta); see the sketch below
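
A minimal sketch of how the two settings interact (dates reuse the earlier example; the timing note reflects Airflow's behavior of running a period once it has ended):

from airflow import DAG
from datetime import datetime, timedelta

# The first DAG Run covers the period starting at start_date and is triggered once
# that 10-minute interval has passed (around 2017-02-10 00:10), then every 10 minutes.
dag = DAG(dag_id            = 'model',
          start_date        = datetime(2017, 2, 10),
          schedule_interval = timedelta(minutes = 10))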

Airflow - Web UI

Airflow - Web UI

DAGs - Graph View

Airflow - Web UI

DAGs - Tasks sub-menu

Airflow - Web UI

DAGs - Tree View

Airflow - Web UI

DAGs - Task Duration

Airflow - Web UI

DAGs - Gantt

Airflow - Web UI

Other Menus

Airflow - Web UI

Other Menus

Airflow - CLI


Rich set of commands for dev-ops: airflow [-h] <command>

Core services

    webserver -p 80
    scheduler -n $NUM

Meta-DB operations

   initdb resetdb upgradedb

Operate on DAGs

   pause unpause run trigger_dag backfill dag_state task_state clear

Development and test

   list_dags list_tasks variables render test
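
For example, a single task can be exercised in isolation (the DAG, task and date come from the earlier example):

   airflow list_tasks model
   airflow test model bash_task 2017-02-10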

Airflow - Deployment

Setup

  • Set up a meta-DB (SQLAlchemy-powered) for the backend (executor)
  • Set up the environment variable for Airflow and the config file:

    • $AIRFLOW_HOME
    • $AIRFLOW_HOME/airflow.cfg

    Key vars (illustrated below): airflow_home, dags_folder, executor, sql_alchemy_conn, parallelism, dag_concurrency

  • Initialize the DB (once)

      airflow initdb
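
An illustrative airflow.cfg fragment with those keys (every value below is an assumption for a small local setup, not a recommendation from the talk):

      [core]
      airflow_home = /home/airflow/airflow
      dags_folder = /home/airflow/airflow/dags
      executor = LocalExecutor
      sql_alchemy_conn = mysql://airflow:airflow_pwd@localhost:3306/airflow
      parallelism = 16
      dag_concurrency = 8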

Airflow - Deployment

Initialize

  airflow webserver -p 80
  airflow scheduler -n $NUM

Run as a service

Set up Airflow (Web UI and Scheduler) as services with systemd or upstart (launchd on OS X):


  initctl start airflow-webserver
  initctl start airflow-scheduler
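
A minimal systemd unit sketch for the webserver (unit name, paths and user are assumptions; the Airflow repository also ships example systemd and upstart scripts):

  # /etc/systemd/system/airflow-webserver.service
  [Unit]
  Description=Airflow webserver
  After=network.target

  [Service]
  User=airflow
  Environment=AIRFLOW_HOME=/home/airflow/airflow
  ExecStart=/usr/local/bin/airflow webserver -p 80
  Restart=on-failure

  [Install]
  WantedBy=multi-user.target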

Colophon

demorenoc.github.io/slides/pycon-co-2017/airflow

This presentation was written in a Jupyter Notebook (available here) and compiled into a reveal.js presentation by nbconvert, using the default slides template and powered by CDNJS. Published online thanks to GitHub Pages.

This presentation was prepared by Daniel Moreno for a talk given at PyCon.co, and its original content (not much, really) is under the CC-BY 4.0 license.
