DynamoDB: Log stats for partition id per operation #147

Merged: 10 commits, Oct 17, 2016
11 changes: 11 additions & 0 deletions docs/configuration/http_filters/dynamodb_filter.rst
@@ -54,6 +54,17 @@ in all operations from the batch.
upstream_rq_total_xxx, Counter, Total number of requests on <table_name> table per response code (503/2xx/etc)
upstream_rq_time_xxx, Timer, Time spent on <table_name> table per response code (400/3xx/etc)

*Disclaimer: Please note that this is a pre-release Amazon DynamoDB feature that is not yet widely available.*
Per partition and operation stats can be found in the *http.<stat_prefix>.dynamodb.table.<table_name>.*
namespace. For batch operations, Envoy tracks per partition and operation stats only when all
operations in the batch target the same table.

.. csv-table::
:header: Name, Type, Description
:widths: 1, 1, 2

capacity.<operation_name>.__partition_id=<last_seven_characters_from_partition_id>, Counter, Total capacity consumed by <operation_name> on the <table_name> table for a given <partition_id>
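
For example, with a hypothetical stat prefix of *egress*, a Query whose response reports consumed
capacity for a partition id ending in *abcd123* on the *orders* table would increment::

  http.egress.dynamodb.table.orders.capacity.Query.__partition_id=abcd123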

Additional detailed stats:

* For 4xx responses and partial batch operation failures, the total number of failures for a given
2 changes: 1 addition & 1 deletion docs/intro/arch_overview/dynamo.rst
@@ -6,7 +6,7 @@ DynamoDB
Envoy supports an HTTP level DynamoDB sniffing filter with the following features:

* DynamoDB API request/response parser.
* DynamoDB per operation/per table statistics.
* DynamoDB per operation, per table, and per partition and operation statistics.
* Failure type statistics for 4xx responses, parsed from response JSON,
e.g., ProvisionedThroughputExceededException.
* Batch operation partial failure statistics.
1 change: 1 addition & 0 deletions source/common/CMakeLists.txt
@@ -31,6 +31,7 @@ add_library(
common/version.cc
dynamo/dynamo_filter.cc
dynamo/dynamo_request_parser.cc
dynamo/dynamo_utility.cc
event/dispatcher_impl.cc
event/event_impl_base.cc
event/file_event_impl.cc
102 changes: 56 additions & 46 deletions source/common/dynamo/dynamo_filter.cc
@@ -1,4 +1,5 @@
#include "dynamo_filter.h"
#include "dynamo_utility.h"

#include "common/buffer/buffer_impl.h"
#include "common/dynamo/dynamo_request_parser.h"
@@ -12,7 +13,7 @@ namespace Dynamo {
Http::FilterHeadersStatus DynamoFilter::decodeHeaders(Http::HeaderMap& headers, bool) {
if (enabled_) {
start_decode_ = std::chrono::system_clock::now();
operation_ = Dynamo::RequestParser::parseOperation(headers);
operation_ = RequestParser::parseOperation(headers);
}

return Http::FilterHeadersStatus::Continue;
@@ -43,7 +44,8 @@ void DynamoFilter::onDecodeComplete(const Buffer::Instance& data) {
std::string body = buildBody(decoder_callbacks_->decodingBuffer(), data);
if (!body.empty()) {
try {
table_descriptor_ = Dynamo::RequestParser::parseTable(operation_, body);
Json::StringLoader json_body(body);
table_descriptor_ = RequestParser::parseTable(operation_, json_body);
} catch (const Json::Exception& jsonEx) {
// Body parsing failed. This should not happen, just put a stat for that.
stats_.counter(fmt::format("{}invalid_req_body", stat_prefix_)).inc();
@@ -52,19 +54,31 @@ void DynamoFilter::onDecodeComplete(const Buffer::Instance& data) {
}

void DynamoFilter::onEncodeComplete(const Buffer::Instance& data) {
if (response_headers_) {
uint64_t status = Http::Utility::getResponseStatus(*response_headers_);
if (!response_headers_) {
return;
}

chargeBasicStats(status);
uint64_t status = Http::Utility::getResponseStatus(*response_headers_);
chargeBasicStats(status);

if (Http::CodeUtility::is4xx(status)) {
chargeFailureSpecificStats(data);
}
// Batch Operations will always return status 200 for a partial or full success. Check
// unprocessed keys to determine partial success.
// http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html#Programming.Errors.BatchOperations
if (Dynamo::RequestParser::isBatchOperation(operation_)) {
chargeUnProcessedKeysStats(data);
std::string body = buildBody(encoder_callbacks_->encodingBuffer(), data);
if (!body.empty()) {
if (body.empty()) {
return;
}

try {
Json::StringLoader json_body(body);
chargeTablePartitionIdStats(json_body);

if (Http::CodeUtility::is4xx(status)) {
chargeFailureSpecificStats(json_body);
}
// Batch Operations will always return status 200 for a partial or full success. Check
// unprocessed keys to determine partial success.
// http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html#Programming.Errors.BatchOperations
if (RequestParser::isBatchOperation(operation_)) {
chargeUnProcessedKeysStats(json_body);
}
} catch (const Json::Exception&) {
// Body parsing failed. This should not happen, just put a stat for that.
stats_.counter(fmt::format("{}invalid_resp_body", stat_prefix_)).inc();
}
}
}
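
As a hypothetical illustration of the batch handling above: a BatchWriteItem response that returns
HTTP 200 but lists the table orders under UnprocessedKeys would increment a counter of the form
http.<stat_prefix>.dynamodb.error.orders.BatchFailureUnprocessedKeys (see chargeUnProcessedKeysStats
below).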
@@ -165,47 +179,43 @@ void DynamoFilter::chargeStatsPerEntity(const std::string& entity, const std::st
latency);
}

void DynamoFilter::chargeUnProcessedKeysStats(const Buffer::Instance& data) {
std::string body = buildBody(encoder_callbacks_->encodingBuffer(), data);
if (!body.empty()) {
try {
// The unprocessed keys block contains a list of tables and keys for that table that did not
// complete apart of the batch operation. Only the table names will be logged for errors.
std::vector<std::string> unprocessed_tables =
Dynamo::RequestParser::parseBatchUnProcessedKeys(body);
for (const std::string& unprocessed_table : unprocessed_tables) {
stats_.counter(fmt::format("{}error.{}.BatchFailureUnprocessedKeys", stat_prefix_,
unprocessed_table)).inc();
}
} catch (const Json::Exception& jsonEx) {
// Body parsing failed. This should not happen, just put a stat for that.
stats_.counter(fmt::format("{}invalid_resp_body", stat_prefix_)).inc();
}
void DynamoFilter::chargeUnProcessedKeysStats(const Json::Object& json_body) {
  // The unprocessed keys block contains a list of tables, and keys for each table, that did not
  // complete as part of the batch operation. Only the table names will be logged for errors.
std::vector<std::string> unprocessed_tables = RequestParser::parseBatchUnProcessedKeys(json_body);
for (const std::string& unprocessed_table : unprocessed_tables) {
stats_.counter(fmt::format("{}error.{}.BatchFailureUnprocessedKeys", stat_prefix_,
unprocessed_table)).inc();
}
}

void DynamoFilter::chargeFailureSpecificStats(const Buffer::Instance& data) {
std::string body = buildBody(encoder_callbacks_->encodingBuffer(), data);
void DynamoFilter::chargeFailureSpecificStats(const Json::Object& json_body) {
std::string error_type = RequestParser::parseErrorType(json_body);

if (!body.empty()) {
try {
std::string error_type = Dynamo::RequestParser::parseErrorType(body);

if (!error_type.empty()) {
if (table_descriptor_.table_name.empty()) {
stats_.counter(fmt::format("{}error.no_table.{}", stat_prefix_, error_type)).inc();
} else {
stats_.counter(fmt::format("{}error.{}.{}", stat_prefix_, table_descriptor_.table_name,
error_type)).inc();
}
}
} catch (const Json::Exception& jsonEx) {
// Body parsing failed. This should not happen, just put a stat for that.
stats_.counter(fmt::format("{}invalid_resp_body", stat_prefix_)).inc();
if (!error_type.empty()) {
if (table_descriptor_.table_name.empty()) {
stats_.counter(fmt::format("{}error.no_table.{}", stat_prefix_, error_type)).inc();
} else {
stats_.counter(fmt::format("{}error.{}.{}", stat_prefix_, table_descriptor_.table_name,
error_type)).inc();
}
} else {
stats_.counter(fmt::format("{}empty_response_body", stat_prefix_)).inc();
}
}

void DynamoFilter::chargeTablePartitionIdStats(const Json::Object& json_body) {
if (table_descriptor_.table_name.empty() || operation_.empty()) {
return;
}

std::vector<RequestParser::PartitionDescriptor> partitions =
RequestParser::parsePartitions(json_body);
for (const RequestParser::PartitionDescriptor& partition : partitions) {
std::string stats_string = Utility::buildPartitionStatString(
stat_prefix_, table_descriptor_.table_name, operation_, partition.partition_id_);
stats_.counter(stats_string).add(partition.capacity_);
}
}

} // Dynamo
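
The new dynamo/dynamo_utility.cc referenced in the CMakeLists.txt change is not shown in this diff
excerpt. Based on the call site in chargeTablePartitionIdStats and the stat name documented in
dynamodb_filter.rst, a minimal sketch of what Utility::buildPartitionStatString might look like is
below; the helper in the PR may additionally guard against overly long stat names.

#include <string>

namespace Dynamo {
namespace Utility {

// Hypothetical sketch, not the PR's implementation. Builds
// "<stat_prefix>table.<table_name>.capacity.<operation>.__partition_id=<last 7 chars of id>",
// matching the documented stat capacity.<operation_name>.__partition_id=<last_seven_characters>.
std::string buildPartitionStatString(const std::string& stat_prefix, const std::string& table_name,
                                     const std::string& operation,
                                     const std::string& partition_id) {
  const std::string partition_suffix =
      partition_id.size() > 7 ? partition_id.substr(partition_id.size() - 7) : partition_id;
  return stat_prefix + "table." + table_name + ".capacity." + operation +
         ".__partition_id=" + partition_suffix;
}

} // Utility
} // Dynamo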
7 changes: 5 additions & 2 deletions source/common/dynamo/dynamo_filter.h
@@ -6,6 +6,8 @@
#include "envoy/runtime/runtime.h"
#include "envoy/stats/stats.h"

#include "common/json/json_loader.h"

namespace Dynamo {

/**
@@ -44,8 +46,9 @@ class DynamoFilter : public Http::StreamFilter {
void chargeBasicStats(uint64_t status);
void chargeStatsPerEntity(const std::string& entity, const std::string& entity_type,
uint64_t status);
void chargeFailureSpecificStats(const Buffer::Instance& data);
void chargeUnProcessedKeysStats(const Buffer::Instance& data);
void chargeFailureSpecificStats(const Json::Object& json_body);
void chargeUnProcessedKeysStats(const Json::Object& json_body);
void chargeTablePartitionIdStats(const Json::Object& json_body);

Runtime::Loader& runtime_;
std::string stat_prefix_;
90 changes: 50 additions & 40 deletions source/common/dynamo/dynamo_request_parser.cc
@@ -1,7 +1,6 @@
#include "dynamo_request_parser.h"

#include "common/common/utility.h"
#include "common/json/json_loader.h"

namespace Dynamo {

@@ -60,60 +59,51 @@ std::string RequestParser::parseOperation(const Http::HeaderMap& headerMap) {
}

RequestParser::TableDescriptor RequestParser::parseTable(const std::string& operation,
const std::string& data) {
const Json::Object& json_data) {
TableDescriptor table{"", true};

// Simple operations on a single table, have "TableName" explicitly specified.
if (find(SINGLE_TABLE_OPERATIONS.begin(), SINGLE_TABLE_OPERATIONS.end(), operation) !=
SINGLE_TABLE_OPERATIONS.end()) {
Json::StringLoader json(data);
if (json.hasObject("TableName")) {
table.table_name = json.getString("TableName");
}
table.table_name = json_data.getString("TableName", "");
} else if (find(BATCH_OPERATIONS.begin(), BATCH_OPERATIONS.end(), operation) !=
BATCH_OPERATIONS.end()) {
Json::StringLoader json(data);
if (json.hasObject("RequestItems")) {
Json::Object tables = json.getObject("RequestItems");
tables.iterate([&table](const std::string& key, const Json::Object&) {
if (table.table_name.empty()) {
table.table_name = key;
} else {
if (table.table_name != key) {
table.table_name = "";
table.is_single_table = false;
return false;
}
Json::Object tables = json_data.getObject("RequestItems", true);
tables.iterate([&table](const std::string& key, const Json::Object&) {
if (table.table_name.empty()) {
table.table_name = key;
} else {
if (table.table_name != key) {
table.table_name = "";
table.is_single_table = false;
return false;
}

return true;
});
}
}
return true;
});
}

return table;
}
std::vector<std::string> RequestParser::parseBatchUnProcessedKeys(const std::string& data) {
std::vector<std::string> RequestParser::parseBatchUnProcessedKeys(const Json::Object& json_data) {
std::vector<std::string> unprocessed_tables;
Json::StringLoader json(data);
if (json.hasObject("UnprocessedKeys")) {
Json::Object tables = json.getObject("UnprocessedKeys");
tables.iterate([&unprocessed_tables](const std::string& key, const Json::Object&) {
unprocessed_tables.emplace_back(key);
return true;
});
}
Json::Object tables = json_data.getObject("UnprocessedKeys", true);
tables.iterate([&unprocessed_tables](const std::string& key, const Json::Object&) {
unprocessed_tables.emplace_back(key);
return true;
});

return unprocessed_tables;
}
std::string RequestParser::parseErrorType(const std::string& data) {
Json::StringLoader json(data);

if (json.hasObject("__type")) {
std::string error_type = json.getString("__type");
for (const std::string& supported_error_type : SUPPORTED_ERROR_TYPES) {
if (StringUtil::endsWith(error_type, supported_error_type)) {
return supported_error_type;
}
std::string RequestParser::parseErrorType(const Json::Object& json_data) {
std::string error_type = json_data.getString("__type", "");

Member commented:
if __type is not set on the JSON, we'll iterate through all supported_error_types.
We could early exit if __type is missing, or if error_type is empty after json_data.getString("__type", "").

Contributor Author replied:
Fixed.

if (error_type.empty()) {
return "";
}

for (const std::string& supported_error_type : SUPPORTED_ERROR_TYPES) {
if (StringUtil::endsWith(error_type, supported_error_type)) {
return supported_error_type;
}
}

@@ -125,4 +115,24 @@ bool RequestParser::isBatchOperation(const std::string& operation) {
BATCH_OPERATIONS.end();
}

std::vector<RequestParser::PartitionDescriptor>
RequestParser::parsePartitions(const Json::Object& json_data) {
std::vector<RequestParser::PartitionDescriptor> partition_descriptors;

Json::Object partitions =
json_data.getObject("ConsumedCapacity", true).getObject("Partitions", true);
partitions.iterate([&partition_descriptors, &partitions](const std::string& key,
const Json::Object&) {
    // For a given partition id, the amount of capacity used is returned in the body as a double.
    // A stat is created to track the capacity consumed for the operation, table, and partition.
    // Stats counters only increment by whole numbers, so the capacity is rounded up to the
    // nearest integer to account for this.
uint64_t capacity_integer = static_cast<uint64_t>(std::ceil(partitions.getDouble(key, 0.0)));
partition_descriptors.emplace_back(key, capacity_integer);
return true;
});

return partition_descriptors;
}

} // Dynamo
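
As an illustrative, standalone sketch (not part of the PR): parsePartitions expects the per-partition
capacity under ConsumedCapacity -> Partitions as a map of partition id to a double. The response body
below is made up; the feature is pre-release, so the exact shape DynamoDB returns may differ. In the
filter this parsing happens inside onEncodeComplete rather than in a main().

#include <iostream>
#include <string>
#include <vector>

#include "common/dynamo/dynamo_request_parser.h"
#include "common/json/json_loader.h"

int main() {
  // Hypothetical GetItem response body carrying per-partition consumed capacity.
  const std::string body = R"EOF(
  {
    "Item": {},
    "ConsumedCapacity": {
      "TableName": "orders",
      "Partitions": {
        "partition-1234567": 0.5,
        "partition-89abcde": 2.0
      }
    }
  })EOF";

  Json::StringLoader json(body);
  std::vector<Dynamo::RequestParser::PartitionDescriptor> partitions =
      Dynamo::RequestParser::parsePartitions(json);

  // Capacity doubles are rounded up before being added to counters: 0.5 becomes 1, 2.0 stays 2.
  for (const Dynamo::RequestParser::PartitionDescriptor& partition : partitions) {
    std::cout << partition.partition_id_ << " -> " << partition.capacity_ << "\n";
  }
  return 0;
}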
24 changes: 21 additions & 3 deletions source/common/dynamo/dynamo_request_parser.h
@@ -2,6 +2,8 @@

#include "envoy/http/header_map.h"

#include "common/json/json_loader.h"

namespace Dynamo {

/*
@@ -18,6 +20,13 @@ class RequestParser {
bool is_single_table;
};

struct PartitionDescriptor {
PartitionDescriptor(const std::string& partition, uint64_t capacity)
: partition_id_(partition), capacity_(capacity) {}
std::string partition_id_;
uint64_t capacity_;
};

/**
* Parse operation out of x-amz-target header.
* @return empty string if operation cannot be parsed.
@@ -40,7 +49,7 @@ class RequestParser {
*
* @throw Json::Exception if data is not in valid Json format.
*/
static TableDescriptor parseTable(const std::string& operation, const std::string& data);
static TableDescriptor parseTable(const std::string& operation, const Json::Object& json_data);

/**
* Parse error details which might be provided for a given response code.
@@ -52,20 +61,29 @@ class RequestParser {
*
* @throw Json::Exception if data is not in valid Json format.
*/
static std::string parseErrorType(const std::string& data);
static std::string parseErrorType(const Json::Object& json_data);

/**
* Parse unprocessed keys for batch operation results.
* @return empty set if there are no unprocessed keys or a set of table names that did not get
* processed in the batch operation.
*/
static std::vector<std::string> parseBatchUnProcessedKeys(const std::string& data);
static std::vector<std::string> parseBatchUnProcessedKeys(const Json::Object& json_data);

/**
* @return true if the operation is in the set of supported BATCH_OPERATIONS
*/
static bool isBatchOperation(const std::string& operation);

/**
* Parse the Partition ids and the consumed capacity from the body.
* @return empty set if there is no partition data or a set of partition data containing
* the partition id as a string and the capacity consumed as an integer.
*
* @throw Json::Exception if data is not in valid Json format.
*/
static std::vector<PartitionDescriptor> parsePartitions(const Json::Object& json_data);

RomanDzhabarov (Member) commented on Oct 14, 2016:
nit: for consistency name the method parsePartitions(const std::string& body)?

Contributor Author replied:
For this file, data is the consistent variable name.

private:
static const Http::LowerCaseString X_AMZ_TARGET;
static const std::vector<std::string> SINGLE_TABLE_OPERATIONS;