Skip to content

Commit

Permalink
Change RMF transporter to be a workcell instead
Browse files Browse the repository at this point in the history
Signed-off-by: Luca Della Vedova <[email protected]>
  • Loading branch information
luca-della-vedova committed Dec 9, 2024
1 parent c79195e commit fb67a79
Show file tree
Hide file tree
Showing 15 changed files with 319 additions and 499 deletions.
26 changes: 10 additions & 16 deletions nexus_integration_tests/config/system_bts/main.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,18 @@
<BehaviorTree ID="main">
<ReactiveSequence>
<!--IsPauseTriggered paused="{paused}"/-->
<Fallback>
<!--PausableSequence pause="{paused}"-->
<!--PausableSequence pause="{paused}"-->
<Parallel success_threshold="-1">
<!-- Iterates over all task assignments and dispatches an AMR -->
<Sequence>
<ForEachTaskPair place_task="{place_task}" source="{source}" destination="{destination}" pick_task="{pick_task}">
<ExecuteTaskPair place_task="{place_task}" source="{source}" destination="{destination}" pick_task="{pick_task}"/>
</ForEachTaskPair>
<BidTransporter name="bid_transporter" destination="unloading" result="{transporter}"/>
<TransporterRequest name="transporter_request" transporter="{transporter}" destination="unloading"/>
<DispatchTransporter name="bid_amr_transporter" result="{amr_transporter}" transport_task="{transport_task}"/>
<WorkcellRequest name="amr_request" workcell="{amr_transporter}" task="{transport_task}"/>
</Sequence>
<!--/PausableSequence-->
<!--PausableSequence pause="{paused}"-->
<Sequence>
<BidTransporter name="bid_transporter" destination="unloading" result="{transporter}"/>
<TransporterRequest name="transporter_request" transporter="{transporter}" destination="unloading"/>
<AlwaysFailure/>
</Sequence>
<!--/PausableSequence-->
</Fallback>
<ForEachTask task="{task}" workcell="{workcell}">
<ExecuteTask task="{task}" workcell="{workcell}"/>
</ForEachTask>
</Parallel>
<!--/PausableSequence-->
</ReactiveSequence>
</BehaviorTree>
</root>
23 changes: 6 additions & 17 deletions nexus_integration_tests/config/system_bts/pick_and_place.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,12 @@
<BehaviorTree ID="pick_and_place">
<ReactiveSequence>
<!--IsPauseTriggered paused="{paused}"/-->
<Parallel success_threshold="-1">
<Sequence>
<BidTransporter name="bid_transporter" source="{source}" destination="{destination}" result="{transporter}"/>
<!-- Transporter will signal source / destination when it arrives -->
<TransporterRequest name="transporter_request" transporter="{transporter}" source="{source}" destination="{destination}" signal="true"/>
</Sequence>
<Sequence>
<WaitForSignal name="wait_for_amr" signal="transporter_at_source" clear="true"/>
<WorkcellRequest name="workcell_request" workcell="{source}" task="{place_task}"/>
<!-- The transporter will listen and send an appropriate dispenser result for RMF or noop for conveyor belt-->
<SendSignal transporter="{transporter}" signal="place_done"/>
<WaitForSignal name="wait_for_amr" signal="transporter_at_destination" clear="true"/>
<WorkcellRequest name="workcell_request" workcell="{destination}" task="{pick_task}"/>
<!-- The transporter will listen and send an appropriate ingestor result for RMF or noop for conveyor belt-->
<SendSignal transporter="{transporter}" signal="pick_done"/>
</Sequence>
</Parallel>
<Sequence>
<WaitForSignal name="wait_for_amr" signal="{workcell}" clear="true"/>
<WorkcellRequest name="workcell_request" workcell="{workcell}" task="{task}"/>
<!-- The transporter will listen and send an appropriate dispenser result for RMF or noop for conveyor belt-->
<SendSignal workcell="rmf_nexus_transporter" signal="{workcell}"/>
</Sequence>
</ReactiveSequence>
</BehaviorTree>
</root>
9 changes: 9 additions & 0 deletions nexus_integration_tests/launch/control_center.launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ def launch_setup(context, *args, **kwargs):
get_package_share_directory("nexus_integration_tests"), "rviz", "nexus_panel.rviz"
)

rmf_transporter_node = LifecycleNode(
name="rmf_nexus_transporter",
namespace="",
package="rmf_nexus_transporter",
executable="rmf_nexus_transporter",
parameters=[],
)

system_orchestrator_node = LifecycleNode(
name="system_orchestrator",
namespace="",
Expand Down Expand Up @@ -169,6 +177,7 @@ def launch_setup(context, *args, **kwargs):
return [
SetEnvironmentVariable("ROS_DOMAIN_ID", ros_domain_id),
system_orchestrator_node,
# rmf_transporter_node,
transporter_node,
mock_emergency_alarm_node,
nexus_panel,
Expand Down
2 changes: 1 addition & 1 deletion nexus_integration_tests/launch/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def launch_setup(context, *args, **kwargs):
"ros_domain_id": str(control_center_domain_id),
"zenoh_config_package": "nexus_integration_tests",
"zenoh_config_filename": "config/zenoh/system_orchestrator.json5",
"transporter_plugin": "rmf_nexus_transporter::RmfNexusTransporter",
"transporter_plugin": "nexus_transporter::MockTransporter",
"activate_system_orchestrator": headless,
"headless": headless,
}.items(),
Expand Down
3 changes: 1 addition & 2 deletions nexus_system_orchestrator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ set(CMAKE_CXX_FLAGS "-Wall -Wpedantic")

add_library(${PROJECT_NAME}_plugin SHARED
src/bid_transporter.cpp
src/dispatch_transporter.cpp
src/execute_task.cpp
src/execute_task_pair.cpp
src/for_each_task.cpp
src/for_each_task_pair.cpp
src/signals.cpp
src/system_orchestrator.cpp
src/transporter_request.cpp
Expand Down
173 changes: 173 additions & 0 deletions nexus_system_orchestrator/src/dispatch_transporter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright (C) 2022 Johnson & Johnson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

#include <yaml-cpp/yaml.h>

#include "dispatch_transporter.hpp"

namespace nexus::system_orchestrator {

BT::PortsList DispatchTransporter::providedPorts()
{
return {BT::OutputPort<std::string>("result"), BT::OutputPort<WorkcellTask>("transport_task") };
}

BT::NodeStatus DispatchTransporter::onStart()
{
auto node = this->_w_node.lock();
if (!node)
{
std::cerr <<
"FATAL ERROR!!! NODE IS DESTROYED WHILE THERE ARE STILL REFERENCES" <<
std::endl;
std::terminate();
}

// Create a list of destination from the current context tasks
const auto& task_assignments = this->_ctx->workcell_task_assignments;
YAML::Node orders;
std::vector<std::string> locations;

for (const auto& task : this->_ctx->tasks)
{
auto assignment_it = task_assignments.find(task.id);
if (assignment_it == task_assignments.end())
{
RCLCPP_ERROR(
node->get_logger(), "%s: Unable to transport, task [%s] was not assigned to a workcell",
this->name().c_str(), task.id.c_str());
return BT::NodeStatus::FAILURE;
}
// Multipickup task
// TODO(luca) Consider encoding where is a pickup and where a dropoff
YAML::Node order;
order["type"] = "pickup";
order["destination"] = assignment_it->second;
orders.push_back(order);
}
this->_transport_task.id = this->_ctx->job_id;
this->_transport_task.type = "transportation";
YAML::Emitter out;
out << orders;
this->_transport_task.payload = out.c_str();

// Probe workcells for transportation capability to this set of destinations
auto req =
std::make_shared<endpoints::IsTaskDoableService::ServiceType::Request>();
req->task = this->_transport_task;
// send request to all transporters in parallel
for (auto& [workcell_id, session] : this->_ctx->workcell_sessions)
{
auto fut = session->task_doable_client->async_send_request(req);
this->_ongoing_requests.emplace(workcell_id,
OngoingRequest{session->task_doable_client, std::move(fut)});
}
this->_time_limit = std::chrono::steady_clock::now() +
std::chrono::seconds{5};

return BT::NodeStatus::RUNNING;
}

BT::NodeStatus DispatchTransporter::onRunning()
{
auto result = this->_update_ongoing_requests();
if (result != BT::NodeStatus::RUNNING)
{
// action is done, drop requests that have not received response to avoid callback leak.
this->_cleanup_pending_requests();
}
return result;
}

void DispatchTransporter::onHalted()
{
// can't cancel a service call, so we just drop the requests.
this->_cleanup_pending_requests();
}

void DispatchTransporter::_cleanup_pending_requests()
{
for (auto& [_, ongoing_req] : this->_ongoing_requests)
{
ongoing_req.client->remove_pending_request(ongoing_req.fut);
}
}

BT::NodeStatus DispatchTransporter::_update_ongoing_requests()
{
auto node = this->_w_node.lock();
if (!node)
{
std::cerr <<
"FATAL ERROR!!! NODE IS DESTROYED WHILE THERE ARE STILL REFERENCES" <<
std::endl;
std::terminate();
}

if (std::chrono::steady_clock::now() > this->_time_limit)
{
for (const auto& [workcell_id, _] : this->_ongoing_requests)
{
RCLCPP_WARN(
node->get_logger(), "%s: Skipped transporter [%s] (no response)",
this->name().c_str(),
workcell_id.c_str());
}
RCLCPP_ERROR(
node->get_logger(), "%s: No transporter is able to perform task",
this->name().c_str());
return BT::NodeStatus::FAILURE;
}

std::vector<std::string> finished_requests;
for (auto& [workcell_id, ongoing_req] : this->_ongoing_requests)
{
if (ongoing_req.fut.wait_for(std::chrono::seconds{0}) ==
std::future_status::ready)
{
finished_requests.emplace_back(workcell_id);
auto resp = ongoing_req.fut.get();
// TODO(luca) make this proper bidding
// use first available transporter
if (resp->success)
{
RCLCPP_INFO(
node->get_logger(), "[%s]: Bid awarded to [%s]",
this->name().c_str(),
workcell_id.c_str());
this->setOutput("result", workcell_id);
this->setOutput("transport_task", this->_transport_task);
return BT::NodeStatus::SUCCESS;
}
else
{
RCLCPP_DEBUG(
node->get_logger(), "%s: Transporter [%s] cannot perform task",
this->name().c_str(), workcell_id.c_str());
}
}
}

for (const auto& workcell_id : finished_requests)
{
this->_ongoing_requests.erase(workcell_id);
}

return BT::NodeStatus::RUNNING;
}

}
91 changes: 91 additions & 0 deletions nexus_system_orchestrator/src/dispatch_transporter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (C) 2022 Johnson & Johnson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

#ifndef NEXUS_SYSTEM_ORCHESTRATOR__DISPATCH_TRANSPORTER_HPP
#define NEXUS_SYSTEM_ORCHESTRATOR__DISPATCH_TRANSPORTER_HPP

#include "context.hpp"
#include "session.hpp"

#include <nexus_common/models/work_order.hpp>
#include <nexus_orchestrator_msgs/msg/workcell_task.hpp>

#include <behaviortree_cpp_v3/action_node.h>

#include <rclcpp/rclcpp.hpp>
#include <rclcpp_action/rclcpp_action.hpp>
#include <rclcpp_lifecycle/lifecycle_node.hpp>

#include <nexus_endpoints.hpp>

#include <string>
#include <unordered_map>
#include <vector>

namespace nexus::system_orchestrator {

/**
* Iterates over all the tasks and sends a request to the transporter to move
* the item inbetween workcells.
*
* Input Ports:
* destination |std::string| Destination to transport to.
* Output Ports:
* result |std::string| Id of the transporter assigned.
*/
class DispatchTransporter : public BT::StatefulActionNode
{
public: using IsTaskDoableService =
endpoints::IsTaskDoableService;

public: using WorkcellTask =
nexus_orchestrator_msgs::msg::WorkcellTask;

public: static BT::PortsList providedPorts();

public: DispatchTransporter(const std::string& name,
const BT::NodeConfiguration& config,
rclcpp_lifecycle::LifecycleNode::WeakPtr w_node,
std::shared_ptr<Context> ctx)
: BT::StatefulActionNode{name, config}, _w_node{w_node}, _ctx{std::move(ctx)}
{
}

public: BT::NodeStatus onStart() override;
public: BT::NodeStatus onRunning() override;
public: void onHalted() override;

private: struct OngoingRequest
{
rclcpp::Client<IsTaskDoableService::ServiceType>::SharedPtr client;
rclcpp::Client<IsTaskDoableService::ServiceType>::
FutureAndRequestId fut;
};

private: rclcpp_lifecycle::LifecycleNode::WeakPtr _w_node;
private: std::shared_ptr<Context> _ctx;
private: std::unordered_map<std::string, OngoingRequest> _ongoing_requests;
private: std::chrono::steady_clock::time_point _time_limit;
private: WorkcellTask _transport_task;

private: void _cleanup_pending_requests();
private: BT::NodeStatus _update_ongoing_requests();
};

}

#endif
Loading

0 comments on commit fb67a79

Please sign in to comment.