diff --git a/all/src/test/scala/spec/SimpleSpec.scala b/all/src/test/scala/spec/SimpleSpec.scala
index 4334c54e..8dcf8bfd 100644
--- a/all/src/test/scala/spec/SimpleSpec.scala
+++ b/all/src/test/scala/spec/SimpleSpec.scala
@@ -6,8 +6,7 @@
 import lighdb.storage.mapdb.SharedMapDBSupport
 import lightdb.collection.Collection
 import lightdb.index.lucene._
 import lightdb.query._
-import lightdb.store.halo.{MultiHaloSupport, SharedHaloSupport}
-import lightdb.{Document, Id, JsonMapping, LightDB}
+import lightdb._
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AsyncWordSpec
@@ -69,13 +68,13 @@ class SimpleSpec extends AsyncWordSpec with AsyncIOSpec with Matchers {
       db.people.store.all[Person]()
         .compile
         .toList
-        .map(_.map(_._1))
+        .map(_.map(_.id))
         .map { ids =>
           ids.toSet should be(Set(id1, id2))
         }
     }
     "search by name for positive result" in {
-      db.people.query.filter(Person.name === "Jane Doe").search().compile.toList.map { results =>
+      db.people.query.filter(Person.name is "Jane Doe").search().compile.toList.map { results =>
         results.length should be(1)
         val doc = results.head
         doc.id should be(id2)
diff --git a/benchmark/src/main/scala/benchmark/IMDBBenchmark.scala b/benchmark/src/main/scala/benchmark/IMDBBenchmark.scala
index 45bda5df..99916b85 100644
--- a/benchmark/src/main/scala/benchmark/IMDBBenchmark.scala
+++ b/benchmark/src/main/scala/benchmark/IMDBBenchmark.scala
@@ -78,11 +78,11 @@ object IMDBBenchmark { // extends IOApp {
     val io = for {
       _ <- implementation.init()
       _ = scribe.info("--- Stage 1 ---")
-      akasFile <- downloadFile(new File(baseDirectory, "title.akas.tsv"), Limit.OneMillion).elapsed
+      akasFile <- downloadFile(new File(baseDirectory, "title.akas.tsv"), Limit.Unlimited).elapsed
       _ = scribe.info("--- Stage 2 ---")
       totalAka <- process(akasFile.value, implementation.map2TitleAka, implementation.persistTitleAka).elapsed
       _ = scribe.info("--- Stage 3 ---")
-      basicsFile <- downloadFile(new File(baseDirectory, "title.basics.tsv"), Limit.OneMillion).elapsed
+      basicsFile <- downloadFile(new File(baseDirectory, "title.basics.tsv"), Limit.Unlimited).elapsed
       _ = scribe.info("--- Stage 4 ---")
       totalBasics <- process(basicsFile.value, implementation.map2TitleBasics, implementation.persistTitleBasics).elapsed
       _ = scribe.info("--- Stage 5 ---")
diff --git a/build.sbt b/build.sbt
index 281be6b0..d5f251f3 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,6 +1,6 @@
 // Scala versions
 val scala213 = "2.13.13"
-val scala3 = "3.3.1"
+val scala3 = "3.3.3"
 val scala2 = List(scala213)
 val allScalaVersions = scala3 :: scala2
 
@@ -15,7 +15,7 @@ val developerURL: String = "https://matthicks.com"
 
 name := projectName
 ThisBuild / organization := org
-ThisBuild / version := "0.2.0-SNAPSHOT1"
+ThisBuild / version := "0.3.0-SNAPSHOT"
 ThisBuild / scalaVersion := scala213
 ThisBuild / crossScalaVersions := allScalaVersions
 ThisBuild / scalacOptions ++= Seq("-unchecked", "-deprecation")
@@ -45,7 +45,7 @@
 val collectionCompatVersion: String = "2.11.0"
 val haloDBVersion: String = "v0.5.6"
 val catsEffectVersion: String = "3.5.4"
 val fabricVersion: String = "1.14.1"
-val fs2Version: String = "3.10.0"
+val fs2Version: String = "3.10.1"
 val scribeVersion: String = "3.13.2"
 val luceneVersion: String = "9.10.0"
diff --git a/core/shared/src/main/scala/lightdb/collection/Collection.scala b/core/shared/src/main/scala/lightdb/collection/Collection.scala
index 52b326d7..d5811bde 100644
--- a/core/shared/src/main/scala/lightdb/collection/Collection.scala
+++ b/core/shared/src/main/scala/lightdb/collection/Collection.scala
@@ -29,7 +29,7 @@ case class Collection[D <: Document[D]](db: LightDB, mapping: ObjectMapping[D],
 
   def all(chunkSize: Int = 512, maxConcurrent: Int = 16): fs2.Stream[IO, D] = store
     .all[D](chunkSize)
-    .mapAsync(maxConcurrent)(t => IO(dataManager.fromArray(t._2)))
+    .mapAsync(maxConcurrent)(t => IO(dataManager.fromArray(t.data)))
 
   def modify(id: Id[D])(f: Option[D] => Option[D]): IO[Option[D]] = for {
     result <- data.modify(id)(f)
diff --git a/core/shared/src/main/scala/lightdb/query/package.scala b/core/shared/src/main/scala/lightdb/query/package.scala
index 5d2d89fb..2fd12966 100644
--- a/core/shared/src/main/scala/lightdb/query/package.scala
+++ b/core/shared/src/main/scala/lightdb/query/package.scala
@@ -5,6 +5,7 @@ import lightdb.field.Field
 package object query {
   implicit class FieldQueryExtras[D <: Document[D], F](val field: Field[D, F]) extends AnyVal {
     def ===(value: F): Filter[D] = Filter.Equals(field, value)
+    def is(value: F): Filter[D] = Filter.Equals(field, value)
     def includes(values: Seq[F]): Filter[D] = Filter.Includes(field, values)
     def excludes(values: Seq[F]): Filter[D] = Filter.Excludes(field, values)
   }
diff --git a/core/shared/src/main/scala/lightdb/store/MapStore.scala b/core/shared/src/main/scala/lightdb/store/MapStore.scala
index edc405ce..87c1675b 100644
--- a/core/shared/src/main/scala/lightdb/store/MapStore.scala
+++ b/core/shared/src/main/scala/lightdb/store/MapStore.scala
@@ -7,10 +7,10 @@ import lightdb.Id
 class MapStore extends ObjectStore {
   private var map = Map.empty[Id[_], Array[Byte]]
 
-  override def all[T](chunkSize: Int = 512): fs2.Stream[IO, (Id[T], Array[Byte])] = fs2.Stream
+  override def all[T](chunkSize: Int = 512): fs2.Stream[IO, ObjectData[T]] = fs2.Stream
     .fromBlockingIterator[IO](map.iterator, chunkSize)
     .map {
-      case (id, value) => id.asInstanceOf[Id[T]] -> value
+      case (id, data) => ObjectData(id.asInstanceOf[Id[T]], data)
     }
 
   override def get[T](id: Id[T]): IO[Option[Array[Byte]]] = IO.pure(map.get(id))
diff --git a/core/shared/src/main/scala/lightdb/store/NullStore.scala b/core/shared/src/main/scala/lightdb/store/NullStore.scala
index 5a23a2c7..5fb9b207 100644
--- a/core/shared/src/main/scala/lightdb/store/NullStore.scala
+++ b/core/shared/src/main/scala/lightdb/store/NullStore.scala
@@ -14,7 +14,7 @@ object NullStore extends ObjectStore {
 
   override def count(): IO[Long] = IO.pure(0L)
 
-  override def all[T](chunkSize: Int): fs2.Stream[IO, (Id[T], Array[Byte])] = fs2.Stream.empty
+  override def all[T](chunkSize: Int): fs2.Stream[IO, ObjectData[T]] = fs2.Stream.empty
 
   override def commit(): IO[Unit] = IO.unit
 
diff --git a/core/shared/src/main/scala/lightdb/store/ObjectData.scala b/core/shared/src/main/scala/lightdb/store/ObjectData.scala
new file mode 100644
index 00000000..3b222800
--- /dev/null
+++ b/core/shared/src/main/scala/lightdb/store/ObjectData.scala
@@ -0,0 +1,5 @@
+package lightdb.store
+
+import lightdb.Id
+
+case class ObjectData[T](id: Id[T], data: Array[Byte])
diff --git a/core/shared/src/main/scala/lightdb/store/ObjectStore.scala b/core/shared/src/main/scala/lightdb/store/ObjectStore.scala
index d4c0baa0..d3fa75b7 100644
--- a/core/shared/src/main/scala/lightdb/store/ObjectStore.scala
+++ b/core/shared/src/main/scala/lightdb/store/ObjectStore.scala
@@ -29,7 +29,7 @@ trait ObjectStore {
 
   def count(): IO[Long]
 
-  def all[T](chunkSize: Int = 512): Stream[IO, (Id[T], Array[Byte])]
+  def all[T](chunkSize: Int = 512): Stream[IO, ObjectData[T]]
 
   def commit(): IO[Unit]
 
diff --git a/halo/src/main/scala/lightdb/store/halo/HaloIndexer.scala b/halo/src/main/scala/lightdb/store/halo/HaloIndexer.scala
index f86af601..a103ab14 100644
--- a/halo/src/main/scala/lightdb/store/halo/HaloIndexer.scala
+++ b/halo/src/main/scala/lightdb/store/halo/HaloIndexer.scala
@@ -13,9 +13,9 @@ case class HaloIndexer[D <: Document[D]](collection: Collection[D]) extends Inde
   override def commit(): IO[Unit] = IO.unit
   override def count(): IO[Long] = collection.store.all().compile.count
   override def search(query: Query[D]): fs2.Stream[IO, SearchResult[D]] = collection.store
-    .all()
-    .map[D] { t: (Id[D], Array[Byte]) =>
-      collection.fromArray(t._2)
+    .all[D]()
+    .map { t =>
+      collection.fromArray(t.data)
     }
     .filter(query.matches)
     .map { document =>
diff --git a/halo/src/main/scala/lightdb/store/halo/HaloStore.scala b/halo/src/main/scala/lightdb/store/halo/HaloStore.scala
index ddbb4274..5d59daf3 100644
--- a/halo/src/main/scala/lightdb/store/halo/HaloStore.scala
+++ b/halo/src/main/scala/lightdb/store/halo/HaloStore.scala
@@ -3,7 +3,7 @@ package lightdb.store.halo
 import cats.effect.IO
 import fs2.Stream
 import com.oath.halodb.{HaloDB, HaloDBOptions}
-import lightdb.store.ObjectStore
+import lightdb.store.{ObjectData, ObjectStore}
 import lightdb.Id
 
 import java.nio.file.{Files, Path}
@@ -32,9 +32,9 @@ case class HaloStore(directory: Path, indexThreads: Int = 2, maxFileSize: Int =
     }
   }
 
-  override def all[T](chunkSize: Int = 512): Stream[IO, (Id[T], Array[Byte])] = Stream
+  override def all[T](chunkSize: Int = 512): Stream[IO, ObjectData[T]] = Stream
     .fromBlockingIterator[IO](halo.newIterator().asScala, chunkSize)
-    .map(r => Id[T](new String(r.getKey, "UTF-8")) -> r.getValue)
+    .map(r => ObjectData(Id[T](new String(r.getKey, "UTF-8")), r.getValue))
 
   override def get[T](id: Id[T]): IO[Option[Array[Byte]]] = IO {
     halo.newIterator()
@@ -58,6 +58,6 @@ case class HaloStore(directory: Path, indexThreads: Int = 2, maxFileSize: Int =
   override def dispose(): IO[Unit] = IO(halo.close())
 
   override def truncate(): IO[Unit] = all[Any]().evalMap {
-    case (id, _) => delete(id)
+    case ObjectData(id, _) => delete(id)
   }.compile.drain
 }
\ No newline at end of file
diff --git a/mapdb/src/main/scala/lighdb/storage/mapdb/MapDBStore.scala b/mapdb/src/main/scala/lighdb/storage/mapdb/MapDBStore.scala
index 8e183673..868aa311 100644
--- a/mapdb/src/main/scala/lighdb/storage/mapdb/MapDBStore.scala
+++ b/mapdb/src/main/scala/lighdb/storage/mapdb/MapDBStore.scala
@@ -3,7 +3,7 @@ package lighdb.storage.mapdb
 import cats.effect.IO
 import lightdb.collection.Collection
 import lightdb.{Document, Id, LightDB}
-import lightdb.store.{ObjectStore, ObjectStoreSupport}
+import lightdb.store.{ObjectData, ObjectStore, ObjectStoreSupport}
 import org.eclipse.collections.impl.map.mutable.ConcurrentHashMap
 import org.mapdb.{DB, DBMaker, DataInput2, DataOutput2, HTreeMap, Serializer}
 
@@ -30,12 +30,12 @@ case class MapDBStore(directory: Option[Path]) extends ObjectStore {
 
   override def count(): IO[Long] = IO(map.size())
 
-  override def all[T](chunkSize: Int): fs2.Stream[IO, (Id[T], Array[Byte])] = fs2.Stream
+  override def all[T](chunkSize: Int): fs2.Stream[IO, ObjectData[T]] = fs2.Stream
    .fromBlockingIterator[IO](map.entrySet().iterator().asScala, chunkSize) // TODO: figure out how to get an iterator that doesn't load everything into memory
    .map { pair =>
      val id = Id[T](pair.getKey)
      val value = pair.getValue
-     id -> value
+     ObjectData(id, value)
    }

   override def commit(): IO[Unit] = IO(db.commit())
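
Note (not part of the patch): the store-level change in this diff replaces the positional tuple (Id[T], Array[Byte]) with the named ObjectData[T] case class across every ObjectStore implementation. A minimal sketch of consumer code under the new signature, assuming only the ObjectStore#all and ObjectData shapes shown above (recordSizes is a hypothetical helper, not part of the library):

import cats.effect.IO
import fs2.Stream
import lightdb.Id
import lightdb.store.{ObjectData, ObjectStore}

// Stream every raw record and pair its id with the payload size.
// Fields are now accessed by name (od.id, od.data) rather than t._1/t._2.
def recordSizes[T](store: ObjectStore): Stream[IO, (Id[T], Int)] =
  store.all[T]()
    .map(od => od.id -> od.data.length)

Pattern matching works wherever tuple destructuring used to, e.g. .evalMap { case ObjectData(id, _) => delete(id) } as in HaloStore.truncate above.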
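Similarly, the new `is` combinator added to lightdb.query is a plain alias for `===`; both construct Filter.Equals, as the one-line diff shows. A hedged fragment reusing the db.people collection and Person mapping from SimpleSpec above:

// Equivalent filters; `is` sidesteps clashes with other `===` DSLs
// (ScalaTest, cats) that may be in scope in test code.
val byTripleEquals = db.people.query.filter(Person.name === "Jane Doe")
val byIs           = db.people.query.filter(Person.name is "Jane Doe")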