Skip to content

Commit 6558491

Browse files
supersvenpcapriotti
authored andcommitted
Test stream-asset
1 parent 7a8fbdd commit 6558491

File tree

4 files changed

+71
-8
lines changed

4 files changed

+71
-8
lines changed

services/cargohold/src/CargoHold/API/Federation.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ where
2626
import CargoHold.API.Error
2727
import CargoHold.API.V3
2828
import CargoHold.App
29+
import qualified CargoHold.S3 as S3
2930
import Control.Error
3031
import Imports
3132
import Servant.API
@@ -34,7 +35,6 @@ import Servant.Server hiding (Handler)
3435
import Servant.Server.Generic
3536
import Wire.API.Federation.API
3637
import qualified Wire.API.Federation.API.Cargohold as F
37-
import qualified CargoHold.S3 as S3
3838
import Wire.API.Routes.AssetBody
3939

4040
type FederationAPI = "federation" :> ToServantApi (FedApi 'Cargohold)

services/cargohold/src/CargoHold/AWS.hs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,17 @@ module CargoHold.AWS
3434
send,
3535
sendCatch,
3636
exec,
37+
execStream,
3738
execCatch,
3839
)
3940
where
4041

4142
import CargoHold.CloudFront
4243
import CargoHold.Options
44+
import Conduit
4345
import Control.Lens hiding ((.=))
4446
import Control.Monad.Catch
4547
import qualified Control.Monad.Trans.AWS as AWST
46-
import Control.Monad.Trans.Resource
4748
import Control.Retry
4849
import Data.ByteString.Builder (toLazyByteString)
4950
import Imports
@@ -157,7 +158,7 @@ instance Exception Error
157158
--------------------------------------------------------------------------------
158159
-- Utilities
159160

160-
sendCatch :: AWSRequest r => r -> Amazon (Either AWS.Error (Rs r))
161+
sendCatch :: (MonadCatch m, AWS.MonadAWS m, AWSRequest r) => r -> m (Either AWS.Error (Rs r))
161162
sendCatch = AWST.trying AWS._Error . AWS.send
162163

163164
send :: AWSRequest r => r -> Amazon (Rs r)
@@ -176,7 +177,7 @@ exec env request = do
176177
resp <- execute env (sendCatch req)
177178
case resp of
178179
Left err -> do
179-
Log.info $
180+
Logger.info (view logger env) $
180181
Log.field "remote" (Log.val "S3")
181182
~~ Log.msg (show err)
182183
~~ Log.msg (show req)
@@ -185,6 +186,25 @@ exec env request = do
185186
throwM (GeneralError err)
186187
Right r -> return r
187188

189+
execStream ::
190+
(AWSRequest r, Show r) =>
191+
Env ->
192+
(Text -> r) ->
193+
ResourceT IO (Rs r)
194+
execStream env request = do
195+
let req = request (_s3Bucket env)
196+
resp <- AWS.runAWS (view amazonkaEnv env) (sendCatch req)
197+
case resp of
198+
Left err -> do
199+
Logger.info (view logger env) $
200+
Log.field "remote" (Log.val "S3")
201+
~~ Log.msg (show err)
202+
~~ Log.msg (show req)
203+
-- We just re-throw the error, but logging it here also gives us the request
204+
-- that caused it.
205+
throwM (GeneralError err)
206+
Right r -> pure r
207+
188208
execCatch ::
189209
(AWSRequest r, Show r, MonadLogger m, MonadIO m) =>
190210
Env ->

services/cargohold/src/CargoHold/S3.hs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,15 +129,27 @@ uploadV3 prc (s3Key . mkKey -> key) originalHeaders@(V3.AssetHeaders _ cl) tok s
129129
& poContentType ?~ MIME.showType ct
130130
& poMetadata .~ metaHeaders tok prc
131131

132+
-- | Turn a 'ResourceT IO' action into a pure @Conduit@.
133+
--
134+
-- This is possible because @Conduit@ itself is a monad transformer over
135+
-- 'ResourceT IO'. Removing the outer 'ResourceT IO' layer makes it possible to
136+
-- pass this @Conduit@ to resource-oblivious code.
137+
flattenResourceT ::
138+
ResourceT IO (ConduitT () ByteString (ResourceT IO) ()) ->
139+
ConduitT () ByteString (ResourceT IO) ()
140+
flattenResourceT = join . lift
141+
132142
downloadV3 ::
133143
V3.AssetKey ->
134144
ExceptT Error App (ConduitM () ByteString (ResourceT IO) ())
135145
downloadV3 (s3Key . mkKey -> key) = do
136-
_streamBody . view gorsBody <$> exec req
146+
env <- view aws
147+
pure . flattenResourceT $ _streamBody . view gorsBody <$> AWS.execStream env req
137148
where
138149
req :: Text -> GetObject
139-
req b = getObject (BucketName b) (ObjectKey key)
140-
& goResponseContentType ?~ MIME.showType octets
150+
req b =
151+
getObject (BucketName b) (ObjectKey key)
152+
& goResponseContentType ?~ MIME.showType octets
141153

142154
getMetadataV3 :: V3.AssetKey -> ExceptT Error App (Maybe S3AssetMeta)
143155
getMetadataV3 (s3Key . mkKey -> key) = do

services/cargohold/test/integration/API/Federation.hs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ tests s =
3030
],
3131
testGroup
3232
"stream-asset"
33-
[test s "streaming large asset" testLargeAsset]
33+
[ test s "streaming large asset" testLargeAsset,
34+
test s "stream an asset" testStreamAsset
35+
]
3436
]
3537

3638
testGetAssetAvailable :: Bool -> TestSignature ()
@@ -140,3 +142,32 @@ testLargeAsset c = do
140142
-- check that the first chunk is received
141143
chunk <- responseBody resp
142144
print chunk
145+
146+
testStreamAsset :: TestSignature ()
147+
testStreamAsset c = do
148+
-- Initial upload
149+
let bdy = (applicationOctetStream, "Hello World")
150+
settings =
151+
defAssetSettings
152+
& set setAssetRetention (Just AssetVolatile)
153+
uid <- liftIO $ Id <$> nextRandom
154+
ast :: Asset <-
155+
responseJsonError
156+
=<< uploadSimple (c . path "/assets/v3") uid settings bdy
157+
<!! const 201 === statusCode
158+
159+
-- Call get-asset federation API
160+
let tok = view assetToken ast
161+
let key = view assetKey ast
162+
let ga =
163+
GetAsset
164+
{ gaUser = uid,
165+
gaToken = tok,
166+
gaKey = qUnqualified key
167+
}
168+
respBody <- fmap responseBody $
169+
post (c . path "/federation/stream-asset" . json ga)
170+
<!! const 200 === statusCode
171+
172+
173+
liftIO $ respBody @?= Just "Hello World"

0 commit comments

Comments
 (0)