mirror of
https://github.com/open-metadata/OpenMetadata
synced 2026-05-24 09:39:11 +00:00
fix(lineage): prevent pipeline annotation inheritance in service/domain/dataProduct lineage and add pipeline service edges
Bug #1: Service nodes (e.g., DatabaseService, MessagingService) were incorrectly appearing in entity-level lineage views. Root cause: getOrCreateLineageDetails() in addServiceLineage(), addDomainLineage(), and addDataProductsLineage() was copying the pipeline annotation from entity-level LineageDetails to service/domain/dataProduct-level LineageDetails. This caused service entities to have upstreamLineage.pipeline.fqnHash set in their Elasticsearch documents, making them match the PIPELINE_AS_EDGE_KEY query during BFS traversal and incorrectly appear alongside actual data assets. Fix: add .withPipeline(null) on each service/domain/dataProduct LineageDetails object to strip the pipeline annotation before persisting. Bug #2: "By Service" view was empty when viewing lineage for pipeline entities that were stored as edge annotators (Case B: table → topic with pipeline=flink_pipeline in LineageDetails) rather than as actual nodes (Case A). Root cause: addServiceLineage() only created database_service → kafka_service edges but no edges involving flink_pipeline_service. Fix: add addPipelineServiceEdges() called from addServiceLineage() that creates fromService → pipelineService and pipelineService → toService edges when a pipeline annotation exists in the entity-level lineage details. Also add unit tests covering both fixes to prevent regression. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
7693a5b04b
commit
e6df7a6c62
2 changed files with 85 additions and 8 deletions
|
|
@ -230,15 +230,59 @@ public class LineageRepository {
|
|||
if (!shouldAddServiceLineage(fromEntity, toEntity)) {
|
||||
return;
|
||||
}
|
||||
// Add Service Level Lineage
|
||||
EntityReference fromService = fromEntity.getService();
|
||||
EntityReference toService = toEntity.getService();
|
||||
if (!fromService.getId().equals(toService.getId())) {
|
||||
LineageDetails serviceLineageDetails =
|
||||
getOrCreateLineageDetails(
|
||||
fromService.getId(), toService.getId(), entityLineageDetails, childRelationExists);
|
||||
fromService.getId(), toService.getId(), entityLineageDetails, childRelationExists)
|
||||
.withPipeline(null);
|
||||
insertLineage(fromService, toService, serviceLineageDetails);
|
||||
}
|
||||
addPipelineServiceEdges(fromService, toService, entityLineageDetails, childRelationExists);
|
||||
}
|
||||
|
||||
private void addPipelineServiceEdges(
|
||||
EntityReference fromService,
|
||||
EntityReference toService,
|
||||
LineageDetails entityLineageDetails,
|
||||
boolean childRelationExists) {
|
||||
EntityReference pipelineService = getPipelineService(entityLineageDetails);
|
||||
if (pipelineService == null) {
|
||||
return;
|
||||
}
|
||||
insertServiceEdgeIfDistinct(
|
||||
fromService, pipelineService, entityLineageDetails, childRelationExists);
|
||||
insertServiceEdgeIfDistinct(
|
||||
pipelineService, toService, entityLineageDetails, childRelationExists);
|
||||
}
|
||||
|
||||
private EntityReference getPipelineService(LineageDetails entityLineageDetails) {
|
||||
if (nullOrEmpty(entityLineageDetails.getPipeline())) {
|
||||
return null;
|
||||
}
|
||||
EntityReference pipelineRef = entityLineageDetails.getPipeline();
|
||||
if (!Entity.entityHasField(pipelineRef.getType(), FIELD_SERVICE)) {
|
||||
return null;
|
||||
}
|
||||
EntityInterface pipelineEntity =
|
||||
Entity.getEntity(pipelineRef.getType(), pipelineRef.getId(), FIELD_SERVICE, Include.ALL);
|
||||
return pipelineEntity.getService();
|
||||
}
|
||||
|
||||
private void insertServiceEdgeIfDistinct(
|
||||
EntityReference fromService,
|
||||
EntityReference toService,
|
||||
LineageDetails entityLineageDetails,
|
||||
boolean childRelationExists) {
|
||||
if (fromService.getId().equals(toService.getId())) {
|
||||
return;
|
||||
}
|
||||
LineageDetails serviceDetails =
|
||||
getOrCreateLineageDetails(
|
||||
fromService.getId(), toService.getId(), entityLineageDetails, childRelationExists)
|
||||
.withPipeline(null);
|
||||
insertLineage(fromService, toService, serviceDetails);
|
||||
}
|
||||
|
||||
private void addDomainLineage(
|
||||
|
|
@ -259,7 +303,11 @@ public class LineageRepository {
|
|||
if (!fromDomain.getId().equals(toDomain.getId())) {
|
||||
LineageDetails domainLineageDetails =
|
||||
getOrCreateLineageDetails(
|
||||
fromDomain.getId(), toDomain.getId(), entityLineageDetails, childRelationExists);
|
||||
fromDomain.getId(),
|
||||
toDomain.getId(),
|
||||
entityLineageDetails,
|
||||
childRelationExists)
|
||||
.withPipeline(null);
|
||||
insertLineage(fromDomain, toDomain, domainLineageDetails);
|
||||
}
|
||||
}
|
||||
|
|
@ -281,11 +329,11 @@ public class LineageRepository {
|
|||
if (!fromEntityRef.getId().equals(toEntityRef.getId())) {
|
||||
LineageDetails dataProductsLineageDetails =
|
||||
getOrCreateLineageDetails(
|
||||
fromEntityRef.getId(),
|
||||
toEntityRef.getId(),
|
||||
entityLineageDetails,
|
||||
childRelationExists);
|
||||
|
||||
fromEntityRef.getId(),
|
||||
toEntityRef.getId(),
|
||||
entityLineageDetails,
|
||||
childRelationExists)
|
||||
.withPipeline(null);
|
||||
insertLineage(fromEntityRef, toEntityRef, dataProductsLineageDetails);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -465,6 +465,35 @@ class LineageRepositoryTest {
|
|||
details.getColumnsLineage().get(0).getFromColumns().get(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testBuildEntityLineageData_NullPipeline_ProducesNoPipelineInEsData() {
|
||||
EntityReference from =
|
||||
new EntityReference().withId(UUID.randomUUID()).withFullyQualifiedName("db_service");
|
||||
EntityReference to =
|
||||
new EntityReference().withId(UUID.randomUUID()).withFullyQualifiedName("kafka_service");
|
||||
LineageDetails details = new LineageDetails().withPipeline(null);
|
||||
|
||||
var esData = LineageRepository.buildEntityLineageData(from, to, details);
|
||||
|
||||
assertNull(esData.getPipeline(), "Service-level lineage must not inherit pipeline annotation");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testLineageDetails_WithPipelineNull_PipelineFieldIsNull() {
|
||||
EntityReference pipelineRef =
|
||||
new EntityReference()
|
||||
.withId(UUID.randomUUID())
|
||||
.withType("pipeline")
|
||||
.withFullyQualifiedName("Flink.my_pipeline");
|
||||
LineageDetails details = new LineageDetails().withPipeline(pipelineRef);
|
||||
|
||||
LineageDetails stripped = details.withPipeline(null);
|
||||
|
||||
assertNull(
|
||||
stripped.getPipeline(),
|
||||
"After withPipeline(null), service-level lineage must have no pipeline");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDeleteLineageBySource_OpenLineage_UsesPipelinePath() {
|
||||
CollectionDAO dao = mock(CollectionDAO.class);
|
||||
|
|
|
|||
Loading…
Reference in a new issue