Skip to content

Commit db54671

Browse files
committed
fix: shard user scopes in syn
1 parent d5658ad commit db54671

File tree

7 files changed

+29
-11
lines changed

7 files changed

+29
-11
lines changed

config/runtime.exs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws
7070
broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10)
7171
pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "gen_rpc") |> String.to_atom()
7272
websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize))
73+
users_scope_shards = Env.get_integer("USERS_SCOPE_SHARDS", 5)
7374

7475
no_channel_timeout_in_ms =
7576
if config_env() == :test,
@@ -126,7 +127,8 @@ config :realtime,
126127
no_channel_timeout_in_ms: no_channel_timeout_in_ms,
127128
platform: platform,
128129
pubsub_adapter: pubsub_adapter,
129-
broadcast_pool_size: broadcast_pool_size
130+
broadcast_pool_size: broadcast_pool_size,
131+
users_scope_shards: users_scope_shards
130132

131133
if config_env() != :test && run_janitor? do
132134
config :realtime,

lib/realtime/application.ex

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,7 @@ defmodule Realtime.Application do
4646
Realtime.PromEx.set_metrics_tags()
4747
:ets.new(Realtime.Tenants.Connect, [:named_table, :set, :public])
4848
:syn.set_event_handler(Realtime.SynHandler)
49-
50-
:ok = :syn.add_node_to_scopes([:users, RegionNodes, Realtime.Tenants.Connect])
49+
:ok = :syn.add_node_to_scopes([RegionNodes, Realtime.Tenants.Connect | Realtime.UsersCounter.scopes()])
5150

5251
region = Application.get_env(:realtime, :region)
5352
:syn.join(RegionNodes, region, self(), node: node())

lib/realtime/tenants.ex

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ defmodule Realtime.Tenants do
2121
"""
2222
@spec list_connected_tenants(atom()) :: [String.t()]
2323
def list_connected_tenants(node) do
24-
:syn.group_names(:users, node)
24+
UsersCounter.scopes()
25+
|> Enum.map(fn scope -> :syn.group_names(scope, node) end)
26+
|> List.flatten()
2527
end
2628

2729
@doc """

lib/realtime/user_counter.ex

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,32 @@ defmodule Realtime.UsersCounter do
88
Adds a RealtimeChannel pid to the `:users` scope for a tenant so we can keep track of all connected clients for a tenant.
99
"""
1010
@spec add(pid(), String.t()) :: :ok
11-
def add(pid, tenant), do: :syn.join(:users, tenant, pid)
11+
def add(pid, tenant_id), do: tenant_id |> scope() |> :syn.join(tenant_id, pid)
1212

1313
@doc """
1414
Returns the count of all connected clients for a tenant for the cluster.
1515
"""
1616
@spec tenant_users(String.t()) :: non_neg_integer()
17-
def tenant_users(tenant), do: :syn.member_count(:users, tenant)
17+
def tenant_users(tenant_id), do: tenant_id |> scope() |> :syn.member_count(tenant_id)
1818

1919
@doc """
2020
Returns the count of all connected clients for a tenant for a single node.
2121
"""
2222
@spec tenant_users(atom, String.t()) :: non_neg_integer()
23-
def tenant_users(node_name, tenant), do: :syn.member_count(:users, tenant, node_name)
23+
def tenant_users(node_name, tenant_id), do: tenant_id |> scope() |> :syn.member_count(tenant_id, node_name)
24+
25+
@doc """
26+
Returns the scope for a given tenant id.
27+
"""
28+
@spec scope(String.t()) :: atom()
29+
def scope(tenant_id) do
30+
shards = Application.get_env(:realtime, :users_scope_shards)
31+
shard = :erlang.phash2(tenant_id, shards)
32+
:"users_#{shard}"
33+
end
34+
35+
def scopes() do
36+
shards = Application.get_env(:realtime, :users_scope_shards)
37+
Enum.map(0..(shards - 1), fn shard -> :"users_#{shard}" end)
38+
end
2439
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.53.1",
7+
version: "2.53.2",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2588,7 +2588,7 @@ defmodule Realtime.Integration.RtChannelTest do
25882588
Realtime.Tenants.Cache.invalidate_tenant_cache(external_id)
25892589
end
25902590

2591-
defp assert_process_down(pid, timeout \\ 100) do
2591+
defp assert_process_down(pid, timeout \\ 300) do
25922592
ref = Process.monitor(pid)
25932593
assert_receive {:DOWN, ^ref, :process, ^pid, _reason}, timeout
25942594
end

test/realtime/tenants/connect_test.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,9 +328,9 @@ defmodule Realtime.Tenants.ConnectTest do
328328
region = Tenants.region(tenant)
329329
assert {_pid, %{conn: ^db_conn, region: ^region}} = :syn.lookup(Connect, external_id)
330330
Process.sleep(1000)
331-
:syn.leave(:users, external_id, self())
331+
external_id |> UsersCounter.scope() |> :syn.leave(external_id, self())
332332
Process.sleep(1000)
333-
assert :undefined = :syn.lookup(Connect, external_id)
333+
assert :undefined = external_id |> UsersCounter.scope() |> :syn.lookup(external_id)
334334
refute Process.alive?(db_conn)
335335
Connect.shutdown(external_id)
336336
end

0 commit comments

Comments
 (0)