feat: move rrweb event stream to client and query through /api/clickhouse-proxy (#755)

This commit is contained in:
Aaron Knudtson 2025-04-22 18:01:09 -04:00 committed by GitHub
parent b4b5f6ba9b
commit 92a48003a4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 225 additions and 269 deletions

View file

@ -0,0 +1,7 @@
---
"@hyperdx/common-utils": patch
"@hyperdx/api": patch
"@hyperdx/app": patch
---
feat: move rrweb event fetching to the client instead of an api route

View file

@ -89,7 +89,6 @@ app.use('/', routers.rootRouter);
app.use('/alerts', isUserAuthenticated, routers.alertsRouter);
app.use('/dashboards', isUserAuthenticated, routers.dashboardRouter);
app.use('/me', isUserAuthenticated, routers.meRouter);
app.use('/sessions', isUserAuthenticated, routers.sessionsRouter);
app.use('/team', isUserAuthenticated, routers.teamRouter);
app.use('/webhooks', isUserAuthenticated, routers.webhooksRouter);
app.use('/datasources', isUserAuthenticated, routers.datasourceRouter);

View file

@ -0,0 +1,17 @@
import express from 'express';
import { z } from 'zod';
/**
 * Express middleware factory that validates a request's headers against
 * the supplied zod schema.
 *
 * When the headers do not satisfy the schema, responds with HTTP 400 and
 * the zod error payload; otherwise hands control to the next handler.
 */
export function validateRequestHeaders<T extends z.Schema>(schema: T) {
  return function (
    req: express.Request,
    res: express.Response,
    next: express.NextFunction,
  ) {
    const result = schema.safeParse(req.headers);
    if (result.success) {
      return next();
    }
    return res.status(400).json({ type: 'Headers', errors: result.error });
  };
}

View file

@ -1,10 +1,11 @@
import express, { Request, Response } from 'express';
import express, { RequestHandler, Response } from 'express';
import { createProxyMiddleware } from 'http-proxy-middleware';
import { z } from 'zod';
import { validateRequest } from 'zod-express-middleware';
import { getConnectionById } from '@/controllers/connection';
import { getNonNullUserWithTeam } from '@/middleware/auth';
import { validateRequestHeaders } from '@/middleware/validation';
import { objectIdSchema } from '@/utils/zod';
const router = express.Router();
@ -58,17 +59,22 @@ router.post(
},
);
router.get(
'/*',
validateRequest({
query: z.object({
hyperdx_connection_id: objectIdSchema,
}),
// Middleware: rejects the request with HTTP 400 unless the
// `x-hyperdx-connection-id` header is present and passes objectIdSchema.
const hasConnectionId = validateRequestHeaders(
  z.object({
    'x-hyperdx-connection-id': objectIdSchema,
  }),
);
const getConnection: RequestHandler =
// prettier-ignore-next-line
async (req, res, next) => {
try {
const { teamId } = getNonNullUserWithTeam(req);
const { hyperdx_connection_id } = req.query;
const connection_id = req.headers['x-hyperdx-connection-id']!; // ! because zod already validated
delete req.headers['x-hyperdx-connection-id'];
const hyperdx_connection_id = Array.isArray(connection_id)
? connection_id.join('')
: connection_id;
const connection = await getConnectionById(
teamId.toString(),
@ -93,13 +99,15 @@ router.get(
console.error('Error fetching connection info:', e);
next(e);
}
},
};
const proxyMiddleware: RequestHandler =
// prettier-ignore-next-line
createProxyMiddleware({
target: '', // doesn't matter. it should be overridden by the router
changeOrigin: true,
pathFilter: (path, _req) => {
// TODO: allow other methods
return _req.method === 'GET';
return _req.method === 'GET' || _req.method === 'POST';
},
pathRewrite: {
'^/clickhouse-proxy': '',
@ -113,8 +121,8 @@ router.get(
on: {
proxyReq: (proxyReq, _req) => {
const newPath = _req.params[0];
// @ts-expect-error _req.query is type ParamQs, which doesn't play nicely with URLSearchParams. TODO: Replace with getting query params from _req.url eventually
const qparams = new URLSearchParams(_req.query);
qparams.delete('hyperdx_connection_id');
if (_req._hdx_connection?.username && _req._hdx_connection?.password) {
proxyReq.setHeader(
'X-ClickHouse-User',
@ -122,7 +130,11 @@ router.get(
);
proxyReq.setHeader('X-ClickHouse-Key', _req._hdx_connection.password);
}
proxyReq.path = `/${newPath}?${qparams.toString()}`;
if (_req.method === 'POST') {
// TODO: Use fixRequestBody after this issue is resolved: https://github.com/chimurai/http-proxy-middleware/issues/1102
proxyReq.write(_req.body);
}
proxyReq.path = `/${newPath}?${qparams}`;
},
proxyRes: (proxyRes, _req, res) => {
// since clickhouse v24, the cors headers * will be attached to the response by default
@ -158,7 +170,9 @@ router.get(
// ...(config.IS_DEV && {
// logger: console,
// }),
}),
);
});
router.get('/*', hasConnectionId, getConnection, proxyMiddleware);
router.post('/*', hasConnectionId, getConnection, proxyMiddleware);
export default router;

View file

@ -3,7 +3,6 @@ import dashboardRouter from './dashboards';
import datasourceRouter from './datasources';
import meRouter from './me';
import rootRouter from './root';
import sessionsRouter from './sessions';
import teamRouter from './team';
import webhooksRouter from './webhooks';
@ -13,7 +12,6 @@ export default {
dashboardRouter,
meRouter,
rootRouter,
sessionsRouter,
teamRouter,
webhooksRouter,
};

View file

@ -1,152 +0,0 @@
import type { Row } from '@clickhouse/client';
import { ClickhouseClient } from '@hyperdx/common-utils/dist/clickhouse';
import { getMetadata } from '@hyperdx/common-utils/dist/metadata';
import { renderChartConfig } from '@hyperdx/common-utils/dist/renderChartConfig';
import opentelemetry, { SpanStatusCode } from '@opentelemetry/api';
import express from 'express';
import { parseInt } from 'lodash';
import { serializeError } from 'serialize-error';
import { z } from 'zod';
import { validateRequest } from 'zod-express-middleware';
import { getConnectionById } from '@/controllers/connection';
import { getNonNullUserWithTeam } from '@/middleware/auth';
import { Source } from '@/models/source';
import logger from '@/utils/logger';
import { objectIdSchema } from '@/utils/zod';
const router = express.Router();
// GET /:sessionId/rrweb
// Streams rrweb session-replay events for one session to the client as
// server-sent events (SSE), reading rows from ClickHouse via the team's
// stored connection.
router.get(
  '/:sessionId/rrweb',
  // Both the path param and all query params are validated up front;
  // endTime/limit/offset/startTime must be integer strings.
  validateRequest({
    params: z.object({
      sessionId: z.string(),
    }),
    query: z.object({
      endTime: z.string().regex(/^\d+$/, 'Must be an integer string'),
      limit: z.string().regex(/^\d+$/, 'Must be an integer string'),
      offset: z.string().regex(/^\d+$/, 'Must be an integer string'),
      serviceName: z.string(),
      sourceId: objectIdSchema,
      startTime: z.string().regex(/^\d+$/, 'Must be an integer string'),
    }),
  }),
  async (req, res, next) => {
    try {
      const { sessionId } = req.params;
      const { endTime, limit, offset, serviceName, sourceId, startTime } =
        req.query;
      const { teamId } = getNonNullUserWithTeam(req);

      // Resolve the event source, then its connection scoped to the
      // caller's team. Either missing -> 404.
      const source = await Source.findById(sourceId);
      if (!source) {
        res.status(404).send('Source not found');
        return;
      }

      // NOTE(review): the third argument presumably includes credentials
      // in the returned connection — confirm against getConnectionById.
      const connection = await getConnectionById(
        teamId.toString(),
        source.connection.toString(),
        true,
      );
      if (!connection) {
        res.status(404).send('Connection not found');
        return;
      }

      // Hard cap on the number of rows a single request may pull.
      const MAX_LIMIT = 1e6;

      // Establish the SSE response before any data is queried.
      res.setHeader('Cache-Control', 'no-cache');
      res.setHeader('Content-Type', 'text/event-stream');
      res.setHeader('Connection', 'keep-alive');
      res.flushHeaders(); // flush the headers to establish SSE with client

      const clickhouseClient = new ClickhouseClient({
        host: connection.host,
        username: connection.username,
        password: connection.password,
      });
      const metadata = getMetadata(clickhouseClient);
      // Build the SQL via the shared chart-config renderer: selects the
      // raw event body plus rrweb type/chunk metadata, filtered by
      // service name and rum.sessionId, ordered by timestamp ascending.
      const query = await renderChartConfig(
        {
          // FIXME: add mappings to session source
          select: [
            {
              valueExpression: `${source.implicitColumnExpression}`,
              alias: 'b',
            },
            {
              valueExpression: `simpleJSONExtractInt(${source.implicitColumnExpression}, 'type')`,
              alias: 't',
            },
            {
              valueExpression: `${source.eventAttributesExpression}['rr-web.chunk']`,
              alias: 'ck',
            },
            {
              valueExpression: `${source.eventAttributesExpression}['rr-web.total-chunks']`,
              alias: 'tcks',
            },
          ],
          dateRange: [
            new Date(parseInt(startTime)),
            new Date(parseInt(endTime)),
          ],
          from: source.from,
          whereLanguage: 'lucene',
          where: `ServiceName:"${serviceName}" AND ${source.resourceAttributesExpression}.rum.sessionId:"${sessionId}"`,
          timestampValueExpression: source.timestampValueExpression,
          implicitColumnExpression: source.implicitColumnExpression,
          connection: connection.id,
          orderBy: `${source.timestampValueExpression} ASC`,
          limit: {
            limit: Math.min(MAX_LIMIT, parseInt(limit)),
            offset: parseInt(offset),
          },
        },
        metadata,
      );

      // Stream rows out of ClickHouse without waiting for the full
      // result set (wait_end_of_query: 0).
      const resultSet = await clickhouseClient.query({
        query: query.sql,
        query_params: query.params,
        format: 'JSONEachRow',
        clickhouse_settings: {
          wait_end_of_query: 0,
          send_progress_in_http_headers: 0,
        },
      });
      const stream = resultSet.stream();
      // Each batch of rows is forwarded as one SSE message; each row
      // becomes its own `data:` line within the message.
      stream.on('data', (rows: Row[]) => {
        res.write(`${rows.map(row => `data: ${row.text}`).join('\n')}\n\n`);
        res.flush();
      });
      // Signal completion with a dedicated `end` event, then close.
      stream.on('end', () => {
        logger.info('Stream ended');
        res.write('event: end\ndata:\n\n');
        res.end();
      });
    } catch (e) {
      const span = opentelemetry.trace.getActiveSpan();
      span?.recordException(e as Error);
      span?.setStatus({ code: SpanStatusCode.ERROR });
      // WARNING: no need to call next(e) here, as the stream will be closed
      logger.error({
        message: 'Error while streaming rrweb events',
        error: serializeError(e),
        teamId: req.user?.team,
        query: req.query,
      });
      res.end();
    }
  },
);
export default router;

View file

@ -6,7 +6,7 @@ const DEFAULT_SERVER_URL = `http://127.0.0.1:${process.env.HYPERDX_API_PORT}`;
export const config = {
api: {
externalResolver: true,
bodyParser: true,
bodyParser: false,
},
};
@ -17,12 +17,6 @@ export default (req: NextApiRequest, res: NextApiResponse) => {
pathRewrite: { '^/api': '' },
target: process.env.NEXT_PUBLIC_SERVER_URL || DEFAULT_SERVER_URL,
autoRewrite: true,
/**
* Fix bodyParser
**/
on: {
proxyReq: fixRequestBody,
},
// ...(IS_DEV && {
// logger: console,
// }),

View file

@ -1,5 +1,7 @@
import { useCallback, useEffect, useRef, useState } from 'react';
import produce from 'immer';
import type { ResponseJSON } from '@clickhouse/client';
import { createClient } from '@clickhouse/client-web';
import { chSql } from '@hyperdx/common-utils/dist/clickhouse';
import { renderChartConfig } from '@hyperdx/common-utils/dist/renderChartConfig';
import {
@ -8,13 +10,15 @@ import {
SearchConditionLanguage,
TSource,
} from '@hyperdx/common-utils/dist/types';
import { fetchEventSource } from '@microsoft/fetch-event-source';
import { useQuery, UseQueryOptions } from '@tanstack/react-query';
import { getMetadata } from '@/metadata';
import { usePrevious } from '@/utils';
import { getClickhouseClient } from './clickhouse';
import { IS_LOCAL_MODE } from './config';
import { getLocalConnections } from './connection';
import { useSource } from './source';
export type Session = {
errorCount: string;
@ -236,6 +240,27 @@ class FatalError extends Error {}
class TimeoutError extends Error {}
const EventStreamContentType = 'text/event-stream';
/**
 * Adapts a WHATWG ReadableStream into an async iterator so its chunks
 * can be consumed with `for await`. The reader lock is always released,
 * even if iteration ends early or the stream errors.
 */
async function* streamToAsyncIterator<T>(
  stream: ReadableStream<T>,
): AsyncIterableIterator<T> {
  const reader = stream.getReader();
  try {
    for (;;) {
      const result = await reader.read();
      if (result.done) {
        return;
      }
      yield result.value;
    }
  } finally {
    reader.releaseLock();
  }
}
// OPTIMIZATION STRATEGY
//
// 1. Write a clickhouse query to divide a session into different chunks, where each chunk has a start time. Maybe each chunk contains 100 events.
// 2. When slider advances, use the timestamp to determine which chunk you are in
// 3. Fetch data associated with that chunk
// 4. Probably do some prefetching for future times
export function useRRWebEventStream(
{
serviceName,
@ -278,6 +303,8 @@ export function useRRWebEventStream(
const [fetchStatus, setFetchStatus] = useState<'fetching' | 'idle'>('idle');
const lastFetchStatusRef = useRef<'fetching' | 'idle' | undefined>();
const { data: source } = useSource({ id: sourceId });
const fetchResults = useCallback(
async ({
pageParam = 0,
@ -286,20 +313,14 @@ export function useRRWebEventStream(
pageParam: number;
limit?: number;
}) => {
if (!source) return;
const resBuffer: any[] = [];
let linesFetched = 0;
const startTime = startDate.getTime().toString();
const endTime = endDate.getTime().toString();
const searchParams = new URLSearchParams([
['endTime', endTime],
['limit', (limitOverride ?? limit).toString()],
['offset', pageParam.toString()],
['serviceName', serviceName],
['sourceId', sourceId],
['startTime', startTime],
]);
const queryLimit = (limitOverride ?? limit).toString();
const offset = pageParam.toString();
const ctrl = new AbortController();
lastAbortController.current = ctrl;
@ -308,90 +329,137 @@ export function useRRWebEventStream(
setFetchStatus('fetching');
lastFetchStatusRef.current = 'fetching';
const fetchPromise = fetchEventSource(
`/api/sessions/${sessionId}/rrweb?${searchParams.toString()}`,
const MAX_LIMIT = 1e6;
const metadata = getMetadata();
const query = await renderChartConfig(
{
method: 'GET',
signal: ctrl.signal,
credentials: 'include',
async onopen(response) {
if (
response.ok &&
response.headers.get('content-type') === EventStreamContentType
) {
return; // everything's good
} else if (
response.status >= 400 &&
response.status < 500 &&
response.status !== 429
) {
// client-side errors are usually non-retriable:
// TODO: handle these???
throw new FatalError();
} else {
throw new RetriableError();
}
// FIXME: add mappings to session source
select: [
{
valueExpression: `${source.implicitColumnExpression}`,
alias: 'b',
},
{
valueExpression: `simpleJSONExtractInt(${source.implicitColumnExpression}, 'type')`,
alias: 't',
},
{
valueExpression: `${source.eventAttributesExpression}['rr-web.chunk']`,
alias: 'ck',
},
{
valueExpression: `${source.eventAttributesExpression}['rr-web.total-chunks']`,
alias: 'tcks',
},
],
dateRange: [
new Date(parseInt(startTime)),
new Date(parseInt(endTime)),
],
from: source.from,
whereLanguage: 'lucene',
where: `ServiceName:"${serviceName}" AND ${source.resourceAttributesExpression}.rum.sessionId:"${sessionId}"`,
timestampValueExpression: source.timestampValueExpression,
implicitColumnExpression: source.implicitColumnExpression,
connection: source.connection,
orderBy: `${source.timestampValueExpression} ASC`,
limit: {
limit: Math.min(MAX_LIMIT, parseInt(queryLimit)),
offset: parseInt(offset),
},
onmessage(event) {
if (event.event === '') {
const parsedRows = event.data
.split('\n')
.map((row: string) => {
try {
const parsed = JSON.parse(row);
linesFetched++;
return parsed;
} catch (e) {
return null;
}
})
.filter((v: any) => v !== null);
if (onEvent != null) {
parsedRows.forEach(onEvent);
} else if (keepPreviousData) {
resBuffer.push(...parsedRows);
} else {
setResults(prevResults => ({
key: resultsKey ?? prevResults.key ?? 'DEFAULT_KEY',
data: [...prevResults.data, ...parsedRows],
}));
}
} else if (event.event === 'end') {
onEnd?.();
if (keepPreviousData) {
setResults({
key: resultsKey ?? 'DEFAULT_KEY',
data: resBuffer,
});
}
if (linesFetched === 0 || linesFetched < limit) {
setHasNextPage(false);
}
}
},
onclose() {
ctrl.abort();
setIsFetching(false);
setFetchStatus('idle');
lastFetchStatusRef.current = 'idle';
// if the server closes the connection unexpectedly, retry:
// throw new RetriableError();
},
// onerror(err) {
// if (err instanceof FatalError) {
// throw err; // rethrow to stop the operation
// } else {
// // do nothing to automatically retry. You can also
// // return a specific retry interval here.
// }
// },
},
metadata,
);
// TODO: Change ClickhouseClient class to use this under the hood,
// and refactor this to use ClickhouseClient.query. Also change pathname
// in createClient to PROXY_CLICKHOUSE_HOST instead
const format = 'JSONEachRow';
const queryFn = async () => {
if (IS_LOCAL_MODE) {
const localConnections = getLocalConnections();
const localModeUrl = new URL(localConnections[0].host);
localModeUrl.username = localConnections[0].username;
localModeUrl.password = localConnections[0].password;
const clickhouseClient = getClickhouseClient();
return clickhouseClient.query({
query: query.sql,
query_params: query.params,
format,
});
} else {
const clickhouseClient = createClient({
clickhouse_settings: {
add_http_cors_header: IS_LOCAL_MODE ? 1 : 0,
cancel_http_readonly_queries_on_client_close: 1,
date_time_output_format: 'iso',
wait_end_of_query: 0,
},
http_headers: { 'x-hyperdx-connection-id': source.connection },
keep_alive: {
enabled: true,
},
url: window.location.origin,
pathname: '/api/clickhouse-proxy',
compression: {
response: true,
},
});
return clickhouseClient.query({
query: query.sql,
query_params: query.params,
format,
});
}
};
const fetchPromise = (async () => {
const resultSet = await queryFn();
let forFunc: (data: any) => void;
if (onEvent) {
forFunc = onEvent;
} else if (keepPreviousData) {
forFunc = (data: any) => resBuffer.push(data);
} else {
forFunc = (data: any) =>
setResults(prevResults =>
produce(prevResults, draft => {
draft.key = resultsKey ?? draft.key ?? 'DEFAULT_KEY';
draft.data.push(data);
}),
);
}
const stream = resultSet.stream();
for await (const chunk of streamToAsyncIterator(stream)) {
for (const row of chunk) {
try {
const parsed = row.json();
linesFetched++;
forFunc(parsed);
} catch {
// do nothing — skip rows that fail to parse
}
}
}
onEnd?.();
if (keepPreviousData) {
setResults({
key: resultsKey ?? 'DEFAULT_KEY',
data: resBuffer,
});
}
if (linesFetched === 0 || linesFetched < limit) {
setHasNextPage(false);
}
})();
try {
await Promise.race([
fetchPromise,
@ -413,8 +481,15 @@ export function useRRWebEventStream(
console.error(e);
}
}
ctrl.abort();
setIsFetching(false);
setFetchStatus('idle');
lastFetchStatusRef.current = 'idle';
},
[
source,
serviceName,
sessionId,
startDate,
endDate,

View file

@ -361,15 +361,13 @@ export class ClickhouseClient {
clickhouse_settings?: Record<string, any>;
connectionId?: string;
queryId?: string;
}): Promise<BaseResultSet<any, T>> {
}): Promise<BaseResultSet<ReadableStream, T>> {
const isLocalMode = this.username != null && this.password != null;
const includeCredentials = !isLocalMode;
const includeCorsHeader = isLocalMode;
const _connectionId = isLocalMode ? undefined : connectionId;
const searchParams = new URLSearchParams([
...(includeCorsHeader ? [['add_http_cors_header', '1']] : []),
...(_connectionId ? [['hyperdx_connection_id', _connectionId]] : []),
['query', query],
['default_format', format],
['date_time_output_format', 'iso'],
@ -405,11 +403,17 @@ export class ClickhouseClient {
if (isBrowser) {
// TODO: check if we can use the client-web directly
const { ResultSet } = await import('@clickhouse/client-web');
const headers = {};
if (!isLocalMode && connectionId) {
headers['x-hyperdx-connection-id'] = connectionId;
}
// https://github.com/ClickHouse/clickhouse-js/blob/1ebdd39203730bb99fad4c88eac35d9a5e96b34a/packages/client-web/src/connection/web_connection.ts#L200C7-L200C23
const response = await fetch(`${this.host}/?${searchParams.toString()}`, {
...(includeCredentials ? { credentials: 'include' } : {}),
signal: abort_signal,
method: 'GET',
headers,
});
// TODO: Send command to CH to cancel query on abort_signal