Skip to content

Commit

Permalink
Request Logger JsonData Fix (#3423)
Browse files Browse the repository at this point in the history
  • Loading branch information
ukclivecox authored Jul 22, 2021
1 parent 2830970 commit a7a29d4
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 33 deletions.
6 changes: 4 additions & 2 deletions components/seldon-request-logger/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ USER default

ENV APP_ENV=production \
PATH="/home/appuser/.local/bin:${PATH}" \
PORT=8080
PORT=8080 \
GUNICORN_WORKERS=3 \
GUNICORN_THREADS=8

COPY app app
WORKDIR app

EXPOSE $PORT

CMD gunicorn --bind 0.0.0.0:$PORT --workers 3 --threads 8 default_logger:app
CMD gunicorn --bind 0.0.0.0:$PORT --workers $GUNICORN_WORKERS --threads $GUNICORN_THREADS default_logger:app

27 changes: 10 additions & 17 deletions components/seldon-request-logger/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ CLIENT_ID=sd-api
OIDC_USERNAME[email protected]
OIDC_PASSWORD=xxxxxx
OIDC_SCOPES=openid profile email groups
DEPLOY_API_HOST=http://xx.xx.xx.xx/seldon-deploy/api/v1alpha1
OIDC_PROVIDER=http://xx.xx.xx.xx/auth/realms/deploy-realm
#DEPLOY_API_HOST=http://xx.xx.xx.xx/seldon-deploy/api/v1alpha1
#OIDC_PROVIDER=http://xx.xx.xx.xx/auth/realms/deploy-realm
ELASTICSEARCH_HOST=localhost
ELASTICSEARCH_PORT=9200

Expand All @@ -24,10 +24,14 @@ create_dummy_metadata:
python testing/create_dummy_metadata.py

run_container:
docker run -p 2222:8080 seldonio/seldon-request-logger:latest
docker run -p 2222:2222 -e PORT=2222 --network host seldonio/seldon-request-logger:${VERSION}

run_elastic:
docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch-oss:7.6.0
docker run -p 9200:9200 -p 9300:9300 --network host -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch-oss:7.6.0


test:
python app/tests.py

run_local: export FLASK_RUN_PORT = 2222
run_local: export FLASK_APP = default_logger.py
Expand All @@ -38,7 +42,7 @@ curl_metadata:
curl -X GET -k -v http://localhost:2222/metadata -H "Content-Type: application/json"

#see test.sh for more test data
test:
smoke_test:
curl -v "http://localhost:2222/" \
-X POST \
-H "X-B3-Flags: 1" \
Expand All @@ -48,19 +52,8 @@ test:
-H "CE-ID: 45a8b444-3213-4758-be3f-540bf93f85ff" \
-H "CE-Source: dev.knative.example" \
-H 'Content-Type: application/json' \
-d '{"request": {"meta": {"puid": "71dlk7k1rhmci0cd8g5rmeolmn", "tags": {}, "routing": {}, "requestPath": {}, "metrics": []}, "data": {"names": ["f0", "f1"], "ndarray": [[0.77, 0.63]]}, "date": "2019-06-17T10:59:55.693Z[GMT]"}, "response": {"meta": {"puid": "71dlk7k1rhmci0cd8g5rmeolmn", "tags": {}, "routing": {}, "requestPath": {"classifier": "seldonio/mock_classifier:1.0"}, "metrics": []}, "data": {"names": ["proba"], "ndarray": [[0.09826376903346358]]}, "date": "2019-06-17T10:59:55.696Z[GMT]"}, "sdepName": "seldon-single-model"}'
-d '{"data":{"names":["a","b"],"tensor":{"shape":[2,2],"values":[1,2,3,4]}}}'

test_single_dim:
curl -v "http://localhost:2222/" \
-X POST \
-H "X-B3-Flags: 1" \
-H 'CE-SpecVersion: 0.2' \
-H "CE-Type: io.seldon.serving.inference.request" \
-H "CE-Time: 2018-04-05T03:56:24Z" \
-H "CE-ID: 45a8b444-3213-4758-be3f-540bf93f85ff" \
-H "CE-Source: dev.knative.example" \
-H 'Content-Type: application/json' \
-d '{"request": {"meta": {"puid": "71dlk7k1rhmci0cd8g5rmeolmn", "tags": {}, "routing": {}, "requestPath": {}, "metrics": []}, "data": {"names": ["f0", "f1"], "ndarray": [0.77, 0.63]}, "date": "2019-06-17T10:59:55.693Z[GMT]"}, "response": {"meta": {"puid": "71dlk7k1rhmci0cd8g5rmeolmn", "tags": {}, "routing": {}, "requestPath": {"classifier": "seldonio/mock_classifier:1.0"}, "metrics": []}, "data": {"names": ["proba"], "ndarray": [0.09826376903346358]}, "date": "2019-06-17T10:59:55.696Z[GMT]"}, "sdepName": "seldon-single-model"}'


IMAGE=seldon-request-logger
Expand Down
31 changes: 20 additions & 11 deletions components/seldon-request-logger/app/default_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import log_mapping
from collections.abc import Iterable
import array
import traceback
from flask import jsonify

MAX_PAYLOAD_BYTES = 300000
app = Flask(__name__)
Expand All @@ -31,7 +33,11 @@
def index():

request_id = log_helper.extract_request_id(request.headers)
if not request_id:
return Response(f"Header {log_helper.REQUEST_ID_HEADER_NAME} not found", 400)
type_header = request.headers.get(log_helper.TYPE_HEADER_NAME)
if type_header is None:
return Response(f"Header {log_helper.TYPE_HEADER_NAME} not found", 400)
message_type = log_helper.parse_message_type(type_header)
index_name = log_helper.build_index_name(request.headers)

Expand Down Expand Up @@ -68,13 +74,13 @@ def index():
try:

# now process and update the doc
process_and_update_elastic_doc(
added_content = process_and_update_elastic_doc(
es, message_type, body, request_id, request.headers, index_name
)

return ""
return jsonify(added_content)
except Exception as ex:
print(ex)
traceback.print_exc()
sys.stdout.flush()
return Response("problem logging request", 500)

Expand Down Expand Up @@ -107,6 +113,8 @@ def process_and_update_elastic_doc(
elastic_object, message_type, message_body, request_id, headers, index_name
):

added_content = []

if message_type == "unknown":
print("UNKNOWN REQUEST TYPE FOR " + request_id + " - NOT PROCESSING")
sys.stdout.flush()
Expand All @@ -129,7 +137,7 @@ def process_and_update_elastic_doc(
# req or res might be batches of instances so split out into individual docs
if "instance" in new_content_part:

if type(new_content_part["instance"]) == type([]):
if type(new_content_part["instance"]) == type([]) and not (new_content_part["dataType"] == "json"):
# if we've a list then this is batch
# we assume first dimension is always batch

Expand All @@ -151,19 +159,19 @@ def process_and_update_elastic_doc(
item_request_id = build_request_id_batched(
request_id, no_items_in_batch, index
)
upsert_doc_to_elastic(
added_content.append(upsert_doc_to_elastic(
elastic_object, message_type, item_body, item_request_id, index_name
)
))
index = index + 1
else:
#not batch so don't batch elements either
if "elements" in new_content_part and type(new_content_part["elements"]) == type([]):
new_content_part["elements"] = new_content_part["elements"][0]

item_request_id = build_request_id_batched(request_id, 1, 0)
upsert_doc_to_elastic(
added_content.append(upsert_doc_to_elastic(
elastic_object, message_type, doc_body, item_request_id, index_name
)
))
elif message_type == "feedback":
item_request_id = build_request_id_batched(request_id, 1, 0)
upsert_doc_to_elastic(elastic_object, message_type, doc_body, item_request_id, index_name)
Expand Down Expand Up @@ -201,7 +209,7 @@ def process_and_update_elastic_doc(
else:
print("unexpected data format")
print(new_content_part)
return
return added_content


def build_request_id_batched(request_id, no_items_in_batch, item_index):
Expand Down Expand Up @@ -239,7 +247,7 @@ def upsert_doc_to_elastic(
+ message_type
)
sys.stdout.flush()
return str(new_content)
return new_content


# take request or response part and process it by deriving metadata
Expand Down Expand Up @@ -297,6 +305,7 @@ def create_np_from_v2(data: list,ty: str, shape: list) -> np.array:
arr.shape = tuple(shape)
return arr


def extract_data_part(content, headers, message_type):
copy = content.copy()

Expand Down Expand Up @@ -390,7 +399,7 @@ def extract_data_part(content, headers, message_type):
if req_datatype == "binData":
copy["dataType"] = "image"

if isinstance(req_features, Iterable):
if isinstance(req_features, Iterable) and not(req_datatype == "jsonData" or req_datatype == "binData" or req_datatype == "strData"):

elements = createElelmentsArray(req_features, list(req_datadef.names), namespace, serving_engine, inferenceservice_name, endpoint_name, message_type)
copy["elements"] = elements
Expand Down
6 changes: 3 additions & 3 deletions components/seldon-request-logger/app/log_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ def build_index_name(headers):
index_name = os.getenv("INDEX_NAME")
if index_name:
return index_name

# Adding seldon_environment (dev/test/staging/prod) to index_name if defined as a environment variable
seldon_environment = os.getenv("SELDON_ENVIRONMENT")
if seldon_environment:
index_name = "inference-log-" + seldon_environment + "-" + serving_engine(headers)
else:
index_name = "inference-log-" + serving_engine(headers)

# otherwise create an index per deployment
# index_name = "inference-log-" + serving_engine(headers)
namespace = clean_header(NAMESPACE_HEADER_NAME, headers)
Expand Down Expand Up @@ -183,7 +183,7 @@ def connect_elasticsearch():
else:
print("Could not connect to Elasticsearch")
sys.stdout.flush()
sys.exit()
sys.exit(-1)
return _es


Expand Down
78 changes: 78 additions & 0 deletions components/seldon-request-logger/app/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import default_logger
import numpy as np
import log_helper
import json

testapp = default_logger.app.test_client()

#for more local testing see README
class TestRequestLogger(unittest.TestCase):
Expand Down Expand Up @@ -84,5 +88,79 @@ def test_not_enriched_elements_request(self):
self.assertEqual(expected_results, actual_results)


def test_no_ce_requied_headers(self):
    """A POST without the required CloudEvents headers must be rejected with 400."""
    # NOTE(review): method name has a typo ("requied" -> "required"); kept as-is
    # so the externally visible test name is unchanged.
    body = dict(data=dict(ndarray=[[1]]))
    response = testapp.post('/', data=body, follow_redirects=True)
    self.assertEqual(400, response.status_code)


def test_seldon_request(self):
    """A well-formed seldon ndarray request returns 200 and is indexed as numeric data.

    The endpoint responds with the list of documents it upserted; each entry is
    fetched back from Elasticsearch to confirm the derived dataType.
    """
    request_headers = [
        (log_helper.TYPE_HEADER_NAME, "io.seldon.serving.inference.request"),
        (log_helper.REQUEST_ID_HEADER_NAME, "1"),
        (log_helper.MODELID_HEADER_NAME, self.test_seldon_request.__name__),
    ]
    response = testapp.post('/',
                            data='{"data":{"ndarray":[[1]]}}',
                            follow_redirects=True,
                            headers=request_headers)
    self.assertEqual(200, response.status_code)
    # Verify every upserted doc round-trips from the index with numeric typing.
    for entry in response.json:
        stored = default_logger.es.get(entry["_index"], entry["_id"])
        self.assertEqual("number", stored["_source"]["request"]["dataType"])

def test_seldon_jsondata_request(self):
    """A jsonData payload is accepted (200) and logged as exactly one document."""
    payload = {"jsonData": [1, 2, 3, 5]}
    request_headers = [
        (log_helper.TYPE_HEADER_NAME, "io.seldon.serving.inference.request"),
        (log_helper.REQUEST_ID_HEADER_NAME, "1"),
        (log_helper.MODELID_HEADER_NAME, self.test_seldon_jsondata_request.__name__),
    ]
    response = testapp.post('/',
                            data=json.dumps(payload),
                            follow_redirects=True,
                            headers=request_headers)
    self.assertEqual(200, response.status_code)
    # jsonData is not split into batch items, so exactly one doc is added.
    self.assertEqual(1, len(response.json))


def test_seldon_requests_ok(self):
    """Each supported seldon request payload shape logs successfully (HTTP 200).

    Covers tensor, 2-D ndarray, and string ndarray request bodies.

    Fix: the original loop built ``payloads`` and enumerated it, but then
    posted a hard-coded ``{"jsonData": ...}`` body on every iteration, so
    the listed payload shapes were never actually exercised.
    """
    payloads = ['{"data":{"names":["a","b"],"tensor":{"shape":[2,2],"values":[1,2,3,4]}}}',
                '{"data":{"names":["a","b"],"ndarray":[[1,2],[3,4]]}}',
                '{"data":{"names":["a"],"ndarray":["test1","test2"]}}',
                '{"data":{"names":["a","b"],"tensor":{"shape":[2,2],"values":[1,2,3,4]}}}']
    for idx, payload in enumerate(payloads):
        response = testapp.post(
            '/',
            data=payload,  # post the payload under test, not a fixed body
            follow_redirects=True,
            headers=[(log_helper.TYPE_HEADER_NAME, "io.seldon.serving.inference.request"),
                     (log_helper.REQUEST_ID_HEADER_NAME, "1"),
                     (log_helper.MODELID_HEADER_NAME,
                      self.test_seldon_requests_ok.__name__ + str(idx))]
        )
        # Include the failing payload in the message for easier diagnosis.
        self.assertEqual(200, response.status_code,
                         "payload %d was not accepted: %s" % (idx, payload))


def test_seldon_responses_ok(self):
    """Each supported seldon response payload shape logs successfully (HTTP 200).

    Covers tensor and ndarray response bodies.

    Fixes: (1) the original loop built ``payloads`` but posted a hard-coded
    ``{"jsonData": ...}`` body, so the listed shapes were never exercised;
    (2) the model-id header used ``test_seldon_requests_ok`` (copy-paste from
    the request test), which made these docs land under the wrong model id.
    """
    payloads = ['{"data":{"names":["c"],"tensor":{"shape":[2,1],"values":[5,6]}}}',
                '{"data":{"names":["c"],"ndarray":[[7],[8]]}}',
                '{"data":{"names":["c"],"ndarray":[[7],[8]]}}',
                '{"data":{"names":["t0","t1"],"ndarray":[[0.5,0.5]]}}']
    for idx, payload in enumerate(payloads):
        response = testapp.post(
            '/',
            data=payload,  # post the payload under test, not a fixed body
            follow_redirects=True,
            headers=[(log_helper.TYPE_HEADER_NAME, "io.seldon.serving.inference.response"),
                     (log_helper.REQUEST_ID_HEADER_NAME, "1"),
                     (log_helper.MODELID_HEADER_NAME,
                      self.test_seldon_responses_ok.__name__ + str(idx))]
        )
        self.assertEqual(200, response.status_code,
                         "payload %d was not accepted: %s" % (idx, payload))

# Run the suite when this module is executed directly (e.g. `python app/tests.py`,
# as the Makefile's `test` target does).
if __name__ == '__main__':
    unittest.main()

0 comments on commit a7a29d4

Please sign in to comment.