Skip to content

Commit

Permalink
eventbridge: Fix integration tests (#120)
Browse files Browse the repository at this point in the history
  • Loading branch information
KapStorm authored Feb 3, 2024
1 parent e125e37 commit 61f8753
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,15 @@
*/
package com.snowplowanalytics.snowplow.enrich.eventbridge

//import java.util.UUID

import scala.concurrent.duration._

import cats.effect.IO

import cats.effect.testing.specs2.CatsIO

import com.snowplowanalytics.snowplow.enrich.common.fs2.test.CollectorPayloadGen
import com.snowplowanalytics.snowplow.enrich.eventbridge.enrichments._
import org.specs2.mutable.Specification
import org.specs2.specification.AfterAll

//import com.snowplowanalytics.snowplow.enrich.eventbridge.enrichments._
//import com.snowplowanalytics.snowplow.enrich.common.fs2.test.CollectorPayloadGen
import java.util.UUID
import scala.concurrent.duration._

class EnrichEventbridgeSpec extends Specification with AfterAll with CatsIO {

Expand All @@ -48,88 +44,91 @@ class EnrichEventbridgeSpec extends Specification with AfterAll with CatsIO {
}
}

// TODO: Enable these tests, we need to find a way to parse the flattened events
// "emit the correct number of enriched events and bad rows" in {
// import utils._
//
// val testName = "count"
// val nbGood = 100l
// val nbBad = 100l
// val uuid = UUID.randomUUID().toString
//
// val resources = for {
// _ <- Containers.enrich(
// configPath = "modules/eventbridge/src/it/resources/enrich/enrich-localstack.hocon",
// testName = "count",
// needsLocalstack = true,
// enrichments = Nil,
// uuid = uuid
// )
// enrichPipe <- mkEnrichPipe(Containers.localstackMappedPort, uuid)
// } yield enrichPipe
//
// val input = CollectorPayloadGen.generate(nbGood, nbBad)
//
// resources.use { enrich =>
// for {
// // for some weird reason, the records don't get consumed until the second time calling enrich pipeline
// _ <- enrich(CollectorPayloadGen.generate(0)).compile.toList
// output <- enrich(input).compile.toList
// (good, bad) = parseOutput(output, testName)
// } yield {
// println(s"${good.size} and ${bad.size}")
// good.size.toLong must beEqualTo(nbGood)
// bad.size.toLong must beEqualTo(nbBad)
// }
// }
// }
//
// "run the enrichments and attach their context" in {
// import utils._
//
// val testName = "enrichments"
// val nbGood = 100l
// val uuid = UUID.randomUUID().toString
//
// val enrichments = List(
// ApiRequest,
// Javascript,
// SqlQuery,
// Yauaa
// )
//
// val enrichmentsContexts = enrichments.map(_.outputSchema)
//
// val resources = for {
// _ <- Containers.mysqlServer
// _ <- Containers.httpServer
// _ <- Containers.enrich(
// configPath = "modules/eventbridge/src/it/resources/enrich/enrich-localstack.hocon",
// testName = "enrichments",
// needsLocalstack = true,
// enrichments = enrichments,
// uuid = uuid
// )
// enrichPipe <- mkEnrichPipe(Containers.localstackMappedPort, uuid)
// } yield enrichPipe
//
// val input = CollectorPayloadGen.generate(nbGood)
//
// resources.use { enrich =>
// for {
// // for some weird reason, the records don't get consumed until the second time calling enrich pipeline
// _ <- enrich(CollectorPayloadGen.generate(0)).compile.toList
// output <- enrich(input).compile.toList
// (good, bad) = parseOutput(output, testName)
// } yield {
// good.size.toLong must beEqualTo(nbGood)
// good.map { enriched =>
// enriched.derived_contexts.data.map(_.schema) must containTheSameElementsAs(enrichmentsContexts)
// }
// bad.size.toLong must beEqualTo(0l)
// }
// }
// }
"emit the correct number of enriched events and bad rows" in {
import utils._

val testName = "count"
val nbGood = 100L
val nbBad = 100L
val uuid = UUID.randomUUID().toString

val resources = for {
_ <- Containers.enrich(
configPath = "modules/eventbridge/src/it/resources/enrich/enrich-localstack.hocon",
testName = "count",
needsLocalstack = true,
enrichments = Nil,
uuid = uuid
)
enrichPipe <- mkEnrichPipe(Containers.localstackMappedPort, uuid)
} yield enrichPipe

val input = CollectorPayloadGen.generate(nbGood, nbBad)

resources.use { enrich =>
for {
// for some weird reason, the records don't get consumed until the second time calling enrich pipeline
_ <- enrich(CollectorPayloadGen.generate(0)).compile.toList
output <- enrich(input).compile.toList
(good, bad) = parseOutput(output, testName)
} yield {
println(s"${good.size} and ${bad.size}")
good.size.toLong must beEqualTo(nbGood)
bad.size.toLong must beEqualTo(nbBad)
}
}
}

"run the enrichments and attach their context" in {
import utils._

val testName = "enrichments"
val nbGood = 100L
val uuid = UUID.randomUUID().toString

val enrichments = List(
ApiRequest,
Javascript,
SqlQuery,
Yauaa
)

val enrichmentsContexts = enrichments.map(_.outputSchema).map { schemaKey =>
s"contexts_${schemaKey.name}_${schemaKey.vendor}_${schemaKey.version.model}".replace('.', '_')
}

val resources = for {
_ <- Containers.mysqlServer
_ <- Containers.httpServer
_ <- Containers.enrich(
configPath = "modules/eventbridge/src/it/resources/enrich/enrich-localstack.hocon",
testName = "enrichments",
needsLocalstack = true,
enrichments = enrichments,
uuid = uuid
)
enrichPipe <- mkEnrichPipe(Containers.localstackMappedPort, uuid)
} yield enrichPipe

val input = CollectorPayloadGen.generate(nbGood)

resources.use { enrich =>
for {
// for some weird reason, the records don't get consumed until the second time calling enrich pipeline
_ <- enrich(CollectorPayloadGen.generate(0)).compile.toList
output <- enrich(input).compile.toList
(good, bad) = parseOutput(output, testName)
} yield {
good.size.toLong must beEqualTo(nbGood)
good.map { enriched =>
enrichmentsContexts.map { context =>
enriched.hcursor.downField(context).toOption must beSome
}
}
bad.size.toLong must beEqualTo(0L)
}
}
}

"shutdown when it receives a SIGTERM" in {
Containers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,13 @@
*/
package com.snowplowanalytics.snowplow.enrich.eventbridge

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext

import cats.effect.{Blocker, ContextShift, IO, Resource, Timer}

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Input
import fs2.{Pipe, Stream}
import io.circe.Json

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event

import com.snowplowanalytics.snowplow.badrows.BadRow

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Input
import com.snowplowanalytics.snowplow.enrich.common.fs2.test.Utils
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object utils {

Expand All @@ -34,8 +28,10 @@ object utils {

sealed trait OutputRow
object OutputRow {
final case class Good(event: Event) extends OutputRow
final case class Bad(badRow: BadRow) extends OutputRow
// TODO: we should use Event, we need to find a way to parse the flattened events
final case class Good(event: Json) extends OutputRow
// TODO: we should use BadRow, we need to find a way to parse it
final case class Bad(badRow: Json) extends OutputRow
}

def mkEnrichPipe(
Expand Down Expand Up @@ -75,7 +71,7 @@ object utils {
case Right(json) =>
json.hcursor
.downField("detail")
.as[Event] match {
.as[Json] match {
case Right(r) => r
case Left(e) => throw new RuntimeException(s"Can't parse enriched events from eventbridge: $e, json: $json")
}
Expand All @@ -97,18 +93,14 @@ object utils {

case Left(e) => throw new RuntimeException(s"Can't parse bad row [$s]. Error: $e")
}
Utils.parseBadRow(parsed.noSpaces) match {
case Right(br) => OutputRow.Bad(br)
case Left(e) =>
throw new RuntimeException(s"Can't decode bad row $s. Error: $e")
}
OutputRow.Bad(parsed)
}

def parseOutput(output: List[OutputRow], testName: String): (List[Event], List[BadRow]) = {
def parseOutput(output: List[OutputRow], testName: String): (List[Json], List[Json]) = {
val good = output.collect { case OutputRow.Good(e) => e }
println(s"[$testName] Bad rows:")
val bad = output.collect { case OutputRow.Bad(b) =>
println(s"[$testName] ${b.compact}")
println(s"[$testName] ${b.toString()}")
b
}
(good, bad)
Expand Down

0 comments on commit 61f8753

Please sign in to comment.