Skip to content

Commit

Permalink
Some tweaks to code and docs (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
odersky authored Mar 12, 2023
1 parent a1dc487 commit 0b9865b
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 13 deletions.
15 changes: 9 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -300,13 +300,11 @@ The general interface of a channel is as follows:
trait Channel[T]:
def read()(using Async): T
def send(x: T)(using Async): Unit
def close(): Unit
```
Channels provide

- a `read` method, which might suspend while waiting for a message to arrive,
- a `send` method, which also might suspend in case this is a sync channel or there is some other mechanism that forces a sender to wait,
- a `close` method which closes the channel. Trying to send to a closed channel or to read from it results in a `ChannelClosedException` to be thrown.

### Async Channels

Expand Down Expand Up @@ -409,12 +407,17 @@ A stream represents a sequence of values that are computed one-by-one in a separ
case More(elem: T, rest: Stream[T])
case End extends StreamResult[Nothing]
```
One can see a stream as a static representation of the values that are transmitted over a channel. Indeed, there is an easy way to convert a channel to a stream:
One can see a stream as a static representation of the values that are transmitted over a channel. This poses the question of termination -- when do we know that a channel receives no further values, so the stream can be terminated
with an `StreamResult.End` value? The following implementation shows one
possibility: Here we map a channel of `Try` results to a stream, mapping
failures with a special =`ChannelClosedException` to `StreamResult.End`.
```scala
extension [T](c: Channel[T])
extension [T](c: Channel[Try[T]])
def toStream(using Async.Config): Stream[T] = Future:
try StreamResult.More(read(), toStream)
catch case ex: ChannelClosedException => StreamResult.End
c.read() match
case Success(x) => StreamResult.More(x, toStream)
case Failure(ex: ChannelClosedException) => StreamResult.End
case Failure(ex) => throw ex
```

### Coroutines or Fibers
Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/async/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ object Async:

def group[T](body: Async ?=> T)(using async: Async): T =
val newGroup = CancellationGroup().link()
body(using async.withConfig(async.config.copy(group = newGroup)))
try body(using async.withConfig(async.config.copy(group = newGroup)))
finally newGroup.cancel()

/** A function `T => Boolean` whose lineage is recorded by its implementing
* classes. The Listener function accepts values of type `T` and returns
Expand Down
15 changes: 12 additions & 3 deletions src/main/scala/async/channels.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,20 @@ import scala.collection.mutable, mutable.ListBuffer
import fiberRuntime.boundary, boundary.Label
import fiberRuntime.suspend
import scala.concurrent.ExecutionContext
import scala.util.{Try, Failure}
import Async.{Listener, await}

/** A common interface for channels */
trait Channel[T]:
def read()(using Async): T
def send(x: T)(using Async): Unit
def close(): Unit
protected def shutDown(finalValue: T): Unit

object Channel:

extension [T](c: Channel[Try[T]])
def close(): Unit =
c.shutDown(Failure(ChannelClosedException()))

class ChannelClosedException extends Exception

Expand Down Expand Up @@ -57,8 +64,9 @@ class AsyncChannel[T] extends Async.OriginalSource[T], Channel[T]:
def dropListener(k: Listener[T]): Unit = synchronized:
waiting -= k

def close() =
protected def shutDown(finalValue: T) =
isClosed = true
waiting.foreach(_(finalValue))

end AsyncChannel

Expand Down Expand Up @@ -117,8 +125,9 @@ object SyncChannel:
def dropListener(k: Listener[Listener[T]]): Unit = synchronized:
pendingSends -= k

def close() =
protected def shutDown(finalValue: T) =
isClosed = true
pendingReads.foreach(_(finalValue))

end SyncChannel

Expand Down
9 changes: 6 additions & 3 deletions src/main/scala/async/streams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ enum StreamResult[+T]:

import StreamResult.*

extension [T](c: Channel[T])
extension [T](c: Channel[Try[T]])
def toStream(using Async.Config): Stream[T] = Future:
try StreamResult.More(c.read(), toStream)
catch case ex: ChannelClosedException => StreamResult.End
c.read() match
case Success(x) => StreamResult.More(x, toStream)
case Failure(ex: ChannelClosedException) => StreamResult.End
case Failure(ex) => throw ex

0 comments on commit 0b9865b

Please sign in to comment.