Skip to content

Commit

Permalink
Add support for using different subject name strategies
Browse files Browse the repository at this point in the history
  • Loading branch information
enamrik authored and Kirmanie L Ravariere committed Jun 14, 2022
1 parent d165b4d commit 16fafdb
Show file tree
Hide file tree
Showing 9 changed files with 488 additions and 25 deletions.
28 changes: 23 additions & 5 deletions avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ func SerializeAvro(configuration Configuration, topic string, data interface{},
bytesData := []byte(data.(string))

client := SchemaRegistryClientWithConfiguration(configuration.SchemaRegistry)
subject := topic + "-" + string(element)

var subject, subjectNameError = GetSubjectName(schema, topic, element, configuration.Producer.SubjectNameStrategy)
if subjectNameError != nil {
return nil, subjectNameError
}

var schemaInfo *srclient.Schema
schemaID := 0

Expand Down Expand Up @@ -86,22 +91,35 @@ func SerializeAvro(configuration Configuration, topic string, data interface{},
// is used to configure the Schema Registry client. The element is used to define the subject.
// The data should be a byte array.
func DeserializeAvro(configuration Configuration, topic string, data []byte, element Element, schema string, version int) (interface{}, *Xk6KafkaError) {
bytesDecodedData, err := DecodeWireFormat(data)
schemaID, bytesDecodedData, err := DecodeWireFormat(data)
if err != nil {
return nil, NewXk6KafkaError(failedDecodeFromWireFormat,
"Failed to remove wire format from the binary data",
err)
}

client := SchemaRegistryClientWithConfiguration(configuration.SchemaRegistry)
subject := topic + "-" + string(element)
var schemaInfo *srclient.Schema

var xk6KafkaError *Xk6KafkaError

client := SchemaRegistryClientWithConfiguration(configuration.SchemaRegistry)

var subject, subjectNameError = GetSubjectName(schema, topic, element, configuration.Consumer.SubjectNameStrategy)
if subjectNameError != nil {
return nil, subjectNameError
}

if schema != "" {
// Schema is provided, so we need to create it and get the schema ID
schemaInfo, xk6KafkaError = CreateSchema(client, subject, schema, srclient.Avro)
} else if configuration.Consumer.UseMagicPrefix {
// Schema not provided and no valid version flag, so we use te schemaID in the magic prefix
schemaResult, err := client.GetSchema(schemaID)
schemaInfo = schemaResult
if err != nil {
xk6KafkaError = NewXk6KafkaError(failedCreateAvroCodec,
"Failed to get schema by magic prefix",
err)
}
} else {
// Schema is not provided, so we need to fetch the schema from the Schema Registry
schemaInfo, xk6KafkaError = GetSchema(client, subject, schema, srclient.Avro, version)
Expand Down
235 changes: 232 additions & 3 deletions avro_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"github.com/riferrei/srclient"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -51,7 +52,7 @@ func TestSerializeDeserializeAvroFailsOnSchemaError(t *testing.T) {
assert.Equal(t, failedCreateAvroCodec, err.Code)

// Deserialize the key or value
deserialized, err := DeserializeAvro(avroConfig, "topic", []byte{1, 2, 3, 4, 5, 6}, element, jsonSchema, 0)
deserialized, err := DeserializeAvro(avroConfig, "topic", []byte{0, 1, 2, 3, 4, 5}, element, jsonSchema, 0)
assert.Nil(t, deserialized)
assert.Error(t, err.Unwrap())
assert.Equal(t, "Failed to create codec for decoding Avro", err.Message)
Expand All @@ -73,7 +74,7 @@ func TestSerializeDeserializeAvroFailsOnWireFormatError(t *testing.T) {

// Deserialize a broken key or value
// Proper wire-formatted message has 5 bytes (the wire format) plus data
deserialized, err = DeserializeAvro(avroConfig, "topic", []byte{1, 2, 3, 4}, element, schema, 0)
deserialized, err = DeserializeAvro(avroConfig, "topic", []byte{0, 1, 2, 3}, element, schema, 0)
assert.Nil(t, deserialized)
assert.Error(t, err.Unwrap())
assert.Equal(t, "Failed to remove wire format from the binary data", err.Message)
Expand All @@ -92,10 +93,238 @@ func TestSerializeDeserializeAvroFailsOnEncodeDecodeError(t *testing.T) {
assert.Equal(t, "Failed to encode data into Avro", err.Message)
assert.Equal(t, failedEncodeToAvro, err.Code)

deserialized, err := DeserializeAvro(avroConfig, "topic", []byte{1, 2, 3, 4, 5, 6}, element, avroSchema, 0)
deserialized, err := DeserializeAvro(avroConfig, "topic", []byte{0, 1, 2, 3, 5}, element, avroSchema, 0)
assert.Nil(t, deserialized)
assert.Error(t, err.Unwrap())
assert.Equal(t, "Failed to decode data from Avro", err.Message)
assert.Equal(t, failedDecodeAvroFromBinary, err.Code)
}
}

func TestAvroSerializeTopicNameStrategy(t *testing.T) {
data := `{"field":"value"}`
topic := "TestAvroSerializeTopicNameStrategy-topic"
config := Configuration{
Producer: ProducerConfiguration{
ValueSerializer: AvroSerializer,
SubjectNameStrategy: "TopicNameStrategy",
},
SchemaRegistry: SchemaRegistryConfiguration{
Url: "http://localhost:8081",
},
}

schema := `{"type":"record","name":"TestAvroSerializeTopicNameStrategyIsDefaultStrategy","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}`

serialized, err := SerializeAvro(config, topic, data, Value, schema, 0)
assert.Nil(t, err)
assert.NotNil(t, serialized)

expectedSubject := topic + "-value"
srClient := SchemaRegistryClientWithConfiguration(config.SchemaRegistry)
schemaResult, err := GetSchema(srClient, expectedSubject, schema, srclient.Avro, 0)
assert.Nil(t, err)
assert.NotNil(t, schemaResult)
}

func TestAvroSerializeTopicNameStrategyIsDefaultStrategy(t *testing.T) {
data := `{"field":"value"}`
topic := "TestAvroSerializeTopicNameStrategyIsDefaultStrategy-topic"
config := Configuration{
Producer: ProducerConfiguration{
ValueSerializer: AvroSerializer,
},
SchemaRegistry: SchemaRegistryConfiguration{
Url: "http://localhost:8081",
},
}

schema := `{"type":"record","name":"TestAvroSerializeTopicNameStrategyIsDefaultStrategy","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}`

serialized, err := SerializeAvro(config, topic, data, Value, schema, 0)
assert.Nil(t, err)
assert.NotNil(t, serialized)

expectedSubject := topic + "-value"
srClient := SchemaRegistryClientWithConfiguration(config.SchemaRegistry)
schemaResult, err := GetSchema(srClient, expectedSubject, schema, srclient.Avro, 0)
assert.Nil(t, err)
assert.NotNil(t, schemaResult)
}

func TestAvroSerializeTopicRecordNameStrategy(t *testing.T) {
data := `{"field":"value"}`
topic := "TestAvroSerializeTopicRecordNameStrategy-topic"
config := Configuration{
Producer: ProducerConfiguration{
ValueSerializer: AvroSerializer,
SubjectNameStrategy: "TopicRecordNameStrategy",
},
SchemaRegistry: SchemaRegistryConfiguration{
Url: "http://localhost:8081",
},
}
schema := `{"type":"record","name":"TestAvroSerializeTopicRecordNameStrategy","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}`

serialized, err := SerializeAvro(config, topic, data, Value, schema, 0)
assert.Nil(t, err)
assert.NotNil(t, serialized)

expectedSubject := topic + "-io.confluent.kafka.avro.TestAvroSerializeTopicRecordNameStrategy"
srClient := SchemaRegistryClientWithConfiguration(config.SchemaRegistry)
schemaResult, err := GetSchema(srClient, expectedSubject, schema, srclient.Avro, 0)
assert.Nil(t, err)
assert.NotNil(t, schemaResult)
}

func TestAvroSerializeRecordNameStrategy(t *testing.T) {
data := `{"field":"value"}`
topic := "TestAvroSerializeRecordNameStrategy-topic"
config := Configuration{
Producer: ProducerConfiguration{
ValueSerializer: AvroSerializer,
SubjectNameStrategy: "RecordNameStrategy",
},
SchemaRegistry: SchemaRegistryConfiguration{
Url: "http://localhost:8081",
},
}
schema := `{"type":"record","name":"TestAvroSerializeRecordNameStrategy","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}`

serialized, err := SerializeAvro(config, topic, data, Value, schema, 0)
assert.Nil(t, err)
assert.NotNil(t, serialized)

expectedSubject := "io.confluent.kafka.avro.TestAvroSerializeRecordNameStrategy"
srClient := SchemaRegistryClientWithConfiguration(config.SchemaRegistry)
resultSchema, err := GetSchema(srClient, expectedSubject, avroSchema, srclient.Avro, 0)
assert.Nil(t, err)
assert.NotNil(t, resultSchema)
}

func TestAvroDeserializeUsingMagicPrefix(t *testing.T) {
data := `{"field":"value"}`
topic := "TestAvroDeserializeUsingMagicPrefix-topic"
config := Configuration{
Consumer: ConsumerConfiguration{
UseMagicPrefix: true,
},
Producer: ProducerConfiguration{
ValueSerializer: AvroSerializer,
SubjectNameStrategy: "RecordNameStrategy",
},
SchemaRegistry: SchemaRegistryConfiguration{
Url: "http://localhost:8081",
},
}
schema := `{"type":"record","name":"TestAvroDeserializeUsingMagicPrefix","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}`

serialized, err := SerializeAvro(config, topic, data, Value, schema, 0)
assert.Nil(t, err)

dData, dErr := DeserializeAvro(config, topic, serialized, Value, "", 0)
assert.Equal(t, "value", dData.(map[string]interface{})["field"])
assert.Nil(t, dErr)
}

func TestAvroDeserializeUsingDefaultSubjectNameStrategy(t *testing.T) {
data := `{"field":"value"}`
topic := "TestAvroDeserializeUsingDefaultSubjectNameStrategy-topic"
config := Configuration{
Producer: ProducerConfiguration{
ValueSerializer: AvroSerializer,
},
Consumer: ConsumerConfiguration{
ValueDeserializer: AvroSerializer,
},
SchemaRegistry: SchemaRegistryConfiguration{
Url: "http://localhost:8081",
},
}
schema := `{"type":"record","name":"TestAvroDeserializeUsingDefaultSubjectNameStrategy","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}`

serialized, err := SerializeAvro(config, topic, data, Value, schema, 0)
assert.Nil(t, err)

dData, dErr := DeserializeAvro(config, topic, serialized, Value, "", 0)
assert.Equal(t, "value", dData.(map[string]interface{})["field"])
assert.Nil(t, dErr)
}

func TestAvroDeserializeUsingSubjectNameStrategyRecordName(t *testing.T) {
data := `{"field":"value"}`
topic := "TestAvroDeserializeUsingSubjectNameStrategyRecordName-topic"
config := Configuration{
Producer: ProducerConfiguration{
ValueSerializer: AvroSerializer,
SubjectNameStrategy: "RecordNameStrategy",
},
Consumer: ConsumerConfiguration{
ValueDeserializer: AvroSerializer,
SubjectNameStrategy: "RecordNameStrategy",
},
SchemaRegistry: SchemaRegistryConfiguration{
Url: "http://localhost:8081",
},
}
schema := `{"type":"record","name":"TestAvroDeserializeUsingSubjectNameStrategyRecordName","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}`

serialized, err := SerializeAvro(config, topic, data, Value, schema, 0)
assert.Nil(t, err)

dData, dErr := DeserializeAvro(config, topic, serialized, Value, schema, 0)
assert.Equal(t, "value", dData.(map[string]interface{})["field"])
assert.Nil(t, dErr)
}

func TestAvroDeserializeUsingSubjectNameStrategyTopicRecordName(t *testing.T) {
data := `{"field":"value"}`
topic := "TestAvroDeserializeUsingSubjectNameStrategyTopicRecordName-topic"
config := Configuration{
Producer: ProducerConfiguration{
ValueSerializer: AvroSerializer,
SubjectNameStrategy: "TopicRecordNameStrategy",
},
Consumer: ConsumerConfiguration{
ValueDeserializer: AvroSerializer,
SubjectNameStrategy: "TopicRecordNameStrategy",
},
SchemaRegistry: SchemaRegistryConfiguration{
Url: "http://localhost:8081",
},
}
schema := `{"type":"record","name":"TestAvroDeserializeUsingSubjectNameStrategyTopicRecordName","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}`

serialized, err := SerializeAvro(config, topic, data, Value, schema, 0)
assert.Nil(t, err)

dData, dErr := DeserializeAvro(config, topic, serialized, Value, schema, 0)
assert.Equal(t, "value", dData.(map[string]interface{})["field"])
assert.Nil(t, dErr)
}

func TestAvroDeserializeUsingSubjectNameStrategyTopicName(t *testing.T) {
data := `{"field":"value"}`
topic := "TestAvroDeserializeUsingSubjectNameStrategyTopicName-topic"
config := Configuration{
Producer: ProducerConfiguration{
ValueSerializer: AvroSerializer,
SubjectNameStrategy: "TopicNameStrategy",
},
Consumer: ConsumerConfiguration{
ValueDeserializer: AvroSerializer,
SubjectNameStrategy: "TopicNameStrategy",
},
SchemaRegistry: SchemaRegistryConfiguration{
Url: "http://localhost:8081",
},
}
schema := `{"type":"record","name":"TestAvroDeserializeUsingSubjectNameStrategyTopicName","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}`

serialized, err := SerializeAvro(config, topic, data, Value, schema, 0)
assert.Nil(t, err)

dData, dErr := DeserializeAvro(config, topic, serialized, Value, schema, 0)
assert.Equal(t, "value", dData.(map[string]interface{})["field"])
assert.Nil(t, dErr)
}
11 changes: 7 additions & 4 deletions configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import (
)

type ConsumerConfiguration struct {
KeyDeserializer string `json:"keyDeserializer"`
ValueDeserializer string `json:"valueDeserializer"`
KeyDeserializer string `json:"keyDeserializer"`
ValueDeserializer string `json:"valueDeserializer"`
SubjectNameStrategy string `json:"subjectNameStrategy"`
UseMagicPrefix bool `json:"useMagicPrefix"`
}

type ProducerConfiguration struct {
KeySerializer string `json:"keySerializer"`
ValueSerializer string `json:"valueSerializer"`
KeySerializer string `json:"keySerializer"`
ValueSerializer string `json:"valueSerializer"`
SubjectNameStrategy string `json:"subjectNameStrategy"`
}

type Configuration struct {
Expand Down
2 changes: 1 addition & 1 deletion jsonschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func SerializeJson(configuration Configuration, topic string, data interface{},
// configuration is used to configure the Schema Registry client. The element is
// used to define the subject. The data should be a byte array.
func DeserializeJson(configuration Configuration, topic string, data []byte, element Element, schema string, version int) (interface{}, *Xk6KafkaError) {
bytesDecodedData, err := DecodeWireFormat(data)
_, bytesDecodedData, err := DecodeWireFormat(data)
if err != nil {
return nil, NewXk6KafkaError(failedDecodeFromWireFormat,
"Failed to remove wire format from the binary data",
Expand Down
4 changes: 2 additions & 2 deletions jsonschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestSerializeDeserializeJsonFailsOnSchemaError(t *testing.T) {
assert.Equal(t, failedCreateJsonSchemaCodec, err.Code)

// Deserialize the key or value
deserialized, err := DeserializeJson(jsonConfig, "topic", []byte{1, 2, 3, 4, 5, 6}, element, schema, 0)
deserialized, err := DeserializeJson(jsonConfig, "topic", []byte{0, 2, 3, 4, 5, 6}, element, schema, 0)
assert.Nil(t, deserialized)
assert.Error(t, err.Unwrap())
assert.Equal(t, "Failed to create codec for decoding JSON data", err.Message)
Expand Down Expand Up @@ -96,7 +96,7 @@ func TestSerializeDeserializeJsonFailsOnMarshalError(t *testing.T) {
assert.Equal(t, "Failed to unmarshal JSON data", err.Message)
assert.Equal(t, failedUnmarshalJson, err.Code)

deserialized, err := DeserializeJson(jsonConfig, "topic", []byte{1, 2, 3, 4, 5, 6}, element, jsonSchema, 0)
deserialized, err := DeserializeJson(jsonConfig, "topic", []byte{0, 2, 3, 4, 5, 6}, element, jsonSchema, 0)
assert.Nil(t, deserialized)
assert.Error(t, err.Unwrap())
assert.Equal(t, "Failed to unmarshal JSON data", err.Message)
Expand Down
Loading

0 comments on commit 16fafdb

Please sign in to comment.