Skip to content

Commit

Permalink
Prototyped direct Lucene
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Mar 24, 2024
1 parent 0324f85 commit 87cf812
Show file tree
Hide file tree
Showing 7 changed files with 308 additions and 190 deletions.
4 changes: 2 additions & 2 deletions benchmark/src/main/scala/benchmark/IMDBBenchmark.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ object IMDBBenchmark { // extends IOApp {
val io = for {
_ <- implementation.init()
_ = scribe.info("--- Stage 1 ---")
akasFile <- downloadFile(new File(baseDirectory, "title.akas.tsv"), Limit.OneMillion).elapsed
akasFile <- downloadFile(new File(baseDirectory, "title.akas.tsv"), Limit.OneHundredThousand).elapsed
_ = scribe.info("--- Stage 2 ---")
totalAka <- process(akasFile.value, implementation.map2TitleAka, implementation.persistTitleAka).elapsed
_ = scribe.info("--- Stage 3 ---")
basicsFile <- downloadFile(new File(baseDirectory, "title.basics.tsv"), Limit.OneMillion).elapsed
basicsFile <- downloadFile(new File(baseDirectory, "title.basics.tsv"), Limit.OneHundredThousand).elapsed
_ = scribe.info("--- Stage 4 ---")
totalBasics <- process(basicsFile.value, implementation.map2TitleBasics, implementation.persistTitleBasics).elapsed
_ = scribe.info("--- Stage 5 ---")
Expand Down
19 changes: 11 additions & 8 deletions benchmark/src/main/scala/benchmark/LightDBImplementation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ object LightDBImplementation extends BenchmarkImplementation {
override def map2TitleAka(map: Map[String, String]): TitleAkaLDB = TitleAkaLDB(
titleId = map.value("titleId"),
ordering = map.int("ordering"),
title = map.value("title"),
title = map.value("title").replace("\\N", "N"),
region = map.option("region"),
language = map.option("language"),
types = map.list("types"),
Expand Down Expand Up @@ -57,7 +57,10 @@ object LightDBImplementation extends BenchmarkImplementation {

override def findByTitleId(titleId: String): IO[List[TitleAkaLDB]] = db.titleAka.query.filter(TitleAkaLDB.titleId === titleId).search().compile.toList.flatMap(_.map(_.get()).sequence)

override def flush(): IO[Unit] = db.titleAka.commit()
override def flush(): IO[Unit] = for {
_ <- db.titleAka.commit()
_ <- db.titleBasics.commit()
} yield ()

override def verifyTitleAka(): IO[Unit] = for {
haloCount <- db.titleAka.store.count()
Expand Down Expand Up @@ -86,18 +89,18 @@ object LightDBImplementation extends BenchmarkImplementation {
object TitleAkaLDB extends JsonMapping[TitleAkaLDB] {
override implicit val rw: RW[TitleAkaLDB] = RW.gen

val titleId: FD[String] = field("titleId", _.titleId).indexed()
val ordering: FD[Int] = field("ordering", _.ordering).indexed()
val title: FD[String] = field("title", _.title).indexed()
val titleId: FD[String] = field("titleId", _.titleId)
val ordering: FD[Int] = field("ordering", _.ordering)
val title: FD[String] = field("title", _.title)
}

case class TitleBasicsLDB(tconst: String, titleType: String, primaryTitle: String, originalTitle: String, isAdult: Boolean, startYear: Int, endYear: Int, runtimeMinutes: Int, genres: List[String], _id: Id[TitleBasics]) extends Document[TitleBasics]

object TitleBasicsLDB extends JsonMapping[TitleBasicsLDB] {
override implicit val rw: RW[TitleBasicsLDB] = RW.gen

val tconst: FD[String] = field("tconst", _.tconst).indexed()
val primaryTitle: FD[String] = field("primaryTitle", _.primaryTitle).indexed()
val originalTitle: FD[String] = field("originalTitle", _.originalTitle).indexed()
val tconst: FD[String] = field("tconst", _.tconst)
val primaryTitle: FD[String] = field("primaryTitle", _.primaryTitle)
val originalTitle: FD[String] = field("originalTitle", _.originalTitle)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import fabric.rw._
class JsonDataManager[T: RW] extends DataManager[T] {
override def fromArray(array: Array[Byte]): T = {
val jsonString = new String(array, "UTF-8")
JsonParser(jsonString).as[T]
try {
JsonParser(jsonString).as[T]
} catch {
case t: Throwable => throw new RuntimeException(s"Unable to parse: [$jsonString]", t)
}
}

override def toArray(value: T): Array[Byte] = {
Expand Down
38 changes: 19 additions & 19 deletions lucene/src/main/scala/lightdb/index/lucene/IndexFeature.scala
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package lightdb.index.lucene

import com.outr.lucene4s.Lucene
import com.outr.lucene4s.field.{Field => LuceneField}
import com.outr.lucene4s.field.FieldType
import com.outr.lucene4s.field.value.support.ValueSupport
import lightdb.field.FieldFeature

case class IndexFeature[F](fieldType: FieldType,
fullTextSearchable: Boolean,
sortable: Boolean,
valueSupport: ValueSupport[F]) extends FieldFeature {
def createField(name: String, lucene: Lucene): LuceneField[F] = lucene.create.field[F](
name = name,
fieldType = fieldType,
fullTextSearchable = fullTextSearchable,
sortable = sortable
)(valueSupport)
}
//package lightdb.index.lucene
//
//import com.outr.lucene4s.Lucene
//import com.outr.lucene4s.field.{Field => LuceneField}
//import com.outr.lucene4s.field.FieldType
//import com.outr.lucene4s.field.value.support.ValueSupport
//import lightdb.field.FieldFeature
//
//case class IndexFeature[F](fieldType: FieldType,
// fullTextSearchable: Boolean,
// sortable: Boolean,
// valueSupport: ValueSupport[F]) extends FieldFeature {
// def createField(name: String, lucene: Lucene): LuceneField[F] = lucene.create.field[F](
// name = name,
// fieldType = fieldType,
// fullTextSearchable = fullTextSearchable,
// sortable = sortable
// )(valueSupport)
//}
272 changes: 136 additions & 136 deletions lucene/src/main/scala/lightdb/index/lucene/LuceneIndexer.scala
Original file line number Diff line number Diff line change
@@ -1,136 +1,136 @@
package lightdb.index.lucene

import cats.effect.IO
import com.outr.lucene4s._
import com.outr.lucene4s.field.value.FieldAndValue
import com.outr.lucene4s.field.{FieldType, Field => LuceneField}
import com.outr.lucene4s.query.{MatchAllSearchTerm, SearchTerm, Sort => LuceneSort, SearchResult => L4SSearchResult}
import lightdb.collection.Collection
import lightdb.field.Field
import lightdb.index.{Indexer, SearchResult}
import lightdb.query.{Filter, Query, Sort}
import lightdb.{Document, Id}
import fs2.Stream

case class LuceneIndexer[D <: Document[D]](collection: Collection[D], autoCommit: Boolean = false) extends Indexer[D] { li =>
private val lucene = new DirectLucene(
uniqueFields = List("_id"),
directory = collection.db.directory.map(_.resolve(collection.collectionName)),
defaultFullTextSearchable = true,
autoCommit = autoCommit
)
private var _fields: List[IndexedField[Any]] = Nil
private var fieldsMap: Map[String, IndexedField[Any]] = Map.empty

private[lucene] def fields: List[IndexedField[Any]] = _fields

val _id: IndexedField[Id[D]] = {
collection.mapping.field.get[Id[D]]("_id").getOrElse(throw new RuntimeException("_id field not specified")).indexed(fieldType = FieldType.Untokenized)
field[Id[D]]("_id")
}

object field {
def get[F](name: String): Option[IndexedField[F]] = fieldsMap
.get(name)
.map(_.asInstanceOf[IndexedField[F]])
.orElse {
collection.mapping.field.get[F](name).flatMap { f =>
f
.features
.find(_.isInstanceOf[IndexFeature[_]])
.map(_.asInstanceOf[IndexFeature[F]])
.map { indexFeature =>
val indexedField = IndexedField(indexFeature.createField(name, lucene), f)
li.synchronized {
val aif = indexedField.asInstanceOf[IndexedField[Any]]
_fields = _fields ::: List(aif)
fieldsMap += name -> aif
}
indexedField
}
}
}
def apply[F](name: String): IndexedField[F] = get[F](name).getOrElse(throw new RuntimeException(s"Field not defined: $name"))
}

override def put(value: D): IO[D] = IO {
val fields = collection.mapping.fields.flatMap(f => field.get[Any](f.name))
if (fields.nonEmpty && fields.tail.nonEmpty) { // No need to index if _id is the only field
val fieldsAndValues = fields.map(_.fieldAndValue(value))
lucene
.doc()
.update(exact(_id.luceneField(value._id)))
.fields(fieldsAndValues: _*)
.index()
}
value
}

override def delete(id: Id[D]): IO[Unit] = IO(lucene.delete(exact(_id.luceneField(id))))

override def commit(): IO[Unit] = IO {
lucene.commit()
}

override def count(): IO[Long] = IO {
lucene.count()
}

private[lucene] def indexed[F](luceneField: LuceneField[F], field: Field[D, F]): Unit = {
IndexedField[F](luceneField, field)
}

private def filter2Lucene(filter: Filter[D]): SearchTerm = {
def fieldAndValue(field: Field[D, Any], value: Any): FieldAndValue[Any] = this.field[Any](field.name).luceneField(value)
filter match {
case Filter.Equals(field, value) =>
val fv = fieldAndValue(field.asInstanceOf[Field[D, Any]], value)
exact(fv)
case Filter.NotEquals(field, value) =>
val fv = fieldAndValue(field.asInstanceOf[Field[D, Any]], value)
none(exact(fv))
case Filter.Includes(field, values) =>
val terms = values.map(v => filter2Lucene(Filter.Equals(field.asInstanceOf[Field[D, Any]], v)))
any(terms: _*)
case Filter.Excludes(field, values) =>
val terms = values.map(v => filter2Lucene(Filter.Equals(field.asInstanceOf[Field[D, Any]], v)))
none(terms: _*)
}
}

override def search(query: Query[D]): Stream[IO, SearchResult[D]] = {
var q = lucene.query().offset(query.offset).limit(query.batchSize)
q = query.filters.foldLeft(q)((qb, f) => q.filter(filter2Lucene(f)))
q = q.sort(query.sort.map {
case Sort.BestMatch => LuceneSort.Score
case Sort.IndexOrder => LuceneSort.IndexOrder
case Sort.ByField(field, reverse) => LuceneSort(this.field[Any](field.name).luceneField, reverse)
}: _*)

val pagedResults = q.search()
val pagedResultsIterator = pagedResults.pagedResultsIterator
Stream.fromBlockingIterator[IO](pagedResultsIterator, query.batchSize)
.map(result => LuceneSearchResult(query, pagedResults.total, result))
}

override def truncate(): IO[Unit] = IO(lucene.delete(MatchAllSearchTerm))

override def dispose(): IO[Unit] = IO(lucene.dispose())

case class IndexedField[F](luceneField: LuceneField[F], field: Field[D, F]) {
def fieldAndValue(value: D): FieldAndValue[F] = luceneField(field.getter(value))
}

case class LuceneSearchResult(query: Query[D],
total: Long,
result: L4SSearchResult) extends SearchResult[D] {
override lazy val id: Id[D] = result(_id.luceneField)

override def get(): IO[D] = collection(id)

override def apply[F](field: Field[D, F]): F = {
val indexedField = fields.find(_.field.name == field.name).getOrElse(throw new RuntimeException(s"Unable to find indexed field for: ${field.name}"))
result(indexedField.luceneField).asInstanceOf[F]
}
}
}
//package lightdb.index.lucene
//
//import cats.effect.IO
//import com.outr.lucene4s._
//import com.outr.lucene4s.field.value.FieldAndValue
//import com.outr.lucene4s.field.{FieldType, Field => LuceneField}
//import com.outr.lucene4s.query.{MatchAllSearchTerm, SearchTerm, Sort => LuceneSort, SearchResult => L4SSearchResult}
//import lightdb.collection.Collection
//import lightdb.field.Field
//import lightdb.index.{Indexer, SearchResult}
//import lightdb.query.{Filter, Query, Sort}
//import lightdb.{Document, Id}
//import fs2.Stream
//
//case class LuceneIndexer[D <: Document[D]](collection: Collection[D], autoCommit: Boolean = false) extends Indexer[D] { li =>
// private val lucene = new DirectLucene(
// uniqueFields = List("_id"),
// directory = collection.db.directory.map(_.resolve(collection.collectionName)),
// defaultFullTextSearchable = true,
// autoCommit = autoCommit
// )
// private var _fields: List[IndexedField[Any]] = Nil
// private var fieldsMap: Map[String, IndexedField[Any]] = Map.empty
//
// private[lucene] def fields: List[IndexedField[Any]] = _fields
//
// val _id: IndexedField[Id[D]] = {
// collection.mapping.field.get[Id[D]]("_id").getOrElse(throw new RuntimeException("_id field not specified")).indexed(fieldType = FieldType.Untokenized)
// field[Id[D]]("_id")
// }
//
// object field {
// def get[F](name: String): Option[IndexedField[F]] = fieldsMap
// .get(name)
// .map(_.asInstanceOf[IndexedField[F]])
// .orElse {
// collection.mapping.field.get[F](name).flatMap { f =>
// f
// .features
// .find(_.isInstanceOf[IndexFeature[_]])
// .map(_.asInstanceOf[IndexFeature[F]])
// .map { indexFeature =>
// val indexedField = IndexedField(indexFeature.createField(name, lucene), f)
// li.synchronized {
// val aif = indexedField.asInstanceOf[IndexedField[Any]]
// _fields = _fields ::: List(aif)
// fieldsMap += name -> aif
// }
// indexedField
// }
// }
// }
// def apply[F](name: String): IndexedField[F] = get[F](name).getOrElse(throw new RuntimeException(s"Field not defined: $name"))
// }
//
// override def put(value: D): IO[D] = IO {
// val fields = collection.mapping.fields.flatMap(f => field.get[Any](f.name))
// if (fields.nonEmpty && fields.tail.nonEmpty) { // No need to index if _id is the only field
// val fieldsAndValues = fields.map(_.fieldAndValue(value))
// lucene
// .doc()
// .update(exact(_id.luceneField(value._id)))
// .fields(fieldsAndValues: _*)
// .index()
// }
// value
// }
//
// override def delete(id: Id[D]): IO[Unit] = IO(lucene.delete(exact(_id.luceneField(id))))
//
// override def commit(): IO[Unit] = IO {
// lucene.commit()
// }
//
// override def count(): IO[Long] = IO {
// lucene.count()
// }
//
// private[lucene] def indexed[F](luceneField: LuceneField[F], field: Field[D, F]): Unit = {
// IndexedField[F](luceneField, field)
// }
//
// private def filter2Lucene(filter: Filter[D]): SearchTerm = {
// def fieldAndValue(field: Field[D, Any], value: Any): FieldAndValue[Any] = this.field[Any](field.name).luceneField(value)
// filter match {
// case Filter.Equals(field, value) =>
// val fv = fieldAndValue(field.asInstanceOf[Field[D, Any]], value)
// exact(fv)
// case Filter.NotEquals(field, value) =>
// val fv = fieldAndValue(field.asInstanceOf[Field[D, Any]], value)
// none(exact(fv))
// case Filter.Includes(field, values) =>
// val terms = values.map(v => filter2Lucene(Filter.Equals(field.asInstanceOf[Field[D, Any]], v)))
// any(terms: _*)
// case Filter.Excludes(field, values) =>
// val terms = values.map(v => filter2Lucene(Filter.Equals(field.asInstanceOf[Field[D, Any]], v)))
// none(terms: _*)
// }
// }
//
// override def search(query: Query[D]): Stream[IO, SearchResult[D]] = {
// var q = lucene.query().offset(query.offset).limit(query.batchSize)
// q = query.filters.foldLeft(q)((qb, f) => q.filter(filter2Lucene(f)))
// q = q.sort(query.sort.map {
// case Sort.BestMatch => LuceneSort.Score
// case Sort.IndexOrder => LuceneSort.IndexOrder
// case Sort.ByField(field, reverse) => LuceneSort(this.field[Any](field.name).luceneField, reverse)
// }: _*)
//
// val pagedResults = q.search()
// val pagedResultsIterator = pagedResults.pagedResultsIterator
// Stream.fromBlockingIterator[IO](pagedResultsIterator, query.batchSize)
// .map(result => LuceneSearchResult(query, pagedResults.total, result))
// }
//
// override def truncate(): IO[Unit] = IO(lucene.delete(MatchAllSearchTerm))
//
// override def dispose(): IO[Unit] = IO(lucene.dispose())
//
// case class IndexedField[F](luceneField: LuceneField[F], field: Field[D, F]) {
// def fieldAndValue(value: D): FieldAndValue[F] = luceneField(field.getter(value))
// }
//
// case class LuceneSearchResult(query: Query[D],
// total: Long,
// result: L4SSearchResult) extends SearchResult[D] {
// override lazy val id: Id[D] = result(_id.luceneField)
//
// override def get(): IO[D] = collection(id)
//
// override def apply[F](field: Field[D, F]): F = {
// val indexedField = fields.find(_.field.name == field.name).getOrElse(throw new RuntimeException(s"Unable to find indexed field for: ${field.name}"))
// result(indexedField.luceneField).asInstanceOf[F]
// }
// }
//}
Loading

0 comments on commit 87cf812

Please sign in to comment.