mex.backend.graph package

Submodules

mex.backend.graph.connector module

class mex.backend.graph.connector.GraphConnector

Bases: BaseConnector

Connector to handle authentication and transactions with the graph database.

__init__() None

Create a new graph database connection.

_check_connectivity_and_authentication() Result

Check the connectivity and authentication to the graph.

_fetch_extracted_or_rule_items(query_string: str | None, stable_target_id: str | None, entity_type: Sequence[str], had_primary_source: Sequence[str] | None, skip: int, limit: int) Result

Query the graph for extracted or rule items.

Parameters:
  • query_string – Optional full text search query term

  • stable_target_id – Optional stable target ID filter

  • entity_type – List of allowed entity types

  • had_primary_source – Optional merged primary source identifier filter

  • skip – How many items to skip for pagination

  • limit – How many items to return at most

Returns:

Graph result instance

_init_driver() Driver

Initialize and return a database driver.

_merge_edges(session: Session, model: ExtractedAccessPlatform | ExtractedActivity | ExtractedBibliographicResource | ExtractedConsent | ExtractedContactPoint | ExtractedDistribution | ExtractedOrganization | ExtractedOrganizationalUnit | ExtractedPerson | ExtractedPrimarySource | ExtractedResource | ExtractedVariable | ExtractedVariableGroup | AdditiveAccessPlatform | AdditiveActivity | AdditiveBibliographicResource | AdditiveConsent | AdditiveContactPoint | AdditiveDistribution | AdditiveOrganization | AdditiveOrganizationalUnit | AdditivePerson | AdditivePrimarySource | AdditiveResource | AdditiveVariable | AdditiveVariableGroup | SubtractiveAccessPlatform | SubtractiveActivity | SubtractiveBibliographicResource | SubtractiveConsent | SubtractiveContactPoint | SubtractiveDistribution | SubtractiveOrganization | SubtractiveOrganizationalUnit | SubtractivePerson | SubtractivePrimarySource | SubtractiveResource | SubtractiveVariable | SubtractiveVariableGroup | PreventiveAccessPlatform | PreventiveActivity | PreventiveBibliographicResource | PreventiveConsent | PreventiveContactPoint | PreventiveDistribution | PreventiveOrganization | PreventiveOrganizationalUnit | PreventivePerson | PreventivePrimarySource | PreventiveResource | PreventiveVariable | PreventiveVariableGroup, stable_target_id: Identifier, extra_refs: dict[str, Any] | None = None, **constraints: str | int | float | None | bool) Result

Merge edges into the graph for all relations originating from one model.

All fields containing references will be iterated over. When the referenced node is found and no such relation exists yet, it will be created. A position attribute is added to every edge; it stores the index the reference had in the list of references on the originating model. This preserves the order of, for example, the contact persons referenced on an activity.

Parameters:
  • model – Model to ensure all edges are created in the graph

  • session – Active Neo4j driver Session

  • stable_target_id – Identifier of the connected merged item

  • extra_refs – Optional extra references to inject into the merge

  • constraints – Mapping of field names and values to use as constraints when finding the current item

Returns:

Graph result instance
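The role of the position attribute can be illustrated with a minimal sketch that never touches the graph. The function name `references_to_edges` and the plain-dict edge shape are hypothetical and only show how list indices could be carried onto edges; the connector's actual query logic is not reproduced here.

```python
from typing import Any


def references_to_edges(fields: dict[str, Any]) -> list[dict[str, Any]]:
    """Flatten reference fields into edge descriptors that carry a position."""
    edges: list[dict[str, Any]] = []
    for label, value in fields.items():
        # Treat a single reference like a one-element list.
        targets = value if isinstance(value, list) else [value]
        for position, target in enumerate(targets):
            if target is None:
                continue
            edges.append({"label": label, "position": position, "target": target})
    return edges


# Hypothetical reference fields of an activity-like model.
edges = references_to_edges(
    {"contact": ["person-b", "person-a"], "hadPrimarySource": "source-1"}
)
```

Because each edge stores its own index, the original order of `contact` can be restored later by sorting the edges of one label by `position`.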

_merge_item(session: Session, model: ExtractedAccessPlatform | ExtractedActivity | ExtractedBibliographicResource | ExtractedConsent | ExtractedContactPoint | ExtractedDistribution | ExtractedOrganization | ExtractedOrganizationalUnit | ExtractedPerson | ExtractedPrimarySource | ExtractedResource | ExtractedVariable | ExtractedVariableGroup | AdditiveAccessPlatform | AdditiveActivity | AdditiveBibliographicResource | AdditiveConsent | AdditiveContactPoint | AdditiveDistribution | AdditiveOrganization | AdditiveOrganizationalUnit | AdditivePerson | AdditivePrimarySource | AdditiveResource | AdditiveVariable | AdditiveVariableGroup | SubtractiveAccessPlatform | SubtractiveActivity | SubtractiveBibliographicResource | SubtractiveConsent | SubtractiveContactPoint | SubtractiveDistribution | SubtractiveOrganization | SubtractiveOrganizationalUnit | SubtractivePerson | SubtractivePrimarySource | SubtractiveResource | SubtractiveVariable | SubtractiveVariableGroup | PreventiveAccessPlatform | PreventiveActivity | PreventiveBibliographicResource | PreventiveConsent | PreventiveContactPoint | PreventiveDistribution | PreventiveOrganization | PreventiveOrganizationalUnit | PreventivePerson | PreventivePrimarySource | PreventiveResource | PreventiveVariable | PreventiveVariableGroup, stable_target_id: Identifier, **constraints: str | int | float | None | bool) Result

Upsert an extracted or rule model including merged item and nested objects.

The given model is created or updated with all its inline properties. All nested properties (like Text or Link) are created as their own nodes and linked via edges. For multi-valued fields, the position of each nested object is stored as a property on the outbound edge. Any nested objects that are found in the graph but are no longer present on the model are purged. In addition, a merged item is created (if it does not exist yet) and the extracted item is linked to it via an edge with the label stableTargetId.

Parameters:
  • model – Model to merge into the graph as a node

  • session – Active Neo4j driver Session

  • stable_target_id – Identifier the connected merged item should have

  • constraints – Mapping of field names and values to use as constraints when finding potential items to update

Returns:

Graph result instance
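The create-and-purge behaviour for nested objects can be pictured as a set difference. The sketch below is an assumption-laden illustration: `plan_nested_objects` and the `(field, position, value)` tuples are hypothetical and stand in for whatever the real Cypher query does.

```python
def plan_nested_objects(
    model_fields: dict[str, list[str]],
    existing: set[tuple[str, int, str]],
) -> tuple[list[tuple[str, int, str]], list[tuple[str, int, str]]]:
    """Return the nested objects to create and the stale ones to purge."""
    # Every nested value is identified by its field name and list position.
    wanted = {
        (field, position, value)
        for field, values in model_fields.items()
        for position, value in enumerate(values)
    }
    return sorted(wanted - existing), sorted(existing - wanted)


to_create, to_purge = plan_nested_objects(
    {"title": ["New title"], "alternativeTitle": ["Alt"]},
    {("title", 0, "Old title"), ("alternativeTitle", 0, "Alt")},
)
```

In this example the unchanged `alternativeTitle` is left alone, the new title is scheduled for creation, and the old title is scheduled for purging.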

_seed_constraints() None

Ensure property constraints are created for all entity types.

_seed_data() None

Ensure the primary source mex is seeded and linked to itself.

_seed_indices() Result

Ensure there is a full text search index for all searchable fields.

close() None

Close the connector’s underlying requests session.

commit(query: Query | str, /, session: Session | None = None, **parameters: Any) Result

Send and commit a single graph transaction with retry configuration.

exists_merged_item(stable_target_id: Identifier, stem_types: list[str] | None = None) bool

Validate whether a merged item with the given identifier and type exists.

Parameters:
  • stable_target_id – Identifier of the to-be-checked merged item

  • stem_types – Allowed stem types of the to-be-checked merged item

Returns:

Boolean representing the existence of the requested item

fetch_extracted_items(query_string: str | None, stable_target_id: str | None, entity_type: Sequence[str] | None, had_primary_source: Sequence[str] | None, skip: int, limit: int) Result

Query the graph for extracted items.

Parameters:
  • query_string – Optional full text search query term

  • stable_target_id – Optional stable target ID filter

  • entity_type – Optional entity type filter

  • had_primary_source – Optional merged primary source identifier filter

  • skip – How many items to skip for pagination

  • limit – How many items to return at most

Returns:

Graph result instance

fetch_identities(had_primary_source: Identifier | None = None, identifier_in_primary_source: str | None = None, stable_target_id: Identifier | None = None, limit: int = 1000) Result

Search the graph for nodes matching the given ID combination.

Identity queries can be filtered by stable_target_id, had_primary_source or identifier_in_primary_source.

Parameters:
  • had_primary_source – The stableTargetId of a connected PrimarySource

  • identifier_in_primary_source – The id the item had in its primary source

  • stable_target_id – The stableTargetId of an item

  • limit – How many results to return, defaults to 1000

Returns:

A graph result set containing identities

fetch_merged_items(query_string: str | None, identifier: str | None, entity_type: Sequence[str] | None, had_primary_source: Sequence[str] | None, skip: int, limit: int) Result

Query the graph for merged items.

Parameters:
  • query_string – Optional full text search query term

  • identifier – Optional merged item identifier filter

  • entity_type – Optional merged entity type filter

  • had_primary_source – Optional merged primary source identifier filter

  • skip – How many items to skip for pagination

  • limit – How many items to return at most

Returns:

Graph result instance

fetch_rule_items(query_string: str | None, stable_target_id: str | None, entity_type: Sequence[str] | None, had_primary_source: Sequence[str] | None, skip: int, limit: int) Result

Query the graph for rule items.

Parameters:
  • query_string – Optional full text search query term

  • stable_target_id – Optional stable target ID filter

  • entity_type – Optional entity type filter

  • had_primary_source – Optional merged primary source identifier filter

  • skip – How many items to skip for pagination

  • limit – How many items to return at most

Returns:

Graph result instance

flush() None

Flush the database (only in debug mode).

ingest(models: Sequence[ExtractedAccessPlatform | ExtractedActivity | ExtractedBibliographicResource | ExtractedConsent | ExtractedContactPoint | ExtractedDistribution | ExtractedOrganization | ExtractedOrganizationalUnit | ExtractedPerson | ExtractedPrimarySource | ExtractedResource | ExtractedVariable | ExtractedVariableGroup | AccessPlatformRuleSetResponse | ActivityRuleSetResponse | BibliographicResourceRuleSetResponse | ConsentRuleSetResponse | ContactPointRuleSetResponse | DistributionRuleSetResponse | OrganizationRuleSetResponse | OrganizationalUnitRuleSetResponse | PersonRuleSetResponse | PrimarySourceRuleSetResponse | ResourceRuleSetResponse | VariableRuleSetResponse | VariableGroupRuleSetResponse]) None

Ingest a list of models into the graph as nodes and connect all edges.

This is a two-step process: first all extracted and merged items are created along with their nested objects (like Text and Link); then all edges that represent references (like hadPrimarySource, parentUnit, etc.) are added to the graph in a second step.

Parameters:

models – Sequence of extracted models
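The reason for two passes can be shown with a small in-memory sketch: a reference may point at an item that appears later in the same sequence, so all nodes need to exist before edges are added. The function `ingest_two_step`, the `refs` key and the identifiers are hypothetical and do not reflect the connector's internals.

```python
from typing import Any


def ingest_two_step(
    items: list[dict[str, Any]],
) -> tuple[dict[str, dict[str, Any]], list[tuple[str, str, int, str]]]:
    """Create all nodes first, then connect references in a second pass."""
    nodes: dict[str, dict[str, Any]] = {}
    edges: list[tuple[str, str, int, str]] = []
    # Step 1: create every node, so later references can be resolved.
    for item in items:
        nodes[item["identifier"]] = {"entityType": item["entityType"]}
    # Step 2: add an edge for each reference whose target node exists.
    for item in items:
        for label, targets in item.get("refs", {}).items():
            for position, target in enumerate(targets):
                if target in nodes:
                    edges.append((item["identifier"], label, position, target))
    return nodes, edges


items = [
    {
        "identifier": "unit-child",
        "entityType": "ExtractedOrganizationalUnit",
        "refs": {"parentUnit": ["unit-parent"]},
    },
    {"identifier": "unit-parent", "entityType": "ExtractedOrganizationalUnit"},
]
nodes, edges = ingest_two_step(items)
```

Even though the child unit precedes its parent in the input, the `parentUnit` edge is created, because the parent node already exists when the second pass runs.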

ingest_v2(models: Sequence[ExtractedAccessPlatform | ExtractedActivity | ExtractedBibliographicResource | ExtractedConsent | ExtractedContactPoint | ExtractedDistribution | ExtractedOrganization | ExtractedOrganizationalUnit | ExtractedPerson | ExtractedPrimarySource | ExtractedResource | ExtractedVariable | ExtractedVariableGroup | AccessPlatformRuleSetResponse | ActivityRuleSetResponse | BibliographicResourceRuleSetResponse | ConsentRuleSetResponse | ContactPointRuleSetResponse | DistributionRuleSetResponse | OrganizationRuleSetResponse | OrganizationalUnitRuleSetResponse | PersonRuleSetResponse | PrimarySourceRuleSetResponse | ResourceRuleSetResponse | VariableRuleSetResponse | VariableGroupRuleSetResponse]) Generator[None, None, None]

Ingest a list of models into the graph as nodes and connect all edges.

ingest_v2_tx(tx: Transaction, data_in: IngestData) None

Ingest a single item in a database transaction.

class mex.backend.graph.connector.MExPrimarySource(*, version: Annotated[str, FieldInfo(annotation=NoneType, required=True, examples=['v1', '2023-01-16', 'Schema 9'])] | None = None, alternativeTitle: list[Text] = [], contact: list[Annotated[MergedOrganizationalUnitIdentifier | MergedPersonIdentifier | MergedContactPointIdentifier, AfterValidator(func=Identifier)]] = [], description: list[Text] = [], documentation: list[Link] = [], locatedAt: list[Link] = [], title: list[Text] = [], unitInCharge: list[MergedOrganizationalUnitIdentifier] = [], entityType: Literal['ExtractedPrimarySource'] = 'ExtractedPrimarySource', hadPrimarySource: MergedPrimarySourceIdentifier = MergedPrimarySourceIdentifier('00000000000000'), identifier: ExtractedPrimarySourceIdentifier = ExtractedPrimarySourceIdentifier('00000000000001'), identifierInPrimarySource: str = 'mex', stableTargetId: MergedPrimarySourceIdentifier = MergedPrimarySourceIdentifier('00000000000000'))

Bases: BasePrimarySource

An automatically extracted metadata set describing a primary source.

entityType: Annotated[Literal['ExtractedPrimarySource'], FieldInfo(annotation=NoneType, required=True, alias='$type', alias_priority=2, frozen=True)]
hadPrimarySource: MergedPrimarySourceIdentifier
identifier: ExtractedPrimarySourceIdentifier
identifierInPrimarySource: str
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'ignore', 'populate_by_name': True, 'str_max_length': 100000, 'str_min_length': 1, 'str_strip_whitespace': True, 'use_enum_values': True, 'validate_assignment': True, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'alternativeTitle': FieldInfo(annotation=list[Text], required=False, default=[]), 'contact': FieldInfo(annotation=list[Annotated[Union[MergedOrganizationalUnitIdentifier, MergedPersonIdentifier, MergedContactPointIdentifier], AfterValidator]], required=False, default=[]), 'description': FieldInfo(annotation=list[Text], required=False, default=[]), 'documentation': FieldInfo(annotation=list[Link], required=False, default=[]), 'entityType': FieldInfo(annotation=Literal['ExtractedPrimarySource'], required=False, default='ExtractedPrimarySource', alias='$type', alias_priority=2, frozen=True), 'hadPrimarySource': FieldInfo(annotation=MergedPrimarySourceIdentifier, required=False, default=MergedPrimarySourceIdentifier("00000000000000")), 'identifier': FieldInfo(annotation=ExtractedPrimarySourceIdentifier, required=False, default=ExtractedPrimarySourceIdentifier("00000000000001")), 'identifierInPrimarySource': FieldInfo(annotation=str, required=False, default='mex'), 'locatedAt': FieldInfo(annotation=list[Link], required=False, default=[]), 'stableTargetId': FieldInfo(annotation=MergedPrimarySourceIdentifier, required=False, default=MergedPrimarySourceIdentifier("00000000000000")), 'title': FieldInfo(annotation=list[Text], required=False, default=[]), 'unitInCharge': FieldInfo(annotation=list[MergedOrganizationalUnitIdentifier], required=False, default=[]), 'version': FieldInfo(annotation=Union[Annotated[str, FieldInfo(annotation=NoneType, required=True, examples=['v1', '2023-01-16', 'Schema 9'])], NoneType], required=False, default=None)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

stableTargetId: MergedPrimarySourceIdentifier

mex.backend.graph.exceptions module

exception mex.backend.graph.exceptions.InconsistentGraphError

Bases: BackendError

Exception raised for inconsistencies found in the graph database.

exception mex.backend.graph.exceptions.IngestionError(*args: Any, errors: Sequence[ErrorDetails] = (), retryable: bool = False)

Bases: BackendError

Error for ingestion failures with underlying details.

__init__(*args: Any, errors: Sequence[ErrorDetails] = (), retryable: bool = False) None

Construct a new ingestion failure with underlying details.

errors() list[ErrorDetails]

Details about underlying errors.

is_retryable() bool

Whether the error is retryable.
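A caller could use the retryable flag to decide whether to repeat an ingestion. The sketch below uses a stand-in class, `SketchIngestionError`, that mirrors the documented signature with plain dicts in place of `ErrorDetails`; both the stand-in and the `run_with_retry` helper are hypothetical and not part of mex-backend.

```python
from collections.abc import Callable, Sequence
from typing import Any


class SketchIngestionError(Exception):
    """Stand-in mirroring the documented constructor and accessors."""

    def __init__(
        self,
        *args: Any,
        errors: Sequence[dict[str, Any]] = (),
        retryable: bool = False,
    ) -> None:
        super().__init__(*args)
        self._errors = list(errors)
        self._retryable = retryable

    def errors(self) -> list[dict[str, Any]]:
        return self._errors

    def is_retryable(self) -> bool:
        return self._retryable


def run_with_retry(task: Callable[[], Any], attempts: int = 3) -> Any:
    """Repeat the task while it fails with a retryable error."""
    for attempt in range(1, attempts + 1):
        try:
            return task()
        except SketchIngestionError as error:
            # Give up on non-retryable errors or when attempts are exhausted.
            if not error.is_retryable() or attempt == attempts:
                raise


calls: list[int] = []


def flaky_task() -> str:
    calls.append(1)
    if len(calls) < 2:
        raise SketchIngestionError("transient failure", retryable=True)
    return "ok"


outcome = run_with_retry(flaky_task)
```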

exception mex.backend.graph.exceptions.MultipleResultsFoundError

Bases: BackendError

A single database result was required but more than one were found.

exception mex.backend.graph.exceptions.NoResultFoundError

Bases: BackendError

A database result was required but none was found.

mex.backend.graph.models module

class mex.backend.graph.models.EdgeExporter

Bases: RecordExporter

Transformer class that turns edges into a string of format label {props}.

Full example:

shortName {position: 0}

transform(x: Any) Any

Transform a value, or collection of values.
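The `label {props}` format from the example above can be approximated with a few lines of plain Python. The helper `format_edge` is hypothetical, and the way property values are rendered (via `repr`) is an assumption; the real exporter may format values differently.

```python
def format_edge(label: str, props: dict[str, object]) -> str:
    """Render an edge as a string of the form ``label {key: value}``."""
    rendered = ", ".join(f"{key}: {value!r}" for key, value in props.items())
    # Doubled braces produce literal braces inside an f-string.
    return f"{label} {{{rendered}}}"


edge_string = format_edge("shortName", {"position": 0})
```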

class mex.backend.graph.models.GraphRel

Bases: TypedDict

Type definition for graph relations.

edgeLabel: str
edgeProps: dict[str, None | str | int | list[str] | list[int]]
nodeLabels: list[str]
nodeProps: dict[str, None | str | int | list[str] | list[int]]

class mex.backend.graph.models.IngestData(*, stableTargetId: str, identifier: str, entityType: str, nodeProps: dict[str, None | str | int | list[str] | list[int]], linkRels: list[GraphRel] = [], createRels: list[GraphRel] = [])

Bases: BaseModel

Type definition for ingestion data.

createRels: list[GraphRel]
entityType: str
identifier: str
linkRels: list[GraphRel]
metadata() dict[str, int | str]

Return log-able metadata.

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'createRels': FieldInfo(annotation=list[GraphRel], required=False, default=[]), 'entityType': FieldInfo(annotation=str, required=True), 'identifier': FieldInfo(annotation=str, required=True), 'linkRels': FieldInfo(annotation=list[GraphRel], required=False, default=[]), 'nodeProps': FieldInfo(annotation=dict[str, Union[NoneType, str, int, list[str], list[int]]], required=True), 'stableTargetId': FieldInfo(annotation=str, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

nodeProps: dict[str, None | str | int | list[str] | list[int]]
classmethod sort_create_rels(v: list[GraphRel]) list[GraphRel]

Sort the rels by edge label and position.
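Sorting by edge label and position amounts to a two-part sort key. The sketch below assumes that the position lives under `edgeProps["position"]`, which matches the `GraphRel` fields listed above but is not confirmed for the validator itself; `sort_rels` is a hypothetical name.

```python
from typing import Any


def sort_rels(rels: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Order relations by edge label first, then by their position."""
    return sorted(
        rels,
        key=lambda rel: (rel["edgeLabel"], rel["edgeProps"].get("position", 0)),
    )


rels = [
    {"edgeLabel": "title", "edgeProps": {"position": 1}},
    {"edgeLabel": "description", "edgeProps": {"position": 0}},
    {"edgeLabel": "title", "edgeProps": {"position": 0}},
]
ordered = sort_rels(rels)
```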

stableTargetId: str

class mex.backend.graph.models.IngestParams(*, merged_label: str, node_label: str, all_referenced_labels: list[str], all_nested_labels: list[str], detach_node_edges: list[str], delete_node_edges: list[str], has_link_rels: bool, has_create_rels: bool)

Bases: BaseModel

Type definition for query parameters.

all_nested_labels: list[str]
all_referenced_labels: list[str]
delete_node_edges: list[str]
detach_node_edges: list[str]
has_create_rels: bool
merged_label: str
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'all_nested_labels': FieldInfo(annotation=list[str], required=True), 'all_referenced_labels': FieldInfo(annotation=list[str], required=True), 'delete_node_edges': FieldInfo(annotation=list[str], required=True), 'detach_node_edges': FieldInfo(annotation=list[str], required=True), 'has_create_rels': FieldInfo(annotation=bool, required=True), 'has_link_rels': FieldInfo(annotation=bool, required=True), 'merged_label': FieldInfo(annotation=str, required=True), 'node_label': FieldInfo(annotation=str, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

node_label: str

class mex.backend.graph.models.Result(result: Result)

Bases: object

Represent a set of graph results.

This class wraps neo4j.Result in an interface akin to sqlalchemy.engine.Result. We do this to reduce vendor tie-in with neo4j and to limit the dependency scope of the neo4j driver library to the mex.backend.graph submodule.

__init__(result: Result) None

Wrap a neo4j result object in a mex-backend result.

all() list[dict[str, Any]]

Return all records as a list.

get_update_counters() dict[str, int]

Return a summary of counters for operations the query triggered.

log_notifications() None

Log neo4j notifications.

one() dict[str, Any]

Return exactly one record or raise an exception.

one_or_none() dict[str, Any] | None

Return at most one result or raise an exception.

Returns None if the result has no records. Raises MultipleResultsFoundError if multiple records are returned.

mex.backend.graph.query module

class mex.backend.graph.query.Query(name: str, template: Template, kwargs: dict[str, Any])

Bases: object

Wrapper for queries that can be rendered.

REPR_MODE = Mode(target_versions=set(), line_length=1024, string_normalization=True, is_pyi=False, is_ipynb=False, skip_source_first_line=False, magic_trailing_comma=True, python_cell_magics=set(), preview=False, unstable=False, enabled_features=set())
__init__(name: str, template: Template, kwargs: dict[str, Any]) None

Create a new query instance.

class mex.backend.graph.query.QueryBuilder

Bases: BaseConnector

Wrapper around jinja template loading and rendering.

__init__() None

Create a new jinja environment with template loader, filters and globals.

close() None

Clean up the connector.

mex.backend.graph.query.render_constraints(fields: list[~typing.Annotated[str, ~pydantic.types.StringConstraints(strip_whitespace=None, to_upper=None, to_lower=None, strict=None, min_length=None, max_length=None, pattern=^[a-zA-Z]{1,255}$)]]) str

Convert a list of field names into cypher node/edge constraints.

mex.backend.graph.transform module

class mex.backend.graph.transform._SearchResultReference

Bases: TypedDict

Helper class to show the structure of search result references.

label: str
position: int
value: str | dict[str, str | None]

mex.backend.graph.transform.clean_dict(obj: Any) Any

Clean None and [] from dicts.
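A minimal sketch of such a cleaning step is shown below. It assumes the cleaning is applied recursively to nested dicts and to dicts inside lists, which the one-line description does not confirm; `clean` is a hypothetical name, not the library function.

```python
from typing import Any


def clean(obj: Any) -> Any:
    """Drop dict entries whose value is None or an empty list."""
    if isinstance(obj, dict):
        # Clean nested values first, then filter this level.
        cleaned = {key: clean(value) for key, value in obj.items()}
        return {
            key: value
            for key, value in cleaned.items()
            if value is not None and value != []
        }
    if isinstance(obj, list):
        return [clean(value) for value in obj]
    return obj


result = clean({"a": None, "b": [], "c": {"d": None, "e": 1}, "f": [{"g": None}]})
```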

mex.backend.graph.transform.expand_references_in_search_result(refs: list[_SearchResultReference]) dict[str, list[str | dict[str, str | None]]]

Expand the _refs collection in a search result item.

Each item in a search result has a collection of _refs in the form of _SearchResultReference. Before parsing them into pydantic, we need to inline the references back into the item dictionary.
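The inlining step can be sketched as grouping references by label and ordering them by position. The helper `expand` below is hypothetical and only follows the `_SearchResultReference` fields listed above; the library function may differ in detail.

```python
from collections import defaultdict
from typing import Any


def expand(refs: list[dict[str, Any]]) -> dict[str, list[Any]]:
    """Group reference values by label, ordered by their position."""
    grouped: dict[str, list[Any]] = defaultdict(list)
    for ref in sorted(refs, key=lambda ref: (ref["label"], ref["position"])):
        grouped[ref["label"]].append(ref["value"])
    return dict(grouped)


refs = [
    {"label": "contact", "position": 1, "value": "person-a"},
    {"label": "title", "position": 0, "value": {"value": "Title", "language": None}},
    {"label": "contact", "position": 0, "value": "person-b"},
]
expanded = expand(refs)
```

The resulting dictionary can then be merged into the item's own properties before the item is parsed into a model.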

mex.backend.graph.transform.get_error_details_from_neo4j_error(data_in: IngestData, error: Neo4jError) list[ErrorDetails]

Convert ingest-data and a neo4j error into error details.

mex.backend.graph.transform.get_graph_rel_id(rel: GraphRel) tuple[str, int]

Return a tuple uniquely identifying the GraphRel.

mex.backend.graph.transform.get_ingest_query_for_entity_type(entity_type: str) str

Create a v2 ingest query for the given entity type.

mex.backend.graph.transform.transform_edges_into_expectations_by_edge_locator(start_node_type: str, start_node_constraints: dict[str, str | int | float | None | bool], ref_labels: list[str], ref_identifiers: list[str], ref_positions: list[int]) dict[str, str]

Generate all expected edges and render a CYPHER-style merge statement.

mex.backend.graph.transform.transform_model_into_ingest_data(model: ExtractedAccessPlatform | ExtractedActivity | ExtractedBibliographicResource | ExtractedConsent | ExtractedContactPoint | ExtractedDistribution | ExtractedOrganization | ExtractedOrganizationalUnit | ExtractedPerson | ExtractedPrimarySource | ExtractedResource | ExtractedVariable | ExtractedVariableGroup) IngestData

Transform the given model into an ingestion instruction.

mex.backend.graph.transform.validate_ingested_data(data_in: IngestData, data_out: IngestData) list[ErrorDetails]

Validate that the ingestion has been executed successfully.

Module contents