Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20502] TCP non_blocking_send moved to TCPTransportDescriptor #4415

Merged
merged 2 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions include/fastdds/rtps/transport/TCPTransportDescriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ namespace rtps {
*
* - \c tls_config: Configuration for TLS.
*
* - \c non_blocking_send: do not block on send operations. When it is set to true, send operations will return
* immediately if the buffer might get full, but no error will be returned to the upper layer. This means
* that the application will behave as if the datagram is sent and lost.
*
* @ingroup TRANSPORT_MODULE
*/
struct TCPTransportDescriptor : public SocketTransportDescriptor
Expand Down Expand Up @@ -276,6 +280,20 @@ struct TCPTransportDescriptor : public SocketTransportDescriptor
//! Thread settings for the accept connections thread
ThreadSettings accept_thread;

/**
* Whether to use non-blocking calls to send().
*
* When set to true, calls to send() will return immediately if the send buffer might get full.
* This may happen when receive buffer on reader's side is full. No error will be returned
* to the upper layer. This means that the application will behave
* as if the datagram is sent but lost (i.e. throughput may be reduced). This value is
* specially useful on high-frequency writers.
*
* When set to false, which is the default, calls to send() will block until the send buffer has space for the
* datagram. This may cause application lock.
*/
bool non_blocking_send;

//! Add listener port to the listening_ports list
void add_listener_port(
uint16_t port)
Expand Down
2 changes: 1 addition & 1 deletion resources/xsd/fastRTPS_profiles.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@
├ interfaceWhiteList [0~*], (NOT available for SHM type)
| └ address [ipv4Address|ipv6Address]
├ TTL [uint8], (ONLY available for UDP type)
├ non_blocking_send [boolean], (ONLY available for UDP type)
├ non_blocking_send [boolean], (NOT available for SHM type)
├ output_port [uint16], (ONLY available for UDP type)
├ wan_addr [ipv4AddressFormat], (ONLY available for TCPv4 type)
├ keep_alive_frequency_ms [uint32], (ONLY available for TCP type)
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/TCPChannelResourceBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ size_t TCPChannelResourceBasic::send(
{
std::lock_guard<std::mutex> send_guard(send_mutex_);

if (parent_->get_non_blocking_send() &&
if (parent_->configuration()->non_blocking_send &&
!check_socket_send_buffer(header_size + size, socket_->native_handle()))
{
return 0;
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/TCPChannelResourceSecure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ size_t TCPChannelResourceSecure::send(

if (eConnecting < connection_status_)
{
if (parent_->get_non_blocking_send() &&
if (parent_->configuration()->non_blocking_send &&
!check_socket_send_buffer(header_size + size,
secure_socket_->lowest_layer().native_handle()))
{
Expand Down
15 changes: 5 additions & 10 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ TCPTransportDescriptor::TCPTransportDescriptor()
, calculate_crc(true)
, check_crc(true)
, apply_security(false)
, non_blocking_send(false)
{
}

Expand All @@ -125,6 +126,7 @@ TCPTransportDescriptor::TCPTransportDescriptor(
, tls_config(t.tls_config)
, keep_alive_thread(t.keep_alive_thread)
, accept_thread(t.accept_thread)
, non_blocking_send(t.non_blocking_send)
{
}

Expand Down Expand Up @@ -152,6 +154,7 @@ TCPTransportDescriptor& TCPTransportDescriptor::operator =(
tls_config = t.tls_config;
keep_alive_thread = t.keep_alive_thread;
accept_thread = t.accept_thread;
non_blocking_send = t.non_blocking_send;
return *this;
}

Expand All @@ -173,14 +176,14 @@ bool TCPTransportDescriptor::operator ==(
this->tls_config == t.tls_config &&
this->keep_alive_thread == t.keep_alive_thread &&
this->accept_thread == t.accept_thread &&
this->non_blocking_send == t.non_blocking_send &&
SocketTransportDescriptor::operator ==(t));
}

TCPTransportInterface::TCPTransportInterface(
int32_t transport_kind)
: TransportInterface(transport_kind)
, alive_(true)
, non_blocking_send_(false)
#if TLS_FOUND
, ssl_context_(asio::ssl::context::sslv23)
#endif // if TLS_FOUND
Expand Down Expand Up @@ -422,7 +425,7 @@ bool TCPTransportInterface::DoInputLocatorsMatch(
}

bool TCPTransportInterface::init(
const fastrtps::rtps::PropertyPolicy* properties)
const fastrtps::rtps::PropertyPolicy*)
{
if (!apply_tls_config())
{
Expand All @@ -447,14 +450,6 @@ bool TCPTransportInterface::init(
ip::tcp::endpoint local_endpoint = initial_peer_local_locator_socket_->local_endpoint();
initial_peer_local_locator_port_ = local_endpoint.port();

// Get non_blocking_send property
if (properties)
{
auto s_non_blocking_send = eprosima::fastrtps::rtps::PropertyPolicyHelper::find_property(*properties,
"fastdds.tcp_transport.non_blocking_send");
non_blocking_send_ = s_non_blocking_send && *s_non_blocking_send == "true"? true : false;
}

// Check system buffer sizes.
if (configuration()->sendBufferSize == 0)
{
Expand Down
18 changes: 0 additions & 18 deletions src/cpp/rtps/transport/TCPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,6 @@ class TCPTransportInterface : public TransportInterface
asio::io_service io_service_timers_;
std::unique_ptr<asio::ip::tcp::socket> initial_peer_local_locator_socket_;
uint16_t initial_peer_local_locator_port_;
/**
* Whether to use non-blocking calls to send().
*
* When set to true, calls to send() will return immediately if the send buffer is full.
* This may happen when receive buffer on reader's side is full. No error will be returned
* to the upper layer. This means that the application will behave
* as if the datagram is sent but lost (i.e. throughput may be reduced). This value is
* specially useful on high-frequency writers.
*
* When set to false, calls to send() will block until the send buffer has space for the
* datagram. This may cause application lock.
*/
bool non_blocking_send_;

#if TLS_FOUND
asio::ssl::context ssl_context_;
Expand Down Expand Up @@ -473,11 +460,6 @@ class TCPTransportInterface : public TransportInterface
void fill_local_physical_port(
Locator& locator) const;

bool get_non_blocking_send() const
{
return non_blocking_send_;
}

};

} // namespace rtps
Expand Down
8 changes: 8 additions & 0 deletions src/cpp/rtps/xmlparser/XMLParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,14 @@ XMLP_ret XMLParser::parseXMLCommonTCPTransportData(
return XMLP_ret::XML_ERROR;
}
}
// non_blocking_send - boolType
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
else if (strcmp(name, NON_BLOCKING_SEND) == 0)
{
if (XMLP_ret::XML_OK != getXMLBool(p_aux0, &pTCPDesc->non_blocking_send, 0))
{
return XMLP_ret::XML_ERROR;
}
}
else if (strcmp(name, LISTENING_PORTS) == 0)
{
// listening_ports uint16ListType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ typedef struct TCPTransportDescriptor : public SocketTransportDescriptor
bool calculate_crc;
bool check_crc;
bool apply_security;
bool non_blocking_send;

TLSConfig tls_config;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
<calculate_crc>false</calculate_crc>
<check_crc>false</check_crc>
<enable_tcp_nodelay>false</enable_tcp_nodelay>
<non_blocking_send>false</non_blocking_send>
</transport_descriptor>
<!-- UDP sample transport descriptor. Several options are common with TCP -->
<transport_descriptor>
Expand Down
1 change: 1 addition & 0 deletions test/system/tools/xmlvalidation/all_profile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,7 @@
<calculate_crc>false</calculate_crc>
<check_crc>false</check_crc>
<enable_tcp_nodelay>false</enable_tcp_nodelay>
<non_blocking_send>false</non_blocking_send>
</transport_descriptor>

<transport_descriptor>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
<calculate_crc>false</calculate_crc>
<check_crc>false</check_crc>
<enable_tcp_nodelay>false</enable_tcp_nodelay>
<non_blocking_send>false</non_blocking_send>
</transport_descriptor>

<transport_descriptor>
Expand Down
11 changes: 4 additions & 7 deletions test/unittest/transport/TCPv4Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "mock/MockTCPChannelResource.h"
#include "mock/MockTCPv4Transport.h"
#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/attributes/RTPSParticipantAttributes.h>
#include <fastrtps/transport/TCPv4TransportDescriptor.h>
#include <fastrtps/utils/Semaphore.h>
#include <fastrtps/utils/IPFinder.h>
Expand Down Expand Up @@ -1426,6 +1425,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send)
using TLSHSRole = TCPTransportDescriptor::TLSConfig::TLSHandShakeRole;
TCPv4TransportDescriptor senderDescriptor;
senderDescriptor.add_listener_port(port);
senderDescriptor.non_blocking_send = true;
senderDescriptor.sendBufferSize = msg_size;
senderDescriptor.tls_config.handshake_role = TLSHSRole::CLIENT;
senderDescriptor.tls_config.verify_file = "ca.crt";
Expand All @@ -1435,9 +1435,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send)
senderDescriptor.tls_config.add_option(TLSOptions::NO_SSLV2);
senderDescriptor.tls_config.add_option(TLSOptions::NO_COMPRESSION);
MockTCPv4Transport senderTransportUnderTest(senderDescriptor);
eprosima::fastrtps::rtps::RTPSParticipantAttributes att;
att.properties.properties().emplace_back("fastdds.tcp_transport.non_blocking_send", "true");
senderTransportUnderTest.init(&att.properties);
senderTransportUnderTest.init();

// Create a TCP Client socket.
// The creation of a reception transport for testing this functionality is not
Expand Down Expand Up @@ -1976,11 +1974,10 @@ TEST_F(TCPv4Tests, non_blocking_send)
// Create a TCP Server transport
TCPv4TransportDescriptor senderDescriptor;
senderDescriptor.add_listener_port(port);
senderDescriptor.non_blocking_send = true;
senderDescriptor.sendBufferSize = msg_size;
MockTCPv4Transport senderTransportUnderTest(senderDescriptor);
eprosima::fastrtps::rtps::RTPSParticipantAttributes att;
att.properties.properties().emplace_back("fastdds.tcp_transport.non_blocking_send", "true");
senderTransportUnderTest.init(&att.properties);
senderTransportUnderTest.init();

// Create a TCP Client socket.
// The creation of a reception transport for testing this functionality is not
Expand Down
5 changes: 2 additions & 3 deletions test/unittest/transport/TCPv6Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,10 @@ TEST_F(TCPv6Tests, non_blocking_send)
// Create a TCP Server transport
TCPv6TransportDescriptor senderDescriptor;
senderDescriptor.add_listener_port(port);
senderDescriptor.non_blocking_send = true;
senderDescriptor.sendBufferSize = msg_size;
MockTCPv6Transport senderTransportUnderTest(senderDescriptor);
eprosima::fastrtps::rtps::RTPSParticipantAttributes att;
att.properties.properties().emplace_back("fastdds.tcp_transport.non_blocking_send", "true");
senderTransportUnderTest.init(&att.properties);
senderTransportUnderTest.init();

// Create a TCP Client socket.
// The creation of a reception transport for testing this functionality is not
Expand Down
8 changes: 6 additions & 2 deletions test/unittest/xmlparser/XMLParserTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,7 @@ TEST_F(XMLParserTests, parseXMLTransportData)
<calculate_crc>false</calculate_crc>\
<check_crc>false</check_crc>\
<enable_tcp_nodelay>false</enable_tcp_nodelay>\
<non_blocking_send>true</non_blocking_send>\
<tls><!-- TLS Section --></tls>\
<keep_alive_thread>\
<scheduling_policy>12</scheduling_policy>\
Expand Down Expand Up @@ -1086,6 +1087,7 @@ TEST_F(XMLParserTests, parseXMLTransportData)
EXPECT_EQ(pTCPv4Desc->listening_ports[0], 5100u);
EXPECT_EQ(pTCPv4Desc->listening_ports[1], 5200u);
EXPECT_EQ(pTCPv4Desc->keep_alive_thread, modified_thread_settings);
EXPECT_EQ(pTCPv4Desc->non_blocking_send, true);
EXPECT_EQ(pTCPv4Desc->accept_thread, modified_thread_settings);
EXPECT_EQ(pTCPv4Desc->default_reception_threads(), modified_thread_settings);
EXPECT_EQ(pTCPv4Desc->get_thread_config_for_port(12345), modified_thread_settings);
Expand Down Expand Up @@ -1115,8 +1117,9 @@ TEST_F(XMLParserTests, parseXMLTransportData)
EXPECT_EQ(pTCPv6Desc->logical_port_increment, 2u);
EXPECT_EQ(pTCPv6Desc->listening_ports[0], 5100u);
EXPECT_EQ(pTCPv6Desc->listening_ports[1], 5200u);
EXPECT_EQ(pTCPv4Desc->keep_alive_thread, modified_thread_settings);
EXPECT_EQ(pTCPv4Desc->accept_thread, modified_thread_settings);
EXPECT_EQ(pTCPv6Desc->keep_alive_thread, modified_thread_settings);
EXPECT_EQ(pTCPv6Desc->non_blocking_send, true);
EXPECT_EQ(pTCPv6Desc->accept_thread, modified_thread_settings);
EXPECT_EQ(pTCPv6Desc->default_reception_threads(), modified_thread_settings);
EXPECT_EQ(pTCPv6Desc->get_thread_config_for_port(12345), modified_thread_settings);
EXPECT_EQ(pTCPv6Desc->get_thread_config_for_port(12346), modified_thread_settings);
Expand Down Expand Up @@ -1236,6 +1239,7 @@ TEST_F(XMLParserTests, parseXMLTransportData_NegativeClauses)
"calculate_crc",
"check_crc",
"enable_tcp_nodelay",
"non_blocking_send",
"tls",
"keep_alive_thread",
"accept_thread",
Expand Down
Loading