Skip to content

Commit

Permalink
Migrated async from cats-effect and fs2 to rapid
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Dec 19, 2024
1 parent 7dc73f1 commit 1ac9bc4
Show file tree
Hide file tree
Showing 21 changed files with 283 additions and 297 deletions.
69 changes: 34 additions & 35 deletions all/src/test/scala/spec/AbstractAsyncSpec.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package spec

import cats.effect.IO
import cats.effect.testing.scalatest.AsyncIOSpec
import fabric.rw._
import lightdb.async.{AsyncCollection, AsyncDatabaseUpgrade, AsyncLightDB, AsyncStoredValue}
import lightdb.collection.Collection
Expand All @@ -13,10 +11,11 @@ import lightdb.{Id, LightDB, Sort, StoredValue}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.{AnyWordSpec, AsyncWordSpec}
import perfolation.double2Implicits
import rapid.Task

import java.nio.file.Path

abstract class AbstractAsyncSpec extends AsyncWordSpec with AsyncIOSpec with Matchers { spec =>
abstract class AbstractAsyncSpec extends AnyWordSpec with Matchers { spec =>
protected def aggregationSupported: Boolean = true

private val adam = Person("Adam", 21, Person.id("adam"))
Expand Down Expand Up @@ -61,46 +60,46 @@ abstract class AbstractAsyncSpec extends AsyncWordSpec with AsyncIOSpec with Mat

specName should {
"initialize the database" in {
db.init().map(b => b should be(true))
db.init().map(b => b should be(true)).sync()
}
"verify the database is empty" in {
db.people.transaction { implicit transaction =>
db.people.count.map(c => c should be(0))
}
}.sync()
}
"insert the records" in {
db.people.transaction { implicit transaction =>
db.people.insert(names).map(_ should not be None)
}
}.sync()
}
"retrieve the first record by _id -> id" in {
db.people.transaction { implicit transaction =>
db.people(_._id -> adam._id).map(_ should be(adam))
}
}.sync()
}
"retrieve the first record by id" in {
db.people.transaction { implicit transaction =>
db.people(adam._id).map(_ should be(adam))
}
}.sync()
}
"count the records in the database" in {
db.people.transaction { implicit transaction =>
db.people.count.map(_ should be(26))
}
}.sync()
}
"stream the ids in the database" in {
db.people.transaction { implicit transaction =>
db.people.query.search.id.flatMap(_.stream.compile.toList).map(_.toSet).map { ids =>
db.people.query.search.id.flatMap(_.stream.toList).map(_.toSet).map { ids =>
ids should be(names.map(_._id).toSet)
}
}
}.sync()
}
"stream the records in the database" in {
db.people.transaction { implicit transaction =>
db.people.stream.compile.toList.map(_.map(_.age).toSet).map { ages =>
db.people.stream.toList.map(_.map(_.age).toSet).map { ages =>
ages should be(Set(101, 42, 89, 102, 53, 13, 2, 22, 12, 81, 35, 63, 99, 23, 30, 4, 21, 33, 11, 72, 15, 62))
}
}
}.sync()
}
"query with aggregate functions" in {
if (aggregationSupported) {
Expand All @@ -119,28 +118,28 @@ abstract class AbstractAsyncSpec extends AsyncWordSpec with AsyncIOSpec with Mat
list.map(m => m(_.age.avg).f(f = 6)).toSet should be(Set("41.807692"))
list.map(m => m(_.age.sum)).toSet should be(Set(1087))
}
}
}.sync()
} else {
succeed
}
}
"search by age range" in {
db.people.transaction { implicit transaction =>
db.people.query.filter(_.age BETWEEN 19 -> 22).search.id.flatMap(_.stream.compile.toList).map { ids =>
db.people.query.filter(_.age BETWEEN 19 -> 22).search.id.flatMap(_.stream.toList).map { ids =>
ids.toSet should be(Set(adam._id, nancy._id, oscar._id, uba._id))
}
}
}.sync()
}
"sort by age" in {
db.people.transaction { implicit transaction =>
db.people.query.sort(Sort.ByField(Person.age).descending).search.docs.flatMap(_.stream.compile.toList).map { people =>
db.people.query.sort(Sort.ByField(Person.age).descending).search.docs.flatMap(_.stream.toList).map { people =>
people.map(_.name).take(3) should be(List("Ruth", "Zoey", "Quintin"))
}
}
}.sync()
}
"group by age" in {
db.people.transaction { implicit transaction =>
db.people.query.grouped(_.age).compile.toList.map { list =>
db.people.query.grouped(_.age).toList.map { list =>
list.map(_._1) should be(List(2, 4, 11, 12, 13, 15, 21, 22, 23, 30, 33, 35, 42, 53, 62, 63, 72, 81, 89, 99, 101, 102))
list.map(_._2.map(_.name).toSet) should be(List(
Set("Penny"), Set("Jenna"), Set("Brenda"), Set("Greg"), Set("Veronica"), Set("Diana"),
Expand All @@ -149,67 +148,67 @@ abstract class AbstractAsyncSpec extends AsyncWordSpec with AsyncIOSpec with Mat
Set("Quintin"), Set("Zoey"), Set("Ruth")
))
}
}
}.sync()
}
"delete some records" in {
db.people.transaction { implicit transaction =>
for {
b1 <- db.people.delete(_._id -> linda._id)
b2 <- db.people.delete(_._id -> yuri._id)
} yield (b1, b2) should be((true, true))
}
}.sync()
}
"verify the records were deleted" in {
db.people.transaction { implicit transaction =>
db.people.count.map(_ should be(24))
}
}.sync()
}
"modify a record" in {
db.people.transaction { implicit transaction =>
db.people.modify(adam._id) {
case Some(p) => IO(Some(p.copy(name = "Allan")))
case Some(p) => Task(Some(p.copy(name = "Allan")))
case None => fail("Adam was not found!")
}
}.map {
case Some(p) => p.name should be("Allan")
case None => fail("Allan was not returned!")
}
}.sync()
}
"verify the record has been renamed" in {
db.people.transaction { implicit transaction =>
db.people(_._id -> adam._id).map(_.name should be("Allan"))
}
}.sync()
}
"verify start time has been set" in {
db.startTime.get.map(_ should be > 0L)
db.startTime.get.map(_ should be > 0L).sync()
}
"dispose the database before creating a new instance" in {
db.dispose().map(_ should be(true))
db.dispose().map(_ should be(true)).sync()
}
"prepare a new instance" in {
db = new DB
db.init().map(_ should be(true))
db.init().map(_ should be(true)).sync()
}
"query the database to verify records were persisted properly" in {
db.people.transaction { implicit transaction =>
db.people.stream.compile.toList.map(_.map(_.name).toSet).map(_ should be(Set(
db.people.stream.toList.map(_.map(_.name).toSet).map(_ should be(Set(
"Tori", "Ruth", "Nancy", "Jenna", "Hanna", "Wyatt", "Diana", "Ian", "Quintin", "Uba", "Oscar", "Kevin",
"Penny", "Charlie", "Evan", "Sam", "Mike", "Brenda", "Zoey", "Allan", "Xena", "Fiona", "Greg", "Veronica"
)))
}
}.sync()
}
"truncate the collection" in {
db.people.transaction { implicit transaction =>
db.people.truncate().map(_ should be(24))
}
}.sync()
}
"verify the collection is empty" in {
db.people.transaction { implicit transaction =>
db.people.count.map(_ should be(0))
}
}.sync()
}
"dispose the database" in {
db.dispose().map(_ should be(true))
db.dispose().map(_ should be(true)).sync()
}
}

Expand Down Expand Up @@ -248,8 +247,8 @@ abstract class AbstractAsyncSpec extends AsyncWordSpec with AsyncIOSpec with Mat

override def alwaysRun: Boolean = false

override def upgrade(ldb: AsyncLightDB): IO[Unit] = {
db.startTime.set(System.currentTimeMillis()).void
override def upgrade(ldb: AsyncLightDB): Task[Unit] = {
db.startTime.set(System.currentTimeMillis()).unit
}
}
}
2 changes: 0 additions & 2 deletions all/src/test/scala/spec/AirportSpec.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package spec

import cats.effect.IO
import cats.effect.testing.scalatest.AsyncIOSpec
import fabric.rw._
import lightdb.collection.Collection
import lightdb.doc.{Document, DocumentModel, JsonConversion}
Expand Down
12 changes: 5 additions & 7 deletions async/src/main/scala/lightdb/async/AsyncAggregateQuery.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package lightdb.async

import cats.effect.IO
import lightdb.aggregate.{AggregateFilter, AggregateFunction, AggregateQuery}
import lightdb.doc.{Document, DocumentModel}
import lightdb.materialized.MaterializedAggregate
import lightdb.transaction.Transaction
import lightdb.{Query, SortDirection}
import rapid.Task

case class AsyncAggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](query: Query[Doc, Model],
functions: List[AggregateFunction[_, _, Doc]],
Expand Down Expand Up @@ -45,13 +45,11 @@ case class AsyncAggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]
sort = sort
)

def count(implicit transaction: Transaction[Doc]): IO[Int] =
IO.blocking(query.store.aggregateCount(aggregateQuery))
def count(implicit transaction: Transaction[Doc]): Task[Int] = Task(query.store.aggregateCount(aggregateQuery))

def stream(implicit transaction: Transaction[Doc]): fs2.Stream[IO, MaterializedAggregate[Doc, Model]] = {
val iterator = query.store.aggregate(aggregateQuery)
fs2.Stream.fromBlockingIterator[IO](iterator, 100)
def stream(implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedAggregate[Doc, Model]] = {
rapid.Stream.fromIterator(Task(query.store.aggregate(aggregateQuery)))
}

def toList(implicit transaction: Transaction[Doc]): IO[List[MaterializedAggregate[Doc, Model]]] = stream.compile.toList
def toList(implicit transaction: Transaction[Doc]): Task[List[MaterializedAggregate[Doc, Model]]] = stream.toList
}
84 changes: 43 additions & 41 deletions async/src/main/scala/lightdb/async/AsyncCollection.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package lightdb.async

import cats.effect.IO
import lightdb._
import lightdb.field.Field._
import lightdb.collection.Collection
import lightdb.doc.{Document, DocumentModel}
import lightdb.transaction.Transaction
import rapid.Task

import scala.util.{Failure, Success}

case class AsyncCollection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](underlying: Collection[Doc, Model]) extends AnyVal {
def transaction[Return](f: Transaction[Doc] => IO[Return]): IO[Return] = {
def transaction[Return](f: Transaction[Doc] => Task[Return]): Task[Return] = {
val transaction = underlying.transaction.create()
f(transaction).guarantee(IO {
f(transaction).guarantee(Task {
underlying.transaction.release(transaction)
})
}
Expand All @@ -20,81 +22,81 @@ case class AsyncCollection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](un
*/
def t: AsyncTransactionConvenience[Doc, Model] = AsyncTransactionConvenience(this)

def insert(doc: Doc)(implicit transaction: Transaction[Doc]): IO[Doc] = IO.blocking(underlying.insert(doc))
def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Doc] = Task(underlying.insert(doc))

def insert(docs: Seq[Doc])(implicit transaction: Transaction[Doc]): IO[Seq[Doc]] = IO.blocking(underlying.insert(docs))
def insert(docs: Seq[Doc])(implicit transaction: Transaction[Doc]): Task[Seq[Doc]] = Task(underlying.insert(docs))

def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): IO[Doc] = IO.blocking(underlying.upsert(doc))
def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Doc] = Task(underlying.upsert(doc))

def upsert(docs: Seq[Doc])(implicit transaction: Transaction[Doc]): IO[Seq[Doc]] = IO.blocking(underlying.upsert(docs))
def upsert(docs: Seq[Doc])(implicit transaction: Transaction[Doc]): Task[Seq[Doc]] = Task(underlying.upsert(docs))

def get[V](f: Model => (UniqueIndex[Doc, V], V))(implicit transaction: Transaction[Doc]): IO[Option[Doc]] =
IO.blocking(underlying.get(f))
def get[V](f: Model => (UniqueIndex[Doc, V], V))(implicit transaction: Transaction[Doc]): Task[Option[Doc]] =
Task(underlying.get(f))

def apply[V](f: Model => (UniqueIndex[Doc, V], V))(implicit transaction: Transaction[Doc]): IO[Doc] =
IO.blocking(underlying(f))
def apply[V](f: Model => (UniqueIndex[Doc, V], V))(implicit transaction: Transaction[Doc]): Task[Doc] =
Task(underlying(f))

def get(id: Id[Doc])(implicit transaction: Transaction[Doc]): IO[Option[Doc]] =
IO.blocking(underlying.get(id))
def get(id: Id[Doc])(implicit transaction: Transaction[Doc]): Task[Option[Doc]] =
Task(underlying.get(id))

def getAll(ids: Seq[Id[Doc]])(implicit transaction: Transaction[Doc]): fs2.Stream[IO, Doc] =
fs2.Stream.fromBlockingIterator[IO](underlying.getAll(ids), 512)
def getAll(ids: Seq[Id[Doc]])(implicit transaction: Transaction[Doc]): rapid.Stream[Doc] =
rapid.Stream.fromIterator(Task(underlying.getAll(ids)))

def apply(id: Id[Doc])(implicit transaction: Transaction[Doc]): IO[Doc] =
IO.blocking(underlying(id))
def apply(id: Id[Doc])(implicit transaction: Transaction[Doc]): Task[Doc] =
Task(underlying(id))

def withLock(id: Id[Doc], doc: IO[Option[Doc]], establishLock: Boolean = true)
(f: Option[Doc] => IO[Option[Doc]]): IO[Option[Doc]] = if (establishLock) {
def withLock(id: Id[Doc], doc: Task[Option[Doc]], establishLock: Boolean = true)
(f: Option[Doc] => Task[Option[Doc]]): Task[Option[Doc]] = if (establishLock) {
doc.map(d => if (establishLock) underlying.lock.acquire(id, d) else d).flatMap { existing =>
f(existing)
.attempt
.flatMap {
case Left(err) =>
if (establishLock) underlying.lock.release(id, existing)
IO.raiseError(err)
case Right(modified) =>
case Success(modified) =>
if (establishLock) underlying.lock.release(id, modified)
IO.pure(modified)
Task.pure(modified)
case Failure(err) =>
if (establishLock) underlying.lock.release(id, existing)
Task.error(err)
}
}
} else {
doc.flatMap(f)
}

def modify(id: Id[Doc], establishLock: Boolean = true, deleteOnNone: Boolean = false)
(f: Option[Doc] => IO[Option[Doc]])
(implicit transaction: Transaction[Doc]): IO[Option[Doc]] = withLock(id, get(id), establishLock) { existing =>
(f: Option[Doc] => Task[Option[Doc]])
(implicit transaction: Transaction[Doc]): Task[Option[Doc]] = withLock(id, get(id), establishLock) { existing =>
f(existing).flatMap {
case Some(doc) => upsert(doc).map(doc => Some(doc))
case None if deleteOnNone => delete(id).map(_ => None)
case None => IO.pure(None)
case None => Task.pure(None)
}
}

def getOrCreate(id: Id[Doc], create: => IO[Doc], lock: Boolean = true)
(implicit transaction: Transaction[Doc]): IO[Doc] = modify(id, establishLock = lock) {
case Some(doc) => IO.pure(Some(doc))
def getOrCreate(id: Id[Doc], create: => Task[Doc], lock: Boolean = true)
(implicit transaction: Transaction[Doc]): Task[Doc] = modify(id, establishLock = lock) {
case Some(doc) => Task.pure(Some(doc))
case None => create.map(Some.apply)
}.map(_.get)

def delete[V](f: Model => (UniqueIndex[Doc, V], V))(implicit transaction: Transaction[Doc]): IO[Boolean] =
IO.blocking(underlying.delete(f))
def delete[V](f: Model => (UniqueIndex[Doc, V], V))(implicit transaction: Transaction[Doc]): Task[Boolean] =
Task(underlying.delete(f))

def delete(id: Id[Doc])(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): IO[Boolean] =
IO.blocking(underlying.delete(id))
def delete(id: Id[Doc])(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): Task[Boolean] =
Task(underlying.delete(id))

def count(implicit transaction: Transaction[Doc]): IO[Int] = IO.blocking(underlying.count)
def count(implicit transaction: Transaction[Doc]): Task[Int] = Task(underlying.count)

def stream(implicit transaction: Transaction[Doc]): fs2.Stream[IO, Doc] = fs2.Stream
.fromBlockingIterator[IO](underlying.iterator, 512)
def stream(implicit transaction: Transaction[Doc]): rapid.Stream[Doc] =
rapid.Stream.fromIterator(Task(underlying.iterator))

def list(implicit transaction: Transaction[Doc]): IO[List[Doc]] = stream.compile.toList
def list(implicit transaction: Transaction[Doc]): Task[List[Doc]] = stream.toList

def query: AsyncQuery[Doc, Model] = AsyncQuery(this)

def truncate()(implicit transaction: Transaction[Doc]): IO[Int] = IO.blocking(underlying.truncate())
def truncate()(implicit transaction: Transaction[Doc]): Task[Int] = Task(underlying.truncate())

def reIndex(): IO[Boolean] = IO.blocking(underlying.reIndex())
def reIndex(): Task[Boolean] = Task(underlying.reIndex())

def dispose(): IO[Unit] = IO.blocking(underlying.dispose())
def dispose(): Task[Unit] = Task(underlying.dispose())
}
Loading

0 comments on commit 1ac9bc4

Please sign in to comment.