diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f66ec617a..e33097f0d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Increment the: ## [Unreleased] * [EXPORTER] Jaeger Exporter - Populate Span Links ([#1251](https://github.com/open-telemetry/opentelemetry-cpp/pull/1251)) +* [EXPORTER] OTLP http exporter allow concurrency session ([#1209](https://github.com/open-telemetry/opentelemetry-cpp/pull/1209)) ## [1.2.0] 2022-01-31 diff --git a/api/include/opentelemetry/common/timestamp.h b/api/include/opentelemetry/common/timestamp.h index 54e7b7aa6c..c34045675a 100644 --- a/api/include/opentelemetry/common/timestamp.h +++ b/api/include/opentelemetry/common/timestamp.h @@ -169,5 +169,39 @@ class SteadyTimestamp private: int64_t nanos_since_epoch_; }; + +class DurationUtil +{ +public: + template + static std::chrono::duration AdjustWaitForTimeout( + std::chrono::duration timeout, + std::chrono::duration indefinite_value) noexcept + { + // Do not call now() when this duration is max value, now() may have a expemsive cost. + if (timeout == std::chrono::duration::max()) + { + return indefinite_value; + } + + // std::future::wait_for, std::this_thread::sleep_for, and std::condition_variable::wait_for + // mey use steady_clock or system_clock.We need make sure now() + timeout do not overflow. + auto max_timeout = std::chrono::duration_cast>( + std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now()); + if (timeout >= max_timeout) + { + return indefinite_value; + } + max_timeout = std::chrono::duration_cast>( + std::chrono::system_clock::time_point::max() - std::chrono::system_clock::now()); + if (timeout >= max_timeout) + { + return indefinite_value; + } + + return timeout; + } +}; + } // namespace common OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/ostream/src/log_exporter.cc b/exporters/ostream/src/log_exporter.cc index ef103bb6b2..a8ea060481 100644 --- a/exporters/ostream/src/log_exporter.cc +++ b/exporters/ostream/src/log_exporter.cc @@ -180,7 +180,7 @@ sdk::common::ExportResult OStreamLogExporter::Export( return sdk::common::ExportResult::kSuccess; } -bool OStreamLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept +bool OStreamLogExporter::Shutdown(std::chrono::microseconds) noexcept { const std::lock_guard locked(lock_); is_shutdown_ = true; diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h index 1a199bed48..554f792cdb 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h @@ -15,10 +15,15 @@ #include "opentelemetry/exporters/otlp/otlp_environment.h" +#include #include +#include +#include +#include #include #include #include +#include OPENTELEMETRY_BEGIN_NAMESPACE namespace exporter @@ -71,20 +76,25 @@ struct OtlpHttpClientOptions // Additional HTTP headers OtlpHeaders http_headers = GetOtlpDefaultHeaders(); + // Concurrent sessions + std::size_t concurrent_sessions = 8; + inline OtlpHttpClientOptions(nostd::string_view input_url, HttpRequestContentType input_content_type, JsonBytesMappingKind input_json_bytes_mapping, bool input_use_json_name, bool input_console_debug, std::chrono::system_clock::duration input_timeout, - const OtlpHeaders &input_http_headers) + const OtlpHeaders &input_http_headers, + std::size_t input_concurrent_sessions = 8) : url(input_url), content_type(input_content_type), json_bytes_mapping(input_json_bytes_mapping), use_json_name(input_use_json_name), console_debug(input_console_debug), timeout(input_timeout), - http_headers(input_http_headers) + http_headers(input_http_headers), + concurrent_sessions(input_concurrent_sessions) {} }; @@ -99,6 +109,8 @@ class OtlpHttpClient */ explicit OtlpHttpClient(OtlpHttpClientOptions &&options); + ~OtlpHttpClient(); + /** * Export * @param message message to export, it should be ExportTraceServiceRequest, @@ -114,19 +126,33 @@ class OtlpHttpClient */ bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept; + /** + * @brief Release the lifetime of specify session. + * + * @param session the session to release + */ + void ReleaseSession(const opentelemetry::ext::http::client::Session &session) noexcept; + private: - // Stores if this HTTP client had its Shutdown() method called - bool is_shutdown_ = false; + /** + * Add http session and hold it's lifetime. + * @param session the session to add + * @param event_handle the event handle of this session + */ + void addSession( + std::shared_ptr session, + std::unique_ptr &&event_handle) noexcept; - // The configuration options associated with this HTTP client. - const OtlpHttpClientOptions options_; + /** + * @brief Real delete all sessions and event handles. + * @note This function is called in the same thread where we create sessions and handles + * + * @return return true if there are more sessions to delete + */ + bool cleanupGCSessions() noexcept; - // Object that stores the HTTP sessions that have been created - std::shared_ptr http_client_; - // Cached parsed URI - std::string http_uri_; - mutable opentelemetry::common::SpinLockMutex lock_; bool isShutdown() const noexcept; + // For testing friend class OtlpHttpExporterTestPeer; friend class OtlpHttpLogExporterTestPeer; @@ -138,6 +164,51 @@ class OtlpHttpClient */ OtlpHttpClient(OtlpHttpClientOptions &&options, std::shared_ptr http_client); + + struct HttpSessionData + { + std::shared_ptr session; + std::unique_ptr event_handle; + + inline HttpSessionData() = default; + + inline explicit HttpSessionData( + std::shared_ptr &&input_session, + std::unique_ptr &&input_handle) + { + session.swap(input_session); + event_handle.swap(input_handle); + } + + inline explicit HttpSessionData(HttpSessionData &&other) + { + session.swap(other.session); + event_handle.swap(other.event_handle); + } + }; + + // Stores if this HTTP client had its Shutdown() method called + bool is_shutdown_; + + // The configuration options associated with this HTTP client. + const OtlpHttpClientOptions options_; + + // Object that stores the HTTP sessions that have been created + std::shared_ptr http_client_; + + // Cached parsed URI + std::string http_uri_; + + // Running sessions and event handles + std::unordered_map + running_sessions_; + // Sessions and event handles that are waiting to be deleted + std::list gc_sessions_; + // Lock for running_sessions_, gc_sessions_ and http_client_ + std::recursive_mutex session_manager_lock_; + // Condition variable and mutex to control the concurrency count of running sessions + std::mutex session_waker_lock_; + std::condition_variable session_waker_; }; } // namespace otlp } // namespace exporter diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h index 3e6a521194..20b67bc6af 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h @@ -11,6 +11,7 @@ #include "opentelemetry/exporters/otlp/otlp_environment.h" #include +#include #include #include @@ -50,6 +51,9 @@ struct OtlpHttpExporterOptions // Additional HTTP headers OtlpHeaders http_headers = GetOtlpDefaultHeaders(); + + // Concurrent sessions + std::size_t concurrent_sessions = 8; }; /** diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h index d330e62be4..e5d49cdac3 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h @@ -11,6 +11,7 @@ # include "opentelemetry/exporters/otlp/otlp_environment.h" # include +# include # include # include @@ -50,6 +51,9 @@ struct OtlpHttpLogExporterOptions // Additional HTTP headers OtlpHeaders http_headers = GetOtlpDefaultLogHeaders(); + + // Concurrent sessions + std::size_t concurrent_sessions = 8; }; /** diff --git a/exporters/otlp/src/otlp_http_client.cc b/exporters/otlp/src/otlp_http_client.cc index 544f74ca7c..abc24a927a 100644 --- a/exporters/otlp/src/otlp_http_client.cc +++ b/exporters/otlp/src/otlp_http_client.cc @@ -14,7 +14,6 @@ #include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" -#include #include "google/protobuf/message.h" #include "google/protobuf/reflection.h" #include "google/protobuf/stubs/common.h" @@ -35,9 +34,11 @@ LIBPROTOBUF_EXPORT void Base64Escape(StringPiece src, std::string *dest); #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" +#include "opentelemetry/common/timestamp.h" #include "opentelemetry/sdk/common/global_log_handler.h" #include "opentelemetry/sdk_config.h" +#include #include #include #include @@ -70,7 +71,10 @@ class ResponseHandler : public http_client::EventHandler /** * Creates a response handler, that by default doesn't display to console */ - ResponseHandler(bool console_debug = false) : console_debug_{console_debug} {} + ResponseHandler(bool console_debug = false) : console_debug_{console_debug} + { + stoping_.store(false); + } /** * Automatically called when the response is received, store the body into a string and notify any @@ -100,20 +104,15 @@ class ResponseHandler : public http_client::EventHandler // Set the response_received_ flag to true and notify any threads waiting on this result response_received_ = true; - stop_waiting_ = true; } - cv_.notify_all(); - } - /**resource - * A method the user calls to block their thread until the response is received. The longest - * duration is the timeout of the request, set by SetTimeoutMs() - */ - bool waitForResponse() - { - std::unique_lock lk(mutex_); - cv_.wait(lk, [this] { return stop_waiting_; }); - return response_received_; + { + bool expected = false; + if (stoping_.compare_exchange_strong(expected, true, std::memory_order_release)) + { + Unbind(); + } + } } /** @@ -130,7 +129,8 @@ class ResponseHandler : public http_client::EventHandler void OnEvent(http_client::SessionState state, opentelemetry::nostd::string_view reason) noexcept override { - // need to modify stop_waiting_ under lock before calling notify_all + // need to modify stoping_ under lock before calling notify_all + bool need_stop = false; switch (state) { case http_client::SessionState::CreateFailed: @@ -140,8 +140,7 @@ class ResponseHandler : public http_client::EventHandler case http_client::SessionState::TimedOut: case http_client::SessionState::NetworkError: case http_client::SessionState::Cancelled: { - std::unique_lock lk(mutex_); - stop_waiting_ = true; + need_stop = true; } break; @@ -152,10 +151,16 @@ class ResponseHandler : public http_client::EventHandler // If any failure event occurs, release the condition variable to unblock main thread switch (state) { - case http_client::SessionState::CreateFailed: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: session create failed"); - cv_.notify_all(); - break; + case http_client::SessionState::CreateFailed: { + std::stringstream error_message; + error_message << "[OTLP HTTP Client] Session state: session create failed."; + if (!reason.empty()) + { + error_message.write(reason.data(), reason.size()); + } + OTEL_INTERNAL_LOG_ERROR(error_message.str()); + } + break; case http_client::SessionState::Created: if (console_debug_) @@ -178,10 +183,16 @@ class ResponseHandler : public http_client::EventHandler } break; - case http_client::SessionState::ConnectFailed: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: connection failed"); - cv_.notify_all(); - break; + case http_client::SessionState::ConnectFailed: { + std::stringstream error_message; + error_message << "[OTLP HTTP Client] Session state: connection failed."; + if (!reason.empty()) + { + error_message.write(reason.data(), reason.size()); + } + OTEL_INTERNAL_LOG_ERROR(error_message.str()); + } + break; case http_client::SessionState::Connected: if (console_debug_) @@ -197,10 +208,16 @@ class ResponseHandler : public http_client::EventHandler } break; - case http_client::SessionState::SendFailed: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: request send failed"); - cv_.notify_all(); - break; + case http_client::SessionState::SendFailed: { + std::stringstream error_message; + error_message << "[OTLP HTTP Client] Session state: request send failed."; + if (!reason.empty()) + { + error_message.write(reason.data(), reason.size()); + } + OTEL_INTERNAL_LOG_ERROR(error_message.str()); + } + break; case http_client::SessionState::Response: if (console_debug_) @@ -209,20 +226,38 @@ class ResponseHandler : public http_client::EventHandler } break; - case http_client::SessionState::SSLHandshakeFailed: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: SSL handshake failed"); - cv_.notify_all(); - break; + case http_client::SessionState::SSLHandshakeFailed: { + std::stringstream error_message; + error_message << "[OTLP HTTP Client] Session state: SSL handshake failed."; + if (!reason.empty()) + { + error_message.write(reason.data(), reason.size()); + } + OTEL_INTERNAL_LOG_ERROR(error_message.str()); + } + break; - case http_client::SessionState::TimedOut: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: request time out"); - cv_.notify_all(); - break; + case http_client::SessionState::TimedOut: { + std::stringstream error_message; + error_message << "[OTLP HTTP Client] Session state: request time out."; + if (!reason.empty()) + { + error_message.write(reason.data(), reason.size()); + } + OTEL_INTERNAL_LOG_ERROR(error_message.str()); + } + break; - case http_client::SessionState::NetworkError: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: network error"); - cv_.notify_all(); - break; + case http_client::SessionState::NetworkError: { + std::stringstream error_message; + error_message << "[OTLP HTTP Client] Session state: network error."; + if (!reason.empty()) + { + error_message.write(reason.data(), reason.size()); + } + OTEL_INTERNAL_LOG_ERROR(error_message.str()); + } + break; case http_client::SessionState::ReadError: if (console_debug_) @@ -238,23 +273,63 @@ class ResponseHandler : public http_client::EventHandler } break; - case http_client::SessionState::Cancelled: - OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: (manually) cancelled\n"); - cv_.notify_all(); - break; + case http_client::SessionState::Cancelled: { + std::stringstream error_message; + error_message << "[OTLP HTTP Client] Session state: (manually) cancelled."; + if (!reason.empty()) + { + error_message.write(reason.data(), reason.size()); + } + OTEL_INTERNAL_LOG_ERROR(error_message.str()); + } + break; default: break; } + + if (need_stop) + { + bool expected = false; + if (stoping_.compare_exchange_strong(expected, true, std::memory_order_release)) + { + Unbind(); + } + } } + void Unbind() + { + // ReleaseSession may destroy this object, so we need to move owner and session into stack + // first. + OtlpHttpClient *owner = owner_; + const opentelemetry::ext::http::client::Session *session = session_; + + owner_ = nullptr; + session_ = nullptr; + + if (nullptr != owner && nullptr != session) + { + // Release the session at last + owner->ReleaseSession(*session); + } + } + + void Bind(OtlpHttpClient *owner, + const opentelemetry::ext::http::client::Session &session) noexcept + { + session_ = &session; + owner_ = owner; + }; + private: // Define a condition variable and mutex - std::condition_variable cv_; std::mutex mutex_; + OtlpHttpClient *owner_ = nullptr; + const opentelemetry::ext::http::client::Session *session_ = nullptr; // Whether notify has been called - bool stop_waiting_ = false; + std::atomic stoping_; // Whether the response has been received bool response_received_ = false; @@ -562,31 +637,37 @@ void ConvertListFieldToJson(nlohmann::json &value, } // namespace OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions &&options) - : options_(options), http_client_(http_client::HttpClientFactory::Create()) + : is_shutdown_(false), options_(options), http_client_(http_client::HttpClientFactory::Create()) {} +OtlpHttpClient::~OtlpHttpClient() +{ + if (!isShutdown()) + { + Shutdown(); + } + + // Wait for all the sessions to finish + std::unique_lock lock(session_waker_lock_); + session_waker_.wait(lock, [this] { + std::lock_guard guard{session_manager_lock_}; + return running_sessions_.empty(); + }); + + // And then remove all session datas + while (cleanupGCSessions()) + ; +} + OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions &&options, std::shared_ptr http_client) - : options_(options), http_client_(http_client) + : is_shutdown_(false), options_(options), http_client_(http_client) {} // ----------------------------- HTTP Client methods ------------------------------ opentelemetry::sdk::common::ExportResult OtlpHttpClient::Export( const google::protobuf::Message &message) noexcept { - // Return failure if this exporter has been shutdown - if (isShutdown()) - { - const char *error_message = "[OTLP HTTP Client] Export failed, exporter is shutdown"; - if (options_.console_debug) - { - std::cerr << error_message << std::endl; - } - OTEL_INTERNAL_LOG_ERROR(error_message); - - return opentelemetry::sdk::common::ExportResult::kFailure; - } - // Parse uri and store it to cache if (http_uri_.empty()) { @@ -654,22 +735,38 @@ opentelemetry::sdk::common::ExportResult OtlpHttpClient::Export( } // Send the request - auto session = http_client_->CreateSession(options_.url); - auto request = session->CreateRequest(); - - for (auto &header : options_.http_headers) { - request->AddHeader(header.first, header.second); - } - request->SetUri(http_uri_); - request->SetTimeoutMs(std::chrono::duration_cast(options_.timeout)); - request->SetMethod(http_client::Method::Post); - request->SetBody(body_vec); - request->ReplaceHeader("Content-Type", content_type); + std::lock_guard guard{session_manager_lock_}; + // Return failure if this exporter has been shutdown + if (isShutdown()) + { + const char *error_message = "[OTLP HTTP Client] Export failed, exporter is shutdown"; + if (options_.console_debug) + { + std::cerr << error_message << std::endl; + } + OTEL_INTERNAL_LOG_ERROR(error_message); - // Send the request - std::unique_ptr handler(new ResponseHandler(options_.console_debug)); - session->SendRequest(*handler); + return opentelemetry::sdk::common::ExportResult::kFailure; + } + + auto session = http_client_->CreateSession(options_.url); + auto request = session->CreateRequest(); + + for (auto &header : options_.http_headers) + { + request->AddHeader(header.first, header.second); + } + request->SetUri(http_uri_); + request->SetTimeoutMs(std::chrono::duration_cast(options_.timeout)); + request->SetMethod(http_client::Method::Post); + request->SetBody(body_vec); + request->ReplaceHeader("Content-Type", content_type); + + // Send the request + addSession(std::move(session), std::unique_ptr{ + new ResponseHandler(options_.console_debug)}); + } // Wait for the response to be received if (options_.console_debug) @@ -680,38 +777,130 @@ opentelemetry::sdk::common::ExportResult OtlpHttpClient::Export( << std::chrono::duration_cast(options_.timeout).count() << " milliseconds)"); } - bool write_successful = handler->waitForResponse(); - // End the session - session->FinishSession(); + // Wait for any session to finish if there are to many sessions + std::unique_lock lock(session_waker_lock_); + bool wait_successful = session_waker_.wait_for(lock, options_.timeout, [this] { + std::lock_guard guard{session_manager_lock_}; + return running_sessions_.size() <= options_.concurrent_sessions; + }); + + cleanupGCSessions(); // If an error occurred with the HTTP request - if (!write_successful) + if (!wait_successful) { - // TODO: retry logic return opentelemetry::sdk::common::ExportResult::kFailure; } return opentelemetry::sdk::common::ExportResult::kSuccess; } -bool OtlpHttpClient::Shutdown(std::chrono::microseconds) noexcept +bool OtlpHttpClient::Shutdown(std::chrono::microseconds timeout) noexcept { { - const std::lock_guard locked(lock_); + std::lock_guard guard{session_manager_lock_}; is_shutdown_ = true; + + // Shutdown the session manager + http_client_->CancelAllSessions(); + http_client_->FinishAllSessions(); } - // Shutdown the session manager - http_client_->CancelAllSessions(); - http_client_->FinishAllSessions(); + // ASAN will report chrono: runtime error: signed integer overflow: A + B cannot be represented + // in type 'long int' here. So we reset timeout to meet signed long int limit here. + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + + // Wait for all the sessions to finish + std::unique_lock lock(session_waker_lock_); + if (timeout <= std::chrono::microseconds::zero()) + { + session_waker_.wait(lock, [this] { + std::lock_guard guard{session_manager_lock_}; + return running_sessions_.empty(); + }); + } + else + { + session_waker_.wait_for(lock, timeout, [this] { + std::lock_guard guard{session_manager_lock_}; + return running_sessions_.empty(); + }); + } + while (cleanupGCSessions()) + ; return true; } +void OtlpHttpClient::ReleaseSession( + const opentelemetry::ext::http::client::Session &session) noexcept +{ + bool has_session = false; + + { + std::lock_guard guard{session_manager_lock_}; + + auto seesion_iter = running_sessions_.find(&session); + if (seesion_iter != running_sessions_.end()) + { + // Move session and handle into gc list, and they will be destroyed later + gc_sessions_.emplace_back(std::move(seesion_iter->second)); + running_sessions_.erase(seesion_iter); + + has_session = true; + } + } + + if (has_session) + { + session_waker_.notify_all(); + } +} + +void OtlpHttpClient::addSession( + std::shared_ptr session, + std::unique_ptr &&event_handle) noexcept +{ + if (!session || !event_handle) + { + return; + } + + opentelemetry::ext::http::client::Session *key = session.get(); + ResponseHandler *handle = static_cast(event_handle.get()); + + handle->Bind(this, *key); + + HttpSessionData &session_data = running_sessions_[key]; + session_data.session.swap(session); + session_data.event_handle.swap(event_handle); + + // Send request after the session is added + key->SendRequest(*handle); +} + +bool OtlpHttpClient::cleanupGCSessions() noexcept +{ + std::lock_guard guard{session_manager_lock_}; + std::list gc_sessions; + gc_sessions_.swap(gc_sessions); + + for (auto &session_data : gc_sessions) + { + // FinishSession must be called with same thread and before the session is destroyed + if (session_data.session) + { + session_data.session->FinishSession(); + } + } + + return !gc_sessions_.empty(); +} + bool OtlpHttpClient::isShutdown() const noexcept { - const std::lock_guard locked(lock_); return is_shutdown_; } diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index 92155dd00d..cfaa06a479 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -29,7 +29,8 @@ OtlpHttpExporter::OtlpHttpExporter(const OtlpHttpExporterOptions &options) options.use_json_name, options.console_debug, options.timeout, - options.http_headers))) + options.http_headers, + options.concurrent_sessions))) {} OtlpHttpExporter::OtlpHttpExporter(std::unique_ptr http_client) diff --git a/exporters/otlp/src/otlp_http_log_exporter.cc b/exporters/otlp/src/otlp_http_log_exporter.cc index 436c77beaa..8a0cc536d4 100644 --- a/exporters/otlp/src/otlp_http_log_exporter.cc +++ b/exporters/otlp/src/otlp_http_log_exporter.cc @@ -31,7 +31,8 @@ OtlpHttpLogExporter::OtlpHttpLogExporter(const OtlpHttpLogExporterOptions &optio options.use_json_name, options.console_debug, options.timeout, - options.http_headers))) + options.http_headers, + options.concurrent_sessions))) {} OtlpHttpLogExporter::OtlpHttpLogExporter(std::unique_ptr http_client) diff --git a/exporters/otlp/test/otlp_http_exporter_test.cc b/exporters/otlp/test/otlp_http_exporter_test.cc index ef0b5a509e..e9dbd711e8 100644 --- a/exporters/otlp/test/otlp_http_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_exporter_test.cc @@ -160,6 +160,8 @@ TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTest) child_span->End(); parent_span->End(); + + static_cast(provider.get())->ForceFlush(); } // Create spans, let processor call Export() @@ -232,13 +234,16 @@ TEST_F(OtlpHttpExporterTestPeer, ExportBinaryIntegrationTest) { EXPECT_EQ("Custom-Header-Value", custom_header->second); } + // let the otlp_http_client to continue http_client::nosend::Response response; - callback.OnResponse(response); + response.Finish(callback); }); child_span->End(); parent_span->End(); + + static_cast(provider.get())->ForceFlush(); } // Test exporter configuration options diff --git a/exporters/otlp/test/otlp_http_log_exporter_test.cc b/exporters/otlp/test/otlp_http_log_exporter_test.cc index ffd1a9a0f3..1373b2ece3 100644 --- a/exporters/otlp/test/otlp_http_log_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_log_exporter_test.cc @@ -118,25 +118,6 @@ TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTest) const std::string schema_url{"https://opentelemetry.io/schemas/1.2.0"}; auto logger = provider->GetLogger("test", "", "opentelelemtry_library", "", schema_url); - logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", - {{"service.name", "unit_test_service"}, - {"tenant.id", "test_user"}, - {"bool_value", true}, - {"int32_value", static_cast(1)}, - {"uint32_value", static_cast(2)}, - {"int64_value", static_cast(0x1100000000LL)}, - {"uint64_value", static_cast(0x1200000000ULL)}, - {"double_value", static_cast(3.1)}, - {"vec_bool_value", attribute_storage_bool_value}, - {"vec_int32_value", attribute_storage_int32_value}, - {"vec_uint32_value", attribute_storage_uint32_value}, - {"vec_int64_value", attribute_storage_int64_value}, - {"vec_uint64_value", attribute_storage_uint64_value}, - {"vec_double_value", attribute_storage_double_value}, - {"vec_string_value", attribute_storage_string_value}}, - trace_id, span_id, - opentelemetry::trace::TraceFlags{opentelemetry::trace::TraceFlags::kIsSampled}, - std::chrono::system_clock::now()); trace_id.ToLowerBase16(MakeSpan(trace_id_hex)); report_trace_id.assign(trace_id_hex, sizeof(trace_id_hex)); @@ -167,10 +148,33 @@ TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTest) { EXPECT_EQ("Custom-Header-Value", custom_header->second); } + // let the otlp_http_client to continue http_client::nosend::Response response; - callback.OnResponse(response); + response.Finish(callback); }); + + logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", + {{"service.name", "unit_test_service"}, + {"tenant.id", "test_user"}, + {"bool_value", true}, + {"int32_value", static_cast(1)}, + {"uint32_value", static_cast(2)}, + {"int64_value", static_cast(0x1100000000LL)}, + {"uint64_value", static_cast(0x1200000000ULL)}, + {"double_value", static_cast(3.1)}, + {"vec_bool_value", attribute_storage_bool_value}, + {"vec_int32_value", attribute_storage_int32_value}, + {"vec_uint32_value", attribute_storage_uint32_value}, + {"vec_int64_value", attribute_storage_int64_value}, + {"vec_uint64_value", attribute_storage_uint64_value}, + {"vec_double_value", attribute_storage_double_value}, + {"vec_string_value", attribute_storage_string_value}}, + trace_id, span_id, + opentelemetry::trace::TraceFlags{opentelemetry::trace::TraceFlags::kIsSampled}, + std::chrono::system_clock::now()); + + provider->ForceFlush(); } // Create log records, let processor call Export() @@ -205,25 +209,6 @@ TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTest) const std::string schema_url{"https://opentelemetry.io/schemas/1.2.0"}; auto logger = provider->GetLogger("test", "", "opentelelemtry_library", "", schema_url); - logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", - {{"service.name", "unit_test_service"}, - {"tenant.id", "test_user"}, - {"bool_value", true}, - {"int32_value", static_cast(1)}, - {"uint32_value", static_cast(2)}, - {"int64_value", static_cast(0x1100000000LL)}, - {"uint64_value", static_cast(0x1200000000ULL)}, - {"double_value", static_cast(3.1)}, - {"vec_bool_value", attribute_storage_bool_value}, - {"vec_int32_value", attribute_storage_int32_value}, - {"vec_uint32_value", attribute_storage_uint32_value}, - {"vec_int64_value", attribute_storage_int64_value}, - {"vec_uint64_value", attribute_storage_uint64_value}, - {"vec_double_value", attribute_storage_double_value}, - {"vec_string_value", attribute_storage_string_value}}, - trace_id, span_id, - opentelemetry::trace::TraceFlags{opentelemetry::trace::TraceFlags::kIsSampled}, - std::chrono::system_clock::now()); report_trace_id.assign(reinterpret_cast(trace_id_bin), sizeof(trace_id_bin)); report_span_id.assign(reinterpret_cast(span_id_bin), sizeof(span_id_bin)); @@ -256,6 +241,28 @@ TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTest) http_client::nosend::Response response; callback.OnResponse(response); }); + + logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", + {{"service.name", "unit_test_service"}, + {"tenant.id", "test_user"}, + {"bool_value", true}, + {"int32_value", static_cast(1)}, + {"uint32_value", static_cast(2)}, + {"int64_value", static_cast(0x1100000000LL)}, + {"uint64_value", static_cast(0x1200000000ULL)}, + {"double_value", static_cast(3.1)}, + {"vec_bool_value", attribute_storage_bool_value}, + {"vec_int32_value", attribute_storage_int32_value}, + {"vec_uint32_value", attribute_storage_uint32_value}, + {"vec_int64_value", attribute_storage_int64_value}, + {"vec_uint64_value", attribute_storage_uint64_value}, + {"vec_double_value", attribute_storage_double_value}, + {"vec_string_value", attribute_storage_string_value}}, + trace_id, span_id, + opentelemetry::trace::TraceFlags{opentelemetry::trace::TraceFlags::kIsSampled}, + std::chrono::system_clock::now()); + + provider->ForceFlush(); } // Test exporter configuration options diff --git a/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h b/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h index 9f2f05f3f0..1448bbbea3 100644 --- a/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h +++ b/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h @@ -280,18 +280,30 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient bool CancelAllSessions() noexcept override { - for (auto &session : sessions_) + // CancelSession may change sessions_, we can not change a container while iterating it. + while (!sessions_.empty()) { - session.second->CancelSession(); + std::map> sessions; + sessions.swap(sessions_); + for (auto &session : sessions) + { + session.second->CancelSession(); + } } return true; } bool FinishAllSessions() noexcept override { - for (auto &session : sessions_) + // FinishSession may change sessions_, we can not change a container while iterating it. + while (!sessions_.empty()) { - session.second->FinishSession(); + std::map> sessions; + sessions.swap(sessions_); + for (auto &session : sessions) + { + session.second->FinishSession(); + } } return true; } diff --git a/ext/include/opentelemetry/ext/http/client/nosend/http_client_nosend.h b/ext/include/opentelemetry/ext/http/client/nosend/http_client_nosend.h index 02433d75ce..c89d0cb5f2 100644 --- a/ext/include/opentelemetry/ext/http/client/nosend/http_client_nosend.h +++ b/ext/include/opentelemetry/ext/http/client/nosend/http_client_nosend.h @@ -93,6 +93,19 @@ class Response : public opentelemetry::ext::http::client::Response return status_code_; } + void Finish(opentelemetry::ext::http::client::EventHandler &callback) noexcept + { + callback.OnEvent(opentelemetry::ext::http::client::SessionState::Created, ""); + callback.OnEvent(opentelemetry::ext::http::client::SessionState::Connecting, ""); + callback.OnEvent(opentelemetry::ext::http::client::SessionState::Connected, ""); + callback.OnEvent(opentelemetry::ext::http::client::SessionState::Sending, ""); + + // let the otlp_http_client to continue + callback.OnResponse(*this); + + callback.OnEvent(opentelemetry::ext::http::client::SessionState::Response, ""); + } + public: Headers headers_; opentelemetry::ext::http::client::Body body_; diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h index 1b6d443c8a..6ec66a6f16 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h @@ -114,9 +114,9 @@ class BatchLogProcessor : public LogProcessor common::CircularBuffer buffer_; /* Important boolean flags to handle the workflow of the processor */ - std::atomic is_shutdown_{false}; - std::atomic is_force_flush_{false}; - std::atomic is_force_flush_notified_{false}; + std::atomic is_shutdown_; + std::atomic is_force_flush_; + std::atomic is_force_flush_notified_; /* The background worker thread */ std::thread worker_thread_; diff --git a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h index d25ff2d950..d9486ac58f 100644 --- a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h +++ b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h @@ -145,9 +145,9 @@ class BatchSpanProcessor : public SpanProcessor common::CircularBuffer buffer_; /* Important boolean flags to handle the workflow of the processor */ - std::atomic is_shutdown_{false}; - std::atomic is_force_flush_{false}; - std::atomic is_force_flush_notified_{false}; + std::atomic is_shutdown_; + std::atomic is_force_flush_; + std::atomic is_force_flush_notified_; /* The background worker thread */ std::thread worker_thread_; diff --git a/sdk/src/logs/batch_log_processor.cc b/sdk/src/logs/batch_log_processor.cc index 9b20705b0a..59e5d29551 100644 --- a/sdk/src/logs/batch_log_processor.cc +++ b/sdk/src/logs/batch_log_processor.cc @@ -23,7 +23,11 @@ BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, max_export_batch_size_(max_export_batch_size), buffer_(max_queue_size_), worker_thread_(&BatchLogProcessor::DoBackgroundWork, this) -{} +{ + is_shutdown_.store(false); + is_force_flush_.store(false); + is_force_flush_notified_.store(false); +} std::unique_ptr BatchLogProcessor::MakeRecordable() noexcept { @@ -44,7 +48,8 @@ void BatchLogProcessor::OnReceive(std::unique_ptr &&record) noexcept // If the queue gets at least half full a preemptive notification is // sent to the worker thread to start a new export cycle. - if (buffer_.size() >= max_queue_size_ / 2) + size_t buffer_size = buffer_.size(); + if (buffer_size >= max_queue_size_ / 2 || buffer_size >= max_export_batch_size_) { // signal the worker thread cv_.notify_one(); @@ -58,25 +63,37 @@ bool BatchLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept return false; } - is_force_flush_ = true; + // Now wait for the worker thread to signal back from the Export method + std::unique_lock lk(force_flush_cv_m_); - // Keep attempting to wake up the worker thread - while (is_force_flush_.load() == true) + is_force_flush_notified_.store(false, std::memory_order_release); + auto break_condition = [this]() { + if (is_shutdown_.load() == true) + { + return true; + } + + // Wake up the worker thread once. + if (is_force_flush_.exchange(true) == false) + { + cv_.notify_one(); + } + + return is_force_flush_notified_.load(std::memory_order_acquire); + }; + + // Fix timeout to meet requirement of wait_for + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + if (timeout <= std::chrono::microseconds::zero()) { - cv_.notify_one(); + force_flush_cv_.wait(lk, break_condition); + return true; } - - // Now wait for the worker thread to signal back from the Export method - std::unique_lock lk(force_flush_cv_m_); - while (is_force_flush_notified_.load() == false) + else { - force_flush_cv_.wait(lk); + return force_flush_cv_.wait_for(lk, timeout, break_condition); } - - // Notify the worker thread - is_force_flush_notified_ = false; - - return true; } void BatchLogProcessor::DoBackgroundWork() @@ -91,20 +108,16 @@ void BatchLogProcessor::DoBackgroundWork() if (is_shutdown_.load() == true) { + // Break loop if another thread call ForceFlush + is_force_flush_ = false; DrainQueue(); return; } - bool was_force_flush_called = is_force_flush_.load(); + bool was_force_flush_called = is_force_flush_.exchange(false); // Check if this export was the result of a force flush. - if (was_force_flush_called == true) - { - // Since this export was the result of a force flush, signal the - // main thread that the worker thread has been notified - is_force_flush_ = false; - } - else + if (!was_force_flush_called) { // If the buffer was empty during the entire `timeout` time interval, // go back to waiting. If this was a spurious wake-up, we export only if @@ -128,40 +141,46 @@ void BatchLogProcessor::DoBackgroundWork() void BatchLogProcessor::Export(const bool was_force_flush_called) { - std::vector> records_arr; + do + { + std::vector> records_arr; + size_t num_records_to_export; - size_t num_records_to_export; + if (was_force_flush_called == true) + { + num_records_to_export = buffer_.size(); + } + else + { + num_records_to_export = + buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size(); + } - if (was_force_flush_called == true) - { - num_records_to_export = buffer_.size(); - } - else - { - num_records_to_export = - buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size(); - } + if (num_records_to_export == 0) + { + break; + } - buffer_.Consume(num_records_to_export, - [&](CircularBufferRange> range) noexcept { - range.ForEach([&](AtomicUniquePtr &ptr) { - std::unique_ptr swap_ptr = std::unique_ptr(nullptr); - ptr.Swap(swap_ptr); - records_arr.push_back(std::unique_ptr(swap_ptr.release())); - return true; + buffer_.Consume(num_records_to_export, + [&](CircularBufferRange> range) noexcept { + range.ForEach([&](AtomicUniquePtr &ptr) { + std::unique_ptr swap_ptr = std::unique_ptr(nullptr); + ptr.Swap(swap_ptr); + records_arr.push_back(std::unique_ptr(swap_ptr.release())); + return true; + }); }); - }); - exporter_->Export( - nostd::span>(records_arr.data(), records_arr.size())); + exporter_->Export( + nostd::span>(records_arr.data(), records_arr.size())); + } while (was_force_flush_called); // Notify the main thread in case this export was the result of a force flush. if (was_force_flush_called == true) { - is_force_flush_notified_ = true; - while (is_force_flush_notified_.load() == true) + if (is_force_flush_notified_.exchange(true, std::memory_order_acq_rel) == false) { - force_flush_cv_.notify_one(); + force_flush_cv_.notify_all(); } } } @@ -176,6 +195,8 @@ void BatchLogProcessor::DrainQueue() bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept { + auto start_time = std::chrono::system_clock::now(); + std::lock_guard shutdown_guard{shutdown_m_}; bool already_shutdown = is_shutdown_.exchange(true); @@ -185,10 +206,25 @@ bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept worker_thread_.join(); } + auto worker_end_time = std::chrono::system_clock::now(); + auto offset = std::chrono::duration_cast(worker_end_time - start_time); + + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + if (timeout > offset && timeout > std::chrono::microseconds::zero()) + { + timeout -= offset; + } + else + { + // Some module use zero as indefinite timeout.So we can not reset timeout to zero here + timeout = std::chrono::microseconds(1); + } + // Should only shutdown exporter ONCE. if (!already_shutdown && exporter_ != nullptr) { - return exporter_->Shutdown(); + return exporter_->Shutdown(timeout); } return true; diff --git a/sdk/src/trace/batch_span_processor.cc b/sdk/src/trace/batch_span_processor.cc index 0ab042b9ab..387ad43ce3 100644 --- a/sdk/src/trace/batch_span_processor.cc +++ b/sdk/src/trace/batch_span_processor.cc @@ -22,7 +22,11 @@ BatchSpanProcessor::BatchSpanProcessor(std::unique_ptr &&exporter, max_export_batch_size_(options.max_export_batch_size), buffer_(max_queue_size_), worker_thread_(&BatchSpanProcessor::DoBackgroundWork, this) -{} +{ + is_shutdown_.store(false); + is_force_flush_.store(false); + is_force_flush_notified_.store(false); +} std::unique_ptr BatchSpanProcessor::MakeRecordable() noexcept { @@ -48,7 +52,8 @@ void BatchSpanProcessor::OnEnd(std::unique_ptr &&span) noexcept // If the queue gets at least half full a preemptive notification is // sent to the worker thread to start a new export cycle. - if (buffer_.size() >= max_queue_size_ / 2) + size_t buffer_size = buffer_.size(); + if (buffer_size >= max_queue_size_ / 2 || buffer_size >= max_export_batch_size_) { // signal the worker thread cv_.notify_one(); @@ -62,25 +67,37 @@ bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept return false; } - is_force_flush_ = true; + // Now wait for the worker thread to signal back from the Export method + std::unique_lock lk(force_flush_cv_m_); - // Keep attempting to wake up the worker thread - while (is_force_flush_.load() == true) + is_force_flush_notified_.store(false, std::memory_order_release); + auto break_condition = [this]() { + if (is_shutdown_.load() == true) + { + return true; + } + + // Wake up the worker thread once. + if (is_force_flush_.exchange(true) == false) + { + cv_.notify_one(); + } + + return is_force_flush_notified_.load(std::memory_order_acquire); + }; + + // Fix timeout to meet requirement of wait_for + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + if (timeout <= std::chrono::microseconds::zero()) { - cv_.notify_one(); + force_flush_cv_.wait(lk, break_condition); + return true; } - - // Now wait for the worker thread to signal back from the Export method - std::unique_lock lk(force_flush_cv_m_); - while (is_force_flush_notified_.load() == false) + else { - force_flush_cv_.wait(lk); + return force_flush_cv_.wait_for(lk, timeout, break_condition); } - - // Notify the worker thread - is_force_flush_notified_ = false; - - return true; } void BatchSpanProcessor::DoBackgroundWork() @@ -95,20 +112,16 @@ void BatchSpanProcessor::DoBackgroundWork() if (is_shutdown_.load() == true) { + // Break loop if another thread call ForceFlush + is_force_flush_ = false; DrainQueue(); return; } - bool was_force_flush_called = is_force_flush_.load(); + bool was_force_flush_called = is_force_flush_.exchange(false); // Check if this export was the result of a force flush. - if (was_force_flush_called == true) - { - // Since this export was the result of a force flush, signal the - // main thread that the worker thread has been notified - is_force_flush_ = false; - } - else + if (!was_force_flush_called) { // If the buffer was empty during the entire `timeout` time interval, // go back to waiting. If this was a spurious wake-up, we export only if @@ -133,39 +146,44 @@ void BatchSpanProcessor::DoBackgroundWork() void BatchSpanProcessor::Export(const bool was_force_flush_called) { - std::vector> spans_arr; - - size_t num_spans_to_export; - - if (was_force_flush_called == true) - { - num_spans_to_export = buffer_.size(); - } - else + do { - num_spans_to_export = - buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size(); - } + std::vector> spans_arr; + size_t num_spans_to_export; + + if (was_force_flush_called == true) + { + num_spans_to_export = buffer_.size(); + } + else + { + num_spans_to_export = + buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size(); + } - buffer_.Consume(num_spans_to_export, - [&](CircularBufferRange> range) noexcept { - range.ForEach([&](AtomicUniquePtr &ptr) { - std::unique_ptr swap_ptr = std::unique_ptr(nullptr); - ptr.Swap(swap_ptr); - spans_arr.push_back(std::unique_ptr(swap_ptr.release())); - return true; + if (num_spans_to_export == 0) + { + break; + } + buffer_.Consume(num_spans_to_export, + [&](CircularBufferRange> range) noexcept { + range.ForEach([&](AtomicUniquePtr &ptr) { + std::unique_ptr swap_ptr = std::unique_ptr(nullptr); + ptr.Swap(swap_ptr); + spans_arr.push_back(std::unique_ptr(swap_ptr.release())); + return true; + }); }); - }); - exporter_->Export(nostd::span>(spans_arr.data(), spans_arr.size())); + exporter_->Export(nostd::span>(spans_arr.data(), spans_arr.size())); + } while (was_force_flush_called); // Notify the main thread in case this export was the result of a force flush. if (was_force_flush_called == true) { - is_force_flush_notified_ = true; - while (is_force_flush_notified_.load() == true) + if (is_force_flush_notified_.exchange(true, std::memory_order_acq_rel) == false) { - force_flush_cv_.notify_one(); + force_flush_cv_.notify_all(); } } } @@ -180,6 +198,7 @@ void BatchSpanProcessor::DrainQueue() bool BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept { + auto start_time = std::chrono::system_clock::now(); std::lock_guard shutdown_guard{shutdown_m_}; bool already_shutdown = is_shutdown_.exchange(true); @@ -189,10 +208,26 @@ bool BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept worker_thread_.join(); } + auto worker_end_time = std::chrono::system_clock::now(); + auto offset = std::chrono::duration_cast(worker_end_time - start_time); + + // Fix timeout to meet requirement of wait_for + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + if (timeout > offset && timeout > std::chrono::microseconds::zero()) + { + timeout -= offset; + } + else + { + // Some module use zero as indefinite timeout.So we can not reset timeout to zero here + timeout = std::chrono::microseconds(1); + } + // Should only shutdown exporter ONCE. if (!already_shutdown && exporter_ != nullptr) { - return exporter_->Shutdown(); + return exporter_->Shutdown(timeout); } return true;