Skip to content

Commit bc18a3e

Browse files
authored
Add retrying to upload functionality (#36)
(DIS-1804)
1 parent de34a10 commit bc18a3e

File tree

4 files changed

+86
-28
lines changed

4 files changed

+86
-28
lines changed

acquire/acquire.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
from acquire.log import get_file_handler, reconfigure_log_file, setup_logging
3737
from acquire.outputs import OUTPUTS
3838
from acquire.uploaders.minio import MinIO
39-
from acquire.uploaders.plugin import UploaderPlugin
39+
from acquire.uploaders.plugin import UploaderPlugin, upload_files_using_uploader
4040
from acquire.uploaders.plugin_registry import UploaderRegistry
4141
from acquire.utils import (
4242
check_and_set_acquire_args,
@@ -1689,7 +1689,7 @@ def upload_files(
16891689
log.debug("Proxies: %s (no_proxy = %s)", proxies, no_proxy)
16901690

16911691
try:
1692-
upload_plugin.upload_files(paths, proxies)
1692+
upload_files_using_uploader(upload_plugin, paths, proxies)
16931693
except Exception:
16941694
log.error("Upload %s FAILED. See log file for details.", paths)
16951695
log.exception("")

acquire/uploaders/minio.py

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
1-
import logging
21
import os
32
from pathlib import Path
4-
from typing import Any, List, Optional
3+
from typing import Any, Optional
54

65
from acquire.uploaders.plugin import UploaderPlugin
76

8-
log = logging.getLogger(__name__)
9-
107

118
class MinIO(UploaderPlugin):
129
def __init__(self, upload: dict[str, str], **kwargs: dict[str, Any]) -> None:
@@ -27,8 +24,8 @@ def __init__(self, upload: dict[str, str], **kwargs: dict[str, Any]) -> None:
2724
if not all((self.endpoint, self.access_id, self.access_key, self.bucket_name)):
2825
raise ValueError("Invalid cloud upload configuration")
2926

30-
def upload_files(self, paths: List[Path], proxies: Optional[dict[str, str]] = None) -> None:
31-
"""Uploads files using MinIO
27+
def prepare_client(self, paths: list[Path], proxies: Optional[dict[str, str]] = None) -> Any:
28+
"""Prepares a Minio client used to upload files.
3229
3330
Args:
3431
paths: The files to upload.
@@ -45,12 +42,10 @@ def upload_files(self, paths: List[Path], proxies: Optional[dict[str, str]] = No
4542

4643
http_client = urllib3.proxy_from_url(proxies["http"]) if proxies else None
4744

48-
client = Minio(self.endpoint, self.access_id, self.access_key, http_client=http_client)
49-
for path in paths:
50-
try:
51-
log.info("Uploading %s", path)
52-
client.fput_object(self.bucket_name, os.path.basename(path), path)
53-
log.info("Uploaded %s", path)
54-
except Exception:
55-
log.error("Upload %s FAILED. See log file for details.", path)
56-
log.exception("")
45+
return Minio(self.endpoint, self.access_id, self.access_key, http_client=http_client)
46+
47+
def upload_file(self, client: Any, path: Path) -> None:
48+
client.fput_object(self.bucket_name, os.path.basename(path), path)
49+
50+
def finish(self, client: Any) -> None:
51+
pass

acquire/uploaders/plugin.py

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,60 @@
1+
import logging
12
from pathlib import Path
2-
from typing import List, Optional, Protocol, runtime_checkable
3+
from typing import Any, Optional
34

5+
log = logging.getLogger(__name__)
46

5-
@runtime_checkable
6-
class UploaderPlugin(Protocol):
7+
__all__ = [
8+
"UploaderPlugin",
9+
"upload_files_using_uploader",
10+
]
11+
12+
MAX_RETRIES = 4
13+
14+
15+
class UploaderPlugin:
716
"""Creates a typing definition to which an UploaderPlugin should adhere."""
817

9-
def upload_files(self, paths: List[Path], proxies: Optional[dict[str, str]] = None) -> None:
10-
"""Uploads the files in ``paths`` to a destination.
18+
def prepare_client(self, paths: list[Path], proxies: Optional[dict[str, str]] = None) -> Any:
19+
"""Prepares a client for the upload."""
20+
raise NotImplementedError()
21+
22+
def upload_file(self, client: Any, path: Path) -> None:
23+
"""Uploads a file/path using the ``client``."""
24+
raise NotImplementedError()
25+
26+
def finish(self, client: Any) -> None:
27+
"""A cleanup step or anything required to finish the upload."""
28+
raise NotImplementedError()
29+
30+
31+
def upload_files_using_uploader(
32+
uploader: UploaderPlugin, paths: list[Path], proxies: Optional[dict[str, str]] = None
33+
) -> None:
34+
"""Uploads the files in ``paths`` to a destination.
35+
36+
Args:
37+
uploader: The plugin used to upload files.
38+
paths: A list of files to upload.
39+
proxies: Proxies used as an intermediate during an upload.
40+
"""
41+
paths = [Path(path) if isinstance(path, str) else path for path in paths]
42+
client = uploader.prepare_client(paths, proxies)
43+
44+
for path in paths:
45+
for retry in range(MAX_RETRIES):
46+
if retry == MAX_RETRIES - 1:
47+
error_log = ("Upload %s FAILED after too many attempts. Stopping.", path)
48+
else:
49+
error_log = ("Upload %s FAILED. See log file for details. Retrying", path)
50+
51+
try:
52+
log.info("Uploading %s", path)
53+
uploader.upload_file(client, path)
54+
log.info("Uploaded %s", path)
55+
break
56+
except Exception:
57+
log.error(*error_log)
58+
log.exception("")
1159

12-
Args:
13-
paths: A list of files to upload.
14-
proxies: Proxies used as an intermediate during an upload.
15-
"""
16-
...
60+
uploader.finish(client)

tests/test_minio_uploader.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
from pathlib import Path
2+
from unittest.mock import Mock, patch
23

34
import pytest
45

56
from acquire.uploaders.minio import MinIO
7+
from acquire.uploaders.plugin import upload_files_using_uploader
68
from acquire.uploaders.plugin_registry import UploaderRegistry
79

810

@@ -59,4 +61,21 @@ def test_minio_valueerror(minio_plugin):
5961

6062
def test_upload_files(minio_instance):
6163
# Generates an internal error, which is caught
62-
minio_instance.upload_files([Path("hello")])
64+
upload_files_using_uploader(minio_instance, [Path("hello")])
65+
66+
67+
def test_upload_file_multiple_failures(minio_instance: MinIO):
68+
mocked_upload = Mock()
69+
mocked_upload.side_effect = [Exception, Exception, Exception, "Hello"]
70+
minio_instance.upload_file = mocked_upload
71+
72+
with patch("acquire.uploaders.plugin.log") as mocked_logger:
73+
# Only three retry attempts get made
74+
upload_files_using_uploader(minio_instance, [Path("hello")])
75+
mocked_logger.error.assert_called_with("Upload %s FAILED. See log file for details. Retrying", Path("hello"))
76+
77+
# One extra gives a value error.
78+
mocked_upload.side_effect = [Exception, Exception, Exception, Exception]
79+
with patch("acquire.uploaders.plugin.log") as mocked_logger:
80+
upload_files_using_uploader(minio_instance, [Path("hello")])
81+
mocked_logger.error.assert_called_with("Upload %s FAILED after too many attempts. Stopping.", Path("hello"))

0 commit comments

Comments
 (0)