Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eventbridge-it: Fix commented tests #120

Merged
merged 3 commits into from
Feb 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,15 @@
*/
package com.snowplowanalytics.snowplow.enrich.eventbridge

//import java.util.UUID

import scala.concurrent.duration._

import cats.effect.IO

import cats.effect.testing.specs2.CatsIO

import com.snowplowanalytics.snowplow.enrich.common.fs2.test.CollectorPayloadGen
import com.snowplowanalytics.snowplow.enrich.eventbridge.enrichments._
import org.specs2.mutable.Specification
import org.specs2.specification.AfterAll

//import com.snowplowanalytics.snowplow.enrich.eventbridge.enrichments._
//import com.snowplowanalytics.snowplow.enrich.common.fs2.test.CollectorPayloadGen
import java.util.UUID
import scala.concurrent.duration._

class EnrichEventbridgeSpec extends Specification with AfterAll with CatsIO {

Expand All @@ -48,88 +44,91 @@ class EnrichEventbridgeSpec extends Specification with AfterAll with CatsIO {
}
}

// TODO: Enable these tests, we need to find a way to parse the flattened events
// "emit the correct number of enriched events and bad rows" in {
// import utils._
//
// val testName = "count"
// val nbGood = 100l
// val nbBad = 100l
// val uuid = UUID.randomUUID().toString
//
// val resources = for {
// _ <- Containers.enrich(
// configPath = "modules/eventbridge/src/it/resources/enrich/enrich-localstack.hocon",
// testName = "count",
// needsLocalstack = true,
// enrichments = Nil,
// uuid = uuid
// )
// enrichPipe <- mkEnrichPipe(Containers.localstackMappedPort, uuid)
// } yield enrichPipe
//
// val input = CollectorPayloadGen.generate(nbGood, nbBad)
//
// resources.use { enrich =>
// for {
// // for some weird reason, the records don't get consumed until the second time calling enrich pipeline
// _ <- enrich(CollectorPayloadGen.generate(0)).compile.toList
// output <- enrich(input).compile.toList
// (good, bad) = parseOutput(output, testName)
// } yield {
// println(s"${good.size} and ${bad.size}")
// good.size.toLong must beEqualTo(nbGood)
// bad.size.toLong must beEqualTo(nbBad)
// }
// }
// }
//
// "run the enrichments and attach their context" in {
// import utils._
//
// val testName = "enrichments"
// val nbGood = 100l
// val uuid = UUID.randomUUID().toString
//
// val enrichments = List(
// ApiRequest,
// Javascript,
// SqlQuery,
// Yauaa
// )
//
// val enrichmentsContexts = enrichments.map(_.outputSchema)
//
// val resources = for {
// _ <- Containers.mysqlServer
// _ <- Containers.httpServer
// _ <- Containers.enrich(
// configPath = "modules/eventbridge/src/it/resources/enrich/enrich-localstack.hocon",
// testName = "enrichments",
// needsLocalstack = true,
// enrichments = enrichments,
// uuid = uuid
// )
// enrichPipe <- mkEnrichPipe(Containers.localstackMappedPort, uuid)
// } yield enrichPipe
//
// val input = CollectorPayloadGen.generate(nbGood)
//
// resources.use { enrich =>
// for {
// // for some weird reason, the records don't get consumed until the second time calling enrich pipeline
// _ <- enrich(CollectorPayloadGen.generate(0)).compile.toList
// output <- enrich(input).compile.toList
// (good, bad) = parseOutput(output, testName)
// } yield {
// good.size.toLong must beEqualTo(nbGood)
// good.map { enriched =>
// enriched.derived_contexts.data.map(_.schema) must containTheSameElementsAs(enrichmentsContexts)
// }
// bad.size.toLong must beEqualTo(0l)
// }
// }
// }
"emit the correct number of enriched events and bad rows" in {
import utils._

val testName = "count"
val nbGood = 100L
val nbBad = 100L
val uuid = UUID.randomUUID().toString

val resources = for {
_ <- Containers.enrich(
configPath = "modules/eventbridge/src/it/resources/enrich/enrich-localstack.hocon",
testName = "count",
needsLocalstack = true,
enrichments = Nil,
uuid = uuid
)
enrichPipe <- mkEnrichPipe(Containers.localstackMappedPort, uuid)
} yield enrichPipe

val input = CollectorPayloadGen.generate(nbGood, nbBad)

resources.use { enrich =>
for {
// for some weird reason, the records don't get consumed until the second time calling enrich pipeline
_ <- enrich(CollectorPayloadGen.generate(0)).compile.toList
output <- enrich(input).compile.toList
(good, bad) = parseOutput(output, testName)
} yield {
println(s"${good.size} and ${bad.size}")
good.size.toLong must beEqualTo(nbGood)
bad.size.toLong must beEqualTo(nbBad)
}
}
}

"run the enrichments and attach their context" in {
import utils._

val testName = "enrichments"
val nbGood = 100L
val uuid = UUID.randomUUID().toString

val enrichments = List(
ApiRequest,
Javascript,
SqlQuery,
Yauaa
)

val enrichmentsContexts = enrichments.map(_.outputSchema).map { schemaKey =>
s"contexts_${schemaKey.name}_${schemaKey.vendor}_${schemaKey.version.model}".replace('.', '_')
}

val resources = for {
_ <- Containers.mysqlServer
_ <- Containers.httpServer
_ <- Containers.enrich(
configPath = "modules/eventbridge/src/it/resources/enrich/enrich-localstack.hocon",
testName = "enrichments",
needsLocalstack = true,
enrichments = enrichments,
uuid = uuid
)
enrichPipe <- mkEnrichPipe(Containers.localstackMappedPort, uuid)
} yield enrichPipe

val input = CollectorPayloadGen.generate(nbGood)

resources.use { enrich =>
for {
// for some weird reason, the records don't get consumed until the second time calling enrich pipeline
_ <- enrich(CollectorPayloadGen.generate(0)).compile.toList
output <- enrich(input).compile.toList
(good, bad) = parseOutput(output, testName)
} yield {
good.size.toLong must beEqualTo(nbGood)
good.map { enriched =>
enrichmentsContexts.map { context =>
enriched.hcursor.downField(context).toOption must beSome
}
}
bad.size.toLong must beEqualTo(0L)
}
}
}

"shutdown when it receives a SIGTERM" in {
Containers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,13 @@
*/
package com.snowplowanalytics.snowplow.enrich.eventbridge

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext

import cats.effect.{Blocker, ContextShift, IO, Resource, Timer}

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Input
import fs2.{Pipe, Stream}
import io.circe.Json

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event

import com.snowplowanalytics.snowplow.badrows.BadRow

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Input
import com.snowplowanalytics.snowplow.enrich.common.fs2.test.Utils
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object utils {

Expand All @@ -34,8 +28,10 @@ object utils {

sealed trait OutputRow
object OutputRow {
final case class Good(event: Event) extends OutputRow
final case class Bad(badRow: BadRow) extends OutputRow
// TODO: we should use Event, we need to find a way to parse the flattened events
final case class Good(event: Json) extends OutputRow
// TODO: we should use BadRow, we need to find a way to parse it
final case class Bad(badRow: Json) extends OutputRow
}

def mkEnrichPipe(
Expand Down Expand Up @@ -75,7 +71,7 @@ object utils {
case Right(json) =>
json.hcursor
.downField("detail")
.as[Event] match {
.as[Json] match {
case Right(r) => r
case Left(e) => throw new RuntimeException(s"Can't parse enriched events from eventbridge: $e, json: $json")
}
Expand All @@ -97,18 +93,14 @@ object utils {

case Left(e) => throw new RuntimeException(s"Can't parse bad row [$s]. Error: $e")
}
Utils.parseBadRow(parsed.noSpaces) match {
case Right(br) => OutputRow.Bad(br)
case Left(e) =>
throw new RuntimeException(s"Can't decode bad row $s. Error: $e")
}
OutputRow.Bad(parsed)
}

def parseOutput(output: List[OutputRow], testName: String): (List[Event], List[BadRow]) = {
def parseOutput(output: List[OutputRow], testName: String): (List[Json], List[Json]) = {
val good = output.collect { case OutputRow.Good(e) => e }
println(s"[$testName] Bad rows:")
val bad = output.collect { case OutputRow.Bad(b) =>
println(s"[$testName] ${b.compact}")
println(s"[$testName] ${b.toString()}")
b
}
(good, bad)
Expand Down
Loading