Skip to content

Commit

Permalink
Accept observer
Browse files Browse the repository at this point in the history
Summary:
Adds `wangle::AcceptObserver`, an observer that is notified when a connection is accepted.

- Can be used by instrumentation that ties its lifetime to that of the transport; the callback can be used to initialize instrumentation and install callbacks on accept (such as `AsyncTransport::LifecycleCallback`, D21613750) without requiring further changes to existing application / acceptor logic.

Since there were no existing tests around the base `wangle::Acceptor` logic, I added a few in addition to what I needed for testing `AcceptObserver`.

Reviewed By: mingtaoy

Differential Revision: D21652470

fbshipit-source-id: dd9cb652ff30d6d47f20f1aced221d2d1ef7a715
  • Loading branch information
bschlinker authored and facebook-github-bot committed Jul 16, 2020
1 parent 720329e commit 1eda2c3
Show file tree
Hide file tree
Showing 4 changed files with 515 additions and 3 deletions.
82 changes: 82 additions & 0 deletions wangle/acceptor/AcceptObserver.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

namespace folly {
class AsyncTransport;
}

namespace wangle {

class Acceptor;

/**
* Observer of events related to connection acceptance.
*
* This observer can be combined with AsyncTransport::LifecycleObserver and
* other observers to enable instrumentation to be installed when a connection
* is accepted. For instance, a sampling algorithm can be executed in accept()
* to sample and install instrumentation on a subset of connections.
*/
class AcceptObserver {
public:
virtual ~AcceptObserver() = default;

/**
* accept() is invoked after a connection has been accepted and an
* AsyncTransport has been instantiated to manage the socket fd / connection.
*
* @param transport Transport of accepted connection.
*/
virtual void accept(folly::AsyncTransport* transport) noexcept = 0;

/**
* ready() is invoked after a connection has been accepted and SSL
* handshakes (if any) have completed, right before onNewConnection is called.
*
* @param transport Transport of ready connection.
*/
virtual void ready(folly::AsyncTransport* transport) noexcept = 0;

/**
* acceptorDestroy() is invoked when the worker (acceptor) is destroyed.
*
* No further events will be invoked after acceptorDestroy().
*
* @param acceptor Acceptor that was destroyed.
*/
virtual void acceptorDestroy(Acceptor* acceptor) noexcept = 0;

/**
* observerAttached() is invoked when the observer is installed.
*
* @param acceptor Acceptor that the observer is attached to.
*/
virtual void observerAttach(Acceptor* acceptor) noexcept = 0;

/**
* observerDetached() is invoked if the observer is uninstalled prior to
* worker (acceptor) destruction.
*
* No further events will be invoked after observerDetached().
*
* @param acceptor Acceptor that the observer was removed from.
*/
virtual void observerDetach(Acceptor* acceptor) noexcept = 0;
};

} // namespace wangle
42 changes: 41 additions & 1 deletion wangle/acceptor/Acceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <wangle/acceptor/AcceptorHandshakeManager.h>
#include <wangle/acceptor/FizzConfigUtil.h>
#include <wangle/acceptor/ManagedConnection.h>
#include <wangle/acceptor/AcceptObserver.h>
#include <wangle/acceptor/SecurityProtocolContextManager.h>
#include <wangle/ssl/SSLContextManager.h>

Expand All @@ -47,7 +48,9 @@ static const std::string empty_string;
std::atomic<uint64_t> Acceptor::totalNumPendingSSLConns_{0};

Acceptor::Acceptor(const ServerSocketConfig& accConfig)
: accConfig_(accConfig), socketOptions_(accConfig.getSocketOptions()) {}
: accConfig_(accConfig),
socketOptions_(accConfig.getSocketOptions()),
observerList_(this) {}

void Acceptor::init(
AsyncServerSocket* serverSocket,
Expand Down Expand Up @@ -288,13 +291,19 @@ void Acceptor::processEstablishedConnection(
}

tinfo.tfoSucceded = sslSock->getTFOSucceded();
for (const auto& cb : observerList_.getAll()) {
cb->accept(sslSock.get());
}
startHandshakeManager(
std::move(sslSock), this, clientAddr, acceptTime, tinfo);
} else {
tinfo.secure = false;
tinfo.acceptTime = acceptTime;
AsyncSocket::UniquePtr sock(makeNewAsyncSocket(base_, fd));
tinfo.tfoSucceded = sock->getTFOSucceded();
for (const auto& cb : observerList_.getAll()) {
cb->accept(sock.get());
}
plaintextConnectionReady(
std::move(sock),
clientAddr,
Expand Down Expand Up @@ -329,6 +338,9 @@ void Acceptor::connectionReady(
tinfo.initWithSocket(asyncSocket);
tinfo.appProtocol = std::make_shared<std::string>(nextProtocolName);
if (state_ < State::kDraining) {
for (const auto& cb : observerList_.getAll()) {
cb->ready(sock.get());
}
onNewConnection(
std::move(sock),
&clientAddr,
Expand Down Expand Up @@ -480,4 +492,32 @@ void Acceptor::dropConnections(double pctToDrop) {
});
}

Acceptor::AcceptObserverList::AcceptObserverList(Acceptor* acceptor)
: acceptor_(acceptor) {}

Acceptor::AcceptObserverList::~AcceptObserverList() {
for (const auto& cb : observers_) {
cb->acceptorDestroy(acceptor_);
}
}

void Acceptor::AcceptObserverList::add(AcceptObserver* observer) {
observers_.emplace_back(observer);
observer->observerAttach(acceptor_);
}

bool Acceptor::AcceptObserverList::remove(AcceptObserver* observer) {
const auto eraseIt =
std::remove(observers_.begin(), observers_.end(), observer);
if (eraseIt == observers_.end()) {
return false;
}

for (auto it = eraseIt; it != observers_.end(); it++) {
(*it)->observerDetach(acceptor_);
}
observers_.erase(eraseIt, observers_.end());
return true;
}

} // namespace wangle
60 changes: 58 additions & 2 deletions wangle/acceptor/Acceptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

namespace wangle {

class AcceptObserver;
class ManagedConnection;
class SecurityProtocolContextManager;
class SSLContextManager;
Expand Down Expand Up @@ -332,6 +333,28 @@ class Acceptor : public folly::AsyncServerSocket::AcceptCallback,
SSLErrorEnum /*error*/,
const folly::exception_wrapper& /*ex*/) noexcept {}

/**
* Adds observer for accept events.
*
* Can be used to install socket observers and instrumentation without
* changing / interfering with application-specific acceptor logic.
*
* @param observer Observer to add (implements AcceptObserver).
*/
void addAcceptObserver(AcceptObserver* observer) {
observerList_.add(observer);
}

/**
* Remove observer for accept events.
*
* @param observer Observer to remove.
* @return Whether observer found and removed from list.
*/
bool removeAcceptObserver(AcceptObserver* observer) {
return observerList_.remove(observer);
}

protected:
using OnDataAvailableParams =
folly::AsyncUDPSocket::ReadCallback::OnDataAvailableParams;
Expand Down Expand Up @@ -403,7 +426,6 @@ class Acceptor : public folly::AsyncServerSocket::AcceptCallback,
true /* defer the security negotiation until sslAccept */));
}

protected:
/**
* onConnectionsDrained() will be called once all connections have been
* drained while the acceptor is stopping.
Expand All @@ -425,7 +447,6 @@ class Acceptor : public folly::AsyncServerSocket::AcceptCallback,
void onConnectionAdded(const ManagedConnection*) override {}
void onConnectionRemoved(const ManagedConnection*) override {}

protected:
const ServerSocketConfig accConfig_;

// Helper function to initialize downstreamConnectionManager_
Expand Down Expand Up @@ -480,6 +501,41 @@ class Acceptor : public folly::AsyncServerSocket::AcceptCallback,

std::shared_ptr<const fizz::server::FizzServerContext> recreateFizzContext(
const std::shared_ptr<fizz::server::CertManager>& fizzCertManager);

// Wrapper around list of AcceptObservers to handle cleanup on destruction
class AcceptObserverList {
public:
explicit AcceptObserverList(Acceptor* acceptor);

/**
* Destructor, triggers observerDetach for any attached observers.
*/
~AcceptObserverList();

/**
* Add observer and trigger observerAttach.
*/
void add(AcceptObserver* observer);

/**
* Remove observer and trigger observerDetach.
*/
bool remove(AcceptObserver* observer);

/**
* Get reference to vector containing observers.
*/
const std::vector<AcceptObserver*>& getAll() const {
return observers_;
}

private:
Acceptor* acceptor_{nullptr};
std::vector<AcceptObserver*> observers_;
};

// List of AcceptObservers
AcceptObserverList observerList_;
};

class AcceptorFactory {
Expand Down
Loading

0 comments on commit 1eda2c3

Please sign in to comment.