Skip to content

Commit b9aa992

Browse files
authored
fix: realtime channel error cases (#1472)
* fix: realtime channel error cases

Also:
* Don't reset Authorization policies on :confirm_token
* Avoid resetting Authorization policies twice on a new "access_token"
* Pass the new access token to authorization_context (it was previously not being passed)
* Capture Connect RPC errors
1 parent adc824c commit b9aa992

File tree

7 files changed

+355
-47
lines changed

7 files changed

+355
-47
lines changed

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ defmodule RealtimeWeb.RealtimeChannel do
6464
:ok <- limit_channels(socket),
6565
:ok <- limit_max_users(socket.assigns),
6666
{:ok, claims, confirm_token_ref, access_token, _} <- confirm_token(socket),
67-
{:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id),
6867
socket = assign_authorization_context(socket, sub_topic, access_token, claims),
68+
{:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id),
6969
{:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, socket) do
7070
tenant_topic = Tenants.tenant_topic(tenant_id, sub_topic, !socket.assigns.private?)
7171

@@ -424,19 +424,19 @@ defmodule RealtimeWeb.RealtimeChannel do
424424
def handle_in("access_token", %{"access_token" => refresh_token}, socket) when is_binary(refresh_token) do
425425
%{
426426
assigns: %{
427-
access_token: access_token,
427+
tenant: tenant_id,
428428
pg_sub_ref: pg_sub_ref,
429429
channel_name: channel_name,
430-
pg_change_params: pg_change_params,
431-
tenant: tenant
430+
pg_change_params: pg_change_params
432431
}
433432
} = socket
434433

435-
socket = assign(socket, :access_token, refresh_token)
434+
# Update token and reset policies
435+
socket = assign(socket, %{access_token: refresh_token, policies: nil})
436436

437437
with {:ok, claims, confirm_token_ref, _, socket} <- confirm_token(socket),
438-
socket = assign_authorization_context(socket, channel_name, access_token, claims),
439-
{:ok, db_conn} <- Connect.lookup_or_start_connection(tenant),
438+
socket = assign_authorization_context(socket, channel_name, refresh_token, claims),
439+
{:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id),
440440
{:ok, socket} <- maybe_assign_policies(channel_name, db_conn, socket) do
441441
Helpers.cancel_timer(pg_sub_ref)
442442
pg_change_params = Enum.map(pg_change_params, &Map.put(&1, :claims, claims))
@@ -455,23 +455,23 @@ defmodule RealtimeWeb.RealtimeChannel do
455455

456456
{:noreply, assign(socket, assigns)}
457457
else
458-
{:error, :unauthorized, msg} ->
459-
shutdown_response(socket, msg)
460-
461-
{:error, :expired_token, msg} ->
458+
{:error, reason, msg} when reason in ~w(unauthorized expired_token token_malformed)a ->
462459
shutdown_response(socket, msg)
463460

464461
{:error, :missing_claims} ->
465462
shutdown_response(socket, "Fields `role` and `exp` are required in JWT")
466463

467-
{:error, :token_malformed, msg} ->
468-
shutdown_response(socket, msg)
469-
470464
{:error, :unable_to_set_policies, _msg} ->
471465
shutdown_response(socket, "Realtime was unable to connect to the project database")
472466

473467
{:error, error} ->
474468
shutdown_response(socket, inspect(error))
469+
470+
{:error, :rpc_error, :timeout} ->
471+
shutdown_response(socket, "Node request timeout")
472+
473+
{:error, :rpc_error, reason} ->
474+
shutdown_response(socket, "RPC call error: " <> inspect(reason))
475475
end
476476
end
477477

@@ -595,22 +595,14 @@ defmodule RealtimeWeb.RealtimeChannel do
595595
end
596596

597597
defp confirm_token(%{assigns: assigns} = socket) do
598-
%{
599-
jwt_secret: jwt_secret,
600-
access_token: access_token,
601-
tenant: tenant_id
602-
} = assigns
598+
%{jwt_secret: jwt_secret, access_token: access_token} = assigns
603599

604-
topic = Map.get(assigns, :topic)
605-
socket = Map.put(socket, :policies, nil)
606600
jwt_jwks = Map.get(assigns, :jwt_jwks)
607601

608602
with jwt_secret_dec <- Crypto.decrypt!(jwt_secret),
609603
{:ok, %{"exp" => exp} = claims} when is_integer(exp) <-
610604
ChannelsAuthorization.authorize_conn(access_token, jwt_secret_dec, jwt_jwks),
611-
exp_diff when exp_diff > 0 <- exp - Joken.current_time(),
612-
{:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id),
613-
{:ok, socket} <- maybe_assign_policies(topic, db_conn, socket) do
605+
exp_diff when exp_diff > 0 <- exp - Joken.current_time() do
614606
if ref = assigns[:confirm_token_ref], do: Helpers.cancel_timer(ref)
615607

616608
interval = min(@confirm_token_ms_interval, exp_diff * 1000)
@@ -761,7 +753,7 @@ defmodule RealtimeWeb.RealtimeChannel do
761753
end
762754

763755
defp maybe_assign_policies(topic, db_conn, %{assigns: %{private?: true}} = socket)
764-
when not is_nil(topic) and not is_nil(db_conn) do
756+
when not is_nil(topic) do
765757
authorization_context = socket.assigns.authorization_context
766758
policies = socket.assigns.policies || %Policies{}
767759

@@ -785,9 +777,7 @@ defmodule RealtimeWeb.RealtimeChannel do
785777
end
786778
end
787779

788-
defp maybe_assign_policies(_, _, socket) do
789-
{:ok, assign(socket, policies: nil)}
790-
end
780+
defp maybe_assign_policies(_, _, socket), do: {:ok, assign(socket, policies: nil)}
791781

792782
defp only_private?(tenant_id, %{assigns: %{private?: private?}}) do
793783
tenant = Tenants.Cache.get_tenant_by_external_id(tenant_id)

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.41.20",
7+
version: "2.41.21",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1621,8 +1621,7 @@ defmodule Realtime.Integration.RtChannelTest do
16211621
tenant = Tenants.get_tenant_by_external_id(tenant.external_id)
16221622
Realtime.Api.update_tenant(tenant, %{jwt_jwks: %{keys: ["potato"]}})
16231623

1624-
Process.sleep(500)
1625-
assert {:error, %Mint.TransportError{reason: :closed}} = WebsocketClient.send_heartbeat(socket)
1624+
assert_process_down(socket)
16261625
end
16271626

16281627
test "on jwt_secret the socket closes and sends a system message", %{tenant: tenant, topic: topic} do
@@ -1638,8 +1637,7 @@ defmodule Realtime.Integration.RtChannelTest do
16381637
tenant = Tenants.get_tenant_by_external_id(tenant.external_id)
16391638
Realtime.Api.update_tenant(tenant, %{jwt_secret: "potato"})
16401639

1641-
Process.sleep(500)
1642-
assert {:error, %Mint.TransportError{reason: :closed}} = WebsocketClient.send_heartbeat(socket)
1640+
assert_process_down(socket)
16431641
end
16441642

16451643
test "on private_only the socket closes and sends a system message", %{tenant: tenant, topic: topic} do
@@ -1655,8 +1653,7 @@ defmodule Realtime.Integration.RtChannelTest do
16551653
tenant = Tenants.get_tenant_by_external_id(tenant.external_id)
16561654
Realtime.Api.update_tenant(tenant, %{private_only: true})
16571655

1658-
Process.sleep(500)
1659-
assert {:error, %Mint.TransportError{reason: :closed}} = WebsocketClient.send_heartbeat(socket)
1656+
assert_process_down(socket)
16601657
end
16611658

16621659
test "on other param changes the socket won't close and no message is sent", %{tenant: tenant, topic: topic} do
@@ -1712,8 +1709,7 @@ defmodule Realtime.Integration.RtChannelTest do
17121709

17131710
SocketDisconnect.distributed_disconnect(tenant)
17141711

1715-
Process.sleep(500)
1716-
assert {:error, %Mint.TransportError{reason: :closed}} = WebsocketClient.send_heartbeat(socket)
1712+
assert_process_down(socket)
17171713
end
17181714
end
17191715

@@ -1746,6 +1742,7 @@ defmodule Realtime.Integration.RtChannelTest do
17461742

17471743
test "max_events_per_second limit respected", %{tenant: tenant} do
17481744
%{max_events_per_second: max_concurrent_users} = Tenants.get_tenant_by_external_id(tenant.external_id)
1745+
on_exit(fn -> change_tenant_configuration(tenant, :max_events_per_second, max_concurrent_users) end)
17491746
change_tenant_configuration(tenant, :max_events_per_second, 1)
17501747

17511748
{socket, _} = get_connection(tenant, "authenticated")
@@ -1755,7 +1752,12 @@ defmodule Realtime.Integration.RtChannelTest do
17551752
WebsocketClient.join(socket, realtime_topic, %{config: config})
17561753

17571754
for _ <- 1..1000 do
1758-
WebsocketClient.send_event(socket, realtime_topic, "broadcast", %{})
1755+
try do
1756+
WebsocketClient.send_event(socket, realtime_topic, "broadcast", %{})
1757+
catch
1758+
_, _ -> :ok
1759+
end
1760+
17591761
1..5 |> Enum.random() |> Process.sleep()
17601762
end
17611763

@@ -1770,8 +1772,7 @@ defmodule Realtime.Integration.RtChannelTest do
17701772
2000
17711773

17721774
assert_receive %Message{event: "phx_close"}
1773-
1774-
change_tenant_configuration(tenant, :max_events_per_second, max_concurrent_users)
1775+
assert_process_down(socket)
17751776
end
17761777

17771778
test "max_channels_per_client limit respected", %{tenant: tenant} do
@@ -2181,9 +2182,8 @@ defmodule Realtime.Integration.RtChannelTest do
21812182
end
21822183

21832184
# wait to trigger tracker
2184-
Process.sleep(5000)
2185+
assert_process_down(socket, 5000)
21852186
assert [] = Tracker.list_pids()
2186-
assert {:error, %Mint.TransportError{reason: :closed}} = WebsocketClient.send_heartbeat(socket)
21872187
end
21882188

21892189
test "failed connections are present in tracker with counter counter lower than 0 so they are actioned on by tracker",
@@ -2341,4 +2341,9 @@ defmodule Realtime.Integration.RtChannelTest do
23412341

23422342
Realtime.Tenants.Cache.invalidate_tenant_cache(external_id)
23432343
end
2344+
2345+
defp assert_process_down(pid, timeout \\ 100) do
2346+
ref = Process.monitor(pid)
2347+
assert_receive {:DOWN, ^ref, :process, ^pid, _reason}, timeout
2348+
end
23442349
end

test/realtime/gen_rpc_test.exs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,36 @@ defmodule Realtime.GenRpcTest do
121121
}}
122122
end
123123

124+
test "local node exception" do
125+
current_node = node()
126+
127+
assert {:error, :rpc_error, _} = GenRpc.call(current_node, Map, :fetch!, [%{}, :a], tenant_id: "123")
128+
129+
assert_receive {[:realtime, :rpc], %{latency: _},
130+
%{
131+
origin_node: ^current_node,
132+
target_node: ^current_node,
133+
success: false,
134+
tenant: "123",
135+
mechanism: :gen_rpc
136+
}}
137+
end
138+
139+
test "remote node exception", %{node: node} do
140+
current_node = node()
141+
142+
assert {:error, :rpc_error, _} = GenRpc.call(node, Map, :fetch!, [%{}, :a], tenant_id: "123")
143+
144+
assert_receive {[:realtime, :rpc], %{latency: _},
145+
%{
146+
origin_node: ^current_node,
147+
target_node: ^node,
148+
success: false,
149+
tenant: "123",
150+
mechanism: :gen_rpc
151+
}}
152+
end
153+
124154
@tag extra_config: [{:gen_rpc, :tcp_server_port, 9999}]
125155
test "bad tcp error", %{node: node} do
126156
current_node = node()

test/realtime/monitoring/prom_ex/plugins/distributed_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ defmodule Realtime.PromEx.Plugins.DistributedTest do
5757

5858
test "queue_size", %{metrics: metrics, node: node} do
5959
pattern = ~r/dist_queue_size{origin_node=\"#{node()}\",target_node=\"#{node}\"}\s(?<number>\d+)/
60-
assert metric_value(metrics, pattern) == 0
60+
assert is_integer(metric_value(metrics, pattern))
6161
end
6262
end
6363

0 commit comments

Comments
 (0)