Using dbt with Dagster#

Dagster orchestrates dbt alongside other technologies, so you can combine dbt with Spark, Python, and more in a single workflow. Dagster's software-defined asset (SDA) abstractions make it simple to define data assets that depend on specific dbt models, or to define the computations that produce the sources your dbt models depend on.

This guide focuses on how to work with dbt models through the SDA framework, which we recommend for most use cases.

If you're using dbt Cloud, check out Using dbt Cloud with Dagster instead!


Dagster software-defined assets vs. dbt models#

A software-defined asset contains an asset key, a set of upstream asset keys, and an operation that is responsible for computing the asset from its upstream dependencies. Models defined in a dbt project are conceptually similar to Dagster's software-defined assets:

  • The asset key for a dbt model is (by default) the name of the model.
  • The upstream dependencies of a dbt model are defined with ref or source calls within the model's definition.
  • The computation required to compute the asset from its upstream dependencies is the SQL within the model's definition.
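
To make the parallel concrete, here's a minimal sketch (using hypothetical model names) of how a dbt model maps onto a software-defined asset: the asset key is the function name, the upstream keys are the function parameters, and the computation is the function body.

from dagster import asset

# Hypothetical illustration: a dbt model "daily_revenue" that refs "stg_orders"
# corresponds to an asset whose key is "daily_revenue", whose upstream asset
# key is "stg_orders", and whose computation plays the role of the model's SQL.
@asset
def daily_revenue(stg_orders):
    ...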

These similarities make it natural to interact with dbt models as SDAs. Dagster has built-in support for loading dbt models, seeds, and snapshots as SDAs. This allows you to:

  • Visualize and orchestrate a graph of dbt assets, and execute them with a single dbt invocation
  • View detailed historical metadata and logs for each asset
  • Define Python computations that depend directly on tables updated using dbt
  • Track data lineage through dbt and your other tools

Loading dbt models from a dbt project#

For smaller dbt projects, where compilation time is not a concern, the simplest way to load your dbt assets into Dagster is the following:

from dagster_dbt import load_assets_from_dbt_project

dbt_assets = load_assets_from_dbt_project(project_dir="path/to/dbt/project")

The load_assets_from_dbt_project function:

  1. Compiles your dbt project,
  2. Parses the metadata that dbt provides, and
  3. Generates a set of software-defined assets that reflect the models in the project. These assets will share the same underlying operation, which will invoke dbt to run the models represented by the loaded assets.
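
If you only need a subset of the project, load_assets_from_dbt_project also accepts a dbt selection string. A quick sketch, where "marketing" is a hypothetical selector:

from dagster_dbt import load_assets_from_dbt_project

marketing_assets = load_assets_from_dbt_project(
    project_dir="path/to/dbt/project",
    # passed through to dbt's node selection syntax
    select="marketing",
)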

For larger projects, the overhead involved with recompiling the entire project may be a concern. In these cases, you can load dbt models from an existing dbt manifest.json file:

import json

from dagster_dbt import load_assets_from_dbt_manifest

with open("path/to/dbt/manifest.json", encoding="utf8") as f:
    dbt_assets = load_assets_from_dbt_manifest(json.load(f))

Note: if you make any changes to your dbt project that change the structure of the project (such as changing the dependencies of a model or adding a new one), you'll need to regenerate your manifest file for those changes to be reflected in Dagster.
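
Since dbt writes manifest.json to the project's target directory on every compile, one way to keep the manifest fresh is to run dbt compile before loading it. A minimal sketch, assuming the dbt CLI is installed and on your PATH:

import json
import subprocess

from dagster_dbt import load_assets_from_dbt_manifest

# Recompile the project to regenerate target/manifest.json
subprocess.run(["dbt", "compile", "--project-dir", "path/to/dbt/project"], check=True)

with open("path/to/dbt/project/target/manifest.json", encoding="utf8") as f:
    dbt_assets = load_assets_from_dbt_manifest(json.load(f))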


Adding a resource#

Assets loaded from dbt require a dbt resource, which is responsible for firing off dbt CLI commands. The dagster-dbt integration provides the dbt_cli_resource for this purpose. This resource can be configured with CLI flags that will be passed into every dbt invocation.

The most important flag to set is the project_dir flag, which points Dagster at the directory of your dbt project. For a full list of configuration options, refer to the dbt_cli_resource API docs.

You can configure this resource and add it to your dbt assets by doing the following:

from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project

from dagster import with_resources

DBT_PROJECT_PATH = "path/to/dbt_project"

dbt_assets = with_resources(
    load_assets_from_dbt_project(DBT_PROJECT_PATH),
    {
        "dbt": dbt_cli_resource.configured(
            {"project_dir": DBT_PROJECT_PATH},
        )
    },
)
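
Other flags can be supplied the same way. For example, a sketch that also points dbt at a profiles directory and target (the path and target name here are placeholders):

dbt_assets = with_resources(
    load_assets_from_dbt_project(DBT_PROJECT_PATH),
    {
        "dbt": dbt_cli_resource.configured(
            {
                "project_dir": DBT_PROJECT_PATH,
                "profiles_dir": "path/to/profiles",  # directory containing profiles.yml
                "target": "dev",  # which profile target dbt should use
            },
        )
    },
)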

Scheduling dbt jobs#

Once you have your dbt assets, you can define a job that runs some or all of these assets on a schedule:

from dagster import ScheduleDefinition, define_asset_job, repository

run_everything_job = define_asset_job("run_everything", selection="*")

# my_model and everything downstream of it
run_something_job = define_asset_job("run_something", selection="my_model*")

@repository
def my_repo():
    return [
        dbt_assets,
        ScheduleDefinition(
            job=run_something_job,
            cron_schedule="@daily",
        ),
        ScheduleDefinition(
            job=run_everything_job,
            cron_schedule="@weekly",
        ),
    ]

Refer to the Schedule documentation for more info on running jobs on a schedule.


Understanding asset keys#

In Dagster, each asset has an asset key that identifies it. Dagster automatically generates an asset key for each dbt node in the project, as well as for each source that those nodes depend on.

For models, seeds, and snapshots, the default asset key will be the configured schema for that node (if any), concatenated with the name of the node.

For example, if you have configured a custom schema for a subdirectory in your dbt_project.yml file:

models:
  my_project:
    marketing:
      +schema: marketing

Then the asset key for a model named "some_model" will be marketing/some_model. If you have not configured a custom schema, then the asset key will simply be some_model.

For sources, the default asset key will be the name of the source concatenated with the name of the source table.

For example, the asset key for the source table defined in the following sources.yaml will be jaffle_shop/orders:

sources:
  - name: jaffle_shop
    tables:
      - name: orders
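
In Python, these slash-separated keys correspond to multi-part asset keys:

from dagster import AssetKey

AssetKey(["marketing", "some_model"])  # a model with a custom schema, as above
AssetKey(["jaffle_shop", "orders"])    # a source table, as above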

Adding a prefix to asset keys#

A common pattern is to use the prefix of an asset key to indicate what database an asset is stored in. For example, you might want all of your assets stored in Snowflake to start with the prefix snowflake.

To add a prefix to the models generated by your dbt project, you can pass in a key_prefix argument to either the load_assets_from_dbt_manifest or load_assets_from_dbt_project function:

dbt_assets = load_assets_from_dbt_project(
    "path/to/dbt_project",
    key_prefix="snowflake",
)

Note: The key_prefix argument only applies to models. If you want to apply a prefix to the source keys that Dagster generates, pass in a source_key_prefix argument:

dbt_assets = load_assets_from_dbt_project(
    "path/to/dbt_project",
    source_key_prefix="snowflake",
)
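
The two arguments can be combined to prefix models and sources alike:

dbt_assets = load_assets_from_dbt_project(
    "path/to/dbt_project",
    key_prefix="snowflake",
    source_key_prefix="snowflake",
)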

Defining upstream dependencies#

Dagster infers the upstream dependencies of your dbt models from the dbt project itself. Whenever a model depends on a dbt source, that source will be parsed as an upstream asset.

For example, if you have a source defined in your sources.yaml file like this:

sources:
  - name: jaffle_shop
    tables:
      - name: orders

and use it in a model:

select * from {{ source("jaffle_shop", "orders") }}
where foo = 1

Then the asset created for that model will be given an upstream asset key of jaffle_shop/orders. In many cases, this upstream asset might also be managed by Dagster. If you add an asset definition to your repository which produces jaffle_shop/orders, then this asset will be upstream of your dbt model.

from dagster import asset

@asset(key_prefix="jaffle_shop")
def orders():
    # produces the table that the jaffle_shop.orders source points to
    return ...

Defining downstream dependencies#

Dagster allows you to define assets that are downstream of specific dbt models. With dbt-based assets, the external tool (in this case, dbt) handles storing each model in the database, rather than Dagster storing the updated tables directly.

This means that there's a range of ways to load a dbt model as input to a Python function. For example, you might want to load the contents as a Pandas dataframe or into a PySpark session. You can specify this loading behavior on each downstream asset. For example, if you wanted to consume a dbt model with the asset key my_dbt_model as a Pandas dataframe, that would look something like the following:

from dagster import AssetIn, asset

@asset(
    ins={"my_dbt_model": AssetIn(input_manager_key="pandas_df_manager")},
)
def my_downstream_asset(my_dbt_model):
    # my_dbt_model is loaded as a Pandas dataframe; keep the rows where the
    # (illustrative) foo column equals "bar"
    return my_dbt_model[my_dbt_model["foo"] == "bar"]

Defining an IO manager#

The implementation of your IO manager depends on:

  • The Python object you want to use to represent your table, and
  • The database that dbt is writing tables to

A simple IO manager implementation that loads data from a dbt-managed table into a Pandas dataframe would look something like the following:

import pandas as pd

from dagster import IOManager, io_manager

class PandasIOManager(IOManager):
    def __init__(self, con_string: str):
        self._con = con_string

    def handle_output(self, context, obj):
        # dbt handles outputs for us
        pass

    def load_input(self, context) -> pd.DataFrame:
        """Load the contents of a table as a pandas DataFrame."""
        table_name = context.asset_key.path[-1]
        return pd.read_sql(f"SELECT * FROM {table_name}", con=self._con)

@io_manager(config_schema={"con_string": str})
def pandas_io_manager(context):
    return PandasIOManager(context.resource_config["con_string"])

Once your IO manager is defined, you can supply it like any other resource when calling with_resources:

from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project

from dagster import with_resources

dbt_assets = with_resources(
    load_assets_from_dbt_project(...),
    {
        "dbt": dbt_cli_resource.configured(
            {"project_dir": "path/to/dbt_project"},
        ),
        "pandas_df_manager": pandas_io_manager.configured(
            {"con_string": "..."},
        ),
    },
)
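
Note that my_downstream_asset also needs access to the pandas_df_manager resource. One way to provide it is to include the asset in the same with_resources call, a sketch combining the earlier snippets:

from dagster import repository, with_resources

all_assets = with_resources(
    [*load_assets_from_dbt_project("path/to/dbt_project"), my_downstream_asset],
    {
        "dbt": dbt_cli_resource.configured(
            {"project_dir": "path/to/dbt_project"},
        ),
        "pandas_df_manager": pandas_io_manager.configured(
            {"con_string": "..."},
        ),
    },
)

@repository
def my_repo():
    return [all_assets]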

Conclusion#

If you find a bug or want to add a feature to the dagster-dbt library, we invite you to contribute.

If you have questions on using dbt with Dagster, we'd love to hear from you: join us in the Dagster Slack.
