Skip to content

Commit

Permalink
[DF] Fix rare race condition: TTreeReader and its values deleted conc…
Browse files Browse the repository at this point in the history
…urrently

in a nutshell, the condition was:
- Thread #1) a task ends and pushes back processing slot
- Thread #2) a task starts and overwrites thread-local TTreeReaderValues
- Thread #1) first task deletes TTreeReader

See root-project@26e8ace
  • Loading branch information
dpiparo committed Nov 19, 2018
1 parent 4216622 commit 7654e01
Show file tree
Hide file tree
Showing 14 changed files with 112 additions and 0 deletions.
33 changes: 33 additions & 0 deletions tree/dataframe/inc/ROOT/RDF/RAction.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ void InitRDFValues(unsigned int slot, std::vector<RTypeErasedColumnValue> &value
0};
}

/// This overload is specialized to act on RTypeErasedColumnValues instead of RColumnValues.
template <std::size_t... S, typename... ColTypes>
void ResetRDFValueTuple(std::vector<RTypeErasedColumnValue> &values, std::index_sequence<S...>,
ROOT::TypeTraits::TypeList<ColTypes...>)
{
using expander = int[];
(void)expander{(values[S].Cast<ColTypes>()->Reset(), 0)...};
}

// fwd decl for RActionCRTP
template <typename Helper, typename PrevDataFrame, typename ColumnTypes_t>
class RAction;
Expand Down Expand Up @@ -132,9 +141,15 @@ public:

void FinalizeSlot(unsigned int slot) final
{
ClearValueReaders(slot);
for (auto &column : GetCustomColumns().GetColumns()) {
column.second->ClearValueReaders(slot);
}
fHelper.CallFinalizeTask(slot);
}

void ClearValueReaders(unsigned int slot) { static_cast<Action_t *>(this)->ResetColumnValues(slot, TypeInd_t()); }

void Finalize() final
{
fHelper.Finalize();
Expand Down Expand Up @@ -209,6 +224,12 @@ public:
(void)entry; // avoid bogus 'unused parameter' warning in gcc4.9
ActionCRTP_t::GetHelper().Exec(slot, std::get<S>(fValues[slot]).Get(entry)...);
}

template <std::size_t... S>
void ResetColumnValues(unsigned int slot, std::index_sequence<S...> s)
{
ResetRDFValueTuple(fValues[slot], s);
}
};

// These specializations let RAction<SnapshotHelper[MT]> type-erase their column values, for (presumably) a small hit in
Expand Down Expand Up @@ -254,6 +275,12 @@ public:
(void)entry; // avoid bogus 'unused parameter' warning in gcc4.9
ActionCRTP_t::GetHelper().Exec(slot, fValues[slot][S].template Get<ColTypes>(entry)...);
}

template <std::size_t... S>
void ResetColumnValues(unsigned int slot, std::index_sequence<S...> s)
{
ResetRDFValueTuple(fValues[slot], s, ColumnTypes_t{});
}
};

// Same exact code as above, but for SnapshotHelperMT. I don't know how to avoid repeating this code
Expand Down Expand Up @@ -288,6 +315,12 @@ public:
(void)entry; // avoid bogus 'unused parameter' warning in gcc4.9
ActionCRTP_t::GetHelper().Exec(slot, fValues[slot][S].template Get<ColTypes>(entry)...);
}

template <std::size_t... S>
void ResetColumnValues(unsigned int slot, std::index_sequence<S...> s)
{
ResetRDFValueTuple(fValues[slot], s, ColumnTypes_t{});
}
};

} // ns RDF
Expand Down
1 change: 1 addition & 0 deletions tree/dataframe/inc/ROOT/RDF/RActionBase.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public:
virtual void Initialize() = 0;
virtual void InitSlot(TTreeReader *r, unsigned int slot) = 0;
virtual void TriggerChildrenCount() = 0;
virtual void ClearValueReaders(unsigned int slot) = 0;
virtual void FinalizeSlot(unsigned int) = 0;
virtual void Finalize() = 0;
/// This method is invoked to update a partial result during the event loop, right before passing the result to a
Expand Down
25 changes: 25 additions & 0 deletions tree/dataframe/inc/ROOT/RDF/RColumnValue.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,21 @@ public:
return fColumnKind == EColumnKind::kCustomColumn ? *fCustomValuePtr : **fDSValuePtr;
}
}

void Reset()
{
// This method should by all means not be removed, together with all
// of its callers, otherwise a race condition takes place in which a
// TTreeReader and its TTreeReader{Value,Array}s could be deleted
// concurrently:
// - Thread #1) a task ends and pushes back processing slot
// - Thread #2) a task starts and overwrites thread-local TTreeReaderValues
// - Thread #1) first task deletes TTreeReader
// See https://github.com/root-project/root/commit/26e8ace6e47de6794ac9ec770c3bbff9b7f2e945
if (EColumnKind::kTree == fColumnKind) {
fTreeReader.reset();
}
}
};

// Some extern instaniations to speed-up compilation/interpretation time
Expand Down Expand Up @@ -273,6 +288,16 @@ struct TRDFValueTuple<TypeList<BranchTypes...>> {
template <typename BranchType>
using RDFValueTuple_t = typename TRDFValueTuple<BranchType>::type;

/// Clear the proxies of a tuple of RColumnValues
template <typename ValueTuple, std::size_t... S>
void ResetRDFValueTuple(ValueTuple &values, std::index_sequence<S...>)
{
// hack to expand a parameter pack without c++17 fold expressions.
std::initializer_list<int> expander{(std::get<S>(values).Reset(), 0)...};
(void)expander; // avoid "unused variable" warnings
}


} // ns RDF
} // ns Internal
} // ns ROOT
Expand Down
6 changes: 6 additions & 0 deletions tree/dataframe/inc/ROOT/RDF/RCustomColumn.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ public:
// silence "unused parameter" warnings in gcc
(void)slot;
(void)entry;

void ClearValueReaders(unsigned int slot) final
{
// TODO: Each node calls this method for each column it uses. Multiple nodes may share the same columns, and this
// would lead to this method being called multiple times.
RDFInternal::ResetRDFValueTuple(fValues[slot], TypeInd_t());
}
};

Expand Down
1 change: 1 addition & 0 deletions tree/dataframe/inc/ROOT/RDF/RCustomColumnBase.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public:
RLoopManager *GetLoopManagerUnchecked() const;
std::string GetName() const;
virtual void Update(unsigned int slot, Long64_t entry) = 0;
virtual void ClearValueReaders(unsigned int slot) = 0;
bool IsDataSourceColumn() const { return fIsDataSourceColumn; }
virtual void InitNode();
/// Return the unique identifier of this RCustomColumnBase.
Expand Down
14 changes: 14 additions & 0 deletions tree/dataframe/inc/ROOT/RDF/RFilter.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,27 @@ public:
fPrevData.IncrChildrenCount();
}

virtual void ClearValueReaders(unsigned int slot) final
{
RDFInternal::ResetRDFValueTuple(fValues[slot], TypeInd_t());
}

void AddFilterName(std::vector<std::string> &filters)
{
fPrevData.AddFilterName(filters);
auto name = (HasName() ? fName : "Unnamed Filter");
filters.push_back(name);
}

virtual void ClearTask(unsigned int slot) final
{
for (auto &column : fCustomColumns.GetColumns()) {
column.second->ClearValueReaders(slot);
}

ClearValueReaders(slot);
}

std::shared_ptr<RDFGraphDrawing::GraphNode> GetGraph()
{
// Recursively call for the previous node.
Expand Down
2 changes: 2 additions & 0 deletions tree/dataframe/inc/ROOT/RDF/RFilterBase.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public:
std::fill(fAccepted.begin(), fAccepted.end(), 0);
std::fill(fRejected.begin(), fRejected.end(), 0);
}
virtual void ClearValueReaders(unsigned int slot) = 0;
virtual void ClearTask(unsigned int slot) = 0;
virtual void InitNode();
virtual void AddFilterName(std::vector<std::string> &filters) = 0;
};
Expand Down
1 change: 1 addition & 0 deletions tree/dataframe/inc/ROOT/RDF/RJittedAction.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public:
void *PartialUpdate(unsigned int slot) final;
bool HasRun() const final;
void SetHasRun() final;
void ClearValueReaders(unsigned int slot) final;

std::shared_ptr<GraphDrawing::GraphNode> GetGraph();
};
Expand Down
1 change: 1 addition & 0 deletions tree/dataframe/inc/ROOT/RDF/RJittedCustomColumn.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public:
void *GetValuePtr(unsigned int slot) final;
const std::type_info &GetTypeId() const final;
void Update(unsigned int slot, Long64_t entry) final;
void ClearValueReaders(unsigned int slot) final;
void InitNode() final;
};

Expand Down
2 changes: 2 additions & 0 deletions tree/dataframe/inc/ROOT/RDF/RJittedFilter.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ public:
void ResetChildrenCount() final;
void TriggerChildrenCount() final;
void ResetReportCount() final;
void ClearValueReaders(unsigned int slot) final;
void InitNode() final;
void AddFilterName(std::vector<std::string> &filters) final;
void ClearTask(unsigned int slot) final;
std::shared_ptr<RDFGraphDrawing::GraphNode> GetGraph();
};

Expand Down
6 changes: 6 additions & 0 deletions tree/dataframe/src/RJittedAction.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ void RJittedAction::SetHasRun()
return fConcreteAction->SetHasRun();
}

void RJittedAction::ClearValueReaders(unsigned int slot)
{
R__ASSERT(fConcreteAction != nullptr);
return fConcreteAction->ClearValueReaders(slot);
}

std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> RJittedAction::GetGraph()
{
R__ASSERT(fConcreteAction != nullptr);
Expand Down
6 changes: 6 additions & 0 deletions tree/dataframe/src/RJittedCustomColumn.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ void RJittedCustomColumn::Update(unsigned int slot, Long64_t entry)
fConcreteCustomColumn->Update(slot, entry);
}

void RJittedCustomColumn::ClearValueReaders(unsigned int slot)
{
R__ASSERT(fConcreteCustomColumn != nullptr);
fConcreteCustomColumn->ClearValueReaders(slot);
}

void RJittedCustomColumn::InitNode()
{
R__ASSERT(fConcreteCustomColumn != nullptr);
Expand Down
12 changes: 12 additions & 0 deletions tree/dataframe/src/RJittedFilter.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,18 @@ void RJittedFilter::ResetReportCount()
fConcreteFilter->ResetReportCount();
}

void RJittedFilter::ClearValueReaders(unsigned int slot)
{
R__ASSERT(fConcreteFilter != nullptr);
fConcreteFilter->ClearValueReaders(slot);
}

void RJittedFilter::ClearTask(unsigned int slot)
{
R__ASSERT(fConcreteFilter != nullptr);
fConcreteFilter->ClearTask(slot);
}

void RJittedFilter::InitNode()
{
R__ASSERT(fConcreteFilter != nullptr);
Expand Down
2 changes: 2 additions & 0 deletions tree/dataframe/src/RLoopManager.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ void RLoopManager::CleanUpTask(unsigned int slot)
{
for (auto &ptr : fBookedActions)
ptr->FinalizeSlot(slot);
for (auto &ptr : fBookedFilters)
ptr->ClearTask(slot);
}

/// Jit all actions that required runtime column type inference, and clean the `fToJit` member variable.
Expand Down

0 comments on commit 7654e01

Please sign in to comment.