### TriorbSetPathのactionクライアント
参考：https://docs.ros.org/en/foxy/Tutorials/Intermediate/Writing-an-Action-Server-Client/Py.html

In [1]:
import shutil, os
import pandas as pd
import numpy as np
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import TransformStamped
import copy
from std_msgs.msg import Empty
from triorb_drive_interface.msg import TriorbSetPos3 as TriorbSetPos3Topic
from triorb_drive_interface.msg import TriorbRunResult
from triorb_drive_interface.msg import DriveGains
from scipy.spatial.transform import Rotation as R

In [2]:
def quaternion_to_euler_zyx(q):
    r = R.from_quat([q[0], q[1], q[2], q[3]])
    return r.as_euler('zyx', degrees=False)

def get_topic_name(topic_name_raw):
    return (os.getenv('ROS_PREFIX', '')+'/'+topic_name_raw).replace("//","/")

In [3]:
waypoint_list = pd.DataFrame(index=[], columns=["x","y","deg","v_xy","v_w","tx","ty","tr"])

### クライアントノード実体

In [16]:
from rclpy.action import ActionClient
from triorb_drive_interface.action import TriorbSetPath
from triorb_drive_interface.msg import TriorbSetPos3

class SetPathActionClient(Node):

    def __init__(self):
        super().__init__('act_test')
        self._action_client = ActionClient(self, TriorbSetPath, '/action/drive/set_path')

    def send_goal(self, path_list):
        goal_msg = TriorbSetPath.Goal()
        goal_msg.path = path_list
        self.current_way = 0

        for _ in goal_msg.path:
            print(_)

        self._action_client.wait_for_server()
        self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
        self._send_goal_future.add_done_callback(self.goal_response_callback)


    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            rclpy.shutdown()
            return

        self.get_logger().info('Goal accepted :)')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.result))
        rclpy.shutdown()

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        if self.current_way == feedback.way_idx:
            print('route{}: pos({:.4f},{:.4f},{:.4f})'.format(feedback.way_idx, feedback.now.x, feedback.now.y, feedback.now.deg), end="          \r")
        else:
            print("")
        self.current_way = feedback.way_idx


### waypointをactionに変換し実行する関数定義

In [5]:
STD_ACC = 1000
STD_DEC = 1000
def run_action( waypoints ):
    try:
        rclpy.init()
    except:
        pass
    action_client = SetPathActionClient()

    path_list = []
    for idx, point in waypoints.iterrows():
        request = TriorbSetPos3Topic()
        request.pos.position.x = point.x
        request.pos.position.y = point.y
        request.pos.position.deg = point.deg
        request.pos.speed.xy = point.v_xy
        request.pos.speed.w = point.v_w
        request.pos.speed.acc = STD_ACC
        request.pos.speed.dec = STD_DEC
        request.setting.tx = point.tx
        request.setting.ty = point.ty
        request.setting.tr = point.tr
        request.setting.force = 1

        path_list.append(request)

    future = action_client.send_goal(path_list)
    rclpy.spin(action_client)



## Waypointを登録

In [13]:
try:
    rclpy.init()
except:
    pass

class SampleNodeClient(Node):
    def __init__(self):
        super().__init__('sample_node_notebook')
        self.timer = self.create_timer(5.0, self.callback_timer)
        self.subs = [self.create_subscription(TransformStamped, get_topic_name('/vslam/rig_tf'), self.callback_rig_tf, 10)]

    def destroy(self):
        _ = [self.destroy_subscription(_sub) for _sub in self.subs]
        self.destroy_node()
    
    def callback_timer(self):
        print("waiting for the massage")

    def callback_rig_tf(self, msg):
        global waypoint_list
        #print(str(msg))
        _x, _y = msg.transform.translation.x, msg.transform.translation.y
        _yaw, _pitch, _roll  = quaternion_to_euler_zyx([msg.transform.rotation.x, msg.transform.rotation.y, msg.transform.rotation.z, msg.transform.rotation.w])
        _df = pd.DataFrame([[_x, _y, -np.rad2deg(_yaw), 0.1, 3.14/2.0, 0.005, 0.005, 0.5, 1.0]], columns=["x","y","deg","v_xy","v_w","tx","ty","tr","sleep"])
        #_df = pd.DataFrame([[_x, _y, -np.rad2deg(_yaw), 0.5, 3.14, 0.05, 0.05, 1.0, 0.0]], columns=["x","y","deg","v_xy","v_w","tx","ty","tr","sleep"])

        waypoint_list = pd.concat([waypoint_list, _df]).reset_index(drop=True)
        raise KeyboardInterrupt
        
node = SampleNodeClient()
try:
    rclpy.spin(node)
except KeyboardInterrupt:
    print("KeyboardInterrupt")
node.destroy()
if waypoint_list.shape[0]:
    print(waypoint_list)

KeyboardInterrupt
   Unnamed: 0         x         y       deg  v_xy   v_w     tx     ty   tr  \
0         0.0  0.116607 -0.103098  0.141730   0.1  1.57  0.005  0.005  0.5   
1         NaN  0.102300 -0.461652  1.770067   0.1  1.57  0.005  0.005  0.5   
2         NaN  0.174298 -0.037320  0.913492   0.1  1.57  0.005  0.005  0.5   
3         NaN  0.181490 -0.040865  0.252377   0.1  1.57  0.005  0.005  0.5   
4         NaN  0.106763 -0.191495  0.628768   0.1  1.57  0.005  0.005  0.5   

   sleep  
0    1.0  
1    1.0  
2    1.0  
3    1.0  
4    1.0  


## 実行

In [15]:
run_action(waypoint_list)

triorb_drive_interface.msg.TriorbSetPos3(pos=triorb_drive_interface.msg.TriorbRunPos3(speed=triorb_drive_interface.msg.TriorbSpeed(acc=1000, dec=1000, xy=0.1, w=1.57), position=triorb_drive_interface.msg.TriorbPos3(x=0.1166073499593636, y=-0.1030979217945752, deg=0.1417295658673515)), setting=triorb_drive_interface.msg.TriorbRunSetting(tx=0.005, ty=0.005, tr=0.5, force=1))
triorb_drive_interface.msg.TriorbSetPos3(pos=triorb_drive_interface.msg.TriorbRunPos3(speed=triorb_drive_interface.msg.TriorbSpeed(acc=1000, dec=1000, xy=0.1, w=1.57), position=triorb_drive_interface.msg.TriorbPos3(x=0.10229976082432561, y=-0.46165202450769927, deg=1.770066593113423)), setting=triorb_drive_interface.msg.TriorbRunSetting(tx=0.005, ty=0.005, tr=0.5, force=1))
triorb_drive_interface.msg.TriorbSetPos3(pos=triorb_drive_interface.msg.TriorbRunPos3(speed=triorb_drive_interface.msg.TriorbSpeed(acc=1000, dec=1000, xy=0.1, w=1.57), position=triorb_drive_interface.msg.TriorbPos3(x=0.17429823073666423, y=-0.0373

[INFO] [1716979326.190435397] [act_test]: Goal accepted :)


route0: pos:(0.1151,-0.1070,0.3868)          
route1: pos:(0.1023,-0.4614,1.2881)          
route2: pos:(0.1727,-0.0414,0.9742)          
route3: pos:(0.1793,-0.0409,0.7188)          
route4: pos:(0.1090,-0.1859,0.5183)          

[INFO] [1716979350.835705500] [act_test]: Result: triorb_drive_interface.msg.TriorbRunResult(success=True, position=triorb_drive_interface.msg.TriorbPos3(x=0.108827143907547, y=-0.18873511254787445, deg=0.5082088708877563))


In [7]:
waypoint_list.to_csv("./waypoint.csv")

In [8]:
waypoint_list = pd.read_csv("./waypoint.csv")