diff --git a/operator/pallets/external-validator-slashes/src/lib.rs b/operator/pallets/external-validator-slashes/src/lib.rs index b24b85db..19da25c3 100644 --- a/operator/pallets/external-validator-slashes/src/lib.rs +++ b/operator/pallets/external-validator-slashes/src/lib.rs @@ -31,7 +31,7 @@ extern crate alloc; use pallet_external_validators::apply; use snowbridge_outbound_queue_primitives::SendError; use { - alloc::{collections::vec_deque::VecDeque, string::String, vec, vec::Vec}, + alloc::{string::String, vec, vec::Vec}, frame_support::{pallet_prelude::*, traits::DefensiveSaturating}, frame_system::pallet_prelude::*, log::log, @@ -132,10 +132,21 @@ pub mod pallet { }, /// The slashes message was sent correctly. SlashesMessageSent { message_id: H256 }, + /// The slashes message failed to send and the batch was moved to the back + /// of the queue for retry. + SlashesMessageSendFailed { era: EraIndex, count: u32 }, + /// A queued slashes batch was retried manually and sent successfully. + SlashesMessageRetried { + message_id: H256, + era: EraIndex, + count: u32, + }, /// We injected a slash SlashInjected { slash_id: T::SlashId, era: u32 }, /// Number of slashes processed SlashAddedToQueue { number: u32, era: u32 }, + /// The unsent queue is full; this slash era could not be enqueued. + UnsentQueueFull { era: EraIndex }, } #[pallet::config] @@ -199,6 +210,9 @@ pub mod pallet { /// The weight information of this pallet. type WeightInfo: WeightInfo; + + /// Origin for governance calls such as retrying an unsent slash batch. + type GovernanceOrigin: EnsureOrigin; } #[pallet::error] @@ -226,6 +240,10 @@ pub mod pallet { /// No PendingOffenceKind found for (session, validator) — offence was not /// reported through EquivocationReportWrapper, so the offence kind is unknown. MissingOffenceKind, + /// The specified era is not in the unsent slash queue. + EraNotInUnsentQueue, + /// The message delivery still failed on retry. + MessageSendFailed, } #[apply(derive_storage_traits)] @@ -269,12 +287,26 @@ pub mod pallet { pub type Slashes = StorageMap<_, Twox64Concat, EraIndex, Vec>, ValueQuery>; - /// All unreported slashes that will be processed in the future. + /// Maximum number of unsent slash batches in the retry ring buffer. + pub const UNSENT_QUEUE_CAPACITY: u32 = 64; + + /// Ring buffer of slash batches whose outbound message still needs to be sent. + /// Each slot stores the original slash era together with a bounded-size batch + /// of slash records. Retries keep the original era so the outbound message id + /// remains stable across later blocks and eras. #[pallet::storage] #[pallet::unbounded] - #[pallet::getter(fn unreported_slashes)] - pub type UnreportedSlashesQueue = - StorageValue<_, VecDeque>, ValueQuery>; + pub type UnsentSlashBatch = + StorageMap<_, Twox64Concat, u32, (EraIndex, Vec>)>; + + /// Ring buffer head: next slot to be processed by `on_initialize`. + #[pallet::storage] + pub type UnsentSlashHead = StorageValue<_, u32, ValueQuery>; + + /// Ring buffer tail: next slot to write a new entry into. + /// When head == tail the buffer is empty. + #[pallet::storage] + pub type UnsentSlashTail = StorageValue<_, u32, ValueQuery>; // Turns slashing on or off #[pallet::storage] @@ -415,6 +447,44 @@ pub mod pallet { Ok(()) } + #[pallet::call_index(2)] + #[pallet::weight(T::WeightInfo::retry_unsent_slash_era())] + pub fn retry_unsent_slash_era(origin: OriginFor, era_index: EraIndex) -> DispatchResult { + T::GovernanceOrigin::ensure_origin(origin)?; + + let head = UnsentSlashHead::::get(); + let tail = UnsentSlashTail::::get(); + let mut found = None; + let mut slot = head; + while slot != tail { + if let Some(entry @ (idx, _)) = UnsentSlashBatch::::get(slot) { + if idx == era_index { + found = Some((slot, entry)); + break; + } + } + slot = (slot + 1) % UNSENT_QUEUE_CAPACITY; + } + + let (slot, (era, slashes)) = found.ok_or(Error::::EraNotInUnsentQueue)?; + let count = slashes.len() as u32; + let slashes_to_send = slashes + .iter() + .map(Self::slash_to_send_data) + .collect::>(); + let message_id = Self::send_slashes_message(&slashes_to_send, era) + .ok_or(Error::::MessageSendFailed)?; + + Self::unsent_queue_remove_slot(slot); + Self::deposit_event(Event::::SlashesMessageRetried { + message_id, + era, + count, + }); + + Ok(()) + } + #[pallet::call_index(3)] #[pallet::weight(T::WeightInfo::set_slashing_mode())] pub fn set_slashing_mode(origin: OriginFor, mode: SlashingModeOption) -> DispatchResult { @@ -429,12 +499,12 @@ pub mod pallet { #[pallet::hooks] impl Hooks> for Pallet { fn on_initialize(_n: BlockNumberFor) -> Weight { - let processed = Self::process_slashes_queue(T::QueuedSlashesProcessedPerBlock::get()); - - if let Some(p) = processed { - T::WeightInfo::process_slashes_queue(p) - } else { - T::WeightInfo::process_slashes_queue(0) + match Self::process_slashes_queue() { + ProcessSlashesQueueOutcome::Empty => T::WeightInfo::process_slashes_queue(0), + ProcessSlashesQueueOutcome::Sent(count) + | ProcessSlashesQueueOutcome::Requeued(count) => { + T::WeightInfo::process_slashes_queue(count) + } } } } @@ -655,70 +725,65 @@ where impl Pallet { fn add_era_slashes_to_queue(active_era: EraIndex) { - let mut slashes: VecDeque<_> = Slashes::::get(active_era).into(); + let slashes = Slashes::::get(active_era); + if slashes.is_empty() { + return; + } - let len = slashes.len(); + let batch_size = T::QueuedSlashesProcessedPerBlock::get().max(1) as usize; + let mut enqueued = 0u32; - UnreportedSlashesQueue::::mutate(|queue| queue.append(&mut slashes)); + for batch in slashes.chunks(batch_size) { + if Self::unsent_queue_push((active_era, batch.to_vec())) { + enqueued = enqueued.saturating_add(batch.len() as u32); + } else { + log::warn!( + target: "ext_validators_slashes", + "Unsent slash queue full, cannot enqueue era {active_era}", + ); + Self::deposit_event(Event::::UnsentQueueFull { era: active_era }); + break; + } + } - if len > 0 { + if enqueued > 0 { Self::deposit_event(Event::::SlashAddedToQueue { - number: len as u32, + number: enqueued, era: active_era, }); } } - /// Returns number of slashes that were sent to ethereum. - fn process_slashes_queue(amount: u32) -> Option { - let mut slashes_to_send: Vec> = vec![]; - let era_index = T::EraIndexProvider::active_era().index; + fn slash_to_send_data(slash: &Slash) -> SlashData { + // Keep the original slash batch intact until delivery succeeds so failed + // batches can be moved to the back of the queue instead of being dropped. + let max_wad = T::MaxSlashWad::get(); + let wad_to_slash = (slash.percentage.deconstruct() as u128) + .saturating_mul(max_wad) + .checked_div(1_000_000_000u128) + .unwrap_or(0) + .min(max_wad); - UnreportedSlashesQueue::::mutate(|queue| { - for _ in 0..amount { - let Some(slash) = queue.pop_front() else { - // no more slashes to process in the queue - break; - }; - - // Convert Perbill to EigenLayer WAD format with linear mapping. - // Perbill(100%) → MaxSlashWad (e.g. 5% WAD = 5e16). - // Formula: perbill_inner * MaxSlashWad / 1e9 - // Clamp to MaxSlashWad to guard against overflow if governance - // sets MaxSlashWad high enough for saturating_mul to hit u128::MAX. - let max_wad = T::MaxSlashWad::get(); - let wad_to_slash = (slash.percentage.deconstruct() as u128) - .saturating_mul(max_wad) - .checked_div(1_000_000_000u128) - .unwrap_or(0) - .min(max_wad); - - slashes_to_send.push(SlashData { - validator: slash.validator, - wad_to_slash, - description: slash.offence_kind.to_description(), - }); - } - }); - - if slashes_to_send.is_empty() { - return None; + SlashData { + validator: slash.validator.clone(), + wad_to_slash, + description: slash.offence_kind.to_description(), } + } - let slashes_count = slashes_to_send.len() as u32; + fn send_slashes_message( + slashes_to_send: &[SlashData], + era_index: EraIndex, + ) -> Option { + let outbound = + T::SendMessage::build(&slashes_to_send.to_vec(), era_index).or_else(|| { + log::warn!(target: "ext_validators_slashes", "Failed to build outbound message"); + None + })?; - let outbound = match T::SendMessage::build(&slashes_to_send, era_index) { - Some(send_msg) => send_msg, - None => { - log::error!(target: "ext_validators_slashes", "Failed to build outbound message"); - return None; - } - }; - - // Validate and deliver the message let ticket = T::SendMessage::validate(outbound) .map_err(|e| { - log::error!( + log::warn!( target: "ext_validators_slashes", "Failed to validate outbound message: {:?}", e @@ -726,20 +791,126 @@ impl Pallet { }) .ok()?; - let message_id = T::SendMessage::deliver(ticket) + T::SendMessage::deliver(ticket) .map_err(|e| { - log::error!( + log::warn!( target: "ext_validators_slashes", "Failed to deliver outbound message: {:?}", e ); }) - .ok()?; - - Self::deposit_event(Event::::SlashesMessageSent { message_id }); - - Some(slashes_count) + .ok() } + + #[allow(dead_code)] + pub(crate) fn unsent_queue_is_empty() -> bool { + UnsentSlashHead::::get() == UnsentSlashTail::::get() + } + + #[allow(dead_code)] + pub(crate) fn unsent_queue_len() -> u32 { + let head = UnsentSlashHead::::get(); + let tail = UnsentSlashTail::::get(); + tail.wrapping_sub(head) % UNSENT_QUEUE_CAPACITY + } + + pub(crate) fn unsent_queue_push( + entry: (EraIndex, Vec>), + ) -> bool { + let head = UnsentSlashHead::::get(); + let tail = UnsentSlashTail::::get(); + let next_tail = (tail + 1) % UNSENT_QUEUE_CAPACITY; + if next_tail == head { + return false; + } + + UnsentSlashBatch::::insert(tail, entry); + UnsentSlashTail::::put(next_tail); + true + } + + fn unsent_queue_remove_slot(slot: u32) { + let tail = UnsentSlashTail::::get(); + let mut cur = slot; + loop { + let next = (cur + 1) % UNSENT_QUEUE_CAPACITY; + if next == tail { + break; + } + + if let Some(entry) = UnsentSlashBatch::::get(next) { + UnsentSlashBatch::::insert(cur, entry); + } + cur = next; + } + + UnsentSlashBatch::::remove(cur); + let new_tail = if tail == 0 { + UNSENT_QUEUE_CAPACITY - 1 + } else { + tail - 1 + }; + UnsentSlashTail::::put(new_tail); + + let head = UnsentSlashHead::::get(); + if head == tail { + UnsentSlashHead::::put(new_tail); + } + } + + /// Retry contract shared with rewards: + /// - process the current head batch, + /// - if send succeeds, remove it from the queue, + /// - if send fails, move the same batch to the back so later slash batches can progress. + pub(crate) fn process_slashes_queue() -> ProcessSlashesQueueOutcome { + let head = UnsentSlashHead::::get(); + let tail = UnsentSlashTail::::get(); + + if head == tail { + return ProcessSlashesQueueOutcome::Empty; + } + + let Some((era_index, slashes)) = UnsentSlashBatch::::get(head) else { + UnsentSlashHead::::put((head + 1) % UNSENT_QUEUE_CAPACITY); + return ProcessSlashesQueueOutcome::Empty; + }; + + let slashes_count = slashes.len() as u32; + let slashes_to_send = slashes + .iter() + .map(Self::slash_to_send_data) + .collect::>(); + + match Self::send_slashes_message(&slashes_to_send, era_index) { + Some(message_id) => { + UnsentSlashBatch::::remove(head); + UnsentSlashHead::::put((head + 1) % UNSENT_QUEUE_CAPACITY); + Self::deposit_event(Event::::SlashesMessageSent { message_id }); + ProcessSlashesQueueOutcome::Sent(slashes_count) + } + None => { + UnsentSlashBatch::::remove(head); + UnsentSlashHead::::put((head + 1) % UNSENT_QUEUE_CAPACITY); + UnsentSlashBatch::::insert(tail, (era_index, slashes)); + UnsentSlashTail::::put((tail + 1) % UNSENT_QUEUE_CAPACITY); + log::warn!( + target: "ext_validators_slashes", + "Failed to send {slashes_count} slash entries for era {era_index}, moved batch to back of queue", + ); + Self::deposit_event(Event::::SlashesMessageSendFailed { + era: era_index, + count: slashes_count, + }); + ProcessSlashesQueueOutcome::Requeued(slashes_count) + } + } + } +} + +pub(crate) enum ProcessSlashesQueueOutcome { + Empty, + Sent(u32), + Requeued(u32), } /// A pending slash record. The value of the slash has been computed but not applied yet, diff --git a/operator/pallets/external-validator-slashes/src/mock.rs b/operator/pallets/external-validator-slashes/src/mock.rs index efe2c509..690f887e 100644 --- a/operator/pallets/external-validator-slashes/src/mock.rs +++ b/operator/pallets/external-validator-slashes/src/mock.rs @@ -134,7 +134,9 @@ thread_local! { pub static SENT_ETHEREUM_MESSAGE_NONCE: RefCell = const { RefCell::new(0) }; pub static MOCK_REPORT_OFFENCE_SHOULD_FAIL: RefCell = const { RefCell::new(false) }; pub static MOCK_REPORT_OFFENCE_CALLED: RefCell = const { RefCell::new(false) }; + pub static MOCK_SEND_MESSAGE_SHOULD_FAIL: RefCell = const { RefCell::new(false) }; pub static LAST_SENT_SLASHES: RefCell>> = RefCell::new(Vec::new()); + pub static LAST_BUILT_ERA: RefCell> = const { RefCell::new(None) }; } impl MockEraIndexProvider { @@ -221,19 +223,32 @@ impl MockOkOutboundQueue { pub fn last_sent_slashes() -> Vec> { LAST_SENT_SLASHES.with(|r| r.borrow().clone()) } + + pub fn last_built_era() -> Option { + LAST_BUILT_ERA.with(|r| *r.borrow()) + } + + pub fn set_should_fail(fail: bool) { + MOCK_SEND_MESSAGE_SHOULD_FAIL.with(|r| *r.borrow_mut() = fail); + } } impl crate::SendMessage for MockOkOutboundQueue { type Ticket = (); type Message = (); - fn build(slashes: &Vec>, _: u32) -> Option { + fn build(slashes: &Vec>, era: u32) -> Option { LAST_SENT_SLASHES.with(|r| *r.borrow_mut() = slashes.clone()); + LAST_BUILT_ERA.with(|r| *r.borrow_mut() = Some(era)); Some(()) } fn validate(_: Self::Ticket) -> Result { Ok(()) } fn deliver(_: Self::Ticket) -> Result { - Ok(H256::zero()) + if MOCK_SEND_MESSAGE_SHOULD_FAIL.with(|r| *r.borrow()) { + Err(SendError::MessageTooLarge) + } else { + Ok(H256::zero()) + } } } @@ -270,6 +285,7 @@ impl external_validator_slashes::Config for Test { type QueuedSlashesProcessedPerBlock = ConstU32<20>; type WeightInfo = (); type SendMessage = MockOkOutboundQueue; + type GovernanceOrigin = frame_system::EnsureRoot; } pub struct FullIdentificationOf; @@ -285,6 +301,9 @@ impl pallet_session::historical::Config for Test { } // Build genesis storage according to the mock runtime. pub fn new_test_ext() -> sp_io::TestExternalities { + MOCK_SEND_MESSAGE_SHOULD_FAIL.with(|r| *r.borrow_mut() = false); + LAST_SENT_SLASHES.with(|r| r.borrow_mut().clear()); + LAST_BUILT_ERA.with(|r| *r.borrow_mut() = None); system::GenesisConfig::::default() .build_storage() .unwrap() diff --git a/operator/runtime/mainnet/src/configs/mod.rs b/operator/runtime/mainnet/src/configs/mod.rs index 48f1aad9..11b0673c 100644 --- a/operator/runtime/mainnet/src/configs/mod.rs +++ b/operator/runtime/mainnet/src/configs/mod.rs @@ -1731,6 +1731,7 @@ impl pallet_external_validator_slashes::Config for Runtime { type QueuedSlashesProcessedPerBlock = ConstU32<10>; type WeightInfo = mainnet_weights::pallet_external_validator_slashes::WeightInfo; type SendMessage = SlashesSendAdapter; + type GovernanceOrigin = EnsureRootWithSuccess; } parameter_types! { diff --git a/operator/runtime/stagenet/src/configs/mod.rs b/operator/runtime/stagenet/src/configs/mod.rs index 12b4a960..13952fb6 100644 --- a/operator/runtime/stagenet/src/configs/mod.rs +++ b/operator/runtime/stagenet/src/configs/mod.rs @@ -1727,6 +1727,7 @@ impl pallet_external_validator_slashes::Config for Runtime { type QueuedSlashesProcessedPerBlock = ConstU32<10>; type WeightInfo = stagenet_weights::pallet_external_validator_slashes::WeightInfo; type SendMessage = SlashesSendAdapter; + type GovernanceOrigin = EnsureRootWithSuccess; } parameter_types! { diff --git a/operator/runtime/testnet/src/configs/mod.rs b/operator/runtime/testnet/src/configs/mod.rs index 27dbc538..0b6292d0 100644 --- a/operator/runtime/testnet/src/configs/mod.rs +++ b/operator/runtime/testnet/src/configs/mod.rs @@ -1729,6 +1729,7 @@ impl pallet_external_validator_slashes::Config for Runtime { type QueuedSlashesProcessedPerBlock = ConstU32<10>; type WeightInfo = testnet_weights::pallet_external_validator_slashes::WeightInfo; type SendMessage = SlashesSendAdapter; + type GovernanceOrigin = EnsureRootWithSuccess; } parameter_types! {