At this point, we should have finished the setup step: the example code is set up in a fresh virtual environment, and our two Airflow instances are running locally. Now we can start writing Dagster code.
We'll start by creating asset representations of our DAGs in Dagster.
Create a new shell and navigate to the root of the tutorial directory. You will need to install the dagster-airlift package in your Dagster environment:
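A minimal install might look like the following, assuming you activate the virtual environment created during setup (exact package extras may vary by release):

```bash
# Activate the tutorial's virtual environment, then install the packages
source .venv/bin/activate
pip install 'dagster-airlift[core]' dagster-webserver dagster
```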
Now, we can use the load_airflow_dag_asset_specs function to create asset representations of the DAGs in the warehouse Airflow instance:
```python
from dagster_airlift.core import load_airflow_dag_asset_specs

assets = load_airflow_dag_asset_specs(
    airflow_instance=warehouse_airflow_instance,
)
```
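This snippet assumes a warehouse_airflow_instance has already been defined. A minimal sketch, assuming the warehouse instance from the setup step runs at http://localhost:8081 with the default admin/admin credentials:

```python
from dagster_airlift.core import AirflowBasicAuthBackend, AirflowInstance

warehouse_airflow_instance = AirflowInstance(
    # Basic-auth credentials for the local warehouse Airflow webserver
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="warehouse",
)
```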
Now, let's add these assets to a Definitions object:
```python
from dagster import Definitions

defs = Definitions(assets=assets)
```
Let's set up some environment variables, then launch Dagster against our definitions file so it loads the assets created from our Airflow instance:
```bash
# Set up environment variables to point to the airlift-federation-tutorial directory on your machine
export TUTORIAL_EXAMPLE_DIR=$(pwd)
export DAGSTER_HOME="$TUTORIAL_EXAMPLE_DIR/.dagster_home"

dagster dev -f airlift_federation_tutorial/dagster_defs/definitions.py
```
If we navigate to the Dagster UI (running at http://localhost:3000), we should see the assets created from the warehouse Airflow instance.
There are a lot of DAGs in this instance, and we only want to focus on the load_customers DAG. Let's filter the assets to include only the load_customers DAG:
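One way to do this, assuming the dag_selector_fn parameter available on load_airflow_dag_asset_specs in recent dagster-airlift releases, is to select the DAG by its ID:

```python
load_customers = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=warehouse_airflow_instance,
            # Only load the asset spec for the DAG with ID "load_customers"
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
)
```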
Let's add just this asset to our Definitions object, instead of all of the DAG assets:
```python
defs = Definitions(assets=[load_customers])
```
Now, our Dagster environment only includes the load_customers DAG from the warehouse Airflow instance.
Finally, we'll use a sensor to poll the warehouse Airflow instance for new runs. This way, whenever we get a successful run of the load_customers DAG, we'll see a materialization in the Dagster UI:
```python
from dagster_airlift.core import build_airflow_polling_sensor

warehouse_sensor = build_airflow_polling_sensor(
    mapped_assets=[load_customers],
    airflow_instance=warehouse_airflow_instance,
)
```
Now, we can add this sensor to our Definitions object:
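A minimal sketch of the updated Definitions object, passing the sensor alongside the asset:

```python
defs = Definitions(
    assets=[load_customers],
    sensors=[warehouse_sensor],
)
```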
You can test this by navigating to the Airflow UI at http://localhost:8081 and triggering a run of the load_customers DAG. When the run completes, you should see a materialization in the Dagster UI.
We can repeat the same process for the customer_metrics DAG in the metrics Airflow instance, which runs at http://localhost:8082. We'll leave this as an exercise to test your understanding.
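If you want to check your work afterward, one possible sketch follows. The names metrics_airflow_instance, customer_metrics_dag_asset, and metrics_sensor are assumptions carried into the next section, and the admin/admin credentials mirror the setup step:

```python
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_airflow_polling_sensor,
    load_airflow_dag_asset_specs,
)

# Connect to the metrics Airflow instance from the setup step
metrics_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8082",
        username="admin",
        password="admin",
    ),
    name="metrics",
)

# Load only the customer_metrics DAG as an asset spec
customer_metrics_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=metrics_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
        )
    )
)

# Poll the metrics instance for completed runs of the mapped DAG
metrics_sensor = build_airflow_polling_sensor(
    mapped_assets=[customer_metrics_dag_asset],
    airflow_instance=metrics_airflow_instance,
)
```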
## Adding lineage between load_customers and customer_metrics
Now that we have both DAGs loaded into Dagster, we can observe the cross-DAG lineage between them. To do this, we'll use the replace_attributes function to mark the customer_metrics asset as downstream of the load_customers asset:
```python
from dagster._core.definitions.asset_spec import replace_attributes

customer_metrics_dag_asset = replace_attributes(
    customer_metrics_dag_asset,
    deps=[load_customers],
)
```
Now, after adding the updated customer_metrics_dag_asset to our Definitions object, we should see the lineage between the two DAGs in the Dagster UI.
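Putting it all together, the final Definitions object might look like this sketch, where metrics_sensor comes from the exercise above:

```python
defs = Definitions(
    assets=[load_customers, customer_metrics_dag_asset],
    sensors=[warehouse_sensor, metrics_sensor],
)
```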