Skip to content

Commit 52903b0

Browse files
committed
Implement partitioned upload server
1 parent fada0c0 commit 52903b0

File tree

4 files changed

+91
-16
lines changed

4 files changed

+91
-16
lines changed

lib/plug/application.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ defmodule Plug.Application do
88
Plug.Keys = :ets.new(Plug.Keys, [:named_table, :public, read_concurrency: true])
99

1010
children = [
11-
Plug.Upload
11+
Plug.Upload.Supervisor
1212
]
1313

1414
Supervisor.start_link(children, name: __MODULE__, strategy: :one_for_one)

lib/plug/upload.ex

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ defmodule Plug.Upload do
4040
@dir_table __MODULE__.Dir
4141
@path_table __MODULE__.Path
4242
@max_attempts 10
43-
@temp_env_vars ~w(PLUG_TMPDIR TMPDIR TMP TEMP)s
4443

4544
@doc """
4645
Requests a random file to be created in the upload directory
@@ -85,7 +84,7 @@ defmodule Plug.Upload do
8584
:ok
8685

8786
[] ->
88-
server = plug_server()
87+
server = plug_server(to_pid)
8988
{:ok, tmp} = generate_tmp_dir()
9089
:ok = GenServer.call(server, {:give_away, to_pid, tmp, path})
9190
:ets.delete_object(@path_table, {from_pid, path})
@@ -105,7 +104,7 @@ defmodule Plug.Upload do
105104
{:ok, tmp}
106105

107106
[] ->
108-
server = plug_server()
107+
server = plug_server(pid)
109108
GenServer.cast(server, {:monitor, pid})
110109

111110
with {:ok, tmp} <- generate_tmp_dir() do
@@ -188,30 +187,23 @@ defmodule Plug.Upload do
188187
end
189188
end
190189

191-
defp plug_server do
192-
Process.whereis(__MODULE__) ||
190+
defp plug_server(pid) do
191+
PartitionSupervisor.whereis_name({__MODULE__, pid})
192+
rescue
193+
ArgumentError ->
193194
raise Plug.UploadError,
194195
"could not find process Plug.Upload. Have you started the :plug application?"
195196
end
196197

197198
@doc false
198199
def start_link(_) do
199-
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
200+
GenServer.start_link(__MODULE__, :ok)
200201
end
201202

202203
## Callbacks
203204

204205
@impl true
205206
def init(:ok) do
206-
Process.flag(:trap_exit, true)
207-
tmp = Enum.find_value(@temp_env_vars, "/tmp", &System.get_env/1) |> Path.expand()
208-
cwd = Path.join(File.cwd!(), "tmp")
209-
# Add a tiny random component to avoid clashes between nodes
210-
suffix = :crypto.strong_rand_bytes(3) |> Base.url_encode64()
211-
:persistent_term.put(__MODULE__, {[tmp, cwd], suffix})
212-
213-
:ets.new(@dir_table, [:named_table, :public, :set])
214-
:ets.new(@path_table, [:named_table, :public, :duplicate_bag])
215207
{:ok, %{}}
216208
end
217209

lib/plug/upload/supervisor.ex

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
defmodule Plug.Upload.Supervisor do
2+
@moduledoc false
3+
use Supervisor
4+
5+
@temp_env_vars ~w(PLUG_TMPDIR TMPDIR TMP TEMP)s
6+
@dir_table Plug.Upload.Dir
7+
@path_table Plug.Upload.Path
8+
@otp_vsn System.otp_release() |> String.to_integer()
9+
@write_mode if @otp_vsn >= 25, do: :auto, else: true
10+
@ets_opts [:public, :named_table, read_concurrency: true, write_concurrency: @write_mode]
11+
12+
def start_link(args) do
13+
Supervisor.start_link(__MODULE__, args, name: __MODULE__)
14+
end
15+
16+
@impl true
17+
def init(_args) do
18+
# Initialize the upload system
19+
tmp = Enum.find_value(@temp_env_vars, "/tmp", &System.get_env/1) |> Path.expand()
20+
cwd = Path.join(File.cwd!(), "tmp")
21+
# Add a tiny random component to avoid clashes between nodes
22+
suffix = :crypto.strong_rand_bytes(3) |> Base.url_encode64()
23+
:persistent_term.put(Plug.Upload, {[tmp, cwd], suffix})
24+
:ets.new(@dir_table, [:set | @ets_opts])
25+
:ets.new(@path_table, [:duplicate_bag | @ets_opts])
26+
27+
children = [
28+
{PartitionSupervisor, child_spec: Plug.Upload, name: Plug.Upload}
29+
]
30+
31+
Supervisor.init(children, strategy: :one_for_one)
32+
end
33+
end

test/plug/upload_test.exs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,4 +156,54 @@ defmodule Plug.UploadTest do
156156
wait_until(fn -> not File.exists?(path1) end)
157157
end
158158
end
159+
160+
test "give_away with invalid path returns error" do
161+
result = Plug.Upload.give_away("/invalid/path", spawn(fn -> :ok end))
162+
assert result == {:error, :unknown_path}
163+
end
164+
165+
test "give_away when target process dies during transfer" do
166+
{:ok, path} = Plug.Upload.random_file("target_dies")
167+
168+
# Create a process that dies immediately
169+
pid = spawn(fn -> :ok end)
170+
171+
# This should still work but file will be cleaned up when dead process is detected
172+
result = Plug.Upload.give_away(path, pid)
173+
assert result == :ok
174+
wait_until(fn -> not File.exists?(path) end)
175+
end
176+
177+
test "routes uploads to correct partition based on process" do
178+
parent = self()
179+
num_processes = 10
180+
181+
# Create uploads from different processes and verify they get different servers
182+
tasks =
183+
Enum.map(1..num_processes, fn i ->
184+
Task.async(fn ->
185+
{:ok, path} = Plug.Upload.random_file("partition_test_#{i}")
186+
server = PartitionSupervisor.whereis_name({Plug.Upload, self()})
187+
send(parent, {:result, i, path, server})
188+
path
189+
end)
190+
end)
191+
192+
# Collect results
193+
results =
194+
Enum.map(1..num_processes, fn _ ->
195+
receive do
196+
{:result, i, path, server} -> {i, path, server}
197+
after
198+
1_000 -> flunk("didn't get result")
199+
end
200+
end)
201+
202+
# Verify different processes got different servers (partitioning working)
203+
servers = Enum.map(results, fn {_, _, server} -> server end)
204+
assert length(Enum.uniq(servers)) > 1
205+
206+
# Cleanup
207+
Enum.each(tasks, &Task.await/1)
208+
end
159209
end

0 commit comments

Comments
 (0)