mirror of
https://github.com/hyperdxio/hyperdx
synced 2026-04-21 13:37:15 +00:00
feat: move rrweb event stream to client and query through /api/clickhouse-proxy (#755)
This commit is contained in:
parent
b4b5f6ba9b
commit
92a48003a4
9 changed files with 225 additions and 269 deletions
7
.changeset/neat-badgers-matter.md
Normal file
7
.changeset/neat-badgers-matter.md
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
---
|
||||
"@hyperdx/common-utils": patch
|
||||
"@hyperdx/api": patch
|
||||
"@hyperdx/app": patch
|
||||
---
|
||||
|
||||
feat: move rrweb event fetching to the client instead of an api route
|
||||
|
|
@ -89,7 +89,6 @@ app.use('/', routers.rootRouter);
|
|||
app.use('/alerts', isUserAuthenticated, routers.alertsRouter);
|
||||
app.use('/dashboards', isUserAuthenticated, routers.dashboardRouter);
|
||||
app.use('/me', isUserAuthenticated, routers.meRouter);
|
||||
app.use('/sessions', isUserAuthenticated, routers.sessionsRouter);
|
||||
app.use('/team', isUserAuthenticated, routers.teamRouter);
|
||||
app.use('/webhooks', isUserAuthenticated, routers.webhooksRouter);
|
||||
app.use('/datasources', isUserAuthenticated, routers.datasourceRouter);
|
||||
|
|
|
|||
17
packages/api/src/middleware/validation.ts
Normal file
17
packages/api/src/middleware/validation.ts
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
import express from 'express';
|
||||
import { z } from 'zod';
|
||||
|
||||
export function validateRequestHeaders<T extends z.Schema>(schema: T) {
|
||||
return function (
|
||||
req: express.Request,
|
||||
res: express.Response,
|
||||
next: express.NextFunction,
|
||||
) {
|
||||
const parsed = schema.safeParse(req.headers);
|
||||
if (!parsed.success) {
|
||||
return res.status(400).json({ type: 'Headers', errors: parsed.error });
|
||||
}
|
||||
|
||||
return next();
|
||||
};
|
||||
}
|
||||
|
|
@ -1,10 +1,11 @@
|
|||
import express, { Request, Response } from 'express';
|
||||
import express, { RequestHandler, Response } from 'express';
|
||||
import { createProxyMiddleware } from 'http-proxy-middleware';
|
||||
import { z } from 'zod';
|
||||
import { validateRequest } from 'zod-express-middleware';
|
||||
|
||||
import { getConnectionById } from '@/controllers/connection';
|
||||
import { getNonNullUserWithTeam } from '@/middleware/auth';
|
||||
import { validateRequestHeaders } from '@/middleware/validation';
|
||||
import { objectIdSchema } from '@/utils/zod';
|
||||
|
||||
const router = express.Router();
|
||||
|
|
@ -58,17 +59,22 @@ router.post(
|
|||
},
|
||||
);
|
||||
|
||||
router.get(
|
||||
'/*',
|
||||
validateRequest({
|
||||
query: z.object({
|
||||
hyperdx_connection_id: objectIdSchema,
|
||||
}),
|
||||
const hasConnectionId = validateRequestHeaders(
|
||||
z.object({
|
||||
'x-hyperdx-connection-id': objectIdSchema,
|
||||
}),
|
||||
);
|
||||
|
||||
const getConnection: RequestHandler =
|
||||
// prettier-ignore-next-line
|
||||
async (req, res, next) => {
|
||||
try {
|
||||
const { teamId } = getNonNullUserWithTeam(req);
|
||||
const { hyperdx_connection_id } = req.query;
|
||||
const connection_id = req.headers['x-hyperdx-connection-id']!; // ! because zod already validated
|
||||
delete req.headers['x-hyperdx-connection-id'];
|
||||
const hyperdx_connection_id = Array.isArray(connection_id)
|
||||
? connection_id.join('')
|
||||
: connection_id;
|
||||
|
||||
const connection = await getConnectionById(
|
||||
teamId.toString(),
|
||||
|
|
@ -93,13 +99,15 @@ router.get(
|
|||
console.error('Error fetching connection info:', e);
|
||||
next(e);
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
const proxyMiddleware: RequestHandler =
|
||||
// prettier-ignore-next-line
|
||||
createProxyMiddleware({
|
||||
target: '', // doesn't matter. it should be overridden by the router
|
||||
changeOrigin: true,
|
||||
pathFilter: (path, _req) => {
|
||||
// TODO: allow other methods
|
||||
return _req.method === 'GET';
|
||||
return _req.method === 'GET' || _req.method === 'POST';
|
||||
},
|
||||
pathRewrite: {
|
||||
'^/clickhouse-proxy': '',
|
||||
|
|
@ -113,8 +121,8 @@ router.get(
|
|||
on: {
|
||||
proxyReq: (proxyReq, _req) => {
|
||||
const newPath = _req.params[0];
|
||||
// @ts-expect-error _req.query is type ParamQs, which doesn't play nicely with URLSearchParams. TODO: Replace with getting query params from _req.url eventually
|
||||
const qparams = new URLSearchParams(_req.query);
|
||||
qparams.delete('hyperdx_connection_id');
|
||||
if (_req._hdx_connection?.username && _req._hdx_connection?.password) {
|
||||
proxyReq.setHeader(
|
||||
'X-ClickHouse-User',
|
||||
|
|
@ -122,7 +130,11 @@ router.get(
|
|||
);
|
||||
proxyReq.setHeader('X-ClickHouse-Key', _req._hdx_connection.password);
|
||||
}
|
||||
proxyReq.path = `/${newPath}?${qparams.toString()}`;
|
||||
if (_req.method === 'POST') {
|
||||
// TODO: Use fixRequestBody after this issue is resolved: https://github.com/chimurai/http-proxy-middleware/issues/1102
|
||||
proxyReq.write(_req.body);
|
||||
}
|
||||
proxyReq.path = `/${newPath}?${qparams}`;
|
||||
},
|
||||
proxyRes: (proxyRes, _req, res) => {
|
||||
// since clickhouse v24, the cors headers * will be attached to the response by default
|
||||
|
|
@ -158,7 +170,9 @@ router.get(
|
|||
// ...(config.IS_DEV && {
|
||||
// logger: console,
|
||||
// }),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
router.get('/*', hasConnectionId, getConnection, proxyMiddleware);
|
||||
router.post('/*', hasConnectionId, getConnection, proxyMiddleware);
|
||||
|
||||
export default router;
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@ import dashboardRouter from './dashboards';
|
|||
import datasourceRouter from './datasources';
|
||||
import meRouter from './me';
|
||||
import rootRouter from './root';
|
||||
import sessionsRouter from './sessions';
|
||||
import teamRouter from './team';
|
||||
import webhooksRouter from './webhooks';
|
||||
|
||||
|
|
@ -13,7 +12,6 @@ export default {
|
|||
dashboardRouter,
|
||||
meRouter,
|
||||
rootRouter,
|
||||
sessionsRouter,
|
||||
teamRouter,
|
||||
webhooksRouter,
|
||||
};
|
||||
|
|
|
|||
|
|
@ -1,152 +0,0 @@
|
|||
import type { Row } from '@clickhouse/client';
|
||||
import { ClickhouseClient } from '@hyperdx/common-utils/dist/clickhouse';
|
||||
import { getMetadata } from '@hyperdx/common-utils/dist/metadata';
|
||||
import { renderChartConfig } from '@hyperdx/common-utils/dist/renderChartConfig';
|
||||
import opentelemetry, { SpanStatusCode } from '@opentelemetry/api';
|
||||
import express from 'express';
|
||||
import { parseInt } from 'lodash';
|
||||
import { serializeError } from 'serialize-error';
|
||||
import { z } from 'zod';
|
||||
import { validateRequest } from 'zod-express-middleware';
|
||||
|
||||
import { getConnectionById } from '@/controllers/connection';
|
||||
import { getNonNullUserWithTeam } from '@/middleware/auth';
|
||||
import { Source } from '@/models/source';
|
||||
import logger from '@/utils/logger';
|
||||
import { objectIdSchema } from '@/utils/zod';
|
||||
|
||||
const router = express.Router();
|
||||
|
||||
router.get(
|
||||
'/:sessionId/rrweb',
|
||||
validateRequest({
|
||||
params: z.object({
|
||||
sessionId: z.string(),
|
||||
}),
|
||||
query: z.object({
|
||||
endTime: z.string().regex(/^\d+$/, 'Must be an integer string'),
|
||||
limit: z.string().regex(/^\d+$/, 'Must be an integer string'),
|
||||
offset: z.string().regex(/^\d+$/, 'Must be an integer string'),
|
||||
serviceName: z.string(),
|
||||
sourceId: objectIdSchema,
|
||||
startTime: z.string().regex(/^\d+$/, 'Must be an integer string'),
|
||||
}),
|
||||
}),
|
||||
async (req, res, next) => {
|
||||
try {
|
||||
const { sessionId } = req.params;
|
||||
const { endTime, limit, offset, serviceName, sourceId, startTime } =
|
||||
req.query;
|
||||
|
||||
const { teamId } = getNonNullUserWithTeam(req);
|
||||
|
||||
const source = await Source.findById(sourceId);
|
||||
|
||||
if (!source) {
|
||||
res.status(404).send('Source not found');
|
||||
return;
|
||||
}
|
||||
|
||||
const connection = await getConnectionById(
|
||||
teamId.toString(),
|
||||
source.connection.toString(),
|
||||
true,
|
||||
);
|
||||
|
||||
if (!connection) {
|
||||
res.status(404).send('Connection not found');
|
||||
return;
|
||||
}
|
||||
|
||||
const MAX_LIMIT = 1e6;
|
||||
|
||||
res.setHeader('Cache-Control', 'no-cache');
|
||||
res.setHeader('Content-Type', 'text/event-stream');
|
||||
res.setHeader('Connection', 'keep-alive');
|
||||
res.flushHeaders(); // flush the headers to establish SSE with client
|
||||
|
||||
const clickhouseClient = new ClickhouseClient({
|
||||
host: connection.host,
|
||||
username: connection.username,
|
||||
password: connection.password,
|
||||
});
|
||||
|
||||
const metadata = getMetadata(clickhouseClient);
|
||||
const query = await renderChartConfig(
|
||||
{
|
||||
// FIXME: add mappings to session source
|
||||
select: [
|
||||
{
|
||||
valueExpression: `${source.implicitColumnExpression}`,
|
||||
alias: 'b',
|
||||
},
|
||||
{
|
||||
valueExpression: `simpleJSONExtractInt(${source.implicitColumnExpression}, 'type')`,
|
||||
alias: 't',
|
||||
},
|
||||
{
|
||||
valueExpression: `${source.eventAttributesExpression}['rr-web.chunk']`,
|
||||
alias: 'ck',
|
||||
},
|
||||
{
|
||||
valueExpression: `${source.eventAttributesExpression}['rr-web.total-chunks']`,
|
||||
alias: 'tcks',
|
||||
},
|
||||
],
|
||||
dateRange: [
|
||||
new Date(parseInt(startTime)),
|
||||
new Date(parseInt(endTime)),
|
||||
],
|
||||
from: source.from,
|
||||
whereLanguage: 'lucene',
|
||||
where: `ServiceName:"${serviceName}" AND ${source.resourceAttributesExpression}.rum.sessionId:"${sessionId}"`,
|
||||
timestampValueExpression: source.timestampValueExpression,
|
||||
implicitColumnExpression: source.implicitColumnExpression,
|
||||
connection: connection.id,
|
||||
orderBy: `${source.timestampValueExpression} ASC`,
|
||||
limit: {
|
||||
limit: Math.min(MAX_LIMIT, parseInt(limit)),
|
||||
offset: parseInt(offset),
|
||||
},
|
||||
},
|
||||
metadata,
|
||||
);
|
||||
|
||||
const resultSet = await clickhouseClient.query({
|
||||
query: query.sql,
|
||||
query_params: query.params,
|
||||
format: 'JSONEachRow',
|
||||
clickhouse_settings: {
|
||||
wait_end_of_query: 0,
|
||||
send_progress_in_http_headers: 0,
|
||||
},
|
||||
});
|
||||
const stream = resultSet.stream();
|
||||
|
||||
stream.on('data', (rows: Row[]) => {
|
||||
res.write(`${rows.map(row => `data: ${row.text}`).join('\n')}\n\n`);
|
||||
res.flush();
|
||||
});
|
||||
stream.on('end', () => {
|
||||
logger.info('Stream ended');
|
||||
|
||||
res.write('event: end\ndata:\n\n');
|
||||
res.end();
|
||||
});
|
||||
} catch (e) {
|
||||
const span = opentelemetry.trace.getActiveSpan();
|
||||
span?.recordException(e as Error);
|
||||
span?.setStatus({ code: SpanStatusCode.ERROR });
|
||||
// WARNING: no need to call next(e) here, as the stream will be closed
|
||||
logger.error({
|
||||
message: 'Error while streaming rrweb events',
|
||||
error: serializeError(e),
|
||||
teamId: req.user?.team,
|
||||
query: req.query,
|
||||
});
|
||||
res.end();
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
export default router;
|
||||
|
|
@ -6,7 +6,7 @@ const DEFAULT_SERVER_URL = `http://127.0.0.1:${process.env.HYPERDX_API_PORT}`;
|
|||
export const config = {
|
||||
api: {
|
||||
externalResolver: true,
|
||||
bodyParser: true,
|
||||
bodyParser: false,
|
||||
},
|
||||
};
|
||||
|
||||
|
|
@ -17,12 +17,6 @@ export default (req: NextApiRequest, res: NextApiResponse) => {
|
|||
pathRewrite: { '^/api': '' },
|
||||
target: process.env.NEXT_PUBLIC_SERVER_URL || DEFAULT_SERVER_URL,
|
||||
autoRewrite: true,
|
||||
/**
|
||||
* Fix bodyParser
|
||||
**/
|
||||
on: {
|
||||
proxyReq: fixRequestBody,
|
||||
},
|
||||
// ...(IS_DEV && {
|
||||
// logger: console,
|
||||
// }),
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
import { useCallback, useEffect, useRef, useState } from 'react';
|
||||
import produce from 'immer';
|
||||
import type { ResponseJSON } from '@clickhouse/client';
|
||||
import { createClient } from '@clickhouse/client-web';
|
||||
import { chSql } from '@hyperdx/common-utils/dist/clickhouse';
|
||||
import { renderChartConfig } from '@hyperdx/common-utils/dist/renderChartConfig';
|
||||
import {
|
||||
|
|
@ -8,13 +10,15 @@ import {
|
|||
SearchConditionLanguage,
|
||||
TSource,
|
||||
} from '@hyperdx/common-utils/dist/types';
|
||||
import { fetchEventSource } from '@microsoft/fetch-event-source';
|
||||
import { useQuery, UseQueryOptions } from '@tanstack/react-query';
|
||||
|
||||
import { getMetadata } from '@/metadata';
|
||||
import { usePrevious } from '@/utils';
|
||||
|
||||
import { getClickhouseClient } from './clickhouse';
|
||||
import { IS_LOCAL_MODE } from './config';
|
||||
import { getLocalConnections } from './connection';
|
||||
import { useSource } from './source';
|
||||
|
||||
export type Session = {
|
||||
errorCount: string;
|
||||
|
|
@ -236,6 +240,27 @@ class FatalError extends Error {}
|
|||
class TimeoutError extends Error {}
|
||||
const EventStreamContentType = 'text/event-stream';
|
||||
|
||||
async function* streamToAsyncIterator<T>(
|
||||
stream: ReadableStream<T>,
|
||||
): AsyncIterableIterator<T> {
|
||||
const reader = stream.getReader();
|
||||
try {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) return;
|
||||
yield value;
|
||||
}
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
}
|
||||
}
|
||||
|
||||
// OPTIMIZATION STRATEGY
|
||||
//
|
||||
// 1. Write a clickhouse query to divide a session into different chunks, where each chunk has a start time. Maybe each chunk contains 100 events.
|
||||
// 2. When slider advances, use the timestamp to determine which chunk you are in
|
||||
// 3. Fetch data associated with that chunk
|
||||
// 4. Probably do some prefetching for future times
|
||||
export function useRRWebEventStream(
|
||||
{
|
||||
serviceName,
|
||||
|
|
@ -278,6 +303,8 @@ export function useRRWebEventStream(
|
|||
const [fetchStatus, setFetchStatus] = useState<'fetching' | 'idle'>('idle');
|
||||
const lastFetchStatusRef = useRef<'fetching' | 'idle' | undefined>();
|
||||
|
||||
const { data: source } = useSource({ id: sourceId });
|
||||
|
||||
const fetchResults = useCallback(
|
||||
async ({
|
||||
pageParam = 0,
|
||||
|
|
@ -286,20 +313,14 @@ export function useRRWebEventStream(
|
|||
pageParam: number;
|
||||
limit?: number;
|
||||
}) => {
|
||||
if (!source) return;
|
||||
const resBuffer: any[] = [];
|
||||
let linesFetched = 0;
|
||||
|
||||
const startTime = startDate.getTime().toString();
|
||||
const endTime = endDate.getTime().toString();
|
||||
|
||||
const searchParams = new URLSearchParams([
|
||||
['endTime', endTime],
|
||||
['limit', (limitOverride ?? limit).toString()],
|
||||
['offset', pageParam.toString()],
|
||||
['serviceName', serviceName],
|
||||
['sourceId', sourceId],
|
||||
['startTime', startTime],
|
||||
]);
|
||||
const queryLimit = (limitOverride ?? limit).toString();
|
||||
const offset = pageParam.toString();
|
||||
|
||||
const ctrl = new AbortController();
|
||||
lastAbortController.current = ctrl;
|
||||
|
|
@ -308,90 +329,137 @@ export function useRRWebEventStream(
|
|||
setFetchStatus('fetching');
|
||||
lastFetchStatusRef.current = 'fetching';
|
||||
|
||||
const fetchPromise = fetchEventSource(
|
||||
`/api/sessions/${sessionId}/rrweb?${searchParams.toString()}`,
|
||||
const MAX_LIMIT = 1e6;
|
||||
|
||||
const metadata = getMetadata();
|
||||
const query = await renderChartConfig(
|
||||
{
|
||||
method: 'GET',
|
||||
signal: ctrl.signal,
|
||||
credentials: 'include',
|
||||
async onopen(response) {
|
||||
if (
|
||||
response.ok &&
|
||||
response.headers.get('content-type') === EventStreamContentType
|
||||
) {
|
||||
return; // everything's good
|
||||
} else if (
|
||||
response.status >= 400 &&
|
||||
response.status < 500 &&
|
||||
response.status !== 429
|
||||
) {
|
||||
// client-side errors are usually non-retriable:
|
||||
// TODO: handle these???
|
||||
throw new FatalError();
|
||||
} else {
|
||||
throw new RetriableError();
|
||||
}
|
||||
// FIXME: add mappings to session source
|
||||
select: [
|
||||
{
|
||||
valueExpression: `${source.implicitColumnExpression}`,
|
||||
alias: 'b',
|
||||
},
|
||||
{
|
||||
valueExpression: `simpleJSONExtractInt(${source.implicitColumnExpression}, 'type')`,
|
||||
alias: 't',
|
||||
},
|
||||
{
|
||||
valueExpression: `${source.eventAttributesExpression}['rr-web.chunk']`,
|
||||
alias: 'ck',
|
||||
},
|
||||
{
|
||||
valueExpression: `${source.eventAttributesExpression}['rr-web.total-chunks']`,
|
||||
alias: 'tcks',
|
||||
},
|
||||
],
|
||||
dateRange: [
|
||||
new Date(parseInt(startTime)),
|
||||
new Date(parseInt(endTime)),
|
||||
],
|
||||
from: source.from,
|
||||
whereLanguage: 'lucene',
|
||||
where: `ServiceName:"${serviceName}" AND ${source.resourceAttributesExpression}.rum.sessionId:"${sessionId}"`,
|
||||
timestampValueExpression: source.timestampValueExpression,
|
||||
implicitColumnExpression: source.implicitColumnExpression,
|
||||
connection: source.connection,
|
||||
orderBy: `${source.timestampValueExpression} ASC`,
|
||||
limit: {
|
||||
limit: Math.min(MAX_LIMIT, parseInt(queryLimit)),
|
||||
offset: parseInt(offset),
|
||||
},
|
||||
onmessage(event) {
|
||||
if (event.event === '') {
|
||||
const parsedRows = event.data
|
||||
.split('\n')
|
||||
.map((row: string) => {
|
||||
try {
|
||||
const parsed = JSON.parse(row);
|
||||
linesFetched++;
|
||||
return parsed;
|
||||
} catch (e) {
|
||||
return null;
|
||||
}
|
||||
})
|
||||
.filter((v: any) => v !== null);
|
||||
|
||||
if (onEvent != null) {
|
||||
parsedRows.forEach(onEvent);
|
||||
} else if (keepPreviousData) {
|
||||
resBuffer.push(...parsedRows);
|
||||
} else {
|
||||
setResults(prevResults => ({
|
||||
key: resultsKey ?? prevResults.key ?? 'DEFAULT_KEY',
|
||||
data: [...prevResults.data, ...parsedRows],
|
||||
}));
|
||||
}
|
||||
} else if (event.event === 'end') {
|
||||
onEnd?.();
|
||||
|
||||
if (keepPreviousData) {
|
||||
setResults({
|
||||
key: resultsKey ?? 'DEFAULT_KEY',
|
||||
data: resBuffer,
|
||||
});
|
||||
}
|
||||
|
||||
if (linesFetched === 0 || linesFetched < limit) {
|
||||
setHasNextPage(false);
|
||||
}
|
||||
}
|
||||
},
|
||||
onclose() {
|
||||
ctrl.abort();
|
||||
|
||||
setIsFetching(false);
|
||||
setFetchStatus('idle');
|
||||
lastFetchStatusRef.current = 'idle';
|
||||
// if the server closes the connection unexpectedly, retry:
|
||||
// throw new RetriableError();
|
||||
},
|
||||
// onerror(err) {
|
||||
// if (err instanceof FatalError) {
|
||||
// throw err; // rethrow to stop the operation
|
||||
// } else {
|
||||
// // do nothing to automatically retry. You can also
|
||||
// // return a specific retry interval here.
|
||||
// }
|
||||
// },
|
||||
},
|
||||
metadata,
|
||||
);
|
||||
|
||||
// TODO: Change ClickhouseClient class to use this under the hood,
|
||||
// and refactor this to use ClickhouseClient.query. Also change pathname
|
||||
// in createClient to PROXY_CLICKHOUSE_HOST instead
|
||||
const format = 'JSONEachRow';
|
||||
const queryFn = async () => {
|
||||
if (IS_LOCAL_MODE) {
|
||||
const localConnections = getLocalConnections();
|
||||
const localModeUrl = new URL(localConnections[0].host);
|
||||
localModeUrl.username = localConnections[0].username;
|
||||
localModeUrl.password = localConnections[0].password;
|
||||
|
||||
const clickhouseClient = getClickhouseClient();
|
||||
return clickhouseClient.query({
|
||||
query: query.sql,
|
||||
query_params: query.params,
|
||||
format,
|
||||
});
|
||||
} else {
|
||||
const clickhouseClient = createClient({
|
||||
clickhouse_settings: {
|
||||
add_http_cors_header: IS_LOCAL_MODE ? 1 : 0,
|
||||
cancel_http_readonly_queries_on_client_close: 1,
|
||||
date_time_output_format: 'iso',
|
||||
wait_end_of_query: 0,
|
||||
},
|
||||
http_headers: { 'x-hyperdx-connection-id': source.connection },
|
||||
keep_alive: {
|
||||
enabled: true,
|
||||
},
|
||||
url: window.location.origin,
|
||||
pathname: '/api/clickhouse-proxy',
|
||||
compression: {
|
||||
response: true,
|
||||
},
|
||||
});
|
||||
|
||||
return clickhouseClient.query({
|
||||
query: query.sql,
|
||||
query_params: query.params,
|
||||
format,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
const fetchPromise = (async () => {
|
||||
const resultSet = await queryFn();
|
||||
|
||||
let forFunc: (data: any) => void;
|
||||
if (onEvent) {
|
||||
forFunc = onEvent;
|
||||
} else if (keepPreviousData) {
|
||||
forFunc = (data: any) => resBuffer.push(data);
|
||||
} else {
|
||||
forFunc = (data: any) =>
|
||||
setResults(prevResults =>
|
||||
produce(prevResults, draft => {
|
||||
draft.key = resultsKey ?? draft.key ?? 'DEFAULT_KEY';
|
||||
draft.data.push(data);
|
||||
}),
|
||||
);
|
||||
}
|
||||
const stream = resultSet.stream();
|
||||
for await (const chunk of streamToAsyncIterator(stream)) {
|
||||
for (const row of chunk) {
|
||||
try {
|
||||
const parsed = row.json();
|
||||
linesFetched++;
|
||||
forFunc(parsed);
|
||||
} catch {
|
||||
// do noting
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
onEnd?.();
|
||||
|
||||
if (keepPreviousData) {
|
||||
setResults({
|
||||
key: resultsKey ?? 'DEFAULT_KEY',
|
||||
data: resBuffer,
|
||||
});
|
||||
}
|
||||
|
||||
if (linesFetched === 0 || linesFetched < limit) {
|
||||
setHasNextPage(false);
|
||||
}
|
||||
})();
|
||||
|
||||
try {
|
||||
await Promise.race([
|
||||
fetchPromise,
|
||||
|
|
@ -413,8 +481,15 @@ export function useRRWebEventStream(
|
|||
console.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
ctrl.abort();
|
||||
setIsFetching(false);
|
||||
setFetchStatus('idle');
|
||||
lastFetchStatusRef.current = 'idle';
|
||||
},
|
||||
[
|
||||
source,
|
||||
serviceName,
|
||||
sessionId,
|
||||
startDate,
|
||||
endDate,
|
||||
|
|
|
|||
|
|
@ -361,15 +361,13 @@ export class ClickhouseClient {
|
|||
clickhouse_settings?: Record<string, any>;
|
||||
connectionId?: string;
|
||||
queryId?: string;
|
||||
}): Promise<BaseResultSet<any, T>> {
|
||||
}): Promise<BaseResultSet<ReadableStream, T>> {
|
||||
const isLocalMode = this.username != null && this.password != null;
|
||||
const includeCredentials = !isLocalMode;
|
||||
const includeCorsHeader = isLocalMode;
|
||||
const _connectionId = isLocalMode ? undefined : connectionId;
|
||||
|
||||
const searchParams = new URLSearchParams([
|
||||
...(includeCorsHeader ? [['add_http_cors_header', '1']] : []),
|
||||
...(_connectionId ? [['hyperdx_connection_id', _connectionId]] : []),
|
||||
['query', query],
|
||||
['default_format', format],
|
||||
['date_time_output_format', 'iso'],
|
||||
|
|
@ -405,11 +403,17 @@ export class ClickhouseClient {
|
|||
if (isBrowser) {
|
||||
// TODO: check if we can use the client-web directly
|
||||
const { ResultSet } = await import('@clickhouse/client-web');
|
||||
|
||||
const headers = {};
|
||||
if (!isLocalMode && connectionId) {
|
||||
headers['x-hyperdx-connection-id'] = connectionId;
|
||||
}
|
||||
// https://github.com/ClickHouse/clickhouse-js/blob/1ebdd39203730bb99fad4c88eac35d9a5e96b34a/packages/client-web/src/connection/web_connection.ts#L200C7-L200C23
|
||||
const response = await fetch(`${this.host}/?${searchParams.toString()}`, {
|
||||
...(includeCredentials ? { credentials: 'include' } : {}),
|
||||
signal: abort_signal,
|
||||
method: 'GET',
|
||||
headers,
|
||||
});
|
||||
|
||||
// TODO: Send command to CH to cancel query on abort_signal
|
||||
|
|
|
|||
Loading…
Reference in a new issue