fix(rdf): scope storeEntity DELETE to translator-managed predicates

Replace the literal-only FILTER(!isIRI(?o)) in JenaFusekiStorage.storeEntity
with a predicate-scoped DELETE so translator-emitted URI triples (tags,
glossary terms, owner, domain, tier, data products, structured sub-resources)
are refreshed from the new model on every entity write, while hook-managed
predicates (om:UPSTREAM, om:hasLineageDetails, om:owns / om:contains / ...)
stay intact.

Previously, with !isIRI(?o), every URI-valued triple survived storeEntity
forever — when a tag was removed or an owner changed, the old URI coexisted
with the new one because no hook ever cleans those up (tags live in
tag_usage, not entity_relationship; owners' translator-side predicate
om:hasOwner is not what the OWNS hook writes).

The DELETE set is the union of:
- RdfPropertyMapper.TRANSLATOR_MANAGED_DIRECT_PREDICATES, a static list of
  predicates that may shrink to empty between writes (so the current model
  walk wouldn't see them) — rdf:type, om:hasOwner, prov:wasAttributedTo,
  om:hasTag, om:hasGlossaryTerm, om:hasTier, om:belongsToDomain,
  om:hasDataProduct, dct:source, om:sourceUrl, plus the structured-resource
  attachment predicates (om:hasLifeCycle / hasCertification / hasExtension /
  hasCustomProperty).
- the predicates the current model actually emits for the entity subject,
  covering JSON-LD context-driven predicates that aren't in the static list.

Added two coverage tests on RdfPropertyMapperTest: the static set contains
the documented core predicates, and never contains lineage-hook predicates
(om:UPSTREAM, prov:wasDerivedFrom, om:hasLineageDetails) — that overlap
would let storeEntity wipe lineage edges on every entity update.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sriharsha Chintalapani 2026-05-15 09:53:59 -07:00
parent e2575d51ab
commit 22d5825ceb
3 changed files with 148 additions and 14 deletions

View file

@ -27,6 +27,9 @@ import org.apache.jena.query.ResultSet;
import org.apache.jena.query.ResultSetFormatter;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.rdf.model.RDFNode;
import org.apache.jena.rdf.model.Resource;
import org.apache.jena.rdf.model.StmtIterator;
import org.apache.jena.rdfconnection.RDFConnection;
import org.apache.jena.rdfconnection.RDFConnectionFuseki;
import org.apache.jena.riot.RDFDataMgr;
@ -34,6 +37,7 @@ import org.apache.jena.riot.RDFFormat;
import org.apache.jena.update.UpdateFactory;
import org.apache.jena.update.UpdateRequest;
import org.openmetadata.schema.api.configuration.rdf.RdfConfiguration;
import org.openmetadata.service.rdf.translator.RdfPropertyMapper;
/**
* Apache Jena Fuseki implementation of RDF storage.
@ -317,24 +321,69 @@ public class JenaFusekiStorage implements RdfStorageInterface {
return false;
}
// Union the translator's static "always managed" predicates with whatever
// predicates the current model actually emits for this entity. The static
// set covers shrink-to-empty cases (e.g. all tags removed -> current model
// no longer emits om:hasTag, but we still need to clean up the old triples).
// The dynamic walk covers translator-only predicates introduced via the
// JSON-LD context that aren't in the static set.
private static Set<String> collectTranslatorPredicates(String entityUri, Model entityModel) {
Set<String> predicates =
new LinkedHashSet<>(RdfPropertyMapper.TRANSLATOR_MANAGED_DIRECT_PREDICATES);
Resource entityResource = entityModel.createResource(entityUri);
StmtIterator stmts = entityModel.listStatements(entityResource, null, (RDFNode) null);
while (stmts.hasNext()) {
predicates.add(stmts.next().getPredicate().getURI());
}
return predicates;
}
private static String buildPredicateScopedDelete(String entityUri, Set<String> predicates) {
if (predicates.isEmpty()) {
// No-op delete: nothing to remove. Use a constant DELETE WHERE that
// matches nothing so the caller's retry/transaction path still has a
// well-formed UpdateRequest to execute.
return String.format(
"DELETE WHERE { GRAPH <%s> { <%s> <urn:om:noop> ?o . FILTER(false) } }",
KNOWLEDGE_GRAPH, entityUri);
}
StringBuilder filterIn = new StringBuilder();
boolean first = true;
for (String pred : predicates) {
if (!first) {
filterIn.append(", ");
}
first = false;
filterIn.append('<').append(pred).append('>');
}
return String.format(
"DELETE { GRAPH <%s> { <%s> ?p ?o } } WHERE { GRAPH <%s> { <%s> ?p ?o . FILTER(?p IN (%s)) } }",
KNOWLEDGE_GRAPH, entityUri, KNOWLEDGE_GRAPH, entityUri, filterIn);
}
@Override
public void storeEntity(String entityType, UUID entityId, Model entityModel) {
throwIfCircuitOpen("storeEntity");
String entityUri = baseUri + "entity/" + entityType + "/" + entityId;
// Refresh literal-valued triples (name, description, tags, etc.) from the
// translator, but preserve URI-valued triples those are inter-entity edges
// (om:hasOwner, om:belongsToDatabase, om:UPSTREAM, om:hasLineageDetails, )
// that are managed by add/removeRelationship and add/removeLineage hooks,
// not by the translator. A metadata-only update (e.g. PATCH description)
// doesn't fire relationship hooks, so a blanket DELETE-then-LOAD here would
// wipe relationships until the next weekly recreate-index. Filtering on
// !isIRI(?o) keeps every URI object intact; relationship lifecycle is owned
// by the dedicated hooks, and the LOAD that follows re-adds the translator's
// URI-typed triples (rdf:type, etc.) idempotently under RDF set semantics.
String deleteQuery =
String.format(
"DELETE { GRAPH <%s> { <%s> ?p ?o } } WHERE { GRAPH <%s> { <%s> ?p ?o . FILTER(!isIRI(?o)) } }",
KNOWLEDGE_GRAPH, entityUri, KNOWLEDGE_GRAPH, entityUri);
// Scope the DELETE to predicates the translator owns. The previous
// FILTER(!isIRI(?o)) preserved EVERY URI object, which let stale
// translator-emitted triples (old om:hasOwner, removed om:hasTag, etc.)
// accumulate across updates because no hook ever cleans them up owner /
// tag / glossary-term URIs aren't in entity_relationship. Predicate
// scoping lets the translator's fresh output replace the prior values,
// while hook-managed predicates (om:UPSTREAM, om:hasLineageDetails,
// om:owns / om:contains / ) are untouched so relationship and lineage
// state survives a metadata-only update.
//
// The set we delete is the union of:
// - RdfPropertyMapper.TRANSLATOR_MANAGED_DIRECT_PREDICATES (covers the
// shrink-to-empty case where a field is now absent and the new model
// no longer emits its predicate), and
// - the predicates the current model actually emits for <entityUri>
// (covers translator-only predicates introduced via the JSON-LD
// context that aren't in the static set).
Set<String> predicatesToDelete = collectTranslatorPredicates(entityUri, entityModel);
String deleteQuery = buildPredicateScopedDelete(entityUri, predicatesToDelete);
int maxRetries = 3;
int retryCount = 0;

View file

@ -57,6 +57,42 @@ public class RdfPropertyMapper {
private static final Set<String> LINEAGE_PROPERTIES =
Set.of("upstreamEdges", "downstreamEdges", "lineage");
// Direct URI-valued predicates the translator emits from an entity. These
// are the predicates whose VALUE can change (or shrink to empty) between
// writes of the same entity e.g. tags removed, owner changed, domain
// unset without any relationship-hook firing. JenaFusekiStorage.storeEntity
// uses this set (unioned with the predicates actually emitted in the current
// model) to scope its DELETE, so old values get cleaned up while
// hook-managed predicates (om:UPSTREAM, om:owns/contains/, etc.) stay
// intact. Add to this set when a new URI-valued direct predicate is
// introduced in this class; the unit test
// RdfTranslatorManagedPredicatesTest will fail otherwise.
public static final Set<String> TRANSLATOR_MANAGED_DIRECT_PREDICATES =
Set.of(
// Identity / typing
RDF.type.getURI(),
// Owner / attribution
OM_NS + "hasOwner",
PROV_NS + "wasAttributedTo",
// Tags / glossary terms / tier (all addTagLabel paths)
OM_NS + "hasTag",
OM_NS + "hasGlossaryTerm",
OM_NS + "hasTier",
// Domain / data product
OM_NS + "belongsToDomain",
OM_NS + "hasDataProduct",
// Source provenance (translator only, not a hook)
DCT_NS + "source",
OM_NS + "sourceUrl",
// Structured sub-resources attached to the entity the entity's
// direct triple pointing at the blank node must be deleted so the
// new model's blank node replaces it. The blank node subtree itself
// becomes orphaned; that's a separate (out-of-scope) GC concern.
OM_NS + "hasLifeCycle",
OM_NS + "hasCertification",
OM_NS + "hasExtension",
OM_NS + "hasCustomProperty");
public RdfPropertyMapper(
String baseUri, ObjectMapper objectMapper, Map<String, Object> contextCache) {
this.baseUri = baseUri;

View file

@ -1271,4 +1271,53 @@ class RdfPropertyMapperTest {
this.aliases = aliases;
}
}
@Nested
@DisplayName("TRANSLATOR_MANAGED_DIRECT_PREDICATES coverage")
class TranslatorManagedPredicatesTests {
@Test
@DisplayName("Set must contain core direct URI predicates emitted by the translator")
void testCoreSetMembership() {
// These are emitted by addProvAttribution / addTagLabel / addEntityReference /
// the structured-property handlers. If any are removed from the set, downstream
// cleanup (JenaFusekiStorage.storeEntity) will leak stale state on entity updates.
java.util.Set<String> required =
java.util.Set.of(
"http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
OM_NS + "hasOwner",
PROV_NS + "wasAttributedTo",
OM_NS + "hasTag",
OM_NS + "hasGlossaryTerm",
OM_NS + "hasTier",
OM_NS + "belongsToDomain",
OM_NS + "hasDataProduct",
DCT_NS + "source",
OM_NS + "sourceUrl",
OM_NS + "hasLifeCycle",
OM_NS + "hasCertification",
OM_NS + "hasExtension",
OM_NS + "hasCustomProperty");
for (String pred : required) {
assertTrue(
RdfPropertyMapper.TRANSLATOR_MANAGED_DIRECT_PREDICATES.contains(pred),
"TRANSLATOR_MANAGED_DIRECT_PREDICATES must include " + pred);
}
}
@Test
@DisplayName("Set must not include hook-managed lineage predicates")
void testNoOverlapWithLineageHookPredicates() {
// These are written by RdfRepository.addLineageWithDetails including them here
// would let storeEntity wipe lineage edges on every entity update.
java.util.Set<String> lineageHookPredicates =
java.util.Set.of(
OM_NS + "UPSTREAM", PROV_NS + "wasDerivedFrom", OM_NS + "hasLineageDetails");
for (String pred : lineageHookPredicates) {
assertFalse(
RdfPropertyMapper.TRANSLATOR_MANAGED_DIRECT_PREDICATES.contains(pred),
"TRANSLATOR_MANAGED_DIRECT_PREDICATES must NOT include hook-managed " + pred);
}
}
}
}