Skip to content

Commit

Permalink
Fixes after upgrade of dependencies (#216)
Browse files Browse the repository at this point in the history
* Remove unneeded dependency
* Fix module init helper
* Fix test
* Remove commented code
* Stringified schema types are not recognized anymore, hence the use of srclient.SchemaType type
* Export srclient.SchemaType instead of string
* Fix tests
  • Loading branch information
mostafa authored May 12, 2023
1 parent e98d66d commit c51b879
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 75 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.18.25
github.com/dop251/goja v0.0.0-20230427124612-428fc442ff5f
github.com/linkedin/goavro/v2 v2.12.0
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c
github.com/pavlo-v-chernykh/keystore-go/v4 v4.4.1
github.com/riferrei/srclient v0.6.0
github.com/santhosh-tekuri/jsonschema/v5 v5.3.0
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,6 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.20.2 h1:8uQq0zMgLEfa0vRrrBgaJF2gyW9Da9BmfGV+OyUzfkY=
github.com/onsi/gomega v1.20.2/go.mod h1:iYAIXgPSaDHak0LCMA+AWBpIKBr8WZicMxnE8luStNc=
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw=
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0=
github.com/pavlo-v-chernykh/keystore-go/v4 v4.4.1 h1:FyBdsRqqHH4LctMLL+BL2oGO+ONcIPwn96ctofCVtNE=
github.com/pavlo-v-chernykh/keystore-go/v4 v4.4.1/go.mod h1:lAVhWwbNaveeJmxrxuSTxMgKpF6DjnuVpn6T8WiBwYQ=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
Expand Down Expand Up @@ -266,7 +264,6 @@ github.com/spf13/afero v1.9.5/go.mod h1:UBogFpq8E9Hx+xc5CNTTEpTnuHVmXDwZcZcE1eb/
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
12 changes: 6 additions & 6 deletions interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ type Serdes interface {
Deserialize(data []byte, schema *Schema) (interface{}, *Xk6KafkaError)
}

var TypesRegistry map[string]Serdes = map[string]Serdes{
String.String(): &StringSerde{},
Bytes.String(): &ByteArraySerde{},
srclient.Json.String(): &JSONSerde{},
srclient.Avro.String(): &AvroSerde{},
var TypesRegistry map[srclient.SchemaType]Serdes = map[srclient.SchemaType]Serdes{
String: &StringSerde{},
Bytes: &ByteArraySerde{},
srclient.Json: &JSONSerde{},
srclient.Avro: &AvroSerde{},
}

func GetSerdes(schemaType string) (Serdes, *Xk6KafkaError) {
func GetSerdes(schemaType srclient.SchemaType) (Serdes, *Xk6KafkaError) {
if serdes, ok := TypesRegistry[schemaType]; ok {
return serdes, nil
} else {
Expand Down
20 changes: 13 additions & 7 deletions kafka_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"

"github.com/dop251/goja"
"github.com/oxtoacart/bpool"
kafkago "github.com/segmentio/kafka-go"
"github.com/stretchr/testify/require"
"go.k6.io/k6/js/common"
Expand Down Expand Up @@ -40,10 +39,13 @@ func getTestModuleInstance(tb testing.TB) *kafkaTest {
tb.Cleanup(cancel)

root := New()
registry := metrics.NewRegistry()
mockVU := &modulestest.VU{
RuntimeField: runtime,
InitEnvField: &common.InitEnvironment{
Registry: metrics.NewRegistry(),
TestPreInitState: &lib.TestPreInitState{
Registry: registry,
},
},
CtxField: ctx,
}
Expand Down Expand Up @@ -78,11 +80,15 @@ func (k *kafkaTest) moveToVUCode() error {
UserAgent: null.StringFrom("TestUserAgent"),
Paused: null.BoolFrom(false),
},
BPool: bpool.NewBufferPool(1),
Samples: k.samples,
Tags: lib.NewVUStateTags(registry.RootTagSet().WithTagsFromMap(map[string]string{
"group": rootGroup.Path,
})),
BufferPool: lib.NewBufferPool(),
Samples: k.samples,
Tags: lib.NewVUStateTags(
registry.RootTagSet().WithTagsFromMap(
map[string]string{
"group": rootGroup.Path,
},
),
),
BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry),
}
k.vu.StateField = state
Expand Down
10 changes: 5 additions & 5 deletions module.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,11 @@ func (m *Module) defineConstants() {
mustAddProp("VALUE", string(Value))

// Schema types
mustAddProp("SCHEMA_TYPE_STRING", String.String())
mustAddProp("SCHEMA_TYPE_BYTES", Bytes.String())
mustAddProp("SCHEMA_TYPE_AVRO", srclient.Avro.String())
mustAddProp("SCHEMA_TYPE_JSON", srclient.Json.String())
mustAddProp("SCHEMA_TYPE_PROTOBUF", srclient.Protobuf.String())
mustAddProp("SCHEMA_TYPE_STRING", String)
mustAddProp("SCHEMA_TYPE_BYTES", Bytes)
mustAddProp("SCHEMA_TYPE_AVRO", srclient.Avro)
mustAddProp("SCHEMA_TYPE_JSON", srclient.Json)
mustAddProp("SCHEMA_TYPE_PROTOBUF", srclient.Protobuf)

// Time constants
mustAddProp("NANOSECOND", int64(time.Nanosecond))
Expand Down
24 changes: 12 additions & 12 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ func TestConsume(t *testing.T) {
{
Key: test.module.Kafka.serialize(&Container{
Data: "key1",
SchemaType: String.String(),
SchemaType: String,
}),
Value: test.module.Kafka.serialize(&Container{
Data: "value1",
SchemaType: String.String(),
SchemaType: String,
}),
Offset: 0,
},
Expand All @@ -58,7 +58,7 @@ func TestConsume(t *testing.T) {

result := test.module.Kafka.deserialize(&Container{
Data: messages[0]["key"],
SchemaType: String.String(),
SchemaType: String,
})

if key, ok := result.([]byte); ok {
Expand All @@ -67,7 +67,7 @@ func TestConsume(t *testing.T) {

result = test.module.Kafka.deserialize(&Container{
Data: messages[0]["value"],
SchemaType: String.String(),
SchemaType: String,
})
if value, ok := result.([]byte); ok {
assert.Equal(t, "value1", string(value))
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestConsumeWithoutKey(t *testing.T) {
{
Value: test.module.Kafka.serialize(&Container{
Data: "value1",
SchemaType: String.String(),
SchemaType: String,
}),
Offset: 1,
},
Expand All @@ -140,7 +140,7 @@ func TestConsumeWithoutKey(t *testing.T) {

result := test.module.Kafka.deserialize(&Container{
Data: messages[0]["value"],
SchemaType: String.String(),
SchemaType: String,
})
if value, ok := result.([]byte); ok {
assert.Equal(t, "value1", string(value))
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestConsumerContextCancelled(t *testing.T) {
{
Value: test.module.Kafka.serialize(&Container{
Data: "value1",
SchemaType: String.String(),
SchemaType: String,
}),
Offset: 2,
},
Expand Down Expand Up @@ -249,7 +249,7 @@ func TestConsumeJSON(t *testing.T) {

result := test.module.Kafka.deserialize(&Container{
Data: messages[0]["value"],
SchemaType: srclient.Json.String(),
SchemaType: srclient.Json,
})
if data, ok := result.(map[string]interface{}); ok {
assert.Equal(t, "value", data["field"])
Expand Down Expand Up @@ -280,11 +280,11 @@ func TestReaderClass(t *testing.T) {
{
Key: test.module.Kafka.serialize(&Container{
Data: "key",
SchemaType: String.String(),
SchemaType: String,
}),
Value: test.module.Kafka.serialize(&Container{
Data: "value",
SchemaType: String.String(),
SchemaType: String,
}),
},
},
Expand Down Expand Up @@ -316,12 +316,12 @@ func TestReaderClass(t *testing.T) {
assert.Equal(t, 1, len(messages))
deserializedKey := test.module.Kafka.deserialize(&Container{
Data: messages[0]["key"],
SchemaType: String.String(),
SchemaType: String,
})
assert.Equal(t, "key", deserializedKey)
deserializedValue := test.module.Kafka.deserialize(&Container{
Data: messages[0]["value"],
SchemaType: String.String(),
SchemaType: String,
})
assert.Equal(t, "value", deserializedValue)

Expand Down
6 changes: 3 additions & 3 deletions schema_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func TestSchemaRegistryClientClass(t *testing.T) {
map[string]interface{}{
"subject": "test-subject",
"schema": avroSchema,
"schemaType": srclient.Avro.String(),
"schemaType": srclient.Avro,
},
),
},
Expand Down Expand Up @@ -343,7 +343,7 @@ func TestSchemaRegistryClientClass(t *testing.T) {
map[string]interface{}{
"data": map[string]interface{}{"field": "value"},
"schema": currentSchema,
"schemaType": srclient.Avro.String(),
"schemaType": srclient.Avro,
},
),
},
Expand All @@ -358,7 +358,7 @@ func TestSchemaRegistryClientClass(t *testing.T) {
map[string]interface{}{
"data": serialized,
"schema": currentSchema,
"schemaType": srclient.Avro.String(),
"schemaType": srclient.Avro,
},
),
},
Expand Down
12 changes: 6 additions & 6 deletions serdes.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
)

type Container struct {
Data interface{} `json:"data"`
Schema *Schema `json:"schema"`
SchemaType string `json:"schemaType"`
Data interface{} `json:"data"`
Schema *Schema `json:"schema"`
SchemaType srclient.SchemaType `json:"schemaType"`
}

// serialize checks whether the incoming data has a schema or not.
Expand Down Expand Up @@ -36,7 +36,7 @@ func (k *Kafka) serialize(container *Container) []byte {
// we are dealing with binary data to be encoded with Avro, JSONSchema or Protocol Buffer

switch container.SchemaType {
case srclient.Avro.String(), srclient.Json.String():
case srclient.Avro, srclient.Json:
serde, err := GetSerdes(container.SchemaType)
if err != nil {
common.Throw(k.vu.Runtime(), err)
Expand Down Expand Up @@ -75,7 +75,7 @@ func (k *Kafka) deserialize(container *Container) interface{} {
switch container.Data.(type) {
case []byte:
switch container.SchemaType {
case String.String():
case String:
return string(container.Data.([]byte))
default:
if isJSON(container.Data.([]byte)) {
Expand Down Expand Up @@ -132,7 +132,7 @@ func (k *Kafka) deserialize(container *Container) interface{} {
jsonBytes = k.decodeWireFormat(jsonBytes)

switch container.SchemaType {
case srclient.Avro.String(), srclient.Json.String():
case srclient.Avro, srclient.Json:
serde, err := GetSerdes(container.SchemaType)
if err != nil {
common.Throw(k.vu.Runtime(), err)
Expand Down
30 changes: 15 additions & 15 deletions serdes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ func TestSerdes(t *testing.T) {
containers := []*Container{
{
Data: "string",
SchemaType: String.String(),
SchemaType: String,
},
{
Data: []byte("byte array"),
SchemaType: Bytes.String(),
SchemaType: Bytes,
},
{
Data: []byte{62, 79, 74, 65, 20, 61, 72, 72, 61, 79}, // byte array
SchemaType: Bytes.String(),
SchemaType: Bytes,
},
{
Data: map[string]interface{}{
Expand All @@ -52,7 +52,7 @@ func TestSerdes(t *testing.T) {
"array": []interface{}{1.0, 2.0, 3.0},
},
},
SchemaType: srclient.Json.String(),
SchemaType: srclient.Json,
},
{
Data: map[string]interface{}{"key": "value"},
Expand All @@ -69,7 +69,7 @@ func TestSerdes(t *testing.T) {
Version: 1,
Subject: "json-schema",
},
SchemaType: srclient.Json.String(),
SchemaType: srclient.Json,
},
{
Data: map[string]interface{}{"key": "value"},
Expand All @@ -83,7 +83,7 @@ func TestSerdes(t *testing.T) {
Version: 1,
Subject: "avro-schema",
},
SchemaType: srclient.Avro.String(),
SchemaType: srclient.Avro,
},
}

Expand Down Expand Up @@ -140,35 +140,35 @@ func TestSerializeFails(t *testing.T) {
{
container: &Container{
Data: 1.1,
SchemaType: String.String(),
SchemaType: String,
},
err: ErrInvalidDataType,
},
{
container: &Container{
Data: []interface{}{"test"},
SchemaType: Bytes.String(),
SchemaType: Bytes,
},
err: ErrFailedTypeCast,
},
{
container: &Container{
Data: "test",
SchemaType: Bytes.String(),
SchemaType: Bytes,
},
err: ErrInvalidDataType,
},
{
container: &Container{
Data: map[string]interface{}{"key": unsafe.Pointer(nil)},
SchemaType: srclient.Json.String(),
SchemaType: srclient.Json,
},
err: ErrInvalidDataType,
},
{
container: &Container{
Data: "test",
SchemaType: srclient.Json.String(),
SchemaType: srclient.Json,
},
err: ErrInvalidDataType,
},
Expand All @@ -181,14 +181,14 @@ func TestSerializeFails(t *testing.T) {
Version: 1,
Subject: "json-schema",
},
SchemaType: srclient.Json.String(),
SchemaType: srclient.Json,
},
err: ErrInvalidSchema,
},
{
container: &Container{
Data: `{"key": "value"}`,
SchemaType: srclient.Avro.String(),
SchemaType: srclient.Avro,
},
err: ErrInvalidDataType,
},
Expand All @@ -205,7 +205,7 @@ func TestSerializeFails(t *testing.T) {
Version: 1,
Subject: "avro-schema",
},
SchemaType: srclient.Avro.String(),
SchemaType: srclient.Avro,
},
err: NewXk6KafkaError(failedToEncode, "Failed to encode data", errors.New("cannot decode textual record \"io.confluent.kafka.avro.Schema\": cannot decode textual map: cannot determine codec: \"unknown\"")),
},
Expand All @@ -222,7 +222,7 @@ func TestSerializeFails(t *testing.T) {
Version: 1,
Subject: "avro-schema",
},
SchemaType: srclient.Avro.String(),
SchemaType: srclient.Avro,
},
err: ErrInvalidDataType,
},
Expand Down
Loading

0 comments on commit c51b879

Please sign in to comment.