Skip to content

Commit

Permalink
add rest_total_hits_as_int to SearchScrollRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
kimxogus committed Dec 13, 2019
1 parent 04b7fca commit 1109239
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ trait ScrollApi {
def clearScroll(ids: Iterable[String]): ClearScrollRequest = ClearScrollRequest(ids.toSeq)
}

case class SearchScrollRequest(id: String, keepAlive: Option[String] = None) {
case class SearchScrollRequest(id: String, keepAlive: Option[String] = None, restTotalHitsAsInt: Option[Boolean] = None) {

def keepAlive(keepAlive: String): SearchScrollRequest = copy(keepAlive = keepAlive.some)
def keepAlive(duration: FiniteDuration): SearchScrollRequest = copy(keepAlive = Some(s"${duration.toSeconds}s"))

def restTotalHitsAsInt(restTotalHitsAsInt: Boolean): SearchScrollRequest = copy(restTotalHitsAsInt = restTotalHitsAsInt.some)
}

case class ClearScrollRequest(ids: Seq[String])
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,10 @@ class PublishActor(client: ElasticClient, query: SearchRequest, s: Subscriber[_
logger.debug(
s"Request for $n items, but only ${queue.size} available; sending ${queue.size} now, requesting $toRequest from upstream"
)

Option(scrollId) match {
case None => client.execute(query).onComplete(result => self ! result)
case Some(id) => client.execute(searchScroll(id) keepAlive keepAlive).onComplete(result => self ! result)
case Some(id) => client.execute(searchScroll(id).keepAlive(keepAlive).copy(restTotalHitsAsInt = query.restTotalHitsAsInt)).onComplete(result => self ! result)
}
// we switch state while we're waiting on elasticsearch, so we know not to send another request to ES
// because we are using a scroll and can only have one active request at at time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class ScrollPublisherIntegrationTest extends WordSpec with DockerTests with Matc
"elastic-streams" should {
"publish all data from the index" in {

val publisher = client.publisher(search(indexName) query "*:*" scroll "1m")
val publisher = client.publisher(search(indexName) query "*:*" scroll "1m" restTotalHitsAsInt true)

val completionLatch = new CountDownLatch(1)
val documentLatch = new CountDownLatch(emperors.length)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class ScrollPublisherUnitTest extends WordSpec with Matchers with DockerTests {
"elastic-streams" should {
"throw exception if search definition has no scroll" in {
an [IllegalArgumentException] should be thrownBy
client.publisher(search("scrollpubint") query "*:*")
client.publisher(search("scrollpubint") query "*:*" restTotalHitsAsInt true)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ScrollPublisherVerificationTest
).refreshImmediately
}.await

private val query = search("scrollpubver").matchAllQuery().scroll("1m").limit(2)
private val query = search("scrollpubver").matchAllQuery().scroll("1m").limit(2).restTotalHitsAsInt(true)

override def boundedDepthOfOnNextAndRequestRecursion: Long = 2l

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,14 @@ trait SearchScrollHandlers {

override def build(req: SearchScrollRequest): ElasticRequest = {

val params = scala.collection.mutable.Map.empty[String, String]
req.restTotalHitsAsInt.map(_.toString).foreach(params.put("rest_total_hits_as_int", _))

val body = SearchScrollBuilderFn(req).string()
logger.debug("Executing search scroll: " + body)
val entity = HttpEntity(body, ContentType.APPLICATION_JSON.getMimeType)

ElasticRequest("POST", "/_search/scroll", entity)
ElasticRequest("POST", "/_search/scroll", params.toMap, entity)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ trait ElasticSugar extends ElasticDsl {
blockUntil(s"Expected count of $expected") { () =>
val result = client
.execute {
search(index).matchAllQuery().size(0)
search(index).matchAllQuery().size(0).restTotalHitsAsInt(true)
}
.await
.result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ trait IndexMatchers extends Matchers {
new Matcher[String] {

def apply(left: String): MatchResult = {
val count = client.execute(search(left).size(0)).await(timeout).result.totalHits
val count = client.execute(search(left).size(0).restTotalHitsAsInt(true)).await(timeout).result.totalHits
MatchResult(
count == expectedCount,
s"Index $left had count $count but expected $expectedCount",
Expand Down Expand Up @@ -55,7 +55,7 @@ trait IndexMatchers extends Matchers {
new Matcher[String] {

override def apply(left: String): MatchResult = {
val count = client.execute(search(left).size(0)).await(timeout).result.totalHits
val count = client.execute(search(left).size(0).restTotalHitsAsInt(true)).await(timeout).result.totalHits
MatchResult(
count == 0,
s"Index $left was not empty",
Expand Down

0 comments on commit 1109239

Please sign in to comment.