Skip to content

Commit

Permalink
Fix closing streams
Browse files Browse the repository at this point in the history
  • Loading branch information
mszostok committed May 12, 2023
1 parent e91c48e commit 352802a
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 5 deletions.
10 changes: 8 additions & 2 deletions internal/source/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,16 @@ func (d *Dispatcher) Dispatch(dispatch PluginDispatch) error {
go func() {
for {
select {
case event := <-out.Output:
case event, ok := <-out.Output:
if !ok {
return
}
log.WithField("event", string(event)).Debug("Dispatching received event...")
d.dispatch(ctx, event, dispatch)
case msg := <-out.Event:
case msg, ok := <-out.Event:
if !ok {
return
}
log.WithField("message", msg).Debug("Dispatching received message...")
d.dispatchMsg(ctx, msg, dispatch)
case <-ctx.Done():
Expand Down
11 changes: 8 additions & 3 deletions pkg/api/source/grpc_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"encoding/json"
"fmt"
"io"
"log"

"github.com/hashicorp/go-plugin"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/kubeshop/botkube/pkg/api"
Expand Down Expand Up @@ -114,11 +115,13 @@ func (p *Plugin) GRPCServer(_ *plugin.GRPCBroker, s *grpc.Server) error {
func (p *Plugin) GRPCClient(_ context.Context, _ *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
return &grpcClient{
client: NewSourceClient(c),
logger: NewLogger(),
}, nil
}

type grpcClient struct {
client SourceClient
logger logrus.FieldLogger
}

func (p *grpcClient) Stream(ctx context.Context, in StreamInput) (StreamOutput, error) {
Expand Down Expand Up @@ -152,20 +155,22 @@ func (p *grpcClient) Stream(ctx context.Context, in StreamInput) (StreamOutput,
// On any other error, the stream is aborted and the error contains the RPC
// status.
if err != nil {
log.Print(err)
p.logger.Errorf("canceling streaming: %s", status.Convert(err).Message())
// TODO: we should consider adding error feedback channel to StreamOutput.
return
}
var event Event
if len(feature.Event) != 0 && string(feature.Event) != "" {
if err := json.Unmarshal(feature.Event, &event); err != nil {
log.Printf("while unmarshalling message from JSON: %s", err.Error())
p.logger.Errorf("canceling streaming: cannot unmarshal JSON message: %s", err.Error())
return
}
}
out.Output <- feature.Output
out.Event <- event
}
close(out.Output)
close(out.Event)
}()

return out, nil
Expand Down
15 changes: 15 additions & 0 deletions pkg/api/source/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package source

import (
"os"

"github.com/sirupsen/logrus"
)

// NewLogger returns a new logger used internally. We should replace it in the near future, as we shouldn't be so opinionated.
func NewLogger() logrus.FieldLogger {
logger := logrus.New()
logger.SetOutput(os.Stdout)
logger.SetLevel(logrus.InfoLevel)
return logger
}

0 comments on commit 352802a

Please sign in to comment.