-
Notifications
You must be signed in to change notification settings - Fork 898
[Merged by Bors] - Reduce bandwidth over the VC<>BN API using dependant roots #4170
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
9e47bed
d905e8b
9cc8783
fa3c67a
a107882
1f56417
c120108
16797e0
7d6c042
cfaab2d
ab22458
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,12 +16,15 @@ use crate::{ | |
validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore}, | ||
}; | ||
use environment::RuntimeContext; | ||
use eth2::types::{AttesterData, BeaconCommitteeSubscription, ProposerData, StateId, ValidatorId}; | ||
use eth2::types::{ | ||
AttesterData, BeaconCommitteeSubscription, DutiesResponse, ProposerData, StateId, ValidatorId, | ||
}; | ||
use futures::{stream, StreamExt}; | ||
use parking_lot::RwLock; | ||
use safe_arith::ArithError; | ||
use slog::{debug, error, info, warn, Logger}; | ||
use slot_clock::SlotClock; | ||
use std::cmp::min; | ||
use std::collections::{hash_map, BTreeMap, HashMap, HashSet}; | ||
use std::sync::Arc; | ||
use std::time::Duration; | ||
|
@@ -54,6 +57,11 @@ const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2; | |
/// flag in the cli to enable collection of per validator metrics. | ||
const VALIDATOR_METRICS_MIN_COUNT: usize = 64; | ||
|
||
/// The number of validators to request duty information for in the initial request. | ||
/// The initial request is used to determine if further requests are required, so that it | ||
/// reduces the amount of data that needs to be transferred. | ||
const INITIAL_DUTIES_QUERY_SIZE: usize = 1; | ||
|
||
#[derive(Debug)] | ||
pub enum Error { | ||
UnableToReadSlotClock, | ||
|
@@ -674,84 +682,69 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>( | |
&[metrics::UPDATE_ATTESTERS_FETCH], | ||
); | ||
|
||
let response = duties_service | ||
.beacon_nodes | ||
.first_success( | ||
duties_service.require_synced, | ||
OfflineOnFailure::Yes, | ||
|beacon_node| async move { | ||
let _timer = metrics::start_timer_vec( | ||
&metrics::DUTIES_SERVICE_TIMES, | ||
&[metrics::ATTESTER_DUTIES_HTTP_POST], | ||
); | ||
beacon_node | ||
.post_validator_duties_attester(epoch, local_indices) | ||
.await | ||
}, | ||
) | ||
.await | ||
.map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))?; | ||
|
||
drop(fetch_timer); | ||
let _store_timer = metrics::start_timer_vec( | ||
&metrics::DUTIES_SERVICE_TIMES, | ||
&[metrics::UPDATE_ATTESTERS_STORE], | ||
); | ||
// Request duties for either `INITIAL_DUTIES_QUERY_SIZE` validators or the count of validators for which we | ||
// don't already know their duties for that epoch, whichever subset is larger. We use the `dependent_root` | ||
// in the response to determine whether validator duties need to be updated. This is to ensure that we don't | ||
// request for extra data unless necessary in order to save on network bandwidth. | ||
let uninitialized_validators = | ||
get_uninitialized_validators(duties_service, &epoch, local_pubkeys); | ||
let indices_to_request = if !uninitialized_validators.is_empty() { | ||
uninitialized_validators.as_slice() | ||
} else { | ||
&local_indices[0..min(INITIAL_DUTIES_QUERY_SIZE, local_indices.len())] | ||
}; | ||
|
||
let response = | ||
post_validator_duties_attester(duties_service, epoch, indices_to_request).await?; | ||
let dependent_root = response.dependent_root; | ||
|
||
// Filter any duties that are not relevant or already known. | ||
let new_duties = { | ||
// Find any validators which have conflicting (epoch, dependent_root) values or missing duties for the epoch. | ||
let validators_to_update: Vec<_> = { | ||
// Avoid holding the read-lock for any longer than required. | ||
let attesters = duties_service.attesters.read(); | ||
response | ||
local_pubkeys | ||
.iter() | ||
.filter(|pubkey| { | ||
attesters.get(pubkey).map_or(true, |duties| { | ||
duties | ||
.get(&epoch) | ||
.map_or(true, |(prior, _)| *prior != dependent_root) | ||
}) | ||
}) | ||
.collect::<Vec<_>>() | ||
}; | ||
|
||
if validators_to_update.is_empty() { | ||
// No validators have conflicting (epoch, dependent_root) values or missing duties for the epoch. | ||
return Ok(()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we need to make sure we run Since we've changed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice catch 🙏 , I've moved this to where you suggested! |
||
} | ||
|
||
// Filter out validators which have already been requested. | ||
let initial_duties = &response.data; | ||
let indices_to_request = validators_to_update | ||
.iter() | ||
.filter(|&&&pubkey| !initial_duties.iter().any(|duty| duty.pubkey == pubkey)) | ||
.filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey)) | ||
.collect::<Vec<_>>(); | ||
|
||
let new_duties = if !indices_to_request.is_empty() { | ||
post_validator_duties_attester(duties_service, epoch, indices_to_request.as_slice()) | ||
.await? | ||
.data | ||
.into_iter() | ||
.filter(|duty| { | ||
if duties_service.per_validator_metrics() { | ||
let validator_index = duty.validator_index; | ||
let duty_slot = duty.slot; | ||
if let Some(existing_slot_gauge) = | ||
get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()]) | ||
{ | ||
let existing_slot = Slot::new(existing_slot_gauge.get() as u64); | ||
let existing_epoch = existing_slot.epoch(E::slots_per_epoch()); | ||
|
||
// First condition ensures that we switch to the next epoch duty slot | ||
// once the current epoch duty slot passes. | ||
// Second condition is to ensure that next epoch duties don't override | ||
// current epoch duties. | ||
if existing_slot < current_slot | ||
|| (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch | ||
&& duty_slot > current_slot | ||
&& duty_slot != existing_slot) | ||
{ | ||
existing_slot_gauge.set(duty_slot.as_u64() as i64); | ||
} | ||
} else { | ||
set_int_gauge( | ||
&ATTESTATION_DUTY, | ||
&[&validator_index.to_string()], | ||
duty_slot.as_u64() as i64, | ||
); | ||
} | ||
} | ||
|
||
local_pubkeys.contains(&duty.pubkey) && { | ||
// Only update the duties if either is true: | ||
// | ||
// - There were no known duties for this epoch. | ||
// - The dependent root has changed, signalling a re-org. | ||
attesters.get(&duty.pubkey).map_or(true, |duties| { | ||
duties | ||
.get(&epoch) | ||
.map_or(true, |(prior, _)| *prior != dependent_root) | ||
}) | ||
} | ||
}) | ||
.chain(response.data) | ||
.collect::<Vec<_>>() | ||
} else { | ||
response.data | ||
}; | ||
|
||
drop(fetch_timer); | ||
|
||
let _store_timer = metrics::start_timer_vec( | ||
&metrics::DUTIES_SERVICE_TIMES, | ||
&[metrics::UPDATE_ATTESTERS_STORE], | ||
); | ||
|
||
debug!( | ||
log, | ||
"Downloaded attester duties"; | ||
|
@@ -787,6 +780,10 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>( | |
} | ||
drop(attesters); | ||
|
||
if duties_service.per_validator_metrics() { | ||
update_per_validator_duty_metrics::<T, E>(duties_service, epoch, current_slot); | ||
} | ||
|
||
// Spawn the background task to compute selection proofs. | ||
let subservice = duties_service.clone(); | ||
duties_service.context.executor.spawn( | ||
|
@@ -799,6 +796,87 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>( | |
Ok(()) | ||
} | ||
|
||
/// Get a filtered list of local validators for which we don't already know their duties for that epoch | ||
fn get_uninitialized_validators<T: SlotClock + 'static, E: EthSpec>( | ||
duties_service: &Arc<DutiesService<T, E>>, | ||
epoch: &Epoch, | ||
local_pubkeys: &HashSet<PublicKeyBytes>, | ||
) -> Vec<u64> { | ||
let attesters = duties_service.attesters.read(); | ||
local_pubkeys | ||
.iter() | ||
.filter(|pubkey| { | ||
attesters | ||
.get(pubkey) | ||
.map_or(true, |duties| !duties.contains_key(epoch)) | ||
}) | ||
.filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey)) | ||
.collect::<Vec<_>>() | ||
} | ||
|
||
fn update_per_validator_duty_metrics<T: SlotClock + 'static, E: EthSpec>( | ||
duties_service: &Arc<DutiesService<T, E>>, | ||
epoch: Epoch, | ||
current_slot: Slot, | ||
) { | ||
let attesters = duties_service.attesters.read(); | ||
attesters.values().for_each(|attester_duties_by_epoch| { | ||
if let Some((_, duty_and_proof)) = attester_duties_by_epoch.get(&epoch) { | ||
let duty = &duty_and_proof.duty; | ||
let validator_index = duty.validator_index; | ||
let duty_slot = duty.slot; | ||
if let Some(existing_slot_gauge) = | ||
get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()]) | ||
{ | ||
let existing_slot = Slot::new(existing_slot_gauge.get() as u64); | ||
let existing_epoch = existing_slot.epoch(E::slots_per_epoch()); | ||
|
||
// First condition ensures that we switch to the next epoch duty slot | ||
// once the current epoch duty slot passes. | ||
// Second condition is to ensure that next epoch duties don't override | ||
// current epoch duties. | ||
if existing_slot < current_slot | ||
|| (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch | ||
&& duty_slot > current_slot | ||
&& duty_slot != existing_slot) | ||
{ | ||
existing_slot_gauge.set(duty_slot.as_u64() as i64); | ||
} | ||
} else { | ||
set_int_gauge( | ||
&ATTESTATION_DUTY, | ||
&[&validator_index.to_string()], | ||
duty_slot.as_u64() as i64, | ||
); | ||
} | ||
} | ||
}); | ||
} | ||
|
||
async fn post_validator_duties_attester<T: SlotClock + 'static, E: EthSpec>( | ||
duties_service: &Arc<DutiesService<T, E>>, | ||
epoch: Epoch, | ||
validator_indices: &[u64], | ||
) -> Result<DutiesResponse<Vec<AttesterData>>, Error> { | ||
duties_service | ||
.beacon_nodes | ||
.first_success( | ||
duties_service.require_synced, | ||
OfflineOnFailure::Yes, | ||
|beacon_node| async move { | ||
let _timer = metrics::start_timer_vec( | ||
&metrics::DUTIES_SERVICE_TIMES, | ||
&[metrics::ATTESTER_DUTIES_HTTP_POST], | ||
); | ||
beacon_node | ||
.post_validator_duties_attester(epoch, validator_indices) | ||
.await | ||
}, | ||
) | ||
.await | ||
.map_err(|e| Error::FailedToDownloadAttesters(e.to_string())) | ||
} | ||
|
||
/// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map. | ||
/// | ||
/// Duties are computed in batches each slot. If a re-org is detected then the process will | ||
|
Uh oh!
There was an error while loading. Please reload this page.