Commit f8c1baa

tikkss and kou authored
downloader: avoid duplicate downloads (#243)
GitHub: fix GH-242

Red Datasets sometimes handles large files. Downloading the same data multiple times is not suitable for this use case. We should not download per worker.

Add a cache check not only before acquiring the lock, but also after acquiring the lock. While checking only after acquiring the lock would be enough, it would create a lock file unnecessarily, so we kept the pre-lock cache check as well.

This patch will avoid duplicate downloads in the following case:

```mermaid
sequenceDiagram
    participant P1 as Process 1
    participant P2 as Process 2
    participant FS as File System

    P1->>FS: check output_path.exist? => false
    P2->>FS: check output_path.exist? => false
    P1->>FS: create lock => success
    P2->>FS: create lock => failure (sleep 1~10s)
    P1->>FS: download
    P1->>FS: delete lock
    P2->>FS: create lock => success
    Note over P2: No re-check after lock
    P2->>FS: download (duplicate)
    P2->>FS: delete lock
```

---------

Co-authored-by: Sutou Kouhei <[email protected]>
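The check, lock, re-check sequence from the commit message can be sketched in isolation. This is a minimal illustration, not the library's code: `fetch_once` is a hypothetical helper, and it uses `File#flock` as a stand-in for the lock-file-with-retry mechanism that the real `synchronize` method implements.

```ruby
require "pathname"
require "tmpdir"

# Hypothetical helper (not part of Red Datasets): writes the block's
# result to output_path at most once, even with concurrent callers.
def fetch_once(output_path, lock_path)
  # Cheap pre-lock check: skips creating a lock file when already cached.
  return :cached if output_path.exist?

  lock_path.open(File::RDWR | File::CREAT) do |lock|
    lock.flock(File::LOCK_EX) # waits until other holders release the lock
    # Re-check after acquiring the lock: another caller may have finished
    # the download while we were waiting.
    return :cached if output_path.exist?

    output_path.write(yield)
    :downloaded
  end
end

Dir.mktmpdir do |dir|
  output = Pathname(dir) / "data.csv"
  lock = Pathname(dir) / "data.csv.lock"
  first = fetch_once(output, lock) { "payload" }
  second = fetch_once(output, lock) { "payload" }
  puts "#{first}, #{second}" # prints "downloaded, cached"
end
```

Without the second `exist?` check, a caller that lost the race for the lock would run the block again once the lock was released, which is the duplicate download shown in the diagram.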
1 parent 681b46d commit f8c1baa

1 file changed: +12 -4 lines changed

lib/datasets/downloader.rb

Lines changed: 12 additions & 4 deletions
@@ -20,13 +20,12 @@ def initialize(url, *fallback_urls, http_method: nil, http_parameters: nil)
     end
 
     def download(output_path, &block)
-      if output_path.exist?
-        yield_chunks(output_path, &block) if block_given?
-        return
-      end
+      return if use_cache(output_path, &block)
 
       partial_output_path = Pathname.new("#{output_path}.partial")
       synchronize(output_path, partial_output_path) do
+        return if use_cache(output_path, &block)
+
         output_path.parent.mkpath
 
         n_retries = 0
@@ -94,6 +93,15 @@ def download(output_path, &block)
       url
     end
 
+    private def use_cache(output_path, &block)
+      if output_path.exist?
+        yield_chunks(output_path, &block) if block_given?
+        true
+      else
+        false
+      end
+    end
+
     private def synchronize(output_path, partial_output_path)
       begin
         Process.getpgid(Process.pid)
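
The new `return if use_cache(...)` sits inside the block passed to `synchronize`, so it exits `download` itself rather than only the block. The sketch below shows that Ruby behavior with hypothetical stand-ins (`with_lock` and this simplified `download` are not the library's methods). It assumes the lock helper releases its lock in an `ensure` clause; whether the real `synchronize` does so is not visible in this diff.

```ruby
# Hypothetical stand-in for a lock helper: records events instead of
# creating and deleting a lock file.
def with_lock(events)
  events << :locked
  yield
ensure
  # Runs even when the block exits early via `return`.
  events << :unlocked
end

# Simplified stand-in for a download method with a post-lock cache check.
def download(events, cached:)
  with_lock(events) do
    return :cached if cached # leaves download, not just the block
    events << :downloaded
  end
  :downloaded
end

events = []
result = download(events, cached: true)
puts "#{result}: #{events.inspect}" # prints "cached: [:locked, :unlocked]"
```

Under that assumption, the early return skips the download but still releases the lock, so a waiting process is not blocked by a cache hit.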
