AP previously used the Erlang programming language for some course components. In 2024, we use only Haskell. If you previously followed the course but did not pass, and are attending the exam this year, you will need to adapt your knowledge of concurrency from Erlang to Haskell. The best way to do so is to follow the course, attend the lectures, and complete the assignments. As an additional aid, this work-in-progress document describes how to translate aspects of concurrent Erlang to concurrent Haskell.
We use the Haskell modules Control.Concurrent and Control.Concurrent.Chan. The latter is implicitly re-exported by the former. The following import should give you everything you need in most cases:
import Control.Concurrent
  ( Chan,
    ThreadId,
    forkIO,
    newChan,
    readChan,
    writeChan,
  )
In Erlang, processes communicate by directly sending asynchronous
messages to each other. In Haskell, the equivalent of an Erlang
process is a thread. To create a new thread in Haskell, we use the
forkIO function, similar to using spawn in Erlang for creating
processes. The forkIO function has the following type:
forkIO :: IO () -> IO ThreadId
In other words, to create a thread we pass forkIO an action of type
IO (), meaning a monadic computation in the IO monad. Typically,
this will be some kind of potentially infinite loop that receives and
handles messages, just as in Erlang. The thread will continue to run
until this action terminates.
The forkIO function returns a ThreadId that can be used for
interacting with the thread in low-level ways, although we will not
make much use of that in AP. Instead, we will communicate using
channel-based messaging.
In Erlang, if we have a process ID, we can send a message to that
process. In Haskell, communication is done via channels. A channel is
created using the newChan action:
newChan :: IO (Chan a)
The newChan action produces a channel that can be used for sending
and receiving messages of type a. The precise type of a will be
inferred by the compiler. This is also a deviation from Erlang, where
messages are untyped.
Whenever we create a thread, we will also create a channel through which we
can communicate with the thread, as shown below (assuming the existence of a
function threadLoop):
do c <- newChan
   forkIO $ threadLoop c
   ...
Now both we and the new thread hold a reference to the channel (c).
Messages can be both written to and read from a channel, corresponding to sending and receiving messages, using the following two functions:
writeChan :: Chan a -> a -> IO ()
readChan :: Chan a -> IO a
Conceptually, a channel is an unbounded queue of messages. Writing to a channel is an asynchronous operation - it immediately and always succeeds. Reading from a channel retrieves the oldest message in the channel. If the channel is empty, reading blocks until a message is available.
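The queue behaviour can be illustrated with a minimal sketch that uses a single thread. The function name fifoDemo is made up for this illustration; only newChan, writeChan and readChan come from Control.Concurrent.

```haskell
import Control.Concurrent (Chan, newChan, readChan, writeChan)

-- Hypothetical demo (not part of the course code): write three
-- messages to a channel, then read them back.
fifoDemo :: IO [Int]
fifoDemo = do
  c <- newChan :: IO (Chan Int)
  -- Writes do not block, even though nobody is reading yet.
  writeChan c 1
  writeChan c 2
  writeChan c 3
  -- Each read returns the oldest message still in the channel.
  x <- readChan c
  y <- readChan c
  z <- readChan c
  pure [x, y, z]
```

Evaluating fifoDemo should produce the messages in the order they were written. A fourth readChan at the end would block, since the channel is empty at that point.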
Single-reader principle: we adopt the rule that a channel may have
only a single reader, meaning only a single thread is allowed to
call readChan on any given channel. This is typically the thread
that we created the channel for. This is not enforced by the Haskell
type system, and there are indeed forms of concurrent programming
that are more flexible, but they are outside the scope of this note.
It is perfectly acceptable (and often necessary) for a channel to have multiple writers.
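As a rough sketch of several writers sharing one channel with a single reader, consider the following. The name manyWriters is an assumption made for this example, and the result is sorted because the order in which the writers run is up to the scheduler.

```haskell
import Control.Concurrent (Chan, forkIO, newChan, readChan, writeChan)
import Control.Monad (forM_, replicateM)
import Data.List (sort)

-- Hypothetical demo: n writer threads and one reader (the caller).
manyWriters :: Int -> IO [Int]
manyWriters n = do
  c <- newChan :: IO (Chan Int)
  -- Each writer thread sends its own number on the shared channel.
  forM_ [1 .. n] $ \i -> forkIO $ writeChan c i
  -- Only this thread calls readChan, respecting the single-reader rule.
  -- Arrival order depends on scheduling, so we sort before returning.
  sort <$> replicateM n (readChan c)
```

Here the calling thread is the only reader, while every forked thread is a writer.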
If we call readChan on a channel where we hold the only reference
(meaning we would in principle wait forever), the Haskell runtime
system will raise an exception that will cause the thread to be
terminated. This is a natural and safe way to shut down a thread that
is no longer necessary, assuming the thread does not hold resources
(e.g., open files) that must be manually closed. Handling such cases is
outside the scope of this note.
Thus, we have the following correspondences between Erlang and Haskell:
- Sending messages: In Erlang, we send a message (to a process) with the operator !. For example, Pid ! Msg sends Msg to the process identified by Pid. In Haskell, we send a message (to a thread via a channel) with the function writeChan.
- Receiving messages: In Erlang, the receive ... end expression is used for receiving messages. It not only waits for a message but also performs pattern matching on the received message to decide on the appropriate action. To do the same in Haskell, we first use the readChan function to receive a message, and then do pattern matching with a case expression.
The following Erlang example:
ex1() ->
    C = spawn(fun threadLoop/0),
    C ! 0,
    C ! 1.

threadLoop() ->
    Msg = receive N -> N end,
    io:format("Got message ~p~n", [Msg]),
    threadLoop().
can thus be translated to the following Haskell code:
ex1 :: IO ()
ex1 = do
  c <- newChan
  _ <- forkIO $ threadLoop c -- Ignore return value.
  writeChan c 0
  writeChan c 1

threadLoop :: Chan Int -> IO ()
threadLoop c = do
  msg <- readChan c
  putStrLn $ "Got integer: " <> show msg
  threadLoop c
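Note that ex1 returns as soon as both messages have been written, and a compiled Haskell program exits when its main thread finishes, so the output of threadLoop may never appear outside of an interactive session. The following sketch, which is not part of the course code, shows one way to combine pattern matching on messages with a way of waiting for the thread: the loop ends when it receives a stop message and then reports its result on a second channel. The names Msg, Add, Stop, sumLoop and exSum are all made up for this illustration.

```haskell
import Control.Concurrent (Chan, forkIO, newChan, readChan, writeChan)

-- Hypothetical message type: a number to add, or a request to stop.
data Msg = Add Int | Stop

-- Receive a message, then pattern match on it with a case-expression.
-- On Stop, the running total is written to the done channel and the
-- action returns, which ends the thread.
sumLoop :: Chan Msg -> Chan Int -> Int -> IO ()
sumLoop input done total = do
  msg <- readChan input
  case msg of
    Add n -> sumLoop input done (total + n)
    Stop -> writeChan done total

exSum :: IO Int
exSum = do
  input <- newChan
  done <- newChan
  _ <- forkIO $ sumLoop input done 0
  writeChan input (Add 1)
  writeChan input (Add 2)
  writeChan input Stop
  -- Block until the thread has handled all messages and reported back.
  readChan done
```

Both channels still have a single reader each: the forked thread reads input, and the calling thread reads done.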
Just as in Erlang, the Haskell message passing facility is asynchronous. To implement synchronous (RPC) calls, where we wait for a response after sending a message, we need to invent a bit of machinery on top. The way we make it work is by creating a new channel that is used for transmitting the result. This channel is then sent along as part of the message.
The starting point (and always good practice) is to define an explicit type for the messages we would like to send.
data Msg = MsgInc (Chan Int) Int
We then define our thread loop as follows:
threadLoop :: Chan Msg -> IO ()
threadLoop c = do
  msg <- readChan c
  case msg of
    MsgInc reply_chan x ->
      writeChan reply_chan (x + 1)
  threadLoop c
Given a handle to a channel of type Chan Msg, we can then send a
message, and wait for a response, as follows:
performRPC :: Chan Msg -> Int -> IO Int
performRPC c x = do
  reply_chan <- newChan
  writeChan c $ MsgInc reply_chan x
  readChan reply_chan
And tying it all together:
ex2 :: IO ()
ex2 = do
  c <- newChan
  _ <- forkIO $ threadLoop c
  print =<< performRPC c 0
  print =<< performRPC c 1
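One reason for creating a fresh reply channel per call is that several threads can then perform RPCs against the same server without their replies getting mixed up. The following sketch repeats the definitions above and adds a made-up function, concurrentRPC, in which each client thread reports its argument together with the reply it received.

```haskell
import Control.Concurrent (Chan, forkIO, newChan, readChan, writeChan)
import Control.Monad (forM_, replicateM)
import Data.List (sort)

data Msg = MsgInc (Chan Int) Int

threadLoop :: Chan Msg -> IO ()
threadLoop c = do
  msg <- readChan c
  case msg of
    MsgInc reply_chan x ->
      writeChan reply_chan (x + 1)
  threadLoop c

performRPC :: Chan Msg -> Int -> IO Int
performRPC c x = do
  reply_chan <- newChan
  writeChan c $ MsgInc reply_chan x
  readChan reply_chan

-- Hypothetical demo: several client threads call the same server.
concurrentRPC :: [Int] -> IO [(Int, Int)]
concurrentRPC xs = do
  c <- newChan
  _ <- forkIO $ threadLoop c
  results <- newChan
  -- Each client performs its own RPC and reports (argument, reply).
  forM_ xs $ \x -> forkIO $ do
    y <- performRPC c x
    writeChan results (x, y)
  -- Collect one result per client; sort because arrival order varies.
  sort <$> replicateM (length xs) (readChan results)
```

Since every reply channel is read only by the client that created it, each client should see the reply to its own request.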
Let's consider a more comprehensive Erlang example, where we use a process to create a server that maintains a counter:
counter() -> spawn(fun () -> counter_loop(0) end).

incr(Cid) -> request_reply(Cid, incr).
decr_with(Cid, N) -> request_reply(Cid, {decr, N}).
get_value(Cid) -> request_reply(Cid, get_value).

request_reply(Pid, Request) ->
    Pid ! {self(), Request},
    receive
        {Pid, Response} -> Response
    end.

counter_loop(State) ->
    receive
        {From, incr} ->
            {NewState, Res} = {State + 1, ok},
            From ! {self(), Res},
            counter_loop(NewState);
        {From, {decr, N}} ->
            {NewState, Res} = {State - N, ok},
            From ! {self(), Res},
            counter_loop(NewState);
        {From, get_value} ->
            {NewState, Res} = {State, {ok, State}},
            From ! {self(), Res},
            counter_loop(NewState)
    end.
To translate this example to Haskell, the first step is to make a type for the messages that will be sent to the server. Again, we use the pattern where we make a constructor for each kind of message, and the last argument of each constructor is a channel for sending back the response:
data Msg = Incr (Chan ())
         | Decr Int (Chan ())
         | GetValue (Chan Int)
Here we use the Haskell type () (unit) in lieu of the Erlang atom ok.
Next, we declare a type alias Counter for representing a counter
server, here just the input channel, and a function for creating a new
counter server:
type Counter = Chan Msg

counter :: IO Counter
counter = do
  input <- newChan
  _ <- forkIO $ counterLoop input 0
  return input
As in the previous section, we define a function to abstract the communication pattern where we send a message and then wait for a reply:
requestReply :: Counter -> (Chan a -> Msg) -> IO a
requestReply cnt con = do
  reply_chan <- newChan
  writeChan cnt $ con reply_chan
  readChan reply_chan
Note that the second argument of the requestReply function is a
function that constructs a Msg value when given a reply channel.
Now we can use the requestReply function to define the three API
functions incr, decrWith and getValue for a counter server:
incr cnt = requestReply cnt Incr
decrWith cnt n = requestReply cnt $ Decr n
getValue cnt = requestReply cnt GetValue
Finally, we define the internal server loop function:
counterLoop input state = do
  msg <- readChan input
  case msg of
    Incr from -> do
      let (newState, res) = (state + 1, ())
      writeChan from res
      counterLoop input newState
    Decr n from -> do
      let (newState, res) = (state - n, ())
      writeChan from res
      counterLoop input newState
    GetValue from -> do
      let (newState, res) = (state, state)
      writeChan from res
      counterLoop input newState
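To see the pieces working together, here is a self-contained sketch that repeats the definitions above and adds a small client. The type signatures on the API functions and the client counterDemo are additions for illustration, and the server loop is written in a condensed form that is intended to behave the same as the one above.

```haskell
import Control.Concurrent (Chan, forkIO, newChan, readChan, writeChan)

data Msg
  = Incr (Chan ())
  | Decr Int (Chan ())
  | GetValue (Chan Int)

type Counter = Chan Msg

counter :: IO Counter
counter = do
  input <- newChan
  _ <- forkIO $ counterLoop input 0
  return input

requestReply :: Counter -> (Chan a -> Msg) -> IO a
requestReply cnt con = do
  reply_chan <- newChan
  writeChan cnt $ con reply_chan
  readChan reply_chan

incr :: Counter -> IO ()
incr cnt = requestReply cnt Incr

decrWith :: Counter -> Int -> IO ()
decrWith cnt n = requestReply cnt $ Decr n

getValue :: Counter -> IO Int
getValue cnt = requestReply cnt GetValue

-- Condensed server loop: reply first, then continue with the new state.
counterLoop :: Chan Msg -> Int -> IO ()
counterLoop input state = do
  msg <- readChan input
  case msg of
    Incr from -> writeChan from () >> counterLoop input (state + 1)
    Decr n from -> writeChan from () >> counterLoop input (state - n)
    GetValue from -> writeChan from state >> counterLoop input state

-- Hypothetical client: two increments, one decrement by 5, then a query.
counterDemo :: IO Int
counterDemo = do
  cnt <- counter
  incr cnt
  incr cnt
  decrWith cnt 5
  getValue cnt
```

Because every API call waits for its reply before returning, the requests in counterDemo are handled in order, and the final query should observe 0 + 1 + 1 - 5.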
The channel abstraction does not directly support timeouts for RPC calls. However, we can build our own support for timeouts. The technique we employ is to allow the reply to be either the intended value or a special timeout value. When we perform an RPC, we then also launch a new thread that sleeps for some period of time, then writes the timeout value to the channel. If the non-timeout response is the first to arrive, then the timeout value is ignored and harmless.
First we must import the threadDelay function.
import Control.Concurrent (threadDelay)
Then we define a type Timeout with a single value Timeout.
data Timeout = Timeout
Then we define a message type (in this case polymorphic in a) where
the reply channel accepts messages of type Either Timeout a.
data Msg a = MsgDoIt (Chan (Either Timeout a)) (IO a)
A Msg a denotes a request to perform some impure operation IO a
(perhaps a network request), then reply with the resulting value of
type a.
We can use this to build a facility for performing an action with a timeout:
actionWithTimeout :: Int -> IO a -> IO (Either Timeout a)
actionWithTimeout seconds action = do
  reply_chan <- newChan
  _ <- forkIO $ do -- worker thread
    x <- action
    writeChan reply_chan $ Right x
  _ <- forkIO $ do -- timeout thread
    threadDelay (seconds * 1000000)
    writeChan reply_chan $ Left Timeout
  readChan reply_chan
You will note that this is not a server in the usual sense, as it does not loop: it simply launches two threads.
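As a usage sketch, the following repeats actionWithTimeout and runs it once with an action that finishes immediately and once with an action that sleeps for longer than the timeout. The helper describe and the function timeoutDemo are made up for this example, and the durations are arbitrary.

```haskell
import Control.Concurrent (forkIO, newChan, readChan, threadDelay, writeChan)

data Timeout = Timeout

actionWithTimeout :: Int -> IO a -> IO (Either Timeout a)
actionWithTimeout seconds action = do
  reply_chan <- newChan
  _ <- forkIO $ do -- worker thread
    x <- action
    writeChan reply_chan $ Right x
  _ <- forkIO $ do -- timeout thread
    threadDelay (seconds * 1000000)
    writeChan reply_chan $ Left Timeout
  readChan reply_chan

-- Hypothetical helper: turn the outcome into a readable string.
describe :: Either Timeout Int -> String
describe (Right x) = "finished with " <> show x
describe (Left Timeout) = "timed out"

-- Hypothetical demo: a fast action and a slow one, each given 1 second.
timeoutDemo :: IO (String, String)
timeoutDemo = do
  fast <- actionWithTimeout 1 (pure 42)
  -- The worker sleeps for 3 seconds, so the timeout should win.
  slow <- actionWithTimeout 1 (threadDelay 3000000 >> pure 0)
  pure (describe fast, describe slow)
```

In the first call the late timeout value is written to a channel that nobody reads any more, which is the harmless case described above.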
One downside of this function is that the worker thread (the one that
runs action, and might take too long) is not terminated after the
timeout. This is a problem if it is, for example, stuck in an infinite
loop that consumes ever more memory. To fix this, we can have the
timeout thread explicitly kill the worker thread. First we have to
import the killThread function.
import Control.Concurrent (killThread)
Then we can use it as follows.
actionWithTimeout2 :: Int -> IO a -> IO (Either Timeout a)
actionWithTimeout2 seconds action = do
  reply_chan <- newChan
  worker_tid <- forkIO $ do
    -- worker thread
    x <- action
    writeChan reply_chan $ Right x
  _ <- forkIO $ do
    -- timeout thread
    threadDelay (seconds * 1000000)
    killThread worker_tid
    writeChan reply_chan $ Left Timeout
  readChan reply_chan
Note that killing a thread is a dangerous operation in general. If the worker thread is merely stuck in some loop or waiting for a network request, killing it is harmless, but killing a thread may also leave some shared state in an inconsistent condition. We will (hopefully) not encounter such cases in AP, but it is something to be aware of in the future.