[GAIAPLAT-1205] Do not fire a rule if the anchor row is invalid #882

Merged
merged 9 commits into from
Sep 2, 2021
Changes from 5 commits
4 changes: 4 additions & 0 deletions production/rules/event_manager/inc/event_manager_settings.hpp
Expand Up @@ -32,6 +32,7 @@ struct event_manager_settings_t
event_manager_settings_t()
: num_background_threads(SIZE_MAX)
, enable_catalog_checks(true)
, enable_db_checks(true)
, enable_rule_stats(false)
, stats_log_interval(10)
, max_rule_retries(1)
Expand All @@ -46,6 +47,9 @@ struct event_manager_settings_t
// Specifying true will allow rule subscriptions without comparing the table
// and fields to existing catalog definitions.
bool enable_catalog_checks;
// Specifies whether rule anchor rows are validated by the database
// before invoking a rule.
bool enable_db_checks;
// Enable logging of rule-specific statistics.
bool enable_rule_stats;
// Specifies the interval in seconds that performance statistics
Expand Down
39 changes: 33 additions & 6 deletions production/rules/event_manager/inc/rule_checker.hpp
Expand Up @@ -12,19 +12,46 @@ namespace rules
{

/**
* This helper class interfaces with the catalog to verify rule subscriptions.
* This class is invoked when the user calls the subscribe_rule API.
*
* Post Q2 this functionality may be moved to the catalog manager so that other code can
* also use it.
*/
* This helper class is used by the rules engine at both rule
* subscription and invocation time. Currently, the class performs
* the following checks.
*
* Catalog Checks:
* Ensure tables and fields that are referenced in a rule are
* actually present in the catalog at rule subscription time.
*
* Database Checks:
* Ensure that an anchor row is valid before invoking a rule.
*
* Note that the checks can be disabled by unit tests that do not want
* to have a dependency on the database. These tests can initialize
* the rules engine with custom settings. See event_manager_settings.hpp
* and rules_test_helpers.hpp for more information.
*/
Comment on lines +15 to +30


Trying to determine what user-facing documentation needs to be provided.
Does this affect a rule as it is processed, or forward chaining?

Contributor Author

@daxhaw daxhaw Aug 27, 2021

There are two important messages:

  1. Gaia guarantees that a rule is passed a valid anchor row.
  2. Gaia determines the validity of an anchor row before calling a rule. If the anchor row is invalid, the rule will not be called.

One way for an anchor row to be invalidated is the following sequence of events:

  1. A row is inserted, the transaction is committed, and a rule bound to the insert event is enqueued.
  2. That same row is deleted, and the transaction is committed before the rule enqueued in step 1 is invoked.
  3. The rules engine goes to invoke the insert rule enqueued in step 1, but it does not because the row has been deleted. Whether the row was deleted in the same transaction that inserted it or in a different one does not matter.
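
The three steps above can be sketched with a toy model. This is a minimal illustration, not Gaia code: `toy_engine_t`, `pending_rule_t`, and `row_id_t` are hypothetical names, and an in-memory set stands in for the database. The point it shows is that validity is checked when the rule is about to run, not when it is enqueued.

```cpp
#include <cassert>
#include <cstdint>
#include <queue>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical stand-ins for illustration only; these are not Gaia APIs.
using row_id_t = uint64_t;

struct pending_rule_t
{
    std::string rule_name;
    row_id_t anchor_row;
};

class toy_engine_t
{
public:
    // Step 1: a committed insert makes the row live and enqueues the rule.
    void commit_insert(row_id_t id)
    {
        m_live_rows.insert(id);
        m_queue.push(pending_rule_t{"on_insert", id});
    }

    // Step 2: a committed delete removes the row. The rule that was
    // enqueued earlier stays in the queue.
    void commit_delete(row_id_t id)
    {
        m_live_rows.erase(id);
    }

    // Step 3: drain the queue, checking the anchor row just before each
    // call. Returns the anchor rows of the rules that were invoked.
    std::vector<row_id_t> run_pending()
    {
        std::vector<row_id_t> invoked;
        while (!m_queue.empty())
        {
            pending_rule_t rule = m_queue.front();
            m_queue.pop();
            if (m_live_rows.count(rule.anchor_row) == 0)
            {
                // Invalid anchor row: skip the rule entirely.
                continue;
            }
            invoked.push_back(rule.anchor_row);
        }
        return invoked;
    }

private:
    std::unordered_set<row_id_t> m_live_rows;
    std::queue<pending_rule_t> m_queue;
};
```

In this model, inserting rows 1 and 2, deleting row 1, and then calling `run_pending()` invokes only the rule anchored at row 2.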

class rule_checker_t
{
public:
// By default, enable all checks.
rule_checker_t()
: rule_checker_t(true, true)
{
}

rule_checker_t(bool enable_catalog_checks, bool enable_db_checks)
Contributor

Can you comment on what these checks are actually doing?

Contributor Author

I added the following to the class header:

/**
 * This helper class is used by the rules engine at both rule
 * subscription and invocation time.  Currently, the class performs 
 * the following checks.
 * 
 * Catalog Checks:
 *  Ensure tables and fields that are referenced in a rule are
 *  actually present in the catalog at rule subscription time.
 * 
 * Database Checks:
 *  Ensure that an anchor row is valid before invoking a rule.
 * 
 * Note that the checks can be disabled by unit tests that do not want
 * to have a dependency on the database.  These tests can initialize
 * the rules engine with custom settings.  See event_manager_settings.hpp 
 * and rules_test_helpers.hpp for more information.
 */

Contributor

Great!

: m_enable_catalog_checks(enable_catalog_checks)
, m_enable_db_checks(enable_db_checks)
{
}

void check_catalog(common::gaia_type_t type, const common::field_position_list_t& field_list);
bool is_valid_row(common::gaia_id_t row_id);

private:
void check_fields(common::gaia_id_t id, const common::field_position_list_t& field_list);

private:
bool m_enable_catalog_checks;
bool m_enable_db_checks;
};

} // namespace rules
Expand Down
10 changes: 9 additions & 1 deletion production/rules/event_manager/inc/rule_thread_pool.hpp
Expand Up @@ -15,6 +15,7 @@

#include "gaia_internal/db/triggers.hpp"

#include "rule_checker.hpp"
#include "rule_stats_manager.hpp"

namespace gaia
Expand Down Expand Up @@ -69,7 +70,9 @@ class rule_thread_pool_t
* mode and no worker threads are created. If SIZE_MAX is specified
* then create the pool with the number of available hardware threads.
*/
rule_thread_pool_t(size_t num_threads, uint32_t max_rule_retries, rule_stats_manager_t& stats_manager);
rule_thread_pool_t(
size_t num_threads, uint32_t max_rule_retries,
rule_stats_manager_t& stats_manager, rule_checker_t& rule_checker);

/**
* Wait for the current rules "graph" to execute. Wait for all rules to finish
Expand Down Expand Up @@ -132,6 +135,11 @@ class rule_thread_pool_t
*/
rule_stats_manager_t& m_stats_manager;

/**
* Helper to validate anchor rows.
*/
rule_checker_t& m_rule_checker;

/**
* Maximum number of times to retry a rule when getting transaction update conflicts.
*/
Expand Down
19 changes: 6 additions & 13 deletions production/rules/event_manager/src/event_manager.cpp
Expand Up @@ -85,13 +85,11 @@ void event_manager_t::init(const event_manager_settings_t& settings)
count_worker_threads,
settings.stats_log_interval);

m_invocations = make_unique<rule_thread_pool_t>(
count_worker_threads, settings.max_rule_retries, *m_stats_manager);
m_rule_checker = make_unique<rule_checker_t>(
settings.enable_catalog_checks, settings.enable_db_checks);

if (settings.enable_catalog_checks)
{
m_rule_checker = make_unique<rule_checker_t>();
}
m_invocations = make_unique<rule_thread_pool_t>(
count_worker_threads, settings.max_rule_retries, *m_stats_manager, *m_rule_checker);

m_trigger_fn = [](const trigger_event_list_t& event_list) {
event_manager_t::get().commit_trigger(event_list);
Expand Down Expand Up @@ -265,13 +263,8 @@ void event_manager_t::subscribe_rule(
check_subscription(event_type, fields);

// Verify that the type and fields specified in the rule subscription
// are valid according to the catalog. The rule checker may be null
// if the event_manager was initialized with 'disabled_catalog_checks'
// set to true in its settings.
if (m_rule_checker)
{
m_rule_checker->check_catalog(gaia_type, fields);
}
// are valid according to the catalog.
m_rule_checker->check_catalog(gaia_type, fields);

// Look up the gaia_type in our type map. If we do not find it
// then we create a new empty event map map.
Expand Down
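
The change in event_manager.cpp replaces a checker that may be null with one that always exists and carries its own enable flags, so call sites no longer need a null test. A minimal sketch of that pattern follows. All names here (`settings_sketch_t`, `checker_sketch_t`, `make_checker`) are hypothetical, and the boolean return values are added only to make the behavior observable; the real `check_catalog` returns void.

```cpp
#include <cassert>
#include <memory>

// Hypothetical, simplified types for illustration; not the Gaia classes.
struct settings_sketch_t
{
    bool enable_catalog_checks = true;
    bool enable_db_checks = true;
};

class checker_sketch_t
{
public:
    checker_sketch_t(bool enable_catalog_checks, bool enable_db_checks)
        : m_enable_catalog_checks(enable_catalog_checks)
        , m_enable_db_checks(enable_db_checks)
    {
    }

    // Returns true if the catalog lookup would have run.
    bool check_catalog() const
    {
        if (!m_enable_catalog_checks)
        {
            return false;
        }
        // A real implementation would consult the catalog here.
        return true;
    }

    // With database checks disabled, every row is treated as valid.
    bool is_valid_row(bool row_exists) const
    {
        if (!m_enable_db_checks)
        {
            return true;
        }
        return row_exists;
    }

private:
    bool m_enable_catalog_checks;
    bool m_enable_db_checks;
};

// The checker is always created; the flags decide what it does.
std::unique_ptr<checker_sketch_t> make_checker(const settings_sketch_t& settings)
{
    return std::make_unique<checker_sketch_t>(
        settings.enable_catalog_checks, settings.enable_db_checks);
}
```

Because the object is never null, it can also be passed by reference to other components, which is what the diff does when it hands `*m_rule_checker` to the thread pool.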
17 changes: 17 additions & 0 deletions production/rules/event_manager/src/rule_checker.cpp
Expand Up @@ -7,6 +7,7 @@

#include "gaia_internal/catalog/catalog.hpp"
#include "gaia_internal/catalog/gaia_catalog.h"
#include "gaia_internal/db/gaia_ptr.hpp"

using namespace gaia::rules;
using namespace gaia::common;
Expand Down Expand Up @@ -90,6 +91,10 @@ ruleset_not_found::ruleset_not_found(const char* ruleset_name)
//
void rule_checker_t::check_catalog(gaia_type_t type, const field_position_list_t& field_list)
{
if (!m_enable_catalog_checks)
{
return;
}
auto_transaction_t txn;
// Find the id of the table defining gaia_type.
for (const auto& table : catalog::gaia_table_t::list())
Expand Down Expand Up @@ -148,3 +153,15 @@ void rule_checker_t::check_fields(gaia_id_t id, const field_position_list_t& fie
}
}
}

// A transaction must be active before calling this function.
bool rule_checker_t::is_valid_row(gaia::common::gaia_id_t row_id)
{
if (!m_enable_db_checks)
{
return true;
}

gaia::db::gaia_ptr_t row_ptr = gaia::db::gaia_ptr_t::open(row_id);
return static_cast<bool>(row_ptr);
}
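
`is_valid_row` opens a handle for the row and converts it to `bool`. The sketch below shows one way such a handle can work, assuming the handle's conversion operator is `explicit` (the diff does not confirm this for `gaia_ptr_t`). With an explicit conversion, a plain `return handle;` from a function returning `bool` does not compile, which is one reason to write `static_cast<bool>`. The types `row_handle_sketch_t` and `row_store_sketch_t` are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>

// Hypothetical handle: empty when the row does not exist.
class row_handle_sketch_t
{
public:
    row_handle_sketch_t() = default;
    explicit row_handle_sketch_t(const int* payload)
        : m_payload(payload)
    {
    }

    // Explicit conversion: usable in conditions, but a return statement
    // in a bool function needs static_cast<bool>.
    explicit operator bool() const
    {
        return m_payload != nullptr;
    }

private:
    const int* m_payload = nullptr;
};

// Hypothetical in-memory row store standing in for the database.
class row_store_sketch_t
{
public:
    void insert(uint64_t id, int value) { m_rows[id] = value; }
    void remove(uint64_t id) { m_rows.erase(id); }

    // Returns an empty handle if the row is not present.
    row_handle_sketch_t open(uint64_t id) const
    {
        auto it = m_rows.find(id);
        return it == m_rows.end()
            ? row_handle_sketch_t()
            : row_handle_sketch_t(&it->second);
    }

private:
    std::unordered_map<uint64_t, int> m_rows;
};

bool is_valid_row_sketch(const row_store_sketch_t& store, uint64_t id)
{
    row_handle_sketch_t handle = store.open(id);
    return static_cast<bool>(handle);
}
```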
24 changes: 22 additions & 2 deletions production/rules/event_manager/src/rule_thread_pool.cpp
Expand Up @@ -23,8 +23,11 @@ using namespace gaia::direct_access;
thread_local bool rule_thread_pool_t::s_tls_can_enqueue = true;
thread_local queue<rule_thread_pool_t::invocation_t> rule_thread_pool_t::s_tls_pending_invocations;

rule_thread_pool_t::rule_thread_pool_t(size_t count_threads, uint32_t max_retries, rule_stats_manager_t& stats_manager)
: m_stats_manager(stats_manager), m_max_rule_retries(max_retries), m_count_busy_workers(count_threads)
rule_thread_pool_t::rule_thread_pool_t(size_t count_threads, uint32_t max_retries, rule_stats_manager_t& stats_manager, rule_checker_t& rule_checker)
: m_stats_manager(stats_manager)
, m_rule_checker(rule_checker)
, m_max_rule_retries(max_retries)
, m_count_busy_workers(count_threads)
{
m_exit = false;
for (uint32_t i = 0; i < count_threads; i++)
Expand Down Expand Up @@ -232,6 +235,23 @@ void rule_thread_pool_t::invoke_rule_inner(invocation_t& invocation)
try
{
auto_transaction_t txn(auto_transaction_t::no_auto_begin);

// If the anchor row is invalid, then do not invoke the rule. This can
// occur if the row is deleted after it has been inserted or updated but
// before an enqueued rule has been invoked.
if (!m_rule_checker.is_valid_row(rule_invocation.record))
{
gaia_log::rules().trace("invalid anchor row: rule '{}' was not invoked, src_txn:'{}', new_txn:'{}'", rule_id, rule_invocation.src_txn_id, gaia::db::get_txn_id());

// It is safe to exit early out of this routine. The transaction will clean up on
// exit of the function and there will be no pending rule invocations to process.
// An invocation can only be pending if the rule itself called commit such that
// a new rule is enqueued while the existing rule is executing. Since we
// never called a rule function, this cannot happen.
ASSERT_INVARIANT(s_tls_pending_invocations.empty(), "No pending invocations should exist!");
return;
}

rule_context_t context(
txn,
rule_invocation.gaia_type,
Expand Down
4 changes: 4 additions & 0 deletions production/rules/event_manager/tests/test_event_manager.cpp
Expand Up @@ -599,6 +599,10 @@ class event_manager_test : public db_catalog_test_base_t
event_manager_settings_t settings;
settings.num_background_threads = 0;
settings.enable_catalog_checks = false;
// These standalone tests do not write to the database, so the
// rules engine cannot verify the validity of the "fake" anchor
// rows that the tests use to exercise the event manager.
settings.enable_db_checks = false;
test::initialize_rules_engine(settings);
g_context_checker.get_dummy_transaction(true);
}
Expand Down
21 changes: 21 additions & 0 deletions production/rules/event_manager/tests/test_rule_integration.cpp
Expand Up @@ -373,6 +373,27 @@ TEST_F(rule_integration_test, test_update)
}
}

// This test verifies that a rule is not invoked if the anchor row
// is deleted before the rule can be called.
TEST_F(rule_integration_test, test_update_and_delete)
{
subscribe_update();
{
// The update rule should not be fired because we delete the row.
rule_monitor_t monitor(0);
auto_transaction_t txn(true);
employee_writer writer;
writer.name_first = "Ignore";
employee_t e = employee_t::get(writer.insert_row());
txn.commit();
writer = e.writer();
writer.name_first = c_name;
writer.update_row();
e.delete_row();
txn.commit();
}
}

// Test single rule, single active field binding.
TEST_F(rule_integration_test, test_update_field)
{
Expand Down
8 changes: 5 additions & 3 deletions production/sdk/tests/test_sdk.cpp
Expand Up @@ -121,9 +121,11 @@ TEST_F(sdk_test, rule_subscribe_unsubscribe)
employee_writer w;
w.name_first = "Public";
w.name_last = "Headers";
gaia_id_t id = w.insert_row();
employee_t e = employee_t::get(id);
e.delete_row();
w.insert_row();
// [GAIAPLAT-1205]: We now do not fire an event if
// the anchor row has been deleted.
// employee_t e = employee_t::get(id);
// e.delete_row();
tx.commit();

wait_for_rule(g_rule_1_called);
Expand Down
7 changes: 4 additions & 3 deletions production/system/tests/test_gaia_system.cpp
Expand Up @@ -106,8 +106,9 @@ void perform_transactions(uint32_t count_transactions, uint32_t crud_events_per_
w.name_first = "updated_name";
w.update_row();

// Delete row.
employee_t::delete_row(id);
// [GAIAPLAT-1205]: We now do not fire an event if
// the anchor row has been deleted.
// employee_t::delete_row(id);
gaia::db::commit_transaction();

// We should get crud_events_per_txn per commit. Wait for them.
Expand All @@ -131,7 +132,7 @@ void validate_and_end_test(uint32_t count_txn, uint32_t crud_events_per_txn, uin
TEST_F(gaia_system_test, single_threaded_transactions)
{
uint32_t count_txn = 2;
uint32_t crud_events_per_txn = 2;
uint32_t crud_events_per_txn = 2; // insert and update
perform_transactions(count_txn, crud_events_per_txn, false);
validate_and_end_test(count_txn, crud_events_per_txn, 1);
}
Expand Down
2 changes: 0 additions & 2 deletions production/tools/gaia_translate/src/main.cpp
Expand Up @@ -77,8 +77,6 @@ constexpr int c_encoding_mask = 0xFFFF;

constexpr char c_connect_keyword[] = "connect";
constexpr char c_disconnect_keyword[] = "disconnect";
constexpr size_t c_connect_keyword_length = 7;
constexpr size_t c_disconnect_keyword_length = 10;

vector<string> g_rulesets;
unordered_map<string, unordered_set<string>> g_active_fields;
Expand Down