Automating assets using schedules and jobs#

After creating some asset definitions, you may want to automate their materialization.

In this guide, we'll show you one way to do this: using schedules. To do the same for ops, refer to the Automating ops using schedules guide.

By the end of this guide, you'll be able to:

  • Create a schedule that directly targets some assets
  • Add the new schedule to your project's Definitions object
  • Turn the schedule on

Prerequisites#

To follow this guide, you'll need:


Step 1: Define some assets#

The first step in creating a schedule is to define the target assets we want to materialize. Define a few assets in a group named ecommerce_assets:

from dagster import asset


@asset(group_name="ecommerce_assets")
def orders_asset():
    return 1


@asset(group_name="ecommerce_assets")
def users_asset():
    return 2

Step 2: Define the schedule#

Next, we'll construct the schedule using ScheduleDefinition and use it to target the assets we created in Step 1.

from dagster import AssetSelection, DefaultScheduleStatus, ScheduleDefinition

ecommerce_schedule = ScheduleDefinition(
    name="ecommerce_schedule",
    target=AssetSelection.groups("ecommerce_assets"),
    cron_schedule="15 5 * * 1-5",
    default_status=DefaultScheduleStatus.RUNNING,
)

To build the schedule, we:

  1. Imported AssetSelection, DefaultScheduleStatus, and ScheduleDefinition from dagster

  2. Created a schedule using ScheduleDefinition that:

    • Targets the assets we defined in Step 1 using AssetSelection.groups
    • Has a cron expression of 15 5 * * 1-5, which translates to 5:15 AM every Monday through Friday
    • Is turned on by default (default_status). We'll discuss this more in Step 4.
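
To make the cron expression concrete, here is a small standard-library sketch that hard-codes the meaning of 15 5 * * 1-5 and finds the next matching time. It is an illustration only: the function names matches_weekday_morning and next_tick are hypothetical, it ignores time zones, and it is not how Dagster evaluates cron strings.

```python
from datetime import datetime, timedelta


def matches_weekday_morning(moment: datetime) -> bool:
    """Return True if `moment` falls on a tick of the cron string "15 5 * * 1-5"."""
    # Cron fields, left to right: minute, hour, day of month, month, day of week.
    # Cron numbers Monday through Friday as 1-5, the same values isoweekday() returns.
    return (
        moment.minute == 15
        and moment.hour == 5
        and 1 <= moment.isoweekday() <= 5
    )


def next_tick(after: datetime) -> datetime:
    """Find the first tick strictly after `after` by stepping one minute at a time."""
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    while not matches_weekday_morning(candidate):
        candidate += timedelta(minutes=1)
    return candidate


# Friday 2024-01-05 at 06:00 has already missed that day's 05:15 tick,
# so the next tick is Monday 2024-01-08 at 05:15.
print(next_tick(datetime(2024, 1, 5, 6, 0)))
```

The day-of-month and month fields are both *, so they place no restriction; only the minute, hour, and weekday are checked.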

Step 3: Update the Definitions object#

Next, we'll update our project's Definitions object to include the new assets and schedule. This ensures the assets and schedule are available to Dagster processes, such as the Dagster UI.

defs = Definitions(
    assets=[orders_asset, users_asset],
    schedules=[ecommerce_schedule],
)

We could also have passed our asset definitions directly as the target of the schedule, in which case they would be automatically included as assets in the Definitions object. Because we targeted them here using AssetSelection.groups, we needed to include them separately in assets.

At this point, your code should look like the following:

from dagster import (
    AssetSelection,
    DefaultScheduleStatus,
    Definitions,
    ScheduleDefinition,
    asset,
)


@asset(group_name="ecommerce_assets")
def orders_asset():
    return 1


@asset(group_name="ecommerce_assets")
def users_asset():
    return 2


ecommerce_schedule = ScheduleDefinition(
    name="ecommerce_schedule",
    target=AssetSelection.groups("ecommerce_assets"),
    cron_schedule="15 5 * * 1-5",
    default_status=DefaultScheduleStatus.RUNNING,
)

defs = Definitions(
    assets=[orders_asset, users_asset],
    schedules=[ecommerce_schedule],
)

Step 4: Turn the schedule on#

Schedules must be turned on before they will launch runs. In our case, we already turned the schedule on by setting the default_status parameter in its ScheduleDefinition, but you can also turn a schedule on from the Dagster UI.

Heads up! Starting or stopping a schedule in the UI will override any default status set in code.

To turn on a schedule in the Dagster UI, navigate to Overview > Schedules:

[Screenshot: Schedules tab in the Dagster UI]

After the schedule is started, it will launch a run at each tick of its cron schedule, as long as the dagster-daemon process is running. This process starts automatically when you run dagster dev.


APIs in this guide#

Name                  Description
AssetSelection        A class that defines a selection of assets. Typically used with define_asset_job.
ScheduleDefinition    A class that defines a schedule and attaches it to a job.
Definitions           The object that contains all the definitions defined within a code location. Definitions include assets, jobs, resources, schedules, and sensors.