feat: use clickhouse client lib by default for queries (#776)

Still uses the old fetch method on local mode

Ref: HDX-1630
Ref: HDX-1653
This commit is contained in:
Aaron Knudtson 2025-04-25 15:08:56 -04:00 committed by GitHub
parent 8dc83c3d4e
commit cfdd523516
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 113 additions and 109 deletions

View file

@ -0,0 +1,6 @@
---
"@hyperdx/common-utils": patch
"@hyperdx/app": patch
---
feat: clickhouse queries are by default conducted through the clickhouse library via POST request. localMode still uses GET for CORS purposes

View file

@ -7,6 +7,7 @@ import {
useQueryState,
} from 'nuqs';
import { useForm } from 'react-hook-form';
import { DataFormat } from '@clickhouse/client-common';
import { DisplayType } from '@hyperdx/common-utils/dist/types';
import {
Button,
@ -56,7 +57,7 @@ function useBenchmarkQueryIds({
.query({
query: shuffledQueries[j],
connectionId: connections[j],
format: 'NULL',
format: 'NULL' as DataFormat, // clickhouse doesn't have this under the client-js lib for some reason
clickhouse_settings: {
min_bytes_to_use_direct_io: '1',
use_query_cache: 0,
@ -133,7 +134,7 @@ function useIndexes(
clickhouseClient
.query({
query: `EXPLAIN indexes=1, json=1, description = 0 ${query}`,
format: 'TSVRaw',
format: 'TabSeparatedRaw',
connectionId: connections[i],
})
.then(res => res.text())

View file

@ -240,7 +240,7 @@ class FatalError extends Error {}
class TimeoutError extends Error {}
const EventStreamContentType = 'text/event-stream';
async function* streamToAsyncIterator<T>(
async function* streamToAsyncIterator<T = any>(
stream: ReadableStream<T>,
): AsyncIterableIterator<T> {
const reader = stream.getReader();
@ -372,52 +372,15 @@ export function useRRWebEventStream(
metadata,
);
// TODO: Change ClickhouseClient class to use this under the hood,
// and refactor this to use ClickhouseClient.query. Also change pathname
// in createClient to PROXY_CLICKHOUSE_HOST instead
const format = 'JSONEachRow';
const queryFn = async () => {
if (IS_LOCAL_MODE) {
const localConnections = getLocalConnections();
const localModeUrl = new URL(localConnections[0].host);
localModeUrl.username = localConnections[0].username;
localModeUrl.password = localConnections[0].password;
const clickhouseClient = getClickhouseClient();
return clickhouseClient.query({
query: query.sql,
query_params: query.params,
format,
});
} else {
const clickhouseClient = createClient({
clickhouse_settings: {
add_http_cors_header: IS_LOCAL_MODE ? 1 : 0,
cancel_http_readonly_queries_on_client_close: 1,
date_time_output_format: 'iso',
wait_end_of_query: 0,
},
http_headers: { 'x-hyperdx-connection-id': source.connection },
keep_alive: {
enabled: true,
},
url: window.location.origin,
pathname: '/api/clickhouse-proxy',
compression: {
response: true,
},
});
return clickhouseClient.query({
query: query.sql,
query_params: query.params,
format,
});
}
};
const fetchPromise = (async () => {
const resultSet = await queryFn();
const clickhouseClient = getClickhouseClient();
const resultSet = await clickhouseClient.query({
query: query.sql,
query_params: query.params,
format,
connectionId: source.connection,
});
let forFunc: (data: any) => void;
if (onEvent) {

View file

@ -345,9 +345,9 @@ export class ClickhouseClient {
}
// https://github.com/ClickHouse/clickhouse-js/blob/1ebdd39203730bb99fad4c88eac35d9a5e96b34a/packages/client-web/src/connection/web_connection.ts#L151
async query<T extends DataFormat>({
async query<Format extends DataFormat>({
query,
format = 'JSON',
format = 'JSON' as Format,
query_params = {},
abort_signal,
clickhouse_settings,
@ -355,37 +355,13 @@ export class ClickhouseClient {
queryId,
}: {
query: string;
format?: string;
format?: Format;
abort_signal?: AbortSignal;
query_params?: Record<string, any>;
clickhouse_settings?: Record<string, any>;
connectionId?: string;
queryId?: string;
}): Promise<BaseResultSet<ReadableStream, T>> {
const isLocalMode = this.username != null && this.password != null;
const includeCredentials = !isLocalMode;
const includeCorsHeader = isLocalMode;
const searchParams = new URLSearchParams([
...(includeCorsHeader ? [['add_http_cors_header', '1']] : []),
['query', query],
['default_format', format],
['date_time_output_format', 'iso'],
['wait_end_of_query', '0'],
['cancel_http_readonly_queries_on_client_close', '1'],
...(this.username ? [['user', this.username]] : []),
...(this.password ? [['password', this.password]] : []),
...(queryId ? [['query_id', queryId]] : []),
...Object.entries(query_params).map(([key, value]) => [
`param_${key}`,
value,
]),
...Object.entries(clickhouse_settings ?? {}).map(([key, value]) => [
key,
value,
]),
]);
}): Promise<BaseResultSet<ReadableStream, Format>> {
let debugSql = '';
try {
debugSql = parameterizedQueryToSql({ sql: query, params: query_params });
@ -402,38 +378,96 @@ export class ClickhouseClient {
if (isBrowser) {
// TODO: check if we can use the client-web directly
const { ResultSet } = await import('@clickhouse/client-web');
const headers = {};
if (!isLocalMode && connectionId) {
headers['x-hyperdx-connection-id'] = connectionId;
}
// https://github.com/ClickHouse/clickhouse-js/blob/1ebdd39203730bb99fad4c88eac35d9a5e96b34a/packages/client-web/src/connection/web_connection.ts#L200C7-L200C23
const response = await fetch(`${this.host}/?${searchParams.toString()}`, {
...(includeCredentials ? { credentials: 'include' } : {}),
signal: abort_signal,
method: 'GET',
headers,
});
// TODO: Send command to CH to cancel query on abort_signal
if (!response.ok) {
if (!isSuccessfulResponse(response.status)) {
const text = await response.text();
throw new ClickHouseQueryError(`${text}`, debugSql);
}
}
if (response.body == null) {
// TODO: Handle empty responses better?
throw new Error('Unexpected empty response from ClickHouse');
}
return new ResultSet<T>(
response.body,
format as T,
queryId ?? '',
getResponseHeaders(response),
const { createClient, ResultSet } = await import(
'@clickhouse/client-web'
);
const isLocalMode = this.username != null && this.password != null;
if (isLocalMode) {
// LocalMode may potentially interact directly with a db, so it needs to
// send a get request. @clickhouse/client-web does not currently support
// querying via GET
const includeCredentials = !isLocalMode;
const includeCorsHeader = isLocalMode;
const searchParams = new URLSearchParams([
...(includeCorsHeader ? [['add_http_cors_header', '1']] : []),
['query', query],
['default_format', format],
['date_time_output_format', 'iso'],
['wait_end_of_query', '0'],
['cancel_http_readonly_queries_on_client_close', '1'],
...(this.username ? [['user', this.username]] : []),
...(this.password ? [['password', this.password]] : []),
...(queryId ? [['query_id', queryId]] : []),
...Object.entries(query_params).map(([key, value]) => [
`param_${key}`,
value,
]),
...Object.entries(clickhouse_settings ?? {}).map(([key, value]) => [
key,
value,
]),
]);
const headers = {};
if (!isLocalMode && connectionId) {
headers['x-hyperdx-connection-id'] = connectionId;
}
// https://github.com/ClickHouse/clickhouse-js/blob/1ebdd39203730bb99fad4c88eac35d9a5e96b34a/packages/client-web/src/connection/web_connection.ts#L200C7-L200C23
const response = await fetch(
`${this.host}/?${searchParams.toString()}`,
{
...(includeCredentials ? { credentials: 'include' } : {}),
signal: abort_signal,
method: 'GET',
headers,
},
);
// TODO: Send command to CH to cancel query on abort_signal
if (!response.ok) {
if (!isSuccessfulResponse(response.status)) {
const text = await response.text();
throw new ClickHouseQueryError(`${text}`, debugSql);
}
}
if (response.body == null) {
// TODO: Handle empty responses better?
throw new Error('Unexpected empty response from ClickHouse');
}
return new ResultSet<Format>(
response.body,
format,
queryId ?? '',
getResponseHeaders(response),
);
} else {
if (connectionId === undefined) {
throw new Error('ConnectionId must be defined');
}
const clickhouseClient = createClient({
url: window.origin,
pathname: this.host,
http_headers: { 'x-hyperdx-connection-id': connectionId },
clickhouse_settings: {
date_time_output_format: 'iso',
wait_end_of_query: 0,
cancel_http_readonly_queries_on_client_close: 1,
},
compression: {
response: true,
},
});
return clickhouseClient.query<Format>({
query,
query_params,
format,
abort_signal,
clickhouse_settings,
query_id: queryId,
}) as Promise<BaseResultSet<ReadableStream, Format>>;
}
} else if (isNode) {
const { createClient } = await import('@clickhouse/client');
const _client = createClient({
@ -448,14 +482,14 @@ export class ClickhouseClient {
});
// TODO: Custom error handling
return _client.query({
return _client.query<Format>({
query,
query_params,
format: format as T,
format,
abort_signal,
clickhouse_settings,
query_id: queryId,
}) as unknown as BaseResultSet<any, T>;
}) as unknown as Promise<BaseResultSet<ReadableStream, Format>>;
} else {
throw new Error(
'ClickhouseClient is only supported in the browser or node environment',