Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5-internal/polysemy-unsafe-concurrency
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a Concurrency effect for Polysemy
4 changes: 4 additions & 0 deletions libs/polysemy-wire-zoo/polysemy-wire-zoo.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ library
Wire.Sem.Paging.Cassandra
Wire.Sem.Random
Wire.Sem.Random.IO
Wire.Sem.Concurrency
Wire.Sem.Concurrency.IO
Wire.Sem.Concurrency.Sequential

other-modules: Paths_polysemy_wire_zoo
hs-source-dirs: src
Expand Down Expand Up @@ -86,6 +89,7 @@ library
, time
, tinylog
, types-common
, unliftio
, uuid
, wire-api

Expand Down
113 changes: 113 additions & 0 deletions libs/polysemy-wire-zoo/src/Wire/Sem/Concurrency.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
{-# LANGUAGE StandaloneKindSignatures #-}

module Wire.Sem.Concurrency where

import Data.Kind (Type)
import Imports
import Polysemy
import Polysemy.Internal

data ConcurrencySafety = Safe | Unsafe

-- | Polysemy "effect" for hinting about concurrency. This comes with a host of
-- caveats, because concurrency fundamentally is not an effect we can ascribe
-- any semantics to.
--
-- For example, what should the result of the following program be?
--
-- @@
-- unsafePooledMapConcurrentlyN_ 8 put [0..10]
-- get
-- @@
--
-- There is no answer, and the actual behavior depends on unpredictable quirks
-- of the runtime. In general, we have no means of combining the resulting
-- state changes, so we have no option other than to arbitrarily pick one.
--
-- This is confusing behavior --- especially when the call to `Concurrency` is
-- far away from the observed bug.
--
-- Notice that almost everything in Polysemy is "stateful", even things that
-- don't invoke 'Polysemy.State.State'. The 'Polysemy.Error.Error' effect also
-- carries itself around as "state", and thus any interpretation composed of
-- these interpretations is subject to dropping observable state changes.
--
-- There is a "safe" usage of 'Concurrency', at least, no more unsafe than 'IO'
-- when the action you want to perform concurrently requires only @'Final'
-- 'IO'@. This use case is common in interpreters which can statically
-- guarantee their scoped effects do not have access to the full polysemy
-- stack.
type Concurrency :: ConcurrencySafety -> (Type -> Type) -> Type -> Type
data Concurrency (safe :: ConcurrencySafety) m a where
UnsafePooledMapConcurrentlyN ::
Foldable t =>
Int ->
(a -> m b) ->
t a ->
Concurrency safe m [b]
UnsafePooledMapConcurrentlyN_ ::
Foldable t =>
Int ->
(a -> m b) ->
t a ->
Concurrency safe m ()

unsafePooledMapConcurrentlyN ::
forall r t a b.
(Member (Concurrency 'Unsafe) r, Foldable t) =>
-- | Max. number of threads. Should not be less than 1.
Int ->
(a -> Sem r b) ->
t a ->
Sem r [b]
unsafePooledMapConcurrentlyN n f as =
send
( UnsafePooledMapConcurrentlyN n f as ::
Concurrency 'Unsafe (Sem r) [b]
)
{-# INLINEABLE unsafePooledMapConcurrentlyN #-}

unsafePooledMapConcurrentlyN_ ::
forall r t a b.
(Member (Concurrency 'Unsafe) r, Foldable t) =>
-- | Max. number of threads. Should not be less than 1.
Int ->
(a -> Sem r b) ->
t a ->
Sem r ()
unsafePooledMapConcurrentlyN_ n f as =
send
(UnsafePooledMapConcurrentlyN_ n f as :: Concurrency 'Unsafe (Sem r) ())
{-# INLINEABLE unsafePooledMapConcurrentlyN_ #-}

pooledMapConcurrentlyN ::
forall r' r t a b.
r' ~ '[Final IO] =>
(Member (Concurrency 'Safe) r, Subsume r' r, Foldable t) =>
-- | Max. number of threads. Should not be less than 1.
Int ->
(a -> Sem r' b) ->
t a ->
Sem r [b]
pooledMapConcurrentlyN n f as =
send
( UnsafePooledMapConcurrentlyN n (subsume_ @r' @r . f) as ::
Concurrency 'Safe (Sem r) [b]
)
{-# INLINEABLE pooledMapConcurrentlyN #-}

pooledMapConcurrentlyN_ ::
forall r' r t a b.
r' ~ '[Final IO] =>
(Member (Concurrency 'Safe) r, Subsume r' r, Foldable t) =>
-- | Max. number of threads. Should not be less than 1.
Int ->
(a -> Sem r' b) ->
t a ->
Sem r ()
pooledMapConcurrentlyN_ n f as =
send
( UnsafePooledMapConcurrentlyN_ n (subsume_ @r' @r . f) as ::
Concurrency 'Safe (Sem r) ()
)
{-# INLINEABLE pooledMapConcurrentlyN_ #-}
39 changes: 39 additions & 0 deletions libs/polysemy-wire-zoo/src/Wire/Sem/Concurrency/IO.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
module Wire.Sem.Concurrency.IO where

import Imports
import Polysemy
import Polysemy.Final
import UnliftIO (pooledMapConcurrentlyN, pooledMapConcurrentlyN_)
import Wire.Sem.Concurrency (Concurrency (..), ConcurrencySafety (Safe))

------------------------------------------------------------------------------

-- | Safely perform concurrency that wraps only IO effects.
performConcurrency ::
Member (Final IO) r =>
Sem (Concurrency 'Safe ': r) a ->
Sem r a
performConcurrency = unsafelyPerformConcurrency

------------------------------------------------------------------------------

-- | VERY UNSAFELY perform concurrency in Polysemy. This is likely to lead to
-- obscure bugs. See the notes on 'Concurrency' to get a better understanding
-- of what can go wrong here.
unsafelyPerformConcurrency ::
Member (Final IO) r =>
Sem (Concurrency safe ': r) a ->
Sem r a
unsafelyPerformConcurrency = interpretFinal @IO $ \case
UnsafePooledMapConcurrentlyN n f t -> do
st <- getInitialStateS
faction <- bindS f
let action a = faction $ a <$ st
z <- liftS $ pooledMapConcurrentlyN n action $ toList t
Inspector ins <- getInspectorS
pure $ fmap (fmap (mapMaybe ins)) z
UnsafePooledMapConcurrentlyN_ n f t -> do
st <- getInitialStateS
faction <- bindS f
let action a = faction $ a <$ st
liftS $ pooledMapConcurrentlyN_ n action $ toList t
19 changes: 19 additions & 0 deletions libs/polysemy-wire-zoo/src/Wire/Sem/Concurrency/Sequential.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
module Wire.Sem.Concurrency.Sequential where

import Imports
import Polysemy
import Wire.Sem.Concurrency

------------------------------------------------------------------------------

-- | Safely perform "concurrency" by doing it sequentially.
sequentiallyPerformConcurrency :: Sem (Concurrency safe ': r) a -> Sem r a
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this have been 'Safe instead of safe in the type :: Sem (Concurrency safe ': r) a -> Sem r a?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@isovector , I guess this has slipped your attention because I merged this before you got a chance to reply. Here's just a reminder.

sequentiallyPerformConcurrency = interpretH $ \case
UnsafePooledMapConcurrentlyN _ f t -> do
st <- getInitialStateT
ftraverse <- bindT $ traverse @[] f
raise $ sequentiallyPerformConcurrency $ ftraverse $ toList t <$ st
UnsafePooledMapConcurrentlyN_ _ f (t :: t x) -> do
st <- getInitialStateT
ftraverse_ <- bindT $ traverse_ @t f
raise $ sequentiallyPerformConcurrency $ ftraverse_ $ t <$ st