Skip to content

Commit

Permalink
add support for cooperative queue
Browse files Browse the repository at this point in the history
Change-Id: I9b4f73a703da94d146745ef1a6bde6adddddeea1
  • Loading branch information
scchan committed Mar 4, 2020
1 parent 57a4f05 commit e3f2a12
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 35 deletions.
13 changes: 13 additions & 0 deletions include/hc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,19 @@ class accelerator
return pQueue;
}

/**
 * Creates and returns a new accelerator view on the accelerator that
 * supports cooperative queueing.
 *
 * Takes no parameters: the view is backed by the queue returned from the
 * device's createCooperativeQueue(), so queuing mode, execution order and
 * priority are whatever that call establishes.
 *
 * @return An accelerator_view wrapping the device's cooperative queue.
 */
accelerator_view create_cooperative_view() {
    auto pQueue = pDev->createCooperativeQueue();
    return pQueue;
}

/**
* Clients can use the underlying device as an identifier for the
accelerator. This complies with the equality operator below,
Expand Down
6 changes: 6 additions & 0 deletions include/kalmar_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ class KalmarDevice

/// create KalmarQueue from current device
virtual std::shared_ptr<KalmarQueue> createQueue(execute_order order = execute_in_order, queue_priority priority = priority_normal) = 0;

/// create KalmarQueue from current device that supports cooperative queuing
virtual std::shared_ptr<KalmarQueue> createCooperativeQueue() = 0;

virtual ~KalmarDevice() {}

std::shared_ptr<KalmarQueue> get_default_queue() {
Expand Down Expand Up @@ -502,6 +506,8 @@ class CPUDevice final : public KalmarDevice
uint32_t get_version() const override { return 0; }

/// Build a KalmarQueue for the CPU device. The execute_order and
/// queue_priority parameters are part of the KalmarDevice interface but
/// are not consumed when constructing a CPUQueue.
std::shared_ptr<KalmarQueue> createQueue(execute_order order = execute_in_order, queue_priority priority = priority_normal) override {
    std::shared_ptr<KalmarQueue> cpuQueue(new CPUQueue(this));
    return cpuQueue;
}
/// The CPU device has no distinct cooperative queue type; it simply hands
/// back an ordinary queue (cooperative semantics presumably a no-op on the
/// CPU path — confirm with callers).
std::shared_ptr<KalmarQueue> createCooperativeQueue() override { return createQueue(); }

/// Allocate `count` bytes of host memory aligned to 0x1000 (4 KiB).
void* create(size_t count, struct rw_info* /* not used */ ) override { return kalmar_aligned_alloc(0x1000, count); }
/// Free memory previously obtained from create().
void release(void* ptr, struct rw_info* /* not used */) override { kalmar_aligned_free(ptr); }
/// Kernel creation is not implemented for the CPU device; always nullptr.
void* CreateKernel(const char* fun, KalmarQueue *queue) override { return nullptr; }
Expand Down
109 changes: 74 additions & 35 deletions lib/hsa/mcwamp_hsa.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1228,29 +1228,7 @@ struct RocrQueue {
/// Error callback passed to hsa_queue_create(): routes any asynchronous
/// queue error status through STATUS_CHECK. The queue/data arguments are
/// unused.
static void callbackQueue(hsa_status_t status, hsa_queue_t* queue, void *data) {
    STATUS_CHECK(status, __LINE__);
}

RocrQueue(hsa_agent_t agent, size_t queue_size, HSAQueue *hccQueue, queue_priority priority)
: _priority(priority_normal)
{
assert(queue_size != 0);

/// Create a queue using the maximum size.
hsa_status_t status = hsa_queue_create(agent, queue_size, HSA_QUEUE_TYPE_SINGLE, callbackQueue, NULL,
UINT32_MAX, UINT32_MAX, &_hwQueue);
DBOUT(DB_QUEUE, " " << __func__ << ": created an HSA command queue: " << _hwQueue << "\n");
STATUS_CHECK(status, __LINE__);

// Set queue priority
status = setPriority(priority);
STATUS_CHECK(status, __LINE__);

// TODO - should we provide a mechanism to conditionally enable profiling as a performance optimization?
status = hsa_amd_profiling_set_profiler_enabled(_hwQueue, 1);

// Create the links between the queues:
assignHccQueue(hccQueue);
}

RocrQueue(hsa_agent_t agent, size_t queue_size, HSAQueue *hccQueue, queue_priority priority);
~RocrQueue() {

DBOUT(DB_QUEUE, " " << __func__ << ": destroy an HSA command queue: " << _hwQueue << "\n");
Expand Down Expand Up @@ -1371,12 +1349,15 @@ class HSAQueue final : public KalmarQueue
// signal used by sync copy only
hsa_signal_t sync_copy_signal;

// indicated whether this HSAQueue has already been used
// indicate whether this HSAQueue has already been used
// for compute or copy tasks
bool has_been_used;

// indicate whether this is a cooperative queue
bool is_cooperative;

public:
HSAQueue(KalmarDevice* pDev, hsa_agent_t agent, execute_order order, queue_priority priority) ;
HSAQueue(KalmarDevice* pDev, hsa_agent_t agent, execute_order order, queue_priority priority, bool cooperative = false) ;

// Accessor for the _nextKernelNeedsSysAcquire flag (presumably: whether the
// next kernel dispatch must perform a system-scope acquire — semantics are
// set by callers via setNextKernelNeedsSysAcquire).
bool nextKernelNeedsSysAcquire() const { return _nextKernelNeedsSysAcquire; };
// Set/clear the _nextKernelNeedsSysAcquire flag read by the accessor above.
void setNextKernelNeedsSysAcquire(bool r) { _nextKernelNeedsSysAcquire = r; };
Expand All @@ -1394,6 +1375,8 @@ class HSAQueue final : public KalmarQueue

Kalmar::HSADevice * getHSADev() const;

bool isCooperative() { return is_cooperative; }

void dispose() override;

~HSAQueue() {
Expand Down Expand Up @@ -2194,6 +2177,32 @@ class HSAQueue final : public KalmarQueue

};

// Out-of-line constructor (declared inside struct RocrQueue): creates the
// ROCr/HSA hardware command queue that backs an HSAQueue and links the two.
RocrQueue::RocrQueue(hsa_agent_t agent, size_t queue_size, HSAQueue *hccQueue, queue_priority priority)
    : _priority(priority_normal)  // placeholder; setPriority(priority) below applies the requested value
{
    assert(queue_size != 0);

    /// Create a queue using the maximum size.
    // Cooperative HSAQueues are given an HSA_QUEUE_TYPE_COOPERATIVE hardware
    // queue; all others use HSA_QUEUE_TYPE_SINGLE.
    auto qtype = HSA_QUEUE_TYPE_SINGLE;
    if (hccQueue->isCooperative())
    {
        qtype = HSA_QUEUE_TYPE_COOPERATIVE;
    }
    hsa_status_t status = hsa_queue_create(agent, queue_size, qtype, callbackQueue, NULL,
                                           UINT32_MAX, UINT32_MAX, &_hwQueue);
    DBOUT(DB_QUEUE, " " << __func__ << ": created an HSA command queue: " << _hwQueue << "\n");
    STATUS_CHECK(status, __LINE__);

    // Set queue priority
    status = setPriority(priority);
    STATUS_CHECK(status, __LINE__);

    // TODO - should we provide a mechanism to conditionally enable profiling as a performance optimization?
    // NOTE(review): the result of this call is stored but never checked, so a
    // failure to enable profiling is silently ignored — confirm intentional.
    status = hsa_amd_profiling_set_profiler_enabled(_hwQueue, 1);

    // Create the links between the queues:
    assignHccQueue(hccQueue);
}

void RocrQueue::assignHccQueue(HSAQueue *hccQueue) {
assert (hccQueue->rocrQueue == nullptr); // only needy should assign new queue
Expand Down Expand Up @@ -2267,6 +2276,9 @@ class HSADevice final : public KalmarDevice
std::mutex queues_mutex; // protects access to the queues vector:
std::vector< std::weak_ptr<KalmarQueue> > queues;

bool support_cooperative_queue = false;
std::shared_ptr<KalmarQueue> cooperative_queue = nullptr;

// TODO: Do we need to maintain a different mutex for each queue priority?
// In which case HSAQueue::dispose needs to handle locking the appropriate mutex
std::mutex rocrQueuesMutex; // protects rocrQueues
Expand Down Expand Up @@ -2450,6 +2462,10 @@ class HSADevice final : public KalmarDevice

public:

// Returns queue_size, the size this device passes to hsa_queue_create();
// exposed so HSAQueue can construct its own RocrQueue (cooperative path).
size_t getQueueSize() {
    return queue_size;
}

// Accessor for the cached workgroup_max_size member.
uint32_t getWorkgroupMaxSize() {
    return workgroup_max_size;
}
Expand Down Expand Up @@ -2982,6 +2998,20 @@ class HSADevice final : public KalmarDevice
return q;
}

/// Return the device's single, lazily-created cooperative queue.
///
/// @throws Kalmar::runtime_exception if the agent did not report support
///         for cooperative queues (support_cooperative_queue, read from
///         HSA_AMD_AGENT_INFO_COOPERATIVE_QUEUES in the constructor).
std::shared_ptr<KalmarQueue> createCooperativeQueue() override {
    // Take the lock before the first read of cooperative_queue: the previous
    // double-checked-locking shape read the shared_ptr unsynchronized, which
    // is a data race when two threads race to create the queue. Creation
    // happens at most once, so unconditionally locking is cheap.
    std::lock_guard<std::mutex> l(queues_mutex);
    if (cooperative_queue == nullptr) {
        if (!support_cooperative_queue) {
            throw Kalmar::runtime_exception("agent doesn't support cooperative queue", -1);
        }
        // Cooperative queues are in-order, normal priority; the trailing
        // 'true' marks the HSAQueue as cooperative.
        cooperative_queue = std::make_shared<HSAQueue>(this, agent, execute_in_order, priority_normal, true);
    }
    return cooperative_queue;
}


// Accessor for the cached max_tile_static_size (populated via
// hsa_amd_agent_iterate_memory_pools / find_group_memory in the ctor).
size_t GetMaxTileStaticSize() override {
    return max_tile_static_size;
}
Expand Down Expand Up @@ -3991,10 +4021,11 @@ HSADevice::HSADevice(hsa_agent_t a, hsa_agent_t host, int x_accSeqNum) :
assert (__builtin_popcount(MAX_INFLIGHT_COMMANDS_PER_QUEUE) == 1); // make sure this is power of 2.
}

status = hsa_amd_profiling_async_copy_enable(1);
status = hsa_agent_get_info(agent, (hsa_agent_info_t)HSA_AMD_AGENT_INFO_COOPERATIVE_QUEUES, &support_cooperative_queue);
STATUS_CHECK(status, __LINE__);


status = hsa_amd_profiling_async_copy_enable(1);
STATUS_CHECK(status, __LINE__);

/// Iterate over memory pool of the device and its host
status = hsa_amd_agent_iterate_memory_pools(agent, HSADevice::find_group_memory, &max_tile_static_size);
Expand Down Expand Up @@ -4139,18 +4170,20 @@ std::ostream& operator<<(std::ostream& os, const HSAQueue & hav)
return os;
}




HSAQueue::HSAQueue(KalmarDevice* pDev, hsa_agent_t agent, execute_order order, queue_priority priority) :
HSAQueue::HSAQueue(KalmarDevice* pDev, hsa_agent_t agent, execute_order order, queue_priority priority,
bool cooperative) :
KalmarQueue(pDev, queuing_mode_automatic, order, priority),
rocrQueue(nullptr),
asyncOps(HCC_ASYNCOPS_SIZE), asyncOpsIndex(0),
valid(true), _nextSyncNeedsSysRelease(false), _nextKernelNeedsSysAcquire(false), bufferKernelMap(), kernelBufferMap(),
has_been_used(false)
has_been_used(false), is_cooperative(cooperative)
{

if (!HCC_LAZY_QUEUE_CREATION) {
if (is_cooperative) {
HSADevice* device = reinterpret_cast<HSADevice*>(pDev);
rocrQueue = new RocrQueue(agent, device->getQueueSize(), this, priority);
}
else if (!HCC_LAZY_QUEUE_CREATION) {
// Protect the HSA queue we can steal it.
DBOUT(DB_LOCK, " ptr:" << this << " create lock_guard...\n");

Expand All @@ -4166,7 +4199,6 @@ HSAQueue::HSAQueue(KalmarDevice* pDev, hsa_agent_t agent, execute_order order, q
STATUS_CHECK(status, __LINE__);
}


void HSAQueue::dispose() {
hsa_status_t status;

Expand Down Expand Up @@ -4204,7 +4236,14 @@ void HSAQueue::dispose() {
kernelBufferMap.clear();

if (this->rocrQueue != nullptr) {
device->removeRocrQueue(rocrQueue);
if (is_cooperative) {
// the cooperative queue is not in the queue pool
// so just delete it
delete(rocrQueue);
}
else {
device->removeRocrQueue(rocrQueue);
}
rocrQueue = nullptr;
}
}
Expand Down

0 comments on commit e3f2a12

Please sign in to comment.