Auto-extend mutex lock until unlock (#2094)

This commit is contained in:
Denis Badurina 2023-04-22 09:43:31 +02:00 committed by GitHub
parent b9a36615c4
commit a7759ab63e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 99 additions and 80 deletions

View file

@ -13,9 +13,9 @@ it('should allow only one lock at a time', async ({ expect }) => {
const lock2 = mutex.lock('1', { signal });
// second lock shouldnt resolve
await expect(Promise.race([throwAfter(), lock2])).rejects.toBeTruthy();
await expect(Promise.race([throwAfter(50), lock2])).rejects.toBeTruthy();
await unlock1();
unlock1();
// after the first lock releases, second one resolves
await expect(lock2).resolves.toBeTruthy();
@ -31,20 +31,6 @@ it('should allow different locks at any time', async ({ expect }) => {
await expect(mutex.lock('3', { signal })).resolves.toBeTruthy();
});
it('should time out after the specified duration', async ({ expect }) => {
const mutex = new Mutex(new Tlogger(), new Redis(differentPort()));
const [signal] = createSignal();
await mutex.lock('1', { signal });
const lock2 = mutex.lock('1', { signal, retries: 0 });
await expect(lock2).rejects.toMatchInlineSnapshot(
'[ExecutionError: The operation was unable to achieve a quorum during its retry window.]',
);
});
it('should cancel locking on abort signal', async ({ expect }) => {
const mutex = new Mutex(new Tlogger(), new Redis(differentPort()));
@ -58,7 +44,7 @@ it('should cancel locking on abort signal', async ({ expect }) => {
await expect(lock2).rejects.toMatchInlineSnapshot('[Error: Locking aborted]');
await unlock1();
unlock1();
// make sure that the aborted lock does not lock
await expect(mutex.lock('1', { signal: createSignal()[0] })).resolves.toBeTruthy();
@ -74,7 +60,7 @@ it('should unlock on abort signal', async ({ expect }) => {
const lock2 = mutex.lock('1', { signal: createSignal()[0] });
// second lock shouldnt resolve
await expect(Promise.race([throwAfter(), lock2])).rejects.toBeTruthy();
await expect(Promise.race([throwAfter(50), lock2])).rejects.toBeTruthy();
abort();
@ -130,6 +116,30 @@ describe.concurrent('should serialise concurrent threads', () => {
}
});
it('should keep auto-extending lock until unlock', async ({ expect }) => {
const mutex = new Mutex(new Tlogger(), new Redis(differentPort()));
const [signal] = createSignal();
const unlock = await mutex.lock('1', {
signal,
autoExtendThreshold: 100,
// duration needs to be 100ms more than the autoExtendThreshold
duration: 200,
});
await sleep(600); // extended at least 2 times
const lock2 = mutex.lock('1', { signal: createSignal()[0] });
// second lock still cannot be acquired resolve
await expect(Promise.race([throwAfter(50), lock2])).rejects.toBeTruthy();
unlock();
// only after unlock can the second lock be acquired
await expect(lock2).resolves.toBeTruthy();
});
class Tlogger implements Logger {
public info = vi.fn();
public warn = vi.fn();
@ -144,7 +154,7 @@ function sleep(ms = 50) {
return new Promise(resolve => setTimeout(resolve, ms));
}
async function throwAfter(ms?: number) {
async function throwAfter(ms: number) {
await sleep(ms);
throw `Throwing after ${ms}ms`;
}

View file

@ -8,9 +8,12 @@ export interface MutexLockOptions {
signal: AbortSignal;
/**
* The lock duration in milliseconds. Beware that the duration
* is how long the lock can be held, not the acquire timeout.
* is how long is the lock held, not the acquire timeout.
*
* @default 60_000
* Note that the lock will be auto-extended by the duration all
* the way until released (unlocked).
*
* @default 10_000
*/
duration?: number;
/**
@ -28,6 +31,12 @@ export interface MutexLockOptions {
* @default 1000
*/
retryDelay?: number;
/**
* The minimum remaining time in milliseconds on the lock before auto-extension.
*
* @default 500
*/
autoExtendThreshold?: number;
}
@Injectable()
@ -49,64 +58,64 @@ export class Mutex {
public lock(
id: string,
{ signal, duration = 60_000, retries = 60, retryDelay = 1000 }: MutexLockOptions,
) {
return Promise.race([
new Promise<never>((_, reject) => {
const listener = () => {
signal.removeEventListener('abort', listener);
reject(new Error('Locking aborted'));
};
signal.addEventListener('abort', listener);
}),
(async () => {
if (signal.aborted) {
throw new Error('Locking aborted');
}
this.logger.debug('Acquiring lock (id=%s)', id);
const lock = await this.redlock.acquire([id], duration, {
retryCount: retries,
retryDelay,
});
if (signal.aborted) {
lock.release().catch(err => {
// it is safe to not throw the error, as the lock will
// automatically expire after its duration is exceeded
// TODO: should this be logged as an error? a release may fail if there
// is no lock to release, like when the duration gets exceeded
this.logger.warn('Lock release problem after aborted (id=%s, err=%s)', id, err);
});
throw new Error('Locking aborted');
}
this.logger.debug('Lock acquired (id=%s)', id);
const listener = () => {
this.logger.debug('Releasing lock after aborted (id=%s)', id);
signal.removeEventListener('abort', listener);
lock.release().catch(err => {
// it is safe to not throw the error, as the lock will
// automatically expire after its duration is exceeded
// TODO: should this be logged as an error? a release may fail if there
// is no lock to release, like when the duration gets exceeded
this.logger.warn('Lock release problem after aborted (id=%s, err=%s)', id, err);
});
};
signal.addEventListener('abort', listener);
return async () => {
if (signal.aborted) {
this.logger.debug('Lock already released because aborted (id=%s)', id);
return;
}
this.logger.debug('Releasing lock (id=%s)', id);
await lock.release().catch(err => {
// it is safe to not throw the error, as the lock will
// automatically expire after its duration is exceeded
// TODO: should this be logged as an error? a release may fail if there
// is no lock to release, like when the duration gets exceeded
this.logger.warn('Lock release problem (id=%s, err=%s)', id, err);
});
};
})(),
]);
{
signal,
duration = 10_000,
retries = 60,
retryDelay = 1000,
autoExtendThreshold = 500,
}: MutexLockOptions,
): Promise<() => void> {
return new Promise((acquired, notAcquired) => {
this.logger.debug('Acquiring lock (id=%s)', id);
let unlock!: () => void;
const l = Promise.race([
new Promise<void>(resolve => {
signal.addEventListener(
'abort',
() => {
this.logger.warn('Lock aborted (id=%s)', id);
// reject lock acquire
notAcquired(new Error('Locking aborted'));
// but resolve lock (so that redlock releases)
resolve();
},
{ once: true },
);
}),
new Promise<void>(resolve => (unlock = resolve)),
]);
this.redlock
.using(
[id],
duration,
{
retryCount: retries,
retryDelay,
automaticExtensionThreshold: autoExtendThreshold,
},
autoExtensionFailSignal => {
autoExtensionFailSignal.addEventListener(
'abort',
event => {
// TODO: how to bubble this to the caller? the lock is basically released at this point
this.logger.error('Lock auto-extension failed (id=%s, event=%s)', id, event);
},
{ once: true },
);
this.logger.debug('Lock acquired (id=%s)', id);
acquired(() => {
this.logger.debug('Releasing lock (id=%s)', id);
unlock();
});
return l;
},
)
// nothing in the lock usage throws, so the error can only be a failed acquire
.catch(notAcquired);
});
}
public async perform<T>(
@ -118,7 +127,7 @@ export class Mutex {
try {
return await action();
} finally {
await unlock();
unlock();
}
}
}