diff --git a/executor/api/payload/utils.go b/executor/api/payload/utils.go new file mode 100644 index 0000000000..2624ee4319 --- /dev/null +++ b/executor/api/payload/utils.go @@ -0,0 +1,34 @@ +package payload + +import ( + "bytes" + "compress/gzip" + "io/ioutil" +) + +func DecompressBytes(data []byte, contentEncoding string) ([]byte, error) { + if contentEncoding != "gzip" { + return data, nil + } + + bytesReader := bytes.NewReader(data) + gzipReader, err := gzip.NewReader(bytesReader) + if err != nil { + return nil, err + } + output, err := ioutil.ReadAll(gzipReader) + if err != nil { + return nil, err + } + return output, nil +} + +// DecompressSeldonPayload decompresses the payload bytes if Content-Encoding is set to gzip. +func DecompressSeldonPayload(msg SeldonPayload) ([]byte, error) { + data, err := msg.GetBytes() + if err != nil { + return nil, err + } + + return DecompressBytes(data, msg.GetContentEncoding()) +} diff --git a/executor/api/rest/kfserving.go b/executor/api/rest/kfserving.go index 183aea75c4..1fe6a923da 100644 --- a/executor/api/rest/kfserving.go +++ b/executor/api/rest/kfserving.go @@ -2,13 +2,19 @@ package rest import ( "encoding/json" + "github.com/pkg/errors" "github.com/seldonio/seldon-core/executor/api/payload" ) func ChainKFserving(msg payload.SeldonPayload) (payload.SeldonPayload, error) { + data, err := payload.DecompressSeldonPayload(msg) + if err != nil { + return nil, err + } + var f interface{} - err := json.Unmarshal(msg.GetPayload().([]byte), &f) + err = json.Unmarshal(data, &f) if err != nil { return nil, err } @@ -21,10 +27,9 @@ func ChainKFserving(msg payload.SeldonPayload) (payload.SeldonPayload, error) { b, err := json.Marshal(m) if err != nil { return nil, err - } else { - p := payload.BytesPayload{Msg: b} - return &p, nil } + p := payload.BytesPayload{Msg: b, ContentType: msg.GetContentType()} + return &p, nil } else { return nil, errors.Errorf("Failed to convert kfserving response so it could be chained to new input") } diff --git 
a/executor/logger/types.go b/executor/logger/types.go index d623ccf7bc..92b3b1684c 100644 --- a/executor/logger/types.go +++ b/executor/logger/types.go @@ -13,12 +13,13 @@ const ( ) type LogRequest struct { - Url *url.URL - Bytes *[]byte - ContentType string - ReqType LogRequestType - Id string - SourceUri *url.URL - ModelId string - RequestId string + Url *url.URL + Bytes *[]byte + ContentType string + ContentEncoding string + ReqType LogRequestType + Id string + SourceUri *url.URL + ModelId string + RequestId string } diff --git a/executor/logger/worker.go b/executor/logger/worker.go index 4d1b78a3f9..790ee9510d 100644 --- a/executor/logger/worker.go +++ b/executor/logger/worker.go @@ -4,12 +4,14 @@ import ( "context" "errors" "fmt" + "net/http" + "time" + cloudevents "github.com/cloudevents/sdk-go" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/go-logr/logr" - "net/http" - "time" + "github.com/seldonio/seldon-core/executor/api/payload" ) const ( @@ -123,6 +125,15 @@ func (w *Worker) sendKafkaEvent(logReq LogRequest) error { func (w *Worker) sendCloudEvent(logReq LogRequest) error { + // This is a temporary fix related to the fact that Triton server responses + // are now gzip compressed. Until we introduce support for gzip + // compressed payloads in the logger / adserver and include the content-encoding + // header in the CloudEvent messages, this can serve as a temporary solution. 
+ data, err := payload.DecompressBytes(*logReq.Bytes, logReq.ContentEncoding) + if err != nil { + return fmt.Errorf("while creating http transport: %s", err) + } + t, err := cloudevents.NewHTTPTransport( cloudevents.WithTarget(logReq.Url.String()), cloudevents.WithEncoding(cloudevents.HTTPBinaryV1), @@ -154,7 +165,7 @@ func (w *Worker) sendCloudEvent(logReq LogRequest) error { event.SetSource(logReq.SourceUri.String()) event.SetDataContentType(logReq.ContentType) - if err := event.SetData(*logReq.Bytes); err != nil { + if err := event.SetData(data); err != nil { return fmt.Errorf("while setting cloudevents data: %s", err) } diff --git a/executor/predictor/predictor_process.go b/executor/predictor/predictor_process.go index a7d30c58f7..1e63a5b02c 100644 --- a/executor/predictor/predictor_process.go +++ b/executor/predictor/predictor_process.go @@ -333,17 +333,17 @@ func (p *PredictorProcess) logPayload(nodeName string, logger *v1.Logger, reqTyp if err != nil { return err } - go func() { err := payloadLogger.QueueLogRequest(payloadLogger.LogRequest{ - Url: logUrl, - Bytes: &data, - ContentType: msg.GetContentType(), - ReqType: reqType, - Id: guuid.New().String(), - SourceUri: p.ServerUrl, - ModelId: nodeName, - RequestId: puid, + Url: logUrl, + Bytes: &data, + ContentType: msg.GetContentType(), + ContentEncoding: msg.GetContentEncoding(), + ReqType: reqType, + Id: guuid.New().String(), + SourceUri: p.ServerUrl, + ModelId: nodeName, + RequestId: puid, }) if err != nil { p.Log.Error(err, "failed to log request") diff --git a/executor/samples/local/kfserving/Makefile b/executor/samples/local/kfserving/Makefile index 8a80f577c0..ba77c5f48f 100644 --- a/executor/samples/local/kfserving/Makefile +++ b/executor/samples/local/kfserving/Makefile @@ -1,4 +1,5 @@ BASE=../../.. 
+TRITON_VERSION ?= 21.08-py3 triton-inference-server: git clone https://github.com/NVIDIA/triton-inference-server @@ -6,14 +7,19 @@ triton-inference-server: triton-inference-server/docs/examples/model_repository/simple: triton-inference-server cd triton-inference-server/docs/examples && ./fetch_models.sh + +## dummy logger +run_dummy_logsink: + docker run -it -p 2222:80 --rm -t mendhak/http-https-echo + ## REST run_executor: - ${BASE}/executor --sdep triton --namespace default --predictor simple --file ./model.yaml --http_port 8000 --grpc_port 5000 --protocol kfserving + ${BASE}/executor --sdep triton --namespace default --predictor simple --file ./model.yaml --http_port 8000 --grpc_port 5000 --protocol kfserving --log_level DEBUG run_triton_simple: - docker run --rm --shm-size=1g --ulimit memlock=-1 --ulimit stack=67108864 -p9000:9000 -p8001:8001 -p8002:8002 -p5001:5001 -v ${PWD}/triton-inference-server/docs/examples/model_repository:/models nvcr.io/nvidia/tritonserver:21.08-py3 /opt/tritonserver/bin/tritonserver --model-repository=/models --http-port=9000 --grpc-port=5001 + docker run --rm --shm-size=1g --ulimit memlock=-1 --ulimit stack=67108864 -p9000:9000 -p8001:8001 -p8002:8002 -p5001:5001 -v ${PWD}/triton-inference-server/docs/examples/model_repository:/models nvcr.io/nvidia/tritonserver:${TRITON_VERSION} /opt/tritonserver/bin/tritonserver --model-repository=/models --http-port=9000 --grpc-port=5001 --log-verbose=1 curl_rest_triton: curl -v -d '{"inputs":[{"name":"INPUT0","data":[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16],"datatype":"INT32","shape":[1,16]},{"name":"INPUT1","data":[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16],"datatype":"INT32","shape":[1,16]}]}' -X POST http://0.0.0.0:9000/v2/models/simple/infer -H "Content-Type: application/json" diff --git a/executor/samples/local/kfserving/model.yaml b/executor/samples/local/kfserving/model.yaml index 15c8f3e9cc..2fef7e52fe 100644 --- a/executor/samples/local/kfserving/model.yaml +++ 
b/executor/samples/local/kfserving/model.yaml @@ -14,6 +14,9 @@ spec: implementation: TRITON_SERVER modelUri: gs://seldon-models/trtis/simple-model name: simple - type: MODEL + type: MODEL + logger: + mode: all + url: http://localhost:2222 name: simple replicas: 1