Asset Observations

An asset observation is an event that records metadata about a given asset. Unlike asset materializations, asset observations do not signify that an asset has been mutated.

Relevant APIs

Name              Description
AssetObservation  Dagster event indicating that an asset's metadata has been recorded.
AssetKey          A unique identifier for a particular external asset.

Overview

AssetObservation events are used to record metadata in Dagster about a given asset. Asset observation events can be logged at runtime from within ops and assets. For an asset's observations to be displayed, the asset must either be defined using the @asset decorator or have existing materializations.

Logging an AssetObservation from an Op

To make Dagster aware that we have recorded metadata about an asset, we can log an AssetObservation event from within an op. To do this, we use the method OpExecutionContext.log_event on the context:

from dagster import AssetObservation, op


@op
def observation_op(context):
    df = read_df()
    context.log_event(
        AssetObservation(asset_key="observation_asset", metadata={"num_rows": len(df)})
    )
    return 5

We should now see an observation event in the event log when we execute this op.

Attaching Metadata to an AssetObservation

Several types of metadata can be attached to an observation event, all through the MetadataValue class. Each observation event optionally takes a dictionary of metadata, which is then displayed in the event log and on the Asset Details page. Check our API docs for MetadataValue for more details on the types of event metadata available.

from dagster import AssetMaterialization, AssetObservation, MetadataValue, op


@op
def observes_dataset_op(context):
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    context.log_event(
        AssetObservation(
            asset_key="my_dataset",
            metadata={
                "text_metadata": "Text-based metadata for this event",
                "path": MetadataValue.path(remote_storage_path),
                "dashboard_url": MetadataValue.url(
                    "http://mycoolsite.com/url_for_my_data"
                ),
                "size (bytes)": calculate_bytes(df),
            },
        )
    )
    context.log_event(AssetMaterialization(asset_key="my_dataset"))
    return remote_storage_path

In the Asset Details page, we can see observations in the Asset Activity table.

Specifying a partition for an AssetObservation

If you are observing a single slice of an asset (e.g. a single day's worth of data in a larger table) rather than the asset as a whole, you can indicate this to Dagster by including the partition argument on the AssetObservation.

from dagster import AssetObservation, op


@op(config_schema={"date": str})
def partitioned_dataset_op(context):
    partition_date = context.op_config["date"]
    df = read_df_for_date(partition_date)
    context.log_event(
        AssetObservation(asset_key="my_partitioned_dataset", partition=partition_date)
    )
    return df