From ae5deb41e083f0f65ed6d7e2646a83db74920ba4 Mon Sep 17 00:00:00 2001 From: Steve Degosserie <723552+stiiifff@users.noreply.github.com> Date: Tue, 10 Feb 2026 10:12:16 +0100 Subject: [PATCH] feat(node): add Ethereum pub/sub RPC API (#435) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary - Wire the Frontier `EthPubSub` module into the node's RPC layer, enabling WebSocket-based `eth_subscribe`/`eth_unsubscribe` support for `newHeads`, `logs`, and `newPendingTransactions` - Use Ethereum-style hex (`0x`-prefixed) subscription IDs via `EthereumSubIdProvider` for client compatibility - Add Moonwall test suites (adapted from Moonbeam) covering block header subscriptions, log filtering (by address, topics, wildcards, conditional parameters), and pending transaction notifications ## Changes ### `operator/node/src/rpc.rs` - Import and merge `EthPubSub` / `EthPubSubApiServer` into the RPC module - Accept `subscription_task_executor` and `pubsub_notification_sinks` parameters in `create_full()` - Remove stale commented-out boilerplate ### `operator/node/src/service.rs` - Clone `pubsub_notification_sinks` and forward it (along with `subscription_executor`) into the RPC factory closure - Set `config.rpc.id_provider` to `EthereumSubIdProvider` for Ethereum-compatible subscription IDs ### `test/moonwall/suites/dev/stagenet/subscription/` - `test-subscription.ts` — `newHeads`: subscription ID format, block header field validation - `test-subscription-logs.ts` — `logs`: basic log notification on contract deployment - `test-subscription-logs2.ts` — `logs`: filtering by single/multiple addresses, topics, wildcards, conditional and combined parameters (8 cases) - `test-subscription-pending.ts` — `newPendingTransactions`: pending tx hash notification Co-authored-by: Claude Opus 4.6 Co-authored-by: Ahmad Kaouk <56095276+ahmadkaouk@users.noreply.github.com> --- operator/node/src/rpc.rs | 38 ++-- operator/node/src/service.rs | 76 ++++--- .../subscription/test-subscription-logs.ts | 43 ++++ .../subscription/test-subscription-logs2.ts | 195 ++++++++++++++++++ .../subscription/test-subscription-pending.ts | 30 +++ .../subscription/test-subscription.ts | 61 ++++++ 6 files changed, 396 insertions(+), 47 deletions(-) create mode 100644 test/moonwall/suites/dev/stagenet/subscription/test-subscription-logs.ts create mode 100644 test/moonwall/suites/dev/stagenet/subscription/test-subscription-logs2.ts create mode 100644 test/moonwall/suites/dev/stagenet/subscription/test-subscription-pending.ts create mode 100644 test/moonwall/suites/dev/stagenet/subscription/test-subscription.ts diff --git a/operator/node/src/rpc.rs b/operator/node/src/rpc.rs index 81d59467..25380cdb 100644 --- a/operator/node/src/rpc.rs +++ b/operator/node/src/rpc.rs @@ -24,10 +24,13 @@ use crate::consensus::BabeConsensusDataProvider; use crate::eth::DefaultEthConfig; use datahaven_runtime_common::{time::SLOT_DURATION, Block, BlockNumber, Hash}; -use fc_rpc::TxPool; use fc_rpc::{Eth, EthBlockDataCacheTask, EthFilter, Net, Web3}; +use fc_rpc::{EthPubSub, TxPool}; use fc_rpc_core::types::{FeeHistoryCache, FilterPool}; -use fc_rpc_core::{EthApiServer, EthFilterApiServer, NetApiServer, TxPoolApiServer, Web3ApiServer}; +use fc_rpc_core::{ + EthApiServer, EthFilterApiServer, EthPubSubApiServer, NetApiServer, TxPoolApiServer, + Web3ApiServer, +}; use fc_storage::StorageOverride; use fp_rpc::EthereumRuntimeRPCApi; use jsonrpsee::RpcModule; @@ -111,6 +114,12 @@ where /// Instantiate all full RPC extensions. pub fn create_full( deps: FullDeps, + subscription_task_executor: sc_rpc::SubscriptionTaskExecutor, + pubsub_notification_sinks: Arc< + fc_mapping_sync::EthereumBlockNotificationSinks< + fc_mapping_sync::EthereumBlockNotification, + >, + >, ) -> Result, Box> where P: TransactionPool + 'static, @@ -263,6 +272,17 @@ where )?; module.merge(Web3::new(Arc::clone(&client)).into_rpc())?; + module.merge( + EthPubSub::new( + pool, + Arc::clone(&client), + sync.clone(), + subscription_task_executor, + overrides, + pubsub_notification_sinks.clone(), + ) + .into_rpc(), + )?; if let Some(command_sink) = command_sink { module.merge( @@ -275,19 +295,5 @@ where let tx_pool = TxPool::new(client.clone(), graph.clone()); module.merge(tx_pool.into_rpc())?; - // module.merge(FrontierFinality::new(client.clone(), frontier_backend.clone()).into_rpc())?; - - // Extend this RPC with a custom API by using the following syntax. - // `YourRpcStruct` should have a reference to a client, which is needed - // to call into the runtime. - // `module.merge(YourRpcTrait::into_rpc(YourRpcStruct::new(ReferenceToClient, ...)))?;` - - // You probably want to enable the `rpc v2 chainSpec` API as well - // - // let chain_name = chain_spec.name().to_string(); - // let genesis_hash = client.block_hash(0).ok().flatten().expect("Genesis block exists; qed"); - // let properties = chain_spec.properties(); - // module.merge(ChainSpec::new(chain_name, genesis_hash, properties).into_rpc())?; - Ok(module) } diff --git a/operator/node/src/service.rs b/operator/node/src/service.rs index 11c6a247..710d6e81 100644 --- a/operator/node/src/service.rs +++ b/operator/node/src/service.rs @@ -439,7 +439,7 @@ pub async fn new_full_impl< RuntimeApi, N: sc_network::NetworkBackend::Hash>, >( - config: Configuration, + mut config: Configuration, mut eth_config: EthConfiguration, role_options: Option, indexer_options: Option, @@ -673,7 +673,7 @@ where }, storage_override, sync: sync_service.clone(), - pubsub_notification_sinks, + pubsub_notification_sinks: pubsub_notification_sinks.clone(), }, ) .await; @@ -693,38 +693,52 @@ where let fee_history_limit = eth_config.fee_history_limit; let sync = sync_service.clone(); - Box::new(move |subscription_executor| { - let deps = crate::rpc::FullDeps { - client: client.clone(), - pool: pool.clone(), - graph: pool.pool().clone(), - beefy: BeefyDeps:: { - beefy_finality_proof_stream: beefy_rpc_links.from_voter_justif_stream.clone(), - beefy_best_block_stream: beefy_rpc_links.from_voter_best_beefy_stream.clone(), + Box::new( + move |subscription_executor: sc_rpc::SubscriptionTaskExecutor| { + let deps = crate::rpc::FullDeps { + client: client.clone(), + pool: pool.clone(), + graph: pool.pool().clone(), + beefy: BeefyDeps:: { + beefy_finality_proof_stream: beefy_rpc_links + .from_voter_justif_stream + .clone(), + beefy_best_block_stream: beefy_rpc_links + .from_voter_best_beefy_stream + .clone(), + subscription_executor: subscription_executor.clone(), + }, + max_past_logs, + fee_history_limit, + fee_history_cache: fee_history_cache.clone(), + network: Arc::new(network.clone()), + sync: sync.clone(), + filter_pool: filter_pool.clone(), + block_data_cache: block_data_cache.clone(), + overrides: overrides.clone(), + is_authority: is_authority.clone(), + command_sink: command_sink.clone(), + backend: backend.clone(), + frontier_backend: match &*frontier_backend { + fc_db::Backend::KeyValue(b) => b.clone(), + fc_db::Backend::Sql(b) => b.clone(), + }, + forced_parent_hashes: None, + maybe_storage_hub_client_config: maybe_storage_hub_client_rpc_config.clone(), + }; + crate::rpc::create_full( + deps, subscription_executor, - }, - max_past_logs, - fee_history_limit, - fee_history_cache: fee_history_cache.clone(), - network: Arc::new(network.clone()), - sync: sync.clone(), - filter_pool: filter_pool.clone(), - block_data_cache: block_data_cache.clone(), - overrides: overrides.clone(), - is_authority: is_authority.clone(), - command_sink: command_sink.clone(), - backend: backend.clone(), - frontier_backend: match &*frontier_backend { - fc_db::Backend::KeyValue(b) => b.clone(), - fc_db::Backend::Sql(b) => b.clone(), - }, - forced_parent_hashes: None, - maybe_storage_hub_client_config: maybe_storage_hub_client_rpc_config.clone(), - }; - crate::rpc::create_full(deps).map_err(Into::into) - }) + pubsub_notification_sinks.clone(), + ) + .map_err(Into::into) + }, + ) }; + // Use Ethereum-style hex subscription IDs (0x-prefixed) instead of jsonrpsee defaults. + config.rpc.id_provider = Some(Box::new(fc_rpc::EthereumSubIdProvider)); + let rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams { network: Arc::new(network.clone()), client: client.clone(), diff --git a/test/moonwall/suites/dev/stagenet/subscription/test-subscription-logs.ts b/test/moonwall/suites/dev/stagenet/subscription/test-subscription-logs.ts new file mode 100644 index 00000000..1f33aaa1 --- /dev/null +++ b/test/moonwall/suites/dev/stagenet/subscription/test-subscription-logs.ts @@ -0,0 +1,43 @@ +import { describeSuite, expect } from "@moonwall/cli"; + +describeSuite({ + id: "D023501", + title: "Subscription - Logs", + foundationMethods: "dev", + testCases: ({ context, it, log }) => { + it({ + id: "T01", + title: "should send a notification on new transaction", + test: async function () { + const logs: any[] = []; + const sub = await context.web3().eth.subscribe("logs"); + + await new Promise(async (resolve, reject) => { + sub.once("data", async (event) => { + logs.push(event); + resolve("success"); + }); + + sub.once("error", (error) => { + console.error(error); + reject(error); + }); + + await context.deployContract!("EventEmitter"); + }); + + const block = await context.viem().getBlock(); + + expect(logs[0]).to.include({ + blockHash: block.hash, + blockNumber: block.number, + data: "0x", + logIndex: 0n, + removed: false, + transactionHash: block.transactions[0], + transactionIndex: 0n, + }); + }, + }); + }, +}); diff --git a/test/moonwall/suites/dev/stagenet/subscription/test-subscription-logs2.ts b/test/moonwall/suites/dev/stagenet/subscription/test-subscription-logs2.ts new file mode 100644 index 00000000..2b58d33b --- /dev/null +++ b/test/moonwall/suites/dev/stagenet/subscription/test-subscription-logs2.ts @@ -0,0 +1,195 @@ +import { beforeAll, describeSuite, expect } from "@moonwall/cli"; +import { ALITH_CONTRACT_ADDRESSES } from "@moonwall/util"; +import type { Log } from "web3"; + +describeSuite({ + id: "D023502", + title: "Subscription - Logs", + foundationMethods: "dev", + testCases: ({ context, it, log }) => { + let deployedContract: `0x${string}`; + let deployHash: `0x${string}`; + + let subSingleAddPromise: Promise; + let subMultiAddPromise: Promise; + let subTopicPromise: Promise; + let subTopicWildcardPromise: Promise; + let subTopicListPromise: Promise; + let subTopicCondPromise: Promise; + let subTopicMultiCondPromise: Promise; + let subTopicWildAndCondPromise: Promise; + + beforeAll(async () => { + const openSub = async (filter?: object) => await context.web3().eth.subscribe("logs", filter); + + const onData = (logSub: any) => { + return new Promise((resolve) => { + logSub.once("data", resolve); + }); + }; + + const [ + singleSub, + multiSub, + subTopic, + subTopicWildcard, + subTopicList, + subTopicCond, + subTopicMultiCond, + subTopicWildAndCond, + ] = await Promise.all([ + openSub({ + address: ALITH_CONTRACT_ADDRESSES[0], + }), + openSub({ + address: [ + ALITH_CONTRACT_ADDRESSES[3], + ALITH_CONTRACT_ADDRESSES[2], + ALITH_CONTRACT_ADDRESSES[1], + ALITH_CONTRACT_ADDRESSES[0], + ], + }), + openSub({ + topics: ["0x0040d54d5e5b097202376b55bcbaaedd2ee468ce4496f1d30030c4e5308bf94d"], + }), + openSub({ + topics: [null, "0x000000000000000000000000f24ff3a9cf04c71dbc94d0b566f7a27b94566cac"], + }), + openSub({ + topics: [ + ["0x0040d54d5e5b097202376b55bcbaaedd2ee468ce4496f1d30030c4e5308bf94d"], + ["0x000000000000000000000000f24ff3a9cf04c71dbc94d0b566f7a27b94566cac"], + ], + }), + + openSub({ + topics: [ + "0x0040d54d5e5b097202376b55bcbaaedd2ee468ce4496f1d30030c4e5308bf94d", + ["0x000000000000000000000000f24ff3a9cf04c71dbc94d0b566f7a27b94566cac"], + ], + }), + + openSub({ + topics: [ + "0x0040d54d5e5b097202376b55bcbaaedd2ee468ce4496f1d30030c4e5308bf94d", + [ + "0x0000000000000000000000000000000000000000000000000000000000000000", + "0x000000000000000000000000f24ff3a9cf04c71dbc94d0b566f7a27b94566cac", + ], + ], + }), + openSub({ + topics: [ + null, + [ + "0x000000000000000000000000f24ff3a9cf04c71dbc94d0b566f7a27b94566cac", + "0x0000000000000000000000000000000000000000000000000000000000000000", + ], + null, + ], + }), + ]); + + subSingleAddPromise = onData(singleSub); + subMultiAddPromise = onData(multiSub); + subTopicPromise = onData(subTopic); + subTopicWildcardPromise = onData(subTopicWildcard); + subTopicListPromise = onData(subTopicList); + subTopicCondPromise = onData(subTopicCond); + subTopicMultiCondPromise = onData(subTopicMultiCond); + subTopicWildAndCondPromise = onData(subTopicWildAndCond); + + const { contractAddress, hash } = await context.deployContract!("EventEmitter"); + deployedContract = contractAddress; + deployHash = hash; + }); + + it({ + id: "T01", + title: "should be able to filter by address", + test: async function () { + const eventLog = await subSingleAddPromise; + expect(eventLog.blockNumber).toBe(1n); + expect(eventLog.address).toBe(deployedContract.toLowerCase()); + expect(eventLog.transactionHash).toBe(deployHash); + }, + }); + + it({ + id: "T02", + title: "should be able to filter by multiple addresses", + test: async function () { + const eventLog = await subMultiAddPromise; + expect(eventLog.blockNumber).toBe(1n); + expect(eventLog.address).toBe(deployedContract.toLowerCase()); + expect(eventLog.transactionHash).toBe(deployHash); + }, + }); + + it({ + id: "T03", + title: "should be able to filter by topic", + test: async function () { + const eventLog = await subTopicPromise; + expect(eventLog.blockNumber).toBe(1n); + expect(eventLog.address).toBe(deployedContract.toLowerCase()); + expect(eventLog.transactionHash).toBe(deployHash); + }, + }); + + it({ + id: "T04", + title: "should be able to filter by topic wildcards", + test: async function () { + const eventLog = await subTopicWildcardPromise; + expect(eventLog.blockNumber).toBe(1n); + expect(eventLog.address).toBe(deployedContract.toLowerCase()); + expect(eventLog.transactionHash).toBe(deployHash); + }, + }); + + it({ + id: "T05", + title: "should be able to filter by topic list", + test: async function () { + const eventLog = await subTopicListPromise; + expect(eventLog.blockNumber).toBe(1n); + expect(eventLog.address).toBe(deployedContract.toLowerCase()); + expect(eventLog.transactionHash).toBe(deployHash); + }, + }); + + it({ + id: "T06", + title: "should be able to filter by topic conditional parameters", + test: async function () { + const eventLog = await subTopicCondPromise; + expect(eventLog.blockNumber).toBe(1n); + expect(eventLog.address).toBe(deployedContract.toLowerCase()); + expect(eventLog.transactionHash).toBe(deployHash); + }, + }); + + it({ + id: "T07", + title: "should support multiple topic conditional parameters", + test: async function () { + const eventLog = await subTopicMultiCondPromise; + expect(eventLog.blockNumber).toBe(1n); + expect(eventLog.address).toBe(deployedContract.toLowerCase()); + expect(eventLog.transactionHash).toBe(deployHash); + }, + }); + + it({ + id: "T08", + title: "should combine topic wildcards and conditional parameters", + test: async function () { + const eventLog = await subTopicWildAndCondPromise; + expect(eventLog.blockNumber).toBe(1n); + expect(eventLog.address).toBe(deployedContract.toLowerCase()); + expect(eventLog.transactionHash).toBe(deployHash); + }, + }); + }, +}); diff --git a/test/moonwall/suites/dev/stagenet/subscription/test-subscription-pending.ts b/test/moonwall/suites/dev/stagenet/subscription/test-subscription-pending.ts new file mode 100644 index 00000000..d053667b --- /dev/null +++ b/test/moonwall/suites/dev/stagenet/subscription/test-subscription-pending.ts @@ -0,0 +1,30 @@ +import { describeSuite, expect } from "@moonwall/cli"; +import { BALTATHAR_ADDRESS, GLMR, createRawTransfer, sendRawTransaction } from "@moonwall/util"; +import { setTimeout } from "node:timers/promises"; + +describeSuite({ + id: "D023504", + title: "Subscription - Pending transactions", + foundationMethods: "dev", + testCases: ({ context, it, log }) => { + it({ + id: "T01", + title: "should return a valid subscriptionId", + test: async function () { + let response: any; + const sub = await context.web3().eth.subscribe("newPendingTransactions"); + + sub.once("data", (data) => { + response = data; + }); + + const rawTx = await createRawTransfer(context, BALTATHAR_ADDRESS, GLMR); + const hash = await sendRawTransaction(context, rawTx); + await setTimeout(200); + + expect(response).not.toBeUndefined(); + expect(response).toBe(hash); + }, + }); + }, +}); diff --git a/test/moonwall/suites/dev/stagenet/subscription/test-subscription.ts b/test/moonwall/suites/dev/stagenet/subscription/test-subscription.ts new file mode 100644 index 00000000..ae853cfb --- /dev/null +++ b/test/moonwall/suites/dev/stagenet/subscription/test-subscription.ts @@ -0,0 +1,61 @@ +import { beforeAll, describeSuite, expect } from "@moonwall/cli"; +import { ALITH_ADDRESS, BALTATHAR_ADDRESS, createRawTransfer } from "@moonwall/util"; +import { type PublicClient, createPublicClient, webSocket } from "viem"; + +describeSuite({ + id: "D023505", + title: "Subscription - Block headers", + foundationMethods: "dev", + testCases: ({ context, it, log }) => { + let client: PublicClient; + + beforeAll(async () => { + const transport = webSocket(context.viem().transport.url.replace("http", "ws")); + client = createPublicClient({ + transport, + }); + }); + + it({ + id: "T01", + title: "should return a valid subscriptionId", + test: async function () { + const result = (await client.transport.request({ + method: "eth_subscribe", + params: ["newHeads"], + })) as any; + + expect(result.length).toBe(34); + }, + }); + + it({ + id: "T02", + title: "should send notification on new block", + test: async function () { + const blocks: any[] = []; + const unwatch = client.watchBlocks({ + onBlock: (block) => blocks.push(block), + }); + + await context.createBlock(createRawTransfer(context, BALTATHAR_ADDRESS, 0)); + unwatch(); + + const block = await context.viem().getBlock(); + + expect(blocks[0]).to.include({ + author: ALITH_ADDRESS.toLowerCase(), + difficulty: 0n, + extraData: "0x", + logsBloom: `0x${"0".repeat(512)}`, + miner: ALITH_ADDRESS.toLowerCase(), + sha3Uncles: "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + }); + expect(blocks[0].nonce).to.be.eq("0x0000000000000000"); + // Verify subscription roots match the block fetched via RPC + expect(blocks[0].receiptsRoot).toBe(block.receiptsRoot); + expect(blocks[0].transactionsRoot).toBe(block.transactionsRoot); + }, + }); + }, +});