AIOHTTP

编程入门 行业动态 更新时间:2024-10-26 06:35:57
本文介绍了AIOHTTP-不推荐使用Application.make_handler(...)-添加多处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我走了一段旅程:我可以从Python网络服务器中挤出多少性能?"这导致我进入AIOHTTP和uvloop.不过,我仍然可以看到AIOHTTP并未充分利用我的CPU.我开始将多重处理与AIOHTTP一起使用.我了解到,有一个Linux内核功能,该功能允许多个进程共享同一TCP端口.这使我开发了以下代码(效果很好):

I went down a journey of "How much performance can I squeeze out of a Python web-server?" This lead me to AIOHTTP and uvloop. Still, I could see that AIOHTTP wasn't using my CPU to its full potential. I set out to use multiprocessing with AIOHTTP. I learned that there's a Linux kernel feature that allows multiple processes to share the same TCP port. This lead me to develop the following code (Which works wonderfully):

import asyncio import os import socket import time from aiohttp import web from concurrent.futures import ProcessPoolExecutor from multiprocessing import cpu_count CPU_COUNT = cpu_count() print("CPU Count:", CPU_COUNT) def mk_socket(host="127.0.0.1", port=8000, reuseport=False): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if reuseport: SO_REUSEPORT = 15 sock.setsockopt(socket.SOL_SOCKET, SO_REUSEPORT, 1) sock.bind((host, port)) return sock async def handle(request): name = request.match_info.get('name', "Anonymous") pid = os.getpid() text = "{:.2f}: Hello {}! Process {} is treating you\n".format( time.time(), name, pid) #time.sleep(5) # intentionally blocking sleep to simulate CPU load return web.Response(text=text) def start_server(): host = "127.0.0.1" port=8000 reuseport = True app = web.Application() sock = mk_socket(host, port, reuseport=reuseport) app.add_routes([web.get('/', handle), web.get('/{name}', handle)]) loop = asyncio.get_event_loop() coro = loop.create_server( protocol_factory=app.make_handler(), sock=sock, ) srv = loop.run_until_complete(coro) loop.run_forever() if __name__ == '__main__': with ProcessPoolExecutor() as executor: for i in range(0, CPU_COUNT): executor.submit(start_server)

在应用此代码之前,

我的网站的wrk基准测试:

wrk benchmark of my site before applying this code:

Running 30s test @ 127.0.0.1:8000/ 12 threads and 400 connections Thread Stats Avg Stdev Max +/- Stdev Latency 54.33ms 6.54ms 273.24ms 89.95% Req/Sec 608.68 115.97 2.27k 83.63% 218325 requests in 30.10s, 41.23MB read Non-2xx or 3xx responses: 218325 Requests/sec: 7254.17 Transfer/sec: 1.37MB

之后的wrk基准测试

Running 30s test @ 127.0.0.1:8000/ 12 threads and 400 connections Thread Stats Avg Stdev Max +/- Stdev Latency 15.96ms 7.27ms 97.29ms 84.78% Req/Sec 2.11k 208.30 4.45k 75.50% 759290 requests in 30.08s, 153.51MB read Requests/sec: 25242.39 Transfer/sec: 5.10MB

哇!但是有一个问题:

DeprecationWarning: Application.make_handler(...) is deprecated, use AppRunner API instead protocol_factory=app.make_handler()

所以我尝试了这个:

import asyncio import os import socket import time from aiohttp import web from concurrent.futures import ProcessPoolExecutor from multiprocessing import cpu_count CPU_COUNT = cpu_count() print("CPU Count:", CPU_COUNT) def mk_socket(host="127.0.0.1", port=8000, reuseport=False): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if reuseport: SO_REUSEPORT = 15 sock.setsockopt(socket.SOL_SOCKET, SO_REUSEPORT, 1) sock.bind((host, port)) return sock async def handle(request): name = request.match_info.get('name', "Anonymous") pid = os.getpid() text = "{:.2f}: Hello {}! Process {} is treating you\n".format( time.time(), name, pid) #time.sleep(5) # intentionally blocking sleep to simulate CPU load return web.Response(text=text) async def start_server(): host = "127.0.0.1" port=8000 reuseport = True app = web.Application() sock = mk_socket(host, port, reuseport=reuseport) app.add_routes([web.get('/', handle), web.get('/{name}', handle)]) coro = loop.create_server( protocol_factory=app.make_handler(), sock=sock, ) runner = web.AppRunner(app) await runner.setup() srv = web.TCPSite(runner, 'localhost', 8000) await srv.start() print('Server started at 127.0.0.1:8000') return coro, app, runner async def finalize(srv, app, runner): sock = srv.sockets[0] app.loop.remove_reader(sock.fileno()) sock.close() #await handler.finish_connections(1.0) await runner.cleanup() srv.close() await srv.wait_closed() await app.finish() def init(): loop = asyncio.get_event_loop() srv, app, runner = loop.run_until_complete(init) try: loop.run_forever() except KeyboardInterrupt: loop.run_until_complete((finalize(srv, app, runner))) if __name__ == '__main__': with ProcessPoolExecutor() as executor: for i in range(0, CPU_COUNT): executor.submit(init)

这显然是不完整的,因为未使用coro.我不确定在哪里将套接字与AppRunner集成在一起.答案应显示修改为使用App Runner的原始示例.

which is obviously incomplete becuase coro isn't being used. I'm not sure where to integrate the socket with AppRunner. Answer should show original example modified to use App Runner.

推荐答案

由于这是我第一次使用协程和aiohttp,所以我可能是错的,但它似乎可以与SockSite一起使用:

As it's my first time using coroutines and aiohttp, I may be wrong, but it seems to work with a SockSite:

#!/usr/bin/env python import asyncio import os import socket import time import traceback from aiohttp import web from concurrent.futures import ProcessPoolExecutor from multiprocessing import cpu_count CPU_COUNT = cpu_count() print("CPU Count:", CPU_COUNT) def mk_socket(host="127.0.0.1", port=9090, reuseport=False): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if reuseport: SO_REUSEPORT = 15 sock.setsockopt(socket.SOL_SOCKET, SO_REUSEPORT, 1) sock.bind((host, port)) return sock async def handle(request): name = request.match_info.get('name', "Anonymous") pid = os.getpid() text = "{:.2f}: Hello {}! Process {} is treating you\n".format( time.time(), name, pid) #time.sleep(5) # intentionally blocking sleep to simulate CPU load return web.Response(text=text) async def start_server(): try: host = "127.0.0.1" port=9090 reuseport = True app = web.Application() app.add_routes([web.get('/', handle), web.get('/{name}', handle)]) runner = web.AppRunner(app) await runner.setup() sock = mk_socket(host, port, reuseport=reuseport) srv = web.SockSite(runner, sock) await srv.start() print('Server started at 127.0.0.1:9090') return srv, app, runner except Exception: traceback.print_exc() raise async def finalize(srv, app, runner): sock = srv.sockets[0] app.loop.remove_reader(sock.fileno()) sock.close() #await handler.finish_connections(1.0) await runner.cleanup() srv.close() await srv.wait_closed() await app.finish() def init(): loop = asyncio.get_event_loop() srv, app, runner = loop.run_until_complete(start_server()) try: loop.run_forever() except KeyboardInterrupt: loop.run_until_complete((finalize(srv, app, runner))) if __name__ == '__main__': with ProcessPoolExecutor() as executor: for i in range(0, CPU_COUNT): executor.submit(init)

最终:

>curl 127.0.0.1:9090 1580741746.47: Hello Anonymous! Process 54623 is treating you >curl 127.0.0.1:9090 1580741747.05: Hello Anonymous! Process 54620 is treating you >curl 127.0.0.1:9090 1580741747.77: Hello Anonymous! Process 54619 is treating you >curl 127.0.0.1:9090 1580741748.36: Hello Anonymous! Process 54621 is treating you

我还在finalize例程中添加了一个日志,它似乎已正确触发.

I also added a log in the finalize routine and it seems correctly triggered.

出于好奇,我尝试在较旧的内核上进行了尝试,并且确认启用该选项后,它不起作用(与False配合使用).

And, out of curiosity, I gave it a try on an older kernel and I confirm it doesn't work when the option is enabled (it works with False).

更多推荐

AIOHTTP

本文发布于:2023-11-23 10:26:10,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1621103.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:AIOHTTP

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!