掌握ROS机器人通信系统架构
掌握动作的原理及其使用场景
掌握如何自定义一个动作
掌握使用C++和Python实现ROS动作服务器和客户端
本实验主要学习以下几方面的内容:
ROS通信架构
ROS动作的原理
自定义一个ROS动作
使用C++和Python实现ROS动作服务器和客户端
1.所需仪器设备
(1)“卓越之星”设备-用于功能调试;
(2)个人PC-用于应用程序的开发与展示;
2.注意事项
(1)个人PC需要安装NoMachine或者VSCode支持远程编程开发;
(2)运动实验尽可能将设备平放在地面上;
(3)实验开始前确保设备电量为满电,实验完成后关闭设备电源;
除了异步消息传递机制主题,同步消息传递机制服务,整体的参数,在导航,机械臂规划等长周期场景,我们需要一种通讯机制来实现对整个流程的响应与控制,动作(action)就出现了。
在 ROS 中,动作(Action)是一种用于处理异步任务的机制。动作可以看做是一种高级的消息类型,它包含了一个目标(Goal)、一个反馈(Feedback)和一个结果(Result)。动作客户端(Action
Client)可以向动作服务器(Action Server)发送一个目标,然后动作服务器会异步地执行这个目标,并向动作客户端发送反馈。一旦目标完成,动作服务器会将结果发送给动作客户端。
动作机制的优点在于可以处理长时间的、需要中途取消的任务,例如机器人导航、抓取物体等。在这些任务中,机器人需要执行一系列的动作,并且可能需要在任务进行过程中调整执行的策略。动作机制可以使机器人能够实时地向操作员报告任务的执行情况,并且可以在任务执行过程中取消任务。
在 ROS 中,动作由两个主要的类组成:actionlib::SimpleActionClient和actionlib::
SimpleActionServer。动作客户端用于发送目标和接收反馈和结果,动作服务器用于接收目标并执行任务,并向动作客户端发送反馈和结果。
动作机制是 ROS 中非常重要的一种机制,如果您需要实现长时间的、需要中途取消的任务,那么动作机制将会是您的不二选择。
与消息和服务类似,在 ROS 中自定义 action 流程如下所示:
首先确定动作的目标、反馈和结果
首先,您需要确定您的动作需要什么样的目标、反馈和结果。例如,如果您要编写一个抓取物体的动作,那么您的动作可能需要一个目标来指定要抓取的物体,一个反馈来报告机器人的抓取进度以及一个结果来报告机器人是否成功抓取物体。
然后创建动作文件,在 ROS 中,动作文件的格式为.action。您可以在功能包下新建一个 action 文件夹,在 action
目录下创建一个新的MyAction.action文件,例如:
# Define the goal
string object_name
---
# Define the result
bool success
---
# Define a feedback message
float32 progress
在上面的示例中,我们定义了一个名为MyAction的动作,该动作包含了一个字符串类型的目标object_name,一个浮点型的进度反馈progress和一个布尔型的结果success。
在 package.xml 文件中添加以下内容:
<build_depend>message_generation</build_depend>
<exec_depend>message_runtime</exec_depend>
这将会告诉 catkin 构建系统,您的包需要使用 ROS 消息生成库。
在 CMakeLists.txt 文件中添加以下内容:
find_package(catkin REQUIRED COMPONENTS
message_generation
actionlib
actionlib_msgs
)
add_action_files(
DIRECTORY action
FILES MyAction.action
)
generate_messages(
DEPENDENCIES
)
catkin_package(
CATKIN_DEPENDS message_runtime actionlib actionlib_msgs
)
这将会告诉 catkin 构建系统,您的包依赖于消息生成库,并且您的服务消息类型定义在 action 文件夹下 MyAction.action文件中。
在您的 C++代码中,您可以使用 class_5_pkg/MyActionAction.h
头文件来引用您自定义的消息类型,class_5_pkg代表包的名称,MyActionAction代表动作文件的文件名称(头文件可以在devel/include/class_5_pkg文件夹中找到)。
在您的 Python 代码中,您可以使用
from class_5_pkg.msg import MyActionAction,MyActionGoal,MyActionResult,MyActionFeedback
来引用您自定义的 action 类型,class_5_pkg 代表包的名称,msg
代表存放动作文件的文件夹名称,MyActionAction,MyActionGoal,MyActionResult,MyActionFeedback代表动作及其相关的 Topic 文件名称(py
引用文件可以在 devel/lib/python3/dist-packages/class_5_pkg/msg 文件夹中找到)。
服务端示例代码:
#include <ros/ros.h>
#include <actionlib/server/simple_action_server.h>
#include <class_5_pkg/MyActionAction.h>
class MyActionServer {
protected:
ros::NodeHandle nh_;
actionlib::SimpleActionServer<class_5_pkg::MyActionAction> as_;
std::string action_name_;
class_5_pkg::MyActionFeedback feedback_;
class_5_pkg::MyActionResult result_;
public:
MyActionServer(std::string name) :
as_(nh_, name, boost::bind(&MyActionServer::executeCB, this, _1), false),
action_name_(name) {
as_.start();
}
~MyActionServer(void) {}
void executeCB(const class_5_pkg::MyActionGoalConstPtr &goal) {
ros::Rate r(1);
bool success = true;
// 执行动作
for (int i = 1; i <= 10; i++) {
if (as_.isPreemptRequested() || !ros::ok()) {
ROS_INFO("%s: Preempted", action_name_.c_str());
as_.setPreempted();
success = false;
break;
}
feedback_.progress = i * 10;
ROS_INFO("%s: Executing, progress = %f", action_name_.c_str(), feedback_.progress);
as_.publishFeedback(feedback_);
r.sleep();
}
// 发送结果
if (success) {
result_.success = true;
ROS_INFO("%s: Succeeded", action_name_.c_str());
as_.setSucceeded(result_);
}
}
};
int main(int argc, char **argv) {
ros::init(argc, argv, "my_action_server");
MyActionServer server("my_action");
ros::spin();
return 0;
}
在上面的示例中,我们首先导入了必要的头文件,可以使用自定义的
action。然后,我们定义了一个名为MyActionServer的类,该类继承自actionlib::SimpleActionServer<my_package::MyAction>
,并实现了一个executeCB()回调函数。
在executeCB()
函数中,我们使用一个循环模拟动作的执行。在每次循环中,我们检查动作是否被取消了,如果是,则我们发送一个setPreempted()
信号;否则,我们更新反馈并发布它。
在循环完成后,如果动作成功完成,则我们设置结果并发送setSucceeded()信号。
在main()函数中,我们初始化了 ROS 节点,并创建了一个名为server的MyActionServer对象。然后,我们使用ros::spin()函数使节点保持运行状态。
客户端示例代码:
#include <ros/ros.h>
#include <actionlib/client/simple_action_client.h>
#include <actionlib/client/terminal_state.h>
#include <class_5_pkg/MyActionAction.h>
int main(int argc, char **argv) {
ros::init(argc, argv, "cre_client");
ros::NodeHandle nh;
// 创建动作客户端
actionlib::SimpleActionClient<class_5_pkg::MyActionAction> client("my_action", true);
// 等待服务器启动
ROS_INFO("Waiting for action server to start...");
client.waitForServer();
// 创建动作目标
class_5_pkg::MyActionGoal goal;
goal.object_name = "world";
// 发送目标并等待结果
ROS_INFO("Sending goal...");
client.sendGoal(goal);
bool finished_before_timeout = client.waitForResult(ros::Duration(30.0));
if (finished_before_timeout) {
actionlib::SimpleClientGoalState state = client.getState();
ROS_INFO("Action finished: %s", state.toString().c_str());
} else {
ROS_INFO("Action did not finish before the time out.");
}
return 0;
}
在上面的示例中,我们首先导入了必要的头文件,使得程序可以使用自定义的 action。然后,我们初始化了 ROS 节点,并创建了一个动作客户端client。
在client创建后,我们使用waitForServer()函数等待动作服务器启动。接下来,我们创建了一个动作目标goal,并设置了 object_name 字段。
然后,我们使用sendGoal()函数向动作服务器发送目标,并使用waitForResult()函数等待结果。如果动作在超时时间内完成,则我们输出结果;否则,我们输出超时信息。
编译运行后的效果:
可以看到上述 action 实现了响应,并反馈进度,最终反馈运行结果。
以下是一个基本的 Python 动作服务器,用于实现上述的动作:
#!/usr/bin/env python3
import rospy
import actionlib
from class_5_pkg.msg import MyActionAction,MyActionGoal,MyActionResult,MyActionFeedback
class MyActionServer(object):
def __init__(self, name):
self._action_name = name
self._as = actionlib.SimpleActionServer(self._action_name, MyActionAction, execute_cb=self.executeCB, auto_start = False)
self._as.start()
self._feedback = MyActionFeedback()
self._result = MyActionResult()
def executeCB(self, goal):
r = rospy.Rate(1)
success = True
# 执行动作
for i in range(1, 11):
if self._as.is_preempt_requested():
rospy.loginfo('{}: Preempted'.format(self._action_name))
self._as.set_preempted()
success = False
break
self._feedback.progress = i * 10
rospy.loginfo('{}: Executing, progress = {}'.format(self._action_name, self._feedback.progress))
self._as.publish_feedback(self._feedback)
r.sleep()
# 发送结果
if success:
self._result.success = True
rospy.loginfo('{}: Succeeded'.format(self._action_name))
self._as.set_succeeded(self._result)
if __name__ == '__main__':
rospy.init_node('my_server')
server = MyActionServer('my_action')
rospy.spin()
在上面的示例中,我们首先导入了必要的包,使得程序可以识别自定义
action。然后,我们定义了一个名为MyActionServer的类,该类继承自actionlib.SimpleActionServer,并实现了一个executeCB()回调函数。
在executeCB()
函数中,我们使用一个循环模拟动作的执行。在每次循环中,我们检查动作是否被取消了,如果是,则我们发送一个set_preempted()
信号;否则,我们更新反馈并发布它。
在循环完成后,如果动作成功完成,则我们设置结果并发送set_succeeded()信号。
在main()函数中,我们初始化了 ROS 节点,并创建了一个名为server的MyActionServer对象。然后,我们使用rospy.spin()函数使节点保持运行状态。
以下是一个基本的 Python 动作客户端,用于命令上述的动作:
#!/usr/bin/env python3
import rospy
import actionlib
from class_5_pkg.msg import MyActionAction,MyActionGoal,MyActionResult,MyActionFeedback
def feedback_cb(feedback):
rospy.loginfo('Progress: {}'.format(feedback.progress))
if __name__ == '__main__':
rospy.init_node('my_client')
client = actionlib.SimpleActionClient('my_action', MyActionAction)
client.wait_for_server()
# 创建动作目标
goal = MyActionGoal()
goal.object_name = 'world'
rospy.loginfo('Sending goal...')
# 发送目标并等待结果
client.send_goal(goal, feedback_cb=feedback_cb)
client.wait_for_result()
# 输出结果
result = client.get_result()
if result.success:
rospy.loginfo('Action succeeded')
else:
rospy.loginfo('Action failed')
在上面的示例中,我们首先导入了必要的包,使得程序可以识别自定义 action。然后,我们初始化了 ROS
节点,并创建了一个名为client的SimpleActionClient对象。
在client创建后,我们使用wait_for_server()函数等待动作服务器启动。接下来,我们创建了一个动作目标goal,并设置了target字段。
然后,我们使用send_goal()函数向动作服务器发送目标,并使用feedback_cb()回调函数接收反馈。我们还使用wait_for_result()函数等待结果。
最后,我们使用get_result()函数获取结果,并根据结果输出相应的信息,效果如下: