admin管理员组文章数量:1572326
状态机在ROS中的实现
- 1. 概念
- 2. SMACH的安装
- 3. SMACH的实现
- 3.1 状态机中状态的定义
- 3.2 状态机主函数的编写
- 3.3 完整Demo
- 3.4 状态间数据传输的实现
- 3.5 状态机的嵌套
- 3.6 多状态并行
- 4. 状态机在C++中的实现
- 5. 总结
1. 概念
在ROS中,一般使用SMACH来实现状态机。
SMACH即状态机,它是基于Python实现的一个功能强大且易于扩展的库,SMACH本质上并不依赖于ROS,其可以应用于任意Python项目。
在ROS中,executive_smach功能包把SMACH和ROS完美的集成在了一体,为机器人复杂应用开发提供任务级的状态机框架。
此外,该功能包还集成了actionlib和smach_viewer,用于管理action和状态机的可视化显示。
查看可视化状态机结构:
rosrun smach_viewer smach_viewer.py
2. SMACH的安装
sudo apt-get install ros-kinetic-executive-smach
sudo apt-get install ros-kinetic-executive-smach-visualization
建议:命令行一般不要直接复制,手动敲一敲会避免很多问题的产生,而且不会特别麻烦,只需要打几个字母TAB补充一下即可完成。
3. SMACH的实现
3.1 状态机中状态的定义
# 定义状态Foo
class Foo(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome1','outcome2'])
self.counter = 0
def execute(self, userdata):
rospy.loginfo('Executing state FOO')
if self.counter < 3:
self.counter += 1
return 'outcome1'
else:
return 'outcome2'
# 定义状态Bar
class Bar(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome2'])
def execute(self, userdata):
rospy.loginfo('Executing state BAR')
return 'outcome2'
再强调一次,SMACH是Python的一个库,所以一般用Python来写
从以上代码可以看出,状态的定义都包含初始化__init__
和执行execute
两个函数:
-
初始化函数
初始化函数用来初始化该状态类,调用smach中状态的初始化函数,同事需要定义输出状态:[‘outcome1’,‘outcome2’]此处的outcome代表状态结束时的输出值,使用字符串来表示,每个状态的输出值可有多个,根据不同的输出值可跳转至不同的状态。
注意:初始化函数不能阻塞
-
执行函数
执行函数用于执行每个状态所对应的具体工作,可执行阻塞工作,工作结束后返回定义的输出值,则该状态完成。
3.2 状态机主函数的编写
# 主函数
def main():
rospy.init_node('smach_example_state_machine')
# 创建一个状态机
sm = smach.StateMachine(outcomes=['outcome4', 'outcome5'])
# 打开状态机容器
with sm:
# 使用add方法添加状态到状态机容器当中
smach.StateMachine.add('FOO', Foo(),
transitions={'outcome1':'BAR',
'outcome2':'outcome4'})
smach.StateMachine.add('BAR', Bar(),
transitions={'outcome2':'FOO'})
# 创建并启动内部监测服务器
sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT')
sis.start()
# 开始执行状态机
outcome = sm.execute()
# 等待退出
rospy.spin()
sis.stop()
-
首先初始化ROS节点,然后使用StateMachine创建一个状态机,且制定状态机执行完后的输出值[‘outcome4’, ‘outcome5’]
sm = smach.StateMachine(outcomes=['outcome4', 'outcome5'])
-
使用add()方法添加需要的状态到状态机容器,同时需要设置状态间的跳转关系:
# 把状态FOO添加到状态机容器 # 如果FOO状态执行完输出outcome1,那么跳转到BAR状态; # 如果FOO状态执行完输出outcome2,那么结束此状态机并输出outcome4 smach.StateMachine.add('FOO', Foo(), transitions={'outcome1':'BAR', 'outcome2':'outcome4'})
-
使用
IntrospectionServer()
方法创建内部监测服务器,用于状态机的可视化显示。sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT') # 参数1:观测服务器的名称,可根据需要自由给定 # 参数2:要观察的状态机 # 参数3:状态机的层级(可以用于查看状态机内部的状态机)
-
启动内部监测服务器
sis.start()
-
开始执行状态机,使用
execute()
方法来完成# 开始执行状态机 outcome = sm.execute()
-
执行结束后需要停止内部观测器
sis.stop()
3.3 完整Demo
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import rospy
import smach
import smach_ros
# 定义状态Foo
class Foo(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome1','outcome2'])
self.counter = 0
def execute(self, userdata):
rospy.loginfo('Executing state FOO')
if self.counter < 3:
self.counter += 1
return 'outcome1'
else:
return 'outcome2'
# 定义状态Bar
class Bar(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome2'])
def execute(self, userdata):
rospy.loginfo('Executing state BAR')
return 'outcome2'
# 主函数
def main():
rospy.init_node('smach_example_state_machine')
# 创建一个状态机
sm = smach.StateMachine(outcomes=['outcome4', 'outcome5'])
# 打开状态机容器
with sm:
# 使用add方法添加状态到状态机容器当中
smach.StateMachine.add('FOO', Foo(),
transitions={'outcome1':'BAR',
'outcome2':'outcome4'})
smach.StateMachine.add('BAR', Bar(),
transitions={'outcome2':'FOO'})
# 创建并启动内部监测服务器
sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT')
sis.start()
# 开始执行状态机
outcome = sm.execute()
# 等待退出
rospy.spin()
sis.stop()
if __name__ == '__main__':
main()
- 执行过程如下:
- 状态机开启后首先进入FOO状态
- 然后在该状态中累加counter
- counter<3时输出outcome1,FOO状态结束并跳至BAR状态
- BAR状态没有任何工作的执行,状态结束输出outcome2并跳至FOO状态
- 当counter = 3时,FOO输出outcome2,并跳至outcome4,状态机结束
在整个过程中,outcome5没有涉及状态转移,所以是孤立的
3.4 状态间数据传输的实现
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import rospy
import smach
import smach_ros
# 定义状态Foo
class Foo(smach.State):
def __init__(self):
smach.State.__init__(self,
outcomes=['outcome1','outcome2'],
input_keys=['foo_counter_in'],
output_keys=['foo_counter_out'])
def execute(self, userdata):
rospy.loginfo('Executing state FOO')
if userdata.foo_counter_in < 3:
userdata.foo_counter_out = userdata.foo_counter_in + 1
return 'outcome1'
else:
return 'outcome2'
# 定义状态Bar
class Bar(smach.State):
def __init__(self):
smach.State.__init__(self,
outcomes=['outcome1'],
input_keys=['bar_counter_in'])
def execute(self, userdata):
rospy.loginfo('Executing state BAR')
rospy.loginfo('Counter = %f'%userdata.bar_counter_in)
return 'outcome1'
def main():
rospy.init_node('smach_example_state_machine')
# 创建一个状态机
sm = smach.StateMachine(outcomes=['outcome4'])
sm.userdata.sm_counter = 0
# 打开状态机容器
with sm:
# 使用add方法添加状态到状态机容器当中
# 此处的remapping参数类似于ROS中的remapping重映射机制
# 在FOO状态中,传进的时候将sm_counter映射为foo_counter_in,传出的时候同样将sm_counter映射为foo_counter_out
# 在BAR状态中同上
smach.StateMachine.add('FOO', Foo(),
transitions={'outcome1':'BAR',
'outcome2':'outcome4'},
remapping={'foo_counter_in':'sm_counter',
'foo_counter_out':'sm_counter'})
smach.StateMachine.add('BAR', Bar(),
transitions={'outcome1':'FOO'},
remapping={'bar_counter_in':'sm_counter'})
# 创建并启动内部监测服务器
sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT')
sis.start()
# 开始执行状态机
outcome = sm.execute()
# 等待退出
rospy.spin()
sis.stop()
if __name__ == '__main__':
main()
由上代码可以看到,在状态定义类中状态的初始化smach.State.__init__()
多了两个参数:
input_keys=['foo_counter_in']、output_keys=['foo_counter_out']
此外,状态执行函数execute()
中也多了参数userdata
,这就是存储状态之间所传递数据的容器,FOO状态的输入/输出数据foo_counter_in
和foo_counter_out
就储存在userdata中,因此在执行状态动作时如果访问和修改数据就需要使用userdata.foo_counter_in
和userdata.foo_counter_out
的形式来实现。
值得注意的是,在主函数中,通过remapping重映射参数将变量sm_counter重映射为了一堆名称,这些名称供各个状态机使用,本质上都是对sm_counter进行操作。
3.5 状态机的嵌套
状态机类似于一个容器,对于状态机的嵌套,主要在主函数中来实现,在内层状态机中状态的定义与添加与普通非嵌套的状态机相同,参考以下例子:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import rospy
import smach
import smach_ros
# 定义状态Foo
class Foo(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome1','outcome2'])
self.counter = 0
def execute(self, userdata):
rospy.loginfo('Executing state FOO')
if self.counter < 3:
self.counter += 1
return 'outcome1'
else:
return 'outcome2'
# 定义状态Bar
class Bar(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome1'])
def execute(self, userdata):
rospy.loginfo('Executing state BAR')
return 'outcome1'
# 定义状态Bas
class Bas(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome3'])
def execute(self, userdata):
rospy.loginfo('Executing state BAS')
return 'outcome3'
def main():
rospy.init_node('smach_example_state_machine')
# 创建一个顶层状态机
sm_top = smach.StateMachine(outcomes=['outcome5'])
# 打开状态机容器
with sm_top:
smach.StateMachine.add('BAS', Bas(),
transitions={'outcome3':'SUB'})
# 创建一个内嵌的状态机
sm_sub = smach.StateMachine(outcomes=['outcome4'])
# 打开状态机容器
with sm_sub:
# 使用add方法添加状态到状态机容器当中
smach.StateMachine.add('FOO', Foo(),
transitions={'outcome1':'BAR',
'outcome2':'outcome4'})
smach.StateMachine.add('BAR', Bar(),
transitions={'outcome1':'FOO'})
smach.StateMachine.add('SUB', sm_sub,
transitions={'outcome4':'outcome5'})
# 创建并启动内部监测服务器
sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm_top, '/SM_ROOT')
sis.start()
# 开始执行状态机
outcome = sm_top.execute()
# 等待退出
rospy.spin()
sis.stop()
if __name__ == '__main__':
main()
3.6 多状态并行
对于多状态的并行,其状态的定义与上相同,区别主要还是在于主函数中,通常使用Concurrence()
方法来实现,代码如下:
# default_outcome: 表示该状态机的默认输出为outcome4
# outcome_map: 设置了状态机同步运行的状态跳转
# 当FOO状态输出为outcome2,且BAR状态输出为outcome1时状态机才会输出outcome5.
sm_con = smach.Concurrence(outcomes=['outcome4','outcome5'],
default_outcome='outcome4',
outcome_map={'outcome5':
{ 'FOO':'outcome2',
'BAR':'outcome1'}})
多状态并行Demo代码如下:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import rospy
import smach
import smach_ros
# 定义状态Foo
class Foo(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome1','outcome2'])
self.counter = 0
def execute(self, userdata):
rospy.loginfo('Executing state FOO')
if self.counter < 3:
self.counter += 1
return 'outcome1'
else:
return 'outcome2'
# 定义状态Bar
class Bar(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome1'])
def execute(self, userdata):
rospy.loginfo('Executing state BAR')
return 'outcome1'
# 定义状态Bas
class Bas(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome3'])
def execute(self, userdata):
rospy.loginfo('Executing state BAS')
return 'outcome3'
def main():
rospy.init_node('smach_example_state_machine')
# 创建一个顶层状态机
sm_top = smach.StateMachine(outcomes=['outcome6'])
# 打开状态机容器
with sm_top:
smach.StateMachine.add('BAS', Bas(),
transitions={'outcome3':'CON'})
# 创建一个内嵌的状态机
sm_con = smach.Concurrence(outcomes=['outcome4','outcome5'],
default_outcome='outcome4',
outcome_map={'outcome5':
{ 'FOO':'outcome2',
'BAR':'outcome1'}})
# 打开状态机容器
with sm_con:
# 使用add方法添加状态到状态机容器当中
smach.Concurrence.add('FOO', Foo())
smach.Concurrence.add('BAR', Bar())
smach.StateMachine.add('CON', sm_con,
transitions={'outcome4':'CON',
'outcome5':'outcome6'})
# 创建并启动内部监测服务器
sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm_top, '/SM_ROOT')
sis.start()
# 开始执行状态机
outcome = sm_top.execute()
# 等待退出
rospy.spin()
sis.stop()
if __name__ == '__main__':
main()
4. 状态机在C++中的实现
一般通过while()
循环中的switch-case
结构来实现。
具体请参见本博客博文《状态机在运动控制中的应用》。
5. 总结
状态机是一种非常好的建模工具,能把复杂的逻辑简化,利于复杂程序逻辑的编写。
非常适用于结构化,尤其涉及应用任务级的任务,但是如果仅仅为了拆分模块,不建议适用状态机,因为状态机相比原程序会平白无故的添加很多判断语句,从而造成执行效率低下的情况。
编不下去了,总之状态机这玩意儿就是好用……
参考文献:
[1]胡春旭.ROS机器人开发实践[M].机械工业出版社:北京,2018:414.
版权声明:本文标题:ROS学习笔记8 —— 状态机在ROS中的实现 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://www.elefans.com/xitong/1727724642a1127078.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论