Skip to content

Commit

Permalink
Extract CommandQueue interface (#17352)
Browse files Browse the repository at this point in the history
### Ticket
#17339

### Problem description
CommandQueue exposes all the details.
CommandQueue has many things in the interface which I'd prefer us to
cleanup.
We need to allow MeshBuffer and MeshWorkload to go via CommandQueue.

### What's changed
CommandQueue is now just an inteface
command_queue.hpp is what you care about and it now includes way less.

Moved command_queue_commands.hpp our of API folder. Yay! 😁

HWCommandQueue implements that interface. 
Device creates it inside.
hardware_command_queue.hpp is relocated from api and can't be included.
Yay! 😁

There is still some crap with 3 free functions which had Enqueue in them
- which have nothing to do with CommandQueue - for now, located in
command_queue.hpp 💩
There is some mess with command_queue_interface.hpp and
command_queue_commands.hpp. Need to sort out separately. 💩

### What's next
I expect that next we work our an interface that works for both
MeshBuffer and MeshWorkload, while still accommodating Buffer and
Program 🔮
```cpp
// CommandQueue
virtual void enqueue_program(Program& program, bool blocking) = 0;
virtual void enqueue_write_buffer(Buffer& buffer, const void* src, const BufferRegion& region, bool blocking, tt::stl::Span<const SubDeviceId> sub_device_ids = {}) = 0;
```
vs
```cpp
// MeshCommandQueue
void enqueue_mesh_workload(MeshWorkload& mesh_workload, bool blocking);
void enqueue_write_shard(
        std::shared_ptr<MeshBuffer>& mesh_buffer, const void* host_data, const Coordinate& coord, bool blocking);
void enqueue_write_shard_to_sub_grid(
        const MeshBuffer& buffer, const void* host_data, const LogicalDeviceRange& device_range, bool blocking);
void enqueue_write_mesh_buffer(const std::shared_ptr<MeshBuffer>& buffer, const void* host_data, bool blocking);
```

#### enqueue_program vs enqueue_mesh_workload

I propose to update as `enqueue_workload(MeshWorkload)` and make sure
that MeshWorkload can be (maybe even implicitly) constructed from
Program

#### enqueue_write_buffer vs enqueue_write*

I don't know how to best handle. Need to align with @omilyutin-tt
@tt-asaigal @cfjchu

### Checklist
- [x] [Post commit
CI](https://github.com/tenstorrent/tt-metal/actions/runs/13060406436)
  • Loading branch information
ayerofieiev-tt authored Jan 31, 2025
1 parent 1550e2a commit a44e186
Show file tree
Hide file tree
Showing 20 changed files with 565 additions and 455 deletions.
245 changes: 63 additions & 182 deletions tt_metal/api/tt-metalium/command_queue.hpp
Original file line number Diff line number Diff line change
@@ -1,217 +1,98 @@
// SPDX-FileCopyrightText: © 2023 Tenstorrent Inc.
// SPDX-FileCopyrightText: © 2025 Tenstorrent Inc.
//
// SPDX-License-Identifier: Apache-2.0

#pragma once

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <fstream>
#include <cstdint>
#include <memory>
#include <span>
#include <thread>
#include <utility>
#include <vector>

#include "env_lib.hpp"
#include "command_queue_interface.hpp"
#include "device_command.hpp"
#include "lock_free_queue.hpp"
#include "program_command_sequence.hpp"
#include "worker_config_buffer.hpp"
#include "program_impl.hpp"
#include "trace_buffer.hpp"
#include "hardware_command_queue.hpp"
#include "memcpy.hpp"
#include "command_queue_interface.hpp"

namespace tt::tt_metal {
inline namespace v0 {

class BufferRegion;
inline namespace v0 {
class Event;
class Trace;
using RuntimeArgs = std::vector<std::variant<Buffer*, uint32_t>>;

class Program;
class Kernel;
} // namespace v0

// Only contains the types of commands which are enqueued onto the device
enum class EnqueueCommandType {
ENQUEUE_READ_BUFFER,
ENQUEUE_WRITE_BUFFER,
GET_BUF_ADDR,
ADD_BUFFER_TO_PROGRAM,
SET_RUNTIME_ARGS,
ENQUEUE_PROGRAM,
ENQUEUE_TRACE,
ENQUEUE_RECORD_EVENT,
ENQUEUE_WAIT_FOR_EVENT,
FINISH,
FLUSH,
TERMINATE,
INVALID
};

string EnqueueCommandTypeToString(EnqueueCommandType ctype);
class CommandQueue {
public:
virtual ~CommandQueue() = default;

class Command {
public:
Command() {}
virtual void process() {};
virtual EnqueueCommandType type() = 0;
};
virtual const CoreCoord& virtual_enqueue_program_dispatch_core() const = 0;
virtual const CoreCoord& completion_queue_writer_core() const = 0;

class EnqueueProgramCommand : public Command {
private:
uint32_t command_queue_id;
IDevice* device;
NOC noc_index;
Program& program;
SystemMemoryManager& manager;
WorkerConfigBufferMgr& config_buffer_mgr;
CoreCoord dispatch_core;
CoreType dispatch_core_type;
uint32_t expected_num_workers_completed;
uint32_t packed_write_max_unicast_sub_cmds;
uint32_t dispatch_message_addr;
uint32_t multicast_cores_launch_message_wptr = 0;
uint32_t unicast_cores_launch_message_wptr = 0;
// TODO: There will be multiple ids once programs support spanning multiple sub_devices
SubDeviceId sub_device_id = SubDeviceId{0};

public:
EnqueueProgramCommand(
uint32_t command_queue_id,
IDevice* device,
NOC noc_index,
Program& program,
CoreCoord& dispatch_core,
SystemMemoryManager& manager,
WorkerConfigBufferMgr& config_buffer_mgr,
uint32_t expected_num_workers_completed,
uint32_t multicast_cores_launch_message_wptr,
uint32_t unicast_cores_launch_message_wptr,
SubDeviceId sub_device_id);

void process();

EnqueueCommandType type() { return EnqueueCommandType::ENQUEUE_PROGRAM; }

constexpr bool has_side_effects() { return true; }
};
virtual volatile bool is_dprint_server_hung() = 0;
virtual volatile bool is_noc_hung() = 0;

class EnqueueRecordEventCommand : public Command {
private:
uint32_t command_queue_id;
IDevice* device;
NOC noc_index;
SystemMemoryManager& manager;
uint32_t event_id;
tt::stl::Span<const uint32_t> expected_num_workers_completed;
tt::stl::Span<const SubDeviceId> sub_device_ids;
bool clear_count;
bool write_barrier;

public:
EnqueueRecordEventCommand(
uint32_t command_queue_id,
IDevice* device,
NOC noc_index,
SystemMemoryManager& manager,
uint32_t event_id,
tt::stl::Span<const uint32_t> expected_num_workers_completed,
tt::stl::Span<const SubDeviceId> sub_device_ids,
bool clear_count = false,
bool write_barrier = true);
virtual void record_begin(const uint32_t tid, const std::shared_ptr<TraceDescriptor>& ctx) = 0;
virtual void record_end() = 0;
virtual void set_num_worker_sems_on_dispatch(uint32_t num_worker_sems) = 0;
virtual void set_go_signal_noc_data_on_dispatch(const vector_memcpy_aligned<uint32_t>& go_signal_noc_data) = 0;

void process();
virtual void reset_worker_state(
bool reset_launch_msg_state,
uint32_t num_sub_devices,
const vector_memcpy_aligned<uint32_t>& go_signal_noc_data) = 0;

EnqueueCommandType type() { return EnqueueCommandType::ENQUEUE_RECORD_EVENT; }
virtual uint32_t id() const = 0;
virtual std::optional<uint32_t> tid() const = 0;

constexpr bool has_side_effects() { return false; }
};
virtual SystemMemoryManager& sysmem_manager() = 0;

class EnqueueWaitForEventCommand : public Command {
private:
uint32_t command_queue_id;
IDevice* device;
SystemMemoryManager& manager;
const Event& sync_event;
CoreType dispatch_core_type;
bool clear_count;
virtual void terminate() = 0;

public:
EnqueueWaitForEventCommand(
uint32_t command_queue_id,
IDevice* device,
SystemMemoryManager& manager,
const Event& sync_event,
bool clear_count = false);
virtual IDevice* device() = 0;

void process();
// These functions are temporarily needed since MeshCommandQueue relies on the CommandQueue object
virtual uint32_t get_expected_num_workers_completed_for_sub_device(uint32_t sub_device_index) const = 0;
virtual void set_expected_num_workers_completed_for_sub_device(uint32_t sub_device_index, uint32_t num_workers) = 0;
virtual WorkerConfigBufferMgr& get_config_buffer_mgr(uint32_t index) = 0;

EnqueueCommandType type() { return EnqueueCommandType::ENQUEUE_WAIT_FOR_EVENT; }
virtual void enqueue_trace(const uint32_t trace_id, bool blocking) = 0;

constexpr bool has_side_effects() { return false; }
};
virtual void enqueue_program(Program& program, bool blocking) = 0;

class EnqueueTraceCommand : public Command {
private:
uint32_t command_queue_id;
Buffer& buffer;
IDevice* device;
SystemMemoryManager& manager;
std::shared_ptr<TraceDescriptor>& descriptor;
std::array<uint32_t, dispatch_constants::DISPATCH_MESSAGE_ENTRIES>& expected_num_workers_completed;
bool clear_count;
NOC noc_index;
CoreCoord dispatch_core;
public:
EnqueueTraceCommand(
uint32_t command_queue_id,
IDevice* device,
SystemMemoryManager& manager,
std::shared_ptr<TraceDescriptor>& descriptor,
virtual void enqueue_read_buffer(
std::shared_ptr<Buffer>& buffer,
void* dst,
const BufferRegion& region,
bool blocking,
tt::stl::Span<const SubDeviceId> sub_device_ids = {}) = 0;
virtual void enqueue_read_buffer(
Buffer& buffer,
std::array<uint32_t, dispatch_constants::DISPATCH_MESSAGE_ENTRIES>& expected_num_workers_completed,
NOC noc_index,
CoreCoord dispatch_core);
void* dst,
const BufferRegion& region,
bool blocking,
tt::stl::Span<const SubDeviceId> sub_device_ids = {}) = 0;

void process();

EnqueueCommandType type() { return EnqueueCommandType::ENQUEUE_TRACE; }

constexpr bool has_side_effects() { return true; }
};

class EnqueueTerminateCommand : public Command {
private:
uint32_t command_queue_id;
IDevice* device;
SystemMemoryManager& manager;

public:
EnqueueTerminateCommand(uint32_t command_queue_id, IDevice* device, SystemMemoryManager& manager);

void process();

EnqueueCommandType type() { return EnqueueCommandType::TERMINATE; }
virtual void enqueue_record_event(
const std::shared_ptr<Event>& event,
bool clear_count = false,
tt::stl::Span<const SubDeviceId> sub_device_ids = {}) = 0;
virtual void enqueue_wait_for_event(const std::shared_ptr<Event>& sync_event, bool clear_count = false) = 0;

virtual void enqueue_write_buffer(
const std::variant<std::reference_wrapper<Buffer>, std::shared_ptr<Buffer>>& buffer,
HostDataType src,
const BufferRegion& region,
bool blocking,
tt::stl::Span<const SubDeviceId> sub_device_ids = {}) = 0;
virtual void enqueue_write_buffer(
Buffer& buffer,
const void* src,
const BufferRegion& region,
bool blocking,
tt::stl::Span<const SubDeviceId> sub_device_ids = {}) = 0;

constexpr bool has_side_effects() { return false; }
virtual void finish(tt::stl::Span<const SubDeviceId> sub_device_ids) = 0;
};

// Primitives used to place host only operations on the SW Command Queue.
// These are used in functions exposed through tt_metal.hpp or host_api.hpp
void EnqueueGetBufferAddr(uint32_t* dst_buf_addr, const Buffer* buffer, bool blocking);
void EnqueueSetRuntimeArgs(
const std::shared_ptr<Kernel>& kernel,
const CoreCoord& core_coord,
const std::shared_ptr<RuntimeArgs>& runtime_args_ptr,
bool blocking);
void EnqueueAddBufferToProgram(
const std::variant<std::reference_wrapper<Buffer>, std::shared_ptr<Buffer>>& buffer,
Program& program,
bool blocking);

} // namespace tt::tt_metal

std::ostream& operator<<(std::ostream& os, const tt::tt_metal::EnqueueCommandType& type);
1 change: 1 addition & 0 deletions tt_metal/api/tt-metalium/command_queue_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "hal.hpp"
#include "dispatch_settings.hpp"
#include "helpers.hpp"
#include "buffer.hpp"

// FIXME: Don't do this in header files
using namespace tt::tt_metal;
Expand Down
1 change: 0 additions & 1 deletion tt_metal/api/tt-metalium/device_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "hal.hpp"
#include "command_queue_interface.hpp"
#include "command_queue.hpp"
#include "hardware_command_queue.hpp"
#include "sub_device_manager_tracker.hpp"
#include "sub_device_types.hpp"
#include "trace_buffer.hpp"
Expand Down
7 changes: 5 additions & 2 deletions tt_metal/api/tt-metalium/program_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,12 @@ namespace distributed {
class MeshWorkload;
} // namespace distributed

class JitBuildOptions;
class EnqueueProgramCommand;
class CommandQueue;
class JitBuildOptions;
// Must be removed. Only here because its a friend of a Program
class HWCommandQueue;

namespace detail{
class Program_;

Expand Down Expand Up @@ -232,7 +235,7 @@ class Program {
template<typename T> friend void program_dispatch::finalize_program_offsets(T&, IDevice*);
template <typename WorkloadType, typename DeviceType>
friend uint32_t program_dispatch::program_base_addr_on_core(WorkloadType&, DeviceType, HalProgrammableCoreType);
friend CommandQueue;
friend HWCommandQueue;
friend EnqueueProgramCommand;
friend distributed::MeshWorkload;
friend detail::Internal_;
Expand Down
4 changes: 2 additions & 2 deletions tt_metal/distributed/mesh_command_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ void MeshCommandQueue::read_shard_from_device(
buffer_dispatch::copy_sharded_buffer_from_core_to_completion_queue(
core_id, *shard_view, dispatch_params, sub_device_ids, cores[core_id], this->dispatch_core_type());
if (dispatch_params.pages_per_txn > 0) {
auto read_descriptor = std::get<tt::tt_metal::detail::ReadBufferDescriptor>(
auto read_descriptor = std::get<tt::tt_metal::ReadBufferDescriptor>(
*buffer_dispatch::generate_sharded_buffer_read_descriptor(dst, dispatch_params, *shard_view));
buffer_dispatch::copy_completion_queue_data_into_user_space(
read_descriptor, mmio_device_id, channel, id_, device->sysmem_manager(), exit_condition);
Expand All @@ -222,7 +222,7 @@ void MeshCommandQueue::read_shard_from_device(
buffer_dispatch::copy_interleaved_buffer_to_completion_queue(
dispatch_params, *shard_view, sub_device_ids, this->dispatch_core_type());
if (dispatch_params.pages_per_txn > 0) {
auto read_descriptor = std::get<tt::tt_metal::detail::ReadBufferDescriptor>(
auto read_descriptor = std::get<tt::tt_metal::ReadBufferDescriptor>(
*buffer_dispatch::generate_interleaved_buffer_read_descriptor(dst, dispatch_params, *shard_view));
buffer_dispatch::copy_completion_queue_data_into_user_space(
read_descriptor, mmio_device_id, channel, id_, device->sysmem_manager(), exit_condition);
Expand Down
1 change: 0 additions & 1 deletion tt_metal/distributed/mesh_workload_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#include <host_api.hpp>
#include <command_queue.hpp>
#include <hardware_command_queue.hpp>

#include "tt_metal/impl/program/dispatch.hpp"

Expand Down
2 changes: 1 addition & 1 deletion tt_metal/impl/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ set(IMPL_SRC
${CMAKE_CURRENT_SOURCE_DIR}/program/program.cpp
${CMAKE_CURRENT_SOURCE_DIR}/program/dispatch.cpp
${CMAKE_CURRENT_SOURCE_DIR}/dispatch/debug_tools.cpp
${CMAKE_CURRENT_SOURCE_DIR}/dispatch/command_queue.cpp
${CMAKE_CURRENT_SOURCE_DIR}/dispatch/host_runtime_commands.cpp
${CMAKE_CURRENT_SOURCE_DIR}/dispatch/hardware_command_queue.cpp
${CMAKE_CURRENT_SOURCE_DIR}/dispatch/launch_message_ring_buffer_state.cpp
${CMAKE_CURRENT_SOURCE_DIR}/dispatch/worker_config_buffer.cpp
Expand Down
11 changes: 1 addition & 10 deletions tt_metal/impl/buffers/circular_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@
#include <device.hpp>
#include <command_queue.hpp>

namespace {

inline void GetBufferAddress(const tt::tt_metal::Buffer* buffer, uint32_t* address_on_host) {
EnqueueGetBufferAddr(address_on_host, buffer, false);
}

} // namespace
namespace tt {

namespace tt_metal {
Expand Down Expand Up @@ -129,9 +122,7 @@ uint32_t CircularBuffer::address() const {
return this->globally_allocated() ? globally_allocated_address_ : locally_allocated_address_.value();
}

void CircularBuffer::assign_global_address() {
GetBufferAddress(config_.shadow_global_buffer, &globally_allocated_address_);
}
void CircularBuffer::assign_global_address() { globally_allocated_address_ = config_.shadow_global_buffer->address(); }

void CircularBuffer::set_global_circular_buffer(const v1::experimental::GlobalCircularBuffer& global_circular_buffer) {
TT_FATAL(
Expand Down
Loading

0 comments on commit a44e186

Please sign in to comment.