Skip to content

Commit 6d3c488

Browse files
authored
fix(consensus): fix the hashes-in-blocks feature implementation (dfinity#3529)
# Problem Currently, with the feature flag enabled, when a node sends a block proposal to their peers it first removes all the ingress messages from the block's payload, leaving behind only the `IngressMessageId`s which identify the ingress messages. When a node receives such a "stripped" block proposal it _reconstructs_ the block by retrieving the referenced ingress messages from: 1. either their local ingress pools if there exists an ingress message with the given `IngressMessageId` there, 2. or by requesting the missing ingress messages from peers who are advertising the block. The problem with the approach is that the `IngressMessageId`s do _not_ uniquely identify the the ingress messages and two ingresses with the same content _but_ with different signatures have the same id. We thus could end up in a situation where peers reconstruct blocks different than what the block maker created. ## Note The feature is disabled on the Mainnet so no subnet is currently affected by the issue # Proposal To address this problem we introduce a new type `SignedIngressId` (which contains both `IngressMessageId` and the hash of a original ingress message bytes) which uniquely identifies a `SignedIngress`, which is used only within the the block proposal assembler module. Everything else remains basically the same but we add several checks to make sure we put the correct ingress message in the block: 1. when retrieving ingress messages from ingress pools by `IngressMessageId`s we double check that the `SignedIngressId` of the ingress message matches what we are expecting; 2. same for when we get an ingress message from a peer. # Alternatives considered Instead of introducing a new type, we could modify the existing `IngressMessageId` so that it uniquely identifies ingress messages. disadvantages over the proposed solution: 1. without any changes to the ingress pool we would end up with duplicates of the same ingress messages (but different signatures) in the pool & which would be further broadcasted to the other peers 2. the duplication detection in the ingress selector would have to be modified 3. computing `IngressMessageId` would be more expensive and we currently do it quite often 4. the change would affect multiple more components advantages over the proposed solution: 1. in some corner-case situations in the proposed solution a node might have to fetch more ingress message from their peers
1 parent 7f9687b commit 6d3c488

File tree

14 files changed

+439
-119
lines changed

14 files changed

+439
-119
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rs/p2p/artifact_downloader/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ DEPENDENCIES = [
2727
]
2828

2929
DEV_DEPENDENCIES = [
30+
"//rs/canister_client/sender",
3031
"//rs/p2p/test_utils",
3132
"//rs/test_utilities/consensus",
3233
"//rs/test_utilities/types",

rs/p2p/artifact_downloader/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ axum = { workspace = true }
1313
backoff = { workspace = true }
1414
bytes = { workspace = true }
1515
ic-base-types = { path = "../../types/base_types" }
16+
ic-canister-client-sender = { path = "../../canister_client/sender" }
1617
ic-consensus-manager = { path = "../consensus_manager" }
1718
ic-interfaces = { path = "../../interfaces" }
1819
ic-logger = { path = "../../monitoring/logger" }

rs/p2p/artifact_downloader/src/fetch_stripped_artifact/assembler.rs

Lines changed: 132 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,11 @@ use super::{
2626
download::download_ingress,
2727
metrics::{FetchStrippedConsensusArtifactMetrics, IngressMessageSource, IngressSenderMetrics},
2828
stripper::Strippable,
29-
types::stripped::{
30-
MaybeStrippedConsensusMessage, StrippedBlockProposal, StrippedConsensusMessageId,
29+
types::{
30+
stripped::{
31+
MaybeStrippedConsensusMessage, StrippedBlockProposal, StrippedConsensusMessageId,
32+
},
33+
SignedIngressId,
3134
},
3235
};
3336

@@ -169,12 +172,12 @@ impl ArtifactAssembler<ConsensusMessage, MaybeStrippedConsensusMessage>
169172
.start_timer();
170173
let mut assembler = BlockProposalAssembler::new(stripped_block_proposal);
171174

172-
let missing_ingress_ids = assembler.missing_ingress_messages();
175+
let stripped_ingress_ids = assembler.missing_ingress_messages();
173176
// For each stripped object in the message, try to fetch it either from the local pools
174177
// or from a random peer who is advertising it.
175-
for missing_ingress_id in missing_ingress_ids {
178+
for stripped_ingress_id in stripped_ingress_ids {
176179
join_set.spawn(get_or_fetch(
177-
missing_ingress_id,
180+
stripped_ingress_id,
178181
self.ingress_pool.clone(),
179182
self.transport.clone(),
180183
id.as_ref().clone(),
@@ -246,7 +249,7 @@ impl ArtifactAssembler<ConsensusMessage, MaybeStrippedConsensusMessage>
246249
/// Tries to get the missing object either from the pool(s) or from the peers who are advertising
247250
/// it.
248251
async fn get_or_fetch<P: Peers>(
249-
ingress_message_id: IngressMessageId,
252+
signed_ingress_id: SignedIngressId,
250253
ingress_pool: ValidatedPoolReaderRef<SignedIngress>,
251254
transport: Arc<dyn Transport>,
252255
// Id of the *full* artifact which should contain the missing data
@@ -257,13 +260,21 @@ async fn get_or_fetch<P: Peers>(
257260
peer_rx: P,
258261
) -> (SignedIngress, NodeId) {
259262
// First check if the ingress message exists in the Ingress Pool.
260-
if let Some(ingress_message) = ingress_pool.read().unwrap().get(&ingress_message_id) {
261-
return (ingress_message, node_id);
263+
if let Some(ingress_message) = ingress_pool
264+
.read()
265+
.unwrap()
266+
.get(&signed_ingress_id.ingress_message_id)
267+
{
268+
// Make sure that this is the correct ingress message. [`IngressMessageId`] does _not_
269+
// uniquely identify ingress messages, we thus need to perform an extra check.
270+
if SignedIngressId::from(&ingress_message) == signed_ingress_id {
271+
return (ingress_message, node_id);
272+
}
262273
}
263274

264275
download_ingress(
265276
transport,
266-
ingress_message_id,
277+
signed_ingress_id,
267278
full_consensus_message_id,
268279
&log,
269280
&metrics,
@@ -290,7 +301,7 @@ pub(crate) enum AssemblyError {
290301

291302
struct BlockProposalAssembler {
292303
stripped_block_proposal: StrippedBlockProposal,
293-
ingress_messages: Vec<(IngressMessageId, Option<SignedIngress>)>,
304+
ingress_messages: Vec<(SignedIngressId, Option<SignedIngress>)>,
294305
}
295306

296307
impl BlockProposalAssembler {
@@ -300,19 +311,19 @@ impl BlockProposalAssembler {
300311
.stripped_ingress_payload
301312
.ingress_messages
302313
.iter()
303-
.map(|ingress_message_id| (ingress_message_id.clone(), None))
314+
.map(|signed_ingress_id| (signed_ingress_id.clone(), None))
304315
.collect(),
305316
stripped_block_proposal,
306317
}
307318
}
308319

309-
/// Returns the list of [`IngressMessageId`]s which have been stripped from the block.
310-
pub(crate) fn missing_ingress_messages(&self) -> Vec<IngressMessageId> {
320+
/// Returns the list of ingress messages which have been stripped from the block.
321+
pub(crate) fn missing_ingress_messages(&self) -> Vec<SignedIngressId> {
311322
self.ingress_messages
312323
.iter()
313-
.filter_map(|(ingress_message_id, maybe_ingress)| {
324+
.filter_map(|(signed_ingress_id, maybe_ingress)| {
314325
if maybe_ingress.is_none() {
315-
Some(ingress_message_id)
326+
Some(signed_ingress_id)
316327
} else {
317328
None
318329
}
@@ -326,14 +337,14 @@ impl BlockProposalAssembler {
326337
&mut self,
327338
ingress_message: SignedIngress,
328339
) -> Result<(), InsertionError> {
329-
let ingress_message_id = IngressMessageId::from(&ingress_message);
340+
let signed_ingress_id = SignedIngressId::from(&ingress_message);
330341

331342
// We can have at most 1000 elements in the vector, so it should be reasonably fast to do a
332343
// linear scan here.
333344
let (_, ingress) = self
334345
.ingress_messages
335346
.iter_mut()
336-
.find(|(id, _maybe_ingress)| *id == ingress_message_id)
347+
.find(|(id, _maybe_ingress)| *id == signed_ingress_id)
337348
.ok_or(InsertionError::NotNeeded)?;
338349

339350
if ingress.is_some() {
@@ -356,7 +367,9 @@ impl BlockProposalAssembler {
356367
let ingresses = self
357368
.ingress_messages
358369
.into_iter()
359-
.map(|(id, message)| message.ok_or_else(|| AssemblyError::Missing(id)))
370+
.map(|(id, message)| {
371+
message.ok_or_else(|| AssemblyError::Missing(id.ingress_message_id))
372+
})
360373
.collect::<Result<Vec<_>, _>>()?;
361374
let reconstructed_ingress_payload = IngressPayload::from(ingresses);
362375

@@ -377,8 +390,18 @@ impl BlockProposalAssembler {
377390
mod tests {
378391
use crate::fetch_stripped_artifact::test_utils::{
379392
fake_block_proposal_with_ingresses, fake_ingress_message,
380-
fake_ingress_message_with_arg_size, fake_stripped_block_proposal_with_ingresses,
393+
fake_ingress_message_with_arg_size, fake_ingress_message_with_sig,
394+
fake_stripped_block_proposal_with_ingresses,
381395
};
396+
use crate::fetch_stripped_artifact::types::rpc::GetIngressMessageInBlockResponse;
397+
use bytes::Bytes;
398+
use ic_interfaces::p2p::consensus::BouncerValue;
399+
use ic_logger::no_op_logger;
400+
use ic_p2p_test_utils::mocks::MockBouncerFactory;
401+
use ic_p2p_test_utils::mocks::MockTransport;
402+
use ic_p2p_test_utils::mocks::MockValidatedPoolReader;
403+
use ic_protobuf::proxy::ProtoProxy;
404+
use ic_types_test_utils::ids::NODE_1;
382405

383406
use super::*;
384407

@@ -502,4 +525,94 @@ mod tests {
502525
Err(InsertionError::NotNeeded)
503526
);
504527
}
528+
529+
#[derive(Clone)]
530+
struct MockPeers(NodeId);
531+
532+
impl Peers for MockPeers {
533+
fn peers(&self) -> Vec<NodeId> {
534+
vec![self.0]
535+
}
536+
}
537+
538+
fn set_up_assembler_with_fake_dependencies(
539+
ingress_pool_message: Option<SignedIngress>,
540+
peers_message: Option<SignedIngress>,
541+
) -> FetchStrippedConsensusArtifact {
542+
let mut mock_transport = MockTransport::new();
543+
let mut ingress_pool = MockValidatedPoolReader::<SignedIngress>::default();
544+
545+
if let Some(ingress_message) = ingress_pool_message {
546+
ingress_pool.expect_get().return_const(ingress_message);
547+
}
548+
549+
if let Some(ingress_message) = peers_message {
550+
let fake_response = axum::response::Response::builder()
551+
.body(Bytes::from(
552+
pb::GetIngressMessageInBlockResponse::proxy_encode(
553+
GetIngressMessageInBlockResponse {
554+
serialized_ingress_message: ingress_message.binary().clone(),
555+
},
556+
),
557+
))
558+
.unwrap();
559+
560+
mock_transport
561+
.expect_rpc()
562+
.returning(move |_, _| (Ok(fake_response.clone())));
563+
}
564+
565+
let consensus_pool = MockValidatedPoolReader::<ConsensusMessage>::default();
566+
let mut mock_bouncer_factory = MockBouncerFactory::default();
567+
mock_bouncer_factory
568+
.expect_new_bouncer()
569+
.returning(|_| Box::new(|_| BouncerValue::Wants));
570+
571+
let f = FetchStrippedConsensusArtifact::new(
572+
no_op_logger(),
573+
tokio::runtime::Handle::current(),
574+
Arc::new(RwLock::new(consensus_pool)),
575+
Arc::new(RwLock::new(ingress_pool)),
576+
Arc::new(mock_bouncer_factory),
577+
MetricsRegistry::new(),
578+
NODE_1,
579+
)
580+
.0;
581+
582+
(f)(Arc::new(mock_transport))
583+
}
584+
585+
/// Tests whether the assembler uses the ingress message with the correct signature in the case
586+
/// when the local ingress pool contains an ingress message with the same content as the one in
587+
/// the stripped block proposal but with a different signature.
588+
#[tokio::test]
589+
async fn roundtrip_test_with_two_identical_ingress_messages_different_signatures() {
590+
let (ingress_1, _ingress_1_id) = fake_ingress_message_with_sig("fake_1", vec![1, 2, 3]);
591+
let (ingress_2, _ingress_2_id) = fake_ingress_message_with_sig("fake_1", vec![2, 3, 4]);
592+
assert_eq!(
593+
IngressMessageId::from(&ingress_1),
594+
IngressMessageId::from(&ingress_2)
595+
);
596+
let block_proposal = fake_block_proposal_with_ingresses(vec![ingress_2.clone()]);
597+
598+
let assembler = set_up_assembler_with_fake_dependencies(
599+
/*ingress_pool_message=*/ Some(ingress_1.clone()),
600+
/*consensus_pool_message=*/ Some(ingress_2.clone()),
601+
);
602+
let stripped_block_proposal =
603+
assembler.disassemble_message(ConsensusMessage::BlockProposal(block_proposal.clone()));
604+
let reassembled_block_proposal = assembler
605+
.assemble_message(
606+
stripped_block_proposal.id(),
607+
Some((stripped_block_proposal, NODE_1)),
608+
MockPeers(NODE_1),
609+
)
610+
.await
611+
.expect("should reassemble the message given the dependencies");
612+
613+
assert_eq!(
614+
reassembled_block_proposal,
615+
(ConsensusMessage::BlockProposal(block_proposal), NODE_1)
616+
);
617+
}
505618
}

0 commit comments

Comments
 (0)