1
1
use crate :: {
2
2
beacon_chain:: BeaconChainTypes ,
3
- summaries_dag:: { DAGStateSummaryV22 , StateSummariesDAG } ,
3
+ summaries_dag:: { DAGStateSummary , DAGStateSummaryV22 , StateSummariesDAG } ,
4
4
} ;
5
- use ssz:: Decode ;
5
+ use ssz:: { Decode , Encode } ;
6
6
use ssz_derive:: { Decode , Encode } ;
7
7
use std:: {
8
8
sync:: Arc ,
9
9
time:: { Duration , Instant } ,
10
10
} ;
11
11
use store:: {
12
- get_full_state_v22, hdiff:: StorageStrategy , hot_cold_store:: DiffBaseStateRoot , DBColumn , Error ,
13
- HotColdDB , HotStateSummary , KeyValueStore , KeyValueStoreOp , StoreItem ,
12
+ get_full_state_v22, hdiff:: StorageStrategy , hot_cold_store:: DiffBaseStateRoot ,
13
+ store_full_state_v22, DBColumn , Error , HotColdDB , HotStateSummary , KeyValueStore ,
14
+ KeyValueStoreOp , StoreItem ,
14
15
} ;
15
16
use tracing:: { debug, info, warn} ;
16
17
use types:: { EthSpec , Hash256 , Slot } ;
@@ -94,8 +95,8 @@ pub fn upgrade_to_v24<T: BeaconChainTypes>(
94
95
slots_count = summaries_by_slot. len( ) ,
95
96
min_slot = ?summaries_by_slot. first_key_value( ) . map( |( slot, _) | slot) ,
96
97
max_slot = ?summaries_by_slot. last_key_value( ) . map( |( slot, _) | slot) ,
97
- state_summaries_dag_roots = ?state_summaries_dag_roots,
98
- hot_hdiff_start_slot = %hot_hdiff_start_slot,
98
+ ?state_summaries_dag_roots,
99
+ %hot_hdiff_start_slot,
99
100
split_state_root = ?split. state_root,
100
101
"Starting hot states migration"
101
102
) ;
@@ -178,21 +179,22 @@ pub fn upgrade_to_v24<T: BeaconChainTypes>(
178
179
} ) ?
179
180
} ;
180
181
181
- let diff_base_state_root =
182
- if let Some ( diff_base_slot) = storage_strategy. diff_base_slot ( ) {
183
- DiffBaseStateRoot :: new (
182
+ let diff_base_state_root = if let Some ( diff_base_slot) =
183
+ storage_strategy. diff_base_slot ( )
184
+ {
185
+ DiffBaseStateRoot :: new (
184
186
diff_base_slot,
185
187
state_summaries_dag
186
188
. ancestor_state_root_at_slot ( state_root, diff_base_slot)
187
189
. map_err ( |e| {
188
190
Error :: MigrationError ( format ! (
189
- "error computing ancestor_state_root_at_slot {e:?}"
191
+ "error computing ancestor_state_root_at_slot({state_root:?}, {diff_base_slot}) {e:?}"
190
192
) )
191
193
} ) ?,
192
194
)
193
- } else {
194
- DiffBaseStateRoot :: zero ( )
195
- } ;
195
+ } else {
196
+ DiffBaseStateRoot :: zero ( )
197
+ } ;
196
198
197
199
let new_summary = HotStateSummary {
198
200
slot,
@@ -227,8 +229,8 @@ pub fn upgrade_to_v24<T: BeaconChainTypes>(
227
229
if last_log_time. elapsed ( ) > Duration :: from_secs ( 5 ) {
228
230
last_log_time = Instant :: now ( ) ;
229
231
info ! (
230
- diffs_written = diffs_written ,
231
- summaries_written = summaries_written ,
232
+ diffs_written,
233
+ summaries_written,
232
234
summaries_count = state_summaries_dag. summaries_count( ) ,
233
235
"Hot states migration in progress"
234
236
) ;
@@ -239,8 +241,8 @@ pub fn upgrade_to_v24<T: BeaconChainTypes>(
239
241
// TODO(hdiff): Should run hot DB compaction after deleting potentially a lot of states. Or should wait
240
242
// for the next finality event?
241
243
info ! (
242
- diffs_written = diffs_written ,
243
- summaries_written = summaries_written ,
244
+ diffs_written,
245
+ summaries_written,
244
246
summaries_count = state_summaries_dag. summaries_count( ) ,
245
247
"Hot states migration complete"
246
248
) ;
@@ -249,10 +251,100 @@ pub fn upgrade_to_v24<T: BeaconChainTypes>(
249
251
}
250
252
251
253
pub fn downgrade_from_v24 < T : BeaconChainTypes > (
252
- _db : Arc < HotColdDB < T :: EthSpec , T :: HotStore , T :: ColdStore > > ,
254
+ db : Arc < HotColdDB < T :: EthSpec , T :: HotStore , T :: ColdStore > > ,
253
255
) -> Result < Vec < KeyValueStoreOp > , Error > {
254
- // TODO(hdiff): proper error
255
- panic ! ( "downgrade not supported" ) ;
256
+ let state_summaries = db
257
+ . load_hot_state_summaries ( ) ?
258
+ . into_iter ( )
259
+ . map ( |( state_root, summary) | ( state_root, summary. into ( ) ) )
260
+ . collect :: < Vec < ( Hash256 , DAGStateSummary ) > > ( ) ;
261
+
262
+ info ! (
263
+ summaries_count = state_summaries. len( ) ,
264
+ "DB downgrade of v24 state summaries started"
265
+ ) ;
266
+
267
+ let state_summaries_dag = StateSummariesDAG :: new ( state_summaries)
268
+ . map_err ( |e| Error :: MigrationError ( format ! ( "Error on new StateSumariesDAG {e:?}" ) ) ) ?;
269
+
270
+ let mut migrate_ops = vec ! [ ] ;
271
+ let mut states_written = 0 ;
272
+ let mut summaries_written = 0 ;
273
+ let mut last_log_time = Instant :: now ( ) ;
274
+
275
+ // TODO(tree-states): What about the anchor_slot? Is it safe to run the prior version of
276
+ // Lighthouse with an a higher anchor_slot than expected?
277
+
278
+ for ( state_root, summary) in state_summaries_dag
279
+ . summaries_by_slot_ascending ( )
280
+ . into_iter ( )
281
+ . flat_map ( |( _, summaries) | summaries)
282
+ {
283
+ // If boundary state persist
284
+ if summary. slot % T :: EthSpec :: slots_per_epoch ( ) == 0 {
285
+ let ( state, _) = db
286
+ . load_hot_state ( & state_root) ?
287
+ . ok_or ( Error :: MissingState ( state_root) ) ?;
288
+
289
+ // Immediately commit the state. Otherwise we will OOM and it's stored in a different
290
+ // column. So if the migration crashes we just get extra harmless junk in the DB.
291
+ let mut state_write_ops = vec ! [ ] ;
292
+ store_full_state_v22 ( & state_root, & state, & mut state_write_ops) ?;
293
+ db. hot_db . do_atomically ( state_write_ops) ?;
294
+ states_written += 1 ;
295
+ }
296
+
297
+ // Persist old summary
298
+ let epoch_boundary_state_slot = summary. slot - summary. slot % T :: EthSpec :: slots_per_epoch ( ) ;
299
+ let old_summary = HotStateSummaryV22 {
300
+ slot : summary. slot ,
301
+ latest_block_root : summary. latest_block_root ,
302
+ epoch_boundary_state_root : state_summaries_dag
303
+ . ancestor_state_root_at_slot ( state_root, epoch_boundary_state_slot)
304
+ . map_err ( |e| {
305
+ Error :: MigrationError ( format ! (
306
+ "error computing ancestor_state_root_at_slot({state_root:?}, {epoch_boundary_state_slot}) {e:?}"
307
+ ) )
308
+ } ) ?,
309
+ } ;
310
+ migrate_ops. push ( KeyValueStoreOp :: PutKeyValue (
311
+ DBColumn :: BeaconStateSummary ,
312
+ state_root. as_slice ( ) . to_vec ( ) ,
313
+ old_summary. as_ssz_bytes ( ) ,
314
+ ) ) ;
315
+ summaries_written += 1 ;
316
+
317
+ // Delete existing data
318
+ for db_column in [
319
+ DBColumn :: BeaconStateHotSummary ,
320
+ DBColumn :: BeaconStateHotDiff ,
321
+ DBColumn :: BeaconStateHotSnapshot ,
322
+ ] {
323
+ migrate_ops. push ( KeyValueStoreOp :: DeleteKey (
324
+ db_column,
325
+ state_root. as_slice ( ) . to_vec ( ) ,
326
+ ) ) ;
327
+ }
328
+
329
+ if last_log_time. elapsed ( ) > Duration :: from_secs ( 5 ) {
330
+ last_log_time = Instant :: now ( ) ;
331
+ info ! (
332
+ states_written,
333
+ summaries_written,
334
+ summaries_count = state_summaries_dag. summaries_count( ) ,
335
+ "DB downgrade of v24 state summaries in progress"
336
+ ) ;
337
+ }
338
+ }
339
+
340
+ info ! (
341
+ states_written,
342
+ summaries_written,
343
+ summaries_count = state_summaries_dag. summaries_count( ) ,
344
+ "DB downgrade of v24 state summaries completed"
345
+ ) ;
346
+
347
+ Ok ( migrate_ops)
256
348
}
257
349
258
350
fn new_dag < T : BeaconChainTypes > (
0 commit comments