mex.backend.graph package

Submodules

mex.backend.graph.connector module

class mex.backend.graph.connector.GraphConnector

Bases: BaseConnector

Connector to handle authentication and transactions with the graph database.

__init__() None

Create a new graph database connection.

_check_connectivity_and_authentication() Result

Check the connectivity and authentication to the graph.

_fetch_extracted_or_rule_items(query_string: str | None, stable_target_id: str | None, entity_type: Sequence[str], skip: int, limit: int) Result

Query the graph for extracted or rule items.

Parameters:
  • query_string – Optional full text search query term

  • stable_target_id – Optional stable target ID filter

  • entity_type – List of allowed entity types

  • skip – How many items to skip for pagination

  • limit – How many items to return at most

Returns:

Graph result instance

_init_driver() Driver

Initialize and return a database driver.

_merge_edges(model: ExtractedAccessPlatform | ExtractedActivity | ExtractedContactPoint | ExtractedDistribution | ExtractedOrganization | ExtractedOrganizationalUnit | ExtractedPerson | ExtractedPrimarySource | ExtractedResource | ExtractedVariable | ExtractedVariableGroup | AdditiveAccessPlatform | AdditiveActivity | AdditiveContactPoint | AdditiveDistribution | AdditiveOrganization | AdditiveOrganizationalUnit | AdditivePerson | AdditivePrimarySource | AdditiveResource | AdditiveVariable | AdditiveVariableGroup | SubtractiveAccessPlatform | SubtractiveActivity | SubtractiveContactPoint | SubtractiveDistribution | SubtractiveOrganization | SubtractiveOrganizationalUnit | SubtractivePerson | SubtractivePrimarySource | SubtractiveResource | SubtractiveVariable | SubtractiveVariableGroup | PreventiveAccessPlatform | PreventiveActivity | PreventiveContactPoint | PreventiveDistribution | PreventiveOrganization | PreventiveOrganizationalUnit | PreventivePerson | PreventivePrimarySource | PreventiveResource | PreventiveVariable | PreventiveVariableGroup, stable_target_id: Identifier, extra_refs: dict[str, Any] | None = None, **constraints: str | int | float | None | bool) Result

Merge edges into the graph for all relations originating from one model.

All fields containing references are iterated over. When the referenced node is found and no such relation exists yet, the relation is created. A position attribute is added to every edge; it stores the index the reference had in the list of references on the originating model. This preserves the order of, for example, contact persons referenced on an activity.

Parameters:
  • model – Model to ensure all edges are created in the graph

  • stable_target_id – Identifier of the connected merged item

  • extra_refs – Optional extra references to inject into the merge

  • constraints – Mapping of field names and values to use as constraints when finding the current item

Returns:

Graph result instance
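The position bookkeeping described above can be illustrated without a database. The following is a minimal sketch, assuming reference fields arrive as a plain dictionary; the helper name iter_edges and the edge dictionary layout are hypothetical and not part of mex.backend.

```python
from typing import Any


def iter_edges(ref_fields: dict[str, Any]) -> list[dict[str, Any]]:
    """Flatten reference fields into edge descriptions that carry a position."""
    edges: list[dict[str, Any]] = []
    for label, value in ref_fields.items():
        # Treat a single-valued reference like a one-element list.
        targets = value if isinstance(value, list) else [value]
        for position, target in enumerate(targets):
            if target is None:
                continue  # unset reference, nothing to link
            edges.append({"label": label, "position": position, "target": target})
    return edges


edges = iter_edges(
    {"contact": ["personB", "personA"], "hadPrimarySource": "sourceX"}
)
for edge in edges:
    print(edge)
```

Because the position is stored on the edge itself, the original order of the contact list can be restored by sorting on it when reading back.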

_merge_item(model: ExtractedAccessPlatform | ExtractedActivity | ExtractedContactPoint | ExtractedDistribution | ExtractedOrganization | ExtractedOrganizationalUnit | ExtractedPerson | ExtractedPrimarySource | ExtractedResource | ExtractedVariable | ExtractedVariableGroup | AdditiveAccessPlatform | AdditiveActivity | AdditiveContactPoint | AdditiveDistribution | AdditiveOrganization | AdditiveOrganizationalUnit | AdditivePerson | AdditivePrimarySource | AdditiveResource | AdditiveVariable | AdditiveVariableGroup | SubtractiveAccessPlatform | SubtractiveActivity | SubtractiveContactPoint | SubtractiveDistribution | SubtractiveOrganization | SubtractiveOrganizationalUnit | SubtractivePerson | SubtractivePrimarySource | SubtractiveResource | SubtractiveVariable | SubtractiveVariableGroup | PreventiveAccessPlatform | PreventiveActivity | PreventiveContactPoint | PreventiveDistribution | PreventiveOrganization | PreventiveOrganizationalUnit | PreventivePerson | PreventivePrimarySource | PreventiveResource | PreventiveVariable | PreventiveVariableGroup, stable_target_id: Identifier, **constraints: str | int | float | None | bool) Result

Upsert an extracted or rule model including merged item and nested objects.

The given model is created or updated with all its inline properties. All nested properties (like Text or Link) are created as their own nodes and linked via edges. For multi-valued fields, the position of each nested object is stored as a property on the outbound edge. Any nested objects that are found in the graph but are no longer present on the model are purged. In addition, a merged item is created (if it does not exist yet) and the extracted item is linked to it via an edge with the label stableTargetId.

Parameters:
  • model – Model to merge into the graph as a node

  • stable_target_id – Identifier the connected merged item should have

  • constraints – Mapping of field names and values to use as constraints when finding potential items to update

Returns:

Graph result instance
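The create-and-purge behaviour for nested objects amounts to a set difference between what the graph holds and what the model now contains. This is a rough sketch of that idea, assuming nested objects can be keyed by field name, position and value; plan_nested_changes and NestedKey are illustrative names, and the actual connector performs this work inside its graph queries.

```python
# (field name, position within the field, value)
NestedKey = tuple[str, int, str]


def plan_nested_changes(
    existing: set[NestedKey], fields: dict[str, list[str]]
) -> tuple[set[NestedKey], set[NestedKey]]:
    """Return the nested objects to create and the ones to purge."""
    wanted = {
        (label, position, value)
        for label, values in fields.items()
        for position, value in enumerate(values)
    }
    to_create = wanted - existing  # on the model, not yet in the graph
    to_purge = existing - wanted  # in the graph, no longer on the model
    return to_create, to_purge


existing = {("title", 0, "Old title"), ("alternativeTitle", 0, "Alt")}
to_create, to_purge = plan_nested_changes(
    existing, {"title": ["New title"], "alternativeTitle": ["Alt"]}
)
print(to_create, to_purge)
```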

_seed_constraints() list[Result]

Ensure uniqueness constraints are enabled for all entity types.

_seed_data() list[Identifier]

Ensure the primary source mex is seeded and linked to itself.

_seed_indices() Result

Ensure there is a full text search index for all searchable fields.

close() None

Close the connector’s underlying requests session.

commit(query: str, **parameters: Any) Result

Send and commit a single graph transaction.

create_rule_set(model: AccessPlatformRuleSetRequest | ActivityRuleSetRequest | ContactPointRuleSetRequest | DistributionRuleSetRequest | OrganizationRuleSetRequest | OrganizationalUnitRuleSetRequest | PersonRuleSetRequest | PrimarySourceRuleSetRequest | ResourceRuleSetRequest | VariableRuleSetRequest | VariableGroupRuleSetRequest, stable_target_id: Identifier) None

Create a new rule set to be applied to one merged item.

This is a two-step process: first the rule and merged items are created along with their nested objects (like Text and Link); then all edges that represent references (like hadPrimarySource, parentUnit, etc.) are added to the graph in a second step.

Parameters:
  • model – A rule-set model

  • stable_target_id – Identifier of the merged item

exists_merged_item(stable_target_id: Identifier, stem_types: list[str] | None = None) bool

Validate whether a merged item with the given identifier and type exists.

Parameters:
  • stable_target_id – Identifier of the merged item to check

  • stem_types – Allowed stem types of the merged item to check

Returns:

Boolean representing the existence of the requested item

fetch_extracted_items(query_string: str | None, stable_target_id: str | None, entity_type: Sequence[str] | None, skip: int, limit: int) Result

Query the graph for extracted items.

Parameters:
  • query_string – Optional full text search query term

  • stable_target_id – Optional stable target ID filter

  • entity_type – Optional entity type filter

  • skip – How many items to skip for pagination

  • limit – How many items to return at most

Returns:

Graph result instance
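The skip and limit parameters support offset-based pagination. As a minimal sketch, assuming a caller wants to walk through all pages, the loop below uses a stand-in fetch callable instead of a real connector; iter_pages and fake_fetch are hypothetical names.

```python
from typing import Any, Callable, Iterator

Record = dict[str, Any]


def iter_pages(
    fetch: Callable[[int, int], list[Record]], limit: int = 100
) -> Iterator[Record]:
    """Yield records page by page until a short page signals the end."""
    skip = 0
    while True:
        page = fetch(skip, limit)
        yield from page
        if len(page) < limit:
            break  # fewer records than requested: no further pages
        skip += limit


# Stand-in data source that slices a list like a paginated query would.
DATA = [{"identifier": str(number)} for number in range(7)]


def fake_fetch(skip: int, limit: int) -> list[Record]:
    return DATA[skip : skip + limit]


items = list(iter_pages(fake_fetch, limit=3))
print(len(items))
```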

fetch_identities(had_primary_source: Identifier | None = None, identifier_in_primary_source: str | None = None, stable_target_id: Identifier | None = None, limit: int = 1000) Result

Search the graph for nodes matching the given ID combination.

Identity queries can be filtered by stable_target_id, had_primary_source or identifier_in_primary_source.

Parameters:
  • had_primary_source – The stableTargetId of a connected PrimarySource

  • identifier_in_primary_source – The id the item had in its primary source

  • stable_target_id – The stableTargetId of an item

  • limit – How many results to return, defaults to 1000

Returns:

A graph result set containing identities

fetch_merged_items(query_string: str | None, stable_target_id: str | None, entity_type: Sequence[str] | None, skip: int, limit: int) Result

Query the graph for merged items.

Parameters:
  • query_string – Optional full text search query term

  • stable_target_id – Optional stable target ID filter

  • entity_type – Optional merged entity type filter

  • skip – How many items to skip for pagination

  • limit – How many items to return at most

Returns:

Graph result instance

fetch_rule_items(query_string: str | None, stable_target_id: str | None, entity_type: Sequence[str] | None, skip: int, limit: int) Result

Query the graph for rule items.

Parameters:
  • query_string – Optional full text search query term

  • stable_target_id – Optional stable target ID filter

  • entity_type – Optional entity type filter

  • skip – How many items to skip for pagination

  • limit – How many items to return at most

Returns:

Graph result instance

ingest(models: list[ExtractedAccessPlatform | ExtractedActivity | ExtractedContactPoint | ExtractedDistribution | ExtractedOrganization | ExtractedOrganizationalUnit | ExtractedPerson | ExtractedPrimarySource | ExtractedResource | ExtractedVariable | ExtractedVariableGroup]) list[Identifier]

Ingest a list of models into the graph as nodes and connect all edges.

This is a two-step process: first all extracted and merged items are created along with their nested objects (like Text and Link); then all edges that represent references (like hadPrimarySource, parentUnit, etc.) are added to the graph in a second step.

Parameters:
  • models – List of extracted models

Returns:

List of identifiers of the ingested models
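The reason for the two steps becomes visible when a model references another model that appears later in the same list. The following sketch mimics the process with an in-memory set of nodes and a list of edges; ingest_sketch and the simplified model dictionaries are assumptions for illustration, not the connector's data structures.

```python
from typing import Any

Edge = tuple[str, str, str]  # (source, label, target)


def ingest_sketch(models: list[dict[str, Any]]) -> tuple[set[str], list[Edge]]:
    """Create all nodes first, then connect the references between them."""
    nodes: set[str] = set()
    edges: list[Edge] = []
    # Step 1: every model becomes a node, regardless of list order.
    for model in models:
        nodes.add(model["identifier"])
    # Step 2: all targets exist by now, so forward references resolve too.
    for model in models:
        for label, target in model.get("references", {}).items():
            if target in nodes:
                edges.append((model["identifier"], label, target))
    return nodes, edges


models = [
    {"identifier": "unit-child", "references": {"parentUnit": "unit-parent"}},
    {"identifier": "unit-parent", "references": {}},
]
nodes, edges = ingest_sketch(models)
print(edges)
```

Had the edges been created in the same pass as the nodes, the reference from unit-child would have been looked up before unit-parent existed.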

class mex.backend.graph.connector.MExPrimarySource(*, version: Annotated[str, FieldInfo(annotation=NoneType, required=True, examples=['v1', '2023-01-16', 'Schema 9'])] | None = None, alternativeTitle: list[Text] = [], contact: list[MergedOrganizationalUnitIdentifier | MergedPersonIdentifier | MergedContactPointIdentifier] = [], description: list[Text] = [], documentation: list[Link] = [], locatedAt: list[Link] = [], title: list[Text] = [], unitInCharge: list[MergedOrganizationalUnitIdentifier] = [], entityType: Literal['ExtractedPrimarySource'] = 'ExtractedPrimarySource', hadPrimarySource: MergedPrimarySourceIdentifier = MergedPrimarySourceIdentifier('00000000000000'), identifier: ExtractedPrimarySourceIdentifier = ExtractedPrimarySourceIdentifier('00000000000001'), identifierInPrimarySource: str = 'mex', stableTargetId: MergedPrimarySourceIdentifier = MergedPrimarySourceIdentifier('00000000000000'))

Bases: BasePrimarySource

An automatically extracted metadata set describing a primary source.

entityType: Annotated[Literal['ExtractedPrimarySource'], FieldInfo(annotation=NoneType, required=True, alias='$type', alias_priority=2, frozen=True)]
hadPrimarySource: MergedPrimarySourceIdentifier
identifier: ExtractedPrimarySourceIdentifier
identifierInPrimarySource: str
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'ignore', 'populate_by_name': True, 'str_max_length': 100000, 'str_min_length': 1, 'str_strip_whitespace': True, 'use_enum_values': True, 'validate_assignment': True, 'validate_default': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[Dict[str, FieldInfo]] = {'alternativeTitle': FieldInfo(annotation=list[Text], required=False, default=[]), 'contact': FieldInfo(annotation=list[Union[MergedOrganizationalUnitIdentifier, MergedPersonIdentifier, MergedContactPointIdentifier]], required=False, default=[]), 'description': FieldInfo(annotation=list[Text], required=False, default=[]), 'documentation': FieldInfo(annotation=list[Link], required=False, default=[]), 'entityType': FieldInfo(annotation=Literal['ExtractedPrimarySource'], required=False, default='ExtractedPrimarySource', alias='$type', alias_priority=2, frozen=True), 'hadPrimarySource': FieldInfo(annotation=MergedPrimarySourceIdentifier, required=False, default=MergedPrimarySourceIdentifier('00000000000000')), 'identifier': FieldInfo(annotation=ExtractedPrimarySourceIdentifier, required=False, default=ExtractedPrimarySourceIdentifier('00000000000001')), 'identifierInPrimarySource': FieldInfo(annotation=str, required=False, default='mex'), 'locatedAt': FieldInfo(annotation=list[Link], required=False, default=[]), 'stableTargetId': FieldInfo(annotation=MergedPrimarySourceIdentifier, required=False, default=MergedPrimarySourceIdentifier('00000000000000')), 'title': FieldInfo(annotation=list[Text], required=False, default=[]), 'unitInCharge': FieldInfo(annotation=list[MergedOrganizationalUnitIdentifier], required=False, default=[]), 'version': FieldInfo(annotation=Union[Annotated[str, FieldInfo(annotation=NoneType, required=True, examples=['v1', '2023-01-16', 'Schema 9'])], NoneType], required=False, default=None)}

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

stableTargetId: MergedPrimarySourceIdentifier

mex.backend.graph.exceptions module

exception mex.backend.graph.exceptions.MultipleResultsFoundError

Bases: MExError

A single database result was required but more than one were found.

exception mex.backend.graph.exceptions.NoResultFoundError

Bases: MExError

A database result was required but none was found.

mex.backend.graph.models module

class mex.backend.graph.models.Result(result: Result)

Bases: object

Represent a set of graph results.

This class wraps neo4j.Result in an interface akin to sqlalchemy.engine.Result. This reduces vendor tie-in with neo4j and limits the dependency scope of the neo4j driver library to the mex.backend.graph submodule.

__init__(result: Result) None

Wrap a neo4j result object in a mex-backend result.

all() list[dict[str, Any]]

Return all records as a list.

get_update_counters() dict[str, int]

Return a summary of counters for operations the query triggered.

one() dict[str, Any]

Return exactly one record or raise an exception.

one_or_none() dict[str, Any] | None

Return at most one record or raise an exception.

Returns None if the result has no records. Raises MultipleResultsFoundError if multiple records are returned.
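The accessor semantics can be sketched with a list-backed stand-in. ListResult and the two exception classes below are hypothetical substitutes defined only for this example; that one() raises NoResultFoundError on an empty result is an assumption inferred from the exception names in mex.backend.graph.exceptions.

```python
from typing import Any, Optional


class NoResultFoundError(Exception):
    """Stand-in for mex.backend.graph.exceptions.NoResultFoundError."""


class MultipleResultsFoundError(Exception):
    """Stand-in for mex.backend.graph.exceptions.MultipleResultsFoundError."""


class ListResult:
    """Hypothetical list-backed result with the same accessor names."""

    def __init__(self, records: list[dict[str, Any]]) -> None:
        self._records = list(records)

    def all(self) -> list[dict[str, Any]]:
        return list(self._records)

    def one_or_none(self) -> Optional[dict[str, Any]]:
        if len(self._records) > 1:
            raise MultipleResultsFoundError
        return self._records[0] if self._records else None

    def one(self) -> dict[str, Any]:
        record = self.one_or_none()
        if record is None:
            # Assumption: an empty result raises NoResultFoundError.
            raise NoResultFoundError
        return record


single = ListResult([{"identifier": "a"}])
empty = ListResult([])
print(single.one(), empty.one_or_none())
```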

mex.backend.graph.query module

class mex.backend.graph.query.QueryBuilder

Bases: BaseConnector

Wrapper around jinja template loading and rendering.

__init__() None

Create a new jinja environment with template loader, filters and globals.

close() None

Clean up the connector.

mex.backend.graph.query.render_constraints(fields: list[~typing.Annotated[str, ~pydantic.types.StringConstraints(strip_whitespace=None, to_upper=None, to_lower=None, strict=None, min_length=None, max_length=None, pattern=^[a-zA-Z]{1,255}$)]]) str

Convert a list of field names into cypher node/edge constraints.
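The letters-only pattern in the signature matters because field names end up inside the query text, while values can be passed separately as query parameters. The sketch below shows one plausible rendering; the `name: $name` output format and the function name render_constraints_sketch are assumptions, not the documented behaviour of render_constraints.

```python
import re

# Same pattern as in the signature above: letters only, 1 to 255 characters.
FIELD_NAME = re.compile(r"[a-zA-Z]{1,255}")


def render_constraints_sketch(fields: list[str]) -> str:
    """Render field names as `name: $name` pairs (assumed output format)."""
    for field in fields:
        # Reject anything that could alter the structure of the query.
        if not FIELD_NAME.fullmatch(field):
            raise ValueError(f"invalid field name: {field!r}")
    return ", ".join(f"{field}: ${field}" for field in fields)


rendered = render_constraints_sketch(["identifier", "hadPrimarySource"])
print(rendered)
```

With this shape, only validated names are interpolated into the query string, and the actual values travel as parameters such as $identifier.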

mex.backend.graph.transform module

class mex.backend.graph.transform._SearchResultReference

Bases: TypedDict

Helper class to show the structure of search result references.

label: str
position: int
value: str | dict[str, str | None]

mex.backend.graph.transform.expand_references_in_search_result(item: dict[str, Any]) None

Expand the _refs collection in a search result item.

Each item in a search result has a collection of _refs in the form of _SearchResultReference. Before parsing them into pydantic, we need to inline the references back into the item dictionary.
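As an illustration of inlining, the sketch below removes the _refs collection from an item and appends each value to a list under its label, ordered by position. The function expand_refs_sketch is a simplified stand-in under the assumption that positions within each label are contiguous; the real function may handle gaps and edge cases differently.

```python
from typing import Any


def expand_refs_sketch(item: dict[str, Any]) -> None:
    """Inline `_refs` entries into the item, in place, ordered by position."""
    refs = item.pop("_refs", [])
    for ref in sorted(refs, key=lambda ref: (ref["label"], ref["position"])):
        # Each label becomes a list field on the item itself.
        item.setdefault(ref["label"], []).append(ref["value"])


item = {
    "identifier": "a",
    "_refs": [
        {"label": "contact", "position": 1, "value": "p2"},
        {"label": "contact", "position": 0, "value": "p1"},
        {"label": "title", "position": 0, "value": {"value": "T", "language": "en"}},
    ],
}
expand_refs_sketch(item)
print(item)
```

Like the documented function, the sketch mutates the dictionary and returns None, so the expanded item can be passed straight on to model parsing.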

mex.backend.graph.transform.to_primitive(obj: BaseModel, include: set[str] | None = None, exclude: set[str] | None = None, by_alias: bool = True, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False) Any

Convert model object into python primitives compatible with graph ingestion.

Module contents