我将如何在实时场景中使用并发.期货和队列?

编程入门 行业动态 更新时间:2024-10-10 09:17:50
本文介绍了我将如何在实时场景中使用并发.期货和队列?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

使用Python 3的concurrent.futures模块进行并行工作非常容易,如下所示.

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: future_to = {executor.submit(do_work, input, 60): input for input in dictionary} for future in concurrent.futures.as_completed(future_to): data = future.result()

在队列中插入和检索项目也非常方便.

q = queue.Queue() for task in tasks: q.put(task) while not q.empty(): q.get()

我有一个脚本在后台运行,以监听更新.现在,从理论上讲,假设随着这些更新的到来,我将对它们进行排队,并使用ThreadPoolExecutor同时对其进行处理.

现在,所有这些组件都单独工作并且有意义,但是如何一起使用它们呢?我不知道是否有可能实时从队列中提供ThreadPoolExecutor工作,除非预先确定要从其工作的数据?

简而言之,我要做的就是每秒接收4条消息的更新,将它们推送到队列中,并让我的current.futures对其进行处理.如果我不这样做,那么我会陷入缓慢的顺序方法中.

让我们以 Python中的规范示例为例下面的文档:

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data)))

URLS的列表是固定的.是否可以实时提供此列表,并让工作人员从列表中进行处理(也许出于管理目的而从队列中进行处理)?我对我的方法是否实际上可行

有点困惑

解决方案

示例进行了扩展,可以从队列中进行工作.需要注意的更改是,此代码使用concurrent.futures.wait而不是concurrent.futures.as_completed来允许在等待其他工作完成时开始新工作.

import concurrent.futures import urllib.request import time import queue q = queue.Queue() URLS = ['www.foxnews/', 'wwwn/', 'europe.wsj/', 'www.bbc.co.uk/', 'some-made-up-domain/'] def feed_the_workers(spacing): """ Simulate outside actors sending in work to do, request each url twice """ for url in URLS + URLS: time.sleep(spacing) q.put(url) return "DONE FEEDING" def load_url(url, timeout): """ Retrieve a single page and report the URL and contents """ with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # start a future for a thread which sends work in through the queue future_to_url = { executor.submit(feed_the_workers, 0.25): 'FEEDER DONE'} while future_to_url: # check for status of the futures which are currently working done, not_done = concurrent.futures.wait( future_to_url, timeout=0.25, return_when=concurrent.futures.FIRST_COMPLETED) # if there is incoming work, start a new future while not q.empty(): # fetch a url from the queue url = q.get() # Start the load operation and mark the future with its URL future_to_url[executor.submit(load_url, url, 60)] = url # process any completed futures for future in done: url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: if url == 'FEEDER DONE': print(data) else: print('%r page is %d bytes' % (url, len(data))) # remove the now completed future del future_to_url[future]

两次获取每个url的输出:

'www.foxnews/' page is 67574 bytes 'wwwn/' page is 136975 bytes 'www.bbc.co.uk/' page is 193780 bytes 'some-made-up-domain/' page is 896 bytes 'www.foxnews/' page is 67574 bytes 'wwwn/' page is 136975 bytes DONE FEEDING 'www.bbc.co.uk/' page is 193605 bytes 'some-made-up-domain/' page is 896 bytes 'europe.wsj/' page is 874649 bytes 'europe.wsj/' page is 874649 bytes

It is fairly easy to do parallel work with Python 3's concurrent.futures module as shown below.

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: future_to = {executor.submit(do_work, input, 60): input for input in dictionary} for future in concurrent.futures.as_completed(future_to): data = future.result()

It is also very handy to insert and retrieve items into a Queue.

q = queue.Queue() for task in tasks: q.put(task) while not q.empty(): q.get()

I have a script running in background listening for updates. Now, in theory assume that, as those updates arrive, I would queue them and do work on them concurrently using the ThreadPoolExecutor.

Now, individually, all of these components work in isolation, and make sense, but how do I go about using them together? I am not aware if it is possible to feed the ThreadPoolExecutor work from the queue in real time unless the data to work from is predetermined?

In a nutshell, all I want to do is, receive updates of say 4 messages a second, shove them in a queue, and get my concurrent.futures to work on them. If I don't, then I am stuck with a sequential approach which is slow.

Let's take the canonical example in the Python documentation below:

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data)))

The list of URLS is fixed. Is it possible to feed this list in real-time and get the worker to process it as they come by, perhaps from a queue for management purposes? I am a bit confused on whether my approach is actually possible?

解决方案

The example from the Python docs, expanded to take its work from a queue. A change to note, is that this code uses concurrent.futures.wait instead of concurrent.futures.as_completed to allow new work to be started while waiting for other work to complete.

import concurrent.futures import urllib.request import time import queue q = queue.Queue() URLS = ['www.foxnews/', 'wwwn/', 'europe.wsj/', 'www.bbc.co.uk/', 'some-made-up-domain/'] def feed_the_workers(spacing): """ Simulate outside actors sending in work to do, request each url twice """ for url in URLS + URLS: time.sleep(spacing) q.put(url) return "DONE FEEDING" def load_url(url, timeout): """ Retrieve a single page and report the URL and contents """ with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # start a future for a thread which sends work in through the queue future_to_url = { executor.submit(feed_the_workers, 0.25): 'FEEDER DONE'} while future_to_url: # check for status of the futures which are currently working done, not_done = concurrent.futures.wait( future_to_url, timeout=0.25, return_when=concurrent.futures.FIRST_COMPLETED) # if there is incoming work, start a new future while not q.empty(): # fetch a url from the queue url = q.get() # Start the load operation and mark the future with its URL future_to_url[executor.submit(load_url, url, 60)] = url # process any completed futures for future in done: url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: if url == 'FEEDER DONE': print(data) else: print('%r page is %d bytes' % (url, len(data))) # remove the now completed future del future_to_url[future]

Output from fetching each url twice:

'www.foxnews/' page is 67574 bytes 'wwwn/' page is 136975 bytes 'www.bbc.co.uk/' page is 193780 bytes 'some-made-up-domain/' page is 896 bytes 'www.foxnews/' page is 67574 bytes 'wwwn/' page is 136975 bytes DONE FEEDING 'www.bbc.co.uk/' page is 193605 bytes 'some-made-up-domain/' page is 896 bytes 'europe.wsj/' page is 874649 bytes 'europe.wsj/' page is 874649 bytes

更多推荐

我将如何在实时场景中使用并发.期货和队列?

本文发布于:2023-11-26 09:22:28,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1633542.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:队列   我将   实时   期货   景中

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!