Skip to content

Commit 558eb31

Browse files
committed
WIP: Test stream-asset
1 parent 7a8fbdd commit 558eb31

File tree

4 files changed

+61
-5
lines changed

4 files changed

+61
-5
lines changed

services/cargohold/src/CargoHold/API/Federation.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import Wire.API.Federation.API
3636
import qualified Wire.API.Federation.API.Cargohold as F
3737
import qualified CargoHold.S3 as S3
3838
import Wire.API.Routes.AssetBody
39+
import Debug.Trace
3940

4041
type FederationAPI = "federation" :> ToServantApi (FedApi 'Cargohold)
4142

@@ -54,7 +55,9 @@ checkAsset ga =
5455

5556
streamAsset :: F.GetAsset -> Handler AssetSource
5657
streamAsset ga = do
58+
traceM "streamAsset"
5759
available <- checkAsset ga
60+
traceM $ "available : " ++ show available
5861
unless available (throwE assetNotFound)
5962
AssetSource <$> S3.downloadV3 (F.gaKey ga)
6063

services/cargohold/src/CargoHold/AWS.hs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ module CargoHold.AWS
3434
send,
3535
sendCatch,
3636
exec,
37+
execStream,
3738
execCatch,
3839
)
3940
where
@@ -157,7 +158,7 @@ instance Exception Error
157158
--------------------------------------------------------------------------------
158159
-- Utilities
159160

160-
sendCatch :: AWSRequest r => r -> Amazon (Either AWS.Error (Rs r))
161+
sendCatch :: (MonadCatch m, AWS.MonadAWS m, AWSRequest r) => r -> m (Either AWS.Error (Rs r))
161162
sendCatch = AWST.trying AWS._Error . AWS.send
162163

163164
send :: AWSRequest r => r -> Amazon (Rs r)
@@ -173,10 +174,10 @@ exec ::
173174
m (Rs r)
174175
exec env request = do
175176
let req = request (_s3Bucket env)
176-
resp <- execute env (sendCatch req)
177+
resp <- execute env (sendCatch req)
177178
case resp of
178179
Left err -> do
179-
Log.info $
180+
Logger.info (view logger env) $
180181
Log.field "remote" (Log.val "S3")
181182
~~ Log.msg (show err)
182183
~~ Log.msg (show req)
@@ -185,6 +186,25 @@ exec env request = do
185186
throwM (GeneralError err)
186187
Right r -> return r
187188

189+
execStream ::
190+
(AWSRequest r, Show r) =>
191+
Env ->
192+
(Text -> r) ->
193+
ResourceT IO (Either Error (Rs r))
194+
execStream env request = do
195+
let req = request (_s3Bucket env)
196+
resp <- AWS.runAWS (view amazonkaEnv env) (sendCatch req)
197+
case resp of
198+
Left err -> do
199+
Logger.info (view logger env) $
200+
Log.field "remote" (Log.val "S3")
201+
~~ Log.msg (show err)
202+
~~ Log.msg (show req)
203+
-- We just re-throw the error, but logging it here also gives us the request
204+
-- that caused it.
205+
pure $ Left (GeneralError err)
206+
Right r -> pure $ Right r
207+
188208
execCatch ::
189209
(AWSRequest r, Show r, MonadLogger m, MonadIO m) =>
190210
Env ->
@@ -211,3 +231,4 @@ canRetry (Left e) = case e of
211231

212232
retry5x :: (Monad m) => RetryPolicyM m
213233
retry5x = limitRetries 5 <> exponentialBackoff 100000
234+

services/cargohold/src/CargoHold/S3.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ downloadV3 ::
133133
V3.AssetKey ->
134134
ExceptT Error App (ConduitM () ByteString (ResourceT IO) ())
135135
downloadV3 (s3Key . mkKey -> key) = do
136-
_streamBody . view gorsBody <$> exec req
136+
env <- view aws
137+
_streamBody . view gorsBody <$> AWS.execStream env req
137138
where
138139
req :: Text -> GetObject
139140
req b = getObject (BucketName b) (ObjectKey key)

services/cargohold/test/integration/API/Federation.hs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ tests s =
3030
],
3131
testGroup
3232
"stream-asset"
33-
[test s "streaming large asset" testLargeAsset]
33+
[ test s "streaming large asset" testLargeAsset,
34+
test s "stream an asset" testStreamAsset
35+
]
3436
]
3537

3638
testGetAssetAvailable :: Bool -> TestSignature ()
@@ -140,3 +142,32 @@ testLargeAsset c = do
140142
-- check that the first chunk is received
141143
chunk <- responseBody resp
142144
print chunk
145+
146+
testStreamAsset :: TestSignature ()
147+
testStreamAsset c = do
148+
-- Initial upload
149+
let bdy = (applicationOctetStream, "Hello World")
150+
settings =
151+
defAssetSettings
152+
& set setAssetRetention (Just AssetVolatile)
153+
uid <- liftIO $ Id <$> nextRandom
154+
ast :: Asset <-
155+
responseJsonError
156+
=<< uploadSimple (c . path "/assets/v3") uid settings bdy
157+
<!! const 201 === statusCode
158+
159+
-- Call get-asset federation API
160+
let tok = view assetToken ast
161+
let key = view assetKey ast
162+
let ga =
163+
GetAsset
164+
{ gaUser = uid,
165+
gaToken = tok,
166+
gaKey = qUnqualified key
167+
}
168+
respBody <- fmap responseBody $
169+
post (c . path "/federation/stream-asset" . json ga)
170+
<!! const 200 === statusCode
171+
172+
173+
liftIO $ respBody @?= Just "Hello World"

0 commit comments

Comments
 (0)