mirror of
https://github.com/hyperdxio/hyperdx
synced 2026-04-21 13:37:15 +00:00
feat: Add MV granularities and infer config from SummingMergeTree (#1550)
Closes HDX-3064 # Summary This PR makes three improvements to MV configuration 1. Additional granularities are now available 2. Configuration can now be inferred from SummingMergeTree MVs (in addition to AggregatingMergeTree, which were already supported) 3. Source column inference has been improved to support cases where the target table column name does not follow the `aggFn__sourceColumn` convention. Also, tests have been updated and additional tests have been backported from the EE repo. ## Demo https://github.com/user-attachments/assets/7ba6ef0c-2187-4e07-bfda-7b4b690b7a73 ## Testing To test the Summing Merge Tree inference, create a materialized view like so: ```sql CREATE TABLE default.traces_summed_2 ( `Timestamp` DateTime, `ServiceName` LowCardinality(String), `StatusCode` LowCardinality(String), `count` UInt64, `sumDuration` UInt64, `quantileDuration` AggregateFunction(quantile(0.5), UInt64) ) ENGINE = SummingMergeTree((count, sumDuration)) ORDER BY (ServiceName, StatusCode, Timestamp) SETTINGS index_granularity = 8192; CREATE MATERIALIZED VIEW default.traces_summed_mv_2 TO default.traces_summed_2 ( `Timestamp` DateTime, `ServiceName` LowCardinality(String), `StatusCode` LowCardinality(String), `count` UInt64, `sumDuration` UInt64, `quantileDuration` AggregateFunction(quantile(0.5), UInt64) ) AS SELECT toStartOfInterval(Timestamp, toIntervalMinute(30)) AS Timestamp, ServiceName, StatusCode, count() AS count, sum(Duration) AS sumDuration, quantileState(0.5)(Duration) AS quantileDuration FROM default.otel_traces GROUP BY Timestamp, ServiceName, StatusCode; ```
This commit is contained in:
parent
e9650e8651
commit
ae12ca1670
4 changed files with 878 additions and 31 deletions
6
.changeset/purple-bats-smile.md
Normal file
6
.changeset/purple-bats-smile.md
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
---
|
||||
"@hyperdx/api": patch
|
||||
"@hyperdx/app": patch
|
||||
---
|
||||
|
||||
feat: Add MV granularities and infer config from SummingMergeTree
|
||||
|
|
@ -58,6 +58,7 @@ import {
|
|||
import {
|
||||
inferMaterializedViewConfig,
|
||||
MV_AGGREGATE_FUNCTIONS,
|
||||
MV_GRANULARITY_OPTIONS,
|
||||
} from '@/utils/materializedViews';
|
||||
|
||||
import ConfirmDeleteMenu from './ConfirmDeleteMenu';
|
||||
|
|
@ -70,15 +71,6 @@ import { SQLInlineEditorControlled } from './SQLInlineEditor';
|
|||
|
||||
const DEFAULT_DATABASE = 'default';
|
||||
|
||||
const MV_GRANULARITY_OPTIONS = [
|
||||
{ value: '1 second', label: '1 second' },
|
||||
{ value: '1 minute', label: '1 minute' },
|
||||
{ value: '5 minute', label: '5 minutes' },
|
||||
{ value: '15 minute', label: '15 minutes' },
|
||||
{ value: '1 hour', label: '1 hour' },
|
||||
{ value: '1 day', label: '1 day' },
|
||||
];
|
||||
|
||||
const MV_AGGREGATE_FUNCTION_OPTIONS = MV_AGGREGATE_FUNCTIONS.map(fn => ({
|
||||
value: fn,
|
||||
label: fn,
|
||||
|
|
@ -584,12 +576,14 @@ function AggregatedColumnsFormSection({
|
|||
replaceAggregates(config.aggregatedColumns ?? []);
|
||||
notifications.show({
|
||||
color: 'green',
|
||||
id: 'mv-infer-success',
|
||||
message:
|
||||
'Partially inferred materialized view configuration from view schema.',
|
||||
});
|
||||
} else {
|
||||
notifications.show({
|
||||
color: 'yellow',
|
||||
id: 'mv-infer-failure',
|
||||
message: 'Unable to infer materialized view configuration.',
|
||||
});
|
||||
}
|
||||
|
|
|
|||
698
packages/app/src/utils/__tests__/materializedViews.test.ts
Normal file
698
packages/app/src/utils/__tests__/materializedViews.test.ts
Normal file
|
|
@ -0,0 +1,698 @@
|
|||
import { ColumnMeta } from '@hyperdx/common-utils/dist/clickhouse';
|
||||
import {
|
||||
Metadata,
|
||||
TableConnection,
|
||||
TableMetadata,
|
||||
} from '@hyperdx/common-utils/dist/core/metadata';
|
||||
|
||||
import { getMetadata } from '@/metadata';
|
||||
|
||||
import {
|
||||
getSourceTableColumn,
|
||||
inferMaterializedViewConfig,
|
||||
inferTimestampColumnGranularity,
|
||||
parseSummedColumns,
|
||||
} from '../materializedViews';
|
||||
|
||||
// Replace the metadata module with a stub so each test can decide what
// `getMetadata()` returns (see the `beforeEach` in the suites below).
jest.mock('@/metadata', () => ({
  getMetadata: jest.fn(),
}));
|
||||
|
||||
/**
 * Builds a minimal `ColumnMeta` fixture for tests.
 *
 * Only `name` and `type` are populated; any other `ColumnMeta` fields are left
 * unset, which is why the result has to be asserted to the full type.
 *
 * @param name - Column name.
 * @param type - ClickHouse type string, e.g. `'LowCardinality(String)'`.
 * @returns An object carrying just `name` and `type`, typed as `ColumnMeta`.
 */
function createMockColumnMeta({
  name,
  type,
}: {
  name: string;
  type: string;
}): ColumnMeta {
  // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
  return { name, type } as ColumnMeta;
}
|
||||
|
||||
describe('inferMaterializedViewConfig', () => {
  const mockGetMetadata = jest.mocked(getMetadata);
  // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
  const mockMetadata: Metadata = {
    getColumns: jest.fn(),
    getTableMetadata: jest.fn(),
    queryMaterializedViewsByTarget: jest.fn(),
  } as unknown as Metadata;

  /** Builds a connection to `tableName` inside the shared test database. */
  const connectionTo = (tableName: string): TableConnection => ({
    databaseName: 'test_db',
    tableName,
    connectionId: 'test_connection',
  });

  // Every test reads from the same source table.
  const sourceTableConnection = connectionTo('test_source_table');

  // SELECT bodies are shared between `create_table_query` and `as_select`.
  const aggregatingSelect = [
    'SELECT toStartOfHour(Timestamp) AS Timestamp, ServiceName, SpanKind,',
    'count(*) AS count, sum(Duration) AS sum__Duration, histogram(20)(Duration) as histogram__Duration, quantileState(0.5)(Duration) AS quantile__Duration',
    'FROM test_source_table',
    'GROUP BY Timestamp, ServiceName, SpanKind',
  ].join('\n');

  const summingSelect = [
    'SELECT toStartOfHour(Timestamp) AS Timestamp, ServiceName, SpanKind, count() AS count, sum(Duration) AS sumDuration, quantileState(0.5)(Duration) AS quantileDuration',
    'FROM test_source_table',
    'GROUP BY Timestamp, ServiceName, SpanKind',
  ].join('\n');

  // AggregatingMergeTree target table whose columns follow the
  // `aggFn__sourceColumn` naming convention.
  const mvTargetTable = {
    columns: [
      createMockColumnMeta({ name: 'Timestamp', type: 'DateTime' }),
      createMockColumnMeta({
        name: 'ServiceName',
        type: 'LowCardinality(String)',
      }),
      createMockColumnMeta({
        name: 'SpanKind',
        type: 'LowCardinality(String)',
      }),
      createMockColumnMeta({
        name: 'count',
        type: 'SimpleAggregateFunction(sum, UInt64)',
      }),
      createMockColumnMeta({
        name: 'sum__Duration',
        type: 'SimpleAggregateFunction(sum, UInt64)',
      }),
      createMockColumnMeta({
        name: 'histogram__Duration',
        type: 'AggregateFunction(histogram(20), UInt64)',
      }),
      createMockColumnMeta({
        name: 'quantile__Duration',
        type: 'AggregateFunction(quantile(0.5), UInt64)',
      }),
    ],
    // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
    meta: {
      engine: 'AggregatingMergeTree',
      database: 'test_db',
      name: 'test_mv_target_table',
      primary_key: 'Timestamp, ServiceName, SpanKind',
    } as unknown as TableMetadata,
  };

  // SummingMergeTree target table: summed columns are plain numeric columns
  // listed in the engine parameters, and names do NOT follow the
  // `aggFn__sourceColumn` convention.
  const mvTargetTableSummingMergeTree = {
    columns: [
      createMockColumnMeta({ name: 'Timestamp', type: 'DateTime' }),
      createMockColumnMeta({
        name: 'ServiceName',
        type: 'LowCardinality(String)',
      }),
      createMockColumnMeta({
        name: 'SpanKind',
        type: 'LowCardinality(String)',
      }),
      createMockColumnMeta({
        name: 'quantileDuration',
        type: 'AggregateFunction(quantile(0.5), UInt64)',
      }),
      createMockColumnMeta({ name: 'count', type: 'UInt64' }),
      createMockColumnMeta({ name: 'sumDuration', type: 'UInt64' }),
    ],
    // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
    meta: {
      engine: 'SummingMergeTree',
      engine_full:
        'SummingMergeTree((count, sumDuration)) ORDER BY (Timestamp, ServiceName, SpanKind) SETTINGS index_granularity = 8192',
      database: 'test_db',
      name: 'test_mv_target_table_summing',
      primary_key: 'Timestamp, ServiceName, SpanKind',
    } as unknown as TableMetadata,
  };

  const mvSourceTable = {
    columns: [
      createMockColumnMeta({ name: 'Timestamp', type: 'DateTime' }),
      createMockColumnMeta({
        name: 'ServiceName',
        type: 'LowCardinality(String)',
      }),
      createMockColumnMeta({
        name: 'SpanKind',
        type: 'LowCardinality(String)',
      }),
      createMockColumnMeta({ name: 'Duration', type: 'UInt64' }),
    ],
    // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
    meta: {
      engine: 'SharedMergeTree',
      database: 'test_db',
      name: 'test_source_table',
    } as unknown as TableMetadata,
  };

  const mv = {
    // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
    meta: {
      engine: 'MaterializedView',
      database: 'test_db',
      name: 'test_mv',
      create_table_query: `CREATE MATERIALIZED VIEW test_db.test_mv TO test_db.test_mv_target_table AS\n${aggregatingSelect}`,
      as_select: aggregatingSelect,
    } as unknown as TableMetadata,
  };

  const summingMergeTreeMV = {
    // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
    meta: {
      engine: 'MaterializedView',
      database: 'test_db',
      name: 'test_mv_summing',
      create_table_query: `CREATE MATERIALIZED VIEW test_db.test_mv_summing TO test_db.test_mv_target_table_summing AS\n${summingSelect}`,
      as_select: summingSelect,
    } as unknown as TableMetadata,
  };

  // Lookup tables backing the mocked metadata client.
  const tableMetadataByName = new Map<string, TableMetadata>([
    ['test_mv', mv.meta],
    ['test_mv_summing', summingMergeTreeMV.meta],
    ['test_mv_target_table', mvTargetTable.meta],
    ['test_mv_target_table_summing', mvTargetTableSummingMergeTree.meta],
    ['test_source_table', mvSourceTable.meta],
  ]);

  const columnsByTableName = new Map<string, ColumnMeta[]>([
    ['test_source_table', mvSourceTable.columns],
    ['test_mv_target_table', mvTargetTable.columns],
    ['test_mv_target_table_summing', mvTargetTableSummingMergeTree.columns],
  ]);

  // Config expected for the AggregatingMergeTree fixtures; reused by the
  // tests that only differ in how the MV is addressed.
  const expectedAggregatingConfig = {
    databaseName: 'test_db',
    tableName: 'test_mv_target_table',
    dimensionColumns: 'ServiceName, SpanKind',
    timestampColumn: 'Timestamp',
    minGranularity: '1 hour',
    aggregatedColumns: [
      { aggFn: 'count', mvColumn: 'count', sourceColumn: '' },
      { aggFn: 'sum', mvColumn: 'sum__Duration', sourceColumn: 'Duration' },
      {
        aggFn: 'histogram',
        mvColumn: 'histogram__Duration',
        sourceColumn: 'Duration',
      },
      {
        aggFn: 'quantile',
        mvColumn: 'quantile__Duration',
        sourceColumn: 'Duration',
      },
    ],
  };

  beforeEach(() => {
    mockGetMetadata.mockReturnValue(mockMetadata);

    mockMetadata.getTableMetadata = jest
      .fn()
      .mockImplementation(({ tableName }: { tableName: string }) => {
        const meta = tableMetadataByName.get(tableName);
        return meta
          ? Promise.resolve(meta)
          : Promise.reject(new Error(`Table ${tableName} not found`));
      });

    mockMetadata.getColumns = jest
      .fn()
      .mockImplementation(({ tableName }: { tableName: string }) => {
        const columns = columnsByTableName.get(tableName);
        return columns
          ? Promise.resolve(columns)
          : Promise.reject(new Error(`Table ${tableName} not found`));
      });

    // Exactly one MV feeds each target table unless a test overrides this.
    mockMetadata.queryMaterializedViewsByTarget = jest
      .fn()
      .mockImplementation(({ tableName }: { tableName: string }) =>
        Promise.resolve([
          {
            databaseName: 'test_db',
            tableName:
              tableName === 'test_mv_target_table_summing'
                ? 'test_mv_summing'
                : 'test_mv',
          },
        ]),
      );
  });

  afterEach(() => {
    jest.resetAllMocks();
  });

  it('should infer materialized view configuration when given the name of a materialized view target table', async () => {
    const actualConfig = await inferMaterializedViewConfig(
      connectionTo('test_mv_target_table'),
      sourceTableConnection,
    );

    expect(actualConfig).toEqual(expectedAggregatingConfig);
  });

  it('should infer materialized view configuration when given the name of a materialized view', async () => {
    // Same as the previous test, except the MV itself is passed instead of
    // its target table.
    const actualConfig = await inferMaterializedViewConfig(
      connectionTo('test_mv'),
      sourceTableConnection,
    );

    expect(actualConfig).toEqual(expectedAggregatingConfig);
  });

  it('should infer materialized view configuration when given the name of a SummingMergeTree target table', async () => {
    const actualConfig = await inferMaterializedViewConfig(
      connectionTo('test_mv_target_table_summing'),
      sourceTableConnection,
    );

    expect(actualConfig).toEqual({
      databaseName: 'test_db',
      tableName: 'test_mv_target_table_summing',
      dimensionColumns: 'ServiceName, SpanKind',
      timestampColumn: 'Timestamp',
      minGranularity: '1 hour',
      aggregatedColumns: [
        {
          aggFn: 'quantile',
          mvColumn: 'quantileDuration',
          sourceColumn: 'Duration',
        },
        { aggFn: 'count', mvColumn: 'count', sourceColumn: '' },
        { aggFn: 'sum', mvColumn: 'sumDuration', sourceColumn: 'Duration' },
      ],
    });
  });

  it('should return a partial result when multiple materialized views target the same table', async () => {
    mockMetadata.queryMaterializedViewsByTarget = jest.fn().mockResolvedValue([
      { tableName: 'test_mv', databaseName: 'test_db' },
      { tableName: 'test_mv_2', databaseName: 'test_db' },
    ]);

    const actualConfig = await inferMaterializedViewConfig(
      connectionTo('test_mv_target_table'),
      sourceTableConnection,
    );

    expect(actualConfig).toEqual({
      ...expectedAggregatingConfig,
      // Since we don't know the MV, we can't infer the granularity
      minGranularity: '',
    });
  });

  it('should return undefined when the target table is neither an AggregatingMergeTree nor a SummingMergeTree', async () => {
    // The source table fixture uses the SharedMergeTree engine, which is not
    // a supported MV target engine.
    const actualConfig = await inferMaterializedViewConfig(
      connectionTo('test_source_table'),
      sourceTableConnection,
    );

    expect(actualConfig).toBeUndefined();
  });
});
|
||||
|
||||
describe('inferTimestampColumnGranularity', () => {
  /** Wraps a timestamp expression in the MV SELECT shared by every case. */
  const selectWithTimestamp = (timestampExpression: string) =>
    `SELECT ${timestampExpression} AS Timestamp, ServiceName, quantileState(0.9)(Duration) AS p90__Duration FROM default.otel_traces GROUP BY Timestamp, ServiceName`;

  /**
   * Runs the inference against MV metadata that only carries `as_select`,
   * the single field these cases populate.
   */
  const inferGranularity = (timestampExpression: string) =>
    inferTimestampColumnGranularity(
      // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
      {
        as_select: selectWithTimestamp(timestampExpression),
      } as unknown as TableMetadata,
      'Timestamp',
    );

  it.each([
    { expected: '1 second', timestampExpression: 'toStartOfSecond(Timestamp)' },
    { expected: '1 minute', timestampExpression: 'toStartOfMinute(Timestamp)' },
    {
      expected: '5 minute',
      timestampExpression: 'toStartOfFiveMinutes(Timestamp)',
    },
    {
      expected: '15 minute',
      timestampExpression: 'toStartOfFifteenMinutes(Timestamp)',
    },
    { expected: '1 hour', timestampExpression: 'toStartOfHour(Timestamp)' },
    { expected: '1 day', timestampExpression: 'toStartOfDay(Timestamp)' },
  ])(
    'should handle a toStartOfX function with granularity: $expected',
    ({ timestampExpression, expected }) => {
      expect(inferGranularity(timestampExpression)).toBe(expected);
    },
  );

  it.each([
    {
      expected: '1 second',
      timestampExpression: 'toStartOfInterval(Timestamp, INTERVAL 1 SECOND)',
    },
    {
      // Lower-case keyword and plural unit
      expected: '5 minute',
      timestampExpression: 'toStartOfInterval(Timestamp, interval 5 minutes)',
    },
    {
      // toIntervalX() function form instead of the INTERVAL keyword
      expected: '30 minute',
      timestampExpression: 'toStartOfInterval(Timestamp, toIntervalMinute(30))',
    },
  ])(
    'should handle a toStartOfInterval function with a dynamic interval: $expected',
    ({ timestampExpression, expected }) => {
      expect(inferGranularity(timestampExpression)).toBe(expected);
    },
  );

  it('should handle toDate()', () => {
    expect(inferGranularity('toDate(Timestamp)')).toBe('1 day');
  });

  it('should handle toDateTime()', () => {
    expect(inferGranularity('toDateTime(Timestamp)')).toBe('1 second');
  });

  it('should reject non-standard granularities', () => {
    expect(
      inferGranularity('toStartOfInterval(Timestamp, INTERVAL 7 SECOND)'),
    ).toBeUndefined();
  });
});
|
||||
|
||||
describe('parseSummedColumns', () => {
  it('should parse summed columns correctly when there is one summed column', () => {
    // Single column: the engine parameter is not wrapped in a tuple.
    // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
    const metadata = {
      engine: 'SummingMergeTree',
      engine_full:
        'SummingMergeTree(count) ORDER BY (Timestamp, ServiceName, SpanKind) SETTINGS index_granularity = 8192',
    } as TableMetadata;

    const parsed = parseSummedColumns(metadata);
    expect(parsed).toEqual(new Set(['count']));
  });

  it('should parse summed columns correctly when there are multiple summed columns', () => {
    // Multiple columns: the engine parameter is a parenthesised tuple.
    // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
    const metadata = {
      engine: 'SummingMergeTree',
      engine_full:
        'SummingMergeTree((count, sum__Duration)) ORDER BY (Timestamp, ServiceName, SpanKind) SETTINGS index_granularity = 8192',
    } as TableMetadata;

    const parsed = parseSummedColumns(metadata);
    expect(parsed).toEqual(new Set(['count', 'sum__Duration']));
  });

  it('should return undefined when the table is not a SummingMergeTree', () => {
    // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
    const metadata = {
      engine: 'AggregatingMergeTree',
      engine_full:
        'AggregatingMergeTree ORDER BY (Timestamp, ServiceName, SpanKind) SETTINGS index_granularity = 8192',
    } as TableMetadata;

    expect(parseSummedColumns(metadata)).toBeUndefined();
  });
});
|
||||
|
||||
describe('getSourceTableColumn', () => {
  // Source table columns shared by every test that does not need
  // overlapping column names.
  const sourceTableColumns: ColumnMeta[] = [
    createMockColumnMeta({ name: 'Duration', type: 'UInt64' }),
    createMockColumnMeta({ name: 'Value', type: 'UInt64' }),
  ];

  /** Builds MV metadata that only carries the given `as_select` text. */
  const mvWithSelect = (asSelect: string): TableMetadata =>
    // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
    ({ as_select: asSelect }) as unknown as TableMetadata;

  it('should return empty string if no matching source column is found', () => {
    const targetTableColumn = createMockColumnMeta({
      name: 'sum__NonExistentColumn',
      type: 'SimpleAggregateFunction(sum, UInt64)',
    });

    expect(
      getSourceTableColumn('sum', targetTableColumn, sourceTableColumns),
    ).toBe('');
  });

  it('should return empty string if the aggFn is count', () => {
    const targetTableColumn = createMockColumnMeta({
      name: 'count',
      type: 'SimpleAggregateFunction(count, UInt64)',
    });

    expect(
      getSourceTableColumn('count', targetTableColumn, sourceTableColumns),
    ).toBe('');
  });

  it('should match source columns based on convention', () => {
    const targetTableColumnSum = createMockColumnMeta({
      name: 'sum__Duration',
      type: 'SimpleAggregateFunction(sum, UInt64)',
    });

    expect(
      getSourceTableColumn('sum', targetTableColumnSum, sourceTableColumns),
    ).toBe('Duration');
  });

  it('should match source column based on MV DDL expressions', () => {
    // The target column name does not follow `aggFn__sourceColumn`, so the
    // MV's SELECT is passed in as well.
    const targetTableColumnQuantile = createMockColumnMeta({
      name: 'quantileDuration',
      type: 'AggregateFunction(quantile(0.5), UInt64)',
    });

    const mvMetadata = mvWithSelect(
      'SELECT toStartOfHour(Timestamp) AS Timestamp, ServiceName, SpanKind, count() AS count, sum(Duration) AS sumDuration, quantileState(0.5)(Duration) AS quantileDuration FROM test_source_table GROUP BY Timestamp, ServiceName, SpanKind',
    );

    expect(
      getSourceTableColumn(
        'quantile',
        targetTableColumnQuantile,
        sourceTableColumns,
        mvMetadata,
      ),
    ).toBe('Duration');
  });

  it('should match source column based on MV DDL expressions when there are overlapping source column names', () => {
    // `Duration` is a suffix of `MaxDuration`; each target column must still
    // map to its own source column.
    const overlappingSourceColumns: ColumnMeta[] = [
      createMockColumnMeta({ name: 'MaxDuration', type: 'UInt64' }),
      createMockColumnMeta({ name: 'Duration', type: 'UInt64' }),
      createMockColumnMeta({ name: 'Value', type: 'UInt64' }),
    ];

    const targetTableColumnMaxMax = createMockColumnMeta({
      name: 'maxMaxDuration',
      type: 'AggregateFunction(max, UInt64)',
    });

    const targetTableColumnMax = createMockColumnMeta({
      name: 'maxDuration',
      type: 'AggregateFunction(max, UInt64)',
    });

    const mvMetadata = mvWithSelect(
      'SELECT toStartOfHour(Timestamp) AS Timestamp, ServiceName, SpanKind, count() AS count, max(MaxDuration) AS maxMaxDuration, max(Duration) AS maxDuration FROM test_source_table GROUP BY Timestamp, ServiceName, SpanKind',
    );

    expect(
      getSourceTableColumn(
        'max',
        targetTableColumnMax,
        overlappingSourceColumns,
        mvMetadata,
      ),
    ).toBe('Duration');

    expect(
      getSourceTableColumn(
        'max',
        targetTableColumnMaxMax,
        overlappingSourceColumns,
        mvMetadata,
      ),
    ).toBe('MaxDuration');
  });
});
|
||||
|
|
@ -1,4 +1,5 @@
|
|||
import {
|
||||
ColumnMeta,
|
||||
extractColumnReferencesFromKey,
|
||||
filterColumnMetaByType,
|
||||
JSDataType,
|
||||
|
|
@ -7,7 +8,10 @@ import {
|
|||
TableConnection,
|
||||
TableMetadata,
|
||||
} from '@hyperdx/common-utils/dist/core/metadata';
|
||||
import { splitAndTrimWithBracket } from '@hyperdx/common-utils/dist/core/utils';
|
||||
import {
|
||||
Granularity,
|
||||
splitAndTrimWithBracket,
|
||||
} from '@hyperdx/common-utils/dist/core/utils';
|
||||
import {
|
||||
InternalAggregateFunction,
|
||||
MaterializedViewConfiguration,
|
||||
|
|
@ -25,6 +29,37 @@ export const MV_AGGREGATE_FUNCTIONS = [
|
|||
'histogram',
|
||||
];
|
||||
|
||||
/**
 * Granularities offered when configuring a materialized view, as
 * `{ value, label }` pairs ordered from finest to coarsest.
 *
 * To maximize the number of queries which are compatible with materialized
 * views, every granularity should be a multiple of every smaller granularity.
 *
 * Further, these should match the granularities supported by charts, defined
 * in convertDateRangeToGranularityString().
 *
 * NOTE(review): '1 second' is the only entry written as a string literal
 * rather than a `Granularity` member; confirm whether `Granularity` has a
 * one-second member.
 */
export const MV_GRANULARITY_OPTIONS = [
  { value: '1 second', label: '1 second' },
  { value: Granularity.FifteenSecond, label: '15 seconds' },
  { value: Granularity.ThirtySecond, label: '30 seconds' },
  { value: Granularity.OneMinute, label: '1 minute' },
  { value: Granularity.FiveMinute, label: '5 minutes' },
  { value: Granularity.FifteenMinute, label: '15 minutes' },
  { value: Granularity.ThirtyMinute, label: '30 minutes' },
  { value: Granularity.OneHour, label: '1 hour' },
  { value: Granularity.TwoHour, label: '2 hours' },
  { value: Granularity.SixHour, label: '6 hours' },
  { value: Granularity.TwelveHour, label: '12 hours' },
  { value: Granularity.OneDay, label: '1 day' },
  { value: Granularity.TwoDay, label: '2 days' },
  { value: Granularity.SevenDay, label: '7 days' },
  { value: Granularity.ThirtyDay, label: '30 days' },
];
|
||||
|
||||
/**
 * Type guard: returns true when `value` is exactly the `value` of one of the
 * entries in MV_GRANULARITY_OPTIONS.
 *
 * NOTE(review): the option list also contains the string literal '1 second';
 * confirm that it is a valid `Granularity`, otherwise this predicate narrows
 * more than the runtime check guarantees.
 */
const isGranularity = (value: string): value is Granularity => {
  // `.some` stops at the first match and avoids building a temporary array.
  return MV_GRANULARITY_OPTIONS.some(
    option => (option.value as string) === value,
  );
};
|
||||
|
||||
const MV_DDL_PATTERN = /MATERIALIZED VIEW [^\s]+\.[^\s]+ TO ([^\s]+)\.([^\s]+)/;
|
||||
function getViewTargetTable(meta: TableMetadata) {
|
||||
const match = meta.create_table_query.match(MV_DDL_PATTERN);
|
||||
|
|
@ -50,6 +85,10 @@ function isAggregatingMergeTree(meta: TableMetadata) {
|
|||
return meta.engine?.includes('AggregatingMergeTree') ?? false;
|
||||
}
|
||||
|
||||
/**
 * Returns true when the table's engine name contains `SummingMergeTree`.
 *
 * This is a substring match, so any engine name that embeds
 * `SummingMergeTree` matches as well. A missing `engine` yields `false`.
 */
function isSummingMergeTree(meta: TableMetadata) {
  return meta.engine?.includes('SummingMergeTree') ?? false;
}
|
||||
|
||||
/**
|
||||
* Given a table that is either a materialized view or a table targeted by a materialized view,
|
||||
* fetches the metadata for both the materialized view and the target table.
|
||||
|
|
@ -80,11 +119,15 @@ async function getMetadataForMaterializedViewAndTable({
|
|||
connectionId,
|
||||
});
|
||||
|
||||
return isAggregatingMergeTree(mvTableMetadata)
|
||||
return isAggregatingMergeTree(mvTableMetadata) ||
|
||||
isSummingMergeTree(mvTableMetadata)
|
||||
? { mvMetadata, mvTableMetadata }
|
||||
: undefined;
|
||||
}
|
||||
} else if (isAggregatingMergeTree(givenMetadata)) {
|
||||
} else if (
|
||||
isAggregatingMergeTree(givenMetadata) ||
|
||||
isSummingMergeTree(givenMetadata)
|
||||
) {
|
||||
const mvTableMetadata = givenMetadata;
|
||||
const sourceViews = await metadata.queryMaterializedViewsByTarget({
|
||||
databaseName,
|
||||
|
|
@ -175,14 +218,20 @@ export function inferTimestampColumnGranularity(
|
|||
// Only accept specific granularities matching the ones defined above
|
||||
if (timestampExpression.includes(`toStartOfInterval(`)) {
|
||||
const intervalMatch = timestampExpression.match(
|
||||
/INTERVAL\s+(\d+)\s+(SECOND|MINUTE|HOUR|DAY)\)/i,
|
||||
/INTERVAL\s+(\d+)\s+(SECOND|MINUTE|HOUR|DAY)S?\)/i,
|
||||
);
|
||||
const intervalFunctionMatch = timestampExpression.match(
|
||||
/toInterval(Second|Minute|Hour|Day)\((\d+)\)/,
|
||||
);
|
||||
const granularity = intervalMatch
|
||||
? `${intervalMatch[1]} ${intervalMatch[2].toLowerCase()}`
|
||||
: '';
|
||||
: intervalFunctionMatch
|
||||
? `${intervalFunctionMatch[2]} ${intervalFunctionMatch[1].toLowerCase()}`
|
||||
: null;
|
||||
if (
|
||||
granularity &&
|
||||
Object.values(intervalToGranularityMap).includes(granularity)
|
||||
isGranularity(granularity) &&
|
||||
MV_GRANULARITY_OPTIONS.map(option => option.value).includes(granularity)
|
||||
) {
|
||||
return granularity;
|
||||
}
|
||||
|
|
@ -192,8 +241,76 @@ export function inferTimestampColumnGranularity(
|
|||
}
|
||||
}
|
||||
|
||||
/**
 * Extracts the columns named in the engine parameters of a SummingMergeTree
 * table, e.g. `SummingMergeTree(col1)` or `SummingMergeTree((col1, col2))`.
 *
 * @param mvTableMetadata - Metadata of the materialized view's target table.
 * @returns The set of summed column names, or `undefined` when the table is
 *   not a SummingMergeTree, when `engine_full` carries no matching parameter
 *   list, or when that list is empty.
 */
export function parseSummedColumns(mvTableMetadata: TableMetadata) {
  if (!isSummingMergeTree(mvTableMetadata)) return undefined;

  // Capture everything between the engine's outer parentheses; the capture
  // may itself still be wrapped in one more pair of parentheses.
  const engineMatch = mvTableMetadata.engine_full?.match(
    /SummingMergeTree\((\(?[^(]*)\)/,
  );
  let columnList = engineMatch?.[1];
  if (!columnList) return undefined;

  // Unwrap the tuple form "(col1, col2)" down to "col1, col2".
  if (columnList.startsWith('(') && columnList.endsWith(')')) {
    columnList = columnList.slice(1, -1);
  }
  if (!columnList) return undefined;

  return new Set(splitAndTrimWithBracket(columnList));
}
|
||||
|
||||
/**
 * Escapes every regular-expression metacharacter in `s` so that the result
 * can be embedded in a RegExp source and match `s` literally.
 */
function escapeRegExp(s: string) {
  const metacharacters = /[.*+?^${}()|[\]\\]/g;
  // Prefix each matched metacharacter with a single backslash.
  return s.replace(metacharacters, metachar => '\\' + metachar);
}
|
||||
|
||||
/**
 * Infers which source table column feeds a materialized view target column.
 *
 * Resolution order:
 * 1. `count` aggregations return `''`, since a count may not have a source
 *    column.
 * 2. Naming convention `<aggFn>__<sourceColumn>`: if the text following the
 *    first `__` in the target column name is a source table column, it is
 *    returned.
 * 3. If `mvMetadata` is given, the MV's SELECT expression whose alias is the
 *    target column name is located, and the first source table column
 *    referenced by that expression is returned.
 *
 * @param aggFn - Aggregate function inferred for the target column.
 * @param targetTableColumn - Column of the MV's target table.
 * @param sourceTableColumns - Columns of the table the MV reads from.
 * @param mvMetadata - Optional metadata of the materialized view, used to
 *   inspect its SELECT expressions.
 * @returns The source column name, or `''` when none can be inferred.
 */
export function getSourceTableColumn(
  aggFn: string,
  targetTableColumn: ColumnMeta,
  sourceTableColumns: ColumnMeta[],
  mvMetadata?: TableMetadata,
) {
  if (aggFn === 'count') {
    // Count may not have a source column
    return '';
  }

  const targetName = targetTableColumn.name;

  // By convention: MV Columns are named "<aggFn>__<sourceColumn>"
  const nameSuffix = targetName.split('__')[1];
  if (sourceTableColumns.some(col => col.name === nameSuffix)) {
    return nameSuffix;
  }

  // Without the view definition there is nothing further to inspect.
  if (!mvMetadata) {
    return '';
  }

  // Find the SELECT expression aliased to the target column. The alias must
  // end the expression AND start at an identifier boundary; a bare endsWith
  // would let target column "total" match an expression aliased "error_total".
  const matchingSelectExpression = extractSelectExpressions(mvMetadata).find(
    expr => {
      if (!expr.endsWith(targetName)) {
        return false;
      }
      // charAt yields '' when the expression is exactly the column name.
      const precedingChar = expr.charAt(expr.length - targetName.length - 1);
      return !/\w/.test(precedingChar);
    },
  );
  if (!matchingSelectExpression) {
    return '';
  }

  const findReferencedColumn = (text: string) =>
    sourceTableColumns.find(col =>
      new RegExp(`\\b${escapeRegExp(col.name)}\\b`).test(text),
    );

  // Search the expression without its trailing alias first, so an alias that
  // happens to equal some other source column name is not mistaken for the
  // aggregated column. Fall back to the whole expression so that un-aliased
  // pass-through columns still resolve.
  const expressionBody = matchingSelectExpression.slice(
    0,
    matchingSelectExpression.length - targetName.length,
  );
  const matchingSourceColumn =
    findReferencedColumn(expressionBody) ??
    findReferencedColumn(matchingSelectExpression);

  return matchingSourceColumn ? matchingSourceColumn.name : '';
}
|
||||
|
||||
/**
|
||||
* Attempts to a MaterializedViewConfiguration object from the given TableConnections
|
||||
* Attempts to create a MaterializedViewConfiguration object from the given TableConnections
|
||||
* by introspecting the view, target table, and source table.
|
||||
*
|
||||
* @param mvTableOrView - A TableConnection representing either the materialized view or the target table.
|
||||
|
|
@ -239,18 +356,19 @@ export async function inferMaterializedViewConfig(
|
|||
}),
|
||||
]);
|
||||
|
||||
const sourceTableColumnNames = new Set(
|
||||
sourceTableColumns.map(col => col.name),
|
||||
);
|
||||
|
||||
const aggregatedColumns: MaterializedViewConfiguration['aggregatedColumns'] =
|
||||
mvTableColumns
|
||||
.filter(col => col.type.includes('AggregateFunction'))
|
||||
.map(col => {
|
||||
let aggFn: string | undefined = col.type.match(
|
||||
.filter(targetTableColumn =>
|
||||
targetTableColumn.type.includes('AggregateFunction'),
|
||||
)
|
||||
.map(targetTableColumn => {
|
||||
let aggFn: string | undefined = targetTableColumn.type.match(
|
||||
/AggregateFunction\(([a-zA-Z0-9_]+)/,
|
||||
)?.[1];
|
||||
if (aggFn === 'sum' && col.name.toLowerCase().includes('count')) {
|
||||
if (
|
||||
aggFn === 'sum' &&
|
||||
targetTableColumn.name.toLowerCase().includes('count')
|
||||
) {
|
||||
aggFn = 'count';
|
||||
} else if (aggFn?.startsWith('quantile')) {
|
||||
aggFn = 'quantile';
|
||||
|
|
@ -260,21 +378,51 @@ export async function inferMaterializedViewConfig(
|
|||
return undefined;
|
||||
}
|
||||
|
||||
// Convention: MV Columns are named "<aggFn>__<sourceColumn>"
|
||||
const nameSuffix = col.name.split('__')[1];
|
||||
const sourceColumn =
|
||||
sourceTableColumnNames.has(nameSuffix) && aggFn !== 'count'
|
||||
? nameSuffix
|
||||
: '';
|
||||
const sourceColumn = getSourceTableColumn(
|
||||
aggFn,
|
||||
targetTableColumn,
|
||||
sourceTableColumns,
|
||||
mvMetadata,
|
||||
);
|
||||
|
||||
return {
|
||||
mvColumn: col.name,
|
||||
mvColumn: targetTableColumn.name,
|
||||
aggFn,
|
||||
sourceColumn,
|
||||
};
|
||||
})
|
||||
.filter(c => c != undefined);
|
||||
|
||||
// Add Aggregated columns from the SummingMergeTree engine, if applicable
|
||||
const summedColumnNames = isSummingMergeTree(mvTableMetadata)
|
||||
? parseSummedColumns(mvTableMetadata)
|
||||
: undefined;
|
||||
for (const summedColumn of summedColumnNames ?? []) {
|
||||
const aggFn: InternalAggregateFunction = summedColumn
|
||||
.toLowerCase()
|
||||
.includes('count')
|
||||
? 'count'
|
||||
: 'sum';
|
||||
|
||||
const summedColumnMeta = mvTableColumns.find(
|
||||
col => col.name === summedColumn,
|
||||
);
|
||||
|
||||
if (summedColumnMeta) {
|
||||
const sourceColumn = getSourceTableColumn(
|
||||
aggFn,
|
||||
summedColumnMeta,
|
||||
sourceTableColumns,
|
||||
mvMetadata,
|
||||
);
|
||||
aggregatedColumns.push({
|
||||
mvColumn: summedColumn,
|
||||
aggFn,
|
||||
sourceColumn,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Infer the timestamp column
|
||||
const primaryKeyColumns = new Set(
|
||||
extractColumnReferencesFromKey(mvTableMetadata.primary_key),
|
||||
|
|
@ -296,6 +444,7 @@ export async function inferMaterializedViewConfig(
|
|||
.filter(
|
||||
col =>
|
||||
!col.type.includes('AggregateFunction') &&
|
||||
!summedColumnNames?.has(col.name) &&
|
||||
!timestampColumns.includes(col),
|
||||
)
|
||||
.map(col => col.name)
|
||||
|
|
|
|||
Loading…
Reference in a new issue