@@ -28,10 +28,10 @@ use ic_https_outcalls_consensus::{
28
28
use ic_ingress_manager:: { bouncer:: IngressBouncer , IngressManager , RandomStateKind } ;
29
29
use ic_interfaces:: {
30
30
batch_payload:: BatchPayloadBuilder ,
31
+ consensus_pool:: { ConsensusBlockCache , ConsensusPoolCache } ,
31
32
execution_environment:: IngressHistoryReader ,
32
33
messaging:: { MessageRouting , XNetPayloadBuilder } ,
33
- p2p:: artifact_manager:: JoinGuard ,
34
- p2p:: state_sync:: StateSyncClient ,
34
+ p2p:: { artifact_manager:: JoinGuard , state_sync:: StateSyncClient } ,
35
35
self_validating_payload:: SelfValidatingPayloadBuilder ,
36
36
time_source:: { SysTimeSource , TimeSource } ,
37
37
} ;
@@ -85,6 +85,105 @@ struct ArtifactPools {
85
85
canister_http_pool : Arc < RwLock < CanisterHttpPoolImpl > > ,
86
86
}
87
87
88
+ impl ArtifactPools {
89
+ fn new (
90
+ log : & ReplicaLogger ,
91
+ metrics_registry : & MetricsRegistry ,
92
+ node_id : NodeId ,
93
+ config : ArtifactPoolConfig ,
94
+ catch_up_package : & CatchUpPackage ,
95
+ ) -> Self {
96
+ let ingress_pool = Arc :: new ( RwLock :: new ( IngressPoolImpl :: new (
97
+ node_id,
98
+ config. clone ( ) ,
99
+ metrics_registry. clone ( ) ,
100
+ log. clone ( ) ,
101
+ ) ) ) ;
102
+
103
+ let mut idkg_pool = IDkgPoolImpl :: new (
104
+ config. clone ( ) ,
105
+ log. clone ( ) ,
106
+ metrics_registry. clone ( ) ,
107
+ Box :: new ( idkg:: IDkgStatsImpl :: new ( metrics_registry. clone ( ) ) ) ,
108
+ ) ;
109
+ idkg_pool. add_initial_dealings ( catch_up_package) ;
110
+ let idkg_pool = Arc :: new ( RwLock :: new ( idkg_pool) ) ;
111
+
112
+ let certification_pool = Arc :: new ( RwLock :: new ( CertificationPoolImpl :: new (
113
+ node_id,
114
+ config,
115
+ log. clone ( ) ,
116
+ metrics_registry. clone ( ) ,
117
+ ) ) ) ;
118
+ let dkg_pool = Arc :: new ( RwLock :: new ( DkgPoolImpl :: new (
119
+ metrics_registry. clone ( ) ,
120
+ log. clone ( ) ,
121
+ ) ) ) ;
122
+ let canister_http_pool = Arc :: new ( RwLock :: new ( CanisterHttpPoolImpl :: new (
123
+ metrics_registry. clone ( ) ,
124
+ log. clone ( ) ,
125
+ ) ) ) ;
126
+ Self {
127
+ ingress_pool,
128
+ certification_pool,
129
+ dkg_pool,
130
+ idkg_pool,
131
+ canister_http_pool,
132
+ }
133
+ }
134
+ }
135
+
136
+ struct Bouncers {
137
+ ingress : Arc < IngressBouncer > ,
138
+ consensus : Arc < ConsensusBouncer > ,
139
+ certifier : Arc < CertifierBouncer > ,
140
+ dkg : Arc < DkgBouncer > ,
141
+ idkg : Arc < idkg:: IDkgBouncer > ,
142
+ https_outcalls : Arc < CanisterHttpGossipImpl > ,
143
+ }
144
+
145
+ impl Bouncers {
146
+ fn new (
147
+ log : & ReplicaLogger ,
148
+ metrics_registry : & MetricsRegistry ,
149
+ subnet_id : SubnetId ,
150
+ time_source : Arc < dyn TimeSource > ,
151
+ message_router : Arc < dyn MessageRouting > ,
152
+ consensus_pool_cache : Arc < dyn ConsensusPoolCache > ,
153
+ consensus_block_cache : Arc < dyn ConsensusBlockCache > ,
154
+ state_reader : Arc < dyn StateReader < State = ReplicatedState > > ,
155
+ ) -> Self {
156
+ let ingress = Arc :: new ( IngressBouncer :: new ( time_source. clone ( ) ) ) ;
157
+ let consensus = Arc :: new ( ConsensusBouncer :: new ( metrics_registry, message_router) ) ;
158
+ let dkg = Arc :: new ( DkgBouncer :: new ( metrics_registry) ) ;
159
+ let certifier = Arc :: new ( CertifierBouncer :: new (
160
+ metrics_registry,
161
+ consensus_pool_cache. clone ( ) ,
162
+ ) ) ;
163
+ let idkg = Arc :: new ( idkg:: IDkgBouncer :: new (
164
+ metrics_registry,
165
+ subnet_id,
166
+ consensus_block_cache,
167
+ state_reader. clone ( ) ,
168
+ ) ) ;
169
+
170
+ let https_outcalls = Arc :: new ( CanisterHttpGossipImpl :: new (
171
+ consensus_pool_cache. clone ( ) ,
172
+ state_reader. clone ( ) ,
173
+ log. clone ( ) ,
174
+ ) ) ;
175
+
176
+ Self {
177
+ ingress,
178
+ consensus,
179
+ dkg,
180
+ idkg,
181
+ certifier,
182
+ https_outcalls,
183
+ }
184
+ }
185
+ }
186
+
88
187
pub type CanisterHttpAdapterClient =
89
188
Box < dyn NonBlockingChannel < CanisterHttpRequest , Response = CanisterHttpResponse > + Send > ;
90
189
@@ -138,7 +237,7 @@ pub fn setup_consensus_and_p2p(
138
237
node_id,
139
238
subnet_id,
140
239
artifact_pool_config,
141
- catch_up_package,
240
+ & catch_up_package,
142
241
Arc :: clone ( & consensus_crypto) as Arc < _ > ,
143
242
Arc :: clone ( & certifier_crypto) as Arc < _ > ,
144
243
Arc :: clone ( & ingress_sig_crypto) as Arc < _ > ,
@@ -219,7 +318,7 @@ fn start_consensus(
219
318
node_id : NodeId ,
220
319
subnet_id : SubnetId ,
221
320
artifact_pool_config : ArtifactPoolConfig ,
222
- catch_up_package : CatchUpPackage ,
321
+ catch_up_package : & CatchUpPackage ,
223
322
// ConsensusCrypto is an extension of the Crypto trait and we can
224
323
// not downcast traits.
225
324
consensus_crypto : Arc < dyn ConsensusCrypto > ,
@@ -245,26 +344,34 @@ fn start_consensus(
245
344
Vec < Box < dyn JoinGuard > > ,
246
345
AbortableBroadcastChannelBuilder ,
247
346
) {
347
+ let artifact_pools = ArtifactPools :: new (
348
+ log,
349
+ metrics_registry,
350
+ node_id,
351
+ artifact_pool_config,
352
+ catch_up_package,
353
+ ) ;
248
354
let time_source = Arc :: new ( SysTimeSource :: new ( ) ) ;
355
+ let consensus_pool_cache = consensus_pool. read ( ) . unwrap ( ) . get_cache ( ) ;
356
+ let consensus_block_cache = consensus_pool. read ( ) . unwrap ( ) . get_block_cache ( ) ;
357
+ let bouncers = Bouncers :: new (
358
+ log,
359
+ metrics_registry,
360
+ subnet_id,
361
+ time_source. clone ( ) ,
362
+ message_router. clone ( ) ,
363
+ consensus_pool_cache. clone ( ) ,
364
+ consensus_block_cache,
365
+ state_reader. clone ( ) ,
366
+ ) ;
367
+
249
368
let mut new_p2p_consensus: ic_consensus_manager:: AbortableBroadcastChannelBuilder =
250
369
ic_consensus_manager:: AbortableBroadcastChannelBuilder :: new (
251
370
log. clone ( ) ,
252
371
rt_handle. clone ( ) ,
253
372
metrics_registry. clone ( ) ,
254
373
) ;
255
374
256
- let artifact_pools = init_artifact_pools (
257
- node_id,
258
- artifact_pool_config,
259
- metrics_registry,
260
- log,
261
- catch_up_package,
262
- time_source. as_ref ( ) ,
263
- ) ;
264
-
265
- let mut join_handles = vec ! [ ] ;
266
-
267
- let consensus_pool_cache = consensus_pool. read ( ) . unwrap ( ) . get_cache ( ) ;
268
375
let consensus_time = consensus_pool. read ( ) . unwrap ( ) . get_consensus_time ( ) ;
269
376
let replica_config = ReplicaConfig { node_id, subnet_id } ;
270
377
let ingress_manager = Arc :: new ( IngressManager :: new (
@@ -302,6 +409,8 @@ fn start_consensus(
302
409
& PoolReader :: new ( & * consensus_pool. read ( ) . unwrap ( ) ) ,
303
410
) ) ) ;
304
411
412
+ let mut join_handles = vec ! [ ] ;
413
+
305
414
{
306
415
let consensus_impl = ConsensusImpl :: new (
307
416
replica_config. clone ( ) ,
@@ -327,14 +436,13 @@ fn start_consensus(
327
436
328
437
let consensus_pool = Arc :: clone ( & consensus_pool) ;
329
438
330
- let bouncer = Arc :: new ( ConsensusBouncer :: new ( metrics_registry, message_router) ) ;
331
439
let ( outbound_tx, inbound_rx) = if HASHES_IN_BLOCKS_FEATURE_ENABLED {
332
440
let assembler = ic_artifact_downloader:: FetchStrippedConsensusArtifact :: new (
333
441
log. clone ( ) ,
334
442
rt_handle. clone ( ) ,
335
443
consensus_pool. clone ( ) ,
336
444
artifact_pools. ingress_pool . clone ( ) ,
337
- bouncer ,
445
+ bouncers . consensus ,
338
446
metrics_registry. clone ( ) ,
339
447
node_id,
340
448
) ;
@@ -344,7 +452,7 @@ fn start_consensus(
344
452
log. clone ( ) ,
345
453
rt_handle. clone ( ) ,
346
454
consensus_pool. clone ( ) ,
347
- bouncer ,
455
+ bouncers . consensus ,
348
456
metrics_registry. clone ( ) ,
349
457
) ;
350
458
new_p2p_consensus. abortable_broadcast_channel ( assembler, SLOT_TABLE_NO_LIMIT )
@@ -366,12 +474,11 @@ fn start_consensus(
366
474
let user_ingress_tx = {
367
475
#[ allow( clippy:: disallowed_methods) ]
368
476
let ( user_ingress_tx, user_ingress_rx) = unbounded_channel ( ) ;
369
- let bouncer = Arc :: new ( IngressBouncer :: new ( time_source. clone ( ) ) ) ;
370
477
let assembler = ic_artifact_downloader:: FetchArtifact :: new (
371
478
log. clone ( ) ,
372
479
rt_handle. clone ( ) ,
373
480
artifact_pools. ingress_pool . clone ( ) ,
374
- bouncer ,
481
+ bouncers . ingress ,
375
482
metrics_registry. clone ( ) ,
376
483
) ;
377
484
@@ -402,12 +509,11 @@ fn start_consensus(
402
509
log. clone ( ) ,
403
510
max_certified_height_tx,
404
511
) ;
405
- let bouncer = CertifierBouncer :: new ( metrics_registry, Arc :: clone ( & consensus_pool_cache) ) ;
406
512
let assembler = ic_artifact_downloader:: FetchArtifact :: new (
407
513
log. clone ( ) ,
408
514
rt_handle. clone ( ) ,
409
515
artifact_pools. certification_pool . clone ( ) ,
410
- Arc :: new ( bouncer ) ,
516
+ bouncers . certifier ,
411
517
metrics_registry. clone ( ) ,
412
518
) ;
413
519
@@ -426,12 +532,11 @@ fn start_consensus(
426
532
} ;
427
533
428
534
{
429
- let bouncer = Arc :: new ( DkgBouncer :: new ( metrics_registry) ) ;
430
535
let assembler = ic_artifact_downloader:: FetchArtifact :: new (
431
536
log. clone ( ) ,
432
537
rt_handle. clone ( ) ,
433
538
artifact_pools. dkg_pool . clone ( ) ,
434
- bouncer ,
539
+ bouncers . dkg ,
435
540
metrics_registry. clone ( ) ,
436
541
) ;
437
542
@@ -469,17 +574,11 @@ fn start_consensus(
469
574
finalized. payload. as_ref( ) . is_summary( ) ,
470
575
finalized. payload. as_ref( ) . as_idkg( ) . is_some( ) ,
471
576
) ;
472
- let bouncer = Arc :: new ( idkg:: IDkgBouncer :: new (
473
- metrics_registry,
474
- subnet_id,
475
- consensus_pool. read ( ) . unwrap ( ) . get_block_cache ( ) ,
476
- Arc :: clone ( & state_reader) ,
477
- ) ) ;
478
577
let assembler = ic_artifact_downloader:: FetchArtifact :: new (
479
578
log. clone ( ) ,
480
579
rt_handle. clone ( ) ,
481
580
artifact_pools. idkg_pool . clone ( ) ,
482
- bouncer ,
581
+ bouncers . idkg ,
483
582
metrics_registry. clone ( ) ,
484
583
) ;
485
584
@@ -506,16 +605,11 @@ fn start_consensus(
506
605
} ;
507
606
508
607
{
509
- let bouncer = Arc :: new ( CanisterHttpGossipImpl :: new (
510
- Arc :: clone ( & consensus_pool_cache) ,
511
- Arc :: clone ( & state_reader) ,
512
- log. clone ( ) ,
513
- ) ) ;
514
608
let assembler = ic_artifact_downloader:: FetchArtifact :: new (
515
609
log. clone ( ) ,
516
610
rt_handle. clone ( ) ,
517
611
artifact_pools. canister_http_pool . clone ( ) ,
518
- bouncer ,
612
+ bouncers . https_outcalls ,
519
613
metrics_registry. clone ( ) ,
520
614
) ;
521
615
@@ -549,50 +643,3 @@ fn start_consensus(
549
643
new_p2p_consensus,
550
644
)
551
645
}
552
-
553
- fn init_artifact_pools (
554
- node_id : NodeId ,
555
- config : ArtifactPoolConfig ,
556
- metrics_registry : & MetricsRegistry ,
557
- log : & ReplicaLogger ,
558
- catch_up_package : CatchUpPackage ,
559
- time_source : & dyn TimeSource ,
560
- ) -> ArtifactPools {
561
- let ingress_pool = Arc :: new ( RwLock :: new ( IngressPoolImpl :: new (
562
- node_id,
563
- config. clone ( ) ,
564
- metrics_registry. clone ( ) ,
565
- log. clone ( ) ,
566
- ) ) ) ;
567
-
568
- let mut idkg_pool = IDkgPoolImpl :: new (
569
- config. clone ( ) ,
570
- log. clone ( ) ,
571
- metrics_registry. clone ( ) ,
572
- Box :: new ( idkg:: IDkgStatsImpl :: new ( metrics_registry. clone ( ) ) ) ,
573
- ) ;
574
- idkg_pool. add_initial_dealings ( & catch_up_package, time_source) ;
575
- let idkg_pool = Arc :: new ( RwLock :: new ( idkg_pool) ) ;
576
-
577
- let certification_pool = Arc :: new ( RwLock :: new ( CertificationPoolImpl :: new (
578
- node_id,
579
- config,
580
- log. clone ( ) ,
581
- metrics_registry. clone ( ) ,
582
- ) ) ) ;
583
- let dkg_pool = Arc :: new ( RwLock :: new ( DkgPoolImpl :: new (
584
- metrics_registry. clone ( ) ,
585
- log. clone ( ) ,
586
- ) ) ) ;
587
- let canister_http_pool = Arc :: new ( RwLock :: new ( CanisterHttpPoolImpl :: new (
588
- metrics_registry. clone ( ) ,
589
- log. clone ( ) ,
590
- ) ) ) ;
591
- ArtifactPools {
592
- ingress_pool,
593
- certification_pool,
594
- dkg_pool,
595
- idkg_pool,
596
- canister_http_pool,
597
- }
598
- }
0 commit comments