Skip to content

Commit

Permalink
Add upper bound based on current unread messages
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Carroll <[email protected]>
  • Loading branch information
mjcarroll committed Apr 17, 2020
1 parent cc1e52f commit da7b410
Showing 1 changed file with 18 additions and 2 deletions.
20 changes: 18 additions & 2 deletions rmw_fastrtps_shared_cpp/src/rmw_take.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>

#include "rmw/allocators.h"
#include "rmw/error_handling.h"
#include "rmw/serialized_message.h"
Expand Down Expand Up @@ -100,11 +102,25 @@ _take_sequence(
bool taken_flag = false;
rmw_ret_t ret = RMW_RET_OK;

if (subscription->implementation_identifier != identifier) {
RMW_SET_ERROR_MSG("publisher handle not from this implementation");
return RMW_RET_ERROR;
}

CustomSubscriberInfo * info = static_cast<CustomSubscriberInfo *>(subscription->data);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(info, "custom subscriber info is null", return RMW_RET_ERROR);

// Limit the upper bound of reads to the number unread at the beginning.
// This prevents any samples that are added after the beginning of the
// _take_sequence call from being read.
auto unread_count = info->subscriber_->get_unread_count();
count = std::min(count, unread_count);

for (size_t ii = 0; ii < count; ++ii) {
taken_flag = false;
ret = _take(
identifier, subscription, message_sequence->data[ii],
&taken_flag, &message_info_sequence->data[ii], allocation);
identifier, subscription, message_sequence->data[*taken],
&taken_flag, &message_info_sequence->data[*taken], allocation);

if (ret != RMW_RET_OK) {
break;
Expand Down

0 comments on commit da7b410

Please sign in to comment.