Action Servers
Overview
ROS2 actions are a three-way communication mechanism for long-running tasks (seconds to minutes):
- Goal (client → server): The objective to achieve
- Feedback (server → client): Progress updates
- Result (server → client): Final outcome
- Cancel (client → server): Cancellation request
PLEM Standard Actions
1. FollowJointTrajectory
Joint-space trajectory execution (standard ROS2 Control action).
Server: joint_trajectory_controller
Action name: /joint_trajectory_controller/follow_joint_trajectory
Package: joint_trajectory_controller (ROS2 Control standard controller)
Launch: plem_bringup/launch/ros2_control.launch.py
Activation: Automatically starts when ros2_control.launch.py is executed
In multi-robot systems, a namespace is added:
/{robot_id}/joint_trajectory_controller/follow_joint_trajectory
Message structure:
# Goal
trajectory_msgs/JointTrajectory trajectory
# Result
int32 error_code
string error_string
# Feedback
trajectory_msgs/JointTrajectoryPoint desired
trajectory_msgs/JointTrajectoryPoint actual
trajectory_msgs/JointTrajectoryPoint error
CLI testing:
# Check action server
ros2 action list | grep follow_joint_trajectory
# Send simple trajectory
# Note: Use actual joint names defined in URDF (e.g., 'joint0', 'joint1' for Indy7)
# Scaffold-generated packages may use 'joint1', 'joint2', etc.
ros2 action send_goal /joint_trajectory_controller/follow_joint_trajectory \
control_msgs/action/FollowJointTrajectory \
"{trajectory: {joint_names: ['joint0', 'joint1', 'joint2', 'joint3', 'joint4', 'joint5'], \
points: [{positions: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0], time_from_start: {sec: 2}}]}}"
2. SetMode
Robot mode control (BRAKED, TRAJECTORY, FREEDRIVE).
Server node: wim_rt_driver
Action name: /wim_rt_driver/set_mode
Package: Included in PLEM runtime (starts automatically)
Launch: Automatically started in plem_bringup/launch/plem_launch.py
In multi-robot systems, a namespace is added:
/{robot_id}/wim_rt_driver/set_mode
Message structure:
# Goal
wim_msgs/RobotMode target_robot_mode # 0=BRAKED, 2=TRAJECTORY, 3=FREEDRIVE (1=STOPPING is internal)
# Result
bool success
string message
# Feedback
wim_msgs/RobotMode current_robot_mode
wim_msgs/SafetyMode current_safety_mode
CLI testing:
# Switch to TRAJECTORY mode (mode: 2)
ros2 action send_goal /wim_rt_driver/set_mode wim_msgs/action/SetMode \
"{target_robot_mode: {mode: 2}}"
# Switch to FREEDRIVE mode (mode: 3)
ros2 action send_goal /wim_rt_driver/set_mode wim_msgs/action/SetMode \
"{target_robot_mode: {mode: 3}}"
# Switch to BRAKED mode (mode: 0)
ros2 action send_goal /wim_rt_driver/set_mode wim_msgs/action/SetMode \
"{target_robot_mode: {mode: 0}}"
3. PlanTrajectory
Motion planning through MoveIt/Pilz.
Server node: PLEM motion planning server
Action name: /plan_trajectory
Package: Included in PLEM runtime (starts automatically when MoveIt is running)
Launch: plem_bringup/launch/moveit.launch.py
Activation: Automatically starts when moveit.launch.py is executed (requires MoveIt motion planner)
In multi-robot systems, a namespace is added:
/{robot_id}/plan_trajectory
Message structure:
# Goal
wim_msgs/MotionTarget[] targets # MOVEJ/MOVEL target sequence
# Result
trajectory_msgs/JointTrajectory trajectory
float64 duration
uint32 waypoint_count
bool success
string message
# Feedback
uint32 current_segment
uint32 total_segments
Python Client Examples
Basic Action Client
#!/usr/bin/env python3
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from wim_msgs.action import SetMode
class ModeControlClient(Node):
def __init__(self):
super().__init__('mode_control_client')
# For multi-robot: use f'/{robot_id}/wim_rt_driver/set_mode'
self._action_client = ActionClient(self, SetMode, '/wim_rt_driver/set_mode')
def send_goal(self, target_mode):
"""
target_mode: 0=BRAKED, 2=TRAJECTORY, 3=FREEDRIVE (1=STOPPING is internal)
"""
goal_msg = SetMode.Goal()
goal_msg.target_robot_mode.mode = target_mode
self._action_client.wait_for_server()
self.get_logger().info(f'Sending goal: mode={target_mode}')
send_goal_future = self._action_client.send_goal_async(
goal_msg, feedback_callback=self.feedback_callback)
send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().error('Goal rejected')
return
self.get_logger().info('Goal accepted')
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
if result.success:
self.get_logger().info(f'Success: {result.message}')
else:
self.get_logger().error(f'Failed: {result.message}')
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info(
f'Feedback: current mode={feedback.current_robot_mode.mode}, '
f'safety mode={feedback.current_safety_mode.safety_mode}'
)
def main(args=None):
rclpy.init(args=args)
client = ModeControlClient()
# Switch to TRAJECTORY mode
client.send_goal(target_mode=2)
rclpy.spin(client)
if __name__ == '__main__':
main()
Trajectory Execution Client
#!/usr/bin/env python3
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from control_msgs.action import FollowJointTrajectory
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from builtin_interfaces.msg import Duration
class TrajectoryClient(Node):
def __init__(self):
super().__init__('trajectory_client')
self._action_client = ActionClient(
self,
FollowJointTrajectory,
'/joint_trajectory_controller/follow_joint_trajectory'
)
def send_trajectory(self, positions_list, durations):
"""
positions_list: List of joint positions (6 values each)
durations: Time to reach each waypoint (seconds)
"""
goal_msg = FollowJointTrajectory.Goal()
trajectory = JointTrajectory()
# Note: Joint names vary by URDF definition
# Indy7 example: 'joint0', 'joint1', etc. (0-indexed, no underscore)
# Scaffold-generated packages: 'joint1', 'joint2', etc. (1-indexed)
trajectory.joint_names = [
'joint0', 'joint1', 'joint2',
'joint3', 'joint4', 'joint5'
]
for positions, duration_sec in zip(positions_list, durations):
point = JointTrajectoryPoint()
point.positions = positions
point.time_from_start = Duration(sec=int(duration_sec))
trajectory.points.append(point)
goal_msg.trajectory = trajectory
self._action_client.wait_for_server()
self.get_logger().info(f'Sending trajectory with {len(positions_list)} waypoints')
send_goal_future = self._action_client.send_goal_async(
goal_msg, feedback_callback=self.feedback_callback)
send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().error('Trajectory rejected')
return
self.get_logger().info('Trajectory execution started')
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
if result.error_code == 0:
self.get_logger().info('Trajectory execution completed')
else:
self.get_logger().error(f'Execution failed: {result.error_string}')
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
# Optional: Log progress
pass
def main(args=None):
rclpy.init(args=args)
client = TrajectoryClient()
# Example: 2-waypoint trajectory
positions_list = [
[0.0, 0.5, 1.0, 0.0, 0.0, 0.0], # Waypoint 1
[0.5, 0.5, 0.5, 0.5, 0.5, 0.5], # Waypoint 2
]
durations = [2.0, 4.0] # 2 seconds, 4 seconds
client.send_trajectory(positions_list, durations)
rclpy.spin(client)
if __name__ == '__main__':
main()
C++ Client Examples
Asynchronous Action Client
#include <rclcpp/rclcpp.hpp>
#include <rclcpp_action/rclcpp_action.hpp>
#include <wim_msgs/action/set_mode.hpp>
class ModeControlClient : public rclcpp::Node {
public:
using SetMode = wim_msgs::action::SetMode;
using GoalHandle = rclcpp_action::ClientGoalHandle<SetMode>;
ModeControlClient() : Node("mode_control_client") {
// For multi-robot: use "/{robot_id}/wim_rt_driver/set_mode"
client_ = rclcpp_action::create_client<SetMode>(this, "/wim_rt_driver/set_mode");
}
void send_goal(int8_t target_mode) {
if (!client_->wait_for_action_server(std::chrono::seconds(5))) {
RCLCPP_ERROR(get_logger(), "Action server unavailable");
return;
}
auto goal = SetMode::Goal();
goal.target_robot_mode.mode = target_mode;
auto send_goal_options = rclcpp_action::Client<SetMode>::SendGoalOptions();
send_goal_options.feedback_callback =
[this](GoalHandle::SharedPtr, const std::shared_ptr<const SetMode::Feedback> feedback) {
RCLCPP_INFO(get_logger(), "Current mode: %d", feedback->current_robot_mode.mode);
};
send_goal_options.result_callback =
[this](const GoalHandle::WrappedResult& result) {
if (result.code == rclcpp_action::ResultCode::SUCCEEDED) {
RCLCPP_INFO(get_logger(), "Success: %s", result.result->message.c_str());
} else {
RCLCPP_ERROR(get_logger(), "Failed");
}
};
client_->async_send_goal(goal, send_goal_options);
}
private:
rclcpp_action::Client<SetMode>::SharedPtr client_;
};
int main(int argc, char** argv) {
rclcpp::init(argc, argv);
auto node = std::make_shared<ModeControlClient>();
node->send_goal(2); // TRAJECTORY mode (0=BRAKED, 2=TRAJECTORY, 3=FREEDRIVE)
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}
Action Cancellation
Python Cancellation Example
class CancellableClient(Node):
def __init__(self):
super().__init__('cancellable_client')
self._action_client = ActionClient(self, FollowJointTrajectory, '/joint_trajectory_controller/follow_joint_trajectory')
self._goal_handle = None
def send_goal(self, trajectory):
send_goal_future = self._action_client.send_goal_async(trajectory)
send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
self._goal_handle = future.result()
if self._goal_handle.accepted:
self.get_logger().info('Goal accepted')
def cancel_goal(self):
if self._goal_handle:
self.get_logger().info('Canceling goal...')
cancel_future = self._goal_handle.cancel_goal_async()
cancel_future.add_done_callback(self.cancel_done_callback)
def cancel_done_callback(self, future):
cancel_response = future.result()
if len(cancel_response.goals_canceling) > 0:
self.get_logger().info('Goal canceled')
else:
self.get_logger().warn('Cancellation failed')
CLI Cancellation
ROS2 Humble CLI does not support canceling actions with a single command. Use the Python example above, or switch the robot mode to BRAKED to stop trajectory execution.
# Stop trajectory by switching mode
ros2 action send_goal /wim_rt_driver/set_mode wim_msgs/action/SetMode "{target_mode: 0}"
Error Handling
Result Codes
| Code | Meaning | Action |
|---|---|---|
SUCCEEDED | Goal achieved | Normal completion |
CANCELED | User canceled | Report cancellation |
ABORTED | Server error | Log error, retry |
UNKNOWN | Unexpected state | Investigation required |
Common Error Scenarios
Goal rejection:
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().error('Goal rejected - check:')
self.get_logger().error('- SafetyMode (must be NORMAL)')
self.get_logger().error('- Parameter ranges')
self.get_logger().error('- Hardware ready state')
return
Execution timeout:
def get_result_callback(self, future):
result = future.result().result
if not result.success and 'timeout' in result.message.lower():
self.get_logger().error('Timeout occurred - attempting recovery')
# Reset mode to BRAKED
# Or check safety state
Safety violation:
# Periodically check safety state
def feedback_callback(self, feedback_msg):
if feedback_msg.feedback.current_safety_mode.safety_mode != 0: # NORMAL
self.get_logger().warn('Safety violation detected - canceling goal')
self.cancel_goal()
Debugging
Check Action Server Status
# Available action servers
ros2 action list
# Action interface info
ros2 action info /wim_rt_driver/set_mode
# Action type definition
ros2 interface show wim_msgs/action/SetMode
Monitor Action Traffic
# Monitor goal sending
ros2 topic echo /wim_rt_driver/set_mode/_action/send_goal
# Monitor feedback
ros2 topic echo /wim_rt_driver/set_mode/_action/feedback
# Monitor results
ros2 topic echo /wim_rt_driver/set_mode/_action/get_result
Logging
# Action server logs
ros2 topic echo /rosout | grep ModeControlActionServer
# Set log level for specific node
ros2 run rqt_console rqt_console
Summary
Key points:
- Standard actions:
FollowJointTrajectory(trajectory execution),SetMode(mode control),PlanTrajectory(motion planning) - Launch structure:
plem_bringup/launch/ros2_control.launch.py: FollowJointTrajectory + SetMode action serversplem_bringup/launch/moveit.launch.py: PlanTrajectory action serverplem_bringup/launch/plem_launch.py: Automatic integration of all components
- Package locations:
- FollowJointTrajectory:
joint_trajectory_controller(ROS2 Control standard) - SetMode/PlanTrajectory:
plem_ros2_adapter(PLEM custom)
- FollowJointTrajectory:
- Invocation methods: CLI (
ros2 action send_goal), Python (ActionClient), C++ (rclcpp_action::Client) - Error handling: Check goal rejection, validate result codes, monitor safety state
- Cancellation:
cancel_goal_async()(Python),async_cancel_goal()(C++) - Debugging:
ros2 action list/info,/rosoutlogs, feedback topics
Next steps:
- ROS2 Launch System - Understanding IPC communication and launch system
- Safety Modes - SafetyMode and RobotMode state management