admin管理员组

文章数量:1572325

SMACH

BehaviorTree

一、简述

1.1 状态机简介

有限状态机(Finite-state machine, FSM),又称有限状态自动机,简称状态机,是表示有限个状态以及在这些状态之间的转移和动作等行为的数学模型。FSM是一种算法思想,简单而言,有限状态机由一组状态、一个初始状态、输入和根据输入及现有状态转换为下一个状态的转换函数组成。


Smach代表"状态机",它是一种基于python的强大的、可伸缩的分级状态机库。Smach库不依赖于ROS,并且可以在任何Python项目中使用。executive_smach堆栈提供了与ROS非常好的集成,包括平滑actionlib集成和强大的Smach查看器来可视化和内部化状态机。

1.2 状态机的应用

  • 适合使用状态机

    当你希望机器人执行一些复杂的计划时,SMACH是有用的,其中所有可能的状态和状态转换都可以被明确地描述。这基本上是将不同模块组合成为一个系统,就像操作移动机器人一样这么有趣。

    • 基于快速原型: 基于Python的SMACH语法很明确,简便快捷的制造一个原型状态机并运行它。
    • 复杂的状态机: SMACH允许你设计、维护和调试大型、复杂的层次状态机。你可以在这里找到一个复杂的层次状态机例子。
    • 内省: SMACH让你充分内省状态机、状态转换和数据流等。查看smach_viewer获得更多细节。
  • 不适合使用状态机

    • 非结构化的任务: 对于你的非结构化任务,SMACH将调度不足。
    • 低级系统: 对于要求高效率的低级系统,SMACH并不意味着可以作为一个状态机使用,SMACH是一个任务级体系。
    • Smash: 当你想要使用smash的时候,不要使用SMACH,这时候使用smash.

二、状态机详解

2.1 概念和术语


状态存储关于过去的信息,就是说:它反映从系统开始到现在时刻的输入变化。转移指示状态变更,并且用必须满足确使转移发生的条件来描述它。动作是在给定时刻要进行的活动的描述。有多种类型的动作:

  • 进入动作(entry action):在进入状态时进行
  • 退出动作:在退出状态时进行
  • 输入动作:依赖于当前状态和输入条件进行
  • 转移动作:在进行特定转移时进行

FSM(有限状态机)可以使用上图那样的状态图(或状态转移图)来表示。此外可以使用多种类型的状态转移表。下面展示最常见的表示:当前状态(B)和条件(Y)的组合指示出下一个状态(C)。完整的动作信息可以只使用脚注来增加。包括完整动作信息的FSM定义可以使用状态表。

2.2 特征

  • 状态总数(state)是有限的。
  • 任一时刻,只处在一种状态之中。
  • 某种条件下,会从一种状态转变(transition)到另一种状态。

2.3 重要概念

  • 状态(State):表示对象的某种形态,在当前形态下可能会拥有不同的行为和属性。
  • 转移(Transition):表示状态变更,并且必须满足确使转移发生的条件来执行。
  • 动作(Action):表示在给定时刻要进行的活动。
  • 事件(Event):事件通常会引起状态的变迁,促使状态机从一种状态切换到另一种状态。

三、smach的使用

  • Containers(容器):包含状态
    • StateMachine容器 基本
    • Concurrence容器 并行
    • Sequence容器
    • Iterator容器
    • 用actionlib封装容器 构建action service
  • states(状态):基本操作单元
    • state 通用状态
    • CBState
    • SimpleActionState (ROS) ros action
    • ServiceState (ROS) ros Service
    • MonitorState (ROS) ros topic

3.1 容器

StateMachine
from smach import State,StateMachine

# define state Foo
# 继承通用状态,构建Foo状态
class Foo(State):
    def __init__(self):
        State.__init__(self, 
                       outcomes=['outcome1','outcome2'],
                       input_keys=['foo_counter_in'],
                       output_keys=['foo_counter_out'])
        self.counter = 0
	
    # 响应调用的函数
    def execute(self, userdata):
        rospy.loginfo('Executing state FOO')
        if self.counter < 3:
            self.counter += 1
            return 'outcome1'
        else:
            return 'outcome2'
        
# define state Bar
class Bar(State):
    def __init__(self):
        State.__init__(self, outcomes=['outcome1'])

    def execute(self, userdata):
        rospy.loginfo('Executing state BAR')
        return 'outcome1'

class main():
        rospy.init_node('smach_example_state_machine')

    # Create a SMACH state machine
    sm = StateMachine(outcomes=[])

    # Open the container
    with sm:
        # Add states to the container
        StateMachine.add('FOO', 
                          Foo(), 
                          transitions={'outcome1':'BAR', 
                                       'outcome2':'outcome4'},
                          remapping={'foo_counter_in':'sm_counter', 
                                     'foo_counter_out':'sm_counter'})
        StateMachine.add('BAR', 
                         Bar(), 
                         transitions={'outcome1':'FOO'})

        # 内部监视器
        intro_server = IntrospectionServer('sm_IntrospectionServer', sm, '/SM_ROOT')
        intro_server.start()

        # Execute the state machine
        sm_outcome = sm_vison_work.execute()

        rospy.spin()

        intro_server.stop()

说明

  1. 初始化

    __init__(self, outcomes, input_keys=[], output_keys=[])

    • outcomes,转移
    • input_keys,输入值
    • output_keys,输出值
  2. 添加状态

    add(label, state, transitions=None, remapping=None)

    • label,状态名字
    • state,状态
    • transitions,状态机状态跳转
    • remapping,状态机的输入输出映射到整体空间中,用于数据传递,input_keys为输入,output_keys为输出
  3. 内部检测服务器,用于观察状态机状态

    IntrospectionServer(name, StateMachine,level)

    • name,观测服务器名字
    • StateMachine,状态机
    • level,状态机层级
Concurrence

Concurrence可用于构建并行状态机

import roslib; roslib.load_manifest('smach_tutorials')
import rospy
import smach
import smach_ros

# define state Foo
class Foo(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1','outcome2'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state FOO')
        if self.counter < 3:
            self.counter += 1
            return 'outcome1'
        else:
            return 'outcome2'

# define state Bar
class Bar(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1'])

    def execute(self, userdata):
        rospy.loginfo('Executing state BAR')
        return 'outcome1'
        
# define state Bas
class Bas(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome3'])

    def execute(self, userdata):
        rospy.loginfo('Executing state BAS')
        return 'outcome3'

def main():
    rospy.init_node('smach_example_state_machine')

    # Create the top level SMACH state machine
    sm_top = smach.StateMachine(outcomes=['outcome6'])
    
    # Open the container
    with sm_top:
        smach.StateMachine.add('BAS', Bas(),
                               transitions={'outcome3':'CON'})

        # Create the sub SMACH state machine
        sm_con = smach.Concurrence(outcomes=['outcome4','outcome5'],
                                   default_outcome='outcome4',
                                   outcome_map={'outcome5':
                                       { 'FOO':'outcome2',
                                         'BAR':'outcome1'}})

        # Open the container
        with sm_con:
            # Add states to the container
            smach.Concurrence.add('FOO', Foo())
            smach.Concurrence.add('BAR', Bar())

        smach.StateMachine.add('CON', sm_con,
                               transitions={'outcome4':'CON',
                                            'outcome5':'outcome6'})

    # Execute SMACH plan
    outcome = sm_top.execute()

if __name__ == '__main__':
    main()

说明

  1. 初始化

    __init__(self, outcomes, default_outcome, input_keys=[], output_keys=[], outcome_map={}, outcome_cb=None, child_termination_cb=None)

    • outcome_map,指定基于其子结果确定并发结果的策略,即可根据多个子结果的组合输出指定输出,若映射没有满足,则返回default_outcome。
    • outcome_cb,called once when the last child state terminates. This callback returns the outcome of the concurrence state machine.
    • child_termination_cb,called every time one of the child states terminates. In the callback function you can decide if the state machine should keep running (return False), or if it should preempt all remaining running states (return True).
  2. 添加状态

    add(label, state, remapping={})

    和StateMachine类似,但少了transitions,这部分通过outcome_map实现。

3.2 状态

MonitorState

MonitorState可以订阅话题,进行回调处理

  1. 初始化状态

    __init__(self, topic, msg_type, cond_cb, max_checks=-1)

    • topic,题名
    • msg_type,消息类型
    • cond_cb,回调函数
    • max_checks,a limit on how many times the monitor_cb can be called before the MonitorState will return ‘valid’.
  2. 设置回调函数

    callback_function(self, userdata, msg)

    • userdata,状态机自定义消息输入输出

    • msg,话题消息

  3. 添加状态

    StateMachine.add(name, MonitorState, transitions, remapping)

    • name,状态机名字

    • MonitorState,话题类

    • transitions,使用话题类作为状态机时,跳转关系为固定的,需要全部定义同时不能添加额外跳转,否则报错

    • invalid,等于False,用于状态转移,其余两种状态会不断调用回调函数,不会转移到别的状态机

    • valid,等于True,调用回调函数

    • preempted,话题被占用

ServiceState
  1. 初始化状态

    __init__(self, service_name, service_spec, request=None, request_cb=None, request_cb_args=[], request_cb_kwargs={}, request_key=None, request_slots=[], response_cb=None, response_cb_args=[], response_cb_kwargs={}, response_key=None, response_slots=[], input_keys=[], output_keys=[], outcomes=[])
    
    • service_name,服务名
    • service_spec,服务类型,一般为.srv文件名字
    • request,请求数据,若请求数据不唯一,可使用request_cb或request_slots
    • response,返回结果,注意名字要与.srv中定义一致,若返回数据不唯一,可使用response_cb或response_slots
  2. 设置response_cb回调函数

    callback_function(self, userdata, response)

    • userdata,状态机自定义消息输入输出
    • response,服务返回结果
  3. 添加状态

    StateMachine.add(name, MonitorState, transitions, remapping)

    • name,状态机名字
    • ServiceState,服务类名字
    • transitions,使用服务类作为状态机时,跳转关系为固定的,需要全部定义同时不能添加额外跳转,否则报错
      • succeeded,服务调用成功
      • aborted,服务调用失败
      • preempted,服务被占用
SimpleActionState

simple_action_state只定义了goal和Result的行为,若需要接收feedback并做处理,可通过MonitorState订阅feedback话题构建并行状态机。

 smach.StateMachine.add(
     'GET_PATH',
      smach_ros.SimpleActionState(
          'move_base_flex/get_path',
          GetPathAction,
          goal_cb=Planning.planner_goal_cb,
          result_cb=Planning.planner_result_cb),
     transitions={'succeeded': 'succeeded',
                  'preempted': 'preempted',
                  'aborted': 'aborted'}
     )

 @staticmethod
 @smach.cb_interface(input_keys=['target_pose'])
 def planner_goal_cb(user_data, goal):
    goal.use_start_pose = False
    goal.tolerance = 0.2  # 20cm tolerance to the target
    goal.target_pose = user_data.target_pose
    goal.planner = 'GlobalPlanner'  # name of the planner to call see move base flex planners.yaml config

 @staticmethod
 @smach.cb_interface(
     output_keys=['outcome', 'message', 'path', 'cost', 'recovery_behavior'],
     outcomes=['succeeded', 'preempted', 'aborted'])
 def planner_result_cb(user_data, status, result):
    user_data.outcome = result.outcome
    if result.outcome == GetPathResult.SUCCESS:
        return 'succeeded'
    elif result.outcome == GetPathResult.CANCELED:
        return 'preempted'
    else:
        user_data.recovery_behavior = recovery_behavior
        rospy.loginfo("Set recovery behavior to %s", recovery_behavior)
        return 'aborted'

说明

初始化状态

__init__(self, action_name, action_spec, goal=None, goal_key=None, goal_slots=[], goal_cb=None, goal_cb_args=[], goal_cb_kwargs={}, result_key=None, result_slots=[], result_cb=None, result_cb_args=[], result_cb_kwargs={}, input_keys=[], output_keys=[], outcomes=[], exec_timeout=None, preempt_timeout=rospy.Duration(60.0), server_wait_timeout=rospy.Duration(60.0))
  • action_name,action服务名
  • action_spec,action类型,一般为.action文件名字
  • goal_cb,设置发送到action的goal
  • result_cb,action处理完成后返回result的回调

3.3 可视化

rosrun smach_viewer smach_viewer.py

四、问题解决

  • 问题描述

    启动状态机后无法通过Crtl+c退出程序。

  • 解决方案

    参考:Smach/Python does not respond to Ctrl+C when inside a Concurrency Container

    创建多线程,在线程池中执行sm.execute()

    修改前

    # Execute the state machine
    sm_outcome = sm.execute()
    

    修改后

    import threading
    from multiprocessing.pool import ThreadPool
    
    # Create a thread to execute the smach container
    pool = ThreadPool(processes=1)
    async_result = pool.apply_async(sm.execute)
    

参考

SMACH 教程

有限状态机(Python)

机器人任务规划:从状态机到形式系统

有限状态机

Smach/Python does not respond to Ctrl+C when inside a Concurrency Container

smach

smach_ros

本文标签: 进阶状态机rosSMACH