Skip to content

Commit 5ba2a0b

Browse files
committed
Continue emitting progress updates until import is fully completed
This is needed in case there are multiple rounds of imports.
1 parent fdd46f3 commit 5ba2a0b

File tree

3 files changed

+42
-21
lines changed

3 files changed

+42
-21
lines changed

src/app.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ fn wait_bitcoind(rpc: &RpcClient, progress_tx: Option<mpsc::Sender<Progress>>) -
225225
const INTERVAL: time::Duration = time::Duration::from_secs(7);
226226

227227
let bcinfo = rpc.wait_blockchain_sync(progress_tx.clone(), INTERVAL)?;
228-
let walletinfo = rpc.wait_wallet_scan(progress_tx, INTERVAL, false)?;
228+
let walletinfo = rpc.wait_wallet_scan(progress_tx, None, INTERVAL)?;
229229

230230
let netinfo = rpc.get_network_info()?;
231231
info!(

src/indexer.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,15 @@ impl Indexer {
5252
let mut changelog = Changelog::new(false);
5353
let mut synced_tip;
5454

55-
spawn_send_progress_thread(self.rpc.clone(), progress_tx);
55+
let shutdown_progress_thread = spawn_send_progress_thread(self.rpc.clone(), progress_tx);
5656

5757
while {
5858
synced_tip = self.sync_transactions(&mut changelog)?;
5959
self.watcher.do_imports(&self.rpc, /*rescan=*/ true)?
6060
} { /* do while */ }
6161

62+
shutdown_progress_thread.send(()).unwrap();
63+
6264
self.sync_mempool(/*force_refresh=*/ true)?;
6365

6466
let stats = self.store.stats();
@@ -427,16 +429,23 @@ impl fmt::Display for IndexChange {
427429
fn spawn_send_progress_thread(
428430
rpc: Arc<RpcClient>,
429431
progress_tx: Option<mpsc::Sender<Progress>>,
430-
) -> thread::JoinHandle<()> {
432+
) -> mpsc::SyncSender<()> {
431433
const DELAY: time::Duration = time::Duration::from_millis(250);
432434
const INTERVAL: time::Duration = time::Duration::from_millis(1500);
433435

436+
let (shutdown_tx, shutdown_rx) = mpsc::sync_channel(1);
437+
434438
thread::spawn(move || {
435439
// allow some time for the indexer to start the first set of imports
436440
thread::sleep(DELAY);
437441

438-
if let Err(e) = rpc.wait_wallet_scan(progress_tx, INTERVAL, true) {
442+
if shutdown_rx.try_recv() != Err(mpsc::TryRecvError::Empty) {
443+
return;
444+
}
445+
if let Err(e) = rpc.wait_wallet_scan(progress_tx, Some(shutdown_rx), INTERVAL) {
439446
warn!("getwalletinfo failed: {:?}", e);
440447
}
441-
})
448+
});
449+
450+
shutdown_tx
442451
}

src/util/bitcoincore_ext.rs

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,21 @@ pub trait RpcApiExt: RpcApi {
8787
fn wait_wallet_scan(
8888
&self,
8989
progress_tx: Option<mpsc::Sender<Progress>>,
90+
shutdown_rx: Option<mpsc::Receiver<()>>,
9091
interval: time::Duration,
91-
mut wait_for_scanning: bool,
9292
) -> RpcResult<json::GetWalletInfoResult> {
93-
let start = time::Instant::now();
93+
// Stop if the shutdown signal was received or if the channel was disconnected
94+
let should_shutdown = || {
95+
shutdown_rx
96+
.as_ref()
97+
.map_or(false, |rx| rx.try_recv() != Err(mpsc::TryRecvError::Empty))
98+
};
99+
94100
Ok(loop {
95101
let info = self.get_wallet_info()?;
102+
if should_shutdown() {
103+
break info;
104+
}
96105
match info.scanning {
97106
None => {
98107
warn!("Your bitcoin node does not report the `scanning` status in `getwalletinfo`. It is recommended to upgrade to Bitcoin Core v0.19+ to enable this.");
@@ -109,36 +118,39 @@ pub trait RpcApiExt: RpcApi {
109118
break info;
110119
}
111120
}
112-
// wait_wallet_scan() could be called before scanning actually started,
113-
// give it a few seconds to start up before giving up
114-
if !wait_for_scanning || start.elapsed().as_secs() > 3 {
121+
// Stop as soon as scanning is completed if no explicit shutdown_rx was given,
122+
// or continue until the shutdown signal is received if it was.
123+
if shutdown_rx.is_none() {
115124
break info;
116125
}
117126
}
118-
Some(ScanningDetails::Scanning { progress, duration }) => {
119-
wait_for_scanning = false;
120-
let duration = duration as u64;
121-
let progress_n = progress as f32;
127+
Some(ScanningDetails::Scanning {
128+
progress: progress_n,
129+
duration,
130+
}) => {
122131
let eta = if progress_n > 0.0 {
123-
(duration as f32 / progress_n) as u64 - duration
132+
(duration as f32 / progress_n) as u64 - duration as u64
124133
} else {
125134
0
126135
};
127136

128-
info!(target: "bwt",
129-
"waiting for bitcoind to finish scanning [done {:.1}%, running for {}m, eta {}m]",
130-
progress_n * 100.0, duration / 60, eta / 60
131-
);
132-
133137
if let Some(ref progress_tx) = progress_tx {
134138
let progress = Progress::Scan { progress_n, eta };
135139
if progress_tx.send(progress).is_err() {
136140
break info;
137141
}
142+
} else {
143+
info!(target: "bwt",
144+
"waiting for bitcoind to finish scanning [done {:.1}%, running for {}m, eta {}m]",
145+
progress_n * 100.0, duration / 60, eta / 60
146+
);
138147
}
139148
}
140-
};
149+
}
141150
thread::sleep(interval);
151+
if should_shutdown() {
152+
break info;
153+
}
142154
})
143155
}
144156
}

0 commit comments

Comments
 (0)