mex.extractors.pipeline package¶
Submodules¶
mex.extractors.pipeline.base module¶
- mex.extractors.pipeline.base.load_job_definitions() Definitions ¶
Scan the mex package for assets, define jobs and io and return definitions.
- mex.extractors.pipeline.base.run_job_in_process(group_name: str = 'default') bool ¶
Run the dagster job with the given group name locally in-process.
mex.extractors.pipeline.organigram module¶
mex.extractors.pipeline.primary_source module¶
mex.extractors.pipeline.wikidata module¶
Module contents¶
Pipeline helpers and auxiliary assets.
We use dagster (https://dagster.io) to orchestrate the ETL processes of the RKI Metadata Exchange. Dagster allows us to structure our code, visualize dependencies between different data sources and schedule the execution of pipelines.
software-defined assets¶
Each pipeline should have a main module, e.g. mex.extractors.artificial.main, that contains all steps needed for a successful execution. Each “step” should be marked as a software-defined asset using the decorator by dagster.asset and should be assigned to a group with a name unique to the pipeline.
# mex/foo_system/main.py
@asset(group_name=”foo_system”) def extracted_foo() -> Foo:
return Foo()
An individual step should be one semantic unit, like get something from there, transform this to that or link these things to those things. Steps that can be re-used by multiple pipelines should go into mex.extractors.pipeline and should be assigned to the “default” group.
# mex/pipeline/reusable_thing.py
@asset(group_name=”default”) def reusable_thing() -> Thing:
return Thing()
dependency injection¶
Dagster helps with modelling dependencies between the different steps of your pipeline. When a pipeline is started, dagster will build a graph of all the assets and automatically reuse the output of assets that are used by multiple other assets. The way to tell dagster which other assets your current asset is dependent on is by simply adding the name of the dependency in the function signature or passing the deps argument to the asset decorator in case you don’t need its return value.
# mex/foo_system/main.py
@asset(group_name=”foo_system”, deps=[“asset_that_should_run_first”]) def extracted_foo(reusable_thing: Thing) -> Foo:
return Foo(thing=reusable_thing)
running pipelines¶
Pipelines can be run in a couple of different ways:
run pdm run dagster dev and click materialize all on http://localhost:3000/locations/mex/jobs/foo_system
run pdm run foo-system according to the entrypoint in pyproject.toml
run pdm run dagster job execute -m mex -j foo_system using the asset group name
- mex.extractors.pipeline.load_job_definitions() Definitions ¶
Scan the mex package for assets, define jobs and io and return definitions.
- mex.extractors.pipeline.run_job_in_process(group_name: str = 'default') bool ¶
Run the dagster job with the given group name locally in-process.