Skip to content

Commit

Permalink
core: add network-specific clusters to support OS preference (#443)
Browse files Browse the repository at this point in the history
Description: Consumes OS network preference signals to classify connections based on when they're created. New streams will be opened on connections grouped under the current network indication. Note that sockets are still opened by syscalls and so it is simply assumed they are attached to the appropriate network interface.
Risk Level: Moderate
Testing: Local testing

Co-authored-by: Jose Nino <[email protected]>
Signed-off-by: Mike Schore <[email protected]>
Signed-off-by: JP Simard <[email protected]>
  • Loading branch information
2 people authored and jpsim committed Nov 28, 2022
1 parent 63276b4 commit 1657f93
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 15 deletions.
42 changes: 41 additions & 1 deletion mobile/library/common/config_template.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
const char* config_template = R"(
static_resources:
clusters:
- name: base
- name: base # Note: the direct API depends on the existence of a cluster with this name.
connect_timeout: {{ connect_timeout_seconds }}s
dns_refresh_rate: {{ dns_refresh_rate_seconds }}s
dns_lookup_family: V4_ONLY
Expand All @@ -28,6 +28,46 @@ R"(
- {{ domain }}
sni: {{ domain }}
type: LOGICAL_DNS
- name: base_wlan # Note: the direct API depends on the existence of a cluster with this name.
connect_timeout: {{ connect_timeout_seconds }}s
dns_refresh_rate: {{ dns_refresh_rate_seconds }}s
dns_lookup_family: V4_ONLY
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: base_wlan
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address: {address: {{ domain }}, port_value: 443}
tls_context:
common_tls_context:
validation_context:
trusted_ca: *trusted_ca
verify_subject_alt_name:
- {{ domain }}
sni: {{ domain }}
type: LOGICAL_DNS
- name: base_wwan # Note: the direct API depends on the existence of a cluster with this name.
connect_timeout: {{ connect_timeout_seconds }}s
dns_refresh_rate: {{ dns_refresh_rate_seconds }}s
dns_lookup_family: V4_ONLY
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: base_wwan
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address: {address: {{ domain }}, port_value: 443}
tls_context:
common_tls_context:
validation_context:
trusted_ca: *trusted_ca
verify_subject_alt_name:
- {{ domain }}
sni: {{ domain }}
type: LOGICAL_DNS
stats_flush_interval: {{ stats_flush_interval_seconds }}s
watchdog:
megamiss_timeout: 60s
Expand Down
28 changes: 24 additions & 4 deletions mobile/library/common/http/dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,16 @@ void Dispatcher::post(Event::PostCb callback) {
init_queue_.push_back(callback);
}

Dispatcher::Dispatcher(std::atomic<envoy_network_t>& preferred_network)
: preferred_network_(preferred_network) {}

envoy_status_t Dispatcher::startStream(envoy_stream_t new_stream_handle,
envoy_http_callbacks bridge_callbacks) {
post([this, bridge_callbacks, new_stream_handle]() -> void {
DirectStreamCallbacksPtr callbacks =
std::make_unique<DirectStreamCallbacks>(new_stream_handle, bridge_callbacks, *this);
// The dispatch_lock_ does not need to guard the cluster_manager_ pointer here because this
// functor is only executed once the init_queue_ has been flushed to Envoy's event dispatcher.
AsyncClient& async_client =
TS_UNCHECKED_READ(cluster_manager_)->httpAsyncClientForCluster("base");

AsyncClient& async_client = getClient();
AsyncClient::Stream* underlying_stream = async_client.start(*callbacks, {});

if (!underlying_stream) {
Expand Down Expand Up @@ -195,6 +196,25 @@ envoy_status_t Dispatcher::resetStream(envoy_stream_t stream) {
return ENVOY_SUCCESS;
}

// Select the client based on the current preferred network. This helps to ensure that
// the engine uses connections opened on the current favored interface.
AsyncClient& Dispatcher::getClient() {
// This function must be called from the dispatcher's own thread and so this state
// is safe to access without holding the dispatch_lock_.
ASSERT(TS_UNCHECKED_READ(event_dispatcher_)->isThreadSafe(),
"cluster interaction must be performed on the event_dispatcher_'s thread.");
switch (preferred_network_.load()) {
case ENVOY_NET_WLAN:
// The ASSERT above ensures the cluster_manager_ is safe to access.
return TS_UNCHECKED_READ(cluster_manager_)->httpAsyncClientForCluster("base_wlan");
case ENVOY_NET_WWAN:
return TS_UNCHECKED_READ(cluster_manager_)->httpAsyncClientForCluster("base_wwan");
case ENVOY_NET_GENERIC:
default:
return TS_UNCHECKED_READ(cluster_manager_)->httpAsyncClientForCluster("base");
}
}

Dispatcher::DirectStream* Dispatcher::getStream(envoy_stream_t stream) {
// The dispatch_lock_ does not need to guard the event_dispatcher_ pointer here because this
// function should only be called from the context of Envoy's event dispatcher.
Expand Down
7 changes: 6 additions & 1 deletion mobile/library/common/http/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ namespace Http {
*/
class Dispatcher : public Logger::Loggable<Logger::Id::http> {
public:
Dispatcher(std::atomic<envoy_network_t>& preferred_network);

void ready(Event::Dispatcher& event_dispatcher, Upstream::ClusterManager& cluster_manager);

/**
* Attempts to open a new stream to the remote. Note that this function is asynchronous and
* opening a stream may fail. The returned handle is immediately valid for use with this API, but
* there is no guarantee it will ever functionally represent an open stream.
* @param bridge_callbacks wrapper for callbacks for events on this stream.
* @param stream, the stream to start.
* @param bridge_callbacks, wrapper for callbacks for events on this stream.
* @return envoy_stream_t handle to the stream being created.
*/
envoy_status_t startStream(envoy_stream_t stream, envoy_http_callbacks bridge_callbacks);
Expand Down Expand Up @@ -138,6 +141,7 @@ class Dispatcher : public Logger::Loggable<Logger::Id::http> {
void post(Event::PostCb callback);
// Everything in the below interface must only be accessed from the event_dispatcher's thread.
// This allows us to generally avoid synchronization.
AsyncClient& getClient();
DirectStream* getStream(envoy_stream_t stream_handle);
void cleanup(envoy_stream_t stream_handle);

Expand All @@ -148,6 +152,7 @@ class Dispatcher : public Logger::Loggable<Logger::Id::http> {
Event::Dispatcher* event_dispatcher_ GUARDED_BY(dispatch_lock_);
Upstream::ClusterManager* cluster_manager_ GUARDED_BY(dispatch_lock_);
std::unordered_map<envoy_stream_t, DirectStreamPtr> streams_;
std::atomic<envoy_network_t>& preferred_network_;
};

} // namespace Http
Expand Down
8 changes: 6 additions & 2 deletions mobile/library/common/main_interface.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ static std::unique_ptr<Envoy::MainCommon> main_common_;
static std::unique_ptr<Envoy::Http::Dispatcher> http_dispatcher_;
static Envoy::Server::ServerLifecycleNotifier::HandlePtr stageone_callback_handler_;
static std::atomic<envoy_stream_t> current_stream_handle_{0};
static std::atomic<envoy_network_t> preferred_network_{ENVOY_NET_GENERIC};

envoy_stream_t init_stream(envoy_engine_t) { return current_stream_handle_++; }

Expand All @@ -52,13 +53,16 @@ envoy_status_t send_trailers(envoy_stream_t stream, envoy_headers trailers) {
envoy_status_t reset_stream(envoy_stream_t stream) { return http_dispatcher_->resetStream(stream); }

envoy_engine_t init_engine() {
http_dispatcher_ = std::make_unique<Envoy::Http::Dispatcher>();
http_dispatcher_ = std::make_unique<Envoy::Http::Dispatcher>(preferred_network_);
// TODO(goaway): return new handle once multiple engine support is in place.
// https://github.com/lyft/envoy-mobile/issues/332
return 1;
}

envoy_status_t set_preferred_network(envoy_network_t) { return ENVOY_SUCCESS; }
envoy_status_t set_preferred_network(envoy_network_t network) {
preferred_network_.store(network);
return ENVOY_SUCCESS;
}

/**
* External entrypoint for library.
Expand Down
4 changes: 2 additions & 2 deletions mobile/library/common/main_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ envoy_engine_t init_engine();
/**
* Update the network interface to the preferred network for opening new streams.
* Note that this state is shared by all engines.
* @param network_interface, the interface to be preferred for new sockets.
* @param network, the network to be preferred for new streams.
* @return envoy_status_t, the resulting status of the operation.
*/
envoy_status_t set_preferred_network(envoy_network_t network_interface);
envoy_status_t set_preferred_network(envoy_network_t network);

/**
* External entry point for library.
Expand Down
8 changes: 4 additions & 4 deletions mobile/library/common/types/c_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ typedef enum { ENVOY_UNDEFINED_ERROR, ENVOY_STREAM_RESET } envoy_error_code_t;

/**
* Networks classified by last physical link.
* ENVOY_GENERIC is default and includes cases where network characteristics are unknown.
* ENVOY_WLAN includes WiFi and other local area wireless networks.
* ENVOY_WWAN includes all mobile phone networks.
* ENVOY_NET_GENERIC is default and includes cases where network characteristics are unknown.
* ENVOY_NET_WLAN includes WiFi and other local area wireless networks.
* ENVOY_NET_WWAN includes all mobile phone networks.
*/
typedef enum { ENVOY_GENERIC, ENVOY_WLAN, ENVOY_WWAN } envoy_network_t;
typedef enum { ENVOY_NET_GENERIC, ENVOY_NET_WLAN, ENVOY_NET_WWAN } envoy_network_t;

#ifdef __cplusplus
extern "C" { // release function
Expand Down
19 changes: 18 additions & 1 deletion mobile/test/common/http/dispatcher_test.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include <atomic>

#include "common/buffer/buffer_impl.h"
#include "common/http/async_client_impl.h"
#include "common/http/context_impl.h"
Expand Down Expand Up @@ -58,9 +60,10 @@ class DispatcherTest : public testing::Test {
NiceMock<LocalInfo::MockLocalInfo> local_info_;
Http::ContextImpl http_context_;
AsyncClientImpl client_;
Dispatcher http_dispatcher_;
envoy_http_callbacks bridge_callbacks_;
NiceMock<StreamInfo::MockStreamInfo> stream_info_;
std::atomic<envoy_network_t> preferred_network_{ENVOY_NET_GENERIC};
Dispatcher http_dispatcher_{preferred_network_};
};

TEST_F(DispatcherTest, BasicStreamHeadersOnly) {
Expand Down Expand Up @@ -97,6 +100,7 @@ TEST_F(DispatcherTest, BasicStreamHeadersOnly) {
envoy_headers c_headers = Utility::toBridgeHeaders(headers);

// Create a stream.
EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_));
EXPECT_CALL(cm_.async_client_, start(_, _))
.WillOnce(
Expand Down Expand Up @@ -179,6 +183,7 @@ TEST_F(DispatcherTest, BasicStream) {
envoy_data c_data = Buffer::Utility::toBridgeData(request_data);

// Create a stream.
EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_));
EXPECT_CALL(cm_.async_client_, start(_, _))
.WillOnce(
Expand Down Expand Up @@ -242,6 +247,7 @@ TEST_F(DispatcherTest, ResetStream) {
cc->on_complete_calls++;
};

EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_));
EXPECT_CALL(cm_.async_client_, start(_, _))
.WillOnce(
Expand Down Expand Up @@ -298,6 +304,7 @@ TEST_F(DispatcherTest, MultipleStreams) {
envoy_headers c_headers = Utility::toBridgeHeaders(headers);

// Create a stream.
EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_));
EXPECT_CALL(cm_.async_client_, start(_, _))
.WillOnce(
Expand Down Expand Up @@ -350,6 +357,7 @@ TEST_F(DispatcherTest, MultipleStreams) {
envoy_headers c_headers2 = Utility::toBridgeHeaders(headers2);

// Create a stream.
EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_));
EXPECT_CALL(cm_.async_client_, start(_, _))
.WillOnce(
Expand Down Expand Up @@ -424,6 +432,7 @@ TEST_F(DispatcherTest, LocalResetAfterStreamStart) {
envoy_headers c_headers = Utility::toBridgeHeaders(headers);

// Create a stream.
EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_));
EXPECT_CALL(cm_.async_client_, start(_, _))
.WillOnce(
Expand Down Expand Up @@ -497,6 +506,7 @@ TEST_F(DispatcherTest, RemoteResetAfterStreamStart) {
envoy_headers c_headers = Utility::toBridgeHeaders(headers);

// Create a stream.
EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_));
EXPECT_CALL(cm_.async_client_, start(_, _))
.WillOnce(
Expand Down Expand Up @@ -541,6 +551,7 @@ TEST_F(DispatcherTest, DestroyWithActiveStream) {
envoy_headers c_headers = Utility::toBridgeHeaders(headers);

// Create a stream.
EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_));
EXPECT_CALL(cm_.async_client_, start(_, _))
.WillOnce(Return(client_.start(stream_callbacks_, AsyncClient::StreamOptions())));
Expand Down Expand Up @@ -571,6 +582,7 @@ TEST_F(DispatcherTest, ResetInOnHeaders) {
envoy_headers c_headers = Utility::toBridgeHeaders(headers);

// Create a stream.
EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_));
EXPECT_CALL(cm_.async_client_, start(_, _))
.WillOnce(Return(client_.start(stream_callbacks_, AsyncClient::StreamOptions())));
Expand Down Expand Up @@ -610,6 +622,7 @@ TEST_F(DispatcherTest, StreamTimeout) {
HttpTestUtility::addDefaultHeaders(headers);
envoy_headers c_headers = Utility::toBridgeHeaders(headers);

EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_));
EXPECT_CALL(cm_.async_client_, start(_, _))
.WillOnce(Return(client_.start(stream_callbacks_, AsyncClient::StreamOptions().setTimeout(
Expand Down Expand Up @@ -658,6 +671,7 @@ TEST_F(DispatcherTest, StreamTimeoutHeadReply) {
HttpTestUtility::addDefaultHeaders(headers, "HEAD");
envoy_headers c_headers = Utility::toBridgeHeaders(headers);

EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_));
EXPECT_CALL(cm_.async_client_, start(_, _))
.WillOnce(Return(client_.start(stream_callbacks_, AsyncClient::StreamOptions().setTimeout(
Expand Down Expand Up @@ -696,6 +710,7 @@ TEST_F(DispatcherTest, DisableTimerWithStream) {
HttpTestUtility::addDefaultHeaders(headers, "HEAD");
envoy_headers c_headers = Utility::toBridgeHeaders(headers);

EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_));
EXPECT_CALL(cm_.async_client_, start(_, _))
.WillOnce(Return(client_.start(stream_callbacks_, AsyncClient::StreamOptions().setTimeout(
Expand Down Expand Up @@ -770,6 +785,7 @@ TEST_F(DispatcherTest, MultipleDataStream) {
envoy_data c_data2 = Buffer::Utility::toBridgeData(request_data2);

// Create a stream.
EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_));
EXPECT_CALL(cm_.async_client_, start(_, _))
.WillOnce(
Expand Down Expand Up @@ -868,6 +884,7 @@ TEST_F(DispatcherTest, StreamResetAfterOnComplete) {
envoy_headers c_headers = Utility::toBridgeHeaders(headers);

// Create a stream.
EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_));
EXPECT_CALL(cm_.async_client_, start(_, _))
.WillOnce(
Expand Down

0 comments on commit 1657f93

Please sign in to comment.