DCPS Infrastructure

  • Infrastructure Module
    • Entity
        • Entity Identifier
        • QoS policy
        • Listener
        • Status
        • StatusCondition
        • Enabling Entities
    • QosPolicy
    • Status
      • Status definition
      • StatusMask definition
      • Listener callback definition
      • Listener callback implementation
    • Condition and WaitSet

Infrastructure Module


As the class diagram of the Infrastructure module shows, the module consists mainly of the following parts:

  • Entity
  • QosPolicy
  • Status
  • Condition
  • WaitSet

The Entity class is the base of the DomainEntity and DomainParticipant classes; Condition is the base of the StatusCondition, GuardCondition and ReadCondition classes.

Entity

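An Entity can be understood as the object that carries out the data-centric publish-subscribe model in the DDS standard. Every Entity holds QoS policies, a Listener and a StatusCondition, and its behavior is controlled by changing them.
From a C++ point of view, Entity is the abstract base class of all DDS entities: DomainParticipant derives from it directly, while Publisher, Subscriber, Topic, DataWriter and DataReader derive from it through DomainEntity.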
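
To make the hierarchy concrete, here is a minimal sketch of the inheritance relationships. The class names follow the DDS specification, but the bodies are placeholders and the `type_name` and `is_kind` helpers are hypothetical additions for this illustration; none of this is the actual Fast DDS code.

```cpp
#include <cassert>
#include <cstring>
#include <type_traits>

// Skeleton types only: names follow the DDS specification, bodies are placeholders.
class Entity
{
public:
    virtual ~Entity() = default;
    virtual const char* type_name() const = 0;   // hypothetical helper for this sketch
};

class DomainParticipant : public Entity
{
public:
    const char* type_name() const override { return "DomainParticipant"; }
};

// Intermediate base for everything that lives inside a participant
class DomainEntity : public Entity
{
};

class DataWriter : public DomainEntity
{
public:
    const char* type_name() const override { return "DataWriter"; }
};

class DataReader : public DomainEntity
{
public:
    const char* type_name() const override { return "DataReader"; }
};

static_assert(std::is_abstract<Entity>::value, "Entity cannot be instantiated directly");
static_assert(std::is_base_of<Entity, DataReader>::value, "DataReader is an Entity");
static_assert(!std::is_base_of<DomainEntity, DomainParticipant>::value,
        "DomainParticipant derives from Entity directly");

// Code written against Entity works for every derived type
inline bool is_kind(const Entity& entity, const char* expected)
{
    assert(expected != nullptr);
    return std::strcmp(entity.type_name(), expected) == 0;
}
```

Publisher, Subscriber and Topic would sit next to DataWriter and DataReader under DomainEntity; they are left out only to keep the sketch short.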

All DDS Entities share the following characteristics:

  • Entity Identifier
  • QoS policy
  • Listener
  • Status
  • StatusCondition
  • Enabling Entities

Each of them is explained in detail below:

Entity Identifier

Every Entity has a unique identifier of type InstanceHandle_t, which can be retrieved through the get_instance_handle method:

inline const InstanceHandle_t &get_instance_handle() const

QoS policy

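The defining feature of DDS is its rich set of QoS policies for controlling communication behavior. Every DDS Entity exposes set_qos and get_qos to set or retrieve its QoS.
Taking the DomainParticipant Entity as an example: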

ReturnCode_t set_qos(const DomainParticipantQos &qos) const
Parameters: qos – DomainParticipantQos to be set
Returns: RETCODE_IMMUTABLE_POLICY if any of the Qos cannot be changed, RETCODE_INCONSISTENT_POLICY if the Qos is not self consistent and RETCODE_OK if the qos is changed correctly.

ReturnCode_t get_qos(DomainParticipantQos &qos) const
Parameters: qos – DomainParticipantQos reference where the qos is going to be returned
Returns: RETCODE_OK
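
The three return codes are easier to remember with a small model of the get-modify-set sequence. The sketch below is an assumption-laden stand-in: `RetCode`, `MockQos`, `MockEntity` and `raise_history_depth` are hypothetical names, and the rules about which field is immutable or inconsistent are invented for illustration rather than taken from Fast DDS.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-ins; these are NOT the Fast DDS types.
enum class RetCode { OK, IMMUTABLE_POLICY, INCONSISTENT_POLICY };

struct MockQos
{
    int32_t history_depth = 1;    // may change at any time in this sketch
    int32_t max_samples = 100;    // may change at any time in this sketch
    bool reliable = false;        // treated as immutable once the entity is enabled
};

class MockEntity
{
public:
    RetCode get_qos(MockQos& qos) const
    {
        qos = qos_;
        return RetCode::OK;
    }

    RetCode set_qos(const MockQos& qos)
    {
        // Self-consistency check: the depth cannot exceed the resource limit
        if (qos.history_depth > qos.max_samples)
        {
            return RetCode::INCONSISTENT_POLICY;
        }
        // Immutable policies cannot change after the entity has been enabled
        if (enabled_ && qos.reliable != qos_.reliable)
        {
            return RetCode::IMMUTABLE_POLICY;
        }
        qos_ = qos;
        return RetCode::OK;
    }

    void enable() { enabled_ = true; }

private:
    MockQos qos_;
    bool enabled_ = false;
};

// Typical sequence: read the current QoS, change one field, write it back
inline RetCode raise_history_depth(MockEntity& entity, int32_t depth)
{
    MockQos qos;
    RetCode ret = entity.get_qos(qos);
    assert(ret == RetCode::OK);   // get_qos always succeeds in this sketch
    (void)ret;
    qos.history_depth = depth;
    return entity.set_qos(qos);
}
```

Reading the current QoS first keeps every other policy at its existing value, so only the intended field changes; a rejected set_qos leaves the stored QoS untouched.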

Listener

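A Listener defines a set of callback functions. They are all virtual with empty default implementations, so a user overrides only the callbacks for which some action is required when a particular status changes (that is, when the corresponding bit of the StatusMask is set).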

Status

Status simply means Communication Status. Changes in these statuses are what trigger the Listener callbacks.

StatusCondition

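A StatusCondition also represents status changes of an Entity. The difference is that a StatusCondition is used together with a WaitSet: the application waits until a specified condition becomes true and then fetches the data, instead of being called back.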

Enabling Entities

This corresponds to the enable method of the Entity class. By default, Entities are enabled as soon as they are created, but this can be changed through the EntityFactoryQosPolicy.
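
The effect of that policy can be modeled in a few lines. In the sketch below the member name `autoenable_created_entities` is the flag that EntityFactoryQosPolicy carries in the DDS specification, while `MockChild`, `MockEntityFactoryQos`, `MockFactory` and `create_disabled_child` are hypothetical stand-ins rather than Fast DDS classes.

```cpp
#include <cassert>

// Hypothetical stand-ins that mimic the enable semantics; not the Fast DDS classes.
class MockChild
{
public:
    bool is_enabled() const { return enabled_; }
    void enable() { enabled_ = true; }   // calling enable() twice is harmless

private:
    bool enabled_ = false;
};

struct MockEntityFactoryQos
{
    bool autoenable_created_entities = true;   // default: children start enabled
};

class MockFactory
{
public:
    void set_factory_qos(const MockEntityFactoryQos& qos) { qos_ = qos; }

    MockChild create_child() const
    {
        MockChild child;
        if (qos_.autoenable_created_entities)
        {
            child.enable();
        }
        return child;
    }

private:
    MockEntityFactoryQos qos_;
};

// Create a child that stays disabled until the caller decides to enable it
inline MockChild create_disabled_child(MockFactory& factory)
{
    MockEntityFactoryQos qos;
    qos.autoenable_created_entities = false;
    factory.set_factory_qos(qos);
    MockChild child = factory.create_child();
    assert(!child.is_enabled());
    return child;
}
```

Creating entities disabled is useful when immutable QoS policies or listeners still have to be configured before the entity starts communicating.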

QosPolicy

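QoS is arguably the most distinctive feature of DDS communication and one of the main things that sets DDS apart from SOME/IP. Users configure communication behavior flexibly through QoS, and the objects being configured are the individual QoS policies.
The DDS standard defines a set of QoS policies, whose relationships the specification presents as a UML diagram; in addition, DDS vendors define some QoS policies of their own.

The QoS policies defined by the DDS standard are summarized below: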

| QosPolicy | Data Member Name | Type | Description | Concerned Entities |
| DeadlineQosPolicy | period | Duration_t | Default: c_TimeInfinite. Maximum interval between updates: a DataWriter commits to writing at least once per period, and a DataReader expects new data at least once per period. | Topic, DataReader and DataWriter |
| DestinationOrderQosPolicy | kind | DestinationOrderQosPolicyKind | BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS (default) or BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS | Topic, DataReader and DataWriter |
| DurabilityQosPolicy | kind | DurabilityQosPolicyKind (4 values) | Default: VOLATILE_DURABILITY_QOS for DataReaders, TRANSIENT_LOCAL_DURABILITY_QOS for DataWriters | Topic, DataReader and DataWriter |
| DurabilityServiceQosPolicy | | | | Topic and DataWriter |
| GroupDataQosPolicy | collection | std::vector<octet> | Default: empty vector. Used together with DataWriter and DataReader listeners to implement a matching policy. | Publisher and Subscriber |
| HistoryQosPolicy | kind; depth | HistoryQosPolicyKind; int32_t | kind: KEEP_LAST_HISTORY_QOS (default) or KEEP_ALL_HISTORY_QOS. Controls how many changes of an instance are retained before a DataReader reads them. | Topic, DataWriter and DataReader |
| LatencyBudgetQosPolicy | duration | Duration_t | Default: c_TimeZero (0). Maximum acceptable delay between the moment data is written and the moment it enters the DataReader history. | Topic, DataWriter and DataReader |
| LifespanQosPolicy | duration | Duration_t | Default: c_TimeInfinite. How long a sample remains valid after the DataWriter writes it. | Topic, DataReader and DataWriter |
| LivelinessQosPolicy | kind; lease_duration; announcement_period | LivelinessQosPolicyKind; Duration_t; Duration_t | Defaults: AUTOMATIC_LIVELINESS_QOS; c_TimeInfinite; c_TimeInfinite | Topic, DataReader and DataWriter |
| OwnershipQosPolicy | kind | OwnershipQosPolicyKind | Default: SHARED_OWNERSHIP_QOS | Topic, DataReader and DataWriter |
| OwnershipStrengthQosPolicy | value | uint32_t | Default: 0 | DataWriter |
| PartitionQosPolicy | max_size; names (the partition names) | uint32_t; SerializedPayload_t | Defaults: 0 (length unlimited); empty list | Publisher and Subscriber |

The remaining standard policies are listed by name only:

  • PresentationQosPolicy
  • ReaderDataLifecycleQosPolicy
  • ReliabilityQosPolicy
  • ResourceLimitsQosPolicy
  • TimeBasedFilterQosPolicy
  • TopicDataQosPolicy
  • TransportPriorityQosPolicy
  • UserDataQosPolicy
  • WriterDataLifecycleQosPolicy
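
As one worked example from the policies above, the difference between the two HistoryQosPolicy kinds can be sketched with a bounded queue. This is a simplified model under stated assumptions: `HistoryKind` and `MockHistory` are hypothetical names, samples are plain integers, and a single instance is assumed; a real implementation keeps one such history per instance and also interacts with ResourceLimitsQosPolicy.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

enum class HistoryKind { KEEP_LAST, KEEP_ALL };

// Hypothetical per-instance sample cache; illustrative only.
class MockHistory
{
public:
    MockHistory(HistoryKind kind, std::size_t depth)
        : kind_(kind)
        , depth_(depth)
    {
        // depth is only meaningful (and must be positive) for KEEP_LAST
        assert(kind_ == HistoryKind::KEEP_ALL || depth_ > 0);
    }

    void add(int sample)
    {
        if (kind_ == HistoryKind::KEEP_LAST && samples_.size() == depth_)
        {
            samples_.pop_front();   // drop the oldest sample to make room
        }
        samples_.push_back(sample);
    }

    const std::deque<int>& samples() const { return samples_; }

private:
    HistoryKind kind_;
    std::size_t depth_;
    std::deque<int> samples_;
};
```

With KEEP_LAST and a depth of 2, writing the samples 1, 2, 3 leaves only 2 and 3 available to a reader that has not yet consumed anything; with KEEP_ALL nothing is dropped by the history itself.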

Status

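DDS defines different Status objects for different Entities; each one describes an aspect of the Entity's state.

Status definition

In practice each Status is simply a struct. Take the definition of LivelinessChangedStatus as an example: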

struct LivelinessChangedStatus
{
    //! @brief The total number of currently active publishers that write the topic read by the subscriber
    //! @details This count increases when a newly matched publisher asserts its liveliness for the first time
    //! or when a publisher previously considered to be not alive reasserts its liveliness. The count decreases
    //! when a publisher considered alive fails to assert its liveliness and becomes not alive, whether because
    //! it was deleted normally or for some other reason
    int32_t alive_count = 0;

    //! @brief The total count of current publishers that write the topic read by the subscriber that are no longer
    //! asserting their liveliness
    //! @details This count increases when a publisher considered alive fails to assert its liveliness and becomes
    //! not alive for some reason other than the normal deletion of that publisher. It decreases when a previously
    //! not alive publisher either reasserts its liveliness or is deleted normally
    int32_t not_alive_count = 0;

    //! @brief The change in the alive_count since the last time the listener was called or the status was read
    int32_t alive_count_change = 0;

    //! @brief The change in the not_alive_count since the last time the listener was called or the status was read
    int32_t not_alive_count_change = 0;

    //! @brief Handle to the last publisher whose change in liveliness caused this status to change
    InstanceHandle_t last_publication_handle;
};
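
The comments in the struct say that the two `*_change` fields count changes "since the last time the listener was called or the status was read". A minimal sketch of that bookkeeping, assuming a hypothetical `MockLivelinessTracker` (the real reader does this internally), could look like this:

```cpp
#include <cassert>
#include <cstdint>

// Reduced copy of the status fields used in this sketch
struct MockLivelinessChangedStatus
{
    int32_t alive_count = 0;
    int32_t not_alive_count = 0;
    int32_t alive_count_change = 0;
    int32_t not_alive_count_change = 0;
};

// Hypothetical tracker; illustrative only.
class MockLivelinessTracker
{
public:
    // A newly matched writer asserted its liveliness for the first time
    void on_writer_alive()
    {
        ++status_.alive_count;
        ++status_.alive_count_change;
    }

    // A writer that was alive failed to assert its liveliness in time
    void on_writer_lost()
    {
        assert(status_.alive_count > 0);
        --status_.alive_count;
        --status_.alive_count_change;
        ++status_.not_alive_count;
        ++status_.not_alive_count_change;
    }

    // Reading the status returns a snapshot and resets the change counters
    MockLivelinessChangedStatus read_status()
    {
        MockLivelinessChangedStatus snapshot = status_;
        status_.alive_count_change = 0;
        status_.not_alive_count_change = 0;
        return snapshot;
    }

private:
    MockLivelinessChangedStatus status_;
};
```

The absolute counters persist across reads while the change counters start again from zero, which is why a callback can test `alive_count_change` to learn what happened since the previous notification.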

StatusMask definition

The StatusMask class, in turn, defines which bit of the mask represents each Status:

/**
 * @brief
 * StatusMask is a bitmap or bitset field.
 *
 * This bitset is used to:
 * - determine which listener functions to call
 * - set conditions in dds::core::cond::StatusCondition
 * - indicate status changes when calling dds::core::Entity::status_changes
 */

class RTPS_DllAPI StatusMask : public std::bitset<FASTDDS_STATUS_COUNT>
{
public:
    /**
     * Get the StatusMask associated with dds::core::status::LivelinessChangedStatus
     *
     * @return StatusMask liveliness_changed
     */
    inline static StatusMask liveliness_changed()
    {
        return StatusMask(0x00000001 << 12u);
    }
    ...

    /**
     * Get the statusmask associated with dds::core::status::PublicationMatchedStatus
     *
     * @return StatusMask publication_matched
     */
    inline static StatusMask publication_matched()
    {
        return StatusMask(0x00000001 << 13u);
    }
    ...
};
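
Because StatusMask is built on std::bitset, masks combine and test like any other bitset. The sketch below reproduces only the two bit positions shown above (12 and 13); the 16-bit width, the `MockStatusMask` alias and the free function `is_active` are assumptions made for this illustration, not the library's definitions.

```cpp
#include <bitset>
#include <cassert>
#include <cstddef>

constexpr std::size_t kStatusCount = 16;   // assumed width for this sketch
using MockStatusMask = std::bitset<kStatusCount>;

inline MockStatusMask liveliness_changed()
{
    return MockStatusMask(1u << 12u);
}

inline MockStatusMask publication_matched()
{
    return MockStatusMask(1u << 13u);
}

// True when every bit of `status` is also set in `mask`
inline bool is_active(const MockStatusMask& mask, const MockStatusMask& status)
{
    return (mask & status) == status;
}

// Usage: combine two statuses, then clear one of them again
inline void status_mask_usage()
{
    MockStatusMask mask = liveliness_changed() | publication_matched();
    assert(is_active(mask, liveliness_changed()));
    mask &= ~liveliness_changed();
    assert(!is_active(mask, liveliness_changed()));
    assert(is_active(mask, publication_matched()));
}
```

One bit per status is what lets a single mask value select several listener callbacks at once, or describe several pending status changes in one return value.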

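Listener callback definition

The Status is passed as one of the arguments to the Listener callback, where it is used to decide which action to take. Because the liveliness_changed status applies to DataReader objects, its callback is declared in the DataReaderListener class, as shown below: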

/**
 * Class DataReaderListener, it should be used by the end user to implement specific callbacks to certain actions.
 * @ingroup FASTDDS_MODULE
 */
class DataReaderListener
{
public:
...
    /**
     * @brief Method called when the liveliness status associated to a subscriber changes
     *
     * @param reader The DataReader
     * @param status The liveliness changed status
     */
    RTPS_DllAPI virtual void on_liveliness_changed(
            DataReader* reader,
            const fastrtps::LivelinessChangedStatus& status)
    {
        (void)reader;
        (void)status;
    }
...
};

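As can be seen, on_liveliness_changed is a virtual function whose default body does nothing; the user has to provide the actual behavior.

Listener callback implementation

To have on_liveliness_changed do something useful, derive a class from DataReaderListener and override the callback: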

class CustomDataReaderListener : public DataReaderListener
{
public:
...
    /**
     * @brief Method called when the liveliness status associated to a subscriber changes
     *
     * @param reader The DataReader
     * @param status The liveliness changed status
     */
    void on_liveliness_changed(
            DataReader* reader,
            const fastrtps::LivelinessChangedStatus& status) override;
...
private:
    // Assumed member: flag polled by the application loop, cleared when a publisher is lost
    bool run_ = true;
};

// The override specifier belongs on the in-class declaration only
void CustomDataReaderListener::on_liveliness_changed(
        DataReader* reader,
        const fastrtps::LivelinessChangedStatus& status)
{
    (void)reader;
    if (status.alive_count_change == 1)
    {
        std::cout << "Publisher recovered liveliness" << std::endl;
    }
    else if (status.not_alive_count_change == 1)
    {
        std::cout << "Publisher lost liveliness" << std::endl;
        run_ = false;
    }
}
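
How the mask and the override fit together can be shown without the library. In the following sketch every type is a hypothetical stand-in (`MockReaderListener`, `StopOnLivelinessLost`, `MockReader`), and the dispatch rule, calling the callback only when the matching bit is set in the mask given with the listener, is a simplified model of the behavior described above.

```cpp
#include <bitset>
#include <cassert>
#include <cstddef>
#include <cstdint>

struct MockLivelinessChangedStatus
{
    int32_t alive_count_change = 0;
    int32_t not_alive_count_change = 0;
};

// Base listener: virtual callback with an empty default body
class MockReaderListener
{
public:
    virtual ~MockReaderListener() = default;
    virtual void on_liveliness_changed(const MockLivelinessChangedStatus& status)
    {
        (void)status;
    }
};

// User listener: only the callback of interest is overridden
class StopOnLivelinessLost : public MockReaderListener
{
public:
    void on_liveliness_changed(const MockLivelinessChangedStatus& status) override
    {
        if (status.not_alive_count_change > 0)
        {
            running_ = false;
        }
    }

    bool running() const { return running_; }

private:
    bool running_ = true;
};

using MockStatusMask = std::bitset<16>;
constexpr std::size_t kLivelinessChangedBit = 12;   // bit position from the StatusMask excerpt

// Hypothetical reader that dispatches a status change to its listener
class MockReader
{
public:
    void set_listener(MockReaderListener* listener, const MockStatusMask& mask)
    {
        listener_ = listener;
        mask_ = mask;
    }

    void notify_liveliness_changed(const MockLivelinessChangedStatus& status)
    {
        // The callback runs only if the status is selected in the mask
        if (listener_ != nullptr && mask_.test(kLivelinessChangedBit))
        {
            listener_->on_liveliness_changed(status);
        }
    }

private:
    MockReaderListener* listener_ = nullptr;
    MockStatusMask mask_;
};

// Usage: with the bit selected, a lost publisher stops the application flag
inline void listener_usage()
{
    StopOnLivelinessLost listener;
    MockReader reader;
    reader.set_listener(&listener, MockStatusMask().set(kLivelinessChangedBit));
    MockLivelinessChangedStatus status;
    status.not_alive_count_change = 1;
    reader.notify_liveliness_changed(status);
    assert(!listener.running());
}
```

Public inheritance matters here: the reader stores a pointer to the base listener type, so a privately derived listener could not be passed to it.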

Condition and WaitSet

In DDS, Conditions and WaitSets work together to let an application wait on several Entities at once and act when any of them reaches a specified state.
An example follows:

class ApplicationJob
{
    WaitSet wait_set_;
    GuardCondition terminate_condition_;
    std::thread thread_;

    void main_loop()
    {
        // Main loop is repeated until the terminate condition is triggered
        while (false == terminate_condition_.get_trigger_value())
        {
            // Wait for any of the conditions to be triggered
            ReturnCode_t ret_code;
            ConditionSeq triggered_conditions;
            ret_code = wait_set_.wait(triggered_conditions, eprosima::fastrtps::c_TimeInfinite);
            if (ReturnCode_t::RETCODE_OK != ret_code)
            {
                // ... handle error
                continue;
            }

            // Process triggered conditions
            for (Condition* cond : triggered_conditions)
            {
                StatusCondition* status_cond = dynamic_cast<StatusCondition*>(cond);
                if (nullptr != status_cond)
                {
                    Entity* entity = status_cond->get_entity();
                    StatusMask changed_statuses = entity->get_status_changes();

                    // Process status. Liveliness changed and data available are depicted as an example
                    if (changed_statuses.is_active(StatusMask::liveliness_changed()))
                    {
                        std::cout << "Liveliness changed reported for entity " << entity->get_instance_handle() <<
                            std::endl;
                    }

                    if (changed_statuses.is_active(StatusMask::data_available()))
                    {
                        std::cout << "Data available on reader " << entity->get_instance_handle() << std::endl;

                        FooSeq data_seq;
                        SampleInfoSeq info_seq;
                        DataReader* reader = static_cast<DataReader*>(entity);

                        // Process all the samples until no one is returned
                        while (ReturnCode_t::RETCODE_OK == reader->take(data_seq, info_seq,
                                LENGTH_UNLIMITED, ANY_SAMPLE_STATE,
                                ANY_VIEW_STATE, ANY_INSTANCE_STATE))
                        {
                            // Both info_seq.length() and data_seq.length() will have the number of samples returned
                            for (FooSeq::size_type n = 0; n < info_seq.length(); ++n)
                            {
                                // Only samples for which valid_data is true should be accessed
                                if (info_seq[n].valid_data)
                                {
                                    // Process sample on data_seq[n]
                                }
                            }

                            // must return the loaned sequences when done processing
                            reader->return_loan(data_seq, info_seq);
                        }
                    }
                }
            }
        }
    }

public:

    ApplicationJob(
            const std::vector<DataReader*>& readers,
            const std::vector<DataWriter*>& writers)
    {
        // Add a GuardCondition, so we can signal the processing thread to stop
        // Step 0: the constructor attaches terminate_condition_ and every StatusCondition to the WaitSet
        wait_set_.attach_condition(terminate_condition_);

        // Add the status condition of every reader and writer
        for (DataReader* reader : readers)
        {
            wait_set_.attach_condition(reader->get_statuscondition());
        }
        for (DataWriter* writer : writers)
        {
            wait_set_.attach_condition(writer->get_statuscondition());
        }

        thread_ = std::thread(&ApplicationJob::main_loop, this);
    }

    ~ApplicationJob()
    {
        // Signal the GuardCondition to force the WaitSet to wake up
        terminate_condition_.set_trigger_value(true);
        // Wait for the thread to finish
        thread_.join();
    }

};

// Application initialization
ReturnCode_t ret_code;
std::vector<DataReader*> application_readers;
std::vector<DataWriter*> application_writers;

// Create the participant, topics, readers, and writers.
ret_code = create_dds_application(application_readers, application_writers);
if (ReturnCode_t::RETCODE_OK != ret_code)
{
    // ... handle error
    return;
}

{
    ApplicationJob main_loop_thread(application_readers, application_writers);

    // ... wait for application termination signaling (signal handler, user input, etc)

    // ... Destructor of ApplicationJob takes care of stopping the processing thread
}

// Destroy readers, writers, topics, and participant
destroy_dds_application();

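As the example shows, the flow is as follows:

  1. Attach one or more Condition objects to the WaitSet.
  2. The WaitSet blocks until the trigger value of at least one attached Condition becomes TRUE.
  3. Once a condition is triggered, the application inspects the triggered Conditions and takes the data.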
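
The three steps can also be sketched with nothing but the standard library, which may help in seeing what a WaitSet does underneath. `MiniCondition` and `MiniWaitSet` below are hypothetical classes written for this illustration; they mimic the attach, wait and trigger sequence with a mutex and a condition variable and make no claim about how Fast DDS implements it. The sketch assumes all conditions are attached before any other thread starts triggering them.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <vector>

class MiniWaitSet;

// Hypothetical guard-style condition whose trigger value is set by the application
class MiniCondition
{
public:
    bool get_trigger_value() const { return trigger_.load(); }
    void set_trigger_value(bool value);   // defined after MiniWaitSet

private:
    friend class MiniWaitSet;
    std::atomic<bool> trigger_{false};
    MiniWaitSet* owner_ = nullptr;
};

class MiniWaitSet
{
public:
    // Step 1: associate a condition with this wait set
    void attach_condition(MiniCondition& cond)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(cond.owner_ == nullptr);   // one wait set per condition in this sketch
        cond.owner_ = this;
        conditions_.push_back(&cond);
    }

    // Step 2: block until at least one attached condition is true.
    // Returns true and fills `active`, or false if the timeout expires first.
    bool wait(std::vector<MiniCondition*>& active, std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        auto any_triggered = [this, &active]()
        {
            active.clear();
            for (MiniCondition* cond : conditions_)
            {
                if (cond->get_trigger_value())
                {
                    active.push_back(cond);
                }
            }
            return !active.empty();
        };
        return cv_.wait_for(lock, timeout, any_triggered);
    }

    void wake_up()
    {
        // Taking the lock first ensures a waiter cannot miss the notification
        std::lock_guard<std::mutex> lock(mutex_);
        cv_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::vector<MiniCondition*> conditions_;
};

inline void MiniCondition::set_trigger_value(bool value)
{
    trigger_.store(value);
    if (owner_ != nullptr)
    {
        owner_->wake_up();
    }
}
```

Step 3 is whatever the caller does with the `active` list after `wait` returns, for example checking whether the stop condition is among the entries or taking data from the reader that owns a triggered condition.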
