Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/sui-rpc-api/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ fn code_as_str(code: tonic::Code) -> &'static str {
#[derive(Clone)]
pub(crate) struct SubscriptionMetrics {
pub inflight_subscribers: IntGauge,
pub last_recieved_checkpoint: IntGauge,
pub last_received_checkpoint: IntGauge,
}

impl SubscriptionMetrics {
Expand Down
24 changes: 12 additions & 12 deletions crates/sui-rpc-api/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@ pub struct SubscriptionServiceHandle {

impl SubscriptionServiceHandle {
pub async fn register_subscription(&self) -> Option<mpsc::Receiver<Arc<Checkpoint>>> {
let (sender, reciever) = oneshot::channel();
let (sender, receiver) = oneshot::channel();
let request = SubscriptionRequest { sender };
self.sender.send(request).await.ok()?;

reciever.await.ok()
receiver.await.ok()
}
}

pub struct SubscriptionService {
// Mailbox for recieving `Checkpoint` from the Checkpoint Executor
// Mailbox for receiving `Checkpoint` from the Checkpoint Executor
//
// Expectation is that checkpoints are recieved in-order
// Expectation is that checkpoints are received in-order
checkpoint_mailbox: mpsc::Receiver<Checkpoint>,
mailbox: mpsc::Receiver<SubscriptionRequest>,
subscribers: Vec<mpsc::Sender<Arc<Checkpoint>>>,
Expand Down Expand Up @@ -99,21 +99,21 @@ impl SubscriptionService {
}

fn handle_checkpoint(&mut self, checkpoint: Checkpoint) {
// Check that we recieved checkpoints in-order
// Check that we received checkpoints in-order
{
let last_sequence_number = self.metrics.last_recieved_checkpoint.get();
let last_sequence_number = self.metrics.last_received_checkpoint.get();
let sequence_number = *checkpoint.summary.sequence_number() as i64;

if last_sequence_number != 0 && (last_sequence_number + 1) != sequence_number {
panic!(
"recieved checkpoint out-of-order. expected checkpoint {}, recieved {}",
"received checkpoint out-of-order. expected checkpoint {}, received {}",
last_sequence_number + 1,
sequence_number
);
}

// Update the metric marking the latest checkpoint we've seen
self.metrics.last_recieved_checkpoint.set(sequence_number);
self.metrics.last_received_checkpoint.set(sequence_number);
}

let checkpoint = Arc::new(checkpoint);
Expand All @@ -123,7 +123,7 @@ impl SubscriptionService {
self.subscribers.retain(|subscriber| {
match subscriber.try_send(Arc::clone(&checkpoint)) {
Ok(()) => {
trace!("succesfully enqueued checkpont for subscriber");
trace!("successfully enqueued checkpoint for subscriber");
true // Retain this subscriber
}
Err(e) => {
Expand All @@ -146,10 +146,10 @@ impl SubscriptionService {
return;
}

let (sender, reciever) = mpsc::channel(SUBSCRIPTION_CHANNEL_SIZE);
match request.sender.send(reciever) {
let (sender, receiver) = mpsc::channel(SUBSCRIPTION_CHANNEL_SIZE);
match request.sender.send(receiver) {
Ok(()) => {
trace!("succesfully registered new subscriber");
trace!("successfully registered new subscriber");
self.metrics.inflight_subscribers.inc();
self.subscribers.push(sender);
}
Expand Down