mex.extractors.grippeweb package

Submodules

mex.extractors.grippeweb.connector module

class mex.extractors.grippeweb.connector.GrippewebConnector

Bases: BaseConnector

Connector to handle authentication and queries for Grippeweb SQL server.

__init__() None

Create a new connector instance.

close() None

Close the underlying connection.

parse_columns_by_column_name(table_name: str) dict[str, list[Any]]

Execute whitelisted queries and zip results to column name.

mex.extractors.grippeweb.extract module

mex.extractors.grippeweb.extract.extract_columns_by_table_and_column_name() dict[str, dict[str, list[Any]]]

Extract sql tables and parse them into column lists by table and column name.

Returns:

list of columns by column names and table names

mex.extractors.grippeweb.extract.extract_grippeweb_organizations(grippeweb_resource_mappings: list[Any]) dict[str, WikidataOrganization]

Search and extract grippeweb organization from wikidata.

Parameters:

grippeweb_resource_mappings – grippeweb resource mapping models

Returns:

mapping default values

and values: WikidataOrganization

Return type:

Dict with keys

mex.extractors.grippeweb.extract.extract_ldap_actors_for_functional_accounts(grippeweb_resource_mappings: list[Any]) list[LDAPActor]

Extract LDAP actors functional accounts from grippeweb resource mapping contacts.

Parameters:

grippeweb_resource_mappings – list of resources default value mapping models

Returns:

list of LDAP actors

mex.extractors.grippeweb.extract.extract_ldap_persons(grippeweb_resource_mappings: list[Any], grippeweb_access_platform: Any) list[LDAPPerson]

Extract LDAP persons for grippeweb.

Parameters:
  • grippeweb_resource_mappings – list of resources default value mapping models

  • grippeweb_access_platform – grippeweb access platform mapping model

Returns:

list of LDAP persons

mex.extractors.grippeweb.main module

mex.extractors.grippeweb.settings module

class mex.extractors.grippeweb.settings.GrippewebSettings(*, mapping_path: AssetsPath = AssetsPath('mappings/__final__/grippeweb'), mssql_connection_dsn: str = 'DRIVER={ODBC Driver 18 for SQL Server};SERVER=domain.tld;DATABASE=database')

Bases: BaseModel

Settings definition for the infection protection act data.

mapping_path: AssetsPath
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'ignore', 'populate_by_name': True, 'str_max_length': 100000, 'str_min_length': 1, 'str_strip_whitespace': True, 'use_enum_values': True, 'validate_assignment': True, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'mapping_path': FieldInfo(annotation=AssetsPath, required=False, default=AssetsPath("mappings/__final__/grippeweb"), description='Path to the directory with the Grippeweb mapping files containing the default values, absolute path or relative to `assets_dir`.'), 'mssql_connection_dsn': FieldInfo(annotation=str, required=False, default='DRIVER={ODBC Driver 18 for SQL Server};SERVER=domain.tld;DATABASE=database', description='Connection string for the ODBC Driver for SQL Server: https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute')}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

mssql_connection_dsn: str

mex.extractors.grippeweb.transform module

mex.extractors.grippeweb.transform.get_or_create_external_partner(resource: Any, grippeweb_organization_ids_by_query_string: dict[str, MergedOrganizationIdentifier], extracted_primary_source_grippeweb: ExtractedPrimarySource) list[MergedOrganizationIdentifier] | None

Get external partner from wikidata or create organization.

Parameters:
  • resource – grippeweb resource mapping model

  • grippeweb_organization_ids_by_query_string – extracted grippeweb organizations dict

  • extracted_primary_source_grippeweb – extracted grippeweb primary source

Returns:

dict extracted grippeweb resource by identifier in primary source

mex.extractors.grippeweb.transform.transform_grippeweb_access_platform_to_extracted_access_platform(grippeweb_access_platform: Any, unit_stable_target_ids_by_synonym: dict[str, MergedOrganizationalUnitIdentifier], extracted_primary_source: ExtractedPrimarySource, extracted_mex_persons_grippeweb: list[ExtractedPerson]) ExtractedAccessPlatform

Transform grippeweb access platform to ExtractedAccessPlatform.

Parameters:
  • grippeweb_access_platform – grippeweb access platform mapping model

  • unit_stable_target_ids_by_synonym – Unit stable target ids by synonym

  • extracted_primary_source – Extracted primary source

  • extracted_mex_persons_grippeweb – extracted grippeweb persons

Returns:

ExtractedAccessPlatform grippeweb

mex.extractors.grippeweb.transform.transform_grippeweb_resource_mappings_to_dict(grippeweb_resource_mappings: list[Any], unit_stable_target_ids_by_synonym: dict[str, MergedOrganizationalUnitIdentifier], grippeweb_extracted_access_platform: ExtractedAccessPlatform, extracted_primary_source_grippeweb: ExtractedPrimarySource, extracted_mex_persons_grippeweb: list[ExtractedPerson], grippeweb_organization_ids_by_query_string: dict[str, MergedOrganizationIdentifier], extracted_mex_functional_units_grippeweb: dict[Email, MergedContactPointIdentifier]) dict[str, ExtractedResource]

Transform grippe web values to extracted resources.

Parameters:
  • grippeweb_resource_mappings – grippeweb resource mapping models

  • unit_stable_target_ids_by_synonym – merged organizational units by name

  • grippeweb_extracted_access_platform – extracted grippeweb access platform

  • extracted_primary_source_grippeweb – extracted grippeweb primary source

  • extracted_mex_persons_grippeweb – extracted grippeweb mex persons

  • grippeweb_organization_ids_by_query_string – extracted grippeweb organizations dict

  • extracted_mex_functional_units_grippeweb – extracted grippeweb mex functional accounts

Returns:

dict extracted grippeweb resource by identifier in primary source

mex.extractors.grippeweb.transform.transform_grippeweb_resource_mappings_to_extracted_resources(grippeweb_resource_mappings: list[Any], unit_stable_target_ids_by_synonym: dict[str, MergedOrganizationalUnitIdentifier], grippeweb_extracted_access_platform: ExtractedAccessPlatform, extracted_primary_source_grippeweb: ExtractedPrimarySource, extracted_mex_persons_grippeweb: list[ExtractedPerson], grippeweb_organization_ids_by_query_string: dict[str, MergedOrganizationIdentifier], extracted_mex_functional_units_grippeweb: dict[Email, MergedContactPointIdentifier]) dict[str, ExtractedResource]

Transform grippe web values to extracted resources and link them.

Parameters:
  • grippeweb_resource_mappings – grippeweb resource mapping models

  • unit_stable_target_ids_by_synonym – merged organizational units by name

  • grippeweb_extracted_access_platform – extracted grippeweb access platform

  • extracted_primary_source_grippeweb – extracted grippeweb primary source

  • extracted_mex_persons_grippeweb – extracted grippeweb mex persons

  • grippeweb_organization_ids_by_query_string – extracted grippeweb organizations dict

  • extracted_mex_functional_units_grippeweb – extracted grippeweb mex functional accounts

  • extracted_confluence_vvt_activities – extracted confluence vvt activities

Returns:

grippeweb resources by identifierInPrimarySource

mex.extractors.grippeweb.transform.transform_grippeweb_variable_group_to_extracted_variable_groups(grippeweb_variable_group: Any, grippeweb_columns: dict[str, dict[str, list[Any]]], grippeweb_extracted_resource_dict: dict[str, ExtractedResource], extracted_primary_source_grippeweb: ExtractedPrimarySource) list[ExtractedVariableGroup]

Transform Grippeweb variable groups to extracted variable groups.

Parameters:
  • grippeweb_variable_group – grippeweb variable group mapping model

  • grippeweb_columns – grippeweb data by column and table

  • grippeweb_extracted_resource_dict – extracted resources by name

  • extracted_primary_source_grippeweb – Extracted primary source

Returns:

list of extracted variable groups

mex.extractors.grippeweb.transform.transform_grippeweb_variable_to_extracted_variables(grippeweb_variable: Any, grippeweb_extracted_variable_group: list[ExtractedVariableGroup], grippeweb_columns: dict[str, dict[str, list[Any]]], grippeweb_extracted_resource_dict: dict[str, ExtractedResource], extracted_primary_source_grippeweb: ExtractedPrimarySource) list[ExtractedVariable]

Transform Grippeweb variables to extracted variables.

Parameters:
  • grippeweb_variable – grippeweb variable mapping model

  • grippeweb_extracted_variable_group – extracted grippeweb variable groups

  • grippeweb_columns – grippeweb data by column and table

  • grippeweb_extracted_resource_dict – extracted resources by name

  • extracted_primary_source_grippeweb – Extracted primary source

Returns:

list of extracted variables

Module contents