forked from purescript-contrib/purescript-aff-bus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathBus.purs
218 lines (186 loc) · 7.86 KB
/
Bus.purs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
{-
Copyright 2018 SlamData, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-}
module Effect.Aff.Bus
( make
, read
, consume
, consumeLatest
, Step(..)
, write
, split
, kill
, isKilled
, Cap
, Bus
, BusRW
, BusR
, BusR'
, BusW
, BusW'
) where
import Prelude
import Control.Lazy (fix)
import Control.Monad.Reader (ReaderT, ask, lift, runReaderT)
import Control.Monad.Rec.Class (forever)
import Data.Array (catMaybes, (:))
import Data.Either (Either(..))
import Data.Foldable (for_)
import Data.Maybe (Maybe(..))
import Data.Traversable (for)
import Data.Tuple (Tuple(..))
import Effect (Effect)
import Effect.AVar as EffAVar
import Effect.Aff (Aff, Error, generalBracket, killFiber, launchAff, launchAff_, try)
import Effect.Aff.AVar (AVar)
import Effect.Aff.AVar as AVar
import Effect.Aff.Finally as F
import Effect.Class (class MonadEffect, liftEffect)
import Effect.Exception as Exn
import Effect.Ref (Ref)
import Effect.Ref as Ref
import Unsafe.Coerce (unsafeCoerce)
-- | Empty data type (no constructors). It is only used as the field type of
-- | the `read` / `write` labels in the capability row of a `Bus`.
data Cap

-- | A bus carrying values of type `a`.
-- |
-- | * `r` is a phantom row of capabilities; it does not appear in the
-- |   constructor's fields.
-- | * The first `AVar` is the cell writers put into: `Right a` for a value,
-- |   `Left err` to request that the bus be killed.
-- | * The second `AVar` holds the registered consumers. Each consumer lives
-- |   behind a `Ref (Maybe ...)` which is set to `Nothing` once that consumer
-- |   is finished.
data Bus (r :: # Type) a = Bus (AVar (Either Error a)) (AVar (Array (Ref (Maybe (Consumer a)))))

-- | Opaque wrapper around a `ConsumerR output input` with the `output` type
-- | hidden. Values are built with `mkConsumer` and inspected with
-- | `runConsumer`.
foreign import data Consumer :: Type -> Type
-- | The concrete representation of a registered consumer.
-- |
-- | * `outputVar` is the `AVar` the consumer's final result is delivered
-- |   through.
-- | * `consumer` is the callback run for each `input` value; it receives
-- |   `outputVar` through the `ReaderT` environment.
type ConsumerR output input =
  { outputVar :: AVar output
  , consumer :: input -> ReaderT (AVar output) Effect Unit
  }
-- | Packs a `ConsumerR` into the opaque `Consumer` type, forgetting the
-- | `output` type parameter.
-- |
-- | `Consumer` is a `foreign import data` type with no runtime representation
-- | of its own, so the record is passed through `unsafeCoerce` unchanged; the
-- | only way to get it back out is `runConsumer`.
mkConsumer :: ∀ output input. ConsumerR output input -> Consumer input
mkConsumer consumerRecord = unsafeCoerce consumerRecord
-- | Eliminates a `Consumer` by handing the underlying `ConsumerR` to a
-- | continuation that must work for every possible `output` type.
-- |
-- | Implemented with `unsafeCoerce`; this is the counterpart of `mkConsumer`,
-- | which is the only function in this module that produces `Consumer` values.
runConsumer :: ∀ input r. (∀ output. ConsumerR output input -> r) -> Consumer input -> r
runConsumer = unsafeCoerce
-- | A Bus that has exactly the `read` capability.
type BusR = BusR' ()

-- | A Bus that has the `read` capability, plus whatever is in `r`.
type BusR' r = Bus (read :: Cap | r)

-- | A Bus that has exactly the `write` capability.
type BusW = BusW' ()

-- | A Bus that has the `write` capability, plus whatever is in `r`.
type BusW' r = Bus (write :: Cap | r)

-- | A Bus that has both the `read` and the `write` capability.
type BusRW = Bus (read :: Cap, write :: Cap)
-- | Creates a new bidirectional Bus which can be read from and written to.
-- |
-- | Besides allocating the two `AVar`s that back the bus, this launches an
-- | `Aff` loop which hands every written value to the registered consumers
-- | and tears the bus down once an error is written to it.
make :: ∀ m a. MonadEffect m ⇒ m (BusRW a)
make = liftEffect do
  cell ← EffAVar.empty
  consumers ← EffAVar.new mempty
  launchAff_ $ fix \loop -> do
    -- We `read` from `cell` instead of `take`, so that if an error is written,
    -- `cell` can be killed, and any other `put` operations that are still
    -- blocked on it will resolve with that error.
    resE ← AVar.read cell
    case resE of
      -- An error was written: kill everything and stop looping.
      Left err -> do
        -- NOTE(review): this `pure unit` is a no-op.
        pure unit
        cs ← AVar.take consumers
        liftEffect do
          EffAVar.kill err cell
          -- Kill the output var of every consumer that is still registered.
          for_ cs $ \cRef -> do
            mbC <- liftEffect $ Ref.read cRef
            for_ mbC (runConsumer (_.outputVar >>> EffAVar.kill err))
          EffAVar.kill err consumers
          -- TODO investigate why having `EffAVar.kill err cell` here instead of up there
          -- results in strange deadlock when `test_consume` is run with `range 1 1025`
      -- A value was written: dispatch it to every consumer, then loop.
      Right res -> do
        -- Empty the cell (we only `read` it above).
        void $ AVar.take cell
        cs ← AVar.take consumers
        -- Run each live consumer; the result is `Just cRef` for consumers to
        -- keep and `Nothing` for consumers to drop.
        consumersMb <- for cs $ \cRef -> do
          mbC <- liftEffect $ Ref.read cRef
          case mbC of
            -- The ref was already cleared, so drop this consumer.
            Nothing -> pure Nothing
            Just c -> c # runConsumer \{outputVar, consumer} -> do
              try (liftEffect (runReaderT (consumer res) outputVar)) >>= case _ of
                Right _ -> do
                  pure $ Just cRef
                -- The consumer threw: kill its output var with the error
                -- and drop it.
                Left err -> do
                  AVar.kill err outputVar
                  pure $ Nothing
        AVar.put (catMaybes consumersMb) consumers
        loop
  pure $ Bus cell consumers
-- | Waits for the next value pushed to the Bus and resolves with it.
-- |
-- | This is `consume` with a consumer that, for each incoming value, launches
-- | an `Aff` which puts that value into the consumer's output `AVar`.
read :: ∀ a r. BusR' r a -> Aff a
read bus = consume bus \value -> do
  outputVar <- ask
  lift $ launchAff_ $ AVar.put value outputVar
-- | What a `consumeLatest` handler reports after processing a value:
-- | `Loop` produces no result yet, while `Done res` delivers `res` as the
-- | final result.
data Step res = Loop | Done res
-- | Registers a new consumer on the bus and blocks until a value can be taken
-- | from that consumer's output `AVar`; the computation resolves with that
-- | value.
-- |
-- | The consumer callback gets access to the output `AVar` through `ask`, so
-- | it decides when (and with what) the computation is resolved.
-- | If the consumer's Effect raises an exception it will be propagated.
-- |
-- | NOTE(review): resource handling here is built on `Effect.Aff.Finally`
-- | (`F`), which is defined outside this file. The inline comments describe
-- | the intent of each finalizer; confirm them against that module.
consume :: ∀ a r output. BusR' r a -> (a -> ReaderT (AVar output) Effect Unit) -> Aff output
consume (Bus _ consumers) consumer = F.runFinallyM do
  cs ← EffAVar.take consumers # F.finally' do
    -- If we were able to `take` we must put it back.
    F.onResult \cs -> lift $ AVar.put cs consumers
  {outputVar, cRef} <- lift $ liftEffect do
    outputVar <- EffAVar.empty
    cRef <- Ref.new $ Just $ mkConsumer {outputVar, consumer}
    void $ EffAVar.read outputVar \_ -> do
      -- After `outputVar` is killed or filled we clear
      -- the ref to avoid any possible memory leak.
      Ref.write Nothing cRef
    pure {outputVar, cRef}
  -- Register the new consumer at the front of the consumers list.
  EffAVar.put (cRef : cs) consumers # F.finally' do
    -- If we were able to `put` then we must kill `outputVar`.
    F.onResult \_ -> (F.onError <> F.onFinalError) \err -> do
      lift $ AVar.kill err outputVar
    -- Otherwise we should `put` back to `consumers`.
    F.onError \_ -> lift $ AVar.put cs consumers
  -- Block until the consumer (or a kill) resolves `outputVar`.
  EffAVar.take outputVar # F.finally' do
    -- Here, in any case, we kill `outputVar`.
    lift $ AVar.kill (Exn.error "cleanup") outputVar
--TODO investigate reason:
--
-- if instead of `EffAVar.___ outputVar # F.finally'` we have
-- `AVar.___ outputVar # F.finallyAff` then we get this error in tests:
--
-- [Start] Testing read/write/kill
-- `res` should be as expected
-- Expected: [(Right 1),(Right 1),(Right 2),(Right 2),(Right 3),(Right 3),(Left "Error: Done ..."),(Left "Error: Done ...")]
-- Actual: [(Right 1),(Right 1),(Right 2), (Right 3),(Left "Error: Done ..."),(Left "Error: Done ...")]
-- [Error] Testing read/write/kill
-- | Same as `consume`, but the consuming function returns an `Aff (Step output)`
-- | instead of running in `Effect`.
-- | If the consuming function has not completed when a new value comes in, its
-- | fiber is killed and the function is invoked again with the new value.
-- | The computation resolves once an invocation completes with `Done x`.
-- |
-- | NOTE(review): the fiber stored in `fiberRef` is only killed when the next
-- | value arrives; nothing here kills it when `consume` itself exits.
consumeLatest :: ∀ a r output. BusR' r a -> (a -> Aff (Step output)) -> Aff output
consumeLatest bus consumer = do
  -- Holds the fiber of the most recent invocation; starts out as an already
  -- resolved fiber so the first kill below has something to act on.
  fiberRef <- liftEffect $ Ref.new (pure unit)
  consume bus \val -> do
    outputVar <- ask
    -- Kill whatever invocation was started for the previous value.
    lift $ Ref.read fiberRef >>= (killFiber (Exn.error "kill from consumeLatest") >>> launchAff_)
    fiber <- lift $ launchAff do
      void $ generalBracket (pure unit)
        -- Killed: nothing to report.
        { killed: const pure
        -- Failed: the error becomes the result of the whole consumption.
        , failed: \err _ -> AVar.kill err outputVar
        , completed: \cs _ -> case cs of
            Loop -> pure unit
            Done res -> AVar.put res outputVar
        }
        (const $ consumer val)
    lift $ Ref.write fiber fiberRef
-- | Pushes a new value to the Bus by putting `Right a` into the bus cell.
write :: ∀ a r. a -> BusW' r a -> Aff Unit
write a bus = case bus of
  Bus cell _ -> AVar.put (Right a) cell
-- | Splits a bidirectional Bus into a read-only and a write-only Bus.
-- | Both halves share the same underlying cell and consumers list; only the
-- | phantom capability row differs.
split :: ∀ a. BusRW a -> Tuple (BusR a) (BusW a)
split (Bus cell consumers) = Tuple readEnd writeEnd
  where
  readEnd = Bus cell consumers
  writeEnd = Bus cell consumers
-- | Kills the Bus and propagates the exception to all pending and future consumers.
-- | `kill` is idempotent and blocks until the killing process has fully finished, i.e.
-- | `kill err bus *> isKilled bus` will result in `true`.
kill :: ∀ a r. Exn.Error -> BusW' r a -> Aff Unit
kill err bus@(Bus cell _) = do
  unlessM (isKilled bus) do
    -- If there are multiple parallel processes executing `kill` at the same time,
    -- then all of the processes that are blocked by `put` will be killed as part
    -- of handling the first `put`. We wrap it in `try` to guarantee that `kill`
    -- is idempotent.
    void $ try $ AVar.put (Left err) cell
    -- Here we block until a read from `cell` results in an error,
    -- i.e. until the kill process has finished successfully.
    void $ try $ forever $ AVar.read cell
-- | Synchronously reports whether the Bus has been killed, judged by the
-- | current status of the bus cell.
isKilled :: ∀ m a r. MonadEffect m ⇒ Bus r a -> m Boolean
isKilled (Bus cell _) = liftEffect do
  cellStatus <- EffAVar.status cell
  pure $ EffAVar.isKilled cellStatus