Skip to content

Commit b500a58

Browse files
authored
fix: improve join payload handling (#1501)
improves error handling on join by doing basic checks on types received by realtime. it's the scafold for future validations with more informative errors to the users
1 parent 9daf263 commit b500a58

File tree

10 files changed

+384
-1
lines changed

10 files changed

+384
-1
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ This is the list of operational codes that can help you understand your deployme
225225
| Code | Description |
226226
| ---------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
227227
| TopicNameRequired | You are trying to use Realtime without a topic name set |
228+
| InvalidJoinPayload | The payload provided to Realtime on connect is invalid |
228229
| RealtimeDisabledForConfiguration | The configuration provided to Realtime on connect will not be able to provide you any Postgres Changes |
229230
| TenantNotFound | The tenant you are trying to connect to does not exist |
230231
| ErrorConnectingToWebsocket | Error when trying to connect to the WebSocket server |
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
defmodule RealtimeWeb.Channels.Payloads.Broadcast do
2+
@moduledoc """
3+
Validate broadcast field of the join payload.
4+
"""
5+
use Ecto.Schema
6+
import Ecto.Changeset
7+
alias RealtimeWeb.Channels.Payloads.Join
8+
9+
embedded_schema do
10+
field :ack, :boolean, default: false
11+
field :self, :boolean, default: false
12+
end
13+
14+
def changeset(broadcast, attrs) do
15+
cast(broadcast, attrs, [:ack, :self], message: &Join.error_message/2)
16+
end
17+
end
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
defmodule RealtimeWeb.Channels.Payloads.Config do
2+
@moduledoc """
3+
Validate config field of the join payload.
4+
"""
5+
use Ecto.Schema
6+
import Ecto.Changeset
7+
alias RealtimeWeb.Channels.Payloads.Join
8+
alias RealtimeWeb.Channels.Payloads.Broadcast
9+
alias RealtimeWeb.Channels.Payloads.Presence
10+
alias RealtimeWeb.Channels.Payloads.PostgresChange
11+
12+
embedded_schema do
13+
embeds_one :broadcast, Broadcast
14+
embeds_one :presence, Presence
15+
embeds_many :postgres_changes, PostgresChange
16+
field :private, :boolean, default: false
17+
end
18+
19+
def changeset(config, attrs) do
20+
config
21+
|> cast(attrs, [:private], message: &Join.error_message/2)
22+
|> cast_embed(:broadcast, invalid_message: "unable to parse, expected a map")
23+
|> cast_embed(:presence, invalid_message: "unable to parse, expected a map")
24+
|> cast_embed(:postgres_changes, invalid_message: "unable to parse, expected an array of maps")
25+
end
26+
end
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
defmodule RealtimeWeb.Channels.Payloads.Join do
2+
@moduledoc """
3+
Payload validation for the phx_join event.
4+
"""
5+
use Ecto.Schema
6+
import Ecto.Changeset
7+
alias RealtimeWeb.Channels.Payloads.Config
8+
alias RealtimeWeb.Channels.Payloads.Broadcast
9+
alias RealtimeWeb.Channels.Payloads.Presence
10+
11+
embedded_schema do
12+
embeds_one :config, Config
13+
field :access_token, :string
14+
field :user_token, :string
15+
end
16+
17+
def changeset(join, attrs) do
18+
join
19+
|> cast(attrs, [:access_token, :user_token], message: &error_message/2)
20+
|> cast_embed(:config, invalid_message: "unable to parse, expected a map")
21+
end
22+
23+
@spec validate(map()) :: {:ok, %__MODULE__{}} | {:error, :invalid_join_payload, map()}
24+
def validate(params) do
25+
case changeset(%__MODULE__{}, params) do
26+
%Ecto.Changeset{valid?: true} = changeset ->
27+
{:ok, Ecto.Changeset.apply_changes(changeset)}
28+
29+
%Ecto.Changeset{valid?: false} = changeset ->
30+
errors = Ecto.Changeset.traverse_errors(changeset, &elem(&1, 0))
31+
{:error, :invalid_join_payload, errors}
32+
end
33+
end
34+
35+
def presence_enabled?(%__MODULE__{config: %Config{presence: %Presence{enabled: enabled}}}), do: enabled
36+
def presence_enabled?(_), do: true
37+
38+
def presence_key(%__MODULE__{config: %Config{presence: %Presence{key: ""}}}), do: UUID.uuid1()
39+
def presence_key(%__MODULE__{config: %Config{presence: %Presence{key: key}}}), do: key
40+
def presence_key(_), do: UUID.uuid1()
41+
42+
def ack_broadcast?(%__MODULE__{config: %Config{broadcast: %Broadcast{ack: ack}}}), do: ack
43+
def ack_broadcast?(_), do: false
44+
45+
def self_broadcast?(%__MODULE__{config: %Config{broadcast: %Broadcast{self: self}}}), do: self
46+
def self_broadcast?(_), do: false
47+
48+
def private?(%__MODULE__{config: %Config{private: private}}), do: private
49+
def private?(_), do: false
50+
51+
def error_message(_field, meta) do
52+
type = Keyword.get(meta, :type)
53+
54+
if type,
55+
do: "unable to parse, expected #{type}",
56+
else: "unable to parse"
57+
end
58+
end
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
defmodule RealtimeWeb.Channels.Payloads.PostgresChange do
2+
@moduledoc """
3+
Validate postgres_changes field of the join payload.
4+
"""
5+
use Ecto.Schema
6+
import Ecto.Changeset
7+
alias RealtimeWeb.Channels.Payloads.Join
8+
9+
embedded_schema do
10+
field :event, :string
11+
field :schema, :string
12+
field :table, :string
13+
field :filter, :string
14+
end
15+
16+
def changeset(postgres_change, attrs) do
17+
cast(postgres_change, attrs, [:event, :schema, :table, :filter], message: &Join.error_message/2)
18+
end
19+
end
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
defmodule RealtimeWeb.Channels.Payloads.Presence do
2+
@moduledoc """
3+
Validate presence field of the join payload.
4+
"""
5+
use Ecto.Schema
6+
import Ecto.Changeset
7+
alias RealtimeWeb.Channels.Payloads.Join
8+
9+
embedded_schema do
10+
field :enabled, :boolean, default: true
11+
field :key, :string, default: UUID.uuid1()
12+
end
13+
14+
def changeset(presence, attrs) do
15+
cast(presence, attrs, [:enabled, :key], message: &Join.error_message/2)
16+
end
17+
end

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ defmodule RealtimeWeb.RealtimeChannel do
2121
alias Realtime.Tenants.Authorization.Policies.PresencePolicies
2222
alias Realtime.Tenants.Connect
2323

24+
alias RealtimeWeb.Channels.Payloads.Join
2425
alias RealtimeWeb.ChannelsAuthorization
2526
alias RealtimeWeb.RealtimeChannel.BroadcastHandler
2627
alias RealtimeWeb.RealtimeChannel.MessageDispatcher
@@ -54,6 +55,15 @@ defmodule RealtimeWeb.RealtimeChannel do
5455
|> assign(:private?, !!params["config"]["private"])
5556
|> assign(:policies, nil)
5657

58+
case Join.validate(params) do
59+
{:ok, _join} ->
60+
nil
61+
62+
{:error, :invalid_join_payload, errors} ->
63+
log_params = params |> Map.put("access_token", "<redacted>") |> Map.put("user_token", "<redacted>")
64+
log_error(socket, "InvalidJoinPayload", %{changeset_errors: errors, params: log_params})
65+
end
66+
5767
with :ok <- SignalHandler.shutdown_in_progress?(),
5868
:ok <- only_private?(tenant_id, socket),
5969
:ok <- limit_joins(socket),

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.44.0",
7+
version: "2.44.1",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

phx_join.schema.json

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
{
2+
"type": "object",
3+
"description": "",
4+
"title": "phx_join",
5+
"required": [
6+
"access_token",
7+
"config"
8+
],
9+
"properties": {
10+
"config": {
11+
"title": "config",
12+
"$ref": "#/$defs/phx_join_config"
13+
},
14+
"access_token": {
15+
"type": "string",
16+
"description": "String, e.g. 'hello'",
17+
"title": "access_token"
18+
}
19+
},
20+
"additionalProperties": false,
21+
"$defs": {
22+
"phx_join_config": {
23+
"type": "object",
24+
"description": "",
25+
"title": "phx_join_config",
26+
"properties": {
27+
"private": {
28+
"type": "boolean",
29+
"description": "Boolean, e.g. true",
30+
"title": "private",
31+
"default": false
32+
},
33+
"broadcast": {
34+
"title": "broadcast",
35+
"$ref": "#/$defs/phx_join_broadcast",
36+
"default": {
37+
"self": false,
38+
"ack": false
39+
}
40+
},
41+
"presence": {
42+
"title": "presence",
43+
"$ref": "#/$defs/phx_join_presence",
44+
"default": {
45+
"enabled": false,
46+
"key": ""
47+
}
48+
},
49+
"postgres_changes": {
50+
"type": "array",
51+
"title": "phx_join_postgres_changes",
52+
"items": {
53+
"$ref": "#/$defs/phx_join_postgres_changes",
54+
"default": {
55+
"table": "",
56+
"filter": "",
57+
"schema": "",
58+
"event": ""
59+
}
60+
}
61+
}
62+
},
63+
"additionalProperties": false
64+
},
65+
"phx_join_broadcast": {
66+
"type": "object",
67+
"description": "",
68+
"title": "phx_join_broadcast",
69+
"properties": {
70+
"self": {
71+
"type": "boolean",
72+
"description": "Boolean, e.g. true",
73+
"title": "self",
74+
"default": false
75+
},
76+
"ack": {
77+
"type": "boolean",
78+
"description": "Boolean, e.g. true",
79+
"title": "ack",
80+
"default": false
81+
}
82+
},
83+
"additionalProperties": false
84+
},
85+
"phx_join_postgres_changes": {
86+
"type": "object",
87+
"description": "",
88+
"title": "phx_join_postgres_changes",
89+
"required": [
90+
"event",
91+
"filter",
92+
"schema",
93+
"table"
94+
],
95+
"properties": {
96+
"table": {
97+
"type": "string",
98+
"description": "String, e.g. 'hello'",
99+
"title": "table"
100+
},
101+
"filter": {
102+
"type": "string",
103+
"description": "String, e.g. 'hello'",
104+
"title": "filter"
105+
},
106+
"schema": {
107+
"type": "string",
108+
"description": "String, e.g. 'hello'",
109+
"title": "schema"
110+
},
111+
"event": {
112+
"type": "string",
113+
"description": "String, e.g. 'hello'",
114+
"title": "event"
115+
}
116+
},
117+
"additionalProperties": false
118+
},
119+
"phx_join_presence": {
120+
"type": "object",
121+
"description": "",
122+
"title": "phx_join_presence",
123+
"properties": {
124+
"enabled": {
125+
"type": "boolean",
126+
"description": "Boolean, e.g. true",
127+
"title": "enabled",
128+
"default": false
129+
},
130+
"key": {
131+
"type": "string",
132+
"description": "String, e.g. 'hello'",
133+
"title": "key"
134+
}
135+
},
136+
"additionalProperties": false
137+
}
138+
}
139+
}

0 commit comments

Comments
 (0)