diff --git a/.changeset/hip-fireants-argue.md b/.changeset/hip-fireants-argue.md new file mode 100644 index 00000000..66bc2428 --- /dev/null +++ b/.changeset/hip-fireants-argue.md @@ -0,0 +1,6 @@ +--- +"@hyperdx/api": patch +"@hyperdx/app": patch +--- + +feat: flush chunk data as it arrives if in order diff --git a/packages/app/src/components/DBTimeChart.tsx b/packages/app/src/components/DBTimeChart.tsx index 63ec0a5c..1703d413 100644 --- a/packages/app/src/components/DBTimeChart.tsx +++ b/packages/app/src/components/DBTimeChart.tsx @@ -256,8 +256,11 @@ function DBTimeChartComponent({ queryKeyPrefix, queriedConfig, 'chunked', - disableQueryChunking, - enableParallelQueries && me?.team?.parallelizeWhenPossible, + { + disableQueryChunking, + enableParallelQueries, + parallelizeWhenPossible: me?.team?.parallelizeWhenPossible, + }, ], enabled: enabled && !isLoadingMe, enableQueryChunking: !disableQueryChunking, diff --git a/packages/app/src/hooks/__tests__/useChartConfig.test.tsx b/packages/app/src/hooks/__tests__/useChartConfig.test.tsx index fbf1c431..88894907 100644 --- a/packages/app/src/hooks/__tests__/useChartConfig.test.tsx +++ b/packages/app/src/hooks/__tests__/useChartConfig.test.tsx @@ -1122,5 +1122,174 @@ describe('useChartConfig', () => { expect(result.current.isLoading).toBe(false); expect(result.current.isPending).toBe(false); }); + + const setupParallelQueries = () => { + const config = createMockChartConfig({ + dateRange: [ + new Date('2025-10-01 00:00:00Z'), + new Date('2025-10-02 00:00:00Z'), + ], + granularity: '3 hour', + }); + const mockResponse1 = createMockQueryResponse([ + { + 'count()': '71', + __hdx_time_bucket: '2025-10-01T18:00:00Z', + }, + { + 'count()': '72', + __hdx_time_bucket: '2025-10-01T19:00:00Z', + }, + ]); + const mockResponse2 = createMockQueryResponse([ + { + 'count()': '73', + __hdx_time_bucket: '2025-10-01T12:00:00Z', + }, + { + 'count()': '74', + __hdx_time_bucket: '2025-10-01T14:00:00Z', + }, + ]); + const mockResponse3 = createMockQueryResponse([ + { + 'count()': '75', + __hdx_time_bucket: '2025-10-01T01:00:00Z', + }, + ]); + + return { config, mockResponse1, mockResponse2, mockResponse3 }; + }; + + it('fetches data in parallel when enableParallelQueries is true', async () => { + const { config, mockResponse1, mockResponse2, mockResponse3 } = + setupParallelQueries(); + + // Create promises that resolve with different delays to simulate parallel execution + const promise1 = Promise.resolve(mockResponse1); + const promise2 = new Promise(resolve => + setTimeout(() => resolve(mockResponse2), 50), + ); + const promise3 = new Promise(resolve => + setTimeout(() => resolve(mockResponse3), 100), + ); + + mockClickhouseClient.queryChartConfig + .mockReturnValueOnce(promise1) + .mockReturnValueOnce(promise2) + .mockReturnValueOnce(promise3); + + const { result } = renderHook( + () => + useQueriedChartConfig(config, { + enableQueryChunking: true, + enableParallelQueries: true, + }), + { + wrapper, + }, + ); + + await waitFor(() => expect(result.current.isSuccess).toBe(true), { + timeout: 1000, + }); + await waitFor(() => expect(result.current.isFetching).toBe(false), { + timeout: 1000, + }); + + expect(mockClickhouseClient.queryChartConfig).toHaveBeenCalledTimes(3); + + // Data should be in order based on time window chunks (newest first) + expect(result.current.data).toEqual({ + data: [ + ...mockResponse3.data, + ...mockResponse2.data, + ...mockResponse1.data, + ], + meta: mockResponse1.meta, + rows: 5, + isComplete: true, + }); + expect(result.current.isLoading).toBe(false); + expect(result.current.isPending).toBe(false); + }); + + it('streams parallel query results in order', async () => { + const { config, mockResponse1, mockResponse2, mockResponse3 } = + setupParallelQueries(); + + // Create promises with controlled resolution order - simulate last chunk finishing first + let resolvePromise3: (value: any) => void; + let resolvePromise2: (value: any) => void; + let resolvePromise1: (value: any) => void; + + const promise1 = new Promise(resolve => { + resolvePromise1 = resolve; + }); + const promise2 = new Promise(resolve => { + resolvePromise2 = resolve; + }); + const promise3 = new Promise(resolve => { + resolvePromise3 = resolve; + }); + + mockClickhouseClient.queryChartConfig + .mockReturnValueOnce(promise1 as any) + .mockReturnValueOnce(promise2 as any) + .mockReturnValueOnce(promise3 as any); + + const { result } = renderHook( + () => + useQueriedChartConfig(config, { + enableQueryChunking: true, + enableParallelQueries: true, + }), + { + wrapper, + }, + ); + + // Should be in loading state initially + expect(result.current.isLoading).toBe(true); + expect(result.current.data).toBeUndefined(); + + // Resolve the last chunk first (out of order) + resolvePromise3!(mockResponse3); + + // Should still be loading since we need the first chunk + await new Promise(resolve => setTimeout(resolve, 10)); + expect(result.current.isLoading).toBe(true); + + // Resolve the first chunk + resolvePromise1!(mockResponse1); + + await waitFor(() => expect(result.current.isLoading).toBe(false)); + + // Should have partial data from first chunk only (in chronological order) + expect(result.current.data).toEqual({ + data: mockResponse1.data, + meta: mockResponse1.meta, + rows: 2, + isComplete: false, + }); + expect(result.current.isFetching).toBe(true); + + // Resolve the middle chunk + resolvePromise2!(mockResponse2); + + await waitFor(() => expect(result.current.isFetching).toBe(false)); + + // Should now have all data in chronological order + expect(result.current.data).toEqual({ + data: [ + ...mockResponse3.data, + ...mockResponse2.data, + ...mockResponse1.data, + ], + meta: mockResponse1.meta, + rows: 5, + isComplete: true, + }); + }); }); }); diff --git a/packages/app/src/hooks/useChartConfig.tsx b/packages/app/src/hooks/useChartConfig.tsx index 0a445215..28fc05b8 100644 --- a/packages/app/src/hooks/useChartConfig.tsx +++ b/packages/app/src/hooks/useChartConfig.tsx @@ -153,23 +153,41 @@ async function* fetchDataInChunks({ if (enableParallelQueries) { // fetch in parallel - const results = await Promise.all( - windows.map(w => { - const windowedConfig = { - ...config, - ...(w ?? {}), - }; - return clickhouseClient.queryChartConfig({ + const promises = windows.map(async (w, index) => { + const windowedConfig = { + ...config, + ...(w ?? {}), + }; + return { + index, + queryResult: await clickhouseClient.queryChartConfig({ config: windowedConfig, metadata: getMetadata(), opts: { abort_signal: signal, }, - }); - }), - ); - for (let i = 0; i < results.length; i++) { - yield { chunk: results[i], isComplete: i === results.length - 1 }; + }), + }; + }); + const remainingPromises = [...promises]; + const bufferedChunks = new Array(windows.length); + let flushed = 0; + for (let i = 0; i < promises.length; i++) { + // receive any promise in the array that resolves + const { index, queryResult } = await Promise.race(remainingPromises); + // add to an ordered buffer array, keeping in mind the flushed count thus far + bufferedChunks[index - flushed] = queryResult; + // use promises array (doesn't change in size) to find the index in the ever-changing remainingPromises array + const resolvedPromiseIdx = remainingPromises.indexOf(promises[index]); + // use found index to remove entry from remainingPromises + remainingPromises.splice(resolvedPromiseIdx, 1); + // while bufferedChunks has in-ordered data, flush it + while (bufferedChunks.length > 0 && bufferedChunks[0] !== undefined) { + // remove data from front so that it always arrives in order + const chunk = bufferedChunks.shift(); + yield { chunk, isComplete: bufferedChunks.length === 0 }; + flushed += 1; + } } return; } diff --git a/packages/app/src/timeQuery.ts b/packages/app/src/timeQuery.ts index 3fa67ffc..ed9f19a5 100644 --- a/packages/app/src/timeQuery.ts +++ b/packages/app/src/timeQuery.ts @@ -449,8 +449,11 @@ export function useNewTimeQuery({ }, ); - const [searchedTimeRange, setSearchedTimeRange] = - useState<[Date, Date]>(initialTimeRange); + const [searchedTimeRange, setSearchedTimeRange] = useState<[Date, Date]>( + from != null && to != null + ? [new Date(from), new Date(to)] + : initialTimeRange, + ); const onSearch = useCallback( (timeQuery: string) => {