diff --git a/.github/workflows/openmetadata-service-unit-tests.yml b/.github/workflows/openmetadata-service-unit-tests.yml index 39f72e6eeef..cb2971141a8 100644 --- a/.github/workflows/openmetadata-service-unit-tests.yml +++ b/.github/workflows/openmetadata-service-unit-tests.yml @@ -64,15 +64,18 @@ jobs: k8s_operator: - 'openmetadata-k8s-operator/**' + # The openmetadata-service unit tests are pure JVM tests with no database + # interaction (no testcontainers, no JDBC). The {mysql, postgresql} matrix used + # to run the suite twice with different `-Pmysql` / `-Ppostgresql` profiles, but + # those profiles are only defined in openmetadata-sdk/pom.xml and only affect + # failsafe (integration) tests that aren't enabled in this workflow. Result: + # both matrix jobs ran an identical surefire suite. DB-specific coverage + # belongs in `openmetadata-integration-tests`, not here. openmetadata-service-unit-tests: runs-on: ubuntu-latest timeout-minutes: 90 needs: changes if: ${{ needs.changes.outputs.java == 'true' }} - strategy: - fail-fast: false - matrix: - database: [mysql, postgresql] steps: - name: Checkout uses: actions/checkout@v4 @@ -100,12 +103,12 @@ jobs: librdkafka-dev unixodbc-dev libevent-dev jq sudo make install_antlr_cli - - name: Run openmetadata-service unit tests (${{ matrix.database }}) + - name: Run openmetadata-service unit tests env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} run: | mvn -B clean package -pl openmetadata-service -am \ - -Pstatic-code-analysis,${{ matrix.database }} \ + -Pstatic-code-analysis \ -DfailIfNoTests=false \ -Dsonar.skip=true @@ -113,7 +116,7 @@ jobs: if: ${{ failure() && hashFiles('openmetadata-service/target/surefire-reports/TEST-*.xml') != '' }} uses: actions/upload-artifact@v4 with: - name: openmetadata-service-surefire-reports-${{ matrix.database }} + name: openmetadata-service-surefire-reports path: openmetadata-service/target/surefire-reports/ - name: Publish Test Report @@ -123,7 +126,7 @@ jobs: github_token: ${{ secrets.GITHUB_TOKEN }} fail_on_test_failures: true report_paths: "openmetadata-service/target/surefire-reports/TEST-*.xml" - check_name: "Test Report (${{ matrix.database }})" + check_name: "Test Report" k8s_operator-unit-tests: runs-on: ubuntu-latest diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/RdfTagsTierCertificationIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/RdfTagsTierCertificationIT.java new file mode 100644 index 00000000000..3ef762ebb7c --- /dev/null +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/RdfTagsTierCertificationIT.java @@ -0,0 +1,303 @@ +/* + * Copyright 2026 Collate. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.it.tests; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIf; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.openmetadata.it.factories.DatabaseSchemaTestFactory; +import org.openmetadata.it.factories.DatabaseServiceTestFactory; +import org.openmetadata.it.factories.TableTestFactory; +import org.openmetadata.it.util.RdfTestUtils; +import org.openmetadata.it.util.SdkClients; +import org.openmetadata.it.util.TestNamespace; +import org.openmetadata.it.util.TestNamespaceExtension; +import org.openmetadata.schema.entity.data.Table; +import org.openmetadata.schema.type.AssetCertification; +import org.openmetadata.schema.type.TagLabel; +import org.openmetadata.sdk.client.OpenMetadataClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration tests for the RDF/Fuseki pipeline verifying that classification tags, Tier + * assignments, and asset certifications are materialised as real RDF links rather than synthetic + * FQN URIs or opaque JSON literals. + * + *

Exercises the mapper guarantees added to {@code RdfPropertyMapper}: + *

+ */ +@Execution(ExecutionMode.SAME_THREAD) +@Tag("integration") +@Tag("rdf") +@ExtendWith(TestNamespaceExtension.class) +class RdfTagsTierCertificationIT { + + private static final Logger LOG = LoggerFactory.getLogger(RdfTagsTierCertificationIT.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final String ENTITY_URI_PREFIX = "https://open-metadata.org/entity/"; + private static final String ENTITY_TAG_URI_PREFIX = ENTITY_URI_PREFIX + "tag/"; + private static final String SYNTHETIC_TAG_URI_PREFIX = "https://open-metadata.org/tag/"; + private static final String OM_NS = "https://open-metadata.org/ontology/"; + private static final Duration AWAIT_TIMEOUT = Duration.ofSeconds(60); + private static final Duration POLL_INTERVAL = Duration.ofSeconds(1); + + static boolean isRdfEnabled() { + return RdfTestUtils.isRdfEnabled(); + } + + @Test + @EnabledIf("isRdfEnabled") + void classificationTag_linksToRealTagEntityUri(TestNamespace ns) { + Table table = + createTableWithTags( + ns, + new TagLabel() + .withTagFQN("PII.Sensitive") + .withSource(TagLabel.TagSource.CLASSIFICATION) + .withLabelType(TagLabel.LabelType.MANUAL)); + + String entityUri = entityUri("table", table.getId()); + + awaitTagBoundByFqn(entityUri, "hasTag", "PII.Sensitive"); + + String tagUri = fetchTagUri(entityUri, "hasTag", "PII.Sensitive"); + assertTrue( + tagUri.startsWith(ENTITY_TAG_URI_PREFIX), + "hasTag must resolve to entity/tag/{uuid}; got: " + tagUri); + assertFalse( + tagUri.startsWith(SYNTHETIC_TAG_URI_PREFIX), + "hasTag must not use the synthetic tag/FQN URI; got: " + tagUri); + } + + @Test + @EnabledIf("isRdfEnabled") + void tierTag_emitsHasTierShortcut(TestNamespace ns) { + Table table = + createTableWithTags( + ns, + new TagLabel() + .withTagFQN("Tier.Tier1") + .withSource(TagLabel.TagSource.CLASSIFICATION) + .withLabelType(TagLabel.LabelType.MANUAL)); + + String entityUri = entityUri("table", table.getId()); + + awaitTagBoundByFqn(entityUri, "hasTier", "Tier.Tier1"); + + String tierUri = fetchTagUri(entityUri, "hasTier", "Tier.Tier1"); + assertTrue( + tierUri.startsWith(ENTITY_TAG_URI_PREFIX), + "hasTier target must be entity/tag/{uuid}; got: " + tierUri); + + boolean typedAsTag = + RdfTestUtils.executeSparqlAsk( + "ASK { GRAPH ?g { <" + tierUri + "> a <" + OM_NS + "Tag> } }"); + assertTrue(typedAsTag, "hasTier target " + tierUri + " must be rdf:type om:Tag"); + } + + @Test + @EnabledIf("isRdfEnabled") + void certification_emitsStructuredTriplesNotJsonBlob(TestNamespace ns) { + OpenMetadataClient client = SdkClients.adminClient(); + Table table = createTableWithTags(ns); + + TagLabel certTag = + new TagLabel() + .withTagFQN("Certification.Bronze") + .withSource(TagLabel.TagSource.CLASSIFICATION) + .withLabelType(TagLabel.LabelType.MANUAL); + long now = System.currentTimeMillis(); + table.setCertification( + new AssetCertification() + .withTagLabel(certTag) + .withAppliedDate(now) + .withExpiryDate(now + Duration.ofDays(30).toMillis())); + client.tables().update(table.getId().toString(), table); + + String entityUri = entityUri("table", table.getId()); + + awaitAsk( + "hasCertification edge should appear and target a Bronze tag", + "ASK { GRAPH ?g { <" + + entityUri + + "> <" + + OM_NS + + "hasCertification> ?cert . " + + "?cert <" + + OM_NS + + "tagFQN> \"Certification.Bronze\" } }"); + + String certUri = fetchTagUri(entityUri, "hasCertification", "Certification.Bronze"); + assertTrue( + certUri.startsWith(ENTITY_TAG_URI_PREFIX), + "hasCertification target must be entity/tag/{uuid}; got: " + certUri); + + awaitAsk( + "certificationLevel literal should be 'Bronze'", + "ASK { GRAPH ?g { <" + entityUri + "> <" + OM_NS + "certificationLevel> \"Bronze\" } }"); + + awaitAsk( + "certificationAppliedAt must be a non-string literal", + "ASK { GRAPH ?g { <" + + entityUri + + "> <" + + OM_NS + + "certificationAppliedAt> ?t" + + " FILTER(isLiteral(?t) && DATATYPE(?t) != ) } }"); + + boolean jsonLiteralLeaks = + RdfTestUtils.executeSparqlAsk( + "ASK { GRAPH ?g { <" + + entityUri + + "> <" + + OM_NS + + "certification> ?o FILTER(isLiteral(?o)) } }"); + assertFalse(jsonLiteralLeaks, "Certification must not be stored as a JSON string literal"); + } + + @Test + @EnabledIf("isRdfEnabled") + void tagEntity_isReachableAndTyped(TestNamespace ns) { + Table table = + createTableWithTags( + ns, + new TagLabel() + .withTagFQN("PII.Sensitive") + .withSource(TagLabel.TagSource.CLASSIFICATION) + .withLabelType(TagLabel.LabelType.MANUAL)); + String entityUri = entityUri("table", table.getId()); + awaitAsk( + "hasTag target must be an om:Tag with om:tagFQN 'PII.Sensitive'", + "ASK { GRAPH ?g { <" + + entityUri + + "> <" + + OM_NS + + "hasTag> ?tag . " + + "?tag a <" + + OM_NS + + "Tag> ; " + + "<" + + OM_NS + + "tagFQN> \"PII.Sensitive\" } }"); + } + + /* ----------------------------- helpers ---------------------------------- */ + + private Table createTableWithTags(TestNamespace ns, TagLabel... tagLabels) { + var service = DatabaseServiceTestFactory.createPostgres(ns); + var schema = DatabaseSchemaTestFactory.createSimple(ns, service); + Table table = TableTestFactory.createSimple(ns, schema.getFullyQualifiedName()); + if (tagLabels.length > 0) { + table.setTags(List.of(tagLabels)); + SdkClients.adminClient().tables().update(table.getId().toString(), table); + table = SdkClients.adminClient().tables().get(table.getId().toString(), "tags,certification"); + } + return table; + } + + private static String entityUri(String type, UUID id) { + return ENTITY_URI_PREFIX + type + "/" + id; + } + + /** + * Wait until a predicate link from the entity to some tag resource identified by tagFQN exists. + * Independent of the tag URI shape (entity/real vs synthetic) so this doubles as a + * "RDF listener has caught up" probe. + */ + private static void awaitTagBoundByFqn(String entityUri, String predicate, String tagFqn) { + String sparql = + "ASK { GRAPH ?g { <" + + entityUri + + "> <" + + OM_NS + + predicate + + "> ?tag . " + + "?tag <" + + OM_NS + + "tagFQN> \"" + + tagFqn + + "\" } }"; + awaitAsk(predicate + " should eventually bind a tag with FQN '" + tagFqn + "'", sparql); + } + + /** Retrieves the concrete URI bound to `entityUri predicate ?tag` where tag has the given FQN. */ + private static String fetchTagUri(String entityUri, String predicate, String tagFqn) { + String sparql = + "SELECT ?tag WHERE { GRAPH ?g { <" + + entityUri + + "> <" + + OM_NS + + predicate + + "> ?tag . " + + "?tag <" + + OM_NS + + "tagFQN> \"" + + tagFqn + + "\" } } LIMIT 1"; + String json = RdfTestUtils.executeSparqlSelect(sparql); + if (json == null) { + fail("SPARQL SELECT returned null for predicate " + predicate); + } + try { + JsonNode results = MAPPER.readTree(json).path("results").path("bindings"); + if (!results.isArray() || results.size() == 0) { + fail("No binding found for predicate " + predicate + " on " + entityUri); + } + String uri = results.get(0).path("tag").path("value").asText(); + LOG.info( + "RDF: {} --{}--> {} (expected prefix {})", + entityUri, + predicate, + uri, + ENTITY_TAG_URI_PREFIX); + return uri; + } catch (Exception e) { + fail("Could not parse SPARQL response: " + e.getMessage() + "; body=" + json); + return null; + } + } + + private static void awaitAsk(String message, String sparql) { + try { + Awaitility.await(message) + .atMost(AWAIT_TIMEOUT) + .pollInterval(POLL_INTERVAL) + .until(() -> RdfTestUtils.executeSparqlAsk(sparql)); + } catch (Exception e) { + LOG.warn("Await failed for query: {}", sparql); + throw e; + } + assertTrue(RdfTestUtils.executeSparqlAsk(sparql), message); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/RdfBatchProcessor.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/RdfBatchProcessor.java index 777205916d8..77145e9b1b5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/RdfBatchProcessor.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/RdfBatchProcessor.java @@ -67,6 +67,7 @@ public class RdfBatchProcessor { BooleanSupplier effectiveStopRequested = stopRequested != null ? stopRequested : () -> false; int successCount = 0; int failedCount = 0; + String lastError = null; List indexedEntities = new ArrayList<>(); for (EntityInterface entity : entities) { @@ -80,25 +81,77 @@ public class RdfBatchProcessor { } catch (Exception e) { LOG.error("Failed to index entity {} to RDF", entity.getId(), e); failedCount++; + lastError = describeEntityError(entityType, entity.getId(), e); } } + int relationshipFailures = 0; + String relationshipError = null; if (!indexedEntities.isEmpty()) { - processBatchRelationships(entityType, indexedEntities); + RelationshipProcessingResult relResult = + processBatchRelationships(entityType, indexedEntities); + relationshipFailures += relResult.failureCount(); + if (relResult.lastError() != null) { + relationshipError = relResult.lastError(); + } if ("glossaryTerm".equals(entityType)) { - processGlossaryTermRelations(indexedEntities, effectiveStopRequested); + RelationshipProcessingResult glossResult = + processGlossaryTermRelations(indexedEntities, effectiveStopRequested); + relationshipFailures += glossResult.failureCount(); + if (glossResult.lastError() != null) { + relationshipError = glossResult.lastError(); + } } } - return new BatchProcessingResult(successCount, failedCount); + // Relationship failures are tracked separately from entity write failures. + // failedCount becomes "failedRecords" in the index stats, where a record is + // an entity row — folding relationship failures (which are per-edge, not + // per-entity) into it would inflate failedRecords beyond the totalRecords + // entity count and make stats nonsensical. Surface relationship errors only + // through lastError when no entity-level failure already provided one. + if (lastError == null && relationshipError != null) { + lastError = relationshipError; + } + + return new BatchProcessingResult(successCount, failedCount, relationshipFailures, lastError); } - public void processBatchRelationships( + public record RelationshipProcessingResult(int failureCount, String lastError) { + static final RelationshipProcessingResult OK = new RelationshipProcessingResult(0, null); + } + + /** + * Format a single failure with a context-specific prefix using the root cause's + * message (or class name when the message is blank). Used by the per-entity, + * bulk-relationship, and lineage-relationship error paths to keep their output + * format consistent. + */ + private static String describeError(String prefix, Throwable error) { + Throwable rootCause = error; + while (rootCause.getCause() != null && rootCause.getCause() != rootCause) { + rootCause = rootCause.getCause(); + } + String message = rootCause.getMessage(); + if (message == null || message.isBlank()) { + message = rootCause.getClass().getSimpleName(); + } + return prefix + ": " + message; + } + + private static String describeEntityError(String entityType, UUID entityId, Throwable error) { + return describeError(entityType + "/" + entityId, error); + } + + public RelationshipProcessingResult processBatchRelationships( String entityType, List entities) { if (entities == null || entities.isEmpty()) { - return; + return RelationshipProcessingResult.OK; } + int failures = 0; + String lastError = null; + try { List entityIds = entities.stream().map(entity -> entity.getId().toString()).collect(Collectors.toList()); @@ -124,7 +177,11 @@ public class RdfBatchProcessor { } if (rel.getRelation() == Relationship.UPSTREAM.ordinal() && rel.getJson() != null) { - processLineageRelationship(rel); + String error = processLineageRelationship(rel); + if (error != null) { + failures++; + lastError = error; + } } else { if ("glossaryTerm".equals(entityType) && rel.getRelation() == Relationship.RELATED_TO.ordinal() @@ -141,18 +198,40 @@ public class RdfBatchProcessor { } if (rel.getJson() != null) { - processLineageRelationship(rel); + String error = processLineageRelationship(rel); + if (error != null) { + failures++; + lastError = error; + } } else { allRelationships.add(convertToEntityRelationship(rel)); } } if (!allRelationships.isEmpty()) { - rdfRepository.bulkAddRelationships(allRelationships); + try { + rdfRepository.bulkAddRelationships(allRelationships); + } catch (Exception e) { + LOG.error( + "Failed to bulk add {} relationships for entity type {}", + allRelationships.size(), + entityType, + e); + failures += allRelationships.size(); + lastError = describeBulkError(entityType, "bulkRelationships", e); + } } } catch (Exception e) { LOG.error("Failed to process batch relationships for entity type {}", entityType, e); + failures++; + lastError = describeBulkError(entityType, "batchRelationships", e); } + + return new RelationshipProcessingResult(failures, lastError); + } + + private static String describeBulkError(String entityType, String stage, Throwable error) { + return describeError(entityType + "/" + stage, error); } public org.openmetadata.schema.type.EntityRelationship convertToEntityRelationship( @@ -172,24 +251,44 @@ public class RdfBatchProcessor { || EXCLUDED_RELATIONSHIP_TYPES.contains(rel.getRelation()); } - void processLineageRelationship(EntityRelationshipObject rel) { + String processLineageRelationship(EntityRelationshipObject rel) { + UUID fromId; + UUID toId; + LineageDetails lineageDetails; try { - UUID fromId = UUID.fromString(rel.getFromId()); - UUID toId = UUID.fromString(rel.getToId()); - LineageDetails lineageDetails = JsonUtils.readValue(rel.getJson(), LineageDetails.class); - rdfRepository.addLineageWithDetails( - rel.getFromEntity(), fromId, rel.getToEntity(), toId, lineageDetails); - } catch (Exception e) { - LOG.debug("Failed to parse lineage details, falling back to basic relationship", e); + fromId = UUID.fromString(rel.getFromId()); + toId = UUID.fromString(rel.getToId()); + lineageDetails = JsonUtils.readValue(rel.getJson(), LineageDetails.class); + } catch (Exception parseError) { + LOG.debug("Failed to parse lineage details, falling back to basic relationship", parseError); try { rdfRepository.addRelationship(convertToEntityRelationship(rel)); + return null; } catch (Exception ex) { - LOG.debug("Failed to add basic lineage relationship", ex); + LOG.error( + "Failed to add basic lineage relationship for {}->{}", + rel.getFromId(), + rel.getToId(), + ex); + return describeLineageError(rel, ex); } } + + try { + rdfRepository.addLineageWithDetails( + rel.getFromEntity(), fromId, rel.getToEntity(), toId, lineageDetails); + return null; + } catch (Exception e) { + LOG.error("Failed to add lineage with details for {}->{}", rel.getFromId(), rel.getToId(), e); + return describeLineageError(rel, e); + } } - void processGlossaryTermRelations( + private static String describeLineageError(EntityRelationshipObject rel, Throwable error) { + return describeError("lineage " + rel.getFromId() + "->" + rel.getToId(), error); + } + + RelationshipProcessingResult processGlossaryTermRelations( List entities, BooleanSupplier stopRequested) { List relations = new ArrayList<>(); @@ -221,10 +320,41 @@ public class RdfBatchProcessor { } } - if (!relations.isEmpty()) { + if (relations.isEmpty()) { + return RelationshipProcessingResult.OK; + } + + try { rdfRepository.bulkAddGlossaryTermRelations(relations); + return RelationshipProcessingResult.OK; + } catch (Exception e) { + LOG.error("Failed to bulk add {} glossary term relations", relations.size(), e); + return new RelationshipProcessingResult( + relations.size(), describeBulkError("glossaryTerm", "glossaryRelations", e)); } } - public record BatchProcessingResult(int successCount, int failedCount) {} + /** + * Outcome of processing a batch of entities. + * + * @param successCount entity-level write successes + * @param failedCount entity-level write failures (counts toward failedRecords stats) + * @param relationshipFailureCount per-edge relationship/lineage failures, kept + * separate so they don't inflate the entity-level failedRecords stat + * @param lastError most recent failure message (entity or relationship) + */ + public record BatchProcessingResult( + int successCount, int failedCount, int relationshipFailureCount, String lastError) { + public BatchProcessingResult(int successCount, int failedCount) { + this(successCount, failedCount, 0, null); + } + + public BatchProcessingResult(int successCount, int failedCount, String lastError) { + this(successCount, failedCount, 0, lastError); + } + + public boolean hasAnyFailure() { + return failedCount > 0 || relationshipFailureCount > 0; + } + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/RdfIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/RdfIndexApp.java index 17c3362819f..68ae50c96e2 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/RdfIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/RdfIndexApp.java @@ -330,33 +330,20 @@ public class RdfIndexApp extends AbstractNativeApplication { Stats aggregatedStats = statsAggregator.toStats(latestJob); rdfIndexStats.set(aggregatedStats); jobData.setStats(aggregatedStats); - sendUpdates(jobExecutionContext, false); + if (latestJob.getStatus() + != org.openmetadata + .service + .apps + .bundles + .searchIndex + .distributed + .IndexJobStatus + .STOPPING) { + sendUpdates(jobExecutionContext, false); + } if (latestJob.isTerminal()) { - if (latestJob.getStatus() - == org.openmetadata - .service - .apps - .bundles - .searchIndex - .distributed - .IndexJobStatus - .STOPPED) { - stopped = true; - } else if (latestJob.getStatus() - == org.openmetadata - .service - .apps - .bundles - .searchIndex - .distributed - .IndexJobStatus - .FAILED) { - jobData.setFailure( - new IndexingError() - .withErrorSource(IndexingError.ErrorSource.JOB) - .withMessage(latestJob.getErrorMessage())); - } + handleTerminalDistributedJob(latestJob); return; } } @@ -369,6 +356,51 @@ public class RdfIndexApp extends AbstractNativeApplication { } } + private void handleTerminalDistributedJob(RdfIndexJob latestJob) { + org.openmetadata.service.apps.bundles.searchIndex.distributed.IndexJobStatus jobStatus = + latestJob.getStatus(); + if (jobStatus + == org.openmetadata.service.apps.bundles.searchIndex.distributed.IndexJobStatus.STOPPED) { + stopped = true; + return; + } + + boolean failedOutright = + jobStatus + == org.openmetadata.service.apps.bundles.searchIndex.distributed.IndexJobStatus.FAILED; + // The coordinator marks a job COMPLETED_WITH_ERRORS when any partition is + // FAILED or CANCELLED, which can happen even with failedRecords == 0 (e.g. + // user-initiated stop that cancels in-flight partitions before any record + // failures accrue). Surface that case too so the run record reflects + // partition-level outcomes, not just record-level ones. + boolean completedWithErrors = + jobStatus + == org.openmetadata + .service + .apps + .bundles + .searchIndex + .distributed + .IndexJobStatus + .COMPLETED_WITH_ERRORS; + + if (!failedOutright && !completedWithErrors) { + return; + } + + String message = latestJob.getErrorMessage(); + if (message == null || message.isBlank()) { + message = + latestJob.getFailedRecords() > 0 + ? String.format( + "RDF index job completed with %d failed record(s)", latestJob.getFailedRecords()) + : "RDF index job completed with errors at the partition level"; + } + LOG.error("RDF index job {} terminated with errors: {}", latestJob.getId(), message); + jobData.setFailure( + new IndexingError().withErrorSource(IndexingError.ErrorSource.JOB).withMessage(message)); + } + private void awaitDistributedExecution(Future distributedExecution) throws InterruptedException { try { @@ -418,17 +450,38 @@ public class RdfIndexApp extends AbstractNativeApplication { RdfBatchProcessor.BatchProcessingResult result = batchProcessor.processEntities(entityType, entities, () -> stopped); + // failedRecords stays an entity-level stat (relationship failures are + // per-edge, not per-record). But for surfacing failures on the run + // record we want either kind of failure to count, so use hasAnyFailure(). StepStats currentStats = new StepStats() .withSuccessRecords(result.successCount()) .withFailedRecords(result.failedCount()); updateEntityStats(entityType, currentStats); + if (result.hasAnyFailure() && result.lastError() != null) { + recordIndexingFailure( + entityType, + result.failedCount() + result.relationshipFailureCount(), + result.lastError()); + } sendUpdates(jobExecutionContext, false); } catch (Exception e) { LOG.error("Error processing batch for entity type {}", entityType, e); updateEntityStats( entityType, new StepStats().withSuccessRecords(0).withFailedRecords(entities.size())); + recordIndexingFailure(entityType, entities.size(), e.getMessage()); + } + } + + private void recordIndexingFailure(String entityType, int failedCount, String errorMessage) { + String message = + String.format( + "%d record(s) failed for entity type %s: %s", + failedCount, entityType, errorMessage != null ? errorMessage : ""); + if (jobData.getFailure() == null) { + jobData.setFailure( + new IndexingError().withErrorSource(IndexingError.ErrorSource.JOB).withMessage(message)); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/DistributedRdfIndexCoordinator.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/DistributedRdfIndexCoordinator.java index b30b0d9786c..42e20359491 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/DistributedRdfIndexCoordinator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/DistributedRdfIndexCoordinator.java @@ -20,12 +20,16 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.system.EventPublisherJob; +import org.openmetadata.schema.type.Include; import org.openmetadata.schema.utils.JsonUtils; +import org.openmetadata.service.Entity; import org.openmetadata.service.apps.bundles.searchIndex.distributed.IndexJobStatus; import org.openmetadata.service.apps.bundles.searchIndex.distributed.PartitionStatus; import org.openmetadata.service.apps.bundles.searchIndex.distributed.ServerIdentityResolver; @@ -35,6 +39,10 @@ import org.openmetadata.service.jdbi3.CollectionDAO.RdfIndexPartitionDAO.RdfAggr import org.openmetadata.service.jdbi3.CollectionDAO.RdfIndexPartitionDAO.RdfEntityStatsRecord; import org.openmetadata.service.jdbi3.CollectionDAO.RdfIndexPartitionDAO.RdfIndexPartitionRecord; import org.openmetadata.service.jdbi3.CollectionDAO.RdfIndexPartitionDAO.RdfServerPartitionStatsRecord; +import org.openmetadata.service.jdbi3.EntityRepository; +import org.openmetadata.service.jdbi3.ListFilter; +import org.openmetadata.service.util.FullyQualifiedName; +import org.openmetadata.service.util.RestUtil; @Slf4j public class DistributedRdfIndexCoordinator { @@ -44,12 +52,19 @@ public class DistributedRdfIndexCoordinator { private static final int MAX_PARTITION_RETRIES = 3; private static final double IMMEDIATE_CLAIMABLE_PERCENT = 0.50; private static final long PARTITION_RELEASE_WINDOW_MS = TimeUnit.SECONDS.toMillis(5); + private static final int MAX_ERROR_SAMPLES = 5; + private static final int MAX_ERROR_MESSAGE_LENGTH = 4000; + private static final int MAX_IN_FLIGHT_PARTITIONS_PER_SERVER = 5; + private static final int CURSOR_WALK_BATCH_SIZE = 10_000; private final CollectionDAO collectionDAO; private final RdfPartitionCalculator partitionCalculator; private final String serverId; private final AtomicLong lastClaimTimestamp = new AtomicLong(0); + private final ConcurrentHashMap>> partitionStartCursors = + new ConcurrentHashMap<>(); + public DistributedRdfIndexCoordinator(CollectionDAO collectionDAO) { this(collectionDAO, new RdfPartitionCalculator()); } @@ -216,15 +231,137 @@ public class DistributedRdfIndexCoordinator { .updatedAt(System.currentTimeMillis()) .build(); updateJob(updated); + precomputePartitionStartCursors(jobId, partitions); return updated; } + public String getPartitionStartCursor(UUID jobId, String entityType, long rangeStart) { + if (rangeStart <= 0 || jobId == null) { + return null; + } + Map> jobCache = partitionStartCursors.get(jobId); + if (jobCache == null) { + return null; + } + Map entityCursors = jobCache.get(entityType); + if (entityCursors == null) { + return null; + } + return entityCursors.get(rangeStart); + } + + private void precomputePartitionStartCursors(UUID jobId, List partitions) { + Map> byEntity = + partitions.stream() + .filter(p -> p.getEntityType() != null) + .collect(Collectors.groupingBy(RdfIndexPartition::getEntityType)); + + Map> jobCache = new HashMap<>(); + for (Map.Entry> e : byEntity.entrySet()) { + try { + jobCache.put(e.getKey(), walkBoundaries(e.getKey(), e.getValue())); + } catch (Exception ex) { + LOG.warn( + "Failed to precompute RDF partition start cursors for entity {}; workers fall back to OFFSET path", + e.getKey(), + ex); + } + } + partitionStartCursors.put(jobId, jobCache); + } + + private Map walkBoundaries( + String entityType, List entityPartitions) { + List sortedTargets = + entityPartitions.stream() + .map(RdfIndexPartition::getRangeStart) + .filter(r -> r > 0) + .sorted() + .distinct() + .collect(Collectors.toList()); + Map result = new HashMap<>(); + if (sortedTargets.isEmpty()) { + return result; + } + EntityRepository repo = Entity.getEntityRepository(entityType); + walkAndRecord(repo, sortedTargets, result); + LOG.debug("Precomputed {} RDF boundary cursors for entity {}", result.size(), entityType); + return result; + } + + private void walkAndRecord( + EntityRepository repo, List sortedTargets, Map result) { + ListFilter filter = new ListFilter(Include.ALL); + String afterName = ""; + String afterId = ""; + long currentOffset = 0; + int targetIdx = 0; + long nextTarget = sortedTargets.get(targetIdx); + T lastSeenEntity = null; + + while (targetIdx < sortedTargets.size()) { + long need = nextTarget - currentOffset; + if (need <= 0) { + // Defensive: we walked past this target without recording it. Reuse the last + // entity we saw and run it through the same cursor encoder as the regular + // path, so quoted-name entities don't end up with a different cursor format. + if (lastSeenEntity != null) { + result.put(nextTarget, RestUtil.encodeCursor(repo.getCursorValue(lastSeenEntity))); + } + targetIdx++; + nextTarget = (targetIdx < sortedTargets.size()) ? sortedTargets.get(targetIdx) : -1; + continue; + } + int fetch = (int) Math.min(need, CURSOR_WALK_BATCH_SIZE); + List batch = repo.getDao().listAfter(filter, fetch, afterName, afterId); + if (batch.isEmpty()) { + break; + } + T lastEntity = repo.getEntityClass().cast(deserializeLast(repo, batch)); + lastSeenEntity = lastEntity; + currentOffset += batch.size(); + afterName = FullyQualifiedName.unquoteName(lastEntity.getName()); + afterId = lastEntity.getId() == null ? "" : lastEntity.getId().toString(); + + if (currentOffset >= nextTarget) { + result.put(nextTarget, RestUtil.encodeCursor(repo.getCursorValue(lastEntity))); + targetIdx++; + nextTarget = (targetIdx < sortedTargets.size()) ? sortedTargets.get(targetIdx) : -1; + } + if (batch.size() < fetch) { + break; + } + } + } + + private Object deserializeLast( + EntityRepository repo, List batch) { + return JsonUtils.readValue(batch.get(batch.size() - 1), repo.getEntityClass()); + } + public RdfIndexPartition claimNextPartition(UUID jobId) { + return claimNextPartition(jobId, serverId); + } + + public RdfIndexPartition claimNextPartition(UUID jobId, String claimingServerId) { + int inFlight = + collectionDAO + .rdfIndexPartitionDAO() + .countInFlightPartitionsForServer(jobId.toString(), claimingServerId); + if (inFlight >= MAX_IN_FLIGHT_PARTITIONS_PER_SERVER) { + LOG.debug( + "Server {} has {} in-flight RDF partitions (max {}), backing off", + claimingServerId, + inFlight, + MAX_IN_FLIGHT_PARTITIONS_PER_SERVER); + return null; + } + long claimAt = nextClaimTimestamp(); int updated = collectionDAO .rdfIndexPartitionDAO() - .claimNextPartitionAtomic(jobId.toString(), serverId, claimAt); + .claimNextPartitionAtomic(jobId.toString(), claimingServerId, claimAt); if (updated <= 0) { return null; } @@ -232,7 +369,7 @@ public class DistributedRdfIndexCoordinator { RdfIndexPartitionRecord record = collectionDAO .rdfIndexPartitionDAO() - .findLatestClaimedPartition(jobId.toString(), serverId, claimAt); + .findLatestClaimedPartition(jobId.toString(), claimingServerId, claimAt); if (record == null) { LOG.warn( "Claimed RDF partition for job {} but could not retrieve the record; it may require stale recovery", @@ -256,25 +393,41 @@ public class DistributedRdfIndexCoordinator { } public void completePartition( - UUID partitionId, long cursor, long processedCount, long successCount, long failedCount) { + UUID partitionId, + long cursor, + long processedCount, + long successCount, + long failedCount, + String lastError) { RdfIndexPartition partition = getPartition(partitionId); long now = System.currentTimeMillis(); - collectionDAO - .rdfIndexPartitionDAO() - .update( - partitionId.toString(), - PartitionStatus.COMPLETED.name(), - cursor, - processedCount, - successCount, - failedCount, - partition.getAssignedServer(), - partition.getClaimedAt(), - partition.getStartedAt(), - now, - now, - null, - partition.getRetryCount()); + int updated = + collectionDAO + .rdfIndexPartitionDAO() + .updateIfProcessing( + partitionId.toString(), + PartitionStatus.COMPLETED.name(), + cursor, + processedCount, + successCount, + failedCount, + partition.getAssignedServer(), + partition.getClaimedAt(), + partition.getStartedAt(), + now, + now, + lastError, + partition.getRetryCount()); + if (updated == 0) { + // Stop or another participant already moved the row out of PROCESSING + // (typically to CANCELLED). Don't bump server stats and don't overwrite + // the authoritative status — the partition is done as far as this + // worker is concerned. + LOG.info( + "Skipping completion of RDF partition {} — no longer PROCESSING (status overridden by stop/reclaim)", + partitionId); + return; + } incrementServerStats(partition, processedCount, successCount, failedCount, 1, 0); refreshAggregatedJob(jobIdFrom(partition)); } @@ -288,22 +441,29 @@ public class DistributedRdfIndexCoordinator { String errorMessage) { RdfIndexPartition partition = getPartition(partitionId); long now = System.currentTimeMillis(); - collectionDAO - .rdfIndexPartitionDAO() - .update( - partitionId.toString(), - PartitionStatus.FAILED.name(), - cursor, - processedCount, - successCount, - failedCount, - partition.getAssignedServer(), - partition.getClaimedAt(), - partition.getStartedAt(), - now, - now, - errorMessage, - partition.getRetryCount() + 1); + int updated = + collectionDAO + .rdfIndexPartitionDAO() + .updateIfProcessing( + partitionId.toString(), + PartitionStatus.FAILED.name(), + cursor, + processedCount, + successCount, + failedCount, + partition.getAssignedServer(), + partition.getClaimedAt(), + partition.getStartedAt(), + now, + now, + errorMessage, + partition.getRetryCount() + 1); + if (updated == 0) { + LOG.info( + "Skipping failure of RDF partition {} — no longer PROCESSING (status overridden by stop/reclaim)", + partitionId); + return; + } incrementServerStats(partition, processedCount, successCount, failedCount, 0, 1); refreshAggregatedJob(jobIdFrom(partition)); } @@ -336,6 +496,102 @@ public class DistributedRdfIndexCoordinator { refreshAggregatedJob(jobId); } + public int cancelInFlightPartitions(UUID jobId) { + long now = System.currentTimeMillis(); + int cancelled = + collectionDAO.rdfIndexPartitionDAO().cancelInFlightPartitions(jobId.toString(), now); + if (cancelled > 0) { + LOG.info("Cancelled {} in-flight RDF partitions for job {}", cancelled, jobId); + } + return cancelled; + } + + public void requestStop(UUID jobId) { + RdfIndexJob job = getJob(jobId).orElse(null); + if (job == null) { + LOG.warn("Cannot stop RDF job {} - not found", jobId); + return; + } + if (job.isTerminal()) { + LOG.warn("Cannot stop RDF job {} - already in terminal state: {}", jobId, job.getStatus()); + return; + } + + updateJobStatus(jobId, IndexJobStatus.STOPPING, null); + cancelInFlightPartitions(jobId); + checkAndUpdateJobCompletion(jobId); + } + + public void checkAndUpdateJobCompletion(UUID jobId) { + RdfIndexJob job = refreshAggregatedJob(jobId); + if (job == null || job.isTerminal()) { + return; + } + + String id = jobId.toString(); + int pending = + collectionDAO + .rdfIndexPartitionDAO() + .countPartitionsByStatus(id, PartitionStatus.PENDING.name()); + int processing = + collectionDAO + .rdfIndexPartitionDAO() + .countPartitionsByStatus(id, PartitionStatus.PROCESSING.name()); + + if (pending > 0 || processing > 0) { + return; + } + + int failed = + collectionDAO + .rdfIndexPartitionDAO() + .countPartitionsByStatus(id, PartitionStatus.FAILED.name()); + int cancelled = + collectionDAO + .rdfIndexPartitionDAO() + .countPartitionsByStatus(id, PartitionStatus.CANCELLED.name()); + + // A partition can finish COMPLETED but still carry a non-null lastError — + // e.g. a relationship/lineage bulk write that failed without incrementing + // the entity-level failedCount or marking the partition FAILED. Treat that + // as an error signal too, otherwise the job appears clean despite real + // Fuseki write failures. + boolean hasPartitionLastError = + !collectionDAO.rdfIndexPartitionDAO().findRecentPartitionErrors(id, 1).isEmpty(); + + IndexJobStatus terminal; + if (job.getStatus() == IndexJobStatus.STOPPING) { + terminal = IndexJobStatus.STOPPED; + } else if (failed > 0 || cancelled > 0 || job.getFailedRecords() > 0 || hasPartitionLastError) { + terminal = IndexJobStatus.COMPLETED_WITH_ERRORS; + } else { + terminal = IndexJobStatus.COMPLETED; + } + + String errorMessage = job.getErrorMessage(); + if (terminal == IndexJobStatus.COMPLETED_WITH_ERRORS + && (errorMessage == null || errorMessage.isBlank()) + && hasPartitionLastError) { + // Surface a representative error so the run record isn't blank when the + // only signal was a partition lastError. + java.util.List samples = + collectionDAO.rdfIndexPartitionDAO().findRecentPartitionErrors(id, MAX_ERROR_SAMPLES); + errorMessage = "Partition errors: " + String.join(" | ", samples); + if (errorMessage.length() > MAX_ERROR_MESSAGE_LENGTH) { + errorMessage = errorMessage.substring(0, MAX_ERROR_MESSAGE_LENGTH) + "..."; + } + } + + updateJobStatus(jobId, terminal, errorMessage); + partitionStartCursors.remove(jobId); + LOG.info( + "RDF job {} reached terminal state {} (success={}, failed={})", + jobId, + terminal, + job.getSuccessRecords(), + job.getFailedRecords()); + } + public void releaseServerPartitions(UUID jobId, String serverId, boolean stopJob, String reason) { long now = System.currentTimeMillis(); collectionDAO @@ -405,6 +661,26 @@ public class DistributedRdfIndexCoordinator { reclaimStalePartitions(job.getId()); refreshAggregatedJob(job.getId()); } + evictStaleCursorCacheEntries(); + } + + /** + * Drop precomputed-cursor cache entries for jobs that no longer exist in the DB + * or are already terminal. Without this a server that crashed mid-job before + * {@link #refreshAggregatedJob} could mark the job terminal would leak the cache + * entry until the process restarts. + */ + private void evictStaleCursorCacheEntries() { + if (partitionStartCursors.isEmpty()) { + return; + } + partitionStartCursors + .keySet() + .removeIf( + cachedJobId -> { + RdfIndexJob job = getJob(cachedJobId).orElse(null); + return job == null || job.isTerminal(); + }); } private RdfIndexJob refreshAggregatedJob(UUID jobId) { @@ -454,10 +730,27 @@ public class DistributedRdfIndexCoordinator { IndexJobStatus status = existing.getStatus(); String errorMessage = existing.getErrorMessage(); if (aggregate.pendingPartitions() == 0 && aggregate.processingPartitions() == 0) { + // Partition lastError is an additional error signal alongside + // failedPartitions/failedRecords: a partition can finish COMPLETED but + // still carry a non-null lastError (e.g. relationship/lineage bulk write + // failures that don't bump failedRecords). Without this check the job + // could be promoted straight to COMPLETED here, and the later + // checkAndUpdateJobCompletion call would early-return because the job + // is already terminal — silently dropping the error signal. + boolean hasPartitionLastError = + !collectionDAO + .rdfIndexPartitionDAO() + .findRecentPartitionErrors(jobId.toString(), 1) + .isEmpty(); if (status == IndexJobStatus.STOPPING) { status = IndexJobStatus.STOPPED; - } else if (aggregate.failedPartitions() > 0 || aggregate.failedRecords() > 0) { + } else if (aggregate.failedPartitions() > 0 + || aggregate.failedRecords() > 0 + || hasPartitionLastError) { status = IndexJobStatus.COMPLETED_WITH_ERRORS; + if (errorMessage == null || errorMessage.isBlank()) { + errorMessage = aggregatePartitionErrors(jobId, aggregate); + } } else if (status == IndexJobStatus.READY || status == IndexJobStatus.RUNNING) { status = IndexJobStatus.COMPLETED; } @@ -466,13 +759,17 @@ public class DistributedRdfIndexCoordinator { } Long completedAt = existing.getCompletedAt(); - if (completedAt == null - && (status == IndexJobStatus.COMPLETED + boolean isTerminalNow = + status == IndexJobStatus.COMPLETED || status == IndexJobStatus.COMPLETED_WITH_ERRORS || status == IndexJobStatus.FAILED - || status == IndexJobStatus.STOPPED)) { + || status == IndexJobStatus.STOPPED; + if (completedAt == null && isTerminalNow) { completedAt = System.currentTimeMillis(); } + if (isTerminalNow) { + partitionStartCursors.remove(jobId); + } RdfIndexJob refreshed = existing.toBuilder() @@ -491,6 +788,27 @@ public class DistributedRdfIndexCoordinator { return refreshed; } + private String aggregatePartitionErrors(UUID jobId, RdfAggregatedStatsRecord aggregate) { + List samples = + collectionDAO + .rdfIndexPartitionDAO() + .findRecentPartitionErrors(jobId.toString(), MAX_ERROR_SAMPLES); + StringBuilder summary = new StringBuilder(); + summary + .append(aggregate.failedRecords()) + .append(" record(s) failed across ") + .append(aggregate.failedPartitions()) + .append(" partition(s)."); + if (samples != null && !samples.isEmpty()) { + summary.append(" Sample errors: "); + summary.append(String.join(" | ", samples)); + } + String message = summary.toString(); + return message.length() > MAX_ERROR_MESSAGE_LENGTH + ? message.substring(0, MAX_ERROR_MESSAGE_LENGTH) + "..." + : message; + } + private void incrementServerStats( RdfIndexPartition partition, long processedCount, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/DistributedRdfIndexExecutor.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/DistributedRdfIndexExecutor.java index e2c4ab21a4e..fe32815ae2d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/DistributedRdfIndexExecutor.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/DistributedRdfIndexExecutor.java @@ -39,6 +39,7 @@ public class DistributedRdfIndexExecutor { private static final long STALE_CHECK_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30); private static final long CLAIM_RETRY_SLEEP_MS = 1000; private static final long SHUTDOWN_TIMEOUT_SECONDS = 30; + private static final long PARTITION_HEARTBEAT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30); private final CollectionDAO collectionDAO; private final DistributedRdfIndexCoordinator coordinator; @@ -46,11 +47,14 @@ public class DistributedRdfIndexExecutor { private final AtomicBoolean stopped = new AtomicBoolean(false); private final AtomicBoolean localExecutionCleaned = new AtomicBoolean(true); private final List activeWorkers = new CopyOnWriteArrayList<>(); + private final Set activePartitions = ConcurrentHashMap.newKeySet(); + private volatile RdfEntityCompletionTracker completionTracker; @Getter private volatile RdfIndexJob currentJob; private volatile ExecutorService workerExecutor; private volatile Thread lockRefreshThread; private volatile Thread staleReclaimerThread; + private volatile Thread partitionHeartbeatThread; private volatile boolean coordinatorOwnedJob; public DistributedRdfIndexExecutor(CollectionDAO collectionDAO, int partitionSize) { @@ -117,6 +121,8 @@ public class DistributedRdfIndexExecutor { throw new IllegalStateException("Failed to load RDF distributed job state"); } + initializeCompletionTracker(); + try { startCoordinatorThreads(); runWorkers(jobConfiguration, true); @@ -126,6 +132,25 @@ public class DistributedRdfIndexExecutor { } } + private void initializeCompletionTracker() { + completionTracker = new RdfEntityCompletionTracker(currentJob.getId()); + if (currentJob.getEntityStats() == null) { + return; + } + currentJob + .getEntityStats() + .forEach( + (entityType, stats) -> + completionTracker.initializeEntity(entityType, stats.getTotalPartitions())); + completionTracker.setOnEntityComplete( + (entityType, success) -> + LOG.info( + "RDF entity '{}' fully indexed (success={}) - job {}", + entityType, + success, + currentJob.getId())); + } + public void joinJob(RdfIndexJob job, EventPublisherJob jobConfiguration) throws InterruptedException { RdfRepository.getInstance().ensureStorageReady(); @@ -149,9 +174,7 @@ public class DistributedRdfIndexExecutor { if (currentJob != null) { if (coordinatorOwnedJob) { - coordinator.updateJobStatus(currentJob.getId(), IndexJobStatus.STOPPING, null); - coordinator.cancelPendingPartitions(currentJob.getId()); - coordinator.releaseServerPartitions(currentJob.getId(), serverId, true, "Stopped by user"); + coordinator.requestStop(currentJob.getId()); } else { coordinator.releaseServerPartitions( currentJob.getId(), serverId, false, "Worker server stopped participating"); @@ -162,6 +185,8 @@ public class DistributedRdfIndexExecutor { worker.stop(); } + // cleanupLocalExecution -> shutdownWorkerExecutor calls shutdownNow exactly + // once; don't shut it down again here or callers will see two invocations. cleanupLocalExecution(); } @@ -217,7 +242,7 @@ public class DistributedRdfIndexExecutor { return; } - RdfIndexPartition partition = coordinator.claimNextPartition(latestJob.getId()); + RdfIndexPartition partition = coordinator.claimNextPartition(latestJob.getId(), serverId); if (partition == null) { try { TimeUnit.MILLISECONDS.sleep(CLAIM_RETRY_SLEEP_MS); @@ -228,7 +253,49 @@ public class DistributedRdfIndexExecutor { continue; } - worker.processPartition(partition); + activePartitions.add(partition.getId()); + RdfPartitionWorker.PartitionResult result = null; + try { + result = worker.processPartition(partition); + } finally { + activePartitions.remove(partition.getId()); + } + if (completionTracker != null && result != null && !result.stopped()) { + // hasAnyFailure() captures BOTH entity-level failures (failedCount, + // including readerErrors) AND per-edge relationship failures + // (relationshipFailureCount). Using only failedCount would let an + // entity be promoted to "success" even when its lineage / ownership / + // tag triples failed to write — premature promotion. + completionTracker.recordPartitionComplete( + partition.getEntityType(), result.hasAnyFailure()); + } + } + } + + private void runPartitionHeartbeatLoop() { + while (!stopped.get() && !Thread.currentThread().isInterrupted()) { + try { + TimeUnit.MILLISECONDS.sleep(PARTITION_HEARTBEAT_INTERVAL_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + try { + if (currentJob == null || currentJob.isTerminal() || activePartitions.isEmpty()) { + continue; + } + long now = System.currentTimeMillis(); + int updated = 0; + for (UUID partitionId : activePartitions) { + collectionDAO.rdfIndexPartitionDAO().updateHeartbeat(partitionId.toString(), now); + updated++; + } + if (updated > 0) { + LOG.debug("Refreshed RDF partition heartbeats for {} partitions", updated); + } + } catch (Exception e) { + LOG.warn("Error refreshing RDF partition heartbeats", e); + } } } @@ -238,14 +305,10 @@ public class DistributedRdfIndexExecutor { return; } - if (stopped.get()) { - coordinator.updateJobStatus(currentJob.getId(), IndexJobStatus.STOPPED, null); - } else if (!currentJob.isTerminal()) { - IndexJobStatus terminalStatus = - currentJob.getFailedRecords() > 0 - ? IndexJobStatus.COMPLETED_WITH_ERRORS - : IndexJobStatus.COMPLETED; - coordinator.updateJobStatus(currentJob.getId(), terminalStatus, currentJob.getErrorMessage()); + if (stopped.get() && !currentJob.isTerminal()) { + coordinator.requestStop(currentJob.getId()); + } else { + coordinator.checkAndUpdateJobCompletion(currentJob.getId()); } currentJob = coordinator.getJobWithAggregatedStats(currentJob.getId()); @@ -270,6 +333,11 @@ public class DistributedRdfIndexExecutor { } }); + partitionHeartbeatThread = + Thread.ofVirtual() + .name("rdf-partition-heartbeat-" + currentJob.getId().toString().substring(0, 8)) + .start(this::runPartitionHeartbeatLoop); + staleReclaimerThread = Thread.ofVirtual() .name("rdf-stale-reclaimer-" + currentJob.getId().toString().substring(0, 8)) @@ -327,9 +395,12 @@ public class DistributedRdfIndexExecutor { shutdownWorkerExecutor(); interruptThread(lockRefreshThread); interruptThread(staleReclaimerThread); + interruptThread(partitionHeartbeatThread); lockRefreshThread = null; staleReclaimerThread = null; + partitionHeartbeatThread = null; activeWorkers.clear(); + activePartitions.clear(); } private void cleanupCoordinatorExecution() { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfEntityCompletionTracker.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfEntityCompletionTracker.java new file mode 100644 index 00000000000..5ba64a627ee --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfEntityCompletionTracker.java @@ -0,0 +1,170 @@ +/* + * Copyright 2024 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.service.apps.bundles.rdf.distributed; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.service.apps.bundles.searchIndex.distributed.PartitionStatus; + +/** + * Tracks partition completion per entity type during distributed RDF reindexing. + * When all partitions for an entity complete, fires a callback so consumers can + * promote that entity's RDF view (e.g. swap a staging graph) immediately rather + * than waiting for the full job to finish. + */ +@Slf4j +public class RdfEntityCompletionTracker { + private final Map totalPartitions = new ConcurrentHashMap<>(); + private final Map completedPartitions = new ConcurrentHashMap<>(); + private final Map failedPartitions = new ConcurrentHashMap<>(); + private final Set promotedEntities = ConcurrentHashMap.newKeySet(); + private volatile BiConsumer onEntityComplete; + private final UUID jobId; + + public RdfEntityCompletionTracker(UUID jobId) { + this.jobId = jobId; + } + + public void initializeEntity(String entityType, int partitionCount) { + totalPartitions.put(entityType, new AtomicInteger(partitionCount)); + completedPartitions.put(entityType, new AtomicInteger(0)); + failedPartitions.put(entityType, new AtomicInteger(0)); + } + + public void setOnEntityComplete(BiConsumer callback) { + this.onEntityComplete = callback; + } + + public void recordPartitionComplete(String entityType, boolean partitionFailed) { + AtomicInteger completed = completedPartitions.get(entityType); + AtomicInteger total = totalPartitions.get(entityType); + if (completed == null || total == null) { + LOG.warn( + "Received RDF partition completion for untracked entity '{}' (job {})", + entityType, + jobId); + return; + } + if (partitionFailed) { + AtomicInteger failed = failedPartitions.get(entityType); + if (failed != null) { + failed.incrementAndGet(); + } + } + int newCompleted = completed.incrementAndGet(); + int totalCount = total.get(); + if (newCompleted >= totalCount) { + AtomicInteger failed = failedPartitions.get(entityType); + boolean hasFailed = failed != null && failed.get() > 0; + promoteIfReady(entityType, hasFailed); + } + } + + public boolean isPromoted(String entityType) { + return promotedEntities.contains(entityType); + } + + public Set getPromotedEntities() { + return Set.copyOf(promotedEntities); + } + + public UUID getJobId() { + return jobId; + } + + /** + * Reconcile entity completion state from the partition table. Catches partition + * completions that bypass the in-memory tracker — e.g. partitions completed by + * a different participant server, or marked FAILED by the stale-reclaimer SQL. + */ + public void reconcileFromDatabase(List partitions) { + Map> byEntity = + partitions.stream().collect(Collectors.groupingBy(RdfIndexPartition::getEntityType)); + for (Map.Entry> entry : byEntity.entrySet()) { + String entityType = entry.getKey(); + List entityPartitions = entry.getValue(); + if (promotedEntities.contains(entityType)) { + continue; + } + long completedCount = + entityPartitions.stream() + .filter( + p -> + p.getStatus() == PartitionStatus.COMPLETED + || p.getStatus() == PartitionStatus.FAILED) + .count(); + boolean allDone = completedCount == entityPartitions.size() && !entityPartitions.isEmpty(); + if (allDone) { + boolean hasFailed = + entityPartitions.stream().anyMatch(p -> p.getStatus() == PartitionStatus.FAILED); + promoteIfReady(entityType, hasFailed); + } + } + } + + private void promoteIfReady(String entityType, boolean hasFailed) { + if (promotedEntities.add(entityType)) { + boolean success = !hasFailed; + LOG.debug( + "RDF entity '{}' all partitions complete (success={}, job {})", + entityType, + success, + jobId); + if (onEntityComplete != null) { + try { + onEntityComplete.accept(entityType, success); + } catch (Exception e) { + LOG.error( + "Error in RDF entity-completion callback for '{}' (job {})", entityType, jobId, e); + } + } + } + } + + public EntityCompletionStatus getStatus(String entityType) { + AtomicInteger total = totalPartitions.get(entityType); + AtomicInteger completed = completedPartitions.get(entityType); + AtomicInteger failed = failedPartitions.get(entityType); + if (total == null) { + return null; + } + return new EntityCompletionStatus( + entityType, + total.get(), + completed != null ? completed.get() : 0, + failed != null ? failed.get() : 0, + promotedEntities.contains(entityType)); + } + + public record EntityCompletionStatus( + String entityType, + int totalPartitions, + int completedPartitions, + int failedPartitions, + boolean promoted) { + public boolean isComplete() { + return completedPartitions >= totalPartitions; + } + + public boolean hasFailures() { + return failedPartitions > 0; + } + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfPartitionWorker.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfPartitionWorker.java index a7130262a6a..bc6a045a9f6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfPartitionWorker.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfPartitionWorker.java @@ -26,6 +26,7 @@ import org.openmetadata.service.apps.bundles.rdf.RdfBatchProcessor; import org.openmetadata.service.exception.SearchIndexException; import org.openmetadata.service.jdbi3.ListFilter; import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource; +import org.openmetadata.service.workflows.searchIndex.ReindexingUtil; @Slf4j public class RdfPartitionWorker { @@ -50,9 +51,11 @@ public class RdfPartitionWorker { long processedCount = partition.getProcessedCount(); long successCount = partition.getSuccessCount(); long failedCount = partition.getFailedCount(); + long relationshipFailureCount = 0; + String lastError = null; try { - String keysetCursor = initializeKeysetCursor(entityType, currentOffset); + String keysetCursor = initializeKeysetCursor(partition, entityType, currentOffset); while (currentOffset < partition.getRangeEnd() && !stopped.get() && !Thread.currentThread().isInterrupted()) { @@ -71,8 +74,16 @@ public class RdfPartitionWorker { processedCount += batchProcessed; successCount += batchResult.successCount(); + // failedCount tracks entity-level failures only (matches the + // failedRecords stat semantics where one record == one entity). + // Relationship/lineage edge failures are counted separately and + // surfaced through relationshipFailureCount in the result. failedCount += batchResult.failedCount() + readerErrors; + relationshipFailureCount += batchResult.relationshipFailureCount(); currentOffset += batchProcessed; + if (batchResult.lastError() != null) { + lastError = batchResult.lastError(); + } if (processedCount % PROGRESS_UPDATE_INTERVAL < batchProcessed) { coordinator.updatePartitionProgress( @@ -86,7 +97,7 @@ public class RdfPartitionWorker { keysetCursor = resultList.getPaging() != null ? resultList.getPaging().getAfter() : null; if (keysetCursor == null && currentOffset < partition.getRangeEnd()) { - keysetCursor = initializeKeysetCursor(entityType, currentOffset); + keysetCursor = initializeKeysetCursor(partition, entityType, currentOffset); if (keysetCursor == null) { break; } @@ -94,12 +105,14 @@ public class RdfPartitionWorker { } if (stopped.get() || Thread.currentThread().isInterrupted()) { - return new PartitionResult(processedCount, successCount, failedCount, true, null); + return new PartitionResult( + processedCount, successCount, failedCount, relationshipFailureCount, true, lastError); } coordinator.completePartition( - partition.getId(), currentOffset, processedCount, successCount, failedCount); - return new PartitionResult(processedCount, successCount, failedCount, false, null); + partition.getId(), currentOffset, processedCount, successCount, failedCount, lastError); + return new PartitionResult( + processedCount, successCount, failedCount, relationshipFailureCount, false, lastError); } catch (Exception e) { LOG.error("Failed to process RDF partition {}", partition.getId(), e); coordinator.failPartition( @@ -109,7 +122,13 @@ public class RdfPartitionWorker { successCount, failedCount, e.getMessage()); - return new PartitionResult(processedCount, successCount, failedCount, false, e.getMessage()); + return new PartitionResult( + processedCount, + successCount, + failedCount, + relationshipFailureCount, + false, + e.getMessage()); } } @@ -119,15 +138,21 @@ public class RdfPartitionWorker { private ResultList readEntitiesKeyset( String entityType, String keysetCursor, int limit) throws SearchIndexException { - PaginatedEntitiesSource source = - new PaginatedEntitiesSource(entityType, limit, List.of("*"), 0); + List fields = ReindexingUtil.getSearchIndexFields(entityType); + PaginatedEntitiesSource source = new PaginatedEntitiesSource(entityType, limit, fields, 0); return source.readNextKeyset(keysetCursor); } - private String initializeKeysetCursor(String entityType, long offset) { + private String initializeKeysetCursor( + RdfIndexPartition partition, String entityType, long offset) { if (offset <= 0) { return null; } + String precomputed = + coordinator.getPartitionStartCursor(partition.getJobId(), entityType, offset); + if (precomputed != null) { + return precomputed; + } int cursorOffset = toCursorOffset(entityType, offset); return Entity.getEntityRepository(entityType) .getCursorAtOffset(new ListFilter(Include.ALL), cursorOffset); @@ -144,10 +169,29 @@ public class RdfPartitionWorker { return Math.toIntExact(cursorOffset); } + /** + * Outcome of processing a single partition. + * + * @param processedCount entities + reader-error rows seen + * @param successCount entities written successfully + * @param failedCount entity-level failures (counts toward failedRecords stats) + * @param relationshipFailureCount per-edge relationship/lineage failures, NOT + * included in failedCount because they don't map to "records"; surfaced so + * completion tracking and run-record reporting can still flag the partition + * @param stopped whether the partition exited via stop signal + * @param errorMessage representative failure message if any + */ public record PartitionResult( long processedCount, long successCount, long failedCount, + long relationshipFailureCount, boolean stopped, - String errorMessage) {} + String errorMessage) { + + /** Did this partition encounter any failure (entity-level or relationship)? */ + public boolean hasAnyFailure() { + return failedCount > 0 || relationshipFailureCount > 0; + } + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfPollingJobNotifier.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfPollingJobNotifier.java new file mode 100644 index 00000000000..7683c07b784 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfPollingJobNotifier.java @@ -0,0 +1,161 @@ +/* + * Copyright 2024 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.service.apps.bundles.rdf.distributed; + +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.service.jdbi3.CollectionDAO; + +/** + * Database-polling job notifier for the RDF distributed indexing job. Lets other + * server pods discover an in-flight RDF reindex and join it as participants. Mirrors + * the SearchIndex {@code PollingJobNotifier} but queries the {@code rdf_index_job} + * table. + * + *

Adaptive polling: 30s while idle, 1s while actively participating in a job to + * detect completion quickly. Single-server deployments don't gain anything from this + * — it's a no-op when only one pod exists. Multi-pod deployments use it to + * coordinate work without needing Redis pub/sub. + */ +@Slf4j +public class RdfPollingJobNotifier { + + private static final long IDLE_POLL_INTERVAL_MS = 30_000; + private static final long ACTIVE_POLL_INTERVAL_MS = 1_000; + + private final CollectionDAO collectionDAO; + private final String serverId; + private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicBoolean participating = new AtomicBoolean(false); + private final Set knownJobs = ConcurrentHashMap.newKeySet(); + + private ScheduledExecutorService scheduler; + private Consumer jobStartedCallback; + private volatile java.util.concurrent.ScheduledFuture pollTask; + + public RdfPollingJobNotifier(CollectionDAO collectionDAO, String serverId) { + this.collectionDAO = collectionDAO; + this.serverId = serverId; + } + + public void start() { + if (!running.compareAndSet(false, true)) { + LOG.warn("RdfPollingJobNotifier already running"); + return; + } + scheduler = + Executors.newSingleThreadScheduledExecutor( + Thread.ofPlatform() + .name("rdf-job-notifier-" + serverId.substring(0, Math.min(8, serverId.length()))) + .factory()); + schedulePoll(IDLE_POLL_INTERVAL_MS); + LOG.info( + "RdfPollingJobNotifier started on server {} (idle: {}s, active: {}s)", + serverId, + IDLE_POLL_INTERVAL_MS / 1000, + ACTIVE_POLL_INTERVAL_MS / 1000); + } + + public void stop() { + if (!running.compareAndSet(true, false)) { + return; + } + if (scheduler != null) { + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + knownJobs.clear(); + } + + public void notifyJobStarted(UUID jobId) { + knownJobs.add(jobId); + } + + public void notifyJobCompleted(UUID jobId) { + knownJobs.remove(jobId); + } + + public void onJobStarted(Consumer callback) { + this.jobStartedCallback = callback; + } + + public boolean isRunning() { + return running.get(); + } + + /** + * Toggle the active poll cadence. Reschedules the poll task at the new interval + * instead of relying on a soft throttle inside {@link #pollForJobs}, so the thread + * doesn't wake every second while idle. + */ + public void setParticipating(boolean isParticipating) { + boolean changed = participating.compareAndSet(!isParticipating, isParticipating); + if (changed && running.get()) { + schedulePoll(isParticipating ? ACTIVE_POLL_INTERVAL_MS : IDLE_POLL_INTERVAL_MS); + } + } + + private synchronized void schedulePoll(long intervalMs) { + if (scheduler == null || scheduler.isShutdown()) { + return; + } + if (pollTask != null) { + pollTask.cancel(false); + } + pollTask = + scheduler.scheduleWithFixedDelay(this::pollForJobs, 0, intervalMs, TimeUnit.MILLISECONDS); + } + + private void pollForJobs() { + if (!running.get()) { + return; + } + try { + List runningJobIds = collectionDAO.rdfIndexJobDAO().getRunningJobIds(); + if (runningJobIds.isEmpty()) { + if (!knownJobs.isEmpty()) { + knownJobs.clear(); + } + return; + } + for (String jobIdStr : runningJobIds) { + UUID jobId = UUID.fromString(jobIdStr); + if (knownJobs.add(jobId)) { + LOG.info("Discovered new running RDF job via polling: {}", jobId); + if (jobStartedCallback != null) { + jobStartedCallback.accept(jobId); + } + } + } + knownJobs.removeIf( + jobId -> runningJobIds.stream().noneMatch(id -> id.equals(jobId.toString()))); + } catch (Exception e) { + LOG.error("Error polling for RDF jobs", e); + } + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index 0cb047c7095..0b8b0124e81 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -13589,6 +13589,51 @@ public interface CollectionDAO { "UPDATE rdf_index_partition SET status = 'CANCELLED' WHERE jobId = :jobId AND status = 'PENDING'") int cancelPendingPartitions(@Bind("jobId") String jobId); + @SqlUpdate( + "UPDATE rdf_index_partition SET status = 'CANCELLED', " + + "lastError = 'Stopped by user', completedAt = :now, lastUpdateAt = :now " + + "WHERE jobId = :jobId AND status IN ('PENDING','PROCESSING')") + int cancelInFlightPartitions(@Bind("jobId") String jobId, @Bind("now") long now); + + @SqlQuery( + "SELECT COUNT(*) FROM rdf_index_partition " + + "WHERE jobId = :jobId AND status = 'PROCESSING' AND assignedServer = :serverId") + int countInFlightPartitionsForServer( + @Bind("jobId") String jobId, @Bind("serverId") String serverId); + + @SqlQuery("SELECT COUNT(*) FROM rdf_index_partition WHERE jobId = :jobId AND status = :status") + int countPartitionsByStatus(@Bind("jobId") String jobId, @Bind("status") String status); + + /** + * Status-guarded variant of {@link #update}: only writes if the row is still + * PROCESSING. Workers use this on completion so that a concurrent Stop + * (which moves the row to CANCELLED) isn't overwritten back to + * COMPLETED/FAILED, which would make the Stop button look unreliable. + * Returns the number of rows updated (0 means the row was no longer + * PROCESSING and the caller should skip side effects like server-stat + * increments). + */ + @SqlUpdate( + "UPDATE rdf_index_partition SET status = :status, processingCursor = :cursor, " + + "processedCount = :processedCount, successCount = :successCount, failedCount = :failedCount, " + + "assignedServer = :assignedServer, claimedAt = :claimedAt, startedAt = :startedAt, " + + "completedAt = :completedAt, lastUpdateAt = :lastUpdateAt, lastError = :lastError, " + + "retryCount = :retryCount WHERE id = :id AND status = 'PROCESSING'") + int updateIfProcessing( + @Bind("id") String id, + @Bind("status") String status, + @Bind("cursor") long cursor, + @Bind("processedCount") long processedCount, + @Bind("successCount") long successCount, + @Bind("failedCount") long failedCount, + @Bind("assignedServer") String assignedServer, + @Bind("claimedAt") Long claimedAt, + @Bind("startedAt") Long startedAt, + @Bind("completedAt") Long completedAt, + @Bind("lastUpdateAt") Long lastUpdateAt, + @Bind("lastError") String lastError, + @Bind("retryCount") int retryCount); + @SqlUpdate( "UPDATE rdf_index_partition SET status = :status, assignedServer = NULL, claimedAt = NULL, " + "lastError = :reason, lastUpdateAt = :updatedAt, completedAt = :completedAt " @@ -13647,6 +13692,12 @@ public interface CollectionDAO { + "WHERE jobId = :jobId AND assignedServer IS NOT NULL") List getAssignedServers(@Bind("jobId") String jobId); + @SqlQuery( + "SELECT lastError FROM rdf_index_partition " + + "WHERE jobId = :jobId AND lastError IS NOT NULL " + + "ORDER BY lastUpdateAt DESC LIMIT :limit") + List findRecentPartitionErrors(@Bind("jobId") String jobId, @Bind("limit") int limit); + @SqlUpdate("DELETE FROM rdf_index_partition") void deleteAll(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/rdf/RdfRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/rdf/RdfRepository.java index 7a21acfe33d..9530f97d7d8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/rdf/RdfRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/rdf/RdfRepository.java @@ -255,6 +255,7 @@ public class RdfRepository { LOG.debug("Added relationship {} to RDF store", relationship); } catch (Exception e) { LOG.error("Failed to add relationship to RDF", e); + throw new RuntimeException("Failed to add relationship to RDF", e); } } @@ -328,6 +329,7 @@ public class RdfRepository { LOG.debug("Bulk added {} relationships to RDF store", relationships.size()); } catch (Exception e) { LOG.error("Failed to bulk add relationships to RDF", e); + throw new RuntimeException("Failed to bulk add relationships to RDF", e); } } @@ -369,14 +371,11 @@ public class RdfRepository { fromResource.addProperty(upstream, toResource); if (lineageDetails != null) { + // Deterministic URI: re-indexing the same lineage produces the same URI, + // letting the DELETE+INSERT idempotency below collapse duplicate + // LineageDetails resources instead of creating a new one per run. String detailsUri = - config.getBaseUri().toString() - + "lineageDetails/" - + fromId - + "/" - + toId - + "/" - + System.currentTimeMillis(); + config.getBaseUri().toString() + "lineageDetails/" + fromId + "/" + toId; Resource detailsResource = model.createResource(detailsUri); Property hasLineageDetails = @@ -386,11 +385,33 @@ public class RdfRepository { detailsResource.addProperty( model.createProperty("http://www.w3.org/1999/02/22-rdf-syntax-ns#", "type"), model.createResource("https://open-metadata.org/ontology/LineageDetails")); + // detailsResource is the Activity instance for this lineage edge — it + // carries Activity-shaped predicates (prov:startedAtTime, endedAtTime, + // used, hadPlan, wasGeneratedBy, wasAssociatedWith). Type it as + // prov:Activity so PROV-O reasoners and federated SPARQL clients treat + // it as one without having to learn the OM-specific type. + detailsResource.addProperty( + model.createProperty("http://www.w3.org/1999/02/22-rdf-syntax-ns#", "type"), + model.createResource("http://www.w3.org/ns/prov#Activity")); if (lineageDetails.getSqlQuery() != null && !lineageDetails.getSqlQuery().isEmpty()) { detailsResource.addProperty( model.createProperty("https://open-metadata.org/ontology/", "sqlQuery"), lineageDetails.getSqlQuery()); + + // PROV-O Plan: model the SQL transformation recipe as a prov:Plan that + // the Activity hadPlan. Lets external clients diff/version transformation + // logic separately from individual runs. + String planUri = detailsUri + "/plan"; + Resource planResource = model.createResource(planUri); + planResource.addProperty( + model.createProperty("http://www.w3.org/1999/02/22-rdf-syntax-ns#", "type"), + model.createResource("http://www.w3.org/ns/prov#Plan")); + planResource.addProperty( + model.createProperty("http://www.w3.org/ns/prov#", "value"), + lineageDetails.getSqlQuery()); + detailsResource.addProperty( + model.createProperty("http://www.w3.org/ns/prov#", "hadPlan"), planResource); } if (lineageDetails.getSource() != null) { @@ -415,17 +436,41 @@ public class RdfRepository { detailsResource.addProperty( model.createProperty("http://www.w3.org/ns/prov#", "wasGeneratedBy"), pipelineResource); + + // PROV-O inverse: pipeline prov:generated lineageDetails. Emitting both + // directions lets activity-side queries ("what did this pipeline produce?") + // run without needing reverse-property reasoning support in the triple store. + pipelineResource.addProperty( + model.createProperty("http://www.w3.org/ns/prov#", "generated"), detailsResource); } + // PROV-O input: lineageDetails prov:used . Completes the + // standard PROV-O Entity → Activity → Entity chain alongside wasDerivedFrom, + // so external SPARQL clients can query "what inputs did this activity use?". + detailsResource.addProperty( + model.createProperty("http://www.w3.org/ns/prov#", "used"), fromResource); + if (lineageDetails.getColumnsLineage() != null && !lineageDetails.getColumnsLineage().isEmpty()) { Property hasColumnLineage = model.createProperty("https://open-metadata.org/ontology/", "hasColumnLineage"); + int colLineageIndex = 0; for (org.openmetadata.schema.type.ColumnLineage colLineage : lineageDetails.getColumnsLineage()) { - String colLineageUri = detailsUri + "/columnLineage/" + System.nanoTime(); + // Deterministic URI per (lineage edge, target column) so re-indexing + // doesn't multiply column-lineage resources. The index suffix is a + // tiebreaker so distinct toColumn values that normalize to the same + // string (e.g. `a-b` and `a_b` both → `a_b` after the + // [^A-Za-z0-9]→`_` replacement) don't collapse to one resource. + String safeName = + colLineage.getToColumn() != null + ? colLineage.getToColumn().replaceAll("[^A-Za-z0-9]", "_") + : "noTarget"; + String colLineageUri = + detailsUri + "/columnLineage/" + safeName + "_" + colLineageIndex; Resource colLineageResource = model.createResource(colLineageUri); + colLineageIndex++; detailsResource.addProperty(hasColumnLineage, colLineageResource); colLineageResource.addProperty( @@ -460,6 +505,13 @@ public class RdfRepository { model.createTypedLiteral( lineageDetails.getCreatedAt().toString(), org.apache.jena.datatypes.xsd.XSDDatatype.XSDlong)); + // PROV-O timing: detailsResource represents the Activity instance, so its + // createdAt is when the Activity started. + detailsResource.addProperty( + model.createProperty("http://www.w3.org/ns/prov#", "startedAtTime"), + model.createTypedLiteral( + java.time.Instant.ofEpochMilli(lineageDetails.getCreatedAt()).toString(), + org.apache.jena.datatypes.xsd.XSDDatatype.XSDdateTime)); } if (lineageDetails.getUpdatedAt() != null) { detailsResource.addProperty( @@ -467,12 +519,29 @@ public class RdfRepository { model.createTypedLiteral( lineageDetails.getUpdatedAt().toString(), org.apache.jena.datatypes.xsd.XSDDatatype.XSDlong)); + // PROV-O timing: updatedAt is when the Activity last completed (or was + // last observed). For instantaneous activities it equals startedAtTime. + detailsResource.addProperty( + model.createProperty("http://www.w3.org/ns/prov#", "endedAtTime"), + model.createTypedLiteral( + java.time.Instant.ofEpochMilli(lineageDetails.getUpdatedAt()).toString(), + org.apache.jena.datatypes.xsd.XSDDatatype.XSDdateTime)); } if (lineageDetails.getCreatedBy() != null) { detailsResource.addProperty( model.createProperty("https://open-metadata.org/ontology/", "lineageCreatedBy"), lineageDetails.getCreatedBy()); + // PROV-O agency: the Activity was associated with the Agent (user/bot) + // that triggered or owns it. We don't know the agent's UUID from a + // username string, so use a name-based URI under entity/user/. + String associatedAgentUri = + config.getBaseUri().toString() + + "entity/user/" + + lineageDetails.getCreatedBy().replaceAll("[^A-Za-z0-9_.-]", "_"); + detailsResource.addProperty( + model.createProperty("http://www.w3.org/ns/prov#", "wasAssociatedWith"), + model.createResource(associatedAgentUri)); } if (lineageDetails.getUpdatedBy() != null) { detailsResource.addProperty( @@ -487,11 +556,40 @@ public class RdfRepository { String triples = writer.toString(); if (!triples.isEmpty()) { + String detailsUri = + config.getBaseUri().toString() + "lineageDetails/" + fromId + "/" + toId; + // Cleanup before re-insert: remove the lineage edge (both directions), + // any LineageDetails subtree for THIS specific (fromId, toId) edge — never + // touch the source entity's hasLineageDetails links to OTHER downstream + // entities — and any prov:generated reference to this details resource. + // The hasLineageDetails delete is pinned to hasLineageDetails + // so reindexing one edge doesn't strip the source's other + // downstream lineage links. The detailsUri-prefixed delete cleans up the + // LineageDetails resource itself plus its child columnLineage resources + // (deterministic URI prefix). String deleteQuery = String.format( - "DELETE WHERE { GRAPH <%s> { <%s> <%s> . } }; " - + "DELETE WHERE { GRAPH <%s> { <%s> <%s> . } }", - KNOWLEDGE_GRAPH, fromUri, toUri, KNOWLEDGE_GRAPH, toUri, fromUri); + "DELETE WHERE { GRAPH <%s> { <%s> <%s> . } };" + + " DELETE WHERE { GRAPH <%s> { <%s> <%s> . } };" + + " DELETE WHERE { GRAPH <%s> { <%s> <%s> . } };" + + " DELETE { GRAPH <%s> { ?s ?p ?o } } WHERE { GRAPH <%s> { ?s ?p ?o . FILTER(STRSTARTS(STR(?s), \"%s\")) } };" + + " DELETE { GRAPH <%s> { ?act <%s> } } WHERE { GRAPH <%s> { ?act <%s> } }", + KNOWLEDGE_GRAPH, + fromUri, + toUri, + KNOWLEDGE_GRAPH, + toUri, + fromUri, + KNOWLEDGE_GRAPH, + fromUri, + detailsUri, + KNOWLEDGE_GRAPH, + KNOWLEDGE_GRAPH, + detailsUri, + KNOWLEDGE_GRAPH, + detailsUri, + KNOWLEDGE_GRAPH, + detailsUri); storageService.executeSparqlUpdate(deleteQuery); @@ -508,6 +606,7 @@ public class RdfRepository { toType, toId, e); + throw new RuntimeException("Failed to add lineage with details", e); } } @@ -1516,6 +1615,34 @@ public class RdfRepository { } } + /** + * Re-orient lineage relation labels relative to the focal node. The raw stored + * relation `(A, B, upstream)` means "A is upstream of B" — but in a graph view + * centered on focal F, an edge {@code F → X} means X is *downstream* of F, not + * upstream. Without this re-orientation, every outgoing lineage edge from the + * focal would carry the misleading "Upstream" label even though it really + * represents downstream flow. + * + *

Returns the input relation untouched for non-lineage relations and for + * edges that don't touch the focal (e.g. multi-hop neighbours). + */ + private String relativeRelationLabel(EdgeInfo edge, String focalUri) { + if (focalUri == null || edge.relation == null) { + return edge.relation; + } + String rel = edge.relation.toLowerCase(Locale.ROOT); + boolean focalIsSource = focalUri.equals(edge.fromUri); + boolean focalIsTarget = focalUri.equals(edge.toUri); + if (!focalIsSource && !focalIsTarget) { + return edge.relation; + } + return switch (rel) { + case "upstream" -> focalIsSource ? "downstream" : "upstream"; + case "downstream" -> focalIsSource ? "upstream" : "downstream"; + default -> edge.relation; + }; + } + private String formatRelationshipLabel(String relationship) { return switch (relationship.toLowerCase()) { case "contains" -> "Contains"; @@ -1689,12 +1816,34 @@ public class RdfRepository { continue; } - String edgeKey = subjectUri + "|" + relationType + "|" + objectUri; + String fromUri = subjectUri; + String toUri = objectUri; + String canonicalPredicate = predicate; + if (isReverseDirectionPredicate(predicate)) { + fromUri = objectUri; + toUri = subjectUri; + // Predicate must travel with the canonicalized direction; otherwise the + // EdgeInfo would carry e.g. prov:wasDerivedFrom , + // which is the wrong direction by PROV-O semantics. Substitute the + // forward-direction equivalent. + canonicalPredicate = forwardEquivalentPredicate(predicate); + // Re-derive relationType from the canonical predicate so it matches + // the new (from, to) orientation. Otherwise prov:wasInfluencedBy gives + // relationType=downstream + predicate=om:UPSTREAM, which is internally + // inconsistent and would also miss dedup against an existing UPSTREAM + // edge written with the same subject/object. + relationType = extractEntityRelationType(canonicalPredicate); + if (relationType == null || relationType.isBlank()) { + continue; + } + } + + String edgeKey = fromUri + "|" + relationType + "|" + toUri; if (!edgeKeys.add(edgeKey)) { continue; } - EdgeInfo edge = new EdgeInfo(subjectUri, objectUri, relationType, predicate); + EdgeInfo edge = new EdgeInfo(fromUri, toUri, relationType, canonicalPredicate); edges.add(edge); discoveredNodes.add(subjectUri); discoveredNodes.add(objectUri); @@ -1992,8 +2141,13 @@ public class RdfRepository { JsonUtils.getObjectMapper().createObjectNode(); graphEdge.put("from", edge.fromUri); graphEdge.put("to", edge.toUri); - graphEdge.put("label", formatRelationshipLabel(edge.relation)); - graphEdge.put("relationType", edge.relation); + // Label edges relative to the focal node so the user sees the right semantics: + // focal → X (focal is upstream of X) → "Downstream" + // X → focal (X is upstream of focal) → "Upstream" + // Edges that don't touch the focal keep the raw relation label. + String displayRelation = relativeRelationLabel(edge, rootUri); + graphEdge.put("label", formatRelationshipLabel(displayRelation)); + graphEdge.put("relationType", displayRelation); graphEdge.put("arrows", "to"); graphEdges.add(graphEdge); } @@ -2059,6 +2213,39 @@ public class RdfRepository { }; } + private boolean isReverseDirectionPredicate(String predicateUri) { + String localName = extractUriLocalName(predicateUri); + if (localName == null || localName.isBlank()) { + return false; + } + String normalized = localName.replaceAll("[^A-Za-z0-9]", "").toLowerCase(Locale.ROOT); + return normalized.equals("wasderivedfrom") || normalized.equals("wasinfluencedby"); + } + + /** + * Map a reverse-direction predicate (PROV-O) to its forward-direction OpenMetadata + * equivalent so the canonicalized edge in {@link #parseEntityGraphEdgesFromResults} + * carries a predicate that matches its (from, to) orientation. + * + *

Both `prov:wasDerivedFrom` and `prov:wasInfluencedBy` are reverse-direction + * causation predicates: in `B wasDerivedFrom A` / `B wasInfluencedBy A`, A is + * the source and B is the effect. After we flip subject/object so the edge + * reads source→target, the canonical forward predicate is `om:UPSTREAM` in + * both cases. (OM does not store a separate `om:DOWNSTREAM` URI — downstream + * is derived by reading the same UPSTREAM edge from the other side.) + */ + private String forwardEquivalentPredicate(String reversePredicateUri) { + String localName = extractUriLocalName(reversePredicateUri); + if (localName == null) { + return reversePredicateUri; + } + String normalized = localName.replaceAll("[^A-Za-z0-9]", "").toLowerCase(Locale.ROOT); + return switch (normalized) { + case "wasderivedfrom", "wasinfluencedby" -> "https://open-metadata.org/ontology/UPSTREAM"; + default -> reversePredicateUri; + }; + } + private String normalizeEntityTypeFilter(String entityType) { return entityType == null ? "" : entityType.trim().toLowerCase(Locale.ROOT); } @@ -2367,6 +2554,7 @@ public class RdfRepository { } } catch (Exception e) { LOG.error("Failed to bulk add glossary term relations to RDF", e); + throw new RuntimeException("Failed to bulk add glossary term relations to RDF", e); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/rdf/RdfUtils.java b/openmetadata-service/src/main/java/org/openmetadata/service/rdf/RdfUtils.java index deb1378a044..e3dccc64722 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/rdf/RdfUtils.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/rdf/RdfUtils.java @@ -1,14 +1,75 @@ package org.openmetadata.service.rdf; +import java.util.Set; + /** * Utility methods for RDF operations */ public class RdfUtils { + private static final Set PROV_ACTIVITY_TYPES = + Set.of( + "pipeline", + "ingestionpipeline", + "storedprocedure", + "dbtpipeline", + "workflow", + "pipelinerun"); + + private static final Set PROV_AGENT_TYPES = Set.of("user", "team", "bot", "role"); + + private static final Set PROV_ENTITY_TYPES = + Set.of( + "table", + "database", + "databaseschema", + "dashboard", + "chart", + "topic", + "mlmodel", + "container", + "report", + "searchindex", + "apicollection", + "apiendpoint", + "datamodel", + "dashboarddatamodel", + "metric", + "directory", + "file", + "worksheet", + "spreadsheet", + "glossaryterm", + "tag", + "dataproduct", + "domain"); + private RdfUtils() { // Private constructor for utility class } + /** + * Maps an entity type to its PROV-O class (Entity, Activity, or Agent). + * Returns null when the entity type doesn't fit cleanly into the PROV-O model + * (e.g. service definitions, classifications, policies). + */ + public static String getProvType(String entityType) { + if (entityType == null) { + return null; + } + String key = entityType.toLowerCase(); + if (PROV_ACTIVITY_TYPES.contains(key)) { + return "prov:Activity"; + } + if (PROV_AGENT_TYPES.contains(key)) { + return "prov:Agent"; + } + if (PROV_ENTITY_TYPES.contains(key)) { + return "prov:Entity"; + } + return null; + } + public static String getRdfType(String entityType) { return switch (entityType.toLowerCase()) { case "table" -> "dcat:Dataset"; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/rdf/translator/JsonLdTranslator.java b/openmetadata-service/src/main/java/org/openmetadata/service/rdf/translator/JsonLdTranslator.java index 310e55e25ad..4512e9e469b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/rdf/translator/JsonLdTranslator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/rdf/translator/JsonLdTranslator.java @@ -243,6 +243,16 @@ public class JsonLdTranslator { String omType = entityType.substring(0, 1).toUpperCase() + entityType.substring(1); entityResource.addProperty(RDF.type, model.createResource(omNamespace + omType)); + // Add PROV-O class typing (prov:Entity/Activity/Agent) so PROV-O reasoners can + // apply standard rules. Skipped when the primary rdfType is already a PROV-O + // class (e.g. pipeline → prov:Activity) to avoid duplicate triples. + String provType = RdfUtils.getProvType(entityType); + if (provType != null && !provType.equals(rdfType)) { + String provNamespace = model.getNsPrefixURI("prov"); + String provLocalName = provType.substring(provType.indexOf(':') + 1); + entityResource.addProperty(RDF.type, model.createResource(provNamespace + provLocalName)); + } + RdfPropertyMapper propertyMapper = new RdfPropertyMapper(baseUri, objectMapper, contextCache); propertyMapper.mapEntityToRdf(entity, entityResource, model); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/rdf/translator/RdfPropertyMapper.java b/openmetadata-service/src/main/java/org/openmetadata/service/rdf/translator/RdfPropertyMapper.java index 810b6cb5d3e..cc6887bac10 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/rdf/translator/RdfPropertyMapper.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/rdf/translator/RdfPropertyMapper.java @@ -16,10 +16,12 @@ import org.apache.jena.vocabulary.RDF; import org.apache.jena.vocabulary.RDFS; import org.apache.jena.vocabulary.SKOS; import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.entity.classification.Tag; import org.openmetadata.schema.entity.data.GlossaryTerm; import org.openmetadata.schema.type.Include; import org.openmetadata.service.Entity; import org.openmetadata.service.rdf.RdfUtils; +import org.openmetadata.service.util.FullyQualifiedName; /** * Maps all entity properties to RDF triples based on context definitions @@ -31,6 +33,8 @@ public class RdfPropertyMapper { private final ObjectMapper objectMapper; private final Map contextCache; private final Map glossaryTermIdCache = new ConcurrentHashMap<>(); + private final Map classificationTagIdCache = new ConcurrentHashMap<>(); + private static final String TIER_CLASSIFICATION_PREFIX = "Tier."; // Common namespace URIs private static final String OM_NS = "https://open-metadata.org/ontology/"; @@ -40,13 +44,14 @@ public class RdfPropertyMapper { private static final String FOAF_NS = "http://xmlns.com/foaf/0.1/"; private static final String VOID_NS = "http://rdfs.org/ns/void#"; private static final String CSVW_NS = "http://www.w3.org/ns/csvw#"; + private static final String DPROD_NS = "https://ekgf.github.io/dprod/"; // Properties that should be mapped to structured RDF instead of JSON literals private static final Set STRUCTURED_PROPERTIES = - Set.of("votes", "lifeCycle", "customProperties", "extension"); + Set.of("lifeCycle", "customProperties", "extension", "certification"); // Properties that should be omitted from RDF because they are audit/helper data. - private static final Set IGNORED_PROPERTIES = Set.of("changeDescription"); + private static final Set IGNORED_PROPERTIES = Set.of("changeDescription", "votes"); // Lineage properties that need special handling private static final Set LINEAGE_PROPERTIES = @@ -92,17 +97,36 @@ public class RdfPropertyMapper { JsonNode entityJson, Resource entityResource, Model model) { + // Flatten all context maps in the array into one combined map BEFORE iterating + // entity fields, so each field gets resolved against the union of mappings + // exactly once. Without this, processContextMappings runs per-context-map and + // the same field can be emitted multiple times: e.g. `owners` is mapped in + // base.jsonld (→ om:hasOwner) but absent from `dataAsset-complete`, so the + // second pass falls through to processUnmappedField and emits an extra + // `om:owners` predicate alongside om:hasOwner — duplicate triples for the + // same logical relationship. Later contexts win on key conflicts (standard + // JSON-LD context-merge semantics). + Map mergedContext = new java.util.HashMap<>(); for (Object contextItem : contextArray) { if (contextItem instanceof Map) { - processContextMappings( - (Map) contextItem, entityJson, entityResource, model); + mergedContext.putAll((Map) contextItem); } } + processContextMappings(mergedContext, entityJson, entityResource, model); } // Fields that are handled separately with typed predicates (not via JSON-LD context) private static final Set TYPED_RELATION_FIELDS = Set.of("relatedTerms"); + // Fields where the array contains EntityReferences. When the field also has a + // JSON-LD context mapping the mapped path emits clean `om: ` + // triples and the unmapped path's JSON-string literal would be redundant noise. + // For fields without a context mapping the unmapped path is the ONLY path, so we + // can't simply skip — we expand each array element as an entity reference using + // an `om:` predicate so the data isn't lost. + private static final Set ENTITY_REFERENCE_ARRAY_FIELDS = + Set.of("owners", "followers", "reviewers", "voters", "experts", "domains", "dataProducts"); + private void processContextMappings( Map contextMap, JsonNode entityJson, Resource entityResource, Model model) { // Iterate through all fields in the entity JSON @@ -126,6 +150,20 @@ public class RdfPropertyMapper { continue; } + // Structured properties (certification, lifeCycle, etc.) are handled before the JSON-LD + // context lookup so they get proper RDF triples even when no context entry exists for them. + if (STRUCTURED_PROPERTIES.contains(fieldName) + && fieldValue != null + && !fieldValue.isNull() + && (fieldValue.isObject() || fieldValue.isArray())) { + if (fieldValue.isArray()) { + addStructuredArrayProperty(fieldName, fieldValue, entityResource, model); + } else { + addStructuredProperty(fieldName, fieldValue, entityResource, model); + } + continue; + } + // Look up the mapping in context Object mapping = contextMap.get(fieldName); if (mapping != null) { @@ -143,6 +181,13 @@ public class RdfPropertyMapper { return; } + // PROV-O attribution: emit prov:wasAttributedTo for each owner in addition to + // the standard om:owners triples. Lets external SPARQL clients query attribution + // using the W3C PROV-O vocabulary instead of OpenMetadata-specific predicates. + if ("owners".equals(fieldName) && fieldValue.isArray()) { + addProvAttribution(entityResource, fieldValue, model); + } + // Check if this is a lineage property that needs special handling if (LINEAGE_PROPERTIES.contains(fieldName)) { addLineageProperty(fieldName, fieldValue, entityResource, model); @@ -194,6 +239,34 @@ public class RdfPropertyMapper { private void processUnmappedField( String fieldName, JsonNode fieldValue, Resource entityResource, Model model) { + // PROV-O attribution mirror — fires here too because not every entity context + // declares the owners field, in which case it falls through to the unmapped path + // and bypasses processFieldMapping. + if ("owners".equals(fieldName) && fieldValue.isArray()) { + addProvAttribution(entityResource, fieldValue, model); + } + + // EntityReference arrays: don't dump the raw JSON as a literal. If the array is + // empty there's nothing to emit. Otherwise expand each element through + // addEntityReference so the data still lands as proper `om: ` + // triples even when no JSON-LD context maps the field. For fields the mapped + // path also handles (e.g. owners), this is a no-op because the same triples + // were already added — Jena's Model dedupes identical triples. + if (ENTITY_REFERENCE_ARRAY_FIELDS.contains(fieldName) && fieldValue.isArray()) { + if (fieldValue.isEmpty()) { + return; + } + addEntityReference(entityResource, OM_NS + fieldName, fieldValue, model); + return; + } + + // Skip empty arrays / objects — emitting "[]" or "{}" string literals creates + // noise without providing useful information. + if ((fieldValue.isArray() && fieldValue.isEmpty()) + || (fieldValue.isObject() && fieldValue.isEmpty())) { + return; + } + // Create property in om: namespace String propertyUri = OM_NS + fieldName; Property property = model.createProperty(propertyUri); @@ -228,6 +301,17 @@ public class RdfPropertyMapper { } } + private void addProvAttribution(Resource entityResource, JsonNode owners, Model model) { + Property attributedTo = model.createProperty(PROV_NS, "wasAttributedTo"); + for (JsonNode owner : owners) { + if (owner.isObject() && owner.has("id") && owner.has("type")) { + String ownerUri = + baseUri + "entity/" + owner.get("type").asText() + "/" + owner.get("id").asText(); + entityResource.addProperty(attributedTo, model.createResource(ownerUri)); + } + } + } + private void addEntityReference( Resource resource, String propertyId, JsonNode value, Model model) { Property property = createProperty(propertyId, model); @@ -242,7 +326,7 @@ public class RdfPropertyMapper { resource.addProperty(property, refResource); // Also add type information for the reference - refResource.addProperty(RDF.type, model.createResource(getRdfType(refType))); + refResource.addProperty(RDF.type, createTypeResource(refType, model)); // Add basic properties of the reference if (value.has("name")) { @@ -268,73 +352,106 @@ public class RdfPropertyMapper { private void addTagLabel(Resource resource, Property property, JsonNode tagLabel, Model model) { String tagFqn = tagLabel.get("tagFQN").asText(); + String source = tagLabel.has("source") ? tagLabel.get("source").asText() : "Classification"; + boolean isGlossary = "Glossary".equalsIgnoreCase(source); - // Create a URI for the tag based on its FQN - // Convert FQN like "PII.None" to a valid URI - String tagUri = baseUri + "tag/" + tagFqn.replace(".", "/"); - Resource tagResource = model.createResource(tagUri); - - // Link the entity to the tag + Resource tagResource = resolveTagResource(tagFqn, source, tagLabel, model); resource.addProperty(property, tagResource); - // Add tag type - tagResource.addProperty(RDF.type, model.createResource(OM_NS + "Tag")); + if (isGlossary) { + tagResource.addProperty(RDF.type, createTypeResource("glossaryTerm", model)); + tagResource.addProperty(RDF.type, model.createResource(SKOS.getURI() + "Concept")); + resource.addProperty(model.createProperty(OM_NS, "hasGlossaryTerm"), tagResource); + } else { + tagResource.addProperty(RDF.type, createTypeResource("tag", model)); + tagResource.addProperty(RDF.type, model.createResource(OM_NS + "Tag")); + if (tagFqn.startsWith(TIER_CLASSIFICATION_PREFIX)) { + resource.addProperty(model.createProperty(OM_NS, "hasTier"), tagResource); + } + } - // Add tagFQN as a property tagResource.addProperty(model.createProperty(OM_NS, "tagFQN"), tagFqn); - - // Add tag name if available + tagResource.addProperty(model.createProperty(OM_NS, "tagSource"), source); if (tagLabel.has("name")) { tagResource.addProperty(RDFS.label, tagLabel.get("name").asText()); } - - // Add displayName if available if (tagLabel.has("displayName")) { tagResource.addProperty(SKOS.prefLabel, tagLabel.get("displayName").asText()); } - - // Add labelType if (tagLabel.has("labelType")) { tagResource.addProperty( model.createProperty(OM_NS, "labelType"), tagLabel.get("labelType").asText()); } - - // Add source (Classification or Glossary) - if (tagLabel.has("source")) { - String source = tagLabel.get("source").asText(); - tagResource.addProperty(model.createProperty(OM_NS, "tagSource"), source); - - // Also add appropriate type based on source - if ("Glossary".equalsIgnoreCase(source)) { - tagResource.addProperty(RDF.type, model.createResource(SKOS.getURI() + "Concept")); - addGlossaryTermReference(resource, tagFqn, tagLabel, model); - } - } - - // Add state if (tagLabel.has("state")) { tagResource.addProperty( model.createProperty(OM_NS, "tagState"), tagLabel.get("state").asText()); } - - // Add description if available if (tagLabel.has("description")) { tagResource.addProperty( model.createProperty(DCT_NS, "description"), tagLabel.get("description").asText()); } } - private void addGlossaryTermReference( - Resource resource, String termFqn, JsonNode tagLabel, Model model) { - UUID termId = resolveGlossaryTermId(termFqn, tagLabel); - if (termId == null) { - return; + /** + * Resolves a TagLabel to the canonical entity URI. When the underlying tag or glossary term can + * be looked up by FQN, the asset is linked to the real entity (e.g. {@code entity/tag/{uuid}}) + * so SPARQL traversals reach the tag's metadata, owners, classification, etc. Falls back to a + * deterministic synthetic URI only if lookup fails (e.g. tag deleted concurrently). + */ + private Resource resolveTagResource( + String tagFqn, String source, JsonNode tagLabel, Model model) { + UUID id = + "Glossary".equalsIgnoreCase(source) + ? resolveGlossaryTermId(tagFqn, tagLabel) + : resolveClassificationTagId(tagFqn, tagLabel); + String entityType = "Glossary".equalsIgnoreCase(source) ? "glossaryTerm" : "tag"; + if (id != null) { + return model.createResource(baseUri + "entity/" + entityType + "/" + id); } + return model.createResource(baseUri + "tag/" + tagFqn.replace(".", "/")); + } - String termUri = baseUri + "entity/glossaryTerm/" + termId; - Resource termResource = model.createResource(termUri); - resource.addProperty(model.createProperty(OM_NS, "hasGlossaryTerm"), termResource); - termResource.addProperty(RDF.type, model.createResource(getRdfType("glossaryTerm"))); + private String extractCertificationLevel(String tagFqn) { + if (tagFqn == null || tagFqn.isBlank()) { + return null; + } + try { + String[] parts = FullyQualifiedName.split(tagFqn); + if (parts.length < 2) { + return null; + } + return FullyQualifiedName.unquoteName(parts[parts.length - 1]); + } catch (Exception e) { + LOG.debug("Could not extract certification level from FQN {}", tagFqn); + return null; + } + } + + private UUID resolveClassificationTagId(String tagFqn, JsonNode tagLabel) { + if (tagFqn == null || tagFqn.isEmpty()) { + return null; + } + UUID cached = classificationTagIdCache.get(tagFqn); + if (cached != null) { + return cached; + } + try { + UUID resolvedId = tryResolveUuidFromHref(tagLabel); + if (resolvedId != null) { + classificationTagIdCache.put(tagFqn, resolvedId); + return resolvedId; + } + + Tag tag = Entity.getEntityByName(Entity.TAG, tagFqn, "", Include.NON_DELETED, false); + UUID id = tag != null ? tag.getId() : null; + if (id != null) { + classificationTagIdCache.put(tagFqn, id); + } + return id; + } catch (Exception e) { + LOG.debug("Could not resolve classification tag id for FQN {}: {}", tagFqn, e.getMessage()); + return null; + } } private UUID resolveGlossaryTermId(String termFqn, JsonNode tagLabel) { @@ -347,14 +464,14 @@ public class RdfPropertyMapper { } try { - UUID resolvedTermId = tryResolveGlossaryTermIdFromHref(tagLabel); + UUID resolvedTermId = tryResolveUuidFromHref(tagLabel); if (resolvedTermId != null) { glossaryTermIdCache.put(termFqn, resolvedTermId); return resolvedTermId; } GlossaryTerm term = - Entity.getEntityByName(Entity.GLOSSARY_TERM, termFqn, "id", Include.NON_DELETED, false); + Entity.getEntityByName(Entity.GLOSSARY_TERM, termFqn, "", Include.NON_DELETED, false); UUID termId = term != null ? term.getId() : null; if (termId != null) { glossaryTermIdCache.put(termFqn, termId); @@ -366,7 +483,7 @@ public class RdfPropertyMapper { } } - private UUID tryResolveGlossaryTermIdFromHref(JsonNode tagLabel) { + private UUID tryResolveUuidFromHref(JsonNode tagLabel) { if (tagLabel == null || !tagLabel.has("href")) { return null; } @@ -403,9 +520,9 @@ public class RdfPropertyMapper { private void addStructuredProperty( String fieldName, JsonNode value, Resource entityResource, Model model) { switch (fieldName) { - case "votes" -> addVotes(value, entityResource, model); case "lifeCycle" -> addLifeCycle(value, entityResource, model); case "extension" -> addExtension(value, entityResource, model); + case "certification" -> addCertification(value, entityResource, model); default -> LOG.warn("Unknown structured property: {}", fieldName); } } @@ -424,37 +541,56 @@ public class RdfPropertyMapper { } /** - * Converts Votes to structured RDF triples. Enables SPARQL queries like: "Find all entities with - * more than 10 upvotes" without exposing individual voter identities as graph edges. + * Converts AssetCertification into a real RDF link. Emits {@code asset om:hasCertification} to + * the resolved tag resource (canonical {@code entity/tag/{uuid}} when the tag can be looked up, + * falling back to a synthetic {@code tag/{fqn}} URI only if lookup fails), plus the + * certification level (last FQN segment) and the applied/expiry timestamps as typed literals — + * instead of dumping the whole JSON as a string literal under {@code om:certification}. */ - private void addVotes(JsonNode votes, Resource entityResource, Model model) { - if (votes == null || votes.isNull()) { + private void addCertification(JsonNode certification, Resource entityResource, Model model) { + if (certification == null || certification.isNull() || !certification.has("tagLabel")) { return; } + JsonNode tagLabel = certification.get("tagLabel"); + if (!tagLabel.has("tagFQN")) { + return; + } + String tagFqn = tagLabel.get("tagFQN").asText(); + String source = tagLabel.has("source") ? tagLabel.get("source").asText() : "Classification"; + boolean isGlossary = "Glossary".equalsIgnoreCase(source); + Resource tagResource = resolveTagResource(tagFqn, source, tagLabel, model); - // Create a resource for votes - String votesUri = baseUri + "votes/" + entityResource.getLocalName(); - Resource votesNode = model.createResource(votesUri); - - // Link entity to votes - Property hasVotes = model.createProperty(OM_NS, "hasVotes"); - entityResource.addProperty(hasVotes, votesNode); - - // Add type - votesNode.addProperty(RDF.type, model.createResource(OM_NS + "Votes")); - - // Add upVotes count - if (votes.has("upVotes")) { - votesNode.addProperty( - model.createProperty(OM_NS, "upVotes"), - model.createTypedLiteral(votes.get("upVotes").asInt())); + // Mirror addTagLabel's typing so SPARQL queries can find certification + // targets the same way they find any other tag/glossary term — by source + // (glossaryTerm vs tag), with skos:Concept on glossary-backed targets and + // om:Tag on classification-backed ones. + if (isGlossary) { + tagResource.addProperty(RDF.type, createTypeResource("glossaryTerm", model)); + tagResource.addProperty(RDF.type, model.createResource(SKOS.getURI() + "Concept")); + } else { + tagResource.addProperty(RDF.type, createTypeResource("tag", model)); + tagResource.addProperty(RDF.type, model.createResource(OM_NS + "Tag")); + } + tagResource.addProperty(model.createProperty(OM_NS, "tagFQN"), tagFqn); + tagResource.addProperty(model.createProperty(OM_NS, "tagSource"), source); + if (tagLabel.has("name")) { + tagResource.addProperty(RDFS.label, tagLabel.get("name").asText()); } - // Add downVotes count - if (votes.has("downVotes")) { - votesNode.addProperty( - model.createProperty(OM_NS, "downVotes"), - model.createTypedLiteral(votes.get("downVotes").asInt())); + entityResource.addProperty(model.createProperty(OM_NS, "hasCertification"), tagResource); + String level = extractCertificationLevel(tagFqn); + if (level != null) { + entityResource.addProperty(model.createProperty(OM_NS, "certificationLevel"), level); + } + if (certification.has("appliedDate") && certification.get("appliedDate").isNumber()) { + entityResource.addProperty( + model.createProperty(OM_NS, "certificationAppliedAt"), + model.createTypedLiteral(certification.get("appliedDate").asLong())); + } + if (certification.has("expiryDate") && certification.get("expiryDate").isNumber()) { + entityResource.addProperty( + model.createProperty(OM_NS, "certificationExpiresAt"), + model.createTypedLiteral(certification.get("expiryDate").asLong())); } } @@ -698,7 +834,7 @@ public class RdfPropertyMapper { relatedEntityResource = model.createResource(relatedEntityUri); // Add type to the related entity - relatedEntityResource.addProperty(RDF.type, model.createResource(getRdfType(entityType))); + relatedEntityResource.addProperty(RDF.type, createTypeResource(entityType, model)); // Add name if available if (relatedEntityNode.has("name")) { @@ -796,7 +932,7 @@ public class RdfPropertyMapper { model.createProperty(PROV_NS, "wasGeneratedBy"), pipelineResource); // Add pipeline type - pipelineResource.addProperty(RDF.type, model.createResource(getRdfType(pipelineType))); + pipelineResource.addProperty(RDF.type, createTypeResource(pipelineType, model)); } // Add column lineage @@ -895,8 +1031,7 @@ public class RdfPropertyMapper { entityResource.addProperty(hasLineageNode, nodeResource); // Add type to the node - nodeResource.addProperty( - RDF.type, model.createResource(getRdfType(node.get("type").asText()))); + nodeResource.addProperty(RDF.type, createTypeResource(node.get("type").asText(), model)); // Add name if available if (node.has("name")) { @@ -976,11 +1111,24 @@ public class RdfPropertyMapper { } private void addStandardProperties(EntityInterface entity, Resource resource, Model model) { - // Add timestamps + // Add timestamps. updatedAt is epoch millis on the entity; convert to an + // ISO-8601 instant before tagging it as xsd:dateTime so the lexical form is + // valid (a long literal would be a malformed xsd:dateTime). if (entity.getUpdatedAt() != null) { + String iso = java.time.Instant.ofEpochMilli(entity.getUpdatedAt()).toString(); resource.addProperty( model.createProperty(DCT_NS, "modified"), - model.createTypedLiteral(entity.getUpdatedAt().toString(), XSDDatatype.XSDdateTime)); + model.createTypedLiteral(iso, XSDDatatype.XSDdateTime)); + } + + // PROV-O soft-delete: when the entity is marked deleted, expose its updatedAt + // as the invalidation timestamp so timeline-aware queries can filter on it. + if (Boolean.TRUE.equals(entity.getDeleted()) && entity.getUpdatedAt() != null) { + resource.addProperty( + model.createProperty(PROV_NS, "invalidatedAtTime"), + model.createTypedLiteral( + java.time.Instant.ofEpochMilli(entity.getUpdatedAt()).toString(), + XSDDatatype.XSDdateTime)); } // Add version @@ -1017,6 +1165,7 @@ public class RdfPropertyMapper { case "skos" -> SKOS.getURI(); case "void" -> VOID_NS; case "csvw" -> CSVW_NS; + case "dprod" -> DPROD_NS; default -> null; }; } @@ -1063,4 +1212,23 @@ public class RdfPropertyMapper { private String getRdfType(String entityType) { return RdfUtils.getRdfType(entityType); } + + private Resource createTypeResource(String entityType, Model model) { + String curieOrUri = getRdfType(entityType); + if (curieOrUri == null || curieOrUri.isEmpty()) { + return model.createResource(); + } + if (curieOrUri.startsWith("http://") || curieOrUri.startsWith("https://")) { + return model.createResource(curieOrUri); + } + int separatorIndex = curieOrUri.indexOf(':'); + if (separatorIndex <= 0 || separatorIndex == curieOrUri.length() - 1) { + return model.createResource(curieOrUri); + } + String namespace = getNamespace(curieOrUri.substring(0, separatorIndex)); + if (namespace == null) { + return model.createResource(curieOrUri); + } + return model.createResource(namespace + curieOrUri.substring(separatorIndex + 1)); + } } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/rdf/distributed/DistributedRdfIndexCoordinatorTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/rdf/distributed/DistributedRdfIndexCoordinatorTest.java index 243b085893a..89567ad3898 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/rdf/distributed/DistributedRdfIndexCoordinatorTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/rdf/distributed/DistributedRdfIndexCoordinatorTest.java @@ -413,4 +413,57 @@ class DistributedRdfIndexCoordinatorTest { assertFalse(coordinator.hasClaimableWork(jobId)); } + + @Test + @SuppressWarnings("unchecked") + void getPartitionStartCursorReturnsCachedValue() throws Exception { + UUID jobId = UUID.randomUUID(); + java.lang.reflect.Field cacheField = + DistributedRdfIndexCoordinator.class.getDeclaredField("partitionStartCursors"); + cacheField.setAccessible(true); + Map>> cache = + (Map>>) cacheField.get(coordinator); + Map cursors = new java.util.HashMap<>(); + cursors.put(100L, "encoded-cursor-100"); + cursors.put(200L, "encoded-cursor-200"); + Map> entityMap = new java.util.HashMap<>(); + entityMap.put("table", cursors); + cache.put(jobId, entityMap); + + assertEquals("encoded-cursor-100", coordinator.getPartitionStartCursor(jobId, "table", 100L)); + assertEquals("encoded-cursor-200", coordinator.getPartitionStartCursor(jobId, "table", 200L)); + assertNull(coordinator.getPartitionStartCursor(jobId, "table", 999L)); + assertNull(coordinator.getPartitionStartCursor(jobId, "dashboard", 100L)); + assertNull(coordinator.getPartitionStartCursor(UUID.randomUUID(), "table", 100L)); + assertNull(coordinator.getPartitionStartCursor(jobId, "table", 0L)); + assertNull(coordinator.getPartitionStartCursor(null, "table", 100L)); + } + + @Test + void cancelInFlightPartitionsDelegatesToDao() { + when(partitionDAO.cancelInFlightPartitions(anyString(), anyLong())).thenReturn(7); + int cancelled = coordinator.cancelInFlightPartitions(UUID.randomUUID()); + assertEquals(7, cancelled); + verify(partitionDAO, times(1)).cancelInFlightPartitions(anyString(), anyLong()); + } + + @Test + void claimNextPartitionRespectsInFlightBackpressure() { + when(partitionDAO.countInFlightPartitionsForServer(anyString(), eq(TEST_SERVER_ID))) + .thenReturn(5); + + assertNull(coordinator.claimNextPartition(UUID.randomUUID(), TEST_SERVER_ID)); + verify(partitionDAO, never()).claimNextPartitionAtomic(anyString(), anyString(), anyLong()); + } + + @Test + void claimNextPartitionProceedsWhenUnderInFlightCap() { + when(partitionDAO.countInFlightPartitionsForServer(anyString(), eq(TEST_SERVER_ID))) + .thenReturn(2); + when(partitionDAO.claimNextPartitionAtomic(anyString(), anyString(), anyLong())).thenReturn(0); + + coordinator.claimNextPartition(UUID.randomUUID(), TEST_SERVER_ID); + verify(partitionDAO, times(1)) + .claimNextPartitionAtomic(anyString(), eq(TEST_SERVER_ID), anyLong()); + } } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfEntityCompletionTrackerTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfEntityCompletionTrackerTest.java new file mode 100644 index 00000000000..22b294fe23b --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfEntityCompletionTrackerTest.java @@ -0,0 +1,93 @@ +/* + * Copyright 2026 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.service.apps.bundles.rdf.distributed; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Test; + +class RdfEntityCompletionTrackerTest { + + @Test + void firesCallbackOnceWhenAllPartitionsComplete() { + UUID jobId = UUID.randomUUID(); + RdfEntityCompletionTracker tracker = new RdfEntityCompletionTracker(jobId); + tracker.initializeEntity("table", 3); + + AtomicInteger callbackCount = new AtomicInteger(); + AtomicReference capturedSuccess = new AtomicReference<>(); + tracker.setOnEntityComplete( + (type, success) -> { + callbackCount.incrementAndGet(); + capturedSuccess.set(success); + }); + + tracker.recordPartitionComplete("table", false); + tracker.recordPartitionComplete("table", false); + assertEquals(0, callbackCount.get(), "callback fires only after all partitions complete"); + + tracker.recordPartitionComplete("table", false); + assertEquals(1, callbackCount.get()); + assertTrue(capturedSuccess.get()); + assertTrue(tracker.isPromoted("table")); + + // Extra completions never re-fire the callback + tracker.recordPartitionComplete("table", false); + assertEquals(1, callbackCount.get()); + } + + @Test + void capturesFailureFromAnyPartition() { + RdfEntityCompletionTracker tracker = new RdfEntityCompletionTracker(UUID.randomUUID()); + tracker.initializeEntity("dashboard", 2); + + AtomicReference capturedSuccess = new AtomicReference<>(); + tracker.setOnEntityComplete((type, success) -> capturedSuccess.set(success)); + + tracker.recordPartitionComplete("dashboard", false); + tracker.recordPartitionComplete("dashboard", true); + assertFalse(capturedSuccess.get()); + } + + @Test + void getStatusReportsAccurateCounts() { + RdfEntityCompletionTracker tracker = new RdfEntityCompletionTracker(UUID.randomUUID()); + tracker.initializeEntity("topic", 5); + tracker.recordPartitionComplete("topic", false); + tracker.recordPartitionComplete("topic", true); + + RdfEntityCompletionTracker.EntityCompletionStatus status = tracker.getStatus("topic"); + assertNotNull(status); + assertEquals(5, status.totalPartitions()); + assertEquals(2, status.completedPartitions()); + assertEquals(1, status.failedPartitions()); + assertFalse(status.isComplete()); + assertTrue(status.hasFailures()); + } + + @Test + void untrackedEntityIsIgnored() { + RdfEntityCompletionTracker tracker = new RdfEntityCompletionTracker(UUID.randomUUID()); + AtomicInteger callbackCount = new AtomicInteger(); + tracker.setOnEntityComplete((type, success) -> callbackCount.incrementAndGet()); + + tracker.recordPartitionComplete("never-initialized", false); + assertEquals(0, callbackCount.get()); + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfPartitionWorkerTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfPartitionWorkerTest.java index aa8a3d2904f..90fe0e79631 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfPartitionWorkerTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfPartitionWorkerTest.java @@ -41,6 +41,8 @@ class RdfPartitionWorkerTest { void initializeKeysetCursorHandlesRepositoryBackedEntities() throws Exception { @SuppressWarnings("unchecked") EntityRepository repository = mock(EntityRepository.class); + RdfIndexPartition partition = + RdfIndexPartition.builder().jobId(java.util.UUID.randomUUID()).entityType("table").build(); try (MockedStatic entityMock = mockStatic(Entity.class)) { entityMock.when(() -> Entity.getEntityRepository("table")).thenReturn(repository); @@ -50,7 +52,8 @@ class RdfPartitionWorkerTest { invokePrivate( worker, "initializeKeysetCursor", - new Class[] {String.class, long.class}, + new Class[] {RdfIndexPartition.class, String.class, long.class}, + partition, "table", 0L)); assertEquals( @@ -58,7 +61,8 @@ class RdfPartitionWorkerTest { invokePrivate( worker, "initializeKeysetCursor", - new Class[] {String.class, long.class}, + new Class[] {RdfIndexPartition.class, String.class, long.class}, + partition, "table", 5L)); } @@ -66,6 +70,8 @@ class RdfPartitionWorkerTest { @Test void initializeKeysetCursorRejectsOffsetsBeyondSupportedRange() { + RdfIndexPartition partition = + RdfIndexPartition.builder().jobId(java.util.UUID.randomUUID()).entityType("table").build(); IllegalArgumentException exception = assertThrows( IllegalArgumentException.class, @@ -73,7 +79,8 @@ class RdfPartitionWorkerTest { invokePrivate( worker, "initializeKeysetCursor", - new Class[] {String.class, long.class}, + new Class[] {RdfIndexPartition.class, String.class, long.class}, + partition, "table", (long) Integer.MAX_VALUE + 2L)); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfPollingJobNotifierTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfPollingJobNotifierTest.java new file mode 100644 index 00000000000..075f05e78bc --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/rdf/distributed/RdfPollingJobNotifierTest.java @@ -0,0 +1,55 @@ +/* + * Copyright 2026 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.service.apps.bundles.rdf.distributed; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.Test; +import org.openmetadata.service.jdbi3.CollectionDAO; + +class RdfPollingJobNotifierTest { + + @Test + void startStopFlipsRunningFlag() { + CollectionDAO collectionDAO = mock(CollectionDAO.class); + CollectionDAO.RdfIndexJobDAO jobDAO = mock(CollectionDAO.RdfIndexJobDAO.class); + when(collectionDAO.rdfIndexJobDAO()).thenReturn(jobDAO); + when(jobDAO.getRunningJobIds()).thenReturn(java.util.List.of()); + + RdfPollingJobNotifier notifier = new RdfPollingJobNotifier(collectionDAO, "test-server-1234"); + assertFalse(notifier.isRunning()); + + notifier.start(); + assertTrue(notifier.isRunning()); + + notifier.stop(); + assertFalse(notifier.isRunning()); + } + + @Test + void doubleStartIsSafe() { + CollectionDAO collectionDAO = mock(CollectionDAO.class); + CollectionDAO.RdfIndexJobDAO jobDAO = mock(CollectionDAO.RdfIndexJobDAO.class); + when(collectionDAO.rdfIndexJobDAO()).thenReturn(jobDAO); + when(jobDAO.getRunningJobIds()).thenReturn(java.util.List.of()); + + RdfPollingJobNotifier notifier = new RdfPollingJobNotifier(collectionDAO, "test-server-1234"); + notifier.start(); + notifier.start(); // no-op, no exception + assertTrue(notifier.isRunning()); + notifier.stop(); + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/rdf/RdfParserHelpersTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/rdf/RdfParserHelpersTest.java new file mode 100644 index 00000000000..866f6c45b59 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/rdf/RdfParserHelpersTest.java @@ -0,0 +1,143 @@ +/* + * Copyright 2026 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.service.rdf; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.openmetadata.schema.api.configuration.rdf.RdfConfiguration; + +/** + * Unit coverage for the RDF parser helpers that drive the lineage edge + * canonicalization in {@code parseEntityGraphEdgesFromResults}. These methods + * are private; we reach them via reflection rather than re-running the full + * SPARQL → API path so the assertions stay close to the logic under review. + */ +class RdfParserHelpersTest { + + private static RdfRepository repo; + private static Class edgeInfoClass; + private static Constructor edgeInfoCtor; + + @BeforeAll + static void setUp() throws Exception { + RdfConfiguration cfg = new RdfConfiguration(); + cfg.setEnabled(false); + Constructor ctor = + RdfRepository.class.getDeclaredConstructor(RdfConfiguration.class); + ctor.setAccessible(true); + repo = ctor.newInstance(cfg); + Field instance = RdfRepository.class.getDeclaredField("INSTANCE"); + instance.setAccessible(true); + instance.set(null, repo); + + edgeInfoClass = Class.forName("org.openmetadata.service.rdf.RdfRepository$EdgeInfo"); + edgeInfoCtor = + edgeInfoClass.getDeclaredConstructor( + String.class, String.class, String.class, String.class); + edgeInfoCtor.setAccessible(true); + } + + @AfterAll + static void tearDown() { + RdfRepository.reset(); + } + + @Test + void isReverseDirectionPredicateRecognizesProvCausationPredicates() throws Exception { + Method m = privateMethod("isReverseDirectionPredicate", String.class); + assertTrue((boolean) m.invoke(repo, "http://www.w3.org/ns/prov#wasDerivedFrom")); + assertTrue((boolean) m.invoke(repo, "http://www.w3.org/ns/prov#wasInfluencedBy")); + assertFalse((boolean) m.invoke(repo, "https://open-metadata.org/ontology/UPSTREAM")); + assertFalse((boolean) m.invoke(repo, "http://www.w3.org/ns/prov#wasGeneratedBy")); + assertFalse((boolean) m.invoke(repo, "")); + assertFalse((boolean) m.invoke(repo, (Object) null)); + } + + @Test + void forwardEquivalentPredicateMapsBothCausationPredicatesToUpstream() throws Exception { + Method m = privateMethod("forwardEquivalentPredicate", String.class); + String upstream = "https://open-metadata.org/ontology/UPSTREAM"; + assertEquals(upstream, m.invoke(repo, "http://www.w3.org/ns/prov#wasDerivedFrom")); + // wasInfluencedBy must also collapse to UPSTREAM (not a non-existent DOWNSTREAM URI), + // so dedup against an existing UPSTREAM edge still works. + assertEquals(upstream, m.invoke(repo, "http://www.w3.org/ns/prov#wasInfluencedBy")); + // Non-reverse predicates pass through unchanged so non-lineage edges aren't rewritten. + String unrelated = "https://open-metadata.org/ontology/hasOwner"; + assertEquals(unrelated, m.invoke(repo, unrelated)); + } + + @Test + void relativeRelationLabelFlipsForOutgoingFocalEdge() throws Exception { + String focal = "https://open-metadata.org/entity/table/focal-uuid"; + String other = "https://open-metadata.org/entity/table/other-uuid"; + + // Outgoing edge from focal: focal → other where focal is the upstream of other. + // From focal's perspective, other is downstream. + Object outgoing = edgeInfoCtor.newInstance(focal, other, "upstream", "om:UPSTREAM"); + assertEquals("downstream", invokeRelativeLabel(outgoing, focal)); + + // Incoming edge to focal: other → focal where other is the upstream of focal. + // From focal's perspective, other is upstream. + Object incoming = edgeInfoCtor.newInstance(other, focal, "upstream", "om:UPSTREAM"); + assertEquals("upstream", invokeRelativeLabel(incoming, focal)); + } + + @Test + void relativeRelationLabelLeavesNonFocalEdgesUntouched() throws Exception { + String focal = "https://open-metadata.org/entity/table/focal-uuid"; + String a = "https://open-metadata.org/entity/table/a"; + String b = "https://open-metadata.org/entity/table/b"; + + // Multi-hop edge that doesn't touch the focal: keep raw relation label. + Object edge = edgeInfoCtor.newInstance(a, b, "upstream", "om:UPSTREAM"); + assertEquals("upstream", invokeRelativeLabel(edge, focal)); + } + + @Test + void relativeRelationLabelLeavesNonLineageRelationsAlone() throws Exception { + String focal = "https://open-metadata.org/entity/table/focal-uuid"; + String other = "https://open-metadata.org/entity/user/owner-uuid"; + + Object edge = edgeInfoCtor.newInstance(focal, other, "ownedBy", "om:ownedBy"); + assertEquals("ownedBy", invokeRelativeLabel(edge, focal)); + } + + @Test + void relativeRelationLabelHandlesNullFocal() throws Exception { + String a = "https://open-metadata.org/entity/table/a"; + String b = "https://open-metadata.org/entity/table/b"; + Object edge = edgeInfoCtor.newInstance(a, b, "upstream", "om:UPSTREAM"); + assertEquals("upstream", invokeRelativeLabel(edge, null)); + } + + private static Method privateMethod(String name, Class... params) throws Exception { + Method m = RdfRepository.class.getDeclaredMethod(name, params); + m.setAccessible(true); + return m; + } + + private static String invokeRelativeLabel(Object edgeInfo, String focalUri) throws Exception { + Method m = + RdfRepository.class.getDeclaredMethod("relativeRelationLabel", edgeInfoClass, String.class); + m.setAccessible(true); + return (String) m.invoke(repo, edgeInfo, focalUri); + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/rdf/RdfPropertyMapperTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/rdf/RdfPropertyMapperTest.java index b59adc5e5ca..ca55bee5311 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/rdf/RdfPropertyMapperTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/rdf/RdfPropertyMapperTest.java @@ -17,7 +17,6 @@ import org.apache.jena.rdf.model.ModelFactory; import org.apache.jena.rdf.model.Property; import org.apache.jena.rdf.model.RDFList; import org.apache.jena.rdf.model.Resource; -import org.apache.jena.rdf.model.Statement; import org.apache.jena.rdf.model.StmtIterator; import org.apache.jena.vocabulary.RDF; import org.apache.jena.vocabulary.RDFS; @@ -105,56 +104,26 @@ class RdfPropertyMapperTest { class VotesTests { @Test - @DisplayName("Votes should keep counts but omit voter relationship edges") - void testVotesStructured() throws Exception { + @DisplayName("Votes are ignored during RDF field processing (audit/helper data)") + void testVotesAreIgnored() throws Exception { ObjectNode votes = objectMapper.createObjectNode(); votes.put("upVotes", 10); votes.put("downVotes", 2); - ArrayNode upVoters = objectMapper.createArrayNode(); - ObjectNode voter = objectMapper.createObjectNode(); - voter.put("id", UUID.randomUUID().toString()); - voter.put("type", "user"); - voter.put("name", "test_user"); - upVoters.add(voter); - votes.set("upVoters", upVoters); + ObjectNode entityJson = objectMapper.createObjectNode(); + entityJson.set("votes", votes); - java.lang.reflect.Method method = - RdfPropertyMapper.class.getDeclaredMethod( - "addVotes", JsonNode.class, Resource.class, Model.class); - method.setAccessible(true); - method.invoke(propertyMapper, votes, entityResource, model); + invokePrivate( + "processContextMappings", + new Class[] {Map.class, JsonNode.class, Resource.class, Model.class}, + Map.of("votes", Map.of("@id", "om:hasVotes", "@type", "@json")), + entityJson, + entityResource, + model); - // Verify structured RDF was created - Property hasVotes = model.createProperty(OM_NS, "hasVotes"); - assertTrue(model.contains(entityResource, hasVotes), "Entity should have hasVotes property"); - - Resource votesResource = - model.listObjectsOfProperty(entityResource, hasVotes).next().asResource(); - - // Verify type - assertTrue( - model.contains(votesResource, RDF.type, model.createResource(OM_NS + "Votes")), - "Votes should have correct type"); - - // Verify upVotes is stored as integer - Property upVotesProp = model.createProperty(OM_NS, "upVotes"); - assertTrue(model.contains(votesResource, upVotesProp), "Votes should have upVotes"); - Statement stmt = model.getProperty(votesResource, upVotesProp); - assertEquals(10, stmt.getInt(), "upVotes should be 10"); - - // Verify downVotes is stored as integer - Property downVotesProp = model.createProperty(OM_NS, "downVotes"); - assertTrue(model.contains(votesResource, downVotesProp), "Votes should have downVotes"); - stmt = model.getProperty(votesResource, downVotesProp); - assertEquals(2, stmt.getInt(), "downVotes should be 2"); - - // Verify individual voter references are not stored as graph edges - Property upVotersProp = model.createProperty(OM_NS, "upVoters"); - assertFalse(model.contains(votesResource, upVotersProp), "Votes should not expose upVoters"); assertFalse( - model.contains(votesResource, model.createProperty(OM_NS, "downVoters")), - "Votes should not expose downVoters"); + model.contains(entityResource, model.createProperty(OM_NS, "hasVotes")), + "Votes helper nodes should not be emitted into RDF"); } } @@ -742,8 +711,8 @@ class RdfPropertyMapperTest { } @Test - @DisplayName("container, votes, and extension helpers should cover remaining value branches") - void testContainerVotesAndExtensionHelpersCoverRemainingBranches() throws Exception { + @DisplayName("container and extension helpers should cover remaining value branches") + void testContainerAndExtensionHelpersCoverRemainingBranches() throws Exception { ArrayNode listOfReferences = objectMapper.createArrayNode(); UUID upstreamId = UUID.randomUUID(); listOfReferences.add(entityReferenceNode("table", upstreamId.toString(), "orders", null)); @@ -766,30 +735,6 @@ class RdfPropertyMapperTest { .map(node -> node.asResource().getURI()) .toList()); - ObjectNode votes = objectMapper.createObjectNode(); - votes.put("upVotes", 2); - ArrayNode downVoters = objectMapper.createArrayNode(); - UUID reviewerId = UUID.randomUUID(); - downVoters.add(entityReferenceNode("user", reviewerId.toString(), "reviewer", null)); - votes.set("downVoters", downVoters); - invokePrivate( - "addVotes", - new Class[] {JsonNode.class, Resource.class, Model.class}, - votes, - entityResource, - model); - Resource votesResource = - model - .listObjectsOfProperty(entityResource, model.createProperty(OM_NS, "hasVotes")) - .next() - .asResource(); - assertFalse( - model.contains( - votesResource, - model.createProperty(OM_NS, "downVoters"), - model.createResource(BASE_URI + "entity/user/" + reviewerId)), - "Vote helpers should not emit voter references"); - ObjectNode extension = objectMapper.createObjectNode(); extension.put("threshold", 2.5); extension.set("settings", objectMapper.createObjectNode().put("env", "prod")); @@ -841,7 +786,9 @@ class RdfPropertyMapperTest { votes, entityResource, model); - assertTrue(model.contains(entityResource, model.createProperty(OM_NS, "hasVotes"))); + assertFalse( + model.contains(entityResource, model.createProperty(OM_NS, "hasVotes")), + "votes is ignored by the structured-property dispatch"); ObjectNode lifeCycle = objectMapper.createObjectNode(); lifeCycle.set( diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/rdf/RdfUtilsTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/rdf/RdfUtilsTest.java new file mode 100644 index 00000000000..767f1e631d2 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/rdf/RdfUtilsTest.java @@ -0,0 +1,56 @@ +/* + * Copyright 2026 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.service.rdf; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +class RdfUtilsTest { + + @ParameterizedTest + @CsvSource({ + "table,prov:Entity", + "TABLE,prov:Entity", + "dashboard,prov:Entity", + "topic,prov:Entity", + "glossaryTerm,prov:Entity", + "dataProduct,prov:Entity", + "domain,prov:Entity", + "pipeline,prov:Activity", + "ingestionPipeline,prov:Activity", + "storedProcedure,prov:Activity", + "dbtPipeline,prov:Activity", + "user,prov:Agent", + "team,prov:Agent", + "bot,prov:Agent", + "role,prov:Agent" + }) + void getProvTypeMapsKnownEntities(String entityType, String expectedProv) { + assertEquals(expectedProv, RdfUtils.getProvType(entityType)); + } + + @ParameterizedTest + @CsvSource({"databaseService", "policy", "classification", "tagCategory"}) + void getProvTypeReturnsNullForNonProvEntities(String entityType) { + assertNull(RdfUtils.getProvType(entityType)); + } + + @org.junit.jupiter.api.Test + void getProvTypeHandlesNullAndEmpty() { + assertNull(RdfUtils.getProvType(null)); + assertNull(RdfUtils.getProvType("")); + } +} diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx index 9a10fe47f52..0468207787c 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunsHistory/AppRunsHistory.component.tsx @@ -123,18 +123,15 @@ const AppRunsHistory = forwardRef( return appRunsHistoryData; } - return [ - { - id: `${appData.id ?? appData.name ?? 'app'}-current-config`, - appId: appData.id, - appName: appData.name, - config: appData.appConfiguration ?? {}, - isSynthetic: true, - runType: 'CurrentConfig', - startTime: appData.updatedAt, - timestamp: appData.updatedAt, - }, - ]; + const syntheticRecord: AppRunRecordWithId = { + id: `${appData.id ?? appData.name ?? 'app'}-current-config`, + appId: appData.id, + appName: appData.name, + config: (appData.appConfiguration as { [key: string]: unknown }) ?? {}, + isSynthetic: true, + }; + + return [syntheticRecord]; }, [appData, appRunsHistoryData, isExternalApp]); const handleRowExpandable = useCallback( @@ -210,7 +207,8 @@ const AppRunsHistory = forwardRef( onClick={() => showAppRunConfig(record)}> {t('label.config')} - {record.status !== Status.Success && + {!record.isSynthetic && + record.status !== Status.Success && record.status !== Status.Failed && record.status !== Status.Stopped && record.status !== Status.Completed && @@ -244,6 +242,10 @@ const AppRunsHistory = forwardRef( dataIndex: 'timestamp', key: 'timestamp', render: (_, record) => { + if (record.isSynthetic) { + return NO_DATA_PLACEHOLDER; + } + return isExternalApp ? formatDateTime(record.startTime) : formatDateTime(record.timestamp); @@ -253,8 +255,12 @@ const AppRunsHistory = forwardRef( title: t('label.run-type'), dataIndex: 'runType', key: 'runType', - render: (runType) => ( - {runType ?? NO_DATA_PLACEHOLDER} + render: (runType, record) => ( + + {record.isSynthetic + ? NO_DATA_PLACEHOLDER + : runType ?? NO_DATA_PLACEHOLDER} + ), }, { @@ -262,6 +268,10 @@ const AppRunsHistory = forwardRef( dataIndex: 'executionTime', key: 'executionTime', render: (_, record: AppRunRecordWithId) => { + if (record.isSynthetic) { + return NO_DATA_PLACEHOLDER; + } + if (isExternalApp && record.executionTime) { return formatDurationToHHMMSS(record.executionTime); }