Using declarative scheduling with software-defined assets#

Orchestrators help you build the right data assets in the right order and at the right time. Dagster assets accomplish this goal by allowing you to define what you want to exist instead of specifying step-by-step how the assets should be built.

While it's possible in Dagster to use traditional cron-based schedules, we recommend trying freshness policies instead. Freshness policies allow you to declare SLAs for the assets you deliver to stakeholders and Dagster determines when everything else needs to run to meet those SLAs.


Prerequisites#

To understand freshness policies, it helps to run a few experiments. Install Dagster to follow along and run the examples in this guide.


Step 1: Schedule assets using cron#

To run each code snippet, copy the code into a Python file and then run dagit -f YOUR_FILE_NAME.py.

Let's begin with a regular schedule:

from dagster import (
    AssetSelection,
    ScheduleDefinition,
    asset,
    define_asset_job,
    repository,
)


@asset
def a():
    pass


@asset
def b(a):
    pass


update_job = define_asset_job(
    name="update_job", selection=AssetSelection.keys("a", "b")
)

update_job_schedule = ScheduleDefinition(
    name="update_job_schedule", job=update_job, cron_schedule="* * * * *"
)


@repository
def my_repo():
    return [[a, b], [update_job_schedule]]

This schedule runs a job named update_job every minute, which materializes asset a and then asset b. This setup represents a traditional cron-based schedule: to ensure asset b is updated with fresh data, you tell the orchestrator to run a job targeting assets a and b, and Dagster knows to run a before b because a is an input to the asset function b.


Step 2: Introduce a reconciliation sensor#

Now, let's take a step towards a more declarative approach to scheduling that describes what assets you want to exist:

from dagster import (
    AssetSelection,
    ScheduleDefinition,
    asset,
    build_asset_reconciliation_sensor,
    define_asset_job,
    repository,
)


@asset
def a():
    pass


@asset
def b(a):
    pass


update_job = define_asset_job(name="update_job", selection=AssetSelection.keys("a"))

# add a reconciliation sensor
update_sensor = build_asset_reconciliation_sensor(
    name="update_sensor", asset_selection=AssetSelection.all()
)

update_job_schedule = ScheduleDefinition(
    name="update_job_schedule", job=update_job, cron_schedule="* * * * *"
)


@repository
def my_repo():
    return [[a, b], [update_job_schedule], [update_sensor]]

This example adds a reconciliation sensor called update_sensor and modifies the scheduled job to only target asset a. When the scheduled job runs:

  1. a is updated by the scheduled job and asset b is marked as stale
  2. The reconciliation sensor identifies b is stale and starts a run to materialize b
  3. b is updated and marked as fresh

This approach is more declarative: you state that "b should be as up-to-date as possible" and Dagster determines when b needs to run.


Step 3: Introduce freshness policies#

In this section, we'll introduce another asset, c. What if you don't need c to be as up-to-date as a and b? In traditional cron-based schedules this requirement quickly becomes confusing:

  • Should the scheduled job target c and try to re-use the last value of a?
  • Should the scheduled job instead run a and c?
  • Does scheduling c create any side-effects that will impact b?

In Dagster, you can avoid all of these questions and instead declare how fresh you want c to be, and let Dagster figure out the rest. This declaration is done through a freshness policy:

from dagster import (
    AssetSelection,
    FreshnessPolicy,
    ScheduleDefinition,
    asset,
    build_asset_reconciliation_sensor,
    define_asset_job,
    repository,
)


@asset
def a():
    pass


@asset
def b(a):
    pass


# add a freshness policy
@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=2))
def c(a):
    pass


update_job = define_asset_job(name="update_job", selection=AssetSelection.keys("a"))

update_sensor = build_asset_reconciliation_sensor(
    name="update_sensor", asset_selection=AssetSelection.all()
)

update_job_schedule = ScheduleDefinition(
    name="update_job_schedule", job=update_job, cron_schedule="* * * * *"
)


@repository
def my_repo():
    return [[a, b, c], [update_job_schedule], [update_sensor]]

One way to think about a freshness policy is that it adds a tolerance to the reconciliation sensor. When a is updated, the reconciliation sensor immediately knows that b is stale and creates a run to refresh b. The freshness policy tells the reconciliation sensor that c can tolerate being stale for up to two minutes. Instead of creating a run to update c immediately, the reconciliation sensor waits until c is more than two minutes stale and then creates a run to update c:

  1. First, a is updated by the schedule. c is marked stale but is not violating the freshness policy.

  2. After two minutes, c is marked late because the freshness policy is violated. A run is started to update c.

  3. Once the run completes, c is both on-time and fresh.
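Freshness policies aren't limited to a rolling lag. Depending on your Dagster version, FreshnessPolicy may also accept a cron_schedule argument, which expresses requirements like "by 9 AM every day, c should incorporate all data that arrived up to 60 minutes before." The snippet below is a minimal sketch of that variant; treat the exact parameters as an assumption and check the FreshnessPolicy API docs for your release:

from dagster import FreshnessPolicy, asset


# Assumed variant: a freshness policy evaluated against a daily 9 AM deadline.
# By each 9 AM tick, c must reflect upstream data from at most 60 minutes
# before that tick.
@asset(
    freshness_policy=FreshnessPolicy(
        maximum_lag_minutes=60,
        cron_schedule="0 9 * * *",
    )
)
def c(a):
    pass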


Step 4: Remove the cron schedule#

Our code still contains a schedule that updates a. The final step is to remove this schedule:

from dagster import (
    AssetSelection,
    FreshnessPolicy,
    asset,
    build_asset_reconciliation_sensor,
    repository,
)


@asset
def a():
    pass


@asset
def b(a):
    pass


@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=2))
def c(a):
    pass


update_sensor = build_asset_reconciliation_sensor(
    name="update_sensor", asset_selection=AssetSelection.all()
)


@repository
def my_repo():
    return [[a, b, c], [update_sensor]]

Here is where the reconciliation sensor and freshness policies become really powerful! Dagster will determine that after two minutes asset c is late and in violation of its freshness policy. Dagster will also determine that in order for c to be fresh, asset a needs to be updated as well. Dagster will create a run to update both a and c:

  1. c is late because it was last updated more than two minutes ago, thus violating the freshness policy.

  2. A run is triggered that updates a and c.

  3. a and c are both updated. Asset b is now stale and will be updated based on its policy.

In the current code, asset b has no freshness policy, but b is still monitored by the reconciliation sensor. As a result, as soon as a is updated, asset b will be marked as stale and a run will be started to update b. If this immediate update isn't desirable, you can add a freshness policy to asset b:

from dagster import (
    AssetSelection,
    FreshnessPolicy,
    asset,
    build_asset_reconciliation_sensor,
    repository,
)


@asset
def a():
    pass


# add a freshness policy for b
@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=5))
def b(a):
    pass


@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=2))
def c(a):
    pass


update_sensor = build_asset_reconciliation_sensor(
    name="update_sensor", asset_selection=AssetSelection.all()
)


@repository
def my_repo():
    return [[a, b, c], [update_sensor]]

When multiple freshness policies exist, Dagster determines the minimal amount of work needed to meet all of them. In this example, a is already refreshed every two minutes because of c's freshness policy, so b can be refreshed without re-running a. In contrast, a simple cron scheduler would redundantly run a for each run of b and c. Freshness policies reduce the work done by the scheduler!

The data assets are now fully declarative. You tell Dagster how fresh c should be, and Dagster does the rest. Asset a is updated exactly when it needs to be, no more and no less frequently.


Step 5: Change code versions#

While this guide has focused on assets becoming stale due to time passing and new data becoming available, there's one more aspect to consider. Assets can also become stale if their definitions change because code has been updated.

In Dagster, it's possible to indicate that an asset is stale by updating its code_version. For example, existing code in production might be labeled with version 0.1:

from dagster import AssetSelection, asset, build_asset_reconciliation_sensor, repository


@asset
def a():
    pass


# original code version
@asset(code_version="0.1")
def b(a):
    pass


@asset
def c(b):
    pass


update_sensor = build_asset_reconciliation_sensor(
    name="update_sensor", asset_selection=AssetSelection.all()
)


@repository
def my_repo():
    return [[a, b, c], [update_sensor]]

These assets would be managed by the reconciliation sensor and considered fresh when all three have been materialized.

If you make a substantial change to your code, you can increment the code_version:

from dagster import AssetSelection, asset, build_asset_reconciliation_sensor, repository


@asset
def a():
    pass


# update code version
@asset(code_version="0.2")
def b(a):
    return "significant change"


@asset
def c(b):
    pass


update_sensor = build_asset_reconciliation_sensor(
    name="update_sensor", asset_selection=AssetSelection.all()
)


@repository
def my_repo():
    return [[a, b, c], [update_sensor]]

When the new asset definitions are loaded, b and the downstream asset c will be flagged as stale.

In testing environments, the stale assets can be manually materialized to verify the code change.
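For example, here's a minimal sketch of that kind of check using Dagster's materialize helper, which runs the selected assets in-process; the assertion is just illustrative:

from dagster import materialize

# Materialize the stale assets in-process to confirm the new code version runs.
result = materialize([a, b, c])
assert result.success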

In production, the reconciliation sensor will launch runs to refresh the stale assets, taking their freshness policies into account.


Conclusion#

Declarative scheduling simplifies how data pipelines are built, and it helps data engineers meet the needs of their stakeholders. Freshness policies can map directly to data SLAs: an executive dashboard with KPIs might have a strict SLA and a freshness policy with a low lag time, whereas retraining an ML model may tolerate a greater lag.
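For instance, here's a sketch of how those two SLAs might be expressed side by side; the asset names and lag values are illustrative, not taken from the examples above:

from dagster import FreshnessPolicy, asset


# Hypothetical upstream asset that both downstream assets depend on.
@asset
def warehouse_tables():
    pass


# An executive KPI dashboard with a strict SLA: at most 30 minutes of lag.
@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=30))
def executive_dashboard(warehouse_tables):
    pass


# Retraining an ML model can tolerate a much larger lag: up to one day.
@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=60 * 24))
def retrained_model(warehouse_tables):
    pass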