Using declarative scheduling with software-defined assets
Orchestrators help you build the right data assets in the right order and at the right time. Dagster assets accomplish this goal by allowing you to define what you want to exist instead of specifying step-by-step how the assets should be built.
While it's possible in Dagster to use traditional cron-based schedules, we recommend trying freshness policies instead. Freshness policies allow you to declare SLAs for the assets you deliver to stakeholders and Dagster determines when everything else needs to run to meet those SLAs.
This schedule runs a job called update_job every minute; the job materializes asset a and then asset b. This setup represents a traditional cron-based schedule. To ensure asset b is updated with fresh data, you tell the orchestrator to run a job targeting assets a and b, and Dagster knows to run a before b because a is an input to the asset function b.
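The ordering rule can be illustrated without Dagster. The following is a toy sketch, not Dagster's implementation: it assumes each asset is a plain function whose parameter names are its upstream assets, and the helper `execution_order` is a hypothetical name introduced here.

```python
import inspect
from graphlib import TopologicalSorter


def a():
    return 1


def b(a):
    # b names a as a parameter, so a is treated as its upstream asset.
    return a + 1


def execution_order(asset_fns):
    """Order asset names so every upstream asset comes before its consumers."""
    # Map each asset name to the set of its parameter names (its upstream assets).
    graph = {
        fn.__name__: set(inspect.signature(fn).parameters)
        for fn in asset_fns
    }
    return list(TopologicalSorter(graph).static_order())


# The input order does not matter; the dependency graph decides.
order = execution_order([b, a])
print(order)
```

Because the order is derived from the function signatures, adding a new downstream asset requires no change to the scheduling logic.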
In this section, we'll introduce another asset, c. What if you don't need c to be as up-to-date as a and b? In traditional cron-based schedules this requirement quickly becomes confusing:
Should the scheduled job target c and try to re-use the last value of a?
Should the scheduled job instead run a and c?
Does scheduling c create any side-effects that will impact b?
In Dagster, you can avoid all of these questions and instead declare how fresh you want c to be, and let Dagster figure out the rest. This declaration is done through a freshness policy:
One way to think about a freshness policy is that it adds a tolerance to the reconciliation sensor. When a is updated, the reconciliation sensor immediately knows that b is stale and then creates a run to refresh b. The freshness policy tells the reconciliation sensor that c can tolerate being stale for up to two minutes. Instead of creating a run to update c immediately, the reconciliation sensor will wait until c is more than two minutes stale and then will create a run to update c:
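The tolerance idea can be written down as a small decision rule. This is a simplified model under stated assumptions, not the reconciliation sensor's actual logic: `should_refresh` and `stale_since` are hypothetical names, and an asset with no policy is assumed to be refreshed as soon as it is stale.

```python
from datetime import datetime, timedelta


def should_refresh(stale_since, now, maximum_lag_minutes=None):
    """Decide whether a run should be created for an asset right now."""
    if stale_since is None:
        # The asset is fresh, so there is nothing to do.
        return False
    if maximum_lag_minutes is None:
        # No freshness policy: refresh as soon as the asset is stale (like b).
        return True
    # With a policy, wait until the asset has been stale longer than the lag.
    return now - stale_since > timedelta(minutes=maximum_lag_minutes)


t0 = datetime(2023, 1, 1, 12, 0)
b_now = should_refresh(t0, t0, None)                                # no policy
c_after_1_min = should_refresh(t0, t0 + timedelta(minutes=1), 2)    # within tolerance
c_after_3_min = should_refresh(t0, t0 + timedelta(minutes=3), 2)    # tolerance exceeded
print(b_now, c_after_1_min, c_after_3_min)
```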
First, a is updated by the schedule. c is marked stale but is not violating the freshness policy:
After two minutes, c is marked late because the freshness policy is violated. A run is started to update c:
Once the run completes c is both on-time and fresh:
Here is where the reconciliation sensor and freshness policies become really powerful! Dagster will determine that after two minutes asset c is late and in violation of its freshness policy. Dagster will also determine that in order for c to be fresh, asset a needs to be updated as well. Dagster will create a run to update both a and c:
c is late because it was last updated more than two minutes ago, thus violating the freshness policy:
A run is triggered that updates a and c:
a and c are both updated. Asset b is now stale and will be updated based on its policy:
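The decision that refreshing c also requires refreshing a can be sketched as a walk up the dependency graph. This is an illustrative model only: `assets_to_run`, the `upstream` mapping, and the `age_minutes` figures are all assumptions made for the example, and the real sensor's planning is more involved.

```python
def assets_to_run(target, upstream, age_minutes, maximum_lag_minutes):
    """Return the target plus every ancestor whose data is older than the lag."""
    to_run = {target}
    pending = list(upstream.get(target, ()))
    while pending:
        name = pending.pop()
        # An ancestor only needs to run if its data is too old for the policy.
        if name not in to_run and age_minutes[name] > maximum_lag_minutes:
            to_run.add(name)
            pending.extend(upstream.get(name, ()))
    return to_run


# Hypothetical graph matching the guide: b and c both depend on a.
upstream = {"a": [], "b": ["a"], "c": ["a"]}

# a was last updated three minutes ago, so c cannot be fresh without it.
late_plan = assets_to_run("c", upstream, {"a": 3, "b": 3, "c": 3}, 2)

# a was updated one minute ago, so only c needs to run.
recent_plan = assets_to_run("c", upstream, {"a": 1, "b": 1, "c": 3}, 2)
print(sorted(late_plan), sorted(recent_plan))
```

Note that b never appears in either plan: it is not upstream of c, so its refresh is left to its own policy.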
In the current code asset b has no policy, but b is monitored by the reconciliation sensor. As a result, as soon as a is updated, asset b will be marked as stale and then a run will be started to update b. If this immediate update isn't desirable, you can add a freshness policy to asset b:
from dagster import (
    AssetSelection,
    FreshnessPolicy,
    asset,
    build_asset_reconciliation_sensor,
    repository,
)

@asset
def a():
    pass

# add a freshness policy for b
@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=5))
def b(a):
    pass

@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=2))
def c(a):
    pass

update_sensor = build_asset_reconciliation_sensor(
    name="update_sensor", asset_selection=AssetSelection.all()
)

@repository
def my_repo():
    return [[a, b, c], [update_sensor]]
When multiple freshness policies exist, Dagster determines the minimal amount of work needed to meet all of the policies. In this example, a is refreshed every two minutes by c, so b can be refreshed without re-running a again. In contrast, a simple cron scheduler would redundantly run a for each run of b and c. Freshness policies reduce the work done by the scheduler!
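A rough count makes the saving concrete. The arithmetic below is a simplification with assumed numbers: a 10-minute window, one cron job per downstream asset that re-runs a on every tick, and a declarative setup in which every downstream asset can reuse the most recent materialization of a. Both function names are hypothetical.

```python
def cron_runs_of_a(window_minutes, job_intervals):
    """Each cron job re-runs a on every tick, so the runs add up."""
    return sum(window_minutes // interval for interval in job_intervals)


def shared_runs_of_a(window_minutes, job_intervals):
    """If downstream assets share materializations of a, the tightest lag sets the pace."""
    return window_minutes // min(job_intervals)


window = 10          # minutes observed
intervals = [5, 2]   # b tolerates 5 minutes of lag, c tolerates 2

cron_total = cron_runs_of_a(window, intervals)
shared_total = shared_runs_of_a(window, intervals)
print(cron_total, shared_total)
```

Under these assumptions the cron setup runs a seven times in the window, while the shared setup runs it five times.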
The data assets are now fully declarative. You tell Dagster how fresh c should be and Dagster does the rest. Asset a is updated when it needs to be, not any more or less frequently.
While this guide has focused on assets becoming stale due to time passing and new data becoming available, there's one more aspect to consider. Assets can also become stale if their definitions change because code has been updated.
In Dagster, it's possible to indicate that an asset is stale by updating its code_version. For example, existing code in production might be labeled with version 0.1:
from dagster import AssetSelection, asset, build_asset_reconciliation_sensor, repository

@asset
def a():
    pass

# original code version
@asset(code_version="0.1")
def b(a):
    pass

@asset
def c(b):
    pass

update_sensor = build_asset_reconciliation_sensor(
    name="update_sensor", asset_selection=AssetSelection.all()
)

@repository
def my_repo():
    return [[a, b, c], [update_sensor]]
These assets would be managed by the reconciliation sensor and considered fresh when all three have been materialized:
If you make a substantial change to your code, you can increment the code_version:
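The effect of a version bump can be modeled as a comparison between the version recorded at the last materialization and the version in the current code. This is a toy sketch, not Dagster's staleness logic: the function name, the version "0.2", and the assumption that everything downstream of a changed asset is also treated as stale are all introduced here for illustration.

```python
def stale_after_code_change(current_versions, materialized_versions, upstream):
    """Return assets whose code version changed, plus everything downstream of them."""
    stale = {
        name
        for name, version in current_versions.items()
        if materialized_versions.get(name) != version
    }
    # Propagate staleness downstream until no new assets are added.
    changed = True
    while changed:
        changed = False
        for name, parents in upstream.items():
            if name not in stale and stale.intersection(parents):
                stale.add(name)
                changed = True
    return stale


# Graph from the example above: b depends on a, and c depends on b.
upstream = {"a": [], "b": ["a"], "c": ["b"]}
materialized = {"a": None, "b": "0.1", "c": None}   # versions at last materialization
current = {"a": None, "b": "0.2", "c": None}        # hypothetical bump of b to 0.2

stale = stale_after_code_change(current, materialized, upstream)
print(sorted(stale))
```

In this model a stays fresh because neither its own version nor anything upstream of it changed.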
Declarative scheduling simplifies how data pipelines are built, and it helps data engineers meet the needs of their stakeholders. Freshness policies can map to data SLAs. An executive dashboard with KPIs might have a strict SLA and a freshness policy with a low lag time, whereas retraining an ML model may accept a greater lag.