From 7654e0125d93d93bed43a21f26ed972ace64eea0 Mon Sep 17 00:00:00 2001 From: Danilo Piparo Date: Mon, 12 Nov 2018 14:49:54 +0100 Subject: [PATCH] [DF] Fix rare race condition: TTreeReader and its values deleted concurrently in a nutshell, the condition was: - Thread #1) a task ends and pushes back processing slot - Thread #2) a task starts and overwrites thread-local TTreeReaderValues - Thread #1) first task deletes TTreeReader See https://github.com/root-project/root/commit/26e8ace6e47de6794ac9ec770c3bbff9b7f2e945 --- tree/dataframe/inc/ROOT/RDF/RAction.hxx | 33 +++++++++++++++++++ tree/dataframe/inc/ROOT/RDF/RActionBase.hxx | 1 + tree/dataframe/inc/ROOT/RDF/RColumnValue.hxx | 25 ++++++++++++++ tree/dataframe/inc/ROOT/RDF/RCustomColumn.hxx | 6 ++++ .../inc/ROOT/RDF/RCustomColumnBase.hxx | 1 + tree/dataframe/inc/ROOT/RDF/RFilter.hxx | 14 ++++++++ tree/dataframe/inc/ROOT/RDF/RFilterBase.hxx | 2 ++ tree/dataframe/inc/ROOT/RDF/RJittedAction.hxx | 1 + .../inc/ROOT/RDF/RJittedCustomColumn.hxx | 1 + tree/dataframe/inc/ROOT/RDF/RJittedFilter.hxx | 2 ++ tree/dataframe/src/RJittedAction.cxx | 6 ++++ tree/dataframe/src/RJittedCustomColumn.cxx | 6 ++++ tree/dataframe/src/RJittedFilter.cxx | 12 +++++++ tree/dataframe/src/RLoopManager.cxx | 2 ++ 14 files changed, 112 insertions(+) diff --git a/tree/dataframe/inc/ROOT/RDF/RAction.hxx b/tree/dataframe/inc/ROOT/RDF/RAction.hxx index 7131f7a74a6f9..c71b0dd38b889 100644 --- a/tree/dataframe/inc/ROOT/RDF/RAction.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RAction.hxx @@ -84,6 +84,15 @@ void InitRDFValues(unsigned int slot, std::vector &value 0}; } +/// This overload is specialized to act on RTypeErasedColumnValues instead of RColumnValues. +template +void ResetRDFValueTuple(std::vector &values, std::index_sequence, + ROOT::TypeTraits::TypeList) +{ + using expander = int[]; + (void)expander{(values[S].Cast()->Reset(), 0)...}; +} + // fwd decl for RActionCRTP template class RAction; @@ -132,9 +141,15 @@ public: void FinalizeSlot(unsigned int slot) final { + ClearValueReaders(slot); + for (auto &column : GetCustomColumns().GetColumns()) { + column.second->ClearValueReaders(slot); + } fHelper.CallFinalizeTask(slot); } + void ClearValueReaders(unsigned int slot) { static_cast(this)->ResetColumnValues(slot, TypeInd_t()); } + void Finalize() final { fHelper.Finalize(); @@ -209,6 +224,12 @@ public: (void)entry; // avoid bogus 'unused parameter' warning in gcc4.9 ActionCRTP_t::GetHelper().Exec(slot, std::get(fValues[slot]).Get(entry)...); } + + template + void ResetColumnValues(unsigned int slot, std::index_sequence s) + { + ResetRDFValueTuple(fValues[slot], s); + } }; // These specializations let RAction type-erase their column values, for (presumably) a small hit in @@ -254,6 +275,12 @@ public: (void)entry; // avoid bogus 'unused parameter' warning in gcc4.9 ActionCRTP_t::GetHelper().Exec(slot, fValues[slot][S].template Get(entry)...); } + + template + void ResetColumnValues(unsigned int slot, std::index_sequence s) + { + ResetRDFValueTuple(fValues[slot], s, ColumnTypes_t{}); + } }; // Same exact code as above, but for SnapshotHelperMT. I don't know how to avoid repeating this code @@ -288,6 +315,12 @@ public: (void)entry; // avoid bogus 'unused parameter' warning in gcc4.9 ActionCRTP_t::GetHelper().Exec(slot, fValues[slot][S].template Get(entry)...); } + + template + void ResetColumnValues(unsigned int slot, std::index_sequence s) + { + ResetRDFValueTuple(fValues[slot], s, ColumnTypes_t{}); + } }; } // ns RDF diff --git a/tree/dataframe/inc/ROOT/RDF/RActionBase.hxx b/tree/dataframe/inc/ROOT/RDF/RActionBase.hxx index 7bd74d891be1e..b42c4370b8f35 100644 --- a/tree/dataframe/inc/ROOT/RDF/RActionBase.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RActionBase.hxx @@ -67,6 +67,7 @@ public: virtual void Initialize() = 0; virtual void InitSlot(TTreeReader *r, unsigned int slot) = 0; virtual void TriggerChildrenCount() = 0; + virtual void ClearValueReaders(unsigned int slot) = 0; virtual void FinalizeSlot(unsigned int) = 0; virtual void Finalize() = 0; /// This method is invoked to update a partial result during the event loop, right before passing the result to a diff --git a/tree/dataframe/inc/ROOT/RDF/RColumnValue.hxx b/tree/dataframe/inc/ROOT/RDF/RColumnValue.hxx index 0460ffd4a7df8..e973ecc173369 100644 --- a/tree/dataframe/inc/ROOT/RDF/RColumnValue.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RColumnValue.hxx @@ -237,6 +237,21 @@ public: return fColumnKind == EColumnKind::kCustomColumn ? *fCustomValuePtr : **fDSValuePtr; } } + + void Reset() + { + // This method should by all means not be removed, together with all + // of its callers, otherwise a race condition takes place in which a + // TTreeReader and its TTreeReader{Value,Array}s could be deleted + // concurrently: + // - Thread #1) a task ends and pushes back processing slot + // - Thread #2) a task starts and overwrites thread-local TTreeReaderValues + // - Thread #1) first task deletes TTreeReader + // See https://github.com/root-project/root/commit/26e8ace6e47de6794ac9ec770c3bbff9b7f2e945 + if (EColumnKind::kTree == fColumnKind) { + fTreeReader.reset(); + } + } }; // Some extern instaniations to speed-up compilation/interpretation time @@ -273,6 +288,16 @@ struct TRDFValueTuple> { template using RDFValueTuple_t = typename TRDFValueTuple::type; +/// Clear the proxies of a tuple of RColumnValues +template +void ResetRDFValueTuple(ValueTuple &values, std::index_sequence) +{ + // hack to expand a parameter pack without c++17 fold expressions. + std::initializer_list expander{(std::get(values).Reset(), 0)...}; + (void)expander; // avoid "unused variable" warnings +} + + } // ns RDF } // ns Internal } // ns ROOT diff --git a/tree/dataframe/inc/ROOT/RDF/RCustomColumn.hxx b/tree/dataframe/inc/ROOT/RDF/RCustomColumn.hxx index 098c9e9c33f18..c19fe573f0616 100644 --- a/tree/dataframe/inc/ROOT/RDF/RCustomColumn.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RCustomColumn.hxx @@ -124,6 +124,12 @@ public: // silence "unused parameter" warnings in gcc (void)slot; (void)entry; + + void ClearValueReaders(unsigned int slot) final + { + // TODO: Each node calls this method for each column it uses. Multiple nodes may share the same columns, and this + // would lead to this method being called multiple times. + RDFInternal::ResetRDFValueTuple(fValues[slot], TypeInd_t()); } }; diff --git a/tree/dataframe/inc/ROOT/RDF/RCustomColumnBase.hxx b/tree/dataframe/inc/ROOT/RDF/RCustomColumnBase.hxx index e81b5c34d447e..51690ffa8dbb9 100644 --- a/tree/dataframe/inc/ROOT/RDF/RCustomColumnBase.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RCustomColumnBase.hxx @@ -56,6 +56,7 @@ public: RLoopManager *GetLoopManagerUnchecked() const; std::string GetName() const; virtual void Update(unsigned int slot, Long64_t entry) = 0; + virtual void ClearValueReaders(unsigned int slot) = 0; bool IsDataSourceColumn() const { return fIsDataSourceColumn; } virtual void InitNode(); /// Return the unique identifier of this RCustomColumnBase. diff --git a/tree/dataframe/inc/ROOT/RDF/RFilter.hxx b/tree/dataframe/inc/ROOT/RDF/RFilter.hxx index cdd40c6bd5508..feb00e78a7243 100644 --- a/tree/dataframe/inc/ROOT/RDF/RFilter.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RFilter.hxx @@ -133,6 +133,11 @@ public: fPrevData.IncrChildrenCount(); } + virtual void ClearValueReaders(unsigned int slot) final + { + RDFInternal::ResetRDFValueTuple(fValues[slot], TypeInd_t()); + } + void AddFilterName(std::vector &filters) { fPrevData.AddFilterName(filters); @@ -140,6 +145,15 @@ public: filters.push_back(name); } + virtual void ClearTask(unsigned int slot) final + { + for (auto &column : fCustomColumns.GetColumns()) { + column.second->ClearValueReaders(slot); + } + + ClearValueReaders(slot); + } + std::shared_ptr GetGraph() { // Recursively call for the previous node. diff --git a/tree/dataframe/inc/ROOT/RDF/RFilterBase.hxx b/tree/dataframe/inc/ROOT/RDF/RFilterBase.hxx index bff4378efd36c..e0e123b45c779 100644 --- a/tree/dataframe/inc/ROOT/RDF/RFilterBase.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RFilterBase.hxx @@ -63,6 +63,8 @@ public: std::fill(fAccepted.begin(), fAccepted.end(), 0); std::fill(fRejected.begin(), fRejected.end(), 0); } + virtual void ClearValueReaders(unsigned int slot) = 0; + virtual void ClearTask(unsigned int slot) = 0; virtual void InitNode(); virtual void AddFilterName(std::vector &filters) = 0; }; diff --git a/tree/dataframe/inc/ROOT/RDF/RJittedAction.hxx b/tree/dataframe/inc/ROOT/RDF/RJittedAction.hxx index 2d2a75045c25e..de67900ac556e 100644 --- a/tree/dataframe/inc/ROOT/RDF/RJittedAction.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RJittedAction.hxx @@ -53,6 +53,7 @@ public: void *PartialUpdate(unsigned int slot) final; bool HasRun() const final; void SetHasRun() final; + void ClearValueReaders(unsigned int slot) final; std::shared_ptr GetGraph(); }; diff --git a/tree/dataframe/inc/ROOT/RDF/RJittedCustomColumn.hxx b/tree/dataframe/inc/ROOT/RDF/RJittedCustomColumn.hxx index d34c6bd472cb9..836a87ab69ff1 100644 --- a/tree/dataframe/inc/ROOT/RDF/RJittedCustomColumn.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RJittedCustomColumn.hxx @@ -45,6 +45,7 @@ public: void *GetValuePtr(unsigned int slot) final; const std::type_info &GetTypeId() const final; void Update(unsigned int slot, Long64_t entry) final; + void ClearValueReaders(unsigned int slot) final; void InitNode() final; }; diff --git a/tree/dataframe/inc/ROOT/RDF/RJittedFilter.hxx b/tree/dataframe/inc/ROOT/RDF/RJittedFilter.hxx index 90821b5ce18ce..71b7761e107b0 100644 --- a/tree/dataframe/inc/ROOT/RDF/RJittedFilter.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RJittedFilter.hxx @@ -55,8 +55,10 @@ public: void ResetChildrenCount() final; void TriggerChildrenCount() final; void ResetReportCount() final; + void ClearValueReaders(unsigned int slot) final; void InitNode() final; void AddFilterName(std::vector &filters) final; + void ClearTask(unsigned int slot) final; std::shared_ptr GetGraph(); }; diff --git a/tree/dataframe/src/RJittedAction.cxx b/tree/dataframe/src/RJittedAction.cxx index 9c39ac035d030..e0416a3d2751f 100644 --- a/tree/dataframe/src/RJittedAction.cxx +++ b/tree/dataframe/src/RJittedAction.cxx @@ -76,6 +76,12 @@ void RJittedAction::SetHasRun() return fConcreteAction->SetHasRun(); } +void RJittedAction::ClearValueReaders(unsigned int slot) +{ + R__ASSERT(fConcreteAction != nullptr); + return fConcreteAction->ClearValueReaders(slot); +} + std::shared_ptr RJittedAction::GetGraph() { R__ASSERT(fConcreteAction != nullptr); diff --git a/tree/dataframe/src/RJittedCustomColumn.cxx b/tree/dataframe/src/RJittedCustomColumn.cxx index db37385b2f4a0..1e51568dff449 100644 --- a/tree/dataframe/src/RJittedCustomColumn.cxx +++ b/tree/dataframe/src/RJittedCustomColumn.cxx @@ -37,6 +37,12 @@ void RJittedCustomColumn::Update(unsigned int slot, Long64_t entry) fConcreteCustomColumn->Update(slot, entry); } +void RJittedCustomColumn::ClearValueReaders(unsigned int slot) +{ + R__ASSERT(fConcreteCustomColumn != nullptr); + fConcreteCustomColumn->ClearValueReaders(slot); +} + void RJittedCustomColumn::InitNode() { R__ASSERT(fConcreteCustomColumn != nullptr); diff --git a/tree/dataframe/src/RJittedFilter.cxx b/tree/dataframe/src/RJittedFilter.cxx index c725909cb6a42..5db6bd6e79826 100644 --- a/tree/dataframe/src/RJittedFilter.cxx +++ b/tree/dataframe/src/RJittedFilter.cxx @@ -83,6 +83,18 @@ void RJittedFilter::ResetReportCount() fConcreteFilter->ResetReportCount(); } +void RJittedFilter::ClearValueReaders(unsigned int slot) +{ + R__ASSERT(fConcreteFilter != nullptr); + fConcreteFilter->ClearValueReaders(slot); +} + +void RJittedFilter::ClearTask(unsigned int slot) +{ + R__ASSERT(fConcreteFilter != nullptr); + fConcreteFilter->ClearTask(slot); +} + void RJittedFilter::InitNode() { R__ASSERT(fConcreteFilter != nullptr); diff --git a/tree/dataframe/src/RLoopManager.cxx b/tree/dataframe/src/RLoopManager.cxx index 0f3ceb97ec7bc..337600be9fd9c 100644 --- a/tree/dataframe/src/RLoopManager.cxx +++ b/tree/dataframe/src/RLoopManager.cxx @@ -259,6 +259,8 @@ void RLoopManager::CleanUpTask(unsigned int slot) { for (auto &ptr : fBookedActions) ptr->FinalizeSlot(slot); + for (auto &ptr : fBookedFilters) + ptr->ClearTask(slot); } /// Jit all actions that required runtime column type inference, and clean the `fToJit` member variable.