Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions changelog.d/0-release-notes/background-worker
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Introduce background-worker (#3276, #3314, #3333)
Introduce background-worker (#3276, #3314, #3333, #3366)

This release introduces a new component: background-worker. This is currently
only used to forward notifications to federated backends. Enabling federation in
Expand Down Expand Up @@ -36,15 +36,12 @@ background-worker:
host: rabbitmq
port: 5672
vHost: /
remoteDomains: []
adminPort: 15672
secrets:
rabbitmq:
username: <YOUR_USERNAME>
password: <YOUR_PASSWORD>
```

The above are the default values (except for secrets, which do not have
defaults), if they work they are not required to be configured.
`background-worker.config.remoteDomains` should contain all the remote domains
with which the wire-server instance allows federating. This change is
incompatible with open-federation.
defaults), if they work they are not required to be configured.
4 changes: 2 additions & 2 deletions charts/background-worker/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ data:
port: 8080
rabbitmq:
{{toYaml .rabbitmq | indent 6 }}
remoteDomains:
{{toYaml .remoteDomains | indent 6 }}
backendNotificationPusher:
{{toYaml .backendNotificationPusher | indent 6 }}
{{- end }}
4 changes: 3 additions & 1 deletion charts/background-worker/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ config:
host: rabbitmq
port: 5672
vHost: /
remoteDomains: []
adminPort: 15672
backendNotificationPusher:
remotesRefreshInterval: 60 # seconds

serviceAccount:
# When setting this to 'false', either make sure that a service account named
Expand Down
4 changes: 2 additions & 2 deletions hack/helm_vars/wire-server/values.yaml.gotmpl
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,8 @@ background-worker:
requests: {}
imagePullPolicy: {{ .Values.imagePullPolicy }}
config:
# See helmfile for the real value
remoteDomains: []
backendNotificationPusher:
remotesRefreshInterval: 1
secrets:
rabbitmq:
username: {{ .Values.rabbitmqUsername }}
Expand Down
6 changes: 0 additions & 6 deletions hack/helmfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,6 @@ releases:
value: {{ .Values.federationDomain1 }}
- name: brig.config.optSettings.setFederationDomainConfigs[0].domain
value: {{ .Values.federationDomain2 }}
- name: background-worker.config.remoteDomains
values:
- {{ .Values.federationDomain2 }}
needs:
- 'databases-ephemeral'

Expand All @@ -162,8 +159,5 @@ releases:
value: {{ .Values.federationDomain2 }}
- name: brig.config.optSettings.setFederationDomainConfigs[0].domain
value: {{ .Values.federationDomain1 }}
- name: background-worker.config.remoteDomains
values:
- {{ .Values.federationDomain1 }}
needs:
- 'databases-ephemeral'
8 changes: 8 additions & 0 deletions libs/extended/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
, gitignoreSource
, hspec
, hspec-discover
, http-client
, http-types
, imports
, lib
Expand All @@ -24,11 +25,14 @@
, resourcet
, retry
, servant
, servant-client
, servant-client-core
, servant-server
, servant-swagger
, temporary
, text
, tinylog
, unliftio
, wai
}:
mkDerivation {
Expand All @@ -45,6 +49,7 @@ mkDerivation {
errors
exceptions
extra
http-client
http-types
imports
metrics-wai
Expand All @@ -53,10 +58,13 @@ mkDerivation {
resourcet
retry
servant
servant-client
servant-client-core
servant-server
servant-swagger
text
tinylog
unliftio
wai
];
testHaskellDepends = [ aeson base hspec imports temporary ];
Expand Down
5 changes: 5 additions & 0 deletions libs/extended/extended.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ library
-- cabal-fmt: expand src
exposed-modules:
Network.AMQP.Extended
Network.RabbitMqAdmin
Options.Applicative.Extended
Servant.API.Extended
Servant.API.Extended.RawM
Expand Down Expand Up @@ -85,6 +86,7 @@ library
, errors
, exceptions
, extra
, http-client
, http-types
, imports
, metrics-wai
Expand All @@ -93,10 +95,13 @@ library
, resourcet
, retry
, servant
, servant-client
, servant-client-core
, servant-server
, servant-swagger
, text
, tinylog
, unliftio
, wai

default-language: Haskell2010
Expand Down
117 changes: 82 additions & 35 deletions libs/extended/src/Network/AMQP/Extended.hs
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
{-# LANGUAGE RecordWildCards #-}

module Network.AMQP.Extended where

import Control.Exception (throwIO)
import Control.Monad.Catch
import Control.Monad.Trans.Control
import Control.Retry
import Data.Aeson (FromJSON)
import Data.Aeson
import Data.Proxy
import qualified Data.Text as Text
import qualified Data.Text.Encoding as Text
import Imports
import qualified Network.AMQP as Q
import qualified Network.HTTP.Client as HTTP
import Network.RabbitMqAdmin
import Servant
import Servant.Client
import qualified Servant.Client as Servant
import System.Logger (Logger)
import qualified System.Logger as Log
import UnliftIO.Async

data RabbitMqHooks m = RabbitMqHooks
{ -- | Called whenever there is a new channel. At any time there should be at
Expand All @@ -22,6 +33,30 @@ data RabbitMqHooks m = RabbitMqHooks
onChannelException :: SomeException -> m ()
}

data RabbitMqAdminOpts = RabbitMqAdminOpts
{ host :: !String,
port :: !Int,
vHost :: !Text,
adminPort :: !Int
}
deriving (Show, Generic)

instance FromJSON RabbitMqAdminOpts

mkRabbitMqAdminClientEnv :: RabbitMqAdminOpts -> IO (AdminAPI (AsClientT IO))
mkRabbitMqAdminClientEnv opts = do
(username, password) <- readCredsFromEnv
manager <- HTTP.newManager HTTP.defaultManagerSettings
let basicAuthData = Servant.BasicAuthData (Text.encodeUtf8 username) (Text.encodeUtf8 password)
clientEnv = Servant.mkClientEnv manager (Servant.BaseUrl Servant.Http opts.host opts.adminPort "")
pure . fromServant $
hoistClient
(Proxy @(ToServant AdminAPI AsApi))
(either throwM pure <=< flip runClientM clientEnv)
(toServant $ adminClient basicAuthData)

-- | When admin opts are needed use `RabbitMqOpts Identity`, otherwise use
-- `RabbitMqOpts NoAdmin`.
data RabbitMqOpts = RabbitMqOpts
{ host :: !String,
port :: !Int,
Expand All @@ -31,17 +66,29 @@ data RabbitMqOpts = RabbitMqOpts

instance FromJSON RabbitMqOpts

demoteOpts :: RabbitMqAdminOpts -> RabbitMqOpts
demoteOpts RabbitMqAdminOpts {..} = RabbitMqOpts {..}

-- | Useful if the application only pushes into some queues.
mkRabbitMqChannelMVar :: Logger -> RabbitMqOpts -> IO (MVar Q.Channel)
mkRabbitMqChannelMVar l opts = do
chan <- newEmptyMVar
openConnectionWithRetries l opts.host opts.port opts.vHost $
RabbitMqHooks
{ onNewChannel = putMVar chan,
onChannelException = \_ -> void $ tryTakeMVar chan,
onConnectionClose = void $ tryTakeMVar chan
}
pure chan
chanMVar <- newEmptyMVar
connThread <-
async . openConnectionWithRetries l opts $
RabbitMqHooks
{ onNewChannel = \conn -> putMVar chanMVar conn >> forever (threadDelay maxBound),
onChannelException = \_ -> void $ tryTakeMVar chanMVar,
onConnectionClose = void $ tryTakeMVar chanMVar
}
waitForConnThread <- async $ withMVar chanMVar $ \_ -> pure ()
waitEither connThread waitForConnThread >>= \case
Left () -> throwIO $ RabbitMqConnectionFailed "connection thread finished before getting connection"
Right () -> pure chanMVar

data RabbitMqConnectionError = RabbitMqConnectionFailed String
deriving (Show)

instance Exception RabbitMqConnectionError

-- | Connects with RabbitMQ and opens a channel. If the channel is closed for
-- some reasons, reopens the channel. If the connection is closed for some
Expand All @@ -50,14 +97,11 @@ openConnectionWithRetries ::
forall m.
(MonadIO m, MonadMask m, MonadBaseControl IO m) =>
Logger ->
String ->
Int ->
Text ->
RabbitMqOpts ->
RabbitMqHooks m ->
m ()
openConnectionWithRetries l host port vHost hooks = do
username <- liftIO $ Text.pack <$> getEnv "RABBITMQ_USERNAME"
password <- liftIO $ Text.pack <$> getEnv "RABBITMQ_PASSWORD"
openConnectionWithRetries l RabbitMqOpts {..} hooks = do
(username, password) <- liftIO $ readCredsFromEnv
connectWithRetries username password
where
connectWithRetries :: Text -> Text -> m ()
Expand All @@ -71,26 +115,23 @@ openConnectionWithRetries l host port vHost hooks = do
. Log.field "error" (displayException @SomeException e)
. Log.field "willRetry" willRetry
. Log.field "retryCount" retryStatus.rsIterNumber
recovering
policy
( skipAsyncExceptions
<> [ logRetries (const $ pure True) logError
]
)
( const $ do
Log.info l $ Log.msg (Log.val "Trying to connect to RabbitMQ")
connect username password
)

connect :: Text -> Text -> m ()
connect username password = do
conn <- liftIO $ Q.openConnection' host (fromIntegral port) vHost username password
liftBaseWith $ \runInIO ->
Q.addConnectionClosedHandler conn True $ void $ runInIO $ do
hooks.onConnectionClose
`catch` logException l "onConnectionClose hook threw an exception, reconnecting to RabbitMQ anyway"
connectWithRetries username password
openChan conn
getConn =
recovering
policy
( skipAsyncExceptions
<> [logRetries (const $ pure True) logError]
)
( const $ do
Log.info l $ Log.msg (Log.val "Trying to connect to RabbitMQ")
liftIO $ Q.openConnection' host (fromIntegral port) vHost username password
)
bracket getConn (liftIO . Q.closeConnection) $ \conn -> do
liftBaseWith $ \runInIO ->
Q.addConnectionClosedHandler conn True $ void $ runInIO $ do
hooks.onConnectionClose
`catch` logException l "onConnectionClose hook threw an exception, reconnecting to RabbitMQ anyway"
connectWithRetries username password
openChan conn

openChan :: Q.Connection -> m ()
openChan conn = do
Expand Down Expand Up @@ -119,3 +160,9 @@ logException l m (SomeException e) = do
Log.err l $
Log.msg m
. Log.field "error" (displayException e)

readCredsFromEnv :: IO (Text, Text)
readCredsFromEnv =
(,)
<$> (Text.pack <$> getEnv "RABBITMQ_USERNAME")
<*> (Text.pack <$> getEnv "RABBITMQ_PASSWORD")
48 changes: 48 additions & 0 deletions libs/extended/src/Network/RabbitMqAdmin.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
-- | Perhaps this module should be a separate package and published to hackage.
module Network.RabbitMqAdmin where

import Data.Aeson
import Imports
import Servant
import Servant.Client
import Servant.Client.Generic

type RabbitMqBasicAuth = BasicAuth "RabbitMq Management" BasicAuthData

type VHost = Text

-- | Upstream Docs:
-- https://rawcdn.githack.com/rabbitmq/rabbitmq-server/v3.12.0/deps/rabbitmq_management/priv/www/api/index.html
data AdminAPI route = AdminAPI
{ -- | NOTE: This endpoint can be made paginated, but that complicates
-- consumer code a little. This might be needed for performance tuning
-- later, but perhaps not.
listQueuesByVHost ::
route
:- "api"
:> "queues"
:> Capture "vhost" VHost
:> Get '[JSON] [Queue]
}
deriving (Generic)

data AuthenticatedAPI route = AuthenticatedAPI
{ api ::
route
:- RabbitMqBasicAuth
:> ToServant AdminAPI AsApi
}
deriving (Generic)

data Queue = Queue {name :: Text, vhost :: Text}
deriving (Show, Eq, Generic)

instance FromJSON Queue

instance ToJSON Queue

adminClient :: BasicAuthData -> AdminAPI (AsClientT ClientM)
adminClient ba = fromServant $ clientWithAuth.api ba
where
clientWithAuth :: AuthenticatedAPI (AsClientT ClientM)
clientWithAuth = genericClient
Loading