본문으로 건너뛰기

Action 서버

개요

ROS2 액션은 장시간 실행 작업(수초~수분)을 위한 3방향 통신 메커니즘입니다:

  • Goal (클라이언트 → 서버): 달성할 목표
  • Feedback (서버 → 클라이언트): 진행 상황 업데이트
  • Result (서버 → 클라이언트): 최종 결과
  • Cancel (클라이언트 → 서버): 중단 요청

PLEM의 표준 액션

1. FollowJointTrajectory

조인트 공간 궤적 실행 (표준 ROS2 Control 액션).

서버: joint_trajectory_controller 액션 이름: /joint_trajectory_controller/follow_joint_trajectory 패키지: joint_trajectory_controller (ROS2 Control 표준 컨트롤러) 런치: plem_bringup/launch/ros2_control.launch.py 활성화: ros2_control.launch.py 실행 시 자동 시작

다중 로봇 환경

다중 로봇 시스템에서는 네임스페이스가 추가됩니다: /{robot_id}/joint_trajectory_controller/follow_joint_trajectory

메시지 구조:

# Goal
trajectory_msgs/JointTrajectory trajectory

# Result
int32 error_code
string error_string

# Feedback
trajectory_msgs/JointTrajectoryPoint desired
trajectory_msgs/JointTrajectoryPoint actual
trajectory_msgs/JointTrajectoryPoint error

CLI 테스트:

# 액션 서버 확인
ros2 action list | grep follow_joint_trajectory

# 간단한 궤적 전송
# 참고: 조인트 이름은 URDF에 정의된 실제 이름 사용 (예: Indy7의 경우 'joint0', 'joint1' 등)
# 스캐폴드로 생성된 패키지는 'joint1', 'joint2' 등을 사용할 수 있음
ros2 action send_goal /joint_trajectory_controller/follow_joint_trajectory \
control_msgs/action/FollowJointTrajectory \
"{trajectory: {joint_names: ['joint0', 'joint1', 'joint2', 'joint3', 'joint4', 'joint5'], \
points: [{positions: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0], time_from_start: {sec: 2}}]}}"

2. SetMode

로봇 모드 제어 (BRAKED, TRAJECTORY, FREEDRIVE).

서버 노드: wim_rt_driver 액션 이름: /wim_rt_driver/set_mode 패키지: PLEM 런타임에 포함 (자동 시작) 런치: plem_bringup/launch/plem_launch.py에서 자동 시작

다중 로봇 환경

다중 로봇 시스템에서는 네임스페이스가 추가됩니다: /{robot_id}/wim_rt_driver/set_mode

메시지 구조:

# Goal
wim_msgs/RobotMode target_robot_mode # 0=BRAKED, 2=TRAJECTORY, 3=FREEDRIVE (1=STOPPING은 내부용)

# Result
bool success
string message

# Feedback
wim_msgs/RobotMode current_robot_mode
wim_msgs/SafetyMode current_safety_mode

CLI 테스트:

# TRAJECTORY 모드로 전환 (mode: 2)
ros2 action send_goal /wim_rt_driver/set_mode wim_msgs/action/SetMode \
"{target_robot_mode: {mode: 2}}"

# FREEDRIVE 모드로 전환 (mode: 3)
ros2 action send_goal /wim_rt_driver/set_mode wim_msgs/action/SetMode \
"{target_robot_mode: {mode: 3}}"

# BRAKED 모드로 전환 (mode: 0)
ros2 action send_goal /wim_rt_driver/set_mode wim_msgs/action/SetMode \
"{target_robot_mode: {mode: 0}}"

3. PlanTrajectory

MoveIt/Pilz를 통한 모션 계획.

서버 노드: PLEM 모션 플래닝 서버 액션 이름: /plan_trajectory 패키지: PLEM 런타임에 포함 (MoveIt 실행 시 자동 시작) 런치: plem_bringup/launch/moveit.launch.py 활성화: moveit.launch.py 실행 시 자동 시작 (MoveIt 모션 플래너 필요)

다중 로봇 환경

다중 로봇 시스템에서는 네임스페이스가 추가됩니다: /{robot_id}/plan_trajectory

메시지 구조:

# Goal
wim_msgs/MotionTarget[] targets # MOVEJ/MOVEL 타겟 시퀀스

# Result
trajectory_msgs/JointTrajectory trajectory
float64 duration
uint32 waypoint_count
bool success
string message

# Feedback
uint32 current_segment
uint32 total_segments

Python 클라이언트 예제

기본 액션 클라이언트

#!/usr/bin/env python3
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from wim_msgs.action import SetMode

class ModeControlClient(Node):
def __init__(self):
super().__init__('mode_control_client')
# 다중 로봇 시: f'/{robot_id}/wim_rt_driver/set_mode' 사용
self._action_client = ActionClient(self, SetMode, '/wim_rt_driver/set_mode')

def send_goal(self, target_mode):
"""
target_mode: 0=BRAKED, 2=TRAJECTORY, 3=FREEDRIVE (1=STOPPING은 내부용)
"""
goal_msg = SetMode.Goal()
goal_msg.target_robot_mode.mode = target_mode

self._action_client.wait_for_server()
self.get_logger().info(f'목표 전송: 모드={target_mode}')

send_goal_future = self._action_client.send_goal_async(
goal_msg, feedback_callback=self.feedback_callback)
send_goal_future.add_done_callback(self.goal_response_callback)

def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().error('목표 거부됨')
return

self.get_logger().info('목표 승인됨')
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self.get_result_callback)

def get_result_callback(self, future):
result = future.result().result
if result.success:
self.get_logger().info(f'성공: {result.message}')
else:
self.get_logger().error(f'실패: {result.message}')

def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info(
f'피드백: 현재 모드={feedback.current_robot_mode.mode}, '
f'안전 모드={feedback.current_safety_mode.safety_mode}'
)

def main(args=None):
rclpy.init(args=args)
client = ModeControlClient()

# TRAJECTORY 모드로 전환
client.send_goal(target_mode=2)
rclpy.spin(client)

if __name__ == '__main__':
main()

궤적 실행 클라이언트

#!/usr/bin/env python3
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from control_msgs.action import FollowJointTrajectory
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from builtin_interfaces.msg import Duration

class TrajectoryClient(Node):
def __init__(self):
super().__init__('trajectory_client')
self._action_client = ActionClient(
self,
FollowJointTrajectory,
'/joint_trajectory_controller/follow_joint_trajectory'
)

def send_trajectory(self, positions_list, durations):
"""
positions_list: 조인트 위치 리스트 (각 6개 값)
durations: 각 웨이포인트 도달 시간 (초)
"""
goal_msg = FollowJointTrajectory.Goal()

trajectory = JointTrajectory()
# 참고: 조인트 이름은 URDF 정의에 따라 다름
# Indy7 예시: 'joint0', 'joint1', etc. (0-indexed, no underscore)
# 스캐폴드 생성 패키지: 'joint1', 'joint2', etc. (1-indexed)
trajectory.joint_names = [
'joint0', 'joint1', 'joint2',
'joint3', 'joint4', 'joint5'
]

for positions, duration_sec in zip(positions_list, durations):
point = JointTrajectoryPoint()
point.positions = positions
point.time_from_start = Duration(sec=int(duration_sec))
trajectory.points.append(point)

goal_msg.trajectory = trajectory

self._action_client.wait_for_server()
self.get_logger().info(f'{len(positions_list)}개 웨이포인트 궤적 전송')

send_goal_future = self._action_client.send_goal_async(
goal_msg, feedback_callback=self.feedback_callback)
send_goal_future.add_done_callback(self.goal_response_callback)

def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().error('궤적 거부됨')
return

self.get_logger().info('궤적 실행 시작')
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self.get_result_callback)

def get_result_callback(self, future):
result = future.result().result
if result.error_code == 0:
self.get_logger().info('궤적 실행 완료')
else:
self.get_logger().error(f'실행 실패: {result.error_string}')

def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
# 선택 사항: 진행 상황 로깅
pass

def main(args=None):
rclpy.init(args=args)
client = TrajectoryClient()

# 예제: 2개 웨이포인트 궤적
positions_list = [
[0.0, 0.5, 1.0, 0.0, 0.0, 0.0], # 웨이포인트 1
[0.5, 0.5, 0.5, 0.5, 0.5, 0.5], # 웨이포인트 2
]
durations = [2.0, 4.0] # 2초, 4초

client.send_trajectory(positions_list, durations)
rclpy.spin(client)

if __name__ == '__main__':
main()

C++ 클라이언트 예제

비동기 액션 클라이언트

#include <rclcpp/rclcpp.hpp>
#include <rclcpp_action/rclcpp_action.hpp>
#include <wim_msgs/action/set_mode.hpp>

class ModeControlClient : public rclcpp::Node {
public:
using SetMode = wim_msgs::action::SetMode;
using GoalHandle = rclcpp_action::ClientGoalHandle<SetMode>;

ModeControlClient() : Node("mode_control_client") {
// 다중 로봇 시: "/{robot_id}/wim_rt_driver/set_mode" 사용
client_ = rclcpp_action::create_client<SetMode>(this, "/wim_rt_driver/set_mode");
}

void send_goal(int8_t target_mode) {
if (!client_->wait_for_action_server(std::chrono::seconds(5))) {
RCLCPP_ERROR(get_logger(), "액션 서버 사용 불가");
return;
}

auto goal = SetMode::Goal();
goal.target_robot_mode.mode = target_mode;

auto send_goal_options = rclcpp_action::Client<SetMode>::SendGoalOptions();

send_goal_options.feedback_callback =
[this](GoalHandle::SharedPtr, const std::shared_ptr<const SetMode::Feedback> feedback) {
RCLCPP_INFO(get_logger(), "현재 모드: %d", feedback->current_robot_mode.mode);
};

send_goal_options.result_callback =
[this](const GoalHandle::WrappedResult& result) {
if (result.code == rclcpp_action::ResultCode::SUCCEEDED) {
RCLCPP_INFO(get_logger(), "성공: %s", result.result->message.c_str());
} else {
RCLCPP_ERROR(get_logger(), "실패");
}
};

client_->async_send_goal(goal, send_goal_options);
}

private:
rclcpp_action::Client<SetMode>::SharedPtr client_;
};

int main(int argc, char** argv) {
rclcpp::init(argc, argv);
auto node = std::make_shared<ModeControlClient>();
node->send_goal(2); // TRAJECTORY 모드 (0=BRAKED, 2=TRAJECTORY, 3=FREEDRIVE)
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}

액션 취소

Python 취소 예제

class CancellableClient(Node):
def __init__(self):
super().__init__('cancellable_client')
self._action_client = ActionClient(self, FollowJointTrajectory, '/joint_trajectory_controller/follow_joint_trajectory')
self._goal_handle = None

def send_goal(self, trajectory):
send_goal_future = self._action_client.send_goal_async(trajectory)
send_goal_future.add_done_callback(self.goal_response_callback)

def goal_response_callback(self, future):
self._goal_handle = future.result()
if self._goal_handle.accepted:
self.get_logger().info('목표 승인됨')

def cancel_goal(self):
if self._goal_handle:
self.get_logger().info('목표 취소 중...')
cancel_future = self._goal_handle.cancel_goal_async()
cancel_future.add_done_callback(self.cancel_done_callback)

def cancel_done_callback(self, future):
cancel_response = future.result()
if len(cancel_response.goals_canceling) > 0:
self.get_logger().info('목표 취소됨')
else:
self.get_logger().warn('취소 실패')

CLI 취소

ROS2 Humble CLI 제약사항

ROS2 Humble CLI에서는 단일 명령으로 액션을 취소할 수 없습니다. 위의 Python 예시를 사용하거나, 로봇 모드를 BRAKED로 전환하여 궤적 실행을 중단하세요.

# 모드 전환으로 궤적 중단
ros2 action send_goal /wim_rt_driver/set_mode wim_msgs/action/SetMode "{target_mode: 0}"

에러 처리

결과 코드

코드의미조치
SUCCEEDED목표 달성정상 완료
CANCELED사용자 취소취소 보고
ABORTED서버 에러에러 로깅, 재시도
UNKNOWN예상치 못한 상태조사 필요

일반적인 에러 시나리오

목표 거부:

def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().error('목표 거부됨 - 원인:')
self.get_logger().error('- SafetyMode 확인 (NORMAL이어야 함)')
self.get_logger().error('- 파라미터 범위 확인')
self.get_logger().error('- 하드웨어 준비 상태 확인')
return

실행 중 타임아웃:

def get_result_callback(self, future):
result = future.result().result
if not result.success and 'timeout' in result.message.lower():
self.get_logger().error('타임아웃 발생 - 복구 시도')
# 모드를 BRAKED로 리셋
# 또는 안전 상태 확인

안전 위반:

# 주기적으로 안전 상태 확인
def feedback_callback(self, feedback_msg):
if feedback_msg.feedback.current_safety_mode.safety_mode != 0: # NORMAL
self.get_logger().warn('안전 위반 감지 - 목표 취소')
self.cancel_goal()

디버깅

액션 서버 상태 확인

# 사용 가능한 액션 서버
ros2 action list

# 액션 인터페이스 정보
ros2 action info /wim_rt_driver/set_mode

# 액션 타입 정의
ros2 interface show wim_msgs/action/SetMode

액션 트래픽 모니터링

# 목표 전송 모니터링
ros2 topic echo /wim_rt_driver/set_mode/_action/send_goal

# 피드백 모니터링
ros2 topic echo /wim_rt_driver/set_mode/_action/feedback

# 결과 모니터링
ros2 topic echo /wim_rt_driver/set_mode/_action/get_result

로깅

# 액션 서버 로그
ros2 topic echo /rosout | grep ModeControlActionServer

# 특정 노드 로그 레벨 설정
ros2 run rqt_console rqt_console

요약

핵심 요점:

  1. 표준 액션: FollowJointTrajectory (궤적 실행), SetMode (모드 제어), PlanTrajectory (모션 계획)
  2. 런치 구조:
    • plem_bringup/launch/ros2_control.launch.py: FollowJointTrajectory + SetMode 액션 서버
    • plem_bringup/launch/moveit.launch.py: PlanTrajectory 액션 서버
    • plem_bringup/launch/plem_launch.py: 모든 컴포넌트 자동 통합
  3. 패키지 위치:
    • FollowJointTrajectory: joint_trajectory_controller (ROS2 Control 표준)
    • SetMode/PlanTrajectory: plem_ros2_adapter (PLEM 커스텀)
  4. 호출 방법: CLI (ros2 action send_goal), Python (ActionClient), C++ (rclcpp_action::Client)
  5. 에러 처리: 목표 거부 확인, 결과 코드 검증, 안전 상태 모니터링
  6. 취소: cancel_goal_async() (Python), async_cancel_goal() (C++)
  7. 디버깅: ros2 action list/info, /rosout 로그, 피드백 토픽

다음 단계: