Software-defined assets can depend on other software-defined assets. An asset dependency means that the contents of an "upstream" asset are used to compute the contents of the "downstream" asset.
Why split up code into multiple assets? There are a few reasons:
Having defined a dataset of cereals, we'll define a downstream asset that contains only the cereals that are manufactured by Nabisco.
import csv

import requests
from dagster import asset


@asset
def cereals():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]


@asset
def nabisco_cereals(cereals):
    """Cereals manufactured by Nabisco"""
    return [row for row in cereals if row["mfr"] == "N"]
We've defined our new asset, nabisco_cereals, with an argument, cereals.
Dagster offers a few ways of specifying asset dependencies, but the easiest is to include an upstream asset's name as an argument to the decorated function. When it's time to materialize the contents of the nabisco_cereals asset, the contents of the cereals asset are provided as the value of the cereals argument to its compute function.
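The name-matching idea can be illustrated without Dagster at all. The following is a minimal sketch in plain Python, not Dagster's actual implementation: the helpers upstream_names and compute are hypothetical, and the inline rows are made-up stand-ins for the downloaded CSV. It reads each function's parameter names as the names of its upstream assets and passes the upstream results in under those names.

```python
import inspect


def cereals():
    # Hypothetical inline rows standing in for the downloaded CSV.
    return [
        {"name": "Sample Bran", "mfr": "N"},
        {"name": "Sample Flakes", "mfr": "K"},
    ]


def nabisco_cereals(cereals):
    return [row for row in cereals if row["mfr"] == "N"]


def upstream_names(fn):
    """Return fn's parameter names, read here as the names of its upstream assets."""
    return list(inspect.signature(fn).parameters)


def compute(fn, registry, cache):
    """Compute fn after its upstreams, passing each result in by parameter name."""
    if fn.__name__ not in cache:
        kwargs = {
            name: compute(registry[name], registry, cache)
            for name in upstream_names(fn)
        }
        cache[fn.__name__] = fn(**kwargs)
    return cache[fn.__name__]


registry = {fn.__name__: fn for fn in (cereals, nabisco_cereals)}
dependencies = {name: upstream_names(fn) for name, fn in registry.items()}
result = compute(nabisco_cereals, registry, {})

print(dependencies)
print(result)
```

In this toy model, the only link between the two functions is that the parameter of nabisco_cereals is spelled the same as the name of the cereals function.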
So:
- cereals doesn't depend on any other asset
- nabisco_cereals depends on cereals

Let's visualize these assets in Dagit:
dagit -f serial_asset_graph.py
Navigate to http://127.0.0.1:3000.
To materialize the assets, click Materialize all.
Assets don't need to be wired together serially. An asset can depend on and be depended on by any number of other assets.
Here, we're interested in which of Nabisco's cereals has the most protein. We define four assets:
- cereals and nabisco_cereals, same as above
- cereal_protein_fractions, which records each cereal's protein content as a fraction of its total mass
- highest_protein_nabisco_cereal, which is the name of the Nabisco cereal that has the highest protein content

import csv

import requests
from dagster import asset


@asset
def cereals():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]


@asset
def nabisco_cereals(cereals):
    """Cereals manufactured by Nabisco"""
    return [row for row in cereals if row["mfr"] == "N"]


@asset
def cereal_protein_fractions(cereals):
    """
    For each cereal, records its protein content as a fraction of its total mass.
    """
    result = {}
    for cereal in cereals:
        # 28.35 is the number of grams in one ounce.
        total_grams = float(cereal["weight"]) * 28.35
        result[cereal["name"]] = float(cereal["protein"]) / total_grams

    return result


@asset
def highest_protein_nabisco_cereal(nabisco_cereals, cereal_protein_fractions):
    """
    The name of the Nabisco cereal that has the highest protein content.
    """
    sorted_by_protein = sorted(
        nabisco_cereals, key=lambda cereal: cereal_protein_fractions[cereal["name"]]
    )
    return sorted_by_protein[-1]["name"]
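
To sanity-check this logic outside Dagster, the same computations can be run as plain functions on a few rows. This is only a sketch: the rows in SAMPLE_CEREALS are made up, and the helper names protein_fractions and highest_protein are hypothetical. It also uses max rather than sorting, which gives the same answer except when two cereals tie (max returns the first of the tied rows, while the sorted version returns the last).

```python
GRAMS_PER_OUNCE = 28.35

# Made-up rows with the same string-valued columns the assets above read.
SAMPLE_CEREALS = [
    {"name": "Sample Bran", "mfr": "N", "protein": "4", "weight": "1"},
    {"name": "Sample Wheat", "mfr": "N", "protein": "2", "weight": "1"},
    {"name": "Sample Flakes", "mfr": "K", "protein": "6", "weight": "1"},
]


def protein_fractions(rows):
    """Map each cereal name to protein divided by total weight in grams."""
    return {
        row["name"]: float(row["protein"]) / (float(row["weight"]) * GRAMS_PER_OUNCE)
        for row in rows
    }


def highest_protein(rows, fractions):
    """Return the name of the row with the largest protein fraction."""
    return max(rows, key=lambda row: fractions[row["name"]])["name"]


nabisco = [row for row in SAMPLE_CEREALS if row["mfr"] == "N"]
fractions = protein_fractions(SAMPLE_CEREALS)
winner = highest_protein(nabisco, fractions)

print(winner)
```

Note that the row with the most protein overall is excluded here because its mfr value is not "N", which is why the final step needs both upstream results.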
Let's visualize these assets in Dagit:
dagit -f complex_asset_graph.py
Navigate to http://127.0.0.1:3000.
If you click Materialize all, you'll see that cereals executes first, followed by nabisco_cereals and cereal_protein_fractions, which execute in parallel because neither depends on the other's output. highest_protein_nabisco_cereal executes last, only after both nabisco_cereals and cereal_protein_fractions have finished.
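
This ordering is what a topological sort of the dependency graph produces. As a rough illustration, and not a description of how Dagster schedules work internally, the sketch below uses Python's standard-library graphlib module (Python 3.9+) to group the four assets into batches whose members have no unmet dependencies. The upstream mapping and the execution_batches helper are assumptions written for this example.

```python
from graphlib import TopologicalSorter

# Each asset name maps to the names of the assets it depends on.
upstream = {
    "cereals": [],
    "nabisco_cereals": ["cereals"],
    "cereal_protein_fractions": ["cereals"],
    "highest_protein_nabisco_cereal": ["nabisco_cereals", "cereal_protein_fractions"],
}


def execution_batches(upstream):
    """Group nodes into batches; every node in a batch has all its upstreams done."""
    sorter = TopologicalSorter(upstream)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        # Everything returned by get_ready() could run at the same time.
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


batches = execution_batches(upstream)
for number, batch in enumerate(batches, start=1):
    print(number, batch)
```

The middle batch contains both nabisco_cereals and cereal_protein_fractions, matching the parallel step described above.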