Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DynamoDB: Log stats for partition id per operation #147

Merged
merged 10 commits into from
Oct 17, 2016
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 49 additions & 59 deletions source/common/dynamo/dynamo_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ void DynamoFilter::onDecodeComplete(const Buffer::Instance& data) {
std::string body = buildBody(decoder_callbacks_->decodingBuffer(), data);
if (!body.empty()) {
try {
table_descriptor_ = RequestParser::parseTable(operation_, body);
Json::StringLoader json_body(body);
table_descriptor_ = RequestParser::parseTable(operation_, json_body);
} catch (const Json::Exception& jsonEx) {
// Body parsing failed. This should not happen, just put a stat for that.
stats_.counter(fmt::format("{}invalid_req_body", stat_prefix_)).inc();
Expand All @@ -53,21 +54,31 @@ void DynamoFilter::onDecodeComplete(const Buffer::Instance& data) {
}

void DynamoFilter::onEncodeComplete(const Buffer::Instance& data) {
if (response_headers_) {
uint64_t status = Http::Utility::getResponseStatus(*response_headers_);
if (!response_headers_) {
return;
}

chargeBasicStats(status);
std::string body = buildBody(encoder_callbacks_->encodingBuffer(), data);
chargeTablePartitionIdStats(body);
uint64_t status = Http::Utility::getResponseStatus(*response_headers_);
chargeBasicStats(status);

if (Http::CodeUtility::is4xx(status)) {
chargeFailureSpecificStats(body);
}
// Batch Operations will always return status 200 for a partial or full success. Check
// unprocessed keys to determine partial success.
// http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html#Programming.Errors.BatchOperations
if (RequestParser::isBatchOperation(operation_)) {
chargeUnProcessedKeysStats(body);
std::string body = buildBody(encoder_callbacks_->encodingBuffer(), data);
if (!body.empty()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (body.empty()) {
return;
}

try {
Json::StringLoader json_body(body);
chargeTablePartitionIdStats(json_body);

if (Http::CodeUtility::is4xx(status)) {
chargeFailureSpecificStats(json_body);
}
// Batch Operations will always return status 200 for a partial or full success. Check
// unprocessed keys to determine partial success.
// http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html#Programming.Errors.BatchOperations
if (RequestParser::isBatchOperation(operation_)) {
chargeUnProcessedKeysStats(json_body);
}
} catch (const Json::Exception&) {
// Body parsing failed. This should not happen, just put a stat for that.
stats_.counter(fmt::format("{}invalid_resp_body", stat_prefix_)).inc();
}
}
}
Expand Down Expand Up @@ -168,63 +179,42 @@ void DynamoFilter::chargeStatsPerEntity(const std::string& entity, const std::st
latency);
}

void DynamoFilter::chargeUnProcessedKeysStats(const std::string& body) {
if (!body.empty()) {
try {
// The unprocessed keys block contains a list of tables and keys for that table that did not
// complete apart of the batch operation. Only the table names will be logged for errors.
std::vector<std::string> unprocessed_tables =
Dynamo::RequestParser::parseBatchUnProcessedKeys(body);
for (const std::string& unprocessed_table : unprocessed_tables) {
stats_.counter(fmt::format("{}error.{}.BatchFailureUnprocessedKeys", stat_prefix_,
unprocessed_table)).inc();
}
} catch (const Json::Exception&) {
// Body parsing failed. This should not happen, just put a stat for that.
stats_.counter(fmt::format("{}invalid_resp_body", stat_prefix_)).inc();
}
void DynamoFilter::chargeUnProcessedKeysStats(const Json::Object& json_body) {
// The unprocessed keys block contains a list of tables and keys for that table that did not
// complete apart of the batch operation. Only the table names will be logged for errors.
std::vector<std::string> unprocessed_tables = RequestParser::parseBatchUnProcessedKeys(json_body);
for (const std::string& unprocessed_table : unprocessed_tables) {
stats_.counter(fmt::format("{}error.{}.BatchFailureUnprocessedKeys", stat_prefix_,
unprocessed_table)).inc();
}
}

void DynamoFilter::chargeFailureSpecificStats(const std::string& body) {
if (!body.empty()) {
try {
std::string error_type = RequestParser::parseErrorType(body);

if (!error_type.empty()) {
if (table_descriptor_.table_name.empty()) {
stats_.counter(fmt::format("{}error.no_table.{}", stat_prefix_, error_type)).inc();
} else {
stats_.counter(fmt::format("{}error.{}.{}", stat_prefix_, table_descriptor_.table_name,
error_type)).inc();
}
}
} catch (const Json::Exception&) {
// Body parsing failed. This should not happen, just put a stat for that.
stats_.counter(fmt::format("{}invalid_resp_body", stat_prefix_)).inc();
void DynamoFilter::chargeFailureSpecificStats(const Json::Object& json_body) {
std::string error_type = RequestParser::parseErrorType(json_body);

if (!error_type.empty()) {
if (table_descriptor_.table_name.empty()) {
stats_.counter(fmt::format("{}error.no_table.{}", stat_prefix_, error_type)).inc();
} else {
stats_.counter(fmt::format("{}error.{}.{}", stat_prefix_, table_descriptor_.table_name,
error_type)).inc();
}
} else {
stats_.counter(fmt::format("{}empty_response_body", stat_prefix_)).inc();
}
}

void DynamoFilter::chargeTablePartitionIdStats(const std::string& body) {
void DynamoFilter::chargeTablePartitionIdStats(const Json::Object& json_body) {
if (table_descriptor_.table_name.empty() || operation_.empty()) {
return;
}
if (!body.empty()) {
try {
std::vector<RequestParser::PartitionDescriptor> partitions =
RequestParser::parsePartitions(body);
for (const RequestParser::PartitionDescriptor& partition : partitions) {
std::string stats_string = Utility::buildPartitionStatString(
stat_prefix_, table_descriptor_.table_name, operation_, partition.partition_id_);
stats_.counter(stats_string).add(partition.capacity_);
}
} catch (const Json::Exception&) {
// Body parsing failed. This should not happen, just put a stat for that.
stats_.counter(fmt::format("{}invalid_resp_body", stat_prefix_)).inc();
}

std::vector<RequestParser::PartitionDescriptor> partitions =
RequestParser::parsePartitions(json_body);
for (const RequestParser::PartitionDescriptor& partition : partitions) {
std::string stats_string = Utility::buildPartitionStatString(
stat_prefix_, table_descriptor_.table_name, operation_, partition.partition_id_);
stats_.counter(stats_string).add(partition.capacity_);
}
}

Expand Down
7 changes: 4 additions & 3 deletions source/common/dynamo/dynamo_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "dynamo_request_parser.h"

#include "common/json/json_loader.h"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: include goes below envoy headers, separated by new line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

#include "envoy/http/filter.h"
#include "envoy/runtime/runtime.h"
#include "envoy/stats/stats.h"
Expand Down Expand Up @@ -44,9 +45,9 @@ class DynamoFilter : public Http::StreamFilter {
void chargeBasicStats(uint64_t status);
void chargeStatsPerEntity(const std::string& entity, const std::string& entity_type,
uint64_t status);
void chargeFailureSpecificStats(const std::string& body);
void chargeUnProcessedKeysStats(const std::string& body);
void chargeTablePartitionIdStats(const std::string& body);
void chargeFailureSpecificStats(const Json::Object& json_body);
void chargeUnProcessedKeysStats(const Json::Object& json_body);
void chargeTablePartitionIdStats(const Json::Object& json_body);

Runtime::Loader& runtime_;
std::string stat_prefix_;
Expand Down
74 changes: 29 additions & 45 deletions source/common/dynamo/dynamo_request_parser.cc
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "dynamo_request_parser.h"

#include "common/common/utility.h"
#include "common/json/json_loader.h"

namespace Dynamo {

Expand Down Expand Up @@ -60,60 +59,47 @@ std::string RequestParser::parseOperation(const Http::HeaderMap& headerMap) {
}

RequestParser::TableDescriptor RequestParser::parseTable(const std::string& operation,
const std::string& data) {
const Json::Object& json_data) {
TableDescriptor table{"", true};

// Simple operations on a single table, have "TableName" explicitly specified.
if (find(SINGLE_TABLE_OPERATIONS.begin(), SINGLE_TABLE_OPERATIONS.end(), operation) !=
SINGLE_TABLE_OPERATIONS.end()) {
Json::StringLoader json(data);
if (json.hasObject("TableName")) {
table.table_name = json.getString("TableName");
}
table.table_name = json_data.getString("TableName", "");
} else if (find(BATCH_OPERATIONS.begin(), BATCH_OPERATIONS.end(), operation) !=
BATCH_OPERATIONS.end()) {
Json::StringLoader json(data);
if (json.hasObject("RequestItems")) {
Json::Object tables = json.getObject("RequestItems");
tables.iterate([&table](const std::string& key, const Json::Object&) {
if (table.table_name.empty()) {
table.table_name = key;
} else {
if (table.table_name != key) {
table.table_name = "";
table.is_single_table = false;
return false;
}
Json::Object tables = json_data.getObject("RequestItems", true);
tables.iterate([&table](const std::string& key, const Json::Object&) {
if (table.table_name.empty()) {
table.table_name = key;
} else {
if (table.table_name != key) {
table.table_name = "";
table.is_single_table = false;
return false;
}

return true;
});
}
}
return true;
});
}

return table;
}
std::vector<std::string> RequestParser::parseBatchUnProcessedKeys(const std::string& data) {
std::vector<std::string> RequestParser::parseBatchUnProcessedKeys(const Json::Object& json_data) {
std::vector<std::string> unprocessed_tables;
Json::StringLoader json(data);
if (json.hasObject("UnprocessedKeys")) {
Json::Object tables = json.getObject("UnprocessedKeys");
tables.iterate([&unprocessed_tables](const std::string& key, const Json::Object&) {
unprocessed_tables.emplace_back(key);
return true;
});
}
Json::Object tables = json_data.getObject("UnprocessedKeys", true);
tables.iterate([&unprocessed_tables](const std::string& key, const Json::Object&) {
unprocessed_tables.emplace_back(key);
return true;
});

return unprocessed_tables;
}
std::string RequestParser::parseErrorType(const std::string& data) {
Json::StringLoader json(data);

if (json.hasObject("__type")) {
std::string error_type = json.getString("__type");
for (const std::string& supported_error_type : SUPPORTED_ERROR_TYPES) {
if (StringUtil::endsWith(error_type, supported_error_type)) {
return supported_error_type;
}
std::string RequestParser::parseErrorType(const Json::Object& json_data) {
std::string error_type = json_data.getString("__type", "");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if __type is not set on the json, we'll iterate though all supported_error_types.

we could early exit if __type is missing or if error_type after json_data.getString("__type", ""); is empty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

for (const std::string& supported_error_type : SUPPORTED_ERROR_TYPES) {
if (StringUtil::endsWith(error_type, supported_error_type)) {
return supported_error_type;
}
}

Expand All @@ -126,13 +112,11 @@ bool RequestParser::isBatchOperation(const std::string& operation) {
}

std::vector<RequestParser::PartitionDescriptor>
RequestParser::parsePartitions(const std::string& data) {
Json::StringLoader json(data);
RequestParser::parsePartitions(const Json::Object& json_data) {
std::vector<RequestParser::PartitionDescriptor> partition_descriptors;

Json::Object consumed_capacity = json.getObject("ConsumedCapacity", true);
Json::Object partitions = consumed_capacity.getObject("Partitions", true);

Json::Object partitions =
json_data.getObject("ConsumedCapacity", true).getObject("Partitions", true);
partitions.iterate([&partition_descriptors, &partitions](const std::string& key,
const Json::Object&) {
// For a given partition id, the amount of capacity used is returned in the body as a double.
Expand Down
9 changes: 5 additions & 4 deletions source/common/dynamo/dynamo_request_parser.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "common/json/json_loader.h"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: header ordering

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

#include "envoy/http/header_map.h"

namespace Dynamo {
Expand Down Expand Up @@ -47,7 +48,7 @@ class RequestParser {
*
* @throw Json::Exception if data is not in valid Json format.
*/
static TableDescriptor parseTable(const std::string& operation, const std::string& data);
static TableDescriptor parseTable(const std::string& operation, const Json::Object& json_data);

/**
* Parse error details which might be provided for a given response code.
Expand All @@ -59,14 +60,14 @@ class RequestParser {
*
* @throw Json::Exception if data is not in valid Json format.
*/
static std::string parseErrorType(const std::string& data);
static std::string parseErrorType(const Json::Object& json_data);

/**
* Parse unprocessed keys for batch operation results.
* @return empty set if there are no unprocessed keys or a set of table names that did not get
* processed in the batch operation.
*/
static std::vector<std::string> parseBatchUnProcessedKeys(const std::string& data);
static std::vector<std::string> parseBatchUnProcessedKeys(const Json::Object& json_data);

/**
* @return true if the operation is in the set of supported BATCH_OPERATIONS
Expand All @@ -80,7 +81,7 @@ class RequestParser {
*
* @throw Json::Exception if data is not in valid Json format.
*/
static std::vector<PartitionDescriptor> parsePartitions(const std::string& data);
static std::vector<PartitionDescriptor> parsePartitions(const Json::Object& json_data);

private:
static const Http::LowerCaseString X_AMZ_TARGET;
Expand Down
Loading