mex.backend.graph package

Submodules

mex.backend.graph.connector module

class mex.backend.graph.connector.GraphConnector

Bases: BaseConnector

Connector to handle authentication and transactions with the graph database.

__init__() None

Create a new graph database connection.

_check_connectivity_and_authentication() Result

Check the connectivity and authentication to the graph.

_fetch_extracted_or_rule_items(query_string: str | None, identifier: str | None, stable_target_id: str | None, entity_type: Sequence[str], referenced_identifiers: Sequence[str] | None, reference_field: str | None, skip: int, limit: int) Result

Query the graph for extracted or rule items.

Parameters:
  • query_string – Optional full text search query term

  • identifier – Optional identifier filter

  • stable_target_id – Optional stable target ID filter

  • entity_type – List of allowed entity types

  • referenced_identifiers – Optional merged item identifiers filter

  • reference_field – Optional field name to filter for

  • skip – How many items to skip for pagination

  • limit – How many items to return at most

Returns:

Graph result instance

_init_driver() Driver

Initialize and return a database driver.

_run_ingest_in_transaction(tx: Transaction, model: ExtractedAccessPlatform | ExtractedActivity | ExtractedBibliographicResource | ExtractedConsent | ExtractedContactPoint | ExtractedDistribution | ExtractedOrganization | ExtractedOrganizationalUnit | ExtractedPerson | ExtractedPrimarySource | ExtractedResource | ExtractedVariable | ExtractedVariableGroup | AccessPlatformRuleSetResponse | ActivityRuleSetResponse | BibliographicResourceRuleSetResponse | ConsentRuleSetResponse | ContactPointRuleSetResponse | DistributionRuleSetResponse | OrganizationRuleSetResponse | OrganizationalUnitRuleSetResponse | PersonRuleSetResponse | PrimarySourceRuleSetResponse | ResourceRuleSetResponse | VariableRuleSetResponse | VariableGroupRuleSetResponse | MExPrimarySource) None

Ingest a single item in a database transaction.

_seed_constraints() None

Ensure property constraints are created for all entity types.

_seed_data() None

Ensure the primary source mex is seeded and linked to itself.

_seed_indices() Result

Ensure there is a full text search index for all searchable fields.

close() None

Close the connector’s underlying database driver.

commit(query: Query, /, access_mode: str = 'READ', **parameters: Any) Result

Send and commit a single graph transaction with retry configuration.

Parameters:
  • query – The query string or Query object to execute

  • access_mode – Whether to run the query with read or write access

  • **parameters – Query parameters to substitute in the Cypher query

Returns:

Result object containing query execution results and metadata

exists_item(identifier: Identifier, entity_type: str) bool

Check whether an item with the given identifier and entity type exists.

Parameters:
  • identifier – Identifier of the to-be-checked item

  • entity_type – Entity type of the to-be-checked item

Returns:

Boolean representing the existence of the requested item

fetch_extracted_items(query_string: str | None, identifier: str | None, stable_target_id: str | None, entity_type: Sequence[str] | None, referenced_identifiers: Sequence[str] | None, reference_field: str | None, skip: int, limit: int) Result

Query the graph for extracted items.

Parameters:
  • query_string – Optional full text search query term

  • identifier – Optional identifier filter

  • stable_target_id – Optional stable target ID filter

  • entity_type – Optional entity type filter

  • referenced_identifiers – Optional merged item identifiers filter

  • reference_field – Optional field name to filter for

  • skip – How many items to skip for pagination

  • limit – How many items to return at most

Returns:

Graph result instance
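The skip and limit parameters allow offset-based pagination. A minimal sketch of paging through all results, assuming a hypothetical fetch_page callable that stands in for a connector call such as fetch_extracted_items(...).all(); the in-memory ITEMS list and the helper names are illustrative only and not part of the package:

```python
from collections.abc import Callable, Iterator
from typing import Any

# Hypothetical in-memory data standing in for records stored in the graph.
ITEMS = [{"identifier": f"item-{i}"} for i in range(7)]


def fetch_page(skip: int, limit: int) -> list[dict[str, Any]]:
    """Stand-in for a connector call that accepts skip and limit."""
    return ITEMS[skip : skip + limit]


def iter_all(
    fetch: Callable[[int, int], list[dict[str, Any]]], limit: int
) -> Iterator[dict[str, Any]]:
    """Yield every item, advancing skip until a short page is returned."""
    skip = 0
    while True:
        page = fetch(skip, limit)
        yield from page
        if len(page) < limit:
            break
        skip += limit


all_items = list(iter_all(fetch_page, limit=3))
```

The loop stops on the first page shorter than limit, so it costs at most one extra request when the total is an exact multiple of limit.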

fetch_identities(had_primary_source: Identifier | None = None, identifier_in_primary_source: str | None = None, stable_target_id: Identifier | None = None, limit: int = 1000) Result

Search the graph for nodes matching the given ID combination.

Identity queries can be filtered by stable_target_id, had_primary_source or identifier_in_primary_source.

Parameters:
  • had_primary_source – The stableTargetId of a connected PrimarySource

  • identifier_in_primary_source – The identifier the item had in its primary source

  • stable_target_id – The stableTargetId of an item

  • limit – How many results to return, defaults to 1000

Returns:

A graph result set containing identities

fetch_merged_items(query_string: str | None, identifier: str | None, entity_type: Sequence[str] | None, referenced_identifiers: Sequence[str] | None, reference_field: str | None, skip: int, limit: int) Result

Query the graph for merged items.

Parameters:
  • query_string – Optional full text search query term

  • identifier – Optional merged item identifier filter

  • entity_type – Optional merged entity type filter

  • referenced_identifiers – Optional merged item identifiers filter

  • reference_field – Optional field name to filter for

  • skip – How many items to skip for pagination

  • limit – How many items to return at most

Returns:

Graph result instance

fetch_rule_items(query_string: str | None, identifier: str | None, stable_target_id: str | None, entity_type: Sequence[str] | None, referenced_identifiers: Sequence[str] | None, reference_field: str | None, skip: int, limit: int) Result

Query the graph for rule items.

Parameters:
  • query_string – Optional full text search query term

  • identifier – Optional identifier filter

  • stable_target_id – Optional stable target ID filter

  • entity_type – Optional entity type filter

  • referenced_identifiers – Optional merged item identifiers filter

  • reference_field – Optional field name to filter for

  • skip – How many items to skip for pagination

  • limit – How many items to return at most

Returns:

Graph result instance

flush() None

Flush the database by deleting all nodes, constraints and indexes.

This operation executes only when debug mode is enabled in settings. It completely wipes the Neo4j database, including all data, constraints, and indexes, and is intended for testing and development cleanup.

ingest_items(models: Iterable[ExtractedAccessPlatform | ExtractedActivity | ExtractedBibliographicResource | ExtractedConsent | ExtractedContactPoint | ExtractedDistribution | ExtractedOrganization | ExtractedOrganizationalUnit | ExtractedPerson | ExtractedPrimarySource | ExtractedResource | ExtractedVariable | ExtractedVariableGroup | AccessPlatformRuleSetResponse | ActivityRuleSetResponse | BibliographicResourceRuleSetResponse | ConsentRuleSetResponse | ContactPointRuleSetResponse | DistributionRuleSetResponse | OrganizationRuleSetResponse | OrganizationalUnitRuleSetResponse | PersonRuleSetResponse | PrimarySourceRuleSetResponse | ResourceRuleSetResponse | VariableRuleSetResponse | VariableGroupRuleSetResponse | MExPrimarySource]) Generator[None, None, None]

Ingest an iterable of extracted models or rule set responses into the graph.
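The return type is a Generator, which suggests that ingestion proceeds lazily as the caller iterates. A minimal sketch of that pattern, assuming the method is implemented as a generator function (the reference shows only the signature); the ingest_items function and the ingested list below are hypothetical stand-ins, not the package's code:

```python
from collections import deque
from collections.abc import Generator, Iterable

ingested: list[str] = []  # stand-in for the graph database


def ingest_items(models: Iterable[str]) -> Generator[None, None, None]:
    """Hypothetical stand-in: ingest one model per iteration step."""
    for model in models:
        ingested.append(model)
        yield


gen = ingest_items(["a", "b", "c"])
count_before = len(ingested)  # creating the generator has not run any code

# Exhaust the generator without storing the None values it yields.
deque(gen, maxlen=0)
count_after = len(ingested)
```

Under this assumption, a caller that never iterates the returned generator would ingest nothing.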

mex.backend.graph.exceptions module

exception mex.backend.graph.exceptions.InconsistentGraphError

Bases: BackendError

Exception raised for inconsistencies found in the graph database.

exception mex.backend.graph.exceptions.IngestionError(*args: Any, errors: Sequence[ErrorDetails] = (), retryable: bool = False)

Bases: BackendError

Error for ingestion failures with underlying details.

__init__(*args: Any, errors: Sequence[ErrorDetails] = (), retryable: bool = False) None

Construct a new ingestion failure with underlying details.

errors() list[ErrorDetails]

Details about underlying errors.

is_retryable() bool

Whether the error is retryable.
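A caller can use is_retryable() to decide whether to repeat a failed ingestion. The sketch below is a self-contained illustration: the IngestionError class is a local stand-in that mirrors the documented constructor and accessors (deriving from Exception rather than BackendError, and using plain dicts in place of ErrorDetails), and ingest_with_retry is a hypothetical helper:

```python
from collections.abc import Callable, Sequence
from typing import Any


class IngestionError(Exception):
    """Local stand-in mirroring the documented constructor and accessors."""

    def __init__(
        self,
        *args: Any,
        errors: Sequence[dict[str, Any]] = (),
        retryable: bool = False,
    ) -> None:
        super().__init__(*args)
        self._errors = list(errors)
        self._retryable = retryable

    def errors(self) -> list[dict[str, Any]]:
        return self._errors

    def is_retryable(self) -> bool:
        return self._retryable


def ingest_with_retry(operation: Callable[[], Any], attempts: int = 3) -> Any:
    """Retry the operation only while the raised error is retryable."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except IngestionError as error:
            if not error.is_retryable() or attempt == attempts:
                raise


calls = {"n": 0}


def flaky() -> str:
    """Fail twice with a retryable error, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise IngestionError(
            "transient", errors=[{"msg": "timeout"}], retryable=True
        )
    return "ok"


result = ingest_with_retry(flaky)
```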

exception mex.backend.graph.exceptions.MultipleResultsFoundError

Bases: BackendError

A single database result was required but more than one were found.

exception mex.backend.graph.exceptions.NoResultFoundError

Bases: BackendError

A database result was required but none was found.

mex.backend.graph.models module

class mex.backend.graph.models.EdgeExporter

Bases: RecordExporter

Transformer class that turns edges into a string of format label {props}.

Full example:

shortName {position: 0}

transform(x: Any) Any

Transform a value, or collection of values.
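The label {props} format shown in the example above can be reproduced with a short helper. This is only a sketch of the string layout; format_edge is a hypothetical function and does not claim to match how EdgeExporter handles nested values or collections:

```python
from typing import Any


def format_edge(label: str, props: dict[str, Any]) -> str:
    """Render an edge as ``label {key: value, ...}``."""
    rendered = ", ".join(f"{key}: {value!r}" for key, value in props.items())
    return f"{label} {{{rendered}}}"


edge = format_edge("shortName", {"position": 0})
```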

class mex.backend.graph.models.GraphRel

Bases: TypedDict

Type definition for graph relations.

edgeLabel: str
edgeProps: dict[str, str | int | float | None | bool | list[str | int | float | None | bool]]
nodeLabels: list[str]
nodeProps: dict[str, str | int | float | None | bool | list[str | int | float | None | bool]]
class mex.backend.graph.models.IngestData(*, stableTargetId: str, identifier: str | None, entityType: str, nodeProps: dict[str, str | int | float | None | bool | list[str | int | float | None | bool]], linkRels: list[GraphRel] = [], createRels: list[GraphRel] = [])

Bases: BaseModel

Type definition for ingestion data.

createRels: list[GraphRel]
entityType: str
identifier: str | None
linkRels: list[GraphRel]
metadata() dict[str, int | str | None]

Return log-able metadata.

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[Dict[str, FieldInfo]] = {'createRels': FieldInfo(annotation=list[GraphRel], required=False, default=[]), 'entityType': FieldInfo(annotation=str, required=True), 'identifier': FieldInfo(annotation=Union[str, NoneType], required=True), 'linkRels': FieldInfo(annotation=list[GraphRel], required=False, default=[]), 'nodeProps': FieldInfo(annotation=dict[str, Union[str, int, float, NoneType, bool, list[Union[str, int, float, NoneType, bool]]]], required=True), 'stableTargetId': FieldInfo(annotation=str, required=True)}

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

nodeProps: dict[str, str | int | float | None | bool | list[str | int | float | None | bool]]
classmethod sort_create_rels(v: list[GraphRel]) list[GraphRel]

Sort the rels by edge label and position.

stableTargetId: str
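
The ordering described for sort_create_rels can be sketched with a plain sort key. The GraphRel class below is a local copy of the documented shape, and reading the position from edgeProps["position"] is an assumption based on the EdgeExporter example (shortName {position: 0}), not something this reference confirms:

```python
from typing import Any, TypedDict


class GraphRel(TypedDict):
    """Local copy of the documented shape, for illustration only."""

    edgeLabel: str
    edgeProps: dict[str, Any]
    nodeLabels: list[str]
    nodeProps: dict[str, Any]


def rel_sort_key(rel: GraphRel) -> tuple[str, int]:
    """Assumed key: edge label first, then the 'position' edge property."""
    return rel["edgeLabel"], int(rel["edgeProps"].get("position", 0))


def make_rel(label: str, position: int, value: str) -> GraphRel:
    """Build a small example relation pointing at a Text node."""
    return {
        "edgeLabel": label,
        "edgeProps": {"position": position},
        "nodeLabels": ["Text"],
        "nodeProps": {"value": value},
    }


rels = [make_rel("title", 1, "b"), make_rel("description", 0, "c"), make_rel("title", 0, "a")]
sorted_rels = sorted(rels, key=rel_sort_key)
order = [(rel["edgeLabel"], rel["edgeProps"]["position"]) for rel in sorted_rels]
```
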
class mex.backend.graph.models.IngestParams(*, merged_label: str, node_label: str, all_referenced_labels: list[str], all_nested_labels: list[str], detach_node_edges: list[str], delete_node_edges: list[str], has_link_rels: bool, has_create_rels: bool)

Bases: BaseModel

Type definition for query parameters.

all_nested_labels: list[str]
all_referenced_labels: list[str]
delete_node_edges: list[str]
detach_node_edges: list[str]
has_create_rels: bool
merged_label: str
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[Dict[str, FieldInfo]] = {'all_nested_labels': FieldInfo(annotation=list[str], required=True), 'all_referenced_labels': FieldInfo(annotation=list[str], required=True), 'delete_node_edges': FieldInfo(annotation=list[str], required=True), 'detach_node_edges': FieldInfo(annotation=list[str], required=True), 'has_create_rels': FieldInfo(annotation=bool, required=True), 'has_link_rels': FieldInfo(annotation=bool, required=True), 'merged_label': FieldInfo(annotation=str, required=True), 'node_label': FieldInfo(annotation=str, required=True)}

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

node_label: str
class mex.backend.graph.models.MExPrimarySource(*, version: Annotated[str, FieldInfo(annotation=NoneType, required=True, examples=['v1', '2023-01-16', 'Schema 9'])] | None = None, alternativeTitle: list[Text] = [], contact: list[Annotated[MergedOrganizationalUnitIdentifier | MergedPersonIdentifier | MergedContactPointIdentifier, AfterValidator(func=Identifier)]] = [], description: list[Text] = [], documentation: list[Link] = [], locatedAt: list[Link] = [], title: list[Text] = [], unitInCharge: list[MergedOrganizationalUnitIdentifier] = [], entityType: Literal['ExtractedPrimarySource'] = 'ExtractedPrimarySource', hadPrimarySource: MergedPrimarySourceIdentifier = MergedPrimarySourceIdentifier('00000000000000'), identifier: ExtractedPrimarySourceIdentifier = ExtractedPrimarySourceIdentifier('00000000000001'), identifierInPrimarySource: str = 'mex', stableTargetId: MergedPrimarySourceIdentifier = MergedPrimarySourceIdentifier('00000000000000'))

Bases: BasePrimarySource

Static metadata for the MEx primary source itself.

An instance of this class will bypass the IdentityProvider. This way we can ensure that the MEx primary source receives static identifiers.

entityType: Annotated[Literal['ExtractedPrimarySource'], FieldInfo(annotation=NoneType, required=True, alias='$type', alias_priority=2, frozen=True)]
hadPrimarySource: MergedPrimarySourceIdentifier
identifier: ExtractedPrimarySourceIdentifier
identifierInPrimarySource: str
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'ignore', 'populate_by_name': True, 'str_max_length': 100000, 'str_min_length': 1, 'str_strip_whitespace': True, 'use_enum_values': True, 'validate_assignment': True, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[Dict[str, FieldInfo]] = {'alternativeTitle': FieldInfo(annotation=list[Text], required=False, default=[]), 'contact': FieldInfo(annotation=list[Annotated[Union[MergedOrganizationalUnitIdentifier, MergedPersonIdentifier, MergedContactPointIdentifier], AfterValidator]], required=False, default=[]), 'description': FieldInfo(annotation=list[Text], required=False, default=[]), 'documentation': FieldInfo(annotation=list[Link], required=False, default=[]), 'entityType': FieldInfo(annotation=Literal['ExtractedPrimarySource'], required=False, default='ExtractedPrimarySource', alias='$type', alias_priority=2, frozen=True), 'hadPrimarySource': FieldInfo(annotation=MergedPrimarySourceIdentifier, required=False, default=MergedPrimarySourceIdentifier("00000000000000")), 'identifier': FieldInfo(annotation=ExtractedPrimarySourceIdentifier, required=False, default=ExtractedPrimarySourceIdentifier("00000000000001")), 'identifierInPrimarySource': FieldInfo(annotation=str, required=False, default='mex'), 'locatedAt': FieldInfo(annotation=list[Link], required=False, default=[]), 'stableTargetId': FieldInfo(annotation=MergedPrimarySourceIdentifier, required=False, default=MergedPrimarySourceIdentifier("00000000000000")), 'title': FieldInfo(annotation=list[Text], required=False, default=[]), 'unitInCharge': FieldInfo(annotation=list[MergedOrganizationalUnitIdentifier], required=False, default=[]), 'version': FieldInfo(annotation=Union[Annotated[str, FieldInfo(annotation=NoneType, required=True, examples=['v1', '2023-01-16', 'Schema 9'])], NoneType], required=False, default=None)}

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

stableTargetId: MergedPrimarySourceIdentifier
class mex.backend.graph.models.Result(result: Result)

Bases: object

Represent a set of graph results.

This class wraps neo4j.Result in an interface akin to sqlalchemy.engine.Result. We do this to reduce vendor lock-in with neo4j and to limit the dependency scope of the neo4j driver library to the mex.backend.graph submodule.

__init__(result: Result) None

Wrap a neo4j result object in a mex-backend result.

all() list[dict[str, Any]]

Return all records as a list.

get_update_counters() dict[str, int]

Return a summary of counters for operations the query triggered.

log_notifications() None

Log neo4j notifications.

one() dict[str, Any]

Return exactly one record or raise an exception.

one_or_none() dict[str, Any] | None

Return at most one result or raise an exception.

Returns None if the result has no records. Raises MultipleResultsFoundError if multiple records are returned.

mex.backend.graph.query module

class mex.backend.graph.query.Query(name: str, template: Template, kwargs: dict[str, Any])

Bases: object

Wrapper for queries that can be rendered.

__init__(name: str, template: Template, kwargs: dict[str, Any]) None

Create a new query instance.

render() str

Render the query for database execution.
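The idea of a named template plus render arguments can be sketched with the standard library. Note the substitution: string.Template is used here in place of the jinja Template that QueryBuilder works with, so that the example has no third-party dependency; the Query dataclass and the query text are illustrative stand-ins, not the package's code:

```python
from dataclasses import dataclass
from string import Template
from typing import Any


@dataclass
class Query:
    """Stand-in with the documented constructor arguments."""

    name: str
    template: Template
    kwargs: dict[str, Any]

    def render(self) -> str:
        """Fill the template with the stored keyword arguments."""
        return self.template.substitute(**self.kwargs)


query = Query(
    name="fetch_by_label",
    template=Template("MATCH (n:${label}) RETURN n;"),
    kwargs={"label": "ExtractedPerson"},
)
rendered = query.render()
```

Keeping the name next to the template lets log messages refer to a query without printing the full rendered text.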

class mex.backend.graph.query.QueryBuilder

Bases: BaseConnector

Wrapper around jinja template loading and rendering.

__init__() None

Create a new jinja environment with template loader, filters and globals.

_get_ingest_query_for_entity_type(entity_type: str) str

Create an ingest query for the given entity type.

Generates a complex Cypher query template for ingesting extracted or rule models into the graph database. The query handles creation of nodes, nested objects (Text, Link), and reference relationships. Results are cached for performance.

Parameters:

entity_type – The entity type name (e.g. “ExtractedPerson”, “AdditivePerson”)

Raises:

KeyError – If the entity type is not found in the model classes

Returns:

Cypher query string template for ingesting this entity type
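The caching and the KeyError behaviour described above can be sketched with functools.cache. Everything in this example is hypothetical: MODEL_FIELDS stands in for the backend's model classes, and the generated string is a placeholder far simpler than the real ingest query:

```python
from functools import cache

# Hypothetical registry standing in for the backend's model classes.
MODEL_FIELDS = {
    "ExtractedPerson": ["fullName", "email"],
    "AdditivePerson": ["fullName"],
}

build_calls: list[str] = []  # records how often a query is actually built


@cache
def get_ingest_query(entity_type: str) -> str:
    """Build a placeholder query string once per entity type."""
    fields = MODEL_FIELDS[entity_type]  # raises KeyError for unknown types
    build_calls.append(entity_type)
    props = ", ".join(f"{field}: $data.{field}" for field in fields)
    return f"MERGE (n:{entity_type} {{{props}}}) RETURN n;"


first = get_ingest_query("ExtractedPerson")
second = get_ingest_query("ExtractedPerson")  # served from the cache
```

functools.cache does not store raised exceptions, so an unknown entity type raises KeyError on every call.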

close() None

Clean up the connector.

mex.backend.graph.query.render_constraints(fields: list[Annotated[str, StringConstraints(strip_whitespace=None, to_upper=None, to_lower=None, strict=None, min_length=None, max_length=None, pattern=^[a-zA-Z]{1,255}$)]]) str

Convert a list of field names into cypher node/edge constraints.

mex.backend.graph.transform module

class mex.backend.graph.transform._SearchResultReference

Bases: TypedDict

Helper class to show the structure of search result references.

label: str
position: int
value: str | dict[str, str | None]
mex.backend.graph.transform.clean_dict(obj: Any) Any

Clean None and [] from dicts.
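A minimal sketch of such a cleaning step. Whether the real function recurses into nested dicts and lists is not stated in this reference; the version below assumes that it does:

```python
from typing import Any


def clean_dict(obj: Any) -> Any:
    """Recursively drop None and [] values from dicts (assumed behaviour)."""
    if isinstance(obj, dict):
        cleaned = {key: clean_dict(value) for key, value in obj.items()}
        return {
            key: value
            for key, value in cleaned.items()
            if value is not None and value != []
        }
    if isinstance(obj, list):
        return [clean_dict(item) for item in obj]
    return obj


raw = {"a": None, "b": [], "c": {"d": None, "e": 1}, "f": [{"g": None, "h": "x"}]}
cleaned = clean_dict(raw)
```

Falsy values such as 0, False and the empty string are kept; only None and the empty list are removed.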

mex.backend.graph.transform.expand_references_in_search_result(refs: list[_SearchResultReference]) dict[str, list[str | dict[str, str | None]]]

Expand the _refs collection in a search result item.

Each item in a search result has a collection of _refs in the form of _SearchResultReference. Before parsing them into pydantic, we need to inline the references back into the item dictionary.

mex.backend.graph.transform.get_error_details_from_neo4j_error(model: ExtractedAccessPlatform | ExtractedActivity | ExtractedBibliographicResource | ExtractedConsent | ExtractedContactPoint | ExtractedDistribution | ExtractedOrganization | ExtractedOrganizationalUnit | ExtractedPerson | ExtractedPrimarySource | ExtractedResource | ExtractedVariable | ExtractedVariableGroup | AccessPlatformRuleSetResponse | ActivityRuleSetResponse | BibliographicResourceRuleSetResponse | ConsentRuleSetResponse | ContactPointRuleSetResponse | DistributionRuleSetResponse | OrganizationRuleSetResponse | OrganizationalUnitRuleSetResponse | PersonRuleSetResponse | PrimarySourceRuleSetResponse | ResourceRuleSetResponse | VariableRuleSetResponse | VariableGroupRuleSetResponse | MExPrimarySource, error: Neo4jError) list[ErrorDetails]

Convert a model and a neo4j error into error details.

mex.backend.graph.transform.get_graph_rel_id(rel: GraphRel) tuple[str, int]

Return a (str, int) tuple uniquely identifying the GraphRel.

mex.backend.graph.transform.transform_model_into_ingest_data(model: ExtractedAccessPlatform | ExtractedActivity | ExtractedBibliographicResource | ExtractedConsent | ExtractedContactPoint | ExtractedDistribution | ExtractedOrganization | ExtractedOrganizationalUnit | ExtractedPerson | ExtractedPrimarySource | ExtractedResource | ExtractedVariable | ExtractedVariableGroup | MExPrimarySource | AdditiveAccessPlatform | AdditiveActivity | AdditiveBibliographicResource | AdditiveConsent | AdditiveContactPoint | AdditiveDistribution | AdditiveOrganization | AdditiveOrganizationalUnit | AdditivePerson | AdditivePrimarySource | AdditiveResource | AdditiveVariable | AdditiveVariableGroup | SubtractiveAccessPlatform | SubtractiveActivity | SubtractiveBibliographicResource | SubtractiveConsent | SubtractiveContactPoint | SubtractiveDistribution | SubtractiveOrganization | SubtractiveOrganizationalUnit | SubtractivePerson | SubtractivePrimarySource | SubtractiveResource | SubtractiveVariable | SubtractiveVariableGroup | PreventiveAccessPlatform | PreventiveActivity | PreventiveBibliographicResource | PreventiveConsent | PreventiveContactPoint | PreventiveDistribution | PreventiveOrganization | PreventiveOrganizationalUnit | PreventivePerson | PreventivePrimarySource | PreventiveResource | PreventiveVariable | PreventiveVariableGroup, stable_target_id: str) IngestData

Transform the given model into an ingestion instruction.

Converts an extracted or rule model into structured data ready for database ingestion. Handles field categorization (mutable vs final), reference field processing, and nested object preparation.

Parameters:
  • model – The extracted or rule model to transform for ingestion

  • stable_target_id – Identifier of the associated merged item

Returns:

IngestData object containing query parameters and metadata

mex.backend.graph.transform.validate_ingested_data(data_in: IngestData, data_out: IngestData) list[ErrorDetails]

Validate that the ingestion has been executed successfully.

Module contents