In this guide, we'll walk through how to transition your data pipelines from local development to staging and production deployments.
Let's say we've been tasked with fetching the N most recent entries from Hacker News and splitting the data into two datasets: one containing all of the data about stories and one containing all of the data about comments. In order to make the pipeline maintainable and testable, we have two additional requirements:

- We must be able to run our data pipeline in local, staging, and production environments without changing code between them.
- We must be able to unit test the pipeline.
Using a few Dagster concepts, we can easily tackle this task! The main concepts we'll be using in this guide are assets, resources, I/O managers, and the configured API. Using these Dagster concepts we will:

- Write three assets: the full Hacker News dataset, the comments, and the stories.
- Use Dagster's Snowflake I/O manager to store the datasets in Snowflake.
- Swap between local, staging, and production resource configuration without changing asset code.
To follow along with this guide, you can copy the full code example and install a few additional pip packages:
```shell
dagster project from-example --name my-dagster-project --example development_to_production
cd my-dagster-project
pip install -e .
```
In this section we will:

- Write our three assets
- Add the assets to a repository and configure the Snowflake I/O manager
- Materialize the assets in Dagit and verify the data in Snowflake
Let’s start by writing our three assets. We'll use Pandas DataFrames to interact with the data.
```python
# assets.py
import pandas as pd
import requests

from dagster import asset


@asset(
    config_schema={"N": int},
    io_manager_key="snowflake_io_manager",
)
def items(context) -> pd.DataFrame:
    """Items from the Hacker News API: each is a story or a comment on a story."""
    rows = []
    max_id = requests.get(
        "https://hacker-news.firebaseio.com/v0/maxitem.json", timeout=5
    ).json()

    # Hacker News API is 1-indexed, so adjust range by 1
    for item_id in range(max_id - context.op_config["N"] + 1, max_id + 1):
        item_url = f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        rows.append(requests.get(item_url, timeout=5).json())

    # ITEM_FIELD_NAMES is a list of the column names in the Hacker News dataset
    result = pd.DataFrame(rows, columns=ITEM_FIELD_NAMES).drop_duplicates(subset=["id"])
    result.rename(columns={"by": "user_id"}, inplace=True)
    return result


@asset(
    io_manager_key="snowflake_io_manager",
)
def comments(items: pd.DataFrame) -> pd.DataFrame:
    """Comments from the Hacker News API."""
    return items[items["type"] == "comment"]


@asset(
    io_manager_key="snowflake_io_manager",
)
def stories(items: pd.DataFrame) -> pd.DataFrame:
    """Stories from the Hacker News API."""
    return items[items["type"] == "story"]
```
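The code above assumes `ITEM_FIELD_NAMES` is defined elsewhere in the project; it's included in the full code example. As a hypothetical sketch, it's simply a list of column names:

```python
# A hypothetical subset of the Hacker News item fields; the full code example
# defines the complete list.
ITEM_FIELD_NAMES = ["id", "type", "by", "time", "text", "title", "url"]
```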
Now we can add these assets to our repository and materialize them via Dagit as part of our local development workflow. We will use the configured API to add configuration for the `snowflake_io_manager`.
```python
# repository.py
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake_pandas import SnowflakePandasTypeHandler

from development_to_production.assets import comments, items, stories
from dagster import repository, with_resources

snowflake_io_manager = build_snowflake_io_manager([SnowflakePandasTypeHandler()])


# Note that storing passwords in configuration is bad practice. It will be
# resolved later in the guide.
@repository
def repo():
    resource_defs = {
        "snowflake_io_manager": snowflake_io_manager.configured(
            {
                "account": "abc1234.us-east-1",
                "user": "me@company.com",
                # password in config is bad practice
                "password": "my_super_secret_password",
                "database": "LOCAL",
                "schema": "ALICE",
            }
        ),
    }
    return [*with_resources([items, comments, stories], resource_defs=resource_defs)]
```
Note that we have passwords in our configuration in this code snippet. This is bad practice, and we will resolve it shortly.
This results in an asset graph that looks like this:
We can materialize the assets in Dagit and ensure that the data appears in Snowflake as we expect:
While we define our assets as Pandas DataFrames, the Snowflake I/O manager automatically translates the data to and from Snowflake tables. The Python asset name determines the Snowflake table name; in this case, three tables will be created: `ITEMS`, `COMMENTS`, and `STORIES`.
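If you'd rather sanity-check the assets from a Python script or test instead of Dagit, something like the following should work. This is a sketch that assumes the resource definitions shown above; `materialize` is Dagster's API for executing assets in-process, and here it would write to the configured Snowflake database:

```python
from dagster import materialize

# Sketch: execute the three assets in-process with the same resources
# used by the repository.
result = materialize(
    with_resources([items, comments, stories], resource_defs=resource_defs),
    run_config={"ops": {"items": {"config": {"N": 100}}}},
)
assert result.success
```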
In this section we will:

- Update the configuration for the `snowflake_io_manager` so the assets can run against a production Snowflake database
- Move credentials out of code and into environment variables
- Set up a staging environment, using either Branch Deployments or a self-hosted deployment
Now that our assets work locally, we can start the deployment process! We'll first set up our assets for production, and then discuss the options for our staging deployment.
We want to store the assets in a production Snowflake database, so we need to update the configuration for the `snowflake_io_manager`. But if we were to simply update the values we set for local development, we would run into an issue: the next time a developer wants to work on these assets, they will need to remember to change the configuration back to the local values. This leaves room for a developer to accidentally overwrite the production asset during local development.
Instead, we can set up the repository to determine the configuration for resources based on the environment:
```python
# repository.py
import os  # needed for os.getenv below


# Note that storing passwords in configuration is bad practice. It will be resolved soon.
@repository
def repo():
    resource_defs = {
        "local": {
            "snowflake_io_manager": snowflake_io_manager.configured(
                {
                    "account": "abc1234.us-east-1",
                    "user": "me@company.com",
                    # password in config is bad practice
                    "password": "my_super_secret_password",
                    "database": "LOCAL",
                    "schema": "ALICE",
                }
            ),
        },
        "production": {
            "snowflake_io_manager": snowflake_io_manager.configured(
                {
                    "account": "abc1234.us-east-1",
                    "user": "dev@company.com",
                    # password in config is bad practice
                    "password": "company_super_secret_password",
                    "database": "PRODUCTION",
                    "schema": "HACKER_NEWS",
                }
            ),
        },
    }
    deployment_name = os.getenv("DAGSTER_DEPLOYMENT", "local")
    return [
        *with_resources(
            [items, comments, stories], resource_defs=resource_defs[deployment_name]
        )
    ]
```
Note that we still have passwords in our configuration in this code snippet. This is bad practice, and we will resolve it next.
Now, we can set the environment variable `DAGSTER_DEPLOYMENT=production` in our deployment, and the correct resources will be applied to the assets.
We still have some problems with this setup:

- A developer needs to remember to change `user` and `password` to their own credentials and `schema` to their own name when developing locally.
- Passwords are still stored directly in code.

We can easily solve these problems because the Snowflake I/O manager accepts configuration from environment variables using the `StringSource` configuration type. This allows us to store configuration values as environment variables and point Dagster to those environment variables in the run configuration:
```python
# repository.py
@repository
def repo():
    resource_defs = {
        "local": {
            "snowflake_io_manager": snowflake_io_manager.configured(
                {
                    "account": "abc1234.us-east-1",
                    "user": {"env": "DEV_SNOWFLAKE_USER"},
                    "password": {"env": "DEV_SNOWFLAKE_PASSWORD"},
                    "database": "LOCAL",
                    "schema": {"env": "DEV_SNOWFLAKE_SCHEMA"},
                }
            ),
        },
        "production": {
            "snowflake_io_manager": snowflake_io_manager.configured(
                {
                    "account": "abc1234.us-east-1",
                    "user": "system@company.com",
                    "password": {"env": "SYSTEM_SNOWFLAKE_PASSWORD"},
                    "database": "PRODUCTION",
                    "schema": "HACKER_NEWS",
                }
            ),
        },
    }
    deployment_name = os.getenv("DAGSTER_DEPLOYMENT", "local")
    return [
        *with_resources(
            [items, comments, stories], resource_defs=resource_defs[deployment_name]
        )
    ]
```
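With this setup, each developer only needs to export the three `DEV_*` variables in their local environment. If you want to fail fast when they're missing, a small guard like the following can help. This is a hypothetical addition, not part of the original example:

```python
import os

# Hypothetical guard: surface a clear error if local dev variables are missing.
REQUIRED_DEV_VARS = ["DEV_SNOWFLAKE_USER", "DEV_SNOWFLAKE_PASSWORD", "DEV_SNOWFLAKE_SCHEMA"]
missing = [name for name in REQUIRED_DEV_VARS if name not in os.environ]
if missing:
    raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
```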
Depending on your organization’s Dagster setup, there are a couple of options for a staging environment.
For Dagster Cloud users, we recommend using Branch Deployments as your staging step. A branch deployment is a new Dagster deployment that is automatically generated for each git branch. Check out our comprehensive guide to branch deployments to learn how to use branch deployments to verify data pipelines before deploying them to production.
For a self-hosted staging deployment, we've already done most of the necessary work to run our assets in staging! All we need to do is add another entry to the `resource_defs` dictionary in our repository and set `DAGSTER_DEPLOYMENT=staging` in our staging deployment.
```python
resource_defs = {
    "local": {...},
    "production": {...},
    "staging": {
        "snowflake_io_manager": snowflake_io_manager.configured(
            {
                "account": "abc1234.us-east-1",
                "user": "system@company.com",
                "password": {"env": "SYSTEM_SNOWFLAKE_PASSWORD"},
                "database": "STAGING",
                "schema": "HACKER_NEWS",
            }
        ),
    },
}
```
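Since the three environments now share most of their Snowflake settings, you might factor the common values into a small helper to cut down on duplication. This is a sketch, not part of the original example; `snowflake_io_manager_for` is a hypothetical name:

```python
def snowflake_io_manager_for(user, password, database, schema):
    # Hypothetical helper: only the per-environment values vary.
    return snowflake_io_manager.configured(
        {
            "account": "abc1234.us-east-1",
            "user": user,
            "password": password,
            "database": database,
            "schema": schema,
        }
    )


resource_defs = {
    "staging": {
        "snowflake_io_manager": snowflake_io_manager_for(
            "system@company.com",
            {"env": "SYSTEM_SNOWFLAKE_PASSWORD"},
            "STAGING",
            "HACKER_NEWS",
        ),
    },
    # "local" and "production" would be defined the same way
}
```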
You may have noticed a missing step in the development workflow presented in this guide — unit tests! While the main purpose of the guide is to help you transition your code from local development to a production deployment, unit testing is still an important part of the development cycle. In this section, we'll explore a pattern you may find useful when writing your own unit tests.
When we write unit tests for the `items` asset, we could make more precise assertions if we knew exactly what data we'd receive from Hacker News. If we refactor our interactions with the Hacker News API as a resource, we can leverage Dagster's resource system to provide a stub resource in our unit tests.
Before we get into implementation, let's go over some best practices: when to refactor interactions with a service into a resource, and when it makes sense to stub that resource in unit tests.
In many cases, interacting with an external service directly in assets or ops is more convenient than refactoring the interactions with the service as a resource. We recommend refactoring code to use resources in the following cases:

- Multiple assets or ops need to interact with the service.
- You want to swap in a different implementation of the service in different environments.
- You want to stub out the service in unit tests, as we do in this section.
Determining when it makes sense to stub a resource for a unit test can be a topic of much debate. There are certainly some resources where it would be too complicated to write and maintain a stub. For example, it would be difficult to mock a database like Snowflake with a lightweight database since the SQL syntax may vary. In general, if a resource is relatively simple, writing a stub can be helpful for unit testing the assets and ops that use the resource.
We'll start by writing the "real" Hacker News API Client:
```python
# resources.py
from typing import Any, Dict, Optional

import requests

from dagster import resource


class HNAPIClient:
    """Hacker News client that fetches live data."""

    def fetch_item_by_id(self, item_id: int) -> Optional[Dict[str, Any]]:
        """Fetches a single item from the Hacker News API by item id."""
        item_url = f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        item = requests.get(item_url, timeout=5).json()
        return item

    def fetch_max_item_id(self) -> int:
        return requests.get(
            "https://hacker-news.firebaseio.com/v0/maxitem.json", timeout=5
        ).json()


@resource
def hn_api_client():
    return HNAPIClient()
```
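You can exercise the client directly, for example in a REPL. Note that this hits the live API:

```python
client = HNAPIClient()
max_id = client.fetch_max_item_id()
print(client.fetch_item_by_id(max_id))
```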
We'll also need to update the `items` asset to use this resource:
```python
# assets.py
@asset(
    config_schema={"N": int},
    required_resource_keys={"hn_client"},
    io_manager_key="snowflake_io_manager",
)
def items(context) -> pd.DataFrame:
    """Items from the Hacker News API: each is a story or a comment on a story."""
    hn_client = context.resources.hn_client

    max_id = hn_client.fetch_max_item_id()
    rows = []
    # Hacker News API is 1-indexed, so adjust range by 1
    for item_id in range(max_id - context.op_config["N"] + 1, max_id + 1):
        rows.append(hn_client.fetch_item_by_id(item_id))

    result = pd.DataFrame(rows, columns=hn_client.item_field_names).drop_duplicates(
        subset=["id"]
    )
    result.rename(columns={"by": "user_id"}, inplace=True)
    return result
```
For the sake of brevity, we've omitted the implementation of the `item_field_names` property on `HNAPIClient`; you can find it in the full code example.
We'll also need to add the `hn_api_client` to the `resource_defs` in our repository:
```python
resource_defs = {
    "local": {"hn_client": hn_api_client, "snowflake_io_manager": {...}},
    "production": {"hn_client": hn_api_client, "snowflake_io_manager": {...}},
    "staging": {"hn_client": hn_api_client, "snowflake_io_manager": {...}},
}
```
Now we can write a stubbed version of the Hacker News resource. We want to make sure the stub has implementations for each method `HNAPIClient` implements.
```python
# resources.py
class StubHNClient:
    """Hacker News client that returns fake data."""

    def __init__(self):
        self.data = {
            1: {
                "id": 1,
                "type": "comment",
                "title": "the first comment",
                "by": "user1",
            },
            2: {"id": 2, "type": "story", "title": "an awesome story", "by": "user2"},
        }

    def fetch_item_by_id(self, item_id: int) -> Optional[Dict[str, Any]]:
        return self.data.get(item_id)

    def fetch_max_item_id(self) -> int:
        return 2

    @property
    def item_field_names(self):
        return ["id", "type", "title", "by"]


@resource
def stub_hn_client():
    return StubHNClient()
```
Since the stub Hacker News resource and the real Hacker News resource need to implement the same methods, this would be a great time to write an interface. We’ll skip the implementation in this guide, but you can find it in the full code example.
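That said, as a rough sketch, such an interface might look like this. This is a hypothetical version; the full code example contains the actual one:

```python
# resources.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class HNClient(ABC):
    """Interface that both the real and stub Hacker News clients implement."""

    @abstractmethod
    def fetch_item_by_id(self, item_id: int) -> Optional[Dict[str, Any]]:
        ...

    @abstractmethod
    def fetch_max_item_id(self) -> int:
        ...

    @property
    @abstractmethod
    def item_field_names(self) -> List[str]:
        ...
```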
Now we can use the stub Hacker News resource to test that the `items` asset transforms the data in the way we expect:
```python
# test_assets.py
def test_items():
    context = build_op_context(
        resources={"hn_client": stub_hn_client},
        op_config={"N": StubHNClient().fetch_max_item_id()},
    )
    hn_dataset = items(context)
    assert isinstance(hn_dataset, pd.DataFrame)

    expected_data = pd.DataFrame(StubHNClient().data.values()).rename(
        columns={"by": "user_id"}
    )

    assert (hn_dataset == expected_data).all().all()
```
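Since `comments` and `stories` are pure functions of the `items` DataFrame, they can be tested even more directly by invoking them with an in-memory DataFrame. This test is a sketch, not part of the original example:

```python
# test_assets.py
def test_comments():
    # Minimal input resembling the output of the items asset.
    input_df = pd.DataFrame(
        [
            {"id": 1, "type": "comment", "user_id": "user1"},
            {"id": 2, "type": "story", "user_id": "user2"},
        ]
    )
    result = comments(input_df)
    assert len(result) == 1
    assert (result["type"] == "comment").all()
```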
This guide demonstrates how we recommend writing your assets and jobs so that they transition from local development to staging and production environments without requiring code changes at each step. While we focused on assets in this guide, the same concepts and APIs can be used to swap out run configuration for jobs.