Skip to content

Commit f333d45

Browse files
authored
Merge pull request #164 from alpenlabs/90-finalized-blocks
duty_executor: figure out which blocks are newly finalized
2 parents 350f4df + c76e73d commit f333d45

File tree

5 files changed

+226
-86
lines changed

5 files changed

+226
-86
lines changed

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/consensus-logic/src/duties.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ impl BlockSigningDuty {
5858
pub struct DutyTracker {
5959
next_id: u64,
6060
duties: Vec<DutyEntry>,
61+
finalized_block: Option<L2BlockId>,
6162
}
6263

6364
impl DutyTracker {
@@ -66,6 +67,7 @@ impl DutyTracker {
6667
Self {
6768
next_id: 1,
6869
duties: Vec::new(),
70+
finalized_block: None,
6971
}
7072
}
7173

@@ -78,6 +80,10 @@ impl DutyTracker {
7880
pub fn update(&mut self, update: &StateUpdate) -> usize {
7981
let mut kept_duties = Vec::new();
8082

83+
if update.latest_finalized_block.is_some() {
84+
self.set_finalized_block(update.latest_finalized_block);
85+
}
86+
8187
for d in self.duties.drain(..) {
8288
match d.duty.expiry() {
8389
Expiry::NextBlock => {
@@ -120,6 +126,14 @@ impl DutyTracker {
120126
}));
121127
}
122128

129+
pub fn set_finalized_block(&mut self, blkid: Option<L2BlockId>) {
130+
self.finalized_block = blkid;
131+
}
132+
133+
pub fn get_finalized_block(&self) -> Option<L2BlockId> {
134+
self.finalized_block
135+
}
136+
123137
/// Returns the slice of duties we're keeping around.
124138
pub fn duties(&self) -> &[DutyEntry] {
125139
&self.duties
@@ -162,6 +176,9 @@ pub struct StateUpdate {
162176

163177
/// Newly finalized blocks, must be sorted.
164178
newly_finalized_blocks: Vec<L2BlockId>,
179+
180+
/// Latest finalized block.
181+
latest_finalized_block: Option<L2BlockId>,
165182
}
166183

167184
impl StateUpdate {
@@ -170,11 +187,16 @@ impl StateUpdate {
170187
cur_timestamp: time::Instant,
171188
mut newly_finalized_blocks: Vec<L2BlockId>,
172189
) -> Self {
190+
// Extract latest finalized block before sorting
191+
let latest_finalized_block = newly_finalized_blocks.first().cloned();
192+
173193
newly_finalized_blocks.sort();
194+
174195
Self {
175196
last_block_slot,
176197
cur_timestamp,
177198
newly_finalized_blocks,
199+
latest_finalized_block,
178200
}
179201
}
180202

crates/consensus-logic/src/duty_executor.rs

Lines changed: 120 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,17 @@ pub fn duty_tracker_task<D: Database>(
5454
batch_queue: broadcast::Sender<DutyBatch>,
5555
ident: Identity,
5656
database: Arc<D>,
57-
) {
57+
) -> Result<(), Error> {
5858
let mut duties_tracker = duties::DutyTracker::new_empty();
5959

60+
let idx = database.client_state_provider().get_last_checkpoint_idx()?;
61+
let last_checkpoint_state = database.client_state_provider().get_state_checkpoint(idx)?;
62+
let last_finalized_blk = match last_checkpoint_state {
63+
Some(state) => state.sync().map(|sync| *sync.finalized_blkid()),
64+
None => None,
65+
};
66+
duties_tracker.set_finalized_block(last_finalized_blk);
67+
6068
loop {
6169
let update = match cupdate_rx.blocking_recv() {
6270
Ok(u) => u,
@@ -87,6 +95,8 @@ pub fn duty_tracker_task<D: Database>(
8795
}
8896

8997
info!("duty extractor task exiting");
98+
99+
Ok(())
90100
}
91101

92102
fn update_tracker<D: Database>(
@@ -111,9 +121,15 @@ fn update_tracker<D: Database>(
111121
let block_idx = block.header().blockidx();
112122
let ts = time::Instant::now(); // FIXME XXX use .timestamp()!!!
113123

114-
// TODO figure out which blocks were finalized
115-
let newly_finalized = Vec::new();
116-
let tracker_update = duties::StateUpdate::new(block_idx, ts, newly_finalized);
124+
// Figure out which blocks were finalized
125+
let new_finalized = state.sync().map(|sync| *sync.finalized_blkid());
126+
let newly_finalized_blocks: Vec<L2BlockId> = get_finalized_blocks(
127+
tracker.get_finalized_block(),
128+
l2prov.as_ref(),
129+
new_finalized,
130+
)?;
131+
132+
let tracker_update = duties::StateUpdate::new(block_idx, ts, newly_finalized_blocks);
117133
let n_evicted = tracker.update(&tracker_update);
118134
trace!(%n_evicted, "evicted old duties from new consensus state");
119135

@@ -123,6 +139,35 @@ fn update_tracker<D: Database>(
123139
Ok(())
124140
}
125141

142+
fn get_finalized_blocks(
143+
last_finalized_block: Option<L2BlockId>,
144+
l2prov: &impl L2DataProvider,
145+
finalized: Option<L2BlockId>,
146+
) -> Result<Vec<L2BlockId>, Error> {
147+
// Figure out which blocks were finalized
148+
let mut newly_finalized_blocks: Vec<L2BlockId> = Vec::new();
149+
let mut new_finalized = finalized;
150+
151+
while let Some(finalized) = new_finalized {
152+
// If the last finalized block is equal to the new finalized block,
153+
// it means that no new blocks are finalized
154+
if last_finalized_block == Some(finalized) {
155+
break;
156+
}
157+
158+
// else loop till we reach to the last finalized block or go all the way
159+
// as long as we get some block data
160+
match l2prov.get_block_data(finalized)? {
161+
Some(block) => new_finalized = Some(*block.header().parent()),
162+
None => break,
163+
}
164+
165+
newly_finalized_blocks.push(finalized);
166+
}
167+
168+
Ok(newly_finalized_blocks)
169+
}
170+
126171
pub fn duty_dispatch_task<
127172
D: Database + Sync + Send + 'static,
128173
E: ExecEngineCtl + Sync + Send + 'static,
@@ -377,3 +422,74 @@ fn poll_status_loop<E: ExecEngineCtl>(
377422

378423
Ok(None)
379424
}
425+
426+
#[cfg(test)]
427+
mod tests {
428+
use alpen_express_db::traits::{Database, L2DataStore};
429+
use alpen_express_state::header::L2Header;
430+
use alpen_test_utils::l2::gen_l2_chain;
431+
432+
use super::get_finalized_blocks;
433+
434+
#[test]
435+
fn test_get_finalized_blocks() {
436+
let db = alpen_test_utils::get_common_db();
437+
let chain = gen_l2_chain(None, 5);
438+
439+
for block in chain.clone() {
440+
db.as_ref()
441+
.l2_store()
442+
.as_ref()
443+
.put_block_data(block)
444+
.unwrap();
445+
}
446+
447+
let block_ids: Vec<_> = chain.iter().map(|b| b.header().get_blockid()).collect();
448+
449+
{
450+
let last_finalized_block = Some(block_ids[0]);
451+
let new_finalized = Some(block_ids[4]);
452+
let finalized_blocks = get_finalized_blocks(
453+
last_finalized_block,
454+
db.l2_provider().as_ref(),
455+
new_finalized,
456+
)
457+
.unwrap();
458+
459+
let expected_finalized_blocks: Vec<_> =
460+
block_ids[1..=4].iter().rev().cloned().collect();
461+
462+
assert_eq!(finalized_blocks, expected_finalized_blocks);
463+
}
464+
465+
{
466+
let last_finalized_block = None;
467+
let new_finalized = Some(block_ids[4]);
468+
let finalized_blocks = get_finalized_blocks(
469+
last_finalized_block,
470+
db.l2_provider().as_ref(),
471+
new_finalized,
472+
)
473+
.unwrap();
474+
475+
let expected_finalized_blocks: Vec<_> =
476+
block_ids[0..=4].iter().rev().cloned().collect();
477+
478+
assert_eq!(finalized_blocks, expected_finalized_blocks);
479+
}
480+
481+
{
482+
let last_finalized_block = None;
483+
let new_finalized = None;
484+
let finalized_blocks = get_finalized_blocks(
485+
last_finalized_block,
486+
db.l2_provider().as_ref(),
487+
new_finalized,
488+
)
489+
.unwrap();
490+
491+
let expected_finalized_blocks: Vec<_> = vec![];
492+
assert_eq!(finalized_blocks, expected_finalized_blocks);
493+
}
494+
}
495+
}

crates/consensus-logic/src/unfinalized_tracker.rs

Lines changed: 21 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -315,50 +315,11 @@ mod tests {
315315
use std::collections::HashSet;
316316

317317
use alpen_express_db::traits::{Database, L2DataProvider, L2DataStore};
318-
use alpen_express_state::{
319-
block::{L2Block, L2BlockAccessory, L2BlockBody, L2BlockBundle},
320-
header::{L2BlockHeader, L2Header, SignedL2BlockHeader},
321-
id::L2BlockId,
322-
};
323-
use alpen_test_utils::ArbitraryGenerator;
318+
use alpen_express_state::{header::L2Header, id::L2BlockId};
319+
use alpen_test_utils::l2::gen_l2_chain;
324320

325321
use crate::unfinalized_tracker;
326322

327-
fn get_genesis_block() -> L2BlockBundle {
328-
let arb = ArbitraryGenerator::new();
329-
let gen_header: SignedL2BlockHeader = arb.generate();
330-
let body: L2BlockBody = arb.generate();
331-
let accessory: L2BlockAccessory = arb.generate();
332-
333-
let empty_hash = L2BlockId::default();
334-
let header = L2BlockHeader::new(
335-
0,
336-
gen_header.timestamp(),
337-
empty_hash,
338-
&body,
339-
*gen_header.state_root(),
340-
);
341-
let signed_header = SignedL2BlockHeader::new(header, *gen_header.sig());
342-
L2BlockBundle::new(L2Block::new(signed_header, body), accessory)
343-
}
344-
345-
fn get_mock_block_with_parent(parent: &SignedL2BlockHeader) -> L2BlockBundle {
346-
let arb = ArbitraryGenerator::new();
347-
let gen_header: SignedL2BlockHeader = arb.generate();
348-
let body: L2BlockBody = arb.generate();
349-
let accessory: L2BlockAccessory = arb.generate();
350-
351-
let header = L2BlockHeader::new(
352-
parent.blockidx() + 1,
353-
gen_header.timestamp(),
354-
parent.get_blockid(),
355-
&body,
356-
*gen_header.state_root(),
357-
);
358-
let signed_header = SignedL2BlockHeader::new(header, *gen_header.sig());
359-
L2BlockBundle::new(L2Block::new(signed_header, body), accessory)
360-
}
361-
362323
fn setup_test_chain(l2_prov: &impl L2DataStore) -> [L2BlockId; 7] {
363324
// Chain A: g -> a1 -> a2 -> a3
364325
// Chain B: g -> a1 -> b2 -> b3
@@ -375,43 +336,27 @@ mod tests {
375336
// g
376337
// |
377338

378-
let genesis = get_genesis_block();
379-
let genesis_header = genesis.header().clone();
380-
381-
let block_a1 = get_mock_block_with_parent(genesis.header());
382-
let block_a1_header = block_a1.header().clone();
383-
384-
let block_c1 = get_mock_block_with_parent(genesis.header());
385-
let block_c1_header = block_c1.header().clone();
386-
387-
let block_a2 = get_mock_block_with_parent(block_a1.header());
388-
let block_a2_header = block_a2.header().clone();
389-
390-
let block_b2 = get_mock_block_with_parent(block_a1.header());
391-
let block_b2_header = block_b2.header().clone();
392-
393-
let block_a3 = get_mock_block_with_parent(block_a2.header());
394-
let block_a3_header = block_a3.header().clone();
395-
396-
let block_b3 = get_mock_block_with_parent(block_b2.header());
397-
let block_b3_header = block_b3.header().clone();
398-
399-
l2_prov.put_block_data(genesis.clone()).unwrap();
400-
l2_prov.put_block_data(block_a1.clone()).unwrap();
401-
l2_prov.put_block_data(block_c1.clone()).unwrap();
402-
l2_prov.put_block_data(block_a2.clone()).unwrap();
403-
l2_prov.put_block_data(block_b2.clone()).unwrap();
404-
l2_prov.put_block_data(block_a3.clone()).unwrap();
405-
l2_prov.put_block_data(block_b3.clone()).unwrap();
339+
let a_chain = gen_l2_chain(None, 3);
340+
let b_chain = gen_l2_chain(Some(a_chain[1].header().clone()), 2);
341+
let c_chain = gen_l2_chain(Some(a_chain[0].header().clone()), 1);
342+
343+
for b in a_chain
344+
.clone()
345+
.into_iter()
346+
.chain(b_chain.clone())
347+
.chain(c_chain.clone())
348+
{
349+
l2_prov.put_block_data(b).unwrap();
350+
}
406351

407352
[
408-
genesis_header.get_blockid(),
409-
block_a1_header.get_blockid(),
410-
block_c1_header.get_blockid(),
411-
block_a2_header.get_blockid(),
412-
block_b2_header.get_blockid(),
413-
block_a3_header.get_blockid(),
414-
block_b3_header.get_blockid(),
353+
a_chain[0].header().get_blockid(),
354+
a_chain[1].header().get_blockid(),
355+
c_chain[0].header().get_blockid(),
356+
a_chain[2].header().get_blockid(),
357+
b_chain[0].header().get_blockid(),
358+
a_chain[3].header().get_blockid(),
359+
b_chain[1].header().get_blockid(),
415360
]
416361
}
417362

0 commit comments

Comments
 (0)