This reference shows usage of Dagster Pipes with other entities in the Dagster system. For a step-by-step walkthrough, refer to the Dagster Pipes tutorial.
When launching the subprocess, you may want to make environment variables or additional parameters available in the external process. Extras are arbitrary, user-defined parameters made available on the context object in the external process.
In the external code, you can access extras via the PipesContext object:
import os

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    # get the Dagster Pipes context
    context = PipesContext.get()
    # get all extras provided by Dagster asset
    print(context.extras)
    # get the value of an extra
    print(context.get_extra("foo"))
    # get env var
    print(os.environ["MY_ENV_VAR_IN_SUBPROCESS"])


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()
The run method of the PipesSubprocessClient resource also accepts env and extras, which allow you to specify environment variables and extra arguments when executing the subprocess.
Note: We're using os.environ in this example, but Dagster's recommendation is to use EnvVar in production.
Sometimes, you may not want to materialize an asset, but instead want to report a data quality check result. This applies when your asset has data quality checks defined with @asset_check.
From the external code, you can report to Dagster that an asset check has been performed via PipesContext.report_asset_check. Note that asset_key in this case is required, and must match the asset key defined in @asset_check:
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    # get the Dagster Pipes context
    context = PipesContext.get()
    # report the asset check result back to Dagster
    context.report_asset_check(
        asset_key="my_asset",
        # bool(...) replaces Series.bool(), which is deprecated in recent pandas
        passed=bool(orders_df["item_id"].notnull().all()),
        check_name="no_empty_order_check",
    )


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()
On Dagster's side, the PipesClientCompletedInvocation object returned from PipesSubprocessClient includes a get_asset_check_result method, which you can use to access the AssetCheckResult event reported by the subprocess.
Sometimes, you may invoke a single call to an API that results in multiple tables being updated, or you may have a single script that computes multiple assets. In these cases, you can use Dagster Pipes to report back on multiple assets at once.
Note: When working with multi-assets, PipesContext.report_asset_materialization may only be called once per unique asset key. If called more than once, an error similar to the following will surface:
Calling {method} with asset key {asset_key} is undefined. Asset has already been materialized, so no additional data can be reported for it
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame(
        {"order_id": [1, 2, 3], "item_id": [432, 878, 102], "user_id": ["a", "b", "a"]}
    )
    total_orders = len(orders_df)
    total_users = orders_df["user_id"].nunique()
    # get the Dagster Pipes context
    context = PipesContext.get()
    # send structured metadata back to Dagster. asset_key is required when there are multiple assets
    context.report_asset_materialization(
        asset_key="orders", metadata={"total_orders": total_orders}
    )
    context.report_asset_materialization(
        asset_key="users", metadata={"total_users": total_users}
    )


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()
In the Dagster code, you can use @multi_asset to define a single definition that represents multiple assets. The PipesClientCompletedInvocation object returned from PipesSubprocessClient includes a get_results method, which you can use to access all the events reported by the subprocess, such as multiple AssetMaterializations and AssetCheckResults.
Sometimes, you may want to pass data back from the external process for use in the orchestration code, for purposes other than reporting directly to Dagster, such as creating an output. In this example, we use custom messages to create an I/O managed output that is returned from the asset.
In the external code, we send messages using report_custom_message. The message can be any data that is JSON serializable.
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    # get the Dagster Pipes context
    context = PipesContext.get()
    # compute the full orders data
    orders = pd.DataFrame(
        {
            "order_id": [1, 2, 3],
            "item_id": [321, 654, 987],
            "order_details": [..., ..., ...],  # imagine large data,
            # and more columns
        }
    )
    # send a smaller table to be I/O managed by Dagster and passed to downstream assets
    summary_table = pd.DataFrame(orders[["order_id", "item_id"]])
    context.report_custom_message(summary_table.to_dict())
    context.report_asset_materialization(metadata={"total_orders": len(orders)})


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()
In the Dagster code, we receive custom messages using get_custom_messages:
import shutil

import pandas as pd
from dagster import (
    AssetExecutionContext,
    Definitions,
    Output,
    PipesSubprocessClient,
    asset,
    file_relative_path,
)


@asset
def subprocess_asset(
    context: AssetExecutionContext,
    pipes_subprocess_client: PipesSubprocessClient,
) -> Output[pd.DataFrame]:
    cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
    result = pipes_subprocess_client.run(
        command=cmd,
        context=context,
    )
    # a small summary table gets reported as a custom message
    messages = result.get_custom_messages()
    if len(messages) != 1:
        raise Exception("summary not reported")
    summary_df = pd.DataFrame(messages[0])
    # grab any reported metadata off of the materialize result
    metadata = result.get_materialize_result().metadata
    # return the summary table to be loaded by Dagster for downstream assets
    return Output(
        value=summary_df,
        metadata=metadata,
    )


defs = Definitions(
    assets=[subprocess_asset],
    resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
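Because a custom message must be JSON serializable, it helps to see what a DataFrame looks like after a JSON round trip. The sketch below does not use Dagster. It simulates the serialization step with the standard json module, which is an assumption about how the payload behaves in transit, and compares two to_dict orientations.

```python
import json

import pandas as pd

orders = pd.DataFrame({"order_id": [1, 2, 3], "item_id": [321, 654, 987]})

# Simulate the JSON hop between processes.
# orient="records" yields a list of row dicts, so no index labels are sent.
records_payload = json.dumps(orders.to_dict(orient="records"))
restored = pd.DataFrame(json.loads(records_payload))

# With the default orient, integer index labels become JSON object keys,
# and JSON object keys are always strings after decoding.
default_payload = json.dumps(orders.to_dict())
restored_default = pd.DataFrame(json.loads(default_payload))

print(list(restored.index))
print(list(restored_default.index))
```

With the default orientation, the rebuilt frame has string index labels. If downstream assets rely on an integer index, sending records or resetting the index after rebuilding are two possible options.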