
Feature/177 deduplicated kafka writer 2 #184

Merged · 27 commits · Dec 17, 2020

Commits
- `e4aebaa` #177 works now (kevinwallimann, Nov 12, 2020)
- `1325b9e` refactored (kevinwallimann, Nov 13, 2020)
- `b53ddce` wip (kevinwallimann, Nov 16, 2020)
- `e042a0d` Update test for multiple id columns (kevinwallimann, Nov 18, 2020)
- `c276401` Works with composite id (kevinwallimann, Nov 18, 2020)
- `a537cc4` works with nested columns (kevinwallimann, Nov 18, 2020)
- `a2aac71` #177: Configure multiple kafka partitions on testcase (kevinwallimann, Nov 19, 2020)
- `13db638` Attempt real test case (kevinwallimann, Nov 25, 2020)
- `42618b1` Doesn't work with multiple partitions (kevinwallimann, Nov 25, 2020)
- `bfe0fda` works now (kevinwallimann, Nov 26, 2020)
- `b577e39` Generalize kafka record selection (kevinwallimann, Nov 26, 2020)
- `864ed48` Refactoring + Tests wip (kevinwallimann, Nov 26, 2020)
- `152d0ef` wip (kevinwallimann, Nov 27, 2020)
- `b42880d` works, seems a bit slow (kevinwallimann, Nov 27, 2020)
- `7476bc9` Fix getMessagesAtLeastToOffset (kevinwallimann, Nov 30, 2020)
- `1a5c6ff` Add tests for kafkautil (kevinwallimann, Dec 1, 2020)
- `680e085` Add tests to AvroUtil (kevinwallimann, Dec 4, 2020)
- `39f854a` Replace copy pasted package private code with proxy object in same pa… (kevinwallimann, Dec 4, 2020)
- `f6c6f54` Get schema registry url from decoder and encoder config (kevinwallimann, Dec 4, 2020)
- `cc373d3` Add test for transformer object (kevinwallimann, Dec 4, 2020)
- `1ae6171` Add some tests, docs (kevinwallimann, Dec 7, 2020)
- `938af6f` Fix test and sonar issues (kevinwallimann, Dec 7, 2020)
- `b87bd16` Fix test, total order not guaranteed (kevinwallimann, Dec 7, 2020)
- `65373ac` wip (kevinwallimann, Dec 15, 2020)
- `bcf06af` works now (kevinwallimann, Dec 16, 2020)
- `874b474` Add test with duplicates, use max.poll.records (kevinwallimann, Dec 16, 2020)
- `2ed580b` PR fixes (kevinwallimann, Dec 17, 2020)
42 changes: 42 additions & 0 deletions README.md
@@ -230,6 +230,48 @@ will produce the following schema



##### DeduplicateKafkaSinkTransformer
`DeduplicateKafkaSinkTransformer` deduplicates records in a query from a Kafka source to a Kafka destination when the query is rerun after a failure.
Records are identified across the source and destination topics by a user-defined id, which may be a composite id and may consist of consumer record
properties such as the offset or partition, as well as fields from the key or value schema.
Deduplication is needed because the Kafka destination provides only an at-least-once guarantee. Deduplication works by reading the ids
written during the last, partial run from the destination topic and excluding them from the query.
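
Conceptually, the exclusion step behaves like a Spark filter on the composite id. The following is a minimal sketch, not the actual implementation; `idColumns` and `alreadyWrittenIds` are hypothetical inputs:
```
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws, not}

object DeduplicationSketch {
  // Drops rows whose composite id was already written during the failed, partial run.
  def excludeDuplicates(df: DataFrame, idColumns: Seq[String], alreadyWrittenIds: Set[String]): DataFrame = {
    val compositeId = concat_ws("|", idColumns.map(col): _*)
    df.filter(not(compositeId.isin(alreadyWrittenIds.toSeq: _*)))
  }
}
```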

Note that there must be exactly one source topic and one destination topic, only one writer may write to the destination topic, and
no records may have been written to the destination topic after the partial run. Otherwise, records may still be duplicated.

To use this transformer, `KafkaStreamReader`, `ConfluentAvroDecodingTransformer`, `ConfluentAvroEncodingTransformer` and `KafkaStreamWriter`
must be configured as well.

Note that the star operator `*` is not supported within column names and may lead to unexpected behaviour.

To add the transformer to the pipeline use this class name:
```
component.transformer.class.{transformer-id} = za.co.absa.hyperdrive.ingestor.implementation.transformer.deduplicate.kafka.DeduplicateKafkaSinkTransformer
```

| Property Name | Required | Description |
| :--- | :---: | :--- |
| `transformer.{transformer-id}.source.id.columns` | Yes | A comma-separated list of consumer record properties on the source topic that define the composite id, e.g. `offset, partition` or `key.some_user_id`. |
| `transformer.{transformer-id}.destination.id.columns` | Yes | A comma-separated list of consumer record properties on the destination topic that the source id columns are written to, e.g. `value.src_offset, value.src_partition` or `key.some_user_id`. |
| `transformer.{transformer-id}.kafka.consumer.timeout` | No | Kafka consumer timeout in seconds. The default value is 120s. |
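
For illustration, a complete configuration for this transformer might look like the following sketch. The transformer id `[kafka-deduplicator]`, its position `2` in the pipeline, and the id columns are made up for the example:
```
component.transformer.id.2 = [kafka-deduplicator]
component.transformer.class.[kafka-deduplicator] = za.co.absa.hyperdrive.ingestor.implementation.transformer.deduplicate.kafka.DeduplicateKafkaSinkTransformer
transformer.[kafka-deduplicator].source.id.columns = offset, partition
transformer.[kafka-deduplicator].destination.id.columns = value.src_offset, value.src_partition
transformer.[kafka-deduplicator].kafka.consumer.timeout = 300
```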

The following fields can be selected on the consumer record:

- `topic`
- `offset`
- `partition`
- `timestamp`
- `timestampType`
- `serializedKeySize`
- `serializedValueSize`
- `key`
- `value`

In the case of `key` and `value`, fields of their schemas can be selected by appending a dot-separated path, e.g.
`key.some_nested_record.some_id` or likewise `value.some_nested_record.some_id`.

See [Pipeline settings](#pipeline-settings) for details about `{transformer-id}`.
##### ParquetStreamWriter
| Property Name | Required | Description |
@@ -31,3 +31,9 @@ trait StreamTransformerFactory extends ComponentFactory[StreamTransformer] {
*/
def getMappingFromRetainedGlobalConfigToLocalConfig(globalConfig: Configuration): Map[String, String] = Map()
}

object StreamTransformerFactory {
val IdsKeyPrefix = "component.transformer.id"
val ClassKeyPrefix = "component.transformer.class"
val TransformerKeyPrefix = "transformer"
}
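
As a sketch of how these three prefixes compose into configuration keys — the transformer id `[my-transformer]`, the class name, and the property below are hypothetical:
```
import org.apache.commons.configuration2.BaseConfiguration
import za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactory

object TransformerConfigSketch {
  val config = new BaseConfiguration
  // Position of the transformer in the pipeline, mapped to its id.
  config.addProperty(s"${StreamTransformerFactory.IdsKeyPrefix}.0", "[my-transformer]")
  // The id, mapped to the implementing class.
  config.addProperty(s"${StreamTransformerFactory.ClassKeyPrefix}.[my-transformer]", "com.example.MyStreamTransformer")
  // Transformer-scoped properties, prefixed by "transformer" and the id.
  config.addProperty(s"${StreamTransformerFactory.TransformerKeyPrefix}.[my-transformer].some.property", "some-value")
}
```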
@@ -15,8 +15,10 @@

package za.co.absa.hyperdrive.ingestor.api.utils

- import org.apache.commons.configuration2.Configuration
+ import org.apache.commons.configuration2.{Configuration, ConfigurationConverter}
import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory}

import scala.collection.mutable
import scala.util.{Failure, Success, Try}

object ConfigUtils {
@@ -98,4 +100,16 @@ object ConfigUtils {
Success(target)
}
}

  /**
   * Finds the transformer id under which the given transformer class is registered in the
   * configuration, i.e. the key suffix of `component.transformer.class.{transformer-id}`,
   * or None if the class is not registered.
   */
  def getTransformerPrefix[T <: StreamTransformer](config: Configuration, transformerClass: Class[T]): Option[String] = {
    import scala.collection.JavaConverters._
    val className = transformerClass.getCanonicalName
    val transformerPrefixConfig = config.subset(StreamTransformerFactory.ClassKeyPrefix)
    val transformerPrefixMap = ConfigurationConverter.getMap(transformerPrefixConfig).asScala
    transformerPrefixMap.find {
      case (_: String, value: String) => value == className
    }.map {
      case (key: String, _) => key
    }
  }
}
@@ -0,0 +1,23 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.api.utils

import org.apache.spark.sql.DataFrame
import za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformer

/** A no-op transformer whose only purpose is to serve as a class reference in tests. */
class DummyStreamTransformer extends StreamTransformer {
  override def transform(streamData: DataFrame): DataFrame = ???
}
@@ -19,6 +19,7 @@ import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
import org.apache.commons.configuration2.{BaseConfiguration, Configuration}
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{FlatSpec, Matchers}
import za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactory

class TestConfigUtils extends FlatSpec with Matchers with MockitoSugar {

@@ -322,4 +323,21 @@ class TestConfigUtils extends FlatSpec with Matchers with MockitoSugar {
val ex4 = the[Exception] thrownBy ConfigUtils.getOptionalBoolean("key4", config)
ex4.getMessage should include("key4")
}

"getTransformerPrefix" should "get the prefix of a transformer class" in {
val config = new BaseConfiguration
config.addProperty(s"${StreamTransformerFactory.ClassKeyPrefix}.[dummy-transformer]", classOf[DummyStreamTransformer].getCanonicalName)

val prefix = ConfigUtils.getTransformerPrefix(config, classOf[DummyStreamTransformer])

prefix shouldBe Some("[dummy-transformer]")
}

it should "return None if the transformer class is not registered in the config" in {
val config = new BaseConfiguration

val prefix = ConfigUtils.getTransformerPrefix(config, classOf[DummyStreamTransformer])

prefix shouldBe None
}
}
@@ -17,7 +17,6 @@ package za.co.absa.hyperdrive.driver.drivers

import java.util.Properties

- import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.logging.log4j.LogManager
@@ -29,7 +28,7 @@ case class SchemaRegistryContainer(dockerImageName: String) extends GenericContainer
class KafkaSchemaRegistryWrapper {
private val logger = LogManager.getLogger

- private val confluentPlatformVersion = "5.3.1"
+ private val confluentPlatformVersion = "5.3.1" // should match the kafka.avro.serializer.version property in the pom file
private val schemaRegistryPort = 8081
private val commonNetwork = Network.newNetwork()
val kafka: KafkaContainer = startKafka(commonNetwork)