Skip to content

Commit 08f16b0

Browse files
committed
[fix] #2133: Rewrite topology to be closer the whitepaper.
Signed-off-by: Sam H. Smith <[email protected]>
1 parent 8eab7c5 commit 08f16b0

File tree

7 files changed

+152
-557
lines changed

7 files changed

+152
-557
lines changed

core/src/block.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ impl PendingBlock {
7979
height: u64,
8080
previous_block_hash: Option<HashOf<VersionedCommittedBlock>>,
8181
view_change_index: u64,
82+
committed_with_topology: Topology,
8283
) -> ChainedBlock {
8384
ChainedBlock {
8485
transactions: self.transactions,
@@ -91,13 +92,13 @@ impl PendingBlock {
9192
previous_block_hash,
9293
transactions_hash: None,
9394
rejected_transactions_hash: None,
94-
genesis_topology: None,
95+
committed_with_topology,
9596
},
9697
}
9798
}
9899

99100
/// Create a new blockchain with current block as a first block.
100-
pub fn chain_first_with_genesis_topology(self, genesis_topology: Topology) -> ChainedBlock {
101+
pub fn chain_first_with_topology(self, genesis_topology: Topology) -> ChainedBlock {
101102
ChainedBlock {
102103
transactions: self.transactions,
103104
event_recommendations: self.event_recommendations,
@@ -109,7 +110,7 @@ impl PendingBlock {
109110
previous_block_hash: None,
110111
transactions_hash: None,
111112
rejected_transactions_hash: None,
112-
genesis_topology: Some(genesis_topology),
113+
committed_with_topology: genesis_topology,
113114
},
114115
}
115116
}
@@ -127,7 +128,7 @@ impl PendingBlock {
127128
previous_block_hash: None,
128129
transactions_hash: None,
129130
rejected_transactions_hash: None,
130-
genesis_topology: None,
131+
committed_with_topology: Topology::new(Vec::new()),
131132
},
132133
}
133134
}
@@ -162,8 +163,8 @@ pub struct BlockHeader {
162163
pub transactions_hash: Option<HashOf<MerkleTree<VersionedSignedTransaction>>>,
163164
/// Hash of merkle tree root of the tree of rejected transactions' hashes.
164165
pub rejected_transactions_hash: Option<HashOf<MerkleTree<VersionedSignedTransaction>>>,
165-
/// Genesis topology
166-
pub genesis_topology: Option<Topology>,
166+
/// Network topology when the block was committed.
167+
pub committed_with_topology: Topology,
167168
}
168169

169170
impl BlockHeader {
@@ -388,7 +389,7 @@ impl SignedBlock {
388389
previous_block_hash: None,
389390
transactions_hash: None,
390391
rejected_transactions_hash: None,
391-
genesis_topology: None,
392+
committed_with_topology: Topology::new(Vec::new()),
392393
},
393394
rejected_transactions: Vec::new(),
394395
transactions: Vec::new(),

core/src/genesis.rs

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
clippy::arithmetic_side_effects
99
)]
1010

11-
use std::{collections::HashSet, fmt::Debug, fs::File, io::BufReader, ops::Deref, path::Path};
11+
use std::{fmt::Debug, fs::File, io::BufReader, ops::Deref, path::Path};
1212

1313
use derive_more::Deref;
1414
use eyre::{bail, eyre, Result, WrapErr};
@@ -25,7 +25,7 @@ use serde::{Deserialize, Serialize};
2525
use tokio::{time, time::Duration};
2626

2727
use crate::{
28-
sumeragi::network_topology::{GenesisBuilder as GenesisTopologyBuilder, Topology},
28+
sumeragi::network_topology::Topology,
2929
tx::VersionedAcceptedTransaction,
3030
IrohaNetwork,
3131
};
@@ -88,31 +88,14 @@ async fn try_get_online_topology(
8888
network_topology: &Topology,
8989
network: Addr<IrohaNetwork>,
9090
) -> Result<Topology> {
91-
let (online_peers, offline_peers) =
91+
let (online_peers, _offline_peers) =
9292
check_peers_status(this_peer_id, network_topology, network).await;
9393
let set_a_len = network_topology.min_votes_for_commit();
9494
if online_peers.len() < set_a_len {
9595
return Err(eyre!("Not enough online peers for consensus."));
9696
}
97-
let genesis_topology = if network_topology.sorted_peers().len() == 1 {
98-
network_topology.clone()
99-
} else {
100-
let set_a: HashSet<_> = online_peers[..set_a_len].iter().cloned().collect();
101-
let set_b: HashSet<_> = online_peers[set_a_len..]
102-
.iter()
103-
.cloned()
104-
.chain(offline_peers.into_iter())
105-
.collect();
106-
#[allow(clippy::expect_used)]
107-
GenesisTopologyBuilder::new()
108-
.with_leader(this_peer_id.clone())
109-
.with_set_a(set_a)
110-
.with_set_b(set_b)
111-
.build()
112-
.expect("Preconditions should be already checked.")
113-
};
11497
iroha_logger::info!("Waiting for active peers finished.");
115-
Ok(genesis_topology)
98+
Ok(network_topology.clone())
11699
}
117100

118101
/// Checks which [`Peer`]s are online and which are offline
@@ -131,7 +114,7 @@ async fn check_peers_status(
131114
iroha_logger::info!(peer_count = peers.len(), "Peers status");
132115

133116
let (online, offline): (SmallVec<_>, SmallVec<_>) = network_topology
134-
.sorted_peers()
117+
.sorted_peers
135118
.iter()
136119
.cloned()
137120
.partition(|id| peers.contains(&id.public_key) || this_peer_id.public_key == id.public_key);

core/src/smartcontracts/isi/query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ mod tests {
261261

262262
for height in 1u64..blocks {
263263
let block: VersionedCommittedBlock = PendingBlock::new(transactions.clone(), vec![])
264-
.chain(height, Some(curr_hash), 0)
264+
.chain(height, Some(curr_hash), 0, crate::sumeragi::network_topology::Topology::new(vec![]))
265265
.validate(
266266
&TransactionValidator::new(
267267
limits,

core/src/sumeragi/main_loop.rs

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ impl<F: FaultInjection> SumeragiWithFault<F> {
196196

197197
#[allow(clippy::needless_pass_by_value)]
198198
fn broadcast_packet(&self, msg: MessagePacket, topology: &Topology) {
199-
self.broadcast_packet_to(msg, topology.sorted_peers());
199+
self.broadcast_packet_to(msg, &topology.sorted_peers);
200200
}
201201

202202
fn gossip_transactions(&self, state: &State, view_change_proof_chain: &ProofChain) {
@@ -222,7 +222,7 @@ impl<F: FaultInjection> SumeragiWithFault<F> {
222222
/// Connect or disconnect peers according to the current network topology.
223223
fn connect_peers(&self, topology: &Topology) {
224224
let peers_expected = {
225-
let mut res = topology.sorted_peers().to_owned();
225+
let mut res = topology.sorted_peers.clone();
226226
res.retain(|id| id.address != self.peer_id.address);
227227
res.shuffle(&mut rand::thread_rng());
228228
res
@@ -286,7 +286,7 @@ impl<F: FaultInjection> SumeragiWithFault<F> {
286286
Ok(packet) => {
287287
if let Err(error) = view_change_proof_chain.merge(
288288
packet.view_change_proofs,
289-
current_topology.sorted_peers(),
289+
&current_topology.sorted_peers,
290290
current_topology.max_faults(),
291291
state.latest_block_hash,
292292
) {
@@ -368,11 +368,6 @@ impl<F: FaultInjection> SumeragiWithFault<F> {
368368
};
369369

370370
if block.header().is_genesis() {
371-
if let Some(topology) = block.header().genesis_topology.clone().take() {
372-
state.current_topology = topology;
373-
info!("Using genesis topology");
374-
}
375-
376371
commit_block(self, state, block);
377372
return Err(EarlyReturn::GenesisBlockReceivedAndCommitted);
378373
}
@@ -418,8 +413,12 @@ fn commit_block<F: FaultInjection>(
418413
%block_hash, "Committing block"
419414
);
420415

416+
*current_topology = committed_block.header().committed_with_topology.clone();
417+
current_topology.lift_up_peers(&committed_block.signatures().into_iter().map(|id| id.public_key.clone()).collect::<Vec<PublicKey>>());
418+
current_topology.rotate_a_set();
419+
current_topology.update_peer_list(&state.wsv.peers_ids().iter().map(|id| id.clone()).collect::<Vec<PeerId>>());
420+
421421
sumeragi.kura.store_block(committed_block);
422-
current_topology.recreate(&state.wsv, block_hash);
423422

424423
cache_transaction(state, sumeragi);
425424
}
@@ -458,8 +457,12 @@ fn replace_top_block<F: FaultInjection>(
458457
%block_hash, "Replacing top block"
459458
);
460459

460+
*current_topology = committed_block.header().committed_with_topology.clone();
461+
current_topology.lift_up_peers(&committed_block.signatures().into_iter().map(|id| id.public_key.clone()).collect::<Vec<PublicKey>>());
462+
current_topology.rotate_a_set();
463+
current_topology.update_peer_list(&state.wsv.peers_ids().iter().map(|id| id.clone()).collect::<Vec<PeerId>>());
464+
461465
sumeragi.kura.replace_top_block(committed_block);
462-
current_topology.recreate(&state.wsv, block_hash);
463466

464467
cache_transaction(state, sumeragi)
465468
}
@@ -491,7 +494,7 @@ fn suggest_view_change<F: FaultInjection>(
491494

492495
view_change_proof_chain
493496
.insert_proof(
494-
state.current_topology.sorted_peers(),
497+
&state.current_topology.sorted_peers,
495498
state.current_topology.max_faults(),
496499
state.latest_block_hash,
497500
suspect_proof,
@@ -511,7 +514,7 @@ fn prune_view_change_proofs_and_calculate_current_index(
511514
) -> u64 {
512515
view_change_proof_chain.prune(state.latest_block_hash);
513516
view_change_proof_chain.verify_with_state(
514-
state.current_topology.sorted_peers(),
517+
&state.current_topology.sorted_peers,
515518
state.current_topology.max_faults(),
516519
state.latest_block_hash,
517520
) as u64
@@ -622,6 +625,9 @@ fn handle_message<F: FaultInjection>(
622625
let voting_block_hash = voted_block.block.hash();
623626

624627
if hash == voting_block_hash.transmute() {
628+
// The manipulation of the topology relies upon all peers seeing the same signature set.
629+
// Therefore we must clear the signatures and accept what the proxy tail giveth.
630+
voted_block.block.signatures.clear();
625631
add_signatures::<true>(&mut voted_block, signatures.transmute());
626632

627633
match voted_block.block.commit(current_topology) {
@@ -743,6 +749,7 @@ fn process_message_independent<F: FaultInjection>(
743749
state.latest_block_height,
744750
state.latest_block_hash,
745751
current_view_change_index,
752+
state.current_topology.clone(),
746753
)
747754
.validate(&sumeragi.transaction_validator, &state.wsv)
748755
.sign(sumeragi.key_pair.clone())
@@ -836,17 +843,21 @@ fn reset_state(
836843
// a view change a new block is immediately created by the leader
837844
*round_start_time = Instant::now();
838845
was_commit_or_view_change = true;
846+
*old_view_change_index = 0;
839847
}
840848

841-
if current_view_change_index != *old_view_change_index {
842-
current_topology.rebuild_with_new_view_change_count(current_view_change_index);
849+
while *old_view_change_index < current_view_change_index {
850+
*old_view_change_index += 1;
851+
error!(addr=%peer_id.address, "Rotating the entire topology.");
852+
current_topology.rotate_all();
843853
was_commit_or_view_change = true;
854+
855+
println!("{:?}", current_topology);
844856
}
845857

846858
// Reset state for the next round.
847859
if was_commit_or_view_change {
848860
*old_latest_block_height = current_latest_block_height;
849-
*old_view_change_index = current_view_change_index;
850861

851862
*voting_block = None;
852863
voting_signatures.clear();
@@ -970,11 +981,11 @@ pub(crate) fn run<F: FaultInjection>(
970981

971982
if let Some(VotingBlock { block, .. }) = voting_block.as_ref() {
972983
// NOTE: Suspecting the tail node because it hasn't yet committed a block produced by leader
973-
warn!(%role, block=%block.hash(), "Block not committed in due time, requesting view change...");
984+
warn!(peer_public_key=?sumeragi.peer_id.public_key, %role, block=%block.hash(), "Block not committed in due time, requesting view change...");
974985
} else {
975986
// NOTE: Suspecting the leader node because it hasn't produced a block
976987
// If the current node has a transaction, the leader should have as well
977-
warn!(%role, "No block produced in due time, requesting view change...");
988+
warn!(peer_public_key=?sumeragi.peer_id.public_key, %role, "No block produced in due time, requesting view change...");
978989
}
979990

980991
suggest_view_change(
@@ -1059,6 +1070,16 @@ fn vote_for_block<F: FaultInjection>(
10591070
return None;
10601071
}
10611072

1073+
if block.header().committed_with_topology != state.current_topology
1074+
{
1075+
error!(
1076+
%addr, %role, block_topology=?block.header().committed_with_topology, my_topology=?state.current_topology, hash=%block.hash(),
1077+
"The block is rejected as because the topology field is incorrect."
1078+
);
1079+
1080+
return None;
1081+
}
1082+
10621083
let block = {
10631084
let span = span!(Level::TRACE, "block revalidation");
10641085
let _enter = span.enter();
@@ -1104,7 +1125,7 @@ fn sumeragi_init_commit_genesis<F: FaultInjection>(
11041125
"Genesis transaction set contains no valid transactions"
11051126
);
11061127
let block = PendingBlock::new(transactions, Vec::new())
1107-
.chain_first_with_genesis_topology(state.current_topology.clone());
1128+
.chain_first_with_topology(state.current_topology.clone());
11081129

11091130
{
11101131
info!(block_hash = %block.hash(), "Publishing genesis block.");

core/src/sumeragi/mod.rs

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -280,22 +280,15 @@ impl Sumeragi {
280280
let latest_block_hash = wsv.latest_block_hash();
281281
let previous_block_hash = wsv.previous_block_hash();
282282

283-
let current_topology = latest_block_hash.map_or_else(
284-
|| {
285-
assert!(!sumeragi.config.trusted_peers.peers.is_empty());
286-
Topology::builder()
287-
.with_peers(sumeragi.config.trusted_peers.peers.clone())
288-
.build(0)
289-
.expect("This builder must have been valid. This is a programmer error.")
290-
},
291-
|block_hash| {
292-
Topology::builder()
293-
.with_peers(wsv.peers().iter().map(|peer| peer.id().clone()))
294-
.at_block(block_hash)
295-
.build(0)
296-
.expect("Should be able to reconstruct topology from `wsv`")
297-
},
298-
);
283+
let current_topology = if latest_block_height == 0 {
284+
assert!(!sumeragi.config.trusted_peers.peers.is_empty());
285+
Topology::new(sumeragi.config.trusted_peers.peers.clone())
286+
} else {
287+
let block_ref = sumeragi.internal.kura.get_block_by_height(latest_block_height).expect("Sumeragi could not load block that was reported as present. Please check that the block storage was not disconnected.");
288+
let mut topology = block_ref.header().committed_with_topology.clone();
289+
topology.rotate_a_set();
290+
topology
291+
};
299292

300293
let sumeragi_state_machine_data = State {
301294
previous_block_hash,

0 commit comments

Comments
 (0)