Skip to content

Commit

Permalink
Add tests (#60)
Browse files Browse the repository at this point in the history
* Add test for string.go
* Add test for bytearray.go
* Add test for errors.go
* Add test for auth.go
* Fix linter errors
* Fix tests after rebasing with the logging/error-reporting PR merge
* Remove the old ReportErrors function and test
* Run Go tests in the test pipeline
* Add more tests to auth_test.go after refactoring loggin and error-reporting
* Refactor functions a bit until proper configuration validation is implemented
* Fix assertions
* Remove schema from function signatures (unused parameter)
* Add tests for serde.go
* Add tests for avro.go
* Update deserializer function interface and add topic
* Revamp avro.go to address multiple issues (fetching schemas, manual serialization, etc.)
* Add more error codes and errors
* Separate various Schema Registry functions
* Simplify serialization functions
* Explicitly pass srClient to SR functions
* Export all functions
* Fix tests to reflect changes
* Fix linter error
* Add more tests to SerializeAvro
* Refactor serde and add serde registry
* Add tests to serde_registry.go
* Rename schema registry module
* Remove duplicate code by using a function to get a connection to Kafka
* Add tests to configuration.go
* Update jsonschema.go with the latest changes to schema registry
* Add tests to jsonschema.go
* Add more test to avro.go
* Add tests to jsonschema.go
* Add tests to schema_registry.go
* Create coverage report
* Fix linter errors
* Remove duplicate tests
* Refactor configuration_test.go
* Use constants
* Fix typo
* Ignore act
* Add tests to topic.go
* Run lensesio/fast-data-dev container for testing against Kafka
* Use title case
* Assert module instance and runtime
* WIP: Add tests to producer.go (has issues with default function)
* Rename kafkatest.go so it doesnt' get included in non test builds
* Make test being able to go to vu code mode and other fixes
* Create topic before producing messages
* Check output metrics of produce function plus some fixes
* Fix bugs and add some TODO comments
* Add more tests to producer.go
* Fix typos
* Separate nil context from cancelled context error
* Fix an edge case with setting offset together with groupID
* Fix tests
* Add tests to consumer.go (buggy)
* Add more tests to consumer.go and fix others
Co-authored-by: Mihail Stoykov <[email protected]>
  • Loading branch information
mostafa authored May 18, 2022
1 parent e8905e4 commit 44de914
Show file tree
Hide file tree
Showing 39 changed files with 1,770 additions and 350 deletions.
31 changes: 14 additions & 17 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,18 @@ jobs:
go install go.k6.io/xk6/cmd/xk6@latest
xk6 build --with github.com/mostafa/xk6-kafka@latest=.
- name: Run Apache Kafka 🦉 and test xk6-kafka 🧪
- name: Run Zookeeper and Kafka ⚙️
run: |
cd ~/work/xk6-kafka/xk6-kafka
# Download and extract Kafka
wget -q https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz
tar xf kafka_2.13-3.1.0.tgz
# This is needed for Kafka to work
mkdir -p /tmp/kraft-combined-logs/
cat <<EOF > /tmp/kraft-combined-logs/meta.properties
version=1
broker.id=1
cluster.id=1
node.id=1
EOF
# Run Kafka in background
./kafka_2.13-3.1.0/bin/kafka-server-start.sh ./kafka_2.13-3.1.0/config/kraft/server.properties > /dev/null 2>&1 &
# Wait for 5 seconds, run k6 (with xk6-kafka) and kill Kafka background process
sleep 5 && ./k6 run -d 5s ./scripts/test_json.js && kill %1
docker run --detach --rm --name lensesio -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=127.0.0.1 lensesio/fast-data-dev:latest
sleep 10s
- name: Run Go tests 🔬
run: go test -cover -v .

- name: Run xk6-kafka tests 🧪
run: ./k6 run -d 5s ./scripts/test_json.js

- name: Stop containers 🛑
if: always()
run: |
docker stop lensesio
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
vendor/
k6
.idea
act
37 changes: 21 additions & 16 deletions auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"time"

Expand All @@ -30,7 +29,7 @@ type Credentials struct {
ServerCaPem string `json:"serverCaPem"`
}

func unmarshalCredentials(auth string) (*Credentials, *Xk6KafkaError) {
func UnmarshalCredentials(auth string) (*Credentials, *Xk6KafkaError) {
creds := &Credentials{
Algorithm: None,
}
Expand All @@ -45,8 +44,8 @@ func unmarshalCredentials(auth string) (*Credentials, *Xk6KafkaError) {
}
}

func getDialerFromCreds(creds *Credentials) (*kafkago.Dialer, *Xk6KafkaError) {
tlsConfig, err := tlsConfig(creds)
func GetDialerFromCreds(creds *Credentials) (*kafkago.Dialer, *Xk6KafkaError) {
tlsConfig, err := TLSConfig(creds)
if err != nil && err.Unwrap() != nil {
return nil, err
}
Expand Down Expand Up @@ -84,18 +83,18 @@ func getDialerFromCreds(creds *Credentials) (*kafkago.Dialer, *Xk6KafkaError) {
return dialer, nil
}

func getDialerFromAuth(auth string) (*kafkago.Dialer, *Xk6KafkaError) {
func GetDialerFromAuth(auth string) (*kafkago.Dialer, *Xk6KafkaError) {
if auth != "" {
// Parse the auth string
creds, err := unmarshalCredentials(auth)
creds, err := UnmarshalCredentials(auth)
if err != nil {
return nil, err
}

// Try to create an authenticated dialer from the credentials
// with TLS enabled if the credentials specify a client cert
// and key.
return getDialerFromCreds(creds)
return GetDialerFromCreds(creds)
} else {
// Create a normal (unauthenticated) dialer
return &kafkago.Dialer{
Expand All @@ -105,44 +104,50 @@ func getDialerFromAuth(auth string) (*kafkago.Dialer, *Xk6KafkaError) {
}
}

func fileExists(filename string) bool {
func FileExists(filename string) bool {
_, err := os.Stat(filename)
return err == nil
}

func tlsConfig(creds *Credentials) (*tls.Config, *Xk6KafkaError) {
func TLSConfig(creds *Credentials) (*tls.Config, *Xk6KafkaError) {
var clientCertFile = &creds.ClientCertPem
if !fileExists(*clientCertFile) {
if !FileExists(*clientCertFile) {
return nil, NewXk6KafkaError(fileNotFound, "Client certificate file not found.", nil)
}

var clientKeyFile = &creds.ClientKeyPem
if !fileExists(*clientKeyFile) {
if !FileExists(*clientKeyFile) {
return nil, NewXk6KafkaError(fileNotFound, "Client key file not found.", nil)
}

var cert, err = tls.LoadX509KeyPair(*clientCertFile, *clientKeyFile)
if err != nil {
return nil, NewXk6KafkaError(
failedLoadX509KeyPair,
fmt.Sprintf("Error creating x509 keypair from client cert file %s and client key file %s", *clientCertFile, *clientKeyFile),
fmt.Sprintf("Error creating x509 keypair from client cert file \"%s\" and client key file \"%s\"", *clientCertFile, *clientKeyFile),
err)
}

var caCertFile = &creds.ServerCaPem
if !fileExists(*caCertFile) {
if !FileExists(*caCertFile) {
return nil, NewXk6KafkaError(fileNotFound, "CA certificate file not found.", nil)
}

caCert, err := ioutil.ReadFile(*caCertFile)
caCert, err := os.ReadFile(*caCertFile)
if err != nil {
// This might happen on permissions issues or if the file is unreadable somehow
return nil, NewXk6KafkaError(
failedReadCaCertFile,
fmt.Sprintf("Error reading CA certificate file %s", *caCertFile),
fmt.Sprintf("Error reading CA certificate file \"%s\"", *caCertFile),
err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
return nil, NewXk6KafkaError(
failedAppendCaCertFile,
fmt.Sprintf("Error appending CA certificate file \"%s\"", *caCertFile),
nil)
}

return &tls.Config{
Certificates: []tls.Certificate{cert},
Expand Down
180 changes: 180 additions & 0 deletions auth_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package kafka

import (
"errors"
"testing"
"time"

"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
"github.com/stretchr/testify/assert"
)

func TestUnmarshalCredentials(t *testing.T) {
creds, err := UnmarshalCredentials(`{"username": "test", "password": "test", "algorithm": "plain", "clientCertPem": "client.pem", "clientKeyPem": "key.pem", "serverCaPem": "server.pem"}`)
assert.Nil(t, err)
assert.Equal(t, "test", creds.Username)
assert.Equal(t, "test", creds.Password)
assert.Equal(t, "plain", creds.Algorithm)
assert.Equal(t, "client.pem", creds.ClientCertPem)
assert.Equal(t, "key.pem", creds.ClientKeyPem)
assert.Equal(t, "server.pem", creds.ServerCaPem)
}

func TestUnmarshalCredentialsFails(t *testing.T) {
// This only fails on invalid JSON (apparently)
creds, err := UnmarshalCredentials(`{"invalid": "invalid`)
assert.Nil(t, creds)
assert.NotNil(t, err)
assert.Equal(t, err.Message, "Unable to unmarshal credentials")
// This is the error we get from the json package wrapped inside the Xk6KafkaError
assert.Equal(t, err.Unwrap().Error(), "unexpected end of JSON input")
}

func TestGetDialerFromCredsWithSASLPlain(t *testing.T) {
creds := &Credentials{
Username: "test",
Password: "test",
Algorithm: Plain,
}
dialer, err := GetDialerFromCreds(creds)
assert.Nil(t, err)
assert.NotNil(t, dialer)
assert.Equal(t, 10*time.Second, dialer.Timeout)
assert.Equal(t, true, dialer.DualStack)
assert.Nil(t, dialer.TLS)
assert.Equal(t, "PLAIN", dialer.SASLMechanism.Name())
assert.Equal(t, "test", dialer.SASLMechanism.(plain.Mechanism).Username)
assert.Equal(t, "test", dialer.SASLMechanism.(plain.Mechanism).Password)
}

func TestGetDialerFromCredsWithSASLScram(t *testing.T) {
creds := &Credentials{
Username: "test",
Password: "test",
Algorithm: SHA256,
}
dialer, err := GetDialerFromCreds(creds)
assert.Nil(t, err)
assert.NotNil(t, dialer)
assert.Equal(t, 10*time.Second, dialer.Timeout)
assert.Equal(t, true, dialer.DualStack)
assert.Nil(t, dialer.TLS)
assert.Equal(t, scram.SHA256.Name(), dialer.SASLMechanism.Name())
}

func TestGetDialerFromCredsFails(t *testing.T) {
creds := &Credentials{
Username: "https://www.exa\t\r\n",
Password: "test",
Algorithm: "sha256",
}
dialer, wrappedError := GetDialerFromCreds(creds)
assert.Equal(t, wrappedError.Message, "Unable to create SCRAM mechanism")
// This is a stringprep (RFC3454) error wrapped inside the Xk6KafkaError
assert.Equal(t, wrappedError.Unwrap().Error(), "Error SASLprepping username 'https://www.exa\t\r\n': prohibited character (rune: '\\u0009')")
assert.Nil(t, dialer)
}

func TestGetDialerFromAuth(t *testing.T) {
auth := `{"username": "test", "password": "test", "algorithm": "plain", "clientCertPem": "client.pem", "clientKeyPem": "key.pem", "serverCaPem": "server.pem"}`
dialer, err := GetDialerFromAuth(auth)
assert.Nil(t, err)
assert.NotNil(t, dialer)
assert.Equal(t, 10*time.Second, dialer.Timeout)
assert.Equal(t, true, dialer.DualStack)
assert.Nil(t, dialer.TLS)
assert.Equal(t, "PLAIN", dialer.SASLMechanism.Name())
assert.Equal(t, "test", dialer.SASLMechanism.(plain.Mechanism).Username)
assert.Equal(t, "test", dialer.SASLMechanism.(plain.Mechanism).Password)
}

func TestGetDialerFromAuthNoAuthString(t *testing.T) {
dialer, err := GetDialerFromAuth("")
assert.Nil(t, err)
assert.NotNil(t, dialer)
assert.Equal(t, 10*time.Second, dialer.Timeout)
assert.Equal(t, false, dialer.DualStack)
assert.Nil(t, dialer.TLS)
}

func TestFileExists(t *testing.T) {
assert.True(t, FileExists("auth_test.go"))
assert.False(t, FileExists("test.go.not"))
}

type SimpleTLSConfig struct {
creds *Credentials
err *Xk6KafkaError
}

func TestTlsConfig(t *testing.T) {
creds := &Credentials{
ClientCertPem: "fixtures/client.cer",
ClientKeyPem: "fixtures/client.pem",
ServerCaPem: "fixtures/caroot.cer",
}
tlsConfig, err := TLSConfig(creds)
assert.Nil(t, err)
assert.NotNil(t, tlsConfig)
}

func TestTlsConfigFails(t *testing.T) {
creds := []*SimpleTLSConfig{
{
creds: &Credentials{},
err: &Xk6KafkaError{
Code: fileNotFound,
Message: "Client certificate file not found.",
},
},
{
creds: &Credentials{
ClientCertPem: "fixtures/client.cer",
},
err: &Xk6KafkaError{
Code: fileNotFound,
Message: "Client key file not found.",
},
},
{
creds: &Credentials{
ClientCertPem: "fixtures/client.cer",
ClientKeyPem: "fixtures/client.pem",
},
err: &Xk6KafkaError{
Code: fileNotFound,
Message: "CA certificate file not found.",
},
},
{
creds: &Credentials{
ClientCertPem: "fixtures/invalid-client.cer",
ClientKeyPem: "fixtures/invalid-client.pem",
},
err: &Xk6KafkaError{
Code: failedLoadX509KeyPair,
Message: "Error creating x509 keypair from client cert file \"fixtures/invalid-client.cer\" and client key file \"fixtures/invalid-client.pem\"",
OriginalError: errors.New("tls: failed to find any PEM data in certificate input"),
},
},
{
creds: &Credentials{
ClientCertPem: "fixtures/client.cer",
ClientKeyPem: "fixtures/client.pem",
ServerCaPem: "fixtures/invalid-caroot.cer",
},
err: &Xk6KafkaError{
Code: failedAppendCaCertFile,
Message: "Error appending CA certificate file \"fixtures/invalid-caroot.cer\"",
},
},
}

for _, c := range creds {
tlsConfig, err := TLSConfig(c.creds)
assert.NotNil(t, err)
assert.Equal(t, c.err, err)
assert.Nil(t, tlsConfig)
}
}
Loading

0 comments on commit 44de914

Please sign in to comment.