diff --git a/docker/otel-collector/schema/seed/00002_otel_logs.sql b/docker/otel-collector/schema/seed/00002_otel_logs.sql index 5cc414ed..c3808eaa 100644 --- a/docker/otel-collector/schema/seed/00002_otel_logs.sql +++ b/docker/otel-collector/schema/seed/00002_otel_logs.sql @@ -17,6 +17,9 @@ CREATE TABLE IF NOT EXISTS ${DATABASE}.otel_logs `ScopeVersion` LowCardinality(String) CODEC(ZSTD(1)), `ScopeAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), `LogAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), + `ResourceAttributeTokens` Array(String) ALIAS arrayMap((k,v) -> concat(k, '=', v), mapKeys(ResourceAttributes), mapValues(ResourceAttributes)), + `LogAttributeTokens` Array(String) ALIAS arrayMap((k,v) -> concat(k, '=', v), mapKeys(LogAttributes), mapValues(LogAttributes)), + `ScopeAttributeTokens` Array(String) ALIAS arrayMap((k,v) -> concat(k, '=', v), mapKeys(ScopeAttributes), mapValues(ScopeAttributes)), `__hdx_materialized_k8s.cluster.name` LowCardinality(String) MATERIALIZED ResourceAttributes['k8s.cluster.name'] CODEC(ZSTD(1)), `__hdx_materialized_k8s.container.name` LowCardinality(String) MATERIALIZED ResourceAttributes['k8s.container.name'] CODEC(ZSTD(1)), `__hdx_materialized_k8s.deployment.name` LowCardinality(String) MATERIALIZED ResourceAttributes['k8s.deployment.name'] CODEC(ZSTD(1)), @@ -27,11 +30,11 @@ CREATE TABLE IF NOT EXISTS ${DATABASE}.otel_logs `__hdx_materialized_deployment.environment.name` LowCardinality(String) MATERIALIZED ResourceAttributes['deployment.environment.name'] CODEC(ZSTD(1)), INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_res_attr_kv_text ResourceAttributeTokens TYPE text(tokenizer = 'array'), INDEX idx_scope_attr_key mapKeys(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - INDEX 
idx_scope_attr_value mapValues(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_scope_attr_kv_text ScopeAttributeTokens TYPE text(tokenizer = 'array'), INDEX idx_log_attr_key mapKeys(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - INDEX idx_log_attr_value mapValues(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_log_attr_kv_text LogAttributeTokens TYPE text(tokenizer = 'array'), INDEX idx_lower_body lower(Body) TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 8 ) ENGINE = MergeTree diff --git a/packages/common-utils/src/__tests__/queryParser.test.ts b/packages/common-utils/src/__tests__/queryParser.test.ts index 0cbd87a1..a9c9ec70 100644 --- a/packages/common-utils/src/__tests__/queryParser.test.ts +++ b/packages/common-utils/src/__tests__/queryParser.test.ts @@ -1263,6 +1263,325 @@ describe('CustomSchemaSQLSerializerV2 - indexCoversColumn', () => { ); }); +describe('CustomSchemaSQLSerializerV2 - Map Tokens Skip-Index Optimization', () => { + const metadata = getMetadata( + new ClickhouseClient({ host: 'http://localhost:8123' }), + ); + metadata.getColumn = jest.fn().mockImplementation(async ({ column }) => { + if (column === 'LogAttributes') { + return { + name: 'LogAttributes', + type: 'Map(LowCardinality(String), String)', + }; + } else if (column === 'ResourceAttributes') { + return { + name: 'ResourceAttributes', + type: 'Map(LowCardinality(String), String)', + }; + } else if (column === 'Body') { + return { name: 'Body', type: 'String' }; + } + return undefined; + }); + metadata.getMaterializedColumnsLookupTable = jest + .fn() + .mockResolvedValue(new Map()); + metadata.getSetting = jest.fn().mockResolvedValue(undefined); + + const databaseName = 'default'; + const tableName = 'otel_logs'; + const connectionId = 'test'; + + it('should use has(TokensColumn, key=value) for exact map matches when tokens column with text index exists', async () => { + metadata.getColumns = jest.fn().mockResolvedValue([ + { + name: 
'LogAttributes', + type: 'Map(LowCardinality(String), String)', + default_type: '', + default_expression: '', + }, + { + name: 'LogAttributeTokens', + type: 'Array(String)', + default_type: 'ALIAS', + default_expression: + "arrayMap((k,v) -> concat(k, '=', v), mapKeys(LogAttributes), mapValues(LogAttributes))", + }, + ]); + metadata.getSkipIndices = jest.fn().mockResolvedValue([ + { + name: 'idx_log_attr_kv_text', + type: 'text', + typeFull: "text(tokenizer = 'array')", + expression: 'LogAttributeTokens', + granularity: 1, + }, + ]); + + const serializer = new CustomSchemaSQLSerializerV2({ + metadata, + databaseName, + tableName, + connectionId, + implicitColumnExpression: 'Body', + }); + + // Exact match: LogAttributes.userId:"abc123" + const result = await serializer.eq( + 'LogAttributes.userId', + 'abc123', + false, + {}, + ); + expect(result).toBe("(has(`LogAttributeTokens`, 'userId=abc123'))"); + }); + + it('should fall back to map access for negated exact matches', async () => { + metadata.getColumns = jest.fn().mockResolvedValue([ + { + name: 'LogAttributes', + type: 'Map(LowCardinality(String), String)', + default_type: '', + default_expression: '', + }, + { + name: 'LogAttributeTokens', + type: 'Array(String)', + default_type: 'ALIAS', + default_expression: + "arrayMap((k,v) -> concat(k, '=', v), mapKeys(LogAttributes), mapValues(LogAttributes))", + }, + ]); + metadata.getSkipIndices = jest.fn().mockResolvedValue([ + { + name: 'idx_log_attr_kv_text', + type: 'text', + typeFull: "text(tokenizer = 'array')", + expression: 'LogAttributeTokens', + granularity: 1, + }, + ]); + + const serializer = new CustomSchemaSQLSerializerV2({ + metadata, + databaseName, + tableName, + connectionId, + implicitColumnExpression: 'Body', + }); + + // Negated: -LogAttributes.userId:"abc123" + const result = await serializer.eq( + 'LogAttributes.userId', + 'abc123', + true, + {}, + ); + expect(result).toBe("(`LogAttributes`['userId'] != 'abc123')"); + }); + + it('should fall 
back to map access when no tokens column exists', async () => { + metadata.getColumns = jest.fn().mockResolvedValue([ + { + name: 'LogAttributes', + type: 'Map(LowCardinality(String), String)', + default_type: '', + default_expression: '', + }, + ]); + metadata.getSkipIndices = jest.fn().mockResolvedValue([]); + + const serializer = new CustomSchemaSQLSerializerV2({ + metadata, + databaseName, + tableName, + connectionId, + implicitColumnExpression: 'Body', + }); + + const result = await serializer.eq( + 'LogAttributes.userId', + 'abc123', + false, + {}, + ); + expect(result).toBe( + "(`LogAttributes`['userId'] = 'abc123' AND indexHint(mapContains(`LogAttributes`, 'userId')))", + ); + }); + + it('should fall back to map access when tokens column exists but has no text index', async () => { + metadata.getColumns = jest.fn().mockResolvedValue([ + { + name: 'LogAttributes', + type: 'Map(LowCardinality(String), String)', + default_type: '', + default_expression: '', + }, + { + name: 'LogAttributeTokens', + type: 'Array(String)', + default_type: 'ALIAS', + default_expression: + "arrayMap((k,v) -> concat(k, '=', v), mapKeys(LogAttributes), mapValues(LogAttributes))", + }, + ]); + metadata.getSkipIndices = jest.fn().mockResolvedValue([]); + + const serializer = new CustomSchemaSQLSerializerV2({ + metadata, + databaseName, + tableName, + connectionId, + implicitColumnExpression: 'Body', + }); + + const result = await serializer.eq( + 'LogAttributes.userId', + 'abc123', + false, + {}, + ); + expect(result).toBe( + "(`LogAttributes`['userId'] = 'abc123' AND indexHint(mapContains(`LogAttributes`, 'userId')))", + ); + }); + + it('should handle values containing equals signs', async () => { + metadata.getColumns = jest.fn().mockResolvedValue([ + { + name: 'LogAttributes', + type: 'Map(LowCardinality(String), String)', + default_type: '', + default_expression: '', + }, + { + name: 'LogAttributeTokens', + type: 'Array(String)', + default_type: 'ALIAS', + default_expression: + 
"arrayMap((k,v) -> concat(k, '=', v), mapKeys(LogAttributes), mapValues(LogAttributes))", + }, + ]); + metadata.getSkipIndices = jest.fn().mockResolvedValue([ + { + name: 'idx_log_attr_kv_text', + type: 'text', + typeFull: "text(tokenizer = 'array')", + expression: 'LogAttributeTokens', + granularity: 1, + }, + ]); + + const serializer = new CustomSchemaSQLSerializerV2({ + metadata, + databaseName, + tableName, + connectionId, + implicitColumnExpression: 'Body', + }); + + // Value with = should still work (key doesn't contain =) + const result = await serializer.eq( + 'LogAttributes.userId', + 'abc=123', + false, + {}, + ); + expect(result).toBe("(has(`LogAttributeTokens`, 'userId=abc=123'))"); + }); + + it('should use tokens optimization via SearchQueryBuilder for quoted map field searches', async () => { + metadata.getColumns = jest.fn().mockResolvedValue([ + { + name: 'LogAttributes', + type: 'Map(LowCardinality(String), String)', + default_type: '', + default_expression: '', + }, + { + name: 'LogAttributeTokens', + type: 'Array(String)', + default_type: 'ALIAS', + default_expression: + "arrayMap((k,v) -> concat(k, '=', v), mapKeys(LogAttributes), mapValues(LogAttributes))", + }, + ]); + metadata.getSkipIndices = jest.fn().mockResolvedValue([ + { + name: 'idx_log_attr_kv_text', + type: 'text', + typeFull: "text(tokenizer = 'array')", + expression: 'LogAttributeTokens', + granularity: 1, + }, + ]); + + const serializer = new CustomSchemaSQLSerializerV2({ + metadata, + databaseName, + tableName, + connectionId, + implicitColumnExpression: 'Body', + }); + + // Lucene: LogAttributes.error.message:"Failed to fetch" + const builder = new SearchQueryBuilder( + 'LogAttributes.error.message:"Failed to fetch"', + serializer, + ); + const sql = await builder.build(); + expect(sql).toBe( + "((has(`LogAttributeTokens`, 'error.message=Failed to fetch')))", + ); + }); + + it('should work with ResourceAttributes tokens column', async () => { + metadata.getColumns = 
jest.fn().mockResolvedValue([ + { + name: 'ResourceAttributes', + type: 'Map(LowCardinality(String), String)', + default_type: '', + default_expression: '', + }, + { + name: 'ResourceAttributeTokens', + type: 'Array(String)', + default_type: 'ALIAS', + default_expression: + "arrayMap((k,v) -> concat(k, '=', v), mapKeys(ResourceAttributes), mapValues(ResourceAttributes))", + }, + ]); + metadata.getSkipIndices = jest.fn().mockResolvedValue([ + { + name: 'idx_res_attr_kv_text', + type: 'text', + typeFull: "text(tokenizer = 'array')", + expression: 'ResourceAttributeTokens', + granularity: 1, + }, + ]); + + const serializer = new CustomSchemaSQLSerializerV2({ + metadata, + databaseName, + tableName, + connectionId, + implicitColumnExpression: 'Body', + }); + + const result = await serializer.eq( + 'ResourceAttributes.service.name', + 'my-service', + false, + {}, + ); + expect(result).toBe( + "(has(`ResourceAttributeTokens`, 'service.name=my-service'))", + ); + }); +}); + describe('CustomSchemaSQLSerializerV2 - Array and Nested Fields', () => { const metadata = getMetadata( new ClickhouseClient({ host: 'http://localhost:8123' }), diff --git a/packages/common-utils/src/queryParser.ts b/packages/common-utils/src/queryParser.ts index 11cde151..6045cab8 100644 --- a/packages/common-utils/src/queryParser.ts +++ b/packages/common-utils/src/queryParser.ts @@ -398,6 +398,8 @@ export abstract class SQLSerializer implements Serializer { found: boolean; mapKeyIndexExpression?: string; arrayMapKeyExpression?: string; + mapTokensColumn?: string; + mapKey?: string; }>; operator(op: lucene.Operator) { @@ -436,6 +438,8 @@ export abstract class SQLSerializer implements Serializer { isArray, mapKeyIndexExpression, arrayMapKeyExpression, + mapTokensColumn, + mapKey, } = await this.getColumnForField(field, context); if (!found) { return this.NOT_FOUND_QUERY; @@ -452,6 +456,22 @@ export abstract class SQLSerializer implements Serializer { }); } + // Map tokens skip-index optimization: use 
has(TokensColumn, 'key=value') + // when a text-indexed tokens column exists for this map column. + // Only for non-negated string equality where the key doesn't contain '='. + if ( + mapTokensColumn && + mapKey && + !isNegatedField && + !mapKey.includes('=') && + propertyType === JSDataType.String + ) { + return SqlString.format(`(has(??, ?))`, [ + mapTokensColumn, + `${mapKey}=${term}`, + ]); + } + const expressionPostfix = mapKeyIndexExpression && !isNegatedField ? ` AND ${mapKeyIndexExpression}` @@ -705,6 +725,10 @@ type CustomSchemaSQLColumnExpression = { }; mapKeyIndexExpression?: string; arrayMapKeyExpression?: string; + /** Tokens column name for map skip-index optimization (e.g. 'LogAttributeTokens') */ + mapTokensColumn?: string; + /** Map key for tokens optimization (e.g. 'userId') */ + mapKey?: string; }; export type CustomSchemaConfig = { @@ -1112,6 +1136,9 @@ export class CustomSchemaSQLSerializerV2 extends SQLSerializer { if (prefixMatch.type.startsWith('Map')) { const valueType = prefixMatch.type.match(/,\s+(\w+)\)$/)?.[1]; + const tokensColumn = await this.findTokensColumnForMap( + prefixMatch.name, + ); return { found: true, columnExpression: SqlString.format(`??[?]`, [ @@ -1120,6 +1147,8 @@ export class CustomSchemaSQLSerializerV2 extends SQLSerializer { ]), mapKeyIndexExpression: `indexHint(${buildMapContains(`${prefixMatch.name}['${fieldPostfix}']`)})`, columnType: valueType ?? 'Unknown', + mapTokensColumn: tokensColumn, + mapKey: fieldPostfix, }; } else if (prefixMatch.type.startsWith('JSON')) { // ignore original column expression at here @@ -1173,6 +1202,41 @@ export class CustomSchemaSQLSerializerV2 extends SQLSerializer { // throw new Error(`Column not found: ${field}`); } + /** + * Finds an ALIAS column that tokenizes a Map column into 'key=value' pairs + * and has a text index on it, enabling skip-index optimization. 
+ */ + private async findTokensColumnForMap( + mapColumnName: string, + ): Promise<string | undefined> { + try { + const columns = await this.metadata.getColumns({ + databaseName: this.databaseName, + tableName: this.tableName, + connectionId: this.connectionId, + }); + + // Find ALIAS columns derived from this map column + const tokensColumn = columns.find( + c => + c.default_type === 'ALIAS' && + c.default_expression.includes(`mapKeys(${mapColumnName})`) && + c.default_expression.includes(`mapValues(${mapColumnName})`), + ); + + if (!tokensColumn) return undefined; + + // Verify there's a text index on the tokens column + const textIndex = await this.findTextIndex(tokensColumn.name); + if (!textIndex) return undefined; + + return tokensColumn.name; + } catch (e) { + console.debug('Error in findTokensColumnForMap', e); + return undefined; + } + } + + private async findTextIndex( + columnExpression: string, + ): Promise { @@ -1320,6 +1384,8 @@ export class CustomSchemaSQLSerializerV2 extends SQLSerializer { arrayMapKeyExpression: isArray ? expression.arrayMapKeyExpression : undefined, + mapTokensColumn: expression.mapTokensColumn, + mapKey: expression.mapKey, }; } }