diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8453a625..4f41e32d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -63,11 +63,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v')) - run: mkdir -p money-java-servlet/target money-aspectj/target money-http-client/target money-otlp-http-exporter/target money-api/target money-kafka/target money-otel-handler/target money-otel-jaeger-exporter/target money-otlp-exporter/target money-otel-zipkin-exporter/target money-akka/target money-otel-formatters/target money-spring/target money-core/target money-otel-logging-exporter/target money-otel-inmemory-exporter/target money-jakarta-servlet/target money-wire/target project/target + run: mkdir -p money-java-servlet/target money-aspectj/target money-http-client/target money-otlp-http-exporter/target money-api/target money-otel-handler/target money-otel-jaeger-exporter/target money-otlp-exporter/target money-otel-zipkin-exporter/target money-akka/target money-otel-formatters/target money-spring/target money-core/target money-otel-logging-exporter/target money-otel-inmemory-exporter/target money-jakarta-servlet/target project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v')) - run: tar cf targets.tar money-java-servlet/target money-aspectj/target money-http-client/target money-otlp-http-exporter/target money-api/target money-kafka/target money-otel-handler/target money-otel-jaeger-exporter/target money-otlp-exporter/target money-otel-zipkin-exporter/target money-akka/target money-otel-formatters/target money-spring/target money-core/target money-otel-logging-exporter/target money-otel-inmemory-exporter/target money-jakarta-servlet/target money-wire/target project/target + run: tar cf targets.tar money-java-servlet/target money-aspectj/target money-http-client/target money-otlp-http-exporter/target money-api/target money-otel-handler/target money-otel-jaeger-exporter/target money-otlp-exporter/target money-otel-zipkin-exporter/target money-akka/target money-otel-formatters/target money-spring/target money-core/target money-otel-logging-exporter/target money-otel-inmemory-exporter/target money-jakarta-servlet/target project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v')) diff --git a/build.sbt b/build.sbt index bf1764bb..6a178f67 100644 --- a/build.sbt +++ b/build.sbt @@ -2,7 +2,6 @@ import Dependencies._ import com.typesafe.sbt.SbtScalariform import sbt.Keys._ import sbt._ -import sbtavro.SbtAvro._ import scoverage.ScoverageKeys import scoverage.ScoverageSbtPlugin._ @@ -43,8 +42,6 @@ lazy val money = moneyHttpClient, moneyJavaServlet, moneyJakartaServlet, - moneyWire, - moneyKafka, moneySpring, moneyOtelFormatters, moneyOtelHandler, @@ -164,41 +161,6 @@ lazy val moneyJakartaServlet = ) .dependsOn(moneyCore % "test->test;compile->compile") -lazy val moneyWire = - Project("money-wire", file("./money-wire")) - .enablePlugins(AutomateHeaderPlugin) - .settings(projectSettings: _*) - .settings( - libraryDependencies ++= - Seq( - json4sNative, - json4sJackson - ) ++ commonTestDependencies, - fork := false, - doc / javacOptions := Seq("-source", "1.6"), - // Configure the desired Avro version. sbt-avro automatically injects a libraryDependency. - AvroConfig / version := "1.7.6", - AvroConfig / stringType := "String" - ).dependsOn(moneyCore % "test->test;compile->compile") - -lazy val moneyKafka = - Project("money-kafka", file("./money-kafka")) - .enablePlugins(AutomateHeaderPlugin) - .settings(projectSettings: _*) - .settings( - libraryDependencies ++= - Seq( - kafka, - bijectionCore, - bijectionAvro, - chill, - chillAvro, - chillBijection, - commonsIo - ) ++ commonTestDependencies - ) - .dependsOn(moneyCore, moneyWire % "test->test;compile->compile") - lazy val moneySpring = Project("money-spring", file("./money-spring")) .enablePlugins(AutomateHeaderPlugin) diff --git a/money-kafka/src/main/resources/broker-defaults.properties b/money-kafka/src/main/resources/broker-defaults.properties deleted file mode 100755 index 4ae0616b..00000000 --- a/money-kafka/src/main/resources/broker-defaults.properties +++ /dev/null @@ -1,31 +0,0 @@ -# See http://kafka.apache.org/documentation.html#brokerconfigs for default values. - -# Each broker is uniquely identified by a non-negative integer id. This id serves as the brokers "name", and allows -# the broker to be moved to a different host/port without confusing consumers. You can choose any number you like so -# long as it is unique. -broker.id=0 - -# Hostname of broker. If this is set, it will only bind to this address. If this is not set, it will bind to all -# interfaces, and publish one to ZK. -host.name=127.0.0.1 - -# The port on which the server accepts client connections. -port=9092 - -# The default number of partitions per topic. -# -num.partitions=1 - -# Enable auto creation of topic on the server. If this is set to true then attempts to produce, consume, or fetch -# metadata for a non-existent topic will automatically create it with the default replication factor and number of -# partitions. -auto.create.topics.enable=true - -# The maximum size of a message that the server can receive. It is important that this property be in sync with the -# maximum fetch size your consumers use or else an unruly consumer will be able to publish messages too large for -# consumers to consume. -# -# Be careful with this setting when producing messages in batches with compression enabled. In such a scenario the -# batch of messages is treated as a single message, and its total size must be smaller than this setting. -# -message.max.bytes=1000000 \ No newline at end of file diff --git a/money-kafka/src/main/resources/consumer-defaults.properties b/money-kafka/src/main/resources/consumer-defaults.properties deleted file mode 100755 index dfa3e742..00000000 --- a/money-kafka/src/main/resources/consumer-defaults.properties +++ /dev/null @@ -1,110 +0,0 @@ -### -### Consumer Basics -### - -# A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same -# group id multiple processes indicate that they are all part of the same consumer group. -# -group.id=test-consumer - -# Specifies the zookeeper connection string in the form `hostname:port`, where host and port are the host and port of a -# zookeeper server. To allow connecting through other zookeeper nodes when that zookeeper machine is down, you can also -# specify multiple hosts in the form `hostname1:port1,hostname2:port2,hostname3:port3`. -# -# The server may also have a zookeeper chroot path as part of it's zookeeper connection string which puts its data under -# some path in the global zookeeper namespace. If so the consumer should use the same chroot path in its connection -# string. For example to give a chroot path of /chroot/path you would give the connection string as -# `hostname1:port1,hostname2:port2,hostname3:port3/chroot/path`. -# -# Important: You must only add the custom chroot _to the end of the string_: -# -# # WRONG! -# zookeeper.connect=zkserver1:2181/my_kafka,zkserver:2181/my_kafka -# -# # CORRECT -# zookeeper.connect=zkserver1:2181,zkserver:2181/my_kafka -# -zookeeper.connect=localhost:2181 - -# If true, periodically commit to zookeeper the offset of messages already fetched by the consumer. This committed -# offset will be used when the process fails as the position from which the new consumer will begin. -# -auto.commit.enable=true - -# What to do when there is no initial offset in Zookeeper or if an offset is out of range: -# * "smallest": Automatically reset the offset to the smallest offset (= read from the very beginning of the stream) -# * "largest": Automatically reset the offset to the largest offset (= connect and wait for new data coming in) -# * anything else: Throw exception to the consumer. -# -# If this is set to largest, the consumer may lose some messages when the number of partitions -- for the topics it -# has subscribed to -- changes on the broker. To prevent data loss during partition addition, set auto.offset.reset to -# smallest. -# -auto.offset.reset=largest - -# Throw a timeout exception to the consumer if no message is available for consumption after the specified interval. -# A negative value means that the consumer happily waits forever (without throwing a timeout exception). -# -consumer.timeout.ms=-1 - -# The socket receive buffer for network requests. -# -socket.receive.buffer.bytes=65536 - -# The socket timeout for network requests. -# -# See the entry "What is the relationship between fetch.wait.max.ms and socket.timeout.ms on the consumer?" in the -# FAQ. -# -socket.timeout.ms=30000 - -# The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to -# immediately satisfy `fetch.min.bytes`. -# -# See the entry "What is the relationship between fetch.wait.max.ms and socket.timeout.ms on the consumer?" in the -# FAQ. -# -fetch.wait.max.ms=100 - -# The minimum amount of data the server should return for a fetch request. If insufficient data is available the -# request will wait for that much data to accumulate before answering the request. -# -fetch.min.bytes=1 - -# The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request. These bytes will -# be read into memory for each partition, so this helps control the memory used by the consumer. The fetch request size -# must be at least as large as the maximum message size the server allows or else it is possible for the producer to -# send messages larger than the consumer can fetch. -# -fetch.message.max.bytes=1048576 - -# Max number of message chunks buffered for consumption. Each chunk can be up to fetch.message.max.bytes. -# -queued.max.message.chunks=10 - -# When a new consumer joins a consumer group, the set of consumers attempt to "rebalance" the load to assign partitions -# to each consumer. If the set of consumers changes while this assignment is taking place, the rebalance will fail and -# retry. This setting controls the maximum number of attempts before giving up. -# -rebalance.max.retries=4 - -# Backoff time between retries during rebalance. -# -rebalance.backoff.ms=2000 - -# Backoff time to wait before trying to determine the leader of a partition that has just lost its leader. -# -refresh.leader.backoff.ms=200 - -# Zookeeper session timeout. If the consumer fails to heartbeat to zookeeper for this period of time it is considered -# dead and a rebalance will occur. -# -zookeeper.session.timeout.ms=6000 - -# The max time that the client waits while establishing a connection to zookeeper. -# -zookeeper.connection.timeout.ms=6000 - -# How far a ZK follower can be behind a ZK leader -# -zookeeper.sync.time.ms=2000 \ No newline at end of file diff --git a/money-kafka/src/main/resources/producer-defaults.properties b/money-kafka/src/main/resources/producer-defaults.properties deleted file mode 100755 index ae1e21fc..00000000 --- a/money-kafka/src/main/resources/producer-defaults.properties +++ /dev/null @@ -1,117 +0,0 @@ -### -### Producer Basics -### - -# The client id is a user-specified string sent in each request to help trace calls. It should logically identify the -# application making the request. -# -client.id=test-producer - -# This is for bootstrapping knowledge about the rest of the cluster, and the producer will only use it for getting -# metadata (topics, partitions and replicas). The socket connections for sending the actual data will be established -# based on the broker information returned in the metadata. -# -# The format is: -# host1:port1,host2:port2 -# -# The list can be a subset of brokers or a VIP pointing to a subset of brokers. -# -metadata.broker.list=localhost:9092 - -# The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of -# the key. -# -partitioner.class=kafka.producer.DefaultPartitioner - -# specifies whether the messages are sent asynchronously (async) or synchronously (sync) -# Specifies whether the messages are sent asynchronously in a background thread. Valid values are (1) "async" for -# asynchronous send and (2) "sync" for synchronous send. By setting the producer to async we allow batching together of -# requests (which is great for throughput) but open the possibility of a failure of the client machine dropping unsent -# data. -# -producer.type=sync - -# This value controls when a produce request is considered completed. Specifically, how many other brokers must have -# committed the data to their log and acknowledged this to the leader? Typical values are: -# -# 0) The producer never waits for an acknowledgement from the broker, i.e. fire-and-forget (the same behavior as 0.7). -# This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a -# server fails). -# 1) The producer gets an acknowledgement after the leader replica has received the data. -# This option provides better durability as the client waits until the server acknowledges the request as successful -# (only messages that were written to the now-dead leader but not yet replicated will be lost). -# -1) The producer gets an acknowledgement after all in-sync replicas have received the data. -# This option provides the best durability, we guarantee that no messages will be lost as long as at least one -# in-sync replica remains. -# -# In general, the valid range of this setting is [-1, #numPartitionsOfTopic]. -# -request.required.acks=0 - - -# This property will cause the producer to automatically retry a failed send request. This property specifies the -# number of retries when such failures occur. Note that setting a non-zero value here can lead to duplicates in the case -# of network errors that cause a message to be sent but the acknowledgement to be lost. -message.send.max.retries=3 - -# The producer generally refreshes the topic metadata from brokers when there is a failure (key missing, leader -# not available...). It will also poll regularly (default: every 10min = 600000ms). -# -# If you set this to a negative value, metadata will only get refreshed on failure. -# -# If you set this to zero, the metadata will get refreshed after each message sent (not recommended). -# Important note: The refresh happens only AFTER the message is sent, so if the producer never sends a message the -# metadata is never refreshed! -# -topic.metadata.refresh.interval.ms=600000 - -# Specify the compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy". -# The old config values work as well: 0 (none), 1 (gzipo), 2 (snappy). -# -compression.codec=snappy - -# The serializer class for messages. The default encoder takes a byte[] and returns the same byte[]. -# -serializer.class=kafka.serializer.DefaultEncoder - -# Set whether compression should be turned on for particular topics. If the compression codec is anything other than -# `NoCompressionCodec`, enable compression only for specified topics if any. If the list of compressed topics is empty, -# then enable the specified compression codec for all topics. If the compression codec is `NoCompressionCodec`, -# compression is disabled for all topics. -# -#compressed.topics= - -### -### Async Producer -### - -# Maximum time to buffer data when using async mode. For example a setting of 100 will try to batch together 100ms of -# messages to send at once. This will improve throughput but adds message delivery latency due to the buffering. -# -queue.buffering.max.ms=5000 - -# The maximum number of unsent messages that can be queued up the producer when using async mode before either the -# producer must be blocked or data must be dropped. -# -queue.buffering.max.messages=10000 - -# The amount of time to block before dropping messages when running in async mode and the buffer has reached -# `queue.buffering.max.messages`. -# -# 0: Events will be enqueued immediately or dropped if the queue is full (the producer send call will never -# block). -# -ve: The producer will block indefinitely if the queue is full, and it will never willingly drop a send. -# +ve: The producer will block up to this many milliseconds if the queue is full. -# -queue.enqueue.timeout.ms=-1 - -# The number of messages to send in one batch when using async mode. The producer will wait until either this number of -# messages are ready to send or queue.buffer.max.ms is reached. -# -# Important note: If compression is enabled, then the compressed batch of messages is treated as a single message, -# whose size must be smaller than `max.message.bytes`. If compression is disabled, only each individual (uncompressed) -# message must be smaller than `max.message.byes`, i.e. in this case the batch size does not really matter w.r.t. -# `max.message.bytes`. -# See http://grokbase.com/t/kafka/users/139v9xqqj7/understanding-messagesizetoolarge-and-batches. -# -batch.num.messages=200 \ No newline at end of file diff --git a/money-kafka/src/main/resources/reference.conf b/money-kafka/src/main/resources/reference.conf deleted file mode 100644 index 3b7ae70e..00000000 --- a/money-kafka/src/main/resources/reference.conf +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright 2012-2015 Comcast Cable Communications Management, LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -money { - emitter { - emitters = [ - { - name = "kafka-emitter" - class-name = "com.comcast.money.kafka.KafkaEmitter" - subscriptions = [Trace] - configuration = { - topic = "money" - compression.codec = "1" // gzip compression enabled by default - producer.type = "async" // async does not block, but will allow message loss in a network partition - batch.num.messages = "1" // batching messages together increases throughput - message.send.max.retries = "3" // retrying on a network partition could introduce duplicates - request.required.acks = "0" // this indicates that we never wait for an ack. 1 gets an ack once the leader replica receives the data. -1 waits until all replicas get data - metadata.broker.list = "localhost:9092" - } - } - ] - } -} diff --git a/money-kafka/src/main/scala/com/comcast/money/kafka/KafkaSpanHandler.scala b/money-kafka/src/main/scala/com/comcast/money/kafka/KafkaSpanHandler.scala deleted file mode 100644 index 1c5622ae..00000000 --- a/money-kafka/src/main/scala/com/comcast/money/kafka/KafkaSpanHandler.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2012 Comcast Cable Communications Management, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.comcast.money.kafka -import java.util.Properties -import com.comcast.money.api.{ SpanHandler, SpanInfo } -import com.comcast.money.wire.AvroConversions -import com.typesafe.config.Config -import org.apache.kafka.clients.producer.{ KafkaProducer, Producer, ProducerRecord } - -import scala.collection.JavaConverters._ - -// We use the producer maker so that we can mock this out -trait ProducerMaker { - def convertConfigToProperties(config: Config): Properties - def createProducer(properties: Properties): Producer[Array[Byte], Array[Byte]] -} - -trait ConfigDrivenProducerMaker extends ProducerMaker { - - def convertConfigToProperties(config: Config): Properties = { - val properties = new Properties() - - for (entry <- config.entrySet.asScala) { - val key = entry.getKey() - val value = config.getAnyRef(key) - properties.put(key, value) - } - - properties - } - - def createProducer(properties: Properties): Producer[Array[Byte], Array[Byte]] = - new KafkaProducer[Array[Byte], Array[Byte]](properties) -} - -class KafkaSpanHandler(config: Config) extends SpanHandler with ConfigDrivenProducerMaker { - - import AvroConversions._ - - private[kafka] val topic: String = config.getString("topic") - private[kafka] val properties: Properties = convertConfigToProperties(config) - private[kafka] val producer: Producer[Array[Byte], Array[Byte]] = createProducer(properties) - - def handle(span: SpanInfo): Unit = { - producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, span.convertTo[Array[Byte]])) - } -} diff --git a/money-kafka/src/test/resources/application.conf b/money-kafka/src/test/resources/application.conf deleted file mode 100644 index f957eeca..00000000 --- a/money-kafka/src/test/resources/application.conf +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright 2012-2015 Comcast Cable Communications Management, LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -money { - enabled = true - mdc.enabled = true - application-name = "unknown" - log-exceptions = false - - handling = { - async = true - handlers = [ - { - class = "com.comcast.money.kafka.KafkaEmitter" - topic = "money" - compression.codec = "1" // gzip compression enabled by default - producer.type = "async" // async does not block, but will allow message loss in a network partition - batch.num.messages = "1" // batching messages together increases throughput - message.send.max.retries = "3" // retrying on a network partition could introduce duplicates - request.required.acks = "0" // this indicates that we never wait for an ack. 1 gets an ack once the leader replica receives the data. -1 waits until all replicas get data - metadata.broker.list = "localhost:9092" - } - ] - } -} diff --git a/money-kafka/src/test/scala/com/comcast/money/kafka/KafkaSpanHandlerSpec.scala b/money-kafka/src/test/scala/com/comcast/money/kafka/KafkaSpanHandlerSpec.scala deleted file mode 100644 index 521e2a7a..00000000 --- a/money-kafka/src/test/scala/com/comcast/money/kafka/KafkaSpanHandlerSpec.scala +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright 2012 Comcast Cable Communications Management, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.comcast.money.kafka - -import com.comcast.money.api.Note -import com.comcast.money.api -import com.typesafe.config.{ Config, ConfigFactory } -import kafka.message.{ CompressionCodec, GZIPCompressionCodec } -import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord } -import org.mockito.ArgumentCaptor -import org.mockito.Mockito._ -import org.scalatest.BeforeAndAfterAll -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec -import org.scalatestplus.mockito.MockitoSugar - -import scala.collection.JavaConverters._ -import java.{ util => ju } - -import com.comcast.money.wire.TestSpanInfo -import io.opentelemetry.api.trace.StatusCode - -trait MockProducerMaker extends ProducerMaker { - - val mockProducer = mock(classOf[KafkaProducer[Array[Byte], Array[Byte]]]) - - def makeProducer(conf: Config): KafkaProducer[Array[Byte], Array[Byte]] = mockProducer -} - -class TestKafkaSpanHandler(config: Config) extends KafkaSpanHandler(config) { - - var mockProducer: KafkaProducer[Array[Byte], Array[Byte]] = _ - - override def createProducer(properties: ju.Properties): KafkaProducer[Array[Byte], Array[Byte]] = { - mockProducer = mock(classOf[KafkaProducer[Array[Byte], Array[Byte]]]) - mockProducer - } -} - -class KafkaSpanHandlerSpec extends AnyWordSpec - with Matchers - with MockitoSugar - with BeforeAndAfterAll { - - trait KafkaFixture { - val testConfig = mock[Config] - when(testConfig.getString("topic")).thenReturn("test-topic") - - val underTest = new TestKafkaSpanHandler(testConfig) - - val testProducer = underTest.mockProducer - val sampleData = TestSpanInfo( - id = api.SpanId.createNew(), - name = "key", - appName = "app", - host = "host", - startTimeNanos = 1000000L, - status = StatusCode.OK, - durationNanos = 35000L, - notes = Map[String, Note[_]]("what" -> api.Note.of("what", 1L), "when" -> api.Note.of("when", 2L), "bob" -> api.Note.of("bob", "craig")).asJava) - } - - "A KafkaEmitter" should { - "make a producer in configure" in new KafkaFixture { - underTest.producer shouldBe underTest.mockProducer - } - "send a message to the producer for a span" in new KafkaFixture { - underTest.handle(sampleData) - - val captor = ArgumentCaptor.forClass(classOf[ProducerRecord[Array[Byte], Array[Byte]]]) - verify(testProducer).send(captor.capture()) - } - } - - "A ConfigDrivenProducerMaker" should { - "set the properties from the config" in { - val config = ConfigFactory.parseString( - """ - | topic = "money" - | compression.codec = "1" - | producer.type = "async" - | batch.num.messages = "1" - | message.send.max.retries = "3" - | request.required.acks = "0" - | metadata.broker.list = "localhost:9092" - | bootstrap.servers = "localhost:9092" - | key.serializer = "org.apache.kafka.common.serialization.StringSerializer" - | value.serializer = "org.apache.kafka.common.serialization.StringSerializer" - """.stripMargin) - val testHandler = new KafkaSpanHandler(config) - - val producerConfig = testHandler.properties - - producerConfig.getProperty("metadata.broker.list") shouldBe "localhost:9092" - producerConfig.getProperty("compression.codec") shouldBe "1" - producerConfig.getProperty("producer.type") shouldBe "async" - producerConfig.getProperty("batch.num.messages") shouldBe "1" - producerConfig.getProperty("message.send.max.retries") shouldBe "3" - producerConfig.getProperty("request.required.acks") shouldBe "0" - } - } -} diff --git a/money-wire/src/main/avro/money.avsc b/money-wire/src/main/avro/money.avsc deleted file mode 100644 index c88e529a..00000000 --- a/money-wire/src/main/avro/money.avsc +++ /dev/null @@ -1,111 +0,0 @@ -{ - "namespace": "com.comcast.money.wire.avro", - "name": "Span", - "type": "record", - "fields": [ - { - "name": "name", - "type": "string" - }, - { - "name": "appName", - "type": "string", - "default": "unknown" - }, - { - "name": "host", - "type": "string" - }, - { - "name": "libraryName", - "type": ["null","string"], - "default": null - }, - { - "name": "libraryVersion", - "type": ["null","string"], - "default": null - }, - { - "name": "duration", - "type": "long", - "default": 0 - }, - { - "name": "success", - "type": "boolean", - "default": true - }, - { - "name": "startTime", - "type": "long" - }, - { - "name": "id", - "type": { - "name": "SpanId", - "type": "record", - "fields": [ - { - "name": "traceId", - "type": "string" - }, - { - "name": "parentId", - "type": "long" - }, - { - "name": "spanId", - "type": "long" - } - ] - } - }, - { - "name": "notes", - "type": { - "type": "array", - "items": { - "name": "Note", - "type": "record", - "fields": [ - { - "name": "name", - "type": "string" - }, - { - "name": "timestamp", - "type": "long" - }, - { - "name": "value", - "type": { - "name": "NoteValue", - "type": "record", - "fields": [ - { - "name": "type", - "type": { - "name": "NoteType", - "type": "enum", - "symbols": [ - "Boolean", - "Long", - "String", - "Double" - ] - } - }, - { - "name": "data", - "type": ["string","null"] - } - ] - } - } - ] - } - } - } - ] -} \ No newline at end of file diff --git a/money-wire/src/main/scala/com/comcast/money/wire/SpanConverters.scala b/money-wire/src/main/scala/com/comcast/money/wire/SpanConverters.scala deleted file mode 100644 index b430894b..00000000 --- a/money-wire/src/main/scala/com/comcast/money/wire/SpanConverters.scala +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Copyright 2012 Comcast Cable Communications Management, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.comcast.money.wire - -import java.io.ByteArrayOutputStream -import java.util -import java.util.Collections -import java.util.concurrent.TimeUnit -import com.comcast.money.api -import com.comcast.money.api.{ InstrumentationLibrary, Note, SpanId, SpanInfo } -import com.comcast.money.core._ -import com.comcast.money.wire.avro -import com.comcast.money.wire.avro.NoteType -import com.fasterxml.jackson.annotation.JsonIgnore -import com.fasterxml.jackson.databind.{ DeserializationFeature, ObjectMapper } -import io.opentelemetry.api.trace.{ Span, SpanKind, StatusCode, TraceFlags, TraceState } -import org.apache.avro.Schema -import org.apache.avro.io.{ DecoderFactory, EncoderFactory } -import org.apache.avro.specific.{ SpecificDatumReader, SpecificDatumWriter } - -import scala.collection.JavaConverters._ - -trait TypeConverter[From, To] { - - def convert(from: From): To -} - -object TypeConverter { - - def instance[From, To](f: From => To): TypeConverter[From, To] = new TypeConverter[From, To] { - def convert(from: From): To = f(from) - } -} - -object AvroConversions extends SpanAvroConverters { - - implicit class AvroConversionExtensions[A](val a: A) extends AnyVal { - def convertTo[B](implicit tc: TypeConverter[A, B]): B = tc.convert(a) - } -} -object JsonConversions extends SpanJsonConverters { - - implicit class JsonConversionExtensions[A](val a: A) extends AnyVal { - def convertTo[B](implicit tc: TypeConverter[A, B]): B = tc.convert(a) - } -} - -trait SpanWireConverters { - - implicit val noteToWire: TypeConverter[api.Note[_], avro.Note] = TypeConverter.instance { from: api.Note[_] => - def avroNote(noteValue: avro.NoteValue): avro.Note = new avro.Note(from.name, from.timestamp, noteValue) - - from.value match { - case l: Long => avroNote( - new avro.NoteValue(avro.NoteType.Long, l.toString)) - case s: String => avroNote( - new avro.NoteValue(avro.NoteType.String, s)) - case b: java.lang.Boolean => avroNote( - new avro.NoteValue(avro.NoteType.Boolean, b.toString)) - case d: Double => avroNote( - new avro.NoteValue(avro.NoteType.Double, d.toString)) - case null => avroNote( - new avro.NoteValue(avro.NoteType.String, null)) - - } - } - - implicit val wireToNote: TypeConverter[avro.Note, api.Note[_]] = TypeConverter.instance { from: avro.Note => - def toOption[T](str: String)(ft: String => T): Option[T] = { - if (str == null) - None: Option[T] - else - Some(ft(str)) - } - - from.getValue.getType match { - case NoteType.Boolean => api.Note.of( - from.getName, from.getValue.getData.toBoolean, from.getTimestamp) - case NoteType.Long => api.Note.of(from.getName, from.getValue.getData.toLong, from.getTimestamp) - case NoteType.String => api.Note.of( - from.getName, from.getValue.getData, from.getTimestamp) - case NoteType.Double => api.Note.of( - from.getName, from.getValue.getData.toDouble, from.getTimestamp) - } - } - - implicit val spanIdToWire: TypeConverter[api.SpanId, avro.SpanId] = TypeConverter.instance { spanId => - new avro.SpanId(spanId.traceId, spanId.parentId, spanId.selfId) - } - - implicit val wireToSpanId: TypeConverter[avro.SpanId, api.SpanId] = TypeConverter.instance { spanId => - api.SpanId.createRemote(spanId.getTraceId, spanId.getParentId, spanId.getSpanId, TraceFlags.getSampled, TraceState.getDefault) - } - - implicit val spanToWire: TypeConverter[SpanInfo, avro.Span] = TypeConverter.instance { span: SpanInfo => - - var success = span.success - new avro.Span( - span.name, - span.appName, - span.host, - span.library.name, - span.library.version, - span.durationMicros, - if (success == null) true else success, - span.startTimeMillis, - implicitly[TypeConverter[api.SpanId, avro.SpanId]].convert(span.id), - span.notes.values.asScala.toList.map(implicitly[TypeConverter[api.Note[_], avro.Note]].convert).asJava) - } - - implicit val wireToSpan: TypeConverter[avro.Span, SpanInfo] = TypeConverter.instance { from: avro.Span => - - def toNotesMap(notes: java.util.List[avro.Note]): java.util.Map[String, api.Note[_]] = { - val res = new java.util.HashMap[String, api.Note[_]] - notes.asScala.foreach(n => res.put(n.getName, implicitly[TypeConverter[avro.Note, api.Note[_]]].convert(n))) - res - } - - def toInstrumentationLibrary(span: avro.Span): InstrumentationLibrary = - if (span.getLibraryName != null && !span.getLibraryName.isEmpty) { - new InstrumentationLibrary(span.getLibraryName, span.getLibraryVersion) - } else { - InstrumentationLibrary.UNKNOWN - } - - new SpanInfo { - override def notes(): util.Map[String, Note[_]] = toNotesMap(from.getNotes) - override def events(): util.List[SpanInfo.Event] = Collections.emptyList() - override def startTimeNanos(): Long = TimeUnit.MILLISECONDS.toNanos(from.getStartTime) - override def endTimeNanos(): Long = startTimeNanos + durationNanos - override def status(): StatusCode = if (from.getSuccess) StatusCode.OK else StatusCode.ERROR - override def kind(): SpanKind = SpanKind.INTERNAL - override def description(): String = "" - override def id(): SpanId = implicitly[TypeConverter[avro.SpanId, api.SpanId]].convert(from.getId) - override def name(): String = from.getName - override def durationNanos(): Long = TimeUnit.MICROSECONDS.toNanos(from.getDuration) - override def library(): InstrumentationLibrary = toInstrumentationLibrary(from) - override def appName(): String = from.getAppName - override def host(): String = from.getHost - } - } -} - -trait SpanAvroConverters extends SpanWireConverters { - - val spanDatumWriter = new SpecificDatumWriter[avro.Span](avro.Span.getClassSchema) - val spanDatumReader = new SpecificDatumReader[avro.Span](avro.Span.getClassSchema) - - implicit val spanToAvro: TypeConverter[SpanInfo, Array[Byte]] = TypeConverter.instance { span => - - val bytes = new ByteArrayOutputStream() - val spanBinaryEncoder = EncoderFactory.get.directBinaryEncoder(bytes, null) - val wireSpan = implicitly[TypeConverter[SpanInfo, avro.Span]].convert(span) - spanDatumWriter.write(wireSpan, spanBinaryEncoder) - bytes.toByteArray - } - - implicit val avroToSpan: TypeConverter[Array[Byte], SpanInfo] = TypeConverter.instance { bytes => - val spanBinaryDecoder = DecoderFactory.get.binaryDecoder(bytes, 0, bytes.length, null) - implicitly[TypeConverter[avro.Span, SpanInfo]].convert(spanDatumReader.read(null, spanBinaryDecoder)) - } -} - -trait SpanJsonConverters extends SpanWireConverters { - - val mapper: ObjectMapper = createSpanJsonMapper() - - /** - * Mixin that is used by the Jackson ObjectMapper so we can explicitly ignore certain properties - */ - abstract class IgnoreSpanProperties { - @JsonIgnore - def getSchema(): Schema - } - - def createSpanJsonMapper(): ObjectMapper = { - // Make sure we don't fail on unknown types - val jsonMapper: ObjectMapper = new ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) - - jsonMapper.setMixIns( - Map[Class[_], Class[_]]( - classOf[avro.Span] -> classOf[IgnoreSpanProperties], - classOf[avro.SpanId] -> classOf[IgnoreSpanProperties], - classOf[avro.Note] -> classOf[IgnoreSpanProperties], - classOf[avro.NoteValue] -> classOf[IgnoreSpanProperties]).asJava) - jsonMapper - } - - implicit val spanToJson: TypeConverter[SpanInfo, String] = TypeConverter.instance { span => - mapper.writeValueAsString(implicitly[TypeConverter[SpanInfo, avro.Span]].convert(span)) - } - - implicit val jsonToSpan: TypeConverter[String, SpanInfo] = TypeConverter.instance { str => - implicitly[TypeConverter[avro.Span, SpanInfo]].convert(mapper.readValue(str, classOf[avro.Span])) - } -} diff --git a/money-wire/src/test/scala/com/comcast/money/wire/AvroConversionSpec.scala b/money-wire/src/test/scala/com/comcast/money/wire/AvroConversionSpec.scala deleted file mode 100644 index e90376c3..00000000 --- a/money-wire/src/test/scala/com/comcast/money/wire/AvroConversionSpec.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2012 Comcast Cable Communications Management, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.comcast.money.wire - -import com.comcast.money.api.{ Note, SpanInfo } -import com.comcast.money.core.formatters.FormatterUtils.randomRemoteSpanId -import io.opentelemetry.api.trace.StatusCode -import org.scalatest.Inspectors -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec - -class AvroConversionSpec extends AnyWordSpec with Matchers with Inspectors { - - import AvroConversions._ - - import scala.collection.JavaConverters._ - - "Avro Conversion" should { - "roundtrip" in { - val orig = TestSpanInfo( - id = randomRemoteSpanId(), - name = "key", - appName = "app", - host = "host", - startTimeNanos = 1000000L, - endTimeNanos = 1035000L, - status = StatusCode.OK, - durationNanos = 35000L, - notes = Map[String, Note[_]]( - "what" -> Note.of("what", 1L), - "when" -> Note.of("when", 2L), - "bob" -> Note.of("bob", "craig"), - "none" -> Note.of("none", null), - "bool" -> Note.of("bool", true), - "dbl" -> Note.of("dbl", 1.0)).asJava).asInstanceOf[SpanInfo] - - val bytes = orig.convertTo[Array[Byte]] - val roundtrip = bytes.convertTo[SpanInfo] - - roundtrip.appName shouldEqual orig.appName - roundtrip.name shouldEqual orig.name - roundtrip.durationMicros shouldEqual orig.durationMicros - roundtrip.host shouldEqual orig.host - roundtrip.id shouldEqual orig.id - roundtrip.success shouldEqual orig.success - roundtrip.startTimeMillis shouldEqual orig.startTimeMillis - roundtrip.notes shouldEqual orig.notes - } - } -} diff --git a/money-wire/src/test/scala/com/comcast/money/wire/JsonConversionSpec.scala b/money-wire/src/test/scala/com/comcast/money/wire/JsonConversionSpec.scala deleted file mode 100644 index de3af2f3..00000000 --- a/money-wire/src/test/scala/com/comcast/money/wire/JsonConversionSpec.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2012 Comcast Cable Communications Management, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.comcast.money.wire - -import com.comcast.money.api.{ Note, SpanInfo } -import com.comcast.money.core.formatters.FormatterUtils.randomRemoteSpanId -import io.opentelemetry.api.trace.StatusCode -import org.scalatest.Inspectors -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec - -class JsonConversionSpec extends AnyWordSpec with Matchers with Inspectors { - - import JsonConversions._ - - import scala.collection.JavaConverters._ - - val orig = TestSpanInfo( - id = randomRemoteSpanId(), - name = "key", - appName = "app", - host = "host", - startTimeNanos = 1000000L, - endTimeNanos = 1035000L, - status = StatusCode.OK, - durationNanos = 35000L, - notes = Map[String, Note[_]]( - "what" -> Note.of("what", 1L), - "when" -> Note.of("when", 2L), - "bob" -> Note.of("bob", "craig"), - "none" -> Note.of("none", null), - "bool" -> Note.of("bool", true), - "dbl" -> Note.of("dbl", 1.0)).asJava).asInstanceOf[SpanInfo] - - "Json Conversion" should { - "roundtrip" in { - - val json = orig.convertTo[String] - val converted = json.convertTo[SpanInfo] - - converted.appName shouldEqual orig.appName - converted.name shouldEqual orig.name - converted.durationMicros shouldEqual orig.durationMicros - converted.host shouldEqual orig.host - converted.id shouldEqual orig.id - converted.success shouldEqual orig.success - converted.startTimeMillis shouldEqual orig.startTimeMillis - converted.notes shouldEqual orig.notes - } - } -} diff --git a/money-wire/src/test/scala/com/comcast/money/wire/TestSpanInfo.scala b/money-wire/src/test/scala/com/comcast/money/wire/TestSpanInfo.scala deleted file mode 100644 index dac3bc74..00000000 --- a/money-wire/src/test/scala/com/comcast/money/wire/TestSpanInfo.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2012 Comcast Cable Communications Management, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.comcast.money.wire - -import java.util.Collections -import com.comcast.money.api.{ InstrumentationLibrary, Note, SpanId, SpanInfo } -import com.comcast.money.core.Money -import io.opentelemetry.api.trace.{ Span, SpanKind, StatusCode } - -case class TestSpanInfo( - id: SpanId, - name: String, - kind: SpanKind = SpanKind.INTERNAL, - library: InstrumentationLibrary = new InstrumentationLibrary("test", "0.0.1"), - startTimeNanos: Long = 0L, - endTimeNanos: Long = 0L, - durationNanos: Long = 0L, - status: StatusCode = StatusCode.UNSET, - description: String = "", - notes: java.util.Map[String, Note[_]] = Collections.emptyMap(), - appName: String = Money.Environment.applicationName, - host: String = Money.Environment.hostName) extends SpanInfo diff --git a/project/plugins.sbt b/project/plugins.sbt index 6fae3dfb..55f35d93 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -11,8 +11,6 @@ addSbtPlugin("com.lightbend.sbt" % "sbt-aspectj" % "0.11.0") addSbtPlugin("com.typesafe.sbt" % "sbt-site" % "1.3.2") -addSbtPlugin("com.cavorite" % "sbt-avro-1-8" % "1.1.9") - addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.5.0") addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.8.3")