Skip to content

Commit

Permalink
Merge pull request #1960 from kimxogus/release/6.7.x_rest-total-hits-…
Browse files Browse the repository at this point in the history
…as-int

[6.7.x] add rest_total_hits_as_int param
  • Loading branch information
sksamuel authored Dec 15, 2019
2 parents 4f88a95 + 1109239 commit 9566859
Show file tree
Hide file tree
Showing 11 changed files with 28 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@ package com.sksamuel.elastic4s.searches

import com.sksamuel.exts.OptionImplicits._

case class MultiSearchRequest(searches: Iterable[SearchRequest], maxConcurrentSearches: Option[Int] = None, typedKeys: Option[Boolean] = None) {
case class MultiSearchRequest(searches: Iterable[SearchRequest],
maxConcurrentSearches: Option[Int] = None,
typedKeys: Option[Boolean] = None,
restTotalHitsAsInt: Option[Boolean] = None) {
def maxConcurrentSearches(max: Int): MultiSearchRequest = copy(maxConcurrentSearches = max.some)

def typedKeys(enabled: Boolean): MultiSearchRequest = copy(typedKeys = enabled.some)

def restTotalHitsAsInt(restTotalHitsAsInt: Boolean): MultiSearchRequest = copy(restTotalHitsAsInt = restTotalHitsAsInt.some)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ trait ScrollApi {
def clearScroll(ids: Iterable[String]): ClearScrollRequest = ClearScrollRequest(ids.toSeq)
}

case class SearchScrollRequest(id: String, keepAlive: Option[String] = None) {
case class SearchScrollRequest(id: String, keepAlive: Option[String] = None, restTotalHitsAsInt: Option[Boolean] = None) {

def keepAlive(keepAlive: String): SearchScrollRequest = copy(keepAlive = keepAlive.some)
def keepAlive(duration: FiniteDuration): SearchScrollRequest = copy(keepAlive = Some(s"${duration.toSeconds}s"))

def restTotalHitsAsInt(restTotalHitsAsInt: Boolean): SearchScrollRequest = copy(restTotalHitsAsInt = restTotalHitsAsInt.some)
}

case class ClearScrollRequest(ids: Seq[String])
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ case class SearchRequest(indexesTypes: IndexesAndTypes,
profile: Option[Boolean] = None,
source: Option[String] = None,
trackHits: Option[Boolean] = None,
typedKeys: Option[Boolean] = None) {
typedKeys: Option[Boolean] = None,
restTotalHitsAsInt: Option[Boolean] = None) {

/** Adds a single string query to this search
*
Expand Down Expand Up @@ -300,4 +301,6 @@ case class SearchRequest(indexesTypes: IndexesAndTypes,
def collapse(collapse: CollapseRequest): SearchRequest = copy(collapse = collapse.some)

def typedKeys(enabled: Boolean): SearchRequest = copy(typedKeys = enabled.some)

def restTotalHitsAsInt(restTotalHitsAsInt: Boolean): SearchRequest = copy(restTotalHitsAsInt = restTotalHitsAsInt.some)
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,10 @@ class PublishActor(client: ElasticClient, query: SearchRequest, s: Subscriber[_
logger.debug(
s"Request for $n items, but only ${queue.size} available; sending ${queue.size} now, requesting $toRequest from upstream"
)

Option(scrollId) match {
case None => client.execute(query).onComplete(result => self ! result)
case Some(id) => client.execute(searchScroll(id) keepAlive keepAlive).onComplete(result => self ! result)
case Some(id) => client.execute(searchScroll(id).keepAlive(keepAlive).copy(restTotalHitsAsInt = query.restTotalHitsAsInt)).onComplete(result => self ! result)
}
// we switch state while we're waiting on elasticsearch, so we know not to send another request to ES
// because we are using a scroll and can only have one active request at at time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class ScrollPublisherIntegrationTest extends WordSpec with DockerTests with Matc
"elastic-streams" should {
"publish all data from the index" in {

val publisher = client.publisher(search(indexName) query "*:*" scroll "1m")
val publisher = client.publisher(search(indexName) query "*:*" scroll "1m" restTotalHitsAsInt true)

val completionLatch = new CountDownLatch(1)
val documentLatch = new CountDownLatch(emperors.length)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class ScrollPublisherUnitTest extends WordSpec with Matchers with DockerTests {
"elastic-streams" should {
"throw exception if search definition has no scroll" in {
an [IllegalArgumentException] should be thrownBy
client.publisher(search("scrollpubint") query "*:*")
client.publisher(search("scrollpubint") query "*:*" restTotalHitsAsInt true)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ScrollPublisherVerificationTest
).refreshImmediately
}.await

private val query = search("scrollpubver").matchAllQuery().scroll("1m").limit(2)
private val query = search("scrollpubver").matchAllQuery().scroll("1m").limit(2).restTotalHitsAsInt(true)

override def boundedDepthOfOnNextAndRequestRecursion: Long = 2l

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ trait SearchHandlers {
val params = scala.collection.mutable.Map.empty[String, String]
request.maxConcurrentSearches.map(_.toString).foreach(params.put("max_concurrent_searches", _))
request.typedKeys.map(_.toString).foreach(params.put("typed_keys", _))
request.restTotalHitsAsInt.map(_.toString).foreach(params.put("rest_total_hits_as_int", _))

val body = MultiSearchBuilderFn(request)
logger.debug("Executing msearch: " + body)
Expand Down Expand Up @@ -88,6 +89,8 @@ trait SearchHandlers {

request.typedKeys.map(_.toString).foreach(params.put("typed_keys", _))

request.restTotalHitsAsInt.map(_.toString).foreach(params.put("rest_total_hits_as_int", _))

val body = request.source.getOrElse(SearchBodyBuilderFn(request).string())
ElasticRequest("POST", endpoint, params.toMap, HttpEntity(body, ContentType.APPLICATION_JSON.getMimeType))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,14 @@ trait SearchScrollHandlers {

override def build(req: SearchScrollRequest): ElasticRequest = {

val params = scala.collection.mutable.Map.empty[String, String]
req.restTotalHitsAsInt.map(_.toString).foreach(params.put("rest_total_hits_as_int", _))

val body = SearchScrollBuilderFn(req).string()
logger.debug("Executing search scroll: " + body)
val entity = HttpEntity(body, ContentType.APPLICATION_JSON.getMimeType)

ElasticRequest("POST", "/_search/scroll", entity)
ElasticRequest("POST", "/_search/scroll", params.toMap, entity)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ trait ElasticSugar extends ElasticDsl {
blockUntil(s"Expected count of $expected") { () =>
val result = client
.execute {
search(index).matchAllQuery().size(0)
search(index).matchAllQuery().size(0).restTotalHitsAsInt(true)
}
.await
.result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ trait IndexMatchers extends Matchers {
new Matcher[String] {

def apply(left: String): MatchResult = {
val count = client.execute(search(left).size(0)).await(timeout).result.totalHits
val count = client.execute(search(left).size(0).restTotalHitsAsInt(true)).await(timeout).result.totalHits
MatchResult(
count == expectedCount,
s"Index $left had count $count but expected $expectedCount",
Expand Down Expand Up @@ -55,7 +55,7 @@ trait IndexMatchers extends Matchers {
new Matcher[String] {

override def apply(left: String): MatchResult = {
val count = client.execute(search(left).size(0)).await(timeout).result.totalHits
val count = client.execute(search(left).size(0).restTotalHitsAsInt(true)).await(timeout).result.totalHits
MatchResult(
count == 0,
s"Index $left was not empty",
Expand Down

0 comments on commit 9566859

Please sign in to comment.