Skip to content

Commit

Permalink
FIX: remove python logging of core distributed calls, which leads to …
Browse files Browse the repository at this point in the history
…segfaults on py312. Add tests.
  • Loading branch information
carsten-forty2 committed Nov 14, 2024
1 parent e7d441a commit 6d42186
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 53 deletions.
9 changes: 6 additions & 3 deletions core/src/bert/bertJacobian.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,6 @@ MEMINFO
if (S.rows() != nData || S.cols() != nModel) S.resize(nData, nModel);
S *= ValueType(0);
MEMINFO

if (verbose){
std::cout << "S(" << numberOfCPU() << "/" << nThreads; //**check!!!
#if USE_BOOST_THREAD
Expand All @@ -403,10 +402,14 @@ MEMINFO
//swatch.stop(verbose);
}
bool calc1 = getEnvironment("SENSMAT1", false, true);
distributeCalc(CreateSensitivityColMT< ValueType >(S, cells, data,
if (useOMP()){

} else {
distributeCalc(CreateSensitivityColMT< ValueType >(S, cells, data,
pots, currPatternIdx,
weights, k, calc1, verbose),
cells.size(), nThreads, verbose);
cells.size(), nThreads, verbose);
}
if (verbose){
swatch.stop(verbose);
}
Expand Down
10 changes: 5 additions & 5 deletions core/src/calculateMultiThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class BaseCalcMT{
Index _threadNumber;
};
template < class T > void distributeCalc(T calc, uint nCalcs, uint nThreads, bool verbose=false){
log(Debug, "Create distributed calculation of " + str(nCalcs) + " jobs on "
log(Debug, "Create distributed calculation of " + str(nCalcs) + " jobs on "
+ str(nThreads) + " threads for " + str(numberOfCPU()) + " CPU");

if (nThreads == 1){
Expand Down Expand Up @@ -91,16 +91,16 @@ template < class T > void distributeCalc(T calc, uint nCalcs, uint nThreads, boo
for (uint i = 0; i < calcObjs.size(); i++) {
//threads.emplace_back(calcObjs[i]);
threads[i] = std::thread( [&iomutex, i, &calcObjs] {
Stopwatch swatch(true);
// Stopwatch swatch(true);
{
std::lock_guard<std::mutex> iolock(iomutex);
log(Debug, "Thread #" + str(i) + ": on CPU "
+ str(schedGetCPU()) + " slice " + str(calcObjs[i].start()) + ":" + str(calcObjs[i].end()));
//log(Debug, "Thread #" + str(i) + ": on CPU "
//+ str(schedGetCPU()) + " slice " + str(calcObjs[i].start()) + ":" + str(calcObjs[i].end()));
}
calcObjs[i]();
{
std::lock_guard<std::mutex> iolock(iomutex);
log(Debug, "time: #" + str(i) + " " + str(swatch.duration()) + "s");
// log(Debug, "time: #" + str(i) + " " + str(swatch.duration()) + "s");
}

});
Expand Down
74 changes: 49 additions & 25 deletions core/src/gimli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,41 +57,25 @@ Index __setTC__(){
int tc = getEnvironment("OPENBLAS_NUM_THREADS", -1, false);

if (tc == -1){
tc = min(16, numberOfCPU()-2);
}
int tc = getEnvironment("GIMLI_NUM_THREADS", -1, false);

if (tc == -1) return 1;
}

setThreadCount(tc);
return (Index)(tc);
// __MS(tc)
tc = Index(min(8, numberOfCPU()-2));
// __MS(Index(tc))
setThreadCount(Index(tc));
return Index(tc);
}

static Index __GIMLI_THREADCOUNT__ = __setTC__();

// //** end forward declaration
// // static here gives every .cpp its own static bool
// extern static bool __SAVE_PYTHON_GIL__;
// extern static bool __GIMLI_DEBUG__;

std::string versionStr(){
std::string vers(str(PACKAGE_NAME) + "-" + PACKAGE_VERSION);
return vers;
}

void savePythonGIL(bool s){ __SAVE_PYTHON_GIL__ = s; }
bool pythonGIL(){ return __SAVE_PYTHON_GIL__; }

void setDebug(bool s){ __GIMLI_DEBUG__ = s; }
bool debug(){ return __GIMLI_DEBUG__;}

void setDeepDebug(int level){__GIMLI_DEEP_DEBUG__ = level;}
int deepDebug(){ return __GIMLI_DEEP_DEBUG__; }

void setThreadCount(Index nThreads){

nThreads = max(1, nThreads);
log(Debug, "Set amount of threads to " + str(nThreads));
//log(Debug, "omp_get_max_threads: " + str(omp_get_max_threads()));
//omp_set_num_threads

#if OPENBLAS_CBLAS_FOUND
openblas_set_num_threads(nThreads);
//omp_set_num_threads
Expand All @@ -100,13 +84,53 @@ void setThreadCount(Index nThreads){
#endif

__GIMLI_THREADCOUNT__ = nThreads;
// __MS(__GIMLI_THREADCOUNT__)
}

Index threadCount(){
return __GIMLI_THREADCOUNT__;
}


bool __setOMP__(){

int tc = getEnvironment("PG_USE_OMP", -1, false);
if (tc == -1) {
// default is false
tc = false;
}
setUseOMP(tc);
return (bool)tc;
}

static bool __GIMLI_USE_OMP__ = __setOMP__();

void setUseOMP(bool o){
__GIMLI_USE_OMP__ = o;
}
bool useOMP(){
return __GIMLI_USE_OMP__;
}

// //** end forward declaration
// // static here gives every .cpp its own static bool
// extern static bool __SAVE_PYTHON_GIL__;
// extern static bool __GIMLI_DEBUG__;

std::string versionStr(){
std::string vers(str(PACKAGE_NAME) + "-" + PACKAGE_VERSION);
return vers;
}

void savePythonGIL(bool s){ __SAVE_PYTHON_GIL__ = s; }
bool pythonGIL(){ return __SAVE_PYTHON_GIL__; }

void setDebug(bool s){ __GIMLI_DEBUG__ = s; }
bool debug(){ return __GIMLI_DEBUG__;}

void setDeepDebug(int level){__GIMLI_DEEP_DEBUG__ = level;}
int deepDebug(){ return __GIMLI_DEEP_DEBUG__; }

void PythonGILSave::save() {
if (!saved_) {
#ifdef PYGIMLI
Expand Down
7 changes: 5 additions & 2 deletions core/src/gimli.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ typedef int64_t int64;


/*!Replace from with to inside str and return the result*/
DLLEXPORT std::string replace(const std::string & str,
const std::string & from,
DLLEXPORT std::string replace(const std::string & str,
const std::string & from,
const std::string & to);

#ifndef SRC_DIR
Expand Down Expand Up @@ -370,6 +370,9 @@ Default is number of CPU. */
DLLEXPORT void setThreadCount(Index nThreads);
DLLEXPORT Index threadCount();

void setUseOMP(bool o);
bool useOMP();

/*! For some debug purposes only */
DLLEXPORT void showSizes();

Expand Down
7 changes: 2 additions & 5 deletions core/src/modellingbase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ void ModellingBase::init_() {
constraints_ = 0;
dataContainer_ = 0;

nThreads_ = min(16, numberOfCPU()-2);
nThreads_ = min(8, numberOfCPU()-2);
nThreadsJacobian_ = 1;

ownJacobian_ = false;
Expand All @@ -98,9 +98,6 @@ void ModellingBase::setThreadCount(Index nThreads) {
Index ModellingBase::threadCount(){
bool verbose = this->verbose();

this->nThreads_ = getEnvironment("GIMLI_NUM_THREADS",
this->nThreads_, verbose);

if (verbose){
std::cout << "J(" << numberOfCPU() << "/" << this->nThreads_;
#if USE_BOOST_THREAD
Expand All @@ -113,7 +110,7 @@ Index ModellingBase::threadCount(){
if (verbose){
std::cout << std::endl;
}
return this->nThreads_;
return max(1, this->nThreads_);
}

void ModellingBase::setData(DataContainer & data){
Expand Down
38 changes: 34 additions & 4 deletions core/src/ttdijkstramodelling.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include <vector>
#include <queue>
#include <map>
#include <omp.h>


namespace GIMLI {

Expand Down Expand Up @@ -494,10 +496,33 @@ void TravelTimeDijkstraModelling::createJacobian(const RVector & slowness) {
slowness);
}

void fillWayMatrix(std::vector < std::vector < IndexArray > > & wayM,
const Dijkstra & dijk,
const IndexArray & shotNodes,
const IndexArray & recNodes){

ASSERT_EQUAL_SIZE(wayM, shotNodes)
ASSERT_EQUAL_SIZE(wayM[0], recNodes)

Dijkstra _dijkstra(dijk);

for (Index shot = 0; shot < shotNodes.size(); shot ++) {
_dijkstra.setStartNode(shotNodes[shot]);
#pragma omp parallel if (useOMP())
{

//print("A:", omp_get_thread_num());
for (Index i = 0; i < recNodes.size(); i ++) {
wayM[shot][i] = _dijkstra.shortestPathTo((recNodes)[i]);
}
}
}
}

void TravelTimeDijkstraModelling::createJacobian(RSparseMapMatrix & jacobian,
const RVector & slowness) {

if (min(this->mesh_->cellMarkers()) < 0){
__M
log(Warning, "There are cells with marker -1. "
"Did you define a boundary region? (This is not needed).");
}
Expand Down Expand Up @@ -526,9 +551,14 @@ void TravelTimeDijkstraModelling::createJacobian(RSparseMapMatrix & jacobian,

Index nThreads = this->threadCount();

distributeCalc(CreateDijkstraRowMT(wayMatrix_, dijkstra_,
if (useOMP()){
__MS("DEBUG: OMP for fill Way Matrix")
fillWayMatrix(wayMatrix_, dijkstra_, shotNodeId_, receNodeId_);
} else {
distributeCalc(CreateDijkstraRowMT(wayMatrix_, dijkstra_,
shotNodeId_, receNodeId_, this->verbose()),
nShots, nThreads, this->verbose());
nShots, nThreads, this->verbose());
}

if (this->verbose()){
std::cout << "/" << swatch.duration(true);
Expand All @@ -540,7 +570,6 @@ void TravelTimeDijkstraModelling::createJacobian(RSparseMapMatrix & jacobian,
// wayMatrix_[shot].push_back(dijkstra_.shortestPathTo(receNodeId_[i]));
// }
// }

for (Index dataIdx = 0; dataIdx < nData; dataIdx ++) {
Index s = shotsInv_.at(Index((*dataContainer_)("s")[dataIdx]));
Index g = receiInv_.at(Index((*dataContainer_)("g")[dataIdx]));
Expand Down Expand Up @@ -580,6 +609,7 @@ void TravelTimeDijkstraModelling::createJacobian(RSparseMapMatrix & jacobian,
if (this->verbose()){
std::cout << "/" << swatch.duration(true) << " ";
}
std::cout << std::endl;
}

TTModellingWithOffset::TTModellingWithOffset(Mesh & mesh, DataContainer & dataContainer, bool verbose)
Expand Down
3 changes: 2 additions & 1 deletion pygimli/physics/traveltime/TravelTimeManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def createForwardOperator(self, **kwargs):
if self.useFatray:
fop = FatrayDijkstraModelling(frequency=self.frequency, **kwargs)
else:
fop = TravelTimeDijkstraModelling(**kwargs)
fop = TravelTimeDijkstraModelling(verbose=self.verbose)
return fop

def load(self, fileName):
Expand Down Expand Up @@ -297,6 +297,7 @@ def getRayPaths(self, model=None):

segs = []
nodes = self.fop._core.mesh().positions(withSecNodes=True)

for s, g in zip(shots, recei):
wi = self.fop.way(s, g)
points = nodes[wi]
Expand Down
8 changes: 4 additions & 4 deletions pygimli/solver/solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -886,15 +886,15 @@ def div(mesh, v):
>>> divCells = pg.solver.div(mesh, v(mesh.cellCenters()))
>>> # divergence from boundary values are exact where the divergence from
>>> # interpolated cell center values wrong due to interpolation to boundary
>>> print(sum(divCells))
>>> print(np.round(sum(divCells),12))
12.0
>>> mesh = pg.createGrid(x=np.linspace(0, 1, 4),
... y=np.linspace(0, 1, 4),
... z=np.linspace(0, 1, 4))
>>> print(sum(pg.solver.div(mesh, v(mesh.boundaryCenters()))))
81.0
>>> divCells = pg.solver.div(mesh, v(mesh.cellCenters()))
>>> print(sum(divCells))
>>> print(np.round(sum(divCells),12))
54.0
"""
mesh.createNeighborInfos()
Expand Down Expand Up @@ -2255,8 +2255,8 @@ def solveFiniteElements(mesh, a=1.0, b=None, f=0.0, bc=None,
Note this is only a shortcut for
bc={'Dirichlet': [mesh.node(nodeID), value]}.
The parameter $a$ for Neumann boundary condition is choosen
automatically from the diffusivity parameter $a$ of the associated cell.
The parameter $a$ for Neumann boundary condition is choosen
automatically from the diffusivity parameter $a$ of the associated cell.
times: array [None]
Solve as time dependent problem for the given times.
Expand Down
Loading

0 comments on commit 6d42186

Please sign in to comment.