I'm trying to write a simple workload generator for a Tornado server, here's its simplified version:
```python
import json

from tornado import gen, queues
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from tornado.ioloop import IOLoop


class EventsLoader(object):
    generate_num_requests = 1000
    generate_concurrency = 32
    server_port = 8001

    def __init__(self, conf_file):
        self.parse_config(conf_file)
        self.client = AsyncHTTPClient()

    def generate(self):
        IOLoop.current().run_sync(self.generate_work)

    @gen.coroutine
    def generate_work(self):
        self.queue = queues.Queue()
        IOLoop.current().spawn_callback(self.fetch_requests)
        for i in range(self.generate_concurrency):
            yield self.generate_requests(i)
        print 'before join queue size: %s' % self.queue.qsize()
        yield self.queue.join()

    @gen.coroutine
    def generate_requests(self, i):
        load = self.generate_num_requests / self.generate_concurrency
        for j in range(load):
            request = self.generate_request(i * 1000 + j)
            self.queue.put(request)

    @gen.coroutine
    def fetch_requests(self):
        while True:
            try:
                request = yield self.queue.get()
                yield self.client.fetch(request)
            except Exception as e:
                print 'failed fetching: %s: %s' % (request.body, e)
            finally:
                print 'fetched: %s' % json.loads(request.body)['seq']
                self.queue.task_done()

    def generate_request(self, seq):
        event = {
            'seq': seq,
            # ... more fields here ...
        }
        return HTTPRequest(
            'http://localhost:%s/events' % self.server_port,
            method='POST',
            body=json.dumps(event),
        )
```

What I see happen is that all the `fetched: xxxx` messages appear in sequential order, which would be impossible if the generator were actually working concurrently.
How do I make it run concurrently? There must be something major missing in my understanding of what the I/O loop is for and what @gen.coroutine does, since regardless of my generate_concurrency setting, the performance stays the same.
Accepted Answer
No matter how you're generating the requests, you've only got one task fetching them. It's the fetching, not the generating, that you need to parallelize:
```python
for i in range(self.fetch_concurrency):
    IOLoop.current().spawn_callback(self.fetch_requests)
```

This will give you multiple fetch_requests workers that can pull work from the shared queue.
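The worker-pool pattern behind this fix can be sketched with the standard-library asyncio, whose Queue mirrors Tornado's `queues.Queue` (`get`/`task_done`/`join`); the `worker` name and the `asyncio.sleep(0)` stand-in for an HTTP fetch are illustrative, not part of the original code:

```python
import asyncio

async def worker(name, queue, results):
    # Each worker loops forever, pulling items off the shared queue,
    # just like fetch_requests in the answer above.
    while True:
        seq = await queue.get()
        await asyncio.sleep(0)  # stand-in for an async HTTP fetch
        results.append((name, seq))
        queue.task_done()

async def main(concurrency=4, num_requests=12):
    queue = asyncio.Queue()
    results = []
    # Spawn `concurrency` workers; like spawn_callback, we don't await
    # them directly -- they run in the background on the event loop.
    workers = [asyncio.ensure_future(worker(i, queue, results))
               for i in range(concurrency)]
    for seq in range(num_requests):
        queue.put_nowait(seq)
    await queue.join()  # resolves once every item is task_done()
    for w in workers:
        w.cancel()
    return results

results = asyncio.run(main())
```

Because every worker yields back to the event loop at its `await`, the items end up spread across several workers instead of being drained by a single one.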
Also, the generation part of this code isn't running in parallel either. Instead of
```python
for i in range(self.generate_concurrency):
    yield self.generate_requests(i)
```

which waits for one generate_requests call to finish before starting the next, you can run them in parallel with
```python
yield [self.generate_requests(i) for i in range(self.generate_concurrency)]
```
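The difference between the two forms can be sketched with stdlib asyncio, where `asyncio.gather` plays the role of yielding a list of futures in Tornado; the `produce` coroutine and the `order` log are illustrative:

```python
import asyncio

order = []

async def produce(i):
    # Log when each coroutine starts and ends; the single await in the
    # middle gives the event loop a chance to switch between them.
    order.append(('start', i))
    await asyncio.sleep(0)
    order.append(('end', i))

async def sequential():
    for i in range(3):
        await produce(i)  # each call finishes before the next starts

async def parallel():
    # Analogue of `yield [self.generate_requests(i) for i in ...]`:
    # all three coroutines are scheduled before any is awaited.
    await asyncio.gather(*(produce(i) for i in range(3)))

asyncio.run(sequential())
seq_order = list(order)
order.clear()
asyncio.run(parallel())
par_order = list(order)
```

In the sequential version each start is immediately followed by its own end; in the parallel version all three coroutines start before any of them finishes, which is exactly the interleaving the original `yield` loop was failing to produce.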