Skip to content

Commit

Permalink
Merge pull request #479 from utk14/release-4.2.1
Browse files Browse the repository at this point in the history
SB-26855: ETB Metrics fix for druid stream query parameter issue
  • Loading branch information
RevathiKotla authored Sep 22, 2021
2 parents f1ca95b + d305517 commit 410dcb4
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object TextBookUtils {
def getTextBooks(config: Map[String, AnyRef])(implicit sc:SparkContext,fc: FrameworkContext): List[TextbookData] = {
val request = JSONUtils.serialize(config.get("druidConfig").get)
val druidQuery = JSONUtils.deserialize[DruidQueryModel](request)
val druidResponse = DruidDataFetcher.getDruidData(druidQuery)
val druidResponse = DruidDataFetcher.getDruidData(druidQuery, true)

val result = druidResponse.map(f => {
JSONUtils.deserialize[TextbookData](f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package org.sunbird.analytics.model.report

import java.time.{ZoneOffset, ZonedDateTime}

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import cats.syntax.either._
import ing.wbaa.druid._
import ing.wbaa.druid.client.DruidClient
Expand All @@ -18,7 +20,6 @@ import org.ekstep.analytics.framework.util.{HTTPClient, JSONUtils, RestUtil}
import org.sunbird.cloud.storage.BaseStorageService

import scala.concurrent.Future
import scala.io.Source

class TestETBMetricsJobModel extends SparkSpec with Matchers with MockFactory {

Expand Down Expand Up @@ -114,11 +115,11 @@ class TestETBMetricsJobModel extends SparkSpec with Matchers with MockFactory {

val doc: Json = parse(json).getOrElse(Json.Null)
val results = List(DruidResult.apply(Some(ZonedDateTime.of(2020, 1, 23, 17, 10, 3, 0, ZoneOffset.UTC)), doc))
val druidResponse = DruidResponseTimeseriesImpl.apply(results, QueryType.GroupBy)

val druidResponse = DruidResult.apply(Some(ZonedDateTime.of(2019, 11, 28, 17, 0, 0, 0, ZoneOffset.UTC)), doc)
implicit val mockDruidConfig = DruidConfig.DefaultConfig
val mockDruidClient = mock[DruidClient]
(mockDruidClient.doQuery[DruidResponse](_: DruidQuery)(_: DruidConfig)).expects(*, mockDruidConfig).returns(Future(druidResponse)).anyNumberOfTimes()
(mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes()
(mockDruidClient.doQueryAsStream(_: DruidQuery)(_: DruidConfig)).expects(*, mockDruidConfig).returns(Source(List(druidResponse))).anyNumberOfTimes()
(mockFc.getDruidRollUpClient _).expects().returns(mockDruidClient).anyNumberOfTimes()

val resultRDD = ETBMetricsModel.execute(sc.emptyRDD, Option(jobConfig))
Expand Down Expand Up @@ -152,14 +153,13 @@ class TestETBMetricsJobModel extends SparkSpec with Matchers with MockFactory {

val doc: Json = parse(json).getOrElse(Json.Null)
val results = List(DruidResult.apply(Some(ZonedDateTime.of(2020, 1, 23, 17, 10, 3, 0, ZoneOffset.UTC)), doc))
val druidResponse = DruidResponseTimeseriesImpl.apply(results, QueryType.GroupBy)

val druidResponse = DruidResult.apply(Some(ZonedDateTime.of(2019, 11, 28, 17, 0, 0, 0, ZoneOffset.UTC)), doc)
implicit val mockDruidConfig = DruidConfig.DefaultConfig
val mockDruidClient = mock[DruidClient]
(mockDruidClient.doQuery[DruidResponse](_: DruidQuery)(_: DruidConfig)).expects(*, mockDruidConfig).returns(Future(druidResponse)).anyNumberOfTimes()
(mockDruidClient.actorSystem _).expects().returning(ActorSystem("TestQuery")).anyNumberOfTimes()
(mockDruidClient.doQueryAsStream(_: DruidQuery)(_: DruidConfig)).expects(*, mockDruidConfig).returns(Source(List(druidResponse))).anyNumberOfTimes()
(mockFc.getDruidRollUpClient _).expects().returns(mockDruidClient).anyNumberOfTimes()


ETBMetricsModel.execute(sc.emptyRDD, Option(jobConfig))


Expand All @@ -169,10 +169,10 @@ class TestETBMetricsJobModel extends SparkSpec with Matchers with MockFactory {
implicit val mockFc = mock[FrameworkContext]
val mockRestUtil = mock[HTTPClient]

val hierarchyData = JSONUtils.deserialize[ContentDetails](Source.fromInputStream
val hierarchyData = JSONUtils.deserialize[ContentDetails](scala.io.Source.fromInputStream
(getClass.getResourceAsStream("/reports/hierarchyData.json")).getLines().mkString).result.content

val textBookData = JSONUtils.deserialize[TextbookData](Source.fromInputStream
val textBookData = JSONUtils.deserialize[TextbookData](scala.io.Source.fromInputStream
(getClass.getResourceAsStream("/reports/textbookDetails.json")).getLines().mkString)

implicit val sqlContext = new SQLContext(sc)
Expand Down

0 comments on commit 410dcb4

Please sign in to comment.