diff --git a/examples/models/tfserving-mnist/grpc_mnist_tfserving_deployment.json b/examples/models/tfserving-mnist/grpc_mnist_tfserving_deployment.json
new file mode 100644
index 0000000000..0d5074b94a
--- /dev/null
+++ b/examples/models/tfserving-mnist/grpc_mnist_tfserving_deployment.json
@@ -0,0 +1,93 @@
+{
+  "apiVersion": "machinelearning.seldon.io/v1alpha2",
+  "kind": "SeldonDeployment",
+  "metadata": {
+    "labels": {
+      "app": "seldon"
+    },
+    "name": "tfserving-mnist"
+  },
+  "spec": {
+    "name": "tf-mnist",
+    "predictors": [
+      {
+        "componentSpecs": [{
+          "spec": {
+            "containers": [
+              {
+                "image": "seldonio/tfserving-proxy_grpc:0.6",
+                "name": "tfserving-proxy"
+              },
+              {
+                "args": [
+                  "/usr/bin/tensorflow_model_server",
+                  "--port=8080",
+                  "--model_name=mnist-model",
+                  "--model_base_path=gs://seldon-models/tfserving/mnist-model"
+                ],
+                "image": "tensorflow/serving:latest",
+                "name": "mnist-model",
+                "ports": [
+                  {
+                    "containerPort": 8080,
+                    "protocol": "TCP"
+                  }
+                ],
+                "resources": {
+                  "limits": {
+                    "cpu": "4",
+                    "memory": "4Gi"
+                  },
+                  "requests": {
+                    "cpu": "1",
+                    "memory": "1Gi"
+                  }
+                },
+                "securityContext": {
+                  "runAsUser": 1000
+                }
+              }
+            ],
+            "terminationGracePeriodSeconds": 1
+          }
+        }],
+        "graph": {
+          "name": "tfserving-proxy",
+          "endpoint": { "type" : "GRPC" },
+          "type": "MODEL",
+          "children": [],
+          "parameters":
+          [
+            {
+              "name":"grpc_endpoint",
+              "type":"STRING",
+              "value":"localhost:8080"
+            },
+            {
+              "name":"model_name",
+              "type":"STRING",
+              "value":"mnist-model"
+            },
+            {
+              "name":"model_output",
+              "type":"STRING",
+              "value":"scores"
+            },
+            {
+              "name":"model_input",
+              "type":"STRING",
+              "value":"images"
+            },
+            {
+              "name":"signature_name",
+              "type":"STRING",
+              "value":"predict_images"
+            }
+          ]
+        },
+        "name": "mnist-tfserving",
+        "replicas": 1
+      }
+    ]
+  }
+}
diff --git a/examples/models/tfserving-mnist/rest_mnist_tfserving_deployment.json b/examples/models/tfserving-mnist/rest_mnist_tfserving_deployment.json
new file mode 100644
index 0000000000..4a27931230
--- /dev/null
+++ b/examples/models/tfserving-mnist/rest_mnist_tfserving_deployment.json
@@ -0,0 +1,93 @@
+{
+  "apiVersion": "machinelearning.seldon.io/v1alpha2",
+  "kind": "SeldonDeployment",
+  "metadata": {
+    "labels": {
+      "app": "seldon"
+    },
+    "name": "tfserving-mnist"
+  },
+  "spec": {
+    "name": "tf-mnist",
+    "predictors": [
+      {
+        "componentSpecs": [{
+          "spec": {
+            "containers": [
+              {
+                "image": "seldonio/tfserving-proxy_rest:0.6",
+                "name": "tfserving-proxy"
+              },
+              {
+                "args": [
+                  "/usr/bin/tensorflow_model_server",
+                  "--rest_api_port=8080",
+                  "--model_name=mnist-model",
+                  "--model_base_path=gs://seldon-models/tfserving/mnist-model"
+                ],
+                "image": "tensorflow/serving:latest",
+                "name": "mnist-model",
+                "ports": [
+                  {
+                    "containerPort": 8080,
+                    "protocol": "TCP"
+                  }
+                ],
+                "resources": {
+                  "limits": {
+                    "cpu": "4",
+                    "memory": "4Gi"
+                  },
+                  "requests": {
+                    "cpu": "1",
+                    "memory": "1Gi"
+                  }
+                },
+                "securityContext": {
+                  "runAsUser": 1000
+                }
+              }
+            ],
+            "terminationGracePeriodSeconds": 1
+          }
+        }],
+        "graph": {
+          "name": "tfserving-proxy",
+          "endpoint": { "type" : "REST" },
+          "type": "MODEL",
+          "children": [],
+          "parameters":
+          [
+            {
+              "name":"rest_endpoint",
+              "type":"STRING",
+              "value":"http://localhost:8080"
+            },
+            {
+              "name":"model_name",
+              "type":"STRING",
+              "value":"mnist-model"
+            },
+            {
+              "name":"model_output",
+              "type":"STRING",
+              "value":"scores"
+            },
+            {
+              "name":"model_input",
+              "type":"STRING",
+              "value":"images"
+            },
+            {
+              "name":"signature_name",
+              "type":"STRING",
+              "value":"predict_images"
+            }
+          ]
+        },
+        "name": "mnist-tfserving",
+        "replicas": 1
+      }
+    ]
+  }
+}
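The graph "parameters" in the two SeldonDeployments above are handed to the proxy container as constructor keyword arguments by the Seldon Python wrapper. A minimal sketch of the resulting call for the gRPC example, using the values configured above (it assumes a TensorFlow Serving gRPC port reachable on localhost:8080, as in the pod spec):

# Sketch only: how the "parameters" of grpc_mnist_tfserving_deployment.json map
# onto TfServingProxy.__init__ (see integrations/tfserving/TfServingProxy.py below).
from TfServingProxy import TfServingProxy

proxy = TfServingProxy(
    grpc_endpoint="localhost:8080",      # parameter "grpc_endpoint"
    model_name="mnist-model",            # parameter "model_name"
    signature_name="predict_images",     # parameter "signature_name"
    model_input="images",                # parameter "model_input"
    model_output="scores")               # parameter "model_output"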
"mnist-tfserving", + "replicas": 1 + } + ] + } +} diff --git a/integrations/tfserving/Makefile b/integrations/tfserving/Makefile index 6db6cee73c..b5997b69b1 100644 --- a/integrations/tfserving/Makefile +++ b/integrations/tfserving/Makefile @@ -1,15 +1,15 @@ -IMAGE_VERSION=0.5 +IMAGE_VERSION=0.6 IMAGE_NAME = docker.io/seldonio/tfserving-proxy SELDON_CORE_DIR=../../.. .PHONY: build_rest build_rest: - s2i build -E environment_rest . seldonio/seldon-core-s2i-python3:0.10 $(IMAGE_NAME)_rest:$(IMAGE_VERSION) + s2i build -E environment_rest . seldonio/seldon-core-s2i-python3:0.11-SNAPSHOT $(IMAGE_NAME)_rest:$(IMAGE_VERSION) .PHONY: build_grpc build_grpc: - s2i build -E environment_grpc . seldonio/seldon-core-s2i-python3:0.10 $(IMAGE_NAME)_grpc:$(IMAGE_VERSION) + s2i build -E environment_grpc . seldonio/seldon-core-s2i-python3:0.11-SNAPSHOT $(IMAGE_NAME)_grpc:$(IMAGE_VERSION) push_to_dockerhub_rest: diff --git a/integrations/tfserving/TfServingProxy.py b/integrations/tfserving/TfServingProxy.py index ecbdf06707..0706a5a1c6 100644 --- a/integrations/tfserving/TfServingProxy.py +++ b/integrations/tfserving/TfServingProxy.py @@ -5,7 +5,7 @@ from tensorflow.python.saved_model import signature_constants from tensorflow_serving.apis import predict_pb2 from tensorflow_serving.apis import prediction_service_pb2_grpc -from seldon_core.utils import get_data_from_proto, array_to_grpc_datadef, json_to_seldon_message +from seldon_core.utils import get_data_from_proto, array_to_grpc_datadef, json_to_seldon_message, grpc_datadef_to_array from seldon_core.proto import prediction_pb2 from google.protobuf.json_format import ParseError @@ -17,11 +17,10 @@ log = logging.getLogger() - -''' -A basic tensorflow serving proxy -''' class TfServingProxy(object): + """ + A basic tensorflow serving proxy + """ def __init__( self, @@ -31,8 +30,8 @@ def __init__( signature_name=None, model_input=None, model_output=None): - log.warning("rest_endpoint:",rest_endpoint) - log.warning("grpc_endpoint:",grpc_endpoint) + log.debug("rest_endpoint:",rest_endpoint) + log.debug("grpc_endpoint:",grpc_endpoint) if not grpc_endpoint is None: self.grpc = True channel = grpc.insecure_channel(grpc_endpoint) @@ -48,93 +47,79 @@ def __init__( self.model_input = model_input self.model_output = model_output - - # if we have a TFTensor message we got directly without converting the message otherwise we go the usual route - def predict_raw(self,request): - log.debug("Predict raw") + def predict_grpc(self,request): + """ + predict_grpc will be called only when there is a GRPC call to the server + which in this case, the request will be sent to the TFServer directly. 
+ """ + log.debug("Preprocessing contents for predict function") request_data_type = request.WhichOneof("data_oneof") default_data_type = request.data.WhichOneof("data_oneof") - log.debug(str(request_data_type), str(default_data_type)) - if default_data_type == "tftensor" and self.grpc: - tfrequest = predict_pb2.PredictRequest() - tfrequest.model_spec.name = self.model_name - tfrequest.model_spec.signature_name = self.signature_name + log.debug(f"Request data type: {request_data_type}, Default data type: {default_data_type}") + + if request_data_type != "data": + raise Exception("strData, binData and jsonData not supported.") + + tfrequest = predict_pb2.PredictRequest() + tfrequest.model_spec.name = self.model_name + tfrequest.model_spec.signature_name = self.signature_name + + # For GRPC case, if we have a TFTensor message we can pass it directly + if default_data_type == "tftensor": tfrequest.inputs[self.model_input].CopyFrom(request.data.tftensor) result = self.stub.Predict(tfrequest) - log.debug(result) datadef = prediction_pb2.DefaultData( tftensor=result.outputs[self.model_output] ) return prediction_pb2.SeldonMessage(data=datadef) - elif request_data_type == "jsonData": - features = get_data_from_proto(request) - predictions = self.predict(features, features_names=[]) - try: - sm = json_to_seldon_message({"jsonData": predictions}) - except ParseError as e: - sm = prediction_pb2.SeldonMessage(strData=predictions) - return sm - else: - features = get_data_from_proto(request) - datadef = request.data - predictions = self.predict(features, datadef.names) - predictions = np.array(predictions) - - if request_data_type is not "data": - default_data_type = "tensor" - + data_arr = grpc_datadef_to_array(request.data) + tfrequest.inputs[self.model_input].CopyFrom( + tf.contrib.util.make_tensor_proto( + data_arr.tolist(), + shape=data_arr.shape)) + result = self.stub.Predict(tfrequest) + result_arr = numpy.array(result.outputs[self.model_output].float_val) + if len(result_arr.shape) == 1: + result_arr = numpy.expand_dims(result_arr, axis=0) class_names = [] data = array_to_grpc_datadef( - predictions, class_names, default_data_type) - + default_data_type, result_arr, class_names) return prediction_pb2.SeldonMessage(data=data) - - - def predict(self,X,features_names=[]): - if self.grpc and type(X) is not dict: - request = predict_pb2.PredictRequest() - request.model_spec.name = self.model_name - request.model_spec.signature_name = self.signature_name - request.inputs[self.model_input].CopyFrom(tf.contrib.util.make_tensor_proto(X.tolist(), shape=X.shape)) - result = self.stub.Predict(request) - log.debug("GRPC Response: ", str(result)) - response = numpy.array(result.outputs[self.model_output].float_val) - if len(response.shape) == 1: - response = numpy.expand_dims(response, axis=0) - return response + def predict(self, X, features_names=[]): + """ + This predict function will only be called when the server is called with a REST request. + The REST request is able to support any configuration required. 
+ """ + if type(X) is dict: + log.debug(f"JSON Request: {X}") + data = X else: - log.debug(self.rest_endpoint) - if type(X) is dict: - log.debug("JSON Request") - data = X - else: - log.debug("Data Request") - data = {"instances":X.tolist()} - if not self.signature_name is None: - data["signature_name"] = self.signature_name - log.debug(str(data)) + log.debug(f"Data Request: {X}") + data = {"instances":X.tolist()} + if not self.signature_name is None: + data["signature_name"] = self.signature_name - response = requests.post(self.rest_endpoint, data=json.dumps(data)) + response = requests.post(self.rest_endpoint, data=json.dumps(data)) - if response.status_code == 200: - log.debug(response.text) - if type(X) is dict: - try: - return response.json() - except ValueError: - return response.text - else: - result = numpy.array(response.json()["predictions"]) - if len(result.shape) == 1: - result = numpy.expand_dims(result, axis=0) - return result - else: - log.warning("Error from server: "+ str(response) + " content: " + str(response.text)) - try: + if response.status_code == 200: + log.debug(response.text) + if type(X) is dict: + try: return response.json() except ValueError: return response.text + else: + result = numpy.array(response.json()["predictions"]) + if len(result.shape) == 1: + result = numpy.expand_dims(result, axis=0) + return result + else: + log.warning("Error from server: "+ str(response) + " content: " + str(response.text)) + try: + return response.json() + except ValueError: + return response.text diff --git a/integrations/tfserving/test_tfserving_proxy b/integrations/tfserving/test_tfserving_proxy new file mode 100644 index 0000000000..105664915d --- /dev/null +++ b/integrations/tfserving/test_tfserving_proxy @@ -0,0 +1,19 @@ +from TfServingProxy import TfServingProxy + +class FakeStub(object): + + def __init__(self, channel): + self.channel = channel + + def Predict(self, **kwargs): + return prediction_pb2.SeldonMessage(strData="predict") + + def TransformInput(selfself, **kwargs): + return prediction_pb2.SeldonMessage(strData="transform-input") + + def TransformOutput(selfself, **kwargs): + return prediction_pb2.SeldonMessage(strData="transform-output") + + def Route(selfself, **kwargs): + return prediction_pb2.SeldonMessage(strData="route") + diff --git a/integrations/tfserving/test_tfserving_proxy.py b/integrations/tfserving/test_tfserving_proxy.py new file mode 100644 index 0000000000..70947f4b2b --- /dev/null +++ b/integrations/tfserving/test_tfserving_proxy.py @@ -0,0 +1,137 @@ +from TfServingProxy import TfServingProxy +from seldon_core.proto import prediction_pb2 +from seldon_core.utils import get_data_from_proto, array_to_grpc_datadef, json_to_seldon_message +from tensorflow_serving.apis import predict_pb2 +import numpy as np +from unittest import mock +import tensorflow as tf +import requests + +ARR_REQUEST_VALUE=np.random.rand(1,1).astype(np.float32) +ARR_RESPONSE_VALUE=np.random.rand(1,1).astype(np.float32) + +class FakeStub(object): + + def __init__(self, channel): + self.channel = channel + + @staticmethod + def Predict(*args, **kwargs): + data = ARR_RESPONSE_VALUE + tensor_proto = tf.contrib.util.make_tensor_proto( + data.tolist(), + shape=data.shape) + tfresponse = predict_pb2.PredictResponse() + tfresponse.model_spec.name = "newmodel" + tfresponse.model_spec.signature_name = "signame" + tfresponse.outputs["scores"].CopyFrom( + tensor_proto) + return tfresponse + +@mock.patch("tensorflow_serving.apis.prediction_service_pb2_grpc.PredictionServiceStub", 
+@mock.patch("tensorflow_serving.apis.prediction_service_pb2_grpc.PredictionServiceStub", new=FakeStub)
+def test_grpc_predict_function_tensor():
+    t = TfServingProxy(
+        grpc_endpoint="localhost:8080",
+        model_name="newmodel",
+        signature_name="signame",
+        model_input="images",
+        model_output="scores")
+
+    data = ARR_REQUEST_VALUE
+    datadef = array_to_grpc_datadef("tensor", data)
+    request = prediction_pb2.SeldonMessage(data=datadef)
+    response = t.predict_grpc(request)
+    resp_data = get_data_from_proto(response)
+    assert np.array_equal(resp_data, ARR_RESPONSE_VALUE)
+
+
+@mock.patch("tensorflow_serving.apis.prediction_service_pb2_grpc.PredictionServiceStub", new=FakeStub)
+def test_grpc_predict_function_tftensor():
+    t = TfServingProxy(
+        grpc_endpoint="localhost:8080",
+        model_name="newmodel",
+        signature_name="signame",
+        model_input="images",
+        model_output="scores")
+
+    data = ARR_REQUEST_VALUE
+    tensor_proto = tf.contrib.util.make_tensor_proto(
+        data.tolist(),
+        shape=data.shape)
+    datadef = prediction_pb2.DefaultData(
+        tftensor=tensor_proto
+    )
+    request = prediction_pb2.SeldonMessage(data=datadef)
+    response = t.predict_grpc(request)
+    resp_data = get_data_from_proto(response)
+    assert np.array_equal(resp_data, ARR_RESPONSE_VALUE)
+
+
+@mock.patch("tensorflow_serving.apis.prediction_service_pb2_grpc.PredictionServiceStub", new=FakeStub)
+def test_grpc_predict_function_ndarray():
+    t = TfServingProxy(
+        grpc_endpoint="localhost:8080",
+        model_name="newmodel",
+        signature_name="signame",
+        model_input="images",
+        model_output="scores")
+
+    data = ARR_REQUEST_VALUE
+    datadef = array_to_grpc_datadef(
+        "ndarray", data, [])
+    request = prediction_pb2.SeldonMessage(data=datadef)
+    response = t.predict_grpc(request)
+    resp_data = get_data_from_proto(response)
+    assert np.array_equal(resp_data, ARR_RESPONSE_VALUE)
+
+
+@mock.patch.object(requests, "post")
+def test_rest_predict_function_json(mock_request_post):
+
+    data = {"jsonData": ARR_RESPONSE_VALUE.tolist() }
+    def res():
+        r = requests.Response()
+        r.status_code = 200
+        type(r).text = mock.PropertyMock(return_value="text")  # property mock
+        def json_func():
+            return data
+        r.json = json_func
+        return r
+    mock_request_post.return_value = res()
+
+    t = TfServingProxy(
+        rest_endpoint="http://localhost:8080",
+        model_name="newmodel",
+        signature_name="signame",
+        model_input="images",
+        model_output="scores")
+
+    request = { "jsonData": ARR_REQUEST_VALUE.tolist() }
+    response = t.predict(request)
+    assert response == data
+
+@mock.patch.object(requests, "post")
+def test_rest_predict_function_ndarray(mock_request_post):
+
+    data = {"data": { "ndarray": ARR_RESPONSE_VALUE.tolist(), "names": [] } }
+    def res():
+        r = requests.Response()
+        r.status_code = 200
+        type(r).text = mock.PropertyMock(return_value="text")  # property mock
+        def json_func():
+            return data
+        r.json = json_func
+        return r
+    mock_request_post.return_value = res()
+
+    t = TfServingProxy(
+        rest_endpoint="http://localhost:8080",
+        model_name="newmodel",
+        signature_name="signame",
+        model_input="images",
+        model_output="scores")
+
+    request = { "data": { "ndarray": ARR_REQUEST_VALUE.tolist(), "names": [] }}
+    response = t.predict(request)
+    assert response == data
+
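For reference, a minimal usage sketch of the REST path exercised by the tests above. The rest_endpoint value mirrors rest_mnist_tfserving_deployment.json; the (1, 784) input shape is only an assumption for a flattened MNIST image, and a reachable TensorFlow Serving REST endpoint is assumed.

# Sketch only, not part of the PR: calling the proxy's REST predict() directly.
import numpy as np
from TfServingProxy import TfServingProxy

proxy = TfServingProxy(
    rest_endpoint="http://localhost:8080",
    model_name="mnist-model",
    signature_name="predict_images",
    model_input="images",
    model_output="scores")

# Array input is wrapped as {"instances": X.tolist(), "signature_name": ...},
# POSTed to rest_endpoint, and the "predictions" field of the JSON reply is
# returned as a numpy array.
X = np.random.rand(1, 784).astype(np.float32)
scores = proxy.predict(X, features_names=[])

# A dict input is treated as raw jsonData and forwarded to TensorFlow Serving unchanged.
raw = proxy.predict({"instances": X.tolist(), "signature_name": "predict_images"})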