feat(node): add Ethereum pub/sub RPC API (#435)

## Summary

- Wire the Frontier `EthPubSub` module into the node's RPC layer,
enabling WebSocket-based `eth_subscribe`/`eth_unsubscribe` support for
`newHeads`, `logs`, and `newPendingTransactions`
- Use Ethereum-style hex (`0x`-prefixed) subscription IDs via
`EthereumSubIdProvider` for client compatibility
- Add Moonwall test suites (adapted from Moonbeam) covering block header
subscriptions, log filtering (by address, topics, wildcards, conditional
parameters), and pending transaction notifications

## Changes

### `operator/node/src/rpc.rs`
- Import and merge `EthPubSub` / `EthPubSubApiServer` into the RPC
module
- Accept `subscription_task_executor` and `pubsub_notification_sinks`
parameters in `create_full()`
- Remove stale commented-out boilerplate

### `operator/node/src/service.rs`
- Clone `pubsub_notification_sinks` and forward it (along with
`subscription_executor`) into the RPC factory closure
- Set `config.rpc.id_provider` to `EthereumSubIdProvider` for
Ethereum-compatible subscription IDs

### `test/moonwall/suites/dev/stagenet/subscription/`
- `test-subscription.ts` — `newHeads`: subscription ID format, block
header field validation
- `test-subscription-logs.ts` — `logs`: basic log notification on
contract deployment
- `test-subscription-logs2.ts` — `logs`: filtering by single/multiple
addresses, topics, wildcards, conditional and combined parameters (8
cases)
- `test-subscription-pending.ts` — `newPendingTransactions`: pending tx
hash notification

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Ahmad Kaouk <56095276+ahmadkaouk@users.noreply.github.com>
This commit is contained in:
Steve Degosserie 2026-02-10 10:12:16 +01:00 committed by GitHub
parent 3ae7d2517e
commit ae5deb41e0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 396 additions and 47 deletions

View file

@ -24,10 +24,13 @@
use crate::consensus::BabeConsensusDataProvider;
use crate::eth::DefaultEthConfig;
use datahaven_runtime_common::{time::SLOT_DURATION, Block, BlockNumber, Hash};
use fc_rpc::TxPool;
use fc_rpc::{Eth, EthBlockDataCacheTask, EthFilter, Net, Web3};
use fc_rpc::{EthPubSub, TxPool};
use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
use fc_rpc_core::{EthApiServer, EthFilterApiServer, NetApiServer, TxPoolApiServer, Web3ApiServer};
use fc_rpc_core::{
EthApiServer, EthFilterApiServer, EthPubSubApiServer, NetApiServer, TxPoolApiServer,
Web3ApiServer,
};
use fc_storage::StorageOverride;
use fp_rpc::EthereumRuntimeRPCApi;
use jsonrpsee::RpcModule;
@ -111,6 +114,12 @@ where
/// Instantiate all full RPC extensions.
pub fn create_full<P, BE, AuthorityId, A, FL, FSH, Runtime>(
deps: FullDeps<P, BE, AuthorityId, A, FL, FSH, Runtime>,
subscription_task_executor: sc_rpc::SubscriptionTaskExecutor,
pubsub_notification_sinks: Arc<
fc_mapping_sync::EthereumBlockNotificationSinks<
fc_mapping_sync::EthereumBlockNotification<Block>,
>,
>,
) -> Result<RpcModule<()>, Box<dyn std::error::Error + Send + Sync>>
where
P: TransactionPool<Block = Block> + 'static,
@ -263,6 +272,17 @@ where
)?;
module.merge(Web3::new(Arc::clone(&client)).into_rpc())?;
module.merge(
EthPubSub::new(
pool,
Arc::clone(&client),
sync.clone(),
subscription_task_executor,
overrides,
pubsub_notification_sinks.clone(),
)
.into_rpc(),
)?;
if let Some(command_sink) = command_sink {
module.merge(
@ -275,19 +295,5 @@ where
let tx_pool = TxPool::new(client.clone(), graph.clone());
module.merge(tx_pool.into_rpc())?;
// module.merge(FrontierFinality::new(client.clone(), frontier_backend.clone()).into_rpc())?;
// Extend this RPC with a custom API by using the following syntax.
// `YourRpcStruct` should have a reference to a client, which is needed
// to call into the runtime.
// `module.merge(YourRpcTrait::into_rpc(YourRpcStruct::new(ReferenceToClient, ...)))?;`
// You probably want to enable the `rpc v2 chainSpec` API as well
//
// let chain_name = chain_spec.name().to_string();
// let genesis_hash = client.block_hash(0).ok().flatten().expect("Genesis block exists; qed");
// let properties = chain_spec.properties();
// module.merge(ChainSpec::new(chain_name, genesis_hash, properties).into_rpc())?;
Ok(module)
}

View file

@ -439,7 +439,7 @@ pub async fn new_full_impl<
RuntimeApi,
N: sc_network::NetworkBackend<Block, <Block as sp_runtime::traits::Block>::Hash>,
>(
config: Configuration,
mut config: Configuration,
mut eth_config: EthConfiguration,
role_options: Option<RoleOptions>,
indexer_options: Option<IndexerOptions>,
@ -673,7 +673,7 @@ where
},
storage_override,
sync: sync_service.clone(),
pubsub_notification_sinks,
pubsub_notification_sinks: pubsub_notification_sinks.clone(),
},
)
.await;
@ -693,38 +693,52 @@ where
let fee_history_limit = eth_config.fee_history_limit;
let sync = sync_service.clone();
Box::new(move |subscription_executor| {
let deps = crate::rpc::FullDeps {
client: client.clone(),
pool: pool.clone(),
graph: pool.pool().clone(),
beefy: BeefyDeps::<BeefyId> {
beefy_finality_proof_stream: beefy_rpc_links.from_voter_justif_stream.clone(),
beefy_best_block_stream: beefy_rpc_links.from_voter_best_beefy_stream.clone(),
Box::new(
move |subscription_executor: sc_rpc::SubscriptionTaskExecutor| {
let deps = crate::rpc::FullDeps {
client: client.clone(),
pool: pool.clone(),
graph: pool.pool().clone(),
beefy: BeefyDeps::<BeefyId> {
beefy_finality_proof_stream: beefy_rpc_links
.from_voter_justif_stream
.clone(),
beefy_best_block_stream: beefy_rpc_links
.from_voter_best_beefy_stream
.clone(),
subscription_executor: subscription_executor.clone(),
},
max_past_logs,
fee_history_limit,
fee_history_cache: fee_history_cache.clone(),
network: Arc::new(network.clone()),
sync: sync.clone(),
filter_pool: filter_pool.clone(),
block_data_cache: block_data_cache.clone(),
overrides: overrides.clone(),
is_authority: is_authority.clone(),
command_sink: command_sink.clone(),
backend: backend.clone(),
frontier_backend: match &*frontier_backend {
fc_db::Backend::KeyValue(b) => b.clone(),
fc_db::Backend::Sql(b) => b.clone(),
},
forced_parent_hashes: None,
maybe_storage_hub_client_config: maybe_storage_hub_client_rpc_config.clone(),
};
crate::rpc::create_full(
deps,
subscription_executor,
},
max_past_logs,
fee_history_limit,
fee_history_cache: fee_history_cache.clone(),
network: Arc::new(network.clone()),
sync: sync.clone(),
filter_pool: filter_pool.clone(),
block_data_cache: block_data_cache.clone(),
overrides: overrides.clone(),
is_authority: is_authority.clone(),
command_sink: command_sink.clone(),
backend: backend.clone(),
frontier_backend: match &*frontier_backend {
fc_db::Backend::KeyValue(b) => b.clone(),
fc_db::Backend::Sql(b) => b.clone(),
},
forced_parent_hashes: None,
maybe_storage_hub_client_config: maybe_storage_hub_client_rpc_config.clone(),
};
crate::rpc::create_full(deps).map_err(Into::into)
})
pubsub_notification_sinks.clone(),
)
.map_err(Into::into)
},
)
};
// Use Ethereum-style hex subscription IDs (0x-prefixed) instead of jsonrpsee defaults.
config.rpc.id_provider = Some(Box::new(fc_rpc::EthereumSubIdProvider));
let rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
network: Arc::new(network.clone()),
client: client.clone(),

View file

@ -0,0 +1,43 @@
import { describeSuite, expect } from "@moonwall/cli";
describeSuite({
id: "D023501",
title: "Subscription - Logs",
foundationMethods: "dev",
testCases: ({ context, it, log }) => {
it({
id: "T01",
title: "should send a notification on new transaction",
test: async function () {
const logs: any[] = [];
const sub = await context.web3().eth.subscribe("logs");
await new Promise(async (resolve, reject) => {
sub.once("data", async (event) => {
logs.push(event);
resolve("success");
});
sub.once("error", (error) => {
console.error(error);
reject(error);
});
await context.deployContract!("EventEmitter");
});
const block = await context.viem().getBlock();
expect(logs[0]).to.include({
blockHash: block.hash,
blockNumber: block.number,
data: "0x",
logIndex: 0n,
removed: false,
transactionHash: block.transactions[0],
transactionIndex: 0n,
});
},
});
},
});

View file

@ -0,0 +1,195 @@
import { beforeAll, describeSuite, expect } from "@moonwall/cli";
import { ALITH_CONTRACT_ADDRESSES } from "@moonwall/util";
import type { Log } from "web3";
describeSuite({
id: "D023502",
title: "Subscription - Logs",
foundationMethods: "dev",
testCases: ({ context, it, log }) => {
let deployedContract: `0x${string}`;
let deployHash: `0x${string}`;
let subSingleAddPromise: Promise<Log>;
let subMultiAddPromise: Promise<Log>;
let subTopicPromise: Promise<Log>;
let subTopicWildcardPromise: Promise<Log>;
let subTopicListPromise: Promise<Log>;
let subTopicCondPromise: Promise<Log>;
let subTopicMultiCondPromise: Promise<Log>;
let subTopicWildAndCondPromise: Promise<Log>;
beforeAll(async () => {
const openSub = async (filter?: object) => await context.web3().eth.subscribe("logs", filter);
const onData = (logSub: any) => {
return new Promise<Log>((resolve) => {
logSub.once("data", resolve);
});
};
const [
singleSub,
multiSub,
subTopic,
subTopicWildcard,
subTopicList,
subTopicCond,
subTopicMultiCond,
subTopicWildAndCond,
] = await Promise.all([
openSub({
address: ALITH_CONTRACT_ADDRESSES[0],
}),
openSub({
address: [
ALITH_CONTRACT_ADDRESSES[3],
ALITH_CONTRACT_ADDRESSES[2],
ALITH_CONTRACT_ADDRESSES[1],
ALITH_CONTRACT_ADDRESSES[0],
],
}),
openSub({
topics: ["0x0040d54d5e5b097202376b55bcbaaedd2ee468ce4496f1d30030c4e5308bf94d"],
}),
openSub({
topics: [null, "0x000000000000000000000000f24ff3a9cf04c71dbc94d0b566f7a27b94566cac"],
}),
openSub({
topics: [
["0x0040d54d5e5b097202376b55bcbaaedd2ee468ce4496f1d30030c4e5308bf94d"],
["0x000000000000000000000000f24ff3a9cf04c71dbc94d0b566f7a27b94566cac"],
],
}),
openSub({
topics: [
"0x0040d54d5e5b097202376b55bcbaaedd2ee468ce4496f1d30030c4e5308bf94d",
["0x000000000000000000000000f24ff3a9cf04c71dbc94d0b566f7a27b94566cac"],
],
}),
openSub({
topics: [
"0x0040d54d5e5b097202376b55bcbaaedd2ee468ce4496f1d30030c4e5308bf94d",
[
"0x0000000000000000000000000000000000000000000000000000000000000000",
"0x000000000000000000000000f24ff3a9cf04c71dbc94d0b566f7a27b94566cac",
],
],
}),
openSub({
topics: [
null,
[
"0x000000000000000000000000f24ff3a9cf04c71dbc94d0b566f7a27b94566cac",
"0x0000000000000000000000000000000000000000000000000000000000000000",
],
null,
],
}),
]);
subSingleAddPromise = onData(singleSub);
subMultiAddPromise = onData(multiSub);
subTopicPromise = onData(subTopic);
subTopicWildcardPromise = onData(subTopicWildcard);
subTopicListPromise = onData(subTopicList);
subTopicCondPromise = onData(subTopicCond);
subTopicMultiCondPromise = onData(subTopicMultiCond);
subTopicWildAndCondPromise = onData(subTopicWildAndCond);
const { contractAddress, hash } = await context.deployContract!("EventEmitter");
deployedContract = contractAddress;
deployHash = hash;
});
it({
id: "T01",
title: "should be able to filter by address",
test: async function () {
const eventLog = await subSingleAddPromise;
expect(eventLog.blockNumber).toBe(1n);
expect(eventLog.address).toBe(deployedContract.toLowerCase());
expect(eventLog.transactionHash).toBe(deployHash);
},
});
it({
id: "T02",
title: "should be able to filter by multiple addresses",
test: async function () {
const eventLog = await subMultiAddPromise;
expect(eventLog.blockNumber).toBe(1n);
expect(eventLog.address).toBe(deployedContract.toLowerCase());
expect(eventLog.transactionHash).toBe(deployHash);
},
});
it({
id: "T03",
title: "should be able to filter by topic",
test: async function () {
const eventLog = await subTopicPromise;
expect(eventLog.blockNumber).toBe(1n);
expect(eventLog.address).toBe(deployedContract.toLowerCase());
expect(eventLog.transactionHash).toBe(deployHash);
},
});
it({
id: "T04",
title: "should be able to filter by topic wildcards",
test: async function () {
const eventLog = await subTopicWildcardPromise;
expect(eventLog.blockNumber).toBe(1n);
expect(eventLog.address).toBe(deployedContract.toLowerCase());
expect(eventLog.transactionHash).toBe(deployHash);
},
});
it({
id: "T05",
title: "should be able to filter by topic list",
test: async function () {
const eventLog = await subTopicListPromise;
expect(eventLog.blockNumber).toBe(1n);
expect(eventLog.address).toBe(deployedContract.toLowerCase());
expect(eventLog.transactionHash).toBe(deployHash);
},
});
it({
id: "T06",
title: "should be able to filter by topic conditional parameters",
test: async function () {
const eventLog = await subTopicCondPromise;
expect(eventLog.blockNumber).toBe(1n);
expect(eventLog.address).toBe(deployedContract.toLowerCase());
expect(eventLog.transactionHash).toBe(deployHash);
},
});
it({
id: "T07",
title: "should support multiple topic conditional parameters",
test: async function () {
const eventLog = await subTopicMultiCondPromise;
expect(eventLog.blockNumber).toBe(1n);
expect(eventLog.address).toBe(deployedContract.toLowerCase());
expect(eventLog.transactionHash).toBe(deployHash);
},
});
it({
id: "T08",
title: "should combine topic wildcards and conditional parameters",
test: async function () {
const eventLog = await subTopicWildAndCondPromise;
expect(eventLog.blockNumber).toBe(1n);
expect(eventLog.address).toBe(deployedContract.toLowerCase());
expect(eventLog.transactionHash).toBe(deployHash);
},
});
},
});

View file

@ -0,0 +1,30 @@
import { describeSuite, expect } from "@moonwall/cli";
import { BALTATHAR_ADDRESS, GLMR, createRawTransfer, sendRawTransaction } from "@moonwall/util";
import { setTimeout } from "node:timers/promises";
describeSuite({
id: "D023504",
title: "Subscription - Pending transactions",
foundationMethods: "dev",
testCases: ({ context, it, log }) => {
it({
id: "T01",
title: "should return a valid subscriptionId",
test: async function () {
let response: any;
const sub = await context.web3().eth.subscribe("newPendingTransactions");
sub.once("data", (data) => {
response = data;
});
const rawTx = await createRawTransfer(context, BALTATHAR_ADDRESS, GLMR);
const hash = await sendRawTransaction(context, rawTx);
await setTimeout(200);
expect(response).not.toBeUndefined();
expect(response).toBe(hash);
},
});
},
});

View file

@ -0,0 +1,61 @@
import { beforeAll, describeSuite, expect } from "@moonwall/cli";
import { ALITH_ADDRESS, BALTATHAR_ADDRESS, createRawTransfer } from "@moonwall/util";
import { type PublicClient, createPublicClient, webSocket } from "viem";
describeSuite({
id: "D023505",
title: "Subscription - Block headers",
foundationMethods: "dev",
testCases: ({ context, it, log }) => {
let client: PublicClient;
beforeAll(async () => {
const transport = webSocket(context.viem().transport.url.replace("http", "ws"));
client = createPublicClient({
transport,
});
});
it({
id: "T01",
title: "should return a valid subscriptionId",
test: async function () {
const result = (await client.transport.request({
method: "eth_subscribe",
params: ["newHeads"],
})) as any;
expect(result.length).toBe(34);
},
});
it({
id: "T02",
title: "should send notification on new block",
test: async function () {
const blocks: any[] = [];
const unwatch = client.watchBlocks({
onBlock: (block) => blocks.push(block),
});
await context.createBlock(createRawTransfer(context, BALTATHAR_ADDRESS, 0));
unwatch();
const block = await context.viem().getBlock();
expect(blocks[0]).to.include({
author: ALITH_ADDRESS.toLowerCase(),
difficulty: 0n,
extraData: "0x",
logsBloom: `0x${"0".repeat(512)}`,
miner: ALITH_ADDRESS.toLowerCase(),
sha3Uncles: "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
});
expect(blocks[0].nonce).to.be.eq("0x0000000000000000");
// Verify subscription roots match the block fetched via RPC
expect(blocks[0].receiptsRoot).toBe(block.receiptsRoot);
expect(blocks[0].transactionsRoot).toBe(block.transactionsRoot);
},
});
},
});