Skip to content

Commit

Permalink
Finish CloudEvents
Browse files Browse the repository at this point in the history
* Updated plumber-schemas version
* Fixed tests
* Added CloudEvents example in examples.MD
  • Loading branch information
blinktag committed Apr 14, 2023
1 parent 05f87ad commit 4920fc6
Show file tree
Hide file tree
Showing 18 changed files with 440 additions and 467 deletions.
5 changes: 3 additions & 2 deletions backends/kafka/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
skafka "github.com/segmentio/kafka-go"

"github.com/batchcorp/plumber-schemas/build/go/protos/args"
"github.com/batchcorp/plumber-schemas/build/go/protos/encoding"
"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/batchcorp/plumber-schemas/build/go/protos/records"

Expand All @@ -29,7 +30,7 @@ func (k *Kafka) Write(ctx context.Context, writeOpts *opts.WriteOptions, errorCh
return errors.Wrap(err, "unable to verify write options")
}

if writeOpts.Kafka.CloudEvent.Cloudevent {
if writeOpts.EncodeOptions.EncodeType == encoding.EncodeType_ENCODE_TYPE_CLOUDEVENT {
return k.writeCloudEvents(ctx, writeOpts, errorCh, messages...)
}

Expand Down Expand Up @@ -105,7 +106,7 @@ func (k *Kafka) writeCloudEvents(ctx context.Context, writeOpts *opts.WriteOptio
}

for i, msg := range messages {
e, err := util.GenCloudEvent(writeOpts.Kafka.CloudEvent, msg)
e, err := util.GenCloudEvent(writeOpts.EncodeOptions.CloudeventSettings, msg)
if err != nil {
util.WriteError(k.log, errorCh, errors.Wrap(err, "unable to generate cloudevents event"))
continue
Expand Down
6 changes: 4 additions & 2 deletions backends/nats-jetstream/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"github.com/batchcorp/plumber-schemas/build/go/protos/encoding"

cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/nats-io/nats.go"
Expand All @@ -21,7 +23,7 @@ func (n *NatsJetstream) Write(ctx context.Context, writeOpts *opts.WriteOptions,
return errors.Wrap(err, "invalid write options")
}

if writeOpts.NatsJetstream.CloudEvent.Cloudevent {
if writeOpts.EncodeOptions.EncodeType == encoding.EncodeType_ENCODE_TYPE_CLOUDEVENT {
return n.writeCloudEvents(ctx, writeOpts, errorCh, messages...)
}

Expand Down Expand Up @@ -58,7 +60,7 @@ func (n *NatsJetstream) writeCloudEvents(_ context.Context, writeOpts *opts.Writ
}

for i, msg := range messages {
e, err := util.GenCloudEvent(writeOpts.NatsJetstream.CloudEvent, msg)
e, err := util.GenCloudEvent(writeOpts.EncodeOptions.CloudeventSettings, msg)
if err != nil {
util.WriteError(n.log, errorCh, errors.Wrap(err, "unable to generate cloudevents event"))
continue
Expand Down
6 changes: 4 additions & 2 deletions backends/nats-streaming/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"github.com/batchcorp/plumber-schemas/build/go/protos/encoding"

cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"

Expand All @@ -20,7 +22,7 @@ func (n *NatsStreaming) Write(ctx context.Context, writeOpts *opts.WriteOptions,
return errors.Wrap(err, "invalid write options")
}

if writeOpts.NatsStreaming.CloudEvent.Cloudevent {
if writeOpts.EncodeOptions.EncodeType == encoding.EncodeType_ENCODE_TYPE_CLOUDEVENT {
return n.writeCloudEvents(ctx, writeOpts, errorCh, messages...)
}

Expand Down Expand Up @@ -51,7 +53,7 @@ func (n *NatsStreaming) writeCloudEvents(ctx context.Context, writeOpts *opts.Wr
}

for i, msg := range messages {
e, err := util.GenCloudEvent(writeOpts.NatsStreaming.CloudEvent, msg)
e, err := util.GenCloudEvent(writeOpts.EncodeOptions.CloudeventSettings, msg)
if err != nil {
util.WriteError(n.log, errorCh, errors.Wrap(err, "unable to generate cloudevents event"))
continue
Expand Down
8 changes: 4 additions & 4 deletions backends/nats-streaming/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@ import (
"context"
"io/ioutil"

"github.com/batchcorp/plumber-schemas/build/go/protos/records"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/batchcorp/plumber/backends/nats-streaming/stanfakes"

"github.com/batchcorp/plumber-schemas/build/go/protos/args"
"github.com/batchcorp/plumber-schemas/build/go/protos/encoding"
"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/batchcorp/plumber-schemas/build/go/protos/records"
"github.com/batchcorp/plumber/backends/nats-streaming/stanfakes"
"github.com/batchcorp/plumber/validate"
)

Expand All @@ -35,6 +34,7 @@ var _ = Describe("Nats Streaming Backend", func() {
Channel: "test",
},
},
EncodeOptions: &encoding.EncodeOptions{},
}
})

Expand Down
6 changes: 4 additions & 2 deletions backends/nats/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"log"

"github.com/batchcorp/plumber-schemas/build/go/protos/encoding"

cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/pkg/errors"
Expand All @@ -21,7 +23,7 @@ func (n *Nats) Write(ctx context.Context, writeOpts *opts.WriteOptions, errorCh
return errors.Wrap(err, "unable to validate write options")
}

if writeOpts.Nats.CloudEvent.Cloudevent {
if writeOpts.EncodeOptions.EncodeType == encoding.EncodeType_ENCODE_TYPE_CLOUDEVENT {
return n.writeCloudEvents(ctx, writeOpts, errorCh, messages...)
}

Expand Down Expand Up @@ -54,7 +56,7 @@ func (n *Nats) writeCloudEvents(ctx context.Context, writeOpts *opts.WriteOption
}

for i, msg := range messages {
e, err := util.GenCloudEvent(writeOpts.Nats.CloudEvent, msg)
e, err := util.GenCloudEvent(writeOpts.EncodeOptions.CloudeventSettings, msg)
if err != nil {
util.WriteError(n.log, errorCh, errors.Wrap(err, "unable to generate cloudevents event"))
continue
Expand Down
20 changes: 20 additions & 0 deletions docs/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
* [Shallow envelope protobuf messages](#shallow-envelope-protobuf-messages)
* [Using File Descriptor Sets](#using-file-descriptor-sets)
* [Using Avro schemas when reading or writing](#using-avro-schemas-when-reading-or-writing)
* [Publish CloudEvents](#publish-cloudevents)

## Consuming

Expand Down Expand Up @@ -738,3 +739,22 @@ plumber read kafka --topics fdstest1 \
$ plumber write kafka --topics=orders --avro-schema-file=some_schema.avsc --input-file=your_data.json
$ plumber read kafka --topics=orders --avro-schema-file=some_schema.avsc
```
#### Publish CloudEvents
> **_NOTE:_** CloudEvents are currently only supported for: Kafka, NATS, NATS Streaming, and NATS JetStream
Plumber supports emitting [CloudEvent](https://github.com/cloudevents/spec) messages.
By default, if the contents of `--input` or `--input-file` represents a valid cloudevent in JSON format, the data
will be unmarshaled into a cloud event. Any `--ce-*` flags specified will override their respective values in the event
before the event is published.
If the value of `--input` or `--input-file` is not a valid cloudevent in JSON format, a new cloudevent will be created
and the input will be set as the _data_ field's value. Other fields will be set using the values supplied via `--ce-*` flags.
**Example Kafka publish:**
```bash
plumber write kafka --encode-type cloudevent --topics myevents --input-file test-assets/cloudevents/example.json
```
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/batchcorp/kong v0.2.17-batch-fix
github.com/batchcorp/natty v0.0.16
github.com/batchcorp/pgoutput v0.3.2
github.com/batchcorp/plumber-schemas v0.0.180-0.20230309225628-84c1334d79d9
github.com/batchcorp/plumber-schemas v0.0.180-0.20230414144614-85d5b0aa66d3
github.com/batchcorp/rabbit v0.1.17
github.com/batchcorp/thrifty v0.0.10
github.com/eclipse/paho.mqtt.golang v1.2.0
Expand Down Expand Up @@ -58,7 +58,7 @@ require (
golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be
google.golang.org/api v0.29.0
google.golang.org/grpc v1.40.0
google.golang.org/protobuf v1.29.0
google.golang.org/protobuf v1.30.0
)

require (
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ github.com/batchcorp/pgoutput v0.3.2 h1:Jck/nJLwXWmsNHF2iEfjFWrsds5LLGtqu6y6TjjU
github.com/batchcorp/pgoutput v0.3.2/go.mod h1:xzkr73rAlqUdvt2oMAQav1lIt6xnG8bqYYXt4GrXM/I=
github.com/batchcorp/plumber-schemas v0.0.180-0.20230309225628-84c1334d79d9 h1:RSTM71MF9k3s28Z8TsRfjw6nwL07AFNz01yRcbX7aEo=
github.com/batchcorp/plumber-schemas v0.0.180-0.20230309225628-84c1334d79d9/go.mod h1:rHCMVIUS+BHnUXgM72qr8CBU4R4NnHNCjEZyOamVdyY=
github.com/batchcorp/plumber-schemas v0.0.180-0.20230414144614-85d5b0aa66d3 h1:vQ8z4IDlzRSH/MUmDc3mIc0G8/dfCyoeAlfHPurL6NU=
github.com/batchcorp/plumber-schemas v0.0.180-0.20230414144614-85d5b0aa66d3/go.mod h1:0g9lX0S7Ec7JVa0kzAkJC3gWAFbwIUcK3/VJ0n68KhA=
github.com/batchcorp/plz v0.9.2 h1:bPqb+sn7OUrpHjeTEI9YO4BJS9IQ7AstDDz2gn+tcn8=
github.com/batchcorp/plz v0.9.2/go.mod h1:3gacX+hQo+xvl0vtLqCMufzxuNCwt4geAVOMt2LQYfE=
github.com/batchcorp/rabbit v0.1.17 h1:dui1W7FLTrNxyVlDN+G+6d8LXz8HBhVAcUethXql9vQ=
Expand Down Expand Up @@ -1177,6 +1179,8 @@ google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.29.0 h1:44S3JjaKmLEE4YIkjzexaP+NzZsudE3Zin5Njn/pYX0=
google.golang.org/protobuf v1.29.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
13 changes: 13 additions & 0 deletions test-assets/cloudevents/example.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"specversion" : "1.0",
"type" : "com.github.pull_request.opened",
"source" : "https://github.com/cloudevents/spec/pull",
"subject" : "123",
"id" : "A234-1234-1234",
"time" : "2018-04-05T17:31:00Z",
"type": "plumberexample",
"comexampleextension1" : "value",
"comexampleothervalue" : 5,
"datacontenttype" : "text/xml",
"data" : "<much wow=\"xml\"/>"
}
11 changes: 8 additions & 3 deletions util/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"github.com/cloudevents/sdk-go/v2/event"
"github.com/pkg/errors"

"github.com/batchcorp/plumber-schemas/build/go/protos/cloudevent"
"github.com/batchcorp/plumber-schemas/build/go/protos/encoding"
"github.com/batchcorp/plumber-schemas/build/go/protos/records"
)

func GenCloudEvent(cfg *cloudevent.CloudEventOptions, msg *records.WriteRecord) (*event.Event, error) {
func GenCloudEvent(cfg *encoding.CloudEventSettings, msg *records.WriteRecord) (*event.Event, error) {
if cfg == nil {
return nil, errors.New("cloud event options cannot be nil")
}
Expand All @@ -20,7 +20,12 @@ func GenCloudEvent(cfg *cloudevent.CloudEventOptions, msg *records.WriteRecord)

e := cloudevents.NewEvent(cfg.CeSpecVersion)

e.SetData("application/json", []byte(msg.Input))
// Try to unmarshal entire input to see if it's a valid cloud event in JSON format
if err := e.UnmarshalJSON([]byte(msg.Input)); err != nil {
// Input is not entire cloud event, most likely just plain JSON.
// Set the input as the data field and then set all other values based on flags.
e.SetData("application/json", []byte(msg.Input))
}

if cfg.CeId != "" {
e.SetID(cfg.CeId)
Expand Down

This file was deleted.

Loading

0 comments on commit 4920fc6

Please sign in to comment.