// Copyright 2025 DataHaven
// This file is part of DataHaven.

// DataHaven is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// DataHaven is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with DataHaven. If not, see <https://www.gnu.org/licenses/>.

//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.

use crate::cli::{ProviderType, Sealing, StorageLayer};
use crate::command::{ProviderOptions, RoleOptions};
use crate::eth::{
    new_frontier_partial, spawn_frontier_tasks, BackendType, FrontierBackend,
    FrontierPartialComponents, FrontierTasksParams,
};
use crate::eth::{EthConfiguration, StorageOverrideHandler};
use crate::rpc::BeefyDeps;
use async_channel::Receiver;
use datahaven_runtime_common::{AccountId, Balance, Block, BlockNumber, Hash, Nonce};
use fc_consensus::FrontierBlockImport;
use fc_db::DatabaseSource;
use fc_storage::StorageOverride;
use futures::channel::mpsc;
use futures::FutureExt;
use log::info;
use sc_client_api::{AuxStore, Backend, BlockBackend, StateBackend, StorageProvider};
use sc_consensus_babe::ImportQueueParams;
use sc_consensus_grandpa::SharedVoterState;
use sc_consensus_manual_seal::consensus::babe::BabeConsensusDataProvider;
use sc_consensus_manual_seal::rpc::EngineCommand;
use sc_consensus_manual_seal::{self, InstantSealParams, ManualSealParams};
use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
use sc_network::request_responses::IncomingRequest;
use sc_network::service::traits::NetworkService;
use sc_network::ProtocolName;
use sc_service::RpcHandlers;
use sc_service::{
    error::Error as ServiceError, ChainType, Configuration, TaskManager, WarpSyncConfig,
};
use sc_telemetry::{Telemetry, TelemetryWorker};
use sc_transaction_pool::BasicPool;
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use shc_actors_framework::actor::TaskSpawner;
use shc_blockchain_service::capacity_manager::CapacityConfig;
use shc_client::types::FishermanRole;
use shc_client::{
    builder::{
        Buildable, FishermanOptions, IndexerOptions, StorageHubBuilder, StorageLayerBuilder,
    },
    handler::{RunnableTasks, StorageHubHandler},
    types::{
        BspProvider, InMemoryStorageLayer, MspProvider, NoStorageLayer, RocksDbStorageLayer,
        ShNodeType, ShRole, ShStorageLayer, UserRole,
    },
};
use shc_common::traits::StorageEnableRuntime;
use shc_common::types::StorageHubClient;
use shc_file_transfer_service::fetch_genesis_hash;
use shc_indexer_db::DbPool;
use shc_indexer_service::spawn_indexer_service;
use shc_rpc::{RpcConfig, StorageHubClientRpcConfig};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_consensus_beefy::ecdsa_crypto::AuthorityId as BeefyId;
use sp_keystore::KeystorePtr;
use sp_runtime::traits::BlakeTwo256;
use sp_runtime::SaturatedConversion;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::{default::Default, path::Path, sync::Arc, time::Duration};

// NOTE(review): several aliases below look like they have lost their generic
// parameter lists in this copy of the file (e.g. `FullBeefyBlockImport` refers
// to `InnerBlockImport` / `AuthorityId` without declaring them) — confirm
// against the canonical source before relying on the exact spelling.

/// Full client type used by the node service; an alias for [`StorageHubClient`].
pub(crate) type FullClient = StorageHubClient;
/// Backend type used together with [`FullClient`].
type FullBackend = sc_service::TFullBackend;
/// Fork-choice rule used by the service: longest chain.
type FullSelectChain = sc_consensus::LongestChain;
/// GRANDPA block import parameterised with the node's backend, block, client
/// and select-chain types.
type FullGrandpaBlockImport = sc_consensus_grandpa::GrandpaBlockImport<
    FullBackend,
    Block,
    FullClient,
    FullSelectChain,
>;
/// BEEFY block import wrapping an inner block import.
type FullBeefyBlockImport = sc_consensus_beefy::import::BeefyBlockImport<
    Block,
    FullBackend,
    FullClient,
    InnerBlockImport,
    AuthorityId,
>;

/// The minimum period of blocks on which justifications will be
/// imported and generated.
const GRANDPA_JUSTIFICATION_PERIOD: u32 = 512;

// Mock timestamp used for manual/instant sealing in dev mode, similar to Moonbeam.
// Each new block will advance the timestamp by one slot duration to satisfy // pallet_timestamp MinimumPeriod checks when sealing back-to-back. static MOCK_TIMESTAMP: AtomicU64 = AtomicU64::new(0); pub(crate) trait FullRuntimeApi: sp_transaction_pool::runtime_api::TaggedTransactionQueue + sp_api::Metadata + crate::eth::EthCompatRuntimeApiCollection + frame_system_rpc_runtime_api::AccountNonceApi + sp_session::SessionKeys + sp_api::ApiExt + pallet_mmr::primitives::MmrApi + pallet_beefy_mmr::BeefyMmrApi + sp_consensus_beefy::BeefyApi + pallet_transaction_payment_rpc_runtime_api::TransactionPaymentApi + sp_offchain::OffchainWorkerApi + sp_block_builder::BlockBuilder + sp_consensus_babe::BabeApi + sp_consensus_grandpa::GrandpaApi + fp_rpc::ConvertTransactionRuntimeApi + fp_rpc::EthereumRuntimeRPCApi { } impl FullRuntimeApi for T where T: sp_transaction_pool::runtime_api::TaggedTransactionQueue + sp_api::Metadata + crate::eth::EthCompatRuntimeApiCollection + frame_system_rpc_runtime_api::AccountNonceApi + sp_session::SessionKeys + sp_api::ApiExt + pallet_mmr::primitives::MmrApi + pallet_beefy_mmr::BeefyMmrApi + sp_consensus_beefy::BeefyApi + pallet_transaction_payment_rpc_runtime_api::TransactionPaymentApi + sp_offchain::OffchainWorkerApi + sp_block_builder::BlockBuilder + sp_consensus_babe::BabeApi + sp_consensus_grandpa::GrandpaApi + fp_rpc::ConvertTransactionRuntimeApi + fp_rpc::EthereumRuntimeRPCApi { } pub type Service = sc_service::PartialComponents< FullClient, FullBackend, FullSelectChain, sc_consensus::DefaultImportQueue, sc_transaction_pool::BasicPool< sc_transaction_pool::FullChainApi, Block>, Block, >, ( sc_consensus_babe::BabeBlockImport< Block, FullClient, FullBeefyBlockImport< FrontierBlockImport< Block, FullGrandpaBlockImport, FullClient, >, BeefyId, RuntimeApi, >, >, sc_consensus_grandpa::LinkHalf, FullSelectChain>, sc_consensus_babe::BabeLink, sc_consensus_beefy::BeefyVoterLinks, sc_consensus_beefy::BeefyRPCLinks, Arc>>, Arc>, Option, ), >; // 
StorageHub Enable client pub(crate) type StorageEnableClient = shc_common::types::StorageHubClient<::RuntimeApi>; pub fn frontier_database_dir(config: &Configuration, path: &str) -> std::path::PathBuf { config .base_path .config_dir(config.chain_spec.id()) .join("frontier") .join(path) } pub fn open_frontier_backend( client: Arc, config: &Configuration, eth_config: &mut EthConfiguration, ) -> Result, String> where C: ProvideRuntimeApi + StorageProvider + AuxStore, C: HeaderBackend + HeaderMetadata, C: Send + Sync + 'static, C::Api: fp_rpc::EthereumRuntimeRPCApi, BE: Backend + 'static, BE::State: StateBackend, { let frontier_backend = match eth_config.frontier_backend_type { BackendType::KeyValue => { fc_db::Backend::KeyValue(Arc::new(fc_db::kv::Backend::::new( client, &fc_db::kv::DatabaseSettings { source: match config.database { DatabaseSource::RocksDb { .. } => DatabaseSource::RocksDb { path: frontier_database_dir(config, "db"), cache_size: 0, }, DatabaseSource::ParityDb { .. } => DatabaseSource::ParityDb { path: frontier_database_dir(config, "paritydb"), }, DatabaseSource::Auto { .. 
} => DatabaseSource::Auto { rocksdb_path: frontier_database_dir(config, "db"), paritydb_path: frontier_database_dir(config, "paritydb"), cache_size: 0, }, _ => { return Err( "Supported db sources: `rocksdb` | `paritydb` | `auto`".to_string() ) } }, }, )?)) } BackendType::Sql => { let overrides = Arc::new(StorageOverrideHandler::new(client.clone())); let sqlite_db_path = frontier_database_dir(config, "sql"); std::fs::create_dir_all(&sqlite_db_path).expect("failed creating sql db directory"); let backend = futures::executor::block_on(fc_db::sql::Backend::new( fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig { path: Path::new("sqlite:///") .join(sqlite_db_path) .join("frontier.db3") .to_str() .expect("frontier sql path error"), create_if_missing: true, thread_count: eth_config.frontier_sql_backend_thread_count, cache_size: eth_config.frontier_sql_backend_cache_size, }), eth_config.frontier_sql_backend_pool_size, std::num::NonZeroU32::new(eth_config.frontier_sql_backend_num_ops_timeout), overrides.clone(), )) .unwrap_or_else(|err| panic!("failed creating sql backend: {:?}", err)); fc_db::Backend::Sql(Arc::new(backend)) } }; Ok(frontier_backend) } fn build_babe_inherent_providers( slot_duration: sp_consensus_babe::SlotDuration, use_mock_timestamp: bool, ) -> ( sp_consensus_babe::inherents::InherentDataProvider, sp_timestamp::InherentDataProvider, ) { if use_mock_timestamp { // In manual/instant sealing we want to advance time deterministically per block // to satisfy `pallet_timestamp` MinimumPeriod without sleeping. We increment a // static counter by one slot each time and use that value as the timestamp. 
let increment = slot_duration.as_millis(); let next_ts = MOCK_TIMESTAMP .fetch_add(increment, Ordering::SeqCst) .saturating_add(increment); let timestamp = sp_timestamp::InherentDataProvider::new(sp_timestamp::Timestamp::new(next_ts)); let slot = sp_consensus_babe::inherents::InherentDataProvider::from_timestamp_and_slot_duration( *timestamp, slot_duration, ); (slot, timestamp) } else { let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); let slot = sp_consensus_babe::inherents::InherentDataProvider::from_timestamp_and_slot_duration( *timestamp, slot_duration, ); (slot, timestamp) } } pub fn new_partial( config: &Configuration, eth_config: &mut EthConfiguration, use_mock_timestamp: bool, ) -> Result, ServiceError> where Runtime: shc_common::traits::StorageEnableRuntime, RuntimeApi: sp_api::ConstructRuntimeApi> + Send + Sync + 'static, RuntimeApi::RuntimeApi: FullRuntimeApi, { let telemetry = config .telemetry_endpoints .clone() .filter(|x| !x.is_empty()) .map(|endpoints| -> Result<_, sc_telemetry::Error> { let worker = TelemetryWorker::new(16)?; let telemetry = worker.handle().new_telemetry(endpoints); Ok((worker, telemetry)) }) .transpose()?; let heap_pages = config .executor .default_heap_pages .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static { extra_pages: h as _, }); let wasm_builder = WasmExecutor::builder() .with_execution_method(config.executor.wasm_method) .with_onchain_heap_alloc_strategy(heap_pages) .with_offchain_heap_alloc_strategy(heap_pages) .with_ignore_onchain_heap_pages(true) .with_max_runtime_instances(config.executor.max_runtime_instances) .with_runtime_cache_size(config.executor.runtime_cache_size); let executor = wasm_builder.build(); let (client, backend, keystore_container, task_manager) = sc_service::new_full_parts::( config, telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()), executor, )?; let client = Arc::new(client); let telemetry = telemetry.map(|(worker, telemetry)| { task_manager 
.spawn_handle() .spawn("telemetry", None, worker.run()); telemetry }); let select_chain = sc_consensus::LongestChain::new(backend.clone()); // FIXME: The `config.transaction_pool.options` field is private, so for now use its default value let transaction_pool = Arc::from(BasicPool::new_full( Default::default(), config.role.is_authority().into(), config.prometheus_registry(), task_manager.spawn_essential_handle(), client.clone(), )); let (grandpa_block_import, grandpa_link) = sc_consensus_grandpa::block_import( client.clone(), GRANDPA_JUSTIFICATION_PERIOD, &client, select_chain.clone(), telemetry.as_ref().map(|x| x.handle()), )?; let frontier_block_import = FrontierBlockImport::new(grandpa_block_import.clone(), client.clone()); let (beefy_block_import, beefy_voter_links, beefy_rpc_links) = sc_consensus_beefy::beefy_block_import_and_links( frontier_block_import, backend.clone(), client.clone(), config.prometheus_registry().cloned(), ); let (block_import, babe_link) = sc_consensus_babe::block_import( sc_consensus_babe::configuration(&*client)?, beefy_block_import, client.clone(), )?; let slot_duration = babe_link.config().slot_duration(); let storage_override = Arc::new(StorageOverrideHandler::::new(client.clone())); let frontier_backend = Arc::new(open_frontier_backend(client.clone(), config, eth_config)?); let (import_queue, babe_worker_handle) = sc_consensus_babe::import_queue(ImportQueueParams { link: babe_link.clone(), block_import: block_import.clone(), justification_import: Some(Box::new(grandpa_block_import.clone())), client: client.clone(), select_chain: select_chain.clone(), create_inherent_data_providers: move |_, ()| { std::future::ready(Ok::<_, Box>( build_babe_inherent_providers(slot_duration, use_mock_timestamp), )) }, spawner: &task_manager.spawn_essential_handle(), registry: config.prometheus_registry(), telemetry: telemetry.as_ref().map(|x| x.handle()), offchain_tx_pool_factory: OffchainTransactionPoolFactory::new(transaction_pool.clone()), })?; // 
TODO Wire up to RPC std::mem::forget(babe_worker_handle); Ok(sc_service::PartialComponents { client, backend, task_manager, import_queue, keystore_container, select_chain, transaction_pool, other: ( block_import, grandpa_link, babe_link, beefy_voter_links, beefy_rpc_links, frontier_backend, storage_override, telemetry, ), }) } /// Builds a new service for a full client. // TODO: Find a way to remove `RuntimeApi` and to just keep `Runtime` pub async fn new_full_impl< R: ShRole, S: ShStorageLayer, Runtime, RuntimeApi, N: sc_network::NetworkBackend::Hash>, >( config: Configuration, mut eth_config: EthConfiguration, role_options: Option, indexer_options: Option, sealing: Option, ) -> Result where Runtime: shc_common::traits::StorageEnableRuntime, RuntimeApi: sp_api::ConstructRuntimeApi> + Send + Sync + 'static, RuntimeApi::RuntimeApi: FullRuntimeApi, (R, S): ShNodeType, StorageHubBuilder: StorageLayerBuilder + Buildable<(R, S), Runtime>, StorageHubHandler<(R, S), Runtime>: RunnableTasks, { let role = config.role; let mut sealing = match sealing { Some(_) if !matches!(config.chain_spec.chain_type(), ChainType::Development) => { log::warn!("Manual sealing is only available for development chains; disabling."); None } other => other, }; if sealing.is_some() && !role.is_authority() { log::warn!( "Manual sealing requested but the node is not running as an authority; disabling." 
); sealing = None; } let use_mock_timestamp = sealing.is_some(); let sc_service::PartialComponents { client, backend, mut task_manager, import_queue, keystore_container, select_chain, transaction_pool, other: ( block_import, grandpa_link, babe_link, beefy_voter_links, beefy_rpc_links, frontier_backend, storage_override, mut telemetry, ), } = new_partial::(&config, &mut eth_config, use_mock_timestamp)?; let is_authority = role.is_authority(); let FrontierPartialComponents { filter_pool, fee_history_cache, fee_history_cache_limit, } = new_frontier_partial(ð_config)?; let mut net_config = sc_network::config::FullNetworkConfiguration::< Block, ::Hash, N, >::new(&config.network, config.prometheus_registry().cloned()); // Starting StorageHub file transfer service. let mut file_transfer_request_protocol = None; if role_options.is_some() { file_transfer_request_protocol = Some( shc_file_transfer_service::configure_file_transfer_network::<_, Runtime>( fetch_genesis_hash(client.clone()), config.chain_spec.fork_id(), &mut net_config, ), ); } let metrics = N::register_notification_metrics(config.prometheus_registry()); let peer_store_handle = net_config.peer_store_handle(); let genesis_hash = client .block_hash(0) .ok() .flatten() .expect("Genesis block exists; qed"); let grandpa_protocol_name = sc_consensus_grandpa::protocol_standard_name(&genesis_hash, &config.chain_spec); let (grandpa_protocol_config, grandpa_notification_service) = sc_consensus_grandpa::grandpa_peers_set_config::<_, N>( grandpa_protocol_name.clone(), metrics.clone(), Arc::clone(&peer_store_handle), ); net_config.add_notification_protocol(grandpa_protocol_config); let beefy_gossip_proto_name = sc_consensus_beefy::gossip_protocol_name(genesis_hash, config.chain_spec.fork_id()); let (beefy_on_demand_justifications_handler, beefy_req_resp_cfg) = sc_consensus_beefy::communication::request_response::BeefyJustifsRequestHandler::new::<_, N>( &genesis_hash, config.chain_spec.fork_id(), client.clone(), 
config.prometheus_registry().cloned(), ); let enable_beefy = true; let beefy_notification_service = match enable_beefy { false => None, true => { let (beefy_notification_config, beefy_notification_service) = sc_consensus_beefy::communication::beefy_peers_set_config::<_, N>( beefy_gossip_proto_name.clone(), metrics.clone(), Arc::clone(&peer_store_handle), ); net_config.add_notification_protocol(beefy_notification_config); net_config.add_request_response_protocol(beefy_req_resp_cfg); Some(beefy_notification_service) } }; let warp_sync = Arc::new(sc_consensus_grandpa::warp_proof::NetworkProvider::new( backend.clone(), grandpa_link.shared_authority_set().clone(), Vec::default(), )); let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, net_config, client: client.clone(), transaction_pool: transaction_pool.clone(), spawn_handle: task_manager.spawn_handle(), import_queue, block_announce_validator_builder: None, warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)), block_relay: None, metrics, })?; if config.offchain_worker.enabled { task_manager.spawn_handle().spawn( "offchain-workers-runner", "offchain-worker", sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions { runtime_api_provider: client.clone(), is_validator: config.role.is_authority(), keystore: Some(keystore_container.keystore()), offchain_db: backend.offchain_storage(), transaction_pool: Some(OffchainTransactionPoolFactory::new( transaction_pool.clone(), )), network_provider: Arc::new(network.clone()), enable_http_requests: true, custom_extensions: |_| vec![], })? 
.run(client.clone(), task_manager.spawn_handle()) .boxed(), ); } // Storage Hub builder let (sh_builder, maybe_storage_hub_client_rpc_config) = match init_sh_builder::( &role_options, &indexer_options, &task_manager, file_transfer_request_protocol, network.clone(), keystore_container.keystore(), client.clone(), ) .await? { Some((shb, rpc)) => (Some(shb), Some(rpc)), None => (None, None), }; let force_authoring = config.force_authoring; let backoff_authoring_blocks: Option<()> = None; let name = config.network.node_name.clone(); let enable_grandpa = sealing.is_none() && !config.disable_grandpa; let prometheus_registry = config.prometheus_registry().cloned(); let overrides = Arc::new(StorageOverrideHandler::new(client.clone())); let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new( task_manager.spawn_handle(), overrides.clone(), eth_config.eth_log_block_cache, eth_config.eth_statuses_cache, prometheus_registry.clone(), )); let mut manual_commands_stream: Option>> = None; let command_sink = if matches!(sealing, Some(Sealing::Manual)) { let (sink, stream) = mpsc::channel::>(1000); manual_commands_stream = Some(stream); Some(sink) } else { None }; // Sinks for pubsub notifications. // Everytime a new subscription is created, a new mpsc channel is added to the sink pool. // The MappingSyncWorker sends through the channel on block import and the subscription emits a notification to the subscriber on receiving a message through this channel. // This way we avoid race conditions when using native substrate block import notification stream. 
let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks< fc_mapping_sync::EthereumBlockNotification, > = Default::default(); let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks); spawn_frontier_tasks( &task_manager, FrontierTasksParams { client: client.clone(), backend: backend.clone(), frontier_backend: frontier_backend.clone(), frontier_partial_components: FrontierPartialComponents { filter_pool: filter_pool.clone(), fee_history_cache: fee_history_cache.clone(), fee_history_cache_limit, }, storage_override, sync: sync_service.clone(), pubsub_notification_sinks, }, ) .await; let base_path = config.base_path.path().to_path_buf().clone(); let rpc_extensions_builder = { let client = client.clone(); let pool = transaction_pool.clone(); let backend = backend.clone(); let frontier_backend = frontier_backend.clone(); let network = network.clone(); let max_past_logs = eth_config.max_past_logs; let overrides = overrides.clone(); let fee_history_cache = fee_history_cache.clone(); let block_data_cache = block_data_cache.clone(); let fee_history_limit = eth_config.fee_history_limit; let sync = sync_service.clone(); Box::new(move |subscription_executor| { let deps = crate::rpc::FullDeps { client: client.clone(), pool: pool.clone(), graph: pool.pool().clone(), beefy: BeefyDeps:: { beefy_finality_proof_stream: beefy_rpc_links.from_voter_justif_stream.clone(), beefy_best_block_stream: beefy_rpc_links.from_voter_best_beefy_stream.clone(), subscription_executor, }, max_past_logs, fee_history_limit, fee_history_cache: fee_history_cache.clone(), network: Arc::new(network.clone()), sync: sync.clone(), filter_pool: filter_pool.clone(), block_data_cache: block_data_cache.clone(), overrides: overrides.clone(), is_authority: is_authority.clone(), command_sink: command_sink.clone(), backend: backend.clone(), frontier_backend: match &*frontier_backend { fc_db::Backend::KeyValue(b) => b.clone(), fc_db::Backend::Sql(b) => b.clone(), }, 
forced_parent_hashes: None, maybe_storage_hub_client_config: maybe_storage_hub_client_rpc_config.clone(), }; crate::rpc::create_full(deps).map_err(Into::into) }) }; let rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams { network: Arc::new(network.clone()), client: client.clone(), keystore: keystore_container.keystore(), task_manager: &mut task_manager, transaction_pool: transaction_pool.clone(), rpc_builder: rpc_extensions_builder, backend: backend.clone(), system_rpc_tx, tx_handler_controller, sync_service: sync_service.clone(), config, telemetry: telemetry.as_mut(), })?; if is_authority { if let Some(mode) = sealing { let proposer_factory = sc_basic_authorship::ProposerFactory::new( task_manager.spawn_handle(), client.clone(), transaction_pool.clone(), prometheus_registry.as_ref(), telemetry.as_ref().map(|x| x.handle()), ); let slot_duration = babe_link.config().slot_duration(); let epoch_changes = babe_link.epoch_changes().clone(); let authorities = babe_link.config().authorities.clone(); let keystore = keystore_container.keystore(); let client_for_consensus = client.clone(); let consensus_data_provider = move || { BabeConsensusDataProvider::new( client_for_consensus.clone(), keystore.clone(), epoch_changes.clone(), authorities.clone(), ) .map(|provider| Box::new(provider) as _) .map_err(|e| ServiceError::Other(e.to_string())) }; let create_inherent_data_providers = move |_, ()| { std::future::ready(Ok::<_, Box>( build_babe_inherent_providers(slot_duration, true), )) }; match mode { Sealing::Manual => { let commands_stream = manual_commands_stream.take().ok_or_else(|| { ServiceError::Other( "Manual sealing requested but command channel is unavailable".into(), ) })?; let future = sc_consensus_manual_seal::run_manual_seal(ManualSealParams { block_import, env: proposer_factory, client: client.clone(), pool: transaction_pool.clone(), commands_stream, select_chain, consensus_data_provider: Some(consensus_data_provider()?), 
create_inherent_data_providers, }); task_manager.spawn_essential_handle().spawn_blocking( "manual-seal", Some("block-authoring"), future, ); } Sealing::Instant => { let future = sc_consensus_manual_seal::run_instant_seal(InstantSealParams { block_import, env: proposer_factory, client: client.clone(), pool: transaction_pool.clone(), select_chain, consensus_data_provider: Some(consensus_data_provider()?), create_inherent_data_providers, }); task_manager.spawn_essential_handle().spawn_blocking( "manual-seal", Some("block-authoring"), future, ); } } log::info!("Manual sealing enabled (mode: {:?})", mode); } else { let proposer_factory = sc_basic_authorship::ProposerFactory::new( task_manager.spawn_handle(), client.clone(), transaction_pool.clone(), prometheus_registry.as_ref(), telemetry.as_ref().map(|x| x.handle()), ); let slot_duration = babe_link.clone().config().slot_duration(); let create_inherent_data_providers = move |_, ()| { std::future::ready(Ok::<_, Box>( build_babe_inherent_providers(slot_duration, false), )) }; let babe_config = sc_consensus_babe::BabeParams { keystore: keystore_container.keystore(), client: client.clone(), select_chain, env: proposer_factory, block_import, sync_oracle: sync_service.clone(), justification_sync_link: sync_service.clone(), create_inherent_data_providers, force_authoring, backoff_authoring_blocks, babe_link, block_proposal_slot_portion: sc_consensus_babe::SlotProportion::new(0.5), max_block_proposal_slot_portion: None, telemetry: telemetry.as_ref().map(|x| x.handle()), }; let babe = sc_consensus_babe::start_babe(babe_config)?; task_manager.spawn_essential_handle().spawn_blocking( "babe-proposer", Some("block-authoring"), babe, ); } } if enable_grandpa { // if the node isn't actively participating in consensus then it doesn't // need a keystore, regardless of which protocol we use below. 
let keystore = if role.is_authority() { Some(keystore_container.keystore()) } else { None }; let grandpa_config = sc_consensus_grandpa::Config { // FIXME #1578 make this available through chainspec gossip_duration: Duration::from_millis(333), justification_generation_period: GRANDPA_JUSTIFICATION_PERIOD, name: Some(name), observer_enabled: false, keystore, local_role: role, telemetry: telemetry.as_ref().map(|x| x.handle()), protocol_name: grandpa_protocol_name, }; // start the full GRANDPA voter // NOTE: non-authorities could run the GRANDPA observer protocol, but at // this point the full voter should provide better guarantees of block // and vote data availability than the observer. The observer has not // been tested extensively yet and having most nodes in a network run it // could lead to finality stalls. let grandpa_config = sc_consensus_grandpa::GrandpaParams { config: grandpa_config, link: grandpa_link, network: network.clone(), sync: Arc::new(sync_service.clone()), notification_service: grandpa_notification_service, voting_rule: sc_consensus_grandpa::VotingRulesBuilder::default().build(), prometheus_registry: prometheus_registry.clone(), shared_voter_state: SharedVoterState::empty(), telemetry: telemetry.as_ref().map(|x| x.handle()), offchain_tx_pool_factory: OffchainTransactionPoolFactory::new(transaction_pool), }; // the GRANDPA voter task is considered infallible, i.e. // if it fails we take down the service with it. task_manager.spawn_essential_handle().spawn_blocking( "grandpa-voter", None, sc_consensus_grandpa::run_grandpa_voter(grandpa_config)?, ); } // if the node isn't actively participating in consensus then it doesn't // need a keystore, regardless of which protocol we use below. 
let keystore_opt = if role.is_authority() { Some(keystore_container.keystore()) } else { None }; // beefy is enabled if its notification service exists if sealing.is_none() { if let Some(notification_service) = beefy_notification_service { let justifications_protocol_name = beefy_on_demand_justifications_handler.protocol_name(); let network_params = sc_consensus_beefy::BeefyNetworkParams { network: Arc::new(network.clone()), sync: sync_service.clone(), gossip_protocol_name: beefy_gossip_proto_name, justifications_protocol_name, notification_service, _phantom: core::marker::PhantomData::, }; let payload_provider = sp_consensus_beefy::mmr::MmrRootProvider::new(client.clone()); let beefy_params = sc_consensus_beefy::BeefyParams { client: client.clone(), backend: backend.clone(), payload_provider, runtime: client.clone(), key_store: keystore_opt.clone(), network_params, min_block_delta: 8, prometheus_registry: prometheus_registry.clone(), links: beefy_voter_links, on_demand_justifications_handler: beefy_on_demand_justifications_handler, is_authority: role.is_authority(), }; let gadget = sc_consensus_beefy::start_beefy_gadget::<_, _, _, _, _, _, _, BeefyId>( beefy_params, ); // BEEFY is part of consensus, if it fails we'll bring the node down with it to make // sure it is noticed. 
task_manager .spawn_essential_handle() .spawn_blocking("beefy-gadget", None, gadget); } } if let Some(_) = role_options { finish_sh_builder_and_run_tasks( sh_builder.expect("StorageHubBuilder should already be initialised."), client.clone(), rpc_handlers.clone(), keystore_container.keystore(), base_path.clone(), false, ) .await?; } network_starter.start_network(); Ok(task_manager) } pub async fn new_full< Runtime, RuntimeApi, N: sc_network::NetworkBackend::Hash>, >( config: Configuration, eth_config: EthConfiguration, role_options: Option, indexer_options: Option, sealing: Option, ) -> Result where Runtime: shc_common::traits::StorageEnableRuntime, RuntimeApi: sp_api::ConstructRuntimeApi> + Send + Sync + 'static, RuntimeApi::RuntimeApi: FullRuntimeApi, { if let Some(role_options) = role_options { match role_options { RoleOptions::Provider(ProviderOptions { provider_type: ProviderType::Bsp, storage_layer: StorageLayer::Memory, .. }) => { return new_full_impl::( config, eth_config, Some(role_options), indexer_options, sealing, ) .await; } RoleOptions::Provider(ProviderOptions { provider_type: ProviderType::Bsp, storage_layer: StorageLayer::RocksDB, .. }) => { return new_full_impl::( config, eth_config, Some(role_options), indexer_options, sealing, ) .await; } RoleOptions::Provider(ProviderOptions { provider_type: ProviderType::Msp, storage_layer: StorageLayer::Memory, .. }) => { return new_full_impl::( config, eth_config, Some(role_options), indexer_options, sealing, ) .await; } RoleOptions::Provider(ProviderOptions { provider_type: ProviderType::Msp, storage_layer: StorageLayer::RocksDB, .. }) => { return new_full_impl::( config, eth_config, Some(role_options), indexer_options, sealing, ) .await; } RoleOptions::Fisherman(FishermanOptions { .. 
}) => { return new_full_impl::( config, eth_config, Some(role_options), indexer_options, sealing, ) .await; } }; } else { return new_full_impl::( config, eth_config, None, indexer_options, sealing, ) .await; }; } //╔═══════════════════════════════════════════════════════════════════════════════════════════════════════════════╗ //║ StorageHub Client Setup Utilities ║ //╚═══════════════════════════════════════════════════════════════════════════════════════════════════════════════╝ /// Helper function to setup database pool async fn setup_database_pool(database_url: String) -> Result { shc_indexer_db::setup_db_pool(database_url) .await .map_err(|e| sc_service::Error::Application(Box::new(e))) } /// Configure and spawn the indexer service. async fn configure_and_spawn_indexer( indexer_options: &Option, task_manager: &TaskManager, client: Arc>, ) -> Result<(), sc_service::Error> { let indexer_options = match indexer_options { Some(config) => config, None => return Ok(()), }; // Setup database pool let db_pool = setup_database_pool(indexer_options.database_url.clone()).await?; info!( "📊 Starting Indexer service (mode: {:?})", indexer_options.indexer_mode ); let task_spawner = TaskSpawner::new(task_manager.spawn_handle(), "indexer-service"); spawn_indexer_service::( &task_spawner, client.clone(), db_pool.clone(), indexer_options.indexer_mode, ) .await; Ok(()) } /// Initialize the StorageHub builder with configured services based on the node's role. /// /// If `indexer_options` is provided, spawns the indexer service regardless of role configuration. /// The indexer service is decoupled from the role system and can run standalone. /// /// Returns `None` if no role is configured (e.g., standalone indexer mode). 
///
/// # Errors
///
/// - Propagates any error from `configure_and_spawn_indexer` and `setup_database_pool`.
/// - Returns `sc_service::Error::Other` when the role is `RoleOptions::Fisherman` and
///   `indexer_options` is set with `IndexerMode::Lite`.
///
/// # Panics
///
/// Panics if a role is configured but `file_transfer_request_protocol` is `None`.
async fn init_sh_builder(
    role_options: &Option,
    indexer_options: &Option,
    task_manager: &TaskManager,
    file_transfer_request_protocol: Option<(ProtocolName, Receiver)>,
    network: Arc,
    keystore: KeystorePtr,
    client: Arc>,
) -> Result<
    Option<(
        StorageHubBuilder,
        StorageHubClientRpcConfig<
            <(R, S) as ShNodeType>::FL,
            <(R, S) as ShNodeType>::FSH,
            Runtime,
        >,
    )>,
    sc_service::Error,
>
where
    R: ShRole,
    S: ShStorageLayer,
    (R, S): ShNodeType,
    StorageHubBuilder: StorageLayerBuilder,
{
    // Spawn indexer service if enabled. Runs before role check to allow standalone operation.
    configure_and_spawn_indexer::(&indexer_options, &task_manager, client.clone()).await?;

    // Without a role there is no builder to create; the indexer (if any) is already running.
    let role_options = match role_options {
        Some(role) => role,
        None => return Ok(None),
    };

    // Name of the task spawner, chosen from the role / provider type.
    let task_spawner_name = match role_options {
        RoleOptions::Provider(ProviderOptions {
            provider_type: ProviderType::Msp,
            ..
        }) => "msp-service",
        RoleOptions::Provider(ProviderOptions {
            provider_type: ProviderType::Bsp,
            ..
        }) => "bsp-service",
        RoleOptions::Fisherman(_) => "fisherman-service",
    };
    let task_spawner = TaskSpawner::new(task_manager.spawn_handle(), task_spawner_name);

    let mut builder = StorageHubBuilder::::new(task_spawner);

    // Setup file transfer service (common to all roles)
    // The protocol is treated as an invariant here: a missing value panics via `expect`.
    let (file_transfer_request_protocol_name, file_transfer_request_receiver) =
        file_transfer_request_protocol
            .expect("FileTransfer request protocol should already be initialised.");

    builder
        .with_file_transfer(
            file_transfer_request_receiver,
            file_transfer_request_protocol_name,
            network.clone(),
        )
        .await;

    // Role-specific configuration
    // Each arm configures `builder` and evaluates to the RPC config passed to
    // `create_rpc_config` at the end of the function.
    let rpc_config = match role_options {
        RoleOptions::Provider(ProviderOptions {
            rpc_config,
            provider_type,
            storage_path,
            max_storage_capacity,
            jump_capacity,
            msp_charging_period,
            msp_charge_fees,
            msp_move_bucket,
            bsp_upload_file,
            bsp_move_bucket,
            bsp_charge_fees,
            bsp_submit_proof,
            blockchain_service,
            msp_database_url,
            max_open_forests,
            ..
        }) => {
            info!(
                "Starting as a Storage Provider. Storage path: {:?}, Max storage capacity: {:?}, Jump capacity: {:?}, MSP charging period: {:?}",
                storage_path,
                max_storage_capacity,
                jump_capacity,
                msp_charging_period,
            );

            // Setup the storage layer and capacity config
            // `max_open_forests` falls back to 512 when unset; unset capacities fall back to
            // their `Default` value before the saturating conversion.
            builder
                .setup_storage_layer(storage_path.clone(), max_open_forests.unwrap_or(512))
                .with_capacity_config(Some(CapacityConfig::new(
                    max_storage_capacity.unwrap_or_default().saturated_into(),
                    jump_capacity.unwrap_or_default().saturated_into(),
                )));

            // Configure provider-specific options
            // Both the MSP and BSP task configs are handed to the builder, whatever the
            // provider type is.
            builder.with_msp_charge_fees_config(msp_charge_fees.clone());
            builder.with_msp_move_bucket_config(msp_move_bucket.clone());
            builder.with_bsp_upload_file_config(bsp_upload_file.clone());
            builder.with_bsp_move_bucket_config(bsp_move_bucket.clone());
            builder.with_bsp_charge_fees_config(bsp_charge_fees.clone());
            builder.with_bsp_submit_proof_config(bsp_submit_proof.clone());

            // MSP-specific configuration
            if *provider_type == ProviderType::Msp {
                builder.with_notify_period(*msp_charging_period);

                // MSPs can optionally have database access to execute move bucket operations.
                if let Some(db_url) = msp_database_url {
                    // NOTE(review): the URL is logged verbatim; if it can embed credentials,
                    // consider redacting it — confirm.
                    info!("Setting up MSP database connection: {}", db_url);
                    let msp_db_pool = setup_database_pool(db_url.clone()).await?;
                    builder.with_indexer_db_pool(Some(msp_db_pool));
                }
            }

            // When a blockchain service config is supplied, pass on a copy with `peer_id` set
            // to the bytes of the local network peer id.
            if let Some(c) = blockchain_service {
                let peer_id = network.local_peer_id().to_bytes();
                let mut c = c.clone();
                c.peer_id = Some(peer_id);
                builder.with_blockchain_service_config(c);
            }

            rpc_config.clone()
        }
        RoleOptions::Fisherman(fisherman_options) => {
            // Validate configuration compatibility with indexer
            if let Some(indexer_cfg) = indexer_options {
                if indexer_cfg.indexer_mode == shc_indexer_service::IndexerMode::Lite {
                    return Err(sc_service::Error::Other(
                        "Fisherman service cannot run with 'lite' indexer mode. Please use either 'full' or 'fishing' mode."
                            .to_string(),
                    ));
                }
            }

            // Setup database pool for fisherman
            let db_pool = setup_database_pool(fisherman_options.database_url.clone()).await?;

            // NOTE(review): the URL is logged verbatim; if it can embed credentials,
            // consider redacting it — confirm.
            info!(
                "🎣 Starting as a Fisherman. Database URL: {}",
                fisherman_options.database_url
            );

            // Setup the storage layer (ephemeral for fisherman)
            builder.setup_storage_layer(None, 0);

            // Set the indexer db pool
            builder.with_indexer_db_pool(Some(db_pool));

            // Spawn the fisherman service
            builder
                .with_fisherman(client.clone(), &fisherman_options)
                .await;

            // The fisherman arm uses the default RPC configuration.
            RpcConfig::default()
        }
    };

    // Create RPC configuration
    let storage_hub_client_rpc_config = builder.create_rpc_config(keystore, rpc_config);

    Ok(Some((builder, storage_hub_client_rpc_config)))
}

/// Finish the StorageHubBuilder and run the tasks.
///
/// Converts `rocksdb_root_path` with `into()`, then on `sh_builder` calls `with_blockchain`
/// (awaited), `with_peer_manager` and `build`, and finally awaits `run_tasks` on the handler
/// that `build` returns.
///
/// This function does not return an error itself: after the steps above it returns `Ok(())`.
async fn finish_sh_builder_and_run_tasks(
    mut sh_builder: StorageHubBuilder,
    client: Arc>,
    rpc_handlers: RpcHandlers,
    keystore: KeystorePtr,
    rocksdb_root_path: impl Into,
    maintenance_mode: bool,
) -> Result<(), sc_service::Error>
where
    R: ShRole,
    S: ShStorageLayer,
    (R, S): ShNodeType,
    StorageHubBuilder: StorageLayerBuilder + Buildable<(R, S), Runtime>,
    StorageHubHandler<(R, S), Runtime>: RunnableTasks,
{
    // The same path value is passed to both `with_blockchain` and `with_peer_manager`.
    let rocks_db_path = rocksdb_root_path.into();

    // Spawn the Blockchain Service if node is running as a Storage Provider
    // NOTE(review): the call below is unconditional in this function; any role check must
    // happen in the caller or inside `with_blockchain` — confirm.
    sh_builder
        .with_blockchain(
            client.clone(),
            keystore.clone(),
            Arc::new(rpc_handlers),
            rocks_db_path.clone(),
            maintenance_mode,
        )
        .await;

    // Initialize the BSP peer manager
    sh_builder.with_peer_manager(rocks_db_path.clone());

    // Build the StorageHubHandler
    let mut sh_handler = sh_builder.build();

    // Run StorageHub tasks according to the node role
    sh_handler.run_tasks().await;

    Ok(())
}