From 2c6a128d0617a0e150b4804f5c65e6c05edb5b6f Mon Sep 17 00:00:00 2001
From: Klaus Weide
Date: Wed, 20 Nov 2024 22:07:19 -0600
Subject: [PATCH 1/4] ExtCpuGpuSplit case

(cherry picked from commit 107c748f3d4726791652566d9548fd2829944634)
---
 includes/Milhoja_Runtime.h                 |  22 +-
 includes/Milhoja_ThreadTeam.h              |   3 +
 interfaces/Milhoja_runtime_C_interface.cpp | 208 +++++++++++++++++++
 interfaces/Milhoja_runtime_mod.F90         | 209 ++++++++++++++++++-
 src/Milhoja_Runtime.cpp                    | 229 ++++++++++++++++++++-
 src/Milhoja_ThreadTeam.cpp                 |  23 ++-
 6 files changed, 685 insertions(+), 9 deletions(-)

diff --git a/includes/Milhoja_Runtime.h b/includes/Milhoja_Runtime.h
index 314c57bd..ec10fd4f 100644
--- a/includes/Milhoja_Runtime.h
+++ b/includes/Milhoja_Runtime.h
@@ -165,10 +165,30 @@ class Runtime {
     void executeExtendedCpuGpuSplitTasks(const std::string& bundleName,
                                          const unsigned int nDistributorThreads,
                                          const RuntimeAction& actionA_cpu,
+                                         const TileWrapper& tilePrototype,
                                          const RuntimeAction& actionA_gpu,
-                                         const RuntimeAction& postActionB_cpu,
                                          const DataPacket& packetPrototype,
+                                         const RuntimeAction& postActionB_cpu,
+                                         const TileWrapper& postTilePrototype,
                                          const unsigned int nTilesPerCpuTurn);
+#   ifndef RUNTIME_MUST_USE_TILEITER
+    void setupPipelineForExtCpuGpuSplitTasks(const std::string& bundleName,
+                                             const RuntimeAction& actionA_cpu,
+                                             const TileWrapper& tilePrototype,
+                                             const RuntimeAction& actionA_gpu,
+                                             const DataPacket& packetPrototype,
+                                             const RuntimeAction& postActionB_cpu,
+                                             const TileWrapper& postTilePrototype,
+                                             const unsigned int nTilesPerCpuTurn);
+    void pushTileToExtCpuGpuSplitPipeline(const std::string& bundleName,
+                                          const TileWrapper& tilePrototype,
+                                          const DataPacket& packetPrototype,
+                                          const TileWrapper& postTilePrototype,
+                                          const FlashxrTileRawPtrs& tP,
+                                          const FlashxTileRawInts& tI,
+                                          const FlashxTileRawReals& tR);
+    void teardownPipelineForExtCpuGpuSplitTasks(const std::string& bundleName);
+#   endif
     void executeCpuGpuWowzaTasks(const std::string& bundleName,
                                  const RuntimeAction& actionA_cpu,
                                  const TileWrapper& tilePrototype,

diff --git a/includes/Milhoja_ThreadTeam.h b/includes/Milhoja_ThreadTeam.h
index 57bbce56..f079d3cb 100644
--- a/includes/Milhoja_ThreadTeam.h
+++ b/includes/Milhoja_ThreadTeam.h
@@ -54,6 +54,7 @@
 
 #include
 
+#include "Milhoja_TileWrapper.h"
 #include "Milhoja_actionRoutine.h"
 #include "Milhoja_RuntimeAction.h"
 #include "Milhoja_ThreadTeamMode.h"
@@ -113,6 +114,7 @@ class ThreadTeam : public RuntimeElement {
     //     into thread team configurations.
     std::string attachDataReceiver(RuntimeElement* receiver) override;
     std::string detachDataReceiver(void) override;
+    void setReceiverProto(TileWrapper const * w);
 
 protected:
     constexpr static unsigned int THREAD_START_STOP_TIMEOUT_SEC = 1;
@@ -213,6 +215,7 @@ class ThreadTeam : public RuntimeElement {
     // Keep track of when wait() is blocking and when it is released
     bool isWaitBlocking_;   //!< Only a single thread can be blocked
+    const TileWrapper * receiverProto_;
 };
 
 }

diff --git a/interfaces/Milhoja_runtime_C_interface.cpp b/interfaces/Milhoja_runtime_C_interface.cpp
index ca60edb8..d11ff6bb 100644
--- a/interfaces/Milhoja_runtime_C_interface.cpp
+++ b/interfaces/Milhoja_runtime_C_interface.cpp
@@ -568,6 +568,77 @@ extern "C" {
 
         return MILHOJA_SUCCESS;
     }
 
+    int milhoja_runtime_setup_pipeline_extcpugpusplit_c(milhoja::ACTION_ROUTINE cpuTaskFunction,
+                                                        milhoja::ACTION_ROUTINE gpuTaskFunction,
+                                                        milhoja::ACTION_ROUTINE postTaskFunction,
+                                                        const int nThreads,
+                                                        const int nTilesPerPacket,
+                                                        const int nTilesPerCpuTurn,
+                                                        void* packet,
+                                                        void* tileWrapper,
+                                                        void* postTileWrapper) {
+        if (nThreads < 0) {
+            std::cerr
+                << "[milhoja_runtime_setup_pipeline_extcpugpusplit_c] nThreads is negative"
+                << std::endl;
+            return MILHOJA_ERROR_N_THREADS_NEGATIVE;
+        } else if (nTilesPerPacket < 0) {
+            std::cerr
+                << "[milhoja_runtime_setup_pipeline_extcpugpusplit_c] nTilesPerPacket is negative"
+                << std::endl;
+            return MILHOJA_ERROR_N_TILES_NEGATIVE;
+        }
+
+        unsigned int nDistributorThreads_ui = 0;
+        unsigned int nThreads_ui = static_cast<unsigned int>(nThreads);
+        unsigned int nTilesPerPacket_ui = static_cast<unsigned int>(nTilesPerPacket);
+        unsigned int nTilesPerCpuTurn_ui = static_cast<unsigned int>(nTilesPerCpuTurn);
+
+        milhoja::TileWrapper* tilePrototype = static_cast<milhoja::TileWrapper*>(tileWrapper);
+        milhoja::TileWrapper* postTilePrototype = static_cast<milhoja::TileWrapper*>(postTileWrapper);
+        milhoja::DataPacket* pktPrototype = static_cast<milhoja::DataPacket*>(packet);
+
+        milhoja::RuntimeAction pktAction;
+        pktAction.name = "Lazy GPU setup Action Name";
+        pktAction.nInitialThreads = nThreads_ui;
+        pktAction.teamType = milhoja::ThreadTeamDataType::SET_OF_BLOCKS;
+        pktAction.nTilesPerPacket = nTilesPerPacket_ui;
+        pktAction.routine = gpuTaskFunction;
+
+        milhoja::RuntimeAction cpuAction;
+        cpuAction.name = "Lazy CPU setup Action Name";
+        cpuAction.nInitialThreads = nThreads_ui;
+        cpuAction.teamType = milhoja::ThreadTeamDataType::BLOCK;
+        cpuAction.nTilesPerPacket = 0;
+        cpuAction.routine = cpuTaskFunction;
+
+        milhoja::RuntimeAction postAction;
+        postAction.name = "Lazy post CPU setup Action Name";
+        postAction.nInitialThreads = nThreads_ui;
+        postAction.teamType = milhoja::ThreadTeamDataType::BLOCK;
+        postAction.nTilesPerPacket = 0;
+        postAction.routine = postTaskFunction;
+
+        try {
+            milhoja::Runtime::instance().setupPipelineForExtCpuGpuSplitTasks("EXT CPUGPU Split Bundle Name",
+                                                                             cpuAction,
+                                                                             *tilePrototype,
+                                                                             pktAction,
+                                                                             *pktPrototype,
+                                                                             postAction,
+                                                                             *postTilePrototype,
+                                                                             nTilesPerCpuTurn_ui);
+        } catch (const std::exception& exc) {
+            std::cerr << exc.what() << std::endl;
+            return MILHOJA_ERROR_UNABLE_TO_SETUP_PIPELINE;
+        } catch (...) {
+            std::cerr << "[milhoja_runtime_setup_pipeline_extcpugpusplit_c] Unknown error caught" << std::endl;
+            return MILHOJA_ERROR_UNABLE_TO_SETUP_PIPELINE;
+        }
+
+        return MILHOJA_SUCCESS;
+    }
+
     int milhoja_runtime_teardown_pipeline_gpu_c(const int nThreads,
                                                 const int nTilesPerPacket) {
         if (nThreads < 0) {    // nThreads: only use in this function
@@ -679,6 +750,34 @@ extern "C" {
 
         return MILHOJA_SUCCESS;
     }
+
+    int milhoja_runtime_teardown_pipeline_extcpugpusplit_c(const int nThreads,
+                                                           const int nTilesPerPacket) {
+        if (nThreads < 0) {    // nThreads: only use in this function
+            std::cerr
+                << "[milhoja_runtime_teardown_pipeline_extcpugpusplit_c] nThreads is negative"
+                << std::endl;
+            return MILHOJA_ERROR_N_THREADS_NEGATIVE;
+        } else if (nTilesPerPacket < 0) {    // nTilesPerPacket: only use here
+            std::cerr
+                << "[milhoja_runtime_teardown_pipeline_extcpugpusplit_c] nTilesPerPacket is negative"
+                << std::endl;
+            return MILHOJA_ERROR_N_TILES_NEGATIVE;
+        }
+
+        try {
+            milhoja::Runtime::instance().teardownPipelineForExtCpuGpuSplitTasks(
+                    "Lazy EXT CPUGPU Split setup Bundle Name");
+        } catch (const std::exception& exc) {
+            std::cerr << exc.what() << std::endl;
+            return MILHOJA_ERROR_UNABLE_TO_TEARDOWN_PIPELINE;
+        } catch (...) {
+            std::cerr << "[milhoja_runtime_teardown_pipeline_extcpugpusplit_c] Unknown error caught" << std::endl;
+            return MILHOJA_ERROR_UNABLE_TO_TEARDOWN_PIPELINE;
+        }
+
+        return MILHOJA_SUCCESS;
+    }
 
     /**
      * Push one tile to the prepared pipeline so that the thread team will
     * eventually execute the task.
@@ -797,6 +896,37 @@ extern "C" {
 
         return MILHOJA_SUCCESS;
     }
+
+    int milhoja_runtime_push_pipeline_extcpugpusplit_c(void* tileWrapper,
+                                                       void* packet,
+                                                       void* postTileWrapper,
+                                                       const int nThreads,
+                                                       FlashxTileRaw* tileInfo) {
+        if (nThreads < 0) {
+            std::cerr << "[milhoja_runtime_push_pipeline_extcpugpusplit_c] nThreads is negative" << std::endl;
+            return MILHOJA_ERROR_N_THREADS_NEGATIVE;
+        }
+
+        milhoja::TileWrapper* tilePrototype = static_cast<milhoja::TileWrapper*>(tileWrapper);
+        milhoja::TileWrapper* postTilePrototype = static_cast<milhoja::TileWrapper*>(postTileWrapper);
+        milhoja::DataPacket* pktPrototype = static_cast<milhoja::DataPacket*>(packet);
+
+        try {
+            milhoja::Runtime::instance().pushTileToExtCpuGpuSplitPipeline("Lazy Bundle Name",
+                                                                          *tilePrototype,
+                                                                          *pktPrototype,
+                                                                          *postTilePrototype,
+                                                                          tileInfo->sP, tileInfo->sI, tileInfo->sR);
+        } catch (const std::exception& exc) {
+            std::cerr << exc.what() << std::endl;
+            return MILHOJA_ERROR_UNABLE_TO_EXECUTE_TASKS;
+        } catch (...) {
+            std::cerr << "[milhoja_runtime_push_pipeline_extcpugpusplit_c] Unknown error caught" << std::endl;
+            return MILHOJA_ERROR_UNABLE_TO_EXECUTE_TASKS;
+        }
+
+        return MILHOJA_SUCCESS;
+    }
#  endif

#  ifdef RUNTIME_CAN_USE_TILEITER
@@ -1141,6 +1271,84 @@ extern "C" {
 
         return MILHOJA_SUCCESS;
     }
+
+    int milhoja_runtime_execute_tasks_extcpugpusplit_c(milhoja::ACTION_ROUTINE cpuTaskFunction,
+                                                       milhoja::ACTION_ROUTINE gpuTaskFunction,
+                                                       milhoja::ACTION_ROUTINE postTaskFunction,
+                                                       const int nDistributorThreads,
+                                                       const int nThreads,
+                                                       const int nTilesPerPacket,
+                                                       const int nTilesPerCpuTurn,
+                                                       void* packet,
+                                                       void* tileWrapper,
+                                                       void* postTileWrapper) {
+        if (nDistributorThreads < 0) {
+            std::cerr
+                << "[milhoja_runtime_execute_tasks_extcpugpusplit_c] nDistributorThreads is negative"
+                << std::endl;
+            return MILHOJA_ERROR_N_THREADS_NEGATIVE;
+        } else if (nThreads < 0) {
+            std::cerr
+                << "[milhoja_runtime_execute_tasks_extcpugpusplit_c] nThreads is negative"
+                << std::endl;
+            return MILHOJA_ERROR_N_THREADS_NEGATIVE;
+        } else if (nTilesPerPacket < 0) {
+            std::cerr
+                << "[milhoja_runtime_execute_tasks_extcpugpusplit_c] nTilesPerPacket is negative"
+                << std::endl;
+            return MILHOJA_ERROR_N_TILES_NEGATIVE;
+        }
+
+        unsigned int nDistributorThreads_ui = static_cast<unsigned int>(nDistributorThreads);
+        unsigned int nThreads_ui = static_cast<unsigned int>(nThreads);
+        unsigned int nTilesPerPacket_ui = static_cast<unsigned int>(nTilesPerPacket);
+        unsigned int nTilesPerCpuTurn_ui = static_cast<unsigned int>(nTilesPerCpuTurn);
+
+        milhoja::TileWrapper* tilePrototype = static_cast<milhoja::TileWrapper*>(tileWrapper);
+        milhoja::TileWrapper* postTilePrototype = static_cast<milhoja::TileWrapper*>(postTileWrapper);
+        milhoja::DataPacket* pktPrototype = static_cast<milhoja::DataPacket*>(packet);
+
+        milhoja::RuntimeAction pktAction;
+        pktAction.name = "Lazy GPU Action Name";
+        pktAction.nInitialThreads = nThreads_ui;
+        pktAction.teamType = milhoja::ThreadTeamDataType::SET_OF_BLOCKS;
+        pktAction.nTilesPerPacket = nTilesPerPacket_ui;
+        pktAction.routine = gpuTaskFunction;
+
+        milhoja::RuntimeAction cpuAction;
+        cpuAction.name = "Lazy CPU Action Name";
+        cpuAction.nInitialThreads = nThreads_ui;
+        cpuAction.teamType = milhoja::ThreadTeamDataType::BLOCK;
+        cpuAction.nTilesPerPacket = 0;
+        cpuAction.routine = cpuTaskFunction;
+
+        milhoja::RuntimeAction postAction;
+        postAction.name = "Lazy post CPU Action Name";
+        postAction.nInitialThreads = nThreads_ui;
+        postAction.teamType = milhoja::ThreadTeamDataType::BLOCK;
+        postAction.nTilesPerPacket = 0;
+        postAction.routine = postTaskFunction;
+
+        try {
+            milhoja::Runtime::instance().executeExtendedCpuGpuSplitTasks("Lazy GPU Bundle Name",
+                                                                         nDistributorThreads_ui,
+                                                                         cpuAction,
+                                                                         *tilePrototype,
+                                                                         pktAction,
+                                                                         *pktPrototype,
+                                                                         postAction,
+                                                                         *postTilePrototype,
+                                                                         nTilesPerCpuTurn_ui);
+        } catch (const std::exception& exc) {
+            std::cerr << exc.what() << std::endl;
+            return MILHOJA_ERROR_UNABLE_TO_EXECUTE_TASKS;
+        } catch (...) {
+            std::cerr << "[milhoja_runtime_execute_tasks_extcpugpusplit_c] Unknown error caught" << std::endl;
+            return MILHOJA_ERROR_UNABLE_TO_EXECUTE_TASKS;
+        }
+
+        return MILHOJA_SUCCESS;
+    }
#  endif
#endif   // #ifdef RUNTIME_SUPPORT_DATAPACKETS
 }
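Usage note for the three entry points added above: they follow the same lazy protocol as the existing ExtGpu pipeline, that is, one setup call, then one push call per tile, then one teardown call. The host-side sketch below shows only the intended call order; the task functions and prototype pointers are hypothetical application-side names (not defined by this series), the thread/tile counts are arbitrary, and the Milhoja C interface headers are assumed to be included.

    // Sketch only: cpuTask/gpuTask/postTask and the prototype pointers are
    // application-supplied placeholders, assuming the usual Milhoja
    // task-function signature; error handling is abbreviated.
    extern "C" void cpuTask(const int tId, void* dataItem);
    extern "C" void gpuTask(const int tId, void* dataItem);
    extern "C" void postTask(const int tId, void* dataItem);

    int driveExtCpuGpuSplit(void* pktProto, void* tileProto, void* postProto,
                            FlashxTileRaw* tiles, int nTiles) {
        const int nThreads = 4, nTilesPerPacket = 16, nTilesPerCpuTurn = 8;
        int ierr = milhoja_runtime_setup_pipeline_extcpugpusplit_c(
                       cpuTask, gpuTask, postTask,
                       nThreads, nTilesPerPacket, nTilesPerCpuTurn,
                       pktProto, tileProto, postProto);
        if (ierr != MILHOJA_SUCCESS)   return ierr;
        for (int i = 0; i < nTiles; ++i) {
            // One tile at a time; the runtime decides CPU turn vs. GPU packet.
            ierr = milhoja_runtime_push_pipeline_extcpugpusplit_c(
                       tileProto, pktProto, postProto, nThreads, &tiles[i]);
            if (ierr != MILHOJA_SUCCESS)   return ierr;
        }
        return milhoja_runtime_teardown_pipeline_extcpugpusplit_c(nThreads,
                                                                  nTilesPerPacket);
    }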
diff --git a/interfaces/Milhoja_runtime_mod.F90 b/interfaces/Milhoja_runtime_mod.F90
index 91503979..6f05cdd2 100644
--- a/interfaces/Milhoja_runtime_mod.F90
+++ b/interfaces/Milhoja_runtime_mod.F90
@@ -33,6 +33,9 @@ module milhoja_runtime_mod
     public :: milhoja_runtime_setupPipelineForExtGpuTasks
     public :: milhoja_runtime_pushTileToExtGpuPipeline
     public :: milhoja_runtime_teardownPipelineForExtGpuTasks
+    public :: milhoja_runtime_setupPipelineForExtCpuGpuSplitTasks
+    public :: milhoja_runtime_pushTileToExtCpuGpuSplitPipeline
+    public :: milhoja_runtime_teardownPipelineForExtCpuGpuSplitTasks
 # endif
 #endif
 #ifdef RUNTIME_SUPPORT_EXECUTE
@@ -42,6 +45,7 @@ module milhoja_runtime_mod
     public :: milhoja_runtime_executeTasks_CpuGpu
     public :: milhoja_runtime_executeTasks_CpuGpuSplit
     public :: milhoja_runtime_executeTasks_ExtGpu
+    public :: milhoja_runtime_executeTasks_ExtCpuGpuSplit
 # endif
 #endif
@@ -135,7 +139,7 @@ end function milhoja_runtime_teardown_pipeline_cpu_c
         !> Fortran interface on routine in C interface of same name.
         function milhoja_runtime_push_pipeline_cpu_c(C_tileWrapperPrototype, &
                                                      C_nThreads, &
-                                                     tileCINfo) result(C_ierr) &
+                                                     tileCInfo) result(C_ierr) &
                  bind(c)
             use iso_c_binding, ONLY : C_PTR
             use milhoja_types_mod, ONLY : MILHOJA_INT
@@ -214,6 +218,30 @@ function milhoja_runtime_setup_pipeline_extgpu_c(C_taskFunction, &
             integer(MILHOJA_INT), intent(IN), value :: C_nTilesPerPacket
             integer(MILHOJA_INT)                    :: C_ierr
         end function milhoja_runtime_setup_pipeline_extgpu_c
+        function milhoja_runtime_setup_pipeline_extcpugpusplit_c(C_cpuTaskFunction, &
+                                                                 C_gpuTaskFunction, &
+                                                                 C_postTaskFunction, &
+                                                                 C_nThreads, &
+                                                                 C_nTilesPerPacket, &
+                                                                 C_nTilesPerCpuTurn, &
+                                                                 C_packetPrototype, &
+                                                                 C_tilePrototype, &
+                                                                 C_postTilePrototype) result(C_ierr) &
+                 bind(c)
+            use iso_c_binding, ONLY : C_PTR, C_FUNPTR
+            use milhoja_types_mod, ONLY : MILHOJA_INT
+            implicit none
+            type(C_FUNPTR), intent(IN), value :: C_cpuTaskFunction
+            type(C_FUNPTR), intent(IN), value :: C_gpuTaskFunction
+            type(C_FUNPTR), intent(IN), value :: C_postTaskFunction
+            type(C_PTR),    intent(IN), value :: C_packetPrototype
+            type(C_PTR),    intent(IN), value :: C_tilePrototype
+            type(C_PTR),    intent(IN), value :: C_postTilePrototype
+            integer(MILHOJA_INT), intent(IN), value :: C_nThreads
+            integer(MILHOJA_INT), intent(IN), value :: C_nTilesPerPacket
+            integer(MILHOJA_INT), intent(IN), value :: C_nTilesPerCpuTurn
+            integer(MILHOJA_INT)                    :: C_ierr
+        end function milhoja_runtime_setup_pipeline_extcpugpusplit_c
 
         !> Fortran interface for the function in C interface of the same name.
         function milhoja_runtime_teardown_pipeline_gpu_c(C_nThreads, &
@@ -256,11 +284,21 @@ function milhoja_runtime_teardown_pipeline_extgpu_c(C_nThreads, &
             integer(MILHOJA_INT), intent(IN), value :: C_nTilesPerPacket
             integer(MILHOJA_INT)                    :: C_ierr
         end function milhoja_runtime_teardown_pipeline_extgpu_c
+        function milhoja_runtime_teardown_pipeline_extcpugpusplit_c(C_nThreads, &
+                                                                    C_nTilesPerPacket) &
+                 result(C_ierr) &
+                 bind(c)
+            use milhoja_types_mod, ONLY : MILHOJA_INT
+            implicit none
+            integer(MILHOJA_INT), intent(IN), value :: C_nThreads
+            integer(MILHOJA_INT), intent(IN), value :: C_nTilesPerPacket
+            integer(MILHOJA_INT)                    :: C_ierr
+        end function milhoja_runtime_teardown_pipeline_extcpugpusplit_c
 
         !> Fortran interface for the function in C interface of the same name.
         function milhoja_runtime_push_pipeline_gpu_c(C_packetPrototype, &
                                                      C_nThreads, &
-                                                     tileCINfo) result(C_ierr) &
+                                                     tileCInfo) result(C_ierr) &
                  bind(c)
             use iso_c_binding, ONLY : C_PTR
             use milhoja_types_mod, ONLY : MILHOJA_INT
@@ -273,7 +311,7 @@ end function milhoja_runtime_push_pipeline_gpu_c
         function milhoja_runtime_push_pipeline_cpugpu_c(C_tilePrototype, &
                                                         C_packetPrototype, &
                                                         C_nThreads, &
-                                                        tileCINfo) result(C_ierr) &
+                                                        tileCInfo) result(C_ierr) &
                  bind(c)
             use iso_c_binding, ONLY : C_PTR
             use milhoja_types_mod, ONLY : MILHOJA_INT
@@ -287,7 +325,7 @@ end function milhoja_runtime_push_pipeline_cpugpu_c
         function milhoja_runtime_push_pipeline_cpugpusplit_c(C_tilePrototype, &
                                                              C_packetPrototype, &
                                                              C_nThreads, &
-                                                             tileCINfo) result(C_ierr) &
+                                                             tileCInfo) result(C_ierr) &
                  bind(c)
             use iso_c_binding, ONLY : C_PTR
             use milhoja_types_mod, ONLY : MILHOJA_INT
         end function milhoja_runtime_push_pipeline_cpugpusplit_c
@@ -300,7 +338,7 @@ function milhoja_runtime_push_pipeline_cpugpusplit_c(C_tilePrototype, &
         function milhoja_runtime_push_pipeline_extgpu_c(C_packetPrototype, &
                                                         C_nThreads, &
-                                                        tileCINfo) result(C_ierr) &
+                                                        tileCInfo) result(C_ierr) &
                  bind(c)
             use iso_c_binding, ONLY : C_PTR
             use milhoja_types_mod, ONLY : MILHOJA_INT
@@ -310,6 +348,22 @@ function milhoja_runtime_push_pipeline_extgpu_c(C_packetPrototype, &
             type(C_PTR),    intent(IN), value :: tileCInfo
             integer(MILHOJA_INT)              :: C_ierr
         end function milhoja_runtime_push_pipeline_extgpu_c
+        function milhoja_runtime_push_pipeline_extcpugpusplit_c(C_tilePrototype, &
+                                                                C_packetPrototype, &
+                                                                C_postTilePrototype, &
+                                                                C_nThreads, &
+                                                                tileCInfo) result(C_ierr) &
+                 bind(c)
+            use iso_c_binding, ONLY : C_PTR
+            use milhoja_types_mod, ONLY : MILHOJA_INT
+            implicit none
+            type(C_PTR),    intent(IN), value :: C_tilePrototype
+            type(C_PTR),    intent(IN), value :: C_packetPrototype
+            type(C_PTR),    intent(IN), value :: C_postTilePrototype
+            integer(MILHOJA_INT), intent(IN), value :: C_nThreads
+            type(C_PTR),    intent(IN), value :: tileCInfo
+            integer(MILHOJA_INT)              :: C_ierr
+        end function milhoja_runtime_push_pipeline_extcpugpusplit_c
 # ifdef RUNTIME_SUPPORT_EXECUTE
 
         !> Fortran interface for the function in C interface of the same name.
@@ -391,6 +445,32 @@ function milhoja_runtime_execute_tasks_extgpu_c(C_taskFunction, &
             type(C_PTR),    intent(IN), value :: C_tilePrototype
             integer(MILHOJA_INT)              :: C_ierr
         end function milhoja_runtime_execute_tasks_extgpu_c
+        function milhoja_runtime_execute_tasks_extcpugpusplit_c(C_cpuTaskFunction, &
+                                                                C_gpuTaskFunction, &
+                                                                C_postTaskFunction, &
+                                                                C_nDistributorThreads, &
+                                                                C_nThreads, &
+                                                                C_nTilesPerPacket, &
+                                                                C_nTilesPerCpuTurn, &
+                                                                C_packetPrototype, &
+                                                                C_tilePrototype, &
+                                                                C_postTilePrototype) &
+                 result(C_ierr) bind(c)
+            use iso_c_binding, ONLY : C_PTR, C_FUNPTR
+            use milhoja_types_mod, ONLY : MILHOJA_INT
+            implicit none
+            type(C_FUNPTR), intent(IN), value :: C_cpuTaskFunction
+            type(C_FUNPTR), intent(IN), value :: C_gpuTaskFunction
+            type(C_FUNPTR), intent(IN), value :: C_postTaskFunction
+            integer(MILHOJA_INT), intent(IN), value :: C_nDistributorThreads
+            integer(MILHOJA_INT), intent(IN), value :: C_nThreads
+            integer(MILHOJA_INT), intent(IN), value :: C_nTilesPerPacket
+            integer(MILHOJA_INT), intent(IN), value :: C_nTilesPerCpuTurn
+            type(C_PTR),    intent(IN), value :: C_packetPrototype
+            type(C_PTR),    intent(IN), value :: C_tilePrototype
+            type(C_PTR),    intent(IN), value :: C_postTilePrototype
+            integer(MILHOJA_INT)              :: C_ierr
+        end function milhoja_runtime_execute_tasks_extcpugpusplit_c
 # endif
 #endif
     end interface
@@ -631,6 +711,49 @@ subroutine milhoja_runtime_setupPipelineForExtGpuTasks(taskFunction, &
                                                            packetPrototype_Cptr, &
                                                            tilePrototype_Cptr)
     end subroutine milhoja_runtime_setupPipelineForExtGpuTasks
+    subroutine milhoja_runtime_setupPipelineForExtCpuGpuSplitTasks(cpuTaskFunction, &
+                                                                   gpuTaskFunction, &
+                                                                   postTaskFunction, &
+                                                                   nThreads, &
+                                                                   nTilesPerPacket, &
+                                                                   nTilesPerCpuTurn, &
+                                                                   packetPrototype_Cptr, &
+                                                                   tilePrototype_Cptr, &
+                                                                   postTilePrototype_Cptr, &
+                                                                   ierr)
+        use iso_c_binding, ONLY : C_PTR, &
+                                  C_FUNPTR, &
+                                  C_FUNLOC
+
+        procedure(milhoja_runtime_taskFunction) :: cpuTaskFunction
+        procedure(milhoja_runtime_taskFunction) :: gpuTaskFunction
+        procedure(milhoja_runtime_taskFunction) :: postTaskFunction
+        type(C_PTR),          intent(IN)  :: packetPrototype_Cptr
+        type(C_PTR),          intent(IN)  :: tilePrototype_Cptr
+        type(C_PTR),          intent(IN)  :: postTilePrototype_Cptr
+        integer(MILHOJA_INT), intent(IN)  :: nThreads
+        integer(MILHOJA_INT), intent(IN)  :: nTilesPerPacket
+        integer(MILHOJA_INT), intent(IN)  :: nTilesPerCpuTurn
+        integer(MILHOJA_INT), intent(OUT) :: ierr
+
+        type(C_FUNPTR) :: cpuTaskFunction_Cptr
+        type(C_FUNPTR) :: gpuTaskFunction_Cptr
+        type(C_FUNPTR) :: postTaskFunction_Cptr
+
+        cpuTaskFunction_Cptr  = C_FUNLOC(cpuTaskFunction)
+        gpuTaskFunction_Cptr  = C_FUNLOC(gpuTaskFunction)
+        postTaskFunction_Cptr = C_FUNLOC(postTaskFunction)
+
+        ierr = milhoja_runtime_setup_pipeline_extcpugpusplit_c(cpuTaskFunction_Cptr, &
+                                                               gpuTaskFunction_Cptr, &
+                                                               postTaskFunction_Cptr, &
+                                                               nThreads, &
+                                                               nTilesPerPacket, &
+                                                               nTilesPerCpuTurn, &
+                                                               packetPrototype_Cptr, &
+                                                               tilePrototype_Cptr, &
+                                                               postTilePrototype_Cptr)
+    end subroutine milhoja_runtime_setupPipelineForExtCpuGpuSplitTasks
 
     !> Instruct the runtime to tear down the GPU-only thread team pipeline.
     !!
@@ -687,6 +810,14 @@ subroutine milhoja_runtime_teardownPipelineForExtGpuTasks(nThreads, nTilesPerPac
 
         ierr = milhoja_runtime_teardown_pipeline_extgpu_c(nThreads, nTilesPerPacket)
     end subroutine milhoja_runtime_teardownPipelineForExtGpuTasks
+    subroutine milhoja_runtime_teardownPipelineForExtCpuGpuSplitTasks(nThreads, nTilesPerPacket, &
+                                                                      ierr)
+        integer(MILHOJA_INT), intent(IN)  :: nThreads
+        integer(MILHOJA_INT), intent(IN)  :: nTilesPerPacket
+        integer(MILHOJA_INT), intent(OUT) :: ierr
+
+        ierr = milhoja_runtime_teardown_pipeline_extcpugpusplit_c(nThreads, nTilesPerPacket)
+    end subroutine milhoja_runtime_teardownPipelineForExtCpuGpuSplitTasks
 
     !> Push one tile to the prepared pipeline for task execution.
     !!
@@ -754,6 +885,26 @@ subroutine milhoja_runtime_pushTileToExtGpuPipeline(prototype_Cptr, &
                                                           nThreads, &
                                                           tileCInfo_Cp)
     end subroutine milhoja_runtime_pushTileToExtGpuPipeline
+    subroutine milhoja_runtime_pushTileToExtCpuGpuSplitPipeline(tilePrototype_Cptr, &
+                                                                pktPrototype_Cptr, &
+                                                                postTilePrototype_Cptr, &
+                                                                nThreads, &
+                                                                tileCInfo_Cp, &
+                                                                ierr)
+        use iso_c_binding, ONLY : C_PTR
+
+        type(C_PTR),          intent(IN)  :: tilePrototype_Cptr
+        type(C_PTR),          intent(IN)  :: pktPrototype_Cptr
+        type(C_PTR),          intent(IN)  :: postTilePrototype_Cptr
+        integer(MILHOJA_INT), intent(IN)  :: nThreads
+        type(C_PTR),          intent(IN)  :: tileCInfo_Cp
+        integer(MILHOJA_INT), intent(OUT) :: ierr
+
+        ierr = milhoja_runtime_push_pipeline_extcpugpusplit_c(tilePrototype_Cptr, &
+                                                              pktPrototype_Cptr, &
+                                                              postTilePrototype_Cptr, &
+                                                              nThreads, &
+                                                              tileCInfo_Cp)
+    end subroutine milhoja_runtime_pushTileToExtCpuGpuSplitPipeline
 # endif
 #endif
@@ -1010,6 +1161,54 @@ subroutine milhoja_runtime_executeTasks_ExtGpu(taskFunction, &
                                                       packetPrototype_Cptr, &
                                                       tilePrototype_Cptr)
     end subroutine milhoja_runtime_executeTasks_ExtGpu
+
+
+    subroutine milhoja_runtime_executeTasks_ExtCpuGpuSplit(tileTaskFunction, &
+                                                           pktTaskFunction, &
+                                                           postTaskFunction, &
+                                                           nDistributorThreads, &
+                                                           nThreads, &
+                                                           nTilesPerPacket, &
+                                                           nTilesPerCpuTurn, &
+                                                           packetPrototype_Cptr, &
+                                                           tilePrototype_Cptr, &
+                                                           postTilePrototype_Cptr, &
+                                                           ierr)
+        use iso_c_binding, ONLY : C_FUNPTR, &
+                                  C_PTR, &
+                                  C_FUNLOC
+
+        procedure(milhoja_runtime_taskFunction) :: tileTaskFunction
+        procedure(milhoja_runtime_taskFunction) :: pktTaskFunction
+        procedure(milhoja_runtime_taskFunction) :: postTaskFunction
+        integer(MILHOJA_INT), intent(IN)  :: nDistributorThreads
+        integer(MILHOJA_INT), intent(IN)  :: nThreads
+        integer(MILHOJA_INT), intent(IN)  :: nTilesPerPacket
+        integer(MILHOJA_INT), intent(IN)  :: nTilesPerCpuTurn
+        type(C_PTR),          intent(IN)  :: packetPrototype_Cptr
+        type(C_PTR),          intent(IN)  :: tilePrototype_Cptr
+        type(C_PTR),          intent(IN)  :: postTilePrototype_Cptr
+        integer(MILHOJA_INT), intent(OUT) :: ierr
+
+        type(C_FUNPTR) :: tileTaskFunction_Cptr
+        type(C_FUNPTR) :: pktTaskFunction_Cptr
+        type(C_FUNPTR) :: postTaskFunction_Cptr
+
+        tileTaskFunction_Cptr = C_FUNLOC(tileTaskFunction)
+        pktTaskFunction_Cptr  = C_FUNLOC(pktTaskFunction)
+        postTaskFunction_Cptr = C_FUNLOC(postTaskFunction)
+
+        ierr = milhoja_runtime_execute_tasks_extcpugpusplit_c(tileTaskFunction_Cptr, &
+                                                              pktTaskFunction_Cptr, &
+                                                              postTaskFunction_Cptr, &
+                                                              nDistributorThreads, &
+                                                              nThreads, &
+                                                              nTilesPerPacket, &
+                                                              nTilesPerCpuTurn, &
+                                                              packetPrototype_Cptr, &
+                                                              tilePrototype_Cptr, &
+                                                              postTilePrototype_Cptr)
+    end subroutine milhoja_runtime_executeTasks_ExtCpuGpuSplit
 # endif
 #endif
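The Runtime code below implements the CPU/GPU split with a simple turn counter: nTilesPerCpuTurn tiles are handed to the CPU team, then tiles are collected into a GPU packet until it holds nTilesPerPacket tiles, at which point the turn returns to the CPU. A distilled, self-contained sketch of just that alternation, with counters standing in for the real tiles and packets:

    #include <iostream>

    // Distilled sketch of the distributor's round-robin split; the real code
    // also clones tile wrappers and packs/ships the GPU packets.
    int main() {
        const unsigned int nTiles = 10, nTilesPerCpuTurn = 2, nTilesPerPacket = 3;
        bool         isCpuTurn  = true;   // the CPU gets the first turn
        unsigned int nInCpuTurn = 0, nInPacket = 0;
        for (unsigned int i = 0; i < nTiles; ++i) {
            if (isCpuTurn) {
                std::cout << "tile " << i << " -> CPU team\n";
                if (++nInCpuTurn >= nTilesPerCpuTurn) { isCpuTurn = false; nInCpuTurn = 0; }
            } else {
                std::cout << "tile " << i << " -> GPU packet\n";
                if (++nInPacket >= nTilesPerPacket)   { nInPacket = 0; isCpuTurn = true; }
            }
        }
        return 0;
    }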
diff --git a/src/Milhoja_Runtime.cpp b/src/Milhoja_Runtime.cpp
index d9cf0e71..7e0c481a 100644
--- a/src/Milhoja_Runtime.cpp
+++ b/src/Milhoja_Runtime.cpp
@@ -2029,9 +2029,11 @@ void Runtime::executeCpuGpuSplitTasks_timed(const std::string& bundleName,
 void Runtime::executeExtendedCpuGpuSplitTasks(const std::string& bundleName,
                                               const unsigned int nDistributorThreads,
                                               const RuntimeAction& actionA_cpu,
+                                              const TileWrapper& tilePrototype,
                                               const RuntimeAction& actionA_gpu,
-                                              const RuntimeAction& postActionB_cpu,
                                               const DataPacket& packetPrototype,
+                                              const RuntimeAction& postActionB_cpu,
+                                              const TileWrapper& postTilePrototype,
                                               const unsigned int nTilesPerCpuTurn) {
 #ifdef USE_THREADED_DISTRIBUTOR
     const unsigned int nDistThreads = nDistributorThreads;
@@ -2099,6 +2101,7 @@ void Runtime::executeExtendedCpuGpuSplitTasks(const std::string& bundleName,
     teamA_cpu->attachDataReceiver(teamB_cpu);
     teamA_gpu->attachDataReceiver(&gpuToHost1_);
     gpuToHost1_.attachDataReceiver(teamB_cpu);
+    gpuToHost1_.setReceiverProto(&postTilePrototype);
 
     // The action parallel distributor's thread resource is used
     // once the distributor starts to wait
@@ -2148,7 +2151,7 @@ void Runtime::executeExtendedCpuGpuSplitTasks(const std::string& bundleName,
             tileDesc = ti->buildCurrentTile();
 
             if (isCpuTurn) {
-                teamA_cpu->enqueue( std::move(tileDesc) );
+                teamA_cpu->enqueue( tilePrototype.clone( std::move(tileDesc) ) );
                 ++nInCpuTurn;
 
                 if (nInCpuTurn >= nTilesPerCpuTurn) {
@@ -2195,7 +2198,229 @@ void Runtime::executeExtendedCpuGpuSplitTasks(const std::string& bundleName,
 
     Logger::instance().log("[Runtime] End Extended CPU/GPU shared action");
 }
 #  endif
+#  ifndef RUNTIME_MUST_USE_TILEITER
+void Runtime::setupPipelineForExtCpuGpuSplitTasks(const std::string& bundleName,
+                                                  const RuntimeAction& actionA_cpu,
+                                                  const TileWrapper& tilePrototype,
+                                                  const RuntimeAction& actionA_gpu,
+                                                  const DataPacket& packetPrototype,
+                                                  const RuntimeAction& postActionB_cpu,
+                                                  const TileWrapper& postTilePrototype,
+                                                  const unsigned int nTilesPerCpuTurn) {
+
+    const unsigned int nDistThreads = 1;
+
+    Logger::instance().log("[Runtime] Start setting up extended CPU/GPU shared action");
+    std::string msg = "[Runtime] "
+                      + std::to_string(nDistThreads)
+                      + " distributor threads";
+    Logger::instance().log(msg);
+    msg = "[Runtime] "
+          + std::to_string(nTilesPerCpuTurn)
+          + " tiles sent to CPU for every packet of "
+          + std::to_string(actionA_gpu.nTilesPerPacket)
+          + " tiles sent to GPU";
+    Logger::instance().log(msg);
+
+    if (nDistThreads <= 0) {
+        throw std::invalid_argument("[Runtime::setupPipelineForExtCpuGpuSplitTasks] "
+                                    "nDistributorThreads must be positive");
+    } else if (actionA_cpu.teamType != ThreadTeamDataType::BLOCK) {
+        throw std::logic_error("[Runtime::setupPipelineForExtCpuGpuSplitTasks] "
+                               "Given CPU action A should run on tiles, "
+                               "which is not in configuration");
+    } else if (actionA_cpu.nTilesPerPacket != 0) {
+        throw std::invalid_argument("[Runtime::setupPipelineForExtCpuGpuSplitTasks] "
+                                    "CPU A tiles/packet should be zero since it is tile-based");
+    } else if (actionA_gpu.teamType != ThreadTeamDataType::SET_OF_BLOCKS) {
+        throw std::logic_error("[Runtime::setupPipelineForExtCpuGpuSplitTasks] "
+                               "Given GPU action should run on packet of blocks, "
+                               "which is not in configuration");
+    } else if (actionA_gpu.nTilesPerPacket <= 0) {
+        throw std::invalid_argument("[Runtime::setupPipelineForExtCpuGpuSplitTasks] "
+                                    "Need at least one tile per GPU packet");
+    } else if (postActionB_cpu.teamType != actionA_cpu.teamType) {
+        throw std::logic_error("[Runtime::setupPipelineForExtCpuGpuSplitTasks] "
+                               "Given post action data type must match that "
+                               "of CPU action A");
+    } else if (postActionB_cpu.nTilesPerPacket != actionA_cpu.nTilesPerPacket) {
+        throw std::invalid_argument("[Runtime::setupPipelineForExtCpuGpuSplitTasks] "
+                                    "Given post action tiles/packet must match that "
+                                    "of CPU action A");
+    } else if (nTeams_ < 3) {
+        throw std::logic_error("[Runtime::setupPipelineForExtCpuGpuSplitTasks] "
+                               "Need at least three ThreadTeams in runtime");
+    }
+    nTilesPerPacket_  = actionA_gpu.nTilesPerPacket;
+    nTilesPerCpuTurn_ = nTilesPerCpuTurn;
+    isCpuTurn_        = true;
+    nInCpuTurn_       = 0;
+
+    //***** ASSEMBLE THREAD TEAM CONFIGURATION
+    // CPU/GPU action parallel pipeline
+    // 1) Action Parallel Distributor will send one fraction of data items
+    //    to CPU for computation and each of these is enqueued directly with the post
+    //    action thread team.
+    // 2) For the remaining data items,
+    //    a) Asynchronous transfer of Packets of Blocks to GPU by distributor,
+    //    b) GPU action applied to blocks in packet by GPU team
+    //    c) Mover/Unpacker transfers packet back to CPU,
+    //       copies results to Grid data structures,
+    //       and enqueues with post action thread team.
+    ThreadTeam* teamA_cpu = teams_[0];
+    ThreadTeam* teamA_gpu = teams_[1];
+    ThreadTeam* teamB_cpu = teams_[2];
+
+    teamA_cpu->attachThreadReceiver(teamB_cpu);
+    teamA_cpu->attachDataReceiver(teamB_cpu);
+    teamA_cpu->setReceiverProto(&postTilePrototype);
+    teamA_gpu->attachDataReceiver(&gpuToHost1_);
+    gpuToHost1_.attachDataReceiver(teamB_cpu);
+    gpuToHost1_.setReceiverProto(&postTilePrototype);
+
+    // The action parallel distributor's thread resource is used
+    // once the distributor starts to wait
+    unsigned int nTotalThreads = actionA_cpu.nInitialThreads
+                                 + nDistThreads;
+    if (nTotalThreads > teamA_cpu->nMaximumThreads()) {
+        throw std::logic_error("[Runtime::setupPipelineForExtCpuGpuSplitTasks] "
+                               "CPU team could receive too many thread "
+                               "activation calls");
+    }
+    nTotalThreads = actionA_cpu.nInitialThreads
+                    + postActionB_cpu.nInitialThreads
+                    + nDistThreads;
+    if (nTotalThreads > teamB_cpu->nMaximumThreads()) {
+        throw std::logic_error("[Runtime::setupPipelineForExtCpuGpuSplitTasks] "
+                               "Post could receive too many thread "
+                               "activation calls");
+    }
+
+    //***** START EXECUTION CYCLE
+    teamA_cpu->startCycle(actionA_cpu,     "ActionSharing_CPU_Block_Team");
+    teamA_gpu->startCycle(actionA_gpu,     "ActionSharing_GPU_Packet_Team");
+    teamB_cpu->startCycle(postActionB_cpu, "PostAction_CPU_Block_Team");
+    gpuToHost1_.startCycle();
+
+    packet_gpu_ = packetPrototype.clone();
+
+    Logger::instance().log("[Runtime] End setting up extended CPU/GPU shared action");
+}
+
+void Runtime::pushTileToExtCpuGpuSplitPipeline(const std::string& bundleName,
+                                               const TileWrapper& tilePrototype,
+                                               const DataPacket& packetPrototype,
+                                               const TileWrapper& postTilePrototype,
+                                               const FlashxrTileRawPtrs& tP,
+                                               const FlashxTileRawInts& tI,
+                                               const FlashxTileRawReals& tR) {
+#ifdef RUNTIME_PERTILE_LOG
+    Logger::instance().log("[Runtime] Push single tile task to EXT CPU/GPU split pipeline");
 #endif
+    if (nTilesPerPacket_ <= 0) {
+        throw std::invalid_argument("[Runtime:pushTileToExtCpuGpuSplitPipeline] "
+                                    "Need at least one block per packet");
+    } else if (nTeams_ < 3) {
+        throw std::logic_error("[Runtime:pushTileToExtCpuGpuSplitPipeline] "
+                               "Need at least three ThreadTeams in runtime");
+    }
+
+    ThreadTeam* teamA_cpu = teams_[0];
+    ThreadTeam* teamA_gpu = teams_[1];
+    ThreadTeam* teamB_cpu = teams_[2];
+
+    RuntimeBackend& backend = RuntimeBackend::instance();
+    std::shared_ptr<Tile> tileDesc{};
+    {
+
+        tileDesc = static_cast<std::shared_ptr<Tile>>(std::make_unique<TileFlashxr>(tP, tI, tR));
+        if (isCpuTurn_) {
+            teamA_cpu->enqueue( tilePrototype.clone( std::move(tileDesc) ) );
+            if ((tileDesc != nullptr) || (tileDesc.use_count() != 0)) {
+                throw std::logic_error("[Runtime::pushTileToExtCpuGpuSplitPipeline] tileDesc ownership not transferred");
+            }
+
+            ++nInCpuTurn_;
+            if (nInCpuTurn_ >= nTilesPerCpuTurn_) {
+                isCpuTurn_  = false;
+                nInCpuTurn_ = 0;
+            }
+        } else {
+            packet_gpu_->addTile( std::move(tileDesc) );
+            if ((tileDesc != nullptr) || (tileDesc.use_count() != 0)) {
+                throw std::logic_error("[Runtime::pushTileToExtCpuGpuSplitPipeline] tileDesc ownership not transferred");
+            }
+
+            if (packet_gpu_->nTiles() >= nTilesPerPacket_) {
+                packet_gpu_->pack();
+#ifdef RUNTIME_PERTILE_LOG
+                Logger::instance().log("[Runtime] Shipping off packet with "
+                                       + std::to_string(packet_gpu_->nTiles())
+                                       + " tiles...");
+#endif
+                backend.initiateHostToGpuTransfer(*(packet_gpu_.get()));
+                teamA_gpu->enqueue( std::move(packet_gpu_) );
+
+                packet_gpu_ = packetPrototype.clone();
+                isCpuTurn_ = true;
+            }
+        }
+    }
+#ifdef RUNTIME_PERTILE_LOG
+    Logger::instance().log("[Runtime] Single tile task was pushed to EXT CPU/GPU split pipeline");
+#endif
+}
+
+void Runtime::teardownPipelineForExtCpuGpuSplitTasks(const std::string& bundleName) {
+
+    Logger::instance().log("[Runtime] Tear Down extended CPU/GPU shared action");
+
+    if (nTilesPerPacket_ <= 0) {
+        throw std::invalid_argument("[Runtime:teardownPipelineForExtCpuGpuSplitTasks] "
+                                    "Need at least one block per packet");
+    } else if (nTeams_ < 3) {
+        throw std::logic_error("[Runtime:teardownPipelineForExtCpuGpuSplitTasks] "
+                               "Need at least three ThreadTeams in runtime");
+    }
+    ThreadTeam* teamA_cpu = teams_[0];
+    ThreadTeam* teamA_gpu = teams_[1];
+    ThreadTeam* teamB_cpu = teams_[2];
+
+    RuntimeBackend& backend = RuntimeBackend::instance();
+    {
+        if (packet_gpu_->nTiles() > 0) {
+            packet_gpu_->pack();
+#ifdef RUNTIME_PERTILE_LOG
+            Logger::instance().log("[Runtime] Shipping off packet with "
+                                   + std::to_string(packet_gpu_->nTiles())
+                                   + " final tiles...");
+#endif
+            backend.initiateHostToGpuTransfer(*(packet_gpu_.get()));
+            teamA_gpu->enqueue( std::move(packet_gpu_) );
+        } else {
+            packet_gpu_.reset();
+        }
+
+        teamA_cpu->increaseThreadCount(1);
+    } // implied barrier
+
+    teamA_gpu->closeQueue(nullptr);
+    teamA_cpu->closeQueue(nullptr);
+
+    // All data flowing through the Action B/Post-A team
+    teamB_cpu->wait();
+
+    //***** BREAK APART THREAD TEAM CONFIGURATION
+    teamA_cpu->detachThreadReceiver();
+    teamA_cpu->detachDataReceiver();
+    teamA_gpu->detachDataReceiver();
+    gpuToHost1_.detachDataReceiver();
+
+    Logger::instance().log("[Runtime:teardownPipelineForExtCpuGpuSplitTasks] End extended CPU/GPU shared action");
+
+}
+#  endif // ifndef RUNTIME_MUST_USE_TILEITER
+#endif // ifdef RUNTIME_SUPPORT_DATAPACKETS
 
 /**
  *

diff --git a/src/Milhoja_ThreadTeam.cpp b/src/Milhoja_ThreadTeam.cpp
index 8673fc5d..977d2b96 100644
--- a/src/Milhoja_ThreadTeam.cpp
+++ b/src/Milhoja_ThreadTeam.cpp
@@ -15,6 +15,7 @@
 #include "Milhoja_ThreadTeamRunningOpen.h"
 #include "Milhoja_ThreadTeamRunningClosed.h"
 #include "Milhoja_ThreadTeamRunningNoMoreWork.h"
+#include "Milhoja_TileWrapper.h"
 
 namespace milhoja {
 
@@ -907,6 +908,11 @@ std::string ThreadTeam::attachDataReceiver(RuntimeElement* receiver) {
     return "";
 }
 
+void ThreadTeam::setReceiverProto(TileWrapper const * w) {
+    // TODO: should I mutex lock/unlock?
+    receiverProto_ = w;
+}
+
 /**
  * Detach the data subscriber so that the calling object is no longer a data
  * publisher.
@@ -1473,7 +1479,22 @@ void* ThreadTeam::threadRoutine(void* varg) {
 
                 if (team->dataReceiver_) {
                     // Move the data item along so that dataItem is null
-                    team->dataReceiver_->enqueue(std::move(dataItem));
+                    // TODO: very dirty ownership transfers
+                    if (auto tileWrapper = std::dynamic_pointer_cast<TileWrapper>(dataItem)) {
+                        // NOTE: this is the case where dataItem is a TileWrapper,
+                        // and the team->dataReceiver_ is another TileWrapper.
+                        // Need to transfer dataItem initialized with data receiver's
+                        // tilePrototype, as it may differ.
+                        std::unique_ptr<TileWrapper> clonedTile =
+                            team->receiverProto_->clone(std::move(tileWrapper->tile_));
+                        // Release ownership, assuming clonedTile has new ownership
+                        dataItem.reset();
+                        team->dataReceiver_->enqueue(std::move(clonedTile));
+                    }
+                    else {
+                        // the data receiver is a mover/unpacker
+                        team->dataReceiver_->enqueue(std::move(dataItem));
+                    }
                 } else {
                     // The data item is done.  Null dataItem so that the current
                     // data item's resources can be released if this was the last
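The hand-off added to threadRoutine above is the central mechanism of this patch: when a tile-based team publishes to another tile-based team, the underlying Tile is pulled out of the incoming wrapper and re-wrapped with the receiver's prototype before being enqueued. A reduced sketch of that clone-on-handoff pattern, using simplified stand-ins rather than the actual Milhoja Tile/TileWrapper classes:

    #include <iostream>
    #include <memory>
    #include <utility>

    struct Tile { int id; };                       // stand-in for milhoja::Tile

    struct Wrapper {                               // stand-in for milhoja::TileWrapper
        std::unique_ptr<Tile> tile_;
        const char*           flavor;              // e.g. action-A vs. post-action wrapper
        // Re-wrap an existing Tile using *this* object's configuration,
        // mirroring TileWrapper::clone in spirit.
        std::unique_ptr<Wrapper> clone(std::unique_ptr<Tile>&& t) const {
            return std::unique_ptr<Wrapper>(new Wrapper{std::move(t), flavor});
        }
    };

    int main() {
        Wrapper postProto{nullptr, "postAction"};  // the receiver's prototype
        auto    item = std::unique_ptr<Wrapper>(
                           new Wrapper{std::make_unique<Tile>(Tile{7}), "actionA"});

        // Hand-off: steal the Tile and re-wrap it with the receiver's prototype.
        auto rewrapped = postProto.clone(std::move(item->tile_));
        item.reset();                              // publisher's wrapper is done

        std::cout << "tile " << rewrapped->tile_->id
                  << " now wrapped as " << rewrapped->flavor << "\n";
        return 0;
    }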
From e5d5ccb6a4572cbbaa64aa05bcae55e50de0142 Mon Sep 17 00:00:00 2001
From: Klaus Weide
Date: Wed, 20 Nov 2024 22:09:01 -0600
Subject: [PATCH 2/4] fix a missing line

(cherry picked from commit 5e9ec79ebaa918f1dc82d2ecc9eb7f0f2423772a)
---
 src/Milhoja_Runtime.cpp | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/Milhoja_Runtime.cpp b/src/Milhoja_Runtime.cpp
index 7e0c481a..5d6dce5a 100644
--- a/src/Milhoja_Runtime.cpp
+++ b/src/Milhoja_Runtime.cpp
@@ -2099,6 +2099,7 @@ void Runtime::executeExtendedCpuGpuSplitTasks(const std::string& bundleName,
 
     teamA_cpu->attachThreadReceiver(teamB_cpu);
     teamA_cpu->attachDataReceiver(teamB_cpu);
+    teamA_cpu->setReceiverProto(&postTilePrototype);
     teamA_gpu->attachDataReceiver(&gpuToHost1_);
     gpuToHost1_.attachDataReceiver(teamB_cpu);
     gpuToHost1_.setReceiverProto(&postTilePrototype);

From da53c95a35efac592c270a419a7afce6d643d3cf Mon Sep 17 00:00:00 2001
From: Youngjun Lee
Date: Fri, 25 Oct 2024 12:24:23 -0500
Subject: [PATCH 3/4] better handling for data receiver's prototype

(cherry picked from commit 2f9cab61ba03bc96e1e7495f6f828d70bbba4876)
---
 includes/Milhoja_MoverUnpacker.h  |  3 +-
 includes/Milhoja_RuntimeElement.h |  3 ++
 includes/Milhoja_ThreadTeam.h     |  3 +-
 src/Milhoja_MoverUnpacker.cpp     | 31 +++++---------
 src/Milhoja_Runtime.cpp           | 12 +++---
 src/Milhoja_RuntimeElement.cpp    | 31 +++++++++++++-
 src/Milhoja_ThreadTeam.cpp        | 67 +++++++++++++++++++++++++------
 7 files changed, 106 insertions(+), 44 deletions(-)

diff --git a/includes/Milhoja_MoverUnpacker.h b/includes/Milhoja_MoverUnpacker.h
index d5a44424..50f3e808 100644
--- a/includes/Milhoja_MoverUnpacker.h
+++ b/includes/Milhoja_MoverUnpacker.h
@@ -46,6 +46,7 @@
 
 #include
 
+#include "Milhoja_DataItem.h"
 #include "Milhoja_TileWrapper.h"
 #include "Milhoja_RuntimeElement.h"
 
@@ -63,7 +64,6 @@ class MoverUnpacker : public RuntimeElement {
     MoverUnpacker& operator=(const MoverUnpacker&) = delete;
     MoverUnpacker& operator=(MoverUnpacker&&) = delete;
 
-    void setReceiverProto(TileWrapper const *);
     void startCycle(void);
     void increaseThreadCount(const unsigned int nThreads) override;
     void enqueue(std::shared_ptr<DataItem>&& dataItem) override;
@@ -72,6 +72,7 @@ class MoverUnpacker : public RuntimeElement {
     void wait(void);
 
     RuntimeElement* dataReceiver(void) const { return dataReceiver_; }
+    const DataItem* receiverPrototype(void) const { return receiverPrototype_; }
 
 private:
     enum class State {Idle, Open, Closed};

diff --git a/includes/Milhoja_RuntimeElement.h b/includes/Milhoja_RuntimeElement.h
index 476d0a81..0ff0bd6e 100644
--- a/includes/Milhoja_RuntimeElement.h
+++ b/includes/Milhoja_RuntimeElement.h
@@ -46,6 +46,8 @@ class RuntimeElement {
     virtual std::string attachDataReceiver(RuntimeElement* receiver);
     virtual std::string detachDataReceiver(void);
 
+    virtual std::string setReceiverPrototype(const DataItem* prototype);
+
 protected:
     RuntimeElement(void);
     virtual ~RuntimeElement(void);
@@ -58,6 +60,7 @@ class RuntimeElement {
                               to once this team's action has already been
                               applied to the items. */
+    const DataItem* receiverPrototype_;
     std::map<const RuntimeElement*,bool> calledCloseQueue_; /*!< The keys in this map serve as
                               a list of data publishers attached to the object.

diff --git a/includes/Milhoja_ThreadTeam.h b/includes/Milhoja_ThreadTeam.h
index f079d3cb..a594624d 100644
--- a/includes/Milhoja_ThreadTeam.h
+++ b/includes/Milhoja_ThreadTeam.h
@@ -114,7 +114,7 @@ class ThreadTeam : public RuntimeElement {
     //     into thread team configurations.
     std::string attachDataReceiver(RuntimeElement* receiver) override;
     std::string detachDataReceiver(void) override;
-    void setReceiverProto(TileWrapper const * w);
+    std::string setReceiverPrototype(const DataItem* prototype) override;
 
 protected:
     constexpr static unsigned int THREAD_START_STOP_TIMEOUT_SEC = 1;
@@ -215,7 +215,6 @@ class ThreadTeam : public RuntimeElement {
     // Keep track of when wait() is blocking and when it is released
     bool isWaitBlocking_;   //!< Only a single thread can be blocked
-    const TileWrapper * receiverProto_;
 };
 
 }

diff --git a/src/Milhoja_MoverUnpacker.cpp b/src/Milhoja_MoverUnpacker.cpp
index 2c350f71..39c0d5dc 100644
--- a/src/Milhoja_MoverUnpacker.cpp
+++ b/src/Milhoja_MoverUnpacker.cpp
@@ -193,18 +193,17 @@ void MoverUnpacker::handleTransferFinished(void* userData) {
 
     // Transfer the ownership of the data items in the packet to the next team
     if (dataReceiver) {
-        while (packet->nTiles() > 0) {
-#if(0)
-            std::shared_ptr<Tile> curTile = std::move(packet->popTile());
-            std::shared_ptr<DataItem> wrappedTile =
-                unpacker->tileProto_->clone( std::move(curTile) );
-            dataReceiver->enqueue( std::move(wrappedTile) );
-#endif
-            dataReceiver->enqueue(
-                unpacker->tileProto_->clone(packet->popTile())
-            );
+        auto receiverPrototype = unpacker->receiverPrototype();
+        if (receiverPrototype) {
+            const TileWrapper* tileWrapperPrototype =
+                dynamic_cast<const TileWrapper*>(receiverPrototype);
+            while (packet->nTiles() > 0) {
+                dataReceiver->enqueue(
+                    tileWrapperPrototype->clone(packet->popTile())
+                );
+            }
+            dataReceiver = nullptr;
         }
-        dataReceiver = nullptr;
     }
 
     packet = nullptr;
@@ -312,15 +311,5 @@ void MoverUnpacker::wait(void) {
     pthread_mutex_unlock(&mutex_);
 }
 
-void MoverUnpacker::setReceiverProto(TileWrapper const * w) {
-
-    if (state_ != State::Idle) {
-        throw std::logic_error("[MoverUnpacker::setReceiverProto] "
-                               "This setter should only be called in Idle state");
-    }
-    tileProto_ = w;
-
-}
-
 }

diff --git a/src/Milhoja_Runtime.cpp b/src/Milhoja_Runtime.cpp
index 5d6dce5a..bb9fc23a 100644
--- a/src/Milhoja_Runtime.cpp
+++ b/src/Milhoja_Runtime.cpp
@@ -1145,7 +1145,7 @@ void Runtime::executeExtendedGpuTasks(const std::string& bundleName,
     gpuTeam->attachThreadReceiver(postGpuTeam);
     gpuTeam->attachDataReceiver(&gpuToHost1_);
     gpuToHost1_.attachDataReceiver(postGpuTeam);
-    gpuToHost1_.setReceiverProto(&tilePrototype);
+    gpuToHost1_.setReceiverPrototype(&tilePrototype);
 
     unsigned int nTotalThreads = gpuAction.nInitialThreads
                                  + postGpuAction.nInitialThreads
@@ -1252,7 +1252,7 @@ void Runtime::setupPipelineForExtGpuTasks(const std::string& bundleName,
     gpuTeam->attachThreadReceiver(postGpuTeam);
     gpuTeam->attachDataReceiver(&gpuToHost1_);
     gpuToHost1_.attachDataReceiver(postGpuTeam);
-    gpuToHost1_.setReceiverProto(&tilePrototype);
+    gpuToHost1_.setReceiverPrototype(&tilePrototype);
 
     unsigned int nTotalThreads = gpuAction.nInitialThreads
                                  + postGpuAction.nInitialThreads
@@ -2099,10 +2099,10 @@ void Runtime::executeExtendedCpuGpuSplitTasks(const std::string& bundleName,
 
     teamA_cpu->attachThreadReceiver(teamB_cpu);
     teamA_cpu->attachDataReceiver(teamB_cpu);
-    teamA_cpu->setReceiverProto(&postTilePrototype);
+    teamA_cpu->setReceiverPrototype(&postTilePrototype);
     teamA_gpu->attachDataReceiver(&gpuToHost1_);
     gpuToHost1_.attachDataReceiver(teamB_cpu);
-    gpuToHost1_.setReceiverProto(&postTilePrototype);
+    gpuToHost1_.setReceiverPrototype(&postTilePrototype);
 
     // The action parallel distributor's thread resource is used
     // once the distributor starts to wait
@@ -2274,10 +2274,10 @@ void Runtime::setupPipelineForExtCpuGpuSplitTasks(const std::string& bundleName,
 
     teamA_cpu->attachThreadReceiver(teamB_cpu);
     teamA_cpu->attachDataReceiver(teamB_cpu);
-    teamA_cpu->setReceiverProto(&postTilePrototype);
+    teamA_cpu->setReceiverPrototype(&postTilePrototype);
     teamA_gpu->attachDataReceiver(&gpuToHost1_);
     gpuToHost1_.attachDataReceiver(teamB_cpu);
-    gpuToHost1_.setReceiverProto(&postTilePrototype);
+    gpuToHost1_.setReceiverPrototype(&postTilePrototype);
 
     // The action parallel distributor's thread resource is used
     // once the distributor starts to wait

diff --git a/src/Milhoja_RuntimeElement.cpp b/src/Milhoja_RuntimeElement.cpp
index 9cc0980d..7d768131 100644
--- a/src/Milhoja_RuntimeElement.cpp
+++ b/src/Milhoja_RuntimeElement.cpp
@@ -7,6 +7,7 @@ using namespace milhoja;
 
 RuntimeElement::RuntimeElement(void)
     : threadReceiver_{nullptr},
       dataReceiver_{nullptr},
+      receiverPrototype_{nullptr},
      calledCloseQueue_{}
 { }
 
@@ -17,6 +18,9 @@ RuntimeElement::~RuntimeElement(void) {
     if (dataReceiver_) {
         std::cerr << "[RuntimeElement::~RuntimeElement] Data Subscriber still attached\n";
     }
+    if (receiverPrototype_) {
+        std::cerr << "[RuntimeElement::~RuntimeElement] Receiver Prototype still set\n";
+    }
     if (!calledCloseQueue_.empty()) {
         std::cerr << "[RuntimeElement::~RuntimeElement] Data publishers still attached\n";
         // FIXME: Does this help prevent valgrind from finding potential pointer
@@ -105,7 +109,10 @@ std::string RuntimeElement::detachDataReceiver(void) {
     }
 
     dataReceiver_ = nullptr;
-
+
+    // if it has a receiver's prototype, release it
+    receiverPrototype_ = nullptr;
+
     return "";
 }
 
@@ -155,3 +162,25 @@ std::string RuntimeElement::detachDataPublisher(const RuntimeElement* publisher)
     return "";
 }
 
+/**
+ * Set the data receiver's prototype for later use when passing
+ * a DataItem to the data receiver, for calling a proper constructor.
+ * Note that the receiver's prototype is only required for passing a TileWrapper,
+ * currently.  Thus, calling this function for the DataPacket has no effect.
+ * The receiverPrototype_ will be nullified when RuntimeElement::detachDataReceiver is called.
+ *
+ * \param prototype - A prototype of a DataItem to be passed to the DataReceiver.
+ */
+std::string RuntimeElement::setReceiverPrototype(const DataItem* prototype) {
+
+    if (!prototype) {
+        return "Null receiver prototype is given";
+    } else if (receiverPrototype_) {
+        return "A receiver prototype is already given";
+    }
+
+    receiverPrototype_ = prototype;
+
+    return "";
+}
+

diff --git a/src/Milhoja_ThreadTeam.cpp b/src/Milhoja_ThreadTeam.cpp
index 977d2b96..b396be5c 100644
--- a/src/Milhoja_ThreadTeam.cpp
+++ b/src/Milhoja_ThreadTeam.cpp
@@ -4,6 +4,7 @@
 
 #include "Milhoja_ThreadTeam.h"
 
+#include
 #include
 #include
 #include
@@ -908,9 +909,42 @@ std::string ThreadTeam::attachDataReceiver(RuntimeElement* receiver) {
     return "";
 }
 
-void ThreadTeam::setReceiverProto(TileWrapper const * w) {
-    // TODO: should I mutex lock/unlock?
-    receiverProto_ = w;
+
+/**
+ *
+ */
+std::string ThreadTeam::setReceiverPrototype(const DataItem* prototype) {
+    pthread_mutex_lock(&teamMutex_);
+
+    std::string errMsg("");
+    if (!state_) {
+        errMsg = printState_NotThreadsafe("setReceiverPrototype", 0,
+                                          "state_ is NULL");
+        pthread_mutex_unlock(&teamMutex_);
+        throw std::runtime_error(errMsg);
+    }
+    std::string msg = state_->isStateValid_NotThreadSafe();
+    if (msg != "") {
+        errMsg = printState_NotThreadsafe("setReceiverPrototype", 0, msg);
+        pthread_mutex_unlock(&teamMutex_);
+        throw std::runtime_error(errMsg);
+    } else if (state_->mode() != ThreadTeamMode::IDLE) {
+        errMsg = printState_NotThreadsafe("setReceiverPrototype", 0,
+                                          "The receiver prototype can only be set in the Idle mode");
+        pthread_mutex_unlock(&teamMutex_);
+        throw std::logic_error(errMsg);
+    }
+
+    errMsg = RuntimeElement::setReceiverPrototype(prototype);
+    if (errMsg != "") {
+        errMsg = printState_NotThreadsafe("setReceiverPrototype", 0, errMsg);
+        pthread_mutex_unlock(&teamMutex_);
+        throw std::logic_error(errMsg);
+    }
+
+    pthread_mutex_unlock(&teamMutex_);
+
+    return "";
 }
 
 /**
@@ -1479,17 +1513,24 @@ void* ThreadTeam::threadRoutine(void* varg) {
 
                 if (team->dataReceiver_) {
                     // Move the data item along so that dataItem is null
-                    // TODO: very dirty ownership transfers
                     if (auto tileWrapper = std::dynamic_pointer_cast<TileWrapper>(dataItem)) {
-                        // NOTE: this is the case where dataItem is a TileWrapper,
-                        // and the team->dataReceiver_ is another TileWrapper.
-                        // Need to transfer dataItem initialized with data receiver's
-                        // tilePrototype, as it may differ.
-                        std::unique_ptr<TileWrapper> clonedTile =
-                            team->receiverProto_->clone(std::move(tileWrapper->tile_));
-                        // Release ownership, assuming clonedTile has new ownership
-                        dataItem.reset();
-                        team->dataReceiver_->enqueue(std::move(clonedTile));
+                        if (auto tileWrapperPrototype =
+                                dynamic_cast<const TileWrapper*>(team->receiverPrototype_)) {
+                            // NOTE: this is the case where dataItem is a TileWrapper,
+                            // and the team->dataReceiver_ is another TileWrapper.
+                            // Need to transfer dataItem initialized with data receiver's
+                            // tilePrototype, as it may differ.
+                            // TODO: very dirty ownership transfers
+                            std::unique_ptr<TileWrapper> clonedTileWrapper =
+                                tileWrapperPrototype->clone(std::move(tileWrapper->tile_));
+                            // Release ownership, assuming clonedTileWrapper has new ownership
+                            dataItem.reset();
+                            team->dataReceiver_->enqueue(std::move(clonedTileWrapper));
+                        }
+                        else {
+                            // receiver prototype is not a TileWrapper; do the normal thing
+                            team->dataReceiver_->enqueue(std::move(dataItem));
+                        }
                     }
                     else {
                         // the data receiver is a mover/unpacker
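The heart of this patch is that the prototype hook now lives on RuntimeElement as a plain const DataItem*, so each consumer decides at run time whether re-wrapping applies by testing the prototype's dynamic type: tile-based receivers get the clone path, everything else gets the item unchanged. A compact sketch of that dispatch idiom, with stand-in types rather than the Milhoja class hierarchy:

    #include <iostream>

    struct DataItem    { virtual ~DataItem() = default; };  // stand-in hierarchy
    struct TileWrapper : DataItem {};
    struct DataPacket  : DataItem {};

    // Mirrors the run-time decision in ThreadTeam::threadRoutine and
    // MoverUnpacker::handleTransferFinished.
    void forward(const DataItem* receiverPrototype) {
        if (dynamic_cast<const TileWrapper*>(receiverPrototype)) {
            std::cout << "tile-based receiver: clone with its prototype\n";
        } else {
            std::cout << "not tile-based (or unset): enqueue the item as-is\n";
        }
    }

    int main() {
        TileWrapper tileProto;
        DataPacket  packetProto;
        forward(&tileProto);     // clone path
        forward(&packetProto);   // pass-through path
        forward(nullptr);        // no prototype set: pass-through path
        return 0;
    }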
From 6cf605a024a79d5cc59214d5c12d96fcd4c9dfe1 Mon Sep 17 00:00:00 2001
From: Youngjun Lee
Date: Thu, 7 Nov 2024 23:46:58 -0600
Subject: [PATCH 4/4] more information

(cherry picked from commit 45b5bdb74ae47888735cc63a165c9d21740c5a55)
---
 tools/milhoja_pypkg/src/milhoja/tests/TestCodeGenerators.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tools/milhoja_pypkg/src/milhoja/tests/TestCodeGenerators.py b/tools/milhoja_pypkg/src/milhoja/tests/TestCodeGenerators.py
index a4698268..55a3a5ed 100644
--- a/tools/milhoja_pypkg/src/milhoja/tests/TestCodeGenerators.py
+++ b/tools/milhoja_pypkg/src/milhoja/tests/TestCodeGenerators.py
@@ -74,7 +74,7 @@ def run_tests(self, tests_all, dims_all, create_generator):
 
                 ref = self.__load_code(ref_hdr_fname)
                 generated = self.__load_code(header_filename)
-                self.assertEqual(len(ref), len(generated))
+                self.assertEqual(len(ref), len(generated), f"generated != {ref_hdr_fname}")
                 for gen_line, ref_line in zip(generated, ref):
                     self.assertEqual(gen_line, ref_line, f"generated != {ref_hdr_fname}")
@@ -95,7 +95,7 @@ def run_tests(self, tests_all, dims_all, create_generator):
 
                 ref = self.__load_code(ref_src_fname)
                 generated = self.__load_code(source_filename)
-                self.assertEqual(len(ref), len(generated))
+                self.assertEqual(len(ref), len(generated), f"generated != {ref_src_fname}")
                 for gen_line, ref_line in zip(generated, ref):
                     self.assertEqual(gen_line, ref_line, f"generated != {ref_src_fname}")