I went down the path of asking, "How much performance can I squeeze out of a Python web server?" This led me to AIOHTTP and uvloop. Still, I could see that AIOHTTP wasn't using my CPU to its full potential, so I set out to use multiprocessing with AIOHTTP. I learned that there is a Linux kernel feature (the SO_REUSEPORT socket option) that allows multiple processes to share the same TCP port. This led me to develop the following code (which works wonderfully):
    import asyncio
    import os
    import socket
    import time
    from aiohttp import web
    from concurrent.futures import ProcessPoolExecutor
    from multiprocessing import cpu_count

    CPU_COUNT = cpu_count()
    print("CPU Count:", CPU_COUNT)


    def mk_socket(host="127.0.0.1", port=8000, reuseport=False):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        if reuseport:
            SO_REUSEPORT = 15  # numeric value of SO_REUSEPORT on Linux
            sock.setsockopt(socket.SOL_SOCKET, SO_REUSEPORT, 1)
        sock.bind((host, port))
        return sock


    async def handle(request):
        name = request.match_info.get('name', "Anonymous")
        pid = os.getpid()
        text = "{:.2f}: Hello {}! Process {} is treating you\n".format(
            time.time(), name, pid)
        #time.sleep(5)  # intentionally blocking sleep to simulate CPU load
        return web.Response(text=text)


    def start_server():
        host = "127.0.0.1"
        port=8000
        reuseport = True
        app = web.Application()
        sock = mk_socket(host, port, reuseport=reuseport)
        app.add_routes([web.get('/', handle),
                        web.get('/{name}', handle)])
        loop = asyncio.get_event_loop()
        coro = loop.create_server(
            protocol_factory=app.make_handler(),
            sock=sock,
        )
        srv = loop.run_until_complete(coro)
        loop.run_forever()


    if __name__ == '__main__':
        # One worker process per CPU; each binds its own socket to the same port.
        with ProcessPoolExecutor() as executor:
            for i in range(0, CPU_COUNT):
                executor.submit(start_server)

wrk benchmark of my site before applying this code:
    Running 30s test @ 127.0.0.1:8000/
      12 threads and 400 connections
      Thread Stats   Avg      Stdev     Max   +/- Stdev
        Latency    54.33ms    6.54ms 273.24ms   89.95%
        Req/Sec   608.68    115.97     2.27k    83.63%
      218325 requests in 30.10s, 41.23MB read
      Non-2xx or 3xx responses: 218325
    Requests/sec:   7254.17
    Transfer/sec:      1.37MB

wrk benchmark after:
    Running 30s test @ 127.0.0.1:8000/
      12 threads and 400 connections
      Thread Stats   Avg      Stdev     Max   +/- Stdev
        Latency    15.96ms    7.27ms  97.29ms   84.78%
        Req/Sec     2.11k   208.30     4.45k    75.50%
      759290 requests in 30.08s, 153.51MB read
    Requests/sec:  25242.39
    Transfer/sec:      5.10MB

Wow! But there's a problem:
    DeprecationWarning: Application.make_handler(...) is deprecated, use AppRunner API instead
      protocol_factory=app.make_handler()

So I tried this:
    import asyncio
    import os
    import socket
    import time
    from aiohttp import web
    from concurrent.futures import ProcessPoolExecutor
    from multiprocessing import cpu_count

    CPU_COUNT = cpu_count()
    print("CPU Count:", CPU_COUNT)


    def mk_socket(host="127.0.0.1", port=8000, reuseport=False):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        if reuseport:
            SO_REUSEPORT = 15
            sock.setsockopt(socket.SOL_SOCKET, SO_REUSEPORT, 1)
        sock.bind((host, port))
        return sock


    async def handle(request):
        name = request.match_info.get('name', "Anonymous")
        pid = os.getpid()
        text = "{:.2f}: Hello {}! Process {} is treating you\n".format(
            time.time(), name, pid)
        #time.sleep(5)  # intentionally blocking sleep to simulate CPU load
        return web.Response(text=text)


    async def start_server():
        host = "127.0.0.1"
        port=8000
        reuseport = True
        app = web.Application()
        sock = mk_socket(host, port, reuseport=reuseport)
        app.add_routes([web.get('/', handle),
                        web.get('/{name}', handle)])

        coro = loop.create_server(
            protocol_factory=app.make_handler(),
            sock=sock,
        )
        runner = web.AppRunner(app)
        await runner.setup()
        srv = web.TCPSite(runner, 'localhost', 8000)
        await srv.start()
        print('Server started at 127.0.0.1:8000')
        return coro, app, runner


    async def finalize(srv, app, runner):
        sock = srv.sockets[0]
        app.loop.remove_reader(sock.fileno())
        sock.close()

        #await handler.finish_connections(1.0)
        await runner.cleanup()
        srv.close()
        await srv.wait_closed()
        await app.finish()


    def init():
        loop = asyncio.get_event_loop()
        srv, app, runner = loop.run_until_complete(init)
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            loop.run_until_complete((finalize(srv, app, runner)))


    if __name__ == '__main__':
        with ProcessPoolExecutor() as executor:
            for i in range(0, CPU_COUNT):
                executor.submit(init)

This is obviously incomplete because coro isn't being used. I'm not sure where to integrate the socket with AppRunner. An answer should show the original example modified to use AppRunner.
Recommended answer

As this is my first time using coroutines and aiohttp, I may be wrong, but it seems to work with a SockSite:
    #!/usr/bin/env python
    import asyncio
    import os
    import socket
    import time
    import traceback
    from aiohttp import web
    from concurrent.futures import ProcessPoolExecutor
    from multiprocessing import cpu_count

    CPU_COUNT = cpu_count()
    print("CPU Count:", CPU_COUNT)


    def mk_socket(host="127.0.0.1", port=9090, reuseport=False):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        if reuseport:
            SO_REUSEPORT = 15  # numeric value of SO_REUSEPORT on Linux
            sock.setsockopt(socket.SOL_SOCKET, SO_REUSEPORT, 1)
        sock.bind((host, port))
        return sock


    async def handle(request):
        name = request.match_info.get('name', "Anonymous")
        pid = os.getpid()
        text = "{:.2f}: Hello {}! Process {} is treating you\n".format(
            time.time(), name, pid)
        #time.sleep(5)  # intentionally blocking sleep to simulate CPU load
        return web.Response(text=text)


    async def start_server():
        try:
            host = "127.0.0.1"
            port=9090
            reuseport = True
            app = web.Application()
            app.add_routes([web.get('/', handle),
                            web.get('/{name}', handle)])
            runner = web.AppRunner(app)
            await runner.setup()
            # Hand the pre-bound SO_REUSEPORT socket to the runner via SockSite.
            sock = mk_socket(host, port, reuseport=reuseport)
            srv = web.SockSite(runner, sock)
            await srv.start()
            print('Server started at 127.0.0.1:9090')
            return srv, app, runner
        except Exception:
            traceback.print_exc()
            raise


    async def finalize(srv, app, runner):
        # AppRunner.cleanup() stops every site registered on the runner
        # (closing its listening socket) and runs the app's cleanup handlers.
        await runner.cleanup()


    def init():
        # Each worker process creates and installs its own event loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        srv, app, runner = loop.run_until_complete(start_server())
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            loop.run_until_complete((finalize(srv, app, runner)))


    if __name__ == '__main__':
        with ProcessPoolExecutor() as executor:
            for i in range(0, CPU_COUNT):
                executor.submit(init)

In the end:
    >curl 127.0.0.1:9090
    1580741746.47: Hello Anonymous! Process 54623 is treating you
    >curl 127.0.0.1:9090
    1580741747.05: Hello Anonymous! Process 54620 is treating you
    >curl 127.0.0.1:9090
    1580741747.77: Hello Anonymous! Process 54619 is treating you
    >curl 127.0.0.1:9090
    1580741748.36: Hello Anonymous! Process 54621 is treating you

I also added a log statement to the finalize routine, and it appears to be triggered correctly.
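To check the start and shutdown path without spawning worker processes, the same AppRunner and SockSite calls can be exercised once in a single process. This is only a minimal sketch, assuming aiohttp 3.x is installed: the names `hello` and `serve_once`, the "ok" response body, and the use of port 0 (so the kernel picks a free port) are illustrative choices that do not appear in the answer above.

```python
import asyncio
import socket

import aiohttp
from aiohttp import web


async def hello(request):
    # Hypothetical handler used only for this sketch.
    return web.Response(text="ok\n")


async def serve_once():
    """Serve on a pre-bound socket, fetch "/" once, then shut down."""
    # Port 0 asks the kernel for any free port (an assumption of this sketch,
    # so it cannot collide with a server that is already running).
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("127.0.0.1", 0))
    port = sock.getsockname()[1]

    app = web.Application()
    app.add_routes([web.get("/", hello)])
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.SockSite(runner, sock)
    await site.start()

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"http://127.0.0.1:{port}/") as resp:
                status = resp.status
                body = await resp.text()
    finally:
        # Stops the site and releases the listening socket.
        await runner.cleanup()

    # After cleanup, a fresh connection attempt should be refused.
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    probe.settimeout(1.0)
    try:
        still_open = probe.connect_ex(("127.0.0.1", port)) == 0
    finally:
        probe.close()
    return status, body, still_open


if __name__ == "__main__":
    print(asyncio.run(serve_once()))
```

If the returned `still_open` flag is false, the listening socket was closed by `runner.cleanup()` alone, which suggests that no manual socket handling is needed in a finalize routine.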
Out of curiosity, I also tried it on an older kernel, and I can confirm that it doesn't work when the option is enabled (it works with reuseport=False).
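Since SO_REUSEPORT only arrived in Linux 3.9, it may be worth probing for it before starting the workers. The following is a rough sketch rather than part of the answer: `reuseport_supported` is a hypothetical helper name, and it simply tries to bind two sockets to the same kernel-chosen port with the option set.

```python
import socket


def reuseport_supported(host="127.0.0.1"):
    """Return True if two sockets can bind the same TCP port via SO_REUSEPORT."""
    if not hasattr(socket, "SO_REUSEPORT"):
        return False  # the constant is not exposed on this platform

    first = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    second = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Both sockets must set the option before binding.
        first.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        second.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        first.bind((host, 0))  # port 0: let the kernel pick a free port
        port = first.getsockname()[1]
        second.bind((host, port))  # raises OSError if the port cannot be shared
        return True
    except OSError:
        return False
    finally:
        first.close()
        second.close()


if __name__ == "__main__":
    print("SO_REUSEPORT usable:", reuseport_supported())
```

A launcher could call this once and fall back to a single process (or to `reuseport=False`) when it returns False.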