Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clusterinfo consistency #4600

Merged
merged 9 commits into from
Oct 8, 2018
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "envoy/router/router.h"
#include "envoy/ssl/connection.h"
#include "envoy/tracing/http_tracer.h"
#include "envoy/upstream/upstream.h"

namespace Envoy {
namespace Http {
Expand Down Expand Up @@ -111,6 +112,13 @@ class StreamFilterCallbacks {
*/
virtual Router::RouteConstSharedPtr route() PURE;

/**
* Returns the clusterInfo for the cached route.
* This method is to avoid multiple look ups in the filter chain, it also provides a consistent
* view of clusterInfo after a route is picked/repicked.
* NOTE: Cached clusterInfo and route will be updated the same time.
*/
virtual Upstream::ClusterInfoConstSharedPtr clusterInfo() PURE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newline after this line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. Thought fix_format would catch this.

/**
* Clears the route cache for the current request. This must be called when a filter has modified
* the headers in a way that would affect routing.
Expand Down
6 changes: 3 additions & 3 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ const AsyncStreamImpl::NullPathMatchCriterion
AsyncStreamImpl::RouteEntryImpl::path_match_criterion_;
const std::list<LowerCaseString> AsyncStreamImpl::NullConfig::internal_only_headers_;

AsyncClientImpl::AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::Store& stats_store,
Event::Dispatcher& dispatcher,
AsyncClientImpl::AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster,
Stats::Store& stats_store, Event::Dispatcher& dispatcher,
const LocalInfo::LocalInfo& local_info,
Upstream::ClusterManager& cm, Runtime::Loader& runtime,
Runtime::RandomGenerator& random,
Expand Down Expand Up @@ -80,7 +80,7 @@ AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCal
: parent_(parent), stream_callbacks_(callbacks), stream_id_(parent.config_.random_.random()),
router_(parent.config_), stream_info_(Protocol::Http11, parent.dispatcher().timeSystem()),
tracing_config_(Tracing::EgressConfig::get()),
route_(std::make_shared<RouteImpl>(parent_.cluster_.name(), timeout)) {
route_(std::make_shared<RouteImpl>(parent_.cluster_->name(), timeout)) {
if (buffer_body_for_retry) {
buffered_body_.reset(new Buffer::OwnedImpl());
}
Expand Down
6 changes: 4 additions & 2 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "envoy/router/shadow_writer.h"
#include "envoy/ssl/connection.h"
#include "envoy/tracing/http_tracer.h"
#include "envoy/upstream/upstream.h"

#include "common/common/empty_string.h"
#include "common/common/linked_object.h"
Expand All @@ -35,7 +36,7 @@ class AsyncRequestImpl;

class AsyncClientImpl final : public AsyncClient {
public:
AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::Store& stats_store,
AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster, Stats::Store& stats_store,
Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info,
Upstream::ClusterManager& cm, Runtime::Loader& runtime,
Runtime::RandomGenerator& random, Router::ShadowWriterPtr&& shadow_writer);
Expand All @@ -52,7 +53,7 @@ class AsyncClientImpl final : public AsyncClient {
Event::Dispatcher& dispatcher() override { return dispatcher_; }

private:
const Upstream::ClusterInfo& cluster_;
Upstream::ClusterInfoConstSharedPtr cluster_;
Router::FilterConfig config_;
Event::Dispatcher& dispatcher_;
std::list<std::unique_ptr<AsyncStreamImpl>> active_streams_;
Expand Down Expand Up @@ -267,6 +268,7 @@ class AsyncStreamImpl : public AsyncClient::Stream,
Event::Dispatcher& dispatcher() override { return parent_.dispatcher_; }
void resetStream() override;
Router::RouteConstSharedPtr route() override { return route_; }
Upstream::ClusterInfoConstSharedPtr clusterInfo() override { return parent_.cluster_; }
void clearRouteCache() override {}
uint64_t streamId() override { return stream_id_; }
StreamInfo::StreamInfo& streamInfo() override { return stream_info_; }
Expand Down
19 changes: 18 additions & 1 deletion source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,13 @@ void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() {
Router::RouteConstSharedPtr route = snapped_route_config_->route(*request_headers_, stream_id_);
stream_info_.route_entry_ = route ? route->routeEntry() : nullptr;
cached_route_ = std::move(route);
if (nullptr == stream_info_.route_entry_) {
cached_cluster_info_ = nullptr;
} else {
Upstream::ThreadLocalCluster* local_cluster =
connection_manager_.cluster_manager_.get(stream_info_.route_entry_->clusterName());
cached_cluster_info_ = (nullptr == local_cluster) ? nullptr : local_cluster->info();
}
}

void ConnectionManagerImpl::ActiveStream::sendLocalReply(
Expand Down Expand Up @@ -1477,8 +1484,17 @@ Tracing::Span& ConnectionManagerImpl::ActiveStreamFilterBase::activeSpan() {

Tracing::Config& ConnectionManagerImpl::ActiveStreamFilterBase::tracingConfig() { return parent_; }

Upstream::ClusterInfoConstSharedPtr ConnectionManagerImpl::ActiveStreamFilterBase::clusterInfo() {
// NOTE: Refreshing route caches clusterInfo as well.
if (!parent_.cached_route_.has_value()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is necessary as we don't do this for the route call. The HCM will initialize the cache before calling any filters?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we did the same for route(), see route() right below?
I think the cached_cluster_info has the same position as the cached_route. Say after an entity clearRouteCache and a fitler wants to access clusterInfo without knowing that, that could be a problem?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, sorry, makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

parent_.refreshCachedRoute();
}

return parent_.cached_cluster_info_.value();
}

Router::RouteConstSharedPtr ConnectionManagerImpl::ActiveStreamFilterBase::route() {
if (!parent_.cached_route_) {
if (!parent_.cached_route_.has_value()) {
parent_.refreshCachedRoute();
}

Expand All @@ -1487,6 +1503,7 @@ Router::RouteConstSharedPtr ConnectionManagerImpl::ActiveStreamFilterBase::route

void ConnectionManagerImpl::ActiveStreamFilterBase::clearRouteCache() {
parent_.cached_route_ = absl::optional<Router::RouteConstSharedPtr>();
parent_.cached_cluster_info_ = absl::optional<Upstream::ClusterInfoConstSharedPtr>();
}

Buffer::WatermarkBufferPtr ConnectionManagerImpl::ActiveStreamDecoderFilter::createBuffer() {
Expand Down
2 changes: 2 additions & 0 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
Event::Dispatcher& dispatcher() override;
void resetStream() override;
Router::RouteConstSharedPtr route() override;
Upstream::ClusterInfoConstSharedPtr clusterInfo() override;
void clearRouteCache() override;
uint64_t streamId() override;
StreamInfo::StreamInfo& streamInfo() override;
Expand Down Expand Up @@ -403,6 +404,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
State state_;
StreamInfo::StreamInfoImpl stream_info_;
absl::optional<Router::RouteConstSharedPtr> cached_route_;
absl::optional<Upstream::ClusterInfoConstSharedPtr> cached_cluster_info_;
DownstreamWatermarkCallbacks* watermark_callbacks_{nullptr};
uint32_t buffer_limit_{0};
uint32_t high_watermark_count_{0};
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry(
ThreadLocalClusterManagerImpl& parent, ClusterInfoConstSharedPtr cluster,
const LoadBalancerFactorySharedPtr& lb_factory)
: parent_(parent), lb_factory_(lb_factory), cluster_info_(cluster),
http_async_client_(*cluster, parent.parent_.stats_, parent.thread_local_dispatcher_,
http_async_client_(cluster, parent.parent_.stats_, parent.thread_local_dispatcher_,
parent.parent_.local_info_, parent.parent_, parent.parent_.runtime_,
parent.parent_.random_,
Router::ShadowWriterPtr{new Router::ShadowWriterImpl(parent.parent_)}) {
Expand Down
2 changes: 1 addition & 1 deletion test/common/grpc/grpc_client_integration_test_harness.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ class GrpcClientIntegrationTest : public GrpcClientIntegrationParamTest {
EXPECT_CALL(cm_, httpConnPoolForCluster(_, _, _, _))
.WillRepeatedly(Return(http_conn_pool_.get()));
http_async_client_ = std::make_unique<Http::AsyncClientImpl>(
*cluster_info_ptr_, *stats_store_, dispatcher_, local_info_, cm_, runtime_, random_,
cluster_info_ptr_, *stats_store_, dispatcher_, local_info_, cm_, runtime_, random_,
std::move(shadow_writer_ptr_));
EXPECT_CALL(cm_, httpAsyncClientForCluster(fake_cluster_name_))
.WillRepeatedly(ReturnRef(*http_async_client_));
Expand Down
5 changes: 4 additions & 1 deletion test/common/http/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace {
class AsyncClientImplTest : public testing::Test {
public:
AsyncClientImplTest()
: client_(*cm_.thread_local_cluster_.cluster_.info_, stats_store_, dispatcher_, local_info_,
: client_(cm_.thread_local_cluster_.cluster_.info_, stats_store_, dispatcher_, local_info_,
cm_, runtime_, random_,
Router::ShadowWriterPtr{new NiceMock<Router::MockShadowWriter>()}) {
message_->headers().insertMethod().value(std::string("GET"));
Expand Down Expand Up @@ -909,6 +909,9 @@ TEST_F(AsyncClientImplTest, RdsGettersTest) {
EXPECT_EQ("", route_config.name());
EXPECT_EQ(0, route_config.internalOnlyHeaders().size());
EXPECT_EQ(nullptr, route_config.route(headers, 0));
auto cluster_info = filter_callbacks->clusterInfo();
ASSERT_NE(nullptr, cluster_info);
EXPECT_EQ(cm_.thread_local_cluster_.cluster_.info_, cluster_info);
EXPECT_CALL(stream_callbacks_, onReset());
}

Expand Down
21 changes: 18 additions & 3 deletions test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "test/mocks/server/mocks.h"
#include "test/mocks/ssl/mocks.h"
#include "test/mocks/tracing/mocks.h"
#include "test/mocks/upstream/cluster_info.h"
#include "test/mocks/upstream/mocks.h"
#include "test/test_common/printers.h"
#include "test/test_common/test_time.h"
Expand Down Expand Up @@ -1507,7 +1508,7 @@ TEST_F(HttpConnectionManagerImplTest, AllowNonWebSocketOnWebSocketRoute) {
TEST_F(HttpConnectionManagerImplTest, WebSocketNoThreadLocalCluster) {
setup(false, "");

EXPECT_CALL(cluster_manager_, get(_)).WillOnce(Return(nullptr));
EXPECT_CALL(cluster_manager_, get(_)).Times(2).WillRepeatedly(Return(nullptr));
expectOnUpstreamInitFailure();
EXPECT_EQ(1U, stats_.named_.downstream_cx_websocket_active_.value());
EXPECT_EQ(1U, stats_.named_.downstream_cx_websocket_total_.value());
Expand Down Expand Up @@ -2639,9 +2640,19 @@ TEST_F(HttpConnectionManagerImplTest, FilterClearRouteCache) {
}));

setupFilterChain(3, 2);
const std::string fake_cluster1_name = "fake_cluster1";
const std::string fake_cluster2_name = "fake_cluster2";

Router::RouteConstSharedPtr route1 = std::make_shared<NiceMock<Router::MockRoute>>();
Router::RouteConstSharedPtr route2 = std::make_shared<NiceMock<Router::MockRoute>>();
std::shared_ptr<Upstream::MockThreadLocalCluster> fake_cluster1 =
std::make_shared<NiceMock<Upstream::MockThreadLocalCluster>>();
EXPECT_CALL(cluster_manager_, get(_))
.WillOnce(Return(fake_cluster1.get()))
.WillOnce(Return(nullptr));

std::shared_ptr<Router::MockRoute> route1 = std::make_shared<NiceMock<Router::MockRoute>>();
EXPECT_CALL(route1->route_entry_, clusterName()).WillRepeatedly(ReturnRef(fake_cluster1_name));
std::shared_ptr<Router::MockRoute> route2 = std::make_shared<NiceMock<Router::MockRoute>>();
EXPECT_CALL(route2->route_entry_, clusterName()).WillRepeatedly(ReturnRef(fake_cluster2_name));

EXPECT_CALL(*route_config_provider_.route_config_, route(_, _))
.WillOnce(Return(route1))
Expand All @@ -2652,18 +2663,22 @@ TEST_F(HttpConnectionManagerImplTest, FilterClearRouteCache) {
.WillOnce(InvokeWithoutArgs([&]() -> FilterHeadersStatus {
EXPECT_EQ(route1, decoder_filters_[0]->callbacks_->route());
EXPECT_EQ(route1->routeEntry(), decoder_filters_[0]->callbacks_->streamInfo().routeEntry());
EXPECT_EQ(fake_cluster1->info(), decoder_filters_[0]->callbacks_->clusterInfo());
decoder_filters_[0]->callbacks_->clearRouteCache();
return FilterHeadersStatus::Continue;
}));
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, true))
.WillOnce(InvokeWithoutArgs([&]() -> FilterHeadersStatus {
EXPECT_EQ(route2, decoder_filters_[1]->callbacks_->route());
EXPECT_EQ(route2->routeEntry(), decoder_filters_[1]->callbacks_->streamInfo().routeEntry());
// RDS & CDS consistency problem: route2 points to fake_cluster2, which doesn't exist.
EXPECT_EQ(nullptr, decoder_filters_[1]->callbacks_->clusterInfo());
decoder_filters_[1]->callbacks_->clearRouteCache();
return FilterHeadersStatus::Continue;
}));
EXPECT_CALL(*decoder_filters_[2], decodeHeaders(_, true))
.WillOnce(InvokeWithoutArgs([&]() -> FilterHeadersStatus {
EXPECT_EQ(nullptr, decoder_filters_[2]->callbacks_->clusterInfo());
EXPECT_EQ(nullptr, decoder_filters_[2]->callbacks_->route());
EXPECT_EQ(nullptr, decoder_filters_[2]->callbacks_->streamInfo().routeEntry());
return FilterHeadersStatus::StopIteration;
Expand Down
5 changes: 3 additions & 2 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -757,8 +757,9 @@ TEST_F(ClusterManagerImplTest, ShutdownOrder) {
EXPECT_EQ(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()[0],
cluster_manager_->get("cluster_1")->loadBalancer().chooseHost(nullptr));

// Local reference, primary reference, thread local reference, host reference.
EXPECT_EQ(4U, cluster.info().use_count());
// Local reference, primary reference, thread local reference, host reference, async client
// reference.
EXPECT_EQ(5U, cluster.info().use_count());

// Thread local reference should be gone.
factory_.tls_.shutdownThread();
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/http/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ MockFilterChainFactory::MockFilterChainFactory() {}
MockFilterChainFactory::~MockFilterChainFactory() {}

template <class T> static void initializeMockStreamFilterCallbacks(T& callbacks) {
callbacks.cluster_info_.reset(new NiceMock<Upstream::MockClusterInfo>());
callbacks.route_.reset(new NiceMock<Router::MockRoute>());
ON_CALL(callbacks, dispatcher()).WillByDefault(ReturnRef(callbacks.dispatcher_));
ON_CALL(callbacks, streamInfo()).WillByDefault(ReturnRef(callbacks.stream_info_));
ON_CALL(callbacks, clusterInfo()).WillByDefault(Return(callbacks.cluster_info_));
ON_CALL(callbacks, route()).WillByDefault(Return(callbacks.route_));
}

Expand Down
4 changes: 4 additions & 0 deletions test/mocks/http/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "test/mocks/router/mocks.h"
#include "test/mocks/stream_info/mocks.h"
#include "test/mocks/tracing/mocks.h"
#include "test/mocks/upstream/cluster_info.h"
#include "test/mocks/upstream/host.h"
#include "test/test_common/printers.h"

Expand Down Expand Up @@ -177,6 +178,7 @@ class MockStreamFilterCallbacksBase {
Event::MockDispatcher dispatcher_;
testing::NiceMock<StreamInfo::MockStreamInfo> stream_info_;
std::shared_ptr<Router::MockRoute> route_;
std::shared_ptr<Upstream::MockClusterInfo> cluster_info_;
};

class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks,
Expand All @@ -189,6 +191,7 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks,
MOCK_METHOD0(connection, const Network::Connection*());
MOCK_METHOD0(dispatcher, Event::Dispatcher&());
MOCK_METHOD0(resetStream, void());
MOCK_METHOD0(clusterInfo, Upstream::ClusterInfoConstSharedPtr());
MOCK_METHOD0(route, Router::RouteConstSharedPtr());
MOCK_METHOD0(clearRouteCache, void());
MOCK_METHOD0(streamId, uint64_t());
Expand Down Expand Up @@ -252,6 +255,7 @@ class MockStreamEncoderFilterCallbacks : public StreamEncoderFilterCallbacks,
MOCK_METHOD0(connection, const Network::Connection*());
MOCK_METHOD0(dispatcher, Event::Dispatcher&());
MOCK_METHOD0(resetStream, void());
MOCK_METHOD0(clusterInfo, Upstream::ClusterInfoConstSharedPtr());
MOCK_METHOD0(route, Router::RouteConstSharedPtr());
MOCK_METHOD0(clearRouteCache, void());
MOCK_METHOD0(streamId, uint64_t());
Expand Down