Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
cf573b6
Decode `DataSsz` variants explicitly based on duty and fork
dknopik May 12, 2025
a353880
Set `max_transmit_size`
dknopik May 9, 2025
159126e
Avoid conflict between timer-based and message-based round increments
dknopik May 12, 2025
f6d8998
Remember PREPARE even if we have not received the PROPOSE yet
dknopik May 12, 2025
5342908
fmt
dknopik May 12, 2025
d03c091
improve readability
diegomrsantos May 13, 2025
79eb2a9
log block decoding error
dknopik May 13, 2025
554991a
better naming and docs
dknopik May 15, 2025
762250d
fmt
dknopik May 16, 2025
529c823
Merge branch 'refs/heads/unstable' into proposal-fixes
dknopik May 19, 2025
a309453
Address review and fix proposal data
dknopik May 19, 2025
460a288
Use current slot for RANDAO Reveal
dknopik May 16, 2025
6cc2749
Merge branch 'unstable' into proposal-fixes
dknopik May 20, 2025
4c0fe0d
Custom LH until upstream PR is merged...
dknopik May 20, 2025
b09a285
Further simplify `DataVersion`
dknopik May 22, 2025
4019efa
Remove `DataSsz`
dknopik May 22, 2025
963b245
Move max transmit size to a `const`
dknopik May 22, 2025
2995db3
Merge branch 'unstable' into proposal-fixes
dknopik May 22, 2025
444aa30
Merge branch 'unstable' into proposal-fixes
dknopik May 23, 2025
73c5ce7
Improve fix for round timeouts
dknopik May 23, 2025
f73c324
Merge branch 'unstable' into proposal-fixes
dknopik May 26, 2025
4e9dea8
Merge branch 'unstable' into proposal-fixes
dknopik May 28, 2025
84eb708
Add comment explaining `DataVersion`
dknopik May 28, 2025
2e9e688
reintroduce round check in qbft
dknopik May 28, 2025
b45034f
add test for data race issue
dknopik May 28, 2025
d0e562e
address review
dknopik May 28, 2025
cd61569
more address review
dknopik May 28, 2025
2ab837f
Merge branch 'unstable' into proposal-fixes
dknopik May 28, 2025
91cb7d7
nicer conversion
dknopik May 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions anchor/common/qbft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ where
valid_start_data: ValidData<D>,
/// All of the data that we have seen
data: HashMap<D::Hash, Arc<D>>,
/// The current round this instance state is in.a
/// The current round this instance state is in.
current_round: Round,
/// The current state of the instance
state: InstanceState,
Expand Down Expand Up @@ -195,8 +195,8 @@ where
}

/// Get the current round
pub fn get_round(&self) -> &Round {
&self.current_round
pub fn get_round(&self) -> Round {
self.current_round
}

// Shifts this instance into a new round>
Expand Down Expand Up @@ -235,6 +235,16 @@ where
&self,
wrapped_msg: &WrappedQbftMessage,
) -> Option<(Option<ValidData<D>>, OperatorId)> {
// Ensure that this message is for the correct round
if wrapped_msg.qbft_message.round < self.current_round.into() {
debug!(
message_round = wrapped_msg.qbft_message.round,
current_round = *self.current_round,
"Message received for a previous round"
);
return None;
}

// Make sure we are at the correct instance height
if wrapped_msg.qbft_message.height != *self.instance_height as u64 {
warn!(
Expand Down Expand Up @@ -592,12 +602,6 @@ where
return;
}

// Make sure that we have accepted a proposal for this round
if !self.proposal_accepted_for_current_round {
warn!(from=?operator_id, ?self.state, self=?self.config.operator_id(), "Have not accepted Proposal for current round yet");
return;
}

debug!(from = ?operator_id, self = ?self.config.operator_id(), state = ?self.state, "PREPARE received");

// Store the prepare message
Expand All @@ -608,6 +612,12 @@ where
warn!(from = ?operator_id, "PREPARE message is a duplicate")
}

// Make sure that we have accepted a proposal for this round
if !self.proposal_accepted_for_current_round {
warn!(from=?operator_id, ?self.state, self=?self.config.operator_id(), "Have not accepted Proposal for current round yet");
return;
}

// Check if we have reached a prepare quorum for this round, if so send the commit message
if let Some(hash) = self.prepare_container.has_quorum(round) {
// Make sure we are in the correct state
Expand Down
106 changes: 46 additions & 60 deletions anchor/common/ssv_types/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ use std::{
ops::Deref,
};

use derive_more::{From, Into};
use sha2::{Digest, Sha256};
use ssz::{Decode, DecodeError, Encode};
use ssz_derive::{Decode, Encode};
use tree_hash::{PackedEncoding, TreeHash, TreeHashType};
use tree_hash_derive::TreeHash;
use types::{
AggregateAndProof, AggregateAndProofBase, AggregateAndProofElectra, BeaconBlock,
BlindedBeaconBlock, Checkpoint, CommitteeIndex, EthSpec, Hash256, PublicKeyBytes, Signature,
Slot, SyncCommitteeContribution, VariableList,
Checkpoint, CommitteeIndex, EthSpec, ForkName, Hash256, PublicKeyBytes, Signature, Slot,
SyncCommitteeContribution, VariableList,
typenum::{U13, U56},
};

Expand Down Expand Up @@ -227,75 +227,61 @@ impl TreeHash for BeaconRole {
}
}

#[derive(Clone, Debug, PartialEq, Decode, Encode)]
#[ssz(struct_behaviour = "transparent")]
pub struct DataVersion(u64);

pub const DATA_VERSION_UNKNOWN: DataVersion = DataVersion(0);
pub const DATA_VERSION_PHASE0: DataVersion = DataVersion(1);
pub const DATA_VERSION_ALTAIR: DataVersion = DataVersion(2);
pub const DATA_VERSION_BELLATRIX: DataVersion = DataVersion(3);
pub const DATA_VERSION_CAPELLA: DataVersion = DataVersion(4);
pub const DATA_VERSION_DENEB: DataVersion = DataVersion(5);
pub const DATA_VERSION_ELECTRA: DataVersion = DataVersion(6);
/// Wrapper for [`ForkName`] to allow custom encoding/decoding used by SSV.
///
/// `ForkName` is encoded by starting from 0 for `Phase0` and increasing by 1 for each fork.
/// This type encodes starting from 1.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, From, Into)]
pub struct DataVersion(ForkName);

impl TreeHash for DataVersion {
fn tree_hash_type() -> TreeHashType {
u64::tree_hash_type()
impl Encode for DataVersion {
fn is_ssz_fixed_len() -> bool {
true
}

fn tree_hash_packed_encoding(&self) -> PackedEncoding {
self.0.tree_hash_packed_encoding()
fn ssz_append(&self, buf: &mut Vec<u8>) {
let num: u64 = match self.0 {
ForkName::Base => 1,
ForkName::Altair => 2,
ForkName::Bellatrix => 3,
ForkName::Capella => 4,
ForkName::Deneb => 5,
ForkName::Electra => 6,
ForkName::Fulu => 7,
};
num.ssz_append(buf)
}

fn tree_hash_packing_factor() -> usize {
u64::tree_hash_packing_factor()
fn ssz_fixed_len() -> usize {
<u64 as Encode>::ssz_fixed_len()
}

fn tree_hash_root(&self) -> tree_hash::Hash256 {
self.0.tree_hash_root()
fn ssz_bytes_len(&self) -> usize {
u64::ssz_bytes_len(&0)
}
}

#[derive(Clone, Debug, TreeHash, Encode)]
#[tree_hash(enum_behaviour = "transparent")]
#[ssz(enum_behaviour = "transparent")]
pub enum DataSsz<E: EthSpec> {
AggregateAndProof(AggregateAndProof<E>),
BlindedBeaconBlock(BlindedBeaconBlock<E>),
BeaconBlock(BeaconBlock<E>),
Contributions(VariableList<Contribution<E>, U13>),
}

impl<E: EthSpec> DataSsz<E> {
/// SSZ deserialization that tries all possible variants
pub fn from_ssz_bytes(bytes: &[u8]) -> Result<Self, ssz::DecodeError> {
// 1. Try BeaconBlock variants
if let Ok(block) = BeaconBlock::any_from_ssz_bytes(bytes) {
return Ok(Self::BeaconBlock(block));
}

// 2. Try BlindedBeaconBlock
if let Ok(blinded) = BlindedBeaconBlock::any_from_ssz_bytes(bytes) {
return Ok(Self::BlindedBeaconBlock(blinded));
}

// 3. Handle AggregateAndProof variants explicitly
if let Ok(base) = AggregateAndProofBase::<E>::from_ssz_bytes(bytes) {
return Ok(Self::AggregateAndProof(AggregateAndProof::Base(base)));
}
if let Ok(electra) = AggregateAndProofElectra::<E>::from_ssz_bytes(bytes) {
return Ok(Self::AggregateAndProof(AggregateAndProof::Electra(electra)));
}
impl Decode for DataVersion {
fn is_ssz_fixed_len() -> bool {
true
}

// 4. Try Contributions
if let Ok(contributions) = VariableList::<Contribution<E>, U13>::from_ssz_bytes(bytes) {
return Ok(Self::Contributions(contributions));
}
fn ssz_fixed_len() -> usize {
<u64 as Decode>::ssz_fixed_len()
}

Err(ssz::DecodeError::BytesInvalid(
"Failed to decode as any DataSsz variant".into(),
))
fn from_ssz_bytes(bytes: &[u8]) -> Result<Self, DecodeError> {
let num = u64::from_ssz_bytes(bytes)?;
Ok(DataVersion(match num {
1 => ForkName::Base,
2 => ForkName::Altair,
3 => ForkName::Bellatrix,
4 => ForkName::Capella,
5 => ForkName::Deneb,
6 => ForkName::Electra,
7 => ForkName::Fulu,
_ => return Err(DecodeError::NoMatchingVariant),
}))
}
}

Expand Down
5 changes: 5 additions & 0 deletions anchor/network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ use crate::{
transport::build_transport,
};

const MAX_TRANSMIT_SIZE_BYTES: usize = 5_000_000;

#[derive(Debug, Error)]
pub enum NetworkError {
#[error("Unable to listen on address {address}: {source}")]
Expand Down Expand Up @@ -381,6 +383,9 @@ async fn build_anchor_behaviour<E: EthSpec>(
.history_gossip(4)
.max_ihave_length(1500)
.max_ihave_messages(32)
// `SignedSSVMessage` has a full data field with max 4,194,532 bytes, so 5M bytes seems like
// a reasonable upper bound for that and the rest of the message.
.max_transmit_size(MAX_TRANSMIT_SIZE_BYTES)
.validate_messages()
.build()?;

Expand Down
22 changes: 9 additions & 13 deletions anchor/qbft_manager/src/instance.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{pin::Pin, sync::Arc};
use std::sync::Arc;

use message_sender::MessageSender;
use qbft::{Completed, DefaultLeaderFunction, UnsignedWrappedQbftMessage, WrappedQbftMessage};
Expand All @@ -10,7 +10,7 @@ use tokio::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
},
time::{Instant, Sleep},
time::Instant,
};
use tracing::{debug, error, trace, warn};
use types::Hash256;
Expand Down Expand Up @@ -45,7 +45,6 @@ struct Uninitialized {

struct Initialized<D: QbftData<Hash = Hash256>> {
qbft: Box<Qbft<D>>,
round_end: Pin<Box<Sleep>>,
msgs_sent_by_us: UnboundedReceiver<WrappedQbftMessage>,
on_completed: Vec<oneshot::Sender<Completed<D>>>,
start_time: Instant,
Expand Down Expand Up @@ -136,10 +135,6 @@ impl Uninitialized {
},
));

// Calculate inital round sleep time at round 1
let sleep = calculate_round_timeout(1, &init.start_time);
let sleep = tokio::time::sleep_until(sleep);

if !self.message_buffer.is_empty() {
debug!(
len = self.message_buffer.len(),
Expand All @@ -151,7 +146,6 @@ impl Uninitialized {
}

Initialized {
round_end: Box::pin(sleep),
qbft: instance,
msgs_sent_by_us: sent_by_us_rx,
on_completed: vec![init.on_completed],
Expand All @@ -177,6 +171,12 @@ impl<D: QbftData> From<Option<QbftMessage<D>>> for RecvResult<D> {

impl<D: QbftData<Hash = Hash256>> Initialized<D> {
async fn recv(&mut self, rx: &mut UnboundedReceiver<QbftMessage<D>>) -> RecvResult<D> {
// We calculate the sleep dynamically, as both messages and the local timer might cause the
// round to advance
let round_end = calculate_round_timeout(self.qbft.get_round().into(), &self.start_time);
let round_timeout_sleep = tokio::time::sleep_until(round_end);
tokio::pin!(round_timeout_sleep);

select! {
message = rx.recv() => message.into(),
sent_by_us = self.msgs_sent_by_us.recv() => {
Expand All @@ -185,11 +185,7 @@ impl<D: QbftData<Hash = Hash256>> Initialized<D> {
drop_on_finish: None
}).into()
},
_ = &mut self.round_end => {
// Compute timeout for next round
let next_round = self.qbft.get_round().get() as u64 + 1_u64;
let sleep = calculate_round_timeout(next_round, &self.start_time);
self.round_end = Box::pin(tokio::time::sleep_until(sleep));
_ = &mut round_timeout_sleep => {
RecvResult::RoundEnd
}
}
Expand Down
Loading
Loading