This guide builds on the Airflow Migration Tutorial and assumes you've completed the initial setup and peer stages; we'll pick up from there. If you've already completed the migration tutorial, we recommend downloading a fresh copy and following along with those steps. This guide performs the observe and migrate steps at the DAG level, rather than on a task-by-task basis, for the rebuild_customers_list DAG.
When migrating an entire DAG at once, we want to create assets that map to the entire DAG. Whereas the task-by-task observation step used the assets_with_task_mappings function, here we'll use the assets_with_dag_mappings function instead.
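For comparison, here is a minimal sketch of the two call shapes, using the argument names from the dagster-airlift API; the task names and asset lists below are illustrative placeholders, not the tutorial's exact values:

from dagster_airlift.core import assets_with_dag_mappings, assets_with_task_mappings

# Task-level observation: one entry per task within a given DAG.
per_task_assets = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        "some_task": [...],       # assets produced by this task (hypothetical name)
        "another_task": [...],
    },
)

# DAG-level observation: one entry per DAG.
per_dag_assets = assets_with_dag_mappings(
    dag_mappings={
        "rebuild_customers_list": [...],  # all assets produced by the DAG
    },
)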
For our rebuild_customers_list DAG, let's take a look at what the new observation code looks like:
import os
from pathlib import Path

from dagster import AssetExecutionContext, AssetSpec, Definitions
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    assets_with_dag_mappings,
    build_defs_from_airflow_instance,
)
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


@dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=DbtProject(dbt_project_path()),
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


# Instead of mapping assets to individual tasks, we map them to the entire DAG.
mapped_assets = assets_with_dag_mappings(
    dag_mappings={
        "rebuild_customers_list": [
            AssetSpec(key=["raw_data", "raw_customers"]),
            dbt_project_assets,
            AssetSpec(key="customers_csv", deps=["customers"]),
        ],
    },
)

defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=Definitions(
        assets=mapped_assets,
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
    ),
)
Now, instead of getting a materialization when a particular task completes, each mapped asset will receive a materialization when the entire DAG completes.
Recall that in the task-by-task migration step, we "proxy" execution on a task-by-task basis, controlled by a yaml document. For DAG-mapped assets, execution is instead proxied on a per-DAG basis. Proxying execution to Dagster requires that all assets mapped to that DAG are executable within Dagster. Let's take a look at some fully migrated code mapped to DAGs instead of tasks:
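The tutorial's fully migrated definitions aren't reproduced here; the following is a minimal sketch of what they might look like, reusing dbt_project_assets as defined in the observation code above and substituting hypothetical bodies for the raw_customers load and customers_csv export:

from dagster import asset
from dagster_airlift.core import assets_with_dag_mappings


@asset(key=["raw_data", "raw_customers"])
def raw_customers() -> None:
    # Hypothetical stand-in for the DAG's original CSV-loading logic.
    ...


@asset(key="customers_csv", deps=["customers"])
def customers_csv() -> None:
    # Hypothetical stand-in for exporting the customers table to a CSV file.
    ...


# Every entry is now an executable asset definition rather than a plain AssetSpec.
mapped_assets = assets_with_dag_mappings(
    dag_mappings={
        "rebuild_customers_list": [
            raw_customers,
            dbt_project_assets,  # defined as in the observation code above
            customers_csv,
        ],
    },
)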
Now that all of our assets are fully executable, we can create a simple yaml file to proxy execution for the whole DAG:
proxied: True
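As in the per-task step, load_proxied_state_from_yaml looks for one yaml file per DAG, named after the DAG ID, in a proxied_state/ directory next to the DAG file; given the dags.py below, this file would likely live at:

tutorial_example/airflow_dags/proxied_state/rebuild_customers_list.yaml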
We similarly call proxying_to_dagster at the end of our DAG file (the code here is exactly the same as it was for the per-task migration step):
# Dags file can be found at tutorial_example/airflow_dags/dags.py
from pathlib import Path

from airflow import DAG
from dagster_airlift.in_airflow import proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml

dag = DAG("rebuild_customers_list", ...)
...

# Set this to True to begin the proxying process
PROXYING = False

if PROXYING:
    proxying_to_dagster(
        global_vars=globals(),
        proxied_state=load_proxied_state_from_yaml(
            Path(__file__).parent / "proxied_state"
        ),
    )
Once the proxied bit is flipped to True, we can go to the Airflow UI, and we'll see that our tasks have been replaced with a single task.
When performing DAG-level mapping, we don't preserve the task structure in the Airflow DAG. This single task will materialize all mapped Dagster assets instead of executing the original Airflow task business logic.
We can similarly set proxied back to False, and the original task structure and business logic will return unchanged.
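Reverting is just editing the same yaml file again:

proxied: False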