diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/IncidentPaginationIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/IncidentPaginationIT.java new file mode 100644 index 00000000000..9ca47b086e2 --- /dev/null +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/IncidentPaginationIT.java @@ -0,0 +1,234 @@ +package org.openmetadata.it.tests; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.openmetadata.it.bootstrap.SharedEntities; +import org.openmetadata.it.util.SdkClients; +import org.openmetadata.schema.api.data.CreateDatabase; +import org.openmetadata.schema.api.data.CreateDatabaseSchema; +import org.openmetadata.schema.api.data.CreateTable; +import org.openmetadata.schema.api.tests.CreateTestCase; +import org.openmetadata.schema.api.tests.CreateTestCaseResolutionStatus; +import org.openmetadata.schema.entity.data.Database; +import org.openmetadata.schema.entity.data.Table; +import org.openmetadata.schema.tests.TestCase; +import org.openmetadata.schema.tests.type.Severity; +import org.openmetadata.schema.tests.type.TestCaseResolutionStatus; +import org.openmetadata.schema.tests.type.TestCaseResolutionStatusTypes; +import org.openmetadata.schema.type.Column; +import org.openmetadata.schema.type.ColumnDataType; +import org.openmetadata.sdk.client.OpenMetadataClient; +import org.openmetadata.sdk.models.ListParams; +import org.openmetadata.sdk.models.ListResponse; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class IncidentPaginationIT { + + private static final int TEST_DATA_SIZE = 11; + private static final int PAGE_SIZE = 5; + private OpenMetadataClient client; + private List testCases; + private String databaseSchemaFqn; + + @BeforeAll + public void setup() throws Exception { + client = SdkClients.adminClient(); + testCases = new ArrayList<>(); + + long ts = System.currentTimeMillis(); + Database database = + client + .databases() + .create( + new CreateDatabase() + .withName("pagination_test_db_" + ts) + .withService(SharedEntities.get().MYSQL_SERVICE.getFullyQualifiedName())); + databaseSchemaFqn = + client + .databaseSchemas() + .create( + new CreateDatabaseSchema() + .withName("pagination_test_schema_" + ts) + .withDatabase(database.getFullyQualifiedName())) + .getFullyQualifiedName(); + + Table table = createTestTable(); + String testDefFqn = + client + .testDefinitions() + .list(new ListParams().withLimit(1)) + .getData() + .get(0) + .getFullyQualifiedName(); + + for (int i = 0; i < TEST_DATA_SIZE; i++) { + TestCase testCase = createTestCase(table, i, testDefFqn); + testCases.add(testCase); + createIncidentStatus(testCase); + } + + await() + .atMost(Duration.ofMinutes(3)) + .pollInterval(Duration.ofSeconds(5)) + .until( + () -> { + try { + ListParams params = + new ListParams().withLimit(TEST_DATA_SIZE + 10).withLatest(true); + ListResponse response = + client.testCaseResolutionStatuses().searchList(params); + return response.getPaging().getTotal() >= TEST_DATA_SIZE; + } catch (Exception e) { + return false; + } + }); + } + + @Test + public void testPaginationFirstPage() throws Exception { + ListParams params = new ListParams().withLimit(PAGE_SIZE).withOffset(0).withLatest(true); + + ListResponse response = + client.testCaseResolutionStatuses().searchList(params); + + assertNotNull(response); + assertEquals( + PAGE_SIZE, response.getData().size(), "First page should return " + PAGE_SIZE + " results"); + assertTrue( + response.getPaging().getTotal() >= TEST_DATA_SIZE, + "Total should be at least " + TEST_DATA_SIZE); + } + + @Test + public void testPaginationSecondPage() throws Exception { + ListParams firstPageParams = + new ListParams().withLimit(PAGE_SIZE).withOffset(0).withLatest(true); + ListResponse firstPage = + client.testCaseResolutionStatuses().searchList(firstPageParams); + + ListParams secondPageParams = + new ListParams().withLimit(PAGE_SIZE).withOffset(PAGE_SIZE).withLatest(true); + ListResponse secondPage = + client.testCaseResolutionStatuses().searchList(secondPageParams); + + assertNotNull(secondPage); + assertEquals( + PAGE_SIZE, + secondPage.getData().size(), + "Second page should return " + PAGE_SIZE + " results"); + assertEquals( + firstPage.getPaging().getTotal(), + secondPage.getPaging().getTotal(), + "Total count should be consistent across pages"); + + if (!firstPage.getData().isEmpty() && !secondPage.getData().isEmpty()) { + Object firstItem = firstPage.getData().get(0); + Object secondItem = secondPage.getData().get(0); + assertTrue(!firstItem.equals(secondItem), "Pages should contain different data"); + } + } + + @Test + public void testPaginationLastPage() throws Exception { + ListParams params = new ListParams().withLimit(PAGE_SIZE).withOffset(0).withLatest(true); + ListResponse firstPage = + client.testCaseResolutionStatuses().searchList(params); + int total = firstPage.getPaging().getTotal(); + int lastPageOffset = ((total - 1) / PAGE_SIZE) * PAGE_SIZE; + + ListParams lastPageParams = + new ListParams().withLimit(PAGE_SIZE).withOffset(lastPageOffset).withLatest(true); + ListResponse lastPage = + client.testCaseResolutionStatuses().searchList(lastPageParams); + + assertNotNull(lastPage); + assertTrue( + lastPage.getData().size() > 0 && lastPage.getData().size() <= PAGE_SIZE, + "Last page should have between 1 and " + PAGE_SIZE + " results"); + } + + @Test + public void testBackwardsCompatibilityNoParams() throws Exception { + ListParams params = new ListParams().withLatest(true); + + ListResponse response = + client.testCaseResolutionStatuses().searchList(params); + + assertNotNull(response); + assertTrue( + response.getPaging().getTotal() >= TEST_DATA_SIZE, + "Total should be at least " + TEST_DATA_SIZE + " even without explicit pagination params"); + } + + @Test + public void testOffsetBeyondResults() throws Exception { + ListParams params = new ListParams().withLimit(PAGE_SIZE).withOffset(10000).withLatest(true); + + ListResponse response = + client.testCaseResolutionStatuses().searchList(params); + + assertNotNull(response); + assertEquals(0, response.getData().size(), "Offset beyond results should return empty list"); + assertTrue(response.getPaging().getTotal() > 0, "Total should still be accurate"); + } + + @Test + public void testFilteredTotalCountIsExact() throws Exception { + String targetFqn = testCases.get(0).getFullyQualifiedName(); + ListParams params = + new ListParams() + .withLimit(PAGE_SIZE) + .withOffset(0) + .withLatest(true) + .addFilter("testCaseFQN", targetFqn); + + ListResponse response = + client.testCaseResolutionStatuses().searchList(params); + + assertNotNull(response); + assertEquals(1, response.getData().size(), "Filter should return exactly 1 incident"); + assertEquals( + 1, + response.getPaging().getTotal(), + "Total count must reflect only the filtered group, not all groups"); + } + + private Table createTestTable() throws Exception { + CreateTable createTable = + new CreateTable() + .withName("pagination_test_table_" + System.currentTimeMillis()) + .withDatabaseSchema(databaseSchemaFqn) + .withColumns(List.of(new Column().withName("id").withDataType(ColumnDataType.BIGINT))); + + return client.tables().create(createTable); + } + + private TestCase createTestCase(Table table, int index, String testDefFqn) throws Exception { + CreateTestCase createTestCase = + new CreateTestCase() + .withName("pagination_test_case_" + index) + .withEntityLink("<#E::table::" + table.getFullyQualifiedName() + "::columns::id>") + .withTestDefinition(testDefFqn); + + return client.testCases().create(createTestCase); + } + + private void createIncidentStatus(TestCase testCase) throws Exception { + CreateTestCaseResolutionStatus createStatus = + new CreateTestCaseResolutionStatus() + .withTestCaseResolutionStatusType(TestCaseResolutionStatusTypes.New) + .withTestCaseReference(testCase.getFullyQualifiedName()) + .withSeverity(Severity.Severity2); + + client.testCaseResolutionStatuses().create(createStatus); + } +} diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/RootCauseAnalysisTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/RootCauseAnalysisTool.java index 6ce97c83935..61e2c0ec2fa 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/RootCauseAnalysisTool.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/RootCauseAnalysisTool.java @@ -195,6 +195,10 @@ public class RootCauseAnalysisTool implements McpTool { testResultTimeSeriesRepository.getFields("testCaseStatus,result,testResultValue"), searchListFilter, "testCaseFQN.keyword", + null, + null, + null, + null, null); if (testCaseResults.getData() != null && !testCaseResults.getData().isEmpty()) { testCaseResult.put("testCaseResults", testCaseResults.getData()); diff --git a/openmetadata-sdk/src/main/java/org/openmetadata/sdk/models/ListParams.java b/openmetadata-sdk/src/main/java/org/openmetadata/sdk/models/ListParams.java index 43ef7399495..73ec04630f7 100644 --- a/openmetadata-sdk/src/main/java/org/openmetadata/sdk/models/ListParams.java +++ b/openmetadata-sdk/src/main/java/org/openmetadata/sdk/models/ListParams.java @@ -8,6 +8,8 @@ import java.util.Map; */ public class ListParams { private Integer limit; + private Integer offset; + private Boolean latest; private String after; private String before; private String fields; @@ -142,6 +144,32 @@ public class ListParams { return setPipelineType(pipelineType); } + public Integer getOffset() { + return offset; + } + + public ListParams setOffset(Integer offset) { + this.offset = offset; + return this; + } + + public ListParams withOffset(Integer offset) { + return setOffset(offset); + } + + public Boolean getLatest() { + return latest; + } + + public ListParams setLatest(Boolean latest) { + this.latest = latest; + return this; + } + + public ListParams withLatest(boolean latest) { + return setLatest(latest); + } + public ListParams withLimit(Integer limit) { return setLimit(limit); } @@ -182,6 +210,12 @@ public class ListParams { if (limit != null) { params.put("limit", limit.toString()); } + if (offset != null) { + params.put("offset", offset.toString()); + } + if (latest != null) { + params.put("latest", latest.toString()); + } if (after != null) { params.put("after", after); } @@ -198,6 +232,8 @@ public class ListParams { public ListParams copy() { ListParams copy = new ListParams(); copy.limit = this.limit; + copy.offset = this.offset; + copy.latest = this.latest; copy.after = this.after; copy.before = this.before; copy.fields = this.fields; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityTimeSeriesRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityTimeSeriesRepository.java index ec397ead07d..657ce823557 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityTimeSeriesRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityTimeSeriesRepository.java @@ -3,6 +3,7 @@ package org.openmetadata.service.jdbi3; import static org.openmetadata.schema.type.EventType.ENTITY_UPDATED; import static org.openmetadata.schema.type.Include.ALL; import static org.openmetadata.service.Entity.getEntityFields; +import static org.openmetadata.service.search.EntityBuilderConstant.MAX_AGGREGATE_SIZE; import static org.openmetadata.service.util.jdbi.JdbiUtils.getAfterOffset; import static org.openmetadata.service.util.jdbi.JdbiUtils.getBeforeOffset; import static org.openmetadata.service.util.jdbi.JdbiUtils.getOffset; @@ -19,6 +20,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.schema.EntityTimeSeriesInterface; @@ -40,6 +42,7 @@ import org.openmetadata.service.util.RestUtil; @Getter @Repository +@Slf4j public abstract class EntityTimeSeriesRepository { protected final String collectionPath; protected final EntityTimeSeriesDAO timeSeriesDao; @@ -392,14 +395,22 @@ public abstract class EntityTimeSeriesRepository listLatestFromSearch( - EntityUtil.Fields fields, SearchListFilter contentFilter, String groupBy, String q) + EntityUtil.Fields fields, + SearchListFilter contentFilter, + String groupBy, + String q, + Integer limit, + Integer offset, + String sortField, + String sortType) throws IOException { List entityList = new ArrayList<>(); SearchListFilter searchListFilter = new SearchListFilter(); setIncludeSearchFields(searchListFilter); setExcludeSearchFields(searchListFilter); String aggregationPath = "$.sterms#byTerms.buckets"; - SearchAggregation searchAggregation = buildComplexAggregation(groupBy, contentFilter); + SearchAggregation searchAggregation = + buildComplexAggregation(groupBy, contentFilter, limit, offset, sortField, sortType); JsonObject jsonObjResults = searchRepository.aggregate(q, entityType, searchAggregation, searchListFilter); @@ -428,34 +439,72 @@ public abstract class EntityTimeSeriesRepository(entityList, null, null, entityList.size()); + + int totalCount = entityList.size(); + if (limit != null && limit > 0) { + try { + String statsBucketPath = "$.stats_bucket#total_bucket_count.count"; + Optional statsBucketCount = + JsonUtils.readJsonAtPath(jsonObjResults.toString(), statsBucketPath, Integer.class); + if (statsBucketCount.isPresent()) { + totalCount = statsBucketCount.get(); + } + } catch (Exception e) { + LOG.warn("Failed to extract stats_bucket total count, falling back to page size", e); + } + } + + return new ResultList<>(entityList, offset, limit, totalCount); } private SearchAggregation buildComplexAggregation( - String groupBy, SearchListFilter contentFilter) { - SearchAggregationNode root = new SearchAggregationNode("root", "root", null); - - // Create terms aggregation by groupBy field - SearchAggregationNode termsAgg = SearchAggregation.terms("byTerms", groupBy); - - // Add latest_overall (replaces the old "latest" aggregation for parsing compatibility) - termsAgg.addChild(SearchAggregation.topHits("latest", 1, "timestamp", "desc")); - - // Add max_timestamp for bucket selector - termsAgg.addChild(SearchAggregation.max("max_timestamp", "timestamp")); - - // Get content filters using the new getFilterQuery method for filter aggregations + String groupBy, + SearchListFilter contentFilter, + Integer limit, + Integer offset, + String sortField, + String sortType) { String contentFilters = contentFilter.getFilterQuery(entityType); - // Create content filter aggregation + List nodes = + buildAggregationNodes( + groupBy, contentFilters, limit, offset, sortField, sortType, MAX_AGGREGATE_SIZE); + SearchAggregationNode root = new SearchAggregationNode("root", "root", null); + nodes.forEach(root::addChild); + return SearchAggregation.fromTree(root); + } + + static List buildAggregationNodes( + String groupBy, + String contentFilters, + Integer limit, + Integer offset, + String sortField, + String sortType, + int maxAggSize) { + List rootNodes = new ArrayList<>(); + + // When paginating, use MAX_AGGREGATE_SIZE so bucket_sort has enough upstream buckets to slice + // from. Without this, a default size of 100 would make offset>100 always return empty results. + int termsSize = (limit != null && limit > 0) ? maxAggSize : 100; + SearchAggregationNode termsAgg = SearchAggregation.terms("byTerms", groupBy, termsSize); + + // top_hits fetches the latest document per group — the actual entity we return to the caller. + termsAgg.addChild(SearchAggregation.topHits("latest", 1, "timestamp", "desc")); + // max_timestamp is the reference value for bucket_selector to compare against. + termsAgg.addChild(SearchAggregation.max("max_timestamp", "timestamp")); + + // Re-apply content filters inside the bucket to find the latest document that also matches. + // This lets bucket_selector decide whether the group's latest doc satisfies the filters. SearchAggregationNode filterAgg = SearchAggregation.filter("with_content_filters", contentFilters); filterAgg.addChild(SearchAggregation.max("max_matching_timestamp", "timestamp")); filterAgg.addChild(SearchAggregation.valueCount("count", "timestamp")); - termsAgg.addChild(filterAgg); - // Add bucket selector to filter buckets where latest timestamp equals matching timestamp + // Discard groups where the latest document does not match the content filters. + // The check "latest_timestamp == matching_timestamp" means the most recent doc passed the + // filter. termsAgg.addChild( SearchAggregation.bucketSelector( "filter_groups", @@ -463,8 +512,59 @@ public abstract class EntityTimeSeriesRepositorycount,with_content_filters>max_matching_timestamp")); - root.addChild(termsAgg); - return SearchAggregation.fromTree(root); + if (limit != null && limit > 0) { + // Slice the surviving buckets into pages. Must run after bucket_selector. + Integer effectiveLimit = Math.min(limit, maxAggSize); + Integer effectiveOffset = offset != null ? offset : 0; + String aggSortField = mapSortFieldToAggregationField(sortField); + String aggSortOrder = sortType != null ? sortType.toLowerCase() : "desc"; + termsAgg.addChild( + SearchAggregation.bucketSort( + "pagination", effectiveLimit, effectiveOffset, aggSortField, aggSortOrder)); + } + + rootNodes.add(termsAgg); + + if (limit != null && limit > 0) { + // byTermsCount is a sibling of byTerms with identical filters but no top_hits and no + // bucket_sort. Its sole purpose is to count all post-filter groups so stats_bucket can + // compute an exact total — bypassing the pagination slice applied to byTerms. + SearchAggregationNode termsCountAgg = + SearchAggregation.terms("byTermsCount", groupBy, maxAggSize); + termsCountAgg.addChild(SearchAggregation.max("max_timestamp", "timestamp")); + + SearchAggregationNode countFilterAgg = + SearchAggregation.filter("with_content_filters", contentFilters); + countFilterAgg.addChild(SearchAggregation.max("max_matching_timestamp", "timestamp")); + countFilterAgg.addChild(SearchAggregation.valueCount("count", "timestamp")); + termsCountAgg.addChild(countFilterAgg); + + termsCountAgg.addChild( + SearchAggregation.bucketSelector( + "filter_groups", + "if (params.matching_count == 0) return false; return params.latest_timestamp == params.matching_timestamp;", + "latest_timestamp,matching_count,matching_timestamp", + "max_timestamp,with_content_filters>count,with_content_filters>max_matching_timestamp")); + + rootNodes.add(termsCountAgg); + // stats_bucket counts how many buckets survived in byTermsCount, giving an exact total + // that reflects filters but not the pagination window. + rootNodes.add( + SearchAggregation.statsBucket("total_bucket_count", "byTermsCount>max_timestamp")); + } + + return rootNodes; + } + + static String mapSortFieldToAggregationField(String sortField) { + if (sortField == null) { + return "max_timestamp"; + } + return switch (sortField.toLowerCase()) { + case "updatedat", "timestamp", "createdat" -> "max_timestamp"; + case "_key" -> "_key"; + default -> "max_timestamp"; + }; } public T latestFromSearch(EntityUtil.Fields fields, SearchListFilter searchListFilter, String q) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseResultRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseResultRepository.java index 8f656e17579..8ac523eac0c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseResultRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TestCaseResultRepository.java @@ -325,6 +325,10 @@ public class TestCaseResultRepository extends EntityTimeSeriesRepository listLatestFromSearch( @@ -101,11 +106,16 @@ public abstract class EntityTimeSeriesResource< SearchListFilter searchListFilter, String groupBy, String q, + Integer limit, + Integer offset, + String sortField, + String sortType, List authRequests, AuthorizationLogic authorizationLogic) throws IOException { authorizer.authorizeRequests(securityContext, authRequests, authorizationLogic); - return repository.listLatestFromSearch(fields, searchListFilter, groupBy, q); + return repository.listLatestFromSearch( + fields, searchListFilter, groupBy, q, limit, offset, sortField, sortType); } protected T latestInternalFromSearch( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResolutionStatusResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResolutionStatusResource.java index 9d01e3414c0..f40d0a0b68a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResolutionStatusResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResolutionStatusResource.java @@ -473,7 +473,11 @@ public class TestCaseResolutionStatusResource searchListFilter, "testCase.fullyQualifiedName.keyword", // Group by test case to get latest status per test // case - null); + null, + limit, + offset, + defaultSortField, + sortType); } else { return repository.listFromSearchWithOffset( new Fields(null), searchListFilter, limit, offset, searchSortFilter, null, null); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResultResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResultResource.java index d60b8db1d5f..225ada7fbc1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResultResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResultResource.java @@ -265,7 +265,7 @@ public class TestCaseResultResource String entityFQN, @Parameter( description = - "Get the latest test case result for each test case -- requires `testSuiteId`. Offset and limit are ignored", + "Get the latest test case result for each test case -- requires `testSuiteId`", schema = @Schema( type = "boolean", @@ -340,6 +340,10 @@ public class TestCaseResultResource searchListFilter, "testCaseFQN.keyword", q, + limit, + offset, + "timestamp", + "desc", authRequests, AuthorizationLogic.ANY); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchAggregation.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchAggregation.java index 092c236426d..f99c89429db 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchAggregation.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchAggregation.java @@ -33,9 +33,16 @@ public class SearchAggregation { * Static builder method for terms aggregation. */ public static SearchAggregationNode terms(String name, String field) { + return terms(name, field, 100); + } + + /** + * Static builder method for terms aggregation with explicit size. + */ + public static SearchAggregationNode terms(String name, String field, int size) { Map value = new HashMap<>(); value.put("field", field); - value.put("size", "100"); + value.put("size", String.valueOf(size)); return new SearchAggregationNode("terms", name, value); } @@ -90,6 +97,50 @@ public class SearchAggregation { return new SearchAggregationNode("bucket_selector", name, value); } + /** + * Static builder method for bucket_sort aggregation. + */ + public static SearchAggregationNode bucketSort(String name, Integer size, Integer from) { + Map value = new HashMap<>(); + if (size != null) { + value.put("size", String.valueOf(size)); + } + if (from != null) { + value.put("from", String.valueOf(from)); + } + return new SearchAggregationNode("bucket_sort", name, value); + } + + /** + * Static builder method for bucket_sort aggregation with sorting. + */ + public static SearchAggregationNode bucketSort( + String name, Integer size, Integer from, String sortField, String sortOrder) { + Map value = new HashMap<>(); + if (size != null) { + value.put("size", String.valueOf(size)); + } + if (from != null) { + value.put("from", String.valueOf(from)); + } + if (sortField != null) { + value.put("sort_field", sortField); + } + if (sortOrder != null) { + value.put("sort_order", sortOrder); + } + return new SearchAggregationNode("bucket_sort", name, value); + } + + /** + * Static builder method for stats_bucket pipeline aggregation. + */ + public static SearchAggregationNode statsBucket(String name, String bucketsPath) { + Map value = new HashMap<>(); + value.put("buckets_path", bucketsPath); + return new SearchAggregationNode("stats_bucket", name, value); + } + /* * Get the metadata for the aggregation results. We'll use the metadata to build the report and * to traverse the aggregation tree. 3 types of metadata are returned: diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/aggregations/ElasticAggregationsBuilder.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/aggregations/ElasticAggregationsBuilder.java index 12b0219b76c..39f5c0b28e9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/aggregations/ElasticAggregationsBuilder.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/aggregations/ElasticAggregationsBuilder.java @@ -4,6 +4,7 @@ import es.co.elastic.clients.elasticsearch._types.aggregations.Aggregation; import es.co.elastic.clients.elasticsearch._types.query_dsl.Query; import es.co.elastic.clients.json.JsonpMapper; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import org.openmetadata.service.search.SearchAggregationNode; @@ -35,7 +36,7 @@ public class ElasticAggregationsBuilder { ElasticAggregations elasticAggregation = getAggregation(type); elasticAggregation.createAggregation(node); - Map subAggregations = new HashMap<>(); + Map subAggregations = new LinkedHashMap<>(); for (SearchAggregationNode child : node.getChildren()) { buildAggregation(child, elasticAggregation, subAggregations); } @@ -53,7 +54,7 @@ public class ElasticAggregationsBuilder { if (elasticAggregation.getAggregationName() != null) { String aggName = elasticAggregation.getAggregationName(); - Map wrappedSubAggs = new HashMap<>(subAggregations); + Map wrappedSubAggs = new LinkedHashMap<>(subAggregations); wrappedSubAggs.put(aggName + "_inner", elasticAggregation.getAggregation()); finalAggregation = @@ -67,6 +68,7 @@ public class ElasticAggregationsBuilder { private ElasticAggregations getAggregation(String aggregationType) { return switch (aggregationType) { case "bucket_selector" -> new ElasticBucketSelectorAggregations(); + case "bucket_sort" -> new ElasticBucketSortAggregations(); case "date_histogram" -> new ElasticDateHistogramAggregations(); case "terms" -> new ElasticTermsAggregations(); case "avg" -> new ElasticAvgAggregations(); @@ -75,6 +77,7 @@ public class ElasticAggregationsBuilder { case "filter" -> new ElasticFilterAggregations(mapper); case "value_count" -> new ElasticValueCountAggregations(); case "cardinality" -> new ElasticCardinalityAggregations(); + case "stats_bucket" -> new ElasticStatsBucketAggregations(); case "nested" -> new ElasticNestedAggregations(); case "top_hits" -> new ElasticTopHitsAggregations(); default -> throw new IllegalArgumentException("Invalid aggregation type: " + aggregationType); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/aggregations/ElasticBucketSortAggregations.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/aggregations/ElasticBucketSortAggregations.java new file mode 100644 index 00000000000..45d9ff970cb --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/aggregations/ElasticBucketSortAggregations.java @@ -0,0 +1,57 @@ +package org.openmetadata.service.search.elasticsearch.aggregations; + +import es.co.elastic.clients.elasticsearch._types.SortOrder; +import es.co.elastic.clients.elasticsearch._types.aggregations.Aggregation; +import es.co.elastic.clients.elasticsearch._types.aggregations.BucketSortAggregation; +import java.util.HashMap; +import java.util.Map; +import lombok.Getter; +import lombok.Setter; +import org.openmetadata.service.search.SearchAggregationNode; + +@Setter +@Getter +public class ElasticBucketSortAggregations implements ElasticAggregations { + private final String aggregationType = "bucket_sort"; + private String aggregationName; + private Aggregation aggregation; + private Map subAggregations = new HashMap<>(); + + @Override + public void createAggregation(SearchAggregationNode node) { + Map params = node.getValue(); + this.aggregationName = node.getName(); + + String sizeStr = params.get("size"); + String fromStr = params.get("from"); + String sortField = params.get("sort_field"); + String sortOrderStr = params.get("sort_order"); + + BucketSortAggregation.Builder builder = new BucketSortAggregation.Builder(); + + if (sizeStr != null) { + builder.size(Integer.parseInt(sizeStr)); + } + + if (fromStr != null) { + builder.from(Integer.parseInt(fromStr)); + } + + if (sortField != null && sortOrderStr != null) { + SortOrder sortOrder = sortOrderStr.equalsIgnoreCase("asc") ? SortOrder.Asc : SortOrder.Desc; + builder.sort(s -> s.field(f -> f.field(sortField).order(sortOrder))); + } + + this.aggregation = Aggregation.of(a -> a.bucketSort(builder.build())); + } + + @Override + public Boolean isPipelineAggregation() { + return true; + } + + @Override + public void setSubAggregations(Map subAggregations) { + this.subAggregations = subAggregations; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/aggregations/ElasticStatsBucketAggregations.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/aggregations/ElasticStatsBucketAggregations.java new file mode 100644 index 00000000000..8fcbdd3f18f --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/aggregations/ElasticStatsBucketAggregations.java @@ -0,0 +1,40 @@ +package org.openmetadata.service.search.elasticsearch.aggregations; + +import es.co.elastic.clients.elasticsearch._types.aggregations.Aggregation; +import es.co.elastic.clients.elasticsearch._types.aggregations.StatsBucketAggregation; +import java.util.HashMap; +import java.util.Map; +import lombok.Getter; +import lombok.Setter; +import org.openmetadata.service.search.SearchAggregationNode; + +@Setter +@Getter +public class ElasticStatsBucketAggregations implements ElasticAggregations { + static final String aggregationType = "stats_bucket"; + private String aggregationName; + private Aggregation aggregation; + private Map subAggregations = new HashMap<>(); + + @Override + public void createAggregation(SearchAggregationNode node) { + Map params = node.getValue(); + this.aggregationName = node.getName(); + String bucketsPath = params.get("buckets_path"); + this.aggregation = + Aggregation.of( + a -> + a.statsBucket( + StatsBucketAggregation.of(sb -> sb.bucketsPath(b -> b.single(bucketsPath))))); + } + + @Override + public Boolean isPipelineAggregation() { + return true; + } + + @Override + public void setSubAggregations(Map subAggregations) { + this.subAggregations = subAggregations; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/aggregations/OpenAggregationsBuilder.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/aggregations/OpenAggregationsBuilder.java index dde6a66327e..25d9a5aa748 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/aggregations/OpenAggregationsBuilder.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/aggregations/OpenAggregationsBuilder.java @@ -1,6 +1,7 @@ package org.openmetadata.service.search.opensearch.aggregations; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import org.openmetadata.service.search.SearchAggregationNode; import os.org.opensearch.client.json.JsonpMapper; @@ -35,7 +36,7 @@ public class OpenAggregationsBuilder { OpenAggregations openAggregation = getAggregation(type); openAggregation.createAggregation(node); - Map subAggregations = new HashMap<>(); + Map subAggregations = new LinkedHashMap<>(); for (SearchAggregationNode child : node.getChildren()) { buildAggregation(child, openAggregation, subAggregations); } @@ -53,7 +54,7 @@ public class OpenAggregationsBuilder { if (openAggregation.getAggregationName() != null) { String aggName = openAggregation.getAggregationName(); - Map wrappedSubAggs = new HashMap<>(subAggregations); + Map wrappedSubAggs = new LinkedHashMap<>(subAggregations); wrappedSubAggs.put(aggName + "_inner", openAggregation.getAggregation()); finalAggregation = @@ -67,6 +68,7 @@ public class OpenAggregationsBuilder { private OpenAggregations getAggregation(String aggregationType) { return switch (aggregationType) { case "bucket_selector" -> new OpenBucketSelectorAggregations(); + case "bucket_sort" -> new OpenBucketSortAggregations(); case "date_histogram" -> new OpenDateHistogramAggregations(); case "terms" -> new OpenTermsAggregations(); case "avg" -> new OpenAvgAggregations(); @@ -75,6 +77,7 @@ public class OpenAggregationsBuilder { case "filter" -> new OpenFilterAggregations(mapper); case "value_count" -> new OpenValueCountAggregations(); case "cardinality" -> new OpenCardinalityAggregations(); + case "stats_bucket" -> new OpenStatsBucketAggregations(); case "nested" -> new OpenNestedAggregations(); case "top_hits" -> new OpenTopHitsAggregations(); default -> throw new IllegalArgumentException("Invalid aggregation type: " + aggregationType); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/aggregations/OpenBucketSortAggregations.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/aggregations/OpenBucketSortAggregations.java new file mode 100644 index 00000000000..42e687174a2 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/aggregations/OpenBucketSortAggregations.java @@ -0,0 +1,57 @@ +package org.openmetadata.service.search.opensearch.aggregations; + +import java.util.HashMap; +import java.util.Map; +import lombok.Getter; +import lombok.Setter; +import org.openmetadata.service.search.SearchAggregationNode; +import os.org.opensearch.client.opensearch._types.SortOrder; +import os.org.opensearch.client.opensearch._types.aggregations.Aggregation; +import os.org.opensearch.client.opensearch._types.aggregations.BucketSortAggregation; + +@Setter +@Getter +public class OpenBucketSortAggregations implements OpenAggregations { + private final String aggregationType = "bucket_sort"; + private String aggregationName; + private Aggregation aggregation; + private Map subAggregations = new HashMap<>(); + + @Override + public void createAggregation(SearchAggregationNode node) { + Map params = node.getValue(); + this.aggregationName = node.getName(); + + String sizeStr = params.get("size"); + String fromStr = params.get("from"); + String sortField = params.get("sort_field"); + String sortOrderStr = params.get("sort_order"); + + BucketSortAggregation.Builder builder = new BucketSortAggregation.Builder(); + + if (sizeStr != null) { + builder.size(Integer.parseInt(sizeStr)); + } + + if (fromStr != null) { + builder.from(Integer.parseInt(fromStr)); + } + + if (sortField != null && sortOrderStr != null) { + SortOrder sortOrder = sortOrderStr.equalsIgnoreCase("asc") ? SortOrder.Asc : SortOrder.Desc; + builder.sort(s -> s.field(f -> f.field(sortField).order(sortOrder))); + } + + this.aggregation = Aggregation.of(a -> a.bucketSort(builder.build())); + } + + @Override + public Boolean isPipelineAggregation() { + return true; + } + + @Override + public void setSubAggregations(Map subAggregations) { + this.subAggregations = subAggregations; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/aggregations/OpenStatsBucketAggregations.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/aggregations/OpenStatsBucketAggregations.java new file mode 100644 index 00000000000..40ca1cc1da6 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/aggregations/OpenStatsBucketAggregations.java @@ -0,0 +1,40 @@ +package org.openmetadata.service.search.opensearch.aggregations; + +import java.util.HashMap; +import java.util.Map; +import lombok.Getter; +import lombok.Setter; +import org.openmetadata.service.search.SearchAggregationNode; +import os.org.opensearch.client.opensearch._types.aggregations.Aggregation; +import os.org.opensearch.client.opensearch._types.aggregations.StatsBucketAggregation; + +@Setter +@Getter +public class OpenStatsBucketAggregations implements OpenAggregations { + static final String aggregationType = "stats_bucket"; + private String aggregationName; + private Aggregation aggregation; + private Map subAggregations = new HashMap<>(); + + @Override + public void createAggregation(SearchAggregationNode node) { + Map params = node.getValue(); + this.aggregationName = node.getName(); + String bucketsPath = params.get("buckets_path"); + this.aggregation = + Aggregation.of( + a -> + a.statsBucket( + StatsBucketAggregation.of(sb -> sb.bucketsPath(b -> b.single(bucketsPath))))); + } + + @Override + public Boolean isPipelineAggregation() { + return true; + } + + @Override + public void setSubAggregations(Map subAggregations) { + this.subAggregations = subAggregations; + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/jdbi3/EntityTimeSeriesRepositoryPaginationTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/jdbi3/EntityTimeSeriesRepositoryPaginationTest.java new file mode 100644 index 00000000000..71acdea98c0 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/jdbi3/EntityTimeSeriesRepositoryPaginationTest.java @@ -0,0 +1,196 @@ +package org.openmetadata.service.jdbi3; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import org.junit.jupiter.api.Test; +import org.openmetadata.service.search.SearchAggregationNode; + +class EntityTimeSeriesRepositoryPaginationTest { + + private static final String GROUP_BY = "testCase.fullyQualifiedName.keyword"; + private static final String CONTENT_FILTERS = "{\"match_all\":{}}"; + private static final int MAX_AGG_SIZE = 10000; + + @Test + void testBucketSortPresentWhenPaginating() { + List nodes = + EntityTimeSeriesRepository.buildAggregationNodes( + GROUP_BY, CONTENT_FILTERS, 15, 0, null, null, MAX_AGG_SIZE); + + boolean hasBucketSort = + byTerms(nodes).getChildren().stream().anyMatch(n -> "bucket_sort".equals(n.getType())); + assertTrue(hasBucketSort, "Should include bucket_sort when limit > 0"); + } + + @Test + void testBucketSortAbsentWhenNotPaginating() { + List nodes = + EntityTimeSeriesRepository.buildAggregationNodes( + GROUP_BY, CONTENT_FILTERS, null, null, null, null, MAX_AGG_SIZE); + + boolean hasBucketSort = + byTerms(nodes).getChildren().stream().anyMatch(n -> "bucket_sort".equals(n.getType())); + assertFalse(hasBucketSort, "Should not include bucket_sort when limit is null"); + } + + @Test + void testTermsSizeIsMaxAggSizeWhenPaginating() { + List nodes = + EntityTimeSeriesRepository.buildAggregationNodes( + GROUP_BY, CONTENT_FILTERS, 15, 0, null, null, MAX_AGG_SIZE); + + assertEquals( + String.valueOf(MAX_AGG_SIZE), + byTerms(nodes).getValue().get("size"), + "byTerms size must be maxAggSize when paginating so bucket_sort has enough buckets to slice"); + } + + @Test + void testTermsSizeIsDefaultWhenNotPaginating() { + List nodes = + EntityTimeSeriesRepository.buildAggregationNodes( + GROUP_BY, CONTENT_FILTERS, null, null, null, null, MAX_AGG_SIZE); + + assertEquals( + "100", + byTerms(nodes).getValue().get("size"), + "byTerms size should be 100 when not paginating"); + } + + @Test + void testByTermsCountPresentWhenPaginating() { + List nodes = + EntityTimeSeriesRepository.buildAggregationNodes( + GROUP_BY, CONTENT_FILTERS, 15, 0, null, null, MAX_AGG_SIZE); + + SearchAggregationNode byTermsCount = byTermsCount(nodes); + boolean hasMaxTimestamp = + byTermsCount.getChildren().stream() + .anyMatch(n -> "max".equals(n.getType()) && "max_timestamp".equals(n.getName())); + assertTrue( + hasMaxTimestamp, "byTermsCount must have max_timestamp for stats_bucket to reference"); + } + + @Test + void testByTermsCountHasNoBucketSort() { + List nodes = + EntityTimeSeriesRepository.buildAggregationNodes( + GROUP_BY, CONTENT_FILTERS, 15, 0, null, null, MAX_AGG_SIZE); + + boolean hasBucketSort = + byTermsCount(nodes).getChildren().stream().anyMatch(n -> "bucket_sort".equals(n.getType())); + assertFalse( + hasBucketSort, + "byTermsCount must not paginate — it needs all post-filter buckets visible to stats_bucket"); + } + + @Test + void testByTermsCountAbsentWhenNotPaginating() { + List nodes = + EntityTimeSeriesRepository.buildAggregationNodes( + GROUP_BY, CONTENT_FILTERS, null, null, null, null, MAX_AGG_SIZE); + + boolean hasByTermsCount = + nodes.stream() + .anyMatch(n -> "terms".equals(n.getType()) && "byTermsCount".equals(n.getName())); + assertFalse(hasByTermsCount, "byTermsCount should not be built when not paginating"); + } + + @Test + void testStatsBucketPresentWhenPaginating() { + List nodes = + EntityTimeSeriesRepository.buildAggregationNodes( + GROUP_BY, CONTENT_FILTERS, 15, 0, null, null, MAX_AGG_SIZE); + + SearchAggregationNode statsBucket = + nodes.stream() + .filter( + n -> "stats_bucket".equals(n.getType()) && "total_bucket_count".equals(n.getName())) + .findFirst() + .orElseThrow(() -> new AssertionError("stats_bucket#total_bucket_count not found")); + + assertEquals( + "byTermsCount>max_timestamp", + statsBucket.getValue().get("buckets_path"), + "stats_bucket must point to byTermsCount>max_timestamp to count post-filter groups"); + } + + @Test + void testStatsBucketAbsentWhenNotPaginating() { + List nodes = + EntityTimeSeriesRepository.buildAggregationNodes( + GROUP_BY, CONTENT_FILTERS, null, null, null, null, MAX_AGG_SIZE); + + boolean hasStatsBucket = nodes.stream().anyMatch(n -> "stats_bucket".equals(n.getType())); + assertFalse(hasStatsBucket, "stats_bucket should not be built when not paginating"); + } + + @Test + void testLimitCappedAtMaxAggSize() { + List nodes = + EntityTimeSeriesRepository.buildAggregationNodes( + GROUP_BY, CONTENT_FILTERS, 15000, 0, null, null, MAX_AGG_SIZE); + + assertEquals( + String.valueOf(MAX_AGG_SIZE), + bucketSort(nodes).getValue().get("size"), + "Limit must be capped at maxAggSize"); + } + + @Test + void testLimitBelowMaxAggSizeIsPreserved() { + List nodes = + EntityTimeSeriesRepository.buildAggregationNodes( + GROUP_BY, CONTENT_FILTERS, 500, 0, null, null, MAX_AGG_SIZE); + + assertEquals( + "500", + bucketSort(nodes).getValue().get("size"), + "Limit below maxAggSize should not be capped"); + } + + @Test + void testOffsetDefaultsToZeroWhenNull() { + List nodes = + EntityTimeSeriesRepository.buildAggregationNodes( + GROUP_BY, CONTENT_FILTERS, 15, null, null, null, MAX_AGG_SIZE); + + assertEquals("0", bucketSort(nodes).getValue().get("from"), "Null offset should default to 0"); + } + + @Test + void testOffsetPreservedWhenProvided() { + List nodes = + EntityTimeSeriesRepository.buildAggregationNodes( + GROUP_BY, CONTENT_FILTERS, 15, 25, null, null, MAX_AGG_SIZE); + + assertEquals( + "25", + bucketSort(nodes).getValue().get("from"), + "Offset should be passed through to bucket_sort"); + } + + private static SearchAggregationNode byTerms(List nodes) { + return nodes.stream() + .filter(n -> "terms".equals(n.getType()) && "byTerms".equals(n.getName())) + .findFirst() + .orElseThrow(() -> new AssertionError("byTerms aggregation not found")); + } + + private static SearchAggregationNode byTermsCount(List nodes) { + return nodes.stream() + .filter(n -> "terms".equals(n.getType()) && "byTermsCount".equals(n.getName())) + .findFirst() + .orElseThrow(() -> new AssertionError("byTermsCount aggregation not found")); + } + + private static SearchAggregationNode bucketSort(List nodes) { + return byTerms(nodes).getChildren().stream() + .filter(n -> "bucket_sort".equals(n.getType())) + .findFirst() + .orElseThrow(() -> new AssertionError("bucket_sort not found in byTerms")); + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/search/SearchAggregationTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/search/SearchAggregationTest.java new file mode 100644 index 00000000000..6d9c813f936 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/search/SearchAggregationTest.java @@ -0,0 +1,64 @@ +package org.openmetadata.service.search; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.Map; +import org.junit.jupiter.api.Test; + +class SearchAggregationTest { + + @Test + void testBucketSortCreatesCorrectNode() { + SearchAggregationNode node = SearchAggregation.bucketSort("test_sort", 10, 20); + + assertNotNull(node); + assertEquals("bucket_sort", node.getType()); + assertEquals("test_sort", node.getName()); + + Map value = node.getValue(); + assertEquals("10", value.get("size")); + assertEquals("20", value.get("from")); + } + + @Test + void testBucketSortWithSorting() { + SearchAggregationNode node = + SearchAggregation.bucketSort("test_sort", 15, 0, "max_timestamp", "desc"); + + assertNotNull(node); + assertEquals("bucket_sort", node.getType()); + assertEquals("test_sort", node.getName()); + + Map value = node.getValue(); + assertEquals("15", value.get("size")); + assertEquals("0", value.get("from")); + assertEquals("max_timestamp", value.get("sort_field")); + assertEquals("desc", value.get("sort_order")); + } + + @Test + void testBucketSortWithNullValues() { + SearchAggregationNode node = SearchAggregation.bucketSort("test_sort", null, null); + + assertNotNull(node); + assertEquals("bucket_sort", node.getType()); + assertEquals("test_sort", node.getName()); + + Map value = node.getValue(); + assertEquals(0, value.size()); + } + + @Test + void testStatsBucketCreatesCorrectNode() { + SearchAggregationNode node = + SearchAggregation.statsBucket("total_bucket_count", "byTermsCount>max_timestamp"); + + assertNotNull(node); + assertEquals("stats_bucket", node.getType()); + assertEquals("total_bucket_count", node.getName()); + + Map value = node.getValue(); + assertEquals("byTermsCount>max_timestamp", value.get("buckets_path")); + } +}