Skip to content

Commit 9241907

Browse files
committed
fix: bug in RetryableObjectStore.list_with_default_retries, clean up ReauthingObjectStore (#1399)
* fix: RetryableObjectStore list_with_retries
* fix: remove unnecessary block_on call from ReauthingObjectStore
* test: add more list_with_retries test cases to validate non-trivial retryable list calls
1 parent 901294c commit 9241907

File tree

2 files changed

+237
-71
lines changed

2 files changed

+237
-71
lines changed

influxdb3_clap_blocks/src/object_store.rs

Lines changed: 37 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -536,7 +536,7 @@ macro_rules! object_store_config_inner {
536536
/// `--aws-secret-access-key`, and `--aws-session-token`. This is a file path
537537
/// argument where the format of the file is as follows:
538538
///
539-
/// ```
539+
/// ```ignore
540540
/// {
541541
/// "aws_access_key_id": "<key>",
542542
/// "aws_secret_access_key": "<secret>",
@@ -1383,35 +1383,45 @@ impl object_store::ObjectStore for ReauthingObjectStore {
13831383
}
13841384

13851385
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
1386-
let inner_cloned = Arc::clone(&self.inner);
1387-
let items: Vec<object_store::Result<ObjectMeta, _>> = tokio::task::block_in_place(|| {
1388-
Handle::current().block_on(async move {
1389-
// we could use TryStreamExt.collect() here to drop all collected results and
1390-
// return the first error we encounter, but users of the ObjectStore API will
1391-
// probably expect to have to deal with errors one element at a time anyway
1392-
inner_cloned.list(prefix).collect().await
1393-
})
1394-
});
1386+
let inner = Arc::clone(&self.inner);
1387+
let credential_reloader = Arc::clone(&self.credential_reloader);
1388+
let prefix = prefix.cloned();
13951389

1396-
if items.is_empty() {
1397-
return futures::stream::iter(items).boxed();
1398-
}
1390+
futures::stream::once(async move {
1391+
use std::pin::Pin;
13991392

1400-
if let Err(object_store::Error::Unauthenticated { source, .. }) = &items[0] {
1401-
warn!(error = ?source, "authentication with object store failed, attempting to reload from disk");
1402-
let items: Vec<Result<ObjectMeta, _>> = tokio::task::block_in_place(|| {
1403-
Handle::current().block_on(async move {
1404-
self.credential_reloader.check_and_update().await;
1405-
// we could use TryStreamExt.collect() here to drop all collected results and
1406-
// return the first error we encounter, but users of the ObjectStore API will
1407-
// probably expect to have to deal with errors one element at a time anyway
1408-
self.inner.as_ref().list(prefix).collect().await
1409-
})
1410-
});
1411-
return futures::stream::iter(items).boxed();
1412-
}
1393+
let mut stream = inner.list(prefix.as_ref()).peekable();
14131394

1414-
futures::stream::iter(items).boxed()
1395+
// Peek at the first item to check for authentication errors
1396+
let first_item = Pin::new(&mut stream).peek().await;
1397+
1398+
match first_item {
1399+
Some(Err(object_store::Error::Unauthenticated { source, .. })) => {
1400+
let path = credential_reloader.path.display();
1401+
warn!(error = ?source, "authentication with object store failed, attempting to reload credentials from {path}");
1402+
credential_reloader.check_and_update().await;
1403+
// Retry with fresh credentials
1404+
inner.list(prefix.as_ref())
1405+
}
1406+
Some(Err(object_store::Error::Generic { source, .. })) => {
1407+
let msg = format!("{source:?}");
1408+
if msg.contains("ExpiredToken") {
1409+
let path = credential_reloader.path.display();
1410+
warn!(error = ?source, "authentication with object store failed (ExpiredToken), attempting to reload credentials from {path}");
1411+
credential_reloader.check_and_update().await;
1412+
// Retry with fresh credentials
1413+
inner.list(prefix.as_ref())
1414+
} else {
1415+
// Not an auth error, return the original stream
1416+
stream.boxed()
1417+
}
1418+
}
1419+
_ => {
1420+
// No auth error or empty stream, return the original stream
1421+
stream.boxed()
1422+
}
1423+
}
1424+
}).flatten().boxed()
14151425
}
14161426

14171427
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {

object_store_utils/src/retryable_object_store.rs

Lines changed: 200 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
use std::pin::Pin;
12
use std::sync::{Arc, OnceLock};
23
use std::time::Duration;
34

@@ -279,27 +280,44 @@ pub trait RetryableObjectStore: ObjectStore {
279280
where
280281
Self: Clone + Send + Sync + 'static,
281282
{
282-
// Note: List operations return streaming results, so we can only retry the initial
283-
// stream creation, not failures that occur during iteration
284-
let prefix_clone = prefix.cloned();
285-
let offset_clone = offset.cloned();
286283
let prefix_str = prefix
287284
.map(|p| p.to_string())
288285
.unwrap_or_else(|| "<root>".to_string());
289286

290287
// Clone self to move into async block
291-
let self_clone = self.clone();
292-
288+
let prefix_clone = prefix.cloned();
289+
let offset_clone = offset.cloned();
290+
let inner = self.clone();
291+
let retry_builder = retry_params.exponential_builder();
293292
let fut = async move {
294-
let retry_builder = retry_params.exponential_builder();
295-
296-
let result: Result<BoxStream<'static, Result<ObjectMeta>>> = (|| async {
297-
if let Some(offset) = &offset_clone {
298-
Ok(self_clone.list_with_offset(prefix_clone.as_ref(), offset))
293+
let prefix_clone = prefix_clone.clone();
294+
let offset_clone = offset_clone.clone();
295+
let inner = inner.clone();
296+
let f = async || -> Result<BoxStream<'static, Result<ObjectMeta>>> {
297+
let mut stream = if let Some(offset) = &offset_clone {
298+
inner.list_with_offset(prefix_clone.as_ref(), offset)
299299
} else {
300-
Ok(self_clone.list(prefix_clone.as_ref()))
300+
inner.list(prefix_clone.as_ref())
301301
}
302-
})
302+
.peekable();
303+
304+
// Because peek only gives us a borrowed Err(e) value, we can only use that peek to
305+
// check if the value is an Err, then get the actual owned value as an error from
306+
// the stream if it is
307+
if Pin::new(&mut stream)
308+
.peek()
309+
.await
310+
.is_some_and(|v| v.is_err())
311+
// the following condition is redundant, but we need to do it to get an owned
312+
// ObjectStoreError
313+
&& let Some(Err(err)) = stream.next().await
314+
{
315+
return Err(err);
316+
}
317+
318+
Ok(stream.boxed())
319+
};
320+
let result: Result<BoxStream<'static, Result<ObjectMeta>>> = f
303321
.retry(&retry_builder)
304322
.notify(|err: &ObjectStoreError, dur: Duration| {
305323
warn!(
@@ -310,7 +328,7 @@ pub trait RetryableObjectStore: ObjectStore {
310328
.await;
311329

312330
match result {
313-
Ok(stream) => stream,
331+
Ok(s) => s,
314332
Err(e) => futures::stream::once(async move { Err(e) }).boxed(),
315333
}
316334
};
@@ -329,33 +347,44 @@ impl RetryableObjectStore for Arc<dyn ObjectStore> {
329347
context_message: String,
330348
retry_params: RetryParams,
331349
) -> BoxStream<'static, Result<ObjectMeta>> {
332-
// Note: List operations return streaming results, so we can only retry the initial
333-
// stream creation, not failures that occur during iteration
334-
let prefix_clone = prefix.cloned();
335-
let offset_clone = offset.cloned();
336350
let prefix_str = prefix
337351
.map(|p| p.to_string())
338352
.unwrap_or_else(|| "<root>".to_string());
339353

340-
// Clone Arc for use in async block
341-
let store = Arc::clone(self);
342-
354+
// Clone self to move into async block
355+
let prefix_clone = prefix.cloned();
356+
let offset_clone = offset.cloned();
357+
let inner = Arc::clone(self);
358+
let retry_builder = retry_params.exponential_builder();
343359
let fut = async move {
344-
let retry_builder = retry_params.exponential_builder();
345-
346-
let o = offset_clone.clone();
347-
let result: Result<BoxStream<'static, Result<ObjectMeta>>> = (move || {
348-
let s = Arc::clone(&store);
349-
let p = prefix_clone.clone();
350-
let o = o.clone();
351-
async move {
352-
if let Some(offset) = &o {
353-
Ok(s.list_with_offset(p.as_ref(), offset))
354-
} else {
355-
Ok(s.list(p.as_ref()))
356-
}
360+
let prefix_clone = prefix_clone.clone();
361+
let offset_clone = offset_clone.clone();
362+
let inner = Arc::clone(&inner);
363+
let f = async || -> Result<BoxStream<'static, Result<ObjectMeta>>> {
364+
let mut stream = if let Some(offset) = &offset_clone {
365+
inner.list_with_offset(prefix_clone.as_ref(), offset)
366+
} else {
367+
inner.list(prefix_clone.as_ref())
357368
}
358-
})
369+
.peekable();
370+
371+
// Because peek only gives us a borrowed Err(e) value, we can only use that peek to
372+
// check if the value is an Err, then get the actual owned value as an error from
373+
// the stream if it is
374+
if Pin::new(&mut stream)
375+
.peek()
376+
.await
377+
.is_some_and(|v| v.is_err())
378+
// the following condition is redundant, but we need to do it to get an owned
379+
// ObjectStoreError
380+
&& let Some(Err(err)) = stream.next().await
381+
{
382+
return Err(err);
383+
}
384+
385+
Ok(stream.boxed())
386+
};
387+
let result: Result<BoxStream<'static, Result<ObjectMeta>>> = f
359388
.retry(&retry_builder)
360389
.notify(|err: &ObjectStoreError, dur: Duration| {
361390
warn!(
@@ -366,7 +395,7 @@ impl RetryableObjectStore for Arc<dyn ObjectStore> {
366395
.await;
367396

368397
match result {
369-
Ok(stream) => stream,
398+
Ok(s) => s,
370399
Err(e) => futures::stream::once(async move { Err(e) }).boxed(),
371400
}
372401
};
@@ -500,12 +529,16 @@ mod tests {
500529
.await
501530
.expect("setup: direct put to InMemory should succeed");
502531

503-
// Create TestObjectStore that will succeed (no error injection)
504-
// Note: list_with_retries only retries stream creation, not errors within the stream
505-
// So we test that list works when no errors are injected
506-
let test_store: Arc<dyn ObjectStore> = Arc::new(TestObjectStore::new(Arc::clone(&inner)));
532+
// Create TestObjectStore that fails first call
533+
let concrete_store = Arc::new(
534+
TestObjectStore::new(Arc::clone(&inner)).with_error_config(ErrorConfig::FirstCallFails),
535+
);
536+
let test_store: Arc<dyn ObjectStore> = Arc::clone(&concrete_store) as _;
537+
538+
// Reset call count before test
539+
concrete_store.reset_call_count();
507540

508-
// Test: List should work normally
541+
// Test: List should fail first, then succeed on retry
509542
let mut stream = test_store.list_with_retries(
510543
None,
511544
None,
@@ -519,11 +552,134 @@ mod tests {
519552
results.push(item);
520553
}
521554

522-
// Should have successfully listed files
523-
assert_eq!(results.len(), 2, "Should list 2 files");
555+
// Should have successfully listed files after retry
556+
assert_eq!(results.len(), 2, "Should list 2 files after retry");
524557
assert!(
525558
results.iter().all(|r: &Result<ObjectMeta>| r.is_ok()),
526-
"All results should be Ok"
559+
"All results should be Ok after retry"
560+
);
561+
562+
// Verify that multiple list calls were made (1 failure + 1 successful retry)
563+
assert_eq!(
564+
concrete_store.get_call_count(),
565+
2,
566+
"Should make exactly 2 list calls (1 failure + 1 successful retry)"
567+
);
568+
}
569+
570+
#[tokio::test]
571+
async fn list_with_retries_exhaustion() {
572+
let inner: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
573+
574+
// Add test data
575+
let path1 = Path::from("file1.txt");
576+
inner
577+
.put(&path1, PutPayload::from("data1"))
578+
.await
579+
.expect("setup: direct put to InMemory should succeed");
580+
581+
// Create TestObjectStore that always fails
582+
let concrete_store = Arc::new(
583+
TestObjectStore::new(Arc::clone(&inner))
584+
.with_error_config(ErrorConfig::PercentageError(100.0)),
585+
);
586+
let test_store: Arc<dyn ObjectStore> = Arc::clone(&concrete_store) as _;
587+
588+
// Reset call count before test
589+
concrete_store.reset_call_count();
590+
591+
// Test: List should fail after exhausting retries
592+
let mut stream = test_store.list_with_retries(
593+
None,
594+
None,
595+
"test context".to_string(),
596+
test_retry_params(4),
597+
);
598+
599+
// Collect results - should get error
600+
let mut has_error = false;
601+
while let Some(item) = stream.next().await {
602+
if item.is_err() {
603+
has_error = true;
604+
}
605+
}
606+
607+
// Should have encountered an error after exhausting retries
608+
assert!(has_error, "Should fail after exhausting retries");
609+
610+
// Verify that exactly 5 calls were made -- the initial first attempt and 4 retries
611+
assert_eq!(
612+
concrete_store.get_call_count(),
613+
5,
614+
"Should make at least 5 list calls, got {}",
615+
concrete_store.get_call_count()
616+
);
617+
}
618+
619+
#[tokio::test]
620+
async fn list_with_retries_every_nth_fails() {
621+
let inner: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
622+
623+
// Add some test data
624+
let path1 = Path::from("file1.txt");
625+
let path2 = Path::from("file2.txt");
626+
inner
627+
.put(&path1, PutPayload::from("data1"))
628+
.await
629+
.expect("setup: direct put to InMemory should succeed");
630+
inner
631+
.put(&path2, PutPayload::from("data2"))
632+
.await
633+
.expect("setup: direct put to InMemory should succeed");
634+
635+
// Create TestObjectStore that fails every 2nd call
636+
let concrete_store = Arc::new(
637+
TestObjectStore::new(Arc::clone(&inner))
638+
.with_error_config(ErrorConfig::EveryNthFails(2)),
639+
);
640+
let test_store: Arc<dyn ObjectStore> = Arc::clone(&concrete_store) as _;
641+
642+
// Reset call count before test
643+
concrete_store.reset_call_count();
644+
645+
// First list should succeed (1st call)
646+
let mut stream1 = test_store.list_with_retries(
647+
None,
648+
None,
649+
"test context".to_string(),
650+
test_retry_params(3),
651+
);
652+
653+
let mut results1 = Vec::new();
654+
while let Some(item) = stream1.next().await {
655+
results1.push(item);
656+
}
657+
658+
assert_eq!(results1.len(), 2, "First list should succeed");
659+
assert_eq!(
660+
concrete_store.get_call_count(),
661+
1,
662+
"First list should make 1 call"
663+
);
664+
665+
// Second list should fail initially (2nd call) but succeed on retry (3rd call)
666+
let mut stream2 = test_store.list_with_retries(
667+
None,
668+
None,
669+
"test context".to_string(),
670+
test_retry_params(3),
671+
);
672+
673+
let mut results2 = Vec::new();
674+
while let Some(item) = stream2.next().await {
675+
results2.push(item);
676+
}
677+
678+
assert_eq!(results2.len(), 2, "Second list should succeed after retry");
679+
assert_eq!(
680+
concrete_store.get_call_count(),
681+
3,
682+
"Total should be 3 calls (1 from first list + 2 from second list with retry)"
527683
);
528684
}
529685

0 commit comments

Comments
 (0)