Skip to content

Commit

Permalink
further refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
RafalSkolasinski committed Nov 18, 2021
1 parent 3a3f9ae commit 0eafdc9
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 25 deletions.
34 changes: 19 additions & 15 deletions executor/api/payload/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,29 @@ import (
"io/ioutil"
)

// Decompress payloads if Content-Encoding is set
func DecompressBytes(data []byte, contentEncoding string) ([]byte, error) {
if contentEncoding != "gzip" {
return data, nil
}

bytesReader := bytes.NewReader(data)
gzipReader, err := gzip.NewReader(bytesReader)
if err != nil {
return nil, err
}
output, err := ioutil.ReadAll(gzipReader)
if err != nil {
return nil, err
}
return output, nil
}

// Decompress payloads if Content-Encoding is set to gzip
func DecompressSeldonPayload(msg SeldonPayload) ([]byte, error) {
data, err := msg.GetBytes()
if err != nil {
return nil, err
}

if msg.GetContentEncoding() == "gzip" {
bytesReader := bytes.NewReader(data)
gzipReader, err := gzip.NewReader(bytesReader)
if err != nil {
return nil, err
}
output, err := ioutil.ReadAll(gzipReader)
if err != nil {
return nil, err
}
return output, nil
} else {
return data, nil
}
return DecompressBytes(data, msg.GetContentEncoding())
}
13 changes: 12 additions & 1 deletion executor/logger/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/go-logr/logr"
"github.com/seldonio/seldon-core/executor/api/payload"
)

const (
Expand Down Expand Up @@ -123,6 +124,16 @@ func (w *Worker) sendKafkaEvent(logReq LogRequest) error {
}

func (w *Worker) sendCloudEvent(logReq LogRequest) error {

// This temporary fix related to the fact that Triton server responses
// are now gzipped compressed. Until we introduce support for gzip
// compressed payloads in the logger / adserver and include content-encoding
// header in the CloudEvent messages this can serve as temporary solution.
data, err := payload.DecompressBytes(*logReq.Bytes, logReq.ContentEncoding)
if err != nil {
return fmt.Errorf("while creating http transport: %s", err)
}

t, err := cloudevents.NewHTTPTransport(
cloudevents.WithTarget(logReq.Url.String()),
cloudevents.WithEncoding(cloudevents.HTTPBinaryV1),
Expand Down Expand Up @@ -154,7 +165,7 @@ func (w *Worker) sendCloudEvent(logReq LogRequest) error {

event.SetSource(logReq.SourceUri.String())
event.SetDataContentType(logReq.ContentType)
if err := event.SetData(*logReq.Bytes); err != nil {
if err := event.SetData(data); err != nil {
return fmt.Errorf("while setting cloudevents data: %s", err)
}

Expand Down
14 changes: 5 additions & 9 deletions executor/predictor/predictor_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,20 +325,16 @@ func (p *PredictorProcess) getLogUrl(logger *v1.Logger) (*url.URL, error) {
}

func (p *PredictorProcess) logPayload(nodeName string, logger *v1.Logger, reqType payloadLogger.LogRequestType, msg payload.SeldonPayload, puid string) error {
data, err := msg.GetBytes()
if err != nil {
return err
}
logUrl, err := p.getLogUrl(logger)
if err != nil {
return err
}
go func() {
// This temporary fix related to the fact that Triton server responses
// are now gzipped compressed. Until we introduce support for gzip
// compressed payloads in the logger / adserver and include content-encoding
// header in the CloudEvent messages this can serve as temporary solution.
data, err := payload.DecompressSeldonPayload(msg)
if err != nil {
p.Log.Error(err, "failed to log request")
}
err = payloadLogger.QueueLogRequest(payloadLogger.LogRequest{
err := payloadLogger.QueueLogRequest(payloadLogger.LogRequest{
Url: logUrl,
Bytes: &data,
ContentType: msg.GetContentType(),
Expand Down

0 comments on commit 0eafdc9

Please sign in to comment.