mex.extractors package

Subpackages

Submodules

mex.extractors.drop module

class mex.extractors.drop.DropApiConnector

Bases: HTTPConnector

Connector class to handle interaction with the Drop API.

API_VERSION = 'v0'
_check_availability() None

Send a GET request to verify the API is available.

_set_authentication() None

Add the Drop API key to all session headers.

_set_url() None

Set the Drop API URL, including the version path.

get_file(x_system: str, file_id: str) dict[str, Any]

Get the content of a file from the x_system.

Parameters:
  • x_system – name of the x_system

  • file_id – name of the file

Returns:

content of the file

list_files(x_system: str) list[str]

Get available files for the x_system.

Parameters:

x_system – name of the x_system to list the files for

Returns:

list of available filenames for the x_system
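The two public methods are typically combined: list the files of an x_system, then fetch each one. The following is a minimal sketch, not the connector's implementation. `FakeDropConnector` and `fetch_all` are hypothetical names; the stand-in class only mirrors the documented signatures so the example runs without a Drop API instance.

```python
from typing import Any


class FakeDropConnector:
    """Hypothetical in-memory stand-in mirroring list_files and get_file."""

    def __init__(self, files: dict[str, dict[str, dict[str, Any]]]) -> None:
        # Maps x_system name -> file id -> file content.
        self._files = files

    def list_files(self, x_system: str) -> list[str]:
        # An unknown x_system simply has no files in this sketch.
        return sorted(self._files.get(x_system, {}))

    def get_file(self, x_system: str, file_id: str) -> dict[str, Any]:
        return self._files[x_system][file_id]


def fetch_all(connector: FakeDropConnector, x_system: str) -> dict[str, dict[str, Any]]:
    """Fetch every available file of one x_system, keyed by file id."""
    return {
        file_id: connector.get_file(x_system, file_id)
        for file_id in connector.list_files(x_system)
    }


connector = FakeDropConnector(
    {"demo-system": {"a.json": {"n": 1}, "b.json": {"n": 2}}}
)
contents = fetch_all(connector, "demo-system")
```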

mex.extractors.filters module

mex.extractors.filters.filter_by_global_rules(primary_source_id: Identifier, items: Iterable[RawDataT]) Generator[RawDataT, None, None]

Filter out items according to the global filter rules and return a generator of the remaining items.

Parameters:
  • primary_source_id – identifier of the primary source

  • items – the items (sources or resources) to be filtered

mex.extractors.identity module

class mex.extractors.identity.BackendIdentityProvider

Bases: BaseProvider, HTTPConnector

Identity provider that communicates with the backend HTTP API.

API_VERSION = 'v0'
_check_availability() None

Send a GET request to verify the API is available.

_set_authentication() None

Add the backend API key to all session headers.

_set_url() None

Set the backend API URL, including the version path.

assign(had_primary_source: MergedPrimarySourceIdentifier, identifier_in_primary_source: str) Identity

Find an Identity in a database or assign a new one.

fetch(*, had_primary_source: Identifier | None = None, identifier_in_primary_source: str | None = None, stable_target_id: Identifier | None = None) list[Identity]

Find Identity instances matching the given filters.

To get a unique result, provide either stable_target_id alone or had_primary_source together with identifier_in_primary_source.

mex.extractors.logging module

mex.extractors.logging.log_filter(identifier_in_primary_source: str | None, primary_source_id: Identifier, reason: str) None

Log filtered sources.

Parameters:
  • identifier_in_primary_source – optional identifier in the primary source

  • primary_source_id – identifier of the primary source

  • reason – string explaining the reason for filtering

mex.extractors.main module

mex.extractors.models module

class mex.extractors.models.BaseRawData

Bases: BaseModel

Raw-data base providing standardized access to attributes for filtering.

abstract get_end_year() TemporalEntity | None

Return end year from extractor.

abstract get_identifier_in_primary_source() str | None

Return identifier in primary source from extractor.

abstract get_partners() Sequence[str | None]

Return partners from extractor.

abstract get_start_year() TemporalEntity | None

Return start year from extractor.

abstract get_units() Sequence[str | None]

Return units from extractor.

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'ignore', 'populate_by_name': True, 'str_max_length': 100000, 'str_min_length': 1, 'str_strip_whitespace': True, 'use_enum_values': True, 'validate_assignment': True, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[Dict[str, FieldInfo]] = {}

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

mex.extractors.settings module

class mex.extractors.settings.Settings(_env_file: Path | str | Sequence[Path | str] | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_nested_delimiter: str | None = None, _secrets_dir: str | Path | None = None, *, pdb: bool = False, MEX_SINK: list[Sink] = [Sink.NDJSON], MEX_ASSETS_DIR: Path = PosixPath('/home/runner/work/mex-extractors/mex-extractors/assets'), MEX_WORK_DIR: Path = PosixPath('/home/runner/work/mex-extractors/mex-extractors'), MEX_IDENTITY_PROVIDER: IdentityProvider | ExtractorIdentityProvider = IdentityProvider.MEMORY, MEX_BACKEND_API_URL: Url = Url('http://localhost:8080/'), MEX_BACKEND_API_KEY: SecretStr = SecretStr('**********'), MEX_VERIFY_SESSION: bool | AssetsPath = True, MEX_ORGANIGRAM_PATH: AssetsPath = AssetsPath('raw-data/organigram/organizational_units.json'), MEX_PRIMARY_SOURCES_PATH: AssetsPath = AssetsPath('raw-data/primary-sources/primary-sources.json'), MEX_LDAP_URL: SecretStr = SecretStr('**********'), MEX_WIKI_API_URL: Url = Url('https://wikidata/'), MEX_WIKI_QUERY_SERVICE_URL: Url = Url('https://wikidata/'), MEX_WEB_USER_AGENT: str = 'rki/mex', MEX_SKIP_EXTRACTORS: list[str] = [], MEX_SKIP_MERGED_ITEMS: list[str] = ['MergedPrimarySource', 'MergedConsent'], MEX_SKIP_PARTNERS: list[str] = ['test'], MEX_SKIP_UNITS: list[str] = ['IT', 'PRAES', 'ZV'], MEX_SKIP_YEARS_BEFORE: int = 1970, MEX_DROP_API_KEY: SecretStr = SecretStr('**********'), MEX_DROP_API_URL: Url = Url('http://localhost:8081/'), MEX_SCHEDULE: str = '0 0 * * *', kerberos_user: str = 'user@domain.tld', kerberos_password: SecretStr = SecretStr('**********'), artificial: ArtificialSettings = ArtificialSettings(count=100, chattiness=10, seed=0, locale=['de_DE', 'en_US'], mesh_file=AssetsPath('raw-data/artificial/asciimesh.bin')), biospecimen: BiospecimenSettings = BiospecimenSettings(raw_data_path=AssetsPath('raw-data/biospecimen'), key_col='Feldname', val_col='zu extrahierender Wert (maschinenlesbar)', 
mapping_path=AssetsPath('mappings/__final__/biospecimen')), blueant: BlueAntSettings = BlueAntSettings(api_key=SecretStr('**********'), url='https://blueant', skip_labels=['test'], delete_prefixes=['_', '1_', '2_', '3_', '4_', '5_', '6_', '7_', '8_', '9_'], mapping_path=AssetsPath('mappings/__final__/blueant')), confluence_vvt: ConfluenceVvtSettings = ConfluenceVvtSettings(url='https://confluence.vvt', username=SecretStr('**********'), password=SecretStr('**********'), overview_page_id='123456', template_v1_mapping_path=AssetsPath('mappings/__final__/confluence-vvt_template_v1'), skip_pages=['123456']), datscha_web: DatschaWebSettings = DatschaWebSettings(url='https://datscha/', vorname=SecretStr('**********'), nachname=SecretStr('**********'), pw=SecretStr('**********'), organisation='RKI'), ff_projects: FFProjectsSettings = FFProjectsSettings(file_path=AssetsPath('raw-data/ff-projects/ff-projects.xlsx'), skip_funding=['Sonstige'], skip_topics=['Sonstige'], skip_years_strings=['fehlt', 'keine', 'offen'], skip_clients=['Sonstige'], mapping_path=AssetsPath('mappings/__final__/ff-projects')), grippeweb: GrippewebSettings = GrippewebSettings(mapping_path=AssetsPath('mappings/__final__/grippeweb'), mssql_connection_dsn='DRIVER={ODBC Driver 18 for SQL Server};SERVER=domain.tld;DATABASE=database'), ifsg: IFSGSettings = IFSGSettings(mapping_path=AssetsPath('mappings/__final__/ifsg'), mssql_connection_dsn='DRIVER={ODBC Driver 18 for SQL Server};SERVER=domain.tld;DATABASE=database'), international_projects: InternationalProjectsSettings = InternationalProjectsSettings(file_path=AssetsPath('raw-data/international-projects/international_projects.xlsx'), mapping_path=AssetsPath('mappings/__final__/international-projects')), odk: ODKSettings = ODKSettings(raw_data_path=AssetsPath('raw-data/odk'), mapping_path=AssetsPath('mappings/__final__/odk')), open_data: OpenDataSettings = OpenDataSettings(url='https://zenodo', community_rki='robertkochinstitut'), rdmo: RDMOSettings = 
RDMOSettings(url='https://rdmo/', username=SecretStr('**********'), password=SecretStr('**********')), seq_repo: SeqRepoSettings = SeqRepoSettings(mapping_path=AssetsPath('mappings/__final__/seq-repo')), sumo: SumoSettings = SumoSettings(raw_data_path=AssetsPath('raw-data/sumo'), mapping_path=AssetsPath('mappings/__final__/sumo')), voxco: VoxcoSettings = VoxcoSettings(mapping_path=AssetsPath('mappings/__final__/voxco')), synopse: SynopseSettings = SynopseSettings(report_server_url='https://report-server/', report_server_username=SecretStr('**********'), report_server_password=SecretStr('**********'), variablenuebersicht_path=AssetsPath('raw-data/synopse/variablenuebersicht.csv'), projekt_und_studienverwaltung_path=AssetsPath('raw-data/synopse/projekt_und_studienverwaltung.csv'), metadaten_zu_datensaetzen_path=AssetsPath('raw-data/synopse/metadaten_zu_datensaetzen.csv'), datensatzuebersicht_path=AssetsPath('raw-data/synopse/datensatzuebersicht.csv'), mapping_path=AssetsPath('mappings/__final__/synopse')))

Bases: BaseSettings

Settings definition class for extractors and related scripts.

artificial: ArtificialSettings
biospecimen: BiospecimenSettings
blueant: BlueAntSettings
confluence_vvt: ConfluenceVvtSettings
datscha_web: DatschaWebSettings
drop_api_key: SecretStr
drop_api_url: Url
ff_projects: FFProjectsSettings
grippeweb: GrippewebSettings
identity_provider: IdentityProvider | ExtractorIdentityProvider
ifsg: IFSGSettings
international_projects: InternationalProjectsSettings
kerberos_password: SecretStr
kerberos_user: str
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': '.env', 'env_file_encoding': 'utf-8', 'env_ignore_empty': False, 'env_nested_delimiter': '__', 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': 'mex_', 'extra': 'ignore', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'populate_by_name': True, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_assignment': True, 'validate_default': True, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[Dict[str, FieldInfo]] = {'artificial': FieldInfo(annotation=ArtificialSettings, required=False, default=ArtificialSettings(count=100, chattiness=10, seed=0, locale=['de_DE', 'en_US'], mesh_file=AssetsPath("raw-data/artificial/asciimesh.bin"))), 'assets_dir': FieldInfo(annotation=Path, required=False, default=PosixPath('/home/runner/work/mex-extractors/mex-extractors/assets'), alias_priority=2, validation_alias='MEX_ASSETS_DIR', description='Path to directory that contains input files treated as read-only, looks for a folder named `assets` in the current directory by default.'), 'backend_api_key': FieldInfo(annotation=SecretStr, required=False, default=SecretStr('**********'), alias_priority=2, validation_alias='MEX_BACKEND_API_KEY', description='Backend API key with write access to call POST/PUT endpoints'), 'backend_api_url': FieldInfo(annotation=Url, required=False, default=Url('http://localhost:8080/'), alias_priority=2, validation_alias='MEX_BACKEND_API_URL', description='MEx backend API url.'), 'biospecimen': FieldInfo(annotation=BiospecimenSettings, required=False, default=BiospecimenSettings(raw_data_path=AssetsPath("raw-data/biospecimen"), key_col='Feldname', val_col='zu extrahierender Wert (maschinenlesbar)', mapping_path=AssetsPath("mappings/__final__/biospecimen"))), 'blueant': FieldInfo(annotation=BlueAntSettings, required=False, default=BlueAntSettings(api_key=SecretStr('**********'), url='https://blueant', skip_labels=['test'], delete_prefixes=['_', '1_', '2_', '3_', '4_', '5_', '6_', '7_', '8_', '9_'], mapping_path=AssetsPath("mappings/__final__/blueant"))), 'confluence_vvt': FieldInfo(annotation=ConfluenceVvtSettings, required=False, default=ConfluenceVvtSettings(url='https://confluence.vvt', username=SecretStr('**********'), password=SecretStr('**********'), overview_page_id='123456', template_v1_mapping_path=AssetsPath("mappings/__final__/confluence-vvt_template_v1"), skip_pages=['123456'])), 'datscha_web': 
FieldInfo(annotation=DatschaWebSettings, required=False, default=DatschaWebSettings(url='https://datscha/', vorname=SecretStr('**********'), nachname=SecretStr('**********'), pw=SecretStr('**********'), organisation='RKI')), 'debug': FieldInfo(annotation=bool, required=False, default=False, alias='pdb', alias_priority=2, validation_alias='MEX_DEBUG', description='Jump into post-mortem debugging after any uncaught exception.'), 'drop_api_key': FieldInfo(annotation=SecretStr, required=False, default=SecretStr('**********'), alias_priority=2, validation_alias='MEX_DROP_API_KEY', description='Drop API key with admin access to call all GET endpoints'), 'drop_api_url': FieldInfo(annotation=Url, required=False, default=Url('http://localhost:8081/'), alias_priority=2, validation_alias='MEX_DROP_API_URL', description='MEx drop API url.'), 'ff_projects': FieldInfo(annotation=FFProjectsSettings, required=False, default=FFProjectsSettings(file_path=AssetsPath("raw-data/ff-projects/ff-projects.xlsx"), skip_funding=['Sonstige'], skip_topics=['Sonstige'], skip_years_strings=['fehlt', 'keine', 'offen'], skip_clients=['Sonstige'], mapping_path=AssetsPath("mappings/__final__/ff-projects"))), 'grippeweb': FieldInfo(annotation=GrippewebSettings, required=False, default=GrippewebSettings(mapping_path=AssetsPath("mappings/__final__/grippeweb"), mssql_connection_dsn='DRIVER={ODBC Driver 18 for SQL Server};SERVER=domain.tld;DATABASE=database')), 'identity_provider': FieldInfo(annotation=Union[IdentityProvider, ExtractorIdentityProvider], required=False, default=<IdentityProvider.MEMORY: 'memory'>, alias_priority=2, validation_alias='MEX_IDENTITY_PROVIDER', description='Provider to assign stableTargetIds to new model instances.'), 'ifsg': FieldInfo(annotation=IFSGSettings, required=False, default=IFSGSettings(mapping_path=AssetsPath("mappings/__final__/ifsg"), mssql_connection_dsn='DRIVER={ODBC Driver 18 for SQL Server};SERVER=domain.tld;DATABASE=database')), 'international_projects': 
FieldInfo(annotation=InternationalProjectsSettings, required=False, default=InternationalProjectsSettings(file_path=AssetsPath("raw-data/international-projects/international_projects.xlsx"), mapping_path=AssetsPath("mappings/__final__/international-projects"))), 'kerberos_password': FieldInfo(annotation=SecretStr, required=False, default=SecretStr('**********'), description='Kerberos password to authenticate against MSSQL server.'), 'kerberos_user': FieldInfo(annotation=str, required=False, default='user@domain.tld', description='Kerberos user to authenticate against MSSQL server.'), 'ldap_url': FieldInfo(annotation=SecretStr, required=False, default=SecretStr('**********'), alias_priority=2, validation_alias='MEX_LDAP_URL', description='LDAP server for person queries with authentication credentials. Must follow format `ldap://user:pw@host:port`, where `user` is the username, and `pw` is the password for authenticating against ldap, `host` is the url of the ldap server, and `port` is the port of the ldap server.'), 'mex_web_user_agent': FieldInfo(annotation=str, required=False, default='rki/mex', alias_priority=2, validation_alias='MEX_WEB_USER_AGENT', description='a user agent is sent in the header of some requests to external services '), 'odk': FieldInfo(annotation=ODKSettings, required=False, default=ODKSettings(raw_data_path=AssetsPath("raw-data/odk"), mapping_path=AssetsPath("mappings/__final__/odk"))), 'open_data': FieldInfo(annotation=OpenDataSettings, required=False, default=OpenDataSettings(url='https://zenodo', community_rki='robertkochinstitut')), 'organigram_path': FieldInfo(annotation=AssetsPath, required=False, default=AssetsPath("raw-data/organigram/organizational_units.json"), alias_priority=2, validation_alias='MEX_ORGANIGRAM_PATH', description='Path to the JSON file describing the organizational units, absolute path or relative to `assets_dir`.'), 'primary_sources_path': FieldInfo(annotation=AssetsPath, required=False, 
default=AssetsPath("raw-data/primary-sources/primary-sources.json"), alias_priority=2, validation_alias='MEX_PRIMARY_SOURCES_PATH', description='Path to the JSON file describing the primary sources, absolute path or relative to `assets_dir`.'), 'rdmo': FieldInfo(annotation=RDMOSettings, required=False, default=RDMOSettings(url='https://rdmo/', username=SecretStr('**********'), password=SecretStr('**********'))), 'schedule': FieldInfo(annotation=str, required=False, default='0 0 * * *', alias_priority=2, validation_alias='MEX_SCHEDULE', description='A valid cron string defining when to run extractor jobs'), 'seq_repo': FieldInfo(annotation=SeqRepoSettings, required=False, default=SeqRepoSettings(mapping_path=AssetsPath("mappings/__final__/seq-repo"))), 'sink': FieldInfo(annotation=list[Sink], required=False, default=[<Sink.NDJSON: 'ndjson'>], alias_priority=2, validation_alias='MEX_SINK', description='Where to send data that is extracted or ingested. Defaults to writing ndjson files, but can be configured to push to the backend or the graph.'), 'skip_extractors': FieldInfo(annotation=list[str], required=False, default=[], alias_priority=2, validation_alias='MEX_SKIP_EXTRACTORS', description='Skip execution of these extractors in dagster'), 'skip_merged_items': FieldInfo(annotation=list[str], required=False, default=['MergedPrimarySource', 'MergedConsent'], alias_priority=2, validation_alias='MEX_SKIP_MERGED_ITEMS', description='Skip merged items with these types'), 'skip_partners': FieldInfo(annotation=list[str], required=False, default=['test'], alias_priority=2, validation_alias='MEX_SKIP_PARTNERS', description='Skip projects with these external partners'), 'skip_units': FieldInfo(annotation=list[str], required=False, default=['IT', 'PRAES', 'ZV'], alias_priority=2, validation_alias='MEX_SKIP_UNITS', description='Skip projects with these responsible units'), 'skip_years_before': FieldInfo(annotation=int, required=False, default=1970, alias_priority=2, 
validation_alias='MEX_SKIP_YEARS_BEFORE', description='Skip projects conducted before this year'), 'sumo': FieldInfo(annotation=SumoSettings, required=False, default=SumoSettings(raw_data_path=AssetsPath("raw-data/sumo"), mapping_path=AssetsPath("mappings/__final__/sumo"))), 'synopse': FieldInfo(annotation=SynopseSettings, required=False, default=SynopseSettings(report_server_url='https://report-server/', report_server_username=SecretStr('**********'), report_server_password=SecretStr('**********'), variablenuebersicht_path=AssetsPath("raw-data/synopse/variablenuebersicht.csv"), projekt_und_studienverwaltung_path=AssetsPath("raw-data/synopse/projekt_und_studienverwaltung.csv"), metadaten_zu_datensaetzen_path=AssetsPath("raw-data/synopse/metadaten_zu_datensaetzen.csv"), datensatzuebersicht_path=AssetsPath("raw-data/synopse/datensatzuebersicht.csv"), mapping_path=AssetsPath("mappings/__final__/synopse"))), 'verify_session': FieldInfo(annotation=Union[bool, AssetsPath], required=False, default=True, alias_priority=2, validation_alias='MEX_VERIFY_SESSION', description="Either a boolean that controls whether we verify the server's TLS certificate, or a path to a CA bundle to use. If a path is given, it can be either absolute or relative to the `assets_dir`. 
Defaults to True."), 'voxco': FieldInfo(annotation=VoxcoSettings, required=False, default=VoxcoSettings(mapping_path=AssetsPath("mappings/__final__/voxco"))), 'wiki_api_url': FieldInfo(annotation=Url, required=False, default=Url('https://wikidata/'), alias_priority=2, validation_alias='MEX_WIKI_API_URL', description='URL of Wikidata API, this URL is used to send wikidata organization ID to get all the info about the organization, which includes basic info, aliases, labels, descriptions, claims, and sitelinks'), 'wiki_query_service_url': FieldInfo(annotation=Url, required=False, default=Url('https://wikidata/'), alias_priority=2, validation_alias='MEX_WIKI_QUERY_SERVICE_URL', description='URL of Wikidata query service, this URL is to send organization name in plain text to wikidata and receive search results with wikidata organization ID'), 'work_dir': FieldInfo(annotation=Path, required=False, default=PosixPath('/home/runner/work/mex-extractors/mex-extractors'), alias_priority=2, validation_alias='MEX_WORK_DIR', description='Path to directory that stores generated and temporary files. Defaults to the current working directory.')}

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

odk: ODKSettings
open_data: OpenDataSettings
rdmo: RDMOSettings
schedule: str
seq_repo: SeqRepoSettings
skip_extractors: list[str]
skip_merged_items: list[str]
skip_partners: list[str]
skip_units: list[str]
skip_years_before: int
sumo: SumoSettings
synopse: SynopseSettings
voxco: VoxcoSettings
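According to model_config, settings are read case-insensitively from environment variables carrying the prefix mex_, with __ separating nested settings from their fields. The sketch below only illustrates that naming convention; pydantic-settings performs the actual resolution and validation. `collect_overrides` is a hypothetical helper, the example URL is made up, and trying JSON on every value is a simplification.

```python
import json
from typing import Any


def collect_overrides(
    environ: dict[str, str], prefix: str = "mex_", delimiter: str = "__"
) -> dict[str, Any]:
    """Turn prefixed environment variables into a nested dict of overrides."""
    overrides: dict[str, Any] = {}
    for name, raw in environ.items():
        lowered = name.lower()  # names are matched case-insensitively
        if not lowered.startswith(prefix):
            continue  # unrelated variable, ignore it
        *parents, leaf = lowered[len(prefix):].split(delimiter)
        target = overrides
        for parent in parents:
            target = target.setdefault(parent, {})
        try:
            # Lists and other complex values are written as JSON.
            target[leaf] = json.loads(raw)
        except json.JSONDecodeError:
            target[leaf] = raw  # plain strings are kept unchanged
    return overrides


env = {
    "MEX_SKIP_UNITS": '["IT", "ZV"]',
    "MEX_BLUEANT__URL": "https://blueant.example",
    "PATH": "/usr/bin",
}
overrides = collect_overrides(env)
```

Under these assumptions, MEX_SKIP_UNITS overrides the top-level skip_units list and MEX_BLUEANT__URL overrides the url field of the nested blueant settings.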

mex.extractors.sinks module

mex.extractors.sinks.load(models: Iterable[ExtractedAccessPlatform | ExtractedActivity | ExtractedBibliographicResource | ExtractedConsent | ExtractedContactPoint | ExtractedDistribution | ExtractedOrganization | ExtractedOrganizationalUnit | ExtractedPerson | ExtractedPrimarySource | ExtractedResource | ExtractedVariable | ExtractedVariableGroup]) None

Load models to the backend API or write to NDJSON files.

Parameters:

models – Iterable of extracted models

Settings:

sink: Where to load the provided models
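For the NDJSON case, each model ends up as one JSON document per line. A minimal sketch of that format, assuming models are already available as plain dicts; `write_ndjson` is a hypothetical helper, and the file naming and layout used by the real sink are not covered here.

```python
import io
import json
from collections.abc import Iterable
from typing import Any, TextIO


def write_ndjson(records: Iterable[dict[str, Any]], handle: TextIO) -> int:
    """Write one JSON document per line and return the number of records."""
    count = 0
    for record in records:
        # json.dumps escapes newlines inside strings, so each record
        # stays on exactly one line.
        handle.write(json.dumps(record, sort_keys=True) + "\n")
        count += 1
    return count


buffer = io.StringIO()
written = write_ndjson(
    [{"identifier": "a", "title": "first"}, {"identifier": "b", "title": "line\nbreak"}],
    buffer,
)
lines = buffer.getvalue().splitlines()
```

Writing to a text handle rather than a path keeps the helper easy to test and lets the caller decide where the data goes.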

mex.extractors.types module

class mex.extractors.types.ExtractorIdentityProvider(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: Enum

Identity providers implemented by mex-extractors.

BACKEND = 'backend'

Module contents