diff --git a/fbmeshd.cmake b/fbmeshd.cmake index 45cd10d7f47..764ef10b17d 100644 --- a/fbmeshd.cmake +++ b/fbmeshd.cmake @@ -82,7 +82,6 @@ add_executable(fbmeshd openr/fbmeshd/routing/Routing.cpp openr/fbmeshd/routing/SyncRoutes80211s.cpp openr/fbmeshd/routing/UDPRoutingPacketTransport.cpp - openr/fbmeshd/separa/Separa.cpp $ ) diff --git a/openr/fbmeshd/main.cpp b/openr/fbmeshd/main.cpp index 8a16669cb24..7d8cf23f931 100644 --- a/openr/fbmeshd/main.cpp +++ b/openr/fbmeshd/main.cpp @@ -39,7 +39,6 @@ #include #include #include -#include #include using namespace openr::fbmeshd; @@ -86,26 +85,27 @@ DEFINE_bool( DEFINE_bool( enable_separa, false, - "If set, Separa algorithm will be enabled to manage mesh partitions. " - "Implies enable_separa_broadcasts=true"); + "DEPRECATED; TODO: delete after Separa is disabled on all meshes"); DEFINE_int32( - separa_hello_port, 6667, "The port used to send separa hello packets"); + separa_hello_port, + 6667, + "DEPRECATED; TODO: delete after Separa is disabled on all meshes"); DEFINE_int32( separa_broadcast_interval_s, 1, - "how often to send separa broadcasts in seconds"); + "DEPRECATED; TODO: delete after Separa is disabled on all meshes"); DEFINE_int32( separa_domain_lockdown_period_s, 60, - "how long to lockdown domains in seconds"); + "DEPRECATED; TODO: delete after Separa is disabled on all meshes"); DEFINE_double( separa_domain_change_threshold_factor, 1, - "threshold factor for doing a separa domain change"); + "DEPRECATED; TODO: delete after Separa is disabled on all meshes"); DEFINE_bool( enable_separa_broadcasts, true, - "If set, Separa broadcasts will be enabled"); + "DEPRECATED; TODO: delete after Separa is disabled on all meshes"); DEFINE_int32( decision_rep_port, @@ -358,7 +358,6 @@ main(int argc, char* argv[]) { FLAGS_mesh_ifname, kvStoreLocalCmdUrl, kvStoreLocalPubUrl, - FLAGS_enable_separa, zmqContext); } @@ -389,36 +388,6 @@ main(int argc, char* argv[]) { !(FLAGS_enable_event_based_peer_selector && FLAGS_enable_userspace_mesh_peering)}; - std::unique_ptr separa; - if (meshSpark != nullptr && FLAGS_is_openr_enabled && - (FLAGS_enable_separa_broadcasts || FLAGS_enable_separa)) { - separa = std::make_unique( - FLAGS_separa_hello_port, - std::chrono::seconds{FLAGS_separa_broadcast_interval_s}, - std::chrono::seconds{FLAGS_separa_domain_lockdown_period_s}, - FLAGS_separa_domain_change_threshold_factor, - !FLAGS_enable_separa, - nlHandler, - *meshSpark, - prefixManagerLocalCmdUrl, - decisionCmdUrl, - kvStoreLocalCmdUrl, - kvStoreLocalPubUrl, - monitorSubmitUrl, - zmqContext); - - static constexpr auto separaId{"Separa"}; - monitorEventLoopWithWatchdog(separa.get(), separaId, watchdog.get()); - - allThreads.emplace_back(std::thread([&separa]() noexcept { - LOG(INFO) << "Starting the Separa thread..."; - folly::setThreadName(separaId); - separa->run(); - LOG(INFO) << "Separa thread got stopped."; - })); - separa->waitUntilRunning(); - } - std::unique_ptr gateway11sRootRouteProgrammer; static constexpr auto gateway11sRootRouteProgrammerId{ "Gateway11sRootRouteProgrammer"}; @@ -565,11 +534,6 @@ main(int argc, char* argv[]) { gatewayConnectivityMonitor.stop(); gatewayConnectivityMonitor.waitUntilStopped(); - if (separa) { - separa->stop(); - separa->waitUntilStopped(); - } - if (routingEventLoop) { routing->resetSendPacketCallback(); routingEventLoop->terminateLoopSoon(); diff --git a/openr/fbmeshd/mesh-spark/MeshSpark.cpp b/openr/fbmeshd/mesh-spark/MeshSpark.cpp index e0339f5aa91..d6fa8865383 100644 --- a/openr/fbmeshd/mesh-spark/MeshSpark.cpp +++ b/openr/fbmeshd/mesh-spark/MeshSpark.cpp @@ -47,7 +47,6 @@ MeshSpark::MeshSpark( const std::string& ifName, const openr::KvStoreLocalCmdUrl& kvStoreLocalCmdUrl, const openr::KvStoreLocalPubUrl& kvStoreLocalPubUrl, - bool enableDomains, fbzmq::Context& zmqContext) : zmqLoop_{zmqLoop}, zmqContext_{zmqContext}, @@ -66,8 +65,7 @@ MeshSpark::MeshSpark( &zmqLoop, "node1", /* nodeId is used for writing to kvstore. not used herei */ kvStoreLocalCmdUrl, - kvStoreLocalPubUrl), - enableDomains_{enableDomains} { + kvStoreLocalPubUrl) { syncPeersTimer_ = fbzmq::ZmqTimeout::make( &zmqLoop_, [this]() mutable noexcept { syncPeers(); }); syncPeersTimer_->scheduleTimeout(syncPeersInterval_, true); @@ -246,45 +244,6 @@ MeshSpark::filterWhiteListedPeers(std::vector& peers) { peers = whiteListedPeers; } -void -MeshSpark::filterInDomainPeers(std::vector& peers) { - if (!enableDomains_) { - return; - } - std::vector inDomainPeers; - const auto myDomain = *myDomain_.rlock(); - neighborDomainCache_.withWLock([&peers, &inDomainPeers, myDomain]( - auto& neighborDomainCache_) { - // Cleanup expired peer info in the cache - std::unordered_set peerSet(peers.begin(), peers.end()); - std::unordered_set keysInCache; - for (const auto& it : neighborDomainCache_) { - keysInCache.emplace(it.first); - } - for (const auto& key : keysInCache) { - if (peerSet.count(key) == 0) { - neighborDomainCache_.erase(key); - } - } - - // filter out peers that don't belong to our domain - std::copy_if( - peers.begin(), - peers.end(), - std::back_inserter(inDomainPeers), - [&neighborDomainCache_, myDomain](const auto& peer) { - if (neighborDomainCache_.count(peer) == 0) { - return false; - } - const auto peerDomainAndEnabled = neighborDomainCache_.at(peer); - return !peerDomainAndEnabled.second || - (peerDomainAndEnabled.first.hasValue() && myDomain.hasValue() && - peerDomainAndEnabled.first == myDomain); - }); - }); - peers = inDomainPeers; -} - void MeshSpark::syncPeers() { VLOG(1) << folly::sformat("MeshSpark::{}()", __func__); @@ -299,9 +258,6 @@ MeshSpark::syncPeers() { } } - // remove peers that are not in this node's domain - filterInDomainPeers(activePeers); - // remove peers that are not white-listed filterWhiteListedPeers(activePeers); @@ -352,43 +308,3 @@ MeshSpark::syncPeers() { } } } - -folly::Optional -MeshSpark::getDomain() { - return *myDomain_.rlock(); -} - -void -MeshSpark::setDomain(folly::Optional newDomain) { - myDomain_.withWLock([this, newDomain](auto& myDomain_) { - myDomain_ = newDomain; - zmqLoop_.runImmediatelyOrInEventLoop([this]() { syncPeers(); }); - }); -} - -void -MeshSpark::updateCache( - folly::MacAddress node, - std::pair, bool> domain) { - std::pair, bool> oldDomain; - neighborDomainCache_.withWLock( - [domain, node, &oldDomain](auto& neighborDomainCache_) { - auto it = neighborDomainCache_.find(node); - if (it != neighborDomainCache_.end()) { - oldDomain = it->second; - it->second = domain; - } else { - neighborDomainCache_[node] = domain; - } - }); - if (!domain.second) { - return; - } - myDomain_.withRLock([this, oldDomain, domain](auto& myDomain_) { - if (myDomain_.hasValue() && - (domain.first == myDomain_ || oldDomain.first == myDomain_) && - domain.first != oldDomain.first) { - zmqLoop_.runImmediatelyOrInEventLoop([this]() { syncPeers(); }); - } - }); -} diff --git a/openr/fbmeshd/mesh-spark/MeshSpark.h b/openr/fbmeshd/mesh-spark/MeshSpark.h index f77f108caca..0499f6d4f84 100644 --- a/openr/fbmeshd/mesh-spark/MeshSpark.h +++ b/openr/fbmeshd/mesh-spark/MeshSpark.h @@ -34,17 +34,8 @@ class MeshSpark final { const std::string& ifName, const openr::KvStoreLocalCmdUrl& kvStoreLocalCmdUrl, const openr::KvStoreLocalPubUrl& kvStoreLocalPubUrl, - bool enableDomains, fbzmq::Context& zmqContext); - folly::Optional getDomain(); - - void setDomain(folly::Optional newDomain); - - void updateCache( - folly::MacAddress node, - std::pair, bool> domain); - private: /** * bind/connect to openr sockets @@ -84,8 +75,6 @@ class MeshSpark final { void filterWhiteListedPeers(std::vector& peers); - void filterInDomainPeers(std::vector& peers); - // ZmqEventLoop pointer for scheduling async events and socket callback // registration fbzmq::ZmqEventLoop& zmqLoop_; @@ -124,16 +113,4 @@ class MeshSpark final { // node name -> ipv4 address std::unordered_map kvStoreIPs_; - // enable domain filtering? - const bool enableDomains_; - - // Stores the current domain, nodes in the same domain can for OpenR peerings - folly::Synchronized> myDomain_; - - // Cache for checking which nodes belong to which domain - folly::Synchronized, bool>>> - neighborDomainCache_; - }; // MeshSpark diff --git a/openr/fbmeshd/separa/Separa.cpp b/openr/fbmeshd/separa/Separa.cpp deleted file mode 100644 index b62db029be6..00000000000 --- a/openr/fbmeshd/separa/Separa.cpp +++ /dev/null @@ -1,488 +0,0 @@ -/** - * Copyright (c) Facebook, Inc. and its affiliates. - * - * This source code is licensed under the MIT license found in the - * LICENSE file in the root directory of this source tree. - */ - -#include "Separa.h" - -#include -#include - -#include -#include -#include - -#include -#include -#include -#include -#include - -using namespace openr::fbmeshd; - -Separa::Separa( - uint16_t const udpHelloPort, - std::chrono::seconds const broadcastInterval, - std::chrono::seconds const domainLockdownPeriod, - double const domainChangeThresholdFactor, - const bool backwardsCompatibilityMode, - Nl80211Handler& nlHandler, - MeshSpark& meshSpark, - const openr::PrefixManagerLocalCmdUrl& prefixManagerCmdUrl, - const openr::DecisionCmdUrl& decisionCmdUrl, - const openr::KvStoreLocalCmdUrl& kvStoreLocalCmdUrl, - const openr::KvStoreLocalPubUrl& kvStoreLocalPubUrl, - const openr::MonitorSubmitUrl& monitorSubmitUrl, - fbzmq::Context& zmqContext) - : udpHelloPort_(udpHelloPort), - broadcastInterval_(broadcastInterval), - domainLockdownPeriod_{domainLockdownPeriod}, - domainChangeThresholdFactor_{domainChangeThresholdFactor}, - backwardsCompatibilityMode_{backwardsCompatibilityMode}, - nlHandler_{nlHandler}, - meshSpark_{meshSpark}, - prefixManagerClient_{prefixManagerCmdUrl, zmqContext, 3000ms}, - decisionCmdUrl_{decisionCmdUrl}, - kvStoreClient_( - zmqContext, - this, - "unused", /* nodeId is used for writing to kvstore. not used here */ - kvStoreLocalCmdUrl, - kvStoreLocalPubUrl), - zmqMonitorClient_{zmqContext, monitorSubmitUrl}, - zmqContext_{zmqContext} { - // Initialize ZMQ sockets - scheduleTimeout(std::chrono::seconds(0), [this]() { prepare(); }); - - // Schedule periodic timer for submission to monitor - monitorTimer_ = fbzmq::ZmqTimeout::make(this, [this]() noexcept { - zmqMonitorClient_.setCounters( - openr::prepareSubmitCounters(tData_.getCounters())); - }); - monitorTimer_->scheduleTimeout( - openr::Constants::kMonitorSubmitInterval, true); -} - -void -Separa::prepare() noexcept { - socketFd_ = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP); - CHECK_NE(socketFd_, -1) << folly::errnoStr(errno); - - // make socket non-blocking - CHECK_EQ(fcntl(socketFd_, F_SETFL, O_NONBLOCK), 0) << folly::errnoStr(errno); - - // make v6 only - { - int v6Only = 1; - CHECK_EQ( - setsockopt( - socketFd_, IPPROTO_IPV6, IPV6_V6ONLY, &v6Only, sizeof(v6Only)), - 0) - << folly::errnoStr(errno); - } - - // not really needed, but helps us use same port with other listeners, if - // any - { - int reuseAddr = 1; - CHECK_EQ( - setsockopt( - socketFd_, SOL_SOCKET, SO_REUSEADDR, &reuseAddr, sizeof(reuseAddr)), - 0) - << folly::errnoStr(errno); - } - - if (!backwardsCompatibilityMode_) { - // request additional packet info, e.g. input iface index and sender address - { - int recvPktInfo = 1; - CHECK_NE( - setsockopt( - socketFd_, - IPPROTO_IPV6, - IPV6_RECVPKTINFO, - &recvPktInfo, - sizeof(recvPktInfo)), - -1) - << folly::errnoStr(errno); - } - - // - // bind the socket to receive any separa packet - // - { - auto mcastSockAddr = - folly::SocketAddress(folly::IPAddress("::"), udpHelloPort_); - - sockaddr_storage addrStorage; - mcastSockAddr.getAddress(&addrStorage); - sockaddr* saddr = reinterpret_cast(&addrStorage); - - CHECK_EQ(bind(socketFd_, saddr, mcastSockAddr.getActualSize()), 0) - << folly::errnoStr(errno); - } - - // allow reporting the packet TTL to user space - { - int recvHopLimit = 1; - if (::setsockopt( - socketFd_, - IPPROTO_IPV6, - IPV6_RECVHOPLIMIT, - &recvHopLimit, - sizeof(recvHopLimit)) != 0) { - LOG(FATAL) << "Failed enabling TTL receive on socket. Error: " - << folly::errnoStr(errno); - } - } - - // Listen for incoming messages on socket FD - addSocketFd(socketFd_, ZMQ_POLLIN, [this](int) noexcept { - try { - processHelloPacket(); - } catch (std::exception const& err) { - LOG(ERROR) << "Separa: error processing hello packet " - << folly::exceptionStr(err); - } - }); - } - - broadcastTimer_ = fbzmq::ZmqTimeout::make( - this, [this]() mutable noexcept { sendHelloPacket(); }); - broadcastTimer_->scheduleTimeout(broadcastInterval_, true); -} - -void -Separa::prepareDecisionCmdSocket() noexcept { - if (decisionCmdSock_) { - return; - } - decisionCmdSock_ = - std::make_unique>(zmqContext_); - const auto decisionCmd = - decisionCmdSock_->connect(fbzmq::SocketUrl{decisionCmdUrl_}); - if (decisionCmd.hasError()) { - LOG(FATAL) << "Error connecting to URL '" << decisionCmdUrl_ << "' " - << decisionCmd.error(); - } -} - -void -Separa::processHelloPacket() { - // the read buffer - uint8_t buf[1280]; - - ssize_t bytesRead; - int ifIndex; - folly::SocketAddress clientAddr; - int hopLimit; - std::chrono::microseconds myRecvTime; - IoProvider ioProvider; - - std::tie(bytesRead, ifIndex, clientAddr, hopLimit, myRecvTime) = - IoProvider::recvMessage(socketFd_, buf, 1280, &ioProvider); - - VLOG(9) << "Received message from " << clientAddr.getAddressStr(); - VLOG(9) << "Read a total of " << bytesRead << " bytes from fd " << socketFd_; - - if (static_cast(bytesRead) > 1280) { - LOG(ERROR) << "Message from " << clientAddr.getAddressStr() - << " has been truncated"; - return; - } - - tData_.addStatValue("fbmeshd.separa.hello_packet_recv", 1, fbzmq::SUM); - - if (domainLockUntil_.hasValue() && - std::chrono::steady_clock::now() < *domainLockUntil_) { - VLOG(9) << "Domain is locked, ignoring hello packet"; - tData_.addStatValue("fbmeshd.separa.hello_packet_dropped", 1, fbzmq::SUM); - return; - } - - tData_.addStatValue("fbmeshd.separa.hello_packet_processed", 1, fbzmq::SUM); - - // Copy buffer into string object and parse it into helloPacket. - std::string readBuf(reinterpret_cast(&buf[0]), bytesRead); - thrift::SeparaPayload helloPacket; - try { - helloPacket = fbzmq::util::readThriftObjStr( - readBuf, serializer_); - } catch (std::exception const& err) { - LOG(ERROR) << "Failed parsing hello packet " << folly::exceptionStr(err); - return; - } - - const auto& netif = nlHandler_.lookupMeshNetif(); - const auto helloPacketDomain = folly::MacAddress::fromNBO(helloPacket.domain); - folly::Optional helloPacketDesiredDomain = - helloPacket.desiredDomain == 0 - ? folly::Optional{folly::none} - : folly::MacAddress::fromNBO(helloPacket.desiredDomain); - const auto maybeFromNodeName = - folly::IPAddressV6{clientAddr.getAddressStr()}.getMacAddressFromEUI64(); - if (!maybeFromNodeName.hasValue()) { - LOG(ERROR) << "Received Separa hello from unexpected source"; - return; - } - const auto fromNodeName{*maybeFromNodeName}; - const auto currentDomain = meshSpark_.getDomain(); - - VLOG(9) << "Received Separa Hello: " << helloPacketDomain << ", " - << helloPacket.metricToGate << ", " - << helloPacketDesiredDomain.value_or(folly::MacAddress::fromNBO(0)); - - meshSpark_.updateCache( - fromNodeName, - std::make_pair( - helloPacketDesiredDomain, helloPacket.enabled.value_or(true))); - - if (!helloPacket.enabled) { - return; - } - auto newDomain{currentDomain}; - if (helloPacketDomain == *netif.maybeMacAddress) { - // The packet says that it belongs to self domain. - if (helloPacket.metricToGate == 0) { - // we are ourselves a gate - newDomain = *netif.maybeMacAddress; - } else if (helloPacket.metricToGate == std::numeric_limits::max()) { - // we don't have default route, set our domain to null - newDomain = folly::none; - } - } else if ( - helloPacketDomain != currentDomain && - helloPacket.metricToGate != std::numeric_limits::max()) { - // the packet is from another domain - auto myGate = getGateway(); - if (myGate.hasError()) { - LOG(ERROR) << "Failed getting gateway"; - return; - } - - const auto metrics = nlHandler_.getMetrics(); - - if (metrics.count(fromNodeName) == 0) { - LOG(ERROR) << "Failed to get 11s metric for the source node"; - return; - } - - if ((helloPacket.metricToGate + metrics.at(fromNodeName)) * - domainChangeThresholdFactor_ < - myGate->second) { - if (myGate->second != std::numeric_limits::max()) { - tData_.addStatValue( - "fbmeshd.separa.domain_change_mygate_metric", - myGate->second, - fbzmq::AVG); - tData_.addStatValue( - "fbmeshd.separa.domain_change_trigger_factor", - 100.0 * myGate->second / - (helloPacket.metricToGate + metrics.at(fromNodeName)), - fbzmq::AVG); - } - tData_.addStatValue( - "fbmeshd.separa.domain_change_new_metric", - helloPacket.metricToGate + metrics.at(fromNodeName), - fbzmq::AVG); - newDomain = helloPacketDomain; - } - } - - if (newDomain != currentDomain) { - tData_.addStatValue("fbmeshd.separa.num_domain_change", 1, fbzmq::SUM); - meshSpark_.setDomain(newDomain); - // lock domain for domainLockdownPeriod_ - domainLockUntil_ = std::chrono::steady_clock::now() + domainLockdownPeriod_; - } -} - -void -Separa::sendHelloPacket() { - const auto myGate = getGateway(); - if (myGate.hasError()) { - LOG(ERROR) << "Failed getting gateway"; - tData_.addStatValue( - "fbmeshd.separa.hello_packet_send_gateway_retrieval_failed", - 1, - fbzmq::SUM); - return; - } - - const auto myDomain = meshSpark_.getDomain(); - const std::string packet{fbzmq::util::writeThriftObjStr( - thrift::SeparaPayload{apache::thrift::FRAGILE, - myGate->first.u64NBO(), - myGate->second, - myDomain.hasValue() ? myDomain->u64NBO() : 0, - !backwardsCompatibilityMode_}, - serializer_)}; - - VLOG(9) << "Sending Separa hello packet: " << myGate->first.u64NBO() << ", " - << myGate->second; - - const auto& netif = nlHandler_.lookupMeshNetif(); - const folly::IPAddressV6 srcAddr{folly::IPAddressV6::LinkLocalTag::LINK_LOCAL, - netif.maybeMacAddress.value()}; - openr::IoProvider ioProvider{}; - - // Send hello packets to ourselves, and all our neighbors - std::vector stations; - for (const auto& station : nlHandler_.getStationsInfo()) { - stations.push_back(station.macAddress); - } - stations.push_back(*netif.maybeMacAddress); - - for (const auto& station : stations) { - const folly::SocketAddress dstAddr{ - folly::IPAddressV6{folly::IPAddressV6::LinkLocalTag::LINK_LOCAL, - station}, - udpHelloPort_}; - - auto bytesSent = openr::IoProvider::sendMessage( - socketFd_, - netif.maybeIfIndex.value(), - srcAddr, - dstAddr, - packet, - &ioProvider); - - if ((bytesSent < 0) || (static_cast(bytesSent) != packet.size())) { - LOG(ERROR) << "Sending hello to " << dstAddr.getAddressStr() - << "failed due to error " << folly::errnoStr(errno); - tData_.addStatValue( - "fbmeshd.separa.hello_packet_send_failed", 1, fbzmq::SUM); - return; - } - tData_.addStatValue("fbmeshd.separa.hello_packet_sent", 1, fbzmq::SUM); - } -} - -folly::Expected, folly::Unit> -Separa::getGateway() { - // Check if we ourselves are a gate. - auto ret = prefixManagerClient_.getPrefixes(); - if (ret.hasError()) { - LOG(ERROR) << "Failed getting prefixes from OpenR: " << ret.error(); - tData_.addStatValue( - "fbmeshd.separa.get_gateway.openr_prefix_retrieval_zmq_error", - 1, - fbzmq::SUM); - return folly::makeUnexpected(folly::unit); - } - if (!ret->success) { - tData_.addStatValue( - "fbmeshd.separa.get_gateway.openr_prefix_retrieval_failure", - 1, - fbzmq::SUM); - return folly::makeUnexpected(folly::unit); - } - if (std::any_of( - ret->prefixes.begin(), - ret->prefixes.end(), - [](const auto& prefixEntry) { - return prefixEntry.prefix == openr::toIpPrefix("0.0.0.0/0"); - })) { - // We are a gate - tData_.addStatValue("fbmeshd.separa.get_gateway.is_gate", 1, fbzmq::SUM); - return std::make_pair( - nlHandler_.lookupMeshNetif().maybeMacAddress.value(), 0); - } - - // Check if we can reach to a gate - prepareDecisionCmdSocket(); - - folly::MacAddress currentNode = - nlHandler_.lookupMeshNetif().maybeMacAddress.value(); - std::unordered_set visitedNodes; - folly::Optional metric; - while (true) { - { - const auto res = decisionCmdSock_->sendThriftObj( - openr::thrift::DecisionRequest( - apache::thrift::FRAGILE, - openr::thrift::DecisionCommand::ROUTE_DB_GET, - macAddrToNodeName(currentNode)), - serializer_); - if (res.hasError()) { - LOG(ERROR) << "Exception in sending ROUTE_DB_GET command to Decision. " - << " exception: " << res.error(); - decisionCmdSock_.reset(); - tData_.addStatValue( - "fbmeshd.separa.get_gateway.decision_socket_send_zmq_error", - 1, - fbzmq::SUM); - return folly::makeUnexpected(folly::unit); - } - } - - const auto res = - decisionCmdSock_->recvThriftObj( - serializer_, openr::Constants::kReadTimeout); - if (res.hasError()) { - LOG(ERROR) << "Exception in recieving response from Decision. " - << " exception: " << res.error(); - decisionCmdSock_.reset(); - tData_.addStatValue( - "fbmeshd.separa.get_gateway.decision_socket_receive_zmq_error", - 1, - fbzmq::SUM); - return folly::makeUnexpected(folly::unit); - } - - const auto route = std::find_if( - res->routeDb.unicastRoutes.begin(), - res->routeDb.unicastRoutes.end(), - [](const auto& route) { - return route.dest == openr::toIpPrefix("0.0.0.0/0"); - }); - if (route == res->routeDb.unicastRoutes.end()) { - tData_.addStatValue( - "fbmeshd.separa.get_gateway.is_non_gate", 1, fbzmq::SUM); - const auto returnValue = metric.value_or(std::numeric_limits::max()); - tData_.addStatValue( - "fbmeshd.separa.get_gateway.metric", returnValue, fbzmq::AVG); - return std::make_pair(currentNode, returnValue); - } - - const auto ip = openr::toIPAddress(route->nextHops.front().address); - if (!ip.isV4()) { - tData_.addStatValue( - "fbmeshd.separa.get_gateway.not_ipv4_error", 1, fbzmq::SUM); - return folly::makeUnexpected(folly::unit); - } - - const auto node = kvStoreClient_.getKey(folly::sformat( - "{}{}", - openr::Constants::kPrefixAllocMarker, - openr::toIPAddress(route->nextHops.front().address) - .asV4() - .getNthLSByte(0))); - if (!node.hasValue()) { - tData_.addStatValue( - "fbmeshd.separa.get_gateway.node_not_in_kvstore_error", - 1, - fbzmq::SUM); - return folly::makeUnexpected(folly::unit); - } - - const auto maybeNode = nodeNameToMacAddr(node->originatorId); - if (!maybeNode.hasValue()) { - tData_.addStatValue( - "fbmeshd.separa.get_gateway.node_name_malformed", 1, fbzmq::SUM); - return folly::makeUnexpected(folly::unit); - } - - metric = metric.value_or(0) + route->nextHops.front().metric; - - visitedNodes.emplace(currentNode); - currentNode = *maybeNode; - if (visitedNodes.count(currentNode) > 0) { - tData_.addStatValue( - "fbmeshd.separa.get_gateway.routing_loop", 1, fbzmq::SUM); - return folly::makeUnexpected(folly::unit); - } - } -} diff --git a/openr/fbmeshd/separa/Separa.h b/openr/fbmeshd/separa/Separa.h deleted file mode 100644 index 408414c645b..00000000000 --- a/openr/fbmeshd/separa/Separa.h +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Copyright (c) Facebook, Inc. and its affiliates. - * - * This source code is licensed under the MIT license found in the - * LICENSE file in the root directory of this source tree. - */ - -#pragma once - -#include -#include - -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -namespace openr { -namespace fbmeshd { - -class Separa final : public fbzmq::ZmqEventLoop { - public: - Separa( - uint16_t const udpHelloPort, - std::chrono::seconds const broadcastInterval, - std::chrono::seconds const domainLockdownPeriod, - double const domainChangeThresholdFactor, - const bool backwardsCompatibilityMode, - Nl80211Handler& nlHandler, - MeshSpark& meshSpark, - const PrefixManagerLocalCmdUrl& prefixManagerCmdUrl, - const openr::DecisionCmdUrl& decisionCmdUrl, - const openr::KvStoreLocalCmdUrl& kvStoreLocalCmdUrl, - const openr::KvStoreLocalPubUrl& kvStoreLocalPubUrl, - const openr::MonitorSubmitUrl& monitorSubmitUrl, - fbzmq::Context& zmqContext); - - ~Separa() override = default; - - private: - // Separa is non-copyable - Separa(Separa const&) = delete; - Separa& operator=(Separa const&) = delete; - - // Initializes ZMQ sockets - void prepare() noexcept; - - // connect to decision cmdSocket. - void prepareDecisionCmdSocket() noexcept; - - // process hello packet received from neighbors - void processHelloPacket(); - - // send Separa hello packet - void sendHelloPacket(); - - // get the node which acts as the gateway node - folly::Expected, folly::Unit> getGateway(); - - // UDP port for send/recv of broadcasts - const uint16_t udpHelloPort_; - - // how often to send connectivity information broadcasts - const std::chrono::seconds broadcastInterval_; - - std::unique_ptr broadcastTimer_; - - // domain lock timer - folly::Optional domainLockUntil_; - - const std::chrono::seconds domainLockdownPeriod_; - - const double domainChangeThresholdFactor_; - - // the socket we use for send/recv of hello packets - int socketFd_{-1}; - - const bool backwardsCompatibilityMode_; - - // netlink handler used to request metrics from the kernel - Nl80211Handler& nlHandler_; - - MeshSpark& meshSpark_; - - openr::PrefixManagerClient prefixManagerClient_; - - const std::string decisionCmdUrl_; - std::unique_ptr> decisionCmdSock_; - - apache::thrift::CompactSerializer serializer_; - - // kvStoreClient for getting prefixes - openr::KvStoreClient kvStoreClient_; - - // Timer for submitting to monitor periodically - std::unique_ptr monitorTimer_; - - // DS to keep track of stats - fbzmq::ThreadData tData_; - - // client to interact with monitor - fbzmq::ZmqMonitorClient zmqMonitorClient_; - - // ZMQ context for processing - fbzmq::Context& zmqContext_; - -}; // Separa - -} // namespace fbmeshd -} // namespace openr