FIX: add support for offset and limit on listing aggregations (#25943)

* add support for offset and limit on listing aggregations

* add tests

* fix couple issues

* fix couple issues

* fix couple issues

* fix couple issues

* fix couple issues

---------

Co-authored-by: Sriharsha Chintalapani <harshach@users.noreply.github.com>
This commit is contained in:
IceS2 2026-02-19 17:31:21 +01:00 committed by Harshit Shah
parent 6e916a6dad
commit 0ff9f27ef4
17 changed files with 937 additions and 30 deletions

View file

@ -0,0 +1,234 @@
package org.openmetadata.it.tests;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.openmetadata.it.bootstrap.SharedEntities;
import org.openmetadata.it.util.SdkClients;
import org.openmetadata.schema.api.data.CreateDatabase;
import org.openmetadata.schema.api.data.CreateDatabaseSchema;
import org.openmetadata.schema.api.data.CreateTable;
import org.openmetadata.schema.api.tests.CreateTestCase;
import org.openmetadata.schema.api.tests.CreateTestCaseResolutionStatus;
import org.openmetadata.schema.entity.data.Database;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.tests.TestCase;
import org.openmetadata.schema.tests.type.Severity;
import org.openmetadata.schema.tests.type.TestCaseResolutionStatus;
import org.openmetadata.schema.tests.type.TestCaseResolutionStatusTypes;
import org.openmetadata.schema.type.Column;
import org.openmetadata.schema.type.ColumnDataType;
import org.openmetadata.sdk.client.OpenMetadataClient;
import org.openmetadata.sdk.models.ListParams;
import org.openmetadata.sdk.models.ListResponse;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class IncidentPaginationIT {
private static final int TEST_DATA_SIZE = 11;
private static final int PAGE_SIZE = 5;
private OpenMetadataClient client;
private List<TestCase> testCases;
private String databaseSchemaFqn;
@BeforeAll
public void setup() throws Exception {
client = SdkClients.adminClient();
testCases = new ArrayList<>();
long ts = System.currentTimeMillis();
Database database =
client
.databases()
.create(
new CreateDatabase()
.withName("pagination_test_db_" + ts)
.withService(SharedEntities.get().MYSQL_SERVICE.getFullyQualifiedName()));
databaseSchemaFqn =
client
.databaseSchemas()
.create(
new CreateDatabaseSchema()
.withName("pagination_test_schema_" + ts)
.withDatabase(database.getFullyQualifiedName()))
.getFullyQualifiedName();
Table table = createTestTable();
String testDefFqn =
client
.testDefinitions()
.list(new ListParams().withLimit(1))
.getData()
.get(0)
.getFullyQualifiedName();
for (int i = 0; i < TEST_DATA_SIZE; i++) {
TestCase testCase = createTestCase(table, i, testDefFqn);
testCases.add(testCase);
createIncidentStatus(testCase);
}
await()
.atMost(Duration.ofMinutes(3))
.pollInterval(Duration.ofSeconds(5))
.until(
() -> {
try {
ListParams params =
new ListParams().withLimit(TEST_DATA_SIZE + 10).withLatest(true);
ListResponse<TestCaseResolutionStatus> response =
client.testCaseResolutionStatuses().searchList(params);
return response.getPaging().getTotal() >= TEST_DATA_SIZE;
} catch (Exception e) {
return false;
}
});
}
@Test
public void testPaginationFirstPage() throws Exception {
ListParams params = new ListParams().withLimit(PAGE_SIZE).withOffset(0).withLatest(true);
ListResponse<TestCaseResolutionStatus> response =
client.testCaseResolutionStatuses().searchList(params);
assertNotNull(response);
assertEquals(
PAGE_SIZE, response.getData().size(), "First page should return " + PAGE_SIZE + " results");
assertTrue(
response.getPaging().getTotal() >= TEST_DATA_SIZE,
"Total should be at least " + TEST_DATA_SIZE);
}
@Test
public void testPaginationSecondPage() throws Exception {
ListParams firstPageParams =
new ListParams().withLimit(PAGE_SIZE).withOffset(0).withLatest(true);
ListResponse<TestCaseResolutionStatus> firstPage =
client.testCaseResolutionStatuses().searchList(firstPageParams);
ListParams secondPageParams =
new ListParams().withLimit(PAGE_SIZE).withOffset(PAGE_SIZE).withLatest(true);
ListResponse<TestCaseResolutionStatus> secondPage =
client.testCaseResolutionStatuses().searchList(secondPageParams);
assertNotNull(secondPage);
assertEquals(
PAGE_SIZE,
secondPage.getData().size(),
"Second page should return " + PAGE_SIZE + " results");
assertEquals(
firstPage.getPaging().getTotal(),
secondPage.getPaging().getTotal(),
"Total count should be consistent across pages");
if (!firstPage.getData().isEmpty() && !secondPage.getData().isEmpty()) {
Object firstItem = firstPage.getData().get(0);
Object secondItem = secondPage.getData().get(0);
assertTrue(!firstItem.equals(secondItem), "Pages should contain different data");
}
}
@Test
public void testPaginationLastPage() throws Exception {
ListParams params = new ListParams().withLimit(PAGE_SIZE).withOffset(0).withLatest(true);
ListResponse<TestCaseResolutionStatus> firstPage =
client.testCaseResolutionStatuses().searchList(params);
int total = firstPage.getPaging().getTotal();
int lastPageOffset = ((total - 1) / PAGE_SIZE) * PAGE_SIZE;
ListParams lastPageParams =
new ListParams().withLimit(PAGE_SIZE).withOffset(lastPageOffset).withLatest(true);
ListResponse<TestCaseResolutionStatus> lastPage =
client.testCaseResolutionStatuses().searchList(lastPageParams);
assertNotNull(lastPage);
assertTrue(
lastPage.getData().size() > 0 && lastPage.getData().size() <= PAGE_SIZE,
"Last page should have between 1 and " + PAGE_SIZE + " results");
}
@Test
public void testBackwardsCompatibilityNoParams() throws Exception {
ListParams params = new ListParams().withLatest(true);
ListResponse<TestCaseResolutionStatus> response =
client.testCaseResolutionStatuses().searchList(params);
assertNotNull(response);
assertTrue(
response.getPaging().getTotal() >= TEST_DATA_SIZE,
"Total should be at least " + TEST_DATA_SIZE + " even without explicit pagination params");
}
@Test
public void testOffsetBeyondResults() throws Exception {
ListParams params = new ListParams().withLimit(PAGE_SIZE).withOffset(10000).withLatest(true);
ListResponse<TestCaseResolutionStatus> response =
client.testCaseResolutionStatuses().searchList(params);
assertNotNull(response);
assertEquals(0, response.getData().size(), "Offset beyond results should return empty list");
assertTrue(response.getPaging().getTotal() > 0, "Total should still be accurate");
}
@Test
public void testFilteredTotalCountIsExact() throws Exception {
String targetFqn = testCases.get(0).getFullyQualifiedName();
ListParams params =
new ListParams()
.withLimit(PAGE_SIZE)
.withOffset(0)
.withLatest(true)
.addFilter("testCaseFQN", targetFqn);
ListResponse<TestCaseResolutionStatus> response =
client.testCaseResolutionStatuses().searchList(params);
assertNotNull(response);
assertEquals(1, response.getData().size(), "Filter should return exactly 1 incident");
assertEquals(
1,
response.getPaging().getTotal(),
"Total count must reflect only the filtered group, not all groups");
}
private Table createTestTable() throws Exception {
CreateTable createTable =
new CreateTable()
.withName("pagination_test_table_" + System.currentTimeMillis())
.withDatabaseSchema(databaseSchemaFqn)
.withColumns(List.of(new Column().withName("id").withDataType(ColumnDataType.BIGINT)));
return client.tables().create(createTable);
}
private TestCase createTestCase(Table table, int index, String testDefFqn) throws Exception {
CreateTestCase createTestCase =
new CreateTestCase()
.withName("pagination_test_case_" + index)
.withEntityLink("<#E::table::" + table.getFullyQualifiedName() + "::columns::id>")
.withTestDefinition(testDefFqn);
return client.testCases().create(createTestCase);
}
private void createIncidentStatus(TestCase testCase) throws Exception {
CreateTestCaseResolutionStatus createStatus =
new CreateTestCaseResolutionStatus()
.withTestCaseResolutionStatusType(TestCaseResolutionStatusTypes.New)
.withTestCaseReference(testCase.getFullyQualifiedName())
.withSeverity(Severity.Severity2);
client.testCaseResolutionStatuses().create(createStatus);
}
}

View file

@ -195,6 +195,10 @@ public class RootCauseAnalysisTool implements McpTool {
testResultTimeSeriesRepository.getFields("testCaseStatus,result,testResultValue"),
searchListFilter,
"testCaseFQN.keyword",
null,
null,
null,
null,
null);
if (testCaseResults.getData() != null && !testCaseResults.getData().isEmpty()) {
testCaseResult.put("testCaseResults", testCaseResults.getData());

View file

@ -8,6 +8,8 @@ import java.util.Map;
*/
public class ListParams {
private Integer limit;
private Integer offset;
private Boolean latest;
private String after;
private String before;
private String fields;
@ -142,6 +144,32 @@ public class ListParams {
return setPipelineType(pipelineType);
}
public Integer getOffset() {
return offset;
}
public ListParams setOffset(Integer offset) {
this.offset = offset;
return this;
}
public ListParams withOffset(Integer offset) {
return setOffset(offset);
}
public Boolean getLatest() {
return latest;
}
public ListParams setLatest(Boolean latest) {
this.latest = latest;
return this;
}
public ListParams withLatest(boolean latest) {
return setLatest(latest);
}
public ListParams withLimit(Integer limit) {
return setLimit(limit);
}
@ -182,6 +210,12 @@ public class ListParams {
if (limit != null) {
params.put("limit", limit.toString());
}
if (offset != null) {
params.put("offset", offset.toString());
}
if (latest != null) {
params.put("latest", latest.toString());
}
if (after != null) {
params.put("after", after);
}
@ -198,6 +232,8 @@ public class ListParams {
public ListParams copy() {
ListParams copy = new ListParams();
copy.limit = this.limit;
copy.offset = this.offset;
copy.latest = this.latest;
copy.after = this.after;
copy.before = this.before;
copy.fields = this.fields;

View file

@ -3,6 +3,7 @@ package org.openmetadata.service.jdbi3;
import static org.openmetadata.schema.type.EventType.ENTITY_UPDATED;
import static org.openmetadata.schema.type.Include.ALL;
import static org.openmetadata.service.Entity.getEntityFields;
import static org.openmetadata.service.search.EntityBuilderConstant.MAX_AGGREGATE_SIZE;
import static org.openmetadata.service.util.jdbi.JdbiUtils.getAfterOffset;
import static org.openmetadata.service.util.jdbi.JdbiUtils.getBeforeOffset;
import static org.openmetadata.service.util.jdbi.JdbiUtils.getOffset;
@ -19,6 +20,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.EntityTimeSeriesInterface;
@ -40,6 +42,7 @@ import org.openmetadata.service.util.RestUtil;
@Getter
@Repository
@Slf4j
public abstract class EntityTimeSeriesRepository<T extends EntityTimeSeriesInterface> {
protected final String collectionPath;
protected final EntityTimeSeriesDAO timeSeriesDao;
@ -392,14 +395,22 @@ public abstract class EntityTimeSeriesRepository<T extends EntityTimeSeriesInter
@SuppressWarnings("unchecked")
public ResultList<T> listLatestFromSearch(
EntityUtil.Fields fields, SearchListFilter contentFilter, String groupBy, String q)
EntityUtil.Fields fields,
SearchListFilter contentFilter,
String groupBy,
String q,
Integer limit,
Integer offset,
String sortField,
String sortType)
throws IOException {
List<T> entityList = new ArrayList<>();
SearchListFilter searchListFilter = new SearchListFilter();
setIncludeSearchFields(searchListFilter);
setExcludeSearchFields(searchListFilter);
String aggregationPath = "$.sterms#byTerms.buckets";
SearchAggregation searchAggregation = buildComplexAggregation(groupBy, contentFilter);
SearchAggregation searchAggregation =
buildComplexAggregation(groupBy, contentFilter, limit, offset, sortField, sortType);
JsonObject jsonObjResults =
searchRepository.aggregate(q, entityType, searchAggregation, searchListFilter);
@ -428,34 +439,72 @@ public abstract class EntityTimeSeriesRepository<T extends EntityTimeSeriesInter
});
}
});
return new ResultList<>(entityList, null, null, entityList.size());
int totalCount = entityList.size();
if (limit != null && limit > 0) {
try {
String statsBucketPath = "$.stats_bucket#total_bucket_count.count";
Optional<Integer> statsBucketCount =
JsonUtils.readJsonAtPath(jsonObjResults.toString(), statsBucketPath, Integer.class);
if (statsBucketCount.isPresent()) {
totalCount = statsBucketCount.get();
}
} catch (Exception e) {
LOG.warn("Failed to extract stats_bucket total count, falling back to page size", e);
}
}
return new ResultList<>(entityList, offset, limit, totalCount);
}
private SearchAggregation buildComplexAggregation(
String groupBy, SearchListFilter contentFilter) {
SearchAggregationNode root = new SearchAggregationNode("root", "root", null);
// Create terms aggregation by groupBy field
SearchAggregationNode termsAgg = SearchAggregation.terms("byTerms", groupBy);
// Add latest_overall (replaces the old "latest" aggregation for parsing compatibility)
termsAgg.addChild(SearchAggregation.topHits("latest", 1, "timestamp", "desc"));
// Add max_timestamp for bucket selector
termsAgg.addChild(SearchAggregation.max("max_timestamp", "timestamp"));
// Get content filters using the new getFilterQuery method for filter aggregations
String groupBy,
SearchListFilter contentFilter,
Integer limit,
Integer offset,
String sortField,
String sortType) {
String contentFilters = contentFilter.getFilterQuery(entityType);
// Create content filter aggregation
List<SearchAggregationNode> nodes =
buildAggregationNodes(
groupBy, contentFilters, limit, offset, sortField, sortType, MAX_AGGREGATE_SIZE);
SearchAggregationNode root = new SearchAggregationNode("root", "root", null);
nodes.forEach(root::addChild);
return SearchAggregation.fromTree(root);
}
static List<SearchAggregationNode> buildAggregationNodes(
String groupBy,
String contentFilters,
Integer limit,
Integer offset,
String sortField,
String sortType,
int maxAggSize) {
List<SearchAggregationNode> rootNodes = new ArrayList<>();
// When paginating, use MAX_AGGREGATE_SIZE so bucket_sort has enough upstream buckets to slice
// from. Without this, a default size of 100 would make offset>100 always return empty results.
int termsSize = (limit != null && limit > 0) ? maxAggSize : 100;
SearchAggregationNode termsAgg = SearchAggregation.terms("byTerms", groupBy, termsSize);
// top_hits fetches the latest document per group the actual entity we return to the caller.
termsAgg.addChild(SearchAggregation.topHits("latest", 1, "timestamp", "desc"));
// max_timestamp is the reference value for bucket_selector to compare against.
termsAgg.addChild(SearchAggregation.max("max_timestamp", "timestamp"));
// Re-apply content filters inside the bucket to find the latest document that also matches.
// This lets bucket_selector decide whether the group's latest doc satisfies the filters.
SearchAggregationNode filterAgg =
SearchAggregation.filter("with_content_filters", contentFilters);
filterAgg.addChild(SearchAggregation.max("max_matching_timestamp", "timestamp"));
filterAgg.addChild(SearchAggregation.valueCount("count", "timestamp"));
termsAgg.addChild(filterAgg);
// Add bucket selector to filter buckets where latest timestamp equals matching timestamp
// Discard groups where the latest document does not match the content filters.
// The check "latest_timestamp == matching_timestamp" means the most recent doc passed the
// filter.
termsAgg.addChild(
SearchAggregation.bucketSelector(
"filter_groups",
@ -463,8 +512,59 @@ public abstract class EntityTimeSeriesRepository<T extends EntityTimeSeriesInter
"latest_timestamp,matching_count,matching_timestamp",
"max_timestamp,with_content_filters>count,with_content_filters>max_matching_timestamp"));
root.addChild(termsAgg);
return SearchAggregation.fromTree(root);
if (limit != null && limit > 0) {
// Slice the surviving buckets into pages. Must run after bucket_selector.
Integer effectiveLimit = Math.min(limit, maxAggSize);
Integer effectiveOffset = offset != null ? offset : 0;
String aggSortField = mapSortFieldToAggregationField(sortField);
String aggSortOrder = sortType != null ? sortType.toLowerCase() : "desc";
termsAgg.addChild(
SearchAggregation.bucketSort(
"pagination", effectiveLimit, effectiveOffset, aggSortField, aggSortOrder));
}
rootNodes.add(termsAgg);
if (limit != null && limit > 0) {
// byTermsCount is a sibling of byTerms with identical filters but no top_hits and no
// bucket_sort. Its sole purpose is to count all post-filter groups so stats_bucket can
// compute an exact total bypassing the pagination slice applied to byTerms.
SearchAggregationNode termsCountAgg =
SearchAggregation.terms("byTermsCount", groupBy, maxAggSize);
termsCountAgg.addChild(SearchAggregation.max("max_timestamp", "timestamp"));
SearchAggregationNode countFilterAgg =
SearchAggregation.filter("with_content_filters", contentFilters);
countFilterAgg.addChild(SearchAggregation.max("max_matching_timestamp", "timestamp"));
countFilterAgg.addChild(SearchAggregation.valueCount("count", "timestamp"));
termsCountAgg.addChild(countFilterAgg);
termsCountAgg.addChild(
SearchAggregation.bucketSelector(
"filter_groups",
"if (params.matching_count == 0) return false; return params.latest_timestamp == params.matching_timestamp;",
"latest_timestamp,matching_count,matching_timestamp",
"max_timestamp,with_content_filters>count,with_content_filters>max_matching_timestamp"));
rootNodes.add(termsCountAgg);
// stats_bucket counts how many buckets survived in byTermsCount, giving an exact total
// that reflects filters but not the pagination window.
rootNodes.add(
SearchAggregation.statsBucket("total_bucket_count", "byTermsCount>max_timestamp"));
}
return rootNodes;
}
static String mapSortFieldToAggregationField(String sortField) {
if (sortField == null) {
return "max_timestamp";
}
return switch (sortField.toLowerCase()) {
case "updatedat", "timestamp", "createdat" -> "max_timestamp";
case "_key" -> "_key";
default -> "max_timestamp";
};
}
public T latestFromSearch(EntityUtil.Fields fields, SearchListFilter searchListFilter, String q)

View file

@ -325,6 +325,10 @@ public class TestCaseResultRepository extends EntityTimeSeriesRepository<TestCas
EntityUtil.Fields.EMPTY_FIELDS,
new SearchListFilter().addQueryParam("entityFQN", fqn),
"testCaseFQN.keyword",
null,
null,
null,
null,
null);
return testCaseResultResults.getData().stream()
.anyMatch(

View file

@ -88,11 +88,16 @@ public abstract class EntityTimeSeriesResource<
SearchListFilter searchListFilter,
String groupBy,
String q,
Integer limit,
Integer offset,
String sortField,
String sortType,
OperationContext operationContext,
ResourceContextInterface resourceContext)
throws IOException {
authorizer.authorize(securityContext, operationContext, resourceContext);
return repository.listLatestFromSearch(fields, searchListFilter, groupBy, q);
return repository.listLatestFromSearch(
fields, searchListFilter, groupBy, q, limit, offset, sortField, sortType);
}
public ResultList<T> listLatestFromSearch(
@ -101,11 +106,16 @@ public abstract class EntityTimeSeriesResource<
SearchListFilter searchListFilter,
String groupBy,
String q,
Integer limit,
Integer offset,
String sortField,
String sortType,
List<AuthRequest> authRequests,
AuthorizationLogic authorizationLogic)
throws IOException {
authorizer.authorizeRequests(securityContext, authRequests, authorizationLogic);
return repository.listLatestFromSearch(fields, searchListFilter, groupBy, q);
return repository.listLatestFromSearch(
fields, searchListFilter, groupBy, q, limit, offset, sortField, sortType);
}
protected T latestInternalFromSearch(

View file

@ -473,7 +473,11 @@ public class TestCaseResolutionStatusResource
searchListFilter,
"testCase.fullyQualifiedName.keyword", // Group by test case to get latest status per test
// case
null);
null,
limit,
offset,
defaultSortField,
sortType);
} else {
return repository.listFromSearchWithOffset(
new Fields(null), searchListFilter, limit, offset, searchSortFilter, null, null);

View file

@ -265,7 +265,7 @@ public class TestCaseResultResource
String entityFQN,
@Parameter(
description =
"Get the latest test case result for each test case -- requires `testSuiteId`. Offset and limit are ignored",
"Get the latest test case result for each test case -- requires `testSuiteId`",
schema =
@Schema(
type = "boolean",
@ -340,6 +340,10 @@ public class TestCaseResultResource
searchListFilter,
"testCaseFQN.keyword",
q,
limit,
offset,
"timestamp",
"desc",
authRequests,
AuthorizationLogic.ANY);
}

View file

@ -33,9 +33,16 @@ public class SearchAggregation {
* Static builder method for terms aggregation.
*/
public static SearchAggregationNode terms(String name, String field) {
return terms(name, field, 100);
}
/**
* Static builder method for terms aggregation with explicit size.
*/
public static SearchAggregationNode terms(String name, String field, int size) {
Map<String, String> value = new HashMap<>();
value.put("field", field);
value.put("size", "100");
value.put("size", String.valueOf(size));
return new SearchAggregationNode("terms", name, value);
}
@ -90,6 +97,50 @@ public class SearchAggregation {
return new SearchAggregationNode("bucket_selector", name, value);
}
/**
* Static builder method for bucket_sort aggregation.
*/
public static SearchAggregationNode bucketSort(String name, Integer size, Integer from) {
Map<String, String> value = new HashMap<>();
if (size != null) {
value.put("size", String.valueOf(size));
}
if (from != null) {
value.put("from", String.valueOf(from));
}
return new SearchAggregationNode("bucket_sort", name, value);
}
/**
* Static builder method for bucket_sort aggregation with sorting.
*/
public static SearchAggregationNode bucketSort(
String name, Integer size, Integer from, String sortField, String sortOrder) {
Map<String, String> value = new HashMap<>();
if (size != null) {
value.put("size", String.valueOf(size));
}
if (from != null) {
value.put("from", String.valueOf(from));
}
if (sortField != null) {
value.put("sort_field", sortField);
}
if (sortOrder != null) {
value.put("sort_order", sortOrder);
}
return new SearchAggregationNode("bucket_sort", name, value);
}
/**
* Static builder method for stats_bucket pipeline aggregation.
*/
public static SearchAggregationNode statsBucket(String name, String bucketsPath) {
Map<String, String> value = new HashMap<>();
value.put("buckets_path", bucketsPath);
return new SearchAggregationNode("stats_bucket", name, value);
}
/*
* Get the metadata for the aggregation results. We'll use the metadata to build the report and
* to traverse the aggregation tree. 3 types of metadata are returned:

View file

@ -4,6 +4,7 @@ import es.co.elastic.clients.elasticsearch._types.aggregations.Aggregation;
import es.co.elastic.clients.elasticsearch._types.query_dsl.Query;
import es.co.elastic.clients.json.JsonpMapper;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.openmetadata.service.search.SearchAggregationNode;
@ -35,7 +36,7 @@ public class ElasticAggregationsBuilder {
ElasticAggregations elasticAggregation = getAggregation(type);
elasticAggregation.createAggregation(node);
Map<String, Aggregation> subAggregations = new HashMap<>();
Map<String, Aggregation> subAggregations = new LinkedHashMap<>();
for (SearchAggregationNode child : node.getChildren()) {
buildAggregation(child, elasticAggregation, subAggregations);
}
@ -53,7 +54,7 @@ public class ElasticAggregationsBuilder {
if (elasticAggregation.getAggregationName() != null) {
String aggName = elasticAggregation.getAggregationName();
Map<String, Aggregation> wrappedSubAggs = new HashMap<>(subAggregations);
Map<String, Aggregation> wrappedSubAggs = new LinkedHashMap<>(subAggregations);
wrappedSubAggs.put(aggName + "_inner", elasticAggregation.getAggregation());
finalAggregation =
@ -67,6 +68,7 @@ public class ElasticAggregationsBuilder {
private ElasticAggregations getAggregation(String aggregationType) {
return switch (aggregationType) {
case "bucket_selector" -> new ElasticBucketSelectorAggregations();
case "bucket_sort" -> new ElasticBucketSortAggregations();
case "date_histogram" -> new ElasticDateHistogramAggregations();
case "terms" -> new ElasticTermsAggregations();
case "avg" -> new ElasticAvgAggregations();
@ -75,6 +77,7 @@ public class ElasticAggregationsBuilder {
case "filter" -> new ElasticFilterAggregations(mapper);
case "value_count" -> new ElasticValueCountAggregations();
case "cardinality" -> new ElasticCardinalityAggregations();
case "stats_bucket" -> new ElasticStatsBucketAggregations();
case "nested" -> new ElasticNestedAggregations();
case "top_hits" -> new ElasticTopHitsAggregations();
default -> throw new IllegalArgumentException("Invalid aggregation type: " + aggregationType);

View file

@ -0,0 +1,57 @@
package org.openmetadata.service.search.elasticsearch.aggregations;
import es.co.elastic.clients.elasticsearch._types.SortOrder;
import es.co.elastic.clients.elasticsearch._types.aggregations.Aggregation;
import es.co.elastic.clients.elasticsearch._types.aggregations.BucketSortAggregation;
import java.util.HashMap;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;
import org.openmetadata.service.search.SearchAggregationNode;
@Setter
@Getter
public class ElasticBucketSortAggregations implements ElasticAggregations {
private final String aggregationType = "bucket_sort";
private String aggregationName;
private Aggregation aggregation;
private Map<String, Aggregation> subAggregations = new HashMap<>();
@Override
public void createAggregation(SearchAggregationNode node) {
Map<String, String> params = node.getValue();
this.aggregationName = node.getName();
String sizeStr = params.get("size");
String fromStr = params.get("from");
String sortField = params.get("sort_field");
String sortOrderStr = params.get("sort_order");
BucketSortAggregation.Builder builder = new BucketSortAggregation.Builder();
if (sizeStr != null) {
builder.size(Integer.parseInt(sizeStr));
}
if (fromStr != null) {
builder.from(Integer.parseInt(fromStr));
}
if (sortField != null && sortOrderStr != null) {
SortOrder sortOrder = sortOrderStr.equalsIgnoreCase("asc") ? SortOrder.Asc : SortOrder.Desc;
builder.sort(s -> s.field(f -> f.field(sortField).order(sortOrder)));
}
this.aggregation = Aggregation.of(a -> a.bucketSort(builder.build()));
}
@Override
public Boolean isPipelineAggregation() {
return true;
}
@Override
public void setSubAggregations(Map<String, Aggregation> subAggregations) {
this.subAggregations = subAggregations;
}
}

View file

@ -0,0 +1,40 @@
package org.openmetadata.service.search.elasticsearch.aggregations;
import es.co.elastic.clients.elasticsearch._types.aggregations.Aggregation;
import es.co.elastic.clients.elasticsearch._types.aggregations.StatsBucketAggregation;
import java.util.HashMap;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;
import org.openmetadata.service.search.SearchAggregationNode;
@Setter
@Getter
public class ElasticStatsBucketAggregations implements ElasticAggregations {
static final String aggregationType = "stats_bucket";
private String aggregationName;
private Aggregation aggregation;
private Map<String, Aggregation> subAggregations = new HashMap<>();
@Override
public void createAggregation(SearchAggregationNode node) {
Map<String, String> params = node.getValue();
this.aggregationName = node.getName();
String bucketsPath = params.get("buckets_path");
this.aggregation =
Aggregation.of(
a ->
a.statsBucket(
StatsBucketAggregation.of(sb -> sb.bucketsPath(b -> b.single(bucketsPath)))));
}
@Override
public Boolean isPipelineAggregation() {
return true;
}
@Override
public void setSubAggregations(Map<String, Aggregation> subAggregations) {
this.subAggregations = subAggregations;
}
}

View file

@ -1,6 +1,7 @@
package org.openmetadata.service.search.opensearch.aggregations;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.openmetadata.service.search.SearchAggregationNode;
import os.org.opensearch.client.json.JsonpMapper;
@ -35,7 +36,7 @@ public class OpenAggregationsBuilder {
OpenAggregations openAggregation = getAggregation(type);
openAggregation.createAggregation(node);
Map<String, Aggregation> subAggregations = new HashMap<>();
Map<String, Aggregation> subAggregations = new LinkedHashMap<>();
for (SearchAggregationNode child : node.getChildren()) {
buildAggregation(child, openAggregation, subAggregations);
}
@ -53,7 +54,7 @@ public class OpenAggregationsBuilder {
if (openAggregation.getAggregationName() != null) {
String aggName = openAggregation.getAggregationName();
Map<String, Aggregation> wrappedSubAggs = new HashMap<>(subAggregations);
Map<String, Aggregation> wrappedSubAggs = new LinkedHashMap<>(subAggregations);
wrappedSubAggs.put(aggName + "_inner", openAggregation.getAggregation());
finalAggregation =
@ -67,6 +68,7 @@ public class OpenAggregationsBuilder {
private OpenAggregations getAggregation(String aggregationType) {
return switch (aggregationType) {
case "bucket_selector" -> new OpenBucketSelectorAggregations();
case "bucket_sort" -> new OpenBucketSortAggregations();
case "date_histogram" -> new OpenDateHistogramAggregations();
case "terms" -> new OpenTermsAggregations();
case "avg" -> new OpenAvgAggregations();
@ -75,6 +77,7 @@ public class OpenAggregationsBuilder {
case "filter" -> new OpenFilterAggregations(mapper);
case "value_count" -> new OpenValueCountAggregations();
case "cardinality" -> new OpenCardinalityAggregations();
case "stats_bucket" -> new OpenStatsBucketAggregations();
case "nested" -> new OpenNestedAggregations();
case "top_hits" -> new OpenTopHitsAggregations();
default -> throw new IllegalArgumentException("Invalid aggregation type: " + aggregationType);

View file

@ -0,0 +1,57 @@
package org.openmetadata.service.search.opensearch.aggregations;
import java.util.HashMap;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;
import org.openmetadata.service.search.SearchAggregationNode;
import os.org.opensearch.client.opensearch._types.SortOrder;
import os.org.opensearch.client.opensearch._types.aggregations.Aggregation;
import os.org.opensearch.client.opensearch._types.aggregations.BucketSortAggregation;
@Setter
@Getter
public class OpenBucketSortAggregations implements OpenAggregations {
private final String aggregationType = "bucket_sort";
private String aggregationName;
private Aggregation aggregation;
private Map<String, Aggregation> subAggregations = new HashMap<>();
@Override
public void createAggregation(SearchAggregationNode node) {
Map<String, String> params = node.getValue();
this.aggregationName = node.getName();
String sizeStr = params.get("size");
String fromStr = params.get("from");
String sortField = params.get("sort_field");
String sortOrderStr = params.get("sort_order");
BucketSortAggregation.Builder builder = new BucketSortAggregation.Builder();
if (sizeStr != null) {
builder.size(Integer.parseInt(sizeStr));
}
if (fromStr != null) {
builder.from(Integer.parseInt(fromStr));
}
if (sortField != null && sortOrderStr != null) {
SortOrder sortOrder = sortOrderStr.equalsIgnoreCase("asc") ? SortOrder.Asc : SortOrder.Desc;
builder.sort(s -> s.field(f -> f.field(sortField).order(sortOrder)));
}
this.aggregation = Aggregation.of(a -> a.bucketSort(builder.build()));
}
@Override
public Boolean isPipelineAggregation() {
return true;
}
@Override
public void setSubAggregations(Map<String, Aggregation> subAggregations) {
this.subAggregations = subAggregations;
}
}

View file

@ -0,0 +1,40 @@
package org.openmetadata.service.search.opensearch.aggregations;
import java.util.HashMap;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;
import org.openmetadata.service.search.SearchAggregationNode;
import os.org.opensearch.client.opensearch._types.aggregations.Aggregation;
import os.org.opensearch.client.opensearch._types.aggregations.StatsBucketAggregation;
@Setter
@Getter
public class OpenStatsBucketAggregations implements OpenAggregations {
static final String aggregationType = "stats_bucket";
private String aggregationName;
private Aggregation aggregation;
private Map<String, Aggregation> subAggregations = new HashMap<>();
@Override
public void createAggregation(SearchAggregationNode node) {
Map<String, String> params = node.getValue();
this.aggregationName = node.getName();
String bucketsPath = params.get("buckets_path");
this.aggregation =
Aggregation.of(
a ->
a.statsBucket(
StatsBucketAggregation.of(sb -> sb.bucketsPath(b -> b.single(bucketsPath)))));
}
@Override
public Boolean isPipelineAggregation() {
return true;
}
@Override
public void setSubAggregations(Map<String, Aggregation> subAggregations) {
this.subAggregations = subAggregations;
}
}

View file

@ -0,0 +1,196 @@
package org.openmetadata.service.jdbi3;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.openmetadata.service.search.SearchAggregationNode;
class EntityTimeSeriesRepositoryPaginationTest {
private static final String GROUP_BY = "testCase.fullyQualifiedName.keyword";
private static final String CONTENT_FILTERS = "{\"match_all\":{}}";
private static final int MAX_AGG_SIZE = 10000;
@Test
void testBucketSortPresentWhenPaginating() {
List<SearchAggregationNode> nodes =
EntityTimeSeriesRepository.buildAggregationNodes(
GROUP_BY, CONTENT_FILTERS, 15, 0, null, null, MAX_AGG_SIZE);
boolean hasBucketSort =
byTerms(nodes).getChildren().stream().anyMatch(n -> "bucket_sort".equals(n.getType()));
assertTrue(hasBucketSort, "Should include bucket_sort when limit > 0");
}
@Test
void testBucketSortAbsentWhenNotPaginating() {
List<SearchAggregationNode> nodes =
EntityTimeSeriesRepository.buildAggregationNodes(
GROUP_BY, CONTENT_FILTERS, null, null, null, null, MAX_AGG_SIZE);
boolean hasBucketSort =
byTerms(nodes).getChildren().stream().anyMatch(n -> "bucket_sort".equals(n.getType()));
assertFalse(hasBucketSort, "Should not include bucket_sort when limit is null");
}
@Test
void testTermsSizeIsMaxAggSizeWhenPaginating() {
List<SearchAggregationNode> nodes =
EntityTimeSeriesRepository.buildAggregationNodes(
GROUP_BY, CONTENT_FILTERS, 15, 0, null, null, MAX_AGG_SIZE);
assertEquals(
String.valueOf(MAX_AGG_SIZE),
byTerms(nodes).getValue().get("size"),
"byTerms size must be maxAggSize when paginating so bucket_sort has enough buckets to slice");
}
@Test
void testTermsSizeIsDefaultWhenNotPaginating() {
List<SearchAggregationNode> nodes =
EntityTimeSeriesRepository.buildAggregationNodes(
GROUP_BY, CONTENT_FILTERS, null, null, null, null, MAX_AGG_SIZE);
assertEquals(
"100",
byTerms(nodes).getValue().get("size"),
"byTerms size should be 100 when not paginating");
}
@Test
void testByTermsCountPresentWhenPaginating() {
List<SearchAggregationNode> nodes =
EntityTimeSeriesRepository.buildAggregationNodes(
GROUP_BY, CONTENT_FILTERS, 15, 0, null, null, MAX_AGG_SIZE);
SearchAggregationNode byTermsCount = byTermsCount(nodes);
boolean hasMaxTimestamp =
byTermsCount.getChildren().stream()
.anyMatch(n -> "max".equals(n.getType()) && "max_timestamp".equals(n.getName()));
assertTrue(
hasMaxTimestamp, "byTermsCount must have max_timestamp for stats_bucket to reference");
}
@Test
void testByTermsCountHasNoBucketSort() {
List<SearchAggregationNode> nodes =
EntityTimeSeriesRepository.buildAggregationNodes(
GROUP_BY, CONTENT_FILTERS, 15, 0, null, null, MAX_AGG_SIZE);
boolean hasBucketSort =
byTermsCount(nodes).getChildren().stream().anyMatch(n -> "bucket_sort".equals(n.getType()));
assertFalse(
hasBucketSort,
"byTermsCount must not paginate — it needs all post-filter buckets visible to stats_bucket");
}
@Test
void testByTermsCountAbsentWhenNotPaginating() {
List<SearchAggregationNode> nodes =
EntityTimeSeriesRepository.buildAggregationNodes(
GROUP_BY, CONTENT_FILTERS, null, null, null, null, MAX_AGG_SIZE);
boolean hasByTermsCount =
nodes.stream()
.anyMatch(n -> "terms".equals(n.getType()) && "byTermsCount".equals(n.getName()));
assertFalse(hasByTermsCount, "byTermsCount should not be built when not paginating");
}
@Test
void testStatsBucketPresentWhenPaginating() {
List<SearchAggregationNode> nodes =
EntityTimeSeriesRepository.buildAggregationNodes(
GROUP_BY, CONTENT_FILTERS, 15, 0, null, null, MAX_AGG_SIZE);
SearchAggregationNode statsBucket =
nodes.stream()
.filter(
n -> "stats_bucket".equals(n.getType()) && "total_bucket_count".equals(n.getName()))
.findFirst()
.orElseThrow(() -> new AssertionError("stats_bucket#total_bucket_count not found"));
assertEquals(
"byTermsCount>max_timestamp",
statsBucket.getValue().get("buckets_path"),
"stats_bucket must point to byTermsCount>max_timestamp to count post-filter groups");
}
@Test
void testStatsBucketAbsentWhenNotPaginating() {
List<SearchAggregationNode> nodes =
EntityTimeSeriesRepository.buildAggregationNodes(
GROUP_BY, CONTENT_FILTERS, null, null, null, null, MAX_AGG_SIZE);
boolean hasStatsBucket = nodes.stream().anyMatch(n -> "stats_bucket".equals(n.getType()));
assertFalse(hasStatsBucket, "stats_bucket should not be built when not paginating");
}
@Test
void testLimitCappedAtMaxAggSize() {
List<SearchAggregationNode> nodes =
EntityTimeSeriesRepository.buildAggregationNodes(
GROUP_BY, CONTENT_FILTERS, 15000, 0, null, null, MAX_AGG_SIZE);
assertEquals(
String.valueOf(MAX_AGG_SIZE),
bucketSort(nodes).getValue().get("size"),
"Limit must be capped at maxAggSize");
}
@Test
void testLimitBelowMaxAggSizeIsPreserved() {
List<SearchAggregationNode> nodes =
EntityTimeSeriesRepository.buildAggregationNodes(
GROUP_BY, CONTENT_FILTERS, 500, 0, null, null, MAX_AGG_SIZE);
assertEquals(
"500",
bucketSort(nodes).getValue().get("size"),
"Limit below maxAggSize should not be capped");
}
@Test
void testOffsetDefaultsToZeroWhenNull() {
List<SearchAggregationNode> nodes =
EntityTimeSeriesRepository.buildAggregationNodes(
GROUP_BY, CONTENT_FILTERS, 15, null, null, null, MAX_AGG_SIZE);
assertEquals("0", bucketSort(nodes).getValue().get("from"), "Null offset should default to 0");
}
@Test
void testOffsetPreservedWhenProvided() {
List<SearchAggregationNode> nodes =
EntityTimeSeriesRepository.buildAggregationNodes(
GROUP_BY, CONTENT_FILTERS, 15, 25, null, null, MAX_AGG_SIZE);
assertEquals(
"25",
bucketSort(nodes).getValue().get("from"),
"Offset should be passed through to bucket_sort");
}
private static SearchAggregationNode byTerms(List<SearchAggregationNode> nodes) {
return nodes.stream()
.filter(n -> "terms".equals(n.getType()) && "byTerms".equals(n.getName()))
.findFirst()
.orElseThrow(() -> new AssertionError("byTerms aggregation not found"));
}
private static SearchAggregationNode byTermsCount(List<SearchAggregationNode> nodes) {
return nodes.stream()
.filter(n -> "terms".equals(n.getType()) && "byTermsCount".equals(n.getName()))
.findFirst()
.orElseThrow(() -> new AssertionError("byTermsCount aggregation not found"));
}
private static SearchAggregationNode bucketSort(List<SearchAggregationNode> nodes) {
return byTerms(nodes).getChildren().stream()
.filter(n -> "bucket_sort".equals(n.getType()))
.findFirst()
.orElseThrow(() -> new AssertionError("bucket_sort not found in byTerms"));
}
}

View file

@ -0,0 +1,64 @@
package org.openmetadata.service.search;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.util.Map;
import org.junit.jupiter.api.Test;
class SearchAggregationTest {
@Test
void testBucketSortCreatesCorrectNode() {
SearchAggregationNode node = SearchAggregation.bucketSort("test_sort", 10, 20);
assertNotNull(node);
assertEquals("bucket_sort", node.getType());
assertEquals("test_sort", node.getName());
Map<String, String> value = node.getValue();
assertEquals("10", value.get("size"));
assertEquals("20", value.get("from"));
}
@Test
void testBucketSortWithSorting() {
SearchAggregationNode node =
SearchAggregation.bucketSort("test_sort", 15, 0, "max_timestamp", "desc");
assertNotNull(node);
assertEquals("bucket_sort", node.getType());
assertEquals("test_sort", node.getName());
Map<String, String> value = node.getValue();
assertEquals("15", value.get("size"));
assertEquals("0", value.get("from"));
assertEquals("max_timestamp", value.get("sort_field"));
assertEquals("desc", value.get("sort_order"));
}
@Test
void testBucketSortWithNullValues() {
SearchAggregationNode node = SearchAggregation.bucketSort("test_sort", null, null);
assertNotNull(node);
assertEquals("bucket_sort", node.getType());
assertEquals("test_sort", node.getName());
Map<String, String> value = node.getValue();
assertEquals(0, value.size());
}
@Test
void testStatsBucketCreatesCorrectNode() {
SearchAggregationNode node =
SearchAggregation.statsBucket("total_bucket_count", "byTermsCount>max_timestamp");
assertNotNull(node);
assertEquals("stats_bucket", node.getType());
assertEquals("total_bucket_count", node.getName());
Map<String, String> value = node.getValue();
assertEquals("byTermsCount>max_timestamp", value.get("buckets_path"));
}
}