Fix rules engine unsubscribe race condition #731

Merged
merged 3 commits into from
Jun 14, 2021
6 changes: 5 additions & 1 deletion production/rules/event_manager/inc/event_manager.hpp
@@ -96,7 +96,7 @@ class event_manager_t
std::recursive_mutex m_init_lock;

// Hash table of all rules registered with the system.
- // The key is the rulset_name::rule_name.
+ // The key is the ruleset_name::rule_name.
std::unordered_map<std::string, std::unique_ptr<_rule_binding_t>> m_rules;

// List of rules that are invoked when an event is logged.
Expand Down Expand Up @@ -133,6 +133,10 @@ class event_manager_t
// members, so the thread pool must be initialized last and destroyed first.
std::unique_ptr<rule_thread_pool_t> m_invocations;

// Commit trigger function pointer that the database calls
// whenever a transaction is committed.
gaia::db::triggers::commit_trigger_fn m_trigger_fn;

private:
// Only internal static creation is allowed.
event_manager_t() = default;
8 changes: 7 additions & 1 deletion production/rules/event_manager/inc/rule_thread_pool.hpp
@@ -62,6 +62,12 @@ class rule_thread_pool_t
*/
rule_thread_pool_t(size_t num_threads, uint32_t max_rule_retries, rule_stats_manager_t& stats_manager);

/**
* Wait for the current rules "graph" to execute. Wait for all rules to finish
* executing as well as any rules that these rules enqueue.
*/
void wait_for_rules_to_finish();

/**
* Notify and wait for all workers in the thread pool
* to finish executing their last work item before destroying
Expand Down Expand Up @@ -93,9 +99,9 @@ class rule_thread_pool_t

private:
void rule_worker(int32_t& count_busy_workers);

void invoke_rule(invocation_t& invocation);
void process_pending_invocations(bool should_schedule);
void wait_for_rules_to_finish(std::unique_lock<std::mutex>& lock);

// Each thread has a copy of these two variables to determine
// whether pending rule invocations can be scheduled or they
25 changes: 23 additions & 2 deletions production/rules/event_manager/src/event_manager.cpp
@@ -93,10 +93,10 @@ void event_manager_t::init(const event_manager_settings_t& settings)
m_rule_checker = make_unique<rule_checker_t>();
}

- auto fn = [](const trigger_event_list_t& event_list) {
+ m_trigger_fn = [](const trigger_event_list_t& event_list) {
event_manager_t::get().commit_trigger(event_list);
};
- set_commit_trigger(fn);
+ set_commit_trigger(m_trigger_fn);

m_is_initialized = true;
}
@@ -381,8 +381,29 @@ bool event_manager_t::unsubscribe_rule(
void event_manager_t::unsubscribe_rules()
{
gaia_log::rules().debug("Unsubscribing all rules.");

// Because a rule may cause further invocations on commit which cause access of the
// rule subscriptions, we need to wait for the current rules graph to finish executing.
if (m_invocations)
{
m_invocations->wait_for_rules_to_finish();

// Detach the commit trigger so that any new events that come in do not try
// to look for rule subscriptions while we are removing them.
gaia::db::set_commit_trigger(nullptr);
Contributor

Super-dumb question (because I don't know the rules engine code at all): since it seems to be safe to clear the commit trigger before calling wait_for_rules_to_finish() (since you do it immediately after clearing the commit trigger), why do we need to call wait_for_rules_to_finish() before calling set_commit_trigger()? Is this just a best-effort attempt to execute all existing rule invocations? Is that necessary at this point?

Contributor Author

The first call to wait_for_rules_to_finish is to drain any invocations and any further rules that run because of those invocations. If I were to clear the commit trigger before doing this, I wouldn't execute the entire intended graph of rules that were supposed to be run. After the engine is done with that, then it's a best effort to preclude firing any more rules by setting the commit trigger to nullptr. The second wait_for_rules_to_finish is to account for the case when some other procedural code made a change that caused a rule to fire before I had the chance to null out the commit trigger.


// Since an invocation might have snuck in between the time we finished executing
// and the time we detached the commit trigger, wait for these to finish.
m_invocations->wait_for_rules_to_finish();
}

// Now it is safe to clear out the subscriptions and rule bindings.
m_subscriptions.clear();
m_rules.clear();

// Reset the commit trigger so we are ready to go on any new
// rule subscriptions.
gaia::db::set_commit_trigger(m_trigger_fn);
}

void event_manager_t::list_subscribed_rules(
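The ordering discussed in the review thread above (drain, detach the trigger, drain again, clear, re-attach) can be illustrated with a minimal sketch. This is not the Gaia implementation: `toy_engine_t`, its members, and `run_chain_demo()` are hypothetical names invented here, and the model is single-threaded, so it cannot reproduce the race itself. It only shows why the first drain has to happen while the trigger is still attached: each rule in the chain commits an event that schedules the next rule.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <unordered_map>

// Hypothetical single-threaded model; none of these names are Gaia APIs.
struct toy_engine_t
{
    using trigger_fn_t = std::function<void(const std::string&)>;

    // Event name -> rule body.
    std::unordered_map<std::string, std::function<void(toy_engine_t&)>> subscriptions;
    std::deque<std::string> pending; // Events waiting for their rule to run.
    trigger_fn_t attached_trigger;   // What the "database" calls on commit.
    trigger_fn_t saved_trigger;      // Kept so it can be re-attached later.
    int rules_run = 0;

    toy_engine_t()
    {
        saved_trigger = [this](const std::string& event) { pending.push_back(event); };
        attached_trigger = saved_trigger;
    }
    toy_engine_t(const toy_engine_t&) = delete; // The trigger captures `this`.

    // Stand-in for a transaction commit that raises an event.
    void commit(const std::string& event)
    {
        if (attached_trigger)
        {
            attached_trigger(event);
        }
    }

    // Run queued rules, including any rules that those rules enqueue.
    void wait_for_rules_to_finish()
    {
        while (!pending.empty())
        {
            std::string event = pending.front();
            pending.pop_front();
            auto it = subscriptions.find(event);
            if (it != subscriptions.end())
            {
                ++rules_run;
                it->second(*this);
            }
        }
    }

    void unsubscribe_rules()
    {
        wait_for_rules_to_finish();       // 1. Drain the whole rules graph.
        attached_trigger = nullptr;       // 2. Detach so no new events arrive.
        wait_for_rules_to_finish();       // 3. Drain anything that snuck in.
        subscriptions.clear();            // 4. Now it is safe to clear.
        attached_trigger = saved_trigger; // 5. Re-attach for future subscriptions.
    }
};

// Builds a three-rule chain (a -> b -> c) and returns how many rules ran.
int run_chain_demo()
{
    toy_engine_t engine;
    engine.subscriptions["a"] = [](toy_engine_t& e) { e.commit("b"); };
    engine.subscriptions["b"] = [](toy_engine_t& e) { e.commit("c"); };
    engine.subscriptions["c"] = [](toy_engine_t&) {};
    engine.commit("a");
    engine.unsubscribe_rules();
    engine.commit("a"); // No subscription is left, so nothing runs.
    engine.wait_for_rules_to_finish();
    return engine.rules_run;
}
```

In this model `run_chain_demo()` returns 3: all three rules in the chain run before the subscriptions are cleared. Swapping steps 1 and 2 would cut the chain after the first rule, because the commit made by rule `a` would find no trigger attached.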
2 changes: 1 addition & 1 deletion production/rules/event_manager/src/rule_stats.cpp
@@ -23,7 +23,7 @@ rule_stats_t::rule_stats_t(const char* a_rule_id)
if (a_rule_id)
{
rule_id = a_rule_id;
- // For pretting formatting, truncate long rule ids.
+ // Truncate long rule ids for pretty formatting.
if (rule_id.length() > c_max_rule_id_len)
{
truncated_rule_id = rule_id.substr(0, c_max_rule_id_len);
41 changes: 28 additions & 13 deletions production/rules/event_manager/src/rule_thread_pool.cpp
@@ -44,21 +44,19 @@ rule_thread_pool_t::~rule_thread_pool_t()
shutdown();
}

- void rule_thread_pool_t::shutdown()
+ void rule_thread_pool_t::wait_for_rules_to_finish()
{
if (m_threads.size() == 0)
{
return;
}

// Wait for any scheduled rules to finish executing. Once the rule
// has finished, its worker thread will go into a wait state (not busy).
// Once all workers are not busy, then we can exit if there are no pending
// rule invocations. If there are invocations left, then wait for them to
// be executed and check again.
auto start_shutdown_time = gaia::common::timer_t::get_time_point();

unique_lock lock(m_lock, defer_lock);
wait_for_rules_to_finish(lock);
Contributor

It looks to me like you don't really need defer_lock at all here since the caller isn't using the lock to protect anything. In that case I assume the lock parameter is purely for the benefit of rule_thread_pool_t::shutdown(), since it uses the lock to protect access to m_exit. I wonder if it would be simpler to just have shutdown() re-acquire the lock after wait_for_rules_to_finish() returns? Then you could just use a unique_lock in wait_for_rules_to_finish() rather than calling lock()/unlock() directly, and you wouldn't have to pass locks around with defer_lock. Does the potential synchronization overhead here justify introducing this complexity (e.g. the implicit contract of wait_for_rules_to_finish() always exiting with the lock held)?

Contributor Author

I really don't want anyone messing with m_exit when trying to shut down. The flag and the wait state of the threads need to be checked and set together. Note that the wait_for_rules_to_finish method that takes the lock is a private method only used in this class. There is a public wrapper which takes care of the lock for you for other classes that want to use it (most notably the event_manager_t class). I felt that the complexity was justified for use within the thread pool but did not want this to escape through the class public interface.

lock.unlock();
}

// NOTE: Callers must unlock the passed in mutex.
//
// TODO[GAIAPLAT-1020]: Add a configuration setting to limit the time
// we will wait for all rules to execute.
void rule_thread_pool_t::wait_for_rules_to_finish(std::unique_lock<std::mutex>& lock)
{
Contributor

So, assuming you no longer passed in the lock via defer_lock, I think you could just use an RAII wrapper like unique_lock if you just added an extra scope to the while loop and set a flag? It doesn't seem crucial that this method exits with the lock held, except for convenience in shutdown() not needing to acquire the lock again to set m_exit? (I'm probably missing something, of course.)

Contributor Author

Right, this code used to live in the shutdown method and I wanted to preserve its locking behavior exactly.

while (true)
{
lock.lock();
@@ -70,6 +68,23 @@ void rule_thread_pool_t::shutdown()
lock.unlock();
std::this_thread::yield();
}
}

void rule_thread_pool_t::shutdown()
{
if (m_threads.size() == 0)
{
return;
}

auto start_shutdown_time = gaia::common::timer_t::get_time_point();

unique_lock lock(m_lock, defer_lock);
Contributor

Given that this method doesn't seem to use the lock itself, why can't wait_for_rules_to_finish() just use m_lock directly? Oh, I guess shutdown() needs to hold the lock while it sets m_exit since that's not atomic and is protected by m_lock everywhere else?

Contributor Author

Correct. I want it to set m_exit while the lock is held.


// Wait for the currently executing rules "graph" to finish executing. This will
// drain the work queues of any rules executing AND any rules that these rules
// may trigger.
wait_for_rules_to_finish(lock);

m_exit = true;
lock.unlock();
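The locking contract debated in the comments above (a private wait that returns with the lock held, a public wrapper that hides the lock, and a shutdown that sets its exit flag under that same lock) can be sketched as follows. This is a hypothetical miniature, not the `rule_thread_pool_t` code: `toy_pool_t`, `wait_for_idle`, `enqueue`, and `run_pool_demo` are names made up for illustration, and the worker loop uses a condition variable as an assumption, since the real worker code is not shown in this diff.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical miniature pool for illustration; not the rule_thread_pool_t API.
class toy_pool_t
{
public:
    explicit toy_pool_t(int num_threads)
    {
        for (int i = 0; i < num_threads; ++i) m_threads.emplace_back([this] { worker(); });
    }
    ~toy_pool_t() { shutdown(); }

    void enqueue(std::function<void()> task)
    {
        std::lock_guard<std::mutex> guard(m_lock);
        m_pending.push(task);
        m_has_work.notify_one();
    }

    // Public wrapper: owns the lock so that callers never see it.
    void wait_for_idle()
    {
        std::unique_lock<std::mutex> lock(m_lock, std::defer_lock);
        wait_for_idle(lock);
        lock.unlock();
    }

    void shutdown()
    {
        if (m_threads.empty()) return;
        std::unique_lock<std::mutex> lock(m_lock, std::defer_lock);
        wait_for_idle(lock);
        m_exit = true; // Set while the lock taken by wait_for_idle is still held.
        lock.unlock();
        m_has_work.notify_all();
        for (auto& thread : m_threads) thread.join();
        m_threads.clear();
    }

private:
    // Contract: returns with `lock` held once nothing is queued or running.
    void wait_for_idle(std::unique_lock<std::mutex>& lock)
    {
        while (true)
        {
            lock.lock();
            if (m_busy == 0 && m_pending.empty()) break;
            lock.unlock();
            std::this_thread::yield();
        }
    }

    void worker()
    {
        std::unique_lock<std::mutex> lock(m_lock);
        while (true)
        {
            m_has_work.wait(lock, [this] { return m_exit || !m_pending.empty(); });
            if (m_exit) return;
            std::function<void()> task = m_pending.front();
            m_pending.pop();
            ++m_busy;
            lock.unlock();
            task(); // A task may enqueue follow-ups while it still counts as busy.
            lock.lock();
            --m_busy;
        }
    }

    std::mutex m_lock;
    std::condition_variable m_has_work;
    std::queue<std::function<void()>> m_pending;
    std::vector<std::thread> m_threads;
    int m_busy = 0;
    bool m_exit = false;
};

// Each of the n tasks enqueues one follow-up, so 2 * n tasks run in total.
int run_pool_demo(int n)
{
    std::atomic<int> ran{0};
    toy_pool_t pool(2);
    for (int i = 0; i < n; ++i)
    {
        pool.enqueue([&pool, &ran] {
            ++ran;
            pool.enqueue([&ran] { ++ran; });
        });
    }
    pool.wait_for_idle();
    return ran.load(); // The pool's destructor calls shutdown() afterwards.
}
```

Because a task is counted as busy while it runs, any follow-up it enqueues is visible before the idle check can succeed, which is what lets a single wait drain the whole "graph". With `run_pool_demo(8)` the wait returns only after all 16 tasks have run. The alternative raised in review (a scoped `unique_lock` inside the loop, with `shutdown()` re-acquiring the lock before setting the flag) would remove the exit-with-lock-held contract at the cost of a window between the idle check and the flag being set.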