diff --git a/README.md b/README.md index 5a436fb..9c234d9 100644 --- a/README.md +++ b/README.md @@ -10,5 +10,5 @@ | security | jwts, nacl, TBD | tokens, secrets, and certs | | server | grpc | build backend servers | | store | cockroach | data persistence | -| streams | TBD | asynchronous communication | +| streams | custom | asynchronous communication | | telemetry | memory | logs, metrics, and traces | diff --git a/streams/custom/streams.go b/streams/custom/streams.go new file mode 100644 index 0000000..bb9d5fa --- /dev/null +++ b/streams/custom/streams.go @@ -0,0 +1,181 @@ +package custom + +import ( + "context" + "encoding/json" + "time" + + "github.com/w-h-a/pkg/client" + pb "github.com/w-h-a/pkg/proto/streams" + "github.com/w-h-a/pkg/streams" + "github.com/w-h-a/pkg/telemetry/log" +) + +const ( + OrderCreated = "order:created" + OrderCancelled = "order:cancelled" + OrderExpired = "order:expired" + PaymentCreated = "payment:created" +) + +type customStream struct { + options streams.StreamsOptions + streams client.Client +} + +func (s *customStream) Options() streams.StreamsOptions { + return s.options +} + +func (s *customStream) Subscribe(id string, opts ...streams.SubscribeOption) error { + options := streams.NewSubscribeOptions(opts...) + + req := s.streams.NewRequest( + client.RequestWithNamespace("wha-platform-resource"), + client.RequestWithPort(8081), + client.RequestWithName("streams"), + client.RequestWithMethod("Stream.Subscribe"), + client.RequestWithUnmarshaledRequest( + &pb.SubscribeRequest{ + Id: id, + Group: options.Group, + Topic: options.Topic, + AckWait: options.AckWait.Nanoseconds(), + RetryLimit: int64(options.RetryLimit), + Offset: options.Offset.Unix(), + }, + ), + ) + + rsp := &pb.SubscribeResponse{} + + if err := s.streams.Call(context.Background(), req, rsp); err != nil { + return err + } + + return nil +} + +func (s *customStream) Unsubscribe(id string) error { + req := s.streams.NewRequest( + client.RequestWithNamespace("wha-platform-resource"), + client.RequestWithPort(8081), + client.RequestWithName("streams"), + client.RequestWithMethod("Stream.Unsubscribe"), + client.RequestWithUnmarshaledRequest( + &pb.UnsubscribeRequest{ + Id: id, + }, + ), + ) + + rsp := &pb.UnsubscribeResponse{} + + if err := s.streams.Call(context.Background(), req, rsp); err != nil { + return err + } + + return nil +} + +func (s *customStream) Consume(id string) (streams.Subscriber, error) { + pbReq := &pb.ConsumeRequest{ + Id: id, + } + + req := s.streams.NewRequest( + client.RequestWithNamespace("wha-platform-resource"), + client.RequestWithPort(8081), + client.RequestWithName("streams"), + client.RequestWithMethod("Stream.Consume"), + client.RequestWithUnmarshaledRequest(pbReq), + ) + + stream, err := s.streams.Stream(context.Background(), req) + if err != nil { + return nil, err + } + + if err := stream.Send(pbReq); err != nil { + return nil, err + } + + sub := NewSubscriber() + + go func() { + for { + var ev pb.Event + + if err := stream.Recv(&ev); err != nil { + log.Errorf("failed to receive event from stream: %v", err) + sub.Close() + stream.Close() + return + } + + event := &streams.Event{ + Id: ev.Id, + Topic: ev.Topic, + Payload: ev.Payload, + Timestamp: time.Unix(ev.Timestamp, 0), + Metadata: ev.Metadata, + } + + cpy := *event + + cpy.SetAck(Ack(stream, sub, cpy)) + cpy.SetNack(Nack(stream, sub, cpy)) + + sub.Channel() <- cpy + } + }() + + return sub, nil +} + +func (s *customStream) Produce(topic string, data interface{}, opts ...streams.ProduceOption) error { + options := streams.NewProduceOptions(opts...) + + var bytes []byte + + if p, ok := data.([]byte); ok { + bytes = p + } else { + p, err := json.Marshal(data) + if err != nil { + return streams.ErrEncodingData + } + bytes = p + } + + req := s.streams.NewRequest( + client.RequestWithNamespace("wha-platform-resource"), + client.RequestWithPort(8081), + client.RequestWithName("streams"), + client.RequestWithMethod("Stream.Produce"), + client.RequestWithUnmarshaledRequest( + &pb.ProduceRequest{ + Topic: topic, + Payload: bytes, + Metadata: options.Metadata, + }, + ), + ) + + rsp := &pb.ProduceResponse{} + + if err := s.streams.Call(options.Context, req, rsp); err != nil { + return err + } + + return nil +} + +func (s *customStream) String() string { + return "custom" +} + +func NewStreams(c client.Client) streams.Streams { + o := streams.NewStreamsOptions() + return &customStream{o, c} +} diff --git a/streams/custom/subscriber.go b/streams/custom/subscriber.go new file mode 100644 index 0000000..d2b2cf0 --- /dev/null +++ b/streams/custom/subscriber.go @@ -0,0 +1,54 @@ +package custom + +import ( + pb "github.com/w-h-a/pkg/proto/streams" + "github.com/w-h-a/pkg/streams" +) + +type customSubscriber struct { + options streams.SubscribeOptions + channel chan streams.Event +} + +func (s *customSubscriber) Options() streams.SubscribeOptions { + return s.options +} + +func (s *customSubscriber) Channel() chan streams.Event { + return s.channel +} + +func (s *customSubscriber) Close() { + close(s.channel) +} + +func (s *customSubscriber) Ack(ev streams.Event) interface{} { + return &pb.AckRequest{Id: ev.Id, Success: true} +} + +func (s *customSubscriber) Nack(ev streams.Event) interface{} { + return &pb.AckRequest{Id: ev.Id, Success: false} +} + +func (s *customSubscriber) SetAttempts(c int, ev streams.Event) { + +} + +func (s *customSubscriber) GetAttempts(ev streams.Event) (int, bool) { + return 0, true +} + +func (s *customSubscriber) String() string { + return "custom" +} + +func NewSubscriber(opts ...streams.SubscribeOption) streams.Subscriber { + options := streams.NewSubscribeOptions(opts...) + + s := &customSubscriber{ + options: options, + channel: make(chan streams.Event), + } + + return s +} diff --git a/streams/custom/utils.go b/streams/custom/utils.go new file mode 100644 index 0000000..c30cca3 --- /dev/null +++ b/streams/custom/utils.go @@ -0,0 +1,18 @@ +package custom + +import ( + "github.com/w-h-a/pkg/client" + "github.com/w-h-a/pkg/streams" +) + +func Ack(s client.Stream, sub streams.Subscriber, event streams.Event) func() error { + return func() error { + return s.Send(sub.Ack(event)) + } +} + +func Nack(s client.Stream, sub streams.Subscriber, event streams.Event) func() error { + return func() error { + return s.Send(sub.Nack(event)) + } +}