Skip to content

Commit

Permalink
[ML] Change inference cache to store only the inner part of results (#2376)
Browse files Browse the repository at this point in the history

Previously the inference cache stored complete results, including
a request ID and time taken. This was inefficient as it then meant
the original response had to be parsed and modified before sending
back to the Java side.

This PR changes the cache to store just the inner portion of the
inference result. Then the outer layer is added per request after
retrieving from the cache.

Additionally, the result writing functions are moved into a class
of their own, which means they can be unit tested.

Companion to elastic/elasticsearch#88901
  • Loading branch information
droberts195 authored Aug 3, 2022
1 parent 5f57234 commit 4266662
Show file tree
Hide file tree
Showing 11 changed files with 478 additions and 279 deletions.
4 changes: 2 additions & 2 deletions bin/pytorch_inference/CCommandParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class CCommandParser {
class CRequestCacheInterface {
public:
using TComputeResponse = std::function<std::string(SRequest)>;
using TReadResponse = std::function<void(const std::string&)>;
using TReadResponse = std::function<void(const std::string&, bool)>;

public:
virtual ~CRequestCacheInterface() = default;
Expand Down Expand Up @@ -102,7 +102,7 @@ class CCommandParser {
bool lookup(SRequest request,
const TComputeResponse& computeResponse,
const TReadResponse& readResponse) override {
readResponse(computeResponse(std::move(request)));
readResponse(computeResponse(std::move(request)), false);
return false;
}

Expand Down
1 change: 1 addition & 0 deletions bin/pytorch_inference/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ ml_add_executable(pytorch_inference
CBufferedIStreamAdapter.cc
CCmdLineParser.cc
CCommandParser.cc
CResultWriter.cc
CThreadSettings.cc
)
138 changes: 138 additions & 0 deletions bin/pytorch_inference/CResultWriter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/

#include "CResultWriter.h"

#include <core/CRapidJsonConcurrentLineWriter.h>

#include "CCommandParser.h"
#include "CThreadSettings.h"

namespace ml {
namespace torch {

// JSON field names used in the result documents. NOTE(review): these
// presumably must match the field names expected by the Java-side parser
// (companion change elastic/elasticsearch#88901) - confirm before renaming.
const std::string CResultWriter::RESULT{"result"};
const std::string CResultWriter::INFERENCE{"inference"};
const std::string CResultWriter::ERROR{"error"};
const std::string CResultWriter::TIME_MS{"time_ms"};
const std::string CResultWriter::CACHE_HIT{"cache_hit"};
const std::string CResultWriter::THREAD_SETTINGS{"thread_settings"};
const std::string CResultWriter::ACK{"ack"};
const std::string CResultWriter::ACKNOWLEDGED{"acknowledged"};
const std::string CResultWriter::NUM_ALLOCATIONS{"num_allocations"};
const std::string CResultWriter::NUM_THREADS_PER_ALLOCATION{"num_threads_per_allocation"};

//! Wrap the supplied output stream; all result documents are subsequently
//! written through this wrapper.
CResultWriter::CResultWriter(std::ostream& strmOut)
    : m_WrappedOutputStream{strmOut} {
}

void CResultWriter::writeInnerError(const std::string& message,
TRapidJsonLineWriter& jsonWriter) {
jsonWriter.Key(ERROR);
jsonWriter.StartObject();
jsonWriter.Key(ERROR);
jsonWriter.String(message);
jsonWriter.EndObject();
}

void CResultWriter::writeError(const std::string& requestId, const std::string& message) {
core::CRapidJsonConcurrentLineWriter jsonWriter{m_WrappedOutputStream};
jsonWriter.StartObject();
jsonWriter.Key(CCommandParser::REQUEST_ID);
jsonWriter.String(requestId);
writeInnerError(message, jsonWriter);
jsonWriter.EndObject();
}

void CResultWriter::wrapAndWriteInnerResponse(const std::string& innerResponse,
const std::string& requestId,
bool isCacheHit,
std::uint64_t timeMs) {
core::CRapidJsonConcurrentLineWriter jsonWriter{m_WrappedOutputStream};
jsonWriter.StartObject();
jsonWriter.Key(CCommandParser::REQUEST_ID);
jsonWriter.String(requestId);
jsonWriter.Key(CACHE_HIT);
jsonWriter.Bool(isCacheHit);
jsonWriter.Key(TIME_MS);
jsonWriter.Uint64(timeMs);
jsonWriter.RawValue(innerResponse.c_str(), innerResponse.length(), rapidjson::kObjectType);
jsonWriter.EndObject();
}

void CResultWriter::writeThreadSettings(const std::string& requestId,
const CThreadSettings& threadSettings) {
core::CRapidJsonConcurrentLineWriter jsonWriter{m_WrappedOutputStream};
jsonWriter.StartObject();
jsonWriter.Key(CCommandParser::REQUEST_ID);
jsonWriter.String(requestId);
jsonWriter.Key(THREAD_SETTINGS);
jsonWriter.StartObject();
jsonWriter.Key(NUM_THREADS_PER_ALLOCATION);
jsonWriter.Uint(threadSettings.numThreadsPerAllocation());
jsonWriter.Key(NUM_ALLOCATIONS);
jsonWriter.Uint(threadSettings.numAllocations());
jsonWriter.EndObject();
jsonWriter.EndObject();
}

void CResultWriter::writeSimpleAck(const std::string& requestId) {
core::CRapidJsonConcurrentLineWriter jsonWriter{m_WrappedOutputStream};
jsonWriter.StartObject();
jsonWriter.Key(ml::torch::CCommandParser::REQUEST_ID);
jsonWriter.String(requestId);
jsonWriter.Key(ACK);
jsonWriter.StartObject();
jsonWriter.Key(ACKNOWLEDGED);
jsonWriter.Bool(true);
jsonWriter.EndObject();
jsonWriter.EndObject();
}

//! Build the invariant ("inner") portion of an inference result for
//! \p results, suitable for caching and later splicing into a full
//! response via wrapAndWriteInnerResponse().
std::string CResultWriter::createInnerResult(const ::torch::Tensor& results) {
    rapidjson::StringBuffer stringBuffer;
    {
        TRapidJsonLineWriter jsonWriter{stringBuffer};
        // Even though we don't really want the outer braces on the
        // inner result we have to write them or else the JSON
        // writer will not put commas in the correct places.
        jsonWriter.StartObject();
        try {
            auto sizes = results.sizes();

            // Only 2D and 3D result tensors are supported; anything else
            // becomes an inner error document rather than a thrown error,
            // so the caller can still cache and return it.
            switch (sizes.size()) {
            case 3:
                this->writePrediction<3>(results, jsonWriter);
                break;
            case 2:
                this->writePrediction<2>(results, jsonWriter);
                break;
            default: {
                std::ostringstream ss;
                ss << "Cannot convert results tensor of size [" << sizes << ']';
                writeInnerError(ss.str(), jsonWriter);
                break;
            }
            }
        } catch (const c10::Error& e) {
            // Torch reports failures (e.g. accessor dimension mismatch)
            // via c10::Error; convert to an inner error document.
            writeInnerError(e.what(), jsonWriter);
        } catch (const std::runtime_error& e) {
            writeInnerError(e.what(), jsonWriter);
        }
        // Destroying the writer here flushes it before the buffer is read.
        jsonWriter.EndObject();
    }
    // Return the object without the opening and closing braces and
    // the trailing newline. The resulting partial document will
    // later be wrapped, so does not need these.
    // NOTE(review): the "- 3" assumes the line writer emits exactly
    // '{', ..., '}' plus one trailing newline for the root object -
    // confirm against CRapidJsonLineWriter before changing this.
    return std::string{stringBuffer.GetString() + 1, stringBuffer.GetLength() - 3};
}
}
}
177 changes: 177 additions & 0 deletions bin/pytorch_inference/CResultWriter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/

#ifndef INCLUDED_ml_torch_CResultWriter_h
#define INCLUDED_ml_torch_CResultWriter_h

#include <core/CJsonOutputStreamWrapper.h>
#include <core/CRapidJsonLineWriter.h>

#include <rapidjson/stringbuffer.h>
#include <torch/csrc/api/include/torch/types.h>

#include <cstdint>
#include <iosfwd>
#include <sstream>
#include <string>

namespace ml {
namespace torch {
class CThreadSettings;

//! \brief
//! Formats and writes results for PyTorch inference.
//!
//! DESCRIPTION:\n
//! There are four types of result:
//!
//! 1. Inference results
//! 2. Thread settings
//! 3. Acknowledgements
//! 4. Errors
//!
//! IMPLEMENTATION DECISIONS:\n
//! We can cache inference results and errors, but when we reply with a
//! cached value we still need to change the request ID, time taken, and
//! cache hit indicator. Therefore this class contains functionality for
//! building the invariant portion of results to be cached and later
//! spliced into a complete response.
//!
class CResultWriter {
public:
    //! Line writer type used both for direct output and for building
    //! cacheable fragments in a string buffer.
    using TRapidJsonLineWriter = core::CRapidJsonLineWriter<rapidjson::StringBuffer>;

public:
    //! Wrap \p strmOut; all result documents are written through it.
    explicit CResultWriter(std::ostream& strmOut);

    //! No copying
    CResultWriter(const CResultWriter&) = delete;
    CResultWriter& operator=(const CResultWriter&) = delete;

    //! Write an error directly to the output stream.
    void writeError(const std::string& requestId, const std::string& message);

    //! Write thread settings to the output stream.
    void writeThreadSettings(const std::string& requestId, const CThreadSettings& threadSettings);

    //! Write a simple acknowledgement to the output stream.
    void writeSimpleAck(const std::string& requestId);

    //! Wrap the invariant portion of a cached result with request ID,
    //! cache hit indicator and time taken. Then write the full document
    //! to the output stream.
    void wrapAndWriteInnerResponse(const std::string& innerResponse,
                                   const std::string& requestId,
                                   bool isCacheHit,
                                   std::uint64_t timeMs);

    //! Write the prediction portion of an inference result.
    //! Dispatches on the tensor element type (float32/float64); any other
    //! dtype is reported as an inner error document rather than thrown.
    template<std::size_t N>
    void writePrediction(const ::torch::Tensor& prediction, TRapidJsonLineWriter& jsonWriter) {

        // Creating the accessor will throw if the tensor does not have exactly
        // N dimensions. Do this before writing any output so the error message
        // isn't mingled with a partial result.

        if (prediction.dtype() == ::torch::kFloat32) {
            auto accessor = prediction.accessor<float, N>();
            this->writeInferenceResults(accessor, jsonWriter);

        } else if (prediction.dtype() == ::torch::kFloat64) {
            auto accessor = prediction.accessor<double, N>();
            this->writeInferenceResults(accessor, jsonWriter);

        } else {
            std::ostringstream ss;
            ss << "Cannot process result tensor of type [" << prediction.dtype() << ']';
            writeInnerError(ss.str(), jsonWriter);
        }
    }

    //! Create the invariant portion of an inference result, suitable for
    //! caching and later splicing into a full result.
    std::string createInnerResult(const ::torch::Tensor& results);

private:
    //! Field names.
    static const std::string RESULT;
    static const std::string INFERENCE;
    static const std::string ERROR;
    static const std::string TIME_MS;
    static const std::string CACHE_HIT;
    static const std::string THREAD_SETTINGS;
    static const std::string ACK;
    static const std::string ACKNOWLEDGED;
    static const std::string NUM_ALLOCATIONS;
    static const std::string NUM_THREADS_PER_ALLOCATION;

private:
    //! Write the invariant portion of an error result to \p jsonWriter,
    //! suitable for caching and later splicing into a full result.
    static void writeInnerError(const std::string& message, TRapidJsonLineWriter& jsonWriter);

    //! Write a one dimensional tensor as a JSON array of doubles.
    //! NOTE(review): accessor.size(0) is a 64-bit count while the loop
    //! index is int - assumes dimensions fit in int; confirm.
    template<typename T>
    void writeTensor(const ::torch::TensorAccessor<T, 1UL>& accessor,
                     TRapidJsonLineWriter& jsonWriter) {
        jsonWriter.StartArray();
        for (int i = 0; i < accessor.size(0); ++i) {
            jsonWriter.Double(static_cast<double>(accessor[i]));
        }
        jsonWriter.EndArray();
    }

    //! Write an N dimensional tensor for N > 1, recursing on the first
    //! dimension until the 1D overload above is selected.
    template<typename T, std::size_t N_DIMS>
    void writeTensor(const ::torch::TensorAccessor<T, N_DIMS>& accessor,
                     TRapidJsonLineWriter& jsonWriter) {
        jsonWriter.StartArray();
        for (int i = 0; i < accessor.size(0); ++i) {
            this->writeTensor(accessor[i], jsonWriter);
        }
        jsonWriter.EndArray();
    }

    //! Write a 3D inference result as "result": { "inference": [...] }.
    template<typename T>
    void writeInferenceResults(const ::torch::TensorAccessor<T, 3UL>& accessor,
                               TRapidJsonLineWriter& jsonWriter) {

        jsonWriter.Key(RESULT);
        jsonWriter.StartObject();
        jsonWriter.Key(INFERENCE);
        this->writeTensor(accessor, jsonWriter);
        jsonWriter.EndObject();
    }

    //! Write a 2D inference result
    template<typename T>
    void writeInferenceResults(const ::torch::TensorAccessor<T, 2UL>& accessor,
                               TRapidJsonLineWriter& jsonWriter) {

        jsonWriter.Key(RESULT);
        jsonWriter.StartObject();
        jsonWriter.Key(INFERENCE);
        // The Java side requires a 3D array, so wrap the 2D result in an
        // extra outer array.
        jsonWriter.StartArray();
        this->writeTensor(accessor, jsonWriter);
        jsonWriter.EndArray();
        jsonWriter.EndObject();
    }

private:
    //! Stream wrapper shared by all the concurrent line writers created
    //! in the member functions above.
    core::CJsonOutputStreamWrapper m_WrappedOutputStream;
};
}
}

#endif // INCLUDED_ml_torch_CResultWriter_h
Loading

0 comments on commit 4266662

Please sign in to comment.