Message Queues: oslo


In the previous post we covered the basics of RabbitMQ.

Today we look at how RabbitMQ and the oslo_messaging library relate to each other in OpenStack.

The first question to get clear on: what is the difference between using RabbitMQ directly and using it indirectly through the oslo_messaging library?

oslo_messaging builds on top of RabbitMQ and, through a series of layered calls, exposes a simple interface to the user. You do not need to care about the internals: write a configuration file and make a few straightforward function calls.
Because it is a standard OpenStack library, many of its function names and default parameters are phrased in OpenStack terms.

The basic layering is:

openstack -> oslo_messaging -> kombu -> AMQP -> socket

Let's start with the objects that oslo_messaging defines:

Concepts

Transport

The Transport (transport layer) implements the low-level RPC communication (e.g. sockets) as well as the event loop, threading and other plumbing. A handle to a particular transport can be obtained from a URL of the form:

transport://user:password@host:port[,hostN:portN]/virtual_host

The transports currently supported are rabbit, qpid and zmq, each corresponding to a different backend message bus. A handle to a transport object instance is obtained with the oslo_messaging.get_transport() function.

import oslo_messaging
transport = oslo_messaging.get_transport(cfg, url=None, **kwargs)
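
For instance, a transport pointing at a local RabbitMQ broker might be obtained like this. The URL here is a made-up example; in practice it usually comes from the transport_url option in the service's configuration file:

from oslo_config import cfg
import oslo_messaging

# Hypothetical broker URL; normally read from transport_url in the config file.
transport = oslo_messaging.get_transport(
    cfg.CONF, url='rabbit://guest:guest@localhost:5672/')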

Target

A Target encapsulates all the information needed to specify the final destination of a message. Its attributes are listed below:

Parameter = default (type): description
exchange = None (string): the scope the topic belongs to; if not set, the control_exchange option from the configuration file is used
topic = None (string): a topic identifies the group of interfaces exposed by a server (an interface is a set of remotely callable methods); several servers may expose the same topic, in which case each message is delivered to one of them in round-robin fashion
namespace = None (string): identifies one particular interface (a set of remotely callable methods) exposed by the server
version = None (string): interfaces are versioned as M.N; a minor (N) increase means the new interface is backward compatible, a major (M) increase means it is not, and an RPC server may implement several major versions of an interface
server = None (string): a client can set this to require that the message be delivered to one specific server rather than to an arbitrary server listening on the topic
fanout = None (bool): when true, the message is sent to every server listening on the topic instead of just one of them

Different scenarios require different parameters when constructing a Target: creating an RPC server requires topic and server, with exchange optional; when specifying an endpoint, namespace and version are optional; a client sending a message needs topic, with everything else optional.
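
As a quick sketch of the three cases just described (the topic and server names here are arbitrary):

import oslo_messaging

# Server side: topic and server are required, exchange is optional.
server_target = oslo_messaging.Target(topic='test', server='server1')

# Endpoint side: namespace and version scope the interface the endpoint exposes.
endpoint_target = oslo_messaging.Target(namespace='control', version='2.0')

# Client side: only topic is required; server or fanout narrows or widens the destination.
client_target = oslo_messaging.Target(topic='test')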

Server

An RPC server can expose several endpoints, each of which is an object with a set of methods that clients can invoke remotely over some Transport. Creating a Server object requires a Transport, a Target and a list of endpoints.

RPC Client

Through an RPC Client you can invoke methods on an RPC server remotely. A remote call needs a dict describing the request context, the name of the method to call, and the arguments to pass to it (also given as a dict).

There are two invocation styles, cast and call. With cast, the request returns as soon as it has been sent; with call, the caller waits for the response to come back from the server.
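
A minimal sketch of the difference (the method names here are hypothetical; a complete client example appears later):

from oslo_config import cfg
import oslo_messaging

transport = oslo_messaging.get_transport(cfg.CONF)
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(topic='test'))

# cast(): send the request and return immediately, no result is expected.
client.cast({}, 'do_something', arg='x')            # hypothetical method name

# call(): block until the server returns a value (or the timeout expires).
result = client.call({}, 'compute_something', arg='x')  # hypothetical method name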

Notifier

A Notifier sends notification messages over a given transport. A notification message has the following format:

import six
import uuid
from oslo_utils import timeutils

{'message_id': six.text_type(uuid.uuid4()),  # message id
 'publisher_id': 'compute.host1',            # sender id
 'timestamp': timeutils.utcnow(),            # timestamp
 'priority': 'WARN',                         # notification priority
 'event_type': 'compute.create_instance',    # notification type
 'payload': {'instance_id': 12, ...}}        # notification payload

Notifications can be sent at different priority levels, including sample, critical, error, warn, info, debug and audit.

Notification Listener

A Notification Listener is similar to a Server: a listener object can expose several endpoints, each with a set of methods. Unlike a Server's endpoints, though, the methods here correspond to the different notification priorities. For example:

import oslo_messaging


class ErrorEndpoint(object):

    def error(self, ctxt, publisher_id, event_type, payload, metadata):
        do_something(payload)
        return oslo_messaging.NotificationResult.HANDLED

If an endpoint method returns messaging.NotificationResult.HANDLED or None, the notification is considered acknowledged; if it returns messaging.NotificationResult.REQUEUE, the notification is put back on the message queue.

Below is an example that implements a remote procedure call with oslo_messaging.

from oslo_config import cfg
import oslo_messaging as messaging


class ServerControlEndpoint(object):

    target = messaging.Target(namespace='control',
                              version='2.0')

    def __init__(self, server):
        self.server = server

    def stop(self, ctx):
        if self.server:
            self.server.stop()


class TestEndpoint(object):

    def test(self, ctx, arg):
        return arg


transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test', server='server1')
endpoints = [
    ServerControlEndpoint(None),
    TestEndpoint(),
]
server = messaging.get_rpc_server(transport, target, endpoints,
                                  executor='blocking')
server.start()
server.wait()

This example defines two endpoints, ServerControlEndpoint and TestEndpoint; their methods stop and test can both be invoked remotely by clients.

Before creating the rpc server object we need the transport and target objects. get_transport() returns a handle to a transport object; its parameters are:

Parameter = default (type): description
conf (oslo.config.cfg.ConfigOpts): the oslo.config options object
url = None (string or TransportURL): the transport URL; if empty, the transport_url option from conf is used
allowed_remote_exmods = None (list): a list of Python modules the client may use to deserialize exceptions
aliases = None (dict): a mapping of transport aliases to transport names

Besides transport_url, the conf object may also contain a control_exchange option, which names the default scope a topic belongs to; its default value is "openstack". The default can be changed with oslo.messaging.set_transport_defaults().
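
For example, a project that wants its own default exchange could do something like this before creating any transports (a minimal sketch; 'my_service' is a made-up exchange name):

from oslo_config import cfg
import oslo_messaging

# Change the default control_exchange from "openstack" to "my_service".
oslo_messaging.set_transport_defaults(control_exchange='my_service')
transport = oslo_messaging.get_transport(cfg.CONF)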

The Target object constructed here is used to create an RPC server, so topic and server must be specified. A user-defined endpoint object can also carry a target attribute of its own, naming the particular namespace and version that endpoint supports.

The server object is created with get_rpc_server(); calling its start() method then begins accepting remote calls. The parameters of get_rpc_server() are:

Parameter = default (type): description
transport (Transport): the transport object
target (Target): the target naming the exchange, topic and server to listen on
endpoints (list): a list of endpoint object instances
executor = 'blocking' (string): how messages are received and dispatched; two executors are currently supported. blocking: the caller of start() enters the request-processing loop itself, so the user thread blocks and handles one request after another until stop() is called, and both receiving and dispatching happen in the thread that called start(). eventlet: one GreenThread handles message reception and other GreenThreads dispatch the individual messages, so the thread that calls start() is not blocked
serializer = None (Serializer): used to serialize/deserialize messages
# client.py -- the client side
from oslo_config import cfg
import oslo_messaging as messaging

transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test')
client = messaging.RPCClient(transport, target)
ret = client.call(ctxt={}, method='test', arg='myarg')

cctx = client.prepare(namespace='control', version='2.0')
cctx.cast({}, 'stop')

When constructing the target here, the only required parameter is topic. The parameters accepted when creating the RPCClient are:

Parameter = default (type): description
transport (Transport): the transport object
target (Target): the client's default target
timeout = None (int or float): timeout in seconds for call()
version_cap = None (string): the highest supported version; if a request's version exceeds it, RPCVersionCapError is raised
serializer = None (Serializer): used to serialize/deserialize messages
retry = None (int): connection retry count; None or -1 means retry forever, 0 means no retry, and N > 0 means retry N times

A remote call needs the request context, the name of the method to invoke, and the arguments to pass to it.

After the RPCClient has been constructed, its Target attributes can still be overridden via prepare(). The attributes that can be overridden are exchange, topic, namespace, version, server, fanout, timeout, version_cap and retry.

The overridden attributes take effect only on the object returned by prepare().
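
Continuing the client.py example above, a sketch of how prepare() layers per-call overrides on top of the client's defaults (timeout here is a per-call timeout in seconds):

# Same namespace/version override as before, plus a 10 second call timeout.
cctx = client.prepare(namespace='control', version='2.0', timeout=10)
cctx.cast({}, 'stop')

# The original client object keeps its own default target.
ret = client.call({}, 'test', arg='myarg')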

Next, let's look at an example of handling notification messages with oslo_messaging:

# notification_listener.py -- notification handling
from oslo_config import cfg
import oslo_messaging as messaging


class NotificationEndPoint(object):

    def warn(self, ctxt, publisher_id, event_type, payload, metadata):
        do_something(payload)


class ErrorEndpoint(object):

    def error(self, ctxt, publisher_id, event_type, payload, metadata):
        do_something(payload)


transport = messaging.get_transport(cfg.CONF)
targets = [
    messaging.Target(topic='notifications'),
    messaging.Target(topic='notifications_bis'),
]
endpoints = [
    NotificationEndPoint(),
    ErrorEndpoint(),
]
listener = messaging.get_notification_listener(transport, targets, endpoints)
listener.start()
listener.wait()

Unlike an RPC endpoint, a notification endpoint's methods must correspond one to one with the notification priorities. Each endpoint can also be given its own target object.

Finally, get_notification_listener() is called to construct the notification listener object. Its parameters are:

Parameter = default (type): description
transport (Transport): the transport object
targets (list): a list of target objects naming the exchange and topic that each endpoint in the endpoints list listens on
endpoints (list): a list of endpoint object instances
executor = 'blocking' (string): how messages are received and dispatched; same as for get_rpc_server(): blocking runs the receive/dispatch loop in the thread that calls start(), while eventlet uses GreenThreads so the calling thread is not blocked
serializer = None (Serializer): used to serialize/deserialize messages
allow_requeue = False (bool): if true, NotificationResult.REQUEUE is supported

The corresponding code for sending a notification looks like this:

# notifier_send.py
from oslo_config import cfg
import oslo_messaging as messaging

transport = messaging.get_transport(cfg.CONF)
notifier = messaging.Notifier(transport,
                              driver='messaging',
                              topic='notifications')
notifier2 = notifier.prepare(publisher_id='compute')
notifier2.error(ctxt={},
                event_type='my_type',
                payload={'content': 'error occurred'})
To send notifications, first construct a Notifier object. The parameters that may need to be specified are:

Parameter = default (type): description
transport (Transport): the transport object
publisher_id = None (string): the sender id
driver = None (string): the backend driver, normally "messaging"; if not given, the notification_driver value from the configuration file is used
topic = None (string): the topic to send notifications on; if not given, the notification_topics value from the configuration file is used
serializer = None (Serializer): used to serialize/deserialize messages

Constructing a Notifier is a relatively heavyweight operation, so an existing Notifier can be specialized with prepare(), which returns a new Notifier instance. Its parameters are:

Parameter = default (type): description
publisher_id = None (string): the sender id
retry = None (int): connection retry count; None or -1 means retry forever, 0 means no retry, and N > 0 means retry N times

Finally, call the Notifier's priority-specific methods (error, critical, warn, and so on) to send notifications at the corresponding levels.
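
Continuing the notifier_send.py sketch above, each priority has its own method on the Notifier; the event types and payloads here are arbitrary examples:

notifier2.warn({}, 'my_type', {'content': 'something looks wrong'})
notifier2.info({}, 'my_type', {'content': 'for the record'})
notifier2.critical({}, 'my_type', {'content': 'this is serious'})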

Source code analysis

From the previous sections we can see that the library ultimately exposes two concepts to its users: 1. rpc, 2. notification.

Let's walk through the source code behind each of them.

rpc

RPC (remote procedure call) is split into a calling side and a called side.
The calling side is the client: rpc_client.
The called side is the server: rpc_server.
In use, the called side runs server.start() and waits; the calling side then issues client.cast() or client.call() to make a non-blocking or blocking remote call.

So what actually happens when an rpc client performs a remote call?
(The code lives in oslo_messaging/rpc/client.py.)

rpc client

First the rpc client is built by instantiating RPCClient:

        self.rpc_client = messaging.get_rpc_client(
            messaging.get_transport(), version='1.0')

def get_rpc_client(transport, retry=None, **kwargs):
    """Return a configured oslo_messaging RPCClient."""
    target = oslo_messaging.Target(**kwargs)
    serializer = oslo_serializer.RequestContextSerializer(
        oslo_serializer.JsonPayloadSerializer())
    return oslo_messaging.RPCClient(transport, target,
                                    serializer=serializer,
                                    retry=retry)

class RPCClient(_BaseCallContext):

    _marker = _BaseCallContext._marker

    def __init__(self, transport, target,
                 timeout=None, version_cap=None, serializer=None, retry=None,
                 call_monitor_timeout=None, transport_options=None):
        if serializer is None:
            serializer = msg_serializer.NoOpSerializer()

        if not isinstance(transport, msg_transport.RPCTransport):
            LOG.warning("Using notification transport for RPC. Please use "
                        "get_rpc_transport to obtain an RPC transport "
                        "instance.")

        super(RPCClient, self).__init__(
            transport, target, serializer, timeout, version_cap, retry,
            call_monitor_timeout, transport_options)

        self.conf.register_opts(_client_opts)

Then the call is made via call() or cast():

self.rpc_client.prepare(topic=topic).call(cxt, method, **args)

@six.add_metaclass(abc.ABCMeta)
class _BaseCallContext(object):

    _marker = object()

    def __init__(self, transport, target, serializer,
                 timeout=None, version_cap=None, retry=None,
                 call_monitor_timeout=None, transport_options=None):
        self.conf = transport.conf

        self.transport = transport
        self.target = target
        self.serializer = serializer
        self.timeout = timeout
        self.call_monitor_timeout = call_monitor_timeout
        self.retry = retry
        self.version_cap = version_cap
        self.transport_options = transport_options

        super(_BaseCallContext, self).__init__()

    def cast(self, ctxt, method, **kwargs):
        """Invoke a method and return immediately. See RPCClient.cast()."""
        msg = self._make_message(ctxt, method, kwargs)
        msg_ctxt = self.serializer.serialize_context(ctxt)

        self._check_version_cap(msg.get('version'))

        try:
            self.transport._send(self.target, msg_ctxt, msg,
                                 retry=self.retry,
                                 transport_options=self.transport_options)
        except driver_base.TransportDriverError as ex:
            raise ClientSendError(self.target, ex)

    def call(self, ctxt, method, **kwargs):
        """Invoke a method and wait for a reply. See RPCClient.call()."""
        if self.target.fanout:
            raise exceptions.InvalidTarget('A call cannot be used with fanout',
                                           self.target)

        msg = self._make_message(ctxt, method, kwargs)
        msg_ctxt = self.serializer.serialize_context(ctxt)

        timeout = self.timeout
        if self.timeout is None:
            timeout = self.conf.rpc_response_timeout

        cm_timeout = self.call_monitor_timeout

        self._check_version_cap(msg.get('version'))

        try:
            result = \
                self.transport._send(self.target, msg_ctxt, msg,
                                     wait_for_reply=True, timeout=timeout,
                                     call_monitor_timeout=cm_timeout,
                                     retry=self.retry,
                                     transport_options=self.transport_options)
        except driver_base.TransportDriverError as ex:
            raise ClientSendError(self.target, ex)

        return self.serializer.deserialize_entity(ctxt, result)

As you can see, both methods end up executing transport._send().

They differ only in the arguments they pass; the most important difference is the wait_for_reply parameter: wait or don't wait, which is exactly the blocking/non-blocking distinction we mentioned. As for _send() itself, the two key ingredients are the transport object and the target, both of which have to be supplied when the rpc client is constructed (__init__).

The transport argument comes from the _get_transport() function in oslo_messaging/transport.py:

def _get_transport(conf, url=None, allowed_remote_exmods=None,
                   transport_cls=RPCTransport):
    allowed_remote_exmods = allowed_remote_exmods or []
    conf.register_opts(_transport_opts)

    if not isinstance(url, TransportURL):
        url = TransportURL.parse(conf, url)

    kwargs = dict(default_exchange=conf.control_exchange,
                  allowed_remote_exmods=allowed_remote_exmods)

    try:
        mgr = driver.DriverManager('oslo.messaging.drivers',
                                   url.transport.split('+')[0],
                                   invoke_on_load=True,
                                   invoke_args=[conf, url],
                                   invoke_kwds=kwargs)
    except RuntimeError as ex:
        raise DriverLoadFailure(url.transport, ex)

    return transport_cls(mgr.driver)

The URL here comes from the configuration file. Taking RabbitMQ as an example, the entry point resolves to oslo_messaging._drivers.impl_rabbit:RabbitDriver, so what we ultimately get wraps a RabbitDriver instance.

The target is instantiated directly. Note its two parameters, exchange and topic: they map directly onto RabbitMQ's own exchange and routing_key concepts.
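
To make that concrete, here is a small sketch of how a Target maps onto RabbitMQ terms, based on the _send() code examined below (the names are arbitrary examples):

import oslo_messaging

target = oslo_messaging.Target(exchange='openstack', topic='test', server='server1')
# For a plain call/cast this is published to the topic-type exchange 'openstack'
# with routing key 'test.server1' (just 'test' if server is not set).
# With fanout=True it would instead go to a fanout exchange named 'test_fanout'.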

Now let's move on to transport._send(). As noted above, the driver behind the transport here is RabbitDriver.

RabbitDriver inherits from AMQPDriverBase, which inherits from BaseDriver.

The _send() method is defined in AMQPDriverBase:

class AMQPDriverBase(base.BaseDriver):
    missing_destination_retry_timeout = 0

    def __init__(self, conf, url, connection_pool,
                 default_exchange=None, allowed_remote_exmods=None):
        super(AMQPDriverBase, self).__init__(conf, url, default_exchange,
                                             allowed_remote_exmods)

        self._default_exchange = default_exchange

        self._connection_pool = connection_pool

        self._reply_q_lock = threading.Lock()
        self._reply_q = None
        self._reply_q_conn = None
        self._waiter = None

    def _send(self, target, ctxt, message,
              wait_for_reply=None, timeout=None, call_monitor_timeout=None,
              envelope=True, notify=False, retry=None, transport_options=None):

        msg = message

        if wait_for_reply:
            msg_id = uuid.uuid4().hex
            msg.update({'_msg_id': msg_id})
            msg.update({'_reply_q': self._get_reply_q()})
            msg.update({'_timeout': call_monitor_timeout})

        rpc_amqp._add_unique_id(msg)
        unique_id = msg[rpc_amqp.UNIQUE_ID]

        rpc_amqp.pack_context(msg, ctxt)

        if envelope:
            msg = rpc_common.serialize_msg(msg)

        if wait_for_reply:
            self._waiter.listen(msg_id)
            log_msg = "CALL msg_id: %s " % msg_id
        else:
            log_msg = "CAST unique_id: %s " % unique_id

        try:
            with self._get_connection(rpc_common.PURPOSE_SEND) as conn:
                if notify:
                    exchange = self._get_exchange(target)
                    LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'"
                              " topic '%(topic)s'", {'exchange': exchange,
                                                     'topic': target.topic})
                    conn.notify_send(exchange, target.topic, msg, retry=retry)
                elif target.fanout:
                    log_msg += "FANOUT topic '%(topic)s'" % {
                        'topic': target.topic}
                    LOG.debug(log_msg)
                    conn.fanout_send(target.topic, msg, retry=retry)
                else:
                    topic = target.topic
                    exchange = self._get_exchange(target)
                    if target.server:
                        topic = '%s.%s' % (target.topic, target.server)
                    LOG.debug(log_msg + "exchange '%(exchange)s'"
                              " topic '%(topic)s'", {'exchange': exchange,
                                                     'topic': topic})
                    conn.topic_send(exchange_name=exchange, topic=topic,
                                    msg=msg, timeout=timeout, retry=retry,
                                    transport_options=transport_options)

            if wait_for_reply:
                result = self._waiter.wait(msg_id, timeout,
                                           call_monitor_timeout)
                if isinstance(result, Exception):
                    raise result
                return result
        finally:
            if wait_for_reply:
                self._waiter.unlisten(msg_id)

Look at the branching inside the `with self._get_connection(rpc_common.PURPOSE_SEND)` block. Recall that when cast() and call() above invoke _send() they never pass the notify argument, so the first branch can never be taken here.

That leaves the other two cases, `elif target.fanout:` and the final `else:` (fanout here means exactly what it means in RabbitMQ itself). In other words, by setting the fanout parameter when constructing the target, or via client.prepare(), we decide which branch is taken. Note that in the third case, if target.server is set, the topic becomes target.topic combined with target.server. Now let's look at conn.fanout_send() and conn.topic_send(). (conn is the product of __enter__/__exit__/__getattr__ magic that we won't go into here; it is enough to know that the calls end up in the Connection class in oslo_messaging/_drivers/impl_rabbit.py.)

conn.fanout_send

class Connection(object):

    def fanout_send(self, topic, msg, retry=None):
        """Send a 'fanout' message."""
        exchange = kombu.entity.Exchange(name='%s_fanout' % topic,
                                         type='fanout',
                                         durable=False,
                                         auto_delete=True)

        self._ensure_publishing(self._publish, exchange, msg, retry=retry)

At this point it is basically clear: fanout_send() publishes a fanout-type message to the exchange named target.topic + "_fanout", and every queue bound to that exchange receives a copy of the message. If that exchange has never been created, it is declared inside self._publish().

conn.topic_send

class Connection(object):

    def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None,
                   transport_options=None):
        """Send a 'topic' message."""
        exchange = kombu.entity.Exchange(name=exchange_name,
                                         type='topic',
                                         durable=self.amqp_durable_queues,
                                         auto_delete=self.amqp_auto_delete)

        self._ensure_publishing(self._publish, exchange, msg,
                                routing_key=topic, timeout=timeout,
                                retry=retry,
                                transport_options=transport_options)

topic_send() publishes a topic-type message, using topic as the routing_key, to the exchange named by the exchange_name argument. The difference to note is that exchange_name is obtained by the caller from _get_exchange():

class AMQPDriverBase(base.BaseDriver):

    def _get_exchange(self, target):
        return target.exchange or self._default_exchange

As for _default_exchange: if you look back at the _get_transport() code above, it is conf.control_exchange,
which defaults to "openstack" (which hints at why oslo_messaging was built with OpenStack in mind 😄). What really matters, though, is the target: the configuration value is only used when target.exchange is not set.
Likewise, if this exchange has not been created yet, it gets declared in self._publish().

rpc server

First we need to obtain an rpc server instance:

def get_rpc_server(transport, target, endpoints,
                   executor='blocking', serializer=None, access_policy=None):
    dispatcher = rpc_dispatcher.RPCDispatcher(endpoints, serializer,
                                              access_policy)
    return RPCServer(transport, target, dispatcher, executor)


class RPCServer(msg_server.MessageHandlingServer):

    def __init__(self, transport, target, dispatcher, executor='blocking'):
        super(RPCServer, self).__init__(transport, dispatcher, executor)
        if not isinstance(transport, msg_transport.RPCTransport):
            LOG.warning("Using notification transport for RPC. Please use "
                        "get_rpc_transport to obtain an RPC transport "
                        "instance.")
        self._target = target

RPCServer inherits from MessageHandlingServer, which inherits from ServiceBase and _OrderedTaskRunner.

As before, transport and target are required. Once the rpc server instance exists, calling its start() method ends up in the base class's start():

@six.add_metaclass(abc.ABCMeta)
class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):

    @ordered(reset_after='stop')
    def start(self, override_pool_size=None):
        if self._started:
            LOG.warning('The server has already been started. Ignoring '
                        'the redundant call to start().')
            return

        self._started = True

        executor_opts = {}

        if self.executor_type in ("threading", "eventlet"):
            executor_opts["max_workers"] = (
                override_pool_size or self.conf.executor_thread_pool_size
            )
        self._work_executor = self._executor_cls(**executor_opts)

        try:
            self.listener = self._create_listener()
        except driver_base.TransportDriverError as ex:
            raise ServerListenError(self.target, ex)

        self.listener.start(self._on_incoming)

The key part is the call to self._create_listener() followed by self.listener.start(self._on_incoming). Since the object being started here is an RPCServer, _create_listener() resolves to RPCServer's method:

class RPCServer(msg_server.MessageHandlingServer):

    def _create_listener(self):
        return self.transport._listen(self._target, 1, None)


class Transport(object):

    def _listen(self, target, batch_size, batch_timeout):
        if not (target.topic and target.server):
            raise exceptions.InvalidTarget('A server\'s target must have '
                                           'topic and server names specified',
                                           target)
        return self._driver.listen(target, batch_size,
                                   batch_timeout)


class AMQPDriverBase(base.BaseDriver):

    def listen(self, target, batch_size, batch_timeout):
        conn = self._get_connection(rpc_common.PURPOSE_LISTEN)

        listener = RpcAMQPListener(self, conn)

        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic=target.topic,
                                    callback=listener)
        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic='%s.%s' % (target.topic,
                                                     target.server),
                                    callback=listener)
        conn.declare_fanout_consumer(target.topic, listener)

        return base.PollStyleListenerAdapter(listener, batch_size,
                                             batch_timeout)

The essential work in listen() is the three declare calls, referred to below as:
declare_topic_consumer (1)
declare_topic_consumer (2)
declare_fanout_consumer

declare_topic_consumer(1)

oslo_messaging/_drivers/impl_rabbit.py

class Connection(object):

    def declare_topic_consumer(self, exchange_name, topic, callback=None,
                               queue_name=None):
        """Create a 'topic' consumer."""
        consumer = Consumer(exchange_name=exchange_name,
                            queue_name=queue_name or topic,
                            routing_key=topic,
                            type='topic',
                            durable=self.amqp_durable_queues,
                            exchange_auto_delete=self.amqp_auto_delete,
                            queue_auto_delete=self.amqp_auto_delete,
                            callback=callback,
                            rabbit_ha_queues=self.rabbit_ha_queues)

        self.declare_consumer(consumer)

    def declare_consumer(self, consumer):
        """Create a Consumer using the class that was passed in and
        add it to our list of consumers
        """

        def _connect_error(exc):
            log_info = {'topic': consumer.routing_key, 'err_str': exc}
            LOG.error("Failed to declare consumer for topic '%(topic)s': "
                      "%(err_str)s", log_info)

        def _declare_consumer():
            consumer.declare(self)
            tag = self._active_tags.get(consumer.queue_name)
            if tag is None:
                tag = next(self._tags)
                self._active_tags[consumer.queue_name] = tag
                self._new_tags.add(tag)

            self._consumers[consumer] = tag
            return consumer

        with self._connection_lock:
            return self.ensure(_declare_consumer,
                               error_callback=_connect_error)

    def declare(self, conn):
        """Re-declare the queue after a rabbit (re)connect."""
        self.queue = kombu.entity.Queue(
            name=self.queue_name,
            channel=conn.channel,
            exchange=self.exchange,
            durable=self.durable,
            auto_delete=self.queue_auto_delete,
            routing_key=self.routing_key,
            queue_arguments=self.queue_arguments)

        try:
            LOG.debug('[%s] Queue.declare: %s',
                      conn.connection_id, self.queue_name)
            self.queue.declare()

@python_2_unicode_compatible
class Queue(MaybeChannelBound):

    def declare(self, nowait=False, channel=None):
        """Declare queue and exchange then binds queue to exchange."""
        if not self.no_declare:
            # - declare main binding.
            self._create_exchange(nowait=nowait, channel=channel)
            self._create_queue(nowait=nowait, channel=channel)
            self._create_bindings(nowait=nowait, channel=channel)
        return self.name

    def _create_exchange(self, nowait=False, channel=None):
        if self.exchange:
            self.exchange.declare(nowait=nowait, channel=channel)

    def _create_queue(self, nowait=False, channel=None):
        self.queue_declare(nowait=nowait, passive=False, channel=channel)
        if self.exchange and self.exchange.name:
            self.queue_bind(nowait=nowait, channel=channel)

    def _create_bindings(self, nowait=False, channel=None):
        for B in self.bindings:
            channel = channel or self.channel
            B.declare(channel)
            B.bind(self, nowait=nowait, channel=channel)

The upshot is clear: the target's exchange (default "openstack") is used as the exchange_name
and the target's topic as the default queue_name; the exchange and the queue are declared and then bound to each other.

declare_topic_consumer(2)
The only difference from declare_topic_consumer(1) is that target.topic combined with target.server is passed as the topic, so it becomes both the default queue_name and the routing_key.

declare_fanout_consumer

class Connection(object):

    def declare_fanout_consumer(self, topic, callback):
        """Create a 'fanout' consumer."""

        unique = uuid.uuid4().hex
        exchange_name = '%s_fanout' % topic
        queue_name = '%s_fanout_%s' % (topic, unique)

        consumer = Consumer(exchange_name=exchange_name,
                            queue_name=queue_name,
                            routing_key=topic,
                            type='fanout',
                            durable=False,
                            exchange_auto_delete=True,
                            queue_auto_delete=False,
                            callback=callback,
                            rabbit_ha_queues=self.rabbit_ha_queues,
                            rabbit_queue_ttl=self.rabbit_transient_queues_ttl)

        self.declare_consumer(consumer)

The differences from the two cases above: the queue_name becomes target.topic + "_fanout_" + a uuid,
the exchange_name becomes target.topic + "_fanout", and the exchange type becomes fanout.

A routing_key is still passed here, but I don't think it has any effect (a fanout exchange ignores routing keys).

Summary

What the server listens on

I.
1. An exchange named by target.exchange or the configuration file (default "openstack"), of type topic.
2. A queue named after target.topic.
3. The queue is bound to the exchange with target.topic as the routing key.
II.
1. An exchange named by target.exchange or the configuration file (default "openstack"), of type topic.
2. A queue named after target.topic combined with target.server.
3. The queue is bound to the exchange with target.topic.target.server as the routing key.
III.
1. An exchange named target.topic + "_fanout", of type fanout.
2. A queue named target.topic + "_fanout_" + a unique uuid.
3. The queue is bound to the exchange.
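
Purely as an illustration (this is not oslo_messaging's own code), the three bindings above roughly correspond to the following kombu declarations for Target(topic='test', server='server1') with the default 'openstack' exchange; kombu's in-memory transport is used only so the snippet can run standalone:

import uuid
import kombu

topic_exchange = kombu.Exchange('openstack', type='topic')
fanout_exchange = kombu.Exchange('test_fanout', type='fanout',
                                 durable=False, auto_delete=True)

queues = [
    # I: queue named after the topic, bound with the topic as routing key.
    kombu.Queue('test', exchange=topic_exchange, routing_key='test'),
    # II: queue named topic.server, bound with topic.server as routing key.
    kombu.Queue('test.server1', exchange=topic_exchange,
                routing_key='test.server1'),
    # III: a per-listener queue bound to the fanout exchange.
    kombu.Queue('test_fanout_%s' % uuid.uuid4().hex, exchange=fanout_exchange),
]

with kombu.Connection('memory://') as conn:
    channel = conn.default_channel
    for queue in queues:
        queue(channel).declare()   # declares exchange + queue and binds them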

How the client calls

1. Non-blocking call: client.cast
2. Blocking call: client.call

Use prepare() to change the client's target in order to make a fanout call or a call directed at a specific server.
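
A small sketch of what that looks like from the client side, tied to the _send() branches above; it assumes an RPCClient built as in the earlier client.py example, and the method names are hypothetical:

# Fanout: _send() takes the fanout branch, publishing to the '<topic>_fanout'
# exchange, so every server listening on the topic gets a copy.
client.prepare(fanout=True).cast({}, 'refresh_cache')

# Server-directed: _send() takes the topic branch with routing key
# '<topic>.<server>', so only that server's queue receives the message.
client.prepare(server='server1').call({}, 'get_status')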

notification

A notification is, as the name suggests, a message or event. The concept splits into
a notifying side, the notifier (officially also called a driver), and a listening side, the notification_listener.
In use, the listening side calls listener.start(); the notifying side calls notifier.notify() (exposed to users as the per-priority methods such as sample, audit, info and so on) to deliver a message to the listener for processing.

The sending side

Taking ceilometer as an example, a Notifier object is instantiated first:

        self.notifier = oslo_messaging.Notifier(
            messaging.get_transport(),
            driver=cfg.CONF.publisher_notifier.telemetry_driver,
            publisher_id="ceilometer.polling")

After the data has been processed it has to be sent out (the data-processing part itself is not covered here; see my other post analysing the polling source code).

class Notifier(object):

    def sample(self, ctxt, event_type, payload):
        self._notify(ctxt, event_type, payload, 'SAMPLE')

    def _notify(self, ctxt, event_type, payload, priority, publisher_id=None,
                retry=None):
        payload = self._serializer.serialize_entity(ctxt, payload)
        ctxt = self._serializer.serialize_context(ctxt)

        msg = dict(message_id=six.text_type(uuid.uuid4()),
                   publisher_id=publisher_id or self.publisher_id,
                   event_type=event_type,
                   priority=priority,
                   payload=payload,
                   timestamp=six.text_type(timeutils.utcnow()))

        def do_notify(ext):
            try:
                ext.obj.notify(ctxt, msg, priority, retry or self.retry)
            except Exception as e:
                _LOG.exception("Problem '%(e)s' attempting to send to "
                               "notification system. Payload=%(payload)s",
                               {'e': e, 'payload': payload})

        if self._driver_mgr.extensions:
            self._driver_mgr.map(do_notify)
class ExtensionManager(object):

    def map(self, func, *args, **kwds):
        if not self.extensions:
            # FIXME: Use a more specific exception class here.
            raise NoMatches('No %s extensions found' % self.namespace)
        response = []
        for e in self.extensions:
            self._invoke_one_plugin(response.append, func, e, args, kwds)
        return response
Stepping through map() under pdb:
(Pdb) p func
<function do_notify at 0x7f2c3c1df758>
(Pdb) p args
()
(Pdb) p kwds
{}
(Pdb) p self.extensions
[<stevedore.extension.Extension object at 0x7f2c48771450>]
(Pdb) p e
<stevedore.extension.Extension object at 0x7f2c48771450>
(Pdb) p e.__dict__
{'obj': <oslo_messaging.notify.messaging.MessagingV2Driver object at 0x7f2c48771210>, 'entry_point': EntryPoint.parse('messagingv2 = oslo_messaging.notify.messaging:MessagingV2Driver'), 'name': 'messagingv2', 'plugin': <class 'oslo_messaging.notify.messaging.MessagingV2Driver'>}
class ExtensionManager(object):

    def _invoke_one_plugin(self, response_callback, func, e, args, kwds):
        try:
            response_callback(func(e, *args, **kwds))
        except Exception as err:
            if self.propagate_map_exceptions:
                raise
            else:
                LOG.error('error calling %r: %s', e.name, err)
                LOG.exception(err)

As you can see, response_callback is the list's append method and func is do_notify, so this is where the actual invocation happens.

do_notify is the inner function defined in _notify() above; the call that matters is ext.obj.notify(ctxt, msg, priority, retry or self.retry), where ext wraps the loaded notification driver:

(Pdb) p ext
<stevedore.extension.Extension object at 0x7f2c48771450>
(Pdb) p ext.__dict__
{'obj': <oslo_messaging.notify.messaging.MessagingV2Driver object at 0x7f2c48771210>, 'entry_point': EntryPoint.parse('messagingv2 = oslo_messaging.notify.messaging:MessagingV2Driver'), 'name': 'messagingv2', 'plugin': <class 'oslo_messaging.notify.messaging.MessagingV2Driver'>}
(Pdb) p ext.obj
<oslo_messaging.notify.messaging.MessagingV2Driver object at 0x7f2c48771210>
(Pdb) p ext.obj.__dict__
{'topics': ['notifications'], 'version': 2.0, 'transport': <oslo_messaging.transport.Transport object at 0x7f2c485df890>, 'conf': <oslo_config.cfg.ConfigOpts object at 0x16ba490>}
class MessagingDriver(notifier.Driver):

    def __init__(self, conf, topics, transport, version=1.0):
        super(MessagingDriver, self).__init__(conf, topics, transport)
        self.version = version

    def notify(self, ctxt, message, priority, retry):
        priority = priority.lower()
        for topic in self.topics:
            target = oslo_messaging.Target(topic='%s.%s' % (topic, priority))
            try:
                self.transport._send_notification(target, ctxt, message,
                                                  version=self.version,
                                                  retry=retry)
            except Exception:
                LOG.exception("Could not send notification to %(topic)s. "
                              "Payload=%(message)s",
                              {'topic': topic, 'message': message})


class MessagingV2Driver(MessagingDriver):
    "Send notifications using the 2.0 message format."

    def __init__(self, conf, **kwargs):
        super(MessagingV2Driver, self).__init__(conf, version=2.0, **kwargs)

What actually gets invoked is MessagingDriver.notify() (MessagingV2Driver only changes the message format version). The crucial lines are:

    priority = priority.lower()
    for topic in self.topics:
        target = oslo_messaging.Target(topic='%s.%s' % (topic, priority))

So the real queue name that oslo_messaging ends up with has the form
topic.priority,
for example:
notifications.sample
In other words, as soon as oslo_messaging's notifier.sample() is used to send a message, the priority is set to
sample; that priority is used when building the final oslo_messaging Target, and the corresponding queue
ends up being notifications.sample.
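
Roughly, the naming convention looks like this (a sketch assuming `transport` was obtained via get_transport() as in the earlier examples; the event type and payload are arbitrary):

notifier = oslo_messaging.Notifier(transport, driver='messaging',
                                   topic='notifications',
                                   publisher_id='compute')
notifier.sample({}, 'my_type', {'value': 42})
# -> MessagingDriver.notify() builds Target(topic='notifications.sample'),
#    so the message is published with routing key 'notifications.sample'
#    and matches the 'notifications.sample' queue declared on the listener side.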

class Transport(object):

    def _send_notification(self, target, ctxt, message, version, retry=None):
        if not target.topic:
            raise exceptions.InvalidTarget('A topic is required to send',
                                           target)
        self._driver.send_notification(target, ctxt, message, version,
                                       retry=retry)
(Pdb) p self._driver
<oslo_messaging._drivers.impl_rabbit.RabbitDriver object at 0x7f2c4876de90>
(Pdb) p self._driver.__dict__
{'_waiter': None, '_allowed_remote_exmods': [], '_reply_q_lock': <thread.lock object at 0x7f2c482efc30>, 'conf': <oslo_config.cfg.ConfigOpts object at 0x16ba490>, '_default_exchange': 'ceilometer', '_connection_pool': <oslo_messaging._drivers.pool.ConnectionPool object at 0x7f2c4876df90>, '_reply_q': None, 'missing_destination_retry_timeout': 60, 'prefetch_size': 0, '_reply_q_conn': None, '_url': <TransportURL transport='rabbit', hosts=[<TransportHost hostname='rabbitmq.openstack.svc.cluster.local', port=5672, username='rabbitmq', password='vut8mvvS'>]>}
class AMQPDriverBase(base.BaseDriver):

    def send_notification(self, target, ctxt, message, version, retry=None):
        return self._send(target, ctxt, message,
                          envelope=(version == 2.0), notify=True, retry=retry)

From here on the path is almost the same as on the rpc client side: _send() is the same method we already walked through there. The difference is that notify=True is passed this time, so _send() takes the first branch and calls conn.notify_send(exchange, target.topic, msg) instead; the rest works as described earlier, so I won't repeat it.

The receiving/listening side

Again taking ceilometer as an example:

        urls = cfg.CONF.notification.messaging_urls or [None]
        for url in urls:
            transport = messaging.get_transport(url)
            # NOTE(gordc): ignore batching as we want pull
            # to maintain sequencing as much as possible.
            listener = messaging.get_batch_notification_listener(
                transport, targets, endpoints)
            listener.start()
            self.listeners.append(listener)
        (Pdb) endpoints
        [<ceilometer.event.endpoint.EventsNotificationEndpoint object at 0x2f5f610>,
         <ceilometer.ipmi.notifications.ironic.TemperatureSensorNotification object at 0x7fa1741f6810>,
         <ceilometer.telemetry.notifications.TelemetryIpc object at 0x7fa17424d410>,
         <ceilometer.ipmi.notifications.ironic.FanSensorNotification object at 0x7fa17424d8d0>,
         <ceilometer.ipmi.notifications.ironic.VoltageSensorNotification object at 0x7fa1742461d0>,
         <ceilometer.meter.notifications.ProcessMeterNotifications object at 0x7fa17424dc90>,
         <ceilometer.ipmi.notifications.ironic.CurrentSensorNotification object at 0x7fa17467ddd0>]
        (Pdb) cfg.CONF.notification.messaging_urls
        ['rabbit://rabbitmq:I5dZs2KN@rabbitmq.openstack.svc.cluster.local:5672/']
def get_batch_notification_listener(transport, targets, endpoints,
                                    allow_requeue=False,
                                    batch_size=1, batch_timeout=None):
    return oslo_messaging.get_batch_notification_listener(
        transport, targets, endpoints, executor='threading',
        allow_requeue=allow_requeue,
        batch_size=batch_size, batch_timeout=batch_timeout)


def get_batch_notification_listener(transport, targets, endpoints,
                                    executor='blocking', serializer=None,
                                    allow_requeue=False, pool=None,
                                    batch_size=None, batch_timeout=None):
    dispatcher = notify_dispatcher.BatchNotificationDispatcher(
        endpoints, serializer)

    return BatchNotificationServer(
        transport, targets, dispatcher, executor, allow_requeue, pool,
        batch_size, batch_timeout)

Of the two functions above, the first get_batch_notification_listener() is ceilometer's thin wrapper; it simply calls oslo_messaging's get_batch_notification_listener(), which builds a BatchNotificationDispatcher and a BatchNotificationServer. BatchNotificationDispatcher implements the dispatch() method, and BatchNotificationServer implements _process_incoming().

BatchNotificationDispatcher inherits from NotificationDispatcher, which inherits from DispatcherBase.

BatchNotificationServer inherits from NotificationServerBase, which inherits from MessageHandlingServer, which inherits from ServiceBase and _OrderedTaskRunner.

Because the listener instantiated here is a BatchNotificationServer, calling listener.start() runs MessageHandlingServer.start(), the same method we already walked through in the rpc server section.


The important steps are, again, the call to self._create_listener() followed by self.listener.start(self._on_incoming). This time _create_listener() resolves to NotificationServerBase's implementation:

class NotificationServerBase(msg_server.MessageHandlingServer):

    def __init__(self, transport, targets, dispatcher, executor='blocking',
                 allow_requeue=True, pool=None, batch_size=1,
                 batch_timeout=None):
        super(NotificationServerBase, self).__init__(transport, dispatcher,
                                                     executor)
        self._allow_requeue = allow_requeue
        self._pool = pool
        self.targets = targets
        self._targets_priorities = set(
            itertools.product(self.targets,
                              self.dispatcher.supported_priorities))

        self._batch_size = batch_size
        self._batch_timeout = batch_timeout

    def _create_listener(self):
        return self.transport._listen_for_notifications(
            self._targets_priorities, self._pool, self._batch_size,
            self._batch_timeout)


class Transport(object):

    def _listen_for_notifications(self, targets_and_priorities, pool,
                                  batch_size, batch_timeout):
        for target, priority in targets_and_priorities:
            if not target.topic:
                raise exceptions.InvalidTarget('A target must have '
                                               'topic specified',
                                               target)
        return self._driver.listen_for_notifications(
            targets_and_priorities, pool, batch_size, batch_timeout)


class AMQPDriverBase(base.BaseDriver):

    def listen_for_notifications(self, targets_and_priorities, pool,
                                 batch_size, batch_timeout):
        conn = self._get_connection(rpc_common.PURPOSE_LISTEN)

        listener = NotificationAMQPListener(self, conn)
        for target, priority in targets_and_priorities:
            conn.declare_topic_consumer(
                exchange_name=self._get_exchange(target),
                topic='%s.%s' % (target.topic, priority),
                callback=listener, queue_name=pool)
        return base.PollStyleListenerAdapter(listener, batch_size,
                                             batch_timeout)

In listen_for_notifications(), after the connection is established, a topic consumer with a queue named topic.priority is declared for each (target, priority) pair. The return value is a PollStyleListenerAdapter, which starts a dedicated thread to process whatever it pulls in; so _create_listener() simply returns a PollStyleListenerAdapter instance.

Then self.listener.start(self._on_incoming) is called.

PollStyleListenerAdapter inherits from Listener.

class PollStyleListenerAdapter(Listener):
    """A Listener that uses a PollStyleListener for message transfer. A
    dedicated thread is created to do message polling.
    """

    def __init__(self, poll_style_listener, batch_size, batch_timeout):
        super(PollStyleListenerAdapter, self).__init__(
            batch_size, batch_timeout, poll_style_listener.prefetch_size
        )
        self._poll_style_listener = poll_style_listener
        self._listen_thread = threading.Thread(target=self._runner)
        self._listen_thread.daemon = True
        self._started = False

    def start(self, on_incoming_callback):
        super(PollStyleListenerAdapter, self).start(on_incoming_callback)
        self._started = True
        self._listen_thread.start()

    @excutils.forever_retry_uncaught_exceptions
    def _runner(self):
        while self._started:
            incoming = self._poll_style_listener.poll(
                batch_size=self.batch_size, batch_timeout=self.batch_timeout)

            if incoming:
                self.on_incoming_callback(incoming)

        # listener is stopped but we need to process all already consumed
        # messages
        while True:
            incoming = self._poll_style_listener.poll(
                batch_size=self.batch_size, batch_timeout=self.batch_timeout)

            if not incoming:
                return
            self.on_incoming_callback(incoming)

    def stop(self):
        self._started = False
        self._poll_style_listener.stop()
        self._listen_thread.join()
        super(PollStyleListenerAdapter, self).stop()

    def cleanup(self):
        self._poll_style_listener.cleanup()

NotificationAMQPListener inherits from AMQPListener, which inherits from PollStyleListener.

As you can see, start() here just kicks off _runner(). self._poll_style_listener is the NotificationAMQPListener instance, and poll() keeps pulling data off the queue,

i.e. AMQPListener.poll() gets called:

class NotificationAMQPListener(AMQPListener):
    message_cls = NotificationAMQPIncomingMessage


class AMQPListener(base.PollStyleListener):

    def __init__(self, driver, conn):
        super(AMQPListener, self).__init__(driver.prefetch_size)
        self.driver = driver
        self.conn = conn
        self.msg_id_cache = rpc_amqp._MsgIdCache()
        self.incoming = []
        self._shutdown = threading.Event()
        self._shutoff = threading.Event()
        self._obsolete_reply_queues = ObsoleteReplyQueuesCache()
        self._message_operations_handler = MessageOperationsHandler(
            "AMQPListener")
        self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN

    def __call__(self, message):
        ctxt = rpc_amqp.unpack_context(message)
        unique_id = self.msg_id_cache.check_duplicate_message(message)
        if ctxt.msg_id:
            LOG.debug("received message msg_id: %(msg_id)s reply to "
                      "%(queue)s", {'queue': ctxt.reply_q,
                                    'msg_id': ctxt.msg_id})
        else:
            LOG.debug("received message with unique_id: %s", unique_id)

        self.incoming.append(self.message_cls(
            self,
            ctxt.to_dict(),
            message,
            unique_id,
            ctxt.msg_id,
            ctxt.reply_q,
            ctxt.client_timeout,
            self._obsolete_reply_queues,
            self._message_operations_handler))

    @base.batch_poll_helper
    def poll(self, timeout=None):
        stopwatch = timeutils.StopWatch(duration=timeout).start()

        while not self._shutdown.is_set():
            self._message_operations_handler.process()

            if self.incoming:
                return self.incoming.pop(0)

            left = stopwatch.leftover(return_none=True)
            if left is None:
                left = self._current_timeout
            if left <= 0:
                return None

            try:
                self.conn.consume(timeout=min(self._current_timeout, left))
            except rpc_common.Timeout:
                self._current_timeout = max(self._current_timeout * 2,
                                            ACK_REQUEUE_EVERY_SECONDS_MAX)
            else:
                self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN

        # NOTE(sileht): listener is stopped, just processes remaining messages
        # and operations
        self._message_operations_handler.process()
        if self.incoming:
            return self.incoming.pop(0)

        self._shutoff.set()

In __call__(), the incoming message is wrapped in self.message_cls, i.e. NotificationAMQPIncomingMessage:

class NotificationAMQPIncomingMessage(AMQPIncomingMessage):

    def acknowledge(self):
        def _do_ack():
            try:
                self.message.acknowledge()
            except Exception as exc:
                # NOTE(kgiusti): this failure is likely due to a loss of the
                # connection to the broker.  Not much we can do in this case,
                # especially considering the Notification has already been
                # dispatched. This *could* result in message duplication
                # (unacked msg is returned to the queue by the broker), but the
                # driver tries to catch that using the msg_id_cache.
                LOG.warning("Failed to acknowledge received message: %s", exc)
        self._message_operations_handler.do(_do_ack)
        self.listener.msg_id_cache.add(self.unique_id)

    def requeue(self):
        # NOTE(sileht): In case of the connection is lost between receiving the
        # message and requeing it, this requeue call fail
        # but because the message is not acknowledged and not added to the
        # msg_id_cache, the message will be reconsumed, the only difference is
        # the message stay at the beginning of the queue instead of moving to
        # the end.
        def _do_requeue():
            try:
                self.message.requeue()
            except Exception as exc:
                LOG.warning("Failed to requeue received message: %s", exc)
        self._message_operations_handler.do(_do_requeue)

NotificationAMQPIncomingMessage inherits from AMQPIncomingMessage, which inherits from RpcIncomingMessage, which inherits from IncomingMessage.

(Inside self.conn.consume() the consumer callback, i.e. __call__() above, is invoked and appends to incoming; poll() then pops values off it for processing.) I'm not entirely clear on the part in parentheses; corrections from anyone who understands it better are welcome.

NotificationAMQPIncomingMessage is a message object providing acknowledge() and requeue() methods, used to acknowledge a message or put it back on the queue.

Once messages have been pulled, they are handed to self.on_incoming_callback(incoming), which is MessageHandlingServer._on_incoming():

@six.add_metaclass(abc.ABCMeta)
class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):

    def _on_incoming(self, incoming):
        """Handles on_incoming event

        :param incoming: incoming request.
        """
        self._work_executor.submit(self._process_incoming, incoming)

As we can see, the actual handling is submitted to self._process_incoming, which here is BatchNotificationServer._process_incoming():

class BatchNotificationServer(NotificationServerBase):

    def _process_incoming(self, incoming):
        try:
            not_processed_messages = self.dispatcher.dispatch(incoming)
        except Exception:
            ......

This handler delegates to the dispatcher object to dispatch the messages; self.dispatcher here is the BatchNotificationDispatcher created and passed in earlier. What follows is the data-processing path, which is covered in detail in my other post analysing the notification source code.
