Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5-internal/polysemy-unsafe-concurrency
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a Concurrency effect for Polysemy
4 changes: 4 additions & 0 deletions libs/polysemy-wire-zoo/polysemy-wire-zoo.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ library
Wire.Sem.Paging.Cassandra
Wire.Sem.Random
Wire.Sem.Random.IO
Wire.Sem.Concurrency
Wire.Sem.Concurrency.IO
Wire.Sem.Concurrency.Sequential

other-modules: Paths_polysemy_wire_zoo
hs-source-dirs: src
Expand Down Expand Up @@ -86,6 +89,7 @@ library
, time
, tinylog
, types-common
, unliftio
, uuid
, wire-api

Expand Down
113 changes: 113 additions & 0 deletions libs/polysemy-wire-zoo/src/Wire/Sem/Concurrency.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
{-# LANGUAGE StandaloneKindSignatures #-}

module Wire.Sem.Concurrency where

import Data.Kind (Type)
import Imports
import Polysemy
import Polysemy.Internal

data ConcurrencySafety = Safe | Unsafe

-- | Polysemy "effect" for hinting about concurrency. This comes with a host of
-- caveats, because concurrency fundamentally is not an effect we can ascribe
-- any semantics to.
--
-- For example, what should the result of the following program be?
--
-- @@
-- unsafePooledMapConcurrentlyN_ 8 put [0..10]
-- get
-- @@
--
-- There is no answer, and the actual behavior depends on unpredictable quirks
-- of the runtime. In general, we have no means of combining the resulting
-- state changes, so we have no option other than to arbitrarily pick one.
--
-- This is confusing behavior --- especially when the call to `Concurrency` is
-- far away from the observed bug.
--
-- Notice that almost everything in Polysemy is "stateful", even things that
-- don't invoke 'Polysemy.State.State'. The 'Polysemy.Error.Error' effect also
-- carries itself around as "state", and thus any interpretation composed of
-- these interpretations is subject to dropping observable state changes.
--
-- There is a "safe" usage of 'Concurrency', at least, no more unsafe than 'IO'
-- when the action you want to perform concurrently requires only @'Final'
-- 'IO'@. This use case is common in interpreters which can statically
-- guarantee their scoped effects do not have access to the full polysemy
-- stack.
type Concurrency :: ConcurrencySafety -> (Type -> Type) -> Type -> Type
data Concurrency (safe :: ConcurrencySafety) m a where
UnsafePooledMapConcurrentlyN ::
Foldable t =>
Int ->
(a -> m b) ->
t a ->
Concurrency safe m [b]
UnsafePooledMapConcurrentlyN_ ::
Foldable t =>
Int ->
(a -> m b) ->
t a ->
Concurrency safe m ()

unsafePooledMapConcurrentlyN ::
forall r t a b.
(Member (Concurrency 'Unsafe) r, Foldable t) =>
-- | Max. number of threads. Should not be less than 1.
Int ->
(a -> Sem r b) ->
t a ->
Sem r [b]
unsafePooledMapConcurrentlyN n f as =
send
( UnsafePooledMapConcurrentlyN n f as ::
Concurrency 'Unsafe (Sem r) [b]
)
{-# INLINEABLE unsafePooledMapConcurrentlyN #-}

unsafePooledMapConcurrentlyN_ ::
forall r t a b.
(Member (Concurrency 'Unsafe) r, Foldable t) =>
-- | Max. number of threads. Should not be less than 1.
Int ->
(a -> Sem r b) ->
t a ->
Sem r ()
unsafePooledMapConcurrentlyN_ n f as =
send
(UnsafePooledMapConcurrentlyN_ n f as :: Concurrency 'Unsafe (Sem r) ())
{-# INLINEABLE unsafePooledMapConcurrentlyN_ #-}

pooledMapConcurrentlyN ::
forall r' r t a b.
r' ~ '[Final IO] =>
(Member (Concurrency 'Safe) r, Subsume r' r, Foldable t) =>
-- | Max. number of threads. Should not be less than 1.
Int ->
(a -> Sem r' b) ->
t a ->
Sem r [b]
pooledMapConcurrentlyN n f as =
send
( UnsafePooledMapConcurrentlyN n (subsume_ @r' @r . f) as ::
Concurrency 'Safe (Sem r) [b]
)
{-# INLINEABLE pooledMapConcurrentlyN #-}

pooledMapConcurrentlyN_ ::
forall r' r t a b.
r' ~ '[Final IO] =>
(Member (Concurrency 'Safe) r, Subsume r' r, Foldable t) =>
-- | Max. number of threads. Should not be less than 1.
Int ->
(a -> Sem r' b) ->
t a ->
Sem r ()
pooledMapConcurrentlyN_ n f as =
send
( UnsafePooledMapConcurrentlyN_ n (subsume_ @r' @r . f) as ::
Concurrency 'Safe (Sem r) ()
)
{-# INLINEABLE pooledMapConcurrentlyN_ #-}
39 changes: 39 additions & 0 deletions libs/polysemy-wire-zoo/src/Wire/Sem/Concurrency/IO.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
module Wire.Sem.Concurrency.IO where

import Imports
import Polysemy
import Polysemy.Final
import UnliftIO (pooledMapConcurrentlyN, pooledMapConcurrentlyN_)
import Wire.Sem.Concurrency (Concurrency (..), ConcurrencySafety (Safe))

------------------------------------------------------------------------------

-- | Safely perform concurrency that wraps only IO effects.
performConcurrency ::
Member (Final IO) r =>
Sem (Concurrency 'Safe ': r) a ->
Sem r a
performConcurrency = unsafelyPerformConcurrency

------------------------------------------------------------------------------

-- | VERY UNSAFELY perform concurrency in Polysemy. This is likely to lead to
-- obscure bugs. See the notes on 'Concurrency' to get a better understanding
-- of what can go wrong here.
unsafelyPerformConcurrency ::
Member (Final IO) r =>
Sem (Concurrency safe ': r) a ->
Sem r a
unsafelyPerformConcurrency = interpretFinal @IO $ \case
UnsafePooledMapConcurrentlyN n f t -> do
st <- getInitialStateS
faction <- bindS f
let action a = faction $ a <$ st
z <- liftS $ pooledMapConcurrentlyN n action $ toList t
Inspector ins <- getInspectorS
pure $ fmap (fmap (mapMaybe ins)) z
UnsafePooledMapConcurrentlyN_ n f t -> do
st <- getInitialStateS
faction <- bindS f
let action a = faction $ a <$ st
liftS $ pooledMapConcurrentlyN_ n action $ toList t
19 changes: 19 additions & 0 deletions libs/polysemy-wire-zoo/src/Wire/Sem/Concurrency/Sequential.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
module Wire.Sem.Concurrency.Sequential where

import Imports
import Polysemy
import Wire.Sem.Concurrency

------------------------------------------------------------------------------

-- | Safely perform "concurrency" by doing it sequentially.
sequentiallyPerformConcurrency :: Sem (Concurrency safe ': r) a -> Sem r a
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this have been 'Safe instead of safe in the type :: Sem (Concurrency safe ': r) a -> Sem r a?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@isovector , I guess this has slipped your attention because I merged this before you got a chance to reply. Here's just a reminder.

sequentiallyPerformConcurrency = interpretH $ \case
UnsafePooledMapConcurrentlyN _ f t -> do
st <- getInitialStateT
ftraverse <- bindT $ traverse @[] f
raise $ sequentiallyPerformConcurrency $ ftraverse $ toList t <$ st
UnsafePooledMapConcurrentlyN_ _ f (t :: t x) -> do
st <- getInitialStateT
ftraverse_ <- bindT $ traverse_ @t f
raise $ sequentiallyPerformConcurrency $ ftraverse_ $ t <$ st