diff --git a/source/extensions/filters/http/ext_proc/processor_state.cc b/source/extensions/filters/http/ext_proc/processor_state.cc index e4cd7e0b9175..4e1ea746ac59 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.cc +++ b/source/extensions/filters/http/ext_proc/processor_state.cc @@ -93,10 +93,7 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon } } - if (common_response.clear_route_cache()) { - clearRouteCache(common_response); - } - + clearRouteCache(common_response); onFinishProcessorCall(Grpc::Status::Ok); if (common_response.status() == CommonResponse::CONTINUE_AND_REPLACE) { @@ -324,10 +321,7 @@ absl::Status ProcessorState::handleBodyResponse(const BodyResponse& response) { onFinishProcessorCall(Grpc::Status::FailedPrecondition); } - if (common_response.clear_route_cache()) { - clearRouteCache(common_response); - } - + clearRouteCache(common_response); headers_ = nullptr; if (send_trailers_ && trailers_available_) { @@ -365,20 +359,6 @@ absl::Status ProcessorState::handleTrailersResponse(const TrailersResponse& resp return absl::FailedPreconditionError("spurious message"); } -void ProcessorState::clearRouteCache(const CommonResponse& common_response) { - // Only clear the route cache if there is a mutation to the header and clearing is allowed. - if (filter_.config().disableClearRouteCache()) { - filter_.stats().clear_route_cache_disabled_.inc(); - ENVOY_LOG(debug, "NOT clearing route cache, it is disabled in the config"); - } else if (common_response.has_header_mutation()) { - ENVOY_LOG(debug, "clearing route cache"); - filter_callbacks_->downstreamCallbacks()->clearRouteCache(); - } else { - filter_.stats().clear_route_cache_ignored_.inc(); - ENVOY_LOG(debug, "NOT clearing route cache, no header mutations detected"); - } -} - void ProcessorState::enqueueStreamingChunk(Buffer::Instance& data, bool end_stream, bool delivered) { chunk_queue_.push(data, end_stream, delivered); @@ -440,6 +420,25 @@ void DecodingProcessorState::clearWatermark() { } } +void DecodingProcessorState::clearRouteCache(const CommonResponse& common_response) { + if (!common_response.clear_route_cache()) { + return; + } + // Only clear the route cache if there is a mutation to the header and clearing is allowed. + if (filter_.config().disableClearRouteCache()) { + filter_.stats().clear_route_cache_disabled_.inc(); + ENVOY_LOG(debug, "NOT clearing route cache, it is disabled in the config"); + return; + } + if (common_response.has_header_mutation()) { + ENVOY_LOG(debug, "clearing route cache"); + decoder_callbacks_->downstreamCallbacks()->clearRouteCache(); + return; + } + filter_.stats().clear_route_cache_ignored_.inc(); + ENVOY_LOG(debug, "NOT clearing route cache, no header mutations detected"); +} + void EncodingProcessorState::setProcessingModeInternal(const ProcessingMode& mode) { // Account for the different default behaviors of headers and trailers -- // headers are sent by default and trailers are not. diff --git a/source/extensions/filters/http/ext_proc/processor_state.h b/source/extensions/filters/http/ext_proc/processor_state.h index 32bff6adabd0..c921cdb322c2 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.h +++ b/source/extensions/filters/http/ext_proc/processor_state.h @@ -210,7 +210,7 @@ class ProcessorState : public Logger::Loggable { const envoy::config::core::v3::TrafficDirection traffic_direction_; private: - void clearRouteCache(const envoy::service::ext_proc::v3::CommonResponse& common_response); + virtual void clearRouteCache(const envoy::service::ext_proc::v3::CommonResponse&) {} }; class DecodingProcessorState : public ProcessorState { @@ -280,6 +280,9 @@ class DecodingProcessorState : public ProcessorState { void setProcessingModeInternal( const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode); + void + clearRouteCache(const envoy::service::ext_proc::v3::CommonResponse& common_response) override; + Http::StreamDecoderFilterCallbacks* decoder_callbacks_{}; }; diff --git a/test/extensions/filters/http/ext_proc/ext_proc_grpc_corpus/clusterfuzz-testcase-minimized-ext_proc_grpc_fuzz_test-4756668218736640.test b/test/extensions/filters/http/ext_proc/ext_proc_grpc_corpus/clusterfuzz-testcase-minimized-ext_proc_grpc_fuzz_test-4756668218736640.test new file mode 100644 index 000000000000..d065d4bb9763 --- /dev/null +++ b/test/extensions/filters/http/ext_proc/ext_proc_grpc_corpus/clusterfuzz-testcase-minimized-ext_proc_grpc_fuzz_test-4756668218736640.test @@ -0,0 +1 @@ +ext_proc_data: "scterpc_cre:csp_yilooo0\n\000*!pV1:ae!FoFFF,F\n" diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index 74f011c3a541..3fef3b403fa1 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -2086,4 +2086,55 @@ TEST_P(ExtProcIntegrationTest, GetAndSetHeadersAndTrailersWithHeaderScrubbing) { verifyDownstreamResponse(*response, 200); } +// Test clear route cache in both upstream and downstream header and body processing. +TEST_P(ExtProcIntegrationTest, GetAndSetBodyOnBothWithClearRouteCache) { + proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED); + proto_config_.mutable_processing_mode()->set_response_body_mode(ProcessingMode::STREAMED); + initializeConfig(); + HttpIntegrationTest::initialize(); + + auto response = sendDownstreamRequestWithBody("Replace this!", absl::nullopt); + processRequestHeadersMessage( + *grpc_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse& headers_resp) { + auto* content_length = + headers_resp.mutable_response()->mutable_header_mutation()->add_set_headers(); + content_length->mutable_header()->set_key("content-length"); + content_length->mutable_header()->set_value("13"); + headers_resp.mutable_response()->set_clear_route_cache(true); + return true; + }); + processRequestBodyMessage( + *grpc_upstreams_[0], false, [](const HttpBody& body, BodyResponse& body_resp) { + EXPECT_TRUE(body.end_of_stream()); + auto* body_mut = body_resp.mutable_response()->mutable_body_mutation(); + body_mut->set_body("Hello, World!"); + body_resp.mutable_response()->set_clear_route_cache(true); + return true; + }); + + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + + processResponseHeadersMessage( + *grpc_upstreams_[0], false, [](const HttpHeaders&, HeadersResponse& headers_resp) { + headers_resp.mutable_response()->mutable_header_mutation()->add_remove_headers( + "content-length"); + headers_resp.mutable_response()->set_clear_route_cache(true); + return true; + }); + upstream_request_->encodeData(100, true); + processResponseBodyMessage( + *grpc_upstreams_[0], false, [](const HttpBody&, BodyResponse& body_resp) { + auto* header_mut = body_resp.mutable_response()->mutable_header_mutation(); + auto* header_add = header_mut->add_set_headers(); + header_add->mutable_header()->set_key("x-testing-response-header"); + header_add->mutable_header()->set_value("Yes"); + body_resp.mutable_response()->set_clear_route_cache(true); + return true; + }); + verifyDownstreamResponse(*response, 200); +} + } // namespace Envoy diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 3f54f761d261..3c77b8c99957 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -2064,7 +2064,8 @@ TEST_F(HttpFilterTest, ProcessingModeResponseHeadersOnlyWithoutCallingDecodeHead } // Using the default configuration, verify that the "clear_route_cache" flag makes the appropriate -// callback on the filter when header modifications are also present. +// callback on the filter for inbound traffic when header modifications are also present. +// Also verify it does not make the callback for outbound traffic. TEST_F(HttpFilterTest, ClearRouteCacheHeaderMutation) { initialize(R"EOF( grpc_service: @@ -2075,7 +2076,7 @@ TEST_F(HttpFilterTest, ClearRouteCacheHeaderMutation) { )EOF"); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, true)); - + // Call ClearRouteCache() for inbound traffic with header mutation. EXPECT_CALL(decoder_callbacks_.downstream_callbacks_, clearRouteCache()); processRequestHeaders(false, [](const HttpHeaders&, ProcessingResponse&, HeadersResponse& resp) { auto* resp_headers_mut = resp.mutable_response()->mutable_header_mutation(); @@ -2092,9 +2093,8 @@ TEST_F(HttpFilterTest, ClearRouteCacheHeaderMutation) { Buffer::OwnedImpl buffered_response_data; setUpEncodingBuffering(buffered_response_data); + // There is no ClearRouteCache() call for outbound traffic. EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(resp_data, true)); - - EXPECT_CALL(encoder_callbacks_.downstream_callbacks_, clearRouteCache()); processResponseBody([](const HttpBody&, ProcessingResponse&, BodyResponse& resp) { auto* resp_headers_mut = resp.mutable_response()->mutable_header_mutation(); auto* resp_add = resp_headers_mut->add_set_headers(); @@ -2105,6 +2105,8 @@ TEST_F(HttpFilterTest, ClearRouteCacheHeaderMutation) { filter_->onDestroy(); + EXPECT_EQ(config_->stats().clear_route_cache_disabled_.value(), 0); + EXPECT_EQ(config_->stats().clear_route_cache_ignored_.value(), 0); EXPECT_EQ(config_->stats().streams_started_.value(), 1); EXPECT_EQ(config_->stats().stream_msgs_sent_.value(), 3); EXPECT_EQ(config_->stats().stream_msgs_received_.value(), 3); @@ -2112,7 +2114,7 @@ TEST_F(HttpFilterTest, ClearRouteCacheHeaderMutation) { } // Verify that the "disable_route_cache_clearing" setting prevents the "clear_route_cache" flag -// from performing route clearing callbacks when enabled. +// from performing route clearing callbacks for inbound traffic when enabled. TEST_F(HttpFilterTest, ClearRouteCacheDisabledHeaderMutation) { initialize(R"EOF( grpc_service: @@ -2123,6 +2125,7 @@ TEST_F(HttpFilterTest, ClearRouteCacheDisabledHeaderMutation) { disable_clear_route_cache: true )EOF"); + // The ClearRouteCache() call is disabled for inbound traffic. EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, true)); processRequestHeaders(false, [](const HttpHeaders&, ProcessingResponse&, HeadersResponse& resp) { auto* resp_headers_mut = resp.mutable_response()->mutable_header_mutation(); @@ -2139,6 +2142,7 @@ TEST_F(HttpFilterTest, ClearRouteCacheDisabledHeaderMutation) { Buffer::OwnedImpl buffered_response_data; setUpEncodingBuffering(buffered_response_data); + // There is no ClearRouteCache() call for outbound traffic regardless. EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(resp_data, true)); processResponseBody([](const HttpBody&, ProcessingResponse&, BodyResponse& resp) { auto* resp_headers_mut = resp.mutable_response()->mutable_header_mutation(); @@ -2150,8 +2154,8 @@ TEST_F(HttpFilterTest, ClearRouteCacheDisabledHeaderMutation) { filter_->onDestroy(); + EXPECT_EQ(config_->stats().clear_route_cache_disabled_.value(), 1); EXPECT_EQ(config_->stats().clear_route_cache_ignored_.value(), 0); - EXPECT_EQ(config_->stats().clear_route_cache_disabled_.value(), 2); EXPECT_EQ(config_->stats().streams_started_.value(), 1); EXPECT_EQ(config_->stats().stream_msgs_sent_.value(), 3); EXPECT_EQ(config_->stats().stream_msgs_received_.value(), 3); @@ -2159,7 +2163,7 @@ TEST_F(HttpFilterTest, ClearRouteCacheDisabledHeaderMutation) { } // Using the default configuration, verify that the "clear_route_cache" flag does not preform -// route clearing callbacks on the filter when no header changes are present. +// route clearing callbacks for inbound traffic when no header changes are present. TEST_F(HttpFilterTest, ClearRouteCacheUnchanged) { initialize(R"EOF( grpc_service: @@ -2169,6 +2173,7 @@ TEST_F(HttpFilterTest, ClearRouteCacheUnchanged) { response_body_mode: "BUFFERED" )EOF"); + // Do not call ClearRouteCache() for inbound traffic without header mutation. EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, true)); processRequestHeaders(false, [](const HttpHeaders&, ProcessingResponse&, HeadersResponse& resp) { resp.mutable_response()->set_clear_route_cache(true); @@ -2181,6 +2186,7 @@ TEST_F(HttpFilterTest, ClearRouteCacheUnchanged) { Buffer::OwnedImpl buffered_response_data; setUpEncodingBuffering(buffered_response_data); + // There is no ClearRouteCache() call for outbound traffic regardless. EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(resp_data, true)); processResponseBody([](const HttpBody&, ProcessingResponse&, BodyResponse& resp) { resp.mutable_response()->set_clear_route_cache(true); @@ -2188,7 +2194,7 @@ TEST_F(HttpFilterTest, ClearRouteCacheUnchanged) { filter_->onDestroy(); - EXPECT_EQ(config_->stats().clear_route_cache_ignored_.value(), 2); + EXPECT_EQ(config_->stats().clear_route_cache_ignored_.value(), 1); EXPECT_EQ(config_->stats().clear_route_cache_disabled_.value(), 0); EXPECT_EQ(config_->stats().streams_started_.value(), 1); EXPECT_EQ(config_->stats().stream_msgs_sent_.value(), 3);