@@ -29,7 +29,9 @@ defmodule Realtime.Tenants.BatchBroadcast do
29
29
@ spec broadcast (
30
30
auth_params :: map ( ) | nil ,
31
31
tenant :: Tenant . t ( ) ,
32
- messages :: % { messages: list ( % { topic: String . t ( ) , payload: map ( ) , event: String . t ( ) , private: boolean ( ) } ) } ,
32
+ messages :: % {
33
+ messages: list ( % { id: String . t ( ) , topic: String . t ( ) , payload: map ( ) , event: String . t ( ) , private: boolean ( ) } )
34
+ } ,
33
35
super_user :: boolean ( )
34
36
) :: :ok | { :error , atom ( ) }
35
37
def broadcast ( auth_params , tenant , messages , super_user \\ false )
@@ -59,8 +61,8 @@ defmodule Realtime.Tenants.BatchBroadcast do
59
61
# Handle events for public channel
60
62
events
61
63
|> Map . get ( false , [ ] )
62
- |> Enum . each ( fn % { topic: sub_topic , payload: payload , event: event } ->
63
- send_message_and_count ( tenant , events_per_second_rate , sub_topic , event , payload , true )
64
+ |> Enum . each ( fn message ->
65
+ send_message_and_count ( tenant , events_per_second_rate , message , true )
64
66
end )
65
67
66
68
# Handle events for private channel
@@ -69,14 +71,14 @@ defmodule Realtime.Tenants.BatchBroadcast do
69
71
|> Enum . group_by ( fn event -> Map . get ( event , :topic ) end )
70
72
|> Enum . each ( fn { topic , events } ->
71
73
if super_user do
72
- Enum . each ( events , fn % { topic: sub_topic , payload: payload , event: event } ->
73
- send_message_and_count ( tenant , events_per_second_rate , sub_topic , event , payload , false )
74
+ Enum . each ( events , fn message ->
75
+ send_message_and_count ( tenant , events_per_second_rate , message , false )
74
76
end )
75
77
else
76
78
case permissions_for_message ( tenant , auth_params , topic ) do
77
79
% Policies { broadcast: % BroadcastPolicies { write: true } } ->
78
- Enum . each ( events , fn % { topic: sub_topic , payload: payload , event: event } ->
79
- send_message_and_count ( tenant , events_per_second_rate , sub_topic , event , payload , false )
80
+ Enum . each ( events , fn message ->
81
+ send_message_and_count ( tenant , events_per_second_rate , message , false )
80
82
end )
81
83
82
84
_ ->
@@ -91,15 +93,15 @@ defmodule Realtime.Tenants.BatchBroadcast do
91
93
92
94
def broadcast ( _ , nil , _ , _ ) , do: { :error , :tenant_not_found }
93
95
94
- def changeset ( payload , attrs ) do
96
+ defp changeset ( payload , attrs ) do
95
97
payload
96
98
|> cast ( attrs , [ ] )
97
99
|> cast_embed ( :messages , required: true , with: & message_changeset / 2 )
98
100
end
99
101
100
- def message_changeset ( message , attrs ) do
102
+ defp message_changeset ( message , attrs ) do
101
103
message
102
- |> cast ( attrs , [ :topic , :payload , :event , :private ] )
104
+ |> cast ( attrs , [ :id , : topic, :payload , :event , :private ] )
103
105
|> maybe_put_private_change ( )
104
106
|> validate_required ( [ :topic , :payload , :event ] )
105
107
end
@@ -112,11 +114,19 @@ defmodule Realtime.Tenants.BatchBroadcast do
112
114
end
113
115
114
116
@ event_type "broadcast"
115
- defp send_message_and_count ( tenant , events_per_second_rate , topic , event , payload , public? ) do
116
- tenant_topic = Tenants . tenant_topic ( tenant , topic , public? )
117
- payload = % { "payload" => payload , "event" => event , "type" => "broadcast" }
117
+ defp send_message_and_count ( tenant , events_per_second_rate , message , public? ) do
118
+ tenant_topic = Tenants . tenant_topic ( tenant , message . topic , public? )
118
119
119
- broadcast = % Phoenix.Socket.Broadcast { topic: topic , event: @ event_type , payload: payload }
120
+ payload = % { "payload" => message . payload , "event" => message . event , "type" => "broadcast" }
121
+
122
+ payload =
123
+ if message [ :id ] do
124
+ Map . put ( payload , "meta" , % { "id" => message . id } )
125
+ else
126
+ payload
127
+ end
128
+
129
+ broadcast = % Phoenix.Socket.Broadcast { topic: message . topic , event: @ event_type , payload: payload }
120
130
121
131
GenCounter . add ( events_per_second_rate . id )
122
132
TenantBroadcaster . pubsub_broadcast ( tenant . external_id , tenant_topic , broadcast , RealtimeChannel.MessageDispatcher )
0 commit comments