Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract CommandQueue interface #17352

Merged
merged 6 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 63 additions & 182 deletions tt_metal/api/tt-metalium/command_queue.hpp
Original file line number Diff line number Diff line change
@@ -1,217 +1,98 @@
// SPDX-FileCopyrightText: © 2023 Tenstorrent Inc.
// SPDX-FileCopyrightText: © 2025 Tenstorrent Inc.
//
// SPDX-License-Identifier: Apache-2.0

#pragma once

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <fstream>
#include <cstdint>
#include <memory>
#include <span>
#include <thread>
#include <utility>
#include <vector>

#include "env_lib.hpp"
#include "command_queue_interface.hpp"
#include "device_command.hpp"
#include "lock_free_queue.hpp"
#include "program_command_sequence.hpp"
#include "worker_config_buffer.hpp"
#include "program_impl.hpp"
#include "trace_buffer.hpp"
#include "hardware_command_queue.hpp"
#include "memcpy.hpp"
#include "command_queue_interface.hpp"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not an interface really. need to find a better name, likely need to split. definitely need to move some things to cpp.
contains SystemMemoryManager, some "deprecated" singleton and a bunch of other things. Very big include.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, definitely needs a split. Since now there is an actual "cq interface", maybe rename to "cq utils"?

namespace tt::tt_metal {
inline namespace v0 {

class BufferRegion;
inline namespace v0 {
class Event;
class Trace;
using RuntimeArgs = std::vector<std::variant<Buffer*, uint32_t>>;

class Program;
class Kernel;
} // namespace v0

// Only contains the types of commands which are enqueued onto the device
enum class EnqueueCommandType {
ENQUEUE_READ_BUFFER,
ENQUEUE_WRITE_BUFFER,
GET_BUF_ADDR,
ADD_BUFFER_TO_PROGRAM,
SET_RUNTIME_ARGS,
ENQUEUE_PROGRAM,
ENQUEUE_TRACE,
ENQUEUE_RECORD_EVENT,
ENQUEUE_WAIT_FOR_EVENT,
FINISH,
FLUSH,
TERMINATE,
INVALID
};

string EnqueueCommandTypeToString(EnqueueCommandType ctype);
class CommandQueue {
public:
ayerofieiev-tt marked this conversation as resolved.
Show resolved Hide resolved
virtual ~CommandQueue() = default;

class Command {
public:
Command() {}
virtual void process() {};
virtual EnqueueCommandType type() = 0;
};
virtual const CoreCoord& virtual_enqueue_program_dispatch_core() const = 0;
virtual const CoreCoord& completion_queue_writer_core() const = 0;

class EnqueueProgramCommand : public Command {
private:
uint32_t command_queue_id;
IDevice* device;
NOC noc_index;
Program& program;
SystemMemoryManager& manager;
WorkerConfigBufferMgr& config_buffer_mgr;
CoreCoord dispatch_core;
CoreType dispatch_core_type;
uint32_t expected_num_workers_completed;
uint32_t packed_write_max_unicast_sub_cmds;
uint32_t dispatch_message_addr;
uint32_t multicast_cores_launch_message_wptr = 0;
uint32_t unicast_cores_launch_message_wptr = 0;
// TODO: There will be multiple ids once programs support spanning multiple sub_devices
SubDeviceId sub_device_id = SubDeviceId{0};

public:
EnqueueProgramCommand(
uint32_t command_queue_id,
IDevice* device,
NOC noc_index,
Program& program,
CoreCoord& dispatch_core,
SystemMemoryManager& manager,
WorkerConfigBufferMgr& config_buffer_mgr,
uint32_t expected_num_workers_completed,
uint32_t multicast_cores_launch_message_wptr,
uint32_t unicast_cores_launch_message_wptr,
SubDeviceId sub_device_id);

void process();

EnqueueCommandType type() { return EnqueueCommandType::ENQUEUE_PROGRAM; }

constexpr bool has_side_effects() { return true; }
};
virtual volatile bool is_dprint_server_hung() = 0;
virtual volatile bool is_noc_hung() = 0;

class EnqueueRecordEventCommand : public Command {
private:
uint32_t command_queue_id;
IDevice* device;
NOC noc_index;
SystemMemoryManager& manager;
uint32_t event_id;
tt::stl::Span<const uint32_t> expected_num_workers_completed;
tt::stl::Span<const SubDeviceId> sub_device_ids;
bool clear_count;
bool write_barrier;

public:
EnqueueRecordEventCommand(
uint32_t command_queue_id,
IDevice* device,
NOC noc_index,
SystemMemoryManager& manager,
uint32_t event_id,
tt::stl::Span<const uint32_t> expected_num_workers_completed,
tt::stl::Span<const SubDeviceId> sub_device_ids,
bool clear_count = false,
bool write_barrier = true);
virtual void record_begin(const uint32_t tid, const std::shared_ptr<TraceDescriptor>& ctx) = 0;
virtual void record_end() = 0;
virtual void set_num_worker_sems_on_dispatch(uint32_t num_worker_sems) = 0;
virtual void set_go_signal_noc_data_on_dispatch(const vector_memcpy_aligned<uint32_t>& go_signal_noc_data) = 0;

void process();
virtual void reset_worker_state(
bool reset_launch_msg_state,
uint32_t num_sub_devices,
const vector_memcpy_aligned<uint32_t>& go_signal_noc_data) = 0;

EnqueueCommandType type() { return EnqueueCommandType::ENQUEUE_RECORD_EVENT; }
virtual uint32_t id() const = 0;
virtual std::optional<uint32_t> tid() const = 0;

constexpr bool has_side_effects() { return false; }
};
virtual SystemMemoryManager& sysmem_manager() = 0;

class EnqueueWaitForEventCommand : public Command {
private:
uint32_t command_queue_id;
IDevice* device;
SystemMemoryManager& manager;
const Event& sync_event;
CoreType dispatch_core_type;
bool clear_count;
virtual void terminate() = 0;

public:
EnqueueWaitForEventCommand(
uint32_t command_queue_id,
IDevice* device,
SystemMemoryManager& manager,
const Event& sync_event,
bool clear_count = false);
virtual IDevice* device() = 0;

void process();
// These functions are temporarily needed since MeshCommandQueue relies on the CommandQueue object
virtual uint32_t get_expected_num_workers_completed_for_sub_device(uint32_t sub_device_index) const = 0;
virtual void set_expected_num_workers_completed_for_sub_device(uint32_t sub_device_index, uint32_t num_workers) = 0;
virtual WorkerConfigBufferMgr& get_config_buffer_mgr(uint32_t index) = 0;

EnqueueCommandType type() { return EnqueueCommandType::ENQUEUE_WAIT_FOR_EVENT; }
virtual void enqueue_trace(const uint32_t trace_id, bool blocking) = 0;

constexpr bool has_side_effects() { return false; }
};
virtual void enqueue_program(Program& program, bool blocking) = 0;

class EnqueueTraceCommand : public Command {
private:
uint32_t command_queue_id;
Buffer& buffer;
IDevice* device;
SystemMemoryManager& manager;
std::shared_ptr<TraceDescriptor>& descriptor;
std::array<uint32_t, dispatch_constants::DISPATCH_MESSAGE_ENTRIES>& expected_num_workers_completed;
bool clear_count;
NOC noc_index;
CoreCoord dispatch_core;
public:
EnqueueTraceCommand(
uint32_t command_queue_id,
IDevice* device,
SystemMemoryManager& manager,
std::shared_ptr<TraceDescriptor>& descriptor,
virtual void enqueue_read_buffer(
std::shared_ptr<Buffer>& buffer,
void* dst,
const BufferRegion& region,
bool blocking,
tt::stl::Span<const SubDeviceId> sub_device_ids = {}) = 0;
virtual void enqueue_read_buffer(
Buffer& buffer,
std::array<uint32_t, dispatch_constants::DISPATCH_MESSAGE_ENTRIES>& expected_num_workers_completed,
NOC noc_index,
CoreCoord dispatch_core);
void* dst,
const BufferRegion& region,
bool blocking,
tt::stl::Span<const SubDeviceId> sub_device_ids = {}) = 0;

void process();

EnqueueCommandType type() { return EnqueueCommandType::ENQUEUE_TRACE; }

constexpr bool has_side_effects() { return true; }
};

class EnqueueTerminateCommand : public Command {
private:
uint32_t command_queue_id;
IDevice* device;
SystemMemoryManager& manager;

public:
EnqueueTerminateCommand(uint32_t command_queue_id, IDevice* device, SystemMemoryManager& manager);

void process();

EnqueueCommandType type() { return EnqueueCommandType::TERMINATE; }
virtual void enqueue_record_event(
const std::shared_ptr<Event>& event,
bool clear_count = false,
tt::stl::Span<const SubDeviceId> sub_device_ids = {}) = 0;
virtual void enqueue_wait_for_event(const std::shared_ptr<Event>& sync_event, bool clear_count = false) = 0;

virtual void enqueue_write_buffer(
const std::variant<std::reference_wrapper<Buffer>, std::shared_ptr<Buffer>>& buffer,
HostDataType src,
const BufferRegion& region,
bool blocking,
tt::stl::Span<const SubDeviceId> sub_device_ids = {}) = 0;
virtual void enqueue_write_buffer(
Buffer& buffer,
const void* src,
const BufferRegion& region,
bool blocking,
tt::stl::Span<const SubDeviceId> sub_device_ids = {}) = 0;

constexpr bool has_side_effects() { return false; }
virtual void finish(tt::stl::Span<const SubDeviceId> sub_device_ids) = 0;
};

// Primitives used to place host only operations on the SW Command Queue.
// These are used in functions exposed through tt_metal.hpp or host_api.hpp
void EnqueueGetBufferAddr(uint32_t* dst_buf_addr, const Buffer* buffer, bool blocking);
void EnqueueSetRuntimeArgs(
const std::shared_ptr<Kernel>& kernel,
const CoreCoord& core_coord,
const std::shared_ptr<RuntimeArgs>& runtime_args_ptr,
bool blocking);
void EnqueueAddBufferToProgram(
const std::variant<std::reference_wrapper<Buffer>, std::shared_ptr<Buffer>>& buffer,
Program& program,
bool blocking);

} // namespace tt::tt_metal

std::ostream& operator<<(std::ostream& os, const tt::tt_metal::EnqueueCommandType& type);
1 change: 1 addition & 0 deletions tt_metal/api/tt-metalium/command_queue_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "hal.hpp"
#include "dispatch_settings.hpp"
#include "helpers.hpp"
#include "buffer.hpp"

// FIXME: Don't do this in header files
using namespace tt::tt_metal;
Expand Down
1 change: 0 additions & 1 deletion tt_metal/api/tt-metalium/device_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "hal.hpp"
#include "command_queue_interface.hpp"
#include "command_queue.hpp"
#include "hardware_command_queue.hpp"
#include "sub_device_manager_tracker.hpp"
#include "sub_device_types.hpp"
#include "trace_buffer.hpp"
Expand Down
7 changes: 5 additions & 2 deletions tt_metal/api/tt-metalium/program_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,12 @@ namespace distributed {
class MeshWorkload;
} // namespace distributed

class JitBuildOptions;
class EnqueueProgramCommand;
class CommandQueue;
class JitBuildOptions;
// Must be removed. Only here because its a friend of a Program
class HWCommandQueue;

namespace detail{
class Program_;

Expand Down Expand Up @@ -232,7 +235,7 @@ class Program {
template<typename T> friend void program_dispatch::finalize_program_offsets(T&, IDevice*);
template <typename WorkloadType, typename DeviceType>
friend uint32_t program_dispatch::program_base_addr_on_core(WorkloadType&, DeviceType, HalProgrammableCoreType);
friend CommandQueue;
friend HWCommandQueue;
friend EnqueueProgramCommand;
friend distributed::MeshWorkload;
friend detail::Internal_;
Expand Down
4 changes: 2 additions & 2 deletions tt_metal/distributed/mesh_command_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ void MeshCommandQueue::read_shard_from_device(
buffer_dispatch::copy_sharded_buffer_from_core_to_completion_queue(
core_id, *shard_view, dispatch_params, sub_device_ids, cores[core_id], this->dispatch_core_type());
if (dispatch_params.pages_per_txn > 0) {
auto read_descriptor = std::get<tt::tt_metal::detail::ReadBufferDescriptor>(
auto read_descriptor = std::get<tt::tt_metal::ReadBufferDescriptor>(
*buffer_dispatch::generate_sharded_buffer_read_descriptor(dst, dispatch_params, *shard_view));
buffer_dispatch::copy_completion_queue_data_into_user_space(
read_descriptor, mmio_device_id, channel, id_, device->sysmem_manager(), exit_condition);
Expand All @@ -222,7 +222,7 @@ void MeshCommandQueue::read_shard_from_device(
buffer_dispatch::copy_interleaved_buffer_to_completion_queue(
dispatch_params, *shard_view, sub_device_ids, this->dispatch_core_type());
if (dispatch_params.pages_per_txn > 0) {
auto read_descriptor = std::get<tt::tt_metal::detail::ReadBufferDescriptor>(
auto read_descriptor = std::get<tt::tt_metal::ReadBufferDescriptor>(
*buffer_dispatch::generate_interleaved_buffer_read_descriptor(dst, dispatch_params, *shard_view));
buffer_dispatch::copy_completion_queue_data_into_user_space(
read_descriptor, mmio_device_id, channel, id_, device->sysmem_manager(), exit_condition);
Expand Down
1 change: 0 additions & 1 deletion tt_metal/distributed/mesh_workload_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#include <host_api.hpp>
#include <command_queue.hpp>
#include <hardware_command_queue.hpp>

#include "tt_metal/impl/program/dispatch.hpp"

Expand Down
2 changes: 1 addition & 1 deletion tt_metal/impl/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ set(IMPL_SRC
${CMAKE_CURRENT_SOURCE_DIR}/program/program.cpp
${CMAKE_CURRENT_SOURCE_DIR}/program/dispatch.cpp
${CMAKE_CURRENT_SOURCE_DIR}/dispatch/debug_tools.cpp
${CMAKE_CURRENT_SOURCE_DIR}/dispatch/command_queue.cpp
${CMAKE_CURRENT_SOURCE_DIR}/dispatch/host_runtime_commands.cpp
${CMAKE_CURRENT_SOURCE_DIR}/dispatch/hardware_command_queue.cpp
${CMAKE_CURRENT_SOURCE_DIR}/dispatch/launch_message_ring_buffer_state.cpp
${CMAKE_CURRENT_SOURCE_DIR}/dispatch/worker_config_buffer.cpp
Expand Down
11 changes: 1 addition & 10 deletions tt_metal/impl/buffers/circular_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@
#include <device.hpp>
#include <command_queue.hpp>

namespace {

inline void GetBufferAddress(const tt::tt_metal::Buffer* buffer, uint32_t* address_on_host) {
EnqueueGetBufferAddr(address_on_host, buffer, false);
}

} // namespace
namespace tt {

namespace tt_metal {
Expand Down Expand Up @@ -129,9 +122,7 @@ uint32_t CircularBuffer::address() const {
return this->globally_allocated() ? globally_allocated_address_ : locally_allocated_address_.value();
}

void CircularBuffer::assign_global_address() {
GetBufferAddress(config_.shadow_global_buffer, &globally_allocated_address_);
}
void CircularBuffer::assign_global_address() { globally_allocated_address_ = config_.shadow_global_buffer->address(); }

void CircularBuffer::set_global_circular_buffer(const v1::experimental::GlobalCircularBuffer& global_circular_buffer) {
TT_FATAL(
Expand Down
Loading
Loading