Skip to content

Commit a68f34a

Browse files
Broadcast VC requests in parallel and fix subscription error (#6223)
* Broadcast VC requests in parallel
* Remove outdated comment
* Try some things
* Fix subscription error
* Remove junk logging
1 parent 42a1cd8 commit a68f34a

File tree

4 files changed, with 94 additions and 59 deletions.

common/eth2/src/lib.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ impl fmt::Display for Error {
121121
pub struct Timeouts {
122122
pub attestation: Duration,
123123
pub attester_duties: Duration,
124+
pub attestation_subscriptions: Duration,
124125
pub liveness: Duration,
125126
pub proposal: Duration,
126127
pub proposer_duties: Duration,
@@ -137,6 +138,7 @@ impl Timeouts {
137138
Timeouts {
138139
attestation: timeout,
139140
attester_duties: timeout,
141+
attestation_subscriptions: timeout,
140142
liveness: timeout,
141143
proposal: timeout,
142144
proposer_duties: timeout,
@@ -2515,7 +2517,12 @@ impl BeaconNodeHttpClient {
25152517
.push("validator")
25162518
.push("beacon_committee_subscriptions");
25172519

2518-
self.post(path, &subscriptions).await?;
2520+
self.post_with_timeout(
2521+
path,
2522+
&subscriptions,
2523+
self.timeouts.attestation_subscriptions,
2524+
)
2525+
.await?;
25192526

25202527
Ok(())
25212528
}

validator_client/src/beacon_node_fallback.rs

Lines changed: 48 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,12 @@ impl<T: Debug> fmt::Display for Errors<T> {
134134
}
135135
}
136136

137+
impl<T> Errors<T> {
138+
pub fn num_errors(&self) -> usize {
139+
self.0.len()
140+
}
141+
}
142+
137143
/// Reasons why a candidate might not be ready.
138144
#[derive(Debug, Clone, Copy)]
139145
pub enum CandidateError {
@@ -599,46 +605,41 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
599605
F: Fn(&'a BeaconNodeHttpClient) -> R,
600606
R: Future<Output = Result<O, Err>>,
601607
{
602-
let mut results = vec![];
603608
let mut to_retry = vec![];
604609
let mut retry_unsynced = vec![];
605610

606611
// Run `func` using a `candidate`, returning the value or capturing errors.
607-
//
608-
// We use a macro instead of a closure here since it is not trivial to move `func` into a
609-
// closure.
610-
macro_rules! try_func {
611-
($candidate: ident) => {{
612-
inc_counter_vec(&ENDPOINT_REQUESTS, &[$candidate.beacon_node.as_ref()]);
612+
let run_on_candidate = |candidate: &'a CandidateBeaconNode<E>| async {
613+
inc_counter_vec(&ENDPOINT_REQUESTS, &[candidate.beacon_node.as_ref()]);
613614

614-
// There exists a race condition where `func` may be called when the candidate is
615-
// actually not ready. We deem this an acceptable inefficiency.
616-
match func(&$candidate.beacon_node).await {
617-
Ok(val) => results.push(Ok(val)),
618-
Err(e) => {
619-
// If we have an error on this function, make the client as not-ready.
620-
//
621-
// There exists a race condition where the candidate may have been marked
622-
// as ready between the `func` call and now. We deem this an acceptable
623-
// inefficiency.
624-
if matches!(offline_on_failure, OfflineOnFailure::Yes) {
625-
$candidate.set_offline().await;
626-
}
627-
results.push(Err((
628-
$candidate.beacon_node.to_string(),
629-
Error::RequestFailed(e),
630-
)));
631-
inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]);
615+
// There exists a race condition where `func` may be called when the candidate is
616+
// actually not ready. We deem this an acceptable inefficiency.
617+
match func(&candidate.beacon_node).await {
618+
Ok(val) => Ok(val),
619+
Err(e) => {
620+
// If we have an error on this function, mark the client as not-ready.
621+
//
622+
// There exists a race condition where the candidate may have been marked
623+
// as ready between the `func` call and now. We deem this an acceptable
624+
// inefficiency.
625+
if matches!(offline_on_failure, OfflineOnFailure::Yes) {
626+
candidate.set_offline().await;
632627
}
628+
inc_counter_vec(&ENDPOINT_ERRORS, &[candidate.beacon_node.as_ref()]);
629+
Err((candidate.beacon_node.to_string(), Error::RequestFailed(e)))
633630
}
634-
}};
635-
}
631+
}
632+
};
636633

637634
// First pass: try `func` on all synced and ready candidates.
638635
//
639636
// This ensures that we always choose a synced node if it is available.
637+
let mut first_batch_futures = vec![];
640638
for candidate in &self.candidates {
641639
match candidate.status(RequireSynced::Yes).await {
640+
Ok(_) => {
641+
first_batch_futures.push(run_on_candidate(candidate));
642+
}
642643
Err(CandidateError::NotSynced) if require_synced == false => {
643644
// This client is unsynced we will try it after trying all synced clients
644645
retry_unsynced.push(candidate);
@@ -647,22 +648,24 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
647648
// This client was not ready on the first pass, we might try it again later.
648649
to_retry.push(candidate);
649650
}
650-
Ok(_) => try_func!(candidate),
651651
}
652652
}
653+
let first_batch_results = futures::future::join_all(first_batch_futures).await;
653654

654655
// Second pass: try `func` on ready unsynced candidates. This only runs if we permit
655656
// unsynced candidates.
656657
//
657658
// Due to async race-conditions, it is possible that we will send a request to a candidate
658659
// that has been set to an offline/unready status. This is acceptable.
659-
if require_synced == false {
660-
for candidate in retry_unsynced {
661-
try_func!(candidate);
662-
}
663-
}
660+
let second_batch_results = if require_synced == false {
661+
futures::future::join_all(retry_unsynced.into_iter().map(run_on_candidate)).await
662+
} else {
663+
vec![]
664+
};
664665

665666
// Third pass: try again, attempting to make non-ready clients become ready.
667+
let mut third_batch_futures = vec![];
668+
let mut third_batch_results = vec![];
666669
for candidate in to_retry {
667670
// If the candidate hasn't luckily transferred into the correct state in the meantime,
668671
// force an update of the state.
@@ -676,16 +679,21 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
676679
};
677680

678681
match new_status {
679-
Ok(()) => try_func!(candidate),
680-
Err(CandidateError::NotSynced) if require_synced == false => try_func!(candidate),
681-
Err(e) => {
682-
results.push(Err((
683-
candidate.beacon_node.to_string(),
684-
Error::Unavailable(e),
685-
)));
682+
Ok(()) => third_batch_futures.push(run_on_candidate(candidate)),
683+
Err(CandidateError::NotSynced) if require_synced == false => {
684+
third_batch_futures.push(run_on_candidate(candidate))
686685
}
686+
Err(e) => third_batch_results.push(Err((
687+
candidate.beacon_node.to_string(),
688+
Error::Unavailable(e),
689+
))),
687690
}
688691
}
692+
third_batch_results.extend(futures::future::join_all(third_batch_futures).await);
693+
694+
let mut results = first_batch_results;
695+
results.extend(second_batch_results);
696+
results.extend(third_batch_results);
689697

690698
let errors: Vec<_> = results.into_iter().filter_map(|res| res.err()).collect();
691699

validator_client/src/duties_service.rs

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ const _: () = assert!({
8686
/// This number is based upon `MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD` value in the
8787
/// `beacon_node::network::attestation_service` crate. It is not imported directly to avoid
8888
/// bringing in the entire crate.
89-
const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > 2);
89+
const MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD: u64 = 2;
90+
const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD);
9091

9192
// The info in the enum variants is displayed in logging, clippy thinks it's dead code.
9293
#[derive(Debug)]
@@ -121,6 +122,8 @@ pub struct DutyAndProof {
121122
pub struct SubscriptionSlots {
122123
/// Pairs of `(slot, already_sent)` in slot-descending order.
123124
slots: Vec<(Slot, AtomicBool)>,
125+
/// The slot of the duty itself.
126+
duty_slot: Slot,
124127
}
125128

126129
/// Create a selection proof for `duty`.
@@ -172,18 +175,20 @@ impl SubscriptionSlots {
172175
.filter(|scheduled_slot| *scheduled_slot > current_slot)
173176
.map(|scheduled_slot| (scheduled_slot, AtomicBool::new(false)))
174177
.collect();
175-
Arc::new(Self { slots })
178+
Arc::new(Self { slots, duty_slot })
176179
}
177180

178181
/// Return `true` if we should send a subscription at `slot`.
179182
fn should_send_subscription_at(&self, slot: Slot) -> bool {
180183
// Iterate slots from smallest to largest looking for one that hasn't been completed yet.
181-
self.slots
182-
.iter()
183-
.rev()
184-
.any(|(scheduled_slot, already_sent)| {
185-
slot >= *scheduled_slot && !already_sent.load(Ordering::Relaxed)
186-
})
184+
slot + MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD <= self.duty_slot
185+
&& self
186+
.slots
187+
.iter()
188+
.rev()
189+
.any(|(scheduled_slot, already_sent)| {
190+
slot >= *scheduled_slot && !already_sent.load(Ordering::Relaxed)
191+
})
187192
}
188193

189194
/// Update our record of subscribed slots to account for successful subscription at `slot`.
@@ -737,7 +742,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
737742
// If there are any subscriptions, push them out to beacon nodes
738743
if !subscriptions.is_empty() {
739744
let subscriptions_ref = &subscriptions;
740-
if let Err(e) = duties_service
745+
let subscription_result = duties_service
741746
.beacon_nodes
742747
.request(
743748
RequireSynced::No,
@@ -753,15 +758,8 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
753758
.await
754759
},
755760
)
756-
.await
757-
{
758-
error!(
759-
log,
760-
"Failed to subscribe validators";
761-
"error" => %e
762-
)
763-
} else {
764-
// Record that subscriptions were successfully sent.
761+
.await;
762+
if subscription_result.as_ref().is_ok() {
765763
debug!(
766764
log,
767765
"Broadcast attestation subscriptions";
@@ -770,6 +768,25 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
770768
for subscription_slots in subscription_slots_to_confirm {
771769
subscription_slots.record_successful_subscription_at(current_slot);
772770
}
771+
} else if let Err(e) = subscription_result {
772+
if e.num_errors() < duties_service.beacon_nodes.num_total() {
773+
warn!(
774+
log,
775+
"Some subscriptions failed";
776+
"error" => %e,
777+
);
778+
// If subscriptions were sent to at least one node, regard that as a success.
779+
// There is some redundancy built into the subscription schedule to handle failures.
780+
for subscription_slots in subscription_slots_to_confirm {
781+
subscription_slots.record_successful_subscription_at(current_slot);
782+
}
783+
} else {
784+
error!(
785+
log,
786+
"All subscriptions failed";
787+
"error" => %e
788+
);
789+
}
773790
}
774791
}
775792

validator_client/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ const WAITING_FOR_GENESIS_POLL_TIME: Duration = Duration::from_secs(12);
7575
/// This can help ensure that proper endpoint fallback occurs.
7676
const HTTP_ATTESTATION_TIMEOUT_QUOTIENT: u32 = 4;
7777
const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
78+
const HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT: u32 = 24;
7879
const HTTP_LIVENESS_TIMEOUT_QUOTIENT: u32 = 4;
7980
const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2;
8081
const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
@@ -323,6 +324,8 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
323324
Timeouts {
324325
attestation: slot_duration / HTTP_ATTESTATION_TIMEOUT_QUOTIENT,
325326
attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT,
327+
attestation_subscriptions: slot_duration
328+
/ HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT,
326329
liveness: slot_duration / HTTP_LIVENESS_TIMEOUT_QUOTIENT,
327330
proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT,
328331
proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT,

0 commit comments

Comments
 (0)