feat: Support fetching table metadata for Distributed tables (#1920)

## Summary

This PR updates the `getTableMetadata` and `getSkipIndices` functions to handle distributed tables by looking up primary keys and indexes (respectively) from the underlying local table (since the distributed table does not have them).

- Source config inference works again
- The default order by optimization (adding `toStartOfXX()` to the search page order by when it's present in the primary key) now correctly applies when querying a distributed table source
- The date range filter now correctly filters on both `toStartOfXX(TimestampTime)` and `TimestampTime` when `toStartOfXX(TimestampTime)` is present in the primary key of the local table.
- Source schema preview now shows both the distributed table and the local table, when the source is defined by a distributed table.
- Text indexes are now detected correctly for distributed tables



### Screenshots or video

https://github.com/user-attachments/assets/d1c60964-99f0-4470-9378-a812f963c692

When text index is present, hasAllTokens is used:
<img width="848" height="139" alt="Screenshot 2026-03-16 at 10 55 24 AM" src="https://github.com/user-attachments/assets/2bd780dc-291d-495f-bd12-c636988648c1" />

### How to test locally or on Vercel

<details>
<summary>Testing locally, you'll need to create a distributed logs table with a local table that has a timestamp optimization:</summary>


```sql
CREATE TABLE default.otel_logs_toStartOf on cluster hdx_cluster
(
    `Timestamp` DateTime64(9) CODEC(Delta(8), ZSTD(1)),
    `TimestampTime` DateTime DEFAULT toDateTime(Timestamp),
    `TraceId` String CODEC(ZSTD(1)),
    `SpanId` String CODEC(ZSTD(1)),
    `TraceFlags` UInt8,
    `SeverityText` LowCardinality(String) CODEC(ZSTD(1)),
    `SeverityNumber` UInt8,
    `ServiceName` LowCardinality(String) CODEC(ZSTD(1)),
    `Body` String CODEC(ZSTD(1)),
    `ResourceSchemaUrl` LowCardinality(String) CODEC(ZSTD(1)),
    `ResourceAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
    `ScopeSchemaUrl` LowCardinality(String) CODEC(ZSTD(1)),
    `ScopeName` String CODEC(ZSTD(1)),
    `ScopeVersion` LowCardinality(String) CODEC(ZSTD(1)),
    `ScopeAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
    `LogAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
    INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1,
    INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
    INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
    INDEX idx_scope_attr_key mapKeys(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
    INDEX idx_scope_attr_value mapValues(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
    INDEX idx_log_attr_key mapKeys(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
    INDEX idx_log_attr_value mapValues(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
    INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 8
)
ENGINE = MergeTree
PARTITION BY toDate(TimestampTime)
PRIMARY KEY (toStartOfMinute(TimestampTime), ServiceName, TimestampTime)
ORDER BY (toStartOfMinute(TimestampTime), ServiceName, TimestampTime, Timestamp)
TTL TimestampTime + toIntervalDay(30)
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1;

CREATE TABLE default.otel_logs_toStartOf_distributed on cluster hdx_cluster
(
    `Timestamp` DateTime64(9) CODEC(Delta(8), ZSTD(1)),
    `TimestampTime` DateTime DEFAULT toDateTime(Timestamp),
    `TraceId` String CODEC(ZSTD(1)),
    `SpanId` String CODEC(ZSTD(1)),
    `TraceFlags` UInt8,
    `SeverityText` LowCardinality(String) CODEC(ZSTD(1)),
    `SeverityNumber` UInt8,
    `ServiceName` LowCardinality(String) CODEC(ZSTD(1)),
    `Body` String CODEC(ZSTD(1)),
    `ResourceSchemaUrl` LowCardinality(String) CODEC(ZSTD(1)),
    `ResourceAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
    `ScopeSchemaUrl` LowCardinality(String) CODEC(ZSTD(1)),
    `ScopeName` String CODEC(ZSTD(1)),
    `ScopeVersion` LowCardinality(String) CODEC(ZSTD(1)),
    `ScopeAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)),
    `LogAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1))
)
ENGINE = Distributed('hdx_cluster', 'default', 'otel_logs_toStartOf', rand());

ALTER TABLE otel_logs_toStartOf ON CLUSTER hdx_cluster ADD INDEX text_idx(Body) 
	TYPE text(tokenizer=splitByNonAlpha, preprocessor=lower(Body))
	SETTINGS enable_full_text_index=1;

ALTER TABLE otel_logs_toStartOf ON CLUSTER hdx_cluster MATERIALIZE INDEX text_idx;
```
</details>

<details>
<summary>To test text index detection, first enable full text indexes locally in your users.xml file</summary>

```xml
<clickhouse>
    <profiles>
        <default>
            ...
            <enable_full_text_index>1</enable_full_text_index>
        </default>
    </profiles>
    ...
<clickhouse>
```
</details>

### References



- Linear Issue: Closes HDX-3703
- Related PRs:
This commit is contained in:
Drew Davis 2026-03-17 10:35:08 -04:00 committed by GitHub
parent 76453de5fa
commit 74d925949c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 336 additions and 14 deletions

View file

@ -0,0 +1,7 @@
---
"@hyperdx/common-utils": patch
"@hyperdx/api": patch
"@hyperdx/app": patch
---
feat: Support fetching table metadata for Distributed tables

View file

@ -1,6 +1,14 @@
import { useState } from 'react';
import { MetricsDataType, TSource } from '@hyperdx/common-utils/dist/types';
import { Modal, Paper, Tabs, Text, TextProps, Tooltip } from '@mantine/core';
import {
Modal,
Paper,
Stack,
Tabs,
Text,
TextProps,
Tooltip,
} from '@mantine/core';
import { IconCode, IconRefresh } from '@tabler/icons-react';
import { useTableMetadata } from '@/hooks/useMetadata';
@ -81,11 +89,30 @@ const TableSchemaPreview = ({
<IconRefresh className="spin-animate" />
</div>
) : (
<SQLPreview
data={data?.create_table_query ?? 'Schema is not available'}
enableCopy={!!data?.create_table_query}
copyButtonSize="xs"
/>
<Stack gap="sm">
{data?.create_local_table_query && (
<Text size="xs" fw={600} c="dimmed">
Distributed Table
</Text>
)}
<SQLPreview
data={data?.create_table_query ?? 'Schema is not available'}
enableCopy={!!data?.create_table_query}
copyButtonSize="xs"
/>
{data?.create_local_table_query && (
<>
<Text size="xs" fw={600} c="dimmed">
Local Table
</Text>
<SQLPreview
data={data.create_local_table_query}
enableCopy
copyButtonSize="xs"
/>
</>
)}
</Stack>
)}
</Paper>
);

View file

@ -524,6 +524,119 @@ describe('Metadata Integration Tests', () => {
});
});
describe('Distributed tables support', () => {
let metadata: Metadata;
const localTableName = 'test_dist_local';
const distributedTableName = 'test_dist';
beforeAll(async () => {
await client.command({
query: `CREATE OR REPLACE TABLE default.${localTableName} (
Timestamp DateTime64(9) CODEC(Delta(8), ZSTD(1)),
ServiceName LowCardinality(String) CODEC(ZSTD(1)),
Body String CODEC(ZSTD(1)),
ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 8,
INDEX idx_ts Timestamp TYPE minmax GRANULARITY 1
)
ENGINE = MergeTree()
PARTITION BY toDate(Timestamp)
ORDER BY (ServiceName, Timestamp)
`,
});
await client.command({
query: `CREATE OR REPLACE TABLE default.${distributedTableName}
AS default.${localTableName}
ENGINE = Distributed('hdx_cluster', 'default', '${localTableName}', rand())
`,
});
});
beforeEach(() => {
metadata = new Metadata(hdxClient, new MetadataCache());
});
afterAll(async () => {
await client.command({
query: `DROP TABLE IF EXISTS default.${distributedTableName}`,
});
await client.command({
query: `DROP TABLE IF EXISTS default.${localTableName}`,
});
});
it('should return local table keys for a distributed table', async () => {
const result = await metadata.getTableMetadata({
databaseName: 'default',
tableName: distributedTableName,
connectionId: 'test_connection',
});
// Should have the distributed table's create_table_query
expect(result.create_table_query).toContain(distributedTableName);
expect(result.create_table_query).toContain('Distributed');
// Should have the local table's create_table_query
expect(result.create_local_table_query).toContain(localTableName);
expect(result.create_local_table_query).toContain('MergeTree');
// Keys should come from the local table
expect(result.primary_key).toBe('ServiceName, Timestamp');
expect(result.sorting_key).toBe('ServiceName, Timestamp');
expect(result.partition_key).toBe('toDate(Timestamp)');
// Engine should be overridden with local table's engine
expect(result.engine).toBe('MergeTree');
});
it('should not set create_local_table_query for non-distributed tables', async () => {
const result = await metadata.getTableMetadata({
databaseName: 'default',
tableName: localTableName,
connectionId: 'test_connection',
});
expect(result.create_local_table_query).toBeUndefined();
expect(result.engine).toBe('MergeTree');
expect(result.primary_key).toBe('ServiceName, Timestamp');
});
it('should return skip indices from the local table when querying a distributed table', async () => {
const result = await metadata.getSkipIndices({
databaseName: 'default',
tableName: distributedTableName,
connectionId: 'test_connection',
});
expect(result).toHaveLength(2);
const bodyIndex = result.find(idx => idx.name === 'idx_body');
expect(bodyIndex).toBeDefined();
expect(bodyIndex!.type).toBe('tokenbf_v1');
expect(bodyIndex!.expression).toBe('Body');
expect(bodyIndex!.granularity).toBe(8);
const tsIndex = result.find(idx => idx.name === 'idx_ts');
expect(tsIndex).toBeDefined();
expect(tsIndex!.type).toBe('minmax');
expect(tsIndex!.granularity).toBe(1);
});
it('should return skip indices directly for a non-distributed table', async () => {
const result = await metadata.getSkipIndices({
databaseName: 'default',
tableName: localTableName,
connectionId: 'test_connection',
});
expect(result).toHaveLength(2);
expect(result.map(idx => idx.name)).toEqual(
expect.arrayContaining(['idx_body', 'idx_ts']),
);
});
});
describe('getSetting', () => {
let metadata: Metadata;
beforeEach(async () => {

View file

@ -18,6 +18,7 @@ import {
formatDate,
getAlignedDateRange,
getFirstOrderingItem,
getLocalTableFromDistributedTable,
isFirstOrderByAscending,
isJsonExpression,
isTimestampExpressionInFirstOrderBy,
@ -1912,4 +1913,67 @@ describe('utils', () => {
expect(result![1].name).toBe('service');
});
});
describe('getLocalTableFromDistributedTable', () => {
const makeMetadata = (engineFull: string) =>
({ engine_full: engineFull }) as any;
it('parses a simple Distributed engine_full', () => {
const result = getLocalTableFromDistributedTable(
makeMetadata("Distributed('default', 'mydb', 'local_table', rand())"),
);
expect(result).toEqual({ database: 'mydb', table: 'local_table' });
});
it('parses without a sharding key', () => {
const result = getLocalTableFromDistributedTable(
makeMetadata("Distributed('cluster', 'db', 'tbl')"),
);
expect(result).toEqual({ database: 'db', table: 'tbl' });
});
it('handles double-quoted identifiers', () => {
const result = getLocalTableFromDistributedTable(
makeMetadata('Distributed("cluster", "my_database", "my_table")'),
);
expect(result).toEqual({ database: 'my_database', table: 'my_table' });
});
it('handles backtick-quoted identifiers', () => {
const result = getLocalTableFromDistributedTable(
makeMetadata("Distributed('cluster', `mydb`, `local_tbl`, rand())"),
);
expect(result).toEqual({ database: 'mydb', table: 'local_tbl' });
});
it('handles unquoted identifiers', () => {
const result = getLocalTableFromDistributedTable(
makeMetadata('Distributed(cluster, mydb, local_tbl, rand())'),
);
expect(result).toEqual({ database: 'mydb', table: 'local_tbl' });
});
it('returns undefined when engine_full has fewer than 3 args', () => {
const result = getLocalTableFromDistributedTable(
makeMetadata("Distributed('cluster', 'db')"),
);
expect(result).toBeUndefined();
});
it('returns undefined when engine_full does not match Distributed pattern', () => {
const result = getLocalTableFromDistributedTable(
makeMetadata('MergeTree() ORDER BY id'),
);
expect(result).toBeUndefined();
});
it('handles a complex sharding expression with nested parentheses', () => {
const result = getLocalTableFromDistributedTable(
makeMetadata(
"Distributed('cluster', 'db', 'tbl', sipHash64(UserID, EventDate))",
),
);
expect(result).toEqual({ database: 'db', table: 'tbl' });
});
});
});

View file

@ -20,7 +20,7 @@ import type {
} from '@/types';
import { optimizeGetKeyValuesCalls } from './materializedViews';
import { objectHash } from './utils';
import { getLocalTableFromDistributedTable, objectHash } from './utils';
// If filters initially are taking too long to load, decrease this number.
// Between 1e6 - 5e6 is a good range.
@ -77,18 +77,27 @@ export type TableMetadata = {
database: string;
name: string;
uuid: string;
/** Note: This will contain the engine of the local table, when the table is Distributed */
engine: string;
is_temporary: number;
data_paths: string[];
metadata_path: string;
metadata_modification_time: string;
metadata_version: number;
/** Note: This may be a Distributed table. Use create_local_table_query for the local table's DDL. */
create_table_query: string;
/** DDL for the local (non-distributed) table, when the table is Distributed */
create_local_table_query?: string;
/** Note: This will contain the engine_full of the local table, when the table is Distributed */
engine_full: string;
as_select: string;
/** Note: This will contain the partition_key of the local table, when the table is Distributed */
partition_key: string;
/** Note: This will contain the sorting_key of the local table, when the table is Distributed */
sorting_key: string;
/** Note: This will contain the primary_key of the local table, when the table is Distributed */
primary_key: string;
/** Note: This will contain the sampling_key of the local table, when the table is Distributed */
sampling_key: string;
storage_policy: string;
total_rows: string;
@ -600,13 +609,56 @@ export class Metadata {
tableName: string;
connectionId: string;
}) {
const tableMetadata = await this.queryTableMetadata({
let tableMetadata = await this.queryTableMetadata({
cache: this.cache,
database: databaseName,
table: tableName,
connectionId,
});
// For Distributed tables, fetch metadata of the underlying local table to get correct partition key, sorting key, etc.
if (tableMetadata.engine === 'Distributed') {
try {
const { database, table } =
getLocalTableFromDistributedTable(tableMetadata) ?? {};
if (!database || !table) {
throw new Error(
`Could not parse underlying local table from Distributed table metadata: ${tableMetadata.create_table_query}`,
);
}
const localTableMetadata = await this.queryTableMetadata({
cache: this.cache,
database,
table,
connectionId,
});
// Override Distributed table metadata with local table metadata where relevant
tableMetadata = {
...tableMetadata,
...pick(localTableMetadata, [
// Distributed tables have these, but we make use of the
// underlying local table's engine value for optimizations instead.
'engine',
'engine_full',
// Distributed tables never have these, so we'll use the local table's
'partition_key',
'sorting_key',
'primary_key',
'sampling_key',
]),
create_local_table_query: localTableMetadata.create_table_query,
};
} catch (e) {
console.error(
'Failed to fetch underlying table metadata for Distributed table, using Distributed table metadata as fallback',
e,
);
}
}
// partition_key which includes parenthesis, unlike other keys such as 'primary_key' or 'sorting_key'
if (
tableMetadata.partition_key.startsWith('(') &&
@ -714,6 +766,38 @@ export class Metadata {
return this.cache.getOrFetch<SkipIndexMetadata[]>(
`${connectionId}.${databaseName}.${tableName}.skipIndices`,
async () => {
const tableMetadata = await this.queryTableMetadata({
cache: this.cache,
database: databaseName,
table: tableName,
connectionId,
});
let localDatabase = databaseName;
let localTable = tableName;
// For Distributed tables, fetch skip indices on the underlying local table.
if (tableMetadata.engine === 'Distributed') {
try {
const { database, table } =
getLocalTableFromDistributedTable(tableMetadata) ?? {};
if (!database || !table) {
throw new Error(
`Could not parse local table from Distributed table metadata: ${tableMetadata.create_table_query}`,
);
}
localDatabase = database;
localTable = table;
} catch (e) {
console.error(
'Failed to get local table name for Distributed table, using Distributed table as fallback',
e,
);
}
}
const sql = chSql`
SELECT
name,
@ -722,21 +806,27 @@ export class Metadata {
expr as expression,
granularity
FROM system.data_skipping_indices
WHERE database = ${{ String: databaseName }}
AND table = ${{ String: tableName }}
WHERE database = ${{ String: localDatabase }}
AND table = ${{ String: localTable }}
`;
try {
const json = await this.clickhouseClient
const data = await this.clickhouseClient
.query<'JSON'>({
connectionId,
query: sql.sql,
query_params: sql.params,
clickhouse_settings: this.getClickHouseSettings(),
})
.then(res => res.json<SkipIndexMetadata>());
.then(res => res.json<SkipIndexMetadata>())
.then(d => {
return d.data.map(row => ({
...row,
granularity: Number(row.granularity), // granularity comes back as string, convert to number for easier usage
}));
});
return json.data;
return data;
} catch (e) {
// Don't retry permissions errors, just silently return empty array
if (

View file

@ -22,7 +22,7 @@ import {
TSourceUnion,
} from '@/types';
import { SkipIndexMetadata } from './metadata';
import { SkipIndexMetadata, TableMetadata } from './metadata';
/** The default maximum number of buckets setting when determining a bucket duration for 'auto' granularity */
export const DEFAULT_AUTO_GRANULARITY_MAX_BUCKETS = 60;
@ -981,3 +981,24 @@ export function aliasMapToWithClauses(
return withClauses.length > 0 ? withClauses : undefined;
}
/** Parses and returns the local table and database name from the given distributed table metadata */
export function getLocalTableFromDistributedTable(
tableMetadata: TableMetadata,
): { database: string; table: string } | undefined {
const args = tableMetadata.engine_full.match(/Distributed\((.+)\)$/)?.[1];
const splitArgs = splitAndTrimWithBracket(args ?? '');
if (splitArgs.length < 3) {
console.error(
`Failed to parse engine_full for Distributed table: ${tableMetadata.engine_full}`,
);
return undefined;
}
// Remove surrounding quotes
const localDatabase = splitArgs[1].replace(/^["'`]|["'`]$/g, '');
const localTable = splitArgs[2].replace(/^["'`]|["'`]$/g, '');
return { database: localDatabase, table: localTable };
}