Software-Defined Assets with Pandas and PySpark#

You can find the code for this example on Github

The software-defined asset APIs sit atop of the graph/job/op APIs and enable a novel approach to orchestration that puts assets at the forefront.

In Dagster, an "asset" is a data product, an object produced by a data pipeline. Some examples are tables, machine learning models, or reports.

Conceptually, software-defined assets invert the typical relationship between assets and computation. Instead of defining a graph of ops and recording which assets those ops end up materializing, you define a set of assets. Each asset knows how to compute its contents from upstream assets.

Taking a software-defined asset approach has a few main benefits:

  • Write less code - Each asset knows about the assets it depends on; you don't need to use @graph / @job to wire up dependencies.
  • Track cross-job dependencies via asset lineage - Dagster allows you to find the parents and children of any asset, even if they live in different jobs. This is useful for finding the sources of problems and for understanding the consequences of changing or removing an asset.
  • Know when you need to take action on an asset - In a unified view, Dagster compares the assets you've defined in code to the assets you've materialized in storage. You can catch that you've deployed code for generating a new table, but that you haven't yet materialized it. Or that you've deployed code that adds a column to a table, but that your stored table is still missing that column. Or that you've removed an asset definition, but the table still exists in storage.

In this example, we'll define some tables with dependencies on each other. We have a table of temperature samples collected in five-minute increments, and we want to compute a table of the highest temperatures for each day.

Assets computed with Pandas and stored as CSVs#

Defining the assets#

Here are our asset definitions that define tables we want to materialize.

import pandas as pd
from pandas import DataFrame

from dagster import AssetKey, SourceAsset, asset

sfo_q2_weather_sample = SourceAsset(
    key=AssetKey("sfo_q2_weather_sample"),
    description="Weather samples, taken every five minutes at SFO",
    metadata={"format": "csv"},
)


@asset
def daily_temperature_highs(sfo_q2_weather_sample: DataFrame) -> DataFrame:
    """Computes the temperature high for each day"""
    sfo_q2_weather_sample["valid_date"] = pd.to_datetime(sfo_q2_weather_sample["valid"])
    return sfo_q2_weather_sample.groupby("valid_date").max().rename(columns={"tmpf": "max_tmpf"})


@asset
def hottest_dates(daily_temperature_highs: DataFrame) -> DataFrame:
    """Computes the 10 hottest dates"""
    return daily_temperature_highs.nlargest(10, "max_tmpf")

sfo_q2_weather_sample represents our base temperature table. It's a SourceAsset, meaning that we rely on it, but don't generate it.

daily_temperature_highs represents a computed asset. It's derived by taking the sfo_q2_weather_sample table and applying the decorated function to it. Notice that it's defined using a pure function, a function with no side effects, just logical data transformation. The code for storing and retrieving the data in persistent storage will be supplied later on in an IOManager. This allows us to swap in different implementations in different environments. For example, in local development, we might want to store data in a local CSV file for easy testing. However in production, we would want to store data in a data warehouse.

hottest_dates is a computed asset that depends on another computed asset, daily_temperature_highs.

The framework infers asset dependencies by looking at the names of the arguments to the decorated functions. The function that defines the daily_temperature_highs asset has an argument named sfo_q2_weather_sample, which corresponds to the asset definition of the same name.

Connecting assets to external services#

Having defined some assets, we can combine them with resources, such as I/O managers, to determine how they're stored, and connect them to external services.

To load definitions such as assets and resources, we use Definitions. Let's take a closer look at how definitions are loaded:

  • We add the assets that we want Dagster to load in the assets argument to the Definitions object. It's common to use a utility like load_assets_from_modules or load_assets_from_package_name to pick up all the assets within a module or package, so you don't need to list them individually. The order that we supply the assets doesn't matter, since the dependencies are determined by each asset definition.

  • We supply resources mapped to the assets using the resources argument to the Definitions object.

# __init__.py
    from dagster import Definitions, load_assets_from_modules

    from .assets import table_assets
    from .local_filesystem_io_manager import local_filesystem_io_manager

    defs = Definitions(
        # imports the module called "assets" from the package containing the current module
        # the "assets" module contains the asset definitions
        assets=load_assets_from_modules([table_assets]),
        resources={
            "io_manager": local_filesystem_io_manager,
        },
    )

The functions we used to define our assets describe how to compute their contents, but not how to read and write them to persistent storage. For reading and writing, we define an IOManager. In this case, our LocalFileSystemIOManager stores DataFrames as CSVs on the local filesystem:

# local_filesystem_io_manager.py

import os

import pandas as pd
from pandas import DataFrame

from dagster import AssetKey, IOManager, io_manager


class LocalFileSystemIOManager(IOManager):
    """Translates between Pandas DataFrames and CSVs on the local filesystem."""

    def _get_fs_path(self, asset_key: AssetKey) -> str:
        rpath = os.path.join(*asset_key.path) + ".csv"
        return os.path.abspath(rpath)

    def handle_output(self, context, obj: DataFrame):
        """This saves the dataframe as a CSV."""
        fpath = self._get_fs_path(context.asset_key)
        obj.to_csv(fpath)

    def load_input(self, context):
        """This reads a dataframe from a CSV."""
        fpath = self._get_fs_path(context.asset_key)
        return pd.read_csv(fpath)


@io_manager
def local_filesystem_io_manager():
    return LocalFileSystemIOManager()

Adding in Spark assets#

Not all the assets in the same dependency graph need to have the same Python type. Here's an asset whose computation is defined using Spark DataFrames, that depends on the daily_temperature_highs asset we defined above using Pandas.

from pyspark.sql import DataFrame as SparkDF
from pyspark.sql import Window
from pyspark.sql import functions as f

from dagster import asset


@asset
def daily_temperature_high_diffs(daily_temperature_highs: SparkDF) -> SparkDF:
    """Computes the difference between each day's high and the previous day's high"""
    window = Window.orderBy("valid_date")
    return daily_temperature_highs.select(
        "valid_date",
        (
            daily_temperature_highs["max_tmpf"]
            - f.lag(daily_temperature_highs["max_tmpf"]).over(window)
        ).alias("day_high_diff"),
    )

Here's an extended version of weather_assets that contains the new asset:

# __init__.py

    from dagster import Definitions, load_assets_from_modules

    from .assets import spark_asset, table_assets
    from .local_spark_filesystem_io_manager import local_filesystem_io_manager

    defs = Definitions(
        assets=load_assets_from_modules([table_assets, spark_asset]),
        resources={"io_manager": local_filesystem_io_manager},
    )

Defining a multi-type I/O Manager#

Because the same assets will be written and read into different Python types in different situations, we need to define an IOManager that can handle both of those types. Here's an extended version of the IOManager we defined before:

# local_spark_filesystem_io_manager.py

# Data is stored in Parquet files using the "Hadoop-style" layout in which each table corresponds to a
# directory, and each file within the directory contains some of the rows.

# The processing options are Pandas and Spark. A table can be created from a Pandas DataFrame
# and then consumed in a downstream computation as a Spark DataFrame, and vice versa.

import glob
import os
from typing import Union

import pandas as pd
from pandas import DataFrame as PandasDF
from pyspark.sql import DataFrame as SparkDF
from pyspark.sql import SparkSession

from dagster import AssetKey, IOManager
from dagster import _check as check
from dagster import io_manager


class LocalFileSystemIOManager(IOManager):
    def _get_fs_path(self, asset_key: AssetKey) -> str:
        return os.path.abspath(os.path.join(*asset_key.path))

    def handle_output(self, context, obj: Union[PandasDF, SparkDF]):
        """This saves the DataFrame as a CSV using the layout written and expected by Spark/Hadoop.

        E.g. if the given storage maps the asset's path to the filesystem path "/a/b/c", a directory
        will be created with two files inside it:

            /a/b/c/
                part-00000.csv
         2       _SUCCESS
        """
        if isinstance(obj, PandasDF):
            directory = self._get_fs_path(context.asset_key)
            os.makedirs(directory, exist_ok=True)
            open(os.path.join(directory, "_SUCCESS"), "wb").close()
            csv_path = os.path.join(directory, "part-00000.csv")
            obj.to_csv(csv_path)
        elif isinstance(obj, SparkDF):
            obj.write.format("csv").options(header="true").save(
                self._get_fs_path(context.asset_key), mode="overwrite"
            )
        else:
            raise ValueError("Unexpected input type")

    def load_input(self, context) -> Union[PandasDF, SparkDF]:
        """This reads a DataFrame from a CSV using the layout written and expected by Spark/Hadoop.

        E.g. if the given storage maps the asset's path to the filesystem path "/a/b/c", and that
        directory contains:

            /a/b/c/
                part-00000.csv
                part-00001.csv
                _SUCCESS

        then the produced dataframe will contain the concatenated contents of the two CSV files.
        """
        if context.dagster_type.typing_type == PandasDF:
            fs_path = os.path.abspath(self._get_fs_path(context.asset_key))
            paths = glob.glob(os.path.join(fs_path, "*.csv"))
            check.invariant(len(paths) > 0, f"No csv files found under {fs_path}")
            return pd.concat(map(pd.read_csv, paths))
        elif context.dagster_type.typing_type == SparkDF:
            return (
                SparkSession.builder.getOrCreate()
                .read.format("csv")
                .options(header="true")
                .load(self._get_fs_path(context.asset_key))
            )
        else:
            raise ValueError("Unexpected input type")


@io_manager
def local_filesystem_io_manager():
    return LocalFileSystemIOManager()