mex.pipeline package

Submodules

mex.pipeline.base module

mex.pipeline.base.load_job_definitions() → Definitions

Scan the mex package for assets, define jobs and IO, and return the definitions.

mex.pipeline.base.run_job_in_process(group_name: str = 'default') → ExecutionResult

Run the dagster job with the given group name locally in-process.

mex.pipeline.organigram module

mex.pipeline.primary_source module

mex.pipeline.wikidata module

Module contents

Pipeline helpers and auxiliary assets.

We use dagster (https://dagster.io) to orchestrate the ETL processes of the RKI Metadata Exchange. Dagster allows us to structure our code, visualize dependencies between different data sources and schedule the execution of pipelines.

To reduce vendor lock-in, we restrict the use of dagster APIs to the mex.pipeline module and avoid importing dagster modules directly from the extractors.
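In practice this means an extractor imports the decorator re-exported by this module instead of dagster’s own, for example (a minimal sketch; the module path is illustrative):

# mex/foo_system/extract.py
from mex.pipeline import asset  # instead of: from dagster import asset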

software-defined assets

Each pipeline should have a main module, e.g. mex.artificial.main, that contains all steps needed for a successful execution. Each “step” should be marked as a software-defined asset using the decorator exposed by mex.pipeline.asset and should be assigned to a group with a name unique to the pipeline.

# mex/foo_system/main.py

@asset(group_name="foo_system")
def extracted_foo() -> Foo:
    return Foo()

An individual step should be one semantic unit, like get something from there, transform this to that or link these things to those things. Steps that can be re-used by multiple pipelines should go into mex.pipeline and should be assigned to the “default” group.

# mex/pipeline/reusable_thing.py

@asset(group_name="default")
def reusable_thing() -> Thing:
    return Thing()

dependency injection

Dagster helps with modelling dependencies between the different steps of your pipeline. When a pipeline is started, dagster builds a graph of all the assets and automatically reuses the output of assets that are consumed by multiple other assets. To tell dagster which assets your current asset depends on, add the name of the dependency to the function signature, or pass the deps argument to the asset decorator if you don’t need its return value.

# mex/foo_system/main.py

@asset(group_name="foo_system", deps=["asset_that_should_run_first"])
def extracted_foo(reusable_thing: Thing) -> Foo:
    return Foo(thing=reusable_thing)

running pipelines

Pipelines can be run in a few different ways:

  • run pdm run dagster dev and click materialize all on http://localhost:3000/locations/mex/jobs/foo_system

  • run pdm run foo-system according to the entrypoint in pyproject.toml (see the sketch after this list)

  • run pdm run dagster job execute -m mex -j foo_system using the asset group name
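For the second option, a console script in pyproject.toml would point at a small wrapper function; the sketch below assumes a hypothetical run() function in mex.foo_system.main that simply delegates to run_job_in_process with the pipeline’s group name:

# mex/foo_system/main.py -- hypothetical target of the "pdm run foo-system" entrypoint
from mex.pipeline import run_job_in_process

def run() -> None:
    """Execute the foo_system asset group locally in-process."""
    run_job_in_process("foo_system")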

mex.pipeline.asset(compute_fn: Callable[[...], Any] | None = None, *, name: str | None = None, key_prefix: str | Sequence[str] | None = None, ins: Mapping[str, AssetIn] | None = None, deps: Iterable[AssetKey | str | Sequence[str] | AssetSpec | AssetsDefinition | SourceAsset | AssetDep] | None = None, metadata: Mapping[str, Any] | None = None, tags: Mapping[str, str] | None = None, description: str | None = None, config_schema: Type[bool | float | int | str] | Type[Dict[Any, Any] | List[Any]] | ConfigType | Field | Mapping[str, Any] | Sequence[Any] | None = None, required_resource_keys: AbstractSet[str] | None = None, resource_defs: Mapping[str, object] | None = None, io_manager_def: object | None = None, io_manager_key: str | None = None, compute_kind: str | None = None, dagster_type: DagsterType | None = None, partitions_def: PartitionsDefinition | None = None, op_tags: Mapping[str, Any] | None = None, group_name: str | None = None, output_required: bool = True, freshness_policy: FreshnessPolicy | None = None, automation_condition: AutomationCondition | None = None, backfill_policy: BackfillPolicy | None = None, retry_policy: RetryPolicy | None = None, code_version: str | None = None, key: AssetKey | str | Sequence[str] | None = None, non_argument_deps: Set[AssetKey] | Set[str] | None = None, check_specs: Sequence[AssetCheckSpec] | None = None, owners: Sequence[str] | None = None, auto_materialize_policy: AutoMaterializePolicy | None = None) → AssetsDefinition | Callable[[Callable[[...], Any]], AssetsDefinition]

Create a definition for how to compute an asset.

A software-defined asset is the combination of:
  1. An asset key, e.g. the name of a table.

  2. A function, which can be run to compute the contents of the asset.

  3. A set of upstream assets that are provided as inputs to the function when computing the asset.

Unlike an op, whose dependencies are determined by the graph it lives inside, an asset knows about the upstream assets it depends on. The upstream assets are inferred from the arguments to the decorated function. The name of the argument designates the name of the upstream asset.

An asset has an op inside it to represent the function that computes it. The name of the op will be the segments of the asset key, separated by double-underscores.
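For example, combining a key_prefix with the decorated function’s name yields an asset key whose segments also determine the op name (a sketch based on the behaviour described above; the names are illustrative):

@asset(key_prefix=["raw", "foo_system"])
def extracted_foo() -> int:
    return 1

# asset key: ["raw", "foo_system", "extracted_foo"]
# op name:   "raw__foo_system__extracted_foo"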

Parameters:
  • name (Optional[str]) – The name of the asset. If not provided, defaults to the name of the decorated function. The asset’s name must be a valid name in dagster (i.e. only letters, numbers, and _) and may not contain python reserved keywords.

  • key_prefix (Optional[Union[str, Sequence[str]]]) – If provided, the asset’s key is the concatenation of the key_prefix and the asset’s name, which defaults to the name of the decorated function. Each item in key_prefix must be a valid name in dagster (i.e. only letters, numbers, and _) and may not contain python reserved keywords.

  • ins (Optional[Mapping[str, AssetIn]]) – A dictionary that maps input names to information about the input.

  • deps (Optional[Sequence[Union[AssetDep, AssetsDefinition, SourceAsset, AssetKey, str]]]) – The assets that are upstream dependencies, but do not correspond to a parameter of the decorated function. If the AssetsDefinition for a multi_asset is provided, dependencies on all assets created by the multi_asset will be created.

  • config_schema (Optional[ConfigSchema]) – The configuration schema for the asset’s underlying op. If set, Dagster will check that config provided for the op matches this schema and fail if it does not. If not set, Dagster will accept any config provided for the op.

  • metadata (Optional[Dict[str, Any]]) – A dict of metadata entries for the asset.

  • tags (Optional[Mapping[str, str]]) – Tags for filtering and organizing. These tags are not attached to runs of the asset.

  • required_resource_keys (Optional[Set[str]]) – Set of resource handles required by the op.

  • io_manager_key (Optional[str]) – The resource key of the IOManager used for storing the output of the op as an asset, and for loading it in downstream ops (default: “io_manager”). Only one of io_manager_key and io_manager_def can be provided.

  • io_manager_def (Optional[object]) – (Experimental) The IOManager used for storing the output of the op as an asset, and for loading it in downstream ops. Only one of io_manager_def and io_manager_key can be provided.

  • compute_kind (Optional[str]) – A string to represent the kind of computation that produces the asset, e.g. “dbt” or “spark”. It will be displayed in the Dagster UI as a badge on the asset.

  • dagster_type (Optional[DagsterType]) – Allows specifying type validation functions that will be executed on the output of the decorated function after it runs.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the asset.

  • op_tags (Optional[Dict[str, Any]]) – A dictionary of tags for the op that computes the asset. Frameworks may expect and require certain metadata to be attached to an op. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value.

  • group_name (Optional[str]) – A string name used to organize multiple assets into groups. If not provided, the name “default” is used.

  • resource_defs (Optional[Mapping[str, object]]) – (Experimental) A mapping of resource keys to resources. These resources will be initialized during execution, and can be accessed from the context within the body of the function.

  • output_required (bool) – Whether the decorated function will always materialize an asset. Defaults to True. If False, the function can return None, which will not be materialized to storage and will halt execution of downstream assets.

  • freshness_policy (FreshnessPolicy) – (Deprecated) A constraint telling Dagster how often this asset is intended to be updated with respect to its root data.

  • automation_condition (AutomationCondition) – (Experimental) A condition describing when Dagster should materialize this asset.

  • backfill_policy (BackfillPolicy) – (Experimental) Configure Dagster to backfill this asset according to its BackfillPolicy.

  • retry_policy (Optional[RetryPolicy]) – The retry policy for the op that computes the asset.

  • code_version (Optional[str]) – (Experimental) Version of the code that generates this asset. In general, versions should be set only for code that deterministically produces the same output when given the same inputs.

  • check_specs (Optional[Sequence[AssetCheckSpec]]) – (Experimental) Specs for asset checks that execute in the decorated function after materializing the asset.

  • non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]) – Deprecated, use deps instead. Set of asset keys that are upstream dependencies, but do not pass an input to the asset.

  • key (Optional[CoercibleToAssetKey]) – The key for this asset. If provided, cannot specify key_prefix or name.

  • owners (Optional[Sequence[str]]) – A list of strings representing owners of the asset. Each string can be a user’s email address, or a team name prefixed with team:, e.g. team:finops.

Examples

@asset
def my_asset(my_upstream_asset: int) -> int:
    return my_upstream_asset + 1
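A further sketch, with illustrative asset names, showing how ins can map a differently named upstream asset to a function argument and how deps declares an upstream asset whose value is not needed (AssetIn is imported from dagster, since this reference documents the underlying dagster decorator):

from dagster import AssetIn

from mex.pipeline import asset

@asset(
    group_name="default",
    ins={"thing": AssetIn(key="reusable_thing")},
    deps=["asset_that_should_run_first"],
)
def derived_thing(thing: int) -> int:
    return thing + 1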
mex.pipeline.load_job_definitions() → Definitions

Scan the mex package for assets, define jobs and IO, and return the definitions.

mex.pipeline.run_job_in_process(group_name: str = 'default') → ExecutionResult

Run the dagster job with the given group name locally in-process.
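A minimal usage sketch for these two helpers, assuming the returned ExecutionResult exposes a success flag like dagster’s in-process execution result:

from mex.pipeline import load_job_definitions, run_job_in_process

definitions = load_job_definitions()    # jobs and IO discovered in the mex package
result = run_job_in_process("default")  # run the "default" asset group locally in-process
assert result.success                   # assumption: dagster-style success flag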