Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of the new TPool and ThreadPool classes #148

Closed
wants to merge 9 commits into from
52 changes: 1 addition & 51 deletions build/unix/makepchinput.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ def printModulesMessageOnScreen(selModules):
def getExtraHeaders():
""" Get extra headers which do not fall in other special categories
"""
extraHeaders=["ROOT/TSeq.h","ROOT/StringConv.h"]
extraHeaders=["ROOT/TSeq.h","ROOT/StringConv.h", "ThreadPool.h", "TPool.h"]
code = "// Extra headers\n"
for extraHeader in extraHeaders:
code += '#include "%s"\n' %extraHeader
Expand Down Expand Up @@ -487,53 +487,3 @@ def makePCHInput():

if __name__ == "__main__":
makePCHInput()


















































6 changes: 6 additions & 0 deletions cmake/modules/SearchInstalledSoftware.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,12 @@ if(imt)
if(NOT builtin_tbb)
message(STATUS "Looking for TBB")
find_package(TBB)
if(TBB_FOUND)
if(${TBB_VERSION} VERSION_LESS 4.3)
message(STATUS "TBB version < 4.3. Switching on builtin_tbb option")
set(builtin_tbb ON CACHE BOOL "" FORCE)
endif()
endif()
if(NOT TBB_FOUND)
message(STATUS "TBB not found. Switching on builtin_tbb option")
set(builtin_tbb ON CACHE BOOL "" FORCE)
Expand Down
2 changes: 1 addition & 1 deletion core/multiproc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

set(headers TMPClient.h MPSendRecv.h TProcPool.h TMPWorker.h TPoolWorker.h TPoolProcessor.h MPCode.h PoolUtils.h)

set(sources TMPClient.cxx MPSendRecv.cxx TProcPool.cxx TMPWorker.cxx TPoolWorker.cxx TPoolProcessor.cxx PoolUtils.cxx)
set(sources TMPClient.cxx MPSendRecv.cxx TProcPool.cxx TMPWorker.cxx TPoolWorker.cxx TPoolProcessor.cxx)

ROOT_GENERATE_DICTIONARY(G__MultiProc ${headers} MODULE MultiProc LINKDEF LinkDef.h)

Expand Down
4 changes: 1 addition & 3 deletions core/multiproc/Module.mk
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ MULTIPROCH := $(MODDIRI)/TMPClient.h $(MODDIRI)/TProcPool.h \

MULTIPROCS := $(MODDIRS)/TMPClient.cxx $(MODDIRS)/TProcPool.cxx \
$(MODDIRS)/TMPWorker.cxx $(MODDIRS)/MPSendRecv.cxx \
$(MODDIRS)/TPoolWorker.cxx $(MODDIRS)/TPoolProcessor.cxx \
$(MODDIRS)/PoolUtils.cxx
$(MODDIRS)/TPoolWorker.cxx $(MODDIRS)/TPoolProcessor.cxx

MULTIPROCO := $(call stripsrc,$(MULTIPROCS:.cxx=.o))

Expand Down Expand Up @@ -83,4 +82,3 @@ distclean-$(MODNAME): clean-$(MODNAME)
@rm -f $(MULTIPROCDEP) $(MULTIPROCDS) $(MULTIPROCDH) $(MULTIPROCLIB) $(MULTIPROCMAP)

distclean:: distclean-$(MODNAME)

41 changes: 40 additions & 1 deletion core/multiproc/inc/PoolUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,46 @@ namespace PoolCode {
///
//////////////////////////////////////////////////////////////////////////
namespace PoolUtils {
TObject *ReduceObjects(const std::vector<TObject *> &objs);
//////////////////////////////////////////////////////////////////////////
/// Merge collection of TObjects.
/// This functor looks for an implementation of the Merge method
/// (e.g. TH1F::Merge) and calls it on the objects contained in objs.
/// If Merge is not found, a null pointer is returned.
template <class T>
class ReduceObjects{
public:
T operator()(const std::vector<T> &objs){
static_assert(std::is_constructible<TObject *, T>::value,
"The argument should be a vector of pointers to TObject or derived classes");
if(objs.size() == 0)
return nullptr;

if(objs.size() == 1)
return objs[0];

//get first object from objs
auto obj = objs[0];
//get merge function
ROOT::MergeFunc_t merge = obj->IsA()->GetMerge();
if(!merge) {
std::cerr << "could not find merge method for the TObject\n. Aborting operation.";
return nullptr;
}

//put the rest of the objs in a list
TList mergelist;
unsigned NObjs = objs.size();
for(unsigned i=1; i<NObjs; ++i) //skip first object
mergelist.Add(objs[i]);

//call merge
merge(obj, &mergelist, nullptr);
mergelist.Delete();

//return result
return obj;
}
};
}

namespace ROOT {
Expand Down
11 changes: 6 additions & 5 deletions core/multiproc/inc/TPoolProcessor.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* @(#)root/multiproc:$Id$ */
// Author: Enrico Guiraud September 2015
// Author: Enrico Guiraud September 2015.

/*************************************************************************
* Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
Expand Down Expand Up @@ -227,7 +227,7 @@ void TPoolProcessor<F>::Process(unsigned code, MPCodeBufPair& msg)
MPSend(GetSocket(), PoolCode::kProcError, reply.data());
return;
}

//execute function
auto res = fProcFunc(reader);

Expand All @@ -238,12 +238,13 @@ void TPoolProcessor<F>::Process(unsigned code, MPCodeBufPair& msg)
fProcessedEntries += finish - start;

if(fCanReduce) {
fReducedResult = static_cast<decltype(fReducedResult)>(PoolUtils::ReduceObjects({res, fReducedResult})); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
PoolUtils::ReduceObjects<TObject *> redfunc;
fReducedResult = static_cast<decltype(fReducedResult)>(redfunc({res, fReducedResult})); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
} else {
fCanReduce = true;
fReducedResult = res;
}

if(fMaxNEntries == fProcessedEntries)
//we are done forever
MPSend(GetSocket(), PoolCode::kProcResult, fReducedResult);
Expand Down Expand Up @@ -285,7 +286,7 @@ TTree *TPoolProcessor<F>::RetrieveTree(TFile *fp)
if (!strcmp(key->GetClassName(), "TTree") || !strcmp(key->GetClassName(), "TNtuple"))
tree = static_cast<TTree*>(fp->Get(key->GetName()));
}
}
}
} else {
tree = static_cast<TTree*>(fp->Get(fTreeName.c_str()));
}
Expand Down
2 changes: 1 addition & 1 deletion core/multiproc/inc/TPoolWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class TPoolWorker : public TMPWorker {
// we trust that decltype(redfunc(std::vector<decltype(func(args[0]))>)) == decltype(args[0])
// TODO document somewhere that fReducedResult must have a default ctor
TPoolWorker(F func, const std::vector<T> &args, R redfunc) :
TMPWorker(), fFunc(func), fArgs(std::move(args)), fRedFunc(redfunc),
TMPWorker(), fFunc(func), fArgs(args), fRedFunc(redfunc),
fReducedResult(), fCanReduce(false)
{}
~TPoolWorker() {}
Expand Down
Loading