Skip to content

Commit f128ac9

Browse files
feat: [MR-354] Load & validate protos asynchronously (dfinity#2594)
As part of asynchronous checkpointing, we move reading protos to the Tip thread. Currently we take the ReplicatedState from execution, serialize it into a checkpoint, load the checkpoint back in order to ensure it is loadable and equivalent to the original state, and then use the loaded checkpoint to switch the file descriptors in the original state to point to the new files. The protos are only used for validation and at replica init. With this change, loading and validation happen off the critical path. --------- Co-authored-by: Stefan Schneider <[email protected]>
1 parent c22d478 commit f128ac9

File tree

11 files changed

+396
-175
lines changed

11 files changed

+396
-175
lines changed

rs/replicated_state/src/canister_state/queues.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ pub struct CanisterQueues {
152152

153153
/// Slot and memory reservation stats. Message count and size stats are
154154
/// maintained separately in the `MessagePool`.
155+
#[validate_eq(CompareWithValidateEq)]
155156
queue_stats: QueueStats,
156157

157158
/// Round-robin schedule for `pop_input()` across ingress, local subnet senders
@@ -1896,7 +1897,7 @@ impl TryFrom<(pb_queues::CanisterQueues, &dyn CheckpointLoadingMetrics)> for Can
18961897
///
18971898
/// Stats for the enqueued messages themselves (counts and sizes by kind,
18981899
/// context and class) are tracked separately in `message_pool::MessageStats`.
1899-
#[derive(Clone, Eq, PartialEq, Debug, Default)]
1900+
#[derive(Clone, Eq, PartialEq, Debug, Default, ValidateEq)]
19001901
struct QueueStats {
19011902
/// Count of guaranteed response memory reservations across input and output
19021903
/// queues. This is equivalent to the number of outstanding (inbound or outbound)

rs/replicated_state/src/page_map/storage.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,8 @@ pub(crate) struct StorageImpl {
161161
overlays: Vec<OverlayFile>,
162162
}
163163

164-
pub fn verify(storage_layout: &dyn StorageLayout) -> Result<(), PersistenceError> {
164+
/// Validate that the overlay files are loadable.
165+
pub fn validate(storage_layout: &dyn StorageLayout) -> Result<(), PersistenceError> {
165166
StorageImpl::load(storage_layout)?;
166167
Ok(())
167168
}

rs/replicated_state/src/page_map/storage/tests.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::{
99

1010
use crate::page_map::{
1111
storage::{
12-
verify, Checkpoint, FileIndex, MergeCandidate, MergeDestination, OverlayFile,
12+
validate, Checkpoint, FileIndex, MergeCandidate, MergeDestination, OverlayFile,
1313
PageIndexRange, Shard, Storage, StorageLayout, CURRENT_OVERLAY_VERSION,
1414
PAGE_INDEX_RANGE_NUM_BYTES, SIZE_NUM_BYTES, VERSION_NUM_BYTES,
1515
},
@@ -1388,7 +1388,7 @@ fn overlapping_shards_is_an_error() {
13881388
tempdir.path().join("000000_010_vmemory_0.overlay"),
13891389
]
13901390
);
1391-
assert!(verify(&ShardedTestStorageLayout {
1391+
assert!(validate(&ShardedTestStorageLayout {
13921392
dir_path: tempdir.path().to_path_buf(),
13931393
base: tempdir.path().join("vmemory_0.bin"),
13941394
overlay_suffix: "vmemory_0.overlay".to_owned(),
@@ -1399,7 +1399,7 @@ fn overlapping_shards_is_an_error() {
13991399
tempdir.path().join("000000_011_vmemory_0.overlay"),
14001400
)
14011401
.unwrap();
1402-
assert!(verify(&ShardedTestStorageLayout {
1402+
assert!(validate(&ShardedTestStorageLayout {
14031403
dir_path: tempdir.path().to_path_buf(),
14041404
base: tempdir.path().join("vmemory_0.bin"),
14051405
overlay_suffix: "vmemory_0.overlay".to_owned(),
@@ -1425,7 +1425,7 @@ fn returns_an_error_if_file_size_is_not_a_multiple_of_page_size() {
14251425
.write_all(&vec![1; PAGE_SIZE / 2])
14261426
.unwrap();
14271427

1428-
match verify(&base_only_storage_layout(heap_file.to_path_buf())) {
1428+
match validate(&base_only_storage_layout(heap_file.to_path_buf())) {
14291429
Err(err) => assert!(
14301430
err.is_invalid_heap_file(),
14311431
"Expected invalid heap file error, got {:?}",

rs/replicated_state/src/replicated_state.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,34 @@ impl ReplicatedState {
447447
}
448448
}
449449

450+
/// References into _all_ fields.
451+
pub fn component_refs(
452+
&self,
453+
) -> (
454+
&BTreeMap<CanisterId, CanisterState>,
455+
&SystemMetadata,
456+
&CanisterQueues,
457+
&Vec<ConsensusResponse>,
458+
&RawQueryStats,
459+
&CanisterSnapshots,
460+
) {
461+
let ReplicatedState {
462+
ref canister_states,
463+
ref metadata,
464+
ref subnet_queues,
465+
ref consensus_queue,
466+
ref epoch_query_stats,
467+
ref canister_snapshots,
468+
} = self;
469+
(
470+
canister_states,
471+
metadata,
472+
subnet_queues,
473+
consensus_queue,
474+
epoch_query_stats,
475+
canister_snapshots,
476+
)
477+
}
450478
pub fn canister_state(&self, canister_id: &CanisterId) -> Option<&CanisterState> {
451479
self.canister_states.get(canister_id)
452480
}

0 commit comments

Comments
 (0)