@@ -74,7 +74,6 @@ import Util.Options
74
74
import Wire.API.Internal.Notification
75
75
import Wire.API.Notification
76
76
import Wire.API.Presence (Presence (.. ))
77
- import Wire.API.Presence qualified as Presence
78
77
import Wire.API.Push.Token qualified as Public
79
78
import Wire.API.Push.V2
80
79
import Wire.API.User (UserSet (.. ))
@@ -146,17 +145,19 @@ instance MonadMapAsync Gundeck where
146
145
Nothing -> mapAsync f l
147
146
Just chunkSize -> concat <$> mapM (mapAsync f) (List. chunksOf chunkSize l)
148
147
149
- splitPushes :: (MonadPushAll m ) => [Push ] -> m ([Push ], [Push ])
148
+ splitPushes :: (MonadPushAll m ) => [Push ] -> m ([Push ], [Push ], UserClientsFull )
150
149
splitPushes ps = do
151
150
allUserClients <- mpaGetClients (Set. unions $ map (\ p -> Set. map (. _recipientId) $ p. _pushRecipients) ps)
152
- pure . partitionHereThere $ map (splitPush allUserClients) ps
151
+ let (rabbitmqPushes, legacyPushes) = partitionHereThere $ map (splitPush allUserClients) ps
152
+ pure (rabbitmqPushes, legacyPushes, allUserClients)
153
153
154
154
-- | Split a push into rabbitmq and legacy push. This code exists to help with
155
155
-- migration. Once it is completed and old APIs are not supported anymore we can
156
156
-- assume everything is meant for RabbitMQ and stop splitting.
157
157
splitPush ::
158
158
UserClientsFull ->
159
159
Push ->
160
+ -- | These rabbitmqPush cassandraPush
160
161
These Push Push
161
162
splitPush clientsFull p = do
162
163
let (rabbitmqRecipients, legacyRecipients) =
@@ -177,7 +178,6 @@ splitPush clientsFull p = do
177
178
RecipientClientsSome cs ->
178
179
Set. filter (\ c -> c. clientId `elem` toList cs) allClients
179
180
RecipientClientsAll -> allClients
180
- RecipientClientsTemporaryOnly -> mempty
181
181
(rabbitmqClients, legacyClients) = Set. partition supportsConsumableNotifications relevantClients
182
182
rabbitmqClientIds = (. clientId) <$> Set. toList rabbitmqClients
183
183
legacyClientIds = (. clientId) <$> Set. toList legacyClients
@@ -190,13 +190,16 @@ splitPush clientsFull p = do
190
190
-- We return all clients for RabbitMQ even if there are no real
191
191
-- clients so a temporary client can still read the notifications on
192
192
-- RabbitMQ.
193
- ( These rcpt {_recipientClients = RecipientClientsTemporaryOnly } rcpt)
193
+ That rcpt
194
194
(_, [] ) ->
195
- ( This rcpt)
195
+ This rcpt
196
196
(r : rs, l : ls) ->
197
- These
198
- rcpt {_recipientClients = RecipientClientsSome $ list1 r rs}
199
- rcpt {_recipientClients = RecipientClientsSome $ list1 l ls}
197
+ let rabbitMqRecipients = case rcpt. _recipientClients of
198
+ RecipientClientsAll -> RecipientClientsAll
199
+ RecipientClientsSome _ -> RecipientClientsSome $ list1 r rs
200
+ in These
201
+ rcpt {_recipientClients = rabbitMqRecipients}
202
+ rcpt {_recipientClients = RecipientClientsSome $ list1 l ls}
200
203
201
204
getClients :: Set UserId -> Gundeck UserClientsFull
202
205
getClients uids = do
@@ -221,13 +224,13 @@ getClients uids = do
221
224
pushAll :: (MonadPushAll m , MonadNativeTargets m , MonadMapAsync m , Log. MonadLogger m ) => [Push ] -> m ()
222
225
pushAll pushes = do
223
226
Log. debug $ msg (val " pushing" ) . Log. field " pushes" (Aeson. encode pushes)
224
- (rabbitmqPushes, legacyPushes) <- splitPushes pushes
227
+ (rabbitmqPushes, legacyPushes, allUserClients ) <- splitPushes pushes
225
228
226
229
legacyNotifs <- mapM mkNewNotification legacyPushes
227
- pushAllLegacy legacyNotifs
230
+ pushAllLegacy legacyNotifs allUserClients
228
231
229
232
rabbitmqNotifs <- mapM mkNewNotification rabbitmqPushes
230
- pushAllViaRabbitMq rabbitmqNotifs
233
+ pushAllViaRabbitMq rabbitmqNotifs allUserClients
231
234
232
235
-- Note that Cells needs all notifications because it doesn't matter whether
233
236
-- some recipients have rabbitmq clients or not.
@@ -240,8 +243,8 @@ pushAll pushes = do
240
243
241
244
-- | Construct and send a single bulk push request to the client. Write the 'Notification's from
242
245
-- the request to C*. Trigger native pushes for all delivery failures notifications.
243
- pushAllLegacy :: (MonadPushAll m , MonadNativeTargets m , MonadMapAsync m ) => [NewNotification ] -> m ()
244
- pushAllLegacy newNotifications = do
246
+ pushAllLegacy :: (MonadPushAll m , MonadNativeTargets m , MonadMapAsync m ) => [NewNotification ] -> UserClientsFull -> m ()
247
+ pushAllLegacy newNotifications userClientsFull = do
245
248
-- persist push request
246
249
let cassandraTargets :: [CassandraTargets ]
247
250
cassandraTargets = map mkCassandraTargets newNotifications
@@ -257,11 +260,14 @@ pushAllLegacy newNotifications = do
257
260
wsTargets <- mapM mkWSTargets newNotifications
258
261
resp <- compilePushResps wsTargets <$> mpaBulkPush (compilePushReq <$> wsTargets)
259
262
-- native push
260
- forM_ resp $ \ ((notif :: Notification , psh :: Push ), alreadySent :: [Presence ]) ->
261
- pushNativeWithBudget notif psh alreadySent
262
-
263
- pushNativeWithBudget :: (MonadMapAsync m , MonadPushAll m , MonadNativeTargets m ) => Notification -> Push -> [Presence ] -> m ()
264
- pushNativeWithBudget notif psh alreadySent = do
263
+ forM_ resp $ \ ((notif :: Notification , psh :: Push ), alreadySent :: [Presence ]) -> do
264
+ let alreadySentClients = Set. fromList $ mapMaybe (\ p -> (p. userId,) <$> p. clientId) alreadySent
265
+ rabbitmqClients = Map. map (Set. filter supportsConsumableNotifications) userClientsFull. userClientsFull
266
+ rabbitmqClientIds = Map. foldMapWithKey (\ uid clients -> Set. map (\ c -> (uid, c. clientId)) clients) rabbitmqClients
267
+ pushNativeWithBudget notif psh (Set. toList $ Set. union alreadySentClients rabbitmqClientIds)
268
+
269
+ pushNativeWithBudget :: (MonadMapAsync m , MonadPushAll m , MonadNativeTargets m ) => Notification -> Push -> [(UserId , ClientId )] -> m ()
270
+ pushNativeWithBudget notif psh dontPush = do
265
271
perPushConcurrency <- mntgtPerPushConcurrency
266
272
let rcps' = nativeTargetsRecipients psh
267
273
cost = maybe (length rcps') (min (length rcps')) perPushConcurrency
@@ -272,14 +278,16 @@ pushNativeWithBudget notif psh alreadySent = do
272
278
-- to cassandra and SNS are limited to 'perNativePushConcurrency' in parallel.
273
279
unless (psh ^. pushTransient) $
274
280
mpaRunWithBudget cost () $
275
- mpaPushNative notif (psh ^. pushNativePriority) =<< nativeTargets psh rcps' alreadySent
281
+ mpaPushNative notif (psh ^. pushNativePriority) =<< nativeTargets psh rcps' dontPush
276
282
277
- pushAllViaRabbitMq :: (MonadPushAll m , MonadMapAsync m , MonadNativeTargets m ) => [NewNotification ] -> m ()
278
- pushAllViaRabbitMq newNotifs = do
283
+ pushAllViaRabbitMq :: (MonadPushAll m , MonadMapAsync m , MonadNativeTargets m ) => [NewNotification ] -> UserClientsFull -> m ()
284
+ pushAllViaRabbitMq newNotifs userClientsFull = do
279
285
for_ newNotifs $ pushViaRabbitMq
280
286
mpaForkIO $ do
281
- for_ newNotifs $ \ newNotif ->
282
- pushNativeWithBudget newNotif. nnNotification newNotif. nnPush []
287
+ for_ newNotifs $ \ newNotif -> do
288
+ let cassandraClients = Map. map (Set. filter $ not . supportsConsumableNotifications) userClientsFull. userClientsFull
289
+ cassandraClientIds = Map. foldMapWithKey (\ uid clients -> Set. map (\ c -> (uid, c. clientId)) clients) cassandraClients
290
+ pushNativeWithBudget newNotif. nnNotification newNotif. nnPush (Set. toList $ cassandraClientIds)
283
291
284
292
pushViaRabbitMq :: (MonadPushAll m ) => NewNotification -> m ()
285
293
pushViaRabbitMq newNotif = do
@@ -291,11 +299,7 @@ pushViaRabbitMq newNotif = do
291
299
RecipientClientsAll ->
292
300
Set. singleton $ userRoutingKey r. _recipientId
293
301
RecipientClientsSome (toList -> cs) ->
294
- Set. fromList $
295
- temporaryRoutingKey r. _recipientId
296
- : map (clientRoutingKey r. _recipientId) cs
297
- RecipientClientsTemporaryOnly ->
298
- Set. singleton $ temporaryRoutingKey r. _recipientId
302
+ Set. fromList $ map (clientRoutingKey r. _recipientId) cs
299
303
for_ routingKeys $ \ routingKey ->
300
304
mpaPublishToRabbitMq userNotificationExchangeName routingKey qMsg
301
305
@@ -382,7 +386,6 @@ mkCassandraTargets NewNotification {..} =
382
386
-- clients are stored in cassandra as a list with a notification. empty list
383
387
-- is interpreted as "all clients" by 'Gundeck.Notification.Data.toNotif'.
384
388
RecipientClientsSome cs -> Just $ toList cs
385
- RecipientClientsTemporaryOnly -> Nothing
386
389
pure $ target (r ^. recipientId) & targetClients .~ clients
387
390
388
391
-- | Information needed to push notifications over websockets and/or native
@@ -465,9 +468,9 @@ nativeTargets ::
465
468
(MonadNativeTargets m , MonadMapAsync m ) =>
466
469
Push ->
467
470
[Recipient ] ->
468
- [Presence ] ->
471
+ [( UserId , ClientId ) ] ->
469
472
m [Address ]
470
- nativeTargets psh rcps' alreadySent =
473
+ nativeTargets psh rcps' dontPush =
471
474
mntgtMapAsync addresses rcps' >>= fmap concat . mapM check
472
475
where
473
476
addresses :: Recipient -> m [Address ]
@@ -483,14 +486,9 @@ nativeTargets psh rcps' alreadySent =
483
486
-- Is the client not whitelisted?
484
487
| not (whitelistedOrNoWhitelist a) = False
485
488
-- Include client if not found in already served presences.
486
- | otherwise = isNothing (List. find (isOnline a) alreadySent)
487
- isOnline a x =
488
- a ^. addrUser == Presence. userId x
489
- && (a ^. addrConn == Presence. connId x || equalClient a x)
490
- equalClient a x = Just (a ^. addrClient) == Presence. clientId x
489
+ | otherwise = not $ List. elem (a ^. addrUser, a ^. addrClient) dontPush -- (List.find (isOnline a) alreadySent)
491
490
eligibleClient _ RecipientClientsAll = True
492
491
eligibleClient a (RecipientClientsSome cs) = (a ^. addrClient) `elem` cs
493
- eligibleClient _ RecipientClientsTemporaryOnly = False
494
492
whitelistedOrNoWhitelist a =
495
493
null (psh ^. pushConnections)
496
494
|| a ^. addrConn `elem` psh ^. pushConnections
0 commit comments