Commit adf0963

Continued adding of functionality

darkfrog26 committed Mar 27, 2024
1 parent 058ad24 commit adf0963

Showing 10 changed files with 86 additions and 39 deletions.
29 changes: 24 additions & 5 deletions all/src/test/scala/spec/SimpleSpec.scala
@@ -1,5 +1,6 @@
 package spec
 
+import cats.effect.IO
 import cats.effect.testing.scalatest.AsyncIOSpec
 import fabric.rw._
 import lighdb.storage.mapdb.SharedMapDBSupport
@@ -8,6 +9,7 @@ import lightdb.index.lucene._
 import lightdb.query._
 import lightdb._
 import lightdb.store.halo.MultiHaloSupport
+import lightdb.upgrade.DatabaseUpgrade
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AsyncWordSpec

@@ -67,7 +69,7 @@ class SimpleSpec extends AsyncWordSpec with AsyncIOSpec with Matchers {
       }
     }
     "search by name for positive result" in {
-      db.people.query.filter(Person.name is "Jane Doe").search().compile.toList.map { results =>
+      db.people.query.filter(Person.name is "Jane Doe").stream().compile.toList.map { results =>
         results.length should be(1)
         val doc = results.head
         doc.id should be(id2)
@@ -76,7 +78,7 @@
       }
     }
     "search by age for positive result" in {
-      db.people.query.filter(Person.age is 19).search().compile.toList.map { results =>
+      db.people.query.filter(Person.age is 19).stream().compile.toList.map { results =>
         results.length should be(1)
         val doc = results.head
         doc.id should be(id2)
@@ -85,7 +87,7 @@
       }
     }
     "search by id for John" in {
-      db.people.query.filter(Person._id is id1).search().compile.toList.map { results =>
+      db.people.query.filter(Person._id is id1).stream().compile.toList.map { results =>
         results.length should be(1)
         val doc = results.head
         doc.id should be(id1)
@@ -110,7 +112,7 @@
       }
     }
     "list all documents" in {
-      db.people.query.search().compile.toList.flatMap { results =>
+      db.people.query.stream().compile.toList.flatMap { results =>
         results.length should be(1)
         val doc = results.head
         doc.id should be(id2)
@@ -140,14 +142,19 @@
       db.people.commit()
     }
     "list new documents" in {
-      db.people.query.search().compile.toList.map { results =>
+      db.people.query.stream().compile.toList.map { results =>
         results.length should be(1)
         val doc = results.head
         doc.id should be(id2)
         doc(Person.name) should be("Jan Doe")
         doc(Person.age) should be(20)
       }
     }
+    "verify start time has been set" in {
+      db.startTime.get().map { startTime =>
+        startTime should be > 0L
+      }
+    }
     // TODO: support multiple item types (make sure queries don't return different types)
     // TODO: test batch operations: insert, replace, and delete
     "dispose" in {
@@ -158,7 +165,11 @@
   object db extends LightDB(directory = Some(Paths.get("testdb"))) with LuceneIndexerSupport with MultiHaloSupport {
     override protected def autoCommit: Boolean = true
 
+    val startTime: StoredValue[Long] = stored[Long]("startTime", -1L)
+
     val people: Collection[Person] = collection("people", Person)
+
+    override def upgrades: List[DatabaseUpgrade] = List(InitialSetupUpgrade)
   }
 
   case class Person(name: String, age: Int, _id: Id[Person] = Id()) extends Document[Person]
@@ -169,4 +180,12 @@ class SimpleSpec extends AsyncWordSpec with AsyncIOSpec with Matchers {
     val name: FD[String] = field("name", _.name)
     val age: FD[Int] = field("age", _.age)
   }
+
+  object InitialSetupUpgrade extends DatabaseUpgrade {
+    override def applyToNew: Boolean = true
+    override def blockStartup: Boolean = true
+    override def alwaysRun: Boolean = false
+
+    override def upgrade(ldb: LightDB): IO[Unit] = db.startTime.set(System.currentTimeMillis()).map(_ => ())
+  }
 }
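
The spec above is the first consumer of the new StoredValue and DatabaseUpgrade wiring. As a rough sketch of the runtime flow (assuming the db object defined in this spec and the init() entry point shown in the LightDB.scala diff below):

import cats.effect.IO

// Sketch only: db and InitialSetupUpgrade are the objects defined in SimpleSpec above.
val verifyStartTime: IO[Unit] = for {
  _     <- db.init()              // a fresh database runs InitialSetupUpgrade (applyToNew = true)
  start <- db.startTime.get()     // the stored -1L default has been overwritten by the upgrade
  _     <- IO(assert(start > 0L))
} yield ()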
3 changes: 2 additions & 1 deletion core/shared/src/main/scala/lightdb/LightDB.scala
@@ -30,6 +30,7 @@ abstract class LightDB(val directory: Option[Path]) {

   def init(truncate: Boolean = false): IO[Unit] = if (_initialized.compareAndSet(false, true)) {
     for {
+      _ <- logger.info(s"LightDB initializing...")
       // Truncate the database before we do anything if specified
       _ <- this.truncate().whenA(truncate)
       // Determine if this is an uninitialized database
@@ -38,7 +39,7 @@
       applied <- appliedUpgrades.get()
       // Determine upgrades that need to be applied
       upgrades = this.upgrades.filter(u => u.alwaysRun || !applied.contains(u.label)) match {
-        case list if !dbInitialized => list.filterNot(_.applyToNew)
+        case list if !dbInitialized => list.filter(_.applyToNew)
         case list => list
       }
       _ <- (for {
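
The one-character change above fixes inverted filtering: a fresh database now runs exactly the upgrades marked applyToNew, rather than every upgrade except those. A simplified, self-contained restatement of the selection logic (Upgrade and pendingUpgrades are illustrative stand-ins, not LightDB API):

// Hypothetical stand-ins for illustration; not part of LightDB.
case class Upgrade(label: String, applyToNew: Boolean, alwaysRun: Boolean)

def pendingUpgrades(all: List[Upgrade], applied: Set[String], dbInitialized: Boolean): List[Upgrade] = {
  // Re-run alwaysRun upgrades; otherwise skip anything already applied
  val candidates = all.filter(u => u.alwaysRun || !applied.contains(u.label))
  // Fresh database: only upgrades explicitly marked applyToNew (this commit's fix)
  if (!dbInitialized) candidates.filter(_.applyToNew) else candidates
}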
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/lightdb/index/Indexer.scala
@@ -17,7 +17,7 @@ trait Indexer[D <: Document[D]] {

   def count(): IO[Long]
 
-  def search(query: Query[D]): Stream[IO, SearchResult[D]]
+  def search(query: Query[D]): IO[SearchResults[D]]
 
   def truncate(): IO[Unit]

2 changes: 1 addition & 1 deletion core/shared/src/main/scala/lightdb/index/NullIndexer.scala
@@ -14,7 +14,7 @@ case class NullIndexer[D <: Document[D]](collection: Collection[D]) extends Inde

   override def count(): IO[Long] = IO.pure(0L)
 
-  override def search(query: Query[D]): fs2.Stream[IO, SearchResult[D]] = fs2.Stream.empty
+  override def search(query: Query[D]): IO[SearchResults[D]] = IO.pure(SearchResults(query, 0, fs2.Stream.empty))
 
   override def truncate(): IO[Unit] = IO.unit

2 changes: 0 additions & 2 deletions core/shared/src/main/scala/lightdb/index/SearchResult.scala
@@ -6,8 +6,6 @@ import lightdb.query.Query
 import lightdb.{Document, Id}
 
 trait SearchResult[D <: Document[D]] {
-  def query: Query[D]
-  def total: Long
   def id: Id[D]
 
   def get(): IO[D]
12 changes: 12 additions & 0 deletions core/shared/src/main/scala/lightdb/index/SearchResults.scala
@@ -0,0 +1,12 @@
+package lightdb.index
+
+import cats.effect.IO
+import fs2.Stream
+import lightdb.Document
+import lightdb.query.Query
+
+case class SearchResults[D <: Document[D]](query: Query[D],
+                                           total: Int,
+                                           stream: Stream[IO, SearchResult[D]]) {
+  def documentsStream(): fs2.Stream[IO, D] = stream.evalMap(_.get())
+}
6 changes: 4 additions & 2 deletions core/shared/src/main/scala/lightdb/query/Query.scala
@@ -3,7 +3,7 @@ package lightdb.query
 import cats.effect.IO
 import lightdb.Document
 import lightdb.collection.Collection
-import lightdb.index.SearchResult
+import lightdb.index.{SearchResult, SearchResults}
 
 case class Query[D <: Document[D]](collection: Collection[D],
                                    filter: Option[Filter[D]] = None,
@@ -17,7 +17,9 @@ case class Query[D <: Document[D]](collection: Collection[D],
   def sort(sort: Sort*): Query[D] = copy(sort = this.sort ::: sort.toList)
   def limit(limit: Int): Query[D] = copy(limit = limit)
   def scoreDocs(b: Boolean = true): Query[D] = copy(scoreDocs = b)
-  def search(): fs2.Stream[IO, SearchResult[D]] = collection.indexer.search(this)
+  def search(): IO[SearchResults[D]] = collection.indexer.search(this)
+  def stream(): fs2.Stream[IO, SearchResult[D]] = fs2.Stream.force(search().map(_.stream))
+  def documentsStream(): fs2.Stream[IO, D] = stream().evalMap(_.get())
 
   def matches(document: D): Boolean = filter.forall(_.matches(document))
 }
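
Together with SearchResults above, this reworks the calling convention: search() now returns IO[SearchResults[D]] carrying the total hit count, stream() reproduces the old lazy behavior, and documentsStream() resolves each hit to its document. A usage sketch, reusing db.people and Person from SimpleSpec:

import cats.effect.IO

// Sketch only; db.people and Person come from SimpleSpec above.
// Before this commit, search() itself was the fs2 stream:
//   db.people.query.filter(Person.age is 19).search().compile.toList
// stream()/documentsStream() are now the drop-in replacements for lazy consumption:
val docs: IO[List[Person]] = db.people.query.filter(Person.age is 19).documentsStream().compile.toList
// while search() exposes the total hit count up front:
val total: IO[Int] = db.people.query.search().map(_.total)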
32 changes: 16 additions & 16 deletions halo/src/main/scala/lightdb/store/halo/HaloIndexer.scala
@@ -4,30 +4,30 @@ import cats.effect.IO
 import lightdb.{Document, Id}
 import lightdb.collection.Collection
 import lightdb.field.Field
-import lightdb.index.{Indexer, SearchResult}
+import lightdb.index.{Indexer, SearchResult, SearchResults}
 import lightdb.query.Query
 
 case class HaloIndexer[D <: Document[D]](collection: Collection[D]) extends Indexer[D] {
   override def put(value: D): IO[D] = IO.pure(value)
   override def delete(id: Id[D]): IO[Unit] = IO.unit
   override def commit(): IO[Unit] = IO.unit
   override def count(): IO[Long] = collection.store.all().compile.count
-  override def search(query: Query[D]): fs2.Stream[IO, SearchResult[D]] = collection.store
-    .all[D]()
-    .map { t =>
-      collection.fromArray(t.data)
-    }
-    .filter(query.matches)
-    .map { document =>
-      val q = query
-      new SearchResult[D] {
-        override def query: Query[D] = q
-        override def total: Long = -1L
-        override def id: Id[D] = document._id
-        override def get(): IO[D] = IO.pure(document)
-        override def apply[F](field: Field[D, F]): F = field.getter(document)
+  override def search(query: Query[D]): IO[SearchResults[D]] = IO {
+    val stream = collection.store
+      .all[D]()
+      .map { t =>
+        collection.fromArray(t.data)
       }
-    }
+      .filter(query.matches)
+      .map { document =>
+        new SearchResult[D] {
+          override def id: Id[D] = document._id
+          override def get(): IO[D] = IO.pure(document)
+          override def apply[F](field: Field[D, F]): F = field.getter(document)
+        }
+      }
+    SearchResults(query, -1, stream)
+  }
   override def truncate(): IO[Unit] = IO.unit
   override def dispose(): IO[Unit] = IO.unit
 }
35 changes: 26 additions & 9 deletions lucene/src/main/scala/lightdb/index/lucene/LuceneIndexer.scala
@@ -1,9 +1,12 @@
 package lightdb.index.lucene
 
 import cats.effect.IO
+import fabric._
+import fabric.define.DefType
+import fabric.io.JsonFormatter
 import fabric.rw._
 import lightdb.collection.Collection
-import lightdb.index.{Indexer, SearchResult}
+import lightdb.index.{Indexer, SearchResult, SearchResults}
 import lightdb.query.{Condition, Filter, Query}
 import lightdb.{Document, Id}
 import org.apache.lucene.analysis.Analyzer
@@ -12,15 +15,14 @@ import org.apache.lucene.index.{IndexWriter, IndexWriterConfig, Term}
 import org.apache.lucene.queryparser.classic.QueryParser
 import org.apache.lucene.search.{BooleanClause, BooleanQuery, IndexSearcher, MatchAllDocsQuery, PhraseQuery, ScoreDoc, SearcherFactory, SearcherManager, Sort, SortField, TermQuery, Query => LuceneQuery}
 import org.apache.lucene.store.{ByteBuffersDirectory, FSDirectory}
-import org.apache.lucene.document.{Field, IntField, IntRange, LongField, StringField, TextField, Document => LuceneDocument}
+import org.apache.lucene.document.{DoubleField, Field, FloatField, IntField, IntRange, LongField, StringField, TextField, Document => LuceneDocument}
 
 import java.nio.file.Path
 import scala.collection.immutable.ArraySeq
 
 case class LuceneIndexer[D <: Document[D]](collection: Collection[D],
                                            autoCommit: Boolean = false,
-                                           analyzer: Analyzer = new StandardAnalyzer) extends Indexer[D] {
-  i =>
+                                           analyzer: Analyzer = new StandardAnalyzer) extends Indexer[D] { i =>
   private var disposed = false
   private lazy val path: Option[Path] = collection.db.directory.map(_.resolve(collection.collectionName))
   private lazy val directory = path
@@ -50,8 +52,20 @@ case class LuceneIndexer[D <: Document[D]](collection: Collection[D],
       case s: String => document.add(new Field(field.name, s, textFieldType))
       case i: Int => document.add(new IntField(field.name, i, fieldStore))
       case l: Long => document.add(new LongField(field.name, l, fieldStore))
+      case f: Float => document.add(new FloatField(field.name, f, fieldStore))
+      case d: Double => document.add(new DoubleField(field.name, d, fieldStore))
+      case bd: BigDecimal => document.add(new StringField(field.name, bd.asString, fieldStore))
       case Some(value) => addField(document, field, value)
-      case value => throw new RuntimeException(s"Unsupported value: $value (${value.getClass})")
+      case Str(s, _) => addField(document, field, s) // TODO: Should this store as a StringField instead? Maybe add something to field for tokenize?
+      case NumDec(bd, _) => addField(document, field, bd)
+      case NumInt(i, _) => addField(document, field, i)
+      case obj: Obj =>
+        val jsonString = JsonFormatter.Compact(obj)
+        document.add(new StringField(field.name, jsonString, fieldStore))
+      case null | Null => // Ignore null
+      case value =>
+        val json = field.rw.asInstanceOf[RW[Any]].read(value)
+        addField(document, field, json)
     }
   }

@@ -125,13 +139,15 @@
         b.build()
       case i: Int => IntField.newExactQuery(field.name, i)
       case id: Id[_] => new TermQuery(new Term(field.name, id.value))
+      case json: Json => new TermQuery(new Term(field.name, JsonFormatter.Compact(json)))
       case _ => throw new UnsupportedOperationException(s"Unsupported: $value (${field.name})")
     }
     case Filter.NotEquals(field, value) =>
       filter2Query(Filter.GroupedFilter(0, List(Filter.Equals(field, value) -> Condition.MustNot)))
     case _ => throw new RuntimeException(s"Unsupported filter: $filter")
   }
 
-  override def search(query: Query[D]): fs2.Stream[IO, SearchResult[D]] = {
+  override def search(query: Query[D]): IO[SearchResults[D]] = IO {
     val q = query.filter.map(filter2Query).getOrElse(new MatchAllDocsQuery)
     val sortFields = if (query.sort.isEmpty) {
       List(SortField.FIELD_SCORE)
@@ -150,12 +166,13 @@ case class LuceneIndexer[D <: Document[D]](collection: Collection[D],
       // TODO: Offset
       val topDocs = indexSearcher.search(q, query.limit, new Sort(sortFields: _*), query.scoreDocs)
       val hits = topDocs.scoreDocs
-      val total = topDocs.totalHits.value
+      val total = topDocs.totalHits.value.toInt
       val storedFields = indexSearcher.storedFields()
-      fs2.Stream[IO, ScoreDoc](ArraySeq.unsafeWrapArray(hits): _*)
+      val stream = fs2.Stream[IO, ScoreDoc](ArraySeq.unsafeWrapArray(hits): _*)
         .map { sd =>
-          LuceneSearchResult(sd, total, collection, query, storedFields)
+          LuceneSearchResult(sd, collection, storedFields)
         }
+      SearchResults(query, total, stream)
     }
 
   override def truncate(): IO[Unit] = IO {
2 changes: 0 additions & 2 deletions lucene/src/main/scala/lightdb/index/lucene/LuceneSearchResult.scala
@@ -11,9 +11,7 @@ import org.apache.lucene.index.StoredFields
 import org.apache.lucene.search.ScoreDoc
 
 case class LuceneSearchResult[D <: Document[D]](scoreDoc: ScoreDoc,
-                                                total: Long,
                                                 collection: Collection[D],
-                                                query: Query[D],
                                                 storedFields: StoredFields) extends SearchResult[D] {
   private lazy val document = storedFields.document(scoreDoc.doc)
   private lazy val doc = collection(id)