1
1
use async_trait:: async_trait;
2
2
use bdk_chain:: collections:: btree_map;
3
+ use bdk_chain:: spk_client:: { FullScanRequest , FullScanResult , SyncRequest , SyncResult } ;
3
4
use bdk_chain:: {
4
5
bitcoin:: { BlockHash , OutPoint , ScriptBuf , TxOut , Txid } ,
5
6
collections:: BTreeMap ,
@@ -8,6 +9,7 @@ use bdk_chain::{
8
9
} ;
9
10
use esplora_client:: TxStatus ;
10
11
use futures:: { stream:: FuturesOrdered , TryStreamExt } ;
12
+ use std:: fmt:: Debug ;
11
13
12
14
use crate :: anchor_from_status;
13
15
@@ -45,22 +47,17 @@ pub trait EsploraAsyncExt {
45
47
) -> Result < local_chain:: Update , Error > ;
46
48
47
49
/// Full scan the keychain scripts specified with the blockchain (via an Esplora client) and
48
- /// returns a [`TxGraph`] and a map of last active indices.
49
- ///
50
- /// * `keychain_spks`: keychains that we want to scan transactions for
50
+ /// returns a [`TxGraph`] and a map of keychain last active indices.
51
51
///
52
52
/// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
53
53
/// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
54
54
/// parallel.
55
- async fn full_scan < K : Ord + Clone + Send > (
55
+ async fn full_scan < K : Ord + Clone + Send + Debug , I : Iterator < Item = ( u32 , ScriptBuf ) > + Send > (
56
56
& self ,
57
- keychain_spks : BTreeMap <
58
- K ,
59
- impl IntoIterator < IntoIter = impl Iterator < Item = ( u32 , ScriptBuf ) > + Send > + Send ,
60
- > ,
57
+ request : FullScanRequest < K , I > ,
61
58
stop_gap : usize ,
62
59
parallel_requests : usize ,
63
- ) -> Result < ( TxGraph < ConfirmationTimeHeightAnchor > , BTreeMap < K , u32 > ) , Error > ;
60
+ ) -> Result < FullScanResult < K > , Error > ;
64
61
65
62
/// Sync a set of scripts with the blockchain (via an Esplora client) for the data
66
63
/// specified and return a [`TxGraph`].
@@ -76,11 +73,9 @@ pub trait EsploraAsyncExt {
76
73
/// [`full_scan`]: EsploraAsyncExt::full_scan
77
74
async fn sync (
78
75
& self ,
79
- misc_spks : impl IntoIterator < IntoIter = impl Iterator < Item = ScriptBuf > + Send > + Send ,
80
- txids : impl IntoIterator < IntoIter = impl Iterator < Item = Txid > + Send > + Send ,
81
- outpoints : impl IntoIterator < IntoIter = impl Iterator < Item = OutPoint > + Send > + Send ,
76
+ request : SyncRequest ,
82
77
parallel_requests : usize ,
83
- ) -> Result < TxGraph < ConfirmationTimeHeightAnchor > , Error > ;
78
+ ) -> Result < SyncResult , Error > ;
84
79
}
85
80
86
81
#[ cfg_attr( target_arch = "wasm32" , async_trait( ?Send ) ) ]
@@ -149,25 +144,24 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
149
144
} )
150
145
}
151
146
152
- async fn full_scan < K : Ord + Clone + Send > (
147
+ async fn full_scan <
148
+ K : Ord + Clone + Send + Debug ,
149
+ I : Iterator < Item = ( u32 , ScriptBuf ) > + Send ,
150
+ > (
153
151
& self ,
154
- keychain_spks : BTreeMap <
155
- K ,
156
- impl IntoIterator < IntoIter = impl Iterator < Item = ( u32 , ScriptBuf ) > + Send > + Send ,
157
- > ,
152
+ mut request : FullScanRequest < K , I > ,
158
153
stop_gap : usize ,
159
154
parallel_requests : usize ,
160
- ) -> Result < ( TxGraph < ConfirmationTimeHeightAnchor > , BTreeMap < K , u32 > ) , Error > {
155
+ ) -> Result < FullScanResult < K > , Error > {
161
156
type TxsOfSpkIndex = ( u32 , Vec < esplora_client:: Tx > ) ;
162
157
let parallel_requests = Ord :: max ( parallel_requests, 1 ) ;
163
- let mut graph = TxGraph :: < ConfirmationTimeHeightAnchor > :: default ( ) ;
164
- let mut last_active_indexes = BTreeMap :: < K , u32 > :: new ( ) ;
158
+ let mut graph_update = TxGraph :: < ConfirmationTimeHeightAnchor > :: default ( ) ;
159
+ let mut last_active_indices = BTreeMap :: < K , u32 > :: new ( ) ;
165
160
166
- for ( keychain, spks) in keychain_spks {
161
+ for ( keychain, spks) in request . take_spks_by_keychain ( ) {
167
162
let mut spks = spks. into_iter ( ) ;
168
163
let mut last_index = Option :: < u32 > :: None ;
169
164
let mut last_active_index = Option :: < u32 > :: None ;
170
-
171
165
loop {
172
166
let handles = spks
173
167
. by_ref ( )
@@ -200,9 +194,9 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
200
194
last_active_index = Some ( index) ;
201
195
}
202
196
for tx in txs {
203
- let _ = graph . insert_tx ( tx. to_tx ( ) ) ;
197
+ let _ = graph_update . insert_tx ( tx. to_tx ( ) ) ;
204
198
if let Some ( anchor) = anchor_from_status ( & tx. status ) {
205
- let _ = graph . insert_anchor ( tx. txid , anchor) ;
199
+ let _ = graph_update . insert_anchor ( tx. txid , anchor) ;
206
200
}
207
201
208
202
let previous_outputs = tx. vin . iter ( ) . filter_map ( |vin| {
@@ -220,7 +214,7 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
220
214
} ) ;
221
215
222
216
for ( outpoint, txout) in previous_outputs {
223
- let _ = graph . insert_txout ( outpoint, txout) ;
217
+ let _ = graph_update . insert_txout ( outpoint, txout) ;
224
218
}
225
219
}
226
220
}
@@ -237,42 +231,44 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
237
231
}
238
232
239
233
if let Some ( last_active_index) = last_active_index {
240
- last_active_indexes . insert ( keychain, last_active_index) ;
234
+ last_active_indices . insert ( keychain, last_active_index) ;
241
235
}
242
236
}
243
237
244
- Ok ( ( graph, last_active_indexes) )
238
+ // new tx graph transactions determine possible missing blockchain heights
239
+ let missing_heights = graph_update. anchor_heights ( request. chain_tip . height ( ) ) ;
240
+ // get blockchain update from original request checkpoint and missing heights
241
+ let chain_update = self
242
+ . update_local_chain ( request. chain_tip . clone ( ) , missing_heights)
243
+ . await ?;
244
+
245
+ Ok ( FullScanResult {
246
+ graph_update,
247
+ chain_update,
248
+ last_active_indices,
249
+ } )
245
250
}
246
251
247
252
async fn sync (
248
253
& self ,
249
- misc_spks : impl IntoIterator < IntoIter = impl Iterator < Item = ScriptBuf > + Send > + Send ,
250
- txids : impl IntoIterator < IntoIter = impl Iterator < Item = Txid > + Send > + Send ,
251
- outpoints : impl IntoIterator < IntoIter = impl Iterator < Item = OutPoint > + Send > + Send ,
254
+ mut request : SyncRequest ,
252
255
parallel_requests : usize ,
253
- ) -> Result < TxGraph < ConfirmationTimeHeightAnchor > , Error > {
254
- let mut graph = self
255
- . full_scan (
256
- [ (
257
- ( ) ,
258
- misc_spks
259
- . into_iter ( )
260
- . enumerate ( )
261
- . map ( |( i, spk) | ( i as u32 , spk) ) ,
262
- ) ]
263
- . into ( ) ,
264
- usize:: MAX ,
265
- parallel_requests,
266
- )
256
+ ) -> Result < SyncResult , Error > {
257
+ let mut full_scan_request = FullScanRequest :: new ( request. chain_tip . clone ( ) ) ;
258
+ let spks = [ ( 0 , request. take_spks ( ) ) ] . into ( ) ;
259
+ full_scan_request. add_spks_by_keychain ( spks) ;
260
+
261
+ let mut graph_update = self
262
+ . full_scan ( full_scan_request, usize:: MAX , parallel_requests)
267
263
. await
268
- . map ( |( g , _ ) | g ) ?;
264
+ . map ( |result| result . graph_update ) ?;
269
265
270
- let mut txids = txids . into_iter ( ) ;
266
+ let mut txids = request . take_txids ( ) ;
271
267
loop {
272
268
let handles = txids
273
269
. by_ref ( )
274
270
. take ( parallel_requests)
275
- . filter ( |& txid| graph . get_tx ( txid) . is_none ( ) )
271
+ . filter ( |& txid| graph_update . get_tx ( txid) . is_none ( ) )
276
272
. map ( |txid| {
277
273
let client = self . clone ( ) ;
278
274
async move { client. get_tx_status ( & txid) . await . map ( |s| ( txid, s) ) }
@@ -285,36 +281,52 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
285
281
286
282
for ( txid, status) in handles. try_collect :: < Vec < ( Txid , TxStatus ) > > ( ) . await ? {
287
283
if let Some ( anchor) = anchor_from_status ( & status) {
288
- let _ = graph . insert_anchor ( txid, anchor) ;
284
+ let _ = graph_update . insert_anchor ( txid, anchor) ;
289
285
}
290
286
}
291
287
}
292
288
293
- for op in outpoints . into_iter ( ) {
294
- if graph . get_tx ( op. txid ) . is_none ( ) {
289
+ for op in request . take_outpoints ( ) {
290
+ if graph_update . get_tx ( op. txid ) . is_none ( ) {
295
291
if let Some ( tx) = self . get_tx ( & op. txid ) . await ? {
296
- let _ = graph . insert_tx ( tx) ;
292
+ let _ = graph_update . insert_tx ( tx) ;
297
293
}
298
294
let status = self . get_tx_status ( & op. txid ) . await ?;
299
295
if let Some ( anchor) = anchor_from_status ( & status) {
300
- let _ = graph . insert_anchor ( op. txid , anchor) ;
296
+ let _ = graph_update . insert_anchor ( op. txid , anchor) ;
301
297
}
302
298
}
303
299
304
300
if let Some ( op_status) = self . get_output_status ( & op. txid , op. vout as _ ) . await ? {
305
301
if let Some ( txid) = op_status. txid {
306
- if graph . get_tx ( txid) . is_none ( ) {
302
+ if graph_update . get_tx ( txid) . is_none ( ) {
307
303
if let Some ( tx) = self . get_tx ( & txid) . await ? {
308
- let _ = graph . insert_tx ( tx) ;
304
+ let _ = graph_update . insert_tx ( tx) ;
309
305
}
310
306
let status = self . get_tx_status ( & txid) . await ?;
311
307
if let Some ( anchor) = anchor_from_status ( & status) {
312
- let _ = graph . insert_anchor ( txid, anchor) ;
308
+ let _ = graph_update . insert_anchor ( txid, anchor) ;
313
309
}
314
310
}
315
311
}
316
312
}
317
313
}
318
- Ok ( graph)
314
+
315
+ // Now that we're done updating the `IndexedTxGraph`, it's time to update the `LocalChain`! We
316
+ // want the `LocalChain` to have data about all the anchors in the `TxGraph` - for this reason,
317
+ // we want to retrieve the blocks at the heights of the newly added anchors that are missing from
318
+ // our view of the chain.
319
+
320
+ // new tx graph transactions determine possible missing blockchain heights
321
+ let missing_heights = graph_update. anchor_heights ( request. chain_tip . height ( ) ) ;
322
+ // get blockchain update from original request checkpoint and missing heights
323
+ let chain_update = self
324
+ . update_local_chain ( request. chain_tip . clone ( ) , missing_heights)
325
+ . await ?;
326
+
327
+ Ok ( SyncResult {
328
+ graph_update,
329
+ chain_update,
330
+ } )
319
331
}
320
332
}
0 commit comments