To organize the ops inside a job, you can nest sets of ops into sub-graphs.
Name | Description |
---|---|
@graph | The decorator used to define a graph. |
A Dagster job is usually based on a graph of connected ops, but its graph can also contain other graphs. Nesting graphs is useful for organizing large or complicated graphs and for abstracting away complexity. Dagster supports arbitrary levels of nesting.
We use the term node to refer to both ops and graphs, because both ops and graphs can be used as nodes inside graphs.
As a baseline, here's a job that does not use nesting. It starts with an op that returns a number, then uses two ops to convert it from Celsius to Fahrenheight, then logs the result:
from dagster import job, op @op def return_fifty(): return 50.0 @op def add_thirty_two(number): return number + 32.0 @op def multiply_by_one_point_eight(number): return number * 1.8 @op def log_number(context, number): context.log.info(f"number: {number}") @job def all_together_unnested(): log_number(add_thirty_two(multiply_by_one_point_eight(return_fifty())))
We can put the ops that perform the Celsius-to-Fahrenheit conversion into their own sub-graph and invoke that sub-graph from our job's main graph:
@graph def celsius_to_fahrenheit(number): return add_thirty_two(multiply_by_one_point_eight(number)) @job def all_together_nested(): log_number(celsius_to_fahrenheit(return_fifty()))
When executed, the above example will do the exact same thing as the non-nested version, but the nesting allows better organization of code and simplifies the presentation of the main graph in Dagit.
As shown in the example above, sub-graphs can have inputs and outputs - celsius_to_fahrenheit
accepts a number
argument, and it has a return statement. Sub-graph inputs and outputs enable connecting the inputs and outputs of nodes inside the graph to the inputs and outputs of nodes outside the graph. In the all_together_nested
example:
number
input of the celsius_to_fahrenheit
graph is passed as an argument to the multiply_by_one_point_eight
op. This means that, when an outer graph invokes celsius_to_fahrenheit
and provides the output of another op or sub-graph for the number
arg, the output of that op or sub-graph will be passed to multiply_by_one_point_eight
, and multiply_by_one_point_eight
will not execute until the upstream op that produces the output has completed.celsius_to_fahrenheit
graph returns the result of the add_thirty_two
op. This means that, when an outer graph invokes celsius_to_fahrenheit
and passes its output to the input of another node, the output of add_thirty_two
will be provided to that node, and any ops that ultimately receive that input will not execute until add_thirty_two
has completed.If you want to add a description to an input (that will display in Dagit), you can provide a GraphIn
when constructing the graph.
To provide configuration to ops inside a sub-graph when launching a run, you provide config for them under a key with the name of that sub-graph.
This example two ops that both take config and are wrapped by a graph, which is included inside a job.
@op(config_schema={"n": float}) def add_n(context, number): return number + context.op_config["n"] @op(config_schema={"m": float}) def multiply_by_m(context, number): return number * context.op_config["m"] @graph def add_n_times_m_graph(number): return multiply_by_m(add_n(number)) @job def subgraph_config_job(): add_n_times_m_graph(return_fifty())
To kick off a run of this job, you will need to specify the config for both add_n
and multiply_by_m
through the sub-graph:
ops: add_n_times_m_graph: ops: add_n: config: n: 3 multiply_by_m: config: m: 2
Sub-graphs can dictate config for the ops and sub-graphs inside them. If the full config is known at the time that you're defining the graph, you can pass a dictionary to the config
argument of the @graph
decorator.
from dagster import graph, op @op(config_schema={"n": float}) def add_n(context, number): return number + context.op_config["n"] @op(config_schema={"m": float}) def multiply_by_m(context, number): return number * context.op_config["m"] @graph(config={"multiply_by_m": {"config": {"m": 1.8}}, "add_n": {"config": {"n": 32}}}) def celsius_to_fahrenheit(number): return multiply_by_m(add_n(number))
Alternatively, you can use "config mapping", i.e. you can provide a function that accepts config that's provided to the graph and generates config for the nodes inside the graph.
from dagster import config_mapping, graph, op @op(config_schema={"n": float}) def add_n(context, number): return number + context.op_config["n"] @op(config_schema={"m": float}) def multiply_by_m(context, number): return number * context.op_config["m"] @config_mapping(config_schema={"from_unit": str}) def generate_config(val): if val["from_unit"] == "celsius": n = 32 elif val["from_unit"] == "kelvin": n = -459.67 else: raise ValueError() return {"multiply_by_m": {"config": {"m": 1.8}}, "add_n": {"config": {"n": n}}} @graph(config=generate_config) def to_fahrenheit(number): return multiply_by_m(add_n(number))
To run a job that contains to_fahrenheit
as a sub-graph, you need to provide a value for the from_unit
config option:
ops: to_fahrenheit: config: from_unit: celsius
To have multiple outputs from a graph, you need to define the outputs it maps and return a dictionary, where the keys are the output names and the values are the output values.
from dagster import GraphOut @op def echo(i): print(i) @op def one() -> int: return 1 @op def hello() -> str: return "hello" @graph(out={"x": GraphOut(), "y": GraphOut()}) def graph_with_multiple_outputs(): x = one() y = hello() return {"x": x, "y": y} @job def subgraph_multiple_outputs_job(): x, y = graph_with_multiple_outputs() echo(x) echo(y)