Skip to content

Commit

Permalink
Events (#2522)
Browse files Browse the repository at this point in the history
* first draft for event system - includes example

Signed-off-by: jkoberg <[email protected]>

* add event middleware

Signed-off-by: jkoberg <[email protected]>

* events: distinguish grantee userid and groupid

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

* seperate consumer from publisher

Signed-off-by: jkoberg <[email protected]>

* code review suggestions

Signed-off-by: jkoberg <[email protected]>

* simplify example

Signed-off-by: jkoberg <[email protected]>

* add changelog

Signed-off-by: jkoberg <[email protected]>

* make nats server configurable

Signed-off-by: jkoberg <[email protected]>

* add license headers

Signed-off-by: jkoberg <[email protected]>

* cheat the linter

Signed-off-by: jkoberg <[email protected]>

Co-authored-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
kobergj and butonic committed Feb 14, 2022
1 parent 65bb12a commit 20a38e4
Show file tree
Hide file tree
Showing 13 changed files with 995 additions and 10 deletions.
7 changes: 7 additions & 0 deletions changelog/unreleased/events.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Enhancement: introduce events

This will introduce events into the system. Events are a simple way to bring information from
one service to another. Read `pkg/events/example` and subfolders for more information

https://github.com/cs3org/reva/pull/2522

10 changes: 6 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/Masterminds/sprig v2.22.0+incompatible
github.com/ReneKroon/ttlcache/v2 v2.11.0
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/asim/go-micro/plugins/events/nats/v4 v4.0.0-20220118152736-9e0be6c85d75
github.com/aws/aws-sdk-go v1.42.39
github.com/beevik/etree v1.1.0
github.com/bluele/gcache v0.0.2
Expand Down Expand Up @@ -38,7 +39,6 @@ require (
github.com/hashicorp/go-hclog v1.1.0
github.com/hashicorp/go-plugin v1.4.3
github.com/huandu/xstrings v1.3.2 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/jedib0t/go-pretty v4.3.0+incompatible
github.com/juliangruber/go-intersect v1.1.0
github.com/mattn/go-sqlite3 v1.14.10
Expand All @@ -48,6 +48,8 @@ require (
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/go-testing-interface v1.14.1 // indirect
github.com/mitchellh/mapstructure v1.4.3
github.com/nats-io/nats-server/v2 v2.7.2
github.com/nats-io/nats-streaming-server v0.24.1
github.com/onsi/ginkgo/v2 v2.0.0
github.com/onsi/gomega v1.18.1
github.com/pkg/errors v0.9.1
Expand All @@ -58,24 +60,24 @@ require (
github.com/rs/zerolog v1.26.1
github.com/sciencemesh/meshdirectory-web v1.0.4
github.com/sethvargo/go-password v0.2.0
github.com/stretchr/objx v0.3.0 // indirect
github.com/stretchr/testify v1.7.0
github.com/studio-b12/gowebdav v0.0.0-20211109083228-3f8721cd4b6f
github.com/thanhpk/randstr v1.0.4
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tus/tusd v1.8.0
github.com/wk8/go-ordered-map v0.2.0
go-micro.dev/v4 v4.3.1-0.20211108085239-0c2041e43908
go.mongodb.org/mongo-driver v1.7.2 // indirect
go.opencensus.io v0.23.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.28.0
go.opentelemetry.io/otel v1.3.0
go.opentelemetry.io/otel/exporters/jaeger v1.3.0
go.opentelemetry.io/otel/sdk v1.3.0
go.opentelemetry.io/otel/trace v1.3.0
golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce
golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9
golang.org/x/term v0.0.0-20210916214954-140adaaadfaf
google.golang.org/genproto v0.0.0-20211021150943-2b146023228c
google.golang.org/grpc v1.42.0
Expand Down
430 changes: 424 additions & 6 deletions go.sum

Large diffs are not rendered by default.

37 changes: 37 additions & 0 deletions internal/grpc/interceptors/eventsmiddleware/conversion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package eventsmiddleware

import (
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
"github.com/cs3org/reva/pkg/events"
)

// ShareCreated converts response to event
func ShareCreated(r *collaboration.CreateShareResponse) events.ShareCreated {
e := events.ShareCreated{
Sharer: r.Share.Creator,
GranteeUserID: r.Share.GetGrantee().GetUserId(),
GranteeGroupID: r.Share.GetGrantee().GetGroupId(),
ItemID: r.Share.ResourceId,
CTime: r.Share.Ctime,
}

return e
}
95 changes: 95 additions & 0 deletions internal/grpc/interceptors/eventsmiddleware/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package eventsmiddleware

import (
"context"
"fmt"

"go-micro.dev/v4/util/log"
"google.golang.org/grpc"

"github.com/asim/go-micro/plugins/events/nats/v4"
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
"github.com/cs3org/reva/pkg/events"
"github.com/cs3org/reva/pkg/events/server"
"github.com/cs3org/reva/pkg/rgrpc"
)

const (
defaultPriority = 200
)

func init() {
rgrpc.RegisterUnaryInterceptor("eventsmiddleware", NewUnary)
}

// NewUnary returns a new unary interceptor that emits events when needed
// no lint because of the switch statement that should be extendable
//nolint:gocritic
func NewUnary(m map[string]interface{}) (grpc.UnaryServerInterceptor, int, error) {
publisher, err := publisherFromConfig(m)
if err != nil {
return nil, 0, err
}

interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
res, err := handler(ctx, req)
if err != nil {
return res, err
}

var ev interface{}
switch v := res.(type) {
case *collaboration.CreateShareResponse:
ev = ShareCreated(v)
}

if ev != nil {
if err := events.Publish(publisher, ev); err != nil {
log.Error(err)
}
}

return res, nil
}
return interceptor, defaultPriority, nil
}

// NewStream returns a new server stream interceptor
// that creates the application context.
func NewStream() grpc.StreamServerInterceptor {
interceptor := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// TODO: Use ss.RecvMsg() and ss.SendMsg() to send events from a stream
return handler(srv, ss)
}
return interceptor
}

func publisherFromConfig(m map[string]interface{}) (events.Publisher, error) {
typ := m["type"].(string)
switch typ {
default:
return nil, fmt.Errorf("stream type '%s' not supported", typ)
case "nats":
address := m["address"].(string)
cid := m["clusterID"].(string)
return server.NewNatsStream(nats.Address(address), nats.ClusterID(cid))
}
}
1 change: 1 addition & 0 deletions internal/grpc/interceptors/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package loader

import (
// Load core GRPC services
_ "github.com/cs3org/reva/internal/grpc/interceptors/eventsmiddleware"
_ "github.com/cs3org/reva/internal/grpc/interceptors/readonly"
// Add your own service here
)
107 changes: 107 additions & 0 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package events

import (
"log"
"reflect"

"go-micro.dev/v4/events"
)

var (
// MainQueueName is the name of the main queue
// All events will go through here as they are forwarded to the consumer via the
// group name
// TODO: "fan-out" so not all events go through the same queue? requires investigation
MainQueueName = "main-queue"

// MetadatakeyEventType is the key used for the eventtype in the metadata map of the event
MetadatakeyEventType = "eventtype"
)

type (
// Unmarshaller is the interface events need to fulfill
Unmarshaller interface {
Unmarshal([]byte) (interface{}, error)
}

// Publisher is the interface publishers need to fulfill
Publisher interface {
Publish(string, interface{}, ...events.PublishOption) error
}

// Consumer is the interface consumer need to fulfill
Consumer interface {
Consume(string, ...events.ConsumeOption) (<-chan events.Event, error)
}

// Stream is the interface common to Publisher and Consumer
Stream interface {
Publish(string, interface{}, ...events.PublishOption) error
Consume(string, ...events.ConsumeOption) (<-chan events.Event, error)
}
)

// Consume returns a channel that will get all events that match the given evs
// group defines the service type: One group will get exactly one copy of a event that is emitted
// NOTE: uses reflect on initialization
func Consume(s Consumer, group string, evs ...Unmarshaller) (<-chan interface{}, error) {
c, err := s.Consume(MainQueueName, events.WithGroup(group))
if err != nil {
return nil, err
}

registeredEvents := map[string]Unmarshaller{}
for _, e := range evs {
typ := reflect.TypeOf(e)
registeredEvents[typ.String()] = e
}

outchan := make(chan interface{})
go func() {
for {
e := <-c
et := e.Metadata[MetadatakeyEventType]
ev, ok := registeredEvents[et]
if !ok {
log.Printf("not registered: %s", et)
continue
}

event, err := ev.Unmarshal(e.Payload)
if err != nil {
log.Printf("can't unmarshal event %v", err)
continue
}

outchan <- event
}
}()
return outchan, nil
}

// Publish publishes the ev to the MainQueue from where it is distributed to all subscribers
// NOTE: needs to use reflect on runtime
func Publish(s Publisher, ev interface{}) error {
evName := reflect.TypeOf(ev).String()
return s.Publish(MainQueueName, ev, events.WithMetadata(map[string]string{
MetadatakeyEventType: evName,
}))
}
60 changes: 60 additions & 0 deletions pkg/events/example/consumer/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

// Package consumer contains an example implementation of an event consumer
package consumer

import (
"fmt"
"log"

"github.com/cs3org/reva/pkg/events"
)

// Example consumes events from the queue
func Example(c events.Consumer) {
// Step 1 - which group does the consumer belong to?
// each group will get each event that is emitted, but only one member of the group will get it.
group := "test-consumer"

// Step 2 - which events does the consumer listen too?
evs := []events.Unmarshaller{
// for example created shares
events.ShareCreated{},
}

// Step 3 - create event channel
evChan, err := events.Consume(c, group, evs...)
if err != nil {
log.Fatal(err)
}

// Step 4 - listen to events
for {
event := <-evChan

// best to use type switch to differentiate events
switch v := event.(type) {
case events.ShareCreated:
fmt.Printf("%s) Share created: %+v\n", group, v)
default:
fmt.Printf("%s) Unregistered event: %+v\n", group, v)
}
}

}
Loading

0 comments on commit 20a38e4

Please sign in to comment.