From aa8879435998210062a7b76fb6454a2fcb4d3386 Mon Sep 17 00:00:00 2001 From: Tom Ellis Date: Sun, 15 Dec 2024 10:02:39 +0000 Subject: [PATCH] elaforge stream example https://discourse.haskell.org/t/solving-a-resourcet-related-space-leak-in-production/11007/11 --- bluefin-examples/bluefin-examples.cabal | 1 + .../src/Bluefin/Examples/Stream/Many.hs | 224 ++++++++++++++++++ 2 files changed, 225 insertions(+) create mode 100644 bluefin-examples/src/Bluefin/Examples/Stream/Many.hs diff --git a/bluefin-examples/bluefin-examples.cabal b/bluefin-examples/bluefin-examples.cabal index ec25906..e873170 100644 --- a/bluefin-examples/bluefin-examples.cabal +++ b/bluefin-examples/bluefin-examples.cabal @@ -77,6 +77,7 @@ library exposed-modules: Bluefin.Examples.DB, Bluefin.Examples.Stream.InsideAndOut, + Bluefin.Examples.Stream.Many, Bluefin.Examples.Terminal build-depends: base, bluefin >= 0.0.12.0 && < 0.1 diff --git a/bluefin-examples/src/Bluefin/Examples/Stream/Many.hs b/bluefin-examples/src/Bluefin/Examples/Stream/Many.hs new file mode 100644 index 0000000..09d3852 --- /dev/null +++ b/bluefin-examples/src/Bluefin/Examples/Stream/Many.hs @@ -0,0 +1,224 @@ +{-# LANGUAGE ImpredicativeTypes #-} +{-# LANGUAGE NoMonoLocalBinds #-} +{-# LANGUAGE NoMonomorphismRestriction #-} + +module Bluefin.Examples.Stream.Many where + +import Bluefin.Compound + ( Handle (mapHandle), + makeOp, + useImplIn, + useImplUnder, + useImplWithin, + ) +import Bluefin.Consume (Consume, await) +import Bluefin.Eff (Eff, bracket, runEff, (:&), (:>)) +import Bluefin.IO (IOE, effIO) +import Bluefin.Jump (jumpTo, withJump) +import Bluefin.State (evalState, get, modify) +import Bluefin.Stream (Stream, consumeStream, forEach, yield) +import Bluefin.System.IO (hGetLine, hIsEOF, withFile) +import Control.Monad (forever, replicateM_, when) +import Data.Function (fix) +import Data.Maybe (Maybe (..), isNothing) +import System.IO (IOMode (ReadMode)) +import Prelude hiding + ( break, + drop, + head, + read, + readFile, 
+ return, + take, + writeFile, + ) +import qualified Prelude + +-- An example from elaforge at +-- +-- https://discourse.haskell.org/t/solving-a-resourcet-related-space-leak-in-production/11007/11?u=tomjaguarpaw + +-- ghci> mixExample +-- [Just "a0",Just "",Just ""] +-- [Just "a1",Just "",Just ""] +-- [Just "a2",Just "b0",Just ""] +-- File closed: a +-- [Nothing,Just "b1",Just ""] +-- [Just "b2",Just "a0"] +-- File closed: b +-- [Nothing,Just "a1"] +-- [Just "a2"] +-- File closed: a +-- [Nothing] +mixExample :: IO () +mixExample = runEff $ \io -> do + effIO io $ do + Prelude.writeFile "a" (unlines (map (\i -> "a" <> show i) [0 :: Int .. 5])) + Prelude.writeFile "b" (unlines (map (\i -> "b" <> show i) [0 :: Int .. 5])) + + let timings = + [ (0, "a"), + (2, "b"), + (4, "a") + ] + + -- With newtype wrapping removed, `Stream a e` is just `a -> IO ()` + -- and `forEach` is just function application. `effIO io` is a no-op, so + -- this is: + -- + -- mix timings io print + forEach (mix timings io) $ \out -> + effIO io (print out) + +mix :: + forall e1 e2 es. + (e1 :> es, e2 :> es) => + [(Int, FilePath)] -> + IOE e2 -> + Stream [Data.Maybe.Maybe String] e1 -> + Eff es () +mix timings io y = do + -- There are `ImpredicativeTypes` here, but that's not essential. + -- We could have just used a rank-1 newtype wrapper instead. + -- `ImpredicativeTypes` are just slightly more convenient. + let itersStreams :: + [ forall e. + Stream (Data.Maybe.Maybe String) e -> + Eff (e :& es) () + ] + itersStreams = map (\x -> nothingOnEnd (pad io x)) timings + + connectMany itersStreams $ \itersStart -> do + flip fix itersStart $ \again iters -> do + when (not (null iters)) $ do + outs <- traverse await iters + yield y outs + let iters' = + [ iter + | (iter, o) <- zip iters outs, + not (Data.Maybe.isNothing o) + ] + again iters' + +-- | When the stream is finished, yield Nothing for evermore. +nothingOnEnd :: + (e1 :> es) => + (forall e. 
+ Stream a e -> Eff (e :& es) r) -> + Stream (Data.Maybe.Maybe a) e1 -> + Eff es r +nothingOnEnd s y = do + _ <- forEach s $ \a -> yield y (Data.Maybe.Just a) + forever (yield y Data.Maybe.Nothing) + +pad :: + (e1 :> es, e2 :> es) => + IOE e1 -> + (Int, FilePath) -> + Stream String e2 -> + Eff es () +pad io (start, fname) y = do + replicateM_ start (yield y "") + take 3 (linesOfFile fname io) y + +-- General purpose Bluefin function for streaming the +-- lines of a file +linesOfFile :: + (e1 :> es, e2 :> es) => + String -> + IOE e1 -> + Stream String e2 -> + Eff es () +linesOfFile filename io y = do + withJump $ \onEOF -> do + withFile io filename ReadMode $ \h -> do + -- This bracket is only so we can observe the + -- prompt closing of the file. + bracket + (pure ()) + (\() -> effIO io (putStrLn ("File closed: " <> filename))) + ( \() -> do + forever $ do + isEOF <- hIsEOF h + when isEOF $ + jumpTo onEOF + -- A `Stream a e` is just a newtype wrapped `a -> IO ()`, and + -- without the wrapping `yield y a` is just `y a`. So, without the + -- wrapping this is `y =<< hGetLine h`. + yield y =<< hGetLine h + ) + +-- This should be part of the Bluefin standard library +-- +-- `take n k y` takes the first n elements of `k` (yielding them to `y`) +-- and then stops. It does so by jumping out of the `forEach` loop +-- when the next element arrives after a countdown (which starts at n) has hit 0. +take :: + (e1 :> es) => + Integer -> + (forall e. Stream a e -> Eff (e :& es) ()) -> + Stream a e1 -> + Eff es () +take n k y = + withJump $ \done -> do + evalState n $ \s -> do + -- `Stream a e` is a newtype wrapped `a -> IO ()`, and + -- with all the wrapping removed, `forEach k body` is + -- just `k body`. + -- + -- `useImplUnder` is a no-op, when the newtype wrapping + -- is removed. It just massages effect type tags. + forEach (useImplUnder . k) $ \a -> do + s' <- get s + when (s' <= 0) $ + jumpTo done + + modify s (subtract 1) + -- With newtype wrapping removed this is just `y a`. 
+ yield y a + +-- `connectMany` is the part of this program that behaves in the +-- least familiar manner. This is what it does: +-- +-- When given a list of n effectful operations which yield `a`s it +-- forks n threads for them to run in. There is also a thread forked +-- for the function that accepts a list of `Consume`s. Each `Consume` +-- receives the `a`s yielded by one of the n threads. +-- +-- The threads are synchronised by `MVar`s, so there is only one +-- possible interleaving of the concurrent code. When `await` is +-- called on one of the n `Consume`s the `Stream` thread corresponding +-- to that `Consume` is asked "please do the work required to get +-- me your next `a`". That unblocks it, it does its work, gives its `a`, +-- blocks, and the `Consume` thread continues. +-- +-- (Actually, there are n threads forked for the `Consume` action, +-- but it only runs in the last one. This is not an essential feature, +-- it's just how the current implementation happens to work.) +-- +-- The function also uses `ImpredicativeTypes` to put a `forall` inside the +-- list type constructor, but that's just a convenience, not something +-- essential to the operation of the program. +connectMany :: + -- | n effectful operations that yield `a`s + [forall e. Stream a e -> Eff (e :& es) r] -> + -- | Will be called with a list of n Consumes, + -- to which the streams above will yield their + -- `a`s + (forall e. [Consume a e] -> Eff (e :& es) r) -> + Eff es r +connectMany ss k = + makeOp (connectMany' ss k []) + +connectMany' :: + [forall e. Stream a e -> Eff (e :& es) r] -> + (forall e. [Consume a e] -> Eff (e :& es) r) -> + (forall e. [Consume a e] -> Eff (e :& es) r) +connectMany' [] k = k +connectMany' (s : ss) k = + connectMany' + ss + ( \cs -> + consumeStream + (\c -> useImplIn k (mapHandle c : map mapHandle cs)) + (useImplWithin s) + )