admin管理员组

文章数量:1572326

状态机在ROS中的实现

  • 1. 概念
  • 2. SMACH的安装
  • 3. SMACH的实现
    • 3.1 状态机中状态的定义
    • 3.2 状态机主函数的编写
    • 3.3 完整Demo
    • 3.4 状态间数据传输的实现
    • 3.5 状态机的嵌套
    • 3.6 多状态并行
  • 4. 状态机在C++中的实现
  • 5. 总结

1. 概念

在ROS中,一般使用SMACH来实现状态机。

SMACH即状态机,它是基于Python实现的一个功能强大且易于扩展的库,SMACH本质上并不依赖于ROS,其可以应用于任意Python项目。

在ROS中,executive_smach功能包把SMACH和ROS完美的集成在了一体,为机器人复杂应用开发提供任务级的状态机框架。

此外,该功能包还集成了actionlib和smach_viewer,用于管理action和状态机的可视化显示。

查看可视化状态机结构:

rosrun smach_viewer smach_viewer.py

2. SMACH的安装

sudo apt-get install ros-kinetic-executive-smach
sudo apt-get install ros-kinetic-executive-smach-visualization

建议:命令行一般不要直接复制,手动敲一敲会避免很多问题的产生,而且不会特别麻烦,只需要打几个字母TAB补充一下即可完成。

3. SMACH的实现

3.1 状态机中状态的定义

# 定义状态Foo
class Foo(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1','outcome2'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state FOO')
        if self.counter < 3:
            self.counter += 1
            return 'outcome1'
        else:
            return 'outcome2'

# 定义状态Bar
class Bar(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome2'])

    def execute(self, userdata):
        rospy.loginfo('Executing state BAR')
        return 'outcome2'

再强调一次,SMACH是Python的一个库,所以一般用Python来写

从以上代码可以看出,状态的定义都包含初始化__init__和执行execute两个函数:

  1. 初始化函数
    初始化函数用来初始化该状态类,调用smach中状态的初始化函数,同事需要定义输出状态:[‘outcome1’,‘outcome2’]

    此处的outcome代表状态结束时的输出值,使用字符串来表示,每个状态的输出值可有多个,根据不同的输出值可跳转至不同的状态。

    注意:初始化函数不能阻塞

  2. 执行函数
    执行函数用于执行每个状态所对应的具体工作,可执行阻塞工作,工作结束后返回定义的输出值,则该状态完成。

3.2 状态机主函数的编写

# 主函数
def main():
    rospy.init_node('smach_example_state_machine')

    # 创建一个状态机
    sm = smach.StateMachine(outcomes=['outcome4', 'outcome5'])

    # 打开状态机容器
    with sm:
        # 使用add方法添加状态到状态机容器当中
        smach.StateMachine.add('FOO', Foo(), 
                               transitions={'outcome1':'BAR', 
                                            'outcome2':'outcome4'})
        smach.StateMachine.add('BAR', Bar(), 
                               transitions={'outcome2':'FOO'})
    
    # 创建并启动内部监测服务器
    sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT')
    sis.start()
    
    # 开始执行状态机
    outcome = sm.execute()
    
    # 等待退出
    rospy.spin()
    sis.stop()
  1. 首先初始化ROS节点,然后使用StateMachine创建一个状态机,且制定状态机执行完后的输出值[‘outcome4’, ‘outcome5’]

    sm = smach.StateMachine(outcomes=['outcome4', 'outcome5'])
    
  2. 使用add()方法添加需要的状态到状态机容器,同时需要设置状态间的跳转关系:

    # 把状态FOO添加到状态机容器
    # 如果FOO状态执行完输出outcome1,那么跳转到BAR状态;
    # 如果FOO状态执行完输出outcome2,那么结束此状态机并输出outcome4
    
    smach.StateMachine.add('FOO', Foo(),  transitions={'outcome1':'BAR',   'outcome2':'outcome4'})
    
  3. 使用IntrospectionServer()方法创建内部监测服务器,用于状态机的可视化显示。

    sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT')
    # 参数1:观测服务器的名称,可根据需要自由给定
    # 参数2:要观察的状态机
    # 参数3:状态机的层级(可以用于查看状态机内部的状态机)
    
  4. 启动内部监测服务器

     sis.start()
    
  5. 开始执行状态机,使用execute()方法来完成

        # 开始执行状态机
        outcome = sm.execute()
    
  6. 执行结束后需要停止内部观测器

        sis.stop()
    

3.3 完整Demo

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import rospy
import smach
import smach_ros

# 定义状态Foo
class Foo(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1','outcome2'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state FOO')
        if self.counter < 3:
            self.counter += 1
            return 'outcome1'
        else:
            return 'outcome2'

# 定义状态Bar
class Bar(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome2'])

    def execute(self, userdata):
        rospy.loginfo('Executing state BAR')
        return 'outcome2'

# 主函数
def main():
    rospy.init_node('smach_example_state_machine')

    # 创建一个状态机
    sm = smach.StateMachine(outcomes=['outcome4', 'outcome5'])

    # 打开状态机容器
    with sm:
        # 使用add方法添加状态到状态机容器当中
        smach.StateMachine.add('FOO', Foo(), 
                               transitions={'outcome1':'BAR', 
                                            'outcome2':'outcome4'})
        smach.StateMachine.add('BAR', Bar(), 
                               transitions={'outcome2':'FOO'})
    
    # 创建并启动内部监测服务器
    sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT')
    sis.start()
    
    # 开始执行状态机
    outcome = sm.execute()
    
    # 等待退出
    rospy.spin()
    sis.stop()

if __name__ == '__main__':
    main()

  • 执行过程如下:
    1. 状态机开启后首先进入FOO状态
    2. 然后在该状态中累加counter
    3. counter<3时输出outcome1,FOO状态结束并跳至BAR状态
    4. BAR状态没有任何工作的执行,状态结束输出outcome2并跳至FOO状态
    5. 当counter = 3时,FOO输出outcome2,并跳至outcome4,状态机结束

      在整个过程中,outcome5没有涉及状态转移,所以是孤立的

3.4 状态间数据传输的实现

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import rospy
import smach
import smach_ros

# 定义状态Foo
class Foo(smach.State):
    def __init__(self):
        smach.State.__init__(self, 
                             outcomes=['outcome1','outcome2'],
                             input_keys=['foo_counter_in'],
                             output_keys=['foo_counter_out'])

    def execute(self, userdata):
        rospy.loginfo('Executing state FOO')
        if userdata.foo_counter_in < 3:
            userdata.foo_counter_out = userdata.foo_counter_in + 1
            return 'outcome1'
        else:
            return 'outcome2'


# 定义状态Bar
class Bar(smach.State):
    def __init__(self):
        smach.State.__init__(self, 
                             outcomes=['outcome1'],
                             input_keys=['bar_counter_in'])
        
    def execute(self, userdata):
        rospy.loginfo('Executing state BAR')
        rospy.loginfo('Counter = %f'%userdata.bar_counter_in)        
        return 'outcome1'
        

def main():
    rospy.init_node('smach_example_state_machine')

    # 创建一个状态机
    sm = smach.StateMachine(outcomes=['outcome4'])
    sm.userdata.sm_counter = 0

    # 打开状态机容器
    with sm:
        # 使用add方法添加状态到状态机容器当中
        # 此处的remapping参数类似于ROS中的remapping重映射机制
        # 在FOO状态中,传进的时候将sm_counter映射为foo_counter_in,传出的时候同样将sm_counter映射为foo_counter_out
        # 在BAR状态中同上
        smach.StateMachine.add('FOO', Foo(), 
                               transitions={'outcome1':'BAR', 
                                            'outcome2':'outcome4'},
                               remapping={'foo_counter_in':'sm_counter', 
                                          'foo_counter_out':'sm_counter'})
        smach.StateMachine.add('BAR', Bar(), 
                               transitions={'outcome1':'FOO'},
                               remapping={'bar_counter_in':'sm_counter'})

    # 创建并启动内部监测服务器
    sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT')
    sis.start()
    
    # 开始执行状态机
    outcome = sm.execute()
    
    # 等待退出
    rospy.spin()
    sis.stop()

if __name__ == '__main__':
    main()

由上代码可以看到,在状态定义类中状态的初始化smach.State.__init__()多了两个参数:
input_keys=['foo_counter_in']、output_keys=['foo_counter_out']

此外,状态执行函数execute()中也多了参数userdata,这就是存储状态之间所传递数据的容器,FOO状态的输入/输出数据foo_counter_in和foo_counter_out就储存在userdata中,因此在执行状态动作时如果访问和修改数据就需要使用userdata.foo_counter_inuserdata.foo_counter_out的形式来实现。

值得注意的是,在主函数中,通过remapping重映射参数将变量sm_counter重映射为了一堆名称,这些名称供各个状态机使用,本质上都是对sm_counter进行操作。

3.5 状态机的嵌套

状态机类似于一个容器,对于状态机的嵌套,主要在主函数中来实现,在内层状态机中状态的定义与添加与普通非嵌套的状态机相同,参考以下例子:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import rospy
import smach
import smach_ros

# 定义状态Foo
class Foo(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1','outcome2'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state FOO')
        if self.counter < 3:
            self.counter += 1
            return 'outcome1'
        else:
            return 'outcome2'

# 定义状态Bar
class Bar(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1'])

    def execute(self, userdata):
        rospy.loginfo('Executing state BAR')
        return 'outcome1'

# 定义状态Bas
class Bas(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome3'])

    def execute(self, userdata):
        rospy.loginfo('Executing state BAS')
        return 'outcome3'


def main():
    rospy.init_node('smach_example_state_machine')

    # 创建一个顶层状态机
    sm_top = smach.StateMachine(outcomes=['outcome5'])
    
    # 打开状态机容器
    with sm_top:

        smach.StateMachine.add('BAS', Bas(),
                               transitions={'outcome3':'SUB'})

        # 创建一个内嵌的状态机
        sm_sub = smach.StateMachine(outcomes=['outcome4'])

        # 打开状态机容器
        with sm_sub:

            # 使用add方法添加状态到状态机容器当中
            smach.StateMachine.add('FOO', Foo(), 
                                   transitions={'outcome1':'BAR', 
                                                'outcome2':'outcome4'})
            smach.StateMachine.add('BAR', Bar(), 
                                   transitions={'outcome1':'FOO'})

        smach.StateMachine.add('SUB', sm_sub,
                               transitions={'outcome4':'outcome5'})

    # 创建并启动内部监测服务器
    sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm_top, '/SM_ROOT')
    sis.start()
    
    # 开始执行状态机
    outcome = sm_top.execute()
    
    # 等待退出
    rospy.spin()
    sis.stop()

if __name__ == '__main__':
    main()

3.6 多状态并行

对于多状态的并行,其状态的定义与上相同,区别主要还是在于主函数中,通常使用Concurrence()方法来实现,代码如下:

# default_outcome:	表示该状态机的默认输出为outcome4
# outcome_map:		设置了状态机同步运行的状态跳转
# 当FOO状态输出为outcome2,且BAR状态输出为outcome1时状态机才会输出outcome5.
sm_con = smach.Concurrence(outcomes=['outcome4','outcome5'],
                                   default_outcome='outcome4',
                                   outcome_map={'outcome5':
                                       { 'FOO':'outcome2',
                                         'BAR':'outcome1'}})

多状态并行Demo代码如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import rospy
import smach
import smach_ros

# 定义状态Foo
class Foo(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1','outcome2'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state FOO')
        if self.counter < 3:
            self.counter += 1
            return 'outcome1'
        else:
            return 'outcome2'

# 定义状态Bar
class Bar(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1'])

    def execute(self, userdata):
        rospy.loginfo('Executing state BAR')
        return 'outcome1'
        
# 定义状态Bas
class Bas(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome3'])

    def execute(self, userdata):
        rospy.loginfo('Executing state BAS')
        return 'outcome3'

def main():
    rospy.init_node('smach_example_state_machine')

    # 创建一个顶层状态机
    sm_top = smach.StateMachine(outcomes=['outcome6'])
    
    # 打开状态机容器
    with sm_top:

        smach.StateMachine.add('BAS', Bas(),
                               transitions={'outcome3':'CON'})

        # 创建一个内嵌的状态机
        sm_con = smach.Concurrence(outcomes=['outcome4','outcome5'],
                                   default_outcome='outcome4',
                                   outcome_map={'outcome5':
                                       { 'FOO':'outcome2',
                                         'BAR':'outcome1'}})

        # 打开状态机容器
        with sm_con:
            # 使用add方法添加状态到状态机容器当中
            smach.Concurrence.add('FOO', Foo())
            smach.Concurrence.add('BAR', Bar())

        smach.StateMachine.add('CON', sm_con,
                               transitions={'outcome4':'CON',
                                            'outcome5':'outcome6'})

    # 创建并启动内部监测服务器
    sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm_top, '/SM_ROOT')
    sis.start()

    # 开始执行状态机
    outcome = sm_top.execute()

    # 等待退出
    rospy.spin()
    sis.stop()

if __name__ == '__main__':
    main()

4. 状态机在C++中的实现

一般通过while()循环中的switch-case结构来实现。

具体请参见本博客博文《状态机在运动控制中的应用》。

5. 总结

状态机是一种非常好的建模工具,能把复杂的逻辑简化,利于复杂程序逻辑的编写。

非常适用于结构化,尤其涉及应用任务级的任务,但是如果仅仅为了拆分模块,不建议适用状态机,因为状态机相比原程序会平白无故的添加很多判断语句,从而造成执行效率低下的情况。

编不下去了,总之状态机这玩意儿就是好用……


参考文献:
[1]胡春旭.ROS机器人开发实践[M].机械工业出版社:北京,2018:414.

本文标签: 学习笔记状态机ros