Skip to content

Commit

Permalink
executor: decompress gzip paloyads before logging them (#3746)
Browse files Browse the repository at this point in the history
* executor: decompress gzip paloyads before logging them

* extend kfserving sample to include logger

* refactor

* further refactor
  • Loading branch information
RafalSkolasinski authored Nov 19, 2021
1 parent f32ac46 commit 2e57a07
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 27 deletions.
34 changes: 34 additions & 0 deletions executor/api/payload/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package payload

import (
"bytes"
"compress/gzip"
"io/ioutil"
)

func DecompressBytes(data []byte, contentEncoding string) ([]byte, error) {
if contentEncoding != "gzip" {
return data, nil
}

bytesReader := bytes.NewReader(data)
gzipReader, err := gzip.NewReader(bytesReader)
if err != nil {
return nil, err
}
output, err := ioutil.ReadAll(gzipReader)
if err != nil {
return nil, err
}
return output, nil
}

// Decompress payloads if Content-Encoding is set to gzip
func DecompressSeldonPayload(msg SeldonPayload) ([]byte, error) {
data, err := msg.GetBytes()
if err != nil {
return nil, err
}

return DecompressBytes(data, msg.GetContentEncoding())
}
13 changes: 9 additions & 4 deletions executor/api/rest/kfserving.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ package rest

import (
"encoding/json"

"github.com/pkg/errors"
"github.com/seldonio/seldon-core/executor/api/payload"
)

func ChainKFserving(msg payload.SeldonPayload) (payload.SeldonPayload, error) {
data, err := payload.DecompressSeldonPayload(msg)
if err != nil {
return nil, err
}

var f interface{}
err := json.Unmarshal(msg.GetPayload().([]byte), &f)
err = json.Unmarshal(data, &f)
if err != nil {
return nil, err
}
Expand All @@ -21,10 +27,9 @@ func ChainKFserving(msg payload.SeldonPayload) (payload.SeldonPayload, error) {
b, err := json.Marshal(m)
if err != nil {
return nil, err
} else {
p := payload.BytesPayload{Msg: b}
return &p, nil
}
p := payload.BytesPayload{Msg: b, ContentType: msg.GetContentType()}
return &p, nil
} else {
return nil, errors.Errorf("Failed to convert kfserving response so it could be chained to new input")
}
Expand Down
17 changes: 9 additions & 8 deletions executor/logger/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ const (
)

type LogRequest struct {
Url *url.URL
Bytes *[]byte
ContentType string
ReqType LogRequestType
Id string
SourceUri *url.URL
ModelId string
RequestId string
Url *url.URL
Bytes *[]byte
ContentType string
ContentEncoding string
ReqType LogRequestType
Id string
SourceUri *url.URL
ModelId string
RequestId string
}
17 changes: 14 additions & 3 deletions executor/logger/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"context"
"errors"
"fmt"
"net/http"
"time"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/go-logr/logr"
"net/http"
"time"
"github.com/seldonio/seldon-core/executor/api/payload"
)

const (
Expand Down Expand Up @@ -123,6 +125,15 @@ func (w *Worker) sendKafkaEvent(logReq LogRequest) error {

func (w *Worker) sendCloudEvent(logReq LogRequest) error {

// This temporary fix related to the fact that Triton server responses
// are now gzipped compressed. Until we introduce support for gzip
// compressed payloads in the logger / adserver and include content-encoding
// header in the CloudEvent messages this can serve as temporary solution.
data, err := payload.DecompressBytes(*logReq.Bytes, logReq.ContentEncoding)
if err != nil {
return fmt.Errorf("while creating http transport: %s", err)
}

t, err := cloudevents.NewHTTPTransport(
cloudevents.WithTarget(logReq.Url.String()),
cloudevents.WithEncoding(cloudevents.HTTPBinaryV1),
Expand Down Expand Up @@ -154,7 +165,7 @@ func (w *Worker) sendCloudEvent(logReq LogRequest) error {

event.SetSource(logReq.SourceUri.String())
event.SetDataContentType(logReq.ContentType)
if err := event.SetData(*logReq.Bytes); err != nil {
if err := event.SetData(data); err != nil {
return fmt.Errorf("while setting cloudevents data: %s", err)
}

Expand Down
18 changes: 9 additions & 9 deletions executor/predictor/predictor_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,17 +333,17 @@ func (p *PredictorProcess) logPayload(nodeName string, logger *v1.Logger, reqTyp
if err != nil {
return err
}

go func() {
err := payloadLogger.QueueLogRequest(payloadLogger.LogRequest{
Url: logUrl,
Bytes: &data,
ContentType: msg.GetContentType(),
ReqType: reqType,
Id: guuid.New().String(),
SourceUri: p.ServerUrl,
ModelId: nodeName,
RequestId: puid,
Url: logUrl,
Bytes: &data,
ContentType: msg.GetContentType(),
ContentEncoding: msg.GetContentEncoding(),
ReqType: reqType,
Id: guuid.New().String(),
SourceUri: p.ServerUrl,
ModelId: nodeName,
RequestId: puid,
})
if err != nil {
p.Log.Error(err, "failed to log request")
Expand Down
10 changes: 8 additions & 2 deletions executor/samples/local/kfserving/Makefile
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
BASE=../../..
TRITON_VERSION ?= 21.08-py3

triton-inference-server:
git clone https://github.com/NVIDIA/triton-inference-server

triton-inference-server/docs/examples/model_repository/simple: triton-inference-server
cd triton-inference-server/docs/examples && ./fetch_models.sh


## dummy logger
run_dummy_logsink:
docker run -it -p 2222:80 --rm -t mendhak/http-https-echo

## REST

run_executor:
${BASE}/executor --sdep triton --namespace default --predictor simple --file ./model.yaml --http_port 8000 --grpc_port 5000 --protocol kfserving
${BASE}/executor --sdep triton --namespace default --predictor simple --file ./model.yaml --http_port 8000 --grpc_port 5000 --protocol kfserving --log_level DEBUG


run_triton_simple:
docker run --rm --shm-size=1g --ulimit memlock=-1 --ulimit stack=67108864 -p9000:9000 -p8001:8001 -p8002:8002 -p5001:5001 -v ${PWD}/triton-inference-server/docs/examples/model_repository:/models nvcr.io/nvidia/tritonserver:21.08-py3 /opt/tritonserver/bin/tritonserver --model-repository=/models --http-port=9000 --grpc-port=5001
docker run --rm --shm-size=1g --ulimit memlock=-1 --ulimit stack=67108864 -p9000:9000 -p8001:8001 -p8002:8002 -p5001:5001 -v ${PWD}/triton-inference-server/docs/examples/model_repository:/models nvcr.io/nvidia/tritonserver:${TRITON_VERSION} /opt/tritonserver/bin/tritonserver --model-repository=/models --http-port=9000 --grpc-port=5001 --log-verbose=1

curl_rest_triton:
curl -v -d '{"inputs":[{"name":"INPUT0","data":[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16],"datatype":"INT32","shape":[1,16]},{"name":"INPUT1","data":[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16],"datatype":"INT32","shape":[1,16]}]}' -X POST http://0.0.0.0:9000/v2/models/simple/infer -H "Content-Type: application/json"
Expand Down
5 changes: 4 additions & 1 deletion executor/samples/local/kfserving/model.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ spec:
implementation: TRITON_SERVER
modelUri: gs://seldon-models/trtis/simple-model
name: simple
type: MODEL
type: MODEL
logger:
mode: all
url: http://localhost:2222
name: simple
replicas: 1

0 comments on commit 2e57a07

Please sign in to comment.