mirror of
https://github.com/datahaven-xyz/datahaven
synced 2026-05-24 09:50:01 +00:00
## Add E2E validator-set update flow - feat: `test/utils/validators.ts` for on-demand validator orchestration. - feat: `test/suites/validator-set-update.test.ts` covering allowlist → register → update. - some minor launcher updates: avoid docker cache, add `--platform` when building datahaven image, avoid sending validator-set update on launch. - Helpers: ABI shortcut in `test/utils/contracts.ts`; config tweaks in `test/configs/validator-set.json`. - Minor cleanup/formatting across `test/launcher/*`, `test/scripts/setup-validators.ts`, and related tests. - added `keepAlive` flag to `BaseTestSuite`, in order to avoid tearing down the network while debugging. Defaults, obviously, to false. - added a `failOnTimeout` option to `waitForDataHavenEvent()` so the test fails if the timeout is reached and no event was captured. ### Coverage - The test simulates a scenario in which we have two active authorities (alice and bob), which are running and registered as operators, which is the normal state after the chain launches. Then: - It launches two more nodes (charlie and dave) - It adds the nodes to the allowlist and registers them as operators - It sends the validator set update message - Checks that the validator update message was propagated through the gateway and arrived at the external-validators pallet - Checks that the chain continues producing blocks ### Notes The last test case has a timeout of 10 minutes. This is to respect propagation times of the message through the relayers. We are testing that the external validators pallet actually updated the validator set. Locally, I could expect 5~6 minutes, I just wanted to be on the safe side. CI is passing, showing that this was indeed enough. --------- Co-authored-by: Steve Degosserie <723552+stiiifff@users.noreply.github.com> Co-authored-by: Ahmad Kaouk <56095276+ahmadkaouk@users.noreply.github.com>
262 lines
7.7 KiB
TypeScript
262 lines
7.7 KiB
TypeScript
import { firstValueFrom, of } from "rxjs";
|
|
import { catchError, map, filter as rxFilter, take, tap, timeout } from "rxjs/operators";
|
|
import type { Abi, Address, Log, PublicClient } from "viem";
|
|
import { decodeEventLog } from "viem";
|
|
import { logger } from "./logger";
|
|
import type { DataHavenApi } from "./papi";
|
|
|
|
/**
|
|
* Event utilities for DataHaven and Ethereum chains
|
|
*
|
|
* This module provides utilities for waiting for events on different chains:
|
|
* - DataHaven events (substrate-based chain events)
|
|
* - Ethereum events (using viem event filters)
|
|
*/
|
|
|
|
/**
 * Outcome of waiting for a single DataHaven event.
 *
 * `pallet` and `event` echo what was requested; `data` and `meta` are only
 * populated when a matching event was actually observed.
 */
export interface DataHavenEventResult<T = unknown> {
  /** Name of the pallet the event belongs to */
  pallet: string;

  /** Name of the event within that pallet */
  event: string;

  /** Payload of the matched event, or `null` when nothing matched (timeout or error) */
  data: T | null;

  /** Metadata reported by the watcher alongside the payload, or `null` when unavailable */
  meta: any | null;
}
|
|
|
|
/**
 * Options for waiting for a single DataHaven event
 */
export interface WaitForDataHavenEventOptions<T = unknown> {
  /** DataHaven API instance */
  api: DataHavenApi;

  /** Pallet name (e.g., "System", "Balances") */
  pallet: string;

  /** Event name (e.g., "ExtrinsicSuccess", "Transfer") */
  event: string;

  /** Optional predicate; only events for which it returns true are considered a match */
  filter?: (event: T) => boolean;

  /** Timeout in milliseconds (default: 30000) */
  timeout?: number;

  /** Callback invoked with the payload of the matched event */
  onEvent?: (event: T) => void;

  /**
   * Whether reaching the timeout without a matching event should be treated as a
   * failure rather than producing a `null` result (default: false)
   */
  failOnTimeout?: boolean;
}
|
|
|
|
/**
 * Wait for a specific event on the DataHaven chain.
 *
 * Looks up the watcher at `api.event[pallet][event]`, applies the optional filter to
 * every emission and resolves with the first match. When the event is unknown, the
 * subscription errors, or the timeout elapses, the result carries `data: null` —
 * unless `failOnTimeout` is set, in which case a timeout rejects the returned promise.
 *
 * @param options - Options for event waiting
 * @returns Event result with pallet, event name, payload and metadata
 * @throws Error when `options.failOnTimeout` is true and no matching event arrived
 *   within `options.timeout` milliseconds
 */
export async function waitForDataHavenEvent<T = unknown>(
  options: WaitForDataHavenEventOptions<T>
): Promise<DataHavenEventResult<T>> {
  const {
    api,
    pallet,
    event,
    filter,
    timeout: timeoutMs = 30000,
    onEvent,
    failOnTimeout = false
  } = options;

  const eventWatcher = (api.event as any)?.[pallet]?.[event];
  if (!eventWatcher?.watch) {
    logger.warn(`Event ${pallet}.${event} not found`);
    return { pallet, event, data: null, meta: null };
  }

  // Sentinel emitted by the timeout fallback. Throwing inside the pipeline would be
  // swallowed by `catchError` (and by the try/catch below), so the timeout is reported
  // as a value and turned into an error only after the subscription has finished.
  const timeoutMarker = Symbol("timeout");

  let meta: any = null;
  let data: T | null = null;
  let timedOut = false;

  try {
    const matched: any = await firstValueFrom(
      eventWatcher.watch().pipe(
        // Log every raw emission from the watcher
        tap(() => {
          logger.debug(`Event ${pallet}.${event} received (raw)`);
        }),
        // Normalize to a consistent shape { payload, meta }
        map((raw: any) => ({ payload: raw?.payload ?? raw, meta: raw?.meta ?? null })),
        // Apply the optional filter BEFORE taking the first item
        rxFilter(({ payload }) => {
          if (!filter) return true;
          try {
            return filter(payload as T);
          } catch {
            // A filter that throws is treated as "no match" so the wait keeps going
            return false;
          }
        }),
        // Stop on the first matching event
        take(1),
        // Enforce an overall timeout while waiting for a matching event
        timeout({
          first: timeoutMs,
          with: () => {
            logger.debug(`Timeout waiting for event ${pallet}.${event} after ${timeoutMs}ms`);
            return of(timeoutMarker);
          }
        }),
        // Subscription errors are logged and reported as "no event"
        catchError((error: unknown) => {
          logger.error(`Error in event subscription ${pallet}.${event}: ${error}`);
          return of(null);
        })
      )
    );

    if (matched === timeoutMarker) {
      timedOut = true;
    } else if (matched) {
      meta = matched.meta;
      data = matched.payload as T;
      if (data) {
        onEvent?.(data);
      }
    }
  } catch (error) {
    logger.error(`Unexpected error waiting for event ${pallet}.${event}: ${error}`);
    data = null;
  }

  // Raised outside the try/catch so that it actually reaches the caller
  if (timedOut && failOnTimeout) {
    throw new Error(`Timeout waiting for event ${pallet}.${event} after ${timeoutMs}ms`);
  }

  return { pallet, event, data, meta };
}
|
|
|
|
// ================== Ethereum Event Utilities ==================
|
|
|
|
/**
 * Outcome of waiting for a single Ethereum contract event.
 *
 * `address` and `eventName` echo what was requested; `log` is only populated
 * when a matching log was observed.
 */
export interface EthereumEventResult {
  /** Address of the contract that was watched */
  address: Address;

  /** Name of the event that was awaited */
  eventName: string;

  /** The matched log, or `null` when nothing matched (timeout or error) */
  log: Log | null;
}
|
|
|
|
/**
 * Settings accepted by `waitForEthereumEvent`.
 */
export interface WaitForEthereumEventOptions<TAbi extends Abi = Abi> {
  /** viem public client used to watch the contract */
  client: PublicClient;

  /** Address of the contract emitting the event */
  address: Address;

  /** ABI of the contract; also used to decode candidate logs */
  abi: TAbi;

  /** Name of the event to wait for */
  eventName: any;

  /** Optional event argument values a log must carry to count as a match */
  args?: any;

  /** Maximum time to wait, in milliseconds (default: 30000) */
  timeout?: number;

  /** Block number to start watching from, so that past events can be included */
  fromBlock?: bigint;

  /** Callback invoked with the log that satisfied the wait */
  onEvent?: (log: Log) => void;
}
|
|
|
|
/**
 * Compare one expected scalar argument with its decoded counterpart.
 * Hex strings (addresses, hashes, bytes) are compared case-insensitively because
 * decoded addresses are checksummed while callers often pass lowercase values.
 */
function isSameEventArgValue(expected: unknown, actual: unknown): boolean {
  if (typeof expected === "string" && typeof actual === "string") {
    const hexPattern = /^0x[0-9a-fA-F]*$/;
    if (hexPattern.test(expected) && hexPattern.test(actual)) {
      return expected.toLowerCase() === actual.toLowerCase();
    }
  }
  return expected === actual;
}

/**
 * Decide whether a decoded argument satisfies the caller's expectation, following
 * viem's filter conventions: `null`/`undefined` is a wildcard and an array of
 * candidates (for a non-array argument) means "any of these values".
 */
function matchesExpectedEventArg(expected: unknown, actual: unknown): boolean {
  if (expected === null || expected === undefined) return true;
  if (Array.isArray(expected) && !Array.isArray(actual)) {
    return expected.some((option) => isSameEventArgValue(option, actual));
  }
  return isSameEventArgValue(expected, actual);
}

/**
 * Wait for a specific event on the Ethereum chain.
 *
 * Watches the contract through `client.watchContractEvent` and resolves with the first
 * log that satisfies `args` (or simply the first log when no `args` are given). The wait
 * ends with `log: null` when the timeout elapses, when the watcher reports an error, or
 * when the watcher cannot be started. This function never rejects for those cases.
 *
 * @param options - Options for event waiting
 * @returns Event result with address, event name, and log
 */
export async function waitForEthereumEvent<TAbi extends Abi = Abi>(
  options: WaitForEthereumEventOptions<TAbi>
): Promise<EthereumEventResult> {
  const { client, address, abi, eventName, args, timeout = 30000, fromBlock, onEvent } = options;

  const log = await new Promise<Log | null>((resolve) => {
    let unwatch: (() => void) | null = null;
    let timeoutId: ReturnType<typeof setTimeout> | null = null;
    let settled = false;

    // Single exit point: releases the timer and the watcher, resolves exactly once
    const finish = (result: Log | null) => {
      if (settled) return;
      settled = true;
      if (timeoutId) clearTimeout(timeoutId);
      if (unwatch) unwatch();
      resolve(result);
    };

    // Set up timeout
    timeoutId = setTimeout(() => {
      logger.debug(`Timeout waiting for Ethereum event ${eventName} after ${timeout}ms`);
      finish(null);
    }, timeout);

    // Watch for events
    try {
      unwatch = client.watchContractEvent({
        address,
        abi,
        eventName,
        args,
        fromBlock,
        onLogs: (logs) => {
          logger.debug(`Ethereum event ${eventName} received: ${logs.length} logs`);

          // Late deliveries after the wait has ended must not trigger onEvent again
          if (settled) return;

          // If args include non-indexed fields, viem cannot pre-filter them.
          // Post-filter by decoding logs and matching provided args if any.
          const hasArgsFilter = Boolean(args) && Object.keys(args).length > 0;
          let selected: Log | null = null;

          if (hasArgsFilter) {
            const expectedEntries = Object.entries(args as Record<string, unknown>);
            for (const candidate of logs) {
              try {
                const decoded = decodeEventLog({
                  abi,
                  eventName: eventName as any,
                  data: candidate.data,
                  topics: candidate.topics
                });
                const decodedArgs = (decoded as any).args ?? {};
                const allMatch = expectedEntries.every(([key, value]) =>
                  matchesExpectedEventArg(value, decodedArgs?.[key])
                );
                if (allMatch) {
                  selected = candidate;
                  break;
                }
              } catch {
                // Ignore decode errors and continue scanning
              }
            }
          } else if (logs.length > 0) {
            // Only fall back to the first log when no args filter was provided
            selected = logs[0];
          }

          // If no log matched, keep watching until timeout
          if (!selected) return;

          try {
            if (onEvent) {
              onEvent(selected);
            }
          } finally {
            // Resolve even if the callback throws, so the wait does not stall until timeout
            finish(selected);
          }
        },
        onError: (error: unknown) => {
          // Any watcher error ends the wait early with a null log
          logger.error(`Error watching Ethereum event ${eventName}: ${error}`);
          finish(null);
        }
      });

      // If the wait already ended while the watcher was being set up, `finish` ran
      // before `unwatch` was assigned; release the watcher now to avoid leaking it.
      if (settled) {
        unwatch();
      }
    } catch (error) {
      logger.error(`Failed to watch Ethereum event ${eventName}: ${error}`);
      finish(null);
    }
  });

  return { address, eventName, log };
}
|