

Airflow 1 Migration Tutorial

Overview

This guide covers using dagster-airlift to migrate an Airflow DAG to Dagster in environments running apache-airflow versions below 2.0.

Many APIs in the dagster-airlift package rely on Airflow's stable REST API, which was added in Airflow 2.0. However, a migration process is still available to Airflow 1.x users.

This guide walks through that process using the same base example as the migration tutorial.

We recommend following the tutorial in order to understand the concepts and steps involved in the migration process, and then using this guide to apply those steps to an Airflow 1.x environment.

Setup

If you previously ran the Airlift tutorial, you can reuse that environment by doing the following:

  • Clear tutorial_example/dagster_defs/definitions.py.
  • Mark all tasks as unproxied (proxied: False) in the proxied state YAML file.

Start by following the setup step of the migration tutorial, and we'll diverge from there.

With Airflow 1.x, we won't peer or observe Airflow DAGs first, since those steps rely on the REST API. Instead, we'll skip directly to the migration step and proxy execution to Dagster.

Scaffolding proxied state

To begin proxying tasks in a DAG, you first need a file to track proxying state. In your Airflow DAG directory, create a proxied_state folder, and in it create a YAML file with the same name as your DAG. The included example at airflow_dags/proxied_state is used by make airflow_run, and can be used as a template for your own proxied state files.

Given our example DAG rebuild_customers_list with three tasks (load_raw_customers, build_dbt_models, and export_customers), proxied_state/rebuild_customers_list.yaml should look like the following:

tasks:
  - id: load_raw_customers
    proxied: False
  - id: build_dbt_models
    proxied: False
  - id: export_customers
    proxied: False
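If you have many DAGs, writing these files by hand gets tedious. The sketch below generates one with every task marked unproxied. The helper name scaffold_proxied_state and its arguments are hypothetical (not part of dagster-airlift); it assumes the flat tasks / id / proxied layout shown above and overwrites any existing file.

```python
from pathlib import Path
from typing import Sequence


def scaffold_proxied_state(dag_id: str, task_ids: Sequence[str], dags_dir: Path) -> Path:
    """Write proxied_state/<dag_id>.yaml with every task marked unproxied.

    Hypothetical helper: it emits only the simple tasks/id/proxied layout
    used in this guide and overwrites any existing file at that path.
    """
    state_dir = dags_dir / "proxied_state"
    state_dir.mkdir(parents=True, exist_ok=True)

    lines = ["tasks:"]
    for task_id in task_ids:
        # One list entry per Airflow task, starting out unproxied.
        lines.append(f"  - id: {task_id}")
        lines.append("    proxied: False")

    path = state_dir / f"{dag_id}.yaml"
    path.write_text("\n".join(lines) + "\n")
    return path
```

For example, calling scaffold_proxied_state("rebuild_customers_list", ["load_raw_customers", "build_dbt_models", "export_customers"], Path("tutorial_example/airflow_dags")) from the repository root would write a file matching the YAML above.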

Next, you will need to modify your Airflow DAG to make it aware of the proxied state. This is already done in the example DAG:

# Dags file can be found at tutorial_example/airflow_dags/dags.py
from pathlib import Path

from airflow import DAG
from dagster_airlift.in_airflow import proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml

dag = DAG("rebuild_customers_list", ...)

...

# Set this to True to begin the proxying process
PROXYING = False

if PROXYING:
    proxying_to_dagster(
        global_vars=globals(),
        proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    )

To begin proxying, set PROXYING to True, or remove the if statement and call proxying_to_dagster unconditionally.
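If you'd prefer to toggle proxying without editing the DAG file, one option is to derive PROXYING from an environment variable. This is only a sketch: the AIRLIFT_PROXYING variable and the proxying_enabled helper are hypothetical names, not dagster-airlift settings. The if PROXYING: block that calls proxying_to_dagster stays unchanged.

```python
import os
from typing import Mapping


def proxying_enabled(env: Mapping[str, str] = os.environ) -> bool:
    """Return True when the hypothetical AIRLIFT_PROXYING variable is truthy."""
    return env.get("AIRLIFT_PROXYING", "false").strip().lower() in {"1", "true", "yes"}


# In the DAG file, this would replace the hard-coded `PROXYING = False`.
PROXYING = proxying_enabled()
```

Because Airflow parses the DAG file in the scheduler and again in the worker that runs each task, the variable would need to be set consistently for every Airflow process; otherwise they could disagree about whether proxying is on.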

The DAG will now display its proxied state in the Airflow UI. (There is some latency as Airflow evaluates the Python file periodically.)

[Image: Migration state rendering in the Airflow UI]

Migrating build_dbt_models

We'll now create Dagster assets that correspond to each Airflow task. Since Dagster provides an out-of-the-box integration with dbt, we'll start by using dagster-dbt to create assets for the build_dbt_models task in our tutorial_example/dagster_defs/definitions.py file:

import os
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


@dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=DbtProject(dbt_project_path()),
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
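@dbt_assets reads target/manifest.json when the definitions module is imported, so if the manifest hasn't been generated yet (for example, in a fresh checkout), the definitions fail to load. A small pre-flight check can fail earlier with a clearer message. dbt_manifest_path is a hypothetical helper, and its hint assumes you generate the manifest with dbt parse.

```python
from pathlib import Path


def dbt_manifest_path(project_dir: Path) -> Path:
    """Return <project_dir>/target/manifest.json, failing early if it is missing.

    Hypothetical helper; the error message assumes the manifest is
    produced by running `dbt parse` inside the dbt project directory.
    """
    manifest = project_dir / "target" / "manifest.json"
    if not manifest.is_file():
        raise FileNotFoundError(
            f"{manifest} not found; run `dbt parse` in {project_dir} to generate it"
        )
    return manifest
```

You would then pass manifest=dbt_manifest_path(dbt_project_path()) to @dbt_assets in place of the inline path expression.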

Now, we'll mark our dbt_project_assets as being mapped from Airflow:

from dagster_airlift.core import assets_with_task_mappings

mapped_assets = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        # load rich set of assets from dbt project
        "build_dbt_models": [dbt_project_assets],
    },
)

The assets_with_task_mappings function attaches metadata to each passed-in asset that links it to the given DAG and task. When a proxied task runs in Airflow, this metadata is used to determine which assets to execute in Dagster.
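To make the mapping idea concrete, here is a toy, library-independent model of it: a lookup from a (dag_id, task_id) pair to the asset keys that task produces. This is an illustration under that simplifying assumption, not how dagster-airlift actually stores or reads the metadata; build_task_index and assets_for_task are hypothetical names.

```python
from typing import Dict, List, Tuple

# (dag_id, task_id) pair identifying one Airflow task.
TaskHandle = Tuple[str, str]


def build_task_index(dag_id: str, task_mappings: Dict[str, List[str]]) -> Dict[TaskHandle, List[str]]:
    """Flatten a {task_id: [asset_key, ...]} mapping into a lookup keyed by (dag_id, task_id)."""
    return {(dag_id, task_id): list(keys) for task_id, keys in task_mappings.items()}


def assets_for_task(index: Dict[TaskHandle, List[str]], dag_id: str, task_id: str) -> List[str]:
    """Return the asset keys mapped to a task, or an empty list if it is unmapped."""
    return index.get((dag_id, task_id), [])
```

Using asset keys from the completed code later in this guide, assets_for_task(build_task_index("rebuild_customers_list", {"export_customers": ["customers_csv"]}), "rebuild_customers_list", "export_customers") would return ["customers_csv"]; in this toy model, a task from a different DAG gets an empty list.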

We'll provide the mapped assets to a Definitions object in our tutorial_example/dagster_defs/definitions.py file:

from dagster import Definitions

defs = Definitions(
    assets=mapped_assets, resources={"dbt": DbtCliResource(project_dir=dbt_project_path())}
)

Note that this differs from the original migration tutorial: we're not using build_defs_from_airflow_instance, which relies on the REST API.

Finally, we'll mark the build_dbt_models task as proxied in the proxied state YAML file:

tasks:
  - id: load_raw_customers
    proxied: False
  - id: build_dbt_models
    proxied: True
  - id: export_customers
    proxied: False

Important: It may take up to 30 seconds for the proxied state in the Airflow UI to reflect this change. You must also reload the definitions in Dagster, either via the UI or by restarting dagster dev.
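Flipping a flag by hand is fine for one task. If you want to script the rollout one task at a time, a helper like the following could rewrite a single entry. It is a hypothetical sketch that assumes the exact two-line layout used in this guide (- id: <task> followed by proxied: <bool>) rather than parsing YAML in general; a real YAML library would be more robust for hand-edited files.

```python
from pathlib import Path


def set_task_proxied(state_file: Path, task_id: str, proxied: bool) -> None:
    """Set the proxied flag for one task in a proxied state YAML file.

    Hypothetical helper: it only understands an "- id: <task>" line that is
    immediately followed by a "proxied: <bool>" line; it is not a YAML parser.
    """
    lines = state_file.read_text().splitlines()
    for i, line in enumerate(lines):
        if line.strip() != f"- id: {task_id}":
            continue
        nxt = i + 1
        if nxt < len(lines) and lines[nxt].strip().startswith("proxied:"):
            # Preserve the original indentation of the proxied line.
            stripped = lines[nxt].lstrip()
            indent = lines[nxt][: len(lines[nxt]) - len(stripped)]
            # str(True) / str(False) match the capitalization used in this guide.
            lines[nxt] = f"{indent}proxied: {proxied}"
            state_file.write_text("\n".join(lines) + "\n")
            return
    raise KeyError(f"no proxied entry for task {task_id!r} in {state_file}")
```

For example, set_task_proxied(Path("tutorial_example/airflow_dags/proxied_state/rebuild_customers_list.yaml"), "build_dbt_models", True) produces the YAML above. The same UI latency and Dagster reload caveats apply.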

You can now run the rebuild_customers_list DAG in Airflow, and the build_dbt_models task will be executed in a Dagster run:

[Image: dbt build executing in Dagster]

Completed code

Migrating the other tasks should follow the same pattern as in the migration tutorial. When you're done, your code should look like this:

import os
from pathlib import Path

from dagster import (
    AssetExecutionContext,
    AssetsDefinition,
    AssetSpec,
    DailyPartitionsDefinition,
    Definitions,
    multi_asset,
)
from dagster._time import get_current_datetime_midnight
from dagster_airlift.core import assets_with_task_mappings
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets

# Code also invoked from Airflow
from tutorial_example.shared.export_duckdb_to_csv import ExportDuckDbToCsvArgs, export_duckdb_to_csv
from tutorial_example.shared.load_csv_to_duckdb import LoadCsvToDuckDbArgs, load_csv_to_duckdb

PARTITIONS_DEF = DailyPartitionsDefinition(start_date=get_current_datetime_midnight())


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


def airflow_dags_path() -> Path:
    return Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "tutorial_example" / "airflow_dags"


def load_csv_to_duckdb_asset(spec: AssetSpec, args: LoadCsvToDuckDbArgs) -> AssetsDefinition:
    @multi_asset(name=f"load_{args.table_name}", specs=[spec])
    def _multi_asset() -> None:
        load_csv_to_duckdb(args)

    return _multi_asset


def export_duckdb_to_csv_defs(spec: AssetSpec, args: ExportDuckDbToCsvArgs) -> AssetsDefinition:
    @multi_asset(name=f"export_{args.table_name}", specs=[spec])
    def _multi_asset() -> None:
        export_duckdb_to_csv(args)

    return _multi_asset


@dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=DbtProject(dbt_project_path()),
    partitions_def=PARTITIONS_DEF,
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


mapped_assets = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        "load_raw_customers": [
            load_csv_to_duckdb_asset(
                AssetSpec(key=["raw_data", "raw_customers"], partitions_def=PARTITIONS_DEF),
                LoadCsvToDuckDbArgs(
                    table_name="raw_customers",
                    csv_path=airflow_dags_path() / "raw_customers.csv",
                    duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb",
                    names=["id", "first_name", "last_name"],
                    duckdb_schema="raw_data",
                    duckdb_database_name="jaffle_shop",
                ),
            )
        ],
        # load rich set of assets from dbt project
        "build_dbt_models": [dbt_project_assets],
        "export_customers": [
            export_duckdb_to_csv_defs(
                AssetSpec(key="customers_csv", deps=["customers"], partitions_def=PARTITIONS_DEF),
                ExportDuckDbToCsvArgs(
                    table_name="customers",
                    csv_path=Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv",
                    duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb",
                    duckdb_database_name="jaffle_shop",
                ),
            )
        ],
    },
)


defs = Definitions(
    assets=mapped_assets,
    resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
)
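The completed file reads TUTORIAL_DBT_PROJECT_DIR, TUTORIAL_EXAMPLE_DIR, and AIRFLOW_HOME at import time, so an unset variable surfaces as an AssertionError or KeyError when Dagster loads the definitions. A pre-flight check like the sketch below reports every missing variable at once; missing_env_vars and require_env are hypothetical names, not part of Dagster or dagster-airlift.

```python
import os
from typing import List, Mapping, Sequence

# Environment variables the completed definitions file reads at import time.
REQUIRED_ENV_VARS = ("TUTORIAL_DBT_PROJECT_DIR", "TUTORIAL_EXAMPLE_DIR", "AIRFLOW_HOME")


def missing_env_vars(
    names: Sequence[str] = REQUIRED_ENV_VARS,
    env: Mapping[str, str] = os.environ,
) -> List[str]:
    """Return the required variables that are unset or empty, in order."""
    return [name for name in names if not env.get(name)]


def require_env() -> None:
    """Raise one error listing every missing variable, instead of failing on the first."""
    missing = missing_env_vars()
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
```

Calling require_env() at the top of definitions.py, before any of the path helpers run, would replace the first import-time error with a single message listing everything that still needs to be set.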

Conclusion

To recap, we've covered the process of migrating an Airflow 1.x DAG to Dagster using dagster-airlift. We've clarified which functionality works with Airflow < 2.0 (proxying task execution to Dagster) and which does not (APIs such as build_defs_from_airflow_instance that rely on the REST API). We've also shown how to create Dagster assets that correspond to Airflow tasks, and how to mark those tasks as proxied in the proxied state YAML file.