Skip to content

Commit

Permalink
Kafka tunnels now support replaying headers
Browse files Browse the repository at this point in the history
  • Loading branch information
blinktag committed Oct 6, 2022
1 parent fdf5f57 commit a9c0a40
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 9 deletions.
7 changes: 3 additions & 4 deletions backends/awssqs/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@ import (
"encoding/json"
"time"

"github.com/batchcorp/plumber/util"

"github.com/batchcorp/plumber/validate"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"

"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/batchcorp/plumber-schemas/build/go/protos/records"

"github.com/batchcorp/plumber/util"
"github.com/batchcorp/plumber/validate"
)

func (a *AWSSQS) Read(ctx context.Context, readOpts *opts.ReadOptions, resultsChan chan *records.ReadRecord, errorChan chan *records.ErrorRecord) error {
Expand Down
43 changes: 38 additions & 5 deletions backends/kafka/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ package kafka

import (
"context"

"github.com/batchcorp/plumber/validate"
"encoding/base64"

"github.com/pkg/errors"
"github.com/segmentio/kafka-go"
skafka "github.com/segmentio/kafka-go"
"github.com/sirupsen/logrus"

"github.com/batchcorp/collector-schemas/build/go/protos/events"
"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/batchcorp/plumber-schemas/build/go/protos/records"

"github.com/batchcorp/plumber/tunnel"
"github.com/batchcorp/plumber/util"
"github.com/batchcorp/plumber/validate"
)

// Tunnels starts up a new GRPC client connected to the dProxy service and receives a stream of outbound replay messages
Expand Down Expand Up @@ -42,11 +46,18 @@ MAIN:
for {
select {
case outbound := <-outboundCh:
headers := make([]kafka.Header, 0)

if len(outbound.Metadata) > 0 {
headers = k.generateKafkaHeaders(outbound)
}

for _, topic := range opts.Kafka.Args.Topics {
if err := writer.WriteMessages(ctx, skafka.Message{
Topic: topic,
Key: []byte(opts.Kafka.Args.Key),
Value: outbound.Blob,
Topic: topic,
Key: []byte(opts.Kafka.Args.Key),
Value: outbound.Blob,
Headers: headers,
}); err != nil {
llog.Errorf("Unable to replay message: %s", err)
break MAIN
Expand All @@ -64,6 +75,28 @@ MAIN:
return nil
}

func (k *Kafka) generateKafkaHeaders(o *events.Outbound) []kafka.Header {
headers := make([]kafka.Header, 0)

for mdKey, mdVal := range o.Metadata {
var value []byte
var err error
if util.IsBase64(mdVal) {
value, err = base64.StdEncoding.DecodeString(mdVal)
if err != nil {
k.log.Errorf("Unable to decode header '%s' with value '%s' for replay '%s'", mdKey, mdVal, o.ReplayId)
continue
}
} else {
value = []byte(mdVal)
}

headers = append(headers, kafka.Header{Key: mdKey, Value: value})
}

return headers
}

func validateTunnelOptions(tunnelOpts *opts.TunnelOptions) error {
if tunnelOpts == nil {
return validate.ErrEmptyTunnelOpts
Expand Down
26 changes: 26 additions & 0 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"compress/gzip"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -297,3 +299,27 @@ func GenerateNATSAuthNKey(nkeyPath string) ([]nats.Option, error) {
return append(opts, nats.Nkey(pubKey, sigCB)), nil

}

// IsBase64 determines if a string is base64 encoded or not
// We store bus headers in map[string]string and binary headers get encoded as base64
// In order to replay headers properly, we need
func IsBase64(v string) bool {
decoded, err := base64.StdEncoding.DecodeString(v)
if err != nil {
return false
}

// Definitely not base64
if base64.StdEncoding.EncodeToString(decoded) != v {
return false
}

// Might be base64, some numbers will pass DecodeString() 🤦: https://go.dev/play/p/vlDi7CLw2qu
num, err := strconv.Atoi(v)
if err == nil && fmt.Sprintf("%d", num) == v {
// Input is a number, return false
return false
}

return true
}
28 changes: 28 additions & 0 deletions util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package util

import (
"crypto/tls"
"encoding/base64"
"io/ioutil"
"math/rand"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -64,4 +67,29 @@ var _ = Describe("Utility Package", func() {
Expect(len(tlsConfig.Certificates)).To(Equal(1))
})
})

Context("IsBase64", func() {
It("returns correct value", func() {
// Some static cases
cases := map[string]bool{
"aGVsbG8gd29ybGQ=": true,
"sample string": false,
"a": false,
"1234": false,
}

// Ten thousand base64 encoded random bytes
for i := 0; i < 10000; i++ {
randBytes := make([]byte, 32)
rand.Seed(time.Now().UnixNano())
rand.Read(randBytes)
randString := base64.StdEncoding.EncodeToString(randBytes)
cases[randString] = true
}

for v, want := range cases {
Expect(IsBase64(v)).To(Equal(want))
}
})
})
})

0 comments on commit a9c0a40

Please sign in to comment.