diff --git a/operator/node/src/rpc.rs b/operator/node/src/rpc.rs
index 81d59467..25380cdb 100644
--- a/operator/node/src/rpc.rs
+++ b/operator/node/src/rpc.rs
@@ -24,10 +24,13 @@
use crate::consensus::BabeConsensusDataProvider;
use crate::eth::DefaultEthConfig;
use datahaven_runtime_common::{time::SLOT_DURATION, Block, BlockNumber, Hash};
-use fc_rpc::TxPool;
use fc_rpc::{Eth, EthBlockDataCacheTask, EthFilter, Net, Web3};
+use fc_rpc::{EthPubSub, TxPool};
use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
-use fc_rpc_core::{EthApiServer, EthFilterApiServer, NetApiServer, TxPoolApiServer, Web3ApiServer};
+use fc_rpc_core::{
+ EthApiServer, EthFilterApiServer, EthPubSubApiServer, NetApiServer, TxPoolApiServer,
+ Web3ApiServer,
+};
use fc_storage::StorageOverride;
use fp_rpc::EthereumRuntimeRPCApi;
use jsonrpsee::RpcModule;
@@ -111,6 +114,12 @@ where
/// Instantiate all full RPC extensions.
pub fn create_full
(
deps: FullDeps
,
+ subscription_task_executor: sc_rpc::SubscriptionTaskExecutor,
+ pubsub_notification_sinks: Arc<
+ fc_mapping_sync::EthereumBlockNotificationSinks<
+ fc_mapping_sync::EthereumBlockNotification,
+ >,
+ >,
) -> Result, Box>
where
P: TransactionPool + 'static,
@@ -263,6 +272,17 @@ where
)?;
module.merge(Web3::new(Arc::clone(&client)).into_rpc())?;
+ module.merge(
+ EthPubSub::new(
+ pool,
+ Arc::clone(&client),
+ sync.clone(),
+ subscription_task_executor,
+ overrides,
+ pubsub_notification_sinks.clone(),
+ )
+ .into_rpc(),
+ )?;
if let Some(command_sink) = command_sink {
module.merge(
@@ -275,19 +295,5 @@ where
let tx_pool = TxPool::new(client.clone(), graph.clone());
module.merge(tx_pool.into_rpc())?;
- // module.merge(FrontierFinality::new(client.clone(), frontier_backend.clone()).into_rpc())?;
-
- // Extend this RPC with a custom API by using the following syntax.
- // `YourRpcStruct` should have a reference to a client, which is needed
- // to call into the runtime.
- // `module.merge(YourRpcTrait::into_rpc(YourRpcStruct::new(ReferenceToClient, ...)))?;`
-
- // You probably want to enable the `rpc v2 chainSpec` API as well
- //
- // let chain_name = chain_spec.name().to_string();
- // let genesis_hash = client.block_hash(0).ok().flatten().expect("Genesis block exists; qed");
- // let properties = chain_spec.properties();
- // module.merge(ChainSpec::new(chain_name, genesis_hash, properties).into_rpc())?;
-
Ok(module)
}
diff --git a/operator/node/src/service.rs b/operator/node/src/service.rs
index 11c6a247..710d6e81 100644
--- a/operator/node/src/service.rs
+++ b/operator/node/src/service.rs
@@ -439,7 +439,7 @@ pub async fn new_full_impl<
RuntimeApi,
N: sc_network::NetworkBackend::Hash>,
>(
- config: Configuration,
+ mut config: Configuration,
mut eth_config: EthConfiguration,
role_options: Option,
indexer_options: Option,
@@ -673,7 +673,7 @@ where
},
storage_override,
sync: sync_service.clone(),
- pubsub_notification_sinks,
+ pubsub_notification_sinks: pubsub_notification_sinks.clone(),
},
)
.await;
@@ -693,38 +693,52 @@ where
let fee_history_limit = eth_config.fee_history_limit;
let sync = sync_service.clone();
- Box::new(move |subscription_executor| {
- let deps = crate::rpc::FullDeps {
- client: client.clone(),
- pool: pool.clone(),
- graph: pool.pool().clone(),
- beefy: BeefyDeps:: {
- beefy_finality_proof_stream: beefy_rpc_links.from_voter_justif_stream.clone(),
- beefy_best_block_stream: beefy_rpc_links.from_voter_best_beefy_stream.clone(),
+ Box::new(
+ move |subscription_executor: sc_rpc::SubscriptionTaskExecutor| {
+ let deps = crate::rpc::FullDeps {
+ client: client.clone(),
+ pool: pool.clone(),
+ graph: pool.pool().clone(),
+ beefy: BeefyDeps:: {
+ beefy_finality_proof_stream: beefy_rpc_links
+ .from_voter_justif_stream
+ .clone(),
+ beefy_best_block_stream: beefy_rpc_links
+ .from_voter_best_beefy_stream
+ .clone(),
+ subscription_executor: subscription_executor.clone(),
+ },
+ max_past_logs,
+ fee_history_limit,
+ fee_history_cache: fee_history_cache.clone(),
+ network: Arc::new(network.clone()),
+ sync: sync.clone(),
+ filter_pool: filter_pool.clone(),
+ block_data_cache: block_data_cache.clone(),
+ overrides: overrides.clone(),
+ is_authority: is_authority.clone(),
+ command_sink: command_sink.clone(),
+ backend: backend.clone(),
+ frontier_backend: match &*frontier_backend {
+ fc_db::Backend::KeyValue(b) => b.clone(),
+ fc_db::Backend::Sql(b) => b.clone(),
+ },
+ forced_parent_hashes: None,
+ maybe_storage_hub_client_config: maybe_storage_hub_client_rpc_config.clone(),
+ };
+ crate::rpc::create_full(
+ deps,
subscription_executor,
- },
- max_past_logs,
- fee_history_limit,
- fee_history_cache: fee_history_cache.clone(),
- network: Arc::new(network.clone()),
- sync: sync.clone(),
- filter_pool: filter_pool.clone(),
- block_data_cache: block_data_cache.clone(),
- overrides: overrides.clone(),
- is_authority: is_authority.clone(),
- command_sink: command_sink.clone(),
- backend: backend.clone(),
- frontier_backend: match &*frontier_backend {
- fc_db::Backend::KeyValue(b) => b.clone(),
- fc_db::Backend::Sql(b) => b.clone(),
- },
- forced_parent_hashes: None,
- maybe_storage_hub_client_config: maybe_storage_hub_client_rpc_config.clone(),
- };
- crate::rpc::create_full(deps).map_err(Into::into)
- })
+ pubsub_notification_sinks.clone(),
+ )
+ .map_err(Into::into)
+ },
+ )
};
+ // Use Ethereum-style hex subscription IDs (0x-prefixed) instead of jsonrpsee defaults.
+ config.rpc.id_provider = Some(Box::new(fc_rpc::EthereumSubIdProvider));
+
let rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
network: Arc::new(network.clone()),
client: client.clone(),
diff --git a/test/moonwall/suites/dev/stagenet/subscription/test-subscription-logs.ts b/test/moonwall/suites/dev/stagenet/subscription/test-subscription-logs.ts
new file mode 100644
index 00000000..1f33aaa1
--- /dev/null
+++ b/test/moonwall/suites/dev/stagenet/subscription/test-subscription-logs.ts
@@ -0,0 +1,43 @@
+import { describeSuite, expect } from "@moonwall/cli";
+
+describeSuite({
+ id: "D023501",
+ title: "Subscription - Logs",
+ foundationMethods: "dev",
+ testCases: ({ context, it, log }) => {
+ it({
+ id: "T01",
+ title: "should send a notification on new transaction",
+ test: async function () {
+ const logs: any[] = [];
+ const sub = await context.web3().eth.subscribe("logs");
+
+ await new Promise(async (resolve, reject) => {
+ sub.once("data", async (event) => {
+ logs.push(event);
+ resolve("success");
+ });
+
+ sub.once("error", (error) => {
+ console.error(error);
+ reject(error);
+ });
+
+ await context.deployContract!("EventEmitter");
+ });
+
+ const block = await context.viem().getBlock();
+
+ expect(logs[0]).to.include({
+ blockHash: block.hash,
+ blockNumber: block.number,
+ data: "0x",
+ logIndex: 0n,
+ removed: false,
+ transactionHash: block.transactions[0],
+ transactionIndex: 0n,
+ });
+ },
+ });
+ },
+});
diff --git a/test/moonwall/suites/dev/stagenet/subscription/test-subscription-logs2.ts b/test/moonwall/suites/dev/stagenet/subscription/test-subscription-logs2.ts
new file mode 100644
index 00000000..2b58d33b
--- /dev/null
+++ b/test/moonwall/suites/dev/stagenet/subscription/test-subscription-logs2.ts
@@ -0,0 +1,195 @@
+import { beforeAll, describeSuite, expect } from "@moonwall/cli";
+import { ALITH_CONTRACT_ADDRESSES } from "@moonwall/util";
+import type { Log } from "web3";
+
+describeSuite({
+ id: "D023502",
+ title: "Subscription - Logs",
+ foundationMethods: "dev",
+ testCases: ({ context, it, log }) => {
+ let deployedContract: `0x${string}`;
+ let deployHash: `0x${string}`;
+
+ let subSingleAddPromise: Promise;
+ let subMultiAddPromise: Promise;
+ let subTopicPromise: Promise;
+ let subTopicWildcardPromise: Promise;
+ let subTopicListPromise: Promise;
+ let subTopicCondPromise: Promise;
+ let subTopicMultiCondPromise: Promise;
+ let subTopicWildAndCondPromise: Promise;
+
+ beforeAll(async () => {
+ const openSub = async (filter?: object) => await context.web3().eth.subscribe("logs", filter);
+
+ const onData = (logSub: any) => {
+ return new Promise((resolve) => {
+ logSub.once("data", resolve);
+ });
+ };
+
+ const [
+ singleSub,
+ multiSub,
+ subTopic,
+ subTopicWildcard,
+ subTopicList,
+ subTopicCond,
+ subTopicMultiCond,
+ subTopicWildAndCond,
+ ] = await Promise.all([
+ openSub({
+ address: ALITH_CONTRACT_ADDRESSES[0],
+ }),
+ openSub({
+ address: [
+ ALITH_CONTRACT_ADDRESSES[3],
+ ALITH_CONTRACT_ADDRESSES[2],
+ ALITH_CONTRACT_ADDRESSES[1],
+ ALITH_CONTRACT_ADDRESSES[0],
+ ],
+ }),
+ openSub({
+ topics: ["0x0040d54d5e5b097202376b55bcbaaedd2ee468ce4496f1d30030c4e5308bf94d"],
+ }),
+ openSub({
+ topics: [null, "0x000000000000000000000000f24ff3a9cf04c71dbc94d0b566f7a27b94566cac"],
+ }),
+ openSub({
+ topics: [
+ ["0x0040d54d5e5b097202376b55bcbaaedd2ee468ce4496f1d30030c4e5308bf94d"],
+ ["0x000000000000000000000000f24ff3a9cf04c71dbc94d0b566f7a27b94566cac"],
+ ],
+ }),
+
+ openSub({
+ topics: [
+ "0x0040d54d5e5b097202376b55bcbaaedd2ee468ce4496f1d30030c4e5308bf94d",
+ ["0x000000000000000000000000f24ff3a9cf04c71dbc94d0b566f7a27b94566cac"],
+ ],
+ }),
+
+ openSub({
+ topics: [
+ "0x0040d54d5e5b097202376b55bcbaaedd2ee468ce4496f1d30030c4e5308bf94d",
+ [
+ "0x0000000000000000000000000000000000000000000000000000000000000000",
+ "0x000000000000000000000000f24ff3a9cf04c71dbc94d0b566f7a27b94566cac",
+ ],
+ ],
+ }),
+ openSub({
+ topics: [
+ null,
+ [
+ "0x000000000000000000000000f24ff3a9cf04c71dbc94d0b566f7a27b94566cac",
+ "0x0000000000000000000000000000000000000000000000000000000000000000",
+ ],
+ null,
+ ],
+ }),
+ ]);
+
+ subSingleAddPromise = onData(singleSub);
+ subMultiAddPromise = onData(multiSub);
+ subTopicPromise = onData(subTopic);
+ subTopicWildcardPromise = onData(subTopicWildcard);
+ subTopicListPromise = onData(subTopicList);
+ subTopicCondPromise = onData(subTopicCond);
+ subTopicMultiCondPromise = onData(subTopicMultiCond);
+ subTopicWildAndCondPromise = onData(subTopicWildAndCond);
+
+ const { contractAddress, hash } = await context.deployContract!("EventEmitter");
+ deployedContract = contractAddress;
+ deployHash = hash;
+ });
+
+ it({
+ id: "T01",
+ title: "should be able to filter by address",
+ test: async function () {
+ const eventLog = await subSingleAddPromise;
+ expect(eventLog.blockNumber).toBe(1n);
+ expect(eventLog.address).toBe(deployedContract.toLowerCase());
+ expect(eventLog.transactionHash).toBe(deployHash);
+ },
+ });
+
+ it({
+ id: "T02",
+ title: "should be able to filter by multiple addresses",
+ test: async function () {
+ const eventLog = await subMultiAddPromise;
+ expect(eventLog.blockNumber).toBe(1n);
+ expect(eventLog.address).toBe(deployedContract.toLowerCase());
+ expect(eventLog.transactionHash).toBe(deployHash);
+ },
+ });
+
+ it({
+ id: "T03",
+ title: "should be able to filter by topic",
+ test: async function () {
+ const eventLog = await subTopicPromise;
+ expect(eventLog.blockNumber).toBe(1n);
+ expect(eventLog.address).toBe(deployedContract.toLowerCase());
+ expect(eventLog.transactionHash).toBe(deployHash);
+ },
+ });
+
+ it({
+ id: "T04",
+ title: "should be able to filter by topic wildcards",
+ test: async function () {
+ const eventLog = await subTopicWildcardPromise;
+ expect(eventLog.blockNumber).toBe(1n);
+ expect(eventLog.address).toBe(deployedContract.toLowerCase());
+ expect(eventLog.transactionHash).toBe(deployHash);
+ },
+ });
+
+ it({
+ id: "T05",
+ title: "should be able to filter by topic list",
+ test: async function () {
+ const eventLog = await subTopicListPromise;
+ expect(eventLog.blockNumber).toBe(1n);
+ expect(eventLog.address).toBe(deployedContract.toLowerCase());
+ expect(eventLog.transactionHash).toBe(deployHash);
+ },
+ });
+
+ it({
+ id: "T06",
+ title: "should be able to filter by topic conditional parameters",
+ test: async function () {
+ const eventLog = await subTopicCondPromise;
+ expect(eventLog.blockNumber).toBe(1n);
+ expect(eventLog.address).toBe(deployedContract.toLowerCase());
+ expect(eventLog.transactionHash).toBe(deployHash);
+ },
+ });
+
+ it({
+ id: "T07",
+ title: "should support multiple topic conditional parameters",
+ test: async function () {
+ const eventLog = await subTopicMultiCondPromise;
+ expect(eventLog.blockNumber).toBe(1n);
+ expect(eventLog.address).toBe(deployedContract.toLowerCase());
+ expect(eventLog.transactionHash).toBe(deployHash);
+ },
+ });
+
+ it({
+ id: "T08",
+ title: "should combine topic wildcards and conditional parameters",
+ test: async function () {
+ const eventLog = await subTopicWildAndCondPromise;
+ expect(eventLog.blockNumber).toBe(1n);
+ expect(eventLog.address).toBe(deployedContract.toLowerCase());
+ expect(eventLog.transactionHash).toBe(deployHash);
+ },
+ });
+ },
+});
diff --git a/test/moonwall/suites/dev/stagenet/subscription/test-subscription-pending.ts b/test/moonwall/suites/dev/stagenet/subscription/test-subscription-pending.ts
new file mode 100644
index 00000000..d053667b
--- /dev/null
+++ b/test/moonwall/suites/dev/stagenet/subscription/test-subscription-pending.ts
@@ -0,0 +1,30 @@
+import { describeSuite, expect } from "@moonwall/cli";
+import { BALTATHAR_ADDRESS, GLMR, createRawTransfer, sendRawTransaction } from "@moonwall/util";
+import { setTimeout } from "node:timers/promises";
+
+describeSuite({
+ id: "D023504",
+ title: "Subscription - Pending transactions",
+ foundationMethods: "dev",
+ testCases: ({ context, it, log }) => {
+ it({
+ id: "T01",
+ title: "should return a valid subscriptionId",
+ test: async function () {
+ let response: any;
+ const sub = await context.web3().eth.subscribe("newPendingTransactions");
+
+ sub.once("data", (data) => {
+ response = data;
+ });
+
+ const rawTx = await createRawTransfer(context, BALTATHAR_ADDRESS, GLMR);
+ const hash = await sendRawTransaction(context, rawTx);
+ await setTimeout(200);
+
+ expect(response).not.toBeUndefined();
+ expect(response).toBe(hash);
+ },
+ });
+ },
+});
diff --git a/test/moonwall/suites/dev/stagenet/subscription/test-subscription.ts b/test/moonwall/suites/dev/stagenet/subscription/test-subscription.ts
new file mode 100644
index 00000000..ae853cfb
--- /dev/null
+++ b/test/moonwall/suites/dev/stagenet/subscription/test-subscription.ts
@@ -0,0 +1,61 @@
+import { beforeAll, describeSuite, expect } from "@moonwall/cli";
+import { ALITH_ADDRESS, BALTATHAR_ADDRESS, createRawTransfer } from "@moonwall/util";
+import { type PublicClient, createPublicClient, webSocket } from "viem";
+
+describeSuite({
+ id: "D023505",
+ title: "Subscription - Block headers",
+ foundationMethods: "dev",
+ testCases: ({ context, it, log }) => {
+ let client: PublicClient;
+
+ beforeAll(async () => {
+ const transport = webSocket(context.viem().transport.url.replace("http", "ws"));
+ client = createPublicClient({
+ transport,
+ });
+ });
+
+ it({
+ id: "T01",
+ title: "should return a valid subscriptionId",
+ test: async function () {
+ const result = (await client.transport.request({
+ method: "eth_subscribe",
+ params: ["newHeads"],
+ })) as any;
+
+ expect(result.length).toBe(34);
+ },
+ });
+
+ it({
+ id: "T02",
+ title: "should send notification on new block",
+ test: async function () {
+ const blocks: any[] = [];
+ const unwatch = client.watchBlocks({
+ onBlock: (block) => blocks.push(block),
+ });
+
+ await context.createBlock(createRawTransfer(context, BALTATHAR_ADDRESS, 0));
+ unwatch();
+
+ const block = await context.viem().getBlock();
+
+ expect(blocks[0]).to.include({
+ author: ALITH_ADDRESS.toLowerCase(),
+ difficulty: 0n,
+ extraData: "0x",
+ logsBloom: `0x${"0".repeat(512)}`,
+ miner: ALITH_ADDRESS.toLowerCase(),
+ sha3Uncles: "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
+ });
+ expect(blocks[0].nonce).to.be.eq("0x0000000000000000");
+ // Verify subscription roots match the block fetched via RPC
+ expect(blocks[0].receiptsRoot).toBe(block.receiptsRoot);
+ expect(blocks[0].transactionsRoot).toBe(block.transactionsRoot);
+ },
+ });
+ },
+});