|
21 | 21 | package org.apache.bookkeeper.replication;
|
22 | 22 |
|
23 | 23 | import com.google.common.base.Stopwatch;
|
| 24 | +import com.google.common.collect.Lists; |
24 | 25 | import com.google.common.collect.Sets;
|
25 | 26 |
|
26 | 27 | import org.apache.bookkeeper.client.BKException;
|
@@ -104,6 +105,7 @@ public class Auditor implements BookiesListener {
|
104 | 105 | private final long auditorPeriodicCheckInterval;
|
105 | 106 | private final long bookieCheckInterval;
|
106 | 107 | private volatile boolean isBookieCheckFirstTime;
|
| 108 | + private Set<String> bookiesToBeAudited = Sets.newHashSet(); |
107 | 109 |
|
108 | 110 | public Auditor(final String bookieIdentifier, ServerConfiguration conf,
|
109 | 111 | ZooKeeper zkc, StatsLogger statsLogger) throws UnavailableException {
|
@@ -195,46 +197,61 @@ public void run() {
|
195 | 197 | waitIfLedgerReplicationDisabled();
|
196 | 198 |
|
197 | 199 | List<String> availableBookies = getAvailableBookies();
|
198 |
| - |
199 | 200 | // casting to String, as knownBookies and availableBookies
|
200 | 201 | // contain only String values
|
201 | 202 | // find new bookies(if any) and update the known bookie list
|
202 | 203 | Collection<String> newBookies = CollectionUtils.subtract(
|
203 | 204 | availableBookies, knownBookies);
|
204 | 205 | knownBookies.addAll(newBookies);
|
| 206 | + if (!bookiesToBeAudited.isEmpty() && knownBookies.containsAll(bookiesToBeAudited)) { |
| 207 | + // The bookie that went down earlier, and for which an audit was scheduled, |
| 208 | + // has come back up, so stop tracking it and cancel the audit. Since we |
| 209 | + // only allow delaying the audit when there is a single failed bookie, |
| 210 | + // bookiesToBeAudited should have just 1 element, and hence the containsAll |
| 211 | + // check should be ok. |
| 212 | + if (auditTask != null && auditTask.cancel(false)) { |
| 213 | + auditTask = null; |
| 214 | + numDelayedBookieAuditsCancelled.inc(); |
| 215 | + } |
| 216 | + bookiesToBeAudited.clear(); |
| 217 | + } |
205 | 218 |
|
206 | 219 | // find lost bookies(if any)
|
207 |
| - Collection<String> lostBookies = CollectionUtils.subtract( |
208 |
| - knownBookies, availableBookies); |
209 |
| - |
210 |
| - if (lostBookies.size() > 0) { |
211 |
| - knownBookies.removeAll(lostBookies); |
212 |
| - if (conf.getLostBookieRecoveryDelay() == 0 || |
213 |
| - lostBookies.size() > 1 || |
214 |
| - auditTask != null) { |
215 |
| - // 1) if more than one bookie is down, start the audit immediately; |
216 |
| - // 2) if we had scheduled an audit earlier for a lost bookie and in |
217 |
| - // the meantime another bookie goes down, let us not delay recovery |
218 |
| - // we cancel the previously scheduled audit before starting the new |
219 |
| - // one |
220 |
| - if (auditTask != null) { |
221 |
| - if (auditTask.cancel(false)) { |
222 |
| - auditTask = null; |
223 |
| - numDelayedBookieAuditsCancelled.inc(); |
224 |
| - } |
225 |
| - } |
226 |
| - startAudit(false); |
227 |
| - } else { |
228 |
| - auditTask = executor.schedule( new Runnable() { |
229 |
| - public void run() { |
230 |
| - startAudit(false); |
231 |
| - auditTask = null; |
232 |
| - } |
233 |
| - }, conf.getLostBookieRecoveryDelay(), TimeUnit.SECONDS); |
234 |
| - numBookieAuditsDelayed.inc(); |
235 |
| - LOG.info("Delaying the start of bookie audit by " + conf.getLostBookieRecoveryDelay() + |
236 |
| - " seconds"); |
| 220 | + bookiesToBeAudited.addAll(CollectionUtils.subtract(knownBookies, availableBookies)); |
| 221 | + if (bookiesToBeAudited.size() == 0) { |
| 222 | + return; |
| 223 | + } |
| 224 | + knownBookies.removeAll(bookiesToBeAudited); |
| 225 | + if (conf.getLostBookieRecoveryDelay() == 0) { |
| 226 | + startAudit(false); |
| 227 | + bookiesToBeAudited.clear(); |
| 228 | + return; |
| 229 | + } |
| 230 | + if (bookiesToBeAudited.size() > 1) { |
| 231 | + // if more than one bookie is down, start the audit immediately; |
| 232 | + LOG.info("Multiple bookie failure; not delaying bookie audit. Bookies lost now: " |
| 233 | + + CollectionUtils.subtract(knownBookies, availableBookies) |
| 234 | + +"; All lost bookies: " + bookiesToBeAudited.toString()); |
| 235 | + if (auditTask != null && auditTask.cancel(false)) { |
| 236 | + auditTask = null; |
| 237 | + numDelayedBookieAuditsCancelled.inc(); |
237 | 238 | }
|
| 239 | + startAudit(false); |
| 240 | + bookiesToBeAudited.clear(); |
| 241 | + return; |
| 242 | + } |
| 243 | + if (auditTask == null) { |
| 244 | + // if there is no scheduled audit, schedule one |
| 245 | + auditTask = executor.schedule( new Runnable() { |
| 246 | + public void run() { |
| 247 | + startAudit(false); |
| 248 | + auditTask = null; |
| 249 | + bookiesToBeAudited.clear(); |
| 250 | + } |
| 251 | + }, conf.getLostBookieRecoveryDelay(), TimeUnit.SECONDS); |
| 252 | + numBookieAuditsDelayed.inc(); |
| 253 | + LOG.info("Delaying bookie audit by " + conf.getLostBookieRecoveryDelay() |
| 254 | + + "secs for " + bookiesToBeAudited.toString()); |
238 | 255 | }
|
239 | 256 | } catch (BKException bke) {
|
240 | 257 | LOG.error("Exception getting bookie list", bke);
|
|
0 commit comments