mirror of
https://github.com/datahaven-xyz/datahaven
synced 2026-05-24 09:50:01 +00:00
feat: ✨ Update SH client initialisation according to new SH releases (#293)
This commit is contained in:
parent
e6cba95563
commit
aaef407eb5
2 changed files with 264 additions and 277 deletions
|
|
@ -81,6 +81,15 @@ pub struct ProviderOptions {
|
|||
// pub maintenance_mode: bool,
|
||||
}
|
||||
|
||||
/// Role configuration enum that ensures mutual exclusivity between Provider and Fisherman roles.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum RoleOptions {
|
||||
/// Storage Provider configuration
|
||||
Provider(ProviderOptions),
|
||||
/// Fisherman configuration
|
||||
Fisherman(FishermanOptions),
|
||||
}
|
||||
|
||||
impl SubstrateCli for Cli {
|
||||
fn impl_name() -> String {
|
||||
"DataHaven Node".into()
|
||||
|
|
@ -333,9 +342,8 @@ pub fn run() -> sc_cli::Result<()> {
|
|||
runner.sync_run(|config| cmd.run::<Block>(&config))
|
||||
}
|
||||
None => {
|
||||
let mut provider_options: Option<ProviderOptions> = None;
|
||||
let mut indexer_options: Option<IndexerOptions> = None;
|
||||
let mut fisherman_options: Option<FishermanOptions> = None;
|
||||
let mut role_options = None;
|
||||
let mut indexer_options = None;
|
||||
let runner = cli.create_runner(&cli.run)?;
|
||||
|
||||
// If we have a provider config file
|
||||
|
|
@ -355,10 +363,10 @@ pub fn run() -> sc_cli::Result<()> {
|
|||
|
||||
if has_provider {
|
||||
let provider = c.provider;
|
||||
provider_options = Some(provider);
|
||||
role_options = Some(RoleOptions::Provider(provider));
|
||||
} else if has_fisherman {
|
||||
let fisherman = c.fisherman;
|
||||
fisherman_options = Some(fisherman);
|
||||
role_options = Some(RoleOptions::Fisherman(fisherman));
|
||||
}
|
||||
|
||||
indexer_options = Some(c.indexer);
|
||||
|
|
@ -373,7 +381,9 @@ pub fn run() -> sc_cli::Result<()> {
|
|||
}
|
||||
|
||||
if cli.provider_config.provider {
|
||||
provider_options = Some(cli.provider_config.provider_options());
|
||||
role_options = Some(RoleOptions::Provider(
|
||||
cli.provider_config.provider_options(),
|
||||
));
|
||||
};
|
||||
|
||||
if cli.indexer_config.indexer {
|
||||
|
|
@ -381,7 +391,11 @@ pub fn run() -> sc_cli::Result<()> {
|
|||
};
|
||||
|
||||
if cli.fisherman_config.fisherman {
|
||||
fisherman_options = cli.fisherman_config.fisherman_options();
|
||||
role_options = Some(RoleOptions::Fisherman(
|
||||
cli.fisherman_config
|
||||
.fisherman_options()
|
||||
.expect("Clap/TOML configurations should prevent this from ever failing"),
|
||||
));
|
||||
};
|
||||
|
||||
runner.run_node_until_exit(|config| async move {
|
||||
|
|
@ -407,12 +421,7 @@ pub fn run() -> sc_cli::Result<()> {
|
|||
datahaven_mainnet_runtime::RuntimeApi,
|
||||
sc_network::NetworkWorker<_, _>,
|
||||
>(
|
||||
config,
|
||||
cli.eth,
|
||||
provider_options,
|
||||
indexer_options,
|
||||
fisherman_options,
|
||||
sealing_mode,
|
||||
config, cli.eth, role_options, indexer_options, sealing_mode
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
|
@ -422,12 +431,7 @@ pub fn run() -> sc_cli::Result<()> {
|
|||
datahaven_testnet_runtime::RuntimeApi,
|
||||
sc_network::NetworkWorker<_, _>,
|
||||
>(
|
||||
config,
|
||||
cli.eth,
|
||||
provider_options,
|
||||
indexer_options,
|
||||
fisherman_options,
|
||||
sealing_mode,
|
||||
config, cli.eth, role_options, indexer_options, sealing_mode
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
|
@ -437,12 +441,7 @@ pub fn run() -> sc_cli::Result<()> {
|
|||
datahaven_stagenet_runtime::RuntimeApi,
|
||||
sc_network::NetworkWorker<_, _>,
|
||||
>(
|
||||
config,
|
||||
cli.eth,
|
||||
provider_options,
|
||||
indexer_options,
|
||||
fisherman_options,
|
||||
sealing_mode,
|
||||
config, cli.eth, role_options, indexer_options, sealing_mode
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
|
@ -457,12 +456,7 @@ pub fn run() -> sc_cli::Result<()> {
|
|||
datahaven_mainnet_runtime::RuntimeApi,
|
||||
sc_network::Litep2pNetworkBackend,
|
||||
>(
|
||||
config,
|
||||
cli.eth,
|
||||
provider_options,
|
||||
indexer_options,
|
||||
fisherman_options,
|
||||
sealing_mode,
|
||||
config, cli.eth, role_options, indexer_options, sealing_mode
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
|
@ -472,12 +466,7 @@ pub fn run() -> sc_cli::Result<()> {
|
|||
datahaven_testnet_runtime::RuntimeApi,
|
||||
sc_network::Litep2pNetworkBackend,
|
||||
>(
|
||||
config,
|
||||
cli.eth,
|
||||
provider_options,
|
||||
indexer_options,
|
||||
fisherman_options,
|
||||
sealing_mode,
|
||||
config, cli.eth, role_options, indexer_options, sealing_mode
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
|
@ -487,12 +476,7 @@ pub fn run() -> sc_cli::Result<()> {
|
|||
datahaven_stagenet_runtime::RuntimeApi,
|
||||
sc_network::Litep2pNetworkBackend,
|
||||
>(
|
||||
config,
|
||||
cli.eth,
|
||||
provider_options,
|
||||
indexer_options,
|
||||
fisherman_options,
|
||||
sealing_mode,
|
||||
config, cli.eth, role_options, indexer_options, sealing_mode
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@
|
|||
//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
|
||||
|
||||
use crate::cli::{ProviderType, Sealing, StorageLayer};
|
||||
use crate::command::ProviderOptions;
|
||||
use crate::command::{ProviderOptions, RoleOptions};
|
||||
use crate::eth::{
|
||||
new_frontier_partial, spawn_frontier_tasks, BackendType, FrontierBackend,
|
||||
FrontierPartialComponents, FrontierTasksParams,
|
||||
|
|
@ -31,6 +31,7 @@ use fc_db::DatabaseSource;
|
|||
use fc_storage::StorageOverride;
|
||||
use futures::channel::mpsc;
|
||||
use futures::FutureExt;
|
||||
use log::info;
|
||||
use sc_client_api::{AuxStore, Backend, BlockBackend, StateBackend, StorageProvider};
|
||||
use sc_consensus_babe::ImportQueueParams;
|
||||
use sc_consensus_grandpa::SharedVoterState;
|
||||
|
|
@ -50,10 +51,11 @@ use sc_transaction_pool::BasicPool;
|
|||
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
|
||||
use shc_actors_framework::actor::TaskSpawner;
|
||||
use shc_blockchain_service::capacity_manager::CapacityConfig;
|
||||
use shc_client::builder::{FishermanOptions, IndexerOptions};
|
||||
use shc_client::types::FishermanRole;
|
||||
use shc_client::{
|
||||
builder::{Buildable, StorageHubBuilder, StorageLayerBuilder},
|
||||
builder::{
|
||||
Buildable, FishermanOptions, IndexerOptions, StorageHubBuilder, StorageLayerBuilder,
|
||||
},
|
||||
handler::{RunnableTasks, StorageHubHandler},
|
||||
types::{
|
||||
BspProvider, InMemoryStorageLayer, MspProvider, NoStorageLayer, RocksDbStorageLayer,
|
||||
|
|
@ -61,8 +63,10 @@ use shc_client::{
|
|||
},
|
||||
};
|
||||
use shc_common::traits::StorageEnableRuntime;
|
||||
use shc_file_transfer_service::fetch_genesis_hash;
|
||||
use shc_indexer_db::DbPool;
|
||||
use shc_rpc::StorageHubClientRpcConfig;
|
||||
use shc_indexer_service::spawn_indexer_service;
|
||||
use shc_rpc::{RpcConfig, StorageHubClientRpcConfig};
|
||||
use sp_api::ProvideRuntimeApi;
|
||||
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
|
||||
use sp_consensus_beefy::ecdsa_crypto::AuthorityId as BeefyId;
|
||||
|
|
@ -438,9 +442,8 @@ pub async fn new_full_impl<
|
|||
>(
|
||||
config: Configuration,
|
||||
mut eth_config: EthConfiguration,
|
||||
provider_options: Option<ProviderOptions>,
|
||||
role_options: Option<RoleOptions>,
|
||||
indexer_options: Option<IndexerOptions>,
|
||||
fisherman_options: Option<FishermanOptions>,
|
||||
sealing: Option<Sealing>,
|
||||
) -> Result<TaskManager, ServiceError>
|
||||
where
|
||||
|
|
@ -506,16 +509,10 @@ where
|
|||
|
||||
// Starting StorageHub file transfer service.
|
||||
let mut file_transfer_request_protocol = None;
|
||||
if provider_options.is_some() || fisherman_options.is_some() {
|
||||
let genesis_hash = client
|
||||
.block_hash(0u32.into())
|
||||
.ok()
|
||||
.flatten()
|
||||
.expect("Genesis block exists; qed");
|
||||
|
||||
if role_options.is_some() {
|
||||
file_transfer_request_protocol = Some(
|
||||
shc_file_transfer_service::configure_file_transfer_network::<_, Runtime>(
|
||||
genesis_hash,
|
||||
fetch_genesis_hash(client.clone()),
|
||||
config.chain_spec.fork_id(),
|
||||
&mut net_config,
|
||||
),
|
||||
|
|
@ -609,16 +606,20 @@ where
|
|||
}
|
||||
|
||||
// Storage Hub builder
|
||||
let (sh_builder, maybe_storage_hub_client_rpc_config) = init_sh_builder::<R, S, Runtime>(
|
||||
&provider_options,
|
||||
let (sh_builder, maybe_storage_hub_client_rpc_config) = match init_sh_builder::<R, S, Runtime>(
|
||||
&role_options,
|
||||
&indexer_options,
|
||||
&task_manager,
|
||||
file_transfer_request_protocol,
|
||||
network.clone(),
|
||||
keystore_container.keystore(),
|
||||
client.clone(),
|
||||
indexer_options.clone(),
|
||||
)
|
||||
.await?;
|
||||
.await?
|
||||
{
|
||||
Some((shb, rpc)) => (Some(shb), Some(rpc)),
|
||||
None => (None, None),
|
||||
};
|
||||
|
||||
let force_authoring = config.force_authoring;
|
||||
let backoff_authoring_blocks: Option<()> = None;
|
||||
|
|
@ -947,10 +948,9 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
if let Some(_) = provider_options {
|
||||
// Finishing building storage hub
|
||||
if let Some(_) = role_options {
|
||||
finish_sh_builder_and_run_tasks(
|
||||
sh_builder.unwrap(),
|
||||
sh_builder.expect("StorageHubBuilder should already be initialised."),
|
||||
client.clone(),
|
||||
rpc_handlers.clone(),
|
||||
keystore_container.keystore(),
|
||||
|
|
@ -960,18 +960,6 @@ where
|
|||
.await?;
|
||||
}
|
||||
|
||||
configure_and_spawn_fisherman::<Runtime>(
|
||||
&fisherman_options,
|
||||
&indexer_options,
|
||||
&task_manager,
|
||||
client.clone(),
|
||||
keystore_container.keystore(),
|
||||
Arc::new(rpc_handlers.clone()),
|
||||
base_path,
|
||||
network.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
network_starter.start_network();
|
||||
Ok(task_manager)
|
||||
}
|
||||
|
|
@ -983,9 +971,8 @@ pub async fn new_full<
|
|||
>(
|
||||
config: Configuration,
|
||||
eth_config: EthConfiguration,
|
||||
provider_options: Option<ProviderOptions>,
|
||||
role_options: Option<RoleOptions>,
|
||||
indexer_options: Option<IndexerOptions>,
|
||||
fisherman_options: Option<FishermanOptions>,
|
||||
sealing: Option<Sealing>,
|
||||
) -> Result<TaskManager, ServiceError>
|
||||
where
|
||||
|
|
@ -993,51 +980,70 @@ where
|
|||
RuntimeApi: sp_api::ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
|
||||
RuntimeApi::RuntimeApi: FullRuntimeApi,
|
||||
{
|
||||
if let Some(provider_options) = provider_options {
|
||||
match (
|
||||
&provider_options.provider_type,
|
||||
&provider_options.storage_layer,
|
||||
) {
|
||||
(&ProviderType::Bsp, &StorageLayer::Memory) => {
|
||||
if let Some(role_options) = role_options {
|
||||
match role_options {
|
||||
RoleOptions::Provider(ProviderOptions {
|
||||
provider_type: ProviderType::Bsp,
|
||||
storage_layer: StorageLayer::Memory,
|
||||
..
|
||||
}) => {
|
||||
return new_full_impl::<BspProvider, InMemoryStorageLayer, Runtime, RuntimeApi, N>(
|
||||
config,
|
||||
eth_config,
|
||||
Some(provider_options),
|
||||
Some(role_options),
|
||||
indexer_options,
|
||||
fisherman_options,
|
||||
sealing,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
(&ProviderType::Bsp, &StorageLayer::RocksDB) => {
|
||||
RoleOptions::Provider(ProviderOptions {
|
||||
provider_type: ProviderType::Bsp,
|
||||
storage_layer: StorageLayer::RocksDB,
|
||||
..
|
||||
}) => {
|
||||
return new_full_impl::<BspProvider, RocksDbStorageLayer, Runtime, RuntimeApi, N>(
|
||||
config,
|
||||
eth_config,
|
||||
Some(provider_options),
|
||||
Some(role_options),
|
||||
indexer_options,
|
||||
fisherman_options,
|
||||
sealing,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
(&ProviderType::Msp, &StorageLayer::Memory) => {
|
||||
RoleOptions::Provider(ProviderOptions {
|
||||
provider_type: ProviderType::Msp,
|
||||
storage_layer: StorageLayer::Memory,
|
||||
..
|
||||
}) => {
|
||||
return new_full_impl::<MspProvider, InMemoryStorageLayer, Runtime, RuntimeApi, N>(
|
||||
config,
|
||||
eth_config,
|
||||
Some(provider_options),
|
||||
Some(role_options),
|
||||
indexer_options,
|
||||
fisherman_options,
|
||||
sealing,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
(&ProviderType::Msp, &StorageLayer::RocksDB) => {
|
||||
RoleOptions::Provider(ProviderOptions {
|
||||
provider_type: ProviderType::Msp,
|
||||
storage_layer: StorageLayer::RocksDB,
|
||||
..
|
||||
}) => {
|
||||
return new_full_impl::<MspProvider, RocksDbStorageLayer, Runtime, RuntimeApi, N>(
|
||||
config,
|
||||
eth_config,
|
||||
Some(provider_options),
|
||||
Some(role_options),
|
||||
indexer_options,
|
||||
sealing,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
RoleOptions::Fisherman(FishermanOptions { .. }) => {
|
||||
return new_full_impl::<FishermanRole, NoStorageLayer, Runtime, RuntimeApi, N>(
|
||||
config,
|
||||
eth_config,
|
||||
Some(role_options),
|
||||
indexer_options,
|
||||
fisherman_options,
|
||||
sealing,
|
||||
)
|
||||
.await;
|
||||
|
|
@ -1049,115 +1055,28 @@ where
|
|||
eth_config,
|
||||
None,
|
||||
indexer_options,
|
||||
fisherman_options,
|
||||
sealing,
|
||||
)
|
||||
.await;
|
||||
};
|
||||
}
|
||||
|
||||
/// Storage Hub
|
||||
//╔═══════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
|
||||
//║ StorageHub Client Setup Utilities ║
|
||||
//╚═══════════════════════════════════════════════════════════════════════════════════════════════════════════════╝
|
||||
|
||||
// Initialize the StorageHubBuilder for the StorageHub node.
|
||||
async fn init_sh_builder<R, S, Runtime: StorageEnableRuntime>(
|
||||
provider_options: &Option<ProviderOptions>,
|
||||
task_manager: &TaskManager,
|
||||
file_transfer_request_protocol: Option<(ProtocolName, Receiver<IncomingRequest>)>,
|
||||
network: Arc<dyn NetworkService>,
|
||||
keystore: KeystorePtr,
|
||||
client: Arc<FullClient<Runtime::RuntimeApi>>,
|
||||
indexer_options: Option<IndexerOptions>,
|
||||
) -> Result<
|
||||
(
|
||||
Option<StorageHubBuilder<R, S, Runtime>>,
|
||||
Option<
|
||||
StorageHubClientRpcConfig<
|
||||
<(R, S) as ShNodeType<Runtime>>::FL,
|
||||
<(R, S) as ShNodeType<Runtime>>::FSH,
|
||||
Runtime,
|
||||
>,
|
||||
>,
|
||||
),
|
||||
sc_service::Error,
|
||||
>
|
||||
where
|
||||
R: ShRole,
|
||||
S: ShStorageLayer,
|
||||
(R, S): ShNodeType<Runtime>,
|
||||
StorageHubBuilder<R, S, Runtime>: StorageLayerBuilder,
|
||||
{
|
||||
let maybe_indexer_db_pool =
|
||||
configure_and_spawn_indexer::<Runtime>(&indexer_options, &task_manager, client.clone())
|
||||
.await?;
|
||||
|
||||
if let Some(provider_options) = provider_options {
|
||||
// Start building the StorageHubHandler, if running as a provider.
|
||||
let task_spawner = TaskSpawner::new(task_manager.spawn_handle(), "sh-builder");
|
||||
let mut storage_hub_builder = StorageHubBuilder::<R, S, Runtime>::new(task_spawner);
|
||||
|
||||
// Setup and spawn the File Transfer Service.
|
||||
let (file_transfer_request_protocol_name, file_transfer_request_receiver) =
|
||||
file_transfer_request_protocol
|
||||
.expect("FileTransfer request protocol should already be initialised.");
|
||||
|
||||
storage_hub_builder
|
||||
.with_file_transfer(
|
||||
file_transfer_request_receiver,
|
||||
file_transfer_request_protocol_name,
|
||||
network.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Setup the `ShStorageLayer` and additional configuration parameters.
|
||||
storage_hub_builder
|
||||
.setup_storage_layer(provider_options.storage_path.clone())
|
||||
.with_capacity_config(Some(CapacityConfig::new(
|
||||
provider_options
|
||||
.max_storage_capacity
|
||||
.unwrap_or_default()
|
||||
.saturated_into(),
|
||||
provider_options
|
||||
.jump_capacity
|
||||
.unwrap_or_default()
|
||||
.saturated_into(),
|
||||
)));
|
||||
|
||||
storage_hub_builder.with_msp_charge_fees_config(provider_options.msp_charge_fees.clone());
|
||||
storage_hub_builder.with_msp_move_bucket_config(provider_options.msp_move_bucket.clone());
|
||||
storage_hub_builder.with_bsp_upload_file_config(provider_options.bsp_upload_file.clone());
|
||||
storage_hub_builder.with_bsp_move_bucket_config(provider_options.bsp_move_bucket.clone());
|
||||
storage_hub_builder.with_bsp_charge_fees_config(provider_options.bsp_charge_fees.clone());
|
||||
storage_hub_builder.with_bsp_submit_proof_config(provider_options.bsp_submit_proof.clone());
|
||||
|
||||
// Setup specific configuration for the MSP node.
|
||||
if provider_options.provider_type == ProviderType::Msp {
|
||||
storage_hub_builder
|
||||
.with_notify_period(provider_options.msp_charging_period)
|
||||
.with_indexer_db_pool(maybe_indexer_db_pool);
|
||||
}
|
||||
|
||||
if let Some(c) = &provider_options.blockchain_service {
|
||||
storage_hub_builder.with_blockchain_service_config(c.clone());
|
||||
}
|
||||
|
||||
// Get the RPC configuration to use for this StorageHub node client.
|
||||
let storage_hub_client_rpc_config =
|
||||
storage_hub_builder.create_rpc_config(keystore, provider_options.rpc_config.clone());
|
||||
|
||||
return Ok((
|
||||
Some(storage_hub_builder),
|
||||
Some(storage_hub_client_rpc_config),
|
||||
));
|
||||
};
|
||||
|
||||
Ok((None, None))
|
||||
/// Helper function to setup database pool
|
||||
async fn setup_database_pool(database_url: String) -> Result<DbPool, sc_service::Error> {
|
||||
shc_indexer_db::setup_db_pool(database_url)
|
||||
.await
|
||||
.map_err(|e| sc_service::Error::Application(Box::new(e)))
|
||||
}
|
||||
|
||||
/// Configure and spawn the indexer service.
|
||||
async fn configure_and_spawn_indexer<Runtime: StorageEnableRuntime>(
|
||||
indexer_options: &Option<IndexerOptions>,
|
||||
task_manager: &TaskManager,
|
||||
client: Arc<FullClient<<Runtime as StorageEnableRuntime>::RuntimeApi>>,
|
||||
client: Arc<StorageEnableClient<Runtime>>,
|
||||
) -> Result<Option<DbPool>, sc_service::Error> {
|
||||
let indexer_options = match indexer_options {
|
||||
Some(config) => config,
|
||||
|
|
@ -1165,12 +1084,15 @@ async fn configure_and_spawn_indexer<Runtime: StorageEnableRuntime>(
|
|||
};
|
||||
|
||||
// Setup database pool
|
||||
let db_pool = shc_indexer_db::setup_db_pool(indexer_options.database_url.clone())
|
||||
.await
|
||||
.map_err(|e| sc_service::Error::Application(Box::new(e)))?;
|
||||
let db_pool = setup_database_pool(indexer_options.database_url.clone()).await?;
|
||||
|
||||
info!(
|
||||
"📊 Starting Indexer service (mode: {:?})",
|
||||
indexer_options.indexer_mode
|
||||
);
|
||||
|
||||
let task_spawner = TaskSpawner::new(task_manager.spawn_handle(), "indexer-service");
|
||||
shc_indexer_service::spawn_indexer_service::<Runtime>(
|
||||
spawn_indexer_service::<Runtime>(
|
||||
&task_spawner,
|
||||
client.clone(),
|
||||
db_pool.clone(),
|
||||
|
|
@ -1181,6 +1103,169 @@ async fn configure_and_spawn_indexer<Runtime: StorageEnableRuntime>(
|
|||
Ok(Some(db_pool))
|
||||
}
|
||||
|
||||
/// Initialize the StorageHub builder with configured services based on the node's role.
|
||||
///
|
||||
/// If `indexer_options` is provided, spawns the indexer service regardless of role configuration.
|
||||
/// The indexer service is decoupled from the role system and can run standalone.
|
||||
///
|
||||
/// Returns `None` if no role is configured (e.g., standalone indexer mode).
|
||||
async fn init_sh_builder<R, S, Runtime: StorageEnableRuntime>(
|
||||
role_options: &Option<RoleOptions>,
|
||||
indexer_options: &Option<IndexerOptions>,
|
||||
task_manager: &TaskManager,
|
||||
file_transfer_request_protocol: Option<(ProtocolName, Receiver<IncomingRequest>)>,
|
||||
network: Arc<dyn NetworkService>,
|
||||
keystore: KeystorePtr,
|
||||
client: Arc<StorageEnableClient<Runtime>>,
|
||||
) -> Result<
|
||||
Option<(
|
||||
StorageHubBuilder<R, S, Runtime>,
|
||||
StorageHubClientRpcConfig<
|
||||
<(R, S) as ShNodeType<Runtime>>::FL,
|
||||
<(R, S) as ShNodeType<Runtime>>::FSH,
|
||||
Runtime,
|
||||
>,
|
||||
)>,
|
||||
sc_service::Error,
|
||||
>
|
||||
where
|
||||
R: ShRole,
|
||||
S: ShStorageLayer,
|
||||
(R, S): ShNodeType<Runtime>,
|
||||
StorageHubBuilder<R, S, Runtime>: StorageLayerBuilder,
|
||||
{
|
||||
// Spawn indexer service if enabled. Runs before role check to allow standalone operation.
|
||||
let maybe_indexer_db_pool =
|
||||
configure_and_spawn_indexer::<Runtime>(&indexer_options, &task_manager, client.clone())
|
||||
.await?;
|
||||
|
||||
let role_options = match role_options {
|
||||
Some(role) => role,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
let task_spawner_name = match role_options {
|
||||
RoleOptions::Provider(ProviderOptions {
|
||||
provider_type: ProviderType::Msp,
|
||||
..
|
||||
}) => "msp-service",
|
||||
RoleOptions::Provider(ProviderOptions {
|
||||
provider_type: ProviderType::Bsp,
|
||||
..
|
||||
}) => "bsp-service",
|
||||
RoleOptions::Fisherman(_) => "fisherman-service",
|
||||
};
|
||||
let task_spawner = TaskSpawner::new(task_manager.spawn_handle(), task_spawner_name);
|
||||
let mut builder = StorageHubBuilder::<R, S, Runtime>::new(task_spawner);
|
||||
|
||||
// Setup file transfer service (common to all roles)
|
||||
let (file_transfer_request_protocol_name, file_transfer_request_receiver) =
|
||||
file_transfer_request_protocol
|
||||
.expect("FileTransfer request protocol should already be initialised.");
|
||||
|
||||
builder
|
||||
.with_file_transfer(
|
||||
file_transfer_request_receiver,
|
||||
file_transfer_request_protocol_name,
|
||||
network.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Role-specific configuration
|
||||
let rpc_config = match role_options {
|
||||
RoleOptions::Provider(ProviderOptions {
|
||||
rpc_config,
|
||||
provider_type,
|
||||
storage_path,
|
||||
max_storage_capacity,
|
||||
jump_capacity,
|
||||
msp_charging_period,
|
||||
msp_charge_fees,
|
||||
msp_move_bucket,
|
||||
bsp_upload_file,
|
||||
bsp_move_bucket,
|
||||
bsp_charge_fees,
|
||||
bsp_submit_proof,
|
||||
blockchain_service,
|
||||
..
|
||||
}) => {
|
||||
info!(
|
||||
"Starting as a Storage Provider. Storage path: {:?}, Max storage capacity: {:?}, Jump capacity: {:?}, MSP charging period: {:?}",
|
||||
storage_path, max_storage_capacity, jump_capacity, msp_charging_period,
|
||||
);
|
||||
|
||||
// Setup the storage layer and capacity config
|
||||
builder
|
||||
.setup_storage_layer(storage_path.clone())
|
||||
.with_capacity_config(Some(CapacityConfig::new(
|
||||
max_storage_capacity.unwrap_or_default().saturated_into(),
|
||||
jump_capacity.unwrap_or_default().saturated_into(),
|
||||
)));
|
||||
|
||||
// Configure provider-specific options
|
||||
builder.with_msp_charge_fees_config(msp_charge_fees.clone());
|
||||
builder.with_msp_move_bucket_config(msp_move_bucket.clone());
|
||||
builder.with_bsp_upload_file_config(bsp_upload_file.clone());
|
||||
builder.with_bsp_move_bucket_config(bsp_move_bucket.clone());
|
||||
builder.with_bsp_charge_fees_config(bsp_charge_fees.clone());
|
||||
builder.with_bsp_submit_proof_config(bsp_submit_proof.clone());
|
||||
|
||||
// MSP-specific configuration
|
||||
if *provider_type == ProviderType::Msp {
|
||||
builder
|
||||
.with_notify_period(*msp_charging_period)
|
||||
.with_indexer_db_pool(maybe_indexer_db_pool);
|
||||
}
|
||||
|
||||
if let Some(c) = blockchain_service {
|
||||
let peer_id = network.local_peer_id().to_bytes();
|
||||
let mut c = c.clone();
|
||||
c.peer_id = Some(peer_id);
|
||||
builder.with_blockchain_service_config(c);
|
||||
}
|
||||
|
||||
rpc_config.clone()
|
||||
}
|
||||
RoleOptions::Fisherman(fisherman_options) => {
|
||||
// Validate configuration compatibility with indexer
|
||||
if let Some(indexer_cfg) = indexer_options {
|
||||
if indexer_cfg.indexer_mode == shc_indexer_service::IndexerMode::Lite {
|
||||
return Err(sc_service::Error::Other(
|
||||
"Fisherman service cannot run with 'lite' indexer mode. Please use either 'full' or 'fishing' mode."
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// Setup database pool for fisherman
|
||||
let db_pool = setup_database_pool(fisherman_options.database_url.clone()).await?;
|
||||
|
||||
info!(
|
||||
"🎣 Starting as a Fisherman. Database URL: {}",
|
||||
fisherman_options.database_url
|
||||
);
|
||||
|
||||
// Setup the storage layer (ephemeral for fisherman)
|
||||
builder.setup_storage_layer(None);
|
||||
|
||||
// Set the indexer db pool
|
||||
builder.with_indexer_db_pool(Some(db_pool));
|
||||
|
||||
// Spawn the fisherman service
|
||||
builder
|
||||
.with_fisherman(client.clone(), &fisherman_options)
|
||||
.await;
|
||||
|
||||
RpcConfig::default()
|
||||
}
|
||||
};
|
||||
|
||||
// Create RPC configuration
|
||||
let storage_hub_client_rpc_config = builder.create_rpc_config(keystore, rpc_config);
|
||||
|
||||
Ok(Some((builder, storage_hub_client_rpc_config)))
|
||||
}
|
||||
|
||||
/// Finish the StorageHubBuilder and run the tasks.
|
||||
async fn finish_sh_builder_and_run_tasks<R, S, Runtime: StorageEnableRuntime>(
|
||||
mut sh_builder: StorageHubBuilder<R, S, Runtime>,
|
||||
|
|
@ -1221,85 +1306,3 @@ where
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn configure_and_spawn_fisherman<Runtime: StorageEnableRuntime>(
|
||||
fisherman_options: &Option<FishermanOptions>,
|
||||
indexer_config: &Option<IndexerOptions>,
|
||||
task_manager: &TaskManager,
|
||||
client: Arc<StorageEnableClient<Runtime>>,
|
||||
keystore: KeystorePtr,
|
||||
rpc_handlers: Arc<RpcHandlers>,
|
||||
rocksdb_root_path: impl Into<PathBuf>,
|
||||
network: Arc<dyn NetworkService>,
|
||||
) -> Result<Option<DbPool>, sc_service::Error> {
|
||||
let fisherman_options = match fisherman_options {
|
||||
Some(fc) => fc,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
// Validate configuration compatibility with indexer if both are enabled
|
||||
if let Some(indexer_cfg) = indexer_config {
|
||||
if indexer_cfg.indexer_mode == shc_indexer_service::IndexerMode::Lite {
|
||||
return Err(sc_service::Error::Other(
|
||||
"Fisherman service cannot run with 'lite' indexer mode. Please use either 'full' or 'fishing' mode."
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// Setup database pool for fisherman
|
||||
let db_pool = setup_database_pool(fisherman_options.database_url.clone()).await?;
|
||||
|
||||
// Build StorageHubHandler for fisherman tasks
|
||||
let task_spawner = TaskSpawner::new(task_manager.spawn_handle(), "fisherman-service");
|
||||
let mut fisherman_builder =
|
||||
StorageHubBuilder::<FishermanRole, NoStorageLayer, Runtime>::new(task_spawner.clone());
|
||||
|
||||
// Convert rocksdb_root_path to PathBuf first
|
||||
let rocksdb_path: PathBuf = rocksdb_root_path.into();
|
||||
|
||||
// Set the indexer db pool
|
||||
fisherman_builder.with_indexer_db_pool(Some(db_pool.clone()));
|
||||
|
||||
// Spawn the fisherman service
|
||||
fisherman_builder
|
||||
.with_fisherman(client.clone(), &fisherman_options)
|
||||
.await;
|
||||
|
||||
// All variables below are not needed for the fisherman service to operate but required by the StorageHubHandler
|
||||
// TODO: Refactor this once we have a proper setup to support role based StorageHubHandler builder
|
||||
fisherman_builder.setup_storage_layer(None);
|
||||
|
||||
// Setup blockchain service
|
||||
fisherman_builder
|
||||
.with_blockchain(
|
||||
client.clone(),
|
||||
keystore,
|
||||
rpc_handlers,
|
||||
rocksdb_path.clone(),
|
||||
false, // Not in maintenance mode
|
||||
)
|
||||
.await;
|
||||
|
||||
fisherman_builder.with_peer_manager(rocksdb_path);
|
||||
let (_sender, receiver) = async_channel::bounded(1);
|
||||
let protocol_name = ProtocolName::from("/storage-hub/file-transfer/1");
|
||||
fisherman_builder
|
||||
.with_file_transfer(receiver, protocol_name, network)
|
||||
.await;
|
||||
|
||||
// Build the handler
|
||||
let mut fisherman_handler = fisherman_builder.build();
|
||||
|
||||
// Run fisherman tasks
|
||||
fisherman_handler.run_tasks().await;
|
||||
|
||||
Ok(Some(db_pool))
|
||||
}
|
||||
|
||||
/// Helper function to setup database pool
|
||||
async fn setup_database_pool(database_url: String) -> Result<DbPool, sc_service::Error> {
|
||||
shc_indexer_db::setup_db_pool(database_url)
|
||||
.await
|
||||
.map_err(|e| sc_service::Error::Application(Box::new(e)))
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue