Skip to content

Commit ff02de3

Browse files
committed
fix index_block_start and index_block_end
1 parent ac54bdd commit ff02de3

File tree

3 files changed

+72
-98
lines changed

3 files changed

+72
-98
lines changed

lib/ain-ocean/src/indexer/loan_token.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,6 @@ impl Index for SetLoanToken {
3232
}
3333
}
3434

35-
impl IndexBlockEnd for SetLoanToken {
36-
fn index_block_end(self, services: &Arc<Services>, block: &BlockContext) -> Result<()> {
37-
index_active_price(services, block)
38-
}
39-
40-
fn invalidate_block_end(self, services: &Arc<Services>, block: &BlockContext) -> Result<()> {
41-
invalidate_active_price(services, block)
42-
}
43-
}
44-
4535
fn is_aggregate_valid(aggregate: &OraclePriceAggregated, block: &BlockContext) -> bool {
4636
if (aggregate.block.time - block.time).abs() >= 3600 {
4737
return false;

lib/ain-ocean/src/indexer/mod.rs

Lines changed: 22 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ use defichain_rpc::json::blockchain::{Block, Transaction, Vin, VinStandard, Vout
1919
use helper::check_if_evm_tx;
2020
use log::trace;
2121
pub use poolswap::PoolSwapAggregatedInterval;
22+
use loan_token::{index_active_price, invalidate_active_price};
23+
use poolswap::{index_pool_swap_aggregated, invalidate_pool_swap_aggregated};
2224

2325
use crate::{
2426
error::{Error, IndexAction},
@@ -41,18 +43,6 @@ pub trait Index {
4143
fn invalidate(&self, services: &Arc<Services>, ctx: &Context) -> Result<()>;
4244
}
4345

44-
pub trait IndexBlockStart: Index {
45-
fn index_block_start(self, services: &Arc<Services>, block: &BlockContext) -> Result<()>;
46-
47-
fn invalidate_block_start(self, services: &Arc<Services>, block: &BlockContext) -> Result<()>;
48-
}
49-
50-
pub trait IndexBlockEnd: Index {
51-
fn index_block_end(self, services: &Arc<Services>, block: &BlockContext) -> Result<()>;
52-
53-
fn invalidate_block_end(self, services: &Arc<Services>, block: &BlockContext) -> Result<()>;
54-
}
55-
5646
#[derive(Debug)]
5747
pub struct Context {
5848
block: BlockContext,
@@ -538,6 +528,22 @@ pub fn get_block_height(services: &Arc<Services>) -> Result<u32> {
538528
.map_or(0, |block| block.height))
539529
}
540530

531+
pub fn index_block_start(services: &Arc<Services>, block: &BlockContext) -> Result<()> {
532+
index_pool_swap_aggregated(services, block)
533+
}
534+
535+
pub fn invalidate_block_start(services: &Arc<Services>, block: &BlockContext) -> Result<()> {
536+
invalidate_pool_swap_aggregated(services, block)
537+
}
538+
539+
pub fn index_block_end(services: &Arc<Services>, block: &BlockContext) -> Result<()> {
540+
index_active_price(services, block)
541+
}
542+
543+
pub fn invalidate_block_end(services: &Arc<Services>, block: &BlockContext) -> Result<()> {
544+
invalidate_active_price(services, block)
545+
}
546+
541547
pub fn index_block(services: &Arc<Services>, block: Block<Transaction>) -> Result<()> {
542548
trace!("[index_block] Indexing block...");
543549
let start = Instant::now();
@@ -586,12 +592,7 @@ pub fn index_block(services: &Arc<Services>, block: Block<Transaction>) -> Resul
586592
}
587593
}
588594

589-
// index_block_start
590-
for (dftx, _) in &dftxs {
591-
if let DfTx::PoolSwap(data) = dftx.clone() {
592-
data.index_block_start(services, &block_ctx)?
593-
}
594-
}
595+
index_block_start(services, &block_ctx)?;
595596

596597
// index_dftx
597598
for (dftx, ctx) in &dftxs {
@@ -615,12 +616,7 @@ pub fn index_block(services: &Arc<Services>, block: Block<Transaction>) -> Resul
615616
log_elapsed(start, "Indexed dftx");
616617
}
617618

618-
// index_block_end
619-
for (dftx, _) in dftxs {
620-
if let DfTx::SetLoanToken(data) = dftx {
621-
data.index_block_end(services, &block_ctx)?
622-
}
623-
}
619+
index_block_end(services, &block_ctx)?;
624620

625621
let block_mapper = BlockMapper {
626622
hash: block_hash,
@@ -699,12 +695,7 @@ pub fn invalidate_block(services: &Arc<Services>, block: Block<Transaction>) ->
699695
}
700696
}
701697

702-
// invalidate_block_end
703-
for (dftx, _) in &dftxs {
704-
if let DfTx::SetLoanToken(data) = dftx.clone() {
705-
data.invalidate_block_end(services, &block_ctx)?
706-
}
707-
}
698+
invalidate_block_end(services, &block_ctx)?;
708699

709700
// invalidate_dftx
710701
for (dftx, ctx) in &dftxs {
@@ -727,12 +718,7 @@ pub fn invalidate_block(services: &Arc<Services>, block: Block<Transaction>) ->
727718
log_elapsed(start, "Invalidate dftx");
728719
}
729720

730-
// invalidate_block_start
731-
for (dftx, _) in &dftxs {
732-
if let DfTx::PoolSwap(data) = dftx.clone() {
733-
data.invalidate_block_start(services, &block_ctx)?
734-
}
735-
}
721+
invalidate_block_start(services, &block_ctx)?;
736722

737723
// invalidate_block
738724
services.block.by_height.delete(&block.height)?;

lib/ain-ocean/src/indexer/poolswap.rs

Lines changed: 50 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -182,67 +182,65 @@ fn create_new_bucket(
182182
Ok(())
183183
}
184184

185-
impl IndexBlockStart for PoolSwap {
186-
fn index_block_start(self, services: &Arc<Services>, block: &BlockContext) -> Result<()> {
187-
let mut pool_pairs = ain_cpp_imports::get_pool_pairs();
188-
pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height));
189-
190-
for interval in AGGREGATED_INTERVALS {
191-
for pool_pair in &pool_pairs {
192-
let repo = &services.pool_swap_aggregated;
193-
194-
let prev = repo
195-
.by_key
196-
.list(
197-
Some((pool_pair.id, interval, i64::MAX)),
198-
SortOrder::Descending,
199-
)?
200-
.take_while(|item| match item {
201-
Ok((k, _)) => k.0 == pool_pair.id && k.1 == interval,
202-
_ => true,
203-
})
204-
.next()
205-
.transpose()?;
206-
207-
let bucket = block.median_time - (block.median_time % interval as i64);
208-
209-
let Some((_, prev_id)) = prev else {
210-
create_new_bucket(repo, bucket, pool_pair.id, interval, block)?;
211-
continue;
212-
};
185+
pub fn index_pool_swap_aggregated(services: &Arc<Services>, block: &BlockContext) -> Result<()> {
186+
let mut pool_pairs = ain_cpp_imports::get_pool_pairs();
187+
pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height));
213188

214-
let Some(prev) = repo.by_id.get(&prev_id)? else {
215-
create_new_bucket(repo, bucket, pool_pair.id, interval, block)?;
216-
continue;
217-
};
218-
219-
if prev.bucket >= bucket {
220-
break;
221-
}
189+
for interval in AGGREGATED_INTERVALS {
190+
for pool_pair in &pool_pairs {
191+
let repo = &services.pool_swap_aggregated;
192+
193+
let prev = repo
194+
.by_key
195+
.list(
196+
Some((pool_pair.id, interval, i64::MAX)),
197+
SortOrder::Descending,
198+
)?
199+
.take_while(|item| match item {
200+
Ok((k, _)) => k.0 == pool_pair.id && k.1 == interval,
201+
_ => true,
202+
})
203+
.next()
204+
.transpose()?;
205+
206+
let bucket = block.median_time - (block.median_time % interval as i64);
207+
208+
let Some((_, prev_id)) = prev else {
209+
create_new_bucket(repo, bucket, pool_pair.id, interval, block)?;
210+
continue;
211+
};
222212

213+
let Some(prev) = repo.by_id.get(&prev_id)? else {
223214
create_new_bucket(repo, bucket, pool_pair.id, interval, block)?;
215+
continue;
216+
};
217+
218+
if prev.bucket >= bucket {
219+
break;
224220
}
225-
}
226221

227-
Ok(())
222+
create_new_bucket(repo, bucket, pool_pair.id, interval, block)?;
223+
}
228224
}
229225

230-
fn invalidate_block_start(self, services: &Arc<Services>, block: &BlockContext) -> Result<()> {
231-
let mut pool_pairs = ain_cpp_imports::get_pool_pairs();
232-
pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height));
233-
234-
for interval in AGGREGATED_INTERVALS.into_iter().rev() {
235-
for pool_pair in pool_pairs.iter().rev() {
236-
let pool_swap_aggregated_id = (pool_pair.id, interval, block.hash);
237-
services
238-
.pool_swap_aggregated
239-
.by_id
240-
.delete(&pool_swap_aggregated_id)?;
241-
}
242-
}
226+
Ok(())
227+
}
243228

244-
Ok(())
229+
pub fn invalidate_pool_swap_aggregated(services: &Arc<Services>, block: &BlockContext) -> Result<()> {
230+
let mut pool_pairs = ain_cpp_imports::get_pool_pairs();
231+
pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height));
232+
233+
for interval in AGGREGATED_INTERVALS.into_iter().rev() {
234+
for pool_pair in pool_pairs.iter().rev() {
235+
let pool_swap_aggregated_id = (pool_pair.id, interval, block.hash);
236+
services
237+
.pool_swap_aggregated
238+
.by_id
239+
.delete(&pool_swap_aggregated_id)?;
240+
}
245241
}
242+
243+
Ok(())
246244
}
247245

248246
impl Index for PoolSwap {

0 commit comments

Comments
 (0)