diff --git a/packages/services/api/src/modules/shared/__tests__/mutex.spec.ts b/packages/services/api/src/modules/shared/__tests__/mutex.spec.ts index 8e6b15c38..e28e4432b 100644 --- a/packages/services/api/src/modules/shared/__tests__/mutex.spec.ts +++ b/packages/services/api/src/modules/shared/__tests__/mutex.spec.ts @@ -13,9 +13,9 @@ it('should allow only one lock at a time', async ({ expect }) => { const lock2 = mutex.lock('1', { signal }); // second lock shouldnt resolve - await expect(Promise.race([throwAfter(), lock2])).rejects.toBeTruthy(); + await expect(Promise.race([throwAfter(50), lock2])).rejects.toBeTruthy(); - await unlock1(); + unlock1(); // after the first lock releases, second one resolves await expect(lock2).resolves.toBeTruthy(); @@ -31,20 +31,6 @@ it('should allow different locks at any time', async ({ expect }) => { await expect(mutex.lock('3', { signal })).resolves.toBeTruthy(); }); -it('should time out after the specified duration', async ({ expect }) => { - const mutex = new Mutex(new Tlogger(), new Redis(differentPort())); - - const [signal] = createSignal(); - - await mutex.lock('1', { signal }); - - const lock2 = mutex.lock('1', { signal, retries: 0 }); - - await expect(lock2).rejects.toMatchInlineSnapshot( - '[ExecutionError: The operation was unable to achieve a quorum during its retry window.]', - ); -}); - it('should cancel locking on abort signal', async ({ expect }) => { const mutex = new Mutex(new Tlogger(), new Redis(differentPort())); @@ -58,7 +44,7 @@ it('should cancel locking on abort signal', async ({ expect }) => { await expect(lock2).rejects.toMatchInlineSnapshot('[Error: Locking aborted]'); - await unlock1(); + unlock1(); // make sure that the aborted lock does not lock await expect(mutex.lock('1', { signal: createSignal()[0] })).resolves.toBeTruthy(); @@ -74,7 +60,7 @@ it('should unlock on abort signal', async ({ expect }) => { const lock2 = mutex.lock('1', { signal: createSignal()[0] }); // second lock shouldnt resolve - await expect(Promise.race([throwAfter(), lock2])).rejects.toBeTruthy(); + await expect(Promise.race([throwAfter(50), lock2])).rejects.toBeTruthy(); abort(); @@ -130,6 +116,30 @@ describe.concurrent('should serialise concurrent threads', () => { } }); +it('should keep auto-extending lock until unlock', async ({ expect }) => { + const mutex = new Mutex(new Tlogger(), new Redis(differentPort())); + const [signal] = createSignal(); + + const unlock = await mutex.lock('1', { + signal, + autoExtendThreshold: 100, + // duration needs to be 100ms more than the autoExtendThreshold + duration: 200, + }); + + await sleep(600); // extended at least 2 times + + const lock2 = mutex.lock('1', { signal: createSignal()[0] }); + + // second lock still cannot be acquired resolve + await expect(Promise.race([throwAfter(50), lock2])).rejects.toBeTruthy(); + + unlock(); + + // only after unlock can the second lock be acquired + await expect(lock2).resolves.toBeTruthy(); +}); + class Tlogger implements Logger { public info = vi.fn(); public warn = vi.fn(); @@ -144,7 +154,7 @@ function sleep(ms = 50) { return new Promise(resolve => setTimeout(resolve, ms)); } -async function throwAfter(ms?: number) { +async function throwAfter(ms: number) { await sleep(ms); throw `Throwing after ${ms}ms`; } diff --git a/packages/services/api/src/modules/shared/providers/mutex.ts b/packages/services/api/src/modules/shared/providers/mutex.ts index 98ac8287f..d34c30666 100644 --- a/packages/services/api/src/modules/shared/providers/mutex.ts +++ b/packages/services/api/src/modules/shared/providers/mutex.ts @@ -8,9 +8,12 @@ export interface MutexLockOptions { signal: AbortSignal; /** * The lock duration in milliseconds. Beware that the duration - * is how long the lock can be held, not the acquire timeout. + * is how long is the lock held, not the acquire timeout. * - * @default 60_000 + * Note that the lock will be auto-extended by the duration all + * the way until released (unlocked). + * + * @default 10_000 */ duration?: number; /** @@ -28,6 +31,12 @@ export interface MutexLockOptions { * @default 1000 */ retryDelay?: number; + /** + * The minimum remaining time in milliseconds on the lock before auto-extension. + * + * @default 500 + */ + autoExtendThreshold?: number; } @Injectable() @@ -49,64 +58,64 @@ export class Mutex { public lock( id: string, - { signal, duration = 60_000, retries = 60, retryDelay = 1000 }: MutexLockOptions, - ) { - return Promise.race([ - new Promise((_, reject) => { - const listener = () => { - signal.removeEventListener('abort', listener); - reject(new Error('Locking aborted')); - }; - signal.addEventListener('abort', listener); - }), - (async () => { - if (signal.aborted) { - throw new Error('Locking aborted'); - } - this.logger.debug('Acquiring lock (id=%s)', id); - const lock = await this.redlock.acquire([id], duration, { - retryCount: retries, - retryDelay, - }); - if (signal.aborted) { - lock.release().catch(err => { - // it is safe to not throw the error, as the lock will - // automatically expire after its duration is exceeded - // TODO: should this be logged as an error? a release may fail if there - // is no lock to release, like when the duration gets exceeded - this.logger.warn('Lock release problem after aborted (id=%s, err=%s)', id, err); - }); - throw new Error('Locking aborted'); - } - this.logger.debug('Lock acquired (id=%s)', id); - const listener = () => { - this.logger.debug('Releasing lock after aborted (id=%s)', id); - signal.removeEventListener('abort', listener); - lock.release().catch(err => { - // it is safe to not throw the error, as the lock will - // automatically expire after its duration is exceeded - // TODO: should this be logged as an error? a release may fail if there - // is no lock to release, like when the duration gets exceeded - this.logger.warn('Lock release problem after aborted (id=%s, err=%s)', id, err); - }); - }; - signal.addEventListener('abort', listener); - return async () => { - if (signal.aborted) { - this.logger.debug('Lock already released because aborted (id=%s)', id); - return; - } - this.logger.debug('Releasing lock (id=%s)', id); - await lock.release().catch(err => { - // it is safe to not throw the error, as the lock will - // automatically expire after its duration is exceeded - // TODO: should this be logged as an error? a release may fail if there - // is no lock to release, like when the duration gets exceeded - this.logger.warn('Lock release problem (id=%s, err=%s)', id, err); - }); - }; - })(), - ]); + { + signal, + duration = 10_000, + retries = 60, + retryDelay = 1000, + autoExtendThreshold = 500, + }: MutexLockOptions, + ): Promise<() => void> { + return new Promise((acquired, notAcquired) => { + this.logger.debug('Acquiring lock (id=%s)', id); + + let unlock!: () => void; + const l = Promise.race([ + new Promise(resolve => { + signal.addEventListener( + 'abort', + () => { + this.logger.warn('Lock aborted (id=%s)', id); + // reject lock acquire + notAcquired(new Error('Locking aborted')); + // but resolve lock (so that redlock releases) + resolve(); + }, + { once: true }, + ); + }), + new Promise(resolve => (unlock = resolve)), + ]); + + this.redlock + .using( + [id], + duration, + { + retryCount: retries, + retryDelay, + automaticExtensionThreshold: autoExtendThreshold, + }, + autoExtensionFailSignal => { + autoExtensionFailSignal.addEventListener( + 'abort', + event => { + // TODO: how to bubble this to the caller? the lock is basically released at this point + this.logger.error('Lock auto-extension failed (id=%s, event=%s)', id, event); + }, + { once: true }, + ); + this.logger.debug('Lock acquired (id=%s)', id); + acquired(() => { + this.logger.debug('Releasing lock (id=%s)', id); + unlock(); + }); + return l; + }, + ) + // nothing in the lock usage throws, so the error can only be a failed acquire + .catch(notAcquired); + }); } public async perform( @@ -118,7 +127,7 @@ export class Mutex { try { return await action(); } finally { - await unlock(); + unlock(); } } }