如何在Beast中保持此HTTPS连接?

编程入门 行业动态 更新时间:2024-10-27 16:30:14
本文介绍了如何在Beast中保持此HTTPS连接?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我对GraphQL服务器进行了大约30,000个查询;因为我有一个高延迟连接,所以我使用线程并行地执行许多查询。目前,每个查询都会建立一个新的连接;我希望重用这些连接,这应该会减少整个下载所需的时间。以下是我的代码:

#include <boost/asio.hpp> #include <boost/asio/ssl.hpp> #include <boost/beast.hpp> #include <boost/beast/ssl.hpp> #include <boost/asio/ssl/error.hpp> #include <boost/asio/ssl/stream.hpp> #include <chrono> #include <vector> #include <array> #include <iostream> #include "http.h" namespace beast=boost::beast; namespace http=beast::http; namespace net=boost::asio; namespace ssl=net::ssl; using tcp=net::ip::tcp; using namespace std; namespace cr=chrono; struct TimeBytes /* Used to compute the latency and data rate, which will be used * to compute the number of I/O threads for the next run. */ { float ms; int bytes; }; cr::steady_clock clk; vector<TimeBytes> timeBytes; mutex timeBytesMutex; thread_local string lastProto,lastHost,lastPort; array<string,4> parseUrl(string url) // protocol, hostname, port, path. All are strings, including the port. { size_t pos0=url.find("://"); size_t pos1; array<string,4> ret; ret[0]=url.substr(0,pos0); if (pos0<url.length()) pos0+=3; pos1=url.find("/",pos0); ret[1]=url.substr(pos0,pos1-pos0); ret[3]=url.substr(pos1); pos0=ret[1].find(":"); if (pos0<ret[1].length()) { ret[2]=ret[1].substr(pos0+1); ret[1]=ret[1].substr(0,pos0); } else if (ret[0]=="https") ret[2]="443"; else if (ret[0]=="https") ret[2]="80"; else ret[2]="0"; return ret; } string httpPost(string url,string data) { net::io_context context; ssl::context ctx(ssl::context::tlsv12_client); tcp::resolver res(context); tcp::resolver::results_type endpoints; beast::ssl_stream<beast::tcp_stream> stream(context,ctx); array<string,4> parsed=parseUrl(url); http::request<http::string_body> req; http::response<http::string_body> resp; beast::flat_buffer buffer; TimeBytes tb; cr::nanoseconds elapsed; cr::time_point<cr::steady_clock> timeStart=clk.now(); //if (parsed[0]==lastProto && parsed[1]==lastHost && parsed[2]==lastPort) //cout<<"same host "; //load_root_certificates(ctx); try { ctx.set_verify_mode(ssl::verify_peer); endpoints=res.resolve(parsed[1],parsed[2]); beast::get_lowest_layer(stream).connect(endpoints); SSL_set_tlsext_host_name(stream.native_handle(),parsed[1].c_str()); if (parsed[0]=="https") stream.handshake(net::ssl::stream_base::client); req.method(http::verb::post); req.target(parsed[3]); req.set(http::field::host,parsed[1]); req.set(http::field::connection,"keep-alive"); req.set(http::field::user_agent,BOOST_BEAST_VERSION_STRING); req.set(http::field::content_type,"application/json"); req.set(http::field::accept,"application/json"); req.body()=data; req.prepare_payload(); http::write(stream,req); http::read(stream,buffer,resp); elapsed=clk.now()-timeStart; tb.ms=elapsed.count()/1e6; tb.bytes=req.body().size()+resp.body().size()+7626; // 7626 accounts for HTTP, TCP, IP, and Ethernet headers. timeBytesMutex.lock(); timeBytes.push_back(tb); timeBytesMutex.unlock(); beast::close_socket(beast::get_lowest_layer(stream)); if (DEBUG_QUERY) { cout<<parsed[0]<<"| "<<parsed[1]<<"| "<<parsed[2]<<"| "<<parsed[3]<<"| "; cout<<data<<"| "; } } catch (...) { } lastProto=parsed[0]; lastHost=parsed[1]; lastPort=parsed[2]; return resp.body(); }

大多数请求都发送到一台服务器。向另一台服务器发出一些GET请求(使用与httpPost非常相似的httpGet函数)。下载数据后,我会对其进行处理,因此我希望在开始处理之前先关闭连接。

我在close_socket()之前尝试将context、ctx和stream设为本地线程,但程序在主线程第二次调用httpPost时崩溃,从http::read引发错误。(工作线程在主线程的两个查询之间进行一个查询。)在这一点上,我并没有试图保持连接打开,而是试图使线程本地工作,以便我可以保持连接打开。

推荐答案

我强烈建议使用异步接口。由于大部分时间显然都花在等待IO上,因此您很可能只从单个线程获得所有吞吐量。

下面的示例回答了您的问题(如何使客户端对多个请求保持打开状态),同时使处理异步化。现在,缺点是需要对单个客户端上的所有请求进行排序(这就是我使用_tasks队列的目的)。然而,这可能会起到启发作用。

请注意,初始化函数适用于所有完成处理程序结果类型:net::use_future、net::spawn(协程)等。

Live On Coliru

#include <boost/asio.hpp> #include <boost/asio/ssl.hpp> #include <boost/beast.hpp> #include <boost/beast/ssl.hpp> #include <chrono> #include <deque> #include <iomanip> #include <iostream> namespace net = boost::asio; namespace ssl = net::ssl; namespace beast = boost::beast; namespace http = beast::http; using clk = std::chrono::steady_clock; using net::ip::tcp; using beast::error_code; using namespace std::chrono_literals; /* Used to compute the latency and data rate, which will be used to compute the * number of I/O threads for the next run. */ struct TimeBytes { long double ms; size_t bytes; }; static std::vector<TimeBytes> timeBytes; static std::mutex timeBytesMutex; struct Url { struct Spec { std::string hostname, port; bool operator<(Spec const& rhs) const { return std::tie(hostname, port) < std::tie(rhs.hostname, rhs.port); } }; std::string protocol, hostname, port, path; Spec specification() const { return {hostname, port}; } }; #include <boost/spirit/home/x3.hpp> #include <boost/fusion/adapted/std_tuple.hpp> namespace x3 = boost::spirit::x3; Url parseUrl(std::string const& url) { Url ret; std::string hostport; { static const auto url_ = *(x3::char_ - "://") >> "://" // protocol >> +~x3::char_('/') // hostname >> *x3::char_; // path auto into = std::tie(ret.protocol, hostport, ret.path); parse(begin(url), end(url), x3::expect[url_], into); } { static const auto portspec_ = -(':' >> x3::uint_) >> x3::eoi; static const auto hostport_ = x3::raw[+(+~x3::char_(':') | !portspec_ >> x3::char_)] // >> -portspec_; boost::optional<uint16_t> port; auto into = std::tie(ret.hostname, port); parse(begin(hostport), end(hostport), x3::expect[hostport_], into); if (port.has_value()) { ret.port = std::to_string(*port); } else if (ret.protocol == "https") { ret.port = "443"; } else if (ret.protocol == "http") { ret.port = "80"; } else { ret.port = "0"; } } return ret; } struct Client : std::enable_shared_from_this<Client> { public: Client(net::any_io_executor ex, Url::Spec spec, ssl::context& ctx) : _executor(ex) , _spec(spec) , _sslcontext(ctx) { } template <typename Token> auto async_request(http::verb verb, std::string const& path, std::string const& data, Token&& token) { using R = typename net::async_result<std::decay_t<Token>, void(error_code, std::string)>; using H = typename R::completion_handler_type; H handler(std::forward<Token>(token)); R result(handler); auto chain_tasks = [this, h = std::move(handler), self = shared_from_this()](auto&&... args) mutable { if (!self->_tasks.empty()) { dispatch(self->_executor, [this, self] { if (not _tasks.empty()) _tasks.pop_front(); if (not _tasks.empty()) _tasks.front()->initiate(); }); } std::move(h)(std::forward<decltype(args)>(args)...); }; auto task = std::make_shared<RequestOp<decltype(chain_tasks)>>( this, verb, path, data, chain_tasks); enqueue(std::move(task)); return result.get(); } template <typename Token> auto async_post(std::string const& path, std::string const& data, Token&& token) { return async_request(http::verb::post,path, data, std::forward<Token>(token)); } template <typename Token> auto async_get(std::string const& path, Token&& token) { return async_request(http::verb::get,path, "", std::forward<Token>(token)); } private: template <typename Token> auto async_reconnect(Token&& token) { using R = typename net::async_result<std::decay_t<Token>, void(error_code)>; using H = typename R::completion_handler_type; H handler(std::forward<Token>(token)); R result(handler); assert(!_stream.has_value()); // probably a program flow bu _stream.emplace(_executor, _sslcontext); std::make_shared<ReconnectOp<H>>(this, std::move(handler))->start(); return result.get(); } template <typename Handler> struct ReconnectOp : std::enable_shared_from_this<ReconnectOp<Handler>> { ReconnectOp(Client* client, Handler h) : _client{client} , _handler(std::move(h)) , _resolver(client->_stream->get_executor()) { } Client* _client; Handler _handler; tcp::resolver _resolver; bool checked(error_code ec, bool complete = false) { if (complete || ec) std::move(_handler)(ec); if (ec && _client->_stream.has_value()) { std::cerr << "Socket " << _client->_stream->native_handle() << " closed due to " << ec.message() << std::endl; _client->_stream.reset(); } return !ec.failed(); } void start() { _resolver.async_resolve( _client->_spec.hostname, _client->_spec.port, beast::bind_front_handler(&ReconnectOp::on_resolved, this->shared_from_this())); } void on_resolved(error_code ec, tcp::resolver::results_type ep) { if (checked(ec)) { beast::get_lowest_layer(*_client->_stream) .async_connect( ep, beast::bind_front_handler(&ReconnectOp::on_connected, this->shared_from_this())); } } void on_connected(error_code ec, tcp::endpoint ep) { if (checked(ec)) { std::cerr << "Socket " << _client->_stream->native_handle() << " (re)connected to " << ep << std::endl; auto& hostname = _client->_spec.hostname; SSL_set_tlsext_host_name(_client->_stream->native_handle(), hostname.c_str()); _client->_stream->async_handshake( Stream::client, beast::bind_front_handler(&ReconnectOp::on_ready, this->shared_from_this())); } } void on_ready(error_code ec) { checked(ec, true); } }; struct IAsyncTask { virtual void initiate() = 0; }; template <typename Handler> struct RequestOp : IAsyncTask, std::enable_shared_from_this<RequestOp<Handler>> { RequestOp(Client* client, http::verb verb, std::string const& path, std::string data, Handler h) : _client(client) , _handler(std::move(h)) , _request(verb, path, 11, std::move(data)) { _request.set(http::field::host, _client->_spec.hostname); _request.set(http::field::connection, "keep-alive"); _request.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING); _request.set(http::field::content_type, "application/json"); _request.set(http::field::accept, "application/json"); _request.prepare_payload(); } Client* _client; Handler _handler; http::request<http::string_body> _request; http::response<http::string_body> _response; beast::flat_buffer _buffer; size_t _bandwidth = 0; clk::time_point _start = clk::now(); bool checked(error_code ec, bool complete = false) { if (complete || ec) std::move(_handler)(ec, std::move(_response.body())); if (ec) _client->_stream.reset(); return !ec.failed(); } void initiate() override { if (!_client->_stream.has_value()) { _client->async_reconnect(beast::bind_front_handler( &RequestOp::on_connected, this->shared_from_this())); } else { on_connected(error_code{}); } } void on_connected(error_code ec) { _start = clk::now(); // This matches the start of measurements in // the original, synchronous code http::async_write(*_client->_stream, _request, beast::bind_front_handler( &RequestOp::on_sent, this->shared_from_this())); } void on_sent(error_code ec, size_t transferred) { _bandwidth += transferred; // measuring actual bytes including HTTP headers if (checked(ec)) { http::async_read( *_client->_stream, _buffer, _response, beast::bind_front_handler(&RequestOp::on_response, this->shared_from_this())); } } void on_response(error_code ec, size_t transferred) { _bandwidth += transferred; // measuring actual bytes including HTTP headers std::lock_guard lk(timeBytesMutex); timeBytes.push_back({(clk::now() - _start) / 1.0ms, _bandwidth}); checked(ec, true); } }; private: net::any_io_executor _executor; Url::Spec _spec; ssl::context& _sslcontext; using Stream = beast::ssl_stream<beast::tcp_stream>; std::optional<Stream> _stream; // nullopt when disconnected // task queueing using AsyncTask = std::shared_ptr<IAsyncTask>; std::deque<AsyncTask> _tasks; void enqueue(AsyncTask task) { post(_executor, [=, t = std::move(task), this, self = shared_from_this()] { _tasks.push_back(std::move(t)); if (_tasks.size() == 1) { _tasks.front()->initiate(); } }); } }; int main() { ssl::context ctx(ssl::context::tlsv12_client); ctx.set_verify_mode(ssl::verify_peer); ctx.set_default_verify_paths(); // load_root_certificates(ctx); net::thread_pool io(1); std::map<Url::Spec, std::shared_ptr<Client> > pool; using V = http::verb; for (auto [url, verb, data] : { std::tuple // {"httpbin/post", V::post, "post data"}, {"httpbin/delay/5", V::delete_, ""}, {"httpbin/base64/ZGVjb2RlZCBiYXM2NA==", V::get, ""}, {"httpbin/delay/7", V::patch, ""}, {"httpbin/stream/3", V::get, ""}, {"httpbin/uuid", V::get, ""}, }) // { auto parsed = parseUrl(url); std::cout << std::quoted(parsed.protocol) << " " << std::quoted(parsed.hostname) << " " << std::quoted(parsed.port) << " " << std::quoted(parsed.path) << " "; auto spec = parsed.specification(); if (!pool.contains(spec)) { pool.emplace(spec, std::make_shared<Client>( make_strand(io.get_executor()), spec, ctx)); } pool.at(spec)->async_request( verb, parsed.path, data, [=, v = verb, u = url](error_code ec, std::string const& body) { std::cout << v << " to " << u << ": " << std::quoted(body) << std::endl; }); } io.join(); for (auto& [time, bytes] : timeBytes) { std::cout << bytes << " bytes in " << time << "ms "; } }

在我的系统上打印

"https" "httpbin" "443" "/post" "https" "httpbin" "443" "/delay/5" "https" "httpbin" "443" "/base64/ZGVjb2RlZCBiYXM2NA==" "https" "httpbin" "443" "/delay/7" "https" "httpbin" "443" "/stream/3" "https" "httpbin" "443" "/uuid" Socket 0x7f4ad4001060 (re)connected to 18.232.227.86:443 POST to httpbin/post: "{ "args": {}, "data": "post data", "files": {}, "form": {}, "headers": { "Accept": "application/json", "Content-Length": "9", "Content-Type": "application/json", "Host": "httpbin", "User-Agent": "Boost.Beast/318", "X-Amzn-Trace-Id": "Root=1-618b513c-2c51c112061b10456a5e3d4e" }, "json": null, "origin": "163.158.244.77", "url": "httpbin/post" } " DELETE to httpbin/delay/5: "{ "args": {}, "data": "", "files": {}, "form": {}, "headers": { "Accept": "application/json", "Content-Type": "application/json", "Host": "httpbin", "User-Agent": "Boost.Beast/318", "X-Amzn-Trace-Id": "Root=1-618b513c-324c97504eb79d8b743c6c5d" }, "origin": "163.158.244.77", "url": "httpbin/delay/5" } " GET to httpbin/base64/ZGVjb2RlZCBiYXM2NA==: "decoded bas64" PATCH to httpbin/delay/7: "{ "args": {}, "data": "", "files": {}, "form": {}, "headers": { "Accept": "application/json", "Content-Type": "application/json", "Host": "httpbin", "User-Agent": "Boost.Beast/318", "X-Amzn-Trace-Id": "Root=1-618b5141-3a8c30e60562df583061fc5a" }, "origin": "163.158.244.77", "url": "httpbin/delay/7" } " GET to httpbin/stream/3: "{"url": "httpbin/stream/3", "args": {}, "headers": {"Host": "httpbin", "X-Amzn-Trace-Id": "Root=1-618b5148-45fce8a8432930a006c0a574", "User-Agent": "Boost.Beast/318", "Content-Type": "application/json", "Accept": "application/json"}, "origin": "163.158.244.77", "id": 0} {"url": "httpbin/stream/3", "args": {}, "headers": {"Host": "httpbin", "X-Amzn-Trace-Id": "Root=1-618b5148-45fce8a8432930a006c0a574", "User-Agent": "Boost.Beast/318", "Content-Type": "application/json", "Accept": "application/json"}, "origin": "163.158.244.77", "id": 1} {"url": "httpbin/stream/3", "args": {}, "headers": {"Host": "httpbin", "X-Amzn-Trace-Id": "Root=1-618b5148-45fce8a8432930a006c0a574", "User-Agent": "Boost.Beast/318", "Content-Type": "application/json", "Accept": "application/json"}, "origin": "163.158.244.77", "id": 2} " GET to httpbin/uuid: "{ "uuid": "4557c909-880e-456c-8ef9-049a72f5fda1" } " 826 bytes in 84.9807ms 752 bytes in 5267.26ms 425 bytes in 84.6031ms 751 bytes in 7085.28ms 1280 bytes in 86.6554ms 434 bytes in 85.0086ms

注意:

  • Httpbin有各种测试URL-其中一些会产生长时间延迟,因此会出现计时

  • 只有一个连接。如果出现IO错误,我们将断开连接(下一次请求时应重新连接)

  • HTTP错误不是错误,因为连接保持有效

  • DNS解析、连接和握手也是异步

更多推荐

如何在Beast中保持此HTTPS连接?

本文发布于:2023-11-06 17:50:02,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1564380.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:如何在   Beast   HTTPS

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!