Skip to content

Commit 20cb24e

Browse files
fl0rekzvolin
authored andcommitted
wip
1 parent 33e4f2d commit 20cb24e

File tree

4 files changed

+153
-33
lines changed

4 files changed

+153
-33
lines changed

grpc/src/client.rs

Lines changed: 78 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use celestia_types::hash::Hash;
2626
use celestia_types::state::auth::{Account, AuthParams, BaseAccount};
2727
use celestia_types::state::{
2828
AbciQueryResponse, PageRequest, QueryDelegationResponse, QueryRedelegationsResponse,
29-
QueryUnbondingDelegationResponse, RawTxBody, ValAddress,
29+
QueryUnbondingDelegationResponse, RawTx, RawTxBody, ValAddress,
3030
};
3131
use celestia_types::state::{
3232
AccAddress, Address, AddressTrait, Coin, ErrorCode, TxResponse, BOND_DENOM,
@@ -446,9 +446,33 @@ impl GrpcClient {
446446
..RawTxBody::default()
447447
};
448448

449-
let (tx_hash, sequence) = self.sign_and_broadcast_tx(tx_body, cfg, context).await?;
449+
// lock the account; tx signing and broadcast must be atomic
450+
// because node requires all transactions to be sequenced by account.sequence
451+
let (account, signer) = self.load_account(context).await?;
452+
453+
let tx = self
454+
.sign_and_broadcast_tx(tx_body, &cfg, &account, signer, context)
455+
.await?;
456+
let tx_bytes = tx.encode_to_vec();
457+
458+
let original_sequence = account.sequence;
459+
let mut account_lock = Some(account);
460+
461+
loop {
462+
let tx_hash = self
463+
.broadcast_tx_with_account(tx_bytes.clone(), &cfg, context)
464+
.await?;
465+
466+
if let Some(mut account) = account_lock.take() {
467+
account.sequence += 1; // increment, but only on first iter
468+
};
450469

451-
self.confirm_tx(tx_hash, sequence, context).await
470+
match self.confirm_tx(tx_hash, original_sequence, context).await {
471+
Ok(status) => return Ok(status),
472+
Err(Error::TxEvicted(_)) => continue,
473+
Err(e) => return Err(e),
474+
}
475+
}
452476
}
453477

454478
async fn submit_blobs_impl(
@@ -465,11 +489,33 @@ impl GrpcClient {
465489
blob.validate(app_version)?;
466490
}
467491

468-
let (tx_hash, sequence) = self
469-
.sign_and_broadcast_blobs(blobs.to_vec(), cfg, context)
492+
// lock the account; tx signing and broadcast must be atomic
493+
// because node requires all transactions to be sequenced by account.sequence
494+
let (account, signer) = self.load_account(context).await?;
495+
496+
let blob_tx = self
497+
.sign_and_broadcast_blobs(blobs.to_vec(), &cfg, &account, signer, context)
470498
.await?;
499+
let tx_bytes = blob_tx.encode_to_vec();
500+
501+
let original_sequence = account.sequence;
502+
let mut account_lock = Some(account);
503+
504+
loop {
505+
let tx_hash = self
506+
.broadcast_tx_with_account(tx_bytes.clone(), &cfg, context)
507+
.await?;
471508

472-
self.confirm_tx(tx_hash, sequence, context).await
509+
if let Some(mut account) = account_lock.take() {
510+
account.sequence += 1; // increment, but only on first iter
511+
};
512+
513+
match self.confirm_tx(tx_hash, original_sequence, context).await {
514+
Ok(status) => return Ok(status),
515+
//Err(Error::TxEvicted(_)) => continue,
516+
Err(e) => return Err(e),
517+
}
518+
}
473519
}
474520

475521
async fn load_chain_state(&self, context: &Context) -> Result<ChainState> {
@@ -557,13 +603,12 @@ impl GrpcClient {
557603
async fn sign_and_broadcast_blobs(
558604
&self,
559605
blobs: Vec<Blob>,
560-
cfg: TxConfig,
606+
cfg: &TxConfig,
607+
account: &BaseAccount,
608+
signer: &SignerConfig,
561609
context: &Context,
562-
) -> Result<(Hash, u64)> {
610+
) -> Result<RawBlobTx> {
563611
let ChainState { chain_id, .. } = self.load_chain_state(context).await?;
564-
// lock the account; tx signing and broadcast must be atomic
565-
// because node requires all transactions to be sequenced by account.sequence
566-
let (account, signer) = self.load_account(context).await?;
567612

568613
let pfb = MsgPayForBlobs::new(&blobs, account.address.clone())?;
569614
let pfb = RawTxBody {
@@ -573,14 +618,14 @@ impl GrpcClient {
573618
};
574619

575620
let (gas_limit, gas_price) = self
576-
.calculate_transaction_gas_params(&pfb, &cfg, chain_id.clone(), &account, context)
621+
.calculate_transaction_gas_params(&pfb, cfg, chain_id.clone(), account, context)
577622
.await?;
578623

579624
let fee = (gas_limit as f64 * gas_price).ceil() as u64;
580625
let tx = sign_tx(
581626
pfb,
582627
chain_id.clone(),
583-
&account,
628+
account,
584629
&signer.pubkey,
585630
&signer.signer,
586631
gas_limit,
@@ -595,49 +640,45 @@ impl GrpcClient {
595640
type_id: BLOB_TX_TYPE_ID.to_string(),
596641
};
597642

598-
self.broadcast_tx_with_account(blob_tx.encode_to_vec(), cfg, account, context)
599-
.await
643+
Ok(blob_tx)
600644
}
601645

602646
async fn sign_and_broadcast_tx(
603647
&self,
604648
tx: RawTxBody,
605-
cfg: TxConfig,
649+
cfg: &TxConfig,
650+
account: &BaseAccount,
651+
signer: &SignerConfig,
606652
context: &Context,
607-
) -> Result<(Hash, u64)> {
653+
) -> Result<RawTx> {
608654
let ChainState { chain_id, .. } = self.load_chain_state(context).await?;
609655

610-
// lock the account; tx signing and broadcast must be atomic
611-
// because node requires all transactions to be sequenced by account.sequence
612-
let (account, signer) = self.load_account(context).await?;
613-
614656
let (gas_limit, gas_price) = self
615-
.calculate_transaction_gas_params(&tx, &cfg, chain_id.clone(), &account, context)
657+
.calculate_transaction_gas_params(&tx, cfg, chain_id.clone(), account, context)
616658
.await?;
617659

618660
let fee = (gas_limit as f64 * gas_price).ceil();
619661
let tx = sign_tx(
620662
tx,
621663
chain_id,
622-
&account,
664+
account,
623665
&signer.pubkey,
624666
&signer.signer,
625667
gas_limit,
626668
fee as u64,
627669
)
628670
.await?;
629671

630-
self.broadcast_tx_with_account(tx.encode_to_vec(), cfg, account, context)
631-
.await
672+
Ok(tx)
632673
}
633674

634675
async fn broadcast_tx_with_account(
635676
&self,
636677
tx: Vec<u8>,
637-
cfg: TxConfig,
638-
mut account: MappedMutexGuard<'_, Account>,
678+
cfg: &TxConfig,
679+
//mut account: MappedMutexGuard<'_, Account>,
639680
context: &Context,
640-
) -> Result<(Hash, u64)> {
681+
) -> Result<Hash> {
641682
let resp = self
642683
.broadcast_tx(tx, BroadcastMode::Sync)
643684
.context(context)
@@ -659,10 +700,7 @@ impl GrpcClient {
659700
return Err(Error::TxBroadcastFailed(resp.txhash, resp.code, message));
660701
}
661702

662-
let tx_sequence = account.sequence;
663-
account.sequence += 1;
664-
665-
Ok((resp.txhash, tx_sequence))
703+
Ok(resp.txhash)
666704
}
667705

668706
async fn confirm_tx(&self, hash: Hash, sequence: u64, context: &Context) -> Result<TxInfo> {
@@ -686,6 +724,14 @@ impl GrpcClient {
686724
));
687725
}
688726
}
727+
TxStatus::Rejected => {
728+
// TODO: we could latch on WrongSequence error here to mark account as stale
729+
return Err(Error::TxRejected(
730+
hash,
731+
tx_status.execution_code,
732+
tx_status.error,
733+
));
734+
}
689735
// node will treat this transaction like if it never happened, so
690736
// we need to revert the account's sequence to the one of evicted tx.
691737
// all transactions that were already submitted after this one will fail

grpc/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ pub enum Error {
5656
#[error("Transaction {0} execution failed; code: {1}, error: {2}")]
5757
TxExecutionFailed(Hash, ErrorCode, String),
5858

59+
#[error("Transaction {0} was rejected; code: {1}, error: {2}")]
60+
TxRejected(Hash, ErrorCode, String),
61+
5962
/// Transaction was evicted from the mempool
6063
#[error("Transaction {0} was evicted from the mempool")]
6164
TxEvicted(Hash),

grpc/src/grpc/celestia_tx.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ pub enum TxStatus {
5454
Pending,
5555
/// The transaction was evicted from the mempool.
5656
Evicted,
57+
/// The transaction was rejected
58+
Rejected,
5759
/// The transaction was committed into the block.
5860
Committed,
5961
}
@@ -64,6 +66,7 @@ impl fmt::Display for TxStatus {
6466
TxStatus::Unknown => "UNKNOWN",
6567
TxStatus::Pending => "PENDING",
6668
TxStatus::Evicted => "EVICTED",
69+
TxStatus::Rejected => "REJECTED",
6770
TxStatus::Committed => "COMMITTED",
6871
};
6972
write!(f, "{s}")
@@ -78,6 +81,7 @@ impl FromStr for TxStatus {
7881
"UNKNOWN" => Ok(TxStatus::Unknown),
7982
"PENDING" => Ok(TxStatus::Pending),
8083
"EVICTED" => Ok(TxStatus::Evicted),
84+
"REJECTED" => Ok(TxStatus::Rejected),
8185
"COMMITTED" => Ok(TxStatus::Committed),
8286
_ => Err(Error::FailedToParseResponse),
8387
}

grpc/tests/tonic.rs

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
use std::future::IntoFuture;
22
use std::sync::Arc;
33

4-
use celestia_grpc::{Error, TxConfig};
4+
use celestia_grpc::grpc::TxPriority;
5+
use celestia_grpc::{Error, TxConfig, TxInfo};
56
use celestia_proto::cosmos::bank::v1beta1::MsgSend;
67
use celestia_rpc::HeaderClient;
8+
use celestia_types::consts::appconsts::{self, SHARE_SIZE};
79
use celestia_types::nmt::Namespace;
810
use celestia_types::state::{Coin, ErrorCode};
911
use celestia_types::{AppVersion, Blob};
1012
use futures::FutureExt;
13+
use k256::ecdsa::SigningKey;
1114
use lumina_utils::test_utils::async_test;
15+
use tokio::task::JoinSet;
1216
use utils::{load_account, new_rpc_client, TestAccount};
1317

1418
pub mod utils;
@@ -212,6 +216,69 @@ async fn submit_blobs_parallel() {
212216
}
213217
}
214218

219+
#[async_test]
220+
async fn resubmit_evicted() {
221+
let (_lock, tx_client) = new_tx_client().await;
222+
let tx_client = Arc::new(tx_client);
223+
224+
async fn submit(tx_client: Arc<celestia_grpc::GrpcClient>, priority: TxPriority) -> TxInfo {
225+
let ns = Namespace::new_v0(&[1, 1, 1]).unwrap();
226+
let payload = vec![0; appconsts::v5::MAX_TX_SIZE as usize];
227+
let blobs = vec![Blob::new(ns, payload, None, AppVersion::V5).unwrap()];
228+
let cfg = TxConfig {
229+
priority,
230+
..Default::default()
231+
};
232+
tx_client.submit_blobs(&blobs, cfg).await.unwrap()
233+
}
234+
235+
let mut futs = JoinSet::new();
236+
for i in 0..13 {
237+
let tx_client = tx_client.clone();
238+
futs.spawn(async move {
239+
let priority = if i < 5 {
240+
TxPriority::Low
241+
} else {
242+
TxPriority::High
243+
};
244+
(i, submit(tx_client, priority).await)
245+
});
246+
}
247+
248+
let results = futs.join_all().await;
249+
println!("{results:#?}");
250+
assert_eq!(1, 2);
251+
/*
252+
let (high_prio_submit, low_prio_submit) = tokio::join!(
253+
async {
254+
let tx_client = tx_client.clone();
255+
let ns = Namespace::new_v0(&[1, 1, 1]).unwrap();
256+
let payload = vec![0; appconsts::v5::MAX_TX_SIZE as usize - 1];
257+
let blobs = vec![Blob::new(ns, payload, None, AppVersion::V5).unwrap()];
258+
let cfg = TxConfig {
259+
priority: TxPriority::Low,
260+
..Default::default()
261+
};
262+
tx_client.submit_blobs(&blobs, cfg).await.unwrap()
263+
},
264+
async {
265+
let tx_client = tx_client.clone();
266+
let ns = Namespace::new_v0(&[1, 1, 1]).unwrap();
267+
let payload = vec![0; appconsts::v5::MAX_TX_SIZE as usize];
268+
let blobs = vec![Blob::new(ns, payload, None, AppVersion::V5).unwrap()];
269+
let cfg = TxConfig {
270+
priority: TxPriority::High,
271+
..Default::default()
272+
};
273+
tx_client.submit_blobs(&blobs, cfg).await.unwrap()
274+
}
275+
);
276+
*/
277+
278+
//println!("{}, {}", low_prio_submit.height, high_prio_submit.height);
279+
//assert!(low_prio_submit.height > high_prio_submit.height);
280+
}
281+
215282
#[async_test]
216283
async fn submit_blobs_insufficient_gas_price_and_limit() {
217284
let (_lock, tx_client) = new_tx_client().await;

0 commit comments

Comments
 (0)