mex.backend.graph package
Submodules
mex.backend.graph.connector module
- class mex.backend.graph.connector.GraphConnector¶
Bases:
BaseConnector
Connector to handle authentication and transactions with the graph database.
- __init__() None ¶
Create a new graph database connection.
- _check_connectivity_and_authentication() Result ¶
Check the connectivity and authentication to the graph.
- _fetch_extracted_or_rule_items(query_string: str | None, identifier: str | None, stable_target_id: str | None, entity_type: Sequence[str], referenced_identifiers: Sequence[str] | None, reference_field: str | None, skip: int, limit: int) Result ¶
Query the graph for extracted or rule items.
- Parameters:
query_string – Optional full text search query term
identifier – Optional identifier filter
stable_target_id – Optional stable target ID filter
entity_type – List of allowed entity types
referenced_identifiers – Optional merged item identifiers filter
reference_field – Optional field name to filter for
skip – How many items to skip for pagination
limit – How many items to return at most
- Returns:
Graph result instance
- _init_driver() Driver ¶
Initialize and return a database driver.
- _run_ingest_in_transaction(tx: Transaction, model: ExtractedAccessPlatform | ExtractedActivity | ExtractedBibliographicResource | ExtractedConsent | ExtractedContactPoint | ExtractedDistribution | ExtractedOrganization | ExtractedOrganizationalUnit | ExtractedPerson | ExtractedPrimarySource | ExtractedResource | ExtractedVariable | ExtractedVariableGroup | AccessPlatformRuleSetResponse | ActivityRuleSetResponse | BibliographicResourceRuleSetResponse | ConsentRuleSetResponse | ContactPointRuleSetResponse | DistributionRuleSetResponse | OrganizationRuleSetResponse | OrganizationalUnitRuleSetResponse | PersonRuleSetResponse | PrimarySourceRuleSetResponse | ResourceRuleSetResponse | VariableRuleSetResponse | VariableGroupRuleSetResponse | MExPrimarySource) None ¶
Ingest a single item in a database transaction.
- _seed_constraints() None ¶
Ensure property constraints are created for all entity types.
- _seed_data() None ¶
Ensure the primary source mex is seeded and linked to itself.
- close() None ¶
Close the connector’s underlying requests session.
- commit(query: Query, /, access_mode: str = 'READ', **parameters: Any) Result ¶
Send and commit a single graph transaction with retry configuration.
- Parameters:
query – The query string or Query object to execute
access_mode – Whether to run the query with read or write access
**parameters – Query parameters to substitute in the Cypher query
- Returns:
Result object containing query execution results and metadata
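The retry configuration used by commit is not described in this reference. As a rough illustration of the general idea only, the sketch below retries a callable a fixed number of times. The names run_with_retry, TransientError and flaky_commit are hypothetical and are not part of mex.backend or the neo4j driver.

```python
import time
from collections.abc import Callable
from typing import TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical stand-in for a retryable database error."""


def run_with_retry(work: Callable[[], T], attempts: int = 3, delay: float = 0.0) -> T:
    """Call `work` until it succeeds or `attempts` is exhausted."""
    for attempt in range(1, attempts + 1):
        try:
            return work()
        except TransientError:
            if attempt == attempts:
                raise  # give up after the last attempt
            time.sleep(delay)
    raise ValueError("attempts must be at least 1")


calls = {"count": 0}


def flaky_commit() -> str:
    """Fail twice, then succeed, to exercise the retry loop."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("temporary failure")
    return "committed"


outcome = run_with_retry(flaky_commit)
```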
- exists_item(identifier: Identifier, entity_type: str) bool ¶
Validate whether an item with the given identifier and entity type exists.
- Parameters:
identifier – Identifier of the to-be-checked item
entity_type – Entity type of the to-be-checked item
- Returns:
Boolean representing the existence of the requested item
- fetch_extracted_items(query_string: str | None, identifier: str | None, stable_target_id: str | None, entity_type: Sequence[str] | None, referenced_identifiers: Sequence[str] | None, reference_field: str | None, skip: int, limit: int) Result ¶
Query the graph for extracted items.
- Parameters:
query_string – Optional full text search query term
identifier – Optional identifier filter
stable_target_id – Optional stable target ID filter
entity_type – Optional entity type filter
referenced_identifiers – Optional merged item identifiers filter
reference_field – Optional field name to filter for
skip – How many items to skip for pagination
limit – How many items to return at most
- Returns:
Graph result instance
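The skip and limit parameters suggest offset-based pagination. A minimal sketch of walking through pages under that assumption follows. iter_pages and fake_fetch are hypothetical helpers, and the in-memory list stands in for the graph.

```python
from collections.abc import Callable, Iterator
from typing import Any

Page = list[dict[str, Any]]


def iter_pages(fetch: Callable[[int, int], Page], limit: int) -> Iterator[Page]:
    """Yield pages until a page comes back shorter than `limit`."""
    skip = 0
    while True:
        page = fetch(skip, limit)
        if page:
            yield page
        if len(page) < limit:
            return
        skip += limit


# Hypothetical in-memory data standing in for graph results.
ITEMS = [{"identifier": f"item-{n}"} for n in range(5)]


def fake_fetch(skip: int, limit: int) -> Page:
    """Return the slice of ITEMS selected by skip and limit."""
    return ITEMS[skip : skip + limit]


pages = list(iter_pages(fake_fetch, limit=2))
```

With a real connector, the fetch callable would wrap one of the fetch_* methods and unpack its Result.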
- fetch_identities(had_primary_source: Identifier | None = None, identifier_in_primary_source: str | None = None, stable_target_id: Identifier | None = None, limit: int = 1000) Result ¶
Search the graph for nodes matching the given ID combination.
Identity queries can be filtered by stable_target_id, had_primary_source or identifier_in_primary_source.
- Parameters:
had_primary_source – The stableTargetId of a connected PrimarySource
identifier_in_primary_source – The id the item had in its primary source
stable_target_id – The stableTargetId of an item
limit – How many results to return, defaults to 1000
- Returns:
A graph result set containing identities
- fetch_merged_items(query_string: str | None, identifier: str | None, entity_type: Sequence[str] | None, referenced_identifiers: Sequence[str] | None, reference_field: str | None, skip: int, limit: int) Result ¶
Query the graph for merged items.
- Parameters:
query_string – Optional full text search query term
identifier – Optional merged item identifier filter
entity_type – Optional merged entity type filter
referenced_identifiers – Optional merged item identifiers filter
reference_field – Optional field name to filter for
skip – How many items to skip for pagination
limit – How many items to return at most
- Returns:
Graph result instance
- fetch_rule_items(query_string: str | None, identifier: str | None, stable_target_id: str | None, entity_type: Sequence[str] | None, referenced_identifiers: Sequence[str] | None, reference_field: str | None, skip: int, limit: int) Result ¶
Query the graph for rule items.
- Parameters:
query_string – Optional full text search query term
identifier – Optional identifier filter
stable_target_id – Optional stable target ID filter
entity_type – Optional entity type filter
referenced_identifiers – Optional merged item identifiers filter
reference_field – Optional field name to filter for
skip – How many items to skip for pagination
limit – How many items to return at most
- Returns:
Graph result instance
- flush() None ¶
Flush the database by deleting all nodes, constraints and indexes.
This operation only executes when debug mode is enabled in settings. Completely wipes the Neo4j database including all data, constraints, and indexes. Used for testing and development cleanup.
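A minimal sketch of such a debug-mode guard is shown below. FakeSettings and FakeGraph are hypothetical stand-ins. Whether the real method raises or silently returns outside debug mode is not stated here, so raising is an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class FakeSettings:
    """Hypothetical settings object with a debug flag."""

    debug: bool = False


@dataclass
class FakeGraph:
    """Hypothetical in-memory graph guarded by a debug flag."""

    settings: FakeSettings
    nodes: list[str] = field(default_factory=lambda: ["a", "b"])

    def flush(self) -> None:
        """Delete everything, but only when debug mode is enabled."""
        if not self.settings.debug:
            # Assumption: refuse loudly instead of doing nothing.
            raise RuntimeError("flush is only allowed in debug mode")
        self.nodes.clear()


dev_graph = FakeGraph(FakeSettings(debug=True))
dev_graph.flush()

prod_graph = FakeGraph(FakeSettings(debug=False))
try:
    prod_graph.flush()
except RuntimeError:
    refused = True
else:
    refused = False
```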
- ingest_items(models: Iterable[ExtractedAccessPlatform | ExtractedActivity | ExtractedBibliographicResource | ExtractedConsent | ExtractedContactPoint | ExtractedDistribution | ExtractedOrganization | ExtractedOrganizationalUnit | ExtractedPerson | ExtractedPrimarySource | ExtractedResource | ExtractedVariable | ExtractedVariableGroup | AccessPlatformRuleSetResponse | ActivityRuleSetResponse | BibliographicResourceRuleSetResponse | ConsentRuleSetResponse | ContactPointRuleSetResponse | DistributionRuleSetResponse | OrganizationRuleSetResponse | OrganizationalUnitRuleSetResponse | PersonRuleSetResponse | PrimarySourceRuleSetResponse | ResourceRuleSetResponse | VariableRuleSetResponse | VariableGroupRuleSetResponse | MExPrimarySource]) Generator[None, None, None] ¶
Ingest a list of extracted models or rule set responses into the graph.
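Because the return type is Generator[None, None, None], calling ingest_items presumably does no work until the generator is iterated. The sketch below illustrates that with a hypothetical stand-in function operating on strings. It assumes the real method behaves like an ordinary lazy generator.

```python
from collections import deque
from collections.abc import Generator, Iterable

ingested: list[str] = []


def ingest_items(models: Iterable[str]) -> Generator[None, None, None]:
    """Hypothetical stand-in: ingest one model per iteration step."""
    for model in models:
        ingested.append(model)
        yield


pending = ingest_items(["person", "activity"])
before = list(ingested)  # nothing has run yet, the generator is lazy

deque(pending, maxlen=0)  # exhaust the generator, discarding the None values
after = list(ingested)
```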
mex.backend.graph.exceptions module
- exception mex.backend.graph.exceptions.InconsistentGraphError¶
Bases:
BackendError
Exception raised for inconsistencies found in the graph database.
- exception mex.backend.graph.exceptions.IngestionError(*args: Any, errors: Sequence[ErrorDetails] = (), retryable: bool = False)¶
Bases:
BackendError
Error for ingestion failures with underlying details.
- __init__(*args: Any, errors: Sequence[ErrorDetails] = (), retryable: bool = False) None ¶
Construct a new ingestion failure with underlying details.
- errors() list[ErrorDetails] ¶
Details about underlying errors.
- is_retryable() bool ¶
Whether the error is retryable.
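A caller might branch on is_retryable() and inspect errors() roughly as follows. The class below is a simplified stand-in that mirrors the documented signature, ErrorDetails is replaced by a plain dict, and handle is a hypothetical helper.

```python
from collections.abc import Sequence
from typing import Any

ErrorDetails = dict[str, Any]  # stand-in for the real ErrorDetails type


class IngestionError(Exception):
    """Simplified stand-in mirroring the documented signature."""

    def __init__(
        self,
        *args: Any,
        errors: Sequence[ErrorDetails] = (),
        retryable: bool = False,
    ) -> None:
        super().__init__(*args)
        self._errors = list(errors)
        self._retryable = retryable

    def errors(self) -> list[ErrorDetails]:
        """Details about underlying errors."""
        return self._errors

    def is_retryable(self) -> bool:
        """Whether the error is retryable."""
        return self._retryable


def handle(error: IngestionError) -> str:
    """Decide what to do with a failed ingestion."""
    if error.is_retryable():
        return "retry"
    return f"give up: {len(error.errors())} underlying error(s)"


try:
    raise IngestionError(
        "could not merge node",
        errors=[{"type": "constraint", "msg": "duplicate"}],
    )
except IngestionError as caught:
    decision = handle(caught)
```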
- exception mex.backend.graph.exceptions.MultipleResultsFoundError¶
Bases:
BackendError
A single database result was required but more than one were found.
- exception mex.backend.graph.exceptions.NoResultFoundError¶
Bases:
BackendError
A database result was required but none was found.
mex.backend.graph.models module
- class mex.backend.graph.models.EdgeExporter¶
Bases:
RecordExporter
Transformer class that turns edges into a string of format label {props}.
- Full example:
shortName {position: 0}
- transform(x: Any) Any ¶
Transform a value, or collection of values.
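The label {props} format from the example above can be reproduced with a few lines of string formatting. format_edge is a hypothetical helper, not the EdgeExporter implementation, and rendering values with repr is an assumption.

```python
from typing import Any


def format_edge(label: str, props: dict[str, Any]) -> str:
    """Render an edge as `label {key: value, ...}`."""
    rendered = ", ".join(f"{key}: {value!r}" for key, value in props.items())
    return f"{label} {{{rendered}}}"


example = format_edge("shortName", {"position": 0})
```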
- class mex.backend.graph.models.GraphRel¶
Bases:
TypedDict
Type definition for graph relations.
- edgeLabel: str¶
- edgeProps: dict[str, str | int | float | None | bool | list[str | int | float | None | bool]]¶
- nodeLabels: list[str]¶
- nodeProps: dict[str, str | int | float | None | bool | list[str | int | float | None | bool]]¶
- class mex.backend.graph.models.IngestData(*, stableTargetId: str, identifier: str | None, entityType: str, nodeProps: dict[str, str | int | float | None | bool | list[str | int | float | None | bool]], linkRels: list[GraphRel] = [], createRels: list[GraphRel] = [])¶
Bases:
BaseModel
Type definition for ingestion data.
- entityType: str¶
- identifier: str | None¶
- metadata() dict[str, int | str | None] ¶
Return log-able metadata.
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}¶
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'createRels': FieldInfo(annotation=list[GraphRel], required=False, default=[]), 'entityType': FieldInfo(annotation=str, required=True), 'identifier': FieldInfo(annotation=Union[str, NoneType], required=True), 'linkRels': FieldInfo(annotation=list[GraphRel], required=False, default=[]), 'nodeProps': FieldInfo(annotation=dict[str, Union[str, int, float, NoneType, bool, list[Union[str, int, float, NoneType, bool]]]], required=True), 'stableTargetId': FieldInfo(annotation=str, required=True)}¶
Metadata about the fields defined on the model, a mapping of field names to pydantic.fields.FieldInfo objects.
This replaces Model.__fields__ from Pydantic V1.
- nodeProps: dict[str, str | int | float | None | bool | list[str | int | float | None | bool]]¶
- classmethod sort_create_rels(v: list[GraphRel]) list[GraphRel] ¶
Sort the rels by edge label and position.
- classmethod sort_link_rels(v: list[GraphRel]) list[GraphRel] ¶
Sort the rels by edge label and position.
- stableTargetId: str¶
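Sorting relations by edge label and position, as sort_create_rels and sort_link_rels are documented to do, could look like the sketch below. It assumes the position is stored under edgeProps["position"], which this reference does not confirm. sort_rels is a hypothetical helper and the GraphRel here is a simplified local copy.

```python
from typing import Any, TypedDict


class GraphRel(TypedDict):
    """Local copy of the documented relation shape, with simplified props."""

    edgeLabel: str
    edgeProps: dict[str, Any]
    nodeLabels: list[str]
    nodeProps: dict[str, Any]


def sort_rels(rels: list[GraphRel]) -> list[GraphRel]:
    """Order relations by edge label, then by their position property."""
    # Assumption: the position lives in edgeProps["position"].
    return sorted(
        rels,
        key=lambda rel: (rel["edgeLabel"], rel["edgeProps"]["position"]),
    )


rels: list[GraphRel] = [
    {"edgeLabel": "title", "edgeProps": {"position": 1},
     "nodeLabels": ["Text"], "nodeProps": {"value": "B"}},
    {"edgeLabel": "contact", "edgeProps": {"position": 0},
     "nodeLabels": ["MergedPerson"], "nodeProps": {}},
    {"edgeLabel": "title", "edgeProps": {"position": 0},
     "nodeLabels": ["Text"], "nodeProps": {"value": "A"}},
]

ordered = [
    (rel["edgeLabel"], rel["edgeProps"]["position"]) for rel in sort_rels(rels)
]
```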
- class mex.backend.graph.models.IngestParams(*, merged_label: str, node_label: str, all_referenced_labels: list[str], all_nested_labels: list[str], detach_node_edges: list[str], delete_node_edges: list[str], has_link_rels: bool, has_create_rels: bool)¶
Bases:
BaseModel
Type definition for query parameters.
- all_nested_labels: list[str]¶
- all_referenced_labels: list[str]¶
- delete_node_edges: list[str]¶
- detach_node_edges: list[str]¶
- has_create_rels: bool¶
- has_link_rels: bool¶
- merged_label: str¶
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}¶
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'all_nested_labels': FieldInfo(annotation=list[str], required=True), 'all_referenced_labels': FieldInfo(annotation=list[str], required=True), 'delete_node_edges': FieldInfo(annotation=list[str], required=True), 'detach_node_edges': FieldInfo(annotation=list[str], required=True), 'has_create_rels': FieldInfo(annotation=bool, required=True), 'has_link_rels': FieldInfo(annotation=bool, required=True), 'merged_label': FieldInfo(annotation=str, required=True), 'node_label': FieldInfo(annotation=str, required=True)}¶
Metadata about the fields defined on the model, a mapping of field names to pydantic.fields.FieldInfo objects.
This replaces Model.__fields__ from Pydantic V1.
- node_label: str¶
- class mex.backend.graph.models.MExPrimarySource(*, version: Annotated[str, FieldInfo(annotation=NoneType, required=True, examples=['v1', '2023-01-16', 'Schema 9'])] | None = None, alternativeTitle: list[Text] = [], contact: list[Annotated[MergedOrganizationalUnitIdentifier | MergedPersonIdentifier | MergedContactPointIdentifier, AfterValidator(func=Identifier)]] = [], description: list[Text] = [], documentation: list[Link] = [], locatedAt: list[Link] = [], title: list[Text] = [], unitInCharge: list[MergedOrganizationalUnitIdentifier] = [], entityType: Literal['ExtractedPrimarySource'] = 'ExtractedPrimarySource', hadPrimarySource: MergedPrimarySourceIdentifier = MergedPrimarySourceIdentifier('00000000000000'), identifier: ExtractedPrimarySourceIdentifier = ExtractedPrimarySourceIdentifier('00000000000001'), identifierInPrimarySource: str = 'mex', stableTargetId: MergedPrimarySourceIdentifier = MergedPrimarySourceIdentifier('00000000000000'))¶
Bases:
BasePrimarySource
Static metadata for the MEx primary source itself.
An instance of this class will bypass the IdentityProvider. This way we can ensure that the MEx primary source receives static identifiers.
- entityType: Annotated[Literal['ExtractedPrimarySource'], FieldInfo(annotation=NoneType, required=True, alias='$type', alias_priority=2, frozen=True)]¶
- hadPrimarySource: MergedPrimarySourceIdentifier¶
- identifier: ExtractedPrimarySourceIdentifier¶
- identifierInPrimarySource: str¶
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}¶
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {'extra': 'ignore', 'populate_by_name': True, 'str_max_length': 100000, 'str_min_length': 1, 'str_strip_whitespace': True, 'use_enum_values': True, 'validate_assignment': True, 'validate_default': True}¶
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'alternativeTitle': FieldInfo(annotation=list[Text], required=False, default=[]), 'contact': FieldInfo(annotation=list[Annotated[Union[MergedOrganizationalUnitIdentifier, MergedPersonIdentifier, MergedContactPointIdentifier], AfterValidator]], required=False, default=[]), 'description': FieldInfo(annotation=list[Text], required=False, default=[]), 'documentation': FieldInfo(annotation=list[Link], required=False, default=[]), 'entityType': FieldInfo(annotation=Literal['ExtractedPrimarySource'], required=False, default='ExtractedPrimarySource', alias='$type', alias_priority=2, frozen=True), 'hadPrimarySource': FieldInfo(annotation=MergedPrimarySourceIdentifier, required=False, default=MergedPrimarySourceIdentifier("00000000000000")), 'identifier': FieldInfo(annotation=ExtractedPrimarySourceIdentifier, required=False, default=ExtractedPrimarySourceIdentifier("00000000000001")), 'identifierInPrimarySource': FieldInfo(annotation=str, required=False, default='mex'), 'locatedAt': FieldInfo(annotation=list[Link], required=False, default=[]), 'stableTargetId': FieldInfo(annotation=MergedPrimarySourceIdentifier, required=False, default=MergedPrimarySourceIdentifier("00000000000000")), 'title': FieldInfo(annotation=list[Text], required=False, default=[]), 'unitInCharge': FieldInfo(annotation=list[MergedOrganizationalUnitIdentifier], required=False, default=[]), 'version': FieldInfo(annotation=Union[Annotated[str, FieldInfo(annotation=NoneType, required=True, examples=['v1', '2023-01-16', 'Schema 9'])], NoneType], required=False, default=None)}¶
Metadata about the fields defined on the model, a mapping of field names to pydantic.fields.FieldInfo objects.
This replaces Model.__fields__ from Pydantic V1.
- stableTargetId: MergedPrimarySourceIdentifier¶
- class mex.backend.graph.models.Result(result: Result)¶
Bases:
object
Represent a set of graph results.
This class wraps neo4j.Result in an interface akin to sqlalchemy.engine.Result. We do this to reduce vendor tie-in with neo4j and to limit the dependency scope of the neo4j driver library to the mex.backend.graph submodule.
- __init__(result: Result) None ¶
Wrap a neo4j result object in a mex-backend result.
- all() list[dict[str, Any]] ¶
Return all records as a list.
- get_update_counters() dict[str, int] ¶
Return a summary of counters for operations the query triggered.
- log_notifications() None ¶
Log neo4j notifications.
- one() dict[str, Any] ¶
Return exactly one record or raise an exception.
- one_or_none() dict[str, Any] | None ¶
Return at most one result or raise an exception.
Returns None if the result has no records. Raises MultipleResultsFoundError if multiple records are returned.
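The difference between one() and one_or_none() can be sketched with a list-backed stand-in. ListResult is hypothetical and the exception classes are local copies. That one() raises NoResultFoundError on an empty result is an assumption based on that exception's description.

```python
from typing import Any, Optional


class NoResultFoundError(Exception):
    """Stand-in: a result was required but none was found."""


class MultipleResultsFoundError(Exception):
    """Stand-in: one result was required but several were found."""


class ListResult:
    """Hypothetical list-backed result with the documented accessors."""

    def __init__(self, records: list[dict[str, Any]]) -> None:
        self._records = list(records)

    def all(self) -> list[dict[str, Any]]:
        """Return all records as a list."""
        return list(self._records)

    def one(self) -> dict[str, Any]:
        """Return exactly one record or raise."""
        if not self._records:
            raise NoResultFoundError("expected one record, got none")
        return self._single()

    def one_or_none(self) -> Optional[dict[str, Any]]:
        """Return the only record, or None when there are no records."""
        return self._single() if self._records else None

    def _single(self) -> dict[str, Any]:
        if len(self._records) > 1:
            raise MultipleResultsFoundError("expected one record, got several")
        return self._records[0]


empty = ListResult([])
single = ListResult([{"name": "mex"}])
many = ListResult([{"name": "a"}, {"name": "b"}])

try:
    many.one_or_none()
    outcome = "no error"
except MultipleResultsFoundError:
    outcome = "multiple results"
```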
mex.backend.graph.query module
- class mex.backend.graph.query.Query(name: str, template: Template, kwargs: dict[str, Any])¶
Bases:
object
Wrapper for queries that can be rendered.
- __init__(name: str, template: Template, kwargs: dict[str, Any]) None ¶
Create a new query instance.
- render() str ¶
Render the query for database execution.
- class mex.backend.graph.query.QueryBuilder¶
Bases:
BaseConnector
Wrapper around jinja template loading and rendering.
- __init__() None ¶
Create a new jinja environment with template loader, filters and globals.
- _get_ingest_query_for_entity_type(entity_type: str) str ¶
Create an ingest query for the given entity type.
Generates a complex Cypher query template for ingesting extracted or rule models into the graph database. The query handles creation of nodes, nested objects (Text, Link), and reference relationships. Results are cached for performance.
- Parameters:
entity_type – The entity type name (e.g. “ExtractedPerson”, “AdditivePerson”)
- Raises:
KeyError – If the entity type is not found in the model classes
- Returns:
Cypher query string template for ingesting this entity type
- close() None ¶
Clean up the connector.
- mex.backend.graph.query.render_constraints(fields: list[~typing.Annotated[str, ~pydantic.types.StringConstraints(strip_whitespace=None, to_upper=None, to_lower=None, strict=None, min_length=None, max_length=None, pattern=^[a-zA-Z]{1,255}$)]]) str ¶
Convert a list of field names into cypher node/edge constraints.
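The pattern in the signature restricts each field name to 1 to 255 ASCII letters. A plausible reason, stated here as an assumption, is that the names end up interpolated into Cypher text. The sketch below performs only that validation step. check_field_names is a hypothetical helper and does not reproduce the rendered constraint output.

```python
import re

# Same pattern as in the documented signature.
FIELD_NAME = re.compile(r"^[a-zA-Z]{1,255}$")


def check_field_names(fields: list[str]) -> list[str]:
    """Return the fields unchanged, or raise if any name is not plain letters."""
    invalid = [name for name in fields if not FIELD_NAME.fullmatch(name)]
    if invalid:
        raise ValueError(f"invalid field names: {invalid}")
    return fields


accepted = check_field_names(["identifier", "stableTargetId"])

try:
    check_field_names(["identifier", "bad name) DETACH DELETE"])
except ValueError:
    rejected = True
else:
    rejected = False
```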
mex.backend.graph.transform module
- class mex.backend.graph.transform._SearchResultReference¶
Bases:
TypedDict
Helper class to show the structure of search result references.
- label: str¶
- position: int¶
- value: str | dict[str, str | None]¶
- mex.backend.graph.transform.clean_dict(obj: Any) Any ¶
Remove None values and empty lists from dicts.
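One way such a cleanup can be written is sketched below. clean_dict_sketch is a hypothetical re-implementation, not the library function. Recursing into nested dicts and lists is an assumption, as is keeping empty dicts.

```python
from typing import Any


def clean_dict_sketch(obj: Any) -> Any:
    """Recursively drop None and [] values from dicts."""
    if isinstance(obj, dict):
        cleaned = {key: clean_dict_sketch(value) for key, value in obj.items()}
        return {
            key: value
            for key, value in cleaned.items()
            if value is not None and value != []
        }
    if isinstance(obj, list):
        return [clean_dict_sketch(item) for item in obj]
    return obj


raw = {
    "name": "example",
    "email": None,
    "tags": [],
    "count": 0,
    "nested": {"a": None, "b": [1]},
}
cleaned = clean_dict_sketch(raw)
```

Falsy values such as 0 and False are kept, since only None and the empty list are treated as missing.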
- mex.backend.graph.transform.expand_references_in_search_result(refs: list[_SearchResultReference]) dict[str, list[str | dict[str, str | None]]] ¶
Expand the _refs collection in a search result item.
Each item in a search result has a collection of _refs in the form of _SearchResultReference. Before parsing them into pydantic, we need to inline the references back into the item dictionary.
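Inlining the references can be pictured as grouping values by label in position order. The sketch below is a hypothetical re-implementation under that assumption. The real function may order or merge values differently.

```python
from typing import Optional, TypedDict, Union

RefValue = Union[str, dict[str, Optional[str]]]


class SearchResultReference(TypedDict):
    """Local copy of the documented reference shape."""

    label: str
    position: int
    value: RefValue


def expand_references(
    refs: list[SearchResultReference],
) -> dict[str, list[RefValue]]:
    """Group reference values by label, ordered by position."""
    expanded: dict[str, list[RefValue]] = {}
    for ref in sorted(refs, key=lambda r: (r["label"], r["position"])):
        expanded.setdefault(ref["label"], []).append(ref["value"])
    return expanded


refs: list[SearchResultReference] = [
    {"label": "title", "position": 1, "value": {"value": "B", "language": None}},
    {"label": "title", "position": 0, "value": {"value": "A", "language": "en"}},
    {"label": "contact", "position": 0, "value": "abc"},
]
expanded = expand_references(refs)
```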
- mex.backend.graph.transform.get_error_details_from_neo4j_error(model: ExtractedAccessPlatform | ExtractedActivity | ExtractedBibliographicResource | ExtractedConsent | ExtractedContactPoint | ExtractedDistribution | ExtractedOrganization | ExtractedOrganizationalUnit | ExtractedPerson | ExtractedPrimarySource | ExtractedResource | ExtractedVariable | ExtractedVariableGroup | AccessPlatformRuleSetResponse | ActivityRuleSetResponse | BibliographicResourceRuleSetResponse | ConsentRuleSetResponse | ContactPointRuleSetResponse | DistributionRuleSetResponse | OrganizationRuleSetResponse | OrganizationalUnitRuleSetResponse | PersonRuleSetResponse | PrimarySourceRuleSetResponse | ResourceRuleSetResponse | VariableRuleSetResponse | VariableGroupRuleSetResponse | MExPrimarySource, error: Neo4jError) list[ErrorDetails] ¶
Convert ingest-data and a neo4j error into error details.
- mex.backend.graph.transform.get_graph_rel_id(rel: GraphRel) tuple[str, int] ¶
Return a tuple uniquely identifying the GraphRel.
- mex.backend.graph.transform.transform_model_into_ingest_data(model: ExtractedAccessPlatform | ExtractedActivity | ExtractedBibliographicResource | ExtractedConsent | ExtractedContactPoint | ExtractedDistribution | ExtractedOrganization | ExtractedOrganizationalUnit | ExtractedPerson | ExtractedPrimarySource | ExtractedResource | ExtractedVariable | ExtractedVariableGroup | MExPrimarySource | AdditiveAccessPlatform | AdditiveActivity | AdditiveBibliographicResource | AdditiveConsent | AdditiveContactPoint | AdditiveDistribution | AdditiveOrganization | AdditiveOrganizationalUnit | AdditivePerson | AdditivePrimarySource | AdditiveResource | AdditiveVariable | AdditiveVariableGroup | SubtractiveAccessPlatform | SubtractiveActivity | SubtractiveBibliographicResource | SubtractiveConsent | SubtractiveContactPoint | SubtractiveDistribution | SubtractiveOrganization | SubtractiveOrganizationalUnit | SubtractivePerson | SubtractivePrimarySource | SubtractiveResource | SubtractiveVariable | SubtractiveVariableGroup | PreventiveAccessPlatform | PreventiveActivity | PreventiveBibliographicResource | PreventiveConsent | PreventiveContactPoint | PreventiveDistribution | PreventiveOrganization | PreventiveOrganizationalUnit | PreventivePerson | PreventivePrimarySource | PreventiveResource | PreventiveVariable | PreventiveVariableGroup, stable_target_id: str) IngestData ¶
Transform the given model into an ingestion instruction.
Converts an extracted or rule model into structured data ready for database ingestion. Handles field categorization (mutable vs final), reference field processing, and nested object preparation.
- Parameters:
model – The extracted or rule model to transform for ingestion
stable_target_id – Identifier of the associated merged item
- Returns:
IngestData object containing query parameters and metadata
- mex.backend.graph.transform.validate_ingested_data(data_in: IngestData, data_out: IngestData) list[ErrorDetails] ¶
Validate that the ingestion has been executed successfully.