总线设计"/>
消息总线设计
消息框架设计
消息定义
-
消息格式:msg+消息类型+消息名称msg.sys.msgname。
-
消息类型:
- 以msg.sys前缀开头的为系统消息。
- msg.othername前缀开头的为用户自定义小。
-
定义方式
- 字符串宏定义
- 平台消息统一存放 /include/msg.h头文件中。
//msg.h #define MSG_SYS_MSGNAME "msg.plat.msgname"
消息类型
同步消息
- 说明:发送消息立即到达接收端。
- 使用场景:同一线程内部发送消息。
异步消息
- 说明:发送消息为延迟到达接收端。
- 使用场景:子线程更新UI数据。
设计实现
所属工程
消息机制实现代码位于PlatFrame工程。
UML类图
消息监听器接口(IMessageListener)
- 用于监听制定消息数据,用户监听消息时需要继承此接口。
消息总线(MessageBus)
- 单例全局类。
- 消息管理,消息注册移除。
- 发送消息,包括发送同步消息及发送异步消息。
消息循环(MessageLoop)
- 主要用于异步消息传输,异步消息会首先加入消息循环队列,在合适的时机传递给消息接收者。
- 同步消息不需要经过消息循环队列,会直接发送给消息监听者。
- 由于异步消息的接收者要适应主线程(UI线程),所系异步的消息循环通过XRE的FrameMove机制去实现。
在FrameMove中向外投递消息。
使用示范
class TestListener : public AutoRegMessageListener
{
protected:virtual bool HandlerMessage(Message& msg){std::cout<<msg.msgName<<std::endl;std::string msgContent = std::any_cast<std::string>(msg.msgDatas[0]);std::cout<<msgContent<<std::endl;}
}void main()
{TestListener* testListener = new TestListener;PlatformFramework::getSingleton()->GetMessageBus()->RegisterMsgListener(testListener);PlatformFramework::getSingleton()->GetMessageBus()->SendMessage("com.plat.test","sendMessage Test");PlatformFramework::getSingleton()->GetMessageBus()->PostMessage("com.plat.test","postMessage Test");
}
源码
头文件
/*! \file MessageBus.h\brief 消息框架实现包含消息总线类及消息监听接口\version 1.0.0\date 2019/12/06*/#pragma once#include <map>
#include <vector>
#include <string>
#include <memory>
#include <any>
#include <mutex>
#include <deque>
#include <optional>namespace eZPlat
{using MessageData = std::any;using MessageDataList = std::vector<MessageData>;struct Message{std::string msgName;MessageDataList msgDatas;template <typename T>T* GetDataObj(uint32_t argIndex){if (msgDatas.size() <= argIndex){return nullptr;}try{return std::any_cast<T*>(msgDatas[argIndex]);}catch (const std::bad_any_cast& e){return nullptr;}}template <typename T>std::optional<T> GetDataValue(uint32_t argIndex){if (msgDatas.size() <= argIndex){return std::nullopt;}try{return std::any_cast<T>(msgDatas[argIndex]);}catch (const std::bad_any_cast&){return std::nullopt;}}};/// \brief 消息监听接口/// /// 不推荐直接继承,要实现消息监听/// 推荐直接继承AutoRegMessageListener接口class IMessageListener{friend class MessageBus;protected:virtual ~IMessageListener() = default;virtual bool HandlerMessage(Message&& msg) = 0;public:enum{MSG_PRIORITY_HIGH = 1000,MSG_PRIORITY_NORMAL = 10000,MSG_PRIORITY_LOW = 100000};protected:uint32_t m_priority = MSG_PRIORITY_NORMAL;// m_priority小优先级越高};/// \brief 带自动注册的消息监听接口/// /// 实际实现消息监听时应该继承该监听器/// 该监听器的内存维护需要由外部自己实现/// 当该监听器内存释放后,对应的消息对象会由/// 内部自动释放class PLATFORM_API AutoRegMessageListener{class _MessageListener : public IMessageListener{public:_MessageListener(AutoRegMessageListener* p);virtual bool HandlerMessage(Message&& msg) override;void SetMsgPriority(uint32_t priority) { m_priority = priority; }uint32_t GetMsgPriority() const { return m_priority; }private:AutoRegMessageListener* m_p = nullptr;};public:AutoRegMessageListener() {};AutoRegMessageListener(const std::vector<std::string>& msgList);virtual ~AutoRegMessageListener();void SetMsgPriority(uint32_t priority) { m_msgListener->SetMsgPriority(priority); }uint32_t GetMsgPriority() const { return m_msgListener->GetMsgPriority(); }protected:virtual bool HandlerMessage(Message& msg) = 0;virtual IMessageListener* GetMessageListener() { return m_msgListener; }private:_MessageListener* m_msgListener = new _MessageListener(this);};/// \brief 消息总线类/// /// 对象由PlatformFramework维护/// 支持同步消息、异步消息class PLATFORM_API MessageBus final{public:friend class PlatformFramework;void SetAsyMsgQueueSize(uint32_t asyMsgQueueSize) { m_asyMsgQueueSize = asyMsgQueueSize; }bool RegisterMsgListener(const std::string& msgName, IMessageListener* msgListener);void RemoveMsgListener(const std::string& msgName, IMessageListener* msgListener);void RemoveMsgListener(IMessageListener* msgListener);/// \brief 同步,主线程向主线程发送数据template<typename ...Args>void SendMessage(const std::string& msgName, Args... args){MessageDataList msgDatas;GetMessageData(msgDatas, args...);SendMessage(msgName, msgDatas);}void SendMessage(const std::string& msgName, MessageDataList& msgDatas);/// \brief 异步,子线程向主线程发送数据////// msgData尽量使用拷贝传值/// 如果对于数据库比较大的内存必须要使用堆内存/// 严禁传递栈指针/// 请确保两点其中一点:/// 1. 发送者明确知道该内存的生命周期(生命周期为全局的指针)/// 2. 堆内存指针使用智能指针传值std::shared_ptrtemplate<typename ...Args>void PostMessage(const std::string& msgName, const Args... args){MessageDataList msgDatas;GetMessageData(msgDatas, args...);PostMessage(msgName, msgDatas);}void PostMessage(const std::string& msgName, MessageDataList& msgDatas);/// \brief 异步消息循环由外部发起调用void OnAsynMessageLoop();private:MessageBus();~MessageBus();MessageBus(const MessageBus&) = delete;MessageBus operator=(const MessageBus&) = delete;void GetMessageData(MessageDataList& msgDatas){}void GetMessageData(MessageDataList& msgDatas, std::any msgData){msgDatas.push_back(msgData);}template<typename T, typename ...Args>void GetMessageData(MessageDataList& msgDatas, T msgData, Args... args){msgDatas.push_back(msgData);GetMessageData(msgDatas, args...);}private:std::multimap<std::string, IMessageListener*> m_msgMap;std::mutex m_msgChangeMutex;///< 消息管理锁std::mutex m_msgQueueMutex;///< 消息队列锁std::deque<Message> m_msgQueue;uint32_t m_asyMsgQueueSize = 10000;///< 消息队列大小默认10000};
}
源文件
#include "pch.h"
#include "MessageBus.h"//
namespace eZPlat
{AutoRegMessageListener::_MessageListener::_MessageListener(AutoRegMessageListener* p){m_p = p;}bool AutoRegMessageListener::_MessageListener::HandlerMessage(Message&& msg){return m_p->HandlerMessage(msg);}AutoRegMessageListener::AutoRegMessageListener(const std::vector<std::string>& msgList){for (auto& item : msgList){PlatformFramework::GetSingleton()->GetMessageBus()->RegisterMsgListener(item, m_msgListener);}}AutoRegMessageListener::~AutoRegMessageListener(){PlatformFramework::GetSingleton()->GetMessageBus()->RemoveMsgListener(m_msgListener);delete m_msgListener;}//MessageBus::MessageBus(){}MessageBus::~MessageBus(){}bool MessageBus::RegisterMsgListener(const std::string& msgName, IMessageListener* msgListener){if (nullptr == msgListener || msgName.empty()){return false;}std::lock_guard<std::mutex> lock(m_msgChangeMutex);auto range = m_msgMap.equal_range(msgName);for (auto item = range.first; item != range.second; ++item){if (item->second == msgListener){// msg exist!return false;}}m_msgMap.insert(std::make_pair(msgName, msgListener));return true;}void MessageBus::RemoveMsgListener(const std::string& msgName, IMessageListener* msgListener){std::lock_guard<std::mutex> lock(m_msgChangeMutex);auto range = m_msgMap.equal_range(msgName);for (auto item = range.first; item != range.second; ++item){if (item->second == msgListener){item = m_msgMap.erase(item);return;}else {++item;}}}void MessageBus::RemoveMsgListener(IMessageListener* msgListener){if (nullptr == msgListener){return;}std::lock_guard<std::mutex> lock(m_msgChangeMutex);for (auto item = m_msgMap.begin(); item != m_msgMap.end();){if (item->second == msgListener){item = m_msgMap.erase(item);}else {++item;}}}void MessageBus::SendMessage(const std::string& msgName, MessageDataList& msgDatas){std::multimap<uint32_t, IMessageListener*> priority2ListenerMap;{std::lock_guard<std::mutex> lock(m_msgChangeMutex);auto range = m_msgMap.equal_range(msgName);if (range.first == range.second){return;}for (auto item = range.first; item != range.second; ++item){priority2ListenerMap.insert(std::make_pair(item->second->m_priority,item->second));}}for (auto item = priority2ListenerMap.begin(); item != priority2ListenerMap.end(); ++item){if (item->second &&item->second->HandlerMessage({ msgName, msgDatas })){break;}}}void MessageBus::PostMessage(const std::string& msgName, MessageDataList& msgDatas){if (m_msgMap.find(msgName) == m_msgMap.end()){return;}std::lock_guard<std::mutex> lock(m_msgQueueMutex);if (m_msgQueue.size() <= m_asyMsgQueueSize){m_msgQueue.push_back({ msgName,msgDatas });}}void MessageBus::OnAsynMessageLoop(){if (!m_msgQueue.empty()){SendMessage(m_msgQueue.front().msgName, m_msgQueue.front().msgDatas);std::lock_guard<std::mutex> lock(m_msgQueueMutex);m_msgQueue.pop_front();}}
}
更多推荐
消息总线设计
发布评论