mirror of
https://github.com/datahaven-xyz/datahaven
synced 2026-05-24 01:38:32 +00:00
feat: add Prometheus metrics and debug logging to validator set submitter
Add prom-client-based metrics exposed via a Bun.serve() HTTP server with /metrics, /healthz, and /readyz endpoints. Instrument all tick and submission paths with counters, gauges, and histograms. Add debug-level logging for silent skip paths in the tick handler to aid production troubleshooting.
This commit is contained in:
parent
49286b128d
commit
c90a38f4dc
9 changed files with 3914 additions and 18687 deletions
22295
test/bun.lock
22295
test/bun.lock
File diff suppressed because it is too large
Load diff
|
|
@ -68,6 +68,7 @@
|
|||
"pino": "^9.7.0",
|
||||
"pino-pretty": "^13.0.0",
|
||||
"polkadot-api": "^1.15.1",
|
||||
"prom-client": "^15.1.0",
|
||||
"solc": "^0.8.30",
|
||||
"tiny-invariant": "^1.3.3",
|
||||
"viem": "^2.31.3",
|
||||
|
|
|
|||
|
|
@ -31,6 +31,8 @@ COPY test/utils/ ./utils/
|
|||
|
||||
ENV NODE_ENV=production
|
||||
|
||||
EXPOSE 9090
|
||||
|
||||
USER submitter
|
||||
|
||||
ENTRYPOINT ["bun", "run", "tools/validator-set-submitter/main.ts", "run"]
|
||||
|
|
|
|||
|
|
@ -39,6 +39,9 @@ network_id: "anvil"
|
|||
# Fees (in ETH, sent as msg.value to cover Snowbridge relay costs)
|
||||
execution_fee: "0.1"
|
||||
relayer_fee: "0.2"
|
||||
|
||||
# Optional metrics port (default: 9090)
|
||||
# metrics_port: 9090
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
|
@ -64,6 +67,24 @@ bun tools/validator-set-submitter/main.ts run --dry-run
|
|||
|
||||
Private key precedence is: `--submitter-private-key` > `SUBMITTER_PRIVATE_KEY` > `submitter_private_key` in config file.
|
||||
|
||||
## Observability
|
||||
|
||||
The submitter exposes a Prometheus metrics server on `metrics_port` (default `9090`):
|
||||
|
||||
- `GET /metrics` — Prometheus metrics
|
||||
- `GET /healthz` — liveness
|
||||
- `GET /readyz` — readiness (`200` once startup checks pass and the watcher is running, `503` otherwise)
|
||||
|
||||
Key metrics:
|
||||
|
||||
- `validator_set_submitter_submissions_total{outcome="success|failed|dry_run"}`
|
||||
- `validator_set_submitter_ticks_total{result="submitted_success|submitted_failed|skipped_*"}`
|
||||
- `validator_set_submitter_errors_total{type="tick_error|subscription_error"}`
|
||||
- `validator_set_submitter_missed_eras_total`
|
||||
- `validator_set_submitter_consecutive_missed_eras`
|
||||
- `validator_set_submitter_up`
|
||||
- `validator_set_submitter_ready`
|
||||
|
||||
## Docker
|
||||
|
||||
Build the image from the repository root:
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ export interface SubmitterConfig {
|
|||
executionFee: bigint;
|
||||
relayerFee: bigint;
|
||||
dryRun: boolean;
|
||||
metricsPort: number;
|
||||
}
|
||||
|
||||
interface CliOverrides {
|
||||
|
|
@ -42,6 +43,8 @@ export async function loadConfig(
|
|||
const executionFee = parseEther(optionalString(raw, "execution_fee") ?? "0.1");
|
||||
const relayerFee = parseEther(optionalString(raw, "relayer_fee") ?? "0.2");
|
||||
|
||||
const metricsPort = resolveMetricsPort(raw);
|
||||
|
||||
return {
|
||||
ethereumRpcUrl,
|
||||
datahavenWsUrl,
|
||||
|
|
@ -50,7 +53,8 @@ export async function loadConfig(
|
|||
networkId,
|
||||
executionFee,
|
||||
relayerFee,
|
||||
dryRun: cli.dryRun ?? false
|
||||
dryRun: cli.dryRun ?? false,
|
||||
metricsPort
|
||||
};
|
||||
}
|
||||
|
||||
|
|
@ -99,3 +103,14 @@ function optionalHexString(raw: Record<string, unknown>, key: string): `0x${stri
|
|||
}
|
||||
return val as `0x${string}`;
|
||||
}
|
||||
|
||||
/**
 * Resolves the metrics server port from the raw (parsed) config object.
 *
 * Reads `raw.metrics_port`. When the key is absent or `null`, the default port
 * 9090 is used. Only a number or a string made solely of decimal digits
 * (surrounding whitespace tolerated) is accepted; everything else is rejected
 * instead of being coerced.
 *
 * @param raw - Parsed config file contents.
 * @returns An integer port in the range 1–65535.
 * @throws Error if `metrics_port` is present but is not an integer in 1–65535.
 */
function resolveMetricsPort(raw: Record<string, unknown>): number {
  const defaultPort = 9090;
  const value = raw.metrics_port;

  if (value === undefined || value === null) {
    return defaultPort;
  }

  // Plain `Number(value)` would accept surprising inputs: `true` → 1,
  // `[9090]` → 9090, `"0x50"` → 80, `"9e3"` → 9000. Restrict the accepted
  // shapes to numbers and digit-only strings so a mistyped config fails loudly.
  let port = Number.NaN;
  if (typeof value === "number") {
    port = value;
  } else if (typeof value === "string") {
    const trimmed = value.trim();
    if (/^\d+$/.test(trimmed)) {
      port = Number(trimmed);
    }
  }

  // Number.isInteger already rejects NaN and ±Infinity.
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    // Report what the operator actually wrote, not the coerced value (often NaN).
    const shown = typeof value === "object" ? JSON.stringify(value) : String(value);
    throw new Error(`Invalid metrics port: ${shown}. Must be an integer between 1 and 65535.`);
  }

  return port;
}
|
||||
|
|
|
|||
|
|
@ -19,3 +19,6 @@ network_id: "anvil"
|
|||
# Fees (in ETH, sent as msg.value to cover Snowbridge relay costs)
|
||||
execution_fee: "0.1"
|
||||
relayer_fee: "0.2"
|
||||
|
||||
# Prometheus metrics server port (default: 9090)
|
||||
# metrics_port: 9090
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import { logger } from "utils/logger";
|
|||
import { privateKeyToAccount } from "viem/accounts";
|
||||
import { getOnChainSubmitter } from "./chain";
|
||||
import { loadConfig } from "./config";
|
||||
import { createMetricsServer } from "./metrics";
|
||||
import { createClients, startSubmitter } from "./submitter";
|
||||
|
||||
const program = new Command()
|
||||
|
|
@ -28,6 +29,10 @@ program
|
|||
submitterPrivateKey: opts.submitterPrivateKey
|
||||
});
|
||||
|
||||
// Start metrics server early so /healthz is available during init
|
||||
const metricsServer = createMetricsServer(config.metricsPort);
|
||||
logger.info(`Metrics server listening on :${config.metricsPort}`);
|
||||
|
||||
logger.info("Validator Set Submitter starting...");
|
||||
logger.info(`Ethereum RPC: ${config.ethereumRpcUrl}`);
|
||||
logger.info(`DataHaven WS: ${config.datahavenWsUrl}`);
|
||||
|
|
@ -84,6 +89,7 @@ program
|
|||
try {
|
||||
await startSubmitter(clients, config, ac.signal);
|
||||
} finally {
|
||||
metricsServer.stop();
|
||||
clients.papiClient.destroy();
|
||||
logger.info("Submitter stopped, PAPI client destroyed");
|
||||
}
|
||||
|
|
|
|||
134
test/tools/validator-set-submitter/metrics.ts
Normal file
134
test/tools/validator-set-submitter/metrics.ts
Normal file
|
|
@ -0,0 +1,134 @@
|
|||
import { Counter, Gauge, Histogram, Registry } from "prom-client";
|
||||
|
||||
/** Prefix shared by every metric name declared in this module. */
const PREFIX = "validator_set_submitter_";

/** Registry that every metric below registers with. */
export const registry = new Registry();

/**
 * Builds the configuration fields common to all metrics: the prefixed name,
 * the help text, and registration with this module's registry.
 */
function baseConfig(suffix: string, help: string) {
  return { name: `${PREFIX}${suffix}`, help, registers: [registry] };
}

// --- Counters ---

/** Submission attempts, labelled by `outcome`. */
export const submissionsTotal = new Counter({
  ...baseConfig("submissions_total", "Total submission attempts and results"),
  labelNames: ["outcome"] as const
});

/** Tick evaluations, labelled by `result`. */
export const ticksTotal = new Counter({
  ...baseConfig("ticks_total", "Total tick evaluations"),
  labelNames: ["result"] as const
});

/** Errors outside the submission path, labelled by `type`. */
export const errorsTotal = new Counter({
  ...baseConfig("errors_total", "Non-submission errors"),
  labelNames: ["type"] as const
});

/** Number of eras for which a submission attempt failed. */
export const missedErasTotal = new Counter(
  baseConfig("missed_eras_total", "Total eras where a submission attempt failed")
);

// --- Gauges ---

/** Active era index. */
export const activeEra = new Gauge(baseConfig("active_era", "Current active era on DataHaven"));

/** Era the next submission targets. */
export const targetEra = new Gauge(baseConfig("target_era", "Target era for next submission"));

/** Latest confirmed era. */
export const externalIndex = new Gauge(
  baseConfig("external_index", "Latest confirmed era on-chain")
);

/** Current session number. */
export const currentSession = new Gauge(baseConfig("current_session", "Current session number"));

/** Last era that was submitted successfully. */
export const lastSubmittedEra = new Gauge(
  baseConfig("last_submitted_era", "Last era successfully submitted")
);

/** Run length of consecutive missed eras. */
export const consecutiveMissedEras = new Gauge(
  baseConfig("consecutive_missed_eras", "Consecutive eras missed (resets to 0 on success)")
);

/** Watcher running flag (1 or 0). */
export const up = new Gauge(baseConfig("up", "1 if watcher is running, 0 if stopped"));

/** Readiness flag (1 or 0). */
export const ready = new Gauge(
  baseConfig("ready", "1 if startup checks passed and watcher running, 0 otherwise")
);

// --- Histograms ---

/** Submission duration in seconds. */
export const submissionDuration = new Histogram({
  ...baseConfig("submission_duration_seconds", "Time from tx send to receipt"),
  buckets: [1, 5, 10, 30, 60, 120, 300]
});

/** Per-tick processing duration in seconds. */
export const tickDuration = new Histogram({
  ...baseConfig("tick_duration_seconds", "Time to process one tick"),
  buckets: [0.1, 0.5, 1, 2, 5, 10, 30]
});
|
||||
|
||||
// --- HTTP Server ---
|
||||
|
||||
/**
 * Starts a Bun HTTP server on `port` that serves the observability endpoints.
 *
 * Routes (matched on the URL path only):
 * - `/metrics` — the registry's metrics, with the registry's content type
 * - `/healthz` — always `200 ok`
 * - `/readyz`  — `200 ready` when the `ready` gauge equals 1, otherwise `503 not ready`
 * - anything else — `404 Not Found`
 *
 * @param port - Port passed to `Bun.serve`.
 * @returns The server returned by `Bun.serve`.
 */
export function createMetricsServer(port: number) {
  return Bun.serve({
    port,
    async fetch(request) {
      const { pathname } = new URL(request.url);

      switch (pathname) {
        case "/metrics": {
          const body = await registry.metrics();
          return new Response(body, {
            headers: { "Content-Type": registry.contentType }
          });
        }

        case "/healthz":
          return new Response("ok\n", { status: 200 });

        case "/readyz": {
          // Readiness is derived from the first sample of the `ready` gauge.
          const snapshot = await ready.get();
          const isReady = snapshot.values[0]?.value === 1;
          return isReady
            ? new Response("ready\n", { status: 200 })
            : new Response("not ready\n", { status: 503 });
        }

        default:
          return new Response("Not Found\n", { status: 404 });
      }
    }
  });
}
|
||||
|
|
@ -14,6 +14,7 @@ import { privateKeyToAccount } from "viem/accounts";
|
|||
import { dataHavenServiceManagerAbi, gatewayAbi } from "../../contract-bindings";
|
||||
import { computeTargetEra, getActiveEra, getExternalIndex, isLastSessionOfEra } from "./chain";
|
||||
import type { SubmitterConfig } from "./config";
|
||||
import * as metrics from "./metrics";
|
||||
|
||||
interface SubmitterClients {
|
||||
publicClient: PublicClient;
|
||||
|
|
@ -71,32 +72,70 @@ async function waitForReceiptWithAbort(
|
|||
function createTicker(clients: SubmitterClients, config: SubmitterConfig, signal: AbortSignal) {
|
||||
let submittedEra: bigint | undefined;
|
||||
|
||||
return async (currentSession: number): Promise<void> => {
|
||||
const { dhApi } = clients;
|
||||
return async (currentSessionValue: number): Promise<void> => {
|
||||
const endTimer = metrics.tickDuration.startTimer();
|
||||
try {
|
||||
const { dhApi } = clients;
|
||||
|
||||
const activeEra = await getActiveEra(dhApi);
|
||||
if (!activeEra) {
|
||||
logger.warn("ActiveEra not set yet");
|
||||
return;
|
||||
metrics.currentSession.set(currentSessionValue);
|
||||
|
||||
const activeEra = await getActiveEra(dhApi);
|
||||
if (!activeEra) {
|
||||
logger.warn("ActiveEra not set yet");
|
||||
metrics.ticksTotal.inc({ result: "skipped_no_active_era" });
|
||||
return;
|
||||
}
|
||||
|
||||
metrics.activeEra.set(activeEra.index);
|
||||
|
||||
const targetEraValue = computeTargetEra(activeEra.index);
|
||||
metrics.targetEra.set(Number(targetEraValue));
|
||||
|
||||
if (submittedEra === targetEraValue) {
|
||||
logger.debug(`Tick skipped: era ${targetEraValue} already submitted locally`);
|
||||
metrics.ticksTotal.inc({ result: "skipped_already_submitted" });
|
||||
return;
|
||||
}
|
||||
|
||||
const externalIndexValue = await getExternalIndex(dhApi);
|
||||
metrics.externalIndex.set(Number(externalIndexValue));
|
||||
|
||||
if (externalIndexValue >= targetEraValue) {
|
||||
logger.debug(
|
||||
`Tick skipped: ExternalIndex=${externalIndexValue} >= TargetEra=${targetEraValue}, already confirmed`
|
||||
);
|
||||
submittedEra = targetEraValue;
|
||||
metrics.ticksTotal.inc({ result: "skipped_already_confirmed" });
|
||||
return;
|
||||
}
|
||||
|
||||
if (!(await isLastSessionOfEra(dhApi))) {
|
||||
logger.debug("Tick skipped: not last session of era");
|
||||
metrics.ticksTotal.inc({ result: "skipped_not_last_session" });
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`Session=${currentSessionValue} ActiveEra=${activeEra.index} TargetEra=${targetEraValue} ExternalIndex=${externalIndexValue}`
|
||||
);
|
||||
|
||||
const succeeded = await submitForEra(clients, config, targetEraValue, signal);
|
||||
if (succeeded) {
|
||||
submittedEra = targetEraValue;
|
||||
metrics.consecutiveMissedEras.set(0);
|
||||
metrics.lastSubmittedEra.set(Number(targetEraValue));
|
||||
metrics.ticksTotal.inc({ result: "submitted_success" });
|
||||
} else {
|
||||
if (!signal.aborted) {
|
||||
logger.warn(`Submission failed for target era ${targetEraValue}; era will be missed`);
|
||||
}
|
||||
metrics.missedErasTotal.inc();
|
||||
metrics.consecutiveMissedEras.inc();
|
||||
metrics.ticksTotal.inc({ result: "submitted_failed" });
|
||||
}
|
||||
} finally {
|
||||
endTimer();
|
||||
}
|
||||
|
||||
const targetEra = computeTargetEra(activeEra.index);
|
||||
if (submittedEra === targetEra) return;
|
||||
|
||||
const externalIndex = await getExternalIndex(dhApi);
|
||||
if (externalIndex >= targetEra) {
|
||||
submittedEra = targetEra;
|
||||
return;
|
||||
}
|
||||
|
||||
if (!(await isLastSessionOfEra(dhApi))) return;
|
||||
|
||||
logger.info(
|
||||
`Session=${currentSession} ActiveEra=${activeEra.index} TargetEra=${targetEra} ExternalIndex=${externalIndex}`
|
||||
);
|
||||
|
||||
const succeeded = await submitForEra(clients, config, targetEra, signal);
|
||||
if (succeeded) submittedEra = targetEra;
|
||||
};
|
||||
}
|
||||
|
||||
|
|
@ -112,20 +151,28 @@ export async function startSubmitter(
|
|||
const { dhApi } = clients;
|
||||
const tick = createTicker(clients, config, signal);
|
||||
|
||||
metrics.up.set(1);
|
||||
metrics.ready.set(1);
|
||||
logger.info("Submitter started — watching session changes");
|
||||
|
||||
const sub = dhApi.query.Session.CurrentIndex.watchValue("finalized")
|
||||
.pipe(
|
||||
exhaustMap((currentSession) => {
|
||||
exhaustMap((currentSessionValue) => {
|
||||
if (signal.aborted) return EMPTY;
|
||||
return tick(currentSession).catch((err) => {
|
||||
if (!signal.aborted) logger.error(`Tick error: ${err}`);
|
||||
return tick(currentSessionValue).catch((err) => {
|
||||
if (!signal.aborted) {
|
||||
logger.error(`Tick error: ${err}`);
|
||||
metrics.errorsTotal.inc({ type: "tick_error" });
|
||||
}
|
||||
});
|
||||
})
|
||||
)
|
||||
.subscribe({
|
||||
error: (err) => {
|
||||
if (!signal.aborted) logger.error(`Session subscription error: ${err}`);
|
||||
if (!signal.aborted) {
|
||||
logger.error(`Session subscription error: ${err}`);
|
||||
metrics.errorsTotal.inc({ type: "subscription_error" });
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
|
|
@ -133,6 +180,8 @@ export async function startSubmitter(
|
|||
await Promise.race([onAbort(signal), done]);
|
||||
sub.unsubscribe();
|
||||
|
||||
metrics.up.set(0);
|
||||
metrics.ready.set(0);
|
||||
logger.info("Submitter stopped");
|
||||
}
|
||||
|
||||
|
|
@ -143,14 +192,14 @@ export async function startSubmitter(
|
|||
async function submitForEra(
|
||||
clients: SubmitterClients,
|
||||
config: SubmitterConfig,
|
||||
targetEra: bigint,
|
||||
targetEraValue: bigint,
|
||||
signal: AbortSignal
|
||||
): Promise<boolean> {
|
||||
const { publicClient, walletClient } = clients;
|
||||
|
||||
const totalFee = config.executionFee + config.relayerFee;
|
||||
logger.info(
|
||||
`Submitting era ${targetEra} (execFee=${config.executionFee} relayerFee=${config.relayerFee})`
|
||||
`Submitting era ${targetEraValue} (execFee=${config.executionFee} relayerFee=${config.relayerFee})`
|
||||
);
|
||||
|
||||
if (config.dryRun) {
|
||||
|
|
@ -158,18 +207,20 @@ async function submitForEra(
|
|||
address: config.serviceManagerAddress,
|
||||
abi: dataHavenServiceManagerAbi,
|
||||
functionName: "buildNewValidatorSetMessageForEra",
|
||||
args: [targetEra]
|
||||
args: [targetEraValue]
|
||||
});
|
||||
logger.info(`[DRY RUN] Would send message: ${message}`);
|
||||
metrics.submissionsTotal.inc({ outcome: "dry_run" });
|
||||
return true;
|
||||
}
|
||||
|
||||
const endTimer = metrics.submissionDuration.startTimer();
|
||||
try {
|
||||
const hash = await walletClient.writeContract({
|
||||
address: config.serviceManagerAddress,
|
||||
abi: dataHavenServiceManagerAbi,
|
||||
functionName: "sendNewValidatorSetForEra",
|
||||
args: [targetEra, config.executionFee, config.relayerFee],
|
||||
args: [targetEraValue, config.executionFee, config.relayerFee],
|
||||
value: totalFee,
|
||||
chain: null
|
||||
});
|
||||
|
|
@ -178,6 +229,7 @@ async function submitForEra(
|
|||
const receipt = await waitForReceiptWithAbort(publicClient, hash, signal);
|
||||
if (receipt.status !== "success") {
|
||||
logger.error(`Transaction reverted: ${hash}`);
|
||||
metrics.submissionsTotal.inc({ outcome: "failed" });
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
@ -196,14 +248,20 @@ async function submitForEra(
|
|||
|
||||
if (!hasOutbound) {
|
||||
logger.warn("Transaction succeeded but no OutboundMessageAccepted event found");
|
||||
metrics.submissionsTotal.inc({ outcome: "failed" });
|
||||
return false;
|
||||
}
|
||||
|
||||
logger.info("OutboundMessageAccepted confirmed");
|
||||
metrics.submissionsTotal.inc({ outcome: "success" });
|
||||
metrics.lastSubmittedEra.set(Number(targetEraValue));
|
||||
return true;
|
||||
} catch (err: unknown) {
|
||||
if (signal.aborted) return false;
|
||||
logger.error(`Submission attempt failed: ${err}`);
|
||||
metrics.submissionsTotal.inc({ outcome: "failed" });
|
||||
return false;
|
||||
} finally {
|
||||
endTimer();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue