From 04b7fcaeda0971ab61667ca9c21de8f2a54536b1 Mon Sep 17 00:00:00 2001 From: Taehyun Kim Date: Sat, 14 Dec 2019 02:19:45 +0900 Subject: [PATCH 1/2] add rest_total_hits_as_int param --- .../sksamuel/elastic4s/searches/MultiSearchRequest.scala | 7 ++++++- .../com/sksamuel/elastic4s/searches/SearchRequest.scala | 5 ++++- .../sksamuel/elastic4s/http/search/SearchHandlers.scala | 3 +++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/searches/MultiSearchRequest.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/searches/MultiSearchRequest.scala index 0afd090cf..18dcf6a32 100644 --- a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/searches/MultiSearchRequest.scala +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/searches/MultiSearchRequest.scala @@ -2,8 +2,13 @@ package com.sksamuel.elastic4s.searches import com.sksamuel.exts.OptionImplicits._ -case class MultiSearchRequest(searches: Iterable[SearchRequest], maxConcurrentSearches: Option[Int] = None, typedKeys: Option[Boolean] = None) { +case class MultiSearchRequest(searches: Iterable[SearchRequest], + maxConcurrentSearches: Option[Int] = None, + typedKeys: Option[Boolean] = None, + restTotalHitsAsInt: Option[Boolean] = None) { def maxConcurrentSearches(max: Int): MultiSearchRequest = copy(maxConcurrentSearches = max.some) def typedKeys(enabled: Boolean): MultiSearchRequest = copy(typedKeys = enabled.some) + + def restTotalHitsAsInt(restTotalHitsAsInt: Boolean): MultiSearchRequest = copy(restTotalHitsAsInt = restTotalHitsAsInt.some) } diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/searches/SearchRequest.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/searches/SearchRequest.scala index f42727200..0fae74d93 100644 --- a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/searches/SearchRequest.scala +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/searches/SearchRequest.scala @@ -55,7 +55,8 @@ case class SearchRequest(indexesTypes: IndexesAndTypes, profile: Option[Boolean] = None, source: Option[String] = None, trackHits: Option[Boolean] = None, - typedKeys: Option[Boolean] = None) { + typedKeys: Option[Boolean] = None, + restTotalHitsAsInt: Option[Boolean] = None) { /** Adds a single string query to this search * @@ -300,4 +301,6 @@ case class SearchRequest(indexesTypes: IndexesAndTypes, def collapse(collapse: CollapseRequest): SearchRequest = copy(collapse = collapse.some) def typedKeys(enabled: Boolean): SearchRequest = copy(typedKeys = enabled.some) + + def restTotalHitsAsInt(restTotalHitsAsInt: Boolean): SearchRequest = copy(restTotalHitsAsInt = restTotalHitsAsInt.some) } diff --git a/elastic4s-http/src/main/scala/com/sksamuel/elastic4s/http/search/SearchHandlers.scala b/elastic4s-http/src/main/scala/com/sksamuel/elastic4s/http/search/SearchHandlers.scala index a65609091..cd2f1a9dc 100644 --- a/elastic4s-http/src/main/scala/com/sksamuel/elastic4s/http/search/SearchHandlers.scala +++ b/elastic4s-http/src/main/scala/com/sksamuel/elastic4s/http/search/SearchHandlers.scala @@ -46,6 +46,7 @@ trait SearchHandlers { val params = scala.collection.mutable.Map.empty[String, String] request.maxConcurrentSearches.map(_.toString).foreach(params.put("max_concurrent_searches", _)) request.typedKeys.map(_.toString).foreach(params.put("typed_keys", _)) + request.restTotalHitsAsInt.map(_.toString).foreach(params.put("rest_total_hits_as_int", _)) val body = MultiSearchBuilderFn(request) logger.debug("Executing msearch: " + body) @@ -88,6 +89,8 @@ trait SearchHandlers { request.typedKeys.map(_.toString).foreach(params.put("typed_keys", _)) + request.restTotalHitsAsInt.map(_.toString).foreach(params.put("rest_total_hits_as_int", _)) + val body = request.source.getOrElse(SearchBodyBuilderFn(request).string()) ElasticRequest("POST", endpoint, params.toMap, HttpEntity(body, ContentType.APPLICATION_JSON.getMimeType)) } From 11092391fffdd9e951f7fbbfca4157418c633efe Mon Sep 17 00:00:00 2001 From: Taehyun Kim Date: Sat, 14 Dec 2019 03:00:36 +0900 Subject: [PATCH 2/2] add rest_total_hits_as_int to SearchScrollRequest --- .../scala/com/sksamuel/elastic4s/searches/ScrollApi.scala | 4 +++- .../com/sksamuel/elastic4s/streams/ScrollPublisher.scala | 3 ++- .../elastic4s/streams/ScrollPublisherIntegrationTest.scala | 2 +- .../sksamuel/elastic4s/streams/ScrollPublisherUnitTest.scala | 2 +- .../elastic4s/streams/ScrollPublisherVerificationTest.scala | 2 +- .../elastic4s/http/search/SearchScrollHandlers.scala | 5 ++++- .../scala/com/sksamuel/elastic4s/testkit/ElasticSugar.scala | 2 +- .../scala/com/sksamuel/elastic4s/testkit/IndexMatchers.scala | 4 ++-- 8 files changed, 15 insertions(+), 9 deletions(-) diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/searches/ScrollApi.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/searches/ScrollApi.scala index 3310f94ab..1bb02bc5f 100644 --- a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/searches/ScrollApi.scala +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/searches/ScrollApi.scala @@ -14,10 +14,12 @@ trait ScrollApi { def clearScroll(ids: Iterable[String]): ClearScrollRequest = ClearScrollRequest(ids.toSeq) } -case class SearchScrollRequest(id: String, keepAlive: Option[String] = None) { +case class SearchScrollRequest(id: String, keepAlive: Option[String] = None, restTotalHitsAsInt: Option[Boolean] = None) { def keepAlive(keepAlive: String): SearchScrollRequest = copy(keepAlive = keepAlive.some) def keepAlive(duration: FiniteDuration): SearchScrollRequest = copy(keepAlive = Some(s"${duration.toSeconds}s")) + + def restTotalHitsAsInt(restTotalHitsAsInt: Boolean): SearchScrollRequest = copy(restTotalHitsAsInt = restTotalHitsAsInt.some) } case class ClearScrollRequest(ids: Seq[String]) diff --git a/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ScrollPublisher.scala b/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ScrollPublisher.scala index acb356bfe..e86b6454d 100644 --- a/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ScrollPublisher.scala +++ b/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ScrollPublisher.scala @@ -116,9 +116,10 @@ class PublishActor(client: ElasticClient, query: SearchRequest, s: Subscriber[_ logger.debug( s"Request for $n items, but only ${queue.size} available; sending ${queue.size} now, requesting $toRequest from upstream" ) + Option(scrollId) match { case None => client.execute(query).onComplete(result => self ! result) - case Some(id) => client.execute(searchScroll(id) keepAlive keepAlive).onComplete(result => self ! result) + case Some(id) => client.execute(searchScroll(id).keepAlive(keepAlive).copy(restTotalHitsAsInt = query.restTotalHitsAsInt)).onComplete(result => self ! result) } // we switch state while we're waiting on elasticsearch, so we know not to send another request to ES // because we are using a scroll and can only have one active request at at time. diff --git a/elastic4s-http-streams/src/test/scala/com/sksamuel/elastic4s/streams/ScrollPublisherIntegrationTest.scala b/elastic4s-http-streams/src/test/scala/com/sksamuel/elastic4s/streams/ScrollPublisherIntegrationTest.scala index 815e2f879..69b93dd1c 100644 --- a/elastic4s-http-streams/src/test/scala/com/sksamuel/elastic4s/streams/ScrollPublisherIntegrationTest.scala +++ b/elastic4s-http-streams/src/test/scala/com/sksamuel/elastic4s/streams/ScrollPublisherIntegrationTest.scala @@ -62,7 +62,7 @@ class ScrollPublisherIntegrationTest extends WordSpec with DockerTests with Matc "elastic-streams" should { "publish all data from the index" in { - val publisher = client.publisher(search(indexName) query "*:*" scroll "1m") + val publisher = client.publisher(search(indexName) query "*:*" scroll "1m" restTotalHitsAsInt true) val completionLatch = new CountDownLatch(1) val documentLatch = new CountDownLatch(emperors.length) diff --git a/elastic4s-http-streams/src/test/scala/com/sksamuel/elastic4s/streams/ScrollPublisherUnitTest.scala b/elastic4s-http-streams/src/test/scala/com/sksamuel/elastic4s/streams/ScrollPublisherUnitTest.scala index 8a47c9d1b..2c21c251f 100644 --- a/elastic4s-http-streams/src/test/scala/com/sksamuel/elastic4s/streams/ScrollPublisherUnitTest.scala +++ b/elastic4s-http-streams/src/test/scala/com/sksamuel/elastic4s/streams/ScrollPublisherUnitTest.scala @@ -13,7 +13,7 @@ class ScrollPublisherUnitTest extends WordSpec with Matchers with DockerTests { "elastic-streams" should { "throw exception if search definition has no scroll" in { an [IllegalArgumentException] should be thrownBy - client.publisher(search("scrollpubint") query "*:*") + client.publisher(search("scrollpubint") query "*:*" restTotalHitsAsInt true) } } } diff --git a/elastic4s-http-streams/src/test/scala/com/sksamuel/elastic4s/streams/ScrollPublisherVerificationTest.scala b/elastic4s-http-streams/src/test/scala/com/sksamuel/elastic4s/streams/ScrollPublisherVerificationTest.scala index 54e0f614e..897d67d26 100644 --- a/elastic4s-http-streams/src/test/scala/com/sksamuel/elastic4s/streams/ScrollPublisherVerificationTest.scala +++ b/elastic4s-http-streams/src/test/scala/com/sksamuel/elastic4s/streams/ScrollPublisherVerificationTest.scala @@ -50,7 +50,7 @@ class ScrollPublisherVerificationTest ).refreshImmediately }.await - private val query = search("scrollpubver").matchAllQuery().scroll("1m").limit(2) + private val query = search("scrollpubver").matchAllQuery().scroll("1m").limit(2).restTotalHitsAsInt(true) override def boundedDepthOfOnNextAndRequestRecursion: Long = 2l diff --git a/elastic4s-http/src/main/scala/com/sksamuel/elastic4s/http/search/SearchScrollHandlers.scala b/elastic4s-http/src/main/scala/com/sksamuel/elastic4s/http/search/SearchScrollHandlers.scala index f6e0733cc..38c190cb2 100644 --- a/elastic4s-http/src/main/scala/com/sksamuel/elastic4s/http/search/SearchScrollHandlers.scala +++ b/elastic4s-http/src/main/scala/com/sksamuel/elastic4s/http/search/SearchScrollHandlers.scala @@ -45,11 +45,14 @@ trait SearchScrollHandlers { override def build(req: SearchScrollRequest): ElasticRequest = { + val params = scala.collection.mutable.Map.empty[String, String] + req.restTotalHitsAsInt.map(_.toString).foreach(params.put("rest_total_hits_as_int", _)) + val body = SearchScrollBuilderFn(req).string() logger.debug("Executing search scroll: " + body) val entity = HttpEntity(body, ContentType.APPLICATION_JSON.getMimeType) - ElasticRequest("POST", "/_search/scroll", entity) + ElasticRequest("POST", "/_search/scroll", params.toMap, entity) } } } diff --git a/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/ElasticSugar.scala b/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/ElasticSugar.scala index f717c5ae3..9f3e1b503 100644 --- a/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/ElasticSugar.scala +++ b/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/ElasticSugar.scala @@ -100,7 +100,7 @@ trait ElasticSugar extends ElasticDsl { blockUntil(s"Expected count of $expected") { () => val result = client .execute { - search(index).matchAllQuery().size(0) + search(index).matchAllQuery().size(0).restTotalHitsAsInt(true) } .await .result diff --git a/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/IndexMatchers.scala b/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/IndexMatchers.scala index 20d72e47a..5534cb997 100644 --- a/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/IndexMatchers.scala +++ b/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/IndexMatchers.scala @@ -15,7 +15,7 @@ trait IndexMatchers extends Matchers { new Matcher[String] { def apply(left: String): MatchResult = { - val count = client.execute(search(left).size(0)).await(timeout).result.totalHits + val count = client.execute(search(left).size(0).restTotalHitsAsInt(true)).await(timeout).result.totalHits MatchResult( count == expectedCount, s"Index $left had count $count but expected $expectedCount", @@ -55,7 +55,7 @@ trait IndexMatchers extends Matchers { new Matcher[String] { override def apply(left: String): MatchResult = { - val count = client.execute(search(left).size(0)).await(timeout).result.totalHits + val count = client.execute(search(left).size(0).restTotalHitsAsInt(true)).await(timeout).result.totalHits MatchResult( count == 0, s"Index $left was not empty",