Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests #60

Merged
merged 50 commits into from
May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
7e5e99f
Add test for string.go
mostafa May 11, 2022
7bd85eb
Add test for bytearray.go
mostafa May 11, 2022
119bd2f
Add test for errors.go
mostafa May 11, 2022
263da44
Add test for auth.go
mostafa May 12, 2022
054fce4
Fix linter errors
mostafa May 12, 2022
88be882
Fix tests after rebasing with the logging/error-reporting PR merge
mostafa May 13, 2022
94fa373
Remove the old ReportErrors function and test
mostafa May 13, 2022
d9ce0f2
Run Go tests in the test pipeline
mostafa May 13, 2022
f090db0
Add more tests to auth_test.go after refactoring loggin and error-rep…
mostafa May 13, 2022
9be1bd2
Refactor functions a bit until proper configuration validation is imp…
mostafa May 13, 2022
401277b
Fix assertions
mostafa May 13, 2022
1706f30
Remove schema from function signatures (unused parameter)
mostafa May 13, 2022
8370cdd
Add tests for serde.go
mostafa May 13, 2022
cd1f221
Add tests for avro.go
mostafa May 13, 2022
765a06f
Fix linter error
mostafa May 13, 2022
e099e5a
Add more tests to SerializeAvro
mostafa May 13, 2022
968e22e
Refactor serde and add serde registry
mostafa May 14, 2022
5feb5c7
Rename schema registry module
mostafa May 14, 2022
f48cd7a
Remove duplicate code by using a function to get a connection to Kafka
mostafa May 14, 2022
99e1ee9
Add tests to configuration.go
mostafa May 14, 2022
6c86bac
Update jsonschema.go with the latest changes to schema registry
mostafa May 14, 2022
37726c8
Add tests to jsonschema.go
mostafa May 14, 2022
9d56d9f
Add more test to avro.go
mostafa May 15, 2022
785d24a
Add tests to jsonschema.go
mostafa May 15, 2022
002f446
Add tests to schema_registry.go
mostafa May 15, 2022
e619ad0
Create coverage report
mostafa May 15, 2022
a9b6adb
Fix linter errors
mostafa May 15, 2022
38ed1ad
Remove duplicate tests
mostafa May 15, 2022
d1155e3
Refactor configuration_test.go
mostafa May 15, 2022
9f4da87
Use constants
mostafa May 15, 2022
5a1e11e
Fix typo
mostafa May 15, 2022
f5c7c15
Ignore act
mostafa May 15, 2022
ef46c17
Add tests to topic.go
mostafa May 16, 2022
96efdc8
Run lensesio/fast-data-dev container for testing against Kafka
mostafa May 16, 2022
2bb540b
Use title case
mostafa May 16, 2022
350ec31
Assert module instance and runtime
mostafa May 17, 2022
33f4c47
WIP: Add tests to producer.go (has issues with default function)
mostafa May 17, 2022
4494c1d
Rename kafkatest.go so it doesnt' get included in non test builds
mstoykov May 17, 2022
c1e461d
Make test being able to go to vu code mode and other fixes
mstoykov May 17, 2022
7362502
Create topic before producing messages
mostafa May 17, 2022
1900f0b
Check output metrics of produce function plus some fixes
mostafa May 17, 2022
d931aeb
Fix bugs and add some TODO comments
mostafa May 17, 2022
4751eb0
Add more tests to producer.go
mostafa May 17, 2022
5e1dce3
Fix typos
mostafa May 17, 2022
85b1092
Separate nil context from cancelled context error
mostafa May 17, 2022
7f8b412
Fix an edge case with setting offset together with groupID
mostafa May 17, 2022
4b0abc5
Fix tests
mostafa May 17, 2022
6876cc9
Add tests to consumer.go (buggy)
mostafa May 17, 2022
67907de
Fix test
mostafa May 17, 2022
89a411d
Add more tests to consumer.go and fix others
mostafa May 17, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 14 additions & 17 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,18 @@ jobs:
go install go.k6.io/xk6/cmd/xk6@latest
xk6 build --with github.com/mostafa/xk6-kafka@latest=.

- name: Run Apache Kafka 🦉 and test xk6-kafka 🧪
- name: Run Zookeeper and Kafka ⚙️
run: |
cd ~/work/xk6-kafka/xk6-kafka
# Download and extract Kafka
wget -q https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz
tar xf kafka_2.13-3.1.0.tgz
# This is needed for Kafka to work
mkdir -p /tmp/kraft-combined-logs/
cat <<EOF > /tmp/kraft-combined-logs/meta.properties
version=1
broker.id=1
cluster.id=1
node.id=1
EOF
# Run Kafka in background
./kafka_2.13-3.1.0/bin/kafka-server-start.sh ./kafka_2.13-3.1.0/config/kraft/server.properties > /dev/null 2>&1 &
# Wait for 5 seconds, run k6 (with xk6-kafka) and kill Kafka background process
sleep 5 && ./k6 run -d 5s ./scripts/test_json.js && kill %1
docker run --detach --rm --name lensesio -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=127.0.0.1 lensesio/fast-data-dev:latest
sleep 10s

- name: Run Go tests 🔬
run: go test -cover -v .

- name: Run xk6-kafka tests 🧪
run: ./k6 run -d 5s ./scripts/test_json.js

- name: Stop containers 🛑
if: always()
run: |
docker stop lensesio
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
vendor/
k6
.idea
act
37 changes: 21 additions & 16 deletions auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"time"

Expand All @@ -30,7 +29,7 @@ type Credentials struct {
ServerCaPem string `json:"serverCaPem"`
}

func unmarshalCredentials(auth string) (*Credentials, *Xk6KafkaError) {
func UnmarshalCredentials(auth string) (*Credentials, *Xk6KafkaError) {
creds := &Credentials{
Algorithm: None,
}
Expand All @@ -45,8 +44,8 @@ func unmarshalCredentials(auth string) (*Credentials, *Xk6KafkaError) {
}
}

func getDialerFromCreds(creds *Credentials) (*kafkago.Dialer, *Xk6KafkaError) {
tlsConfig, err := tlsConfig(creds)
func GetDialerFromCreds(creds *Credentials) (*kafkago.Dialer, *Xk6KafkaError) {
tlsConfig, err := TLSConfig(creds)
if err != nil && err.Unwrap() != nil {
return nil, err
}
Expand Down Expand Up @@ -84,18 +83,18 @@ func getDialerFromCreds(creds *Credentials) (*kafkago.Dialer, *Xk6KafkaError) {
return dialer, nil
}

func getDialerFromAuth(auth string) (*kafkago.Dialer, *Xk6KafkaError) {
func GetDialerFromAuth(auth string) (*kafkago.Dialer, *Xk6KafkaError) {
if auth != "" {
// Parse the auth string
creds, err := unmarshalCredentials(auth)
creds, err := UnmarshalCredentials(auth)
if err != nil {
return nil, err
}

// Try to create an authenticated dialer from the credentials
// with TLS enabled if the credentials specify a client cert
// and key.
return getDialerFromCreds(creds)
return GetDialerFromCreds(creds)
} else {
// Create a normal (unauthenticated) dialer
return &kafkago.Dialer{
Expand All @@ -105,44 +104,50 @@ func getDialerFromAuth(auth string) (*kafkago.Dialer, *Xk6KafkaError) {
}
}

func fileExists(filename string) bool {
func FileExists(filename string) bool {
_, err := os.Stat(filename)
return err == nil
}

func tlsConfig(creds *Credentials) (*tls.Config, *Xk6KafkaError) {
func TLSConfig(creds *Credentials) (*tls.Config, *Xk6KafkaError) {
var clientCertFile = &creds.ClientCertPem
if !fileExists(*clientCertFile) {
if !FileExists(*clientCertFile) {
return nil, NewXk6KafkaError(fileNotFound, "Client certificate file not found.", nil)
}

var clientKeyFile = &creds.ClientKeyPem
if !fileExists(*clientKeyFile) {
if !FileExists(*clientKeyFile) {
return nil, NewXk6KafkaError(fileNotFound, "Client key file not found.", nil)
}

var cert, err = tls.LoadX509KeyPair(*clientCertFile, *clientKeyFile)
if err != nil {
return nil, NewXk6KafkaError(
failedLoadX509KeyPair,
fmt.Sprintf("Error creating x509 keypair from client cert file %s and client key file %s", *clientCertFile, *clientKeyFile),
fmt.Sprintf("Error creating x509 keypair from client cert file \"%s\" and client key file \"%s\"", *clientCertFile, *clientKeyFile),
err)
}

var caCertFile = &creds.ServerCaPem
if !fileExists(*caCertFile) {
if !FileExists(*caCertFile) {
return nil, NewXk6KafkaError(fileNotFound, "CA certificate file not found.", nil)
}

caCert, err := ioutil.ReadFile(*caCertFile)
caCert, err := os.ReadFile(*caCertFile)
if err != nil {
// This might happen on permissions issues or if the file is unreadable somehow
return nil, NewXk6KafkaError(
failedReadCaCertFile,
fmt.Sprintf("Error reading CA certificate file %s", *caCertFile),
fmt.Sprintf("Error reading CA certificate file \"%s\"", *caCertFile),
err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
return nil, NewXk6KafkaError(
failedAppendCaCertFile,
fmt.Sprintf("Error appending CA certificate file \"%s\"", *caCertFile),
nil)
}

return &tls.Config{
Certificates: []tls.Certificate{cert},
Expand Down
180 changes: 180 additions & 0 deletions auth_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package kafka

import (
"errors"
"testing"
"time"

"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
"github.com/stretchr/testify/assert"
)

func TestUnmarshalCredentials(t *testing.T) {
creds, err := UnmarshalCredentials(`{"username": "test", "password": "test", "algorithm": "plain", "clientCertPem": "client.pem", "clientKeyPem": "key.pem", "serverCaPem": "server.pem"}`)
assert.Nil(t, err)
assert.Equal(t, "test", creds.Username)
assert.Equal(t, "test", creds.Password)
assert.Equal(t, "plain", creds.Algorithm)
assert.Equal(t, "client.pem", creds.ClientCertPem)
assert.Equal(t, "key.pem", creds.ClientKeyPem)
assert.Equal(t, "server.pem", creds.ServerCaPem)
}

func TestUnmarshalCredentialsFails(t *testing.T) {
// This only fails on invalid JSON (apparently)
creds, err := UnmarshalCredentials(`{"invalid": "invalid`)
assert.Nil(t, creds)
assert.NotNil(t, err)
assert.Equal(t, err.Message, "Unable to unmarshal credentials")
// This is the error we get from the json package wrapped inside the Xk6KafkaError
assert.Equal(t, err.Unwrap().Error(), "unexpected end of JSON input")
}

func TestGetDialerFromCredsWithSASLPlain(t *testing.T) {
creds := &Credentials{
Username: "test",
Password: "test",
Algorithm: Plain,
}
dialer, err := GetDialerFromCreds(creds)
assert.Nil(t, err)
assert.NotNil(t, dialer)
assert.Equal(t, 10*time.Second, dialer.Timeout)
assert.Equal(t, true, dialer.DualStack)
assert.Nil(t, dialer.TLS)
assert.Equal(t, "PLAIN", dialer.SASLMechanism.Name())
assert.Equal(t, "test", dialer.SASLMechanism.(plain.Mechanism).Username)
assert.Equal(t, "test", dialer.SASLMechanism.(plain.Mechanism).Password)
}

func TestGetDialerFromCredsWithSASLScram(t *testing.T) {
creds := &Credentials{
Username: "test",
Password: "test",
Algorithm: SHA256,
}
dialer, err := GetDialerFromCreds(creds)
assert.Nil(t, err)
assert.NotNil(t, dialer)
assert.Equal(t, 10*time.Second, dialer.Timeout)
assert.Equal(t, true, dialer.DualStack)
assert.Nil(t, dialer.TLS)
assert.Equal(t, scram.SHA256.Name(), dialer.SASLMechanism.Name())
}

func TestGetDialerFromCredsFails(t *testing.T) {
creds := &Credentials{
Username: "https://www.exa\t\r\n",
Password: "test",
Algorithm: "sha256",
}
dialer, wrappedError := GetDialerFromCreds(creds)
assert.Equal(t, wrappedError.Message, "Unable to create SCRAM mechanism")
// This is a stringprep (RFC3454) error wrapped inside the Xk6KafkaError
assert.Equal(t, wrappedError.Unwrap().Error(), "Error SASLprepping username 'https://www.exa\t\r\n': prohibited character (rune: '\\u0009')")
assert.Nil(t, dialer)
}

func TestGetDialerFromAuth(t *testing.T) {
auth := `{"username": "test", "password": "test", "algorithm": "plain", "clientCertPem": "client.pem", "clientKeyPem": "key.pem", "serverCaPem": "server.pem"}`
dialer, err := GetDialerFromAuth(auth)
assert.Nil(t, err)
assert.NotNil(t, dialer)
assert.Equal(t, 10*time.Second, dialer.Timeout)
assert.Equal(t, true, dialer.DualStack)
assert.Nil(t, dialer.TLS)
assert.Equal(t, "PLAIN", dialer.SASLMechanism.Name())
assert.Equal(t, "test", dialer.SASLMechanism.(plain.Mechanism).Username)
assert.Equal(t, "test", dialer.SASLMechanism.(plain.Mechanism).Password)
}

func TestGetDialerFromAuthNoAuthString(t *testing.T) {
dialer, err := GetDialerFromAuth("")
assert.Nil(t, err)
assert.NotNil(t, dialer)
assert.Equal(t, 10*time.Second, dialer.Timeout)
assert.Equal(t, false, dialer.DualStack)
assert.Nil(t, dialer.TLS)
}

func TestFileExists(t *testing.T) {
assert.True(t, FileExists("auth_test.go"))
assert.False(t, FileExists("test.go.not"))
}

type SimpleTLSConfig struct {
creds *Credentials
err *Xk6KafkaError
}

func TestTlsConfig(t *testing.T) {
creds := &Credentials{
ClientCertPem: "fixtures/client.cer",
ClientKeyPem: "fixtures/client.pem",
ServerCaPem: "fixtures/caroot.cer",
}
tlsConfig, err := TLSConfig(creds)
assert.Nil(t, err)
assert.NotNil(t, tlsConfig)
}

func TestTlsConfigFails(t *testing.T) {
creds := []*SimpleTLSConfig{
{
creds: &Credentials{},
err: &Xk6KafkaError{
Code: fileNotFound,
Message: "Client certificate file not found.",
},
},
{
creds: &Credentials{
ClientCertPem: "fixtures/client.cer",
},
err: &Xk6KafkaError{
Code: fileNotFound,
Message: "Client key file not found.",
},
},
{
creds: &Credentials{
ClientCertPem: "fixtures/client.cer",
ClientKeyPem: "fixtures/client.pem",
},
err: &Xk6KafkaError{
Code: fileNotFound,
Message: "CA certificate file not found.",
},
},
{
creds: &Credentials{
ClientCertPem: "fixtures/invalid-client.cer",
ClientKeyPem: "fixtures/invalid-client.pem",
},
err: &Xk6KafkaError{
Code: failedLoadX509KeyPair,
Message: "Error creating x509 keypair from client cert file \"fixtures/invalid-client.cer\" and client key file \"fixtures/invalid-client.pem\"",
OriginalError: errors.New("tls: failed to find any PEM data in certificate input"),
},
},
{
creds: &Credentials{
ClientCertPem: "fixtures/client.cer",
ClientKeyPem: "fixtures/client.pem",
ServerCaPem: "fixtures/invalid-caroot.cer",
},
err: &Xk6KafkaError{
Code: failedAppendCaCertFile,
Message: "Error appending CA certificate file \"fixtures/invalid-caroot.cer\"",
},
},
}

for _, c := range creds {
tlsConfig, err := TLSConfig(c.creds)
assert.NotNil(t, err)
assert.Equal(t, c.err, err)
assert.Nil(t, tlsConfig)
}
}
Loading