Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request Logger JsonData Fix #3423

Merged
merged 1 commit into from
Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions components/seldon-request-logger/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ USER default

ENV APP_ENV=production \
PATH="/home/appuser/.local/bin:${PATH}" \
PORT=8080
PORT=8080 \
GUNICORN_WORKERS=3 \
GUNICORN_THREADS=8

COPY app app
WORKDIR app

EXPOSE $PORT

CMD gunicorn --bind 0.0.0.0:$PORT --workers 3 --threads 8 default_logger:app
CMD gunicorn --bind 0.0.0.0:$PORT --workers $GUNICORN_WORKERS --threads $GUNICORN_THREADS default_logger:app

27 changes: 10 additions & 17 deletions components/seldon-request-logger/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ CLIENT_ID=sd-api
[email protected]
OIDC_PASSWORD=xxxxxx
OIDC_SCOPES=openid profile email groups
DEPLOY_API_HOST=http://xx.xx.xx.xx/seldon-deploy/api/v1alpha1
OIDC_PROVIDER=http://xx.xx.xx.xx/auth/realms/deploy-realm
#DEPLOY_API_HOST=http://xx.xx.xx.xx/seldon-deploy/api/v1alpha1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this commented-out line was left in by mistake.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I commented these out so that a local run does not assume Deploy exists, but we should really improve the Makefile to support both options: running with and without Deploy.

#OIDC_PROVIDER=http://xx.xx.xx.xx/auth/realms/deploy-realm
ELASTICSEARCH_HOST=localhost
ELASTICSEARCH_PORT=9200

Expand All @@ -24,10 +24,14 @@ create_dummy_metadata:
python testing/create_dummy_metadata.py

run_container:
docker run -p 2222:8080 seldonio/seldon-request-logger:latest
docker run -p 2222:2222 -e PORT=2222 --network host seldonio/seldon-request-logger:${VERSION}

run_elastic:
docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch-oss:7.6.0
docker run -p 9200:9200 -p 9300:9300 --network host -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch-oss:7.6.0


test:
python app/tests.py

run_local: export FLASK_RUN_PORT = 2222
run_local: export FLASK_APP = default_logger.py
Expand All @@ -38,7 +42,7 @@ curl_metadata:
curl -X GET -k -v http://localhost:2222/metadata -H "Content-Type: application/json"

#see test.sh for more test data
test:
smoke_test:
curl -v "http://localhost:2222/" \
-X POST \
-H "X-B3-Flags: 1" \
Expand All @@ -48,19 +52,8 @@ test:
-H "CE-ID: 45a8b444-3213-4758-be3f-540bf93f85ff" \
-H "CE-Source: dev.knative.example" \
-H 'Content-Type: application/json' \
-d '{"request": {"meta": {"puid": "71dlk7k1rhmci0cd8g5rmeolmn", "tags": {}, "routing": {}, "requestPath": {}, "metrics": []}, "data": {"names": ["f0", "f1"], "ndarray": [[0.77, 0.63]]}, "date": "2019-06-17T10:59:55.693Z[GMT]"}, "response": {"meta": {"puid": "71dlk7k1rhmci0cd8g5rmeolmn", "tags": {}, "routing": {}, "requestPath": {"classifier": "seldonio/mock_classifier:1.0"}, "metrics": []}, "data": {"names": ["proba"], "ndarray": [[0.09826376903346358]]}, "date": "2019-06-17T10:59:55.696Z[GMT]"}, "sdepName": "seldon-single-model"}'
-d '{"data":{"names":["a","b"],"tensor":{"shape":[2,2],"values":[1,2,3,4]}}}'

test_single_dim:
curl -v "http://localhost:2222/" \
-X POST \
-H "X-B3-Flags: 1" \
-H 'CE-SpecVersion: 0.2' \
-H "CE-Type: io.seldon.serving.inference.request" \
-H "CE-Time: 2018-04-05T03:56:24Z" \
-H "CE-ID: 45a8b444-3213-4758-be3f-540bf93f85ff" \
-H "CE-Source: dev.knative.example" \
-H 'Content-Type: application/json' \
-d '{"request": {"meta": {"puid": "71dlk7k1rhmci0cd8g5rmeolmn", "tags": {}, "routing": {}, "requestPath": {}, "metrics": []}, "data": {"names": ["f0", "f1"], "ndarray": [0.77, 0.63]}, "date": "2019-06-17T10:59:55.693Z[GMT]"}, "response": {"meta": {"puid": "71dlk7k1rhmci0cd8g5rmeolmn", "tags": {}, "routing": {}, "requestPath": {"classifier": "seldonio/mock_classifier:1.0"}, "metrics": []}, "data": {"names": ["proba"], "ndarray": [0.09826376903346358]}, "date": "2019-06-17T10:59:55.696Z[GMT]"}, "sdepName": "seldon-single-model"}'


IMAGE=seldon-request-logger
Expand Down
31 changes: 20 additions & 11 deletions components/seldon-request-logger/app/default_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import log_mapping
from collections.abc import Iterable
import array
import traceback
from flask import jsonify

MAX_PAYLOAD_BYTES = 300000
app = Flask(__name__)
Expand All @@ -31,7 +33,11 @@
def index():

request_id = log_helper.extract_request_id(request.headers)
if not request_id:
return Response(f"Header {log_helper.REQUEST_ID_HEADER_NAME} not found", 400)
type_header = request.headers.get(log_helper.TYPE_HEADER_NAME)
if type_header is None:
return Response(f"Header {log_helper.TYPE_HEADER_NAME} not found", 400)
message_type = log_helper.parse_message_type(type_header)
index_name = log_helper.build_index_name(request.headers)

Expand Down Expand Up @@ -68,13 +74,13 @@ def index():
try:

# now process and update the doc
process_and_update_elastic_doc(
added_content = process_and_update_elastic_doc(
es, message_type, body, request_id, request.headers, index_name
)

return ""
return jsonify(added_content)
except Exception as ex:
print(ex)
traceback.print_exc()
sys.stdout.flush()
return Response("problem logging request", 500)

Expand Down Expand Up @@ -107,6 +113,8 @@ def process_and_update_elastic_doc(
elastic_object, message_type, message_body, request_id, headers, index_name
):

added_content = []

if message_type == "unknown":
print("UNKNOWN REQUEST TYPE FOR " + request_id + " - NOT PROCESSING")
sys.stdout.flush()
Expand All @@ -129,7 +137,7 @@ def process_and_update_elastic_doc(
# req or res might be batches of instances so split out into individual docs
if "instance" in new_content_part:

if type(new_content_part["instance"]) == type([]):
if type(new_content_part["instance"]) == type([]) and not (new_content_part["dataType"] == "json"):
# if we've a list then this is batch
# we assume first dimension is always batch

Expand All @@ -151,19 +159,19 @@ def process_and_update_elastic_doc(
item_request_id = build_request_id_batched(
request_id, no_items_in_batch, index
)
upsert_doc_to_elastic(
added_content.append(upsert_doc_to_elastic(
elastic_object, message_type, item_body, item_request_id, index_name
)
))
index = index + 1
else:
#not batch so don't batch elements either
if "elements" in new_content_part and type(new_content_part["elements"]) == type([]):
new_content_part["elements"] = new_content_part["elements"][0]

item_request_id = build_request_id_batched(request_id, 1, 0)
upsert_doc_to_elastic(
added_content.append(upsert_doc_to_elastic(
elastic_object, message_type, doc_body, item_request_id, index_name
)
))
elif message_type == "feedback":
item_request_id = build_request_id_batched(request_id, 1, 0)
upsert_doc_to_elastic(elastic_object, message_type, doc_body, item_request_id, index_name)
Expand Down Expand Up @@ -201,7 +209,7 @@ def process_and_update_elastic_doc(
else:
print("unexpected data format")
print(new_content_part)
return
return added_content


def build_request_id_batched(request_id, no_items_in_batch, item_index):
Expand Down Expand Up @@ -239,7 +247,7 @@ def upsert_doc_to_elastic(
+ message_type
)
sys.stdout.flush()
return str(new_content)
return new_content


# take request or response part and process it by deriving metadata
Expand Down Expand Up @@ -297,6 +305,7 @@ def create_np_from_v2(data: list,ty: str, shape: list) -> np.array:
arr.shape = tuple(shape)
return arr


def extract_data_part(content, headers, message_type):
copy = content.copy()

Expand Down Expand Up @@ -390,7 +399,7 @@ def extract_data_part(content, headers, message_type):
if req_datatype == "binData":
copy["dataType"] = "image"

if isinstance(req_features, Iterable):
if isinstance(req_features, Iterable) and not(req_datatype == "jsonData" or req_datatype == "binData" or req_datatype == "strData"):

elements = createElelmentsArray(req_features, list(req_datadef.names), namespace, serving_engine, inferenceservice_name, endpoint_name, message_type)
copy["elements"] = elements
Expand Down
6 changes: 3 additions & 3 deletions components/seldon-request-logger/app/log_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ def build_index_name(headers):
index_name = os.getenv("INDEX_NAME")
if index_name:
return index_name

# Add seldon_environment (dev/test/staging/prod) to index_name if it is defined as an environment variable
seldon_environment = os.getenv("SELDON_ENVIRONMENT")
if seldon_environment:
index_name = "inference-log-" + seldon_environment + "-" + serving_engine(headers)
else:
index_name = "inference-log-" + serving_engine(headers)

# otherwise create an index per deployment
# index_name = "inference-log-" + serving_engine(headers)
namespace = clean_header(NAMESPACE_HEADER_NAME, headers)
Expand Down Expand Up @@ -183,7 +183,7 @@ def connect_elasticsearch():
else:
print("Could not connect to Elasticsearch")
sys.stdout.flush()
sys.exit()
sys.exit(-1)
return _es


Expand Down
78 changes: 78 additions & 0 deletions components/seldon-request-logger/app/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import default_logger
import numpy as np
import log_helper
import json

testapp = default_logger.app.test_client()

#for more local testing see README
class TestRequestLogger(unittest.TestCase):
Expand Down Expand Up @@ -84,5 +88,79 @@ def test_not_enriched_elements_request(self):
self.assertEqual(expected_results, actual_results)


def test_no_ce_requied_headers(self):
    """A POST that carries no request-id / type headers is rejected with HTTP 400."""
    # No headers are supplied on purpose; only the form body is sent.
    form_fields = {"data": {"ndarray": [[1]]}}
    resp = testapp.post('/', data=form_fields, follow_redirects=True)
    self.assertTrue(resp.status_code == 400)


def test_seldon_request(self):
    """Log a Seldon ndarray request and check every stored document.

    Posts a single-row ndarray payload with the type, request-id and
    model-id headers set, expects HTTP 200, then fetches each document
    reported in the JSON response from Elasticsearch and checks that the
    request part was classified with dataType "number".
    """
    response = testapp.post(
        '/',
        data='{"data":{"ndarray":[[1]]}}',
        follow_redirects=True,
        headers=[
            (log_helper.TYPE_HEADER_NAME, "io.seldon.serving.inference.request"),
            (log_helper.REQUEST_ID_HEADER_NAME, "1"),
            (log_helper.MODELID_HEADER_NAME, self.test_seldon_request.__name__),
        ],
    )
    self.assertEqual(response.status_code, 200)
    contents_added = response.json
    # Guard against a vacuous pass: with an empty list the loop below would
    # assert nothing at all.
    self.assertTrue(contents_added, "logger reported no documents added")
    for item in contents_added:
        # Keyword arguments keep the call valid across elasticsearch client
        # versions that no longer accept positional index/id.
        document = default_logger.es.get(index=item["_index"], id=item["_id"])
        data = document["_source"]
        self.assertEqual(data["request"]["dataType"], "number")

def test_seldon_jsondata_request(self):
    """A jsonData payload holding a list is logged as exactly one document."""
    request_headers = [
        (log_helper.TYPE_HEADER_NAME, "io.seldon.serving.inference.request"),
        (log_helper.REQUEST_ID_HEADER_NAME, "1"),
        (log_helper.MODELID_HEADER_NAME, self.test_seldon_jsondata_request.__name__),
    ]
    body = json.dumps({"jsonData": [1, 2, 3, 5]})
    resp = testapp.post('/', data=body, follow_redirects=True, headers=request_headers)
    self.assertTrue(resp.status_code == 200)
    # The list must not be treated as a batch, so one document is expected.
    logged_docs = resp.json
    self.assertTrue(len(logged_docs) == 1)


def test_seldon_requests_ok(self):
    """Each sample Seldon request payload is accepted with HTTP 200.

    Every payload is posted as an inference *request* under its own
    model id (test name plus the payload index).
    """
    payloads = ['{"data":{"names":["a","b"],"tensor":{"shape":[2,2],"values":[1,2,3,4]}}}',
                '{"data":{"names":["a","b"],"ndarray":[[1,2],[3,4]]}}',
                '{"data":{"names":["a"],"ndarray":["test1","test2"]}}',
                '{"data":{"names":["a","b"],"tensor":{"shape":[2,2],"values":[1,2,3,4]}}}']
    for idx, payload in enumerate(payloads):
        response = testapp.post(
            '/',
            # Send the payload under test; previously a fixed jsonData body
            # was posted on every iteration and the list above went unused.
            data=payload,
            follow_redirects=True,
            headers=[
                (log_helper.TYPE_HEADER_NAME, "io.seldon.serving.inference.request"),
                (log_helper.REQUEST_ID_HEADER_NAME, "1"),
                (log_helper.MODELID_HEADER_NAME,
                 self.test_seldon_requests_ok.__name__ + str(idx)),
            ],
        )
        # Include the payload in the failure message so the failing case is identifiable.
        self.assertEqual(response.status_code, 200, msg=payload)


def test_seldon_responses_ok(self):
    """Each sample Seldon response payload is accepted with HTTP 200.

    Every payload is posted as an inference *response* under its own
    model id (test name plus the payload index).
    """
    payloads = ['{"data":{"names":["c"],"tensor":{"shape":[2,1],"values":[5,6]}}}',
                '{"data":{"names":["c"],"ndarray":[[7],[8]]}}',
                '{"data":{"names":["c"],"ndarray":[[7],[8]]}}',
                '{"data":{"names":["t0","t1"],"ndarray":[[0.5,0.5]]}}']
    for idx, payload in enumerate(payloads):
        response = testapp.post(
            '/',
            # Send the payload under test; previously a fixed jsonData body
            # was posted on every iteration and the list above went unused.
            data=payload,
            follow_redirects=True,
            headers=[
                (log_helper.TYPE_HEADER_NAME, "io.seldon.serving.inference.response"),
                (log_helper.REQUEST_ID_HEADER_NAME, "1"),
                # Use this test's own name; the original borrowed the
                # requests test's name, so both tests shared model ids.
                (log_helper.MODELID_HEADER_NAME,
                 self.test_seldon_responses_ok.__name__ + str(idx)),
            ],
        )
        # Include the payload in the failure message so the failing case is identifiable.
        self.assertEqual(response.status_code, 200, msg=payload)

# Run the unittest suite when this module is executed as a script.
if __name__ == '__main__':
    unittest.main()