Construct an Airflow DAG corresponding to a given Dagster job/pipeline.
Tasks in the resulting DAG will execute the Dagster logic they encapsulate as a Python
callable, run by an underlying PythonOperator. As a consequence, dagster, any Python
dependencies required by your solid logic, and the module containing your pipeline definition
must all be available in the Python environment within which your Airflow tasks execute. If you
cannot install requirements into this environment, or you are looking for a containerized
solution to provide better isolation, see instead make_airflow_dag_containerized().
This function should be invoked in an Airflow DAG definition file, such as that created by an invocation of the dagster-airflow scaffold CLI tool.
module_name (str) – The name of the importable module in which the pipeline/job definition can be found.
job_name (str) – The name of the job definition.
run_config (Optional[dict]) – The config, if any, with which to compile the pipeline/job to an execution plan, as a Python dict.
mode (Optional[str]) – The mode in which to execute the pipeline.
instance (Optional[DagsterInstance]) – The Dagster instance to use to execute the pipeline/job.
dag_id (Optional[str]) – The id to use for the compiled Airflow DAG (passed through to DAG).
dag_description (Optional[str]) – The description to use for the compiled Airflow DAG (passed through to DAG).
dag_kwargs (Optional[dict]) – Any additional kwargs to pass to the Airflow DAG constructor, including default_args.
op_kwargs (Optional[dict]) – Any additional kwargs to pass to the underlying Airflow operator (a subclass of PythonOperator).
pipeline_name (str) – (legacy) The name of the pipeline definition.
The generated Airflow DAG, and a list of its constituent tasks.
(airflow.models.DAG, List[airflow.models.BaseOperator])
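As a rough illustration of the parameters above, a DAG definition file might be laid out along the lines of the following sketch. The module name my_package.my_jobs, the job name my_job, the dag_id, and all config values are hypothetical, and the import path dagster_airflow.factory is an assumption. The import is deferred into a function so the file can be read and checked without dagster-airflow installed.

```python
from datetime import datetime, timedelta

# Standard Airflow default_args; the values are placeholders.
DEFAULT_ARGS = {
    "owner": "airflow",
    "start_date": datetime(2023, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Keyword arguments named after the parameters documented above.
# "my_package.my_jobs" and "my_job" are hypothetical names.
MAKE_DAG_KWARGS = {
    "module_name": "my_package.my_jobs",
    "job_name": "my_job",
    "run_config": {},  # empty config; fill in for your job
    "dag_id": "my_job_dag",
    "dag_description": "Airflow DAG compiled from the Dagster job my_job",
    "dag_kwargs": {"default_args": DEFAULT_ARGS},
}


def build_dag():
    """Compile the Dagster job into an Airflow DAG and its list of tasks."""
    # Assumed import path; requires dagster-airflow in the Airflow environment.
    from dagster_airflow.factory import make_airflow_dag

    return make_airflow_dag(**MAKE_DAG_KWARGS)


# In a real DAG definition file, call this at module level so that Airflow
# can discover the DAG object:
# dag, tasks = build_dag()
```

Keeping the arguments in a plain dict makes them easy to review separately from the call that needs dagster and Airflow to be importable.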
Construct an Airflow DAG corresponding to a given Dagster job/pipeline and custom operator.
Tasks in the resulting DAG will execute the Dagster logic they encapsulate, run by the given
operator (a subclass of BaseOperator). If you are looking for a containerized solution to
provide better isolation, see instead make_airflow_dag_containerized().
This function should be invoked in an Airflow DAG definition file, such as that created by an invocation of the dagster-airflow scaffold CLI tool.
recon_repo (dagster.ReconstructableRepository) – Reference to a Dagster RepositoryDefinition that can be reconstructed in another process.
job_name (str) – The name of the job definition.
operator (type) – The operator to use. Must be a class that inherits from BaseOperator.
run_config (Optional[dict]) – The config, if any, with which to compile the pipeline to an execution plan, as a Python dict.
mode (Optional[str]) – The mode in which to execute the pipeline.
instance (Optional[DagsterInstance]) – The Dagster instance to use to execute the pipeline.
dag_id (Optional[str]) – The id to use for the compiled Airflow DAG (passed through to DAG).
dag_description (Optional[str]) – The description to use for the compiled Airflow DAG (passed through to DAG).
dag_kwargs (Optional[dict]) – Any additional kwargs to pass to the Airflow DAG constructor, including default_args.
op_kwargs (Optional[dict]) – Any additional kwargs to pass to the underlying Airflow operator.
pipeline_name (str) – (legacy) The name of the pipeline definition.
The generated Airflow DAG, and a list of its constituent tasks.
(airflow.models.DAG, List[airflow.models.BaseOperator])
Construct a containerized Airflow DAG corresponding to a given Dagster job/pipeline.
Tasks in the resulting DAG will execute the Dagster logic they encapsulate using a subclass of
DockerOperator. As a consequence, dagster, any Python dependencies required by your solid
logic, and the module containing your pipeline definition must all be available in the
container spun up by this operator. Typically you’ll want to install these requirements onto
the image you’re using.
This function should be invoked in an Airflow DAG definition file, such as that created by an invocation of the dagster-airflow scaffold CLI tool.
module_name (str) – The name of the importable module in which the pipeline/job definition can be found.
job_name (str) – The name of the job definition.
image (str) – The name of the Docker image to use for execution (passed through to DockerOperator).
run_config (Optional[dict]) – The config, if any, with which to compile the pipeline/job to an execution plan, as a Python dict.
mode (Optional[str]) – The mode in which to execute the pipeline.
dag_id (Optional[str]) – The id to use for the compiled Airflow DAG (passed through to DAG).
dag_description (Optional[str]) – The description to use for the compiled Airflow DAG (passed through to DAG).
dag_kwargs (Optional[dict]) – Any additional kwargs to pass to the Airflow DAG constructor, including default_args.
op_kwargs (Optional[dict]) – Any additional kwargs to pass to the underlying Airflow operator (a subclass of DockerOperator).
pipeline_name (str) – (legacy) The name of the pipeline definition.
The generated Airflow DAG, and a list of its constituent tasks.
(airflow.models.DAG, List[airflow.models.BaseOperator])
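The containerized variant takes the same kind of arguments plus an image. The sketch below shows one way the call might be set up. The module, job, and image names are hypothetical, and the import path dagster_airflow.factory is an assumption. The image is expected to already contain dagster, your dependencies, and the module that defines the job.

```python
from datetime import datetime

# Keyword arguments named after the parameters documented above.
# Module, job, and image names are hypothetical placeholders.
CONTAINERIZED_KWARGS = {
    "module_name": "my_package.my_jobs",
    "job_name": "my_job",
    "image": "my-registry/my-dagster-image:latest",
    "run_config": {},
    "dag_id": "my_job_containerized",
    "dag_kwargs": {
        "default_args": {"owner": "airflow", "start_date": datetime(2023, 1, 1)},
    },
}


def build_containerized_dag():
    """Compile the Dagster job into a DAG whose tasks run in Docker containers."""
    # Assumed import path; requires dagster-airflow in the Airflow environment.
    from dagster_airflow.factory import make_airflow_dag_containerized

    return make_airflow_dag_containerized(**CONTAINERIZED_KWARGS)


# In a real DAG definition file:
# dag, tasks = build_containerized_dag()
```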
Construct a Dagster job corresponding to a given Airflow DAG.
Tasks in the resulting job will execute the execute() method on the corresponding Airflow
Operator. Dagster, any dependencies required by Airflow Operators, and the module containing
your DAG definition must be available in the Python environment within which your Dagster
solids execute.
To set Airflow’s execution_date for use with an Airflow Operator’s execute() method, either:
1. Execute the job directly. This will set execution_date to the time (in UTC) of the run.
2. Add {'airflow_execution_date': utc_date_string} to the job tags. This will override behavior from (1).
    my_dagster_job = make_dagster_job_from_airflow_dag(
        dag=dag,
        tags={'airflow_execution_date': utc_execution_date_str}
    )
    my_dagster_job.execute_in_process()
3. Add {'airflow_execution_date': utc_date_string} to the run tags, such as in the Dagit UI. This will override behavior from (1) and (2).
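The tag value is a UTC date string. A minimal sketch of producing one with the standard library follows. The helper name airflow_execution_date_tag is hypothetical, and the sketch assumes that an ISO 8601 string is an accepted format for the tag.

```python
from datetime import datetime, timezone


def airflow_execution_date_tag(moment=None):
    """Return a tags dict carrying a UTC execution date string."""
    if moment is None:
        moment = datetime.now(timezone.utc)
    elif moment.tzinfo is None:
        # Assumption of this sketch: naive datetimes are already in UTC.
        moment = moment.replace(tzinfo=timezone.utc)
    else:
        moment = moment.astimezone(timezone.utc)
    # Assumption: ISO 8601 is an accepted string format for this tag.
    return {"airflow_execution_date": moment.isoformat()}


tags = airflow_execution_date_tag(datetime(2023, 1, 1, 12, 0))
print(tags)  # {'airflow_execution_date': '2023-01-01T12:00:00+00:00'}
```

The resulting dict can be passed as the tags argument when compiling the job, or its single entry added to the run tags.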
We apply normalized_name() to the dag id and task ids when generating job name and op names to ensure that names conform to Dagster’s naming conventions.
dag (DAG) – The Airflow DAG to compile into a Dagster job
tags (Dict[str, Field]) – Job tags. Optionally include tags={'airflow_execution_date': utc_date_string} to specify execution_date used within execution of Airflow Operators.
use_airflow_template_context (bool) – If True, will call get_template_context() on the Airflow TaskInstance model which requires and modifies the DagRun table. (default: False)
unique_id (int) – If not None, this id will be appended to generated op names. Used by framework authors to enforce unique op names within a repo.
The generated Dagster job
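To give a feel for the name normalization and the unique_id suffix described above, here is an illustrative stand-in. It is not the library's normalized_name() implementation, whose exact output may differ. It only assumes that Dagster names are restricted to letters, digits, and underscores.

```python
import re

# Dagster names may contain only letters, digits, and underscores.
_INVALID_CHARS = re.compile(r"[^A-Za-z0-9_]")


def sketch_normalized_name(name, unique_id=None):
    """Illustrative stand-in for normalized_name(); not the library's code."""
    base = _INVALID_CHARS.sub("_", name)
    if unique_id is None:
        return base
    # Append the id so that otherwise identical op names stay distinct.
    return f"{base}_{unique_id}"


print(sketch_normalized_name("my-dag.v2"))      # my_dag_v2
print(sketch_normalized_name("load.users", 7))  # load_users_7
```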
Construct a Dagster repository corresponding to Airflow DAGs in dag_path.
The DagBag.get_dag() dependency requires the Airflow DB to be initialized.
Create make_dagster_repo.py:
    from dagster_airflow.dagster_pipeline_factory import make_dagster_repo_from_airflow_dags_path

    def make_repo_from_dir():
        return make_dagster_repo_from_airflow_dags_path(
            '/path/to/dags/', 'my_repo_name'
        )
Use RepositoryDefinition as usual, for example:
    dagit -f path/to/make_dagster_repo.py -n make_repo_from_dir
dag_path (str) – Path to directory or file that contains Airflow DAGs
repo_name (str) – Name for generated RepositoryDefinition
include_examples (bool) – True to include Airflow’s example DAGs. (default: False)
safe_mode (bool) – True to use Airflow’s default heuristic to find files that contain DAGs (i.e. find files that contain both b'DAG' and b'airflow') (default: True)
store_serialized_dags (bool) – True to read Airflow DAGs from Airflow DB. False to read DAGs from Python files. (default: False)
use_airflow_template_context (bool) – If True, will call get_template_context() on the Airflow TaskInstance model which requires and modifies the DagRun table. (default: False)
RepositoryDefinition
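The safe_mode heuristic described above can be pictured with a small standard-library sketch. The helper name looks_like_dag_file is hypothetical, and this only mirrors the rule as stated here (both byte strings present), not Airflow's actual code.

```python
import tempfile
from pathlib import Path


def looks_like_dag_file(path):
    """Return True if the file contains both b'DAG' and b'airflow'."""
    content = Path(path).read_bytes()
    return b"DAG" in content and b"airflow" in content


with tempfile.TemporaryDirectory() as tmp:
    # A file that imports DAG from airflow passes the check.
    dag_file = Path(tmp) / "my_dag.py"
    dag_file.write_bytes(b"from airflow import DAG\n")
    # A plain helper module mentions neither string and is skipped.
    helper_file = Path(tmp) / "helpers.py"
    helper_file.write_bytes(b"def add(a, b):\n    return a + b\n")
    results = {p.name: looks_like_dag_file(p) for p in (dag_file, helper_file)}

print(results)  # {'my_dag.py': True, 'helpers.py': False}
```

With safe_mode set to False, no such filtering is applied and every Python file under dag_path is considered.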
Construct a Dagster repository corresponding to Airflow DAGs in DagBag.
Create make_dagster_repo.py:
    from dagster_airflow.dagster_pipeline_factory import make_dagster_repo_from_airflow_dag_bag
    from airflow_home import my_dag_bag

    def make_repo_from_dag_bag():
        return make_dagster_repo_from_airflow_dag_bag(my_dag_bag, 'my_repo_name')
Use RepositoryDefinition as usual, for example:
    dagit -f path/to/make_dagster_repo.py -n make_repo_from_dag_bag
dag_bag (DagBag) – The Airflow DagBag whose DAGs are compiled into the repository
repo_name (str) – Name for generated RepositoryDefinition
refresh_from_airflow_db (bool) – If True, will refresh DAG if expired via DagBag.get_dag(), which requires access to initialized Airflow DB. If False (recommended), gets dag from DagBag’s dags dict without depending on Airflow DB. (default: False)
use_airflow_template_context (bool) – If True, will call get_template_context() on the Airflow TaskInstance model which requires and modifies the DagRun table. (default: False)
RepositoryDefinition
Construct a Dagster repository for Airflow’s example DAGs.
'example_external_task_marker_child', 'example_pig_operator', 'example_skip_dag', 'example_trigger_target_dag', 'example_xcom', 'test_utils'
Usage:
- Create make_dagster_repo.py:
    from dagster_airflow.dagster_pipeline_factory import make_dagster_repo_from_airflow_example_dags

    def make_airflow_example_dags():
        return make_dagster_repo_from_airflow_example_dags()
- Use RepositoryDefinition as usual, for example:
    dagit -f path/to/make_dagster_repo.py -n make_airflow_example_dags
repo_name (str) – Name for generated RepositoryDefinition
RepositoryDefinition
Construct a Dagster op corresponding to a given Airflow operator.
The resulting op will execute the execute() method on the Airflow operator. Dagster and
any dependencies required by the Airflow Operator must be available in the Python environment
within which your Dagster ops execute.
To specify Airflow connections utilized by the operator, instantiate and pass Airflow connection
objects in a list to the connections parameter of this function.
http_task = SimpleHttpOperator(task_id="my_http_task", endpoint="foo")
connections = [Connection(conn_id="http_default", host="https://mycoolwebsite.com")]
dagster_op = airflow_operator_to_op(http_task, connections=connections)
In order to specify extra parameters to the connection, call the set_extra() method
on the instantiated Airflow connection:
import json

s3_conn = Connection(conn_id="s3_conn", conn_type="s3")
s3_conn_extra = {
    "aws_access_key_id": "my_access_key",
    "aws_secret_access_key": "my_secret_access_key",
}
# set_extra() expects the extra parameters serialized as a JSON string
s3_conn.set_extra(json.dumps(s3_conn_extra))
airflow_op (BaseOperator) – The Airflow operator to convert into a Dagster op
connections (Optional[List[Connection]]) – Airflow connections utilized by the operator.
capture_python_logs (bool) – If False, will not redirect Airflow logs to compute logs. (default: True)
return_output (bool) – If True, will return any output from the Airflow operator. (default: False)
The generated Dagster op
converted_op (OpDefinition)