Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 215 additions & 7 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::aggregation::{
use crate::key_manager::ValidatorKeyPair;
use spawned_concurrency::actor;
use spawned_concurrency::error::ActorError;
use spawned_concurrency::message::Message;
use spawned_concurrency::protocol;
use spawned_concurrency::tasks::{Actor, ActorRef, ActorStart, Context, Handler, send_after};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -47,6 +48,24 @@ pub const MILLISECONDS_PER_SLOT: u64 = MILLISECONDS_PER_INTERVAL * INTERVALS_PER
/// See: leanSpec commit 0c9528a (PR #536).
pub const MAX_ATTESTATIONS_DATA: usize = 16;

/// Interval-within-slot at which a single attestation is normally produced
/// by validators. An early gossip arrival is held until this interval of
/// the attestation's own slot before being processed.
const ATTESTATION_PRODUCTION_INTERVAL: u64 = 1;

/// Interval-within-slot at which an aggregated attestation is normally
/// produced by aggregators.
const AGGREGATION_PRODUCTION_INTERVAL: u64 = 2;

/// How far ahead of the local clock we are willing to schedule a retry.
/// Anything further is dropped on receipt — honest peers do not gossip
/// that far in advance.
const MAX_DEFER_FUTURE_SLOTS: u64 = 4;

/// Cap on in-flight retry timers, to prevent a peer from amplifying memory
/// pressure by spraying future-slot messages.
const MAX_DEFERRED_GOSSIP_MESSAGES: usize = 1024;

impl BlockChain {
pub fn spawn(
store: Store,
Expand All @@ -65,6 +84,7 @@ impl BlockChain {
aggregator,
pending_block_parents: HashMap::new(),
current_aggregation: None,
deferred_gossip_count: 0,
}
.start();
let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time))
Expand Down Expand Up @@ -115,6 +135,13 @@ pub struct BlockChainServer {
/// worker started at the most recent interval 2 is still running or until
/// the next interval 2 takes over.
current_aggregation: Option<AggregationSession>,

/// In-flight retry timers for gossip messages that arrived early.
/// Each timer fires a [`RetryDeferredAttestation`] or
/// [`RetryDeferredAggregated`] at the message's "supposed" arrival
/// interval; this counter just bounds how many such timers may exist
/// concurrently so a peer cannot amplify memory pressure.
deferred_gossip_count: usize,
}

impl BlockChainServer {
Expand Down Expand Up @@ -549,19 +576,125 @@ impl BlockChainServer {
}
}

fn on_gossip_attestation(&mut self, attestation: &SignedAttestation) {
fn on_gossip_attestation(&mut self, attestation: SignedAttestation, ctx: &Context<Self>) {
let slot = attestation.data.slot;
if self.is_gossip_too_early(slot)
&& let Some(delay) = self.reserve_deferred_slot(slot, ATTESTATION_PRODUCTION_INTERVAL)
{
trace!(
slot,
delay_ms = delay.as_millis() as u64,
"Deferring early attestation until its production interval"
);
send_after(delay, ctx.clone(), RetryDeferredAttestation { attestation });
return;
}
self.process_gossip_attestation(attestation);
}

fn on_gossip_aggregated_attestation(
&mut self,
attestation: SignedAggregatedAttestation,
ctx: &Context<Self>,
) {
let slot = attestation.data.slot;
if self.is_gossip_too_early(slot)
&& let Some(delay) = self.reserve_deferred_slot(slot, AGGREGATION_PRODUCTION_INTERVAL)
{
trace!(
slot,
delay_ms = delay.as_millis() as u64,
"Deferring early aggregated attestation until its production interval"
);
send_after(delay, ctx.clone(), RetryDeferredAggregated { attestation });
return;
}
self.process_gossip_aggregated_attestation(attestation);
}

fn process_gossip_attestation(&mut self, attestation: SignedAttestation) {
// Read fresh here too: a gossip event can arrive between ticks, and
// if the admin API just toggled, the first gossip after the toggle
// should already use the new value.
let is_aggregator = self.aggregator.is_enabled();
let _ = store::on_gossip_attestation(&mut self.store, attestation, is_aggregator)
let _ = store::on_gossip_attestation(&mut self.store, &attestation, is_aggregator)
.inspect_err(|err| warn!(%err, "Failed to process gossiped attestation"));
}

fn on_gossip_aggregated_attestation(&mut self, attestation: SignedAggregatedAttestation) {
fn process_gossip_aggregated_attestation(&mut self, attestation: SignedAggregatedAttestation) {
let _ = store::on_gossip_aggregated_attestation(&mut self.store, attestation)
.inspect_err(|err| warn!(%err, "Failed to process gossiped aggregated attestation"));
}

/// Mirrors the future-slot rejection in [`store::validate_attestation_data`].
/// Kept in sync deliberately: messages that would be rejected as
/// `AttestationTooFarInFuture` are buffered instead of dropped.
fn is_gossip_too_early(&self, slot: u64) -> bool {
let current_slot = self.store.time() / INTERVALS_PER_SLOT;
slot > current_slot + 1
}

/// Reserve a slot in the deferred-retry budget and compute when the
/// retry should fire. Returns `None` if the message is too far ahead
/// or the budget is exhausted, in which case the caller falls back to
/// the immediate-process path (validation will reject and log).
///
/// `interval_in_slot` is the interval at which honest peers normally
/// produce this kind of message; the retry fires at exactly that point
/// in the message's own slot.
fn reserve_deferred_slot(&mut self, slot: u64, interval_in_slot: u64) -> Option<Duration> {
let store_time = self.store.time();
let delay = defer_delay(store_time, slot, interval_in_slot)?;
if self.deferred_gossip_count >= MAX_DEFERRED_GOSSIP_MESSAGES {
warn!(
slot,
in_flight = self.deferred_gossip_count,
cap = MAX_DEFERRED_GOSSIP_MESSAGES,
"Dropping early gossip message: deferred budget exhausted"
);
return None;
}
self.deferred_gossip_count += 1;
Some(delay)
}

fn release_deferred_slot(&mut self) {
self.deferred_gossip_count = self.deferred_gossip_count.saturating_sub(1);
}
}

/// Time until the supposed arrival of a gossip message for `slot` at
/// `interval_in_slot`. Returns `None` if `slot` is more than
/// [`MAX_DEFER_FUTURE_SLOTS`] ahead of the store's current slot.
fn defer_delay(store_time: u64, slot: u64, interval_in_slot: u64) -> Option<Duration> {
let current_slot = store_time / INTERVALS_PER_SLOT;
if slot > current_slot + MAX_DEFER_FUTURE_SLOTS {
return None;
}
let target_time = slot
.checked_mul(INTERVALS_PER_SLOT)?
.checked_add(interval_in_slot)?;
let delay_intervals = target_time.saturating_sub(store_time);
Some(Duration::from_millis(
delay_intervals * MILLISECONDS_PER_INTERVAL,
))
}

/// Retry envelope scheduled via `send_after` when an attestation arrived
/// before its supposed production interval.
struct RetryDeferredAttestation {
attestation: SignedAttestation,
}
impl Message for RetryDeferredAttestation {
type Result = ();
}

/// Retry envelope for early aggregated attestations.
struct RetryDeferredAggregated {
attestation: SignedAggregatedAttestation,
}
impl Message for RetryDeferredAggregated {
type Result = ();
}

// Protocol trait for internal messages only (tick scheduling).
Expand Down Expand Up @@ -638,14 +771,28 @@ impl Handler<NewBlock> for BlockChainServer {
}

impl Handler<NewAttestation> for BlockChainServer {
async fn handle(&mut self, msg: NewAttestation, _ctx: &Context<Self>) {
self.on_gossip_attestation(&msg.attestation);
async fn handle(&mut self, msg: NewAttestation, ctx: &Context<Self>) {
self.on_gossip_attestation(msg.attestation, ctx);
}
}

impl Handler<NewAggregatedAttestation> for BlockChainServer {
async fn handle(&mut self, msg: NewAggregatedAttestation, _ctx: &Context<Self>) {
self.on_gossip_aggregated_attestation(msg.attestation);
async fn handle(&mut self, msg: NewAggregatedAttestation, ctx: &Context<Self>) {
self.on_gossip_aggregated_attestation(msg.attestation, ctx);
}
}

impl Handler<RetryDeferredAttestation> for BlockChainServer {
async fn handle(&mut self, msg: RetryDeferredAttestation, _ctx: &Context<Self>) {
self.release_deferred_slot();
self.process_gossip_attestation(msg.attestation);
}
}

impl Handler<RetryDeferredAggregated> for BlockChainServer {
async fn handle(&mut self, msg: RetryDeferredAggregated, _ctx: &Context<Self>) {
self.release_deferred_slot();
self.process_gossip_aggregated_attestation(msg.attestation);
}
}

Expand Down Expand Up @@ -711,3 +858,64 @@ impl Handler<AggregationDeadline> for BlockChainServer {
}
}
}

#[cfg(test)]
mod defer_tests {
use super::*;

/// store_time helper: combine slot + interval_in_slot into a store time.
fn time(slot: u64, interval: u64) -> u64 {
slot * INTERVALS_PER_SLOT + interval
}

#[test]
fn delay_targets_attestation_production_interval_in_message_slot() {
// current_time = slot 3, interval 4; message slot = 5.
// Supposed arrival = slot 5, interval 1 → delay = (5*5+1) - (3*5+4)
// = 26 - 19 = 7 intervals.
let delay = defer_delay(time(3, 4), 5, ATTESTATION_PRODUCTION_INTERVAL).unwrap();
assert_eq!(delay.as_millis() as u64, 7 * MILLISECONDS_PER_INTERVAL);
}

#[test]
fn delay_targets_aggregation_production_interval_in_message_slot() {
// current_time = slot 3, interval 4; message slot = 5.
// Supposed arrival = slot 5, interval 2 → delay = (5*5+2) - (3*5+4)
// = 27 - 19 = 8 intervals.
let delay = defer_delay(time(3, 4), 5, AGGREGATION_PRODUCTION_INTERVAL).unwrap();
assert_eq!(delay.as_millis() as u64, 8 * MILLISECONDS_PER_INTERVAL);
}

#[test]
fn delay_returns_none_when_slot_is_beyond_max_defer_window() {
let now = time(3, 0);
let current_slot = now / INTERVALS_PER_SLOT;
// The window's edge (current + MAX_DEFER_FUTURE_SLOTS) is still buffered.
assert!(
defer_delay(
now,
current_slot + MAX_DEFER_FUTURE_SLOTS,
ATTESTATION_PRODUCTION_INTERVAL,
)
.is_some()
);
// One slot beyond is dropped.
assert!(
defer_delay(
now,
current_slot + MAX_DEFER_FUTURE_SLOTS + 1,
ATTESTATION_PRODUCTION_INTERVAL,
)
.is_none()
);
}

#[test]
fn delay_is_zero_when_supposed_interval_is_already_past() {
// current_time is past the supposed arrival (e.g. clock skew, late
// gossip): delay collapses to zero so the retry handler runs ASAP.
let now = time(10, 4);
let delay = defer_delay(now, /*slot=*/ 5, ATTESTATION_PRODUCTION_INTERVAL).unwrap();
assert_eq!(delay, Duration::ZERO);
}
}
Loading