Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

events: Add filters to keep track of local and other subscriptions #24201

Merged
merged 5 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion vault/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,11 @@ func NewCore(conf *CoreConfig) (*Core, error) {
eventsLogger := conf.Logger.Named("events")
c.allLoggers = append(c.allLoggers, eventsLogger)
// start the event system
events, err := eventbus.NewEventBus(eventsLogger)
nodeID, err := c.LoadNodeID()
if err != nil {
return nil, err
}
events, err := eventbus.NewEventBus(nodeID, eventsLogger)
if err != nil {
return nil, err
}
Expand Down
72 changes: 41 additions & 31 deletions vault/eventbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,27 @@ const (
)

var (
ErrNotStarted = errors.New("event broker has not been started")
cloudEventsFormatterFilter *cloudevents.FormatterFilter
subscriptions atomic.Int64 // keeps track of event subscription count in all event buses
ErrNotStarted = errors.New("event broker has not been started")
subscriptions atomic.Int64 // keeps track of event subscription count in all event buses

// these metadata fields will have the plugin mount path prepended to them
metadataPrependPathFields = []string{
"path",
logical.EventMetadataDataPath,
}
initCloudEventsFormatterFilterOnce sync.Once
swenson marked this conversation as resolved.
Show resolved Hide resolved
)

// EventBus contains the main logic of running an event broker for Vault.
// Start() must be called before the EventBus will accept events for sending.
type EventBus struct {
logger hclog.Logger
broker *eventlogger.Broker
started atomic.Bool
formatterNodeID eventlogger.NodeID
timeout time.Duration
logger hclog.Logger
broker *eventlogger.Broker
started atomic.Bool
formatterNodeID eventlogger.NodeID
timeout time.Duration
filters *Filters
cloudEventsFormatterFilter *cloudevents.FormatterFilter
}

type pluginEventBus struct {
Expand All @@ -72,6 +74,7 @@ type asyncChanNode struct {
closeOnce sync.Once
cancelFunc context.CancelFunc
pipelineID eventlogger.PipelineID
removeFilter func()
removePipeline func(ctx context.Context, t eventlogger.EventType, id eventlogger.PipelineID) (bool, error)
}

Expand Down Expand Up @@ -162,21 +165,7 @@ func (bus *pluginEventBus) SendEvent(ctx context.Context, eventType logical.Even
return bus.bus.SendEventInternal(ctx, bus.namespace, bus.pluginInfo, eventType, data)
}

func init() {
// TODO: maybe this should relate to the Vault core somehow?
sourceUrl, err := url.Parse("https://vaultproject.io/")
if err != nil {
panic(err)
}
cloudEventsFormatterFilter = &cloudevents.FormatterFilter{
Source: sourceUrl,
Predicate: func(_ context.Context, e interface{}) (bool, error) {
return true, nil
},
}
}

func NewEventBus(logger hclog.Logger) (*EventBus, error) {
func NewEventBus(localNodeID string, logger hclog.Logger) (*EventBus, error) {
broker, err := eventlogger.NewBroker()
if err != nil {
return nil, err
Expand All @@ -192,11 +181,25 @@ func NewEventBus(logger hclog.Logger) (*EventBus, error) {
logger = hclog.Default().Named("events")
}

sourceUrl, err := url.Parse("vault://" + localNodeID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably worth a changelog note?

if err != nil {
return nil, err
}

cloudEventsFormatterFilter := &cloudevents.FormatterFilter{
Source: sourceUrl,
Predicate: func(_ context.Context, e interface{}) (bool, error) {
return true, nil
},
}

return &EventBus{
logger: logger,
broker: broker,
formatterNodeID: formatterNodeID,
timeout: defaultTimeout,
logger: logger,
broker: broker,
formatterNodeID: formatterNodeID,
timeout: defaultTimeout,
cloudEventsFormatterFilter: cloudEventsFormatterFilter,
filters: NewFilters(localNodeID),
}, nil
}

Expand All @@ -215,7 +218,7 @@ func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespaceP
return nil, nil, err
}

err = bus.broker.RegisterNode(bus.formatterNodeID, cloudEventsFormatterFilter)
err = bus.broker.RegisterNode(bus.formatterNodeID, bus.cloudEventsFormatterFilter)
if err != nil {
return nil, nil, err
}
Expand All @@ -240,7 +243,12 @@ func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespaceP
}

ctx, cancel := context.WithCancel(ctx)
asyncNode := newAsyncNode(ctx, bus.logger, bus.broker)

bus.filters.addPattern(bus.filters.self, namespacePathPatterns, pattern)

asyncNode := newAsyncNode(ctx, bus.logger, bus.broker, func() {
bus.filters.removePattern(bus.filters.self, namespacePathPatterns, pattern)
})
err = bus.broker.RegisterNode(eventlogger.NodeID(sinkNodeID), asyncNode)
if err != nil {
defer cancel()
Expand Down Expand Up @@ -301,7 +309,7 @@ func newFilterNode(namespacePatterns []string, pattern string, bexprFilter strin
}
}

// Filter for correct event type, including wildcards.
// NodeFilter for correct event type, including wildcards.
if !glob.Glob(pattern, eventRecv.EventType) {
return false, nil
}
Expand All @@ -315,11 +323,12 @@ func newFilterNode(namespacePatterns []string, pattern string, bexprFilter strin
}, nil
}

func newAsyncNode(ctx context.Context, logger hclog.Logger, broker *eventlogger.Broker) *asyncChanNode {
func newAsyncNode(ctx context.Context, logger hclog.Logger, broker *eventlogger.Broker, removeFilter func()) *asyncChanNode {
return &asyncChanNode{
ctx: ctx,
ch: make(chan *eventlogger.Event),
logger: logger,
removeFilter: removeFilter,
removePipeline: broker.RemovePipelineAndNodes,
}
}
Expand All @@ -328,6 +337,7 @@ func newAsyncNode(ctx context.Context, logger hclog.Logger, broker *eventlogger.
func (node *asyncChanNode) Close(ctx context.Context) {
node.closeOnce.Do(func() {
defer node.cancelFunc()
node.removeFilter()
removed, err := node.removePipeline(ctx, eventTypeAll, node.pipelineID)

switch {
Expand Down
29 changes: 19 additions & 10 deletions vault/eventbus/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

// TestBusBasics tests that basic event sending and subscribing function.
func TestBusBasics(t *testing.T) {
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestBusBasics(t *testing.T) {
// TestBusIgnoresSendContext tests that the context is ignored when sending to an event,
// so that we do not give up too quickly.
func TestBusIgnoresSendContext(t *testing.T) {
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestBusIgnoresSendContext(t *testing.T) {
// TestSubscribeNonRootNamespace verifies that events for non-root namespaces
// aren't filtered out by the bus.
func TestSubscribeNonRootNamespace(t *testing.T) {
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestSubscribeNonRootNamespace(t *testing.T) {

// TestNamespaceFiltering verifies that events for other namespaces are filtered out by the bus.
func TestNamespaceFiltering(t *testing.T) {
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestNamespaceFiltering(t *testing.T) {

// TestBus2Subscriptions verifies that events of different types are successfully routed to the correct subscribers.
func TestBus2Subscriptions(t *testing.T) {
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -293,7 +293,7 @@ func TestBusSubscriptionsCancel(t *testing.T) {
for _, tc := range testCases {
t.Run(fmt.Sprintf("cancel=%v", tc.cancel), func(t *testing.T) {
subscriptions.Store(0)
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -396,7 +396,7 @@ func waitFor(t *testing.T, maxWait time.Duration, f func() bool) {
// TestBusWildcardSubscriptions tests that a single subscription can receive
// multiple event types using * for glob patterns.
func TestBusWildcardSubscriptions(t *testing.T) {
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -471,7 +471,7 @@ func TestBusWildcardSubscriptions(t *testing.T) {
// TestDataPathIsPrependedWithMount tests that "data_path", if present in the
// metadata, is prepended with the plugin's mount.
func TestDataPathIsPrependedWithMount(t *testing.T) {
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -591,7 +591,7 @@ func TestDataPathIsPrependedWithMount(t *testing.T) {

// TestBexpr tests go-bexpr filters are evaluated on an event.
func TestBexpr(t *testing.T) {
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -671,7 +671,7 @@ func TestBexpr(t *testing.T) {
// TestPipelineCleanedUp ensures pipelines are properly cleaned up after
// subscriptions are closed.
func TestPipelineCleanedUp(t *testing.T) {
bus, err := NewEventBus(nil)
bus, err := NewEventBus("", nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -683,6 +683,10 @@ func TestPipelineCleanedUp(t *testing.T) {
if err != nil {
t.Fatal(err)
}
// check that the filters are set
if !bus.filters.anyMatch(namespace.RootNamespace.Path, eventType) {
t.Fatal()
}
if !bus.broker.IsAnyPipelineRegistered(eventTypeAll) {
cancel()
t.Fatal()
Expand All @@ -693,4 +697,9 @@ func TestPipelineCleanedUp(t *testing.T) {
if bus.broker.IsAnyPipelineRegistered(eventTypeAll) {
t.Fatal()
}

// and that the filters are cleaned up
if bus.filters.anyMatch(namespace.RootNamespace.Path, eventType) {
t.Fatal()
}
}
124 changes: 124 additions & 0 deletions vault/eventbus/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package eventbus

import (
"slices"
"sync"
"sync/atomic"

"github.com/hashicorp/vault/sdk/logical"
"github.com/ryanuber/go-glob"
)

// Filters keeps track of all the event patterns that each node is interested in.
// Patterns are stored per node and are added and removed with addPattern and
// removePattern; anyMatch, nodeMatch and localMatch query them.
type Filters struct {
// lock guards filters: addPattern and removePattern take the write lock,
// anyMatch and nodeMatch take the read lock.
lock sync.RWMutex
// parallel, when true, makes anyMatch evaluate each node's filter in its own
// goroutine instead of sequentially. NewFilters leaves it false.
parallel bool
// self is the identifier of the local node; localMatch queries this node's patterns.
self nodeID
// filters maps a node identifier to the patterns registered for that node.
filters map[nodeID]*NodeFilter
}

// nodeID is a distinct string type marking a value as a node's name identifier,
// so that it is not confused with other strings. It is the key type of
// Filters.filters.
type nodeID string

// pattern represents one event type glob combined with one or more namespace
// globs. NodeFilter.match treats a pattern as matching when the event type
// matches eventTypePattern and the namespace path matches at least one entry
// of namespacePatterns.
type pattern struct {
// eventTypePattern is a glob that is matched against the event type.
eventTypePattern string
// namespacePatterns are globs that are matched against the namespace path.
namespacePatterns []string
}

// NodeFilter keeps track of all patterns that a particular node is interested in.
// Its match method reports true as soon as any one of the patterns matches; a nil
// *NodeFilter matches nothing.
type NodeFilter struct {
// patterns holds one entry per addPattern call for the node.
patterns []pattern
}

func (nf *NodeFilter) match(nsPath string, eventType logical.EventType) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to accept *namespace.Namespace instead of string for all the nsPath parameters in this file - that way callers don't have to worry about the exact format of the path (should it have a trailing slash, should it be absolute or relative, etc) and can just pass something they know is safe by its type.

if nf == nil {
return false
}
for _, p := range nf.patterns {
if glob.Glob(p.eventTypePattern, string(eventType)) {
for _, nsp := range p.namespacePatterns {
if glob.Glob(nsp, nsPath) {
return true
}
}
}
}
return false
}

// NewFilters returns a Filters with no patterns registered for any node.
// The self argument is the identifier of the local node and is what
// localMatch later uses to look up this node's own patterns.
func NewFilters(self string) *Filters {
	f := new(Filters)
	f.self = nodeID(self)
	f.filters = make(map[nodeID]*NodeFilter)
	return f
}

// addPattern registers one (event type glob, namespace globs) combination for
// the given node, creating the node's NodeFilter on first use. The
// namespacePatterns slice is stored as passed in, without copying.
func (f *Filters) addPattern(node nodeID, namespacePatterns []string, eventTypePattern string) {
	f.lock.Lock()
	defer f.lock.Unlock()

	nf, known := f.filters[node]
	if !known {
		nf = &NodeFilter{}
		f.filters[node] = nf
	}

	entry := pattern{
		eventTypePattern:  eventTypePattern,
		namespacePatterns: namespacePatterns,
	}
	nf.patterns = append(nf.patterns, entry)
}

// removePattern removes a pattern from a node's list.
func (f *Filters) removePattern(node nodeID, namespacePatterns []string, eventTypePattern string) {
check := pattern{eventTypePattern: eventTypePattern, namespacePatterns: namespacePatterns}
f.lock.Lock()
defer f.lock.Unlock()
filters, ok := f.filters[node]
if !ok {
return
}
filters.patterns = slices.DeleteFunc(filters.patterns, func(m pattern) bool {
return m.eventTypePattern == check.eventTypePattern &&
slices.Equal(m.namespacePatterns, check.namespacePatterns)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given we're checking for slice equality to remove patterns, perhaps we should sort this slice on the way in to make the equality check a little more reliable?

})
}

// anyMatch returns true if any node's pattern list matches the arguments.
func (f *Filters) anyMatch(nsPath string, eventType logical.EventType) bool {
f.lock.RLock()
defer f.lock.RUnlock()
if f.parallel {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that the number of parallel goroutines is equal to the number of nodes (always going to be small), and all the data is local, are we sure it's worth adding this parallel path? I'd err on the side of simplicity until we have data to back up the need for parallelism here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I went back and forth on this one. There are a lot of potential optimizations that could be done (combining/simplifying globs into regexes, parallelization, using a more sophisticated tree matching data structure, etc.), but it all seems a bit premature. I'll remove this for now.

wg := sync.WaitGroup{}
anyMatched := atomic.Bool{}
for _, nf := range f.filters {
wg.Add(1)
go func(nf *NodeFilter) {
if nf.match(nsPath, eventType) {
anyMatched.Store(true)
}
wg.Done()
}(nf)
}
wg.Wait()
return anyMatched.Load()
} else {
for _, nf := range f.filters {
if nf.match(nsPath, eventType) {
return true
}
}
return false
}
}

// nodeMatch reports whether any pattern registered for the given node matches
// the namespace path and event type. A node with no registered filter never
// matches.
func (f *Filters) nodeMatch(node nodeID, nsPath string, eventType logical.EventType) bool {
	f.lock.RLock()
	defer f.lock.RUnlock()

	// A missing key yields a nil *NodeFilter; match handles a nil receiver
	// by returning false.
	nf := f.filters[node]
	return nf.match(nsPath, eventType)
}

// localMatch reports whether any pattern registered for the local node
// (f.self) matches the namespace path and event type.
func (f *Filters) localMatch(nsPath string, eventType logical.EventType) bool {
	local := f.self
	return f.nodeMatch(local, nsPath, eventType)
}
Loading
Loading