mirror of
https://github.com/datahaven-xyz/datahaven
synced 2026-05-23 17:28:23 +00:00
Align slash retry handling with rewards.
Store slash retries as bounded era-tagged batches, preserve the original era across retries, add governance retry support, and downgrade expected retry-path failures to warnings.
This commit is contained in:
parent
e60363ecc3
commit
e14ebee6ec
5 changed files with 262 additions and 69 deletions
|
|
@ -31,7 +31,7 @@ extern crate alloc;
|
|||
use pallet_external_validators::apply;
|
||||
use snowbridge_outbound_queue_primitives::SendError;
|
||||
use {
|
||||
alloc::{collections::vec_deque::VecDeque, string::String, vec, vec::Vec},
|
||||
alloc::{string::String, vec, vec::Vec},
|
||||
frame_support::{pallet_prelude::*, traits::DefensiveSaturating},
|
||||
frame_system::pallet_prelude::*,
|
||||
log::log,
|
||||
|
|
@ -132,10 +132,21 @@ pub mod pallet {
|
|||
},
|
||||
/// The slashes message was sent correctly.
|
||||
SlashesMessageSent { message_id: H256 },
|
||||
/// The slashes message failed to send and the batch was moved to the back
|
||||
/// of the queue for retry.
|
||||
SlashesMessageSendFailed { era: EraIndex, count: u32 },
|
||||
/// A queued slashes batch was retried manually and sent successfully.
|
||||
SlashesMessageRetried {
|
||||
message_id: H256,
|
||||
era: EraIndex,
|
||||
count: u32,
|
||||
},
|
||||
/// We injected a slash
|
||||
SlashInjected { slash_id: T::SlashId, era: u32 },
|
||||
/// Number of slashes processed
|
||||
SlashAddedToQueue { number: u32, era: u32 },
|
||||
/// The unsent queue is full; this slash era could not be enqueued.
|
||||
UnsentQueueFull { era: EraIndex },
|
||||
}
|
||||
|
||||
#[pallet::config]
|
||||
|
|
@ -199,6 +210,9 @@ pub mod pallet {
|
|||
|
||||
/// The weight information of this pallet.
|
||||
type WeightInfo: WeightInfo;
|
||||
|
||||
/// Origin for governance calls such as retrying an unsent slash batch.
|
||||
type GovernanceOrigin: EnsureOrigin<Self::RuntimeOrigin>;
|
||||
}
|
||||
|
||||
#[pallet::error]
|
||||
|
|
@ -226,6 +240,10 @@ pub mod pallet {
|
|||
/// No PendingOffenceKind found for (session, validator) — offence was not
|
||||
/// reported through EquivocationReportWrapper, so the offence kind is unknown.
|
||||
MissingOffenceKind,
|
||||
/// The specified era is not in the unsent slash queue.
|
||||
EraNotInUnsentQueue,
|
||||
/// The message delivery still failed on retry.
|
||||
MessageSendFailed,
|
||||
}
|
||||
|
||||
#[apply(derive_storage_traits)]
|
||||
|
|
@ -269,12 +287,26 @@ pub mod pallet {
|
|||
pub type Slashes<T: Config> =
|
||||
StorageMap<_, Twox64Concat, EraIndex, Vec<Slash<T::AccountId, T::SlashId>>, ValueQuery>;
|
||||
|
||||
/// All unreported slashes that will be processed in the future.
|
||||
/// Maximum number of unsent slash batches in the retry ring buffer.
|
||||
pub const UNSENT_QUEUE_CAPACITY: u32 = 64;
|
||||
|
||||
/// Ring buffer of slash batches whose outbound message still needs to be sent.
|
||||
/// Each slot stores the original slash era together with a bounded-size batch
|
||||
/// of slash records. Retries keep the original era so the outbound message id
|
||||
/// remains stable across later blocks and eras.
|
||||
#[pallet::storage]
|
||||
#[pallet::unbounded]
|
||||
#[pallet::getter(fn unreported_slashes)]
|
||||
pub type UnreportedSlashesQueue<T: Config> =
|
||||
StorageValue<_, VecDeque<Slash<T::AccountId, T::SlashId>>, ValueQuery>;
|
||||
pub type UnsentSlashBatch<T: Config> =
|
||||
StorageMap<_, Twox64Concat, u32, (EraIndex, Vec<Slash<T::AccountId, T::SlashId>>)>;
|
||||
|
||||
/// Ring buffer head: next slot to be processed by `on_initialize`.
|
||||
#[pallet::storage]
|
||||
pub type UnsentSlashHead<T: Config> = StorageValue<_, u32, ValueQuery>;
|
||||
|
||||
/// Ring buffer tail: next slot to write a new entry into.
|
||||
/// When head == tail the buffer is empty.
|
||||
#[pallet::storage]
|
||||
pub type UnsentSlashTail<T: Config> = StorageValue<_, u32, ValueQuery>;
|
||||
|
||||
// Turns slashing on or off
|
||||
#[pallet::storage]
|
||||
|
|
@ -415,6 +447,44 @@ pub mod pallet {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[pallet::call_index(2)]
|
||||
#[pallet::weight(T::WeightInfo::retry_unsent_slash_era())]
|
||||
pub fn retry_unsent_slash_era(origin: OriginFor<T>, era_index: EraIndex) -> DispatchResult {
|
||||
T::GovernanceOrigin::ensure_origin(origin)?;
|
||||
|
||||
let head = UnsentSlashHead::<T>::get();
|
||||
let tail = UnsentSlashTail::<T>::get();
|
||||
let mut found = None;
|
||||
let mut slot = head;
|
||||
while slot != tail {
|
||||
if let Some(entry @ (idx, _)) = UnsentSlashBatch::<T>::get(slot) {
|
||||
if idx == era_index {
|
||||
found = Some((slot, entry));
|
||||
break;
|
||||
}
|
||||
}
|
||||
slot = (slot + 1) % UNSENT_QUEUE_CAPACITY;
|
||||
}
|
||||
|
||||
let (slot, (era, slashes)) = found.ok_or(Error::<T>::EraNotInUnsentQueue)?;
|
||||
let count = slashes.len() as u32;
|
||||
let slashes_to_send = slashes
|
||||
.iter()
|
||||
.map(Self::slash_to_send_data)
|
||||
.collect::<Vec<_>>();
|
||||
let message_id = Self::send_slashes_message(&slashes_to_send, era)
|
||||
.ok_or(Error::<T>::MessageSendFailed)?;
|
||||
|
||||
Self::unsent_queue_remove_slot(slot);
|
||||
Self::deposit_event(Event::<T>::SlashesMessageRetried {
|
||||
message_id,
|
||||
era,
|
||||
count,
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[pallet::call_index(3)]
|
||||
#[pallet::weight(T::WeightInfo::set_slashing_mode())]
|
||||
pub fn set_slashing_mode(origin: OriginFor<T>, mode: SlashingModeOption) -> DispatchResult {
|
||||
|
|
@ -429,12 +499,12 @@ pub mod pallet {
|
|||
#[pallet::hooks]
|
||||
impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
|
||||
fn on_initialize(_n: BlockNumberFor<T>) -> Weight {
|
||||
let processed = Self::process_slashes_queue(T::QueuedSlashesProcessedPerBlock::get());
|
||||
|
||||
if let Some(p) = processed {
|
||||
T::WeightInfo::process_slashes_queue(p)
|
||||
} else {
|
||||
T::WeightInfo::process_slashes_queue(0)
|
||||
match Self::process_slashes_queue() {
|
||||
ProcessSlashesQueueOutcome::Empty => T::WeightInfo::process_slashes_queue(0),
|
||||
ProcessSlashesQueueOutcome::Sent(count)
|
||||
| ProcessSlashesQueueOutcome::Requeued(count) => {
|
||||
T::WeightInfo::process_slashes_queue(count)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -655,70 +725,65 @@ where
|
|||
|
||||
impl<T: Config> Pallet<T> {
|
||||
fn add_era_slashes_to_queue(active_era: EraIndex) {
|
||||
let mut slashes: VecDeque<_> = Slashes::<T>::get(active_era).into();
|
||||
let slashes = Slashes::<T>::get(active_era);
|
||||
if slashes.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let len = slashes.len();
|
||||
let batch_size = T::QueuedSlashesProcessedPerBlock::get().max(1) as usize;
|
||||
let mut enqueued = 0u32;
|
||||
|
||||
UnreportedSlashesQueue::<T>::mutate(|queue| queue.append(&mut slashes));
|
||||
for batch in slashes.chunks(batch_size) {
|
||||
if Self::unsent_queue_push((active_era, batch.to_vec())) {
|
||||
enqueued = enqueued.saturating_add(batch.len() as u32);
|
||||
} else {
|
||||
log::warn!(
|
||||
target: "ext_validators_slashes",
|
||||
"Unsent slash queue full, cannot enqueue era {active_era}",
|
||||
);
|
||||
Self::deposit_event(Event::<T>::UnsentQueueFull { era: active_era });
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if len > 0 {
|
||||
if enqueued > 0 {
|
||||
Self::deposit_event(Event::<T>::SlashAddedToQueue {
|
||||
number: len as u32,
|
||||
number: enqueued,
|
||||
era: active_era,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns number of slashes that were sent to ethereum.
|
||||
fn process_slashes_queue(amount: u32) -> Option<u32> {
|
||||
let mut slashes_to_send: Vec<SlashData<T::AccountId>> = vec![];
|
||||
let era_index = T::EraIndexProvider::active_era().index;
|
||||
fn slash_to_send_data(slash: &Slash<T::AccountId, T::SlashId>) -> SlashData<T::AccountId> {
|
||||
// Keep the original slash batch intact until delivery succeeds so failed
|
||||
// batches can be moved to the back of the queue instead of being dropped.
|
||||
let max_wad = T::MaxSlashWad::get();
|
||||
let wad_to_slash = (slash.percentage.deconstruct() as u128)
|
||||
.saturating_mul(max_wad)
|
||||
.checked_div(1_000_000_000u128)
|
||||
.unwrap_or(0)
|
||||
.min(max_wad);
|
||||
|
||||
UnreportedSlashesQueue::<T>::mutate(|queue| {
|
||||
for _ in 0..amount {
|
||||
let Some(slash) = queue.pop_front() else {
|
||||
// no more slashes to process in the queue
|
||||
break;
|
||||
};
|
||||
|
||||
// Convert Perbill to EigenLayer WAD format with linear mapping.
|
||||
// Perbill(100%) → MaxSlashWad (e.g. 5% WAD = 5e16).
|
||||
// Formula: perbill_inner * MaxSlashWad / 1e9
|
||||
// Clamp to MaxSlashWad to guard against overflow if governance
|
||||
// sets MaxSlashWad high enough for saturating_mul to hit u128::MAX.
|
||||
let max_wad = T::MaxSlashWad::get();
|
||||
let wad_to_slash = (slash.percentage.deconstruct() as u128)
|
||||
.saturating_mul(max_wad)
|
||||
.checked_div(1_000_000_000u128)
|
||||
.unwrap_or(0)
|
||||
.min(max_wad);
|
||||
|
||||
slashes_to_send.push(SlashData {
|
||||
validator: slash.validator,
|
||||
wad_to_slash,
|
||||
description: slash.offence_kind.to_description(),
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
if slashes_to_send.is_empty() {
|
||||
return None;
|
||||
SlashData {
|
||||
validator: slash.validator.clone(),
|
||||
wad_to_slash,
|
||||
description: slash.offence_kind.to_description(),
|
||||
}
|
||||
}
|
||||
|
||||
let slashes_count = slashes_to_send.len() as u32;
|
||||
fn send_slashes_message(
|
||||
slashes_to_send: &[SlashData<T::AccountId>],
|
||||
era_index: EraIndex,
|
||||
) -> Option<H256> {
|
||||
let outbound =
|
||||
T::SendMessage::build(&slashes_to_send.to_vec(), era_index).or_else(|| {
|
||||
log::warn!(target: "ext_validators_slashes", "Failed to build outbound message");
|
||||
None
|
||||
})?;
|
||||
|
||||
let outbound = match T::SendMessage::build(&slashes_to_send, era_index) {
|
||||
Some(send_msg) => send_msg,
|
||||
None => {
|
||||
log::error!(target: "ext_validators_slashes", "Failed to build outbound message");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
// Validate and deliver the message
|
||||
let ticket = T::SendMessage::validate(outbound)
|
||||
.map_err(|e| {
|
||||
log::error!(
|
||||
log::warn!(
|
||||
target: "ext_validators_slashes",
|
||||
"Failed to validate outbound message: {:?}",
|
||||
e
|
||||
|
|
@ -726,20 +791,126 @@ impl<T: Config> Pallet<T> {
|
|||
})
|
||||
.ok()?;
|
||||
|
||||
let message_id = T::SendMessage::deliver(ticket)
|
||||
T::SendMessage::deliver(ticket)
|
||||
.map_err(|e| {
|
||||
log::error!(
|
||||
log::warn!(
|
||||
target: "ext_validators_slashes",
|
||||
"Failed to deliver outbound message: {:?}",
|
||||
e
|
||||
);
|
||||
})
|
||||
.ok()?;
|
||||
|
||||
Self::deposit_event(Event::<T>::SlashesMessageSent { message_id });
|
||||
|
||||
Some(slashes_count)
|
||||
.ok()
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn unsent_queue_is_empty() -> bool {
|
||||
UnsentSlashHead::<T>::get() == UnsentSlashTail::<T>::get()
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn unsent_queue_len() -> u32 {
|
||||
let head = UnsentSlashHead::<T>::get();
|
||||
let tail = UnsentSlashTail::<T>::get();
|
||||
tail.wrapping_sub(head) % UNSENT_QUEUE_CAPACITY
|
||||
}
|
||||
|
||||
pub(crate) fn unsent_queue_push(
|
||||
entry: (EraIndex, Vec<Slash<T::AccountId, T::SlashId>>),
|
||||
) -> bool {
|
||||
let head = UnsentSlashHead::<T>::get();
|
||||
let tail = UnsentSlashTail::<T>::get();
|
||||
let next_tail = (tail + 1) % UNSENT_QUEUE_CAPACITY;
|
||||
if next_tail == head {
|
||||
return false;
|
||||
}
|
||||
|
||||
UnsentSlashBatch::<T>::insert(tail, entry);
|
||||
UnsentSlashTail::<T>::put(next_tail);
|
||||
true
|
||||
}
|
||||
|
||||
fn unsent_queue_remove_slot(slot: u32) {
|
||||
let tail = UnsentSlashTail::<T>::get();
|
||||
let mut cur = slot;
|
||||
loop {
|
||||
let next = (cur + 1) % UNSENT_QUEUE_CAPACITY;
|
||||
if next == tail {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(entry) = UnsentSlashBatch::<T>::get(next) {
|
||||
UnsentSlashBatch::<T>::insert(cur, entry);
|
||||
}
|
||||
cur = next;
|
||||
}
|
||||
|
||||
UnsentSlashBatch::<T>::remove(cur);
|
||||
let new_tail = if tail == 0 {
|
||||
UNSENT_QUEUE_CAPACITY - 1
|
||||
} else {
|
||||
tail - 1
|
||||
};
|
||||
UnsentSlashTail::<T>::put(new_tail);
|
||||
|
||||
let head = UnsentSlashHead::<T>::get();
|
||||
if head == tail {
|
||||
UnsentSlashHead::<T>::put(new_tail);
|
||||
}
|
||||
}
|
||||
|
||||
/// Retry contract shared with rewards:
|
||||
/// - process the current head batch,
|
||||
/// - if send succeeds, remove it from the queue,
|
||||
/// - if send fails, move the same batch to the back so later slash batches can progress.
|
||||
pub(crate) fn process_slashes_queue() -> ProcessSlashesQueueOutcome {
|
||||
let head = UnsentSlashHead::<T>::get();
|
||||
let tail = UnsentSlashTail::<T>::get();
|
||||
|
||||
if head == tail {
|
||||
return ProcessSlashesQueueOutcome::Empty;
|
||||
}
|
||||
|
||||
let Some((era_index, slashes)) = UnsentSlashBatch::<T>::get(head) else {
|
||||
UnsentSlashHead::<T>::put((head + 1) % UNSENT_QUEUE_CAPACITY);
|
||||
return ProcessSlashesQueueOutcome::Empty;
|
||||
};
|
||||
|
||||
let slashes_count = slashes.len() as u32;
|
||||
let slashes_to_send = slashes
|
||||
.iter()
|
||||
.map(Self::slash_to_send_data)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
match Self::send_slashes_message(&slashes_to_send, era_index) {
|
||||
Some(message_id) => {
|
||||
UnsentSlashBatch::<T>::remove(head);
|
||||
UnsentSlashHead::<T>::put((head + 1) % UNSENT_QUEUE_CAPACITY);
|
||||
Self::deposit_event(Event::<T>::SlashesMessageSent { message_id });
|
||||
ProcessSlashesQueueOutcome::Sent(slashes_count)
|
||||
}
|
||||
None => {
|
||||
UnsentSlashBatch::<T>::remove(head);
|
||||
UnsentSlashHead::<T>::put((head + 1) % UNSENT_QUEUE_CAPACITY);
|
||||
UnsentSlashBatch::<T>::insert(tail, (era_index, slashes));
|
||||
UnsentSlashTail::<T>::put((tail + 1) % UNSENT_QUEUE_CAPACITY);
|
||||
log::warn!(
|
||||
target: "ext_validators_slashes",
|
||||
"Failed to send {slashes_count} slash entries for era {era_index}, moved batch to back of queue",
|
||||
);
|
||||
Self::deposit_event(Event::<T>::SlashesMessageSendFailed {
|
||||
era: era_index,
|
||||
count: slashes_count,
|
||||
});
|
||||
ProcessSlashesQueueOutcome::Requeued(slashes_count)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) enum ProcessSlashesQueueOutcome {
|
||||
Empty,
|
||||
Sent(u32),
|
||||
Requeued(u32),
|
||||
}
|
||||
|
||||
/// A pending slash record. The value of the slash has been computed but not applied yet,
|
||||
|
|
|
|||
|
|
@ -134,7 +134,9 @@ thread_local! {
|
|||
pub static SENT_ETHEREUM_MESSAGE_NONCE: RefCell<u64> = const { RefCell::new(0) };
|
||||
pub static MOCK_REPORT_OFFENCE_SHOULD_FAIL: RefCell<bool> = const { RefCell::new(false) };
|
||||
pub static MOCK_REPORT_OFFENCE_CALLED: RefCell<bool> = const { RefCell::new(false) };
|
||||
pub static MOCK_SEND_MESSAGE_SHOULD_FAIL: RefCell<bool> = const { RefCell::new(false) };
|
||||
pub static LAST_SENT_SLASHES: RefCell<Vec<crate::SlashData<AccountId>>> = RefCell::new(Vec::new());
|
||||
pub static LAST_BUILT_ERA: RefCell<Option<EraIndex>> = const { RefCell::new(None) };
|
||||
}
|
||||
|
||||
impl MockEraIndexProvider {
|
||||
|
|
@ -221,19 +223,32 @@ impl MockOkOutboundQueue {
|
|||
pub fn last_sent_slashes() -> Vec<crate::SlashData<AccountId>> {
|
||||
LAST_SENT_SLASHES.with(|r| r.borrow().clone())
|
||||
}
|
||||
|
||||
pub fn last_built_era() -> Option<EraIndex> {
|
||||
LAST_BUILT_ERA.with(|r| *r.borrow())
|
||||
}
|
||||
|
||||
pub fn set_should_fail(fail: bool) {
|
||||
MOCK_SEND_MESSAGE_SHOULD_FAIL.with(|r| *r.borrow_mut() = fail);
|
||||
}
|
||||
}
|
||||
impl crate::SendMessage<AccountId> for MockOkOutboundQueue {
|
||||
type Ticket = ();
|
||||
type Message = ();
|
||||
fn build(slashes: &Vec<crate::SlashData<AccountId>>, _: u32) -> Option<Self::Ticket> {
|
||||
fn build(slashes: &Vec<crate::SlashData<AccountId>>, era: u32) -> Option<Self::Ticket> {
|
||||
LAST_SENT_SLASHES.with(|r| *r.borrow_mut() = slashes.clone());
|
||||
LAST_BUILT_ERA.with(|r| *r.borrow_mut() = Some(era));
|
||||
Some(())
|
||||
}
|
||||
fn validate(_: Self::Ticket) -> Result<Self::Ticket, SendError> {
|
||||
Ok(())
|
||||
}
|
||||
fn deliver(_: Self::Ticket) -> Result<H256, SendError> {
|
||||
Ok(H256::zero())
|
||||
if MOCK_SEND_MESSAGE_SHOULD_FAIL.with(|r| *r.borrow()) {
|
||||
Err(SendError::MessageTooLarge)
|
||||
} else {
|
||||
Ok(H256::zero())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -270,6 +285,7 @@ impl external_validator_slashes::Config for Test {
|
|||
type QueuedSlashesProcessedPerBlock = ConstU32<20>;
|
||||
type WeightInfo = ();
|
||||
type SendMessage = MockOkOutboundQueue;
|
||||
type GovernanceOrigin = frame_system::EnsureRoot<u64>;
|
||||
}
|
||||
|
||||
pub struct FullIdentificationOf;
|
||||
|
|
@ -285,6 +301,9 @@ impl pallet_session::historical::Config for Test {
|
|||
}
|
||||
// Build genesis storage according to the mock runtime.
|
||||
pub fn new_test_ext() -> sp_io::TestExternalities {
|
||||
MOCK_SEND_MESSAGE_SHOULD_FAIL.with(|r| *r.borrow_mut() = false);
|
||||
LAST_SENT_SLASHES.with(|r| r.borrow_mut().clear());
|
||||
LAST_BUILT_ERA.with(|r| *r.borrow_mut() = None);
|
||||
system::GenesisConfig::<Test>::default()
|
||||
.build_storage()
|
||||
.unwrap()
|
||||
|
|
|
|||
|
|
@ -1731,6 +1731,7 @@ impl pallet_external_validator_slashes::Config for Runtime {
|
|||
type QueuedSlashesProcessedPerBlock = ConstU32<10>;
|
||||
type WeightInfo = mainnet_weights::pallet_external_validator_slashes::WeightInfo<Runtime>;
|
||||
type SendMessage = SlashesSendAdapter;
|
||||
type GovernanceOrigin = EnsureRootWithSuccess<AccountId, RootLocation>;
|
||||
}
|
||||
|
||||
parameter_types! {
|
||||
|
|
|
|||
|
|
@ -1727,6 +1727,7 @@ impl pallet_external_validator_slashes::Config for Runtime {
|
|||
type QueuedSlashesProcessedPerBlock = ConstU32<10>;
|
||||
type WeightInfo = stagenet_weights::pallet_external_validator_slashes::WeightInfo<Runtime>;
|
||||
type SendMessage = SlashesSendAdapter;
|
||||
type GovernanceOrigin = EnsureRootWithSuccess<AccountId, RootLocation>;
|
||||
}
|
||||
|
||||
parameter_types! {
|
||||
|
|
|
|||
|
|
@ -1729,6 +1729,7 @@ impl pallet_external_validator_slashes::Config for Runtime {
|
|||
type QueuedSlashesProcessedPerBlock = ConstU32<10>;
|
||||
type WeightInfo = testnet_weights::pallet_external_validator_slashes::WeightInfo<Runtime>;
|
||||
type SendMessage = SlashesSendAdapter;
|
||||
type GovernanceOrigin = EnsureRootWithSuccess<AccountId, RootLocation>;
|
||||
}
|
||||
|
||||
parameter_types! {
|
||||
|
|
|
|||
Loading…
Reference in a new issue