mirror of
https://github.com/hyperdxio/hyperdx
synced 2026-04-21 13:37:15 +00:00
feat: vibe coded optimization for direct read from kv array
This commit is contained in:
parent
7335a23acd
commit
3d71a96aa8
3 changed files with 391 additions and 3 deletions
|
|
@ -17,6 +17,9 @@ CREATE TABLE IF NOT EXISTS ${DATABASE}.otel_logs
|
|||
`ScopeVersion` LowCardinality(String) CODEC(ZSTD(1)),
|
||||
`ScopeAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
|
||||
`LogAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
|
||||
`ResourceAttributeTokens` Array(String) ALIAS arrayMap((k,v) -> concat(k, '=', v), mapKeys(ResourceAttributes), mapValues(ResourceAttributes)),
|
||||
`LogAttributeTokens` Array(String) ALIAS arrayMap((k,v) -> concat(k, '=', v), mapKeys(LogAttributes), mapValues(LogAttributes)),
|
||||
`ScopeAttributeTokens` Array(String) ALIAS arrayMap((k,v) -> concat(k, '=', v), mapKeys(ScopeAttributes), mapValues(ScopeAttributes)),
|
||||
`__hdx_materialized_k8s.cluster.name` LowCardinality(String) MATERIALIZED ResourceAttributes['k8s.cluster.name'] CODEC(ZSTD(1)),
|
||||
`__hdx_materialized_k8s.container.name` LowCardinality(String) MATERIALIZED ResourceAttributes['k8s.container.name'] CODEC(ZSTD(1)),
|
||||
`__hdx_materialized_k8s.deployment.name` LowCardinality(String) MATERIALIZED ResourceAttributes['k8s.deployment.name'] CODEC(ZSTD(1)),
|
||||
|
|
@ -27,11 +30,11 @@ CREATE TABLE IF NOT EXISTS ${DATABASE}.otel_logs
|
|||
`__hdx_materialized_deployment.environment.name` LowCardinality(String) MATERIALIZED ResourceAttributes['deployment.environment.name'] CODEC(ZSTD(1)),
|
||||
INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1,
|
||||
INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
|
||||
INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
|
||||
INDEX idx_res_attr_kv_text ResourceAttributeTokens TYPE text(tokenizer = 'array'),
|
||||
INDEX idx_scope_attr_key mapKeys(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
|
||||
INDEX idx_scope_attr_value mapValues(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
|
||||
INDEX idx_scope_attr_kv_text ScopeAttributeTokens TYPE text(tokenizer = 'array'),
|
||||
INDEX idx_log_attr_key mapKeys(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
|
||||
INDEX idx_log_attr_value mapValues(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
|
||||
INDEX idx_log_attr_kv_text LogAttributeTokens TYPE text(tokenizer = 'array'),
|
||||
INDEX idx_lower_body lower(Body) TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 8
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
|
|
|
|||
|
|
@ -1263,6 +1263,325 @@ describe('CustomSchemaSQLSerializerV2 - indexCoversColumn', () => {
|
|||
);
|
||||
});
|
||||
|
||||
describe('CustomSchemaSQLSerializerV2 - Map Tokens Skip-Index Optimization', () => {
|
||||
const metadata = getMetadata(
|
||||
new ClickhouseClient({ host: 'http://localhost:8123' }),
|
||||
);
|
||||
metadata.getColumn = jest.fn().mockImplementation(async ({ column }) => {
|
||||
if (column === 'LogAttributes') {
|
||||
return {
|
||||
name: 'LogAttributes',
|
||||
type: 'Map(LowCardinality(String), String)',
|
||||
};
|
||||
} else if (column === 'ResourceAttributes') {
|
||||
return {
|
||||
name: 'ResourceAttributes',
|
||||
type: 'Map(LowCardinality(String), String)',
|
||||
};
|
||||
} else if (column === 'Body') {
|
||||
return { name: 'Body', type: 'String' };
|
||||
}
|
||||
return undefined;
|
||||
});
|
||||
metadata.getMaterializedColumnsLookupTable = jest
|
||||
.fn()
|
||||
.mockResolvedValue(new Map());
|
||||
metadata.getSetting = jest.fn().mockResolvedValue(undefined);
|
||||
|
||||
const databaseName = 'default';
|
||||
const tableName = 'otel_logs';
|
||||
const connectionId = 'test';
|
||||
|
||||
it('should use has(TokensColumn, key=value) for exact map matches when tokens column with text index exists', async () => {
|
||||
metadata.getColumns = jest.fn().mockResolvedValue([
|
||||
{
|
||||
name: 'LogAttributes',
|
||||
type: 'Map(LowCardinality(String), String)',
|
||||
default_type: '',
|
||||
default_expression: '',
|
||||
},
|
||||
{
|
||||
name: 'LogAttributeTokens',
|
||||
type: 'Array(String)',
|
||||
default_type: 'ALIAS',
|
||||
default_expression:
|
||||
"arrayMap((k,v) -> concat(k, '=', v), mapKeys(LogAttributes), mapValues(LogAttributes))",
|
||||
},
|
||||
]);
|
||||
metadata.getSkipIndices = jest.fn().mockResolvedValue([
|
||||
{
|
||||
name: 'idx_log_attr_kv_text',
|
||||
type: 'text',
|
||||
typeFull: "text(tokenizer = 'array')",
|
||||
expression: 'LogAttributeTokens',
|
||||
granularity: 1,
|
||||
},
|
||||
]);
|
||||
|
||||
const serializer = new CustomSchemaSQLSerializerV2({
|
||||
metadata,
|
||||
databaseName,
|
||||
tableName,
|
||||
connectionId,
|
||||
implicitColumnExpression: 'Body',
|
||||
});
|
||||
|
||||
// Exact match: LogAttributes.userId:"abc123"
|
||||
const result = await serializer.eq(
|
||||
'LogAttributes.userId',
|
||||
'abc123',
|
||||
false,
|
||||
{},
|
||||
);
|
||||
expect(result).toBe("(has(`LogAttributeTokens`, 'userId=abc123'))");
|
||||
});
|
||||
|
||||
it('should fall back to map access for negated exact matches', async () => {
|
||||
metadata.getColumns = jest.fn().mockResolvedValue([
|
||||
{
|
||||
name: 'LogAttributes',
|
||||
type: 'Map(LowCardinality(String), String)',
|
||||
default_type: '',
|
||||
default_expression: '',
|
||||
},
|
||||
{
|
||||
name: 'LogAttributeTokens',
|
||||
type: 'Array(String)',
|
||||
default_type: 'ALIAS',
|
||||
default_expression:
|
||||
"arrayMap((k,v) -> concat(k, '=', v), mapKeys(LogAttributes), mapValues(LogAttributes))",
|
||||
},
|
||||
]);
|
||||
metadata.getSkipIndices = jest.fn().mockResolvedValue([
|
||||
{
|
||||
name: 'idx_log_attr_kv_text',
|
||||
type: 'text',
|
||||
typeFull: "text(tokenizer = 'array')",
|
||||
expression: 'LogAttributeTokens',
|
||||
granularity: 1,
|
||||
},
|
||||
]);
|
||||
|
||||
const serializer = new CustomSchemaSQLSerializerV2({
|
||||
metadata,
|
||||
databaseName,
|
||||
tableName,
|
||||
connectionId,
|
||||
implicitColumnExpression: 'Body',
|
||||
});
|
||||
|
||||
// Negated: -LogAttributes.userId:"abc123"
|
||||
const result = await serializer.eq(
|
||||
'LogAttributes.userId',
|
||||
'abc123',
|
||||
true,
|
||||
{},
|
||||
);
|
||||
expect(result).toBe("(`LogAttributes`['userId'] != 'abc123')");
|
||||
});
|
||||
|
||||
it('should fall back to map access when no tokens column exists', async () => {
|
||||
metadata.getColumns = jest.fn().mockResolvedValue([
|
||||
{
|
||||
name: 'LogAttributes',
|
||||
type: 'Map(LowCardinality(String), String)',
|
||||
default_type: '',
|
||||
default_expression: '',
|
||||
},
|
||||
]);
|
||||
metadata.getSkipIndices = jest.fn().mockResolvedValue([]);
|
||||
|
||||
const serializer = new CustomSchemaSQLSerializerV2({
|
||||
metadata,
|
||||
databaseName,
|
||||
tableName,
|
||||
connectionId,
|
||||
implicitColumnExpression: 'Body',
|
||||
});
|
||||
|
||||
const result = await serializer.eq(
|
||||
'LogAttributes.userId',
|
||||
'abc123',
|
||||
false,
|
||||
{},
|
||||
);
|
||||
expect(result).toBe(
|
||||
"(`LogAttributes`['userId'] = 'abc123' AND indexHint(mapContains(`LogAttributes`, 'userId')))",
|
||||
);
|
||||
});
|
||||
|
||||
it('should fall back to map access when tokens column exists but has no text index', async () => {
|
||||
metadata.getColumns = jest.fn().mockResolvedValue([
|
||||
{
|
||||
name: 'LogAttributes',
|
||||
type: 'Map(LowCardinality(String), String)',
|
||||
default_type: '',
|
||||
default_expression: '',
|
||||
},
|
||||
{
|
||||
name: 'LogAttributeTokens',
|
||||
type: 'Array(String)',
|
||||
default_type: 'ALIAS',
|
||||
default_expression:
|
||||
"arrayMap((k,v) -> concat(k, '=', v), mapKeys(LogAttributes), mapValues(LogAttributes))",
|
||||
},
|
||||
]);
|
||||
metadata.getSkipIndices = jest.fn().mockResolvedValue([]);
|
||||
|
||||
const serializer = new CustomSchemaSQLSerializerV2({
|
||||
metadata,
|
||||
databaseName,
|
||||
tableName,
|
||||
connectionId,
|
||||
implicitColumnExpression: 'Body',
|
||||
});
|
||||
|
||||
const result = await serializer.eq(
|
||||
'LogAttributes.userId',
|
||||
'abc123',
|
||||
false,
|
||||
{},
|
||||
);
|
||||
expect(result).toBe(
|
||||
"(`LogAttributes`['userId'] = 'abc123' AND indexHint(mapContains(`LogAttributes`, 'userId')))",
|
||||
);
|
||||
});
|
||||
|
||||
it('should handle values containing equals signs', async () => {
|
||||
metadata.getColumns = jest.fn().mockResolvedValue([
|
||||
{
|
||||
name: 'LogAttributes',
|
||||
type: 'Map(LowCardinality(String), String)',
|
||||
default_type: '',
|
||||
default_expression: '',
|
||||
},
|
||||
{
|
||||
name: 'LogAttributeTokens',
|
||||
type: 'Array(String)',
|
||||
default_type: 'ALIAS',
|
||||
default_expression:
|
||||
"arrayMap((k,v) -> concat(k, '=', v), mapKeys(LogAttributes), mapValues(LogAttributes))",
|
||||
},
|
||||
]);
|
||||
metadata.getSkipIndices = jest.fn().mockResolvedValue([
|
||||
{
|
||||
name: 'idx_log_attr_kv_text',
|
||||
type: 'text',
|
||||
typeFull: "text(tokenizer = 'array')",
|
||||
expression: 'LogAttributeTokens',
|
||||
granularity: 1,
|
||||
},
|
||||
]);
|
||||
|
||||
const serializer = new CustomSchemaSQLSerializerV2({
|
||||
metadata,
|
||||
databaseName,
|
||||
tableName,
|
||||
connectionId,
|
||||
implicitColumnExpression: 'Body',
|
||||
});
|
||||
|
||||
// Value with = should still work (key doesn't contain =)
|
||||
const result = await serializer.eq(
|
||||
'LogAttributes.userId',
|
||||
'abc=123',
|
||||
false,
|
||||
{},
|
||||
);
|
||||
expect(result).toBe("(has(`LogAttributeTokens`, 'userId=abc=123'))");
|
||||
});
|
||||
|
||||
it('should use tokens optimization via SearchQueryBuilder for quoted map field searches', async () => {
|
||||
metadata.getColumns = jest.fn().mockResolvedValue([
|
||||
{
|
||||
name: 'LogAttributes',
|
||||
type: 'Map(LowCardinality(String), String)',
|
||||
default_type: '',
|
||||
default_expression: '',
|
||||
},
|
||||
{
|
||||
name: 'LogAttributeTokens',
|
||||
type: 'Array(String)',
|
||||
default_type: 'ALIAS',
|
||||
default_expression:
|
||||
"arrayMap((k,v) -> concat(k, '=', v), mapKeys(LogAttributes), mapValues(LogAttributes))",
|
||||
},
|
||||
]);
|
||||
metadata.getSkipIndices = jest.fn().mockResolvedValue([
|
||||
{
|
||||
name: 'idx_log_attr_kv_text',
|
||||
type: 'text',
|
||||
typeFull: "text(tokenizer = 'array')",
|
||||
expression: 'LogAttributeTokens',
|
||||
granularity: 1,
|
||||
},
|
||||
]);
|
||||
|
||||
const serializer = new CustomSchemaSQLSerializerV2({
|
||||
metadata,
|
||||
databaseName,
|
||||
tableName,
|
||||
connectionId,
|
||||
implicitColumnExpression: 'Body',
|
||||
});
|
||||
|
||||
// Lucene: LogAttributes.error.message:"Failed to fetch"
|
||||
const builder = new SearchQueryBuilder(
|
||||
'LogAttributes.error.message:"Failed to fetch"',
|
||||
serializer,
|
||||
);
|
||||
const sql = await builder.build();
|
||||
expect(sql).toBe(
|
||||
"((has(`LogAttributeTokens`, 'error.message=Failed to fetch')))",
|
||||
);
|
||||
});
|
||||
|
||||
it('should work with ResourceAttributes tokens column', async () => {
|
||||
metadata.getColumns = jest.fn().mockResolvedValue([
|
||||
{
|
||||
name: 'ResourceAttributes',
|
||||
type: 'Map(LowCardinality(String), String)',
|
||||
default_type: '',
|
||||
default_expression: '',
|
||||
},
|
||||
{
|
||||
name: 'ResourceAttributeTokens',
|
||||
type: 'Array(String)',
|
||||
default_type: 'ALIAS',
|
||||
default_expression:
|
||||
"arrayMap((k,v) -> concat(k, '=', v), mapKeys(ResourceAttributes), mapValues(ResourceAttributes))",
|
||||
},
|
||||
]);
|
||||
metadata.getSkipIndices = jest.fn().mockResolvedValue([
|
||||
{
|
||||
name: 'idx_res_attr_kv_text',
|
||||
type: 'text',
|
||||
typeFull: "text(tokenizer = 'array')",
|
||||
expression: 'ResourceAttributeTokens',
|
||||
granularity: 1,
|
||||
},
|
||||
]);
|
||||
|
||||
const serializer = new CustomSchemaSQLSerializerV2({
|
||||
metadata,
|
||||
databaseName,
|
||||
tableName,
|
||||
connectionId,
|
||||
implicitColumnExpression: 'Body',
|
||||
});
|
||||
|
||||
const result = await serializer.eq(
|
||||
'ResourceAttributes.service.name',
|
||||
'my-service',
|
||||
false,
|
||||
{},
|
||||
);
|
||||
expect(result).toBe(
|
||||
"(has(`ResourceAttributeTokens`, 'service.name=my-service'))",
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('CustomSchemaSQLSerializerV2 - Array and Nested Fields', () => {
|
||||
const metadata = getMetadata(
|
||||
new ClickhouseClient({ host: 'http://localhost:8123' }),
|
||||
|
|
|
|||
|
|
@ -398,6 +398,8 @@ export abstract class SQLSerializer implements Serializer {
|
|||
found: boolean;
|
||||
mapKeyIndexExpression?: string;
|
||||
arrayMapKeyExpression?: string;
|
||||
mapTokensColumn?: string;
|
||||
mapKey?: string;
|
||||
}>;
|
||||
|
||||
operator(op: lucene.Operator) {
|
||||
|
|
@ -436,6 +438,8 @@ export abstract class SQLSerializer implements Serializer {
|
|||
isArray,
|
||||
mapKeyIndexExpression,
|
||||
arrayMapKeyExpression,
|
||||
mapTokensColumn,
|
||||
mapKey,
|
||||
} = await this.getColumnForField(field, context);
|
||||
if (!found) {
|
||||
return this.NOT_FOUND_QUERY;
|
||||
|
|
@ -452,6 +456,22 @@ export abstract class SQLSerializer implements Serializer {
|
|||
});
|
||||
}
|
||||
|
||||
// Map tokens skip-index optimization: use has(TokensColumn, 'key=value')
|
||||
// when a text-indexed tokens column exists for this map column.
|
||||
// Only for non-negated string equality where the key doesn't contain '='.
|
||||
if (
|
||||
mapTokensColumn &&
|
||||
mapKey &&
|
||||
!isNegatedField &&
|
||||
!mapKey.includes('=') &&
|
||||
propertyType === JSDataType.String
|
||||
) {
|
||||
return SqlString.format(`(has(??, ?))`, [
|
||||
mapTokensColumn,
|
||||
`${mapKey}=${term}`,
|
||||
]);
|
||||
}
|
||||
|
||||
const expressionPostfix =
|
||||
mapKeyIndexExpression && !isNegatedField
|
||||
? ` AND ${mapKeyIndexExpression}`
|
||||
|
|
@ -705,6 +725,10 @@ type CustomSchemaSQLColumnExpression = {
|
|||
};
|
||||
mapKeyIndexExpression?: string;
|
||||
arrayMapKeyExpression?: string;
|
||||
/** Tokens column name for map skip-index optimization (e.g. 'LogAttributeTokens') */
|
||||
mapTokensColumn?: string;
|
||||
/** Map key for tokens optimization (e.g. 'userId') */
|
||||
mapKey?: string;
|
||||
};
|
||||
|
||||
export type CustomSchemaConfig = {
|
||||
|
|
@ -1112,6 +1136,9 @@ export class CustomSchemaSQLSerializerV2 extends SQLSerializer {
|
|||
|
||||
if (prefixMatch.type.startsWith('Map')) {
|
||||
const valueType = prefixMatch.type.match(/,\s+(\w+)\)$/)?.[1];
|
||||
const tokensColumn = await this.findTokensColumnForMap(
|
||||
prefixMatch.name,
|
||||
);
|
||||
return {
|
||||
found: true,
|
||||
columnExpression: SqlString.format(`??[?]`, [
|
||||
|
|
@ -1120,6 +1147,8 @@ export class CustomSchemaSQLSerializerV2 extends SQLSerializer {
|
|||
]),
|
||||
mapKeyIndexExpression: `indexHint(${buildMapContains(`${prefixMatch.name}['${fieldPostfix}']`)})`,
|
||||
columnType: valueType ?? 'Unknown',
|
||||
mapTokensColumn: tokensColumn,
|
||||
mapKey: fieldPostfix,
|
||||
};
|
||||
} else if (prefixMatch.type.startsWith('JSON')) {
|
||||
// ignore original column expression at here
|
||||
|
|
@ -1173,6 +1202,41 @@ export class CustomSchemaSQLSerializerV2 extends SQLSerializer {
|
|||
// throw new Error(`Column not found: ${field}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds an ALIAS column that tokenizes a Map column into 'key=value' pairs
|
||||
* and has a text index on it, enabling skip-index optimization.
|
||||
*/
|
||||
private async findTokensColumnForMap(
|
||||
mapColumnName: string,
|
||||
): Promise<string | undefined> {
|
||||
try {
|
||||
const columns = await this.metadata.getColumns({
|
||||
databaseName: this.databaseName,
|
||||
tableName: this.tableName,
|
||||
connectionId: this.connectionId,
|
||||
});
|
||||
|
||||
// Find ALIAS columns derived from this map column
|
||||
const tokensColumn = columns.find(
|
||||
c =>
|
||||
c.default_type === 'ALIAS' &&
|
||||
c.default_expression.includes(`mapKeys(${mapColumnName})`) &&
|
||||
c.default_expression.includes(`mapValues(${mapColumnName})`),
|
||||
);
|
||||
|
||||
if (!tokensColumn) return undefined;
|
||||
|
||||
// Verify there's a text index on the tokens column
|
||||
const textIndex = await this.findTextIndex(tokensColumn.name);
|
||||
if (!textIndex) return undefined;
|
||||
|
||||
return tokensColumn.name;
|
||||
} catch (e) {
|
||||
console.debug('Error in findTokensColumnForMap', e);
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
private async findTextIndex(
|
||||
columnExpression: string,
|
||||
): Promise<SkipIndexMetadata | undefined> {
|
||||
|
|
@ -1320,6 +1384,8 @@ export class CustomSchemaSQLSerializerV2 extends SQLSerializer {
|
|||
arrayMapKeyExpression: isArray
|
||||
? expression.arrayMapKeyExpression
|
||||
: undefined,
|
||||
mapTokensColumn: expression.mapTokensColumn,
|
||||
mapKey: expression.mapKey,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue