[DO NOT MERGE] [NPU] Using global command queue #28661

Draft: wants to merge 13 commits into base: master
13 changes: 11 additions & 2 deletions src/plugins/intel_npu/src/backend/include/zero_pipeline.hpp
@@ -27,7 +27,7 @@ struct Pipeline {

Pipeline(const Pipeline&) = delete;
Pipeline& operator=(const Pipeline&) = delete;
virtual ~Pipeline() = default;
~Pipeline();

void push();
void pull();
@@ -40,6 +40,9 @@ struct Pipeline {
void closeCommandListIndex(size_t command_list_index);

protected:
void getCommandQueue();

std::shared_ptr<ZeroInitStructsHolder> _init_structs;
std::shared_ptr<IGraph> _graph;
const Config _config;
const uint32_t _id;
@@ -59,9 +62,15 @@ struct Pipeline {
std::vector<std::unique_ptr<Fence>> _fences;
std::shared_ptr<EventPool> _event_pool;
std::vector<std::shared_ptr<Event>> _events;
bool sync_output_with_fences_ = true;
bool _sync_output_with_fences = true;
std::shared_ptr<zeroProfiling::NpuInferProfiling> _npu_profiling;
Logger _logger;

uint32_t _group_ordinal;
std::mutex _mutex;
bool _turbo = false;
ze_command_queue_priority_t _ze_queue_priority;
std::optional<ze_command_queue_workload_type_t> _ze_workload_type = std::nullopt;
};

} // namespace intel_npu
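
Note: the CommandQueueManager used by the pipeline is not declared anywhere in this diff. Below is a minimal, hypothetical sketch of the interface implied by the call sites in zero_pipeline.cpp; the signatures are inferred, not the actual declaration.

// Hypothetical sketch only: inferred from the getCommandQueue()/freeCommandQueue()
// call sites below; the real declaration is not part of this PR page and may differ.
class CommandQueueManager {
public:
    static CommandQueueManager& getInstance();

    // Returns a queue shared by every pipeline that asks for the same
    // {priority, workload type, group ordinal, turbo} combination.
    std::shared_ptr<CommandQueue> getCommandQueue(
        const std::shared_ptr<ZeroInitStructsHolder>& init_structs,
        const ze_command_queue_priority_t& priority,
        const std::optional<ze_command_queue_workload_type_t>& workload_type,
        const uint32_t& group_ordinal,
        bool turbo);

    // Releases one user of the queue identified by the same key; the queue is
    // destroyed once the last pipeline has released it.
    void freeCommandQueue(const ze_command_queue_priority_t& priority,
                          const std::optional<ze_command_queue_workload_type_t>& workload_type,
                          bool turbo);
};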
131 changes: 110 additions & 21 deletions src/plugins/intel_npu/src/backend/src/zero_pipeline.cpp
@@ -26,34 +26,67 @@ Pipeline::Pipeline(const Config& config,
const std::vector<std::vector<std::shared_ptr<ov::ITensor>>>& input_tensors,
const std::vector<std::shared_ptr<ov::ITensor>>& output_tensors,
uint32_t group_ordinal)
: _graph(graph),
: _init_structs(init_structs),
_graph(graph),
_config(config),
_id(_graph->get_unique_id()),
_number_of_command_lists(_graph->get_batch_size().has_value() ? *_graph->get_batch_size() : 1),
_event_pool{
std::make_shared<EventPool>(init_structs->getDevice(),
init_structs->getContext(),
_number_of_command_lists ? static_cast<uint32_t>(_number_of_command_lists) : 1)},
_npu_profiling(npu_profiling),
_logger("Pipeline", _config.get<LOG_LEVEL>()) {
_logger("Pipeline", _config.get<LOG_LEVEL>()),
_group_ordinal(group_ordinal) {
OV_ITT_SCOPED_TASK(itt::domains::LevelZeroBackend, "Zero_infer_request::Pipeline::Pipeline");
_logger.debug("Pipeline - initialize started");

if (profiling_pool.create()) {
profiling_query.create(profiling_pool._handle);
}

OPENVINO_ASSERT(_sync_output_with_fences || !_config.get<RUN_INFERENCES_SEQUENTIALLY>(),
"In-order execution doesn't work in case synchronization of the inferences is done using events");

if (!_sync_output_with_fences || _config.get<RUN_INFERENCES_SEQUENTIALLY>()) {
_event_pool =
std::make_shared<EventPool>(_init_structs->getDevice(),
_init_structs->getContext(),
_number_of_command_lists ? static_cast<uint32_t>(_number_of_command_lists) : 1);

_events.reserve(_number_of_command_lists);
for (size_t i = 0; i < _number_of_command_lists; i++) {
_events.emplace_back(std::make_shared<Event>(_event_pool, static_cast<uint32_t>(i)));
}
}

_command_lists.reserve(_number_of_command_lists);
_events.reserve(_number_of_command_lists);
_fences.reserve(_number_of_command_lists);
_logger.debug("Pipeline - emplace_back _event_pool and _command_queue");
for (size_t i = 0; i < _number_of_command_lists; i++) {
_command_lists.emplace_back(
std::make_unique<CommandList>(init_structs,
std::make_unique<CommandList>(_init_structs,
group_ordinal,
init_structs->getMutableCommandListVersion() ? true : false));
_events.emplace_back(std::make_shared<Event>(_event_pool, static_cast<uint32_t>(i)));
_fences.emplace_back(std::make_unique<Fence>(*_graph->get_command_queue()));
_init_structs->getMutableCommandListVersion() ? true : false));
}

_ze_queue_priority = zeroUtils::toZeQueuePriority(_config.get<MODEL_PRIORITY>());

if (_config.has<TURBO>()) {
_turbo = _config.get<TURBO>();
}

if (config.has<WORKLOAD_TYPE>()) {
_ze_workload_type = zeroUtils::toZeQueueWorkloadType(config.get<WORKLOAD_TYPE>());
}

_command_queue = CommandQueueManager::getInstance().getCommandQueue(_init_structs,
_ze_queue_priority,
_graph->get_ze_workload_type(),
_group_ordinal,
_turbo);

if (_sync_output_with_fences) {
_fences.resize(_number_of_command_lists);

for (size_t i = 0; i < _number_of_command_lists; i++) {
_logger.debug("Pipeline - getCommandQueue() - create new fence");
_fences[i] = std::make_unique<Fence>(*_command_queue);
}
}

for (size_t i = 0; i < _number_of_command_lists; i++) {
@@ -138,7 +171,7 @@ Pipeline::Pipeline(const Config& config,
}

// appendBarrier used in L0 as well
if (!sync_output_with_fences_) {
if (!_sync_output_with_fences) {
_command_lists.at(i)->appendBarrier();
_events.at(i)->AppendSignalEvent(*_command_lists.at(i));
}
@@ -147,9 +180,49 @@ Pipeline::Pipeline(const Config& config,
_logger.debug("Pipeline - initialize completed");
}

void Pipeline::getCommandQueue() {
_logger.debug("Pipeline - getCommandQueue() started");

std::lock_guard<std::mutex> lock(_mutex);

if (_ze_workload_type != _graph->get_ze_workload_type()) {
// fences created for the old command queue must be destroyed and new ones created
if (_sync_output_with_fences) {
for (size_t i = 0; i < _number_of_command_lists; i++) {
if (_fences[i] != nullptr) {
_logger.debug("Pipeline - getCommandQueue() - destroy old fence");
_fences[i].reset();
}
}
}

_command_queue = CommandQueueManager::getInstance().getCommandQueue(_init_structs,
_ze_queue_priority,
_graph->get_ze_workload_type(),
_group_ordinal,
_turbo);

if (_sync_output_with_fences) {
for (size_t i = 0; i < _number_of_command_lists; i++) {
_logger.debug("Pipeline - getCommandQueue() - create new fence");
_fences[i] = std::make_unique<Fence>(*_command_queue);
}
}

_logger.debug("Pipeline - getCommandQueue() - free previous command queue");
CommandQueueManager::getInstance().freeCommandQueue(_ze_queue_priority, _ze_workload_type, _turbo);

_ze_workload_type = _graph->get_ze_workload_type();
}

_logger.debug("Pipeline - getCommandQueue() completed");
}

void Pipeline::push() {
_logger.debug("Pipeline - push() started");

getCommandQueue();

if (_config.get<RUN_INFERENCES_SEQUENTIALLY>()) {
if (_id) {
auto previousIndex = _graph->get_last_submitted_id();
@@ -164,10 +237,10 @@ void Pipeline::push() {

for (size_t i = 0; i < _command_lists.size(); ++i) {
OV_ITT_TASK_CHAIN(ZERO_PIPELINE_IP_PUSH, itt::domains::LevelZeroBackend, "Pipeline", "push");
if (sync_output_with_fences_) {
_graph->get_command_queue()->executeCommandList(*_command_lists.at(i), *_fences.at(i));
if (_sync_output_with_fences) {
_command_queue->executeCommandList(*_command_lists.at(i), *_fences.at(i));
} else {
_graph->get_command_queue()->executeCommandList(*_command_lists.at(i));
_command_queue->executeCommandList(*_command_lists.at(i));
}
}

@@ -179,7 +252,7 @@ void Pipeline::pull() {
OV_ITT_TASK_CHAIN(ZERO_PIPELINE_IP_PULL, itt::domains::LevelZeroBackend, "Pipeline", "pull");

for (size_t i = 0; i < _command_lists.size(); ++i) {
if (sync_output_with_fences_) {
if (_sync_output_with_fences) {
_fences.at(i)->hostSynchronize();
} else {
_events.at(i)->hostSynchronize();
@@ -194,17 +267,17 @@ void Pipeline::pull() {
};

void Pipeline::reset() const {
_logger.debug("Pipeline - rest() started");
_logger.debug("Pipeline - reset() started");

for (size_t i = 0; i < _command_lists.size(); ++i) {
if (sync_output_with_fences_) {
if (_sync_output_with_fences) {
_fences.at(i)->reset();
} else {
_events.at(i)->reset();
}
}

_logger.debug("Pipeline - rest() completed");
_logger.debug("Pipeline - reset() completed");
};

void Pipeline::updateCommandList(uint32_t arg_index, const void* arg_data, size_t byte_size) {
@@ -257,4 +330,20 @@ void Pipeline::closeCommandListIndex(size_t command_list_index) {
_command_lists.at(command_list_index)->close();
};

Pipeline::~Pipeline() {
if (_command_queue) {
if (_sync_output_with_fences) {
// fences shall be destroyed before the command queue is destroyed
for (size_t i = 0; i < _number_of_command_lists; i++) {
if (_fences[i] != nullptr) {
_fences[i].reset();
}
}
}

_command_queue.reset();
CommandQueueManager::getInstance().freeCommandQueue(_ze_queue_priority, _ze_workload_type, _turbo);
}
}

} // namespace intel_npu
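
With the implementation above, a workload-type change no longer touches a queue owned by the graph; the pipeline picks it up lazily on its next submission. The fragment below is illustrative only; the function and variable names are invented for the example.

// Illustrative only: how a workload-type change propagates with this PR, based on
// Pipeline::push()/getCommandQueue() above. `graph` and `pipeline` stand for the
// objects owned by a zero infer request.
void example_switch_to_efficient(const std::shared_ptr<IGraph>& graph,
                                 const std::unique_ptr<Pipeline>& pipeline) {
    // Only records the new Level Zero workload type; no queue is touched yet.
    graph->set_workload_type(ov::WorkloadType::EFFICIENT);

    // The next push() calls getCommandQueue(), sees that the cached
    // _ze_workload_type differs from the graph's value, drops the old fences,
    // fetches a matching queue from CommandQueueManager, recreates the fences
    // and releases the previous queue before submitting the command lists.
    pipeline->push();
    pipeline->pull();
}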
@@ -42,9 +42,9 @@ class IGraph : public std::enable_shared_from_this<IGraph> {

const std::vector<ArgumentDescriptor>& get_input_descriptors() const;
const std::vector<ArgumentDescriptor>& get_output_descriptors() const;
const std::shared_ptr<CommandQueue>& get_command_queue() const;

void set_workload_type(const ov::WorkloadType workloadType) const;
void set_workload_type(const ov::WorkloadType workloadType);
const std::optional<ze_command_queue_workload_type_t> get_ze_workload_type() const;

std::mutex& get_mutex();

@@ -83,7 +83,6 @@ class IGraph : public std::enable_shared_from_this<IGraph> {
std::vector<ArgumentDescriptor> _input_descriptors;
std::vector<ArgumentDescriptor> _output_descriptors;

std::shared_ptr<CommandQueue> _command_queue;
std::vector<std::shared_ptr<Event>> _last_submitted_event;

// Used to protect zero pipeline creation in the graph. The pipeline should be created only once per graph when the
@@ -101,6 +100,8 @@ class IGraph : public std::enable_shared_from_this<IGraph> {
*/
std::optional<std::size_t> _batch_size = std::nullopt;

std::optional<ze_command_queue_workload_type_t> _ze_workload_type = std::nullopt;

Logger _logger;
};

34 changes: 11 additions & 23 deletions src/plugins/intel_npu/src/common/src/igraph.cpp
@@ -21,7 +21,11 @@ IGraph::IGraph(ze_graph_handle_t handle,
: _handle(handle),
_metadata(std::move(metadata)),
_blobPtr(std::move(blobPtr)),
_logger("IGraph", config.get<LOG_LEVEL>()) {}
_logger("IGraph", config.get<LOG_LEVEL>()) {
if (config.has<WORKLOAD_TYPE>()) {
set_workload_type(config.get<WORKLOAD_TYPE>());
}
}

const NetworkMetadata& IGraph::get_metadata() const {
return _metadata;
@@ -43,28 +47,8 @@ const std::vector<ArgumentDescriptor>& IGraph::get_output_descriptors() const {
return _output_descriptors;
}

const std::shared_ptr<CommandQueue>& IGraph::get_command_queue() const {
return _command_queue;
}

void IGraph::set_workload_type(const ov::WorkloadType workloadType) const {
if (_command_queue == nullptr) {
return;
}

ze_command_queue_workload_type_t zeWorkloadType;
switch (workloadType) {
case ov::WorkloadType::DEFAULT:
zeWorkloadType = ze_command_queue_workload_type_t::ZE_WORKLOAD_TYPE_DEFAULT;
break;
case ov::WorkloadType::EFFICIENT:
zeWorkloadType = ze_command_queue_workload_type_t::ZE_WORKLOAD_TYPE_BACKGROUND;
break;
default:
OPENVINO_THROW("Unknown value for WorkloadType!");
}

_command_queue->setWorkloadType(zeWorkloadType);
void IGraph::set_workload_type(const ov::WorkloadType workloadType) {
_ze_workload_type = zeroUtils::toZeQueueWorkloadType(workloadType);
}

std::mutex& IGraph::get_mutex() {
@@ -153,4 +137,8 @@ const std::optional<std::size_t> IGraph::get_batch_size() const {
return _batch_size;
}

const std::optional<ze_command_queue_workload_type_t> IGraph::get_ze_workload_type() const {
return _ze_workload_type;
}

} // namespace intel_npu
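
The switch removed from set_workload_type() above is now delegated to a zeroUtils helper. The sketch below reconstructs what zeroUtils::toZeQueueWorkloadType presumably looks like from the deleted mapping; the actual utility is outside this diff.

// Presumed shape of zeroUtils::toZeQueueWorkloadType, reconstructed from the
// switch that was removed above; the real helper is not shown in this PR page.
inline ze_command_queue_workload_type_t toZeQueueWorkloadType(const ov::WorkloadType workloadType) {
    switch (workloadType) {
    case ov::WorkloadType::DEFAULT:
        return ze_command_queue_workload_type_t::ZE_WORKLOAD_TYPE_DEFAULT;
    case ov::WorkloadType::EFFICIENT:
        return ze_command_queue_workload_type_t::ZE_WORKLOAD_TYPE_BACKGROUND;
    default:
        OPENVINO_THROW("Unknown value for WorkloadType!");
    }
}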
@@ -48,7 +48,7 @@ class ZeGraphExtWrappers {

void setGraphArgumentValue(ze_graph_handle_t graphHandle, uint32_t argi_, const void* argv) const;

void initializeGraph(ze_graph_handle_t graphHandle, const Config& config) const;
void initializeGraph(ze_graph_handle_t graphHandle) const;

private:
std::unordered_set<std::string> getQueryResultFromSupportedLayers(
@@ -60,7 +60,7 @@ class ZeGraphExtWrappers {
std::vector<IODescriptor>& inputs,
std::vector<IODescriptor>& outputs) const;

void initialize_graph_through_command_list(ze_graph_handle_t graphHandle, const Config& config) const;
void initialize_graph_through_command_list(ze_graph_handle_t graphHandle) const;

std::shared_ptr<ZeroInitStructsHolder> _zeroInitStruct;
uint32_t _graphExtVersion;
@@ -103,23 +103,8 @@ void DriverGraph::initialize(const Config& config) {
deviceProperties.stype = ZE_STRUCTURE_TYPE_DEVICE_PROPERTIES;
THROW_ON_FAIL_FOR_LEVELZERO("zeDeviceGetProperties",
zeDeviceGetProperties(_zeroInitStruct->getDevice(), &deviceProperties));
auto groupOrdinal = zeroUtils::findGroupOrdinal(_zeroInitStruct->getDevice(), deviceProperties);

bool turbo = false;
if (config.has<TURBO>()) {
turbo = config.get<TURBO>();
}

_command_queue = std::make_shared<CommandQueue>(_zeroInitStruct,
zeroUtils::toZeQueuePriority(config.get<MODEL_PRIORITY>()),
groupOrdinal,
turbo);

if (config.has<WORKLOAD_TYPE>()) {
set_workload_type(config.get<WORKLOAD_TYPE>());
}

_zeGraphExt->initializeGraph(_handle, config);
_zeGraphExt->initializeGraph(_handle);

_logger.debug("Graph initialize finish");

@@ -103,23 +103,8 @@ void PluginGraph::initialize(const Config& config) {
deviceProperties.stype = ZE_STRUCTURE_TYPE_DEVICE_PROPERTIES;
THROW_ON_FAIL_FOR_LEVELZERO("zeDeviceGetProperties",
zeDeviceGetProperties(_zeroInitStruct->getDevice(), &deviceProperties));
auto groupOrdinal = zeroUtils::findGroupOrdinal(_zeroInitStruct->getDevice(), deviceProperties);

bool turbo = false;
if (config.has<TURBO>()) {
turbo = config.get<TURBO>();
}

_command_queue = std::make_shared<CommandQueue>(_zeroInitStruct,
zeroUtils::toZeQueuePriority(config.get<MODEL_PRIORITY>()),
groupOrdinal,
turbo);

if (config.has<WORKLOAD_TYPE>()) {
set_workload_type(config.get<WORKLOAD_TYPE>());
}

_zeGraphExt->initializeGraph(_handle, config);
_zeGraphExt->initializeGraph(_handle);

if (config.get<BATCH_MODE>() != ov::intel_npu::BatchMode::COMPILER) {
_batch_size = get_batch_size(_metadata);