Skip to content

Commit

Permalink
Move signaling from workcell to system orchestrator
Browse files Browse the repository at this point in the history
Signed-off-by: Luca Della Vedova <[email protected]>
  • Loading branch information
luca-della-vedova committed Dec 4, 2024
1 parent 809c3d1 commit c79195e
Show file tree
Hide file tree
Showing 13 changed files with 151 additions and 27 deletions.
7 changes: 3 additions & 4 deletions nexus_integration_tests/config/system_bts/pick_and_place.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@
<Sequence>
<BidTransporter name="bid_transporter" source="{source}" destination="{destination}" result="{transporter}"/>
<!-- Transporter will signal source / destination when it arrives -->
<TransporterRequest name="transporter_request" transporter="{transporter}" source="{source}" destination="{destination}" signal_source="{place_task}" signal_destination="{pick_task}"/>
<TransporterRequest name="transporter_request" transporter="{transporter}" source="{source}" destination="{destination}" signal="true"/>
</Sequence>
<Sequence>
<!-- Inside workcell behavior tree, have a wait for signal for transporter_at_source -->
<WaitForSignal name="wait_for_amr" signal="transporter_at_source" clear="true"/>
<WorkcellRequest name="workcell_request" workcell="{source}" task="{place_task}"/>
<!-- The transporter will listen and send an appropriate dispenser result for RMF or noop for conveyor belt-->
<SendSignal transporter="{transporter}" signal="place_done"/>
<!-- Inside workcell behavior tree, have a wait for signal for transporter_at_destination -->
<WaitForSignal name="wait_for_amr" signal="transporter_at_destination" clear="true"/>
<WorkcellRequest name="workcell_request" workcell="{destination}" task="{pick_task}"/>
<!-- TODO(luca) we can actually send this earlier, inside the task, as soon as the robot picked up the item -->
<!-- The transporter will listen and send an appropriate ingestor result for RMF or noop for conveyor belt-->
<SendSignal transporter="{transporter}" signal="pick_done"/>
</Sequence>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
<GetJointConstraints trajectory="{dropoff_traj}" joint_constraints="{joint_constraints}" />
-->

<WaitForSignal name="wait_for_pallet" signal="transporter_done" clear="true"/>
<!--
<execute_trajectory.ExecuteTrajectory name="execute_dropoff" trajectory="{dropoff_traj}" />
<gripper.GripperControl name="dropoff_product" gripper="workcell_1_mock_gripper" position="1.0" max_effort="0.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
<root main_tree_to_execute="PickAndPlace">
<BehaviorTree ID="PickAndPlace">
<Sequence name="root_sequence">
<WaitForSignal name="wait_for_pallet" signal="transporter_done" clear="true"/>
<!--
<StringToVector string="productA," delimiter="," strings="{items}"/>
<!--
<detection.DetectAllItems name="detect_items" detector="workcell_2_mock_product_detector" items="{items}" result="{all_products}" />
<detection.GetDetection detections="{all_products}" idx="0" result="{current_product_detection}" />
<detection.GetDetectionPose detection="{current_product_detection}" result="{current_product_pose}" />
Expand Down
2 changes: 1 addition & 1 deletion nexus_system_orchestrator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ add_library(${PROJECT_NAME}_plugin SHARED
src/execute_task_pair.cpp
src/for_each_task.cpp
src/for_each_task_pair.cpp
src/send_signal.cpp
src/signals.cpp
src/system_orchestrator.cpp
src/transporter_request.cpp
src/workcell_request.cpp
Expand Down
5 changes: 2 additions & 3 deletions nexus_system_orchestrator/src/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,9 @@ public: std::vector<std::string> errors;
public: std::string task_results;

/**
* Map of task ids and their queued signals.
* List of queued signals for the system orchestrator.
*/
public: std::unordered_map<std::string,
std::vector<std::string>> queued_signals;
public: std::vector<std::string> queued_signals;
};

}
Expand Down
1 change: 1 addition & 0 deletions nexus_system_orchestrator/src/session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct WorkcellSession
task_doable_client;
rclcpp::Client<endpoints::PauseWorkcellService::ServiceType>::SharedPtr
pause_client;
// TODO(luca) remove this?
std::unique_ptr<common::SyncServiceClient<endpoints::SignalWorkcellService::ServiceType>>
signal_wc_client;
std::unique_ptr<common::SyncServiceClient<endpoints::QueueWorkcellTaskService::ServiceType>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*
*/

#include "send_signal.hpp"
#include "signals.hpp"

#include <nexus_endpoints.hpp>

Expand Down Expand Up @@ -56,6 +56,8 @@ BT::NodeStatus SendSignal::tick()

BT::NodeStatus SendSignal::signal_task(const WorkcellTask& task, const std::string& signal)
{
// TODO(luca) remove this
/*
auto it = std::find_if(
this->_ctx->workcell_task_assignments.cbegin(),
this->_ctx->workcell_task_assignments.cend(),
Expand Down Expand Up @@ -97,6 +99,7 @@ BT::NodeStatus SendSignal::signal_task(const WorkcellTask& task, const std::stri
this->name().c_str(), signal.c_str());
this->_ctx->queued_signals[task.id].emplace_back(signal);
}
*/

return BT::NodeStatus::SUCCESS;
}
Expand Down Expand Up @@ -145,4 +148,68 @@ BT::NodeStatus SendSignal::signal_transporter(const std::string& transporter, co
return BT::NodeStatus::SUCCESS;
}

BT::NodeStatus WaitForSignal::onStart()
{
const auto signal = this->getInput<std::string>("signal");
if (!signal)
{
RCLCPP_ERROR(
this->_ctx->node.get_logger(), "%s: [signal] port is required",
this->name().c_str());
return BT::NodeStatus::FAILURE;
}
auto& signals = this->_ctx->queued_signals;
if (auto signal_it = std::find(signals.begin(), signals.end(), *signal); signal_it != signals.end())
{
RCLCPP_DEBUG(
this->_ctx->node.get_logger(), "%s: signal [%s] already set",
this->name().c_str(), signal->c_str());
auto clear = this->getInput<bool>("clear");
if (clear && *clear)
{
signals.erase(signal_it);
RCLCPP_INFO(
this->_ctx->node.get_logger(), "%s: cleared signal [%s]",
this->name().c_str(), signal->c_str());
}
return BT::NodeStatus::SUCCESS;
}
return BT::NodeStatus::RUNNING;
}

BT::NodeStatus WaitForSignal::onRunning()
{
const auto signal = this->getInput<std::string>("signal");
if (!signal)
{
RCLCPP_ERROR(
this->_ctx->node.get_logger(), "%s: [signal] port is required",
this->name().c_str());
return BT::NodeStatus::FAILURE;
}
auto& signals = this->_ctx->queued_signals;
for (const auto& signal : signals)
{
RCLCPP_INFO(
this->_ctx->node.get_logger(), "%s: Signal available [%s]",
this->name().c_str(), signal.c_str());
}
if (auto signal_it = std::find(signals.begin(), signals.end(), *signal); signal_it != signals.end())
{
RCLCPP_DEBUG(
this->_ctx->node.get_logger(), "%s: signal [%s] already set",
this->name().c_str(), signal->c_str());
auto clear = this->getInput<bool>("clear");
if (clear && *clear)
{
signals.erase(signal_it);
RCLCPP_INFO(
this->_ctx->node.get_logger(), "%s: cleared signal [%s]",
this->name().c_str(), signal->c_str());
}
return BT::NodeStatus::SUCCESS;
}
return BT::NodeStatus::RUNNING;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*
*/

#ifndef NEXUS_SYSTEM_ORCHESTRATOR__SEND_SIGNAL_HPP
#define NEXUS_SYSTEM_ORCHESTRATOR__SEND_SIGNAL_HPP
#ifndef NEXUS_SYSTEM_ORCHESTRATOR__SIGNALS_HPP
#define NEXUS_SYSTEM_ORCHESTRATOR__SIGNALS_HPP

#include "context.hpp"

Expand Down Expand Up @@ -57,6 +57,37 @@ private: BT::NodeStatus signal_transporter(const std::string& transporter, const
private: std::shared_ptr<Context> _ctx;
};

// TODO(luca) this is duplicated with workcell_orchestrator. Remove from there or refactor into a common node?
/**
* Returns RUNNING until a signal is received.
*
* Input Ports:
* signal |std::string| Signal to wait for.
* clear |bool| Set this to true to clear the signal when this node is finished.
*/
class WaitForSignal : public BT::StatefulActionNode
{
public: static BT::PortsList providedPorts()
{
return { BT::InputPort<std::string>("signal", "Signal to wait for."),
BT::InputPort<std::string>("clear",
"Set this to true to clear the signal when this node is finished.") };
}

public: WaitForSignal(const std::string& name,
const BT::NodeConfiguration& config,
std::shared_ptr<Context> ctx)
: BT::StatefulActionNode(name, config), _ctx(std::move(ctx)) {}

public: BT::NodeStatus onStart() override;

public: BT::NodeStatus onRunning() override;

public: void onHalted() override {}

private: std::shared_ptr<Context> _ctx;
};

}

#endif
29 changes: 28 additions & 1 deletion nexus_system_orchestrator/src/system_orchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "for_each_task.hpp"
#include "for_each_task_pair.hpp"
#include "job.hpp"
#include "send_signal.hpp"
#include "signals.hpp"
#include "transporter_request.hpp"
#include "workcell_request.hpp"

Expand Down Expand Up @@ -161,6 +161,27 @@ auto SystemOrchestrator::on_configure(const rclcpp_lifecycle::State& previous)
}
});

// create system orchestrator signaling service
this->_signal_srv =
this->create_service<endpoints::SignalWorkcellService::ServiceType>(
endpoints::SignalWorkcellService::service_name("system_orchestrator"),
[this](endpoints::SignalWorkcellService::ServiceType::Request::
ConstSharedPtr req,
endpoints::SignalWorkcellService::ServiceType::Response::SharedPtr resp)
{
RCLCPP_INFO(this->get_logger(), "Received signal for job %s: %s", req->task_id.c_str(), req->signal.c_str());
auto job_it = this->_jobs.find(req->task_id);
if (job_it == this->_jobs.end())
{
resp->success = false;
resp->message = "Job is not being executed";
RCLCPP_WARN(this->get_logger(), "Job %s is not being executed", req->task_id.c_str());
return;
}
job_it->second.ctx->queued_signals.push_back(req->signal);
resp->success = true;
});

// create action server for work order requests
this->_work_order_srv = rclcpp_action::create_server<WorkOrderActionType>(
this->get_node_base_interface(),
Expand Down Expand Up @@ -524,6 +545,12 @@ BT::Tree SystemOrchestrator::_create_bt(const WorkOrderActionType::Goal& wo,
return std::make_unique<SendSignal>(name, config, ctx);
});

bt_factory->registerBuilder<WaitForSignal>("WaitForSignal",
[ctx](const std::string& name, const BT::NodeConfiguration& config)
{
return std::make_unique<WaitForSignal>(name, config, ctx);
});

return bt_factory->createTreeFromFile(this->_bt_path / "main.xml");
}

Expand Down
2 changes: 2 additions & 0 deletions nexus_system_orchestrator/src/system_orchestrator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ class SystemOrchestrator : public
_wo_states_pub;
rclcpp::Service<endpoints::GetWorkOrderStateService::ServiceType>::SharedPtr
_get_wo_state_srv;
rclcpp::Service<endpoints::SignalWorkcellService::ServiceType>::SharedPtr
_signal_srv;
std::filesystem::path _bt_path;
rclcpp::SubscriptionBase::SharedPtr _cancel_wo_sub;
bool _activated = false;
Expand Down
18 changes: 8 additions & 10 deletions nexus_system_orchestrator/src/transporter_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ BT::PortsList TransporterRequest::providedPorts()
return { BT::InputPort<std::string>("transporter"),
BT::InputPort<std::string>("destination"),
BT::InputPort<std::string>("source"),
BT::InputPort<Task>("signal_destination"),
BT::InputPort<Task>("signal_source")
BT::InputPort<bool>("signal")
};
}

Expand Down Expand Up @@ -60,13 +59,9 @@ BT::NodeStatus TransporterRequest::onStart()
{
this->_source = maybe_source.value();
}
if (auto maybe_signal_source = this->getInput<Task>("signal_source"))
if (auto maybe_signal = this->getInput<bool>("signal"))
{
this->_signal_source = maybe_signal_source.value().id;
}
if (auto maybe_signal_destination = this->getInput<Task>("signal_destination"))
{
this->_signal_destination = maybe_signal_destination.value().id;
this->_signal = maybe_signal.value();
}

return common::ActionClientBtNode<rclcpp_lifecycle::LifecycleNode*,
Expand All @@ -87,8 +82,11 @@ make_goal()
goal.request.requester = this->_node->get_name();
goal.request.destination = this->_destination;
goal.request.source = this->_source;
goal.request.signal_destination = this->_signal_destination;
goal.request.signal_source = this->_signal_source;
if (this->_signal)
{
goal.request.signal_destination = this->_ctx->job_id;
goal.request.signal_source = this->_ctx->job_id;
}
return goal;
}

Expand Down
3 changes: 1 addition & 2 deletions nexus_system_orchestrator/src/transporter_request.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ private: std::shared_ptr<Context> _ctx;
private: std::string _transporter;
private: std::string _destination;
private: std::string _source;
private: std::string _signal_source;
private: std::string _signal_destination;
private: bool _signal = false;
};

}
Expand Down
3 changes: 3 additions & 0 deletions nexus_system_orchestrator/src/workcell_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,12 @@ WorkcellRequest::make_goal()
goal.task = this->_task;
try
{
// TODO(luca) fully remove this
/*
auto& signals = this->_ctx->queued_signals.at(this->_task.id);
goal.start_signals = signals;
signals.clear();
*/
}
catch (const std::out_of_range&)
{
Expand Down

0 comments on commit c79195e

Please sign in to comment.