Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hotfix/memfile-ack-logic-deadlock #1795

Merged
merged 5 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions ecal/core/src/io/shm/ecal_memfile_sync.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -103,6 +103,9 @@ namespace eCAL
if (iter != m_event_handle_map.end())
{
const SEventHandlePair event_pair = iter->second;
// fire acknowledge events, to unlock blocking send function
gSetEvent(event_pair.event_ack);
// close the snd and ack event
gCloseEvent(event_pair.event_snd);
gCloseEvent(event_pair.event_ack);
m_event_handle_map.erase(iter);
Expand Down Expand Up @@ -336,19 +339,26 @@ namespace eCAL
// fire the publisher events
// connected subscribers will read the content from the memory file

const std::lock_guard<std::mutex> lock(m_event_handle_map_sync);
// we work on a copy of the event handle map, this is needed to ..
// 1. unlock a memory file sync via Disconnect(process_id) (ack event is set by the Disconnect in this case)
// 2. be able to add a new memory file sync via Connect(process_id)
EventHandleMapT event_handle_map_snapshot;
{
const std::lock_guard<std::mutex> lock(m_event_handle_map_sync);
event_handle_map_snapshot = m_event_handle_map;
}

// "eat" old acknowledge events :)
if (m_attr.timeout_ack_ms != 0)
{
for (const auto& event_handle : m_event_handle_map)
for (const auto& event_handle : event_handle_map_snapshot)
{
while (gWaitForEvent(event_handle.second.event_ack, 0)) {}
}
}

// send sync (memory file update) event
for (const auto& event_handle : m_event_handle_map)
for (const auto& event_handle : event_handle_map_snapshot)
{
// send sync event
gSetEvent(event_handle.second.event_snd);
Expand All @@ -360,7 +370,7 @@ namespace eCAL
// take start time for all acknowledge timeouts
const auto start_time = std::chrono::steady_clock::now();

for (auto& event_handle : m_event_handle_map)
for (auto& event_handle : event_handle_map_snapshot)
{
const auto time_since_start = std::chrono::steady_clock::now() - start_time;
const auto time_to_wait = std::chrono::milliseconds(m_attr.timeout_ack_ms)- time_since_start;
Expand Down
50 changes: 49 additions & 1 deletion ecal/core/src/readwrite/shm/ecal_writer_shm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,18 @@ namespace eCAL
return sent;
}

void CDataWriterSHM::ApplySubscription(const std::string& host_name_, const int32_t process_id_, const std::string& /*topic_id_*/, const std::string& /*conn_par_*/)
void CDataWriterSHM::ApplySubscription(const std::string& host_name_, const int32_t process_id_, const std::string& topic_id_, const std::string& /*conn_par_*/)
{
// we accept local connections only
if (host_name_ != m_attributes.host_name) return;

// add or update the map with process id's and sets of topic ids
{
const std::lock_guard<std::mutex> lock(m_process_id_topic_id_set_map_sync);
auto& topic_set = m_process_id_topic_id_set_map[process_id_];
topic_set.insert(topic_id_);
}

for (auto& memory_file : m_memory_file_vec)
{
memory_file->Connect(std::to_string(process_id_));
Expand All @@ -97,6 +104,47 @@ namespace eCAL
}
}

void CDataWriterSHM::RemoveSubscription(const std::string& host_name_, const int32_t process_id_, const std::string& topic_id_)
rex-schilasky marked this conversation as resolved.
Show resolved Hide resolved
{
// we accept local disconnections only
if (host_name_ != m_attributes.host_name) return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure that this is correct. If we communicate through docker images, we might have different host_name, but identical host_group.
Anyways, who applies the RemoveSubscription?
If you remove only subscriptions from IDs which have previously been subscribed, you don't need to do any checks here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be a general issue, and will be adressed with #1804


// remove topic id from the id set for the given process id
bool memfile_has_subscriptions(true);
{
const std::lock_guard<std::mutex> lock(m_process_id_topic_id_set_map_sync);
auto process_it = m_process_id_topic_id_set_map.find(process_id_);

// this process id is connected to the memory file
if (process_it != m_process_id_topic_id_set_map.end())
{
// remove it from the id set
process_it->second.erase(topic_id_);

// this process id has no more connection to this memory file
if (process_it->second.empty())
{
// we can remove the empty topic id set
m_process_id_topic_id_set_map.erase(process_it);
// and set the subscription state to false for later processing
memfile_has_subscriptions = false;
}
}
}

// memory file is still connected to at least one topic id of this process id
// no need to Disconnect process id
if (memfile_has_subscriptions) return;

for (auto& memory_file : m_memory_file_vec)
{
memory_file->Disconnect(std::to_string(process_id_));
#ifndef NDEBUG
Logging::Log(log_level_debug1, std::string("CDataWriterSHM::RemoveSubscription - Memory FileName: ") + memory_file->GetName() + " to ProcessId " + std::to_string(process_id_));
#endif
}
}

Registration::ConnectionPar CDataWriterSHM::GetConnectionParameter()
{
Registration::ConnectionPar connection_par;
Expand Down
8 changes: 8 additions & 0 deletions ecal/core/src/readwrite/shm/ecal_writer_shm.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
#include "readwrite/ecal_writer_base.h"

#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>

Expand All @@ -47,6 +50,7 @@ namespace eCAL
bool Write(CPayloadWriter& payload_, const SWriterAttr& attr_) override;

void ApplySubscription(const std::string& host_name_, int32_t process_id_, const std::string& topic_id_, const std::string& conn_par_) override;
void RemoveSubscription(const std::string& host_name_, int32_t process_id_, const std::string& topic_id_) override;

Registration::ConnectionPar GetConnectionParameter() override;

Expand All @@ -58,5 +62,9 @@ namespace eCAL
size_t m_write_idx = 0;
std::vector<std::shared_ptr<CSyncMemoryFile>> m_memory_file_vec;
static const std::string m_memfile_base_name;

using ProcessIDTopicIDSetT = std::map<int32_t, std::set<std::string>>;
std::mutex m_process_id_topic_id_set_map_sync;
ProcessIDTopicIDSetT m_process_id_topic_id_set_map;
};
}
Loading