mirror of
https://github.com/open-metadata/OpenMetadata
synced 2026-05-24 09:39:11 +00:00
fix(rdf): scope storeEntity DELETE to translator-managed predicates
Replace the literal-only FILTER(!isIRI(?o)) in JenaFusekiStorage.storeEntity with a predicate-scoped DELETE so translator-emitted URI triples (tags, glossary terms, owner, domain, tier, data products, structured sub-resources) are refreshed from the new model on every entity write, while hook-managed predicates (om:UPSTREAM, om:hasLineageDetails, om:owns / om:contains / ...) stay intact. Previously, with !isIRI(?o), every URI-valued triple survived storeEntity forever — when a tag was removed or an owner changed, the old URI coexisted with the new one because no hook ever cleans those up (tags live in tag_usage, not entity_relationship; owners' translator-side predicate om:hasOwner is not what the OWNS hook writes). The DELETE set is the union of: - RdfPropertyMapper.TRANSLATOR_MANAGED_DIRECT_PREDICATES, a static list of predicates that may shrink to empty between writes (so the current model walk wouldn't see them) — rdf:type, om:hasOwner, prov:wasAttributedTo, om:hasTag, om:hasGlossaryTerm, om:hasTier, om:belongsToDomain, om:hasDataProduct, dct:source, om:sourceUrl, plus the structured-resource attachment predicates (om:hasLifeCycle / hasCertification / hasExtension / hasCustomProperty). - the predicates the current model actually emits for the entity subject, covering JSON-LD context-driven predicates that aren't in the static list. Added two coverage tests on RdfPropertyMapperTest: the static set contains the documented core predicates, and never contains lineage-hook predicates (om:UPSTREAM, prov:wasDerivedFrom, om:hasLineageDetails) — that overlap would let storeEntity wipe lineage edges on every entity update. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
e2575d51ab
commit
22d5825ceb
3 changed files with 148 additions and 14 deletions
|
|
@ -27,6 +27,9 @@ import org.apache.jena.query.ResultSet;
|
|||
import org.apache.jena.query.ResultSetFormatter;
|
||||
import org.apache.jena.rdf.model.Model;
|
||||
import org.apache.jena.rdf.model.ModelFactory;
|
||||
import org.apache.jena.rdf.model.RDFNode;
|
||||
import org.apache.jena.rdf.model.Resource;
|
||||
import org.apache.jena.rdf.model.StmtIterator;
|
||||
import org.apache.jena.rdfconnection.RDFConnection;
|
||||
import org.apache.jena.rdfconnection.RDFConnectionFuseki;
|
||||
import org.apache.jena.riot.RDFDataMgr;
|
||||
|
|
@ -34,6 +37,7 @@ import org.apache.jena.riot.RDFFormat;
|
|||
import org.apache.jena.update.UpdateFactory;
|
||||
import org.apache.jena.update.UpdateRequest;
|
||||
import org.openmetadata.schema.api.configuration.rdf.RdfConfiguration;
|
||||
import org.openmetadata.service.rdf.translator.RdfPropertyMapper;
|
||||
|
||||
/**
|
||||
* Apache Jena Fuseki implementation of RDF storage.
|
||||
|
|
@ -317,24 +321,69 @@ public class JenaFusekiStorage implements RdfStorageInterface {
|
|||
return false;
|
||||
}
|
||||
|
||||
// Union the translator's static "always managed" predicates with whatever
|
||||
// predicates the current model actually emits for this entity. The static
|
||||
// set covers shrink-to-empty cases (e.g. all tags removed -> current model
|
||||
// no longer emits om:hasTag, but we still need to clean up the old triples).
|
||||
// The dynamic walk covers translator-only predicates introduced via the
|
||||
// JSON-LD context that aren't in the static set.
|
||||
private static Set<String> collectTranslatorPredicates(String entityUri, Model entityModel) {
|
||||
Set<String> predicates =
|
||||
new LinkedHashSet<>(RdfPropertyMapper.TRANSLATOR_MANAGED_DIRECT_PREDICATES);
|
||||
Resource entityResource = entityModel.createResource(entityUri);
|
||||
StmtIterator stmts = entityModel.listStatements(entityResource, null, (RDFNode) null);
|
||||
while (stmts.hasNext()) {
|
||||
predicates.add(stmts.next().getPredicate().getURI());
|
||||
}
|
||||
return predicates;
|
||||
}
|
||||
|
||||
private static String buildPredicateScopedDelete(String entityUri, Set<String> predicates) {
|
||||
if (predicates.isEmpty()) {
|
||||
// No-op delete: nothing to remove. Use a constant DELETE WHERE that
|
||||
// matches nothing so the caller's retry/transaction path still has a
|
||||
// well-formed UpdateRequest to execute.
|
||||
return String.format(
|
||||
"DELETE WHERE { GRAPH <%s> { <%s> <urn:om:noop> ?o . FILTER(false) } }",
|
||||
KNOWLEDGE_GRAPH, entityUri);
|
||||
}
|
||||
StringBuilder filterIn = new StringBuilder();
|
||||
boolean first = true;
|
||||
for (String pred : predicates) {
|
||||
if (!first) {
|
||||
filterIn.append(", ");
|
||||
}
|
||||
first = false;
|
||||
filterIn.append('<').append(pred).append('>');
|
||||
}
|
||||
return String.format(
|
||||
"DELETE { GRAPH <%s> { <%s> ?p ?o } } WHERE { GRAPH <%s> { <%s> ?p ?o . FILTER(?p IN (%s)) } }",
|
||||
KNOWLEDGE_GRAPH, entityUri, KNOWLEDGE_GRAPH, entityUri, filterIn);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeEntity(String entityType, UUID entityId, Model entityModel) {
|
||||
throwIfCircuitOpen("storeEntity");
|
||||
String entityUri = baseUri + "entity/" + entityType + "/" + entityId;
|
||||
// Refresh literal-valued triples (name, description, tags, etc.) from the
|
||||
// translator, but preserve URI-valued triples — those are inter-entity edges
|
||||
// (om:hasOwner, om:belongsToDatabase, om:UPSTREAM, om:hasLineageDetails, …)
|
||||
// that are managed by add/removeRelationship and add/removeLineage hooks,
|
||||
// not by the translator. A metadata-only update (e.g. PATCH description)
|
||||
// doesn't fire relationship hooks, so a blanket DELETE-then-LOAD here would
|
||||
// wipe relationships until the next weekly recreate-index. Filtering on
|
||||
// !isIRI(?o) keeps every URI object intact; relationship lifecycle is owned
|
||||
// by the dedicated hooks, and the LOAD that follows re-adds the translator's
|
||||
// URI-typed triples (rdf:type, etc.) idempotently under RDF set semantics.
|
||||
String deleteQuery =
|
||||
String.format(
|
||||
"DELETE { GRAPH <%s> { <%s> ?p ?o } } WHERE { GRAPH <%s> { <%s> ?p ?o . FILTER(!isIRI(?o)) } }",
|
||||
KNOWLEDGE_GRAPH, entityUri, KNOWLEDGE_GRAPH, entityUri);
|
||||
// Scope the DELETE to predicates the translator owns. The previous
|
||||
// FILTER(!isIRI(?o)) preserved EVERY URI object, which let stale
|
||||
// translator-emitted triples (old om:hasOwner, removed om:hasTag, etc.)
|
||||
// accumulate across updates because no hook ever cleans them up — owner /
|
||||
// tag / glossary-term URIs aren't in entity_relationship. Predicate
|
||||
// scoping lets the translator's fresh output replace the prior values,
|
||||
// while hook-managed predicates (om:UPSTREAM, om:hasLineageDetails,
|
||||
// om:owns / om:contains / …) are untouched so relationship and lineage
|
||||
// state survives a metadata-only update.
|
||||
//
|
||||
// The set we delete is the union of:
|
||||
// - RdfPropertyMapper.TRANSLATOR_MANAGED_DIRECT_PREDICATES (covers the
|
||||
// shrink-to-empty case where a field is now absent and the new model
|
||||
// no longer emits its predicate), and
|
||||
// - the predicates the current model actually emits for <entityUri>
|
||||
// (covers translator-only predicates introduced via the JSON-LD
|
||||
// context that aren't in the static set).
|
||||
Set<String> predicatesToDelete = collectTranslatorPredicates(entityUri, entityModel);
|
||||
String deleteQuery = buildPredicateScopedDelete(entityUri, predicatesToDelete);
|
||||
|
||||
int maxRetries = 3;
|
||||
int retryCount = 0;
|
||||
|
|
|
|||
|
|
@ -57,6 +57,42 @@ public class RdfPropertyMapper {
|
|||
private static final Set<String> LINEAGE_PROPERTIES =
|
||||
Set.of("upstreamEdges", "downstreamEdges", "lineage");
|
||||
|
||||
// Direct URI-valued predicates the translator emits from an entity. These
|
||||
// are the predicates whose VALUE can change (or shrink to empty) between
|
||||
// writes of the same entity — e.g. tags removed, owner changed, domain
|
||||
// unset — without any relationship-hook firing. JenaFusekiStorage.storeEntity
|
||||
// uses this set (unioned with the predicates actually emitted in the current
|
||||
// model) to scope its DELETE, so old values get cleaned up while
|
||||
// hook-managed predicates (om:UPSTREAM, om:owns/contains/…, etc.) stay
|
||||
// intact. Add to this set when a new URI-valued direct predicate is
|
||||
// introduced in this class; the unit test
|
||||
// RdfTranslatorManagedPredicatesTest will fail otherwise.
|
||||
public static final Set<String> TRANSLATOR_MANAGED_DIRECT_PREDICATES =
|
||||
Set.of(
|
||||
// Identity / typing
|
||||
RDF.type.getURI(),
|
||||
// Owner / attribution
|
||||
OM_NS + "hasOwner",
|
||||
PROV_NS + "wasAttributedTo",
|
||||
// Tags / glossary terms / tier (all addTagLabel paths)
|
||||
OM_NS + "hasTag",
|
||||
OM_NS + "hasGlossaryTerm",
|
||||
OM_NS + "hasTier",
|
||||
// Domain / data product
|
||||
OM_NS + "belongsToDomain",
|
||||
OM_NS + "hasDataProduct",
|
||||
// Source provenance (translator only, not a hook)
|
||||
DCT_NS + "source",
|
||||
OM_NS + "sourceUrl",
|
||||
// Structured sub-resources attached to the entity — the entity's
|
||||
// direct triple pointing at the blank node must be deleted so the
|
||||
// new model's blank node replaces it. The blank node subtree itself
|
||||
// becomes orphaned; that's a separate (out-of-scope) GC concern.
|
||||
OM_NS + "hasLifeCycle",
|
||||
OM_NS + "hasCertification",
|
||||
OM_NS + "hasExtension",
|
||||
OM_NS + "hasCustomProperty");
|
||||
|
||||
public RdfPropertyMapper(
|
||||
String baseUri, ObjectMapper objectMapper, Map<String, Object> contextCache) {
|
||||
this.baseUri = baseUri;
|
||||
|
|
|
|||
|
|
@ -1271,4 +1271,53 @@ class RdfPropertyMapperTest {
|
|||
this.aliases = aliases;
|
||||
}
|
||||
}
|
||||
|
||||
@Nested
|
||||
@DisplayName("TRANSLATOR_MANAGED_DIRECT_PREDICATES coverage")
|
||||
class TranslatorManagedPredicatesTests {
|
||||
|
||||
@Test
|
||||
@DisplayName("Set must contain core direct URI predicates emitted by the translator")
|
||||
void testCoreSetMembership() {
|
||||
// These are emitted by addProvAttribution / addTagLabel / addEntityReference /
|
||||
// the structured-property handlers. If any are removed from the set, downstream
|
||||
// cleanup (JenaFusekiStorage.storeEntity) will leak stale state on entity updates.
|
||||
java.util.Set<String> required =
|
||||
java.util.Set.of(
|
||||
"http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
|
||||
OM_NS + "hasOwner",
|
||||
PROV_NS + "wasAttributedTo",
|
||||
OM_NS + "hasTag",
|
||||
OM_NS + "hasGlossaryTerm",
|
||||
OM_NS + "hasTier",
|
||||
OM_NS + "belongsToDomain",
|
||||
OM_NS + "hasDataProduct",
|
||||
DCT_NS + "source",
|
||||
OM_NS + "sourceUrl",
|
||||
OM_NS + "hasLifeCycle",
|
||||
OM_NS + "hasCertification",
|
||||
OM_NS + "hasExtension",
|
||||
OM_NS + "hasCustomProperty");
|
||||
for (String pred : required) {
|
||||
assertTrue(
|
||||
RdfPropertyMapper.TRANSLATOR_MANAGED_DIRECT_PREDICATES.contains(pred),
|
||||
"TRANSLATOR_MANAGED_DIRECT_PREDICATES must include " + pred);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("Set must not include hook-managed lineage predicates")
|
||||
void testNoOverlapWithLineageHookPredicates() {
|
||||
// These are written by RdfRepository.addLineageWithDetails — including them here
|
||||
// would let storeEntity wipe lineage edges on every entity update.
|
||||
java.util.Set<String> lineageHookPredicates =
|
||||
java.util.Set.of(
|
||||
OM_NS + "UPSTREAM", PROV_NS + "wasDerivedFrom", OM_NS + "hasLineageDetails");
|
||||
for (String pred : lineageHookPredicates) {
|
||||
assertFalse(
|
||||
RdfPropertyMapper.TRANSLATOR_MANAGED_DIRECT_PREDICATES.contains(pred),
|
||||
"TRANSLATOR_MANAGED_DIRECT_PREDICATES must NOT include hook-managed " + pred);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue