mirror of
https://github.com/hyperdxio/hyperdx
synced 2026-04-21 13:37:15 +00:00
feat: Support fetching distributed table metadata with cluster() (#1944)
## Summary Some Distributed tables refer to "local" tables which are not available on the local node. To read metadata (primary key, skip indexes) for such distributed tables, we can read from `cluster(<cluster with local tables>, system, <metadata table>)` instead of the local system tables. ### Screenshots or video After adding the distributed table as a source, we can see that the order by optimization and the skip index detection are working as intended, indicating that the cluster() queries are working and fetching the required metadata: <img width="2099" height="624" alt="Screenshot 2026-03-19 at 8 39 48 AM" src="https://github.com/user-attachments/assets/1f8384fb-8ae1-4549-9432-e4359ac72e02" /> ### How to test locally or on Vercel <details> <summary>First, setup two clusters</summary> - docker compose: https://pastila.clickhouse.com/?003644f9/c65444330e3601726c00b7cc9e095e71#7W62EjQox6MnTj0vCGL0AA==GCM - config.xml: https://pastila.clickhouse.com/?002ee55c/d82248e8db633b3fbaf14cee2ee51b0e#royNZZ4snbBpZUd8xulw5w==GCM - config-2.xml: https://pastila.clickhouse.com/?009f57b4/cf1d51fa36eee025f17beda4da6621fa#KBbHphEhcS+1m7mBqNfY4A==GCM - config-3.xml: https://pastila.clickhouse.com/?003115c7/e984fc157de834095bedea86bc698dca#1rEmfXnq6H0tiT4qNgayNg==GCM - keeper.xml: https://pastila.clickhouse.com/?005dc0a8/1599254d15dbac2868f04f5ab33125c2#R90W3HfA3J0yeTNf9hrDNQ==GCM </details> <details> <summary>Then setup the local and distributed tables</summary> ```sql CREATE TABLE default.otel_logs_toStartOf on cluster hdx_cluster_2 ( `Timestamp` DateTime64(9) CODEC(Delta(8), ZSTD(1)), `TimestampTime` DateTime DEFAULT toDateTime(Timestamp), `TraceId` String CODEC(ZSTD(1)), `SpanId` String CODEC(ZSTD(1)), `TraceFlags` UInt8, `SeverityText` LowCardinality(String) CODEC(ZSTD(1)), `SeverityNumber` UInt8, `ServiceName` LowCardinality(String) CODEC(ZSTD(1)), `Body` String CODEC(ZSTD(1)), `ResourceSchemaUrl` LowCardinality(String) CODEC(ZSTD(1)), `ResourceAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), `ScopeSchemaUrl` LowCardinality(String) CODEC(ZSTD(1)), `ScopeName` String CODEC(ZSTD(1)), `ScopeVersion` LowCardinality(String) CODEC(ZSTD(1)), `ScopeAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), `LogAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_scope_attr_key mapKeys(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_scope_attr_value mapValues(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_log_attr_key mapKeys(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_log_attr_value mapValues(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 8 ) ENGINE = MergeTree PARTITION BY toDate(TimestampTime) PRIMARY KEY (toStartOfMinute(TimestampTime), ServiceName, TimestampTime) ORDER BY (toStartOfMinute(TimestampTime), ServiceName, TimestampTime, Timestamp) TTL TimestampTime + toIntervalDay(30) SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; CREATE TABLE default.otel_logs_toStartOf_distributed on cluster hdx_cluster ( `Timestamp` DateTime64(9) CODEC(Delta(8), ZSTD(1)), `TimestampTime` DateTime DEFAULT toDateTime(Timestamp), `TraceId` String CODEC(ZSTD(1)), `SpanId` String CODEC(ZSTD(1)), `TraceFlags` UInt8, `SeverityText` LowCardinality(String) CODEC(ZSTD(1)), `SeverityNumber` UInt8, `ServiceName` LowCardinality(String) CODEC(ZSTD(1)), `Body` String CODEC(ZSTD(1)), `ResourceSchemaUrl` LowCardinality(String) CODEC(ZSTD(1)), `ResourceAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), `ScopeSchemaUrl` LowCardinality(String) CODEC(ZSTD(1)), `ScopeName` String CODEC(ZSTD(1)), `ScopeVersion` LowCardinality(String) CODEC(ZSTD(1)), `ScopeAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), `LogAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)) ) ENGINE = Distributed('hdx_cluster_2', 'default', 'otel_logs_toStartOf', rand()); ALTER TABLE otel_logs_toStartOf ON CLUSTER hdx_cluster_2 ADD INDEX text_idx(Body) TYPE text(tokenizer=splitByNonAlpha, preprocessor=lower(Body)) SETTINGS enable_full_text_index=1; ALTER TABLE otel_logs_toStartOf ON CLUSTER hdx_cluster_2 MATERIALIZE INDEX text_idx; ``` </details> <details> <summary>To test text index detection, first enable full text indexes locally in your users.xml file</summary> ```xml <clickhouse> <profiles> <default> ... <enable_full_text_index>1</enable_full_text_index> </default> </profiles> ... <clickhouse> ``` </details> ### References - Linear Issue: - Related PRs:
This commit is contained in:
parent
66445af1fa
commit
243e3baa26
5 changed files with 320 additions and 91 deletions
5
.changeset/tidy-phones-brake.md
Normal file
5
.changeset/tidy-phones-brake.md
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
"@hyperdx/common-utils": patch
|
||||
---
|
||||
|
||||
feat: Support fetching distributed table metadata with cluster()
|
||||
|
|
@ -199,6 +199,62 @@ describe('Metadata', () => {
|
|||
expect(result!.partition_key).toEqual('column1');
|
||||
});
|
||||
|
||||
it('should query via cluster() for Distributed table underlying metadata', async () => {
|
||||
const distributedMetadata = {
|
||||
database: 'test_db',
|
||||
name: 'dist_table',
|
||||
engine: 'Distributed',
|
||||
engine_full:
|
||||
"Distributed('my_cluster', 'test_db', 'local_table', rand())",
|
||||
partition_key: '',
|
||||
sorting_key: '',
|
||||
primary_key: '',
|
||||
sampling_key: '',
|
||||
create_table_query: 'CREATE TABLE test_db.dist_table ...',
|
||||
};
|
||||
|
||||
const localMetadata = {
|
||||
database: 'test_db',
|
||||
name: 'local_table',
|
||||
engine: 'MergeTree',
|
||||
engine_full: 'MergeTree() ORDER BY id',
|
||||
partition_key: 'toYYYYMM(timestamp)',
|
||||
sorting_key: 'id, timestamp',
|
||||
primary_key: 'id',
|
||||
sampling_key: '',
|
||||
create_table_query: 'CREATE TABLE test_db.local_table ...',
|
||||
};
|
||||
|
||||
let callCount = 0;
|
||||
(mockClickhouseClient.query as jest.Mock).mockImplementation(() => {
|
||||
callCount++;
|
||||
return Promise.resolve({
|
||||
json: jest.fn().mockResolvedValue({
|
||||
data: [callCount === 1 ? distributedMetadata : localMetadata],
|
||||
}),
|
||||
});
|
||||
});
|
||||
|
||||
const result = await metadata.getTableMetadata({
|
||||
databaseName: 'test_db',
|
||||
tableName: 'dist_table',
|
||||
connectionId: 'test_connection',
|
||||
});
|
||||
|
||||
// Two queries: one for the distributed table, one via cluster() for the local table
|
||||
expect(callCount).toBe(2);
|
||||
expect(result!.engine).toBe('MergeTree');
|
||||
expect(result!.sorting_key).toBe('id, timestamp');
|
||||
expect(result!.create_local_table_query).toBe(
|
||||
'CREATE TABLE test_db.local_table ...',
|
||||
);
|
||||
// The second query should use cluster() - verify it references system.tables via cluster
|
||||
const secondQuery = (mockClickhouseClient.query as jest.Mock).mock
|
||||
.calls[1][0].query;
|
||||
expect(secondQuery).toContain('cluster(');
|
||||
expect(secondQuery).toContain('system.tables');
|
||||
});
|
||||
|
||||
it('should use the cache when retrieving table metadata', async () => {
|
||||
// Setup the mock implementation
|
||||
mockCache.getOrFetch.mockReset();
|
||||
|
|
@ -213,7 +269,7 @@ describe('Metadata', () => {
|
|||
|
||||
// Setup the cache to return the mock data
|
||||
mockCache.getOrFetch.mockImplementation((key, queryFn) => {
|
||||
if (key === 'test_connection.test_db.test_table.metadata') {
|
||||
if (key === 'test_connection.test_db.test_table.undefined.metadata') {
|
||||
return Promise.resolve(mockTableMetadata);
|
||||
}
|
||||
return queryFn();
|
||||
|
|
@ -227,7 +283,7 @@ describe('Metadata', () => {
|
|||
|
||||
// Verify the cache was called with the right key
|
||||
expect(mockCache.getOrFetch).toHaveBeenCalledWith(
|
||||
'test_connection.test_db.test_table.metadata',
|
||||
'test_connection.test_db.test_table.undefined.metadata',
|
||||
expect.any(Function),
|
||||
);
|
||||
|
||||
|
|
@ -239,6 +295,117 @@ describe('Metadata', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('getSkipIndices', () => {
|
||||
beforeEach(() => {
|
||||
mockCache.getOrFetch.mockImplementation((key, queryFn) => queryFn());
|
||||
});
|
||||
|
||||
it('should query via cluster() for Distributed table skip indices', async () => {
|
||||
const distributedMetadata = {
|
||||
database: 'test_db',
|
||||
name: 'dist_table',
|
||||
engine: 'Distributed',
|
||||
engine_full:
|
||||
"Distributed('my_cluster', 'test_db', 'local_table', rand())",
|
||||
create_table_query: 'CREATE TABLE test_db.dist_table ...',
|
||||
};
|
||||
|
||||
const skipIndicesData = [
|
||||
{
|
||||
name: 'idx_body',
|
||||
type: 'tokenbf_v1',
|
||||
typeFull: "tokenbf_v1(tokenizer='splitByNonAlpha')",
|
||||
expression: 'tokens(lower(Body))',
|
||||
granularity: '1',
|
||||
},
|
||||
];
|
||||
|
||||
let callCount = 0;
|
||||
(mockClickhouseClient.query as jest.Mock).mockImplementation(() => {
|
||||
callCount++;
|
||||
return Promise.resolve({
|
||||
json: jest.fn().mockResolvedValue({
|
||||
data: callCount === 1 ? [distributedMetadata] : skipIndicesData,
|
||||
}),
|
||||
});
|
||||
});
|
||||
|
||||
const result = await metadata.getSkipIndices({
|
||||
databaseName: 'test_db',
|
||||
tableName: 'dist_table',
|
||||
connectionId: 'test_connection',
|
||||
});
|
||||
|
||||
// Two queries: one for table metadata, one via cluster() for skip indices
|
||||
expect(callCount).toBe(2);
|
||||
expect(result).toEqual([
|
||||
{
|
||||
name: 'idx_body',
|
||||
type: 'tokenbf_v1',
|
||||
typeFull: "tokenbf_v1(tokenizer='splitByNonAlpha')",
|
||||
expression: 'tokens(lower(Body))',
|
||||
granularity: 1,
|
||||
},
|
||||
]);
|
||||
// The second query should use cluster() for system.data_skipping_indices
|
||||
const secondQuery = (mockClickhouseClient.query as jest.Mock).mock
|
||||
.calls[1][0].query;
|
||||
expect(secondQuery).toContain('cluster(');
|
||||
expect(secondQuery).toContain('system.data_skipping_indices');
|
||||
});
|
||||
|
||||
it('should query local system.data_skipping_indices for non-Distributed tables', async () => {
|
||||
const mergeTreeMetadata = {
|
||||
database: 'test_db',
|
||||
name: 'local_table',
|
||||
engine: 'MergeTree',
|
||||
engine_full: 'MergeTree() ORDER BY id',
|
||||
};
|
||||
|
||||
const skipIndicesData = [
|
||||
{
|
||||
name: 'idx_body',
|
||||
type: 'tokenbf_v1',
|
||||
typeFull: "tokenbf_v1(tokenizer='splitByNonAlpha')",
|
||||
expression: 'tokens(lower(Body))',
|
||||
granularity: '1',
|
||||
},
|
||||
];
|
||||
|
||||
let callCount = 0;
|
||||
(mockClickhouseClient.query as jest.Mock).mockImplementation(() => {
|
||||
callCount++;
|
||||
return Promise.resolve({
|
||||
json: jest.fn().mockResolvedValue({
|
||||
data: callCount === 1 ? [mergeTreeMetadata] : skipIndicesData,
|
||||
}),
|
||||
});
|
||||
});
|
||||
|
||||
const result = await metadata.getSkipIndices({
|
||||
databaseName: 'test_db',
|
||||
tableName: 'local_table',
|
||||
connectionId: 'test_connection',
|
||||
});
|
||||
|
||||
expect(callCount).toBe(2);
|
||||
expect(result).toEqual([
|
||||
{
|
||||
name: 'idx_body',
|
||||
type: 'tokenbf_v1',
|
||||
typeFull: "tokenbf_v1(tokenizer='splitByNonAlpha')",
|
||||
expression: 'tokens(lower(Body))',
|
||||
granularity: 1,
|
||||
},
|
||||
]);
|
||||
// Should NOT use cluster() for non-Distributed tables
|
||||
const secondQuery = (mockClickhouseClient.query as jest.Mock).mock
|
||||
.calls[1][0].query;
|
||||
expect(secondQuery).not.toContain('cluster(');
|
||||
expect(secondQuery).toContain('system.data_skipping_indices');
|
||||
});
|
||||
});
|
||||
|
||||
describe('getKeyValues', () => {
|
||||
const mockChartConfig: BuilderChartConfigWithDateRange = {
|
||||
from: {
|
||||
|
|
|
|||
|
|
@ -17,8 +17,8 @@ import {
|
|||
findJsonExpressions,
|
||||
formatDate,
|
||||
getAlignedDateRange,
|
||||
getDistributedTableArgs,
|
||||
getFirstOrderingItem,
|
||||
getLocalTableFromDistributedTable,
|
||||
isFirstOrderByAscending,
|
||||
isJsonExpression,
|
||||
isTimestampExpressionInFirstOrderBy,
|
||||
|
|
@ -30,7 +30,6 @@ import {
|
|||
replaceJsonExpressions,
|
||||
splitAndTrimCSV,
|
||||
splitAndTrimWithBracket,
|
||||
TextIndexTokenizer,
|
||||
} from '../core/utils';
|
||||
|
||||
describe('utils', () => {
|
||||
|
|
@ -1996,61 +1995,85 @@ describe('utils', () => {
|
|||
({ engine_full: engineFull }) as any;
|
||||
|
||||
it('parses a simple Distributed engine_full', () => {
|
||||
const result = getLocalTableFromDistributedTable(
|
||||
const result = getDistributedTableArgs(
|
||||
makeMetadata("Distributed('default', 'mydb', 'local_table', rand())"),
|
||||
);
|
||||
expect(result).toEqual({ database: 'mydb', table: 'local_table' });
|
||||
expect(result).toEqual({
|
||||
cluster: 'default',
|
||||
database: 'mydb',
|
||||
table: 'local_table',
|
||||
});
|
||||
});
|
||||
|
||||
it('parses without a sharding key', () => {
|
||||
const result = getLocalTableFromDistributedTable(
|
||||
const result = getDistributedTableArgs(
|
||||
makeMetadata("Distributed('cluster', 'db', 'tbl')"),
|
||||
);
|
||||
expect(result).toEqual({ database: 'db', table: 'tbl' });
|
||||
expect(result).toEqual({
|
||||
cluster: 'cluster',
|
||||
database: 'db',
|
||||
table: 'tbl',
|
||||
});
|
||||
});
|
||||
|
||||
it('handles double-quoted identifiers', () => {
|
||||
const result = getLocalTableFromDistributedTable(
|
||||
const result = getDistributedTableArgs(
|
||||
makeMetadata('Distributed("cluster", "my_database", "my_table")'),
|
||||
);
|
||||
expect(result).toEqual({ database: 'my_database', table: 'my_table' });
|
||||
expect(result).toEqual({
|
||||
cluster: 'cluster',
|
||||
database: 'my_database',
|
||||
table: 'my_table',
|
||||
});
|
||||
});
|
||||
|
||||
it('handles backtick-quoted identifiers', () => {
|
||||
const result = getLocalTableFromDistributedTable(
|
||||
const result = getDistributedTableArgs(
|
||||
makeMetadata("Distributed('cluster', `mydb`, `local_tbl`, rand())"),
|
||||
);
|
||||
expect(result).toEqual({ database: 'mydb', table: 'local_tbl' });
|
||||
expect(result).toEqual({
|
||||
cluster: 'cluster',
|
||||
database: 'mydb',
|
||||
table: 'local_tbl',
|
||||
});
|
||||
});
|
||||
|
||||
it('handles unquoted identifiers', () => {
|
||||
const result = getLocalTableFromDistributedTable(
|
||||
const result = getDistributedTableArgs(
|
||||
makeMetadata('Distributed(cluster, mydb, local_tbl, rand())'),
|
||||
);
|
||||
expect(result).toEqual({ database: 'mydb', table: 'local_tbl' });
|
||||
expect(result).toEqual({
|
||||
cluster: 'cluster',
|
||||
database: 'mydb',
|
||||
table: 'local_tbl',
|
||||
});
|
||||
});
|
||||
|
||||
it('returns undefined when engine_full has fewer than 3 args', () => {
|
||||
const result = getLocalTableFromDistributedTable(
|
||||
const result = getDistributedTableArgs(
|
||||
makeMetadata("Distributed('cluster', 'db')"),
|
||||
);
|
||||
expect(result).toBeUndefined();
|
||||
});
|
||||
|
||||
it('returns undefined when engine_full does not match Distributed pattern', () => {
|
||||
const result = getLocalTableFromDistributedTable(
|
||||
const result = getDistributedTableArgs(
|
||||
makeMetadata('MergeTree() ORDER BY id'),
|
||||
);
|
||||
expect(result).toBeUndefined();
|
||||
});
|
||||
|
||||
it('handles a complex sharding expression with nested parentheses', () => {
|
||||
const result = getLocalTableFromDistributedTable(
|
||||
const result = getDistributedTableArgs(
|
||||
makeMetadata(
|
||||
"Distributed('cluster', 'db', 'tbl', sipHash64(UserID, EventDate))",
|
||||
),
|
||||
);
|
||||
expect(result).toEqual({ database: 'db', table: 'tbl' });
|
||||
expect(result).toEqual({
|
||||
cluster: 'cluster',
|
||||
database: 'db',
|
||||
table: 'tbl',
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ import type {
|
|||
import { SourceKind } from '@/types';
|
||||
|
||||
import { optimizeGetKeyValuesCalls } from './materializedViews';
|
||||
import { getLocalTableFromDistributedTable, objectHash } from './utils';
|
||||
import { getDistributedTableArgs, objectHash } from './utils';
|
||||
|
||||
// If filters initially are taking too long to load, decrease this number.
|
||||
// Between 1e6 - 5e6 is a good range.
|
||||
|
|
@ -149,27 +149,78 @@ export class Metadata {
|
|||
table,
|
||||
cache,
|
||||
connectionId,
|
||||
cluster,
|
||||
}: {
|
||||
database: string;
|
||||
table: string;
|
||||
cache: MetadataCache;
|
||||
connectionId: string;
|
||||
cluster?: string;
|
||||
}): Promise<TableMetadata | undefined> {
|
||||
return cache.getOrFetch(
|
||||
`${connectionId}.${database}.${table}.metadata`,
|
||||
async () => {
|
||||
const sql = chSql`SELECT * FROM system.tables where database = ${{ String: database }} AND name = ${{ String: table }}`;
|
||||
const json = await this.clickhouseClient
|
||||
.query<'JSON'>({
|
||||
connectionId,
|
||||
query: sql.sql,
|
||||
query_params: sql.params,
|
||||
clickhouse_settings: this.getClickHouseSettings(),
|
||||
})
|
||||
.then(res => res.json<TableMetadata>());
|
||||
return json.data[0];
|
||||
},
|
||||
);
|
||||
const cacheKey = `${connectionId}.${database}.${table}.${cluster}.metadata`;
|
||||
return cache.getOrFetch(cacheKey, async () => {
|
||||
const sql = cluster
|
||||
? chSql`SELECT * FROM cluster(${{ String: cluster }}, system.tables) WHERE database = ${{ String: database }} AND name = ${{ String: table }} LIMIT 1`
|
||||
: chSql`SELECT * FROM system.tables WHERE database = ${{ String: database }} AND name = ${{ String: table }} LIMIT 1`;
|
||||
const json = await this.clickhouseClient
|
||||
.query<'JSON'>({
|
||||
connectionId,
|
||||
query: sql.sql,
|
||||
query_params: sql.params,
|
||||
clickhouse_settings: this.getClickHouseSettings(),
|
||||
})
|
||||
.then(res => res.json<TableMetadata>());
|
||||
return json.data[0];
|
||||
});
|
||||
}
|
||||
|
||||
private async querySkipIndices({
|
||||
database,
|
||||
table,
|
||||
connectionId,
|
||||
cluster,
|
||||
}: {
|
||||
database: string;
|
||||
table: string;
|
||||
connectionId: string;
|
||||
cluster?: string;
|
||||
}): Promise<SkipIndexMetadata[]> {
|
||||
const sql = cluster
|
||||
? chSql`
|
||||
SELECT
|
||||
name,
|
||||
type,
|
||||
type_full as typeFull,
|
||||
expr as expression,
|
||||
granularity
|
||||
FROM cluster(${{ String: cluster }}, system.data_skipping_indices)
|
||||
WHERE database = ${{ String: database }} AND table = ${{ String: table }}`
|
||||
: chSql`
|
||||
SELECT
|
||||
name,
|
||||
type,
|
||||
type_full as typeFull,
|
||||
expr as expression,
|
||||
granularity
|
||||
FROM system.data_skipping_indices
|
||||
WHERE database = ${{ String: database }} AND table = ${{ String: table }}`;
|
||||
|
||||
const data = await this.clickhouseClient
|
||||
.query<'JSON'>({
|
||||
connectionId,
|
||||
query: sql.sql,
|
||||
query_params: sql.params,
|
||||
clickhouse_settings: this.getClickHouseSettings(),
|
||||
})
|
||||
.then(res => res.json<SkipIndexMetadata>())
|
||||
.then(d => {
|
||||
return d.data.map(row => ({
|
||||
...row,
|
||||
granularity: Number(row.granularity),
|
||||
}));
|
||||
});
|
||||
|
||||
return data;
|
||||
}
|
||||
|
||||
/** Queries and returns the list of materialized views which insert into the given target table */
|
||||
|
|
@ -620,22 +671,30 @@ export class Metadata {
|
|||
// For Distributed tables, fetch metadata of the underlying local table to get correct partition key, sorting key, etc.
|
||||
if (tableMetadata?.engine === 'Distributed') {
|
||||
try {
|
||||
const { database, table } =
|
||||
getLocalTableFromDistributedTable(tableMetadata) ?? {};
|
||||
const { cluster, database, table } =
|
||||
getDistributedTableArgs(tableMetadata) ?? {};
|
||||
|
||||
if (!database || !table) {
|
||||
if (!database || !table || !cluster) {
|
||||
throw new Error(
|
||||
`Could not parse underlying local table from Distributed table metadata: ${tableMetadata.create_table_query}`,
|
||||
);
|
||||
}
|
||||
|
||||
// Query local table metadata from the specified cluster
|
||||
const localTableMetadata = await this.queryTableMetadata({
|
||||
cache: this.cache,
|
||||
database,
|
||||
table,
|
||||
cluster,
|
||||
connectionId,
|
||||
});
|
||||
|
||||
if (!localTableMetadata) {
|
||||
throw new Error(
|
||||
`Could not find underlying local table metadata for Distributed table: ${database}.${table}`,
|
||||
);
|
||||
}
|
||||
|
||||
// Override Distributed table metadata with local table metadata where relevant
|
||||
tableMetadata = {
|
||||
...tableMetadata,
|
||||
|
|
@ -774,60 +833,33 @@ export class Metadata {
|
|||
connectionId,
|
||||
});
|
||||
|
||||
let localDatabase = databaseName;
|
||||
let localTable = tableName;
|
||||
let database = databaseName;
|
||||
let table = tableName;
|
||||
let cluster: string | undefined;
|
||||
|
||||
// For Distributed tables, fetch skip indices on the underlying local table.
|
||||
// For Distributed tables, query skip indices on the underlying local
|
||||
// table via the cluster() function so we reach the correct cluster.
|
||||
if (tableMetadata?.engine === 'Distributed') {
|
||||
try {
|
||||
const { database, table } =
|
||||
getLocalTableFromDistributedTable(tableMetadata) ?? {};
|
||||
const parsed = getDistributedTableArgs(tableMetadata);
|
||||
|
||||
if (!database || !table) {
|
||||
throw new Error(
|
||||
`Could not parse local table from Distributed table metadata: ${tableMetadata.create_table_query}`,
|
||||
);
|
||||
}
|
||||
|
||||
localDatabase = database;
|
||||
localTable = table;
|
||||
} catch (e) {
|
||||
if (!parsed) {
|
||||
console.error(
|
||||
'Failed to get local table name for Distributed table, using Distributed table as fallback',
|
||||
e,
|
||||
`Could not parse local table from Distributed table metadata: ${tableMetadata.create_table_query}`,
|
||||
);
|
||||
} else {
|
||||
database = parsed.database;
|
||||
table = parsed.table;
|
||||
cluster = parsed.cluster;
|
||||
}
|
||||
}
|
||||
|
||||
const sql = chSql`
|
||||
SELECT
|
||||
name,
|
||||
type,
|
||||
type_full as typeFull,
|
||||
expr as expression,
|
||||
granularity
|
||||
FROM system.data_skipping_indices
|
||||
WHERE database = ${{ String: localDatabase }}
|
||||
AND table = ${{ String: localTable }}
|
||||
`;
|
||||
|
||||
try {
|
||||
const data = await this.clickhouseClient
|
||||
.query<'JSON'>({
|
||||
connectionId,
|
||||
query: sql.sql,
|
||||
query_params: sql.params,
|
||||
clickhouse_settings: this.getClickHouseSettings(),
|
||||
})
|
||||
.then(res => res.json<SkipIndexMetadata>())
|
||||
.then(d => {
|
||||
return d.data.map(row => ({
|
||||
...row,
|
||||
granularity: Number(row.granularity), // granularity comes back as string, convert to number for easier usage
|
||||
}));
|
||||
});
|
||||
|
||||
return data;
|
||||
return await this.querySkipIndices({
|
||||
database,
|
||||
table,
|
||||
connectionId,
|
||||
cluster,
|
||||
});
|
||||
} catch (e) {
|
||||
// Don't retry permissions errors, just silently return empty array
|
||||
if (
|
||||
|
|
|
|||
|
|
@ -986,23 +986,25 @@ export function aliasMapToWithClauses(
|
|||
return withClauses.length > 0 ? withClauses : undefined;
|
||||
}
|
||||
|
||||
/** Parses and returns the local table and database name from the given distributed table metadata */
|
||||
export function getLocalTableFromDistributedTable(
|
||||
const stripQuotes = (s: string) => s.replace(/^["'`]|["'`]$/g, '');
|
||||
|
||||
/** Parses and returns the cluster, database, and table from the given distributed table metadata */
|
||||
export function getDistributedTableArgs(
|
||||
tableMetadata: TableMetadata,
|
||||
): { database: string; table: string } | undefined {
|
||||
): { cluster: string; database: string; table: string } | undefined {
|
||||
const args = tableMetadata.engine_full.match(/Distributed\((.+)\)$/)?.[1];
|
||||
const splitArgs = splitAndTrimWithBracket(args ?? '');
|
||||
|
||||
if (splitArgs.length < 3) {
|
||||
console.error(
|
||||
`Failed to parse engine_full for Distributed table: ${tableMetadata.engine_full}`,
|
||||
`Failed to parse engine arguments for Distributed table: ${tableMetadata.engine_full}`,
|
||||
);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
// Remove surrounding quotes
|
||||
const localDatabase = splitArgs[1].replace(/^["'`]|["'`]$/g, '');
|
||||
const localTable = splitArgs[2].replace(/^["'`]|["'`]$/g, '');
|
||||
|
||||
return { database: localDatabase, table: localTable };
|
||||
return {
|
||||
cluster: stripQuotes(splitArgs[0]),
|
||||
database: stripQuotes(splitArgs[1]),
|
||||
table: stripQuotes(splitArgs[2]),
|
||||
};
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue