Skip to content

Commit 36e6af8

Browse files
authored
Merge of #5327
2 parents c8ffafb + ea8373b commit 36e6af8

File tree

6 files changed

+113
-1
lines changed

6 files changed

+113
-1
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2422,6 +2422,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
24222422
proposer_slashing: ProposerSlashing,
24232423
) -> Result<ObservationOutcome<ProposerSlashing, T::EthSpec>, Error> {
24242424
let wall_clock_state = self.wall_clock_state()?;
2425+
24252426
Ok(self.observed_proposer_slashings.lock().verify_and_observe(
24262427
proposer_slashing,
24272428
&wall_clock_state,
@@ -2434,6 +2435,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
24342435
&self,
24352436
proposer_slashing: SigVerifiedOp<ProposerSlashing, T::EthSpec>,
24362437
) {
2438+
if let Some(event_handler) = self.event_handler.as_ref() {
2439+
if event_handler.has_proposer_slashing_subscribers() {
2440+
event_handler.register(EventKind::ProposerSlashing(Box::new(
2441+
proposer_slashing.clone().into_inner(),
2442+
)));
2443+
}
2444+
}
2445+
24372446
if self.eth1_chain.is_some() {
24382447
self.op_pool.insert_proposer_slashing(proposer_slashing)
24392448
}
@@ -2445,6 +2454,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
24452454
attester_slashing: AttesterSlashing<T::EthSpec>,
24462455
) -> Result<ObservationOutcome<AttesterSlashing<T::EthSpec>, T::EthSpec>, Error> {
24472456
let wall_clock_state = self.wall_clock_state()?;
2457+
24482458
Ok(self.observed_attester_slashings.lock().verify_and_observe(
24492459
attester_slashing,
24502460
&wall_clock_state,
@@ -2465,6 +2475,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
24652475
.fork_choice_write_lock()
24662476
.on_attester_slashing(attester_slashing.as_inner());
24672477

2478+
if let Some(event_handler) = self.event_handler.as_ref() {
2479+
if event_handler.has_attester_slashing_subscribers() {
2480+
event_handler.register(EventKind::AttesterSlashing(Box::new(
2481+
attester_slashing.clone().into_inner(),
2482+
)));
2483+
}
2484+
}
2485+
24682486
// Add to the op pool (if we have the ability to propose blocks).
24692487
if self.eth1_chain.is_some() {
24702488
self.op_pool.insert_attester_slashing(attester_slashing)

beacon_node/beacon_chain/src/builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -708,8 +708,8 @@ where
708708
.ok_or("Cannot build without a genesis state root")?;
709709
let validator_monitor_config = self.validator_monitor_config.unwrap_or_default();
710710
let head_tracker = Arc::new(self.head_tracker.unwrap_or_default());
711-
712711
let beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>> = <_>::default();
712+
713713
let mut validator_monitor = ValidatorMonitor::new(
714714
validator_monitor_config,
715715
beacon_proposer_cache.clone(),

beacon_node/beacon_chain/src/events.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ pub struct ServerSentEventHandler<E: EthSpec> {
2020
light_client_finality_update_tx: Sender<EventKind<E>>,
2121
light_client_optimistic_update_tx: Sender<EventKind<E>>,
2222
block_reward_tx: Sender<EventKind<E>>,
23+
proposer_slashing_tx: Sender<EventKind<E>>,
24+
attester_slashing_tx: Sender<EventKind<E>>,
2325
log: Logger,
2426
}
2527

@@ -45,6 +47,8 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
4547
let (light_client_finality_update_tx, _) = broadcast::channel(capacity);
4648
let (light_client_optimistic_update_tx, _) = broadcast::channel(capacity);
4749
let (block_reward_tx, _) = broadcast::channel(capacity);
50+
let (proposer_slashing_tx, _) = broadcast::channel(capacity);
51+
let (attester_slashing_tx, _) = broadcast::channel(capacity);
4852

4953
Self {
5054
attestation_tx,
@@ -60,6 +64,8 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
6064
light_client_finality_update_tx,
6165
light_client_optimistic_update_tx,
6266
block_reward_tx,
67+
proposer_slashing_tx,
68+
attester_slashing_tx,
6369
log,
6470
}
6571
}
@@ -126,6 +132,14 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
126132
.block_reward_tx
127133
.send(kind)
128134
.map(|count| log_count("block reward", count)),
135+
EventKind::ProposerSlashing(_) => self
136+
.proposer_slashing_tx
137+
.send(kind)
138+
.map(|count| log_count("proposer slashing", count)),
139+
EventKind::AttesterSlashing(_) => self
140+
.attester_slashing_tx
141+
.send(kind)
142+
.map(|count| log_count("attester slashing", count)),
129143
};
130144
if let Err(SendError(event)) = result {
131145
trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
@@ -184,6 +198,14 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
184198
self.block_reward_tx.subscribe()
185199
}
186200

201+
pub fn subscribe_attester_slashing(&self) -> Receiver<EventKind<E>> {
202+
self.attester_slashing_tx.subscribe()
203+
}
204+
205+
pub fn subscribe_proposer_slashing(&self) -> Receiver<EventKind<E>> {
206+
self.proposer_slashing_tx.subscribe()
207+
}
208+
187209
pub fn has_attestation_subscribers(&self) -> bool {
188210
self.attestation_tx.receiver_count() > 0
189211
}
@@ -227,4 +249,12 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
227249
pub fn has_block_reward_subscribers(&self) -> bool {
228250
self.block_reward_tx.receiver_count() > 0
229251
}
252+
253+
pub fn has_proposer_slashing_subscribers(&self) -> bool {
254+
self.proposer_slashing_tx.receiver_count() > 0
255+
}
256+
257+
pub fn has_attester_slashing_subscribers(&self) -> bool {
258+
self.attester_slashing_tx.receiver_count() > 0
259+
}
230260
}

beacon_node/http_api/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4358,6 +4358,12 @@ pub fn serve<T: BeaconChainTypes>(
43584358
api_types::EventTopic::BlockReward => {
43594359
event_handler.subscribe_block_reward()
43604360
}
4361+
api_types::EventTopic::AttesterSlashing => {
4362+
event_handler.subscribe_attester_slashing()
4363+
}
4364+
api_types::EventTopic::ProposerSlashing => {
4365+
event_handler.subscribe_proposer_slashing()
4366+
}
43614367
};
43624368

43634369
receivers.push(

beacon_node/http_api/tests/tests.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5214,6 +5214,8 @@ impl ApiTester {
52145214
EventTopic::Block,
52155215
EventTopic::Head,
52165216
EventTopic::FinalizedCheckpoint,
5217+
EventTopic::AttesterSlashing,
5218+
EventTopic::ProposerSlashing,
52175219
];
52185220
let mut events_future = self
52195221
.client
@@ -5353,6 +5355,42 @@ impl ApiTester {
53535355
.await;
53545356
assert_eq!(reorg_event.as_slice(), &[expected_reorg]);
53555357

5358+
// Test attester slashing event
5359+
let mut attester_slashing_event_future = self
5360+
.client
5361+
.get_events::<E>(&[EventTopic::AttesterSlashing])
5362+
.await
5363+
.unwrap();
5364+
5365+
self.harness.add_attester_slashing(vec![1, 2, 3]).unwrap();
5366+
5367+
let attester_slashing_event = poll_events(
5368+
&mut attester_slashing_event_future,
5369+
1,
5370+
Duration::from_millis(10000),
5371+
)
5372+
.await;
5373+
5374+
assert!(attester_slashing_event.len() == 1);
5375+
5376+
// Test proposer slashing event
5377+
let mut proposer_slashing_event_future = self
5378+
.client
5379+
.get_events::<E>(&[EventTopic::ProposerSlashing])
5380+
.await
5381+
.unwrap();
5382+
5383+
self.harness.add_proposer_slashing(1).unwrap();
5384+
5385+
let proposer_slashing_event = poll_events(
5386+
&mut proposer_slashing_event_future,
5387+
1,
5388+
Duration::from_millis(10000),
5389+
)
5390+
.await;
5391+
5392+
assert!(proposer_slashing_event.len() == 1);
5393+
53565394
self
53575395
}
53585396

common/eth2/src/types.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,6 +1080,8 @@ pub enum EventKind<E: EthSpec> {
10801080
#[cfg(feature = "lighthouse")]
10811081
BlockReward(BlockReward),
10821082
PayloadAttributes(VersionedSsePayloadAttributes),
1083+
ProposerSlashing(Box<ProposerSlashing>),
1084+
AttesterSlashing(Box<AttesterSlashing<E>>),
10831085
}
10841086

10851087
impl<E: EthSpec> EventKind<E> {
@@ -1099,6 +1101,8 @@ impl<E: EthSpec> EventKind<E> {
10991101
EventKind::LightClientOptimisticUpdate(_) => "light_client_optimistic_update",
11001102
#[cfg(feature = "lighthouse")]
11011103
EventKind::BlockReward(_) => "block_reward",
1104+
EventKind::ProposerSlashing(_) => "proposer_slashing",
1105+
EventKind::AttesterSlashing(_) => "attester_slashing",
11021106
}
11031107
}
11041108

@@ -1179,6 +1183,16 @@ impl<E: EthSpec> EventKind<E> {
11791183
"block_reward" => Ok(EventKind::BlockReward(serde_json::from_str(data).map_err(
11801184
|e| ServerError::InvalidServerSentEvent(format!("Block Reward: {:?}", e)),
11811185
)?)),
1186+
"attester_slashing" => Ok(EventKind::AttesterSlashing(
1187+
serde_json::from_str(data).map_err(|e| {
1188+
ServerError::InvalidServerSentEvent(format!("Attester Slashing: {:?}", e))
1189+
})?,
1190+
)),
1191+
"proposer_slashing" => Ok(EventKind::ProposerSlashing(
1192+
serde_json::from_str(data).map_err(|e| {
1193+
ServerError::InvalidServerSentEvent(format!("Proposer Slashing: {:?}", e))
1194+
})?,
1195+
)),
11821196
_ => Err(ServerError::InvalidServerSentEvent(
11831197
"Could not parse event tag".to_string(),
11841198
)),
@@ -1210,6 +1224,8 @@ pub enum EventTopic {
12101224
LightClientOptimisticUpdate,
12111225
#[cfg(feature = "lighthouse")]
12121226
BlockReward,
1227+
AttesterSlashing,
1228+
ProposerSlashing,
12131229
}
12141230

12151231
impl FromStr for EventTopic {
@@ -1231,6 +1247,8 @@ impl FromStr for EventTopic {
12311247
"light_client_optimistic_update" => Ok(EventTopic::LightClientOptimisticUpdate),
12321248
#[cfg(feature = "lighthouse")]
12331249
"block_reward" => Ok(EventTopic::BlockReward),
1250+
"attester_slashing" => Ok(EventTopic::AttesterSlashing),
1251+
"proposer_slashing" => Ok(EventTopic::ProposerSlashing),
12341252
_ => Err("event topic cannot be parsed.".to_string()),
12351253
}
12361254
}
@@ -1253,6 +1271,8 @@ impl fmt::Display for EventTopic {
12531271
EventTopic::LightClientOptimisticUpdate => write!(f, "light_client_optimistic_update"),
12541272
#[cfg(feature = "lighthouse")]
12551273
EventTopic::BlockReward => write!(f, "block_reward"),
1274+
EventTopic::AttesterSlashing => write!(f, "attester_slashing"),
1275+
EventTopic::ProposerSlashing => write!(f, "proposer_slashing"),
12561276
}
12571277
}
12581278
}

0 commit comments

Comments
 (0)