console/packages/services/usage/__tests__/buffer.spec.ts

318 lines
8.5 KiB
TypeScript

import { calculateChunkSize, createKVBuffer } from '../src/buffer';
function waitFor(time: number) {
return new Promise(resolve => setTimeout(resolve, time));
}
const eventHubLimitInBytes = 900_000;
const bufferSize = 1200;
const defaultBytesPerUnit = eventHubLimitInBytes / bufferSize;
test('increase the defaultBytesPerOperation estimation by 5% when over 100 calls were made and 10% of them failed', async () => {
const logger = {
// info: vi.fn(console.info),
// error: vi.fn(console.error),
info: vi.fn(),
error: vi.fn(),
};
const flush = vi.fn();
const onRetry = vi.fn();
const interval = 200;
const size = {
successful: bufferSize / 2,
overflow: bufferSize,
error: bufferSize / 2 - 1,
};
const bytesPerUnit = eventHubLimitInBytes / size.successful;
const buffer = createKVBuffer<{
id: string;
size: number;
}>({
logger: logger as any,
size: size.successful,
interval,
limitInBytes: eventHubLimitInBytes,
useEstimator: true,
onRetry,
isTooLargePayloadError() {
return true;
},
calculateReportSize(report) {
return report.size;
},
split(report, numOfChunks) {
const reports: Array<{
id: string;
size: number;
}> = [];
for (let chunkIndex = 0; chunkIndex < numOfChunks; chunkIndex++) {
reports.push({
id: `${report.id}-chunk-${chunkIndex}`,
size: calculateChunkSize(report.size, numOfChunks, chunkIndex),
});
}
return reports;
},
async sender(reports, _bytes, _batchId, validateSize) {
const receivedSize = reports.reduce((sum, report) => report.size + sum, 0);
flush(reports.map(r => r.id).join(','));
if (receivedSize === size.error) {
validateSize(size.error * bytesPerUnit);
throw new Error('Over the size limit!');
} else {
validateSize(receivedSize * bytesPerUnit);
}
},
});
buffer.start();
// make 100 calls
for (let i = 0; i < 100; i++) {
buffer.add({
id: `good - ${i}`,
size: size.successful,
});
}
// Interval passes
await waitFor(interval + 50);
expect(logger.info).not.toHaveBeenCalledWith(
expect.stringContaining('Increasing default bytes per unit'),
);
expect(flush).toBeCalledTimes(100);
// make 10 calls that fail
for (let i = 0; i < 12; i++) {
buffer.add({
id: `bad - ${i}`,
size: size.error,
});
}
// Interval passes
await waitFor(interval + 50);
expect(flush).toBeCalledTimes(112);
expect(logger.info).not.toHaveBeenCalledWith(
expect.stringContaining('Increasing default bytes per unit'),
);
await waitFor(1000);
// make 1 call that fails
buffer.add({
id: 'decider',
size: size.error,
});
// Interval passes
await waitFor(interval + 50);
const newDefault = bytesPerUnit * 1.05;
expect(logger.info).toHaveBeenCalledWith(
expect.stringContaining('Increasing default bytes per unit (ratio=%s, new=%s)'),
0.05,
newDefault,
);
const flushedTimes = 114;
expect(flush).toHaveBeenCalledTimes(flushedTimes);
// Buffer should split into two reports because the defaultBytesPerUnit estimation is increased
// which means that the buffer can hold less operations than before
buffer.add({
id: 'new-reality',
size: Math.ceil(eventHubLimitInBytes / newDefault) + 1,
});
// We reached the limit of bytes (according to the new estimations)
// No need to wait for the interval to pass
await waitFor(interval + 50);
expect(flush).toHaveBeenCalledTimes(flushedTimes + 1);
await buffer.stop();
expect(flush).toHaveBeenCalledTimes(flushedTimes + 1);
});
test('buffer should split the report into multiple reports when the estimated size is greater than the limit', async () => {
const logger = {
info: vi.fn(),
error: vi.fn(),
};
const flush = vi.fn();
const interval = 200;
const buffer = createKVBuffer<{
id: string;
size: number;
operations: number[];
}>({
logger: logger as any,
size: bufferSize,
interval,
limitInBytes: eventHubLimitInBytes,
useEstimator: true,
isTooLargePayloadError() {
return true;
},
calculateReportSize(report) {
return report.size;
},
onRetry() {},
split(report, numOfChunks) {
const reports: Array<{
id: string;
size: number;
operations: number[];
}> = [];
let endedAt = 0;
for (let i = 0; i < numOfChunks; i++) {
const chunkSize = calculateChunkSize(report.size, numOfChunks, i);
const start = endedAt;
const end = start + chunkSize;
endedAt = end;
const operations = report.operations.slice(start, end);
reports.push({
id: `${report.id}-${i}`,
size: operations.length,
operations,
});
}
return reports;
},
async sender(reports, _bytes, _batchId, validateSize) {
const receivedSize = reports.reduce((sum, report) => report.size + sum, 0);
flush(reports.map(r => r.id).join(','), receivedSize);
validateSize(receivedSize * defaultBytesPerUnit);
},
});
buffer.start();
const bigBatchSize = bufferSize + 20;
// add a report bigger than the limit
buffer.add({
id: 'big',
size: bigBatchSize,
operations: new Array(bigBatchSize).fill(0).map((_, i) => i),
});
// Interval passes
await waitFor(interval + 50);
// Buffer should flush two reports, the big report splitted in half
expect(flush).toHaveBeenNthCalledWith(1, 'big-0', bigBatchSize / 2);
expect(flush).toHaveBeenNthCalledWith(2, 'big-1', bigBatchSize / 2);
const biggerBatchSize = bufferSize + bufferSize + 30;
buffer.add({
id: 'bigger',
size: biggerBatchSize,
operations: new Array(biggerBatchSize).fill(0).map((_, i) => i),
});
// Interval passes
await waitFor(interval + 50);
expect(flush).toHaveBeenNthCalledWith(3, 'bigger-0', biggerBatchSize / 3);
expect(flush).toHaveBeenNthCalledWith(4, 'bigger-1', biggerBatchSize / 3);
expect(flush).toHaveBeenNthCalledWith(5, 'bigger-2', biggerBatchSize / 3);
await buffer.stop();
});
test('buffer create two chunks out of one buffer when actual buffer size is too big', async () => {
const logger = {
info: vi.fn(),
error: vi.fn(),
// info: vi.fn(console.info),
// error: vi.fn(console.error),
};
const flush = vi.fn();
const split = vi.fn((report, numOfChunks) => {
const reports: Array<{
id: string;
size: number;
operations: number[];
}> = [];
let endedAt = 0;
for (let i = 0; i < numOfChunks; i++) {
const chunkSize = calculateChunkSize(report.size, numOfChunks, i);
const start = endedAt;
const end = start + chunkSize;
endedAt = end;
const operations = report.operations.slice(start, end);
reports.push({
id: `${report.id}-${i}`,
size: operations.length,
operations,
});
}
return reports;
});
const onRetry = vi.fn();
const interval = 200;
const buffer = createKVBuffer<{
id: string;
size: number;
operations: number[];
}>({
logger: logger as any,
size: bufferSize,
interval,
limitInBytes: eventHubLimitInBytes,
useEstimator: true,
isTooLargePayloadError() {
return true;
},
calculateReportSize(report) {
return report.size;
},
onRetry,
split,
async sender(reports, _bytes, batchId, validateSize) {
const receivedSize = reports.reduce((sum, report) => report.size + sum, 0);
validateSize(receivedSize * 2 * defaultBytesPerUnit);
flush(reports.map(r => r.id).join(','), receivedSize, batchId);
},
});
buffer.start();
// add a report bigger than the limit
buffer.add({
id: 'big',
size: bufferSize,
operations: new Array(bufferSize).fill(0).map((_, i) => i),
});
// Interval passes
await waitFor(interval + 50);
// Reports should be split as well, just in case we have one or few big reports.
// In our case it should be called once (1 report split into 2 reports)
expect(split).toHaveBeenCalledTimes(1);
// Flush should be retried because the buffer size was too big (twice as big)
expect(onRetry).toBeCalledTimes(1);
// Buffer should flush two reports, the big report splitted in half
expect(flush).toHaveBeenNthCalledWith(
1,
'big-0',
bufferSize / 2,
expect.stringContaining('--retry-chunk-0'),
);
expect(flush).toHaveBeenNthCalledWith(
2,
'big-1',
bufferSize / 2,
expect.stringContaining('--retry-chunk-1'),
);
await buffer.stop();
});