From b5e41c6ba8e39f9ece7f862761615c0dba9aa051 Mon Sep 17 00:00:00 2001 From: Clive Cox Date: Thu, 22 Jul 2021 18:08:55 +0100 Subject: [PATCH] Request Logger JsonData Fix --- components/seldon-request-logger/Dockerfile | 6 +- components/seldon-request-logger/Makefile | 27 +++---- .../app/default_logger.py | 31 +++++--- .../seldon-request-logger/app/log_helper.py | 6 +- components/seldon-request-logger/app/tests.py | 78 +++++++++++++++++++ 5 files changed, 115 insertions(+), 33 deletions(-) diff --git a/components/seldon-request-logger/Dockerfile b/components/seldon-request-logger/Dockerfile index 1247bf8d87..9db9af6ce0 100644 --- a/components/seldon-request-logger/Dockerfile +++ b/components/seldon-request-logger/Dockerfile @@ -25,12 +25,14 @@ USER default ENV APP_ENV=production \ PATH="/home/appuser/.local/bin:${PATH}" \ - PORT=8080 + PORT=8080 \ + GUNICORN_WORKERS=3 \ + GUNICORN_THREADS=8 COPY app app WORKDIR app EXPOSE $PORT -CMD gunicorn --bind 0.0.0.0:$PORT --workers 3 --threads 8 default_logger:app +CMD gunicorn --bind 0.0.0.0:$PORT --workers $GUNICORN_WORKERS --threads $GUNICORN_THREADS default_logger:app diff --git a/components/seldon-request-logger/Makefile b/components/seldon-request-logger/Makefile index 7a39b27c16..be14391b8e 100644 --- a/components/seldon-request-logger/Makefile +++ b/components/seldon-request-logger/Makefile @@ -7,8 +7,8 @@ CLIENT_ID=sd-api OIDC_USERNAME=admin@seldon.io OIDC_PASSWORD=xxxxxx OIDC_SCOPES=openid profile email groups -DEPLOY_API_HOST=http://xx.xx.xx.xx/seldon-deploy/api/v1alpha1 -OIDC_PROVIDER=http://xx.xx.xx.xx/auth/realms/deploy-realm +#DEPLOY_API_HOST=http://xx.xx.xx.xx/seldon-deploy/api/v1alpha1 +#OIDC_PROVIDER=http://xx.xx.xx.xx/auth/realms/deploy-realm ELASTICSEARCH_HOST=localhost ELASTICSEARCH_PORT=9200 @@ -24,10 +24,14 @@ create_dummy_metadata: python testing/create_dummy_metadata.py run_container: - docker run -p 2222:8080 seldonio/seldon-request-logger:latest + docker run -p 2222:2222 -e PORT=2222 --network host seldonio/seldon-request-logger:${VERSION} run_elastic: - docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch-oss:7.6.0 + docker run -p 9200:9200 -p 9300:9300 --network host -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch-oss:7.6.0 + + +test: + python app/tests.py run_local: export FLASK_RUN_PORT = 2222 run_local: export FLASK_APP = default_logger.py @@ -38,7 +42,7 @@ curl_metadata: curl -X GET -k -v http://localhost:2222/metadata -H "Content-Type: application/json" #see test.sh for more test data -test: +smoke_test: curl -v "http://localhost:2222/" \ -X POST \ -H "X-B3-Flags: 1" \ @@ -48,19 +52,8 @@ test: -H "CE-ID: 45a8b444-3213-4758-be3f-540bf93f85ff" \ -H "CE-Source: dev.knative.example" \ -H 'Content-Type: application/json' \ - -d '{"request": {"meta": {"puid": "71dlk7k1rhmci0cd8g5rmeolmn", "tags": {}, "routing": {}, "requestPath": {}, "metrics": []}, "data": {"names": ["f0", "f1"], "ndarray": [[0.77, 0.63]]}, "date": "2019-06-17T10:59:55.693Z[GMT]"}, "response": {"meta": {"puid": "71dlk7k1rhmci0cd8g5rmeolmn", "tags": {}, "routing": {}, "requestPath": {"classifier": "seldonio/mock_classifier:1.0"}, "metrics": []}, "data": {"names": ["proba"], "ndarray": [[0.09826376903346358]]}, "date": "2019-06-17T10:59:55.696Z[GMT]"}, "sdepName": "seldon-single-model"}' + -d '{"data":{"names":["a","b"],"tensor":{"shape":[2,2],"values":[1,2,3,4]}}}' -test_single_dim: - curl -v "http://localhost:2222/" \ - -X POST \ - -H "X-B3-Flags: 1" \ - -H 'CE-SpecVersion: 0.2' \ - -H "CE-Type: io.seldon.serving.inference.request" \ - -H "CE-Time: 2018-04-05T03:56:24Z" \ - -H "CE-ID: 45a8b444-3213-4758-be3f-540bf93f85ff" \ - -H "CE-Source: dev.knative.example" \ - -H 'Content-Type: application/json' \ - -d '{"request": {"meta": {"puid": "71dlk7k1rhmci0cd8g5rmeolmn", "tags": {}, "routing": {}, "requestPath": {}, "metrics": []}, "data": {"names": ["f0", "f1"], "ndarray": [0.77, 0.63]}, "date": "2019-06-17T10:59:55.693Z[GMT]"}, "response": {"meta": {"puid": "71dlk7k1rhmci0cd8g5rmeolmn", "tags": {}, "routing": {}, "requestPath": {"classifier": "seldonio/mock_classifier:1.0"}, "metrics": []}, "data": {"names": ["proba"], "ndarray": [0.09826376903346358]}, "date": "2019-06-17T10:59:55.696Z[GMT]"}, "sdepName": "seldon-single-model"}' IMAGE=seldon-request-logger diff --git a/components/seldon-request-logger/app/default_logger.py b/components/seldon-request-logger/app/default_logger.py index d087f2cc0e..fe80e4d529 100644 --- a/components/seldon-request-logger/app/default_logger.py +++ b/components/seldon-request-logger/app/default_logger.py @@ -14,6 +14,8 @@ import log_mapping from collections.abc import Iterable import array +import traceback +from flask import jsonify MAX_PAYLOAD_BYTES = 300000 app = Flask(__name__) @@ -31,7 +33,11 @@ def index(): request_id = log_helper.extract_request_id(request.headers) + if not request_id: + return Response(f"Header {log_helper.REQUEST_ID_HEADER_NAME} not found", 400) type_header = request.headers.get(log_helper.TYPE_HEADER_NAME) + if type_header is None: + return Response(f"Header {log_helper.TYPE_HEADER_NAME} not found", 400) message_type = log_helper.parse_message_type(type_header) index_name = log_helper.build_index_name(request.headers) @@ -68,13 +74,13 @@ def index(): try: # now process and update the doc - process_and_update_elastic_doc( + added_content = process_and_update_elastic_doc( es, message_type, body, request_id, request.headers, index_name ) - return "" + return jsonify(added_content) except Exception as ex: - print(ex) + traceback.print_exc() sys.stdout.flush() return Response("problem logging request", 500) @@ -107,6 +113,8 @@ def process_and_update_elastic_doc( elastic_object, message_type, message_body, request_id, headers, index_name ): + added_content = [] + if message_type == "unknown": print("UNKNOWN REQUEST TYPE FOR " + request_id + " - NOT PROCESSING") sys.stdout.flush() @@ -129,7 +137,7 @@ def process_and_update_elastic_doc( # req or res might be batches of instances so split out into individual docs if "instance" in new_content_part: - if type(new_content_part["instance"]) == type([]): + if type(new_content_part["instance"]) == type([]) and not (new_content_part["dataType"] == "json"): # if we've a list then this is batch # we assume first dimension is always batch @@ -151,9 +159,9 @@ def process_and_update_elastic_doc( item_request_id = build_request_id_batched( request_id, no_items_in_batch, index ) - upsert_doc_to_elastic( + added_content.append(upsert_doc_to_elastic( elastic_object, message_type, item_body, item_request_id, index_name - ) + )) index = index + 1 else: #not batch so don't batch elements either @@ -161,9 +169,9 @@ def process_and_update_elastic_doc( new_content_part["elements"] = new_content_part["elements"][0] item_request_id = build_request_id_batched(request_id, 1, 0) - upsert_doc_to_elastic( + added_content.append(upsert_doc_to_elastic( elastic_object, message_type, doc_body, item_request_id, index_name - ) + )) elif message_type == "feedback": item_request_id = build_request_id_batched(request_id, 1, 0) upsert_doc_to_elastic(elastic_object, message_type, doc_body, item_request_id, index_name) @@ -201,7 +209,7 @@ def process_and_update_elastic_doc( else: print("unexpected data format") print(new_content_part) - return + return added_content def build_request_id_batched(request_id, no_items_in_batch, item_index): @@ -239,7 +247,7 @@ def upsert_doc_to_elastic( + message_type ) sys.stdout.flush() - return str(new_content) + return new_content # take request or response part and process it by deriving metadata @@ -297,6 +305,7 @@ def create_np_from_v2(data: list,ty: str, shape: list) -> np.array: arr.shape = tuple(shape) return arr + def extract_data_part(content, headers, message_type): copy = content.copy() @@ -390,7 +399,7 @@ def extract_data_part(content, headers, message_type): if req_datatype == "binData": copy["dataType"] = "image" - if isinstance(req_features, Iterable): + if isinstance(req_features, Iterable) and not(req_datatype == "jsonData" or req_datatype == "binData" or req_datatype == "strData"): elements = createElelmentsArray(req_features, list(req_datadef.names), namespace, serving_engine, inferenceservice_name, endpoint_name, message_type) copy["elements"] = elements diff --git a/components/seldon-request-logger/app/log_helper.py b/components/seldon-request-logger/app/log_helper.py index 1cb771b4ac..645372590a 100644 --- a/components/seldon-request-logger/app/log_helper.py +++ b/components/seldon-request-logger/app/log_helper.py @@ -40,14 +40,14 @@ def build_index_name(headers): index_name = os.getenv("INDEX_NAME") if index_name: return index_name - + # Adding seldon_environment (dev/test/staging/prod) to index_name if defined as a environment variable seldon_environment = os.getenv("SELDON_ENVIRONMENT") if seldon_environment: index_name = "inference-log-" + seldon_environment + "-" + serving_engine(headers) else: index_name = "inference-log-" + serving_engine(headers) - + # otherwise create an index per deployment # index_name = "inference-log-" + serving_engine(headers) namespace = clean_header(NAMESPACE_HEADER_NAME, headers) @@ -183,7 +183,7 @@ def connect_elasticsearch(): else: print("Could not connect to Elasticsearch") sys.stdout.flush() - sys.exit() + sys.exit(-1) return _es diff --git a/components/seldon-request-logger/app/tests.py b/components/seldon-request-logger/app/tests.py index 595a0c7099..0fdd8906ef 100644 --- a/components/seldon-request-logger/app/tests.py +++ b/components/seldon-request-logger/app/tests.py @@ -2,6 +2,10 @@ import default_logger import numpy as np +import log_helper +import json + +testapp = default_logger.app.test_client() #for more local testing see README class TestRequestLogger(unittest.TestCase): @@ -84,5 +88,79 @@ def test_not_enriched_elements_request(self): self.assertEqual(expected_results, actual_results) + def test_no_ce_requied_headers(self): + response = testapp.post( + '/', + data=dict(data=dict(ndarray=[[1]])), + follow_redirects=True + ) + self.assertTrue(response.status_code == 400) + + + def test_seldon_request(self): + response = testapp.post( + '/', + data='{"data":{"ndarray":[[1]]}}', + follow_redirects=True, + headers=[(log_helper.TYPE_HEADER_NAME,"io.seldon.serving.inference.request"), + (log_helper.REQUEST_ID_HEADER_NAME,"1"), + (log_helper.MODELID_HEADER_NAME,self.test_seldon_request.__name__)] + ) + self.assertTrue(response.status_code == 200) + contents_added = response.json + for item in contents_added: + document = default_logger.es.get(item["_index"],item["_id"]) + data = document["_source"] + self.assertTrue(data["request"]["dataType"] == "number") + + def test_seldon_jsondata_request(self): + response = testapp.post( + '/', + data=json.dumps({"jsonData":[1,2,3,5]}), + follow_redirects=True, + headers=[(log_helper.TYPE_HEADER_NAME,"io.seldon.serving.inference.request"), + (log_helper.REQUEST_ID_HEADER_NAME,"1"), + (log_helper.MODELID_HEADER_NAME,self.test_seldon_jsondata_request.__name__)] + ) + self.assertTrue(response.status_code == 200) + contents_added = response.json + self.assertTrue(len(contents_added) == 1) + + + def test_seldon_requests_ok(self): + payloads = ['{"data":{"names":["a","b"],"tensor":{"shape":[2,2],"values":[1,2,3,4]}}}', + '{"data":{"names":["a","b"],"ndarray":[[1,2],[3,4]]}}', + '{"data":{"names":["a"],"ndarray":["test1","test2"]}}', + '{"data":{"names":["a","b"],"tensor":{"shape":[2,2],"values":[1,2,3,4]}}}'] + for idx,payload in enumerate(payloads): + response = testapp.post( + '/', + data=json.dumps({"jsonData": [1, 2, 3, 5]}), + follow_redirects=True, + headers=[(log_helper.TYPE_HEADER_NAME, "io.seldon.serving.inference.request"), + (log_helper.REQUEST_ID_HEADER_NAME, "1"), + (log_helper.MODELID_HEADER_NAME, + self.test_seldon_requests_ok.__name__+str(idx))] + ) + self.assertTrue(response.status_code == 200) + + + def test_seldon_responses_ok(self): + payloads = ['{"data":{"names":["c"],"tensor":{"shape":[2,1],"values":[5,6]}}}', + '{"data":{"names":["c"],"ndarray":[[7],[8]]}}', + '{"data":{"names":["c"],"ndarray":[[7],[8]]}}', + '{"data":{"names":["t0","t1"],"ndarray":[[0.5,0.5]]}}'] + for idx, payload in enumerate(payloads): + response = testapp.post( + '/', + data=json.dumps({"jsonData": [1, 2, 3, 5]}), + follow_redirects=True, + headers=[(log_helper.TYPE_HEADER_NAME, "io.seldon.serving.inference.response"), + (log_helper.REQUEST_ID_HEADER_NAME, "1"), + (log_helper.MODELID_HEADER_NAME, + self.test_seldon_requests_ok.__name__ + str(idx))] + ) + self.assertTrue(response.status_code == 200) + if __name__ == '__main__': unittest.main() \ No newline at end of file