How to make tornado execute concurrent code?



I'm trying to write a simple workload generator for a Tornado server; here's a simplified version of it:

import json

from tornado import gen, queues
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from tornado.ioloop import IOLoop


class EventsLoader(object):
    generate_num_requests = 1000
    generate_concurrency = 32
    server_port = 8001

    def __init__(self, conf_file):
        self.parse_config(conf_file)
        self.client = AsyncHTTPClient()

    def generate(self):
        IOLoop.current().run_sync(self.generate_work)

    @gen.coroutine
    def generate_work(self):
        self.queue = queues.Queue()
        IOLoop.current().spawn_callback(self.fetch_requests)
        for i in range(self.generate_concurrency):
            yield self.generate_requests(i)
        print 'before join queue size: %s' % self.queue.qsize()
        yield self.queue.join()

    @gen.coroutine
    def generate_requests(self, i):
        load = self.generate_num_requests / self.generate_concurrency
        for j in range(load):
            request = self.generate_request(i * 1000 + j)
            self.queue.put(request)

    @gen.coroutine
    def fetch_requests(self):
        while True:
            try:
                request = yield self.queue.get()
                yield self.client.fetch(request)
            except Exception as e:
                print 'failed fetching: %s: %s' % (request.body, e)
            finally:
                print 'fetched: %s' % json.loads(request.body)['seq']
                self.queue.task_done()

    def generate_request(self, seq):
        event = {
            'seq': seq,
            # ... more fields here ...
        }
        return HTTPRequest(
            'http://localhost:%s/events' % self.server_port,
            method='POST',
            body=json.dumps(event),
        )

What I see is that all the fetched: xxxx messages appear in sequential order, which would be absolutely improbable if the generator were indeed working concurrently.

How do I make it run concurrently? There must be something huge missing in my understanding of what the I/O loop is for and what @gen.coroutine does: regardless of my generate_concurrency setting, the performance stays the same.

Accepted answer


No matter how you're generating the requests, you've only got one task fetching them. It's the fetching, not the generating, that you need to parallelize:

for i in range(self.fetch_concurrency):
    IOLoop.current().spawn_callback(self.fetch_requests)

This will give you multiple fetch_requests workers that can pull work from the shared queue.
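
For illustration, here is a minimal, self-contained sketch of this worker-pool pattern (an assumed example, not the original code: the worker function, the fetch_concurrency value, and the gen.sleep placeholder standing in for client.fetch are all illustrative, written against Tornado's 4.x-era @gen.coroutine API):

from tornado import gen, queues
from tornado.ioloop import IOLoop


@gen.coroutine
def main():
    queue = queues.Queue()

    @gen.coroutine
    def worker(worker_id):
        # Each worker loops forever, pulling items from the shared queue.
        while True:
            item = yield queue.get()
            try:
                # Placeholder for an async operation; in the real loader
                # this would be `yield self.client.fetch(request)`.
                yield gen.sleep(0.01)
                print('worker %d processed %s' % (worker_id, item))
            finally:
                queue.task_done()

    fetch_concurrency = 4  # illustrative value
    for i in range(fetch_concurrency):
        IOLoop.current().spawn_callback(worker, i)

    for seq in range(20):
        yield queue.put(seq)

    # Resolves once every queued item has been marked done.
    yield queue.join()


if __name__ == '__main__':
    IOLoop.current().run_sync(main)

With several workers blocked on queue.get() at once, up to fetch_concurrency items are processed concurrently, and the completion messages no longer come out in strict sequential order.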

Also, the generation part of this code isn't running in parallel either. Instead of

for i in range(self.generate_concurrency):
    yield self.generate_requests(i)

which waits for one generate_requests call to finish before starting the next, you can run them in parallel with

yield [self.generate_requests(i) for i in range(self.generate_concurrency)]
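
Putting both changes together, the corrected generate_work coroutine might look like the sketch below. Note that fetch_concurrency is a new class attribute you would add alongside generate_concurrency; it is not part of the original code:

@gen.coroutine
def generate_work(self):
    self.queue = queues.Queue()

    # Fix 1: spawn several fetch workers pulling from the shared queue,
    # instead of just one.
    for i in range(self.fetch_concurrency):
        IOLoop.current().spawn_callback(self.fetch_requests)

    # Fix 2: start all generators at once and yield the list of futures,
    # so they run concurrently instead of one after another.
    yield [self.generate_requests(i)
           for i in range(self.generate_concurrency)]

    print 'before join queue size: %s' % self.queue.qsize()
    yield self.queue.join()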
