mirror of
https://github.com/hyperdxio/hyperdx
synced 2026-04-21 13:37:15 +00:00
feat: buffer and flush parallelized window queries to always flush inorder data (#1481)
We should be able to send most chart series queries all at once and view the results as the data comes in. This also ensures the data arrives in order. This is only enabled it for the histogram on DBSearchPage so far. Closes HDX-3051
This commit is contained in:
parent
19b710fb68
commit
e0c23d4e40
5 changed files with 215 additions and 16 deletions
6
.changeset/hip-fireants-argue.md
Normal file
6
.changeset/hip-fireants-argue.md
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
---
|
||||
"@hyperdx/api": patch
|
||||
"@hyperdx/app": patch
|
||||
---
|
||||
|
||||
feat: flush chunk data as it arrives if in order
|
||||
|
|
@ -256,8 +256,11 @@ function DBTimeChartComponent({
|
|||
queryKeyPrefix,
|
||||
queriedConfig,
|
||||
'chunked',
|
||||
disableQueryChunking,
|
||||
enableParallelQueries && me?.team?.parallelizeWhenPossible,
|
||||
{
|
||||
disableQueryChunking,
|
||||
enableParallelQueries,
|
||||
parallelizeWhenPossible: me?.team?.parallelizeWhenPossible,
|
||||
},
|
||||
],
|
||||
enabled: enabled && !isLoadingMe,
|
||||
enableQueryChunking: !disableQueryChunking,
|
||||
|
|
|
|||
|
|
@ -1122,5 +1122,174 @@ describe('useChartConfig', () => {
|
|||
expect(result.current.isLoading).toBe(false);
|
||||
expect(result.current.isPending).toBe(false);
|
||||
});
|
||||
|
||||
const setupParallelQueries = () => {
|
||||
const config = createMockChartConfig({
|
||||
dateRange: [
|
||||
new Date('2025-10-01 00:00:00Z'),
|
||||
new Date('2025-10-02 00:00:00Z'),
|
||||
],
|
||||
granularity: '3 hour',
|
||||
});
|
||||
const mockResponse1 = createMockQueryResponse([
|
||||
{
|
||||
'count()': '71',
|
||||
__hdx_time_bucket: '2025-10-01T18:00:00Z',
|
||||
},
|
||||
{
|
||||
'count()': '72',
|
||||
__hdx_time_bucket: '2025-10-01T19:00:00Z',
|
||||
},
|
||||
]);
|
||||
const mockResponse2 = createMockQueryResponse([
|
||||
{
|
||||
'count()': '73',
|
||||
__hdx_time_bucket: '2025-10-01T12:00:00Z',
|
||||
},
|
||||
{
|
||||
'count()': '74',
|
||||
__hdx_time_bucket: '2025-10-01T14:00:00Z',
|
||||
},
|
||||
]);
|
||||
const mockResponse3 = createMockQueryResponse([
|
||||
{
|
||||
'count()': '75',
|
||||
__hdx_time_bucket: '2025-10-01T01:00:00Z',
|
||||
},
|
||||
]);
|
||||
|
||||
return { config, mockResponse1, mockResponse2, mockResponse3 };
|
||||
};
|
||||
|
||||
it('fetches data in parallel when enableParallelQueries is true', async () => {
|
||||
const { config, mockResponse1, mockResponse2, mockResponse3 } =
|
||||
setupParallelQueries();
|
||||
|
||||
// Create promises that resolve with different delays to simulate parallel execution
|
||||
const promise1 = Promise.resolve(mockResponse1);
|
||||
const promise2 = new Promise<typeof mockResponse2>(resolve =>
|
||||
setTimeout(() => resolve(mockResponse2), 50),
|
||||
);
|
||||
const promise3 = new Promise<typeof mockResponse3>(resolve =>
|
||||
setTimeout(() => resolve(mockResponse3), 100),
|
||||
);
|
||||
|
||||
mockClickhouseClient.queryChartConfig
|
||||
.mockReturnValueOnce(promise1)
|
||||
.mockReturnValueOnce(promise2)
|
||||
.mockReturnValueOnce(promise3);
|
||||
|
||||
const { result } = renderHook(
|
||||
() =>
|
||||
useQueriedChartConfig(config, {
|
||||
enableQueryChunking: true,
|
||||
enableParallelQueries: true,
|
||||
}),
|
||||
{
|
||||
wrapper,
|
||||
},
|
||||
);
|
||||
|
||||
await waitFor(() => expect(result.current.isSuccess).toBe(true), {
|
||||
timeout: 1000,
|
||||
});
|
||||
await waitFor(() => expect(result.current.isFetching).toBe(false), {
|
||||
timeout: 1000,
|
||||
});
|
||||
|
||||
expect(mockClickhouseClient.queryChartConfig).toHaveBeenCalledTimes(3);
|
||||
|
||||
// Data should be in order based on time window chunks (newest first)
|
||||
expect(result.current.data).toEqual({
|
||||
data: [
|
||||
...mockResponse3.data,
|
||||
...mockResponse2.data,
|
||||
...mockResponse1.data,
|
||||
],
|
||||
meta: mockResponse1.meta,
|
||||
rows: 5,
|
||||
isComplete: true,
|
||||
});
|
||||
expect(result.current.isLoading).toBe(false);
|
||||
expect(result.current.isPending).toBe(false);
|
||||
});
|
||||
|
||||
it('streams parallel query results in order', async () => {
|
||||
const { config, mockResponse1, mockResponse2, mockResponse3 } =
|
||||
setupParallelQueries();
|
||||
|
||||
// Create promises with controlled resolution order - simulate last chunk finishing first
|
||||
let resolvePromise3: (value: any) => void;
|
||||
let resolvePromise2: (value: any) => void;
|
||||
let resolvePromise1: (value: any) => void;
|
||||
|
||||
const promise1 = new Promise(resolve => {
|
||||
resolvePromise1 = resolve;
|
||||
});
|
||||
const promise2 = new Promise(resolve => {
|
||||
resolvePromise2 = resolve;
|
||||
});
|
||||
const promise3 = new Promise(resolve => {
|
||||
resolvePromise3 = resolve;
|
||||
});
|
||||
|
||||
mockClickhouseClient.queryChartConfig
|
||||
.mockReturnValueOnce(promise1 as any)
|
||||
.mockReturnValueOnce(promise2 as any)
|
||||
.mockReturnValueOnce(promise3 as any);
|
||||
|
||||
const { result } = renderHook(
|
||||
() =>
|
||||
useQueriedChartConfig(config, {
|
||||
enableQueryChunking: true,
|
||||
enableParallelQueries: true,
|
||||
}),
|
||||
{
|
||||
wrapper,
|
||||
},
|
||||
);
|
||||
|
||||
// Should be in loading state initially
|
||||
expect(result.current.isLoading).toBe(true);
|
||||
expect(result.current.data).toBeUndefined();
|
||||
|
||||
// Resolve the last chunk first (out of order)
|
||||
resolvePromise3!(mockResponse3);
|
||||
|
||||
// Should still be loading since we need the first chunk
|
||||
await new Promise(resolve => setTimeout(resolve, 10));
|
||||
expect(result.current.isLoading).toBe(true);
|
||||
|
||||
// Resolve the first chunk
|
||||
resolvePromise1!(mockResponse1);
|
||||
|
||||
await waitFor(() => expect(result.current.isLoading).toBe(false));
|
||||
|
||||
// Should have partial data from first chunk only (in chronological order)
|
||||
expect(result.current.data).toEqual({
|
||||
data: mockResponse1.data,
|
||||
meta: mockResponse1.meta,
|
||||
rows: 2,
|
||||
isComplete: false,
|
||||
});
|
||||
expect(result.current.isFetching).toBe(true);
|
||||
|
||||
// Resolve the middle chunk
|
||||
resolvePromise2!(mockResponse2);
|
||||
|
||||
await waitFor(() => expect(result.current.isFetching).toBe(false));
|
||||
|
||||
// Should now have all data in chronological order
|
||||
expect(result.current.data).toEqual({
|
||||
data: [
|
||||
...mockResponse3.data,
|
||||
...mockResponse2.data,
|
||||
...mockResponse1.data,
|
||||
],
|
||||
meta: mockResponse1.meta,
|
||||
rows: 5,
|
||||
isComplete: true,
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -153,23 +153,41 @@ async function* fetchDataInChunks({
|
|||
|
||||
if (enableParallelQueries) {
|
||||
// fetch in parallel
|
||||
const results = await Promise.all(
|
||||
windows.map(w => {
|
||||
const windowedConfig = {
|
||||
...config,
|
||||
...(w ?? {}),
|
||||
};
|
||||
return clickhouseClient.queryChartConfig({
|
||||
const promises = windows.map(async (w, index) => {
|
||||
const windowedConfig = {
|
||||
...config,
|
||||
...(w ?? {}),
|
||||
};
|
||||
return {
|
||||
index,
|
||||
queryResult: await clickhouseClient.queryChartConfig({
|
||||
config: windowedConfig,
|
||||
metadata: getMetadata(),
|
||||
opts: {
|
||||
abort_signal: signal,
|
||||
},
|
||||
});
|
||||
}),
|
||||
);
|
||||
for (let i = 0; i < results.length; i++) {
|
||||
yield { chunk: results[i], isComplete: i === results.length - 1 };
|
||||
}),
|
||||
};
|
||||
});
|
||||
const remainingPromises = [...promises];
|
||||
const bufferedChunks = new Array(windows.length);
|
||||
let flushed = 0;
|
||||
for (let i = 0; i < promises.length; i++) {
|
||||
// receive any promise in the array that resolves
|
||||
const { index, queryResult } = await Promise.race(remainingPromises);
|
||||
// add to an ordered buffer array, keeping in mind the flushed count thus far
|
||||
bufferedChunks[index - flushed] = queryResult;
|
||||
// use promises array (doesn't change in size) to find the index in the ever-changing remainingPromises array
|
||||
const resolvedPromiseIdx = remainingPromises.indexOf(promises[index]);
|
||||
// use found index to remove entry from remainingPromises
|
||||
remainingPromises.splice(resolvedPromiseIdx, 1);
|
||||
// while bufferedChunks has in-ordered data, flush it
|
||||
while (bufferedChunks.length > 0 && bufferedChunks[0] !== undefined) {
|
||||
// remove data from front so that it always arrives in order
|
||||
const chunk = bufferedChunks.shift();
|
||||
yield { chunk, isComplete: bufferedChunks.length === 0 };
|
||||
flushed += 1;
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -449,8 +449,11 @@ export function useNewTimeQuery({
|
|||
},
|
||||
);
|
||||
|
||||
const [searchedTimeRange, setSearchedTimeRange] =
|
||||
useState<[Date, Date]>(initialTimeRange);
|
||||
const [searchedTimeRange, setSearchedTimeRange] = useState<[Date, Date]>(
|
||||
from != null && to != null
|
||||
? [new Date(from), new Date(to)]
|
||||
: initialTimeRange,
|
||||
);
|
||||
|
||||
const onSearch = useCallback(
|
||||
(timeQuery: string) => {
|
||||
|
|
|
|||
Loading…
Reference in a new issue