From fb67a796a03ee04a288262f1a51606a0f70fedb0 Mon Sep 17 00:00:00 2001 From: Luca Della Vedova Date: Mon, 9 Dec 2024 11:25:25 +0800 Subject: [PATCH] Change RMF transporter to be a workcell instead Signed-off-by: Luca Della Vedova --- .../config/system_bts/main.xml | 26 +-- .../config/system_bts/pick_and_place.xml | 23 +-- .../launch/control_center.launch.py | 9 + nexus_integration_tests/launch/launch.py | 2 +- nexus_system_orchestrator/CMakeLists.txt | 3 +- .../src/dispatch_transporter.cpp | 173 ++++++++++++++++++ .../src/dispatch_transporter.hpp | 91 +++++++++ .../src/execute_task_pair.cpp | 104 ----------- .../src/execute_task_pair.hpp | 79 -------- .../src/for_each_task_pair.cpp | 109 ----------- .../src/for_each_task_pair.hpp | 72 -------- nexus_system_orchestrator/src/signals.cpp | 94 ++-------- nexus_system_orchestrator/src/signals.hpp | 5 +- .../src/system_orchestrator.cpp | 26 +-- nexus_zenoh_bridge_dds_vendor/CMakeLists.txt | 2 +- 15 files changed, 319 insertions(+), 499 deletions(-) create mode 100644 nexus_system_orchestrator/src/dispatch_transporter.cpp create mode 100644 nexus_system_orchestrator/src/dispatch_transporter.hpp delete mode 100644 nexus_system_orchestrator/src/execute_task_pair.cpp delete mode 100644 nexus_system_orchestrator/src/execute_task_pair.hpp delete mode 100644 nexus_system_orchestrator/src/for_each_task_pair.cpp delete mode 100644 nexus_system_orchestrator/src/for_each_task_pair.hpp diff --git a/nexus_integration_tests/config/system_bts/main.xml b/nexus_integration_tests/config/system_bts/main.xml index a2a45dc..e7311cb 100644 --- a/nexus_integration_tests/config/system_bts/main.xml +++ b/nexus_integration_tests/config/system_bts/main.xml @@ -2,24 +2,18 @@ - - + + + - - - - - + + - - - - - - - - - + + + + + diff --git a/nexus_integration_tests/config/system_bts/pick_and_place.xml b/nexus_integration_tests/config/system_bts/pick_and_place.xml index 6f429a2..b36f4a2 100644 --- a/nexus_integration_tests/config/system_bts/pick_and_place.xml +++ b/nexus_integration_tests/config/system_bts/pick_and_place.xml @@ -2,23 +2,12 @@ - - - - - - - - - - - - - - - - - + + + + + + diff --git a/nexus_integration_tests/launch/control_center.launch.py b/nexus_integration_tests/launch/control_center.launch.py index c92b133..63170ac 100644 --- a/nexus_integration_tests/launch/control_center.launch.py +++ b/nexus_integration_tests/launch/control_center.launch.py @@ -117,6 +117,14 @@ def launch_setup(context, *args, **kwargs): get_package_share_directory("nexus_integration_tests"), "rviz", "nexus_panel.rviz" ) + rmf_transporter_node = LifecycleNode( + name="rmf_nexus_transporter", + namespace="", + package="rmf_nexus_transporter", + executable="rmf_nexus_transporter", + parameters=[], + ) + system_orchestrator_node = LifecycleNode( name="system_orchestrator", namespace="", @@ -169,6 +177,7 @@ def launch_setup(context, *args, **kwargs): return [ SetEnvironmentVariable("ROS_DOMAIN_ID", ros_domain_id), system_orchestrator_node, + # rmf_transporter_node, transporter_node, mock_emergency_alarm_node, nexus_panel, diff --git a/nexus_integration_tests/launch/launch.py b/nexus_integration_tests/launch/launch.py index aae4a85..b182dd2 100644 --- a/nexus_integration_tests/launch/launch.py +++ b/nexus_integration_tests/launch/launch.py @@ -97,7 +97,7 @@ def launch_setup(context, *args, **kwargs): "ros_domain_id": str(control_center_domain_id), "zenoh_config_package": "nexus_integration_tests", "zenoh_config_filename": "config/zenoh/system_orchestrator.json5", - "transporter_plugin": "rmf_nexus_transporter::RmfNexusTransporter", + "transporter_plugin": "nexus_transporter::MockTransporter", "activate_system_orchestrator": headless, "headless": headless, }.items(), diff --git a/nexus_system_orchestrator/CMakeLists.txt b/nexus_system_orchestrator/CMakeLists.txt index 1de86f1..6cfe7cd 100644 --- a/nexus_system_orchestrator/CMakeLists.txt +++ b/nexus_system_orchestrator/CMakeLists.txt @@ -19,10 +19,9 @@ set(CMAKE_CXX_FLAGS "-Wall -Wpedantic") add_library(${PROJECT_NAME}_plugin SHARED src/bid_transporter.cpp + src/dispatch_transporter.cpp src/execute_task.cpp - src/execute_task_pair.cpp src/for_each_task.cpp - src/for_each_task_pair.cpp src/signals.cpp src/system_orchestrator.cpp src/transporter_request.cpp diff --git a/nexus_system_orchestrator/src/dispatch_transporter.cpp b/nexus_system_orchestrator/src/dispatch_transporter.cpp new file mode 100644 index 0000000..7234cfc --- /dev/null +++ b/nexus_system_orchestrator/src/dispatch_transporter.cpp @@ -0,0 +1,173 @@ +/* + * Copyright (C) 2022 Johnson & Johnson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include "dispatch_transporter.hpp" + +namespace nexus::system_orchestrator { + +BT::PortsList DispatchTransporter::providedPorts() +{ + return {BT::OutputPort("result"), BT::OutputPort("transport_task") }; +} + +BT::NodeStatus DispatchTransporter::onStart() +{ + auto node = this->_w_node.lock(); + if (!node) + { + std::cerr << + "FATAL ERROR!!! NODE IS DESTROYED WHILE THERE ARE STILL REFERENCES" << + std::endl; + std::terminate(); + } + + // Create a list of destination from the current context tasks + const auto& task_assignments = this->_ctx->workcell_task_assignments; + YAML::Node orders; + std::vector locations; + + for (const auto& task : this->_ctx->tasks) + { + auto assignment_it = task_assignments.find(task.id); + if (assignment_it == task_assignments.end()) + { + RCLCPP_ERROR( + node->get_logger(), "%s: Unable to transport, task [%s] was not assigned to a workcell", + this->name().c_str(), task.id.c_str()); + return BT::NodeStatus::FAILURE; + } + // Multipickup task + // TODO(luca) Consider encoding where is a pickup and where a dropoff + YAML::Node order; + order["type"] = "pickup"; + order["destination"] = assignment_it->second; + orders.push_back(order); + } + this->_transport_task.id = this->_ctx->job_id; + this->_transport_task.type = "transportation"; + YAML::Emitter out; + out << orders; + this->_transport_task.payload = out.c_str(); + + // Probe workcells for transportation capability to this set of destinations + auto req = + std::make_shared(); + req->task = this->_transport_task; + // send request to all transporters in parallel + for (auto& [workcell_id, session] : this->_ctx->workcell_sessions) + { + auto fut = session->task_doable_client->async_send_request(req); + this->_ongoing_requests.emplace(workcell_id, + OngoingRequest{session->task_doable_client, std::move(fut)}); + } + this->_time_limit = std::chrono::steady_clock::now() + + std::chrono::seconds{5}; + + return BT::NodeStatus::RUNNING; +} + +BT::NodeStatus DispatchTransporter::onRunning() +{ + auto result = this->_update_ongoing_requests(); + if (result != BT::NodeStatus::RUNNING) + { + // action is done, drop requests that have not received response to avoid callback leak. + this->_cleanup_pending_requests(); + } + return result; +} + +void DispatchTransporter::onHalted() +{ + // can't cancel a service call, so we just drop the requests. + this->_cleanup_pending_requests(); +} + +void DispatchTransporter::_cleanup_pending_requests() +{ + for (auto& [_, ongoing_req] : this->_ongoing_requests) + { + ongoing_req.client->remove_pending_request(ongoing_req.fut); + } +} + +BT::NodeStatus DispatchTransporter::_update_ongoing_requests() +{ + auto node = this->_w_node.lock(); + if (!node) + { + std::cerr << + "FATAL ERROR!!! NODE IS DESTROYED WHILE THERE ARE STILL REFERENCES" << + std::endl; + std::terminate(); + } + + if (std::chrono::steady_clock::now() > this->_time_limit) + { + for (const auto& [workcell_id, _] : this->_ongoing_requests) + { + RCLCPP_WARN( + node->get_logger(), "%s: Skipped transporter [%s] (no response)", + this->name().c_str(), + workcell_id.c_str()); + } + RCLCPP_ERROR( + node->get_logger(), "%s: No transporter is able to perform task", + this->name().c_str()); + return BT::NodeStatus::FAILURE; + } + + std::vector finished_requests; + for (auto& [workcell_id, ongoing_req] : this->_ongoing_requests) + { + if (ongoing_req.fut.wait_for(std::chrono::seconds{0}) == + std::future_status::ready) + { + finished_requests.emplace_back(workcell_id); + auto resp = ongoing_req.fut.get(); + // TODO(luca) make this proper bidding + // use first available transporter + if (resp->success) + { + RCLCPP_INFO( + node->get_logger(), "[%s]: Bid awarded to [%s]", + this->name().c_str(), + workcell_id.c_str()); + this->setOutput("result", workcell_id); + this->setOutput("transport_task", this->_transport_task); + return BT::NodeStatus::SUCCESS; + } + else + { + RCLCPP_DEBUG( + node->get_logger(), "%s: Transporter [%s] cannot perform task", + this->name().c_str(), workcell_id.c_str()); + } + } + } + + for (const auto& workcell_id : finished_requests) + { + this->_ongoing_requests.erase(workcell_id); + } + + return BT::NodeStatus::RUNNING; +} + +} diff --git a/nexus_system_orchestrator/src/dispatch_transporter.hpp b/nexus_system_orchestrator/src/dispatch_transporter.hpp new file mode 100644 index 0000000..a7d1f32 --- /dev/null +++ b/nexus_system_orchestrator/src/dispatch_transporter.hpp @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2022 Johnson & Johnson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef NEXUS_SYSTEM_ORCHESTRATOR__DISPATCH_TRANSPORTER_HPP +#define NEXUS_SYSTEM_ORCHESTRATOR__DISPATCH_TRANSPORTER_HPP + +#include "context.hpp" +#include "session.hpp" + +#include +#include + +#include + +#include +#include +#include + +#include + +#include +#include +#include + +namespace nexus::system_orchestrator { + +/** + * Iterates over all the tasks and sends a request to the transporter to move + * the item inbetween workcells. + * + * Input Ports: + * destination |std::string| Destination to transport to. + * Output Ports: + * result |std::string| Id of the transporter assigned. + */ +class DispatchTransporter : public BT::StatefulActionNode +{ +public: using IsTaskDoableService = + endpoints::IsTaskDoableService; + +public: using WorkcellTask = + nexus_orchestrator_msgs::msg::WorkcellTask; + +public: static BT::PortsList providedPorts(); + +public: DispatchTransporter(const std::string& name, + const BT::NodeConfiguration& config, + rclcpp_lifecycle::LifecycleNode::WeakPtr w_node, + std::shared_ptr ctx) + : BT::StatefulActionNode{name, config}, _w_node{w_node}, _ctx{std::move(ctx)} + { + } + +public: BT::NodeStatus onStart() override; +public: BT::NodeStatus onRunning() override; +public: void onHalted() override; + +private: struct OngoingRequest + { + rclcpp::Client::SharedPtr client; + rclcpp::Client:: + FutureAndRequestId fut; + }; + +private: rclcpp_lifecycle::LifecycleNode::WeakPtr _w_node; +private: std::shared_ptr _ctx; +private: std::unordered_map _ongoing_requests; +private: std::chrono::steady_clock::time_point _time_limit; +private: WorkcellTask _transport_task; + +private: void _cleanup_pending_requests(); +private: BT::NodeStatus _update_ongoing_requests(); +}; + +} + +#endif diff --git a/nexus_system_orchestrator/src/execute_task_pair.cpp b/nexus_system_orchestrator/src/execute_task_pair.cpp deleted file mode 100644 index 52bafe8..0000000 --- a/nexus_system_orchestrator/src/execute_task_pair.cpp +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright (C) 2023 Johnson & Johnson - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "execute_task_pair.hpp" - -namespace nexus::system_orchestrator { - -BT::NodeStatus ExecuteTaskPair::onStart() -{ - auto place_task = this->getInput("place_task"); - if (!place_task) - { - RCLCPP_ERROR( - this->_ctx->node.get_logger(), "%s: [place_task] port is required", - this->name().c_str()); - return BT::NodeStatus::FAILURE; - } - - auto pick_task = this->getInput("pick_task"); - if (!pick_task) - { - RCLCPP_ERROR( - this->_ctx->node.get_logger(), "%s: [pick_task] port is required", - this->name().c_str()); - return BT::NodeStatus::FAILURE; - } - - auto source = this->getInput("source"); - if (!source) - { - RCLCPP_ERROR( - this->_ctx->node.get_logger(), "%s: [source] port is required", - this->name().c_str()); - return BT::NodeStatus::FAILURE; - } - - auto destination = this->getInput("destination"); - if (!destination) - { - RCLCPP_ERROR( - this->_ctx->node.get_logger(), "%s: [destination] port is required", - this->name().c_str()); - return BT::NodeStatus::FAILURE; - } - - // Remap the BT filename to load if one is provided. - // TODO(luca) look into task remapping - std::string bt_name = place_task->type; - auto it = _ctx->task_remaps->find(place_task->type); - if (it != _ctx->task_remaps->end()) - { - RCLCPP_DEBUG( - _ctx->node.get_logger(), - "[ExecuteTaskPair] Loading remapped BT [%s] for original task type [%s]", - it->second.c_str(), - place_task->type.c_str() - ); - bt_name = it->second; - } - std::filesystem::path task_bt_path(this->_bt_path / (bt_name + ".xml")); - if (!std::filesystem::is_regular_file(task_bt_path)) - { - RCLCPP_ERROR( - this->_ctx->node.get_logger(), - "%s: no behavior tree to execute task type [%s]", - this->name().c_str(), place_task->type.c_str()); - return BT::NodeStatus::FAILURE; - } - - this->_bt = std::make_unique(this->_bt_factory->createTreeFromFile( - task_bt_path)); - this->_bt->rootBlackboard()->set("place_task", *place_task); - this->_bt->rootBlackboard()->set("pick_task", *pick_task); - this->_bt->rootBlackboard()->set("source", *source); - this->_bt->rootBlackboard()->set("destination", *destination); - - return BT::NodeStatus::RUNNING; -} - -BT::NodeStatus ExecuteTaskPair::onRunning() -{ - return this->_bt->tickRoot(); -} - -void ExecuteTaskPair::onHalted() -{ - this->_bt->haltTree(); -} - -} diff --git a/nexus_system_orchestrator/src/execute_task_pair.hpp b/nexus_system_orchestrator/src/execute_task_pair.hpp deleted file mode 100644 index f3572d6..0000000 --- a/nexus_system_orchestrator/src/execute_task_pair.hpp +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (C) 2023 Johnson & Johnson - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef NEXUS_SYSTEM_ORCHESTRATOR__EXECUTE_TASK_PAIR_HPP -#define NEXUS_SYSTEM_ORCHESTRATOR__EXECUTE_TASK_PAIR_HPP - -#include "context.hpp" - -#include - -#include -#include - -#include - -namespace nexus::system_orchestrator { - -/** - * Searches for a behavior tree to execute a task and run it as a sub tree. - * - * The behavior tree used is based on the task type after remapping and the - * filename of the behavior tree without extension. - * - * Input Ports: - * task |nexus_orchestrator_msgs::msg::WorkcellTask| The task to execute. - * workcell |std::string| Workcell to execute on. - */ -class ExecuteTaskPair : public BT::StatefulActionNode -{ -public: using Task = nexus_orchestrator_msgs::msg::WorkcellTask; - -public: static BT::PortsList providedPorts() - { - return { BT::InputPort("place_task"), - BT::InputPort("pick_task"), - BT::InputPort("source"), - BT::InputPort("destination") - }; - } - -public: ExecuteTaskPair(const std::string& name, - const BT::NodeConfiguration& config, - std::shared_ptr ctx, - std::filesystem::path bt_path, - std::shared_ptr bt_factory) - : BT::StatefulActionNode(name, config), _ctx(std::move(ctx)), - _bt_path(std::move( - bt_path)), - _bt_factory(std::move(bt_factory)) {} - -protected: BT::NodeStatus onStart() override; - -protected: BT::NodeStatus onRunning() override; - -protected: void onHalted() override; - -private: std::shared_ptr _ctx; -private: std::filesystem::path _bt_path; -private: std::shared_ptr _bt_factory; -private: std::unique_ptr _bt; -}; - -} - -#endif diff --git a/nexus_system_orchestrator/src/for_each_task_pair.cpp b/nexus_system_orchestrator/src/for_each_task_pair.cpp deleted file mode 100644 index e34904e..0000000 --- a/nexus_system_orchestrator/src/for_each_task_pair.cpp +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright (C) 2022 Johnson & Johnson - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "for_each_task_pair.hpp" - -#include - -namespace nexus::system_orchestrator { - -using Task = nexus_orchestrator_msgs::msg::WorkcellTask; - -BT::PortsList ForEachTaskPair::providedPorts() -{ - return { - BT::OutputPort("place_task"), - BT::OutputPort("pick_task"), - BT::OutputPort("source"), - BT::OutputPort("destination") - }; -} - -BT::NodeStatus ForEachTaskPair::tick() -{ - if (this->_first_tick) - { - this->_first_tick = false; - RCLCPP_DEBUG(this->_logger, "Looping through %lu tasks", - this->_ctx->tasks.size()); - } - if (this->_ctx->tasks.size() == 0) - { - return BT::NodeStatus::SUCCESS; - } - if (this->_ctx->tasks.size() < 2) - { - RCLCPP_ERROR(this->_logger, "ForEachTaskPair node requires at least 2 nodes"); - return BT::NodeStatus::FAILURE; - } - - this->setStatus(BT::NodeStatus::RUNNING); - - // Iterate pairwise, a transporter will be requested to go from source to destination - while (this->_current_idx < this->_ctx->tasks.size() - 1) - { - auto place_task = this->_ctx->tasks.at(this->_current_idx); - auto pick_task = this->_ctx->tasks.at(this->_current_idx + 1); - this->setOutput("place_task", place_task); - this->setOutput("pick_task", pick_task); - try - { - this->setOutput("source", - this->_ctx->workcell_task_assignments.at(place_task.id)); - } - catch (const std::out_of_range&) - { - RCLCPP_ERROR(this->_logger, "task [%s] not assigned to any workcell", - place_task.id.c_str()); - return BT::NodeStatus::FAILURE; - } - try - { - this->setOutput("destination", - this->_ctx->workcell_task_assignments.at(pick_task.id)); - } - catch (const std::out_of_range&) - { - RCLCPP_ERROR(this->_logger, "task [%s] not assigned to any workcell", - pick_task.id.c_str()); - return BT::NodeStatus::FAILURE; - } - - this->setStatus(BT::NodeStatus::RUNNING); - auto child_state = this->child_node_->executeTick(); - switch (child_state) - { - case BT::NodeStatus::SUCCESS: - this->haltChild(); - this->_current_idx += 2; - break; - case BT::NodeStatus::RUNNING: - return BT::NodeStatus::RUNNING; - case BT::NodeStatus::FAILURE: - this->haltChild(); - this->_current_idx = 0; - return BT::NodeStatus::FAILURE; - case BT::NodeStatus::IDLE: - this->_current_idx = 0; - throw BT::LogicError("A child node must never return IDLE"); - } - } - - return BT::NodeStatus::SUCCESS; -} - -} diff --git a/nexus_system_orchestrator/src/for_each_task_pair.hpp b/nexus_system_orchestrator/src/for_each_task_pair.hpp deleted file mode 100644 index 6a60f20..0000000 --- a/nexus_system_orchestrator/src/for_each_task_pair.hpp +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (C) 2022 Johnson & Johnson - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef NEXUS_SYSTEM_ORCHESTRATOR__FOR_EACH_TASK__PAIR_HPP -#define NEXUS_SYSTEM_ORCHESTRATOR__FOR_EACH_TASK__PAIR_HPP - -#include "context.hpp" -#include "session.hpp" - -#include -#include - -#include - -#include - -namespace nexus::system_orchestrator { - -/** - * Run the child node once for each task in a work order. - * - * Output Ports: - * task |nexus_orchestrator_msgs::msg::WorkcellTask| The current task being processed. - * workcell |std::string| The workcell id assigned to the task. - */ -class ForEachTaskPair : public BT::DecoratorNode -{ - /** - * Defines provided ports for the BT node. - */ -public: static BT::PortsList providedPorts(); - -/** - * @param name name of the BT node. - * @param config config of the BT node. - * @param logger rclcpp logger to use. - * @param tasks list of tasks in the work order. - * @param assignments map of task id and the assigned workcells - */ -public: ForEachTaskPair(const std::string& name, - const BT::NodeConfiguration& config, - rclcpp::Logger logger, - std::shared_ptr ctx) - : BT::DecoratorNode{name, config}, _logger{logger}, _ctx{std::move(ctx)} - { - } - -public: BT::NodeStatus tick() override; - -private: size_t _current_idx = 0; -private: rclcpp::Logger _logger; -private: std::shared_ptr _ctx; -private: bool _first_tick = true; -}; - -} - -#endif diff --git a/nexus_system_orchestrator/src/signals.cpp b/nexus_system_orchestrator/src/signals.cpp index c6c5b80..26c97b6 100644 --- a/nexus_system_orchestrator/src/signals.cpp +++ b/nexus_system_orchestrator/src/signals.cpp @@ -23,12 +23,11 @@ namespace nexus::system_orchestrator { BT::NodeStatus SendSignal::tick() { - auto task = this->getInput("task"); - auto transporter = this->getInput("transporter"); - if ((task && transporter) || (!task && !transporter)) + auto workcell = this->getInput("workcell"); + if (!workcell) { RCLCPP_ERROR( - this->_ctx->node.get_logger(), "%s only one of ports [task] and [transporter] is required", + this->_ctx->node.get_logger(), "%s [workcell] is required", this->name().c_str()); return BT::NodeStatus::FAILURE; } @@ -42,87 +41,28 @@ BT::NodeStatus SendSignal::tick() return BT::NodeStatus::FAILURE; } - if (task) - { - return signal_task(*task, *signal); - } - if (transporter) - { - return signal_transporter(*transporter, *signal); - } - // Should never reach here because we return early if none is available - return BT::NodeStatus::FAILURE; -} - -BT::NodeStatus SendSignal::signal_task(const WorkcellTask& task, const std::string& signal) -{ - // TODO(luca) remove this - /* - auto it = std::find_if( - this->_ctx->workcell_task_assignments.cbegin(), - this->_ctx->workcell_task_assignments.cend(), - [&task](const std::pair& p) - { - return p.first == task.id; - }); - if (it == this->_ctx->workcell_task_assignments.cend()) - { - RCLCPP_ERROR( - this->_ctx->node.get_logger(), - "%s: Unable to find workcell assigned to task [%s]", - this->name().c_str(), task.id.c_str()); - return BT::NodeStatus::FAILURE; - } - const auto& workcell = it->second; - - const auto session_it = this->_ctx->workcell_sessions.find(workcell); - if (session_it == this->_ctx->workcell_sessions.end()) - { - RCLCPP_ERROR(this->_ctx->node.get_logger(), "%s: Session not found for workcell %s", - this->name().c_str(), workcell); - return BT::NodeStatus::FAILURE; - } - const auto& session = session_it->second; - auto req = - std::make_shared(); - req->task_id = task.id; - req->signal = signal; - auto resp = session->signal_wc_client->send_request(req); - RCLCPP_INFO( - this->_ctx->node.get_logger(), "%s: Sent signal [%s] to workcell [%s]", - this->name().c_str(), signal.c_str(), workcell.c_str()); - if (!resp->success) - { - RCLCPP_WARN( - this->_ctx->node.get_logger(), - "%s: Workcell is not able to accept [%s] signal now. Queuing the signal to be sent on the next task request.", - this->name().c_str(), signal.c_str()); - this->_ctx->queued_signals[task.id].emplace_back(signal); - } - */ - - return BT::NodeStatus::SUCCESS; + return signal_workcell(*workcell, *signal); } -BT::NodeStatus SendSignal::signal_transporter(const std::string& transporter, const std::string& signal) +BT::NodeStatus SendSignal::signal_workcell(const std::string& workcell, const std::string& signal) { - auto it = this->_ctx->transporter_sessions.find(transporter); - if (it == this->_ctx->transporter_sessions.cend()) + auto it = this->_ctx->workcell_sessions.find(workcell); + if (it == this->_ctx->workcell_sessions.cend()) { RCLCPP_ERROR( this->_ctx->node.get_logger(), - "%s: Unable to find transporter [%s]", - this->name().c_str(), transporter.c_str()); + "%s: Unable to find workcell [%s]", + this->name().c_str(), workcell.c_str()); return BT::NodeStatus::FAILURE; } const auto& session = it->second; if (!session) { - // TODO(luca) implement a queueing mechanism if the transporter is not available? + // TODO(luca) implement a queueing mechanism if the workcell is not available? RCLCPP_WARN( this->_ctx->node.get_logger(), - "%s: No session found for transporter [%s], skipping signaling", - this->name().c_str(), transporter.c_str()); + "%s: No session found for workcell [%s], skipping signaling", + this->name().c_str(), workcell.c_str()); return BT::NodeStatus::SUCCESS; } @@ -131,17 +71,17 @@ BT::NodeStatus SendSignal::signal_transporter(const std::string& transporter, co // TODO(luca) Using job id for task id is not scalable, generate uuids? req->task_id = this->_ctx->job_id; req->signal = signal; - auto resp = session->signal_client->send_request(req); + auto resp = session->signal_wc_client->send_request(req); RCLCPP_INFO( - this->_ctx->node.get_logger(), "%s: Sent signal [%s] to transporter [%s]", - this->name().c_str(), signal.c_str(), transporter.c_str()); + this->_ctx->node.get_logger(), "%s: Sent signal [%s] to workcell [%s]", + this->name().c_str(), signal.c_str(), workcell.c_str()); if (!resp->success) { // TODO(luca) implement a queueing mechanism if the request fails? RCLCPP_WARN( this->_ctx->node.get_logger(), - "%s: Transporter [%s] is not able to accept [%s] signal now. Skipping signaling", - this->name().c_str(), transporter.c_str(), signal.c_str()); + "%s: Workcell [%s] is not able to accept [%s] signal now. Skipping signaling, error: [%s]", + this->name().c_str(), workcell.c_str(), signal.c_str(), resp->message.c_str()); return BT::NodeStatus::SUCCESS; } diff --git a/nexus_system_orchestrator/src/signals.hpp b/nexus_system_orchestrator/src/signals.hpp index 4c29ba9..89dc40b 100644 --- a/nexus_system_orchestrator/src/signals.hpp +++ b/nexus_system_orchestrator/src/signals.hpp @@ -40,7 +40,7 @@ public: static BT::PortsList providedPorts() { return { BT::InputPort("task", "The task the signal is tied to."), - BT::InputPort("transporter", "The transporter to signal"), + BT::InputPort("workcell", "The workcell to signal"), BT::InputPort( "signal", "Signal to send.") }; } @@ -51,8 +51,7 @@ public: SendSignal(const std::string& name, const BT::NodeConfiguration& config, public: BT::NodeStatus tick() override; -private: BT::NodeStatus signal_task(const WorkcellTask& task, const std::string& signal); -private: BT::NodeStatus signal_transporter(const std::string& transporter, const std::string& signal); +private: BT::NodeStatus signal_workcell(const std::string& workcell, const std::string& signal); private: std::shared_ptr _ctx; }; diff --git a/nexus_system_orchestrator/src/system_orchestrator.cpp b/nexus_system_orchestrator/src/system_orchestrator.cpp index f8797ed..a336a38 100644 --- a/nexus_system_orchestrator/src/system_orchestrator.cpp +++ b/nexus_system_orchestrator/src/system_orchestrator.cpp @@ -19,11 +19,10 @@ #include "bid_transporter.hpp" #include "context.hpp" +#include "dispatch_transporter.hpp" #include "exceptions.hpp" #include "execute_task.hpp" -#include "execute_task_pair.hpp" #include "for_each_task.hpp" -#include "for_each_task_pair.hpp" #include "job.hpp" #include "signals.hpp" #include "transporter_request.hpp" @@ -493,6 +492,13 @@ BT::Tree SystemOrchestrator::_create_bt(const WorkOrderActionType::Goal& wo, }); }); + bt_factory->registerBuilder("DispatchTransporter", + [this, ctx](const std::string& name, const BT::NodeConfiguration& config) + { + return std::make_unique(name, config, + this->shared_from_this(), ctx); + }); + bt_factory->registerBuilder("BidTransporter", [this, ctx](const std::string& name, const BT::NodeConfiguration& config) { @@ -515,14 +521,6 @@ BT::Tree SystemOrchestrator::_create_bt(const WorkOrderActionType::Goal& wo, this->get_logger(), ctx); }); - bt_factory->registerBuilder("ForEachTaskPair", - [this, ctx](const std::string& name, - const BT::NodeConfiguration& config) - { - return std::make_unique(name, config, - this->get_logger(), ctx); - }); - bt_factory->registerBuilder("ExecuteTask", [this, ctx, bt_factory](const std::string& name, const BT::NodeConfiguration& config) @@ -531,14 +529,6 @@ BT::Tree SystemOrchestrator::_create_bt(const WorkOrderActionType::Goal& wo, bt_factory); }); - bt_factory->registerBuilder("ExecuteTaskPair", - [this, ctx, bt_factory](const std::string& name, - const BT::NodeConfiguration& config) - { - return std::make_unique(name, config, ctx, this->_bt_path, - bt_factory); - }); - bt_factory->registerBuilder("SendSignal", [ctx](const std::string& name, const BT::NodeConfiguration& config) { diff --git a/nexus_zenoh_bridge_dds_vendor/CMakeLists.txt b/nexus_zenoh_bridge_dds_vendor/CMakeLists.txt index 7cb949e..eeba689 100644 --- a/nexus_zenoh_bridge_dds_vendor/CMakeLists.txt +++ b/nexus_zenoh_bridge_dds_vendor/CMakeLists.txt @@ -12,7 +12,7 @@ find_package(ament_cmake_vendor_package REQUIRED) ament_vendor(zeno_bridge_dds_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-plugin-ros2dds.git - VCS_VERSION 1.0.0 + VCS_VERSION 1.0.4 ) # TODO(sloretz) make a nice way to get this path from ament_vendor