fix(search): column bulk operations search not returning results at scale (#27216)

* fix(search): column bulk operations search not returning results at scale

When searching by column name pattern (e.g., "MAT") in column bulk
operations, the composite aggregation returned ALL column names from
matching documents, then post-filtered in Java. With 20000+ columns,
the first composite page of 25 names rarely contained matches, so
users saw 0 results.

Switch to terms aggregation with `include` regex when a search pattern
is set. This filters at the ES/OS aggregation level — only matching
column names produce buckets. Two-phase approach: (1) lightweight
names query to get all matching names + accurate total, (2) targeted
data query with top_hits for the current page only.

* test(search): add integration tests for column pattern search regex against ES
This commit is contained in:
sonika-shah 2026-04-29 02:51:07 +05:30 committed by GitHub
parent a2a0c8ee54
commit 3d88c994e6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 1508 additions and 302 deletions

View file

@ -7,6 +7,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import org.junit.jupiter.api.BeforeAll;
@ -14,6 +16,9 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.api.parallel.ResourceAccessMode;
import org.junit.jupiter.api.parallel.ResourceLock;
import org.junit.jupiter.api.parallel.Resources;
import org.openmetadata.it.factories.DashboardServiceTestFactory;
import org.openmetadata.it.factories.DatabaseSchemaTestFactory;
import org.openmetadata.it.factories.DatabaseServiceTestFactory;
@ -1471,6 +1476,581 @@ public class ColumnGridResourceIT {
.until(() -> true);
}
@Test
void test_getColumnGrid_patternSearchIsCaseInsensitive(TestNamespace ns) throws Exception {
OpenMetadataClient client = SdkClients.adminClient();
DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns);
DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service);
String colName = ns.prefix("CaseMixCol");
Column col = Columns.build(colName).withType(ColumnDataType.VARCHAR).withLength(255).create();
Tables.create()
.name(ns.prefix("case_test_table"))
.inSchema(schema.getFullyQualifiedName())
.withColumns(List.of(col))
.execute();
waitForSearchIndexRefresh();
await("Wait for lowercase pattern search to find mixed-case column")
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(2))
.untilAsserted(
() -> {
ColumnGridResponse lowerResponse =
getColumnGrid(
client,
"entityTypes=table&columnNamePattern=casemixcol&serviceName="
+ service.getName());
assertNotNull(lowerResponse);
assertTrue(
lowerResponse.getColumns().stream()
.anyMatch(c -> c.getColumnName().equals(colName)),
"Lowercase search should find the mixed-case column");
});
await("Wait for uppercase pattern search to find mixed-case column")
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(2))
.untilAsserted(
() -> {
ColumnGridResponse upperResponse =
getColumnGrid(
client,
"entityTypes=table&columnNamePattern=CASEMIXCOL&serviceName="
+ service.getName());
assertNotNull(upperResponse);
assertTrue(
upperResponse.getColumns().stream()
.anyMatch(c -> c.getColumnName().equals(colName)),
"Uppercase search should find the mixed-case column");
});
}
@Test
void test_getColumnGrid_patternSearchExcludesNonMatching(TestNamespace ns) throws Exception {
OpenMetadataClient client = SdkClients.adminClient();
DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns);
DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service);
String matchCol = ns.prefix("regex_target");
String noMatchCol = ns.prefix("other_field");
Column col1 = Columns.build(matchCol).withType(ColumnDataType.VARCHAR).withLength(255).create();
Column col2 =
Columns.build(noMatchCol).withType(ColumnDataType.VARCHAR).withLength(255).create();
Tables.create()
.name(ns.prefix("regex_exclude_table"))
.inSchema(schema.getFullyQualifiedName())
.withColumns(List.of(col1, col2))
.execute();
waitForSearchIndexRefresh();
await("Wait for pattern search to exclude non-matching columns")
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(2))
.untilAsserted(
() -> {
ColumnGridResponse response =
getColumnGrid(
client,
"entityTypes=table&columnNamePattern=regex_target&serviceName="
+ service.getName());
assertNotNull(response);
assertTrue(
response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(matchCol)),
"Matching column should be in results");
assertFalse(
response.getColumns().stream()
.anyMatch(c -> c.getColumnName().equals(noMatchCol)),
"Non-matching column from same table should be excluded");
});
}
@Test
void test_getColumnGrid_patternSearchWithSpecialChars(TestNamespace ns) throws Exception {
OpenMetadataClient client = SdkClients.adminClient();
DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns);
DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service);
String colWithDot = ns.prefix("col.with.dots");
String colNoDot = ns.prefix("colXwithXdots");
Column col1 =
Columns.build(colWithDot).withType(ColumnDataType.VARCHAR).withLength(255).create();
Column col2 = Columns.build(colNoDot).withType(ColumnDataType.VARCHAR).withLength(255).create();
Tables.create()
.name(ns.prefix("special_char_table"))
.inSchema(schema.getFullyQualifiedName())
.withColumns(List.of(col1, col2))
.execute();
waitForSearchIndexRefresh();
// Search for "col.with" dot should be literal, not wildcard
await("Wait for pattern search with special chars")
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(2))
.untilAsserted(
() -> {
ColumnGridResponse response =
getColumnGrid(
client,
"entityTypes=table&columnNamePattern=col.with&serviceName="
+ service.getName());
assertNotNull(response);
assertTrue(
response.getColumns().stream()
.anyMatch(c -> c.getColumnName().equals(colWithDot)),
"Column with literal dot should match");
assertFalse(
response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(colNoDot)),
"Column without dot should not match — dot must be literal, not wildcard");
});
}
@Test
void test_getColumnGrid_patternPlusTagFilter(TestNamespace ns) throws Exception {
OpenMetadataClient client = SdkClients.adminClient();
DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns);
DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service);
TagLabel piiTag = new TagLabel();
piiTag.setTagFQN("PII.Sensitive");
piiTag.setSource(TagLabel.TagSource.CLASSIFICATION);
piiTag.setLabelType(TagLabel.LabelType.MANUAL);
piiTag.setState(TagLabel.State.CONFIRMED);
String taggedMatchCol = ns.prefix("pat_tag_match");
String taggedNoMatchCol = ns.prefix("pat_tag_other");
String untaggedMatchCol = ns.prefix("pat_tag_match_notag");
// Table 1: tagged column matching pattern + tagged column NOT matching pattern
Column col1 =
Columns.build(taggedMatchCol)
.withType(ColumnDataType.VARCHAR)
.withLength(255)
.withTags(List.of(piiTag))
.create();
Column col2 =
Columns.build(taggedNoMatchCol)
.withType(ColumnDataType.VARCHAR)
.withLength(255)
.withTags(List.of(piiTag))
.create();
Tables.create()
.name(ns.prefix("pat_tag_table_1"))
.inSchema(schema.getFullyQualifiedName())
.withColumns(List.of(col1, col2))
.execute();
// Table 2: untagged column whose name also matches the pattern
Column col3 =
Columns.build(untaggedMatchCol).withType(ColumnDataType.VARCHAR).withLength(255).create();
Tables.create()
.name(ns.prefix("pat_tag_table_2"))
.inSchema(schema.getFullyQualifiedName())
.withColumns(List.of(col3))
.execute();
waitForSearchIndexRefresh();
await("Wait for pattern + tag filter result")
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(2))
.untilAsserted(
() -> {
ColumnGridResponse response =
getColumnGrid(
client,
"entityTypes=table&tags=PII.Sensitive&columnNamePattern=pat_tag_match&serviceName="
+ service.getName());
assertNotNull(response);
// Should find taggedMatchCol (matches pattern AND has tag)
// Should NOT find taggedNoMatchCol (has tag but doesn't match pattern)
// Should NOT find untaggedMatchCol (matches pattern but no tag)
boolean foundTaggedMatch = false;
boolean foundTaggedNoMatch = false;
boolean foundUntaggedMatch = false;
for (ColumnGridItem item : response.getColumns()) {
if (item.getColumnName().equals(taggedMatchCol)) {
foundTaggedMatch = true;
}
if (item.getColumnName().equals(taggedNoMatchCol)) {
foundTaggedNoMatch = true;
}
if (item.getColumnName().equals(untaggedMatchCol)) {
foundUntaggedMatch = true;
}
}
assertTrue(
foundTaggedMatch, "Column with tag AND matching pattern should be in results");
assertFalse(
foundTaggedNoMatch,
"Column with tag but NOT matching pattern should be excluded");
assertFalse(
foundUntaggedMatch, "Column matching pattern but WITHOUT tag should be excluded");
});
}
@Test
void test_getColumnGrid_patternPlusGlossaryFilter(TestNamespace ns) throws Exception {
OpenMetadataClient client = SdkClients.adminClient();
DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns);
DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service);
Glossary glossary = createGlossary(client, ns, "PG");
GlossaryTerm term = createGlossaryTerm(client, glossary, ns, "PT");
TagLabel glossaryTag = new TagLabel();
glossaryTag.setTagFQN(term.getFullyQualifiedName());
glossaryTag.setSource(TagLabel.TagSource.GLOSSARY);
glossaryTag.setLabelType(TagLabel.LabelType.MANUAL);
glossaryTag.setState(TagLabel.State.CONFIRMED);
String matchCol = ns.prefix("pg_match_col");
String noMatchCol = ns.prefix("pg_other_col");
Column col1 =
Columns.build(matchCol)
.withType(ColumnDataType.VARCHAR)
.withLength(255)
.withTags(List.of(glossaryTag))
.create();
Column col2 =
Columns.build(noMatchCol)
.withType(ColumnDataType.VARCHAR)
.withLength(255)
.withTags(List.of(glossaryTag))
.create();
Tables.create()
.name(ns.prefix("pg_table"))
.inSchema(schema.getFullyQualifiedName())
.withColumns(List.of(col1, col2))
.execute();
waitForSearchIndexRefresh();
await("Wait for pattern + glossary filter result")
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(2))
.untilAsserted(
() -> {
ColumnGridResponse response =
getColumnGrid(
client,
"entityTypes=table&glossaryTerms="
+ term.getFullyQualifiedName()
+ "&columnNamePattern=pg_match&serviceName="
+ service.getName());
assertNotNull(response);
assertTrue(
response.getColumns().stream().anyMatch(c -> c.getColumnName().equals(matchCol)),
"Column matching both pattern and glossary should be in results");
assertFalse(
response.getColumns().stream()
.anyMatch(c -> c.getColumnName().equals(noMatchCol)),
"Column with glossary but not matching pattern should be excluded");
});
}
@Test
void test_getColumnGrid_tagFilterPaginationConsistency(TestNamespace ns) throws Exception {
OpenMetadataClient client = SdkClients.adminClient();
DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns);
DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service);
TagLabel piiTag = new TagLabel();
piiTag.setTagFQN("PII.Sensitive");
piiTag.setSource(TagLabel.TagSource.CLASSIFICATION);
piiTag.setLabelType(TagLabel.LabelType.MANUAL);
piiTag.setState(TagLabel.State.CONFIRMED);
// Create 5 tables, each with a uniquely-named tagged column
for (int i = 0; i < 5; i++) {
Column col =
Columns.build(ns.prefix("pagcon_col_" + i))
.withType(ColumnDataType.VARCHAR)
.withLength(255)
.withTags(List.of(piiTag))
.create();
Tables.create()
.name(ns.prefix("pagcon_table_" + i))
.inSchema(schema.getFullyQualifiedName())
.withColumns(List.of(col))
.execute();
}
waitForSearchIndexRefresh();
// Page through with size=2 should get 2, 2, 1
// Use serviceName to scope to this test's data, raw pattern prefix to match column names
String baseQuery =
"entityTypes=table&tags=PII.Sensitive&columnNamePattern=pagcon&serviceName="
+ service.getName()
+ "&size=2";
await("Wait for all 5 tagged columns to be indexed")
.atMost(Duration.ofSeconds(45))
.pollInterval(Duration.ofSeconds(2))
.untilAsserted(
() -> {
ColumnGridResponse first = getColumnGrid(client, baseQuery);
assertNotNull(first);
assertEquals(5, first.getTotalUniqueColumns(), "Should report 5 unique columns");
});
ColumnGridResponse page1 = getColumnGrid(client, baseQuery);
assertEquals(2, page1.getColumns().size(), "Page 1 should have exactly 2 columns");
assertNotNull(page1.getCursor(), "Page 1 should have a cursor for next page");
ColumnGridResponse page2 =
getColumnGrid(
client,
baseQuery + "&cursor=" + URLEncoder.encode(page1.getCursor(), StandardCharsets.UTF_8));
assertEquals(2, page2.getColumns().size(), "Page 2 should have exactly 2 columns");
assertNotNull(page2.getCursor(), "Page 2 should have a cursor for next page");
ColumnGridResponse page3 =
getColumnGrid(
client,
baseQuery + "&cursor=" + URLEncoder.encode(page2.getCursor(), StandardCharsets.UTF_8));
assertEquals(1, page3.getColumns().size(), "Page 3 (last) should have exactly 1 column");
// Verify no duplicates across pages
java.util.Set<String> allNames = new java.util.HashSet<>();
for (ColumnGridItem item : page1.getColumns()) {
assertTrue(allNames.add(item.getColumnName()), "Duplicate found: " + item.getColumnName());
}
for (ColumnGridItem item : page2.getColumns()) {
assertTrue(allNames.add(item.getColumnName()), "Duplicate found: " + item.getColumnName());
}
for (ColumnGridItem item : page3.getColumns()) {
assertTrue(allNames.add(item.getColumnName()), "Duplicate found: " + item.getColumnName());
}
assertEquals(5, allNames.size(), "Should have collected all 5 unique columns across pages");
}
@Test
void test_getColumnGrid_glossaryFilter_onlyReturnsGlossaryOccurrences(TestNamespace ns)
throws Exception {
OpenMetadataClient client = SdkClients.adminClient();
DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns);
DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service);
Glossary glossary = createGlossary(client, ns, "OG");
GlossaryTerm term = createGlossaryTerm(client, glossary, ns, "OT");
TagLabel glossaryTag = new TagLabel();
glossaryTag.setTagFQN(term.getFullyQualifiedName());
glossaryTag.setSource(TagLabel.TagSource.GLOSSARY);
glossaryTag.setLabelType(TagLabel.LabelType.MANUAL);
glossaryTag.setState(TagLabel.State.CONFIRMED);
String sharedName = ns.prefix("gocc_col");
// Table 1: column WITH glossary term
Column withGlossary =
Columns.build(sharedName)
.withType(ColumnDataType.VARCHAR)
.withLength(255)
.withDescription("Has glossary")
.withTags(List.of(glossaryTag))
.create();
Tables.create()
.name(ns.prefix("gocc_t1"))
.inSchema(schema.getFullyQualifiedName())
.withColumns(List.of(withGlossary))
.execute();
// Table 2: same column name WITHOUT glossary term
Column withoutGlossary =
Columns.build(sharedName)
.withType(ColumnDataType.VARCHAR)
.withLength(255)
.withDescription("No glossary")
.create();
Tables.create()
.name(ns.prefix("gocc_t2"))
.inSchema(schema.getFullyQualifiedName())
.withColumns(List.of(withoutGlossary))
.execute();
waitForSearchIndexRefresh();
await("Wait for glossary-filtered column to return the tagged occurrence only")
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(2))
.untilAsserted(
() -> {
ColumnGridResponse response =
getColumnGrid(
client,
"entityTypes=table&glossaryTerms="
+ term.getFullyQualifiedName()
+ "&serviceName="
+ service.getName());
assertNotNull(response);
assertNotNull(response.getColumns());
ColumnGridItem sharedItem =
response.getColumns().stream()
.filter(item -> item.getColumnName().equals(sharedName))
.findFirst()
.orElse(null);
assertNotNull(
sharedItem,
"Expected '" + sharedName + "' to be present in the glossary-filtered response");
assertEquals(
1,
sharedItem.getTotalOccurrences(),
"Should only return the occurrence WITH the glossary term, not all with same name");
});
}
@Test
void test_getColumnGrid_patternSearchAcrossEntityTypesDedupesNames(TestNamespace ns)
throws Exception {
OpenMetadataClient client = SdkClients.adminClient();
DatabaseService dbService = DatabaseServiceTestFactory.createPostgres(ns);
DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, dbService);
String sharedName = ns.prefix("multi_type_col");
Column tableCol =
Columns.build(sharedName).withType(ColumnDataType.VARCHAR).withLength(255).create();
Tables.create()
.name(ns.prefix("multi_type_table"))
.inSchema(schema.getFullyQualifiedName())
.withColumns(List.of(tableCol))
.execute();
DashboardService dashService = DashboardServiceTestFactory.createMetabase(ns);
Column dashCol =
Columns.build(sharedName).withType(ColumnDataType.VARCHAR).withLength(255).create();
DashboardDataModels.create()
.name(ns.prefix("multi_type_datamodel"))
.in(dashService.getFullyQualifiedName())
.withColumns(List.of(dashCol))
.withDataModelType(DataModelType.MetabaseDataModel)
.execute();
waitForSearchIndexRefresh();
await("Wait for both entities to be indexed and dedupe correctly")
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(2))
.untilAsserted(
() -> {
ColumnGridResponse response =
getColumnGrid(
client,
"entityTypes=table,dashboardDataModel&columnNamePattern=multi_type_col");
assertNotNull(response);
long matches =
response.getColumns().stream()
.filter(c -> c.getColumnName().equals(sharedName))
.count();
assertEquals(
1, matches, "Same column name in two entity types must dedupe to one grid entry");
ColumnGridItem item =
response.getColumns().stream()
.filter(c -> c.getColumnName().equals(sharedName))
.findFirst()
.orElseThrow();
assertEquals(
2,
item.getTotalOccurrences(),
"Per-column occurrences must include both entity types");
assertTrue(
response.getTotalOccurrences() >= 2,
"Response totalOccurrences must include both entity-type buckets");
});
}
@Test
@ResourceLock(value = Resources.GLOBAL, mode = ResourceAccessMode.READ_WRITE)
void test_getColumnGrid_patternSearchFindsAlphabeticallyLateColumn(TestNamespace ns)
throws Exception {
OpenMetadataClient client = SdkClients.adminClient();
DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns);
DatabaseSchema schema = DatabaseSchemaTestFactory.createSimple(ns, service);
// Match (zzz_target) at position 50 with size=25 old code returns 0 on page 1, new code finds
// it.
int columnCount = 50;
String matchedColumn = ns.prefix("zzz_target");
java.util.List<Column> columns = new java.util.ArrayList<>();
for (int i = 0; i < columnCount - 1; i++) {
columns.add(
Columns.build(ns.prefix(String.format("aaa_filler_%02d", i)))
.withType(ColumnDataType.VARCHAR)
.withLength(255)
.create());
}
columns.add(
Columns.build(matchedColumn).withType(ColumnDataType.VARCHAR).withLength(255).create());
Table table =
Tables.create()
.name(ns.prefix("scale_search_table"))
.inSchema(schema.getFullyQualifiedName())
.withColumns(columns)
.execute();
try {
waitForSearchIndexRefresh();
await("Wait for first-page search to surface alphabetically-late match (size=25)")
.atMost(Duration.ofSeconds(45))
.pollInterval(Duration.ofSeconds(2))
.untilAsserted(
() -> {
ColumnGridResponse response =
getColumnGrid(
client,
"entityTypes=table&columnNamePattern=zzz_target&size=25&serviceName="
+ service.getName());
assertNotNull(response);
assertTrue(
response.getColumns().stream()
.anyMatch(c -> c.getColumnName().equals(matchedColumn)),
"First page must contain the alphabetically-late matching column "
+ "(this exercises the original bug fix — composite agg would have hidden it)");
assertEquals(
1,
response.getTotalUniqueColumns(),
"Only one unique column matches the pattern");
});
} finally {
java.util.Map<String, String> params = new java.util.HashMap<>();
params.put("hardDelete", "true");
try {
SdkClients.adminClient().tables().delete(table.getId().toString(), params);
} catch (Exception ignored) {
}
}
}
private void waitForColumnToBeIndexed(
OpenMetadataClient client, String columnName, String serviceName) {
await()

View file

@ -13,14 +13,111 @@
package org.openmetadata.service.search;
import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import org.openmetadata.schema.api.data.ColumnGridResponse;
import org.openmetadata.schema.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public interface ColumnAggregator {
Logger LOG = LoggerFactory.getLogger(ColumnAggregator.class);
/** Max column names to retrieve in the names-only query during pattern search. */
int MAX_PATTERN_SEARCH_NAMES = 10000;
/**
* Number of sample docs pulled per column-name bucket to populate occurrences. Caps
* {@code ColumnGridItem.totalOccurrences}; columns appearing in more entities than this
* undercount.
*/
int SAMPLE_DOCS_PER_COLUMN = 100;
/** Aggregation names used in pattern-search queries (ES + OS). */
String AGG_MATCHING_COLUMNS = "matching_columns";
String AGG_PAGE_COLUMNS = "page_columns";
String AGG_SAMPLE_DOCS = "sample_docs";
String AGG_KEY_ORDER = "_key";
/** Cursor payload key for the offset-based search/tag pagination cursor. */
String CURSOR_SEARCH_OFFSET = "searchOffset";
TypeReference<Map<String, Object>> CURSOR_TYPE = new TypeReference<>() {};
ColumnGridResponse aggregateColumns(ColumnAggregationRequest request) throws IOException;
/**
* Convert a plain text pattern to a case-insensitive regex for ES/OS terms include. Lucene regex
* does not support (?i), so each letter is expanded to a character class: "MAT" → [mM][aA][tT].
*/
static String toCaseInsensitiveRegex(String pattern) {
StringBuilder sb = new StringBuilder(".*");
for (char c : pattern.toCharArray()) {
if (Character.isLetter(c)) {
sb.append('[')
.append(Character.toLowerCase(c))
.append(Character.toUpperCase(c))
.append(']');
} else if (".+*?|[](){}^$\\~@&#<>\"".indexOf(c) >= 0) {
sb.append('\\').append(c);
} else {
sb.append(c);
}
}
sb.append(".*");
return sb.toString();
}
/** Encode an offset into the search/tag pagination cursor (base64 JSON). */
static String encodeSearchOffset(int offset) {
try {
String json = JsonUtils.pojoToJson(Map.of(CURSOR_SEARCH_OFFSET, offset));
return Base64.getEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
LOG.error("Failed to encode search offset", e);
return null;
}
}
/** Decode the search/tag pagination cursor; restart at 0 for malformed input. */
static int decodeSearchOffset(String cursor) {
if (cursor == null) {
return 0;
}
try {
String json = new String(Base64.getDecoder().decode(cursor), StandardCharsets.UTF_8);
Map<String, Object> map = JsonUtils.readValue(json, CURSOR_TYPE);
Object offset = map.get(CURSOR_SEARCH_OFFSET);
if (offset instanceof Number num) {
return num.intValue();
}
return 0;
} catch (Exception e) {
LOG.debug("Failed to decode search offset cursor, restarting from page 1", e);
return 0;
}
}
/** Saturating long → int cast for response totals. Caps at Integer.MAX_VALUE. */
static int toIntSaturating(long value) {
if (value > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
if (value < 0) {
return 0;
}
return (int) value;
}
/** Phase 1 result: matching column names and the total doc_count summed across buckets. */
record NamesWithCount(List<String> names, long totalDocCount) {}
class ColumnAggregationRequest {
private int size = 1000;
private String cursor;

View file

@ -15,6 +15,7 @@ package org.openmetadata.service.search.elasticsearch;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import es.co.elastic.clients.elasticsearch.ElasticsearchClient;
import es.co.elastic.clients.elasticsearch._types.ElasticsearchException;
@ -24,6 +25,8 @@ import es.co.elastic.clients.elasticsearch._types.aggregations.Aggregation;
import es.co.elastic.clients.elasticsearch._types.aggregations.CompositeAggregate;
import es.co.elastic.clients.elasticsearch._types.aggregations.CompositeAggregationSource;
import es.co.elastic.clients.elasticsearch._types.aggregations.CompositeBucket;
import es.co.elastic.clients.elasticsearch._types.aggregations.StringTermsAggregate;
import es.co.elastic.clients.elasticsearch._types.aggregations.StringTermsBucket;
import es.co.elastic.clients.elasticsearch._types.aggregations.TopHitsAggregate;
import es.co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery;
import es.co.elastic.clients.elasticsearch._types.query_dsl.Query;
@ -31,6 +34,7 @@ import es.co.elastic.clients.elasticsearch.core.SearchRequest;
import es.co.elastic.clients.elasticsearch.core.SearchResponse;
import es.co.elastic.clients.elasticsearch.core.search.Hit;
import es.co.elastic.clients.json.JsonData;
import es.co.elastic.clients.util.NamedValue;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -38,8 +42,11 @@ import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.api.data.ColumnGridItem;
import org.openmetadata.schema.api.data.ColumnGridResponse;
@ -87,24 +94,33 @@ public class ElasticSearchColumnAggregator implements ColumnAggregator {
List<String> entityTypes = getEntityTypesForRequest(request);
// Two-phase query for tags/glossaryTerms filtering:
// Phase 1: Find entityFQN#columnName pairs that have the tag
// Phase 2: Filter to only return those specific occurrences
// Tag/glossary filter path: we must read _source to check which specific column has
// the tag (ES flat object mapping can't tell us). Since we're already reading _source,
// we extract full column metadata in the same pass no separate data-fetch query needed.
boolean hasTagFilter =
!nullOrEmpty(request.getTags()) || !nullOrEmpty(request.getGlossaryTerms());
Set<String> entityColumnPairsWithTags = null;
Set<String> columnNamesWithTags = null;
if (hasTagFilter) {
entityColumnPairsWithTags = getEntityColumnPairsWithTags(request, entityTypes);
if (entityColumnPairsWithTags.isEmpty()) {
Map<String, List<ColumnWithContext>> taggedColumns =
getColumnsWithTagsFromSource(request, entityTypes);
if (taggedColumns.isEmpty()) {
return buildResponse(new ArrayList<>(), null, false, 0, 0);
}
// Also keep just the column names for the Phase 2 query filter
columnNamesWithTags =
entityColumnPairsWithTags.stream()
.map(pair -> pair.substring(pair.indexOf('#') + 1))
.collect(java.util.stream.Collectors.toSet());
// Pattern + tag combined: filter the already-fetched columns by pattern in Java
if (!nullOrEmpty(request.getColumnNamePattern())) {
String pattern = request.getColumnNamePattern().toLowerCase(Locale.ROOT);
taggedColumns
.entrySet()
.removeIf(e -> !e.getKey().toLowerCase(Locale.ROOT).contains(pattern));
}
return aggregateColumnsWithKnownNames(request, taggedColumns);
}
// Pattern-only path (no tag filter): use terms agg with include regex
if (!nullOrEmpty(request.getColumnNamePattern())) {
return aggregateColumnsWithPattern(request, entityTypes);
}
Map<String, List<ColumnWithContext>> allColumnsByName = new HashMap<>();
@ -124,43 +140,15 @@ public class ElasticSearchColumnAggregator implements ColumnAggregator {
String columnFieldPath = INDEX_CONFIGS.get(groupEntityTypes.getFirst()).columnFieldPath();
// Phase 2: Build query WITHOUT tag filter but WITH column names filter
List<String> columnNamesList =
columnNamesWithTags != null ? new ArrayList<>(columnNamesWithTags) : null;
Query query = buildFilters(request, columnNameKeyword, columnNamesList);
Query query = buildFilters(request, columnNameKeyword, null);
try {
SearchResponse<JsonData> response =
executeSearch(request, query, indexes, columnNameKeyword);
Map<String, List<ColumnWithContext>> columnsByName =
parseAggregationResults(response, columnFieldPath);
parseCompositeAggResults(response, columnFieldPath);
// Post-filter columns by name pattern since ES aggregation returns all columns from matched
// documents
String columnNamePattern = request.getColumnNamePattern();
if (!nullOrEmpty(columnNamePattern)) {
columnsByName
.entrySet()
.removeIf(e -> !matchesColumnNamePattern(e.getKey(), columnNamePattern));
}
// Post-filter for tag/glossary terms filtering: Only keep occurrences that were
// identified in Phase 1 as having the tag (not just same column name)
if (entityColumnPairsWithTags != null && !entityColumnPairsWithTags.isEmpty()) {
final Set<String> allowedPairs = entityColumnPairsWithTags;
for (List<ColumnWithContext> occurrences : columnsByName.values()) {
occurrences.removeIf(
ctx -> {
String key = ctx.entityFQN + "#" + ctx.column.getName();
return !allowedPairs.contains(key);
});
}
// Remove column entries that have no occurrences left
columnsByName.entrySet().removeIf(e -> e.getValue().isEmpty());
}
// Merge results
for (Map.Entry<String, List<ColumnWithContext>> colEntry : columnsByName.entrySet()) {
allColumnsByName
.computeIfAbsent(colEntry.getKey(), k -> new ArrayList<>())
@ -173,9 +161,7 @@ public class ElasticSearchColumnAggregator implements ColumnAggregator {
hasMore = true;
}
// Get totals only on first page and only when no column name pattern
// (ES aggregation counts all columns from matched docs, not just filtered ones)
if (request.getCursor() == null && nullOrEmpty(request.getColumnNamePattern())) {
if (request.getCursor() == null) {
Map<String, Long> totals = getTotalCounts(query, indexes, columnNameKeyword);
totalUniqueColumns += totals.get("uniqueColumns");
totalOccurrences += totals.get("totalOccurrences");
@ -191,10 +177,7 @@ public class ElasticSearchColumnAggregator implements ColumnAggregator {
List<ColumnGridItem> gridItems = ColumnMetadataGrouper.groupColumns(allColumnsByName);
// Calculate totals from actual filtered data when:
// - On subsequent pages (cursor is set)
// - When column name pattern is specified (ES aggregation includes non-matching columns)
if (request.getCursor() != null || !nullOrEmpty(request.getColumnNamePattern())) {
if (request.getCursor() != null) {
totalUniqueColumns = allColumnsByName.size();
totalOccurrences = gridItems.stream().mapToInt(ColumnGridItem::getTotalOccurrences).sum();
}
@ -204,13 +187,128 @@ public class ElasticSearchColumnAggregator implements ColumnAggregator {
}
/**
* Phase 1: Get entityFQN#columnName pairs that have the specified tags. Since ES flattens
* arrays, we must fetch column data and filter in Java to find columns that actually have the
* tag.
* Pattern-only search path (no tag filter): uses terms aggregation with include regex to filter
* column names at the aggregation level. Two queries per entity-type group: (1) lightweight names
* query to get all matching names and total count, (2) targeted data query with top_hits for the
* current page.
*/
private Set<String> getEntityColumnPairsWithTags(
private ColumnGridResponse aggregateColumnsWithPattern(
ColumnAggregationRequest request, List<String> entityTypes) throws IOException {
Set<String> entityColumnPairs = new HashSet<>();
Map<String, List<String>> fieldPathToEntityTypes = groupByFieldPath(entityTypes);
String regex = ColumnAggregator.toCaseInsensitiveRegex(request.getColumnNamePattern());
Set<String> allMatchingNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
long totalOccurrencesAcrossGroups = 0;
for (Map.Entry<String, List<String>> entry : fieldPathToEntityTypes.entrySet()) {
String columnNameKeyword = entry.getKey();
List<String> indexes = resolveIndexNames(entry.getValue());
Query query = buildFilters(request, columnNameKeyword, null);
try {
NamesWithCount result = executeNamesQuery(query, indexes, columnNameKeyword, regex);
allMatchingNames.addAll(result.names());
totalOccurrencesAcrossGroups += result.totalDocCount();
} catch (ElasticsearchException e) {
if (!isIndexNotFoundException(e)) {
throw e;
}
}
}
int totalUniqueColumns = allMatchingNames.size();
int totalOccurrences = ColumnAggregator.toIntSaturating(totalOccurrencesAcrossGroups);
int offset = ColumnAggregator.decodeSearchOffset(request.getCursor());
int pageSize = request.getSize();
List<String> sortedNames = new ArrayList<>(allMatchingNames);
int fromIndex = Math.min(offset, sortedNames.size());
int toIndex = Math.min(offset + pageSize, sortedNames.size());
List<String> pageNames = sortedNames.subList(fromIndex, toIndex);
if (pageNames.isEmpty()) {
return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, totalOccurrences);
}
Map<String, List<ColumnWithContext>> allColumnsByName = new HashMap<>();
for (Map.Entry<String, List<String>> entry : fieldPathToEntityTypes.entrySet()) {
String columnNameKeyword = entry.getKey();
List<String> indexes = resolveIndexNames(entry.getValue());
String columnFieldPath = INDEX_CONFIGS.get(entry.getValue().getFirst()).columnFieldPath();
Query query = buildFilters(request, columnNameKeyword, null);
try {
Map<String, List<ColumnWithContext>> columnsByName =
executePageDataQuery(query, indexes, columnNameKeyword, columnFieldPath, pageNames);
for (Map.Entry<String, List<ColumnWithContext>> colEntry : columnsByName.entrySet()) {
allColumnsByName
.computeIfAbsent(colEntry.getKey(), k -> new ArrayList<>())
.addAll(colEntry.getValue());
}
} catch (ElasticsearchException e) {
if (!isIndexNotFoundException(e)) {
throw e;
}
}
}
List<ColumnGridItem> gridItems = ColumnMetadataGrouper.groupColumns(allColumnsByName);
boolean hasMore = toIndex < totalUniqueColumns;
String cursor = hasMore ? ColumnAggregator.encodeSearchOffset(toIndex) : null;
return buildResponse(gridItems, cursor, hasMore, totalUniqueColumns, totalOccurrences);
}
/**
* Tag/glossary filter path: the tag-check pass already extracted full column metadata from
* _source (only tagged columns are in the map). Just paginate over the in-memory result.
*/
private ColumnGridResponse aggregateColumnsWithKnownNames(
ColumnAggregationRequest request, Map<String, List<ColumnWithContext>> taggedColumns) {
int totalUniqueColumns = taggedColumns.size();
int totalOccurrences = taggedColumns.values().stream().mapToInt(List::size).sum();
int offset = ColumnAggregator.decodeSearchOffset(request.getCursor());
int pageSize = request.getSize();
List<String> sortedNames = new ArrayList<>(taggedColumns.keySet());
int fromIndex = Math.min(offset, sortedNames.size());
int toIndex = Math.min(offset + pageSize, sortedNames.size());
List<String> pageNames = sortedNames.subList(fromIndex, toIndex);
if (pageNames.isEmpty()) {
return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, totalOccurrences);
}
Map<String, List<ColumnWithContext>> pageColumns = new HashMap<>();
for (String name : pageNames) {
List<ColumnWithContext> occurrences = taggedColumns.get(name);
if (occurrences != null) {
pageColumns.put(name, occurrences);
}
}
List<ColumnGridItem> gridItems = ColumnMetadataGrouper.groupColumns(pageColumns);
boolean hasMore = toIndex < totalUniqueColumns;
String cursor = hasMore ? ColumnAggregator.encodeSearchOffset(toIndex) : null;
return buildResponse(gridItems, cursor, hasMore, totalUniqueColumns, totalOccurrences);
}
/**
* Fetch columns with matching tags from _source. ES flat object mapping means we can't filter
* "column X has tag Y" at query level, so we read _source and check in Java. Since we already
* have the full document, we extract column metadata here avoiding a separate data-fetch query.
*/
private Map<String, List<ColumnWithContext>> getColumnsWithTagsFromSource(
ColumnAggregationRequest request, List<String> entityTypes) throws IOException {
Map<String, List<ColumnWithContext>> columnsByName =
new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
Map<String, List<String>> fieldPathToEntityTypes = groupByFieldPath(entityTypes);
Set<String> targetTags = buildTargetTagSet(request);
@ -224,9 +322,7 @@ public class ElasticSearchColumnAggregator implements ColumnAggregator {
Query query = buildTagFilterQuery(request, columnNameKeyword);
try {
Set<String> matchingPairs =
fetchEntityColumnPairsWithTags(indexes, query, columnFieldPath, targetTags);
entityColumnPairs.addAll(matchingPairs);
fetchColumnsWithTagsFromSource(indexes, query, columnFieldPath, targetTags, columnsByName);
} catch (ElasticsearchException e) {
if (!isIndexNotFoundException(e)) {
throw e;
@ -234,7 +330,7 @@ public class ElasticSearchColumnAggregator implements ColumnAggregator {
}
}
return entityColumnPairs;
return columnsByName;
}
private Set<String> buildTargetTagSet(ColumnAggregationRequest request) {
@ -248,27 +344,33 @@ public class ElasticSearchColumnAggregator implements ColumnAggregator {
return targetTags;
}
private Set<String> fetchEntityColumnPairsWithTags(
List<String> indexes, Query query, String columnFieldPath, Set<String> targetTags)
private void fetchColumnsWithTagsFromSource(
List<String> indexes,
Query query,
String columnFieldPath,
Set<String> targetTags,
Map<String, List<ColumnWithContext>> columnsByName)
throws IOException {
Set<String> entityColumnPairs = new HashSet<>();
SearchRequest searchRequest = SearchRequest.of(s -> s.index(indexes).query(query).size(10000));
SearchResponse<JsonData> response = client.search(searchRequest, JsonData.class);
for (Hit<JsonData> hit : response.hits().hits()) {
extractMatchingEntityColumnPairs(hit, columnFieldPath, targetTags, entityColumnPairs);
long totalHits = response.hits().total() != null ? response.hits().total().value() : 0;
if (totalHits > 10000) {
LOG.warn(
"Tag/glossary source-fetch matched {} entities; only first 10000 scanned.", totalHits);
}
return entityColumnPairs;
for (Hit<JsonData> hit : response.hits().hits()) {
extractMatchingColumnsFromHit(hit, columnFieldPath, targetTags, columnsByName);
}
}
private void extractMatchingEntityColumnPairs(
private void extractMatchingColumnsFromHit(
Hit<JsonData> hit,
String columnFieldPath,
Set<String> targetTags,
Set<String> entityColumnPairs) {
Map<String, List<ColumnWithContext>> columnsByName) {
if (hit.source() == null) {
return;
}
@ -279,19 +381,35 @@ public class ElasticSearchColumnAggregator implements ColumnAggregator {
return;
}
String entityType = getTextField(sourceNode, "entityType");
String entityDisplayName = getTextField(sourceNode, "displayName");
String serviceName = getNestedField(sourceNode, "service", "name");
String databaseName = getNestedField(sourceNode, "database", "name");
String schemaName = getNestedField(sourceNode, "databaseSchema", "name");
JsonNode columnsData = getNestedJsonNode(sourceNode, columnFieldPath);
if (columnsData != null && columnsData.isArray()) {
for (JsonNode columnData : columnsData) {
String colName = getTextField(columnData, "name");
boolean hasTag = columnHasTargetTag(columnData, targetTags);
if (hasTag && colName != null) {
entityColumnPairs.add(entityFQN + "#" + colName);
if (colName != null && columnHasTargetTag(columnData, targetTags)) {
Column column = parseColumn(columnData, entityFQN);
columnsByName
.computeIfAbsent(colName, k -> new ArrayList<>())
.add(
new ColumnWithContext(
column,
entityType,
entityFQN,
entityDisplayName,
serviceName,
databaseName,
schemaName));
}
}
}
} catch (Exception e) {
LOG.warn("Failed to extract entity column pairs from hit", e);
LOG.warn("Failed to extract columns from hit", e);
}
}
@ -318,13 +436,28 @@ public class ElasticSearchColumnAggregator implements ColumnAggregator {
return false;
}
/** Build query specifically for tag filtering (Phase 1) */
/**
* Build query for tag filtering source fetch. Includes all scope filters (service, database,
* schema, domain, entityType), column-name pattern, and metadataStatus so the _source fetch is
* scoped to the same data as the main query. Per-column correlation (which specific column has
* the tag + matches the pattern) still happens in Java because flat object mapping prevents
* expressing it at query level.
*/
private Query buildTagFilterQuery(ColumnAggregationRequest request, String columnNameKeyword) {
BoolQuery.Builder boolBuilder = new BoolQuery.Builder();
String columnFieldPath = columnNameKeyword.replace(".name.keyword", "");
boolBuilder.filter(Query.of(q -> q.exists(e -> e.field(columnFieldPath))));
addEntityTypeFilter(boolBuilder, request);
addServiceFilter(boolBuilder, request);
addServiceTypeFilter(boolBuilder, request);
addDatabaseFilter(boolBuilder, request);
addSchemaFilter(boolBuilder, request);
addDomainFilter(boolBuilder, request);
addColumnNamePatternFilter(boolBuilder, request, columnNameKeyword);
addMetadataStatusFilter(boolBuilder, request, columnFieldPath);
String tagFQNField = columnNameKeyword.replace(".name.keyword", ".tags.tagFQN");
List<String> allTags = new ArrayList<>();
@ -356,15 +489,6 @@ public class ElasticSearchColumnAggregator implements ColumnAggregator {
return input.replace("\\", "\\\\").replace("*", "\\*").replace("?", "\\?");
}
private boolean matchesColumnNamePattern(String columnName, String pattern) {
if (nullOrEmpty(pattern)) {
return true;
}
String lowerColumnName = columnName.toLowerCase();
String lowerPattern = pattern.toLowerCase();
return lowerColumnName.contains(lowerPattern);
}
/** Get entity types to query - defaults to table only for performance */
private List<String> getEntityTypesForRequest(ColumnAggregationRequest request) {
if (request.getEntityTypes() == null || request.getEntityTypes().isEmpty()) {
@ -573,23 +697,132 @@ public class ElasticSearchColumnAggregator implements ColumnAggregator {
.minimumShouldMatch("1")));
}
/** Phase 1: Get all matching column names using terms agg with include regex (no top_hits). */
private ColumnAggregator.NamesWithCount executeNamesQuery(
Query query, List<String> indexes, String columnNameKeyword, String regex)
throws IOException {
Aggregation termsAgg =
Aggregation.of(
a ->
a.terms(
t ->
t.field(columnNameKeyword)
.include(inc -> inc.regexp(regex))
.size(ColumnAggregator.MAX_PATTERN_SEARCH_NAMES)
.order(
List.of(
NamedValue.of(
ColumnAggregator.AGG_KEY_ORDER, SortOrder.Asc)))));
SearchRequest searchRequest =
SearchRequest.of(
s ->
s.index(indexes)
.query(query)
.aggregations(ColumnAggregator.AGG_MATCHING_COLUMNS, termsAgg)
.size(0));
SearchResponse<JsonData> response = client.search(searchRequest, JsonData.class);
List<String> names = new ArrayList<>();
long totalDocCount = 0;
if (response.aggregations() != null
&& response.aggregations().containsKey(ColumnAggregator.AGG_MATCHING_COLUMNS)) {
StringTermsAggregate termsResult =
response.aggregations().get(ColumnAggregator.AGG_MATCHING_COLUMNS).sterms();
for (StringTermsBucket bucket : termsResult.buckets().array()) {
names.add(bucket.key().stringValue());
totalDocCount += bucket.docCount();
}
if (names.size() == ColumnAggregator.MAX_PATTERN_SEARCH_NAMES) {
LOG.warn(
"Column name pattern matched at least {} distinct names; results truncated",
ColumnAggregator.MAX_PATTERN_SEARCH_NAMES);
}
}
return new ColumnAggregator.NamesWithCount(names, totalDocCount);
}
/** Phase 2: Get data for specific column names using terms agg with exact include + top_hits. */
private Map<String, List<ColumnWithContext>> executePageDataQuery(
Query query,
List<String> indexes,
String columnNameKeyword,
String columnFieldPath,
List<String> columnNames)
throws IOException {
Aggregation topHitsAgg =
Aggregation.of(a -> a.topHits(th -> th.size(ColumnAggregator.SAMPLE_DOCS_PER_COLUMN)));
Aggregation termsAgg =
Aggregation.of(
a ->
a.terms(
t ->
t.field(columnNameKeyword)
.include(inc -> inc.terms(columnNames))
.size(columnNames.size()))
.aggregations(ColumnAggregator.AGG_SAMPLE_DOCS, topHitsAgg));
SearchRequest searchRequest =
SearchRequest.of(
s ->
s.index(indexes)
.query(query)
.aggregations(ColumnAggregator.AGG_PAGE_COLUMNS, termsAgg)
.size(0));
SearchResponse<JsonData> response = client.search(searchRequest, JsonData.class);
return parseTermsAggResults(response, columnFieldPath);
}
private Map<String, List<ColumnWithContext>> parseTermsAggResults(
SearchResponse<JsonData> response, String columnFieldPath) {
Map<String, List<ColumnWithContext>> columnsByName = new HashMap<>();
if (response.aggregations() == null
|| !response.aggregations().containsKey(ColumnAggregator.AGG_PAGE_COLUMNS)) {
return columnsByName;
}
StringTermsAggregate termsAgg =
response.aggregations().get(ColumnAggregator.AGG_PAGE_COLUMNS).sterms();
for (StringTermsBucket bucket : termsAgg.buckets().array()) {
String columnName = bucket.key().stringValue();
if (!bucket.aggregations().containsKey(ColumnAggregator.AGG_SAMPLE_DOCS)) {
continue;
}
TopHitsAggregate topHits =
bucket.aggregations().get(ColumnAggregator.AGG_SAMPLE_DOCS).topHits();
parseBucketHits(columnName, topHits, columnFieldPath, columnsByName);
}
return columnsByName;
}
private SearchResponse<JsonData> executeSearch(
ColumnAggregationRequest request, Query query, List<String> indexes, String columnNameKeyword)
throws IOException {
List<es.co.elastic.clients.util.NamedValue<CompositeAggregationSource>> sources =
new ArrayList<>();
List<NamedValue<CompositeAggregationSource>> sources = new ArrayList<>();
sources.add(
es.co.elastic.clients.util.NamedValue.of(
NamedValue.of(
"column_name",
CompositeAggregationSource.of(
cas -> cas.terms(t -> t.field(columnNameKeyword).order(SortOrder.Asc)))));
// Use full _source to avoid top_hits source-filter edge cases where combining root and nested
// include paths can produce empty buckets.
Aggregation topHitsAgg = Aggregation.of(a -> a.topHits(th -> th.size(10)));
Aggregation topHitsAgg =
Aggregation.of(a -> a.topHits(th -> th.size(ColumnAggregator.SAMPLE_DOCS_PER_COLUMN)));
Map<String, Aggregation> subAggs = new HashMap<>();
subAggs.put("sample_docs", topHitsAgg);
subAggs.put(ColumnAggregator.AGG_SAMPLE_DOCS, topHitsAgg);
Map<String, FieldValue> afterKey =
request.getCursor() != null ? decodeCursor(request.getCursor()) : null;
@ -617,7 +850,7 @@ public class ElasticSearchColumnAggregator implements ColumnAggregator {
return client.search(searchRequest, JsonData.class);
}
private Map<String, List<ColumnWithContext>> parseAggregationResults(
private Map<String, List<ColumnWithContext>> parseCompositeAggResults(
SearchResponse<JsonData> response, String columnFieldPath) {
Map<String, List<ColumnWithContext>> columnsByName = new HashMap<>();
@ -639,71 +872,74 @@ public class ElasticSearchColumnAggregator implements ColumnAggregator {
}
TopHitsAggregate topHits = bucket.aggregations().get("sample_docs").topHits();
if (topHits == null || topHits.hits() == null || topHits.hits().hits().isEmpty()) {
continue;
}
List<ColumnWithContext> occurrences = new ArrayList<>();
// Track the original case column name from the document source
String originalCaseColumnName = null;
for (Hit<JsonData> hit : topHits.hits().hits()) {
try {
JsonData source = hit.source();
if (source == null) continue;
JsonNode sourceNode = source.to(JsonNode.class);
String entityType = getTextField(sourceNode, "entityType");
String entityFQN = getTextField(sourceNode, "fullyQualifiedName");
String entityDisplayName = getTextField(sourceNode, "displayName");
String serviceName = getNestedField(sourceNode, "service", "name");
String databaseName = getNestedField(sourceNode, "database", "name");
String schemaName = getNestedField(sourceNode, "databaseSchema", "name");
// Get columns data from the correct path (e.g., "columns", "dataModel.columns", "fields")
JsonNode columnsData = getNestedJsonNode(sourceNode, columnFieldPath);
if (columnsData != null && columnsData.isArray()) {
for (JsonNode columnData : columnsData) {
String colName = getTextField(columnData, "name");
// ES keyword aggregation lowercases the column names, so use case-insensitive
// comparison
if (columnName.equalsIgnoreCase(colName)) {
// Preserve the original case column name from the first match
if (originalCaseColumnName == null) {
originalCaseColumnName = colName;
}
Column column = parseColumn(columnData, entityFQN);
ColumnWithContext columnCtx =
new ColumnWithContext(
column,
entityType,
entityFQN,
entityDisplayName,
serviceName,
databaseName,
schemaName);
occurrences.add(columnCtx);
break;
}
}
}
} catch (Exception e) {
LOG.warn("Failed to parse column occurrence from search hit", e);
}
}
if (!occurrences.isEmpty() && originalCaseColumnName != null) {
columnsByName.put(originalCaseColumnName, occurrences);
}
parseBucketHits(columnName, topHits, columnFieldPath, columnsByName);
}
return columnsByName;
}
/** Parse top_hits from a single bucket (shared by composite and terms agg parsing). */
private void parseBucketHits(
String columnName,
TopHitsAggregate topHits,
String columnFieldPath,
Map<String, List<ColumnWithContext>> columnsByName) {
if (topHits == null || topHits.hits() == null || topHits.hits().hits().isEmpty()) {
return;
}
List<ColumnWithContext> occurrences = new ArrayList<>();
String originalCaseColumnName = null;
for (Hit<JsonData> hit : topHits.hits().hits()) {
try {
JsonData source = hit.source();
if (source == null) continue;
JsonNode sourceNode = source.to(JsonNode.class);
String entityType = getTextField(sourceNode, "entityType");
String entityFQN = getTextField(sourceNode, "fullyQualifiedName");
String entityDisplayName = getTextField(sourceNode, "displayName");
String serviceName = getNestedField(sourceNode, "service", "name");
String databaseName = getNestedField(sourceNode, "database", "name");
String schemaName = getNestedField(sourceNode, "databaseSchema", "name");
JsonNode columnsData = getNestedJsonNode(sourceNode, columnFieldPath);
if (columnsData != null && columnsData.isArray()) {
for (JsonNode columnData : columnsData) {
String colName = getTextField(columnData, "name");
if (columnName.equalsIgnoreCase(colName)) {
if (originalCaseColumnName == null) {
originalCaseColumnName = colName;
}
Column column = parseColumn(columnData, entityFQN);
occurrences.add(
new ColumnWithContext(
column,
entityType,
entityFQN,
entityDisplayName,
serviceName,
databaseName,
schemaName));
break;
}
}
}
} catch (Exception e) {
LOG.warn("Failed to parse column occurrence from search hit", e);
}
}
if (!occurrences.isEmpty() && originalCaseColumnName != null) {
columnsByName.put(originalCaseColumnName, occurrences);
}
}
/** Navigate nested JSON path like "dataModel.columns" or "messageSchema.schemaFields" */
private JsonNode getNestedJsonNode(JsonNode root, String path) {
String[] parts = path.split("\\.");
@ -848,12 +1084,11 @@ public class ElasticSearchColumnAggregator implements ColumnAggregator {
}
}
@SuppressWarnings("unchecked")
private Map<String, FieldValue> decodeCursor(String cursor) {
try {
byte[] decoded = Base64.getDecoder().decode(cursor);
String json = new String(decoded, StandardCharsets.UTF_8);
Map<String, String> stringMap = JsonUtils.readValue(json, Map.class);
Map<String, String> stringMap = JsonUtils.readValue(json, new TypeReference<>() {});
Map<String, FieldValue> result = new HashMap<>();
for (Map.Entry<String, String> entry : stringMap.entrySet()) {
result.put(entry.getKey(), FieldValue.of(entry.getValue()));

View file

@ -25,8 +25,11 @@ import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.api.data.ColumnGridItem;
import org.openmetadata.schema.api.data.ColumnGridResponse;
@ -47,6 +50,8 @@ import os.org.opensearch.client.opensearch._types.aggregations.Aggregation;
import os.org.opensearch.client.opensearch._types.aggregations.CompositeAggregate;
import os.org.opensearch.client.opensearch._types.aggregations.CompositeAggregationSource;
import os.org.opensearch.client.opensearch._types.aggregations.CompositeBucket;
import os.org.opensearch.client.opensearch._types.aggregations.StringTermsAggregate;
import os.org.opensearch.client.opensearch._types.aggregations.StringTermsBucket;
import os.org.opensearch.client.opensearch._types.aggregations.TopHitsAggregate;
import os.org.opensearch.client.opensearch._types.query_dsl.BoolQuery;
import os.org.opensearch.client.opensearch._types.query_dsl.Query;
@ -73,61 +78,41 @@ public class OpenSearchColumnAggregator implements ColumnAggregator {
request.getTags(),
request.getGlossaryTerms());
// Two-phase query for tags/glossaryTerms filtering:
// Phase 1: Find entityFQN#columnName pairs that have the tag
// Phase 2: Filter to only return those specific occurrences
// Tag/glossary filter path: we must read _source to check which specific column has
// the tag (ES flat object mapping can't tell us). Since we're already reading _source,
// we extract full column metadata in the same pass no separate data-fetch query needed.
boolean hasTagFilter =
!nullOrEmpty(request.getTags()) || !nullOrEmpty(request.getGlossaryTerms());
Set<String> entityColumnPairsWithTags = null;
List<String> columnNamesWithTags = null;
LOG.info("hasTagFilter={}", hasTagFilter);
if (hasTagFilter) {
entityColumnPairsWithTags = getEntityColumnPairsWithTags(request);
LOG.info("Phase1 result: entityColumnPairsWithTags={}", entityColumnPairsWithTags);
if (entityColumnPairsWithTags.isEmpty()) {
LOG.info("No columns found with tags, returning empty response");
Map<String, List<ColumnWithContext>> taggedColumns = getColumnsWithTagsFromSource(request);
if (taggedColumns.isEmpty()) {
return buildResponse(new ArrayList<>(), null, false, 0, 0);
}
// Also keep just the column names for the Phase 2 query filter
columnNamesWithTags =
entityColumnPairsWithTags.stream()
.map(pair -> pair.substring(pair.indexOf('#') + 1))
.collect(java.util.stream.Collectors.toList());
// Pattern + tag combined: filter the already-fetched columns by pattern in Java
if (!nullOrEmpty(request.getColumnNamePattern())) {
String pattern = request.getColumnNamePattern().toLowerCase(Locale.ROOT);
taggedColumns
.entrySet()
.removeIf(e -> !e.getKey().toLowerCase(Locale.ROOT).contains(pattern));
}
return aggregateColumnsWithKnownNames(request, taggedColumns);
}
// Phase 2: Build query WITHOUT tag filter but WITH column names filter
Query query = buildFilters(request, columnNamesWithTags);
// Pattern-only path (no tag filter): use terms agg with include regex
if (!nullOrEmpty(request.getColumnNamePattern())) {
return aggregateColumnsWithPattern(request);
}
// Browse path: scope filters + composite agg with after_key cursor.
Query query = buildFilters(request, null);
try {
SearchResponse<JsonData> response = executeSearch(request, query);
Map<String, List<ColumnWithContext>> columnsByName = parseAggregationResults(response);
// Post-filter columns by name pattern since ES aggregation returns all columns from matched
// documents
String columnNamePattern = request.getColumnNamePattern();
if (!nullOrEmpty(columnNamePattern)) {
columnsByName
.entrySet()
.removeIf(e -> !matchesColumnNamePattern(e.getKey(), columnNamePattern));
}
// Post-filter for tag/glossary terms filtering: Only keep occurrences that were
// identified in Phase 1 as having the tag (not just same column name)
if (entityColumnPairsWithTags != null && !entityColumnPairsWithTags.isEmpty()) {
final Set<String> allowedPairs = entityColumnPairsWithTags;
for (List<ColumnWithContext> occurrences : columnsByName.values()) {
occurrences.removeIf(
ctx -> {
String key = ctx.entityFQN + "#" + ctx.column.getName();
return !allowedPairs.contains(key);
});
}
// Remove column entries that have no occurrences left
columnsByName.entrySet().removeIf(e -> e.getValue().isEmpty());
}
Map<String, List<ColumnWithContext>> columnsByName = parseCompositeAggResults(response);
List<ColumnGridItem> gridItems = ColumnMetadataGrouper.groupColumns(columnsByName);
@ -136,14 +121,11 @@ public class OpenSearchColumnAggregator implements ColumnAggregator {
int totalUniqueColumns;
int totalOccurrences;
// Get totals from ES aggregation only when no column name pattern
// (ES aggregation counts all columns from matched docs, not just filtered ones)
if (request.getCursor() == null && nullOrEmpty(request.getColumnNamePattern())) {
if (request.getCursor() == null) {
Map<String, Long> totals = getTotalCounts(query);
totalUniqueColumns = totals.get("uniqueColumns").intValue();
totalOccurrences = totals.get("totalOccurrences").intValue();
} else {
// Calculate from actual filtered data when pattern is specified or on subsequent pages
totalUniqueColumns = columnsByName.size();
totalOccurrences = gridItems.stream().mapToInt(ColumnGridItem::getTotalOccurrences).sum();
}
@ -159,26 +141,111 @@ public class OpenSearchColumnAggregator implements ColumnAggregator {
}
/**
* Phase 1: Get entityFQN#columnName pairs that have the specified tags. Since ES flattens arrays,
* we must fetch column data and filter in Java to find columns that actually have the tag.
* Pattern-only search path (no tag filter): uses terms aggregation with include regex to filter
* column names at the aggregation level. Two queries: (1) lightweight names query to get all
* matching names and total count, (2) targeted data query with top_hits for the current page.
*/
private Set<String> getEntityColumnPairsWithTags(ColumnAggregationRequest request)
private ColumnGridResponse aggregateColumnsWithPattern(ColumnAggregationRequest request)
throws IOException {
Set<String> entityColumnPairs = new HashSet<>();
Query query = buildFilters(request, null);
String regex = ColumnAggregator.toCaseInsensitiveRegex(request.getColumnNamePattern());
try {
ColumnAggregator.NamesWithCount phase1 = executeNamesQuery(query, regex);
Set<String> dedupedNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
dedupedNames.addAll(phase1.names());
int totalUniqueColumns = dedupedNames.size();
int totalOccurrences = ColumnAggregator.toIntSaturating(phase1.totalDocCount());
int offset = ColumnAggregator.decodeSearchOffset(request.getCursor());
int pageSize = request.getSize();
List<String> sortedNames = new ArrayList<>(dedupedNames);
int fromIndex = Math.min(offset, sortedNames.size());
int toIndex = Math.min(offset + pageSize, sortedNames.size());
List<String> pageNames = sortedNames.subList(fromIndex, toIndex);
if (pageNames.isEmpty()) {
return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, totalOccurrences);
}
Map<String, List<ColumnWithContext>> columnsByName = executePageDataQuery(query, pageNames);
List<ColumnGridItem> gridItems = ColumnMetadataGrouper.groupColumns(columnsByName);
boolean hasMore = toIndex < totalUniqueColumns;
String cursor = hasMore ? ColumnAggregator.encodeSearchOffset(toIndex) : null;
return buildResponse(gridItems, cursor, hasMore, totalUniqueColumns, totalOccurrences);
} catch (OpenSearchException e) {
if (isIndexNotFoundException(e)) {
LOG.warn("Search index not found, returning empty results");
return buildResponse(new ArrayList<>(), null, false, 0, 0);
}
throw e;
}
}
/**
* Tag/glossary filter path: the tag-check pass already extracted full column metadata from
* _source (only tagged columns are in the map). Just paginate over the in-memory result.
*/
private ColumnGridResponse aggregateColumnsWithKnownNames(
ColumnAggregationRequest request, Map<String, List<ColumnWithContext>> taggedColumns) {
int totalUniqueColumns = taggedColumns.size();
int totalOccurrences = taggedColumns.values().stream().mapToInt(List::size).sum();
int offset = ColumnAggregator.decodeSearchOffset(request.getCursor());
int pageSize = request.getSize();
List<String> sortedNames = new ArrayList<>(taggedColumns.keySet());
int fromIndex = Math.min(offset, sortedNames.size());
int toIndex = Math.min(offset + pageSize, sortedNames.size());
List<String> pageNames = sortedNames.subList(fromIndex, toIndex);
if (pageNames.isEmpty()) {
return buildResponse(new ArrayList<>(), null, false, totalUniqueColumns, totalOccurrences);
}
Map<String, List<ColumnWithContext>> pageColumns = new HashMap<>();
for (String name : pageNames) {
List<ColumnWithContext> occurrences = taggedColumns.get(name);
if (occurrences != null) {
pageColumns.put(name, occurrences);
}
}
List<ColumnGridItem> gridItems = ColumnMetadataGrouper.groupColumns(pageColumns);
boolean hasMore = toIndex < totalUniqueColumns;
String cursor = hasMore ? ColumnAggregator.encodeSearchOffset(toIndex) : null;
return buildResponse(gridItems, cursor, hasMore, totalUniqueColumns, totalOccurrences);
}
/**
* Fetch columns with matching tags from _source. ES flat object mapping means we can't filter
* "column X has tag Y" at query level, so we read _source and check in Java. Since we already
* have the full document, we extract column metadata here avoiding a separate data-fetch query.
*/
private Map<String, List<ColumnWithContext>> getColumnsWithTagsFromSource(
ColumnAggregationRequest request) throws IOException {
Map<String, List<ColumnWithContext>> columnsByName =
new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
Set<String> targetTags = buildTargetTagSet(request);
Query query = buildTagFilterQuery(request);
try {
Set<String> matchingPairs = fetchEntityColumnPairsWithTags(query, targetTags);
entityColumnPairs.addAll(matchingPairs);
fetchColumnsWithTagsFromSource(query, targetTags, columnsByName);
} catch (OpenSearchException e) {
if (!isIndexNotFoundException(e)) {
throw e;
}
}
return entityColumnPairs;
return columnsByName;
}
private Set<String> buildTargetTagSet(ColumnAggregationRequest request) {
@ -199,37 +266,31 @@ public class OpenSearchColumnAggregator implements ColumnAggregator {
.toList();
}
private Set<String> fetchEntityColumnPairsWithTags(Query query, Set<String> targetTags)
private void fetchColumnsWithTagsFromSource(
Query query, Set<String> targetTags, Map<String, List<ColumnWithContext>> columnsByName)
throws IOException {
Set<String> entityColumnPairs = new HashSet<>();
List<String> resolvedIndexes = resolveIndexNames();
SearchRequest searchRequest =
SearchRequest.of(s -> s.index(resolvedIndexes).query(query).size(10000));
SearchResponse<JsonData> response = client.search(searchRequest, JsonData.class);
long totalHits = response.hits().total() != null ? response.hits().total().value() : 0;
LOG.info(
"Phase1 fetchEntityColumnPairsWithTags: indexes={}, targetTags={}, totalHits={}",
resolvedIndexes,
targetTags,
totalHits);
if (totalHits > 10000) {
LOG.warn(
"Tag/glossary source-fetch matched {} entities; only first 10000 scanned.", totalHits);
}
for (os.org.opensearch.client.opensearch.core.search.Hit<JsonData> hit :
response.hits().hits()) {
extractMatchingEntityColumnPairs(hit, targetTags, entityColumnPairs);
extractMatchingColumnsFromHit(hit, targetTags, columnsByName);
}
LOG.info("Phase1 fetchEntityColumnPairsWithTags: found pairs: {}", entityColumnPairs);
return entityColumnPairs;
}
private void extractMatchingEntityColumnPairs(
private void extractMatchingColumnsFromHit(
os.org.opensearch.client.opensearch.core.search.Hit<JsonData> hit,
Set<String> targetTags,
Set<String> entityColumnPairs) {
Map<String, List<ColumnWithContext>> columnsByName) {
if (hit.source() == null) {
return;
}
@ -240,19 +301,35 @@ public class OpenSearchColumnAggregator implements ColumnAggregator {
return;
}
String entityType = getTextField(sourceNode, "entityType");
String entityDisplayName = getTextField(sourceNode, "displayName");
String serviceName = getNestedField(sourceNode, "service", "name");
String databaseName = getNestedField(sourceNode, "database", "name");
String schemaName = getNestedField(sourceNode, "databaseSchema", "name");
JsonNode columnsData = sourceNode.get("columns");
if (columnsData != null && columnsData.isArray()) {
for (JsonNode columnData : columnsData) {
String colName = getTextField(columnData, "name");
boolean hasTag = columnHasTargetTag(columnData, targetTags);
if (hasTag && colName != null) {
entityColumnPairs.add(entityFQN + "#" + colName);
if (colName != null && columnHasTargetTag(columnData, targetTags)) {
Column column = parseColumn(columnData, entityFQN);
columnsByName
.computeIfAbsent(colName, k -> new ArrayList<>())
.add(
new ColumnWithContext(
column,
entityType,
entityFQN,
entityDisplayName,
serviceName,
databaseName,
schemaName));
}
}
}
} catch (Exception e) {
LOG.warn("Failed to extract entity column pairs from hit", e);
LOG.warn("Failed to extract columns from hit", e);
}
}
@ -279,12 +356,27 @@ public class OpenSearchColumnAggregator implements ColumnAggregator {
return false;
}
/** Build query specifically for tag filtering (Phase 1) */
/**
* Build query for tag filtering source fetch. Includes all scope filters (service, database,
* schema, domain, entityType), column-name pattern, and metadataStatus so the _source fetch is
* scoped to the same data as the main query. Per-column correlation (which specific column has
* the tag + matches the pattern) still happens in Java because flat object mapping prevents
* expressing it at query level.
*/
private Query buildTagFilterQuery(ColumnAggregationRequest request) {
BoolQuery.Builder boolBuilder = new BoolQuery.Builder();
boolBuilder.filter(Query.of(q -> q.exists(e -> e.field("columns"))));
addEntityTypeFilter(boolBuilder, request);
addServiceFilter(boolBuilder, request);
addServiceTypeFilter(boolBuilder, request);
addDatabaseFilter(boolBuilder, request);
addSchemaFilter(boolBuilder, request);
addDomainFilter(boolBuilder, request);
addColumnNamePatternFilter(boolBuilder, request);
addMetadataStatusFilter(boolBuilder, request);
List<String> allTags = new ArrayList<>();
if (!nullOrEmpty(request.getTags())) {
allTags.addAll(request.getTags());
@ -315,15 +407,6 @@ public class OpenSearchColumnAggregator implements ColumnAggregator {
return input.replace("\\", "\\\\").replace("*", "\\*").replace("?", "\\?");
}
private boolean matchesColumnNamePattern(String columnName, String pattern) {
if (nullOrEmpty(pattern)) {
return true;
}
String lowerColumnName = columnName.toLowerCase();
String lowerPattern = pattern.toLowerCase();
return lowerColumnName.contains(lowerPattern);
}
/**
* Build filters for the main query. When columnNamesFromTagFilter is provided (two-phase query),
* skip tag/glossaryTerms filters and use column names filter instead.
@ -516,6 +599,106 @@ public class OpenSearchColumnAggregator implements ColumnAggregator {
.minimumShouldMatch("1")));
}
/** Phase 1: Get all matching column names using terms agg with include regex (no top_hits). */
private ColumnAggregator.NamesWithCount executeNamesQuery(Query query, String regex)
throws IOException {
Aggregation termsAgg =
Aggregation.of(
a ->
a.terms(
t ->
t.field("columns.name.keyword")
.include(inc -> inc.regexp(regex))
.size(ColumnAggregator.MAX_PATTERN_SEARCH_NAMES)
.order(
List.of(Map.of(ColumnAggregator.AGG_KEY_ORDER, SortOrder.Asc)))));
SearchRequest searchRequest =
SearchRequest.of(
s ->
s.index(resolveIndexNames())
.query(query)
.aggregations(ColumnAggregator.AGG_MATCHING_COLUMNS, termsAgg)
.size(0));
SearchResponse<JsonData> response = client.search(searchRequest, JsonData.class);
List<String> names = new ArrayList<>();
long totalDocCount = 0;
if (response.aggregations() != null
&& response.aggregations().containsKey(ColumnAggregator.AGG_MATCHING_COLUMNS)) {
StringTermsAggregate termsResult =
response.aggregations().get(ColumnAggregator.AGG_MATCHING_COLUMNS).sterms();
for (StringTermsBucket bucket : termsResult.buckets().array()) {
names.add(bucket.key());
totalDocCount += bucket.docCount();
}
if (names.size() == ColumnAggregator.MAX_PATTERN_SEARCH_NAMES) {
LOG.warn(
"Column name pattern matched at least {} distinct names; results truncated",
ColumnAggregator.MAX_PATTERN_SEARCH_NAMES);
}
}
return new ColumnAggregator.NamesWithCount(names, totalDocCount);
}
/** Phase 2: Get data for specific column names using terms agg with exact include + top_hits. */
private Map<String, List<ColumnWithContext>> executePageDataQuery(
Query query, List<String> columnNames) throws IOException {
Aggregation topHitsAgg =
Aggregation.of(a -> a.topHits(th -> th.size(ColumnAggregator.SAMPLE_DOCS_PER_COLUMN)));
Aggregation termsAgg =
Aggregation.of(
a ->
a.terms(
t ->
t.field("columns.name.keyword")
.include(inc -> inc.terms(columnNames))
.size(columnNames.size()))
.aggregations(ColumnAggregator.AGG_SAMPLE_DOCS, topHitsAgg));
SearchRequest searchRequest =
SearchRequest.of(
s ->
s.index(resolveIndexNames())
.query(query)
.aggregations(ColumnAggregator.AGG_PAGE_COLUMNS, termsAgg)
.size(0));
SearchResponse<JsonData> response = client.search(searchRequest, JsonData.class);
return parseTermsAggResults(response);
}
private Map<String, List<ColumnWithContext>> parseTermsAggResults(
SearchResponse<JsonData> response) {
Map<String, List<ColumnWithContext>> columnsByName = new HashMap<>();
if (response.aggregations() == null
|| !response.aggregations().containsKey(ColumnAggregator.AGG_PAGE_COLUMNS)) {
return columnsByName;
}
StringTermsAggregate termsAgg =
response.aggregations().get(ColumnAggregator.AGG_PAGE_COLUMNS).sterms();
for (StringTermsBucket bucket : termsAgg.buckets().array()) {
String columnName = bucket.key();
if (!bucket.aggregations().containsKey(ColumnAggregator.AGG_SAMPLE_DOCS)) {
continue;
}
TopHitsAggregate topHits =
bucket.aggregations().get(ColumnAggregator.AGG_SAMPLE_DOCS).topHits();
parseBucketHits(columnName, topHits, columnsByName);
}
return columnsByName;
}
private SearchResponse<JsonData> executeSearch(ColumnAggregationRequest request, Query query)
throws IOException {
Map<String, CompositeAggregationSource> sources = new HashMap<>();
@ -525,14 +708,10 @@ public class OpenSearchColumnAggregator implements ColumnAggregator {
cas -> cas.terms(t -> t.field("columns.name.keyword").order(SortOrder.Asc))));
Aggregation topHitsAgg =
Aggregation.of(
a ->
// Use full _source to avoid OpenSearch top_hits source-filter edge cases where
// mixing root + nested include paths can return empty buckets unexpectedly.
a.topHits(th -> th.size(100)));
Aggregation.of(a -> a.topHits(th -> th.size(ColumnAggregator.SAMPLE_DOCS_PER_COLUMN)));
Map<String, Aggregation> subAggs = new HashMap<>();
subAggs.put("sample_docs", topHitsAgg);
subAggs.put(ColumnAggregator.AGG_SAMPLE_DOCS, topHitsAgg);
Map<String, FieldValue> afterKey =
request.getCursor() != null ? decodeCursorAsFieldValues(request.getCursor()) : null;
@ -560,7 +739,7 @@ public class OpenSearchColumnAggregator implements ColumnAggregator {
return client.search(searchRequest, JsonData.class);
}
private Map<String, List<ColumnWithContext>> parseAggregationResults(
private Map<String, List<ColumnWithContext>> parseCompositeAggResults(
SearchResponse<JsonData> response) {
Map<String, List<ColumnWithContext>> columnsByName = new HashMap<>();
@ -578,75 +757,79 @@ public class OpenSearchColumnAggregator implements ColumnAggregator {
FieldValue fieldValue = bucket.key().get("column_name");
String columnName = fieldValue != null ? fieldValue.stringValue() : null;
if (!bucket.aggregations().containsKey("sample_docs")) {
if (!bucket.aggregations().containsKey(ColumnAggregator.AGG_SAMPLE_DOCS)) {
continue;
}
TopHitsAggregate topHits = bucket.aggregations().get("sample_docs").topHits();
if (topHits == null || topHits.hits() == null || topHits.hits().hits().isEmpty()) {
continue;
}
List<ColumnWithContext> occurrences = new ArrayList<>();
// Track the original case column name from the document source
String originalCaseColumnName = null;
for (Hit<JsonData> hit : topHits.hits().hits()) {
try {
JsonData source = hit.source();
if (source == null) continue;
JsonNode sourceNode = source.to(JsonNode.class);
String entityType = getTextField(sourceNode, "entityType");
String entityFQN = getTextField(sourceNode, "fullyQualifiedName");
String entityDisplayName = getTextField(sourceNode, "displayName");
String serviceName = getNestedField(sourceNode, "service", "name");
String databaseName = getNestedField(sourceNode, "database", "name");
String schemaName = getNestedField(sourceNode, "databaseSchema", "name");
JsonNode columnsData = sourceNode.get("columns");
if (columnsData != null && columnsData.isArray()) {
for (JsonNode columnData : columnsData) {
String colName = getTextField(columnData, "name");
// ES keyword aggregation lowercases the column names, so use case-insensitive
// comparison
if (columnName.equalsIgnoreCase(colName)) {
// Preserve the original case column name from the first match
if (originalCaseColumnName == null) {
originalCaseColumnName = colName;
}
Column column = parseColumn(columnData, entityFQN);
ColumnWithContext columnCtx =
new ColumnWithContext(
column,
entityType,
entityFQN,
entityDisplayName,
serviceName,
databaseName,
schemaName);
occurrences.add(columnCtx);
break;
}
}
}
} catch (Exception e) {
LOG.warn("Failed to parse column occurrence from search hit", e);
}
}
if (!occurrences.isEmpty() && originalCaseColumnName != null) {
columnsByName.put(originalCaseColumnName, occurrences);
}
TopHitsAggregate topHits =
bucket.aggregations().get(ColumnAggregator.AGG_SAMPLE_DOCS).topHits();
parseBucketHits(columnName, topHits, columnsByName);
}
return columnsByName;
}
/** Parse top_hits from a single bucket (shared by composite and terms agg parsing). */
private void parseBucketHits(
String columnName,
TopHitsAggregate topHits,
Map<String, List<ColumnWithContext>> columnsByName) {
if (topHits == null || topHits.hits() == null || topHits.hits().hits().isEmpty()) {
return;
}
List<ColumnWithContext> occurrences = new ArrayList<>();
String originalCaseColumnName = null;
for (Hit<JsonData> hit : topHits.hits().hits()) {
try {
JsonData source = hit.source();
if (source == null) continue;
JsonNode sourceNode = source.to(JsonNode.class);
String entityType = getTextField(sourceNode, "entityType");
String entityFQN = getTextField(sourceNode, "fullyQualifiedName");
String entityDisplayName = getTextField(sourceNode, "displayName");
String serviceName = getNestedField(sourceNode, "service", "name");
String databaseName = getNestedField(sourceNode, "database", "name");
String schemaName = getNestedField(sourceNode, "databaseSchema", "name");
JsonNode columnsData = sourceNode.get("columns");
if (columnsData != null && columnsData.isArray()) {
for (JsonNode columnData : columnsData) {
String colName = getTextField(columnData, "name");
if (columnName.equalsIgnoreCase(colName)) {
if (originalCaseColumnName == null) {
originalCaseColumnName = colName;
}
Column column = parseColumn(columnData, entityFQN);
occurrences.add(
new ColumnWithContext(
column,
entityType,
entityFQN,
entityDisplayName,
serviceName,
databaseName,
schemaName));
break;
}
}
}
} catch (Exception e) {
LOG.warn("Failed to parse column occurrence from search hit", e);
}
}
if (!occurrences.isEmpty() && originalCaseColumnName != null) {
columnsByName.put(originalCaseColumnName, occurrences);
}
}
private String getTextField(JsonNode node, String field) {
JsonNode fieldNode = node.get(field);
return fieldNode != null && !fieldNode.isNull() ? fieldNode.asText() : null;
@ -794,12 +977,11 @@ public class OpenSearchColumnAggregator implements ColumnAggregator {
}
}
@SuppressWarnings("unchecked")
private Map<String, String> decodeCursor(String cursor) {
try {
byte[] decoded = Base64.getDecoder().decode(cursor);
String json = new String(decoded, StandardCharsets.UTF_8);
return JsonUtils.readValue(json, Map.class);
return JsonUtils.readValue(json, new TypeReference<>() {});
} catch (Exception e) {
LOG.error("Failed to decode cursor", e);
return new HashMap<>();

View file

@ -0,0 +1,112 @@
/*
* Copyright 2025 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.service.search;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.regex.Pattern;
import org.junit.jupiter.api.Test;
class ColumnAggregatorTest {
@Test
void toCaseInsensitiveRegex_simpleAlpha() {
String regex = ColumnAggregator.toCaseInsensitiveRegex("MAT");
assertEquals(".*[mM][aA][tT].*", regex);
Pattern pattern = Pattern.compile(regex);
assertTrue(pattern.matcher("MAT").matches());
assertTrue(pattern.matcher("mat").matches());
assertTrue(pattern.matcher("Mat").matches());
assertTrue(pattern.matcher("MATNR").matches());
assertTrue(pattern.matcher("some_mat_column").matches());
assertFalse(pattern.matcher("MBA").matches());
}
@Test
void toCaseInsensitiveRegex_mixedCase() {
String regex = ColumnAggregator.toCaseInsensitiveRegex("MaTnR");
Pattern pattern = Pattern.compile(regex);
assertTrue(pattern.matcher("MATNR").matches());
assertTrue(pattern.matcher("matnr").matches());
assertTrue(pattern.matcher("MaTnR").matches());
assertFalse(pattern.matcher("MATMR").matches());
}
@Test
void toCaseInsensitiveRegex_withDigits() {
String regex = ColumnAggregator.toCaseInsensitiveRegex("col1");
Pattern pattern = Pattern.compile(regex);
assertTrue(pattern.matcher("COL1").matches());
assertTrue(pattern.matcher("col1").matches());
assertTrue(pattern.matcher("my_col1_name").matches());
assertFalse(pattern.matcher("col2").matches());
}
@Test
void toCaseInsensitiveRegex_withUnderscore() {
String regex = ColumnAggregator.toCaseInsensitiveRegex("col_name");
Pattern pattern = Pattern.compile(regex);
assertTrue(pattern.matcher("col_name").matches());
assertTrue(pattern.matcher("COL_NAME").matches());
assertTrue(pattern.matcher("my_col_name_here").matches());
}
@Test
void toCaseInsensitiveRegex_escapesRegexSpecialChars() {
String regex = ColumnAggregator.toCaseInsensitiveRegex("col.name");
Pattern pattern = Pattern.compile(regex);
assertTrue(pattern.matcher("col.name").matches());
// Dot should be literal, not wildcard
assertFalse(pattern.matcher("colXname").matches());
}
@Test
void toCaseInsensitiveRegex_singleChar() {
String regex = ColumnAggregator.toCaseInsensitiveRegex("a");
assertEquals(".*[aA].*", regex);
Pattern pattern = Pattern.compile(regex);
assertTrue(pattern.matcher("A").matches());
assertTrue(pattern.matcher("abc").matches());
assertTrue(pattern.matcher("XAY").matches());
}
@Test
void toCaseInsensitiveRegex_emptyString() {
String regex = ColumnAggregator.toCaseInsensitiveRegex("");
assertEquals(".*.*", regex);
Pattern pattern = Pattern.compile(regex);
assertTrue(pattern.matcher("anything").matches());
assertTrue(pattern.matcher("").matches());
}
@Test
void toCaseInsensitiveRegex_specialCharsAreEscaped() {
String regex = ColumnAggregator.toCaseInsensitiveRegex("a+b*c?");
Pattern pattern = Pattern.compile(regex);
assertTrue(pattern.matcher("a+b*c?").matches());
assertTrue(pattern.matcher("prefix_a+b*c?_suffix").matches());
// Plus and star should be literal, not regex quantifiers
assertFalse(pattern.matcher("abbbbc").matches());
}
}