Using Dagster with Airflow#

You can find the code for this example on Github

The dagster-airflow package allows you to export Dagster jobs as Airflow DAGs, as well as to import Airflow DAGs into Dagster jobs.

Dagster is a fully-featured orchestrator and does not require a system like Airflow to deploy, execute, or schedule jobs. The main scenarios for using Dagster with Airflow are:

  • You have an existing Airflow setup that's too difficult to migrate away from, but you want to use Dagster for local development.
  • You want to migrate from Airflow to Dagster in an incremental fashion.

Exporting Dagster jobs to Airflow#

You can compile Dagster jobs into DAGs that can be understood by Airflow. Each op in the job becomes an Airflow task. For example, here's a Dagster job:

import csv

import requests

from dagster import job, op


@op
def download_cereals():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]


@op
def find_sugariest(context, cereals):
    sorted_by_sugar = sorted(cereals, key=lambda cereal: cereal["sugars"])
    context.log.info(f'{sorted_by_sugar[-1]["name"]} is the sugariest cereal')


@job
def hello_cereal_job():
    find_sugariest(download_cereals())

To make this job available inside Airflow, you can write an Airflow DAG definition file that invokes make_airflow_dag:

# pylint: disable=unused-variable

import datetime

from dagster_airflow.factory import make_airflow_dag

DEFAULT_ARGS = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime.datetime(2019, 11, 7),
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
}

dag, tasks = make_airflow_dag(
    module_name="docs_snippets.integrations.airflow.hello_cereal",
    job_name="hello_cereal_job",
    dag_kwargs={"default_args": DEFAULT_ARGS, "max_active_runs": 1},
)

If you run this code interactively, you'll see that dag and tasks are ordinary Airflow objects, just as you'd expect to see when defining an Airflow pipeline manually:

>>> dag
<DAG: hello_cereal_job>
>>> tasks
[<Task(DagsterPythonOperator): hello_cereal>]

Like other Airflow DAG definition files, this should go inside $AIRLFLOW_HOME/dags. The docs_snippets.integrations.airflow.hello_cereal module that's passed as the value for the module_name argument must be importable via the sys.path.

After this, the DAG should show up inside Airflow:

intro_airflow_one.png

Running Containerized#

The approach above runs each op inside an operator that's similar to the Airflow PythonOperator. If you instead want to containerize your Dagster job and run it using an operator that's similar to the Airflow DockerOperator, you can use make_airflow_dag_containerized.

As in the uncontainerized case, you'll put a new Python file defining your DAG in the directory in which Airflow looks for DAGs:

# pylint: disable=unused-variable

import datetime

from dagster_airflow.factory import make_airflow_dag_containerized

DEFAULT_ARGS = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime.datetime(2019, 11, 7),
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
}

dag, steps = make_airflow_dag_containerized(
    module_name="docs_snippets.integrations.airflow.hello_cereal",
    job_name="hello_cereal_job",
    image="dagster-airflow-demo-repository",
    dag_kwargs={"default_args": DEFAULT_ARGS, "max_active_runs": 1},
)

The image argument is the name of the Docker image. Running in a containerized context requires a persistent intermediate storage layer available to the Dagster containers, such as a network filesystem, S3, or GCS. You can pass op_kwargs through to the the DagsterDockerOperator to use custom TLS settings, the private registry of your choice, etc., just as you would configure the ordinary Airflow DockerOperator.

If you want your containerized job to be available to Airflow operators running on other machines (for example, in environments where Airflow workers are running remotely) you'll need to push your Docker image to a Docker registry so that remote instances of Docker can pull the image by name, or otherwise ensure that the image is available on remote nodes.

Ingesting Airflow operators#

Dagster can convert Airflow operators to Dagster ops for any Airflow operators that require no instance-wide configuration via airflow.cfg. Using airflow_operator_to_op, you can convert instantiated Airflow operators to be executed within a Dagster op. Optionally, you can pass in a list of Airflow connection objects utilized by the operator:

http_task = SimpleHttpOperator(task_id="http_task", method="GET", endpoint="images")
connections = [Connection(conn_id="http_default", host="https://google.com")]
dagster_op = airflow_operator_to_op(http_task, connections=connections)


@job
def my_http_job():
    dagster_op()

To specify extra parameters to an instantiated Airflow connection, pass a JSON string into the set_extra() function of the Airflow connection object:

s3_conn = Connection(conn_id="s3_conn", conn_type="s3")
s3_conn.set_extra(
    json.dumps(
        {
            "aws_access_key_id": "my_access_key",
            "aws_secret_access_key": "my_secret_key",
        }
    )
)

Ingesting DAGs from Airflow#

This example demonstrates how to use make_dagster_job_from_airflow_dag to compile an Airflow DAG into a Dagster job that can be executed (and explored) the same way as a Dagster-native job.

There are two jobs in the repo:

  • airflow_simple_dag demonstrates the use of Airflow templates.
  • airflow_complex_dag shows the translation of a more complex dependency structure.
from dagster_airflow.dagster_job_factory import make_dagster_job_from_airflow_dag
from with_airflow.airflow_complex_dag import complex_dag
from with_airflow.airflow_simple_dag import simple_dag

from dagster import repository

airflow_simple_dag = make_dagster_job_from_airflow_dag(simple_dag)
airflow_complex_dag = make_dagster_job_from_airflow_dag(complex_dag)


@repository
def with_airflow():
    return [airflow_complex_dag, airflow_simple_dag]

Note that the "execution_date" for the Airflow DAG is specified through the job tags. To specify tags, call to:

airflow_simple_dag_with_execution_date = make_dagster_job_from_airflow_dag(
    dag=simple_dag, tags={"airflow_execution_date": "2021-11-01 00:00:00"}
)