diff --git a/build.sbt b/build.sbt index 6ee35fa..f4de9bc 100644 --- a/build.sbt +++ b/build.sbt @@ -2,13 +2,13 @@ import sbt.Keys.thisProjectRef ThisBuild / organization := "io.waylay.kairosdb" -val playWsVersion = "3.0.3" -val playJsonVersion = "3.0.3" -val specs2Version = "4.20.6" +val playWsVersion = "3.0.3" +val playJsonVersion = "3.0.4" +val specs2Version = "4.20.8" -val dockerTestkitVersion = "0.12.0" -val scalaTestVersion = "3.2.18" -val playVersion = "3.0.3" // test only +val testContainersVersion = "0.41.4" +val scalaTestVersion = "3.2.19" +val playVersion = "3.0.4" // test only val scala2_13 = "2.13.14" @@ -28,30 +28,29 @@ val exclusions = Seq( lazy val root = (project in file(".")) .settings( - name := "kairosdb-scala", - Test / fork := true, + name := "kairosdb-scala", + Test / fork := true, IntegrationTest / parallelExecution := false, libraryDependencies ++= Seq( - "org.scala-lang.modules" %% "scala-collection-compat" % "2.12.0", - "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.17.1", - "org.playframework" %% "play-json" % playJsonVersion, - "org.playframework" %% "play-ws-standalone" % playWsVersion, - "org.playframework" %% "play-ws-standalone-json" % playWsVersion, - "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5", - "io.lemonlabs" %% "scala-uri" % "4.0.3", + "org.scala-lang.modules" %% "scala-collection-compat" % "2.12.0", + "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.17.1", + "org.playframework" %% "play-json" % playJsonVersion, + "org.playframework" %% "play-ws-standalone" % playWsVersion, + "org.playframework" %% "play-ws-standalone-json" % playWsVersion, + "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5", + "io.lemonlabs" %% "scala-uri" % "4.0.3", // TEST - "org.specs2" %% "specs2-core" % specs2Version % Test, - "org.specs2" %% "specs2-junit" % specs2Version % Test, - "de.leanovate.play-mockws" %% "play-mockws-3-0" % "3.0.4" % Test, - "org.playframework" %% "play-ahc-ws" % playVersion % TestAndIntegrationTest, // neede for play-mockws + "org.specs2" %% "specs2-core" % specs2Version % Test, + "org.specs2" %% "specs2-junit" % specs2Version % Test, + "de.leanovate.play-mockws" %% "play-mockws-3-0" % "3.0.5" % Test, + "org.playframework" %% "play-ahc-ws" % playVersion % TestAndIntegrationTest, // neede for play-mockws "org.playframework" %% "play-test" % playVersion % TestAndIntegrationTest, // play-mockws depends on some types in this dependency "org.playframework" %% "play-ahc-ws-standalone" % playWsVersion % TestAndIntegrationTest, // INTEGRATION TESTS // TODO investigate if we can do this with specs2 - "org.scalatest" %% "scalatest-wordspec" % scalaTestVersion % TestAndIntegrationTest, - "org.scalatest" %% "scalatest-mustmatchers" % scalaTestVersion % TestAndIntegrationTest, - - "com.whisk" %% "docker-testkit-scalatest" % dockerTestkitVersion % TestAndIntegrationTest excludeAll (exclusions: _*) + "org.scalatest" %% "scalatest-wordspec" % scalaTestVersion % TestAndIntegrationTest, + "org.scalatest" %% "scalatest-mustmatchers" % scalaTestVersion % TestAndIntegrationTest, + "com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersVersion % TestAndIntegrationTest ), scalacOptions ++= Seq( "-feature", diff --git a/examples/src/main/scala/Example.scala b/examples/src/main/scala/Example.scala index d9abc02..b445dcd 100644 --- a/examples/src/main/scala/Example.scala +++ b/examples/src/main/scala/Example.scala @@ -1,5 +1,5 @@ -import akka.actor.ActorSystem -import akka.stream.ActorMaterializer +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.ActorMaterializer import io.waylay.kairosdb.driver.KairosDB import io.waylay.kairosdb.driver.Implicits._ import io.waylay.kairosdb.driver.models.KairosCompatibleType.KNumber @@ -15,31 +15,37 @@ import scala.collection.immutable.Seq object Example extends App { - implicit val actorSystem = ActorSystem() + implicit val actorSystem = ActorSystem() implicit val actorMaterializer = ActorMaterializer() - val wsClient = StandaloneAhcWSClient() - val kairosDB = new KairosDB(wsClient, KairosDBConfig(), global) + val wsClient = StandaloneAhcWSClient() + val kairosDB = new KairosDB(wsClient, KairosDBConfig(), global) val res = for { version <- kairosDB.version - names <- kairosDB.listMetricNames - _ <- kairosDB.addDataPoint(DataPoint("kairosdbscala.test", 9001, tags = Tag("awesome", "yes"))) + names <- kairosDB.listMetricNames + _ <- kairosDB.addDataPoint(DataPoint("kairosdbscala.test", 9001, tags = Tag("awesome", "yes"))) // same as above, but without implicit conversions - _ <- kairosDB.addDataPoint(DataPoint(MetricName("kairosdbscala.test"), KNumber(9001), tags = Seq(Tag("awesome", "yes")))) + _ <- kairosDB.addDataPoint( + DataPoint(MetricName("kairosdbscala.test"), KNumber(9001), tags = Seq(Tag("awesome", "yes"))) + ) qr <- kairosDB.queryMetrics( - QueryMetrics( - Query("kairosscala.test", tags = QueryTag("awesome" -> "yes")), 5.minutes.ago.startTime - ) - ) + QueryMetrics( + Query("kairosscala.test", tags = QueryTag("awesome" -> "yes")), + 5.minutes.ago.startTime + ) + ) // same as above, but without implicits _ <- kairosDB.queryMetrics( - QueryMetrics(Seq( - Query(MetricName("kairosscala.test"), tags = Seq(QueryTag("awesome", Seq("yes", "true")))) - ), TimeSpan(RelativeStartTime(5.minutes))) - ) + QueryMetrics( + Seq( + Query(MetricName("kairosscala.test"), tags = Seq(QueryTag("awesome", Seq("yes", "true")))) + ), + TimeSpan(RelativeStartTime(5.minutes)) + ) + ) } yield { println(s"The KairosDB version is $version.") @@ -53,7 +59,7 @@ object Example extends App { Try(actorSystem.terminate()) } - res.onComplete{ + res.onComplete { case Success(_) => println("done") case Failure(e) => e.printStackTrace() } diff --git a/project/plugins.sbt b/project/plugins.sbt index 629af11..29b2d3c 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,13 +1,13 @@ -addSbtPlugin("com.github.sbt" % "sbt-site" % "1.7.0") -addSbtPlugin("com.github.sbt" % "sbt-release" % "1.4.0") -addSbtPlugin("com.github.sbt" % "sbt-ghpages" % "0.8.0") -addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.12") -addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.11") -addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.2.1") +addSbtPlugin("com.github.sbt" % "sbt-site" % "1.7.0") +addSbtPlugin("com.github.sbt" % "sbt-release" % "1.4.0") +addSbtPlugin("com.github.sbt" % "sbt-ghpages" % "0.8.0") +addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.1.0") +addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.11") +addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.2.1") // scala-xml issues // TODO remove when everything has migrated to scala-xml 2.x // https://github.com/scala/bug/issues/12632 ThisBuild / libraryDependencySchemes ++= Seq( "org.scala-lang.modules" %% "scala-xml" % VersionScheme.Always -) \ No newline at end of file +) diff --git a/src/it/resources/docker-compose.yaml b/src/it/resources/docker-compose.yaml new file mode 100644 index 0000000..0b990e0 --- /dev/null +++ b/src/it/resources/docker-compose.yaml @@ -0,0 +1,12 @@ +version: '3.8' + +services: + + kairosdb: + image: brunoballekens/kairosdb-scala-driver-it:1.3.0-1 + volumes: + - ./conf/auth:/opt/kairosdb/conf/auth + expose: + - 8080 + environment: + - JAVA_OPTS=-Djava.security.auth.login.config=/opt/kairosdb/conf/auth/basicAuth.conf -Dkairosdb.jetty.auth_module_name=basicAuth -Dkairosdb.jetty.basic_auth.user=test -Dkairosdb.jetty.basic_auth.password=test diff --git a/src/it/scala/integration/AddAndQueryDataPointsIntegrationSpec.scala b/src/it/scala/integration/AddAndQueryDataPointsIntegrationSpec.scala index 0d1d9af..e7106d4 100644 --- a/src/it/scala/integration/AddAndQueryDataPointsIntegrationSpec.scala +++ b/src/it/scala/integration/AddAndQueryDataPointsIntegrationSpec.scala @@ -20,26 +20,43 @@ class AddAndQueryDataPointsIntegrationSpec extends IntegrationSpec { "work for a single data point" in { val instant = Instant.ofEpochSecond(1470837457L) - val start = Instant.ofEpochSecond(1470830000L) - val qm = QueryMetrics(Seq(Query("my.new.metric", QueryTag("aoeu" -> "snth"))), start) - val kairosDB = new KairosDB(wsClient, KairosDBConfig(port = kairosPort), global) + val start = Instant.ofEpochSecond(1470830000L) + val qm = QueryMetrics(Seq(Query("my.new.metric", QueryTag("aoeu" -> "snth"))), start) + val kairosDB = new KairosDB( + wsClient, + KairosDBConfig(port = kairosPort, username = Some("test"), password = Some("test")), + global + ) val res = for { - _ <- kairosDB.addDataPoint(DataPoint(MetricName("my.new.metric"), KNumber(555), instant, Seq(Tag("aoeu", "snth")))) + _ <- + kairosDB.addDataPoint(DataPoint(MetricName("my.new.metric"), KNumber(555), instant, Seq(Tag("aoeu", "snth")))) result <- kairosDB.queryMetrics(qm) - } yield { - result - } + } yield result - res.futureValue must be(QueryResponse.Response(Seq(ResponseQuery(1, Seq( - Result("my.new.metric", Seq(GroupBy.GroupByType("number")), Seq(TagResult("aoeu", Seq("snth"))), Seq((instant, KNumber(555)))) - ))))) + res.futureValue must be( + QueryResponse.Response( + Seq( + ResponseQuery( + 1, + Seq( + Result( + "my.new.metric", + Seq(GroupBy.GroupByType("number")), + Seq(TagResult("aoeu", Seq("snth"))), + Seq((instant, KNumber(555))) + ) + ) + ) + ) + ) + ) } "work for multiple data points with gzip compression" in { val instant = Instant.ofEpochSecond(1470837457L) - val start = Instant.ofEpochSecond(1470830000L) + val start = Instant.ofEpochSecond(1470830000L) val metric2 = MetricName("my.new.metric2") - val qm = QueryMetrics(Seq(Query(metric2.name)), start) + val qm = QueryMetrics(Seq(Query(metric2.name)), start) val dps = Seq( DataPoint(metric2, KNumber(111), instant.plusMillis(1), Seq(Tag("aoeu", "123"))), @@ -47,29 +64,34 @@ class AddAndQueryDataPointsIntegrationSpec extends IntegrationSpec { DataPoint(metric2, KNumber(333), instant.plusMillis(3), Seq(Tag("aoeu", "456"))) ) - val kairosDB = new KairosDB(wsClient, KairosDBConfig(port = kairosPort), global) + val kairosDB = new KairosDB( + wsClient, + KairosDBConfig(port = kairosPort, username = Some("test"), password = Some("test")), + global + ) val res = for { - _ <- kairosDB.addDataPoints(dps, gzip = true) + _ <- kairosDB.addDataPoints(dps, gzip = true) result <- kairosDB.queryMetrics(qm) - } yield { - result - } + } yield result res.futureValue must be( QueryResponse.Response( Seq( - ResponseQuery(3, Seq( - Result( - "my.new.metric2", - Seq(GroupBy.GroupByType("number")), - List(TagResult("aoeu", List("123", "456")), TagResult("snth", List("321"))), - Seq( - instant.plusMillis(1) -> KNumber(111), - instant.plusMillis(2) -> KNumber(222), - instant.plusMillis(3) -> KNumber(333) + ResponseQuery( + 3, + Seq( + Result( + "my.new.metric2", + Seq(GroupBy.GroupByType("number")), + List(TagResult("aoeu", List("123", "456")), TagResult("snth", List("321"))), + Seq( + instant.plusMillis(1) -> KNumber(111), + instant.plusMillis(2) -> KNumber(222), + instant.plusMillis(3) -> KNumber(333) + ) ) ) - )) + ) ) ) ) @@ -77,7 +99,7 @@ class AddAndQueryDataPointsIntegrationSpec extends IntegrationSpec { "work for an aggregate query with nulls" in { val start = Instant.ofEpochSecond(0) - val end = start.plus(Duration.ofDays(4)) + val end = start.plus(Duration.ofDays(4)) val firstPoint = start.plus(Duration.ofDays(1)) // nulls in between @@ -85,54 +107,105 @@ class AddAndQueryDataPointsIntegrationSpec extends IntegrationSpec { val datapoint = DataPoint(MetricName("my.new.metric"), KNumber(555), firstPoint, Seq(Tag("aoeu", "snth"))) - val qm = QueryMetrics(Seq( - Query("my.new.metric", QueryTag("aoeu" -> "snth"), aggregators = Seq( - Aggregator.Average(1.days, align = Some(Align.AlignStartTime)), - Aggregator.Gaps(1.days, align = Some(Align.AlignStartTime)) - )) - ), TimeSpan(start, Some(end))) + val qm = QueryMetrics( + Seq( + Query( + "my.new.metric", + QueryTag("aoeu" -> "snth"), + aggregators = Seq( + Aggregator.Average(1.days, align = Some(Align.AlignStartTime)), + Aggregator.Gaps(1.days, align = Some(Align.AlignStartTime)) + ) + ) + ), + TimeSpan(start, Some(end)) + ) - val kairosDB = new KairosDB(wsClient, KairosDBConfig(port = kairosPort), global) + val kairosDB = new KairosDB( + wsClient, + KairosDBConfig(port = kairosPort, username = Some("test"), password = Some("test")), + global + ) val res = for { - _ <- kairosDB.addDataPoint(datapoint) - _ <- kairosDB.addDataPoint(datapoint.copy(timestamp = secondPoint)) + _ <- kairosDB.addDataPoint(datapoint) + _ <- kairosDB.addDataPoint(datapoint.copy(timestamp = secondPoint)) version <- kairosDB.version results <- kairosDB.queryMetrics(qm) - } yield { - (version, results) - } + } yield (version, results) res.futureValue match { // bug in Kairos < 1.1.4 - //see https://github.com/kairosdb/kairosdb/issues/339 + // see https://github.com/kairosdb/kairosdb/issues/339 case ("KairosDB 1.1.3-1.20170102211109", results) => - results must be(QueryResponse.Response(Seq(ResponseQuery(2, Seq( - Result("my.new.metric", Seq(GroupBy.GroupByType("number")), Seq(TagResult("aoeu", Seq("snth"))), Seq( - (Instant.parse("1970-01-02T00:00:00Z"), KNumber(555)), - (Instant.parse("1970-01-03T00:00:00Z"), KNull), - (Instant.parse("1970-01-04T00:00:00Z"), KNumber(555) - )) - )))))) + results must be( + QueryResponse.Response( + Seq( + ResponseQuery( + 2, + Seq( + Result( + "my.new.metric", + Seq(GroupBy.GroupByType("number")), + Seq(TagResult("aoeu", Seq("snth"))), + Seq( + (Instant.parse("1970-01-02T00:00:00Z"), KNumber(555)), + (Instant.parse("1970-01-03T00:00:00Z"), KNull), + (Instant.parse("1970-01-04T00:00:00Z"), KNumber(555)) + ) + ) + ) + ) + ) + ) + ) case ("KairosDB 1.2.0-1.20180201074909", results) => - results must be(QueryResponse.Response(Seq(ResponseQuery(2, Seq( - Result("my.new.metric", Seq(GroupBy.GroupByType("number")), Seq(TagResult("aoeu", Seq("snth"))), Seq( - (Instant.parse("1970-01-01T00:00:00Z"), KNull), - (Instant.parse("1970-01-02T00:00:00Z"), KNumber(555)), - (Instant.parse("1970-01-03T00:00:00Z"), KNull), - (Instant.parse("1970-01-04T00:00:00Z"), KNumber(555) - )) - )))))) + results must be( + QueryResponse.Response( + Seq( + ResponseQuery( + 2, + Seq( + Result( + "my.new.metric", + Seq(GroupBy.GroupByType("number")), + Seq(TagResult("aoeu", Seq("snth"))), + Seq( + (Instant.parse("1970-01-01T00:00:00Z"), KNull), + (Instant.parse("1970-01-02T00:00:00Z"), KNumber(555)), + (Instant.parse("1970-01-03T00:00:00Z"), KNull), + (Instant.parse("1970-01-04T00:00:00Z"), KNumber(555)) + ) + ) + ) + ) + ) + ) + ) case ("KairosDB 1.3.0-1.20210808220820", results) => - results must be(QueryResponse.Response(Seq(ResponseQuery(2, Seq( - Result("my.new.metric", Seq(GroupBy.GroupByType("number")), Seq(TagResult("aoeu", Seq("snth"))), Seq( - (Instant.parse("1970-01-01T00:00:00Z"), KNull), - (Instant.parse("1970-01-02T00:00:00Z"), KNumber(555)), - (Instant.parse("1970-01-03T00:00:00Z"), KNull), - (Instant.parse("1970-01-04T00:00:00Z"), KNumber(555)), - (Instant.parse("1970-01-05T00:00:00Z"), KNull) + results must be( + QueryResponse.Response( + Seq( + ResponseQuery( + 2, + Seq( + Result( + "my.new.metric", + Seq(GroupBy.GroupByType("number")), + Seq(TagResult("aoeu", Seq("snth"))), + Seq( + (Instant.parse("1970-01-01T00:00:00Z"), KNull), + (Instant.parse("1970-01-02T00:00:00Z"), KNumber(555)), + (Instant.parse("1970-01-03T00:00:00Z"), KNull), + (Instant.parse("1970-01-04T00:00:00Z"), KNumber(555)), + (Instant.parse("1970-01-05T00:00:00Z"), KNull) + ) + ) + ) + ) ) - )))))) + ) + ) case other => fail(s"Unknown kairos version ${other._1}") } diff --git a/src/it/scala/integration/AuthSpec.scala b/src/it/scala/integration/AuthSpec.scala index 8c480bc..448331b 100644 --- a/src/it/scala/integration/AuthSpec.scala +++ b/src/it/scala/integration/AuthSpec.scala @@ -1,34 +1,20 @@ package integration -import java.nio.file.Paths - -import com.spotify.docker.client.messages.HostConfig import io.waylay.kairosdb.driver.KairosDB import io.waylay.kairosdb.driver.KairosDB.KairosDBResponseException import io.waylay.kairosdb.driver.models._ import scala.concurrent.ExecutionContext.Implicits.global -import scala.collection.immutable.Seq class AuthSpec extends IntegrationSpec { - // enabling auth by providing a properties file - override lazy val volumes = Seq( - HostConfig.Bind - .from(Paths.get("src/it/resources/conf/auth").toAbsolutePath.toString) - .to("/opt/kairosdb/conf/auth") - .build() - ) - - override lazy val env: Seq[String] = Seq( - "JAVA_OPTS=-Djava.security.auth.login.config=/opt/kairosdb/conf/auth/basicAuth.conf -Dkairosdb.jetty.auth_module_name=basicAuth "+ - "-Dkairosdb.jetty.basic_auth.user=test " + - "-Dkairosdb.jetty.basic_auth.password=test" - ) - "The health status" should { "fail without auth" in { - val kairosDB = new KairosDB(wsClient, KairosDBConfig(port = kairosPort), global) + val kairosDB = new KairosDB( + wsClient, + KairosDBConfig(port = kairosPort), + global + ) val res = kairosDB.version.failed.futureValue res mustBe an[KairosDBResponseException] @@ -42,7 +28,7 @@ class AuthSpec extends IntegrationSpec { password = Some("test") ) val kairosDB = new KairosDB(wsClient, kairosConfig, global) - val res = kairosDB.version.futureValue + val res = kairosDB.version.futureValue res must startWith("KairosDB") } diff --git a/src/it/scala/integration/DeleteDataPointIntegrationSpec.scala b/src/it/scala/integration/DeleteDataPointIntegrationSpec.scala index e1410d6..44d87e7 100644 --- a/src/it/scala/integration/DeleteDataPointIntegrationSpec.scala +++ b/src/it/scala/integration/DeleteDataPointIntegrationSpec.scala @@ -18,20 +18,37 @@ class DeleteDataPointIntegrationSpec extends IntegrationSpec { "return empty seq" in { val instant = Instant.ofEpochSecond(1470837457L) - val start = Instant.ofEpochSecond(1470830000L) - val qm = QueryMetrics(Seq(Query("my.new.metric", QueryTag("aoeu" -> "snth"))), start) - - val kairosDB = new KairosDB(wsClient, KairosDBConfig(port = kairosPort), global) - - val res = kairosDB.addDataPoint(DataPoint(MetricName("my.new.metric"), KNumber(555), instant, Seq(Tag("aoeu", "snth")))).flatMap { _ => - kairosDB.deleteDataPoints(qm) - }.flatMap { _ => - kairosDB.queryMetrics(qm) - }.futureValue - - res must be(QueryResponse.Response(Seq(ResponseQuery(0, Seq( - Result("my.new.metric", Seq.empty, Seq.empty, Seq.empty) - ))))) + val start = Instant.ofEpochSecond(1470830000L) + val qm = QueryMetrics(Seq(Query("my.new.metric", QueryTag("aoeu" -> "snth"))), start) + + val kairosDB = new KairosDB( + wsClient, + KairosDBConfig(port = kairosPort, username = Some("test"), password = Some("test")), + global + ) + + val res = kairosDB + .addDataPoint(DataPoint(MetricName("my.new.metric"), KNumber(555), instant, Seq(Tag("aoeu", "snth")))) + .flatMap { _ => + kairosDB.deleteDataPoints(qm) + } + .flatMap { _ => + kairosDB.queryMetrics(qm) + } + .futureValue + + res must be( + QueryResponse.Response( + Seq( + ResponseQuery( + 0, + Seq( + Result("my.new.metric", Seq.empty, Seq.empty, Seq.empty) + ) + ) + ) + ) + ) } } } diff --git a/src/it/scala/integration/DeleteDataPointsByTagIntegrationSpec.scala b/src/it/scala/integration/DeleteDataPointsByTagIntegrationSpec.scala index 63640db..4470b78 100644 --- a/src/it/scala/integration/DeleteDataPointsByTagIntegrationSpec.scala +++ b/src/it/scala/integration/DeleteDataPointsByTagIntegrationSpec.scala @@ -12,16 +12,15 @@ import io.waylay.kairosdb.driver.models._ import scala.concurrent.ExecutionContext.Implicits.global import scala.collection.immutable.Seq - class DeleteDataPointsByTagIntegrationSpec extends IntegrationSpec { "Inserting, deleting and then querying data points" should { "only return data points that weren't matched by delete" in { val instant = Instant.ofEpochSecond(1470837457L) - val start = Instant.ofEpochSecond(1470830000L) - val delete = QueryMetrics(Seq(Query("my.new.metric", QueryTag("aoeu", Seq("123", "456")))), start) - val qm = QueryMetrics(Seq(Query("my.new.metric")), start) + val start = Instant.ofEpochSecond(1470830000L) + val delete = QueryMetrics(Seq(Query("my.new.metric", QueryTag("aoeu", Seq("123", "456")))), start) + val qm = QueryMetrics(Seq(Query("my.new.metric")), start) val dps = Seq( DataPoint(MetricName("my.new.metric"), KNumber(111), instant.plusMillis(1), Seq(Tag("aoeu", "123"))), @@ -32,26 +31,46 @@ class DeleteDataPointsByTagIntegrationSpec extends IntegrationSpec { DataPoint(MetricName("my.other.metric"), KNumber(555), instant.plusMillis(6), Seq(Tag("aoeu", "snth"))) ) - val kairosDB = new KairosDB(wsClient, KairosDBConfig(port = kairosPort), global) + val kairosDB = new KairosDB( + wsClient, + KairosDBConfig(port = kairosPort, username = Some("test"), password = Some("test")), + global + ) - val res = kairosDB.addDataPoints(dps).flatMap { _ => - kairosDB.deleteDataPoints(delete) - }.flatMap { _ => - kairosDB.queryMetrics(qm) - }.futureValue + val res = kairosDB + .addDataPoints(dps) + .flatMap { _ => + kairosDB.deleteDataPoints(delete) + } + .flatMap { _ => + kairosDB.queryMetrics(qm) + } + .futureValue - res must be(QueryResponse.Response(Seq(ResponseQuery(3, Seq( - Result( - "my.new.metric", - Seq(GroupBy.GroupByType("number")), - Seq(TagResult("htns", Seq("888", "999")), TagResult("snth", Seq("321"))), // not sure if the order is deteministic. Convert to Set? + res must be( + QueryResponse.Response( Seq( - instant.plusMillis(2) -> KNumber(222), - instant.plusMillis(4) -> KNumber(444), - instant.plusMillis(5) -> KNumber(123) + ResponseQuery( + 3, + Seq( + Result( + "my.new.metric", + Seq(GroupBy.GroupByType("number")), + Seq( + TagResult("htns", Seq("888", "999")), + TagResult("snth", Seq("321")) + ), // not sure if the order is deteministic. Convert to Set? + Seq( + instant.plusMillis(2) -> KNumber(222), + instant.plusMillis(4) -> KNumber(444), + instant.plusMillis(5) -> KNumber(123) + ) + ) + ) + ) ) ) - ))))) + ) } } } diff --git a/src/it/scala/integration/DeleteMetricIntegrationSpec.scala b/src/it/scala/integration/DeleteMetricIntegrationSpec.scala index 3f0fe79..cb568b0 100644 --- a/src/it/scala/integration/DeleteMetricIntegrationSpec.scala +++ b/src/it/scala/integration/DeleteMetricIntegrationSpec.scala @@ -13,12 +13,20 @@ class DeleteMetricIntegrationSpec extends IntegrationSpec { "Deleting a metric name" should { "after deleting a metric, all metrics should not contain the metric" in { - val kairosDB = new KairosDB(wsClient, KairosDBConfig(port = kairosPort), global) - val res = kairosDB.addDataPoint(DataPoint(MetricName("my.new.metric"), KNumber(555), tags = Seq(Tag("aoeu", "snth")))).flatMap { _ => - kairosDB.deleteMetric(MetricName("my.new.metric")) - }.flatMap { _ => - kairosDB.listMetricNames - }.futureValue + val kairosDB = new KairosDB( + wsClient, + KairosDBConfig(port = kairosPort, username = Some("test"), password = Some("test")), + global + ) + val res = kairosDB + .addDataPoint(DataPoint(MetricName("my.new.metric"), KNumber(555), tags = Seq(Tag("aoeu", "snth")))) + .flatMap { _ => + kairosDB.deleteMetric(MetricName("my.new.metric")) + } + .flatMap { _ => + kairosDB.listMetricNames + } + .futureValue res must not contain MetricName("my.new.metric") } diff --git a/src/it/scala/integration/HealthStatusIntegrationSpec.scala b/src/it/scala/integration/HealthStatusIntegrationSpec.scala index 5260345..f9ccc38 100644 --- a/src/it/scala/integration/HealthStatusIntegrationSpec.scala +++ b/src/it/scala/integration/HealthStatusIntegrationSpec.scala @@ -13,13 +13,21 @@ class HealthStatusIntegrationSpec extends IntegrationSpec { "The health endpoint" should { "for /check return all healthy" in { - val kairosDB = new KairosDB(wsClient, KairosDBConfig(port = kairosPort), global) + val kairosDB = new KairosDB( + wsClient, + KairosDBConfig(port = kairosPort, username = Some("test"), password = Some("test")), + global + ) val res = kairosDB.healthCheck.futureValue res must be(AllHealthy) } "for /status respond that there are no thread deadlocks and datastore query works" in { - val kairosDB = new KairosDB(wsClient, KairosDBConfig(port = kairosPort), global) + val kairosDB = new KairosDB( + wsClient, + KairosDBConfig(port = kairosPort, username = Some("test"), password = Some("test")), + global + ) val res = kairosDB.healthStatus.futureValue res must be(HealthStatusResults(Seq("JVM-Thread-Deadlock: OK", "Datastore-Query: OK"))) diff --git a/src/it/scala/integration/IntegrationSpec.scala b/src/it/scala/integration/IntegrationSpec.scala index efad194..afc2221 100644 --- a/src/it/scala/integration/IntegrationSpec.scala +++ b/src/it/scala/integration/IntegrationSpec.scala @@ -1,9 +1,7 @@ package integration -import com.spotify.docker.client.messages.HostConfig +import com.dimafeng.testcontainers.{DockerComposeContainer, ExposedService, ForAllTestContainer} import com.typesafe.scalalogging.StrictLogging -import com.whisk.docker.testkit.{ContainerGroup, ContainerSpec, DockerReadyChecker} -import com.whisk.docker.testkit.scalatest.DockerTestKitForAll import org.apache.pekko.stream.Materializer import org.apache.pekko.stream.testkit.NoMaterializer import org.scalatest.BeforeAndAfterAll @@ -11,63 +9,39 @@ import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.must.Matchers import org.scalatest.time.{Second, Seconds, Span} import org.scalatest.wordspec.AnyWordSpec +import org.testcontainers.containers.wait.strategy.Wait import play.api.libs.ws.ahc.StandaloneAhcWSClient -trait IntegrationSpec extends AnyWordSpec with Matchers with ScalaFutures with StrictLogging with BeforeAndAfterAll with DockerTestKitForAll { +import java.io.File - lazy val DefaultKairosDbPort = 8080 +trait IntegrationSpec + extends AnyWordSpec + with Matchers + with ScalaFutures + with StrictLogging + with BeforeAndAfterAll + with ForAllTestContainer { - lazy val env = Seq.empty[String] - lazy val volumes = Seq.empty[HostConfig.Bind] + lazy val DefaultKairosDbPort = 8080 - val kairosdbContainer = ContainerSpec("brunoballekens/kairosdb-scala-driver-it:1.3.0-1") - .withEnv(env:_*) - // broken with the spotify client - .withVolumeBindings(volumes:_*) - .withExposedPorts(DefaultKairosDbPort) - // .withReadyChecker( - // DockerReadyChecker.HttpResponseCode(DefaultKairosDbPort, "/api/v1/version", code = 200) - // .within(100.millis) - // .looped(20, 250.millis)) - .withReadyChecker(DockerReadyChecker.LogLineContains("KairosDB service started")) - //.withReadyChecker(LoggingLogLineContains("KairosDB service started")) + override val container = DockerComposeContainer( + new File("src/it/resources/docker-compose.yaml"), + tailChildContainers = true, + exposedServices = + Seq(ExposedService("kairosdb", 8080, Wait.forHttp("/api/v1/version").withBasicCredentials("test", "test"))) + ) - lazy val kairosPort: Int = { - managedContainers.containers.head - .mappedPortOpt(DefaultKairosDbPort) - .getOrElse(throw new IllegalStateException(s"Missing container mapped port for $DefaultKairosDbPort")) - } + lazy val kairosPort: Int = + container.getServicePort("kairosdb", 8080) implicit val pc: PatienceConfig = PatienceConfig(Span(2000, Seconds), Span(1, Second)) - //override def dockerInitPatienceInterval = PatienceConfig(scaled(Span(30, Seconds)), scaled(Span(10, Millis))) - - override val managedContainers: ContainerGroup = ContainerGroup(Seq(kairosdbContainer.toContainer)) + // override def dockerInitPatienceInterval = PatienceConfig(scaled(Span(30, Seconds)), scaled(Span(10, Millis))) implicit val materializer: Materializer = NoMaterializer - val wsClient: StandaloneAhcWSClient = StandaloneAhcWSClient() + val wsClient: StandaloneAhcWSClient = StandaloneAhcWSClient() override def afterAll(): Unit = { wsClient.close() super.afterAll() } } - -//private case class LoggingLogLineContains(str: String) extends DockerReadyChecker with StrictLogging { -// -// override def apply(container: BaseContainer)(implicit docker: ContainerCommandExecutor, ec: ExecutionContext): Future[Unit] = { -// container.state() match { -// case ContainerState.Ready(_) => -// Future.successful(()) -// case state: ContainerState.HasId => -// docker.withLogStreamLinesRequirement(state.id, withErr = true) { m => -// // drop newlines -// logger.info(m.dropRight(1)) -// m.contains(str) -// }.map(_ => ()) -// case _ => -// Future.failed( -// new FailFastCheckException("can't initialise LogStream to container without Id") -// ) -// } -// } -//} diff --git a/src/it/scala/integration/ListMetricsIntegrationSpec.scala b/src/it/scala/integration/ListMetricsIntegrationSpec.scala index d8be222..d104087 100644 --- a/src/it/scala/integration/ListMetricsIntegrationSpec.scala +++ b/src/it/scala/integration/ListMetricsIntegrationSpec.scala @@ -12,10 +12,17 @@ class ListMetricsIntegrationSpec extends IntegrationSpec { "Listing metric names" should { "after inserting a new datapoint into a metric, all metrics should contain this metric" in { - val kairosDB = new KairosDB(wsClient, KairosDBConfig(port = kairosPort), global) - val res = kairosDB.addDataPoint(DataPoint(MetricName("my.new.metric"), KNumber(555), tags = Seq(Tag("aoeu", "snth")))).flatMap { _ => - kairosDB.listMetricNames - }.futureValue + val kairosDB = new KairosDB( + wsClient, + KairosDBConfig(port = kairosPort, username = Some("test"), password = Some("test")), + global + ) + val res = kairosDB + .addDataPoint(DataPoint(MetricName("my.new.metric"), KNumber(555), tags = Seq(Tag("aoeu", "snth")))) + .flatMap { _ => + kairosDB.listMetricNames + } + .futureValue res must contain(MetricName("my.new.metric")) } diff --git a/src/it/scala/integration/StringDataPointsIntegrationSpec.scala b/src/it/scala/integration/StringDataPointsIntegrationSpec.scala index 8b08f7f..6c3ce63 100644 --- a/src/it/scala/integration/StringDataPointsIntegrationSpec.scala +++ b/src/it/scala/integration/StringDataPointsIntegrationSpec.scala @@ -18,34 +18,78 @@ class StringDataPointsIntegrationSpec extends IntegrationSpec { "work for a single string data point" in { val instant = Instant.ofEpochSecond(1470837457L) - val start = Instant.ofEpochSecond(1470830000L) - val qm = QueryMetrics(Seq(Query("my.new.metric", QueryTag("aoeu" -> "snth"))), start) + val start = Instant.ofEpochSecond(1470830000L) + val qm = QueryMetrics(Seq(Query("my.new.metric", QueryTag("aoeu" -> "snth"))), start) - val kairosDB = new KairosDB(wsClient, KairosDBConfig(port = kairosPort), global) + val kairosDB = new KairosDB( + wsClient, + KairosDBConfig(port = kairosPort, username = Some("test"), password = Some("test")), + global + ) - val res = kairosDB.addDataPoint(DataPoint(MetricName("my.new.metric"), KString("my test string"), instant, Seq(Tag("aoeu", "snth")))).flatMap { _ => - kairosDB.queryMetrics(qm) - }.futureValue + val res = kairosDB + .addDataPoint( + DataPoint(MetricName("my.new.metric"), KString("my test string"), instant, Seq(Tag("aoeu", "snth"))) + ) + .flatMap { _ => + kairosDB.queryMetrics(qm) + } + .futureValue - res must be(QueryResponse.Response(Seq(ResponseQuery(1, Seq( - Result("my.new.metric", Seq(GroupBy.GroupByType("text")), Seq(TagResult("aoeu", Seq("snth"))), Seq((instant, KString("my test string")))) - ))))) + res must be( + QueryResponse.Response( + Seq( + ResponseQuery( + 1, + Seq( + Result( + "my.new.metric", + Seq(GroupBy.GroupByType("text")), + Seq(TagResult("aoeu", Seq("snth"))), + Seq((instant, KString("my test string"))) + ) + ) + ) + ) + ) + ) } "also work for a string with only numbers" in { val instant = Instant.ofEpochSecond(1470837457L) - val start = Instant.ofEpochSecond(1470830000L) - val qm = QueryMetrics(Seq(Query("my.new.metric", QueryTag("aoeu" -> "snth"))), start) + val start = Instant.ofEpochSecond(1470830000L) + val qm = QueryMetrics(Seq(Query("my.new.metric", QueryTag("aoeu" -> "snth"))), start) - val kairosDB = new KairosDB(wsClient, KairosDBConfig(port = kairosPort), global) + val kairosDB = new KairosDB( + wsClient, + KairosDBConfig(port = kairosPort, username = Some("test"), password = Some("test")), + global + ) - val res = kairosDB.addDataPoint(DataPoint(MetricName("my.new.metric"), KString("12345"), instant, Seq(Tag("aoeu", "snth")))).flatMap { _ => - kairosDB.queryMetrics(qm) - }.futureValue + val res = kairosDB + .addDataPoint(DataPoint(MetricName("my.new.metric"), KString("12345"), instant, Seq(Tag("aoeu", "snth")))) + .flatMap { _ => + kairosDB.queryMetrics(qm) + } + .futureValue - res must be(QueryResponse.Response(Seq(ResponseQuery(1, Seq( - Result("my.new.metric", Seq(GroupBy.GroupByType("text")), Seq(TagResult("aoeu", Seq("snth"))), Seq((instant, KString("12345")))) - ))))) + res must be( + QueryResponse.Response( + Seq( + ResponseQuery( + 1, + Seq( + Result( + "my.new.metric", + Seq(GroupBy.GroupByType("text")), + Seq(TagResult("aoeu", Seq("snth"))), + Seq((instant, KString("12345"))) + ) + ) + ) + ) + ) + ) } } } diff --git a/src/it/scala/integration/VersionIntegrationSpec.scala b/src/it/scala/integration/VersionIntegrationSpec.scala index 8c05194..c1e7419 100644 --- a/src/it/scala/integration/VersionIntegrationSpec.scala +++ b/src/it/scala/integration/VersionIntegrationSpec.scala @@ -10,7 +10,11 @@ class VersionIntegrationSpec extends IntegrationSpec { "Getting the KairosDB version" should { "return version" in { - val kairosDb = new KairosDB(wsClient, KairosDBConfig(port = kairosPort), global) + val kairosDb = new KairosDB( + wsClient, + KairosDBConfig(port = kairosPort, username = Some("test"), password = Some("test")), + global + ) val res = kairosDb.version.futureValue res must startWith("KairosDB")