Skip to content

Commit cf239fe

Browse files
committed
Reduce bandwidth over the VC<>BN API using dependant roots (#4170)
## Issue Addressed #4157 ## Proposed Changes See description in #4157. In diagram form: ![reduce-attestation-bandwidth](https://user-images.githubusercontent.com/742762/230277084-f97301c1-0c5d-4fb3-92f9-91f99e4dc7d4.png) Co-authored-by: Jimmy Chen <[email protected]>
1 parent b7b4549 commit cf239fe

File tree

1 file changed

+152
-79
lines changed

1 file changed

+152
-79
lines changed

validator_client/src/duties_service.rs

Lines changed: 152 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@ use crate::{
1616
validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore},
1717
};
1818
use environment::RuntimeContext;
19-
use eth2::types::{AttesterData, BeaconCommitteeSubscription, ProposerData, StateId, ValidatorId};
19+
use eth2::types::{
20+
AttesterData, BeaconCommitteeSubscription, DutiesResponse, ProposerData, StateId, ValidatorId,
21+
};
2022
use futures::{stream, StreamExt};
2123
use parking_lot::RwLock;
2224
use safe_arith::ArithError;
2325
use slog::{debug, error, info, warn, Logger};
2426
use slot_clock::SlotClock;
27+
use std::cmp::min;
2528
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
2629
use std::sync::Arc;
2730
use std::time::Duration;
@@ -54,6 +57,11 @@ const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2;
5457
/// flag in the cli to enable collection of per validator metrics.
5558
const VALIDATOR_METRICS_MIN_COUNT: usize = 64;
5659

60+
/// The number of validators to request duty information for in the initial request.
61+
/// The initial request is used to determine if further requests are required, so that it
62+
/// reduces the amount of data that needs to be transferred.
63+
const INITIAL_DUTIES_QUERY_SIZE: usize = 1;
64+
5765
#[derive(Debug)]
5866
pub enum Error {
5967
UnableToReadSlotClock,
@@ -531,7 +539,6 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
531539
current_epoch,
532540
&local_indices,
533541
&local_pubkeys,
534-
current_slot,
535542
)
536543
.await
537544
{
@@ -544,21 +551,18 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
544551
)
545552
}
546553

554+
update_per_validator_duty_metrics::<T, E>(duties_service, current_epoch, current_slot);
555+
547556
drop(current_epoch_timer);
548557
let next_epoch_timer = metrics::start_timer_vec(
549558
&metrics::DUTIES_SERVICE_TIMES,
550559
&[metrics::UPDATE_ATTESTERS_NEXT_EPOCH],
551560
);
552561

553562
// Download the duties and update the duties for the next epoch.
554-
if let Err(e) = poll_beacon_attesters_for_epoch(
555-
duties_service,
556-
next_epoch,
557-
&local_indices,
558-
&local_pubkeys,
559-
current_slot,
560-
)
561-
.await
563+
if let Err(e) =
564+
poll_beacon_attesters_for_epoch(duties_service, next_epoch, &local_indices, &local_pubkeys)
565+
.await
562566
{
563567
error!(
564568
log,
@@ -569,6 +573,8 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
569573
)
570574
}
571575

576+
update_per_validator_duty_metrics::<T, E>(duties_service, next_epoch, current_slot);
577+
572578
drop(next_epoch_timer);
573579
let subscriptions_timer =
574580
metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::SUBSCRIPTIONS]);
@@ -655,7 +661,6 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
655661
epoch: Epoch,
656662
local_indices: &[u64],
657663
local_pubkeys: &HashSet<PublicKeyBytes>,
658-
current_slot: Slot,
659664
) -> Result<(), Error> {
660665
let log = duties_service.context.log();
661666

@@ -674,84 +679,69 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
674679
&[metrics::UPDATE_ATTESTERS_FETCH],
675680
);
676681

677-
let response = duties_service
678-
.beacon_nodes
679-
.first_success(
680-
duties_service.require_synced,
681-
OfflineOnFailure::Yes,
682-
|beacon_node| async move {
683-
let _timer = metrics::start_timer_vec(
684-
&metrics::DUTIES_SERVICE_TIMES,
685-
&[metrics::ATTESTER_DUTIES_HTTP_POST],
686-
);
687-
beacon_node
688-
.post_validator_duties_attester(epoch, local_indices)
689-
.await
690-
},
691-
)
692-
.await
693-
.map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))?;
694-
695-
drop(fetch_timer);
696-
let _store_timer = metrics::start_timer_vec(
697-
&metrics::DUTIES_SERVICE_TIMES,
698-
&[metrics::UPDATE_ATTESTERS_STORE],
699-
);
682+
// Request duties for all uninitialized validators. If there isn't any, we will just request for
683+
// `INITIAL_DUTIES_QUERY_SIZE` validators. We use the `dependent_root` in the response to
684+
// determine whether validator duties need to be updated. This is to ensure that we don't
685+
// request for extra data unless necessary in order to save on network bandwidth.
686+
let uninitialized_validators =
687+
get_uninitialized_validators(duties_service, &epoch, local_pubkeys);
688+
let indices_to_request = if !uninitialized_validators.is_empty() {
689+
uninitialized_validators.as_slice()
690+
} else {
691+
&local_indices[0..min(INITIAL_DUTIES_QUERY_SIZE, local_indices.len())]
692+
};
700693

694+
let response =
695+
post_validator_duties_attester(duties_service, epoch, indices_to_request).await?;
701696
let dependent_root = response.dependent_root;
702697

703-
// Filter any duties that are not relevant or already known.
704-
let new_duties = {
698+
// Find any validators which have conflicting (epoch, dependent_root) values or missing duties for the epoch.
699+
let validators_to_update: Vec<_> = {
705700
// Avoid holding the read-lock for any longer than required.
706701
let attesters = duties_service.attesters.read();
707-
response
702+
local_pubkeys
703+
.iter()
704+
.filter(|pubkey| {
705+
attesters.get(pubkey).map_or(true, |duties| {
706+
duties
707+
.get(&epoch)
708+
.map_or(true, |(prior, _)| *prior != dependent_root)
709+
})
710+
})
711+
.collect::<Vec<_>>()
712+
};
713+
714+
if validators_to_update.is_empty() {
715+
// No validators have conflicting (epoch, dependent_root) values or missing duties for the epoch.
716+
return Ok(());
717+
}
718+
719+
// Filter out validators which have already been requested.
720+
let initial_duties = &response.data;
721+
let indices_to_request = validators_to_update
722+
.iter()
723+
.filter(|&&&pubkey| !initial_duties.iter().any(|duty| duty.pubkey == pubkey))
724+
.filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey))
725+
.collect::<Vec<_>>();
726+
727+
let new_duties = if !indices_to_request.is_empty() {
728+
post_validator_duties_attester(duties_service, epoch, indices_to_request.as_slice())
729+
.await?
708730
.data
709731
.into_iter()
710-
.filter(|duty| {
711-
if duties_service.per_validator_metrics() {
712-
let validator_index = duty.validator_index;
713-
let duty_slot = duty.slot;
714-
if let Some(existing_slot_gauge) =
715-
get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()])
716-
{
717-
let existing_slot = Slot::new(existing_slot_gauge.get() as u64);
718-
let existing_epoch = existing_slot.epoch(E::slots_per_epoch());
719-
720-
// First condition ensures that we switch to the next epoch duty slot
721-
// once the current epoch duty slot passes.
722-
// Second condition is to ensure that next epoch duties don't override
723-
// current epoch duties.
724-
if existing_slot < current_slot
725-
|| (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch
726-
&& duty_slot > current_slot
727-
&& duty_slot != existing_slot)
728-
{
729-
existing_slot_gauge.set(duty_slot.as_u64() as i64);
730-
}
731-
} else {
732-
set_int_gauge(
733-
&ATTESTATION_DUTY,
734-
&[&validator_index.to_string()],
735-
duty_slot.as_u64() as i64,
736-
);
737-
}
738-
}
739-
740-
local_pubkeys.contains(&duty.pubkey) && {
741-
// Only update the duties if either is true:
742-
//
743-
// - There were no known duties for this epoch.
744-
// - The dependent root has changed, signalling a re-org.
745-
attesters.get(&duty.pubkey).map_or(true, |duties| {
746-
duties
747-
.get(&epoch)
748-
.map_or(true, |(prior, _)| *prior != dependent_root)
749-
})
750-
}
751-
})
732+
.chain(response.data)
752733
.collect::<Vec<_>>()
734+
} else {
735+
response.data
753736
};
754737

738+
drop(fetch_timer);
739+
740+
let _store_timer = metrics::start_timer_vec(
741+
&metrics::DUTIES_SERVICE_TIMES,
742+
&[metrics::UPDATE_ATTESTERS_STORE],
743+
);
744+
755745
debug!(
756746
log,
757747
"Downloaded attester duties";
@@ -799,6 +789,89 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
799789
Ok(())
800790
}
801791

792+
/// Get a filtered list of local validators for which we don't already know their duties for that epoch
793+
fn get_uninitialized_validators<T: SlotClock + 'static, E: EthSpec>(
794+
duties_service: &Arc<DutiesService<T, E>>,
795+
epoch: &Epoch,
796+
local_pubkeys: &HashSet<PublicKeyBytes>,
797+
) -> Vec<u64> {
798+
let attesters = duties_service.attesters.read();
799+
local_pubkeys
800+
.iter()
801+
.filter(|pubkey| {
802+
attesters
803+
.get(pubkey)
804+
.map_or(true, |duties| !duties.contains_key(epoch))
805+
})
806+
.filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey))
807+
.collect::<Vec<_>>()
808+
}
809+
810+
fn update_per_validator_duty_metrics<T: SlotClock + 'static, E: EthSpec>(
811+
duties_service: &Arc<DutiesService<T, E>>,
812+
epoch: Epoch,
813+
current_slot: Slot,
814+
) {
815+
if duties_service.per_validator_metrics() {
816+
let attesters = duties_service.attesters.read();
817+
attesters.values().for_each(|attester_duties_by_epoch| {
818+
if let Some((_, duty_and_proof)) = attester_duties_by_epoch.get(&epoch) {
819+
let duty = &duty_and_proof.duty;
820+
let validator_index = duty.validator_index;
821+
let duty_slot = duty.slot;
822+
if let Some(existing_slot_gauge) =
823+
get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()])
824+
{
825+
let existing_slot = Slot::new(existing_slot_gauge.get() as u64);
826+
let existing_epoch = existing_slot.epoch(E::slots_per_epoch());
827+
828+
// First condition ensures that we switch to the next epoch duty slot
829+
// once the current epoch duty slot passes.
830+
// Second condition is to ensure that next epoch duties don't override
831+
// current epoch duties.
832+
if existing_slot < current_slot
833+
|| (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch
834+
&& duty_slot > current_slot
835+
&& duty_slot != existing_slot)
836+
{
837+
existing_slot_gauge.set(duty_slot.as_u64() as i64);
838+
}
839+
} else {
840+
set_int_gauge(
841+
&ATTESTATION_DUTY,
842+
&[&validator_index.to_string()],
843+
duty_slot.as_u64() as i64,
844+
);
845+
}
846+
}
847+
});
848+
}
849+
}
850+
851+
async fn post_validator_duties_attester<T: SlotClock + 'static, E: EthSpec>(
852+
duties_service: &Arc<DutiesService<T, E>>,
853+
epoch: Epoch,
854+
validator_indices: &[u64],
855+
) -> Result<DutiesResponse<Vec<AttesterData>>, Error> {
856+
duties_service
857+
.beacon_nodes
858+
.first_success(
859+
duties_service.require_synced,
860+
OfflineOnFailure::Yes,
861+
|beacon_node| async move {
862+
let _timer = metrics::start_timer_vec(
863+
&metrics::DUTIES_SERVICE_TIMES,
864+
&[metrics::ATTESTER_DUTIES_HTTP_POST],
865+
);
866+
beacon_node
867+
.post_validator_duties_attester(epoch, validator_indices)
868+
.await
869+
},
870+
)
871+
.await
872+
.map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))
873+
}
874+
802875
/// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map.
803876
///
804877
/// Duties are computed in batches each slot. If a re-org is detected then the process will

0 commit comments

Comments
 (0)