Skip to content

Commit

Permalink
Merge pull request #52 from w-h-a/custom-streams
Browse files Browse the repository at this point in the history
feat: add custom streams service
  • Loading branch information
w-h-a authored Jul 30, 2024
2 parents 015447f + 1ab8b58 commit 772ea23
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 1 deletion.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
| security | jwts, nacl, TBD | tokens, secrets, and certs |
| server | grpc | build backend servers |
| store | cockroach | data persistence |
| streams | TBD | asynchronous communication |
| streams | custom | asynchronous communication |
| telemetry | memory | logs, metrics, and traces |
181 changes: 181 additions & 0 deletions streams/custom/streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package custom

import (
"context"
"encoding/json"
"time"

"github.com/w-h-a/pkg/client"
pb "github.com/w-h-a/pkg/proto/streams"
"github.com/w-h-a/pkg/streams"
"github.com/w-h-a/pkg/telemetry/log"
)

const (
OrderCreated = "order:created"
OrderCancelled = "order:cancelled"
OrderExpired = "order:expired"
PaymentCreated = "payment:created"
)

type customStream struct {
options streams.StreamsOptions
streams client.Client
}

func (s *customStream) Options() streams.StreamsOptions {
return s.options
}

func (s *customStream) Subscribe(id string, opts ...streams.SubscribeOption) error {
options := streams.NewSubscribeOptions(opts...)

req := s.streams.NewRequest(
client.RequestWithNamespace("wha-platform-resource"),
client.RequestWithPort(8081),
client.RequestWithName("streams"),
client.RequestWithMethod("Stream.Subscribe"),
client.RequestWithUnmarshaledRequest(
&pb.SubscribeRequest{
Id: id,
Group: options.Group,
Topic: options.Topic,
AckWait: options.AckWait.Nanoseconds(),
RetryLimit: int64(options.RetryLimit),
Offset: options.Offset.Unix(),
},
),
)

rsp := &pb.SubscribeResponse{}

if err := s.streams.Call(context.Background(), req, rsp); err != nil {
return err
}

return nil
}

func (s *customStream) Unsubscribe(id string) error {
req := s.streams.NewRequest(
client.RequestWithNamespace("wha-platform-resource"),
client.RequestWithPort(8081),
client.RequestWithName("streams"),
client.RequestWithMethod("Stream.Unsubscribe"),
client.RequestWithUnmarshaledRequest(
&pb.UnsubscribeRequest{
Id: id,
},
),
)

rsp := &pb.UnsubscribeResponse{}

if err := s.streams.Call(context.Background(), req, rsp); err != nil {
return err
}

return nil
}

func (s *customStream) Consume(id string) (streams.Subscriber, error) {
pbReq := &pb.ConsumeRequest{
Id: id,
}

req := s.streams.NewRequest(
client.RequestWithNamespace("wha-platform-resource"),
client.RequestWithPort(8081),
client.RequestWithName("streams"),
client.RequestWithMethod("Stream.Consume"),
client.RequestWithUnmarshaledRequest(pbReq),
)

stream, err := s.streams.Stream(context.Background(), req)
if err != nil {
return nil, err
}

if err := stream.Send(pbReq); err != nil {
return nil, err
}

sub := NewSubscriber()

go func() {
for {
var ev pb.Event

if err := stream.Recv(&ev); err != nil {
log.Errorf("failed to receive event from stream: %v", err)
sub.Close()
stream.Close()
return
}

event := &streams.Event{
Id: ev.Id,
Topic: ev.Topic,
Payload: ev.Payload,
Timestamp: time.Unix(ev.Timestamp, 0),
Metadata: ev.Metadata,
}

cpy := *event

cpy.SetAck(Ack(stream, sub, cpy))
cpy.SetNack(Nack(stream, sub, cpy))

sub.Channel() <- cpy
}
}()

return sub, nil
}

func (s *customStream) Produce(topic string, data interface{}, opts ...streams.ProduceOption) error {
options := streams.NewProduceOptions(opts...)

var bytes []byte

if p, ok := data.([]byte); ok {
bytes = p
} else {
p, err := json.Marshal(data)
if err != nil {
return streams.ErrEncodingData
}
bytes = p
}

req := s.streams.NewRequest(
client.RequestWithNamespace("wha-platform-resource"),
client.RequestWithPort(8081),
client.RequestWithName("streams"),
client.RequestWithMethod("Stream.Produce"),
client.RequestWithUnmarshaledRequest(
&pb.ProduceRequest{
Topic: topic,
Payload: bytes,
Metadata: options.Metadata,
},
),
)

rsp := &pb.ProduceResponse{}

if err := s.streams.Call(options.Context, req, rsp); err != nil {
return err
}

return nil
}

func (s *customStream) String() string {
return "custom"
}

func NewStreams(c client.Client) streams.Streams {
o := streams.NewStreamsOptions()
return &customStream{o, c}
}
54 changes: 54 additions & 0 deletions streams/custom/subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package custom

import (
pb "github.com/w-h-a/pkg/proto/streams"
"github.com/w-h-a/pkg/streams"
)

type customSubscriber struct {
options streams.SubscribeOptions
channel chan streams.Event
}

func (s *customSubscriber) Options() streams.SubscribeOptions {
return s.options
}

func (s *customSubscriber) Channel() chan streams.Event {
return s.channel
}

func (s *customSubscriber) Close() {
close(s.channel)
}

func (s *customSubscriber) Ack(ev streams.Event) interface{} {
return &pb.AckRequest{Id: ev.Id, Success: true}
}

func (s *customSubscriber) Nack(ev streams.Event) interface{} {
return &pb.AckRequest{Id: ev.Id, Success: false}
}

func (s *customSubscriber) SetAttempts(c int, ev streams.Event) {

}

func (s *customSubscriber) GetAttempts(ev streams.Event) (int, bool) {
return 0, true
}

func (s *customSubscriber) String() string {
return "custom"
}

func NewSubscriber(opts ...streams.SubscribeOption) streams.Subscriber {
options := streams.NewSubscribeOptions(opts...)

s := &customSubscriber{
options: options,
channel: make(chan streams.Event),
}

return s
}
18 changes: 18 additions & 0 deletions streams/custom/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package custom

import (
"github.com/w-h-a/pkg/client"
"github.com/w-h-a/pkg/streams"
)

func Ack(s client.Stream, sub streams.Subscriber, event streams.Event) func() error {
return func() error {
return s.Send(sub.Ack(event))
}
}

func Nack(s client.Stream, sub streams.Subscriber, event streams.Event) func() error {
return func() error {
return s.Send(sub.Nack(event))
}
}

0 comments on commit 772ea23

Please sign in to comment.