feat: buffer and flush parallelized window queries to always flush inorder data (#1481)

We should be able to send most chart series queries all at once and view the results as the data comes in. This also ensures the data arrives in order.

This is only enabled for the histogram on DBSearchPage so far.

Closes HDX-3051
This commit is contained in:
Aaron Knudtson 2025-12-15 10:56:31 -05:00 committed by GitHub
parent 19b710fb68
commit e0c23d4e40
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 215 additions and 16 deletions

View file

@ -0,0 +1,6 @@
---
"@hyperdx/api": patch
"@hyperdx/app": patch
---
feat: flush chunk data as it arrives if in order

View file

@ -256,8 +256,11 @@ function DBTimeChartComponent({
queryKeyPrefix,
queriedConfig,
'chunked',
disableQueryChunking,
enableParallelQueries && me?.team?.parallelizeWhenPossible,
{
disableQueryChunking,
enableParallelQueries,
parallelizeWhenPossible: me?.team?.parallelizeWhenPossible,
},
],
enabled: enabled && !isLoadingMe,
enableQueryChunking: !disableQueryChunking,

View file

@ -1122,5 +1122,174 @@ describe('useChartConfig', () => {
expect(result.current.isLoading).toBe(false);
expect(result.current.isPending).toBe(false);
});
// Builds the shared fixture for the parallel-query tests: one chart config
// spanning a full day at 3-hour granularity, plus three mocked chunk
// responses ordered newest window first (the order the hook issues them).
const setupParallelQueries = () => {
  // Small factory so each data row reads as (count, bucket) at a glance.
  const makeRow = (count: string, bucket: string) => ({
    'count()': count,
    __hdx_time_bucket: bucket,
  });
  const config = createMockChartConfig({
    dateRange: [
      new Date('2025-10-01 00:00:00Z'),
      new Date('2025-10-02 00:00:00Z'),
    ],
    granularity: '3 hour',
  });
  // Newest chunk (evening buckets) — queried first.
  const mockResponse1 = createMockQueryResponse([
    makeRow('71', '2025-10-01T18:00:00Z'),
    makeRow('72', '2025-10-01T19:00:00Z'),
  ]);
  // Middle chunk (midday buckets).
  const mockResponse2 = createMockQueryResponse([
    makeRow('73', '2025-10-01T12:00:00Z'),
    makeRow('74', '2025-10-01T14:00:00Z'),
  ]);
  // Oldest chunk (early-morning bucket) — queried last.
  const mockResponse3 = createMockQueryResponse([
    makeRow('75', '2025-10-01T01:00:00Z'),
  ]);
  return { config, mockResponse1, mockResponse2, mockResponse3 };
};
it('fetches data in parallel when enableParallelQueries is true', async () => {
  const { config, mockResponse1, mockResponse2, mockResponse3 } =
    setupParallelQueries();
  // Helper: resolve a value after a delay to emulate staggered network replies.
  const delayed = <T,>(value: T, ms: number): Promise<T> =>
    new Promise<T>(resolve => setTimeout(() => resolve(value), ms));
  // First chunk resolves immediately; later chunks trail by 50ms/100ms so the
  // three requests are genuinely in flight at the same time.
  mockClickhouseClient.queryChartConfig
    .mockReturnValueOnce(Promise.resolve(mockResponse1))
    .mockReturnValueOnce(delayed(mockResponse2, 50))
    .mockReturnValueOnce(delayed(mockResponse3, 100));
  const { result } = renderHook(
    () =>
      useQueriedChartConfig(config, {
        enableQueryChunking: true,
        enableParallelQueries: true,
      }),
    {
      wrapper,
    },
  );
  await waitFor(() => expect(result.current.isSuccess).toBe(true), {
    timeout: 1000,
  });
  await waitFor(() => expect(result.current.isFetching).toBe(false), {
    timeout: 1000,
  });
  expect(mockClickhouseClient.queryChartConfig).toHaveBeenCalledTimes(3);
  // Data should be in order based on time window chunks (newest first)
  expect(result.current.data).toEqual({
    data: [
      ...mockResponse3.data,
      ...mockResponse2.data,
      ...mockResponse1.data,
    ],
    meta: mockResponse1.meta,
    rows: 5,
    isComplete: true,
  });
  expect(result.current.isLoading).toBe(false);
  expect(result.current.isPending).toBe(false);
});
it('streams parallel query results in order', async () => {
  const { config, mockResponse1, mockResponse2, mockResponse3 } =
    setupParallelQueries();
  // Deferred helper: gives us a promise plus its resolver so the test can
  // decide exactly when (and in what order) each chunk "arrives".
  const deferred = <T,>() => {
    let resolve!: (value: T) => void;
    const promise = new Promise<T>(r => {
      resolve = r;
    });
    return { promise, resolve };
  };
  const chunk1 = deferred<any>();
  const chunk2 = deferred<any>();
  const chunk3 = deferred<any>();
  mockClickhouseClient.queryChartConfig
    .mockReturnValueOnce(chunk1.promise as any)
    .mockReturnValueOnce(chunk2.promise as any)
    .mockReturnValueOnce(chunk3.promise as any);
  const { result } = renderHook(
    () =>
      useQueriedChartConfig(config, {
        enableQueryChunking: true,
        enableParallelQueries: true,
      }),
    {
      wrapper,
    },
  );
  // Should be in loading state initially
  expect(result.current.isLoading).toBe(true);
  expect(result.current.data).toBeUndefined();
  // Resolve the last chunk first (out of order)
  chunk3.resolve(mockResponse3);
  // Should still be loading since we need the first chunk
  await new Promise(resolve => setTimeout(resolve, 10));
  expect(result.current.isLoading).toBe(true);
  // Resolve the first chunk
  chunk1.resolve(mockResponse1);
  await waitFor(() => expect(result.current.isLoading).toBe(false));
  // Should have partial data from first chunk only (in chronological order)
  expect(result.current.data).toEqual({
    data: mockResponse1.data,
    meta: mockResponse1.meta,
    rows: 2,
    isComplete: false,
  });
  expect(result.current.isFetching).toBe(true);
  // Resolve the middle chunk
  chunk2.resolve(mockResponse2);
  await waitFor(() => expect(result.current.isFetching).toBe(false));
  // Should now have all data in chronological order
  expect(result.current.data).toEqual({
    data: [
      ...mockResponse3.data,
      ...mockResponse2.data,
      ...mockResponse1.data,
    ],
    meta: mockResponse1.meta,
    rows: 5,
    isComplete: true,
  });
});
});
});

View file

@ -153,23 +153,41 @@ async function* fetchDataInChunks({
if (enableParallelQueries) {
// fetch in parallel
const results = await Promise.all(
windows.map(w => {
const windowedConfig = {
...config,
...(w ?? {}),
};
return clickhouseClient.queryChartConfig({
const promises = windows.map(async (w, index) => {
const windowedConfig = {
...config,
...(w ?? {}),
};
return {
index,
queryResult: await clickhouseClient.queryChartConfig({
config: windowedConfig,
metadata: getMetadata(),
opts: {
abort_signal: signal,
},
});
}),
);
for (let i = 0; i < results.length; i++) {
yield { chunk: results[i], isComplete: i === results.length - 1 };
}),
};
});
const remainingPromises = [...promises];
const bufferedChunks = new Array(windows.length);
let flushed = 0;
for (let i = 0; i < promises.length; i++) {
// receive any promise in the array that resolves
const { index, queryResult } = await Promise.race(remainingPromises);
// add to an ordered buffer array, keeping in mind the flushed count thus far
bufferedChunks[index - flushed] = queryResult;
// use promises array (doesn't change in size) to find the index in the ever-changing remainingPromises array
const resolvedPromiseIdx = remainingPromises.indexOf(promises[index]);
// use found index to remove entry from remainingPromises
remainingPromises.splice(resolvedPromiseIdx, 1);
// while bufferedChunks has in-ordered data, flush it
while (bufferedChunks.length > 0 && bufferedChunks[0] !== undefined) {
// remove data from front so that it always arrives in order
const chunk = bufferedChunks.shift();
yield { chunk, isComplete: bufferedChunks.length === 0 };
flushed += 1;
}
}
return;
}

View file

@ -449,8 +449,11 @@ export function useNewTimeQuery({
},
);
const [searchedTimeRange, setSearchedTimeRange] =
useState<[Date, Date]>(initialTimeRange);
const [searchedTimeRange, setSearchedTimeRange] = useState<[Date, Date]>(
from != null && to != null
? [new Date(from), new Date(to)]
: initialTimeRange,
);
const onSearch = useCallback(
(timeQuery: string) => {