Skip to content

Commit

Permalink
refactor: fix event stream poor implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Feb 24, 2025
1 parent 2b3cc91 commit ce267ac
Showing 1 changed file with 46 additions and 62 deletions.
108 changes: 46 additions & 62 deletions internal/eventstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,9 @@
package eventstream

import (
"sync"
"github.com/tochemey/goakt/v3/internal/collection/syncmap"
)

// Subscribers defines the map of subscribers
type Subscribers map[string]Subscriber

type Stream interface {
// AddSubscriber adds a subscriber
AddSubscriber() Subscriber
Expand All @@ -52,9 +49,8 @@ type Stream interface {

// EventsStream defines the stream broker
type EventsStream struct {
subs Subscribers
topics map[string]Subscribers
mu sync.Mutex
subscribers *syncmap.Map[string, Subscriber]
topics *syncmap.Map[string, *syncmap.Map[string, Subscriber]]
}

// enforce a compilation error
Expand All @@ -63,19 +59,16 @@ var _ Stream = (*EventsStream)(nil)
// New creates an instance of EventsStream
func New() Stream {
return &EventsStream{
subs: Subscribers{},
topics: map[string]Subscribers{},
mu: sync.Mutex{},
subscribers: syncmap.New[string, Subscriber](),
topics: syncmap.New[string, *syncmap.Map[string, Subscriber]](),
}
}

// AddSubscriber adds a subscriber
func (b *EventsStream) AddSubscriber() Subscriber {
b.mu.Lock()
defer b.mu.Unlock()
c := newSubscriber()
b.subs[c.ID()] = c
return c
subscriber := newSubscriber()
b.subscribers.Set(subscriber.ID(), subscriber)
return subscriber
}

// RemoveSubscriber removes a subscriber
Expand All @@ -85,88 +78,79 @@ func (b *EventsStream) RemoveSubscriber(sub Subscriber) {
for _, topic := range sub.Topics() {
b.Unsubscribe(sub, topic)
}
b.mu.Lock()
// remove subscriber from list of subscribers.
delete(b.subs, sub.ID())
b.mu.Unlock()
b.subscribers.Delete(sub.ID())
sub.Shutdown()
}

// Broadcast notifies all subscribers of a given topic of a new message
func (b *EventsStream) Broadcast(msg any, topics []string) {
b.mu.Lock()
defer b.mu.Unlock()

for _, topic := range topics {
for _, consumer := range b.topics[topic] {
if !consumer.Active() {
continue
if subscribers, ok := b.topics.Get(topic); ok && subscribers.Len() != 0 {
for _, subscriber := range subscribers.Values() {
if !subscriber.Active() {
continue
}
go subscriber.signal(NewMessage(topic, msg))
}
go consumer.signal(NewMessage(topic, msg))
}
}
}

// SubscribersCount returns the number of subscribers for a given topic
func (b *EventsStream) SubscribersCount(topic string) int {
// get total subscribers subscribed to given topic.
b.mu.Lock()
defer b.mu.Unlock()
return len(b.topics[topic])
if subscribers, ok := b.topics.Get(topic); ok {
return subscribers.Len()
}
return 0
}

// Subscribe subscribes a subscriber to a topic
func (b *EventsStream) Subscribe(sub Subscriber, topic string) {
func (b *EventsStream) Subscribe(subscriber Subscriber, topic string) {
// subscribe to given topic
b.mu.Lock()
defer b.mu.Unlock()

// only subscribe active consumer
if !sub.Active() {
if !subscriber.Active() {
return
}

if b.topics[topic] == nil {
b.topics[topic] = Subscribers{}
subscriber.subscribe(topic)
if subscribers, ok := b.topics.Get(topic); ok && subscribers.Len() != 0 {
subscribers.Set(subscriber.ID(), subscriber)
return
}
sub.subscribe(topic)
b.topics[topic][sub.ID()] = sub

// here the topic does not exist
subscribers := syncmap.New[string, Subscriber]()
subscribers.Set(subscriber.ID(), subscriber)
b.topics.Set(topic, subscribers)
}

// Unsubscribe removes a subscriber from a topic
func (b *EventsStream) Unsubscribe(sub Subscriber, topic string) {
b.mu.Lock()
defer b.mu.Unlock()

delete(b.topics[topic], sub.ID())
sub.unsubscribe(topic)
func (b *EventsStream) Unsubscribe(subscriber Subscriber, topic string) {
subscriber.unsubscribe(topic)
if subscribers, ok := b.topics.Get(topic); ok && subscribers.Len() != 0 {
subscribers.Delete(subscriber.ID())
}
}

// Publish publishes a message to a topic
func (b *EventsStream) Publish(topic string, msg any) {
b.mu.Lock()
subscribers := b.topics[topic]
b.mu.Unlock()

for _, consumer := range subscribers {
if !consumer.Active() {
continue
if subscribers, ok := b.topics.Get(topic); ok && subscribers.Len() != 0 {
for _, subscriber := range subscribers.Values() {
if !subscriber.Active() {
continue
}
go subscriber.signal(NewMessage(topic, msg))
}
go consumer.signal(NewMessage(topic, msg))
}
}

// Close closes the stream
func (b *EventsStream) Close() {
// acquire the lock
b.mu.Lock()
// release the lock once done
defer b.mu.Unlock()
for _, sub := range b.subs {
if sub.Active() {
sub.Shutdown()
for _, subscriber := range b.subscribers.Values() {
if subscriber.Active() {
subscriber.Shutdown()
}
}
b.subs = Subscribers{}
b.topics = map[string]Subscribers{}
b.subscribers.Reset()
b.topics.Reset()
}

0 comments on commit ce267ac

Please sign in to comment.