From e0c23d4e40ffdb84f5c9b7af28e69cb69d15fa34 Mon Sep 17 00:00:00 2001
From: Aaron Knudtson <87577305+knudtty@users.noreply.github.com>
Date: Mon, 15 Dec 2025 10:56:31 -0500
Subject: [PATCH] feat: buffer and flush parallelized window queries to always
 flush in-order data (#1481)

We should be able to send most chart series queries all at once and view
the results as the data comes in. This also ensures the data arrives in
order. This is only enabled for the histogram on DBSearchPage so far.

Closes HDX-3051
---
 .changeset/hip-fireants-argue.md              |   6 +
 packages/app/src/components/DBTimeChart.tsx   |   7 +-
 .../hooks/__tests__/useChartConfig.test.tsx   | 169 ++++++++++++++++++
 packages/app/src/hooks/useChartConfig.tsx     |  42 +++--
 packages/app/src/timeQuery.ts                 |   7 +-
 5 files changed, 215 insertions(+), 16 deletions(-)
 create mode 100644 .changeset/hip-fireants-argue.md

diff --git a/.changeset/hip-fireants-argue.md b/.changeset/hip-fireants-argue.md
new file mode 100644
index 00000000..66bc2428
--- /dev/null
+++ b/.changeset/hip-fireants-argue.md
@@ -0,0 +1,6 @@
+---
+"@hyperdx/api": patch
+"@hyperdx/app": patch
+---
+
+feat: flush chunk data as it arrives if in order
diff --git a/packages/app/src/components/DBTimeChart.tsx b/packages/app/src/components/DBTimeChart.tsx
index 63ec0a5c..1703d413 100644
--- a/packages/app/src/components/DBTimeChart.tsx
+++ b/packages/app/src/components/DBTimeChart.tsx
@@ -256,8 +256,11 @@ function DBTimeChartComponent({
       queryKeyPrefix,
       queriedConfig,
       'chunked',
-      disableQueryChunking,
-      enableParallelQueries && me?.team?.parallelizeWhenPossible,
+      {
+        disableQueryChunking,
+        enableParallelQueries,
+        parallelizeWhenPossible: me?.team?.parallelizeWhenPossible,
+      },
     ],
     enabled: enabled && !isLoadingMe,
     enableQueryChunking: !disableQueryChunking,
diff --git a/packages/app/src/hooks/__tests__/useChartConfig.test.tsx b/packages/app/src/hooks/__tests__/useChartConfig.test.tsx
index fbf1c431..88894907 100644
---
a/packages/app/src/hooks/__tests__/useChartConfig.test.tsx +++ b/packages/app/src/hooks/__tests__/useChartConfig.test.tsx @@ -1122,5 +1122,174 @@ describe('useChartConfig', () => { expect(result.current.isLoading).toBe(false); expect(result.current.isPending).toBe(false); }); + + const setupParallelQueries = () => { + const config = createMockChartConfig({ + dateRange: [ + new Date('2025-10-01 00:00:00Z'), + new Date('2025-10-02 00:00:00Z'), + ], + granularity: '3 hour', + }); + const mockResponse1 = createMockQueryResponse([ + { + 'count()': '71', + __hdx_time_bucket: '2025-10-01T18:00:00Z', + }, + { + 'count()': '72', + __hdx_time_bucket: '2025-10-01T19:00:00Z', + }, + ]); + const mockResponse2 = createMockQueryResponse([ + { + 'count()': '73', + __hdx_time_bucket: '2025-10-01T12:00:00Z', + }, + { + 'count()': '74', + __hdx_time_bucket: '2025-10-01T14:00:00Z', + }, + ]); + const mockResponse3 = createMockQueryResponse([ + { + 'count()': '75', + __hdx_time_bucket: '2025-10-01T01:00:00Z', + }, + ]); + + return { config, mockResponse1, mockResponse2, mockResponse3 }; + }; + + it('fetches data in parallel when enableParallelQueries is true', async () => { + const { config, mockResponse1, mockResponse2, mockResponse3 } = + setupParallelQueries(); + + // Create promises that resolve with different delays to simulate parallel execution + const promise1 = Promise.resolve(mockResponse1); + const promise2 = new Promise(resolve => + setTimeout(() => resolve(mockResponse2), 50), + ); + const promise3 = new Promise(resolve => + setTimeout(() => resolve(mockResponse3), 100), + ); + + mockClickhouseClient.queryChartConfig + .mockReturnValueOnce(promise1) + .mockReturnValueOnce(promise2) + .mockReturnValueOnce(promise3); + + const { result } = renderHook( + () => + useQueriedChartConfig(config, { + enableQueryChunking: true, + enableParallelQueries: true, + }), + { + wrapper, + }, + ); + + await waitFor(() => expect(result.current.isSuccess).toBe(true), { + timeout: 1000, + }); 
+ await waitFor(() => expect(result.current.isFetching).toBe(false), { + timeout: 1000, + }); + + expect(mockClickhouseClient.queryChartConfig).toHaveBeenCalledTimes(3); + + // Data should be in order based on time window chunks (newest first) + expect(result.current.data).toEqual({ + data: [ + ...mockResponse3.data, + ...mockResponse2.data, + ...mockResponse1.data, + ], + meta: mockResponse1.meta, + rows: 5, + isComplete: true, + }); + expect(result.current.isLoading).toBe(false); + expect(result.current.isPending).toBe(false); + }); + + it('streams parallel query results in order', async () => { + const { config, mockResponse1, mockResponse2, mockResponse3 } = + setupParallelQueries(); + + // Create promises with controlled resolution order - simulate last chunk finishing first + let resolvePromise3: (value: any) => void; + let resolvePromise2: (value: any) => void; + let resolvePromise1: (value: any) => void; + + const promise1 = new Promise(resolve => { + resolvePromise1 = resolve; + }); + const promise2 = new Promise(resolve => { + resolvePromise2 = resolve; + }); + const promise3 = new Promise(resolve => { + resolvePromise3 = resolve; + }); + + mockClickhouseClient.queryChartConfig + .mockReturnValueOnce(promise1 as any) + .mockReturnValueOnce(promise2 as any) + .mockReturnValueOnce(promise3 as any); + + const { result } = renderHook( + () => + useQueriedChartConfig(config, { + enableQueryChunking: true, + enableParallelQueries: true, + }), + { + wrapper, + }, + ); + + // Should be in loading state initially + expect(result.current.isLoading).toBe(true); + expect(result.current.data).toBeUndefined(); + + // Resolve the last chunk first (out of order) + resolvePromise3!(mockResponse3); + + // Should still be loading since we need the first chunk + await new Promise(resolve => setTimeout(resolve, 10)); + expect(result.current.isLoading).toBe(true); + + // Resolve the first chunk + resolvePromise1!(mockResponse1); + + await waitFor(() => 
expect(result.current.isLoading).toBe(false)); + + // Should have partial data from first chunk only (in chronological order) + expect(result.current.data).toEqual({ + data: mockResponse1.data, + meta: mockResponse1.meta, + rows: 2, + isComplete: false, + }); + expect(result.current.isFetching).toBe(true); + + // Resolve the middle chunk + resolvePromise2!(mockResponse2); + + await waitFor(() => expect(result.current.isFetching).toBe(false)); + + // Should now have all data in chronological order + expect(result.current.data).toEqual({ + data: [ + ...mockResponse3.data, + ...mockResponse2.data, + ...mockResponse1.data, + ], + meta: mockResponse1.meta, + rows: 5, + isComplete: true, + }); + }); }); }); diff --git a/packages/app/src/hooks/useChartConfig.tsx b/packages/app/src/hooks/useChartConfig.tsx index 0a445215..28fc05b8 100644 --- a/packages/app/src/hooks/useChartConfig.tsx +++ b/packages/app/src/hooks/useChartConfig.tsx @@ -153,23 +153,41 @@ async function* fetchDataInChunks({ if (enableParallelQueries) { // fetch in parallel - const results = await Promise.all( - windows.map(w => { - const windowedConfig = { - ...config, - ...(w ?? {}), - }; - return clickhouseClient.queryChartConfig({ + const promises = windows.map(async (w, index) => { + const windowedConfig = { + ...config, + ...(w ?? 
{}), + }; + return { + index, + queryResult: await clickhouseClient.queryChartConfig({ config: windowedConfig, metadata: getMetadata(), opts: { abort_signal: signal, }, - }); - }), - ); - for (let i = 0; i < results.length; i++) { - yield { chunk: results[i], isComplete: i === results.length - 1 }; + }), + }; + }); + const remainingPromises = [...promises]; + const bufferedChunks = new Array(windows.length); + let flushed = 0; + for (let i = 0; i < promises.length; i++) { + // receive any promise in the array that resolves + const { index, queryResult } = await Promise.race(remainingPromises); + // add to an ordered buffer array, keeping in mind the flushed count thus far + bufferedChunks[index - flushed] = queryResult; + // use promises array (doesn't change in size) to find the index in the ever-changing remainingPromises array + const resolvedPromiseIdx = remainingPromises.indexOf(promises[index]); + // use found index to remove entry from remainingPromises + remainingPromises.splice(resolvedPromiseIdx, 1); + // while bufferedChunks has in-ordered data, flush it + while (bufferedChunks.length > 0 && bufferedChunks[0] !== undefined) { + // remove data from front so that it always arrives in order + const chunk = bufferedChunks.shift(); + yield { chunk, isComplete: bufferedChunks.length === 0 }; + flushed += 1; + } } return; } diff --git a/packages/app/src/timeQuery.ts b/packages/app/src/timeQuery.ts index 3fa67ffc..ed9f19a5 100644 --- a/packages/app/src/timeQuery.ts +++ b/packages/app/src/timeQuery.ts @@ -449,8 +449,11 @@ export function useNewTimeQuery({ }, ); - const [searchedTimeRange, setSearchedTimeRange] = - useState<[Date, Date]>(initialTimeRange); + const [searchedTimeRange, setSearchedTimeRange] = useState<[Date, Date]>( + from != null && to != null + ? [new Date(from), new Date(to)] + : initialTimeRange, + ); const onSearch = useCallback( (timeQuery: string) => {