fix(lineage): prevent pipeline annotation inheritance in service/domain/dataProduct lineage and add pipeline service edges

Bug #1: Service nodes (e.g., DatabaseService, MessagingService) were incorrectly appearing in
entity-level lineage views. Root cause: getOrCreateLineageDetails() in addServiceLineage(),
addDomainLineage(), and addDataProductsLineage() was copying the pipeline annotation from
entity-level LineageDetails to service/domain/dataProduct-level LineageDetails. This caused
service entities to have upstreamLineage.pipeline.fqnHash set in their Elasticsearch documents,
making them match the PIPELINE_AS_EDGE_KEY query during BFS traversal and incorrectly appear
alongside actual data assets. Fix: add .withPipeline(null) on each service/domain/dataProduct
LineageDetails object to strip the pipeline annotation before persisting.

Bug #2: "By Service" view was empty when viewing lineage for pipeline entities that were stored
as edge annotators (Case B: table → topic with pipeline=flink_pipeline in LineageDetails) rather
than as actual nodes (Case A). Root cause: addServiceLineage() only created database_service →
kafka_service edges but no edges involving flink_pipeline_service. Fix: add addPipelineServiceEdges()
called from addServiceLineage() that creates fromService → pipelineService and pipelineService →
toService edges when a pipeline annotation exists in the entity-level lineage details.

Also add unit tests covering both fixes to prevent regression.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
mohitdeuex 2026-04-11 00:03:30 +05:30
parent 7693a5b04b
commit e6df7a6c62
2 changed files with 85 additions and 8 deletions

View file

@ -230,15 +230,59 @@ public class LineageRepository {
if (!shouldAddServiceLineage(fromEntity, toEntity)) {
return;
}
// Add Service Level Lineage
EntityReference fromService = fromEntity.getService();
EntityReference toService = toEntity.getService();
if (!fromService.getId().equals(toService.getId())) {
LineageDetails serviceLineageDetails =
getOrCreateLineageDetails(
fromService.getId(), toService.getId(), entityLineageDetails, childRelationExists);
fromService.getId(), toService.getId(), entityLineageDetails, childRelationExists)
.withPipeline(null);
insertLineage(fromService, toService, serviceLineageDetails);
}
addPipelineServiceEdges(fromService, toService, entityLineageDetails, childRelationExists);
}
private void addPipelineServiceEdges(
EntityReference fromService,
EntityReference toService,
LineageDetails entityLineageDetails,
boolean childRelationExists) {
EntityReference pipelineService = getPipelineService(entityLineageDetails);
if (pipelineService == null) {
return;
}
insertServiceEdgeIfDistinct(
fromService, pipelineService, entityLineageDetails, childRelationExists);
insertServiceEdgeIfDistinct(
pipelineService, toService, entityLineageDetails, childRelationExists);
}
private EntityReference getPipelineService(LineageDetails entityLineageDetails) {
if (nullOrEmpty(entityLineageDetails.getPipeline())) {
return null;
}
EntityReference pipelineRef = entityLineageDetails.getPipeline();
if (!Entity.entityHasField(pipelineRef.getType(), FIELD_SERVICE)) {
return null;
}
EntityInterface pipelineEntity =
Entity.getEntity(pipelineRef.getType(), pipelineRef.getId(), FIELD_SERVICE, Include.ALL);
return pipelineEntity.getService();
}
private void insertServiceEdgeIfDistinct(
EntityReference fromService,
EntityReference toService,
LineageDetails entityLineageDetails,
boolean childRelationExists) {
if (fromService.getId().equals(toService.getId())) {
return;
}
LineageDetails serviceDetails =
getOrCreateLineageDetails(
fromService.getId(), toService.getId(), entityLineageDetails, childRelationExists)
.withPipeline(null);
insertLineage(fromService, toService, serviceDetails);
}
private void addDomainLineage(
@ -259,7 +303,11 @@ public class LineageRepository {
if (!fromDomain.getId().equals(toDomain.getId())) {
LineageDetails domainLineageDetails =
getOrCreateLineageDetails(
fromDomain.getId(), toDomain.getId(), entityLineageDetails, childRelationExists);
fromDomain.getId(),
toDomain.getId(),
entityLineageDetails,
childRelationExists)
.withPipeline(null);
insertLineage(fromDomain, toDomain, domainLineageDetails);
}
}
@ -281,11 +329,11 @@ public class LineageRepository {
if (!fromEntityRef.getId().equals(toEntityRef.getId())) {
LineageDetails dataProductsLineageDetails =
getOrCreateLineageDetails(
fromEntityRef.getId(),
toEntityRef.getId(),
entityLineageDetails,
childRelationExists);
fromEntityRef.getId(),
toEntityRef.getId(),
entityLineageDetails,
childRelationExists)
.withPipeline(null);
insertLineage(fromEntityRef, toEntityRef, dataProductsLineageDetails);
}
}

View file

@ -465,6 +465,35 @@ class LineageRepositoryTest {
details.getColumnsLineage().get(0).getFromColumns().get(0));
}
@Test
void testBuildEntityLineageData_NullPipeline_ProducesNoPipelineInEsData() {
EntityReference from =
new EntityReference().withId(UUID.randomUUID()).withFullyQualifiedName("db_service");
EntityReference to =
new EntityReference().withId(UUID.randomUUID()).withFullyQualifiedName("kafka_service");
LineageDetails details = new LineageDetails().withPipeline(null);
var esData = LineageRepository.buildEntityLineageData(from, to, details);
assertNull(esData.getPipeline(), "Service-level lineage must not inherit pipeline annotation");
}
@Test
void testLineageDetails_WithPipelineNull_PipelineFieldIsNull() {
EntityReference pipelineRef =
new EntityReference()
.withId(UUID.randomUUID())
.withType("pipeline")
.withFullyQualifiedName("Flink.my_pipeline");
LineageDetails details = new LineageDetails().withPipeline(pipelineRef);
LineageDetails stripped = details.withPipeline(null);
assertNull(
stripped.getPipeline(),
"After withPipeline(null), service-level lineage must have no pipeline");
}
@Test
void testDeleteLineageBySource_OpenLineage_UsesPipelinePath() {
CollectionDAO dao = mock(CollectionDAO.class);