Boost asio, single TCP server, many clients

Question

I am creating a TCP server using boost asio that will accept connections from many clients, receive data, and send confirmations. The thing is that I want to be able to accept all the clients, but I want to work with only one at a time. I want all the other transactions to be kept in a queue.

Example:

  • Client1 connects
  • Client2 connects
  • Client1 sends data and asks for a reply
  • Client2 sends data and asks for a reply
  • Client2's request is put into the queue
  • Client1's data is read, the server replies, and the transaction ends
  • Client2's request is taken from the queue, the server reads the data, replies, and the transaction ends

So this is something between an asynchronous server and a blocking server. I want to do just one thing at a time, but at the same time I want to be able to store all client sockets and their demands in the queue.

I was able to create server-client communication with all the functionality that I need, but only on a single thread. Once the client disconnects, the server is terminated as well. I don't really know how to start implementing what I mentioned above. Should I open a new thread each time a connection is accepted? Should I use async_accept or blocking accept?

I have read the boost::asio chat example, where many clients connect to a single server, but there is no queuing mechanism of the kind I need here.

I am aware that this post might be a bit confusing, but TCP servers are new to me, so I am not familiar enough with the terminology. There is also no source code to post, because I am asking only for help with the concept of this project.

Answer

Just keep accepting.

You show no code, but it typically looks like:

void do_accept() {
    acceptor_.async_accept(socket_, [this](boost::system::error_code ec) {
        std::cout << "async_accept -> " << ec.message() << "\n";
        if (!ec) {
            std::make_shared<Connection>(std::move(socket_))->start();
            do_accept(); // THIS LINE
        }
    });
}

If you don't include the line marked // THIS LINE, you will indeed not accept more than one connection.
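For orientation, here is a minimal sketch of the kind of surrounding class such a do_accept() usually lives in. The names acceptor_, socket_, and Connection come from the snippet above; everything else (the Listener name, the echo behaviour) is an illustrative assumption, not code from the answer:

#include <array>
#include <boost/asio.hpp>
#include <iostream>
#include <memory>

namespace ba = boost::asio;
using ba::ip::tcp;

// Hypothetical Connection: reads one chunk, echoes it back, then lets itself be destroyed.
struct Connection : std::enable_shared_from_this<Connection> {
    explicit Connection(tcp::socket s) : socket_(std::move(s)) {}

    void start() {
        auto self = shared_from_this();
        socket_.async_read_some(ba::buffer(buf_),
            [this, self](boost::system::error_code ec, std::size_t n) {
                if (!ec)
                    ba::async_write(socket_, ba::buffer(buf_, n),
                                    [self](boost::system::error_code, std::size_t) {});
            });
    }

  private:
    tcp::socket socket_;
    std::array<char, 1024> buf_{};
};

// Hypothetical Listener owning the acceptor_ and socket_ used by do_accept().
struct Listener {
    Listener(ba::io_service& svc, unsigned short port)
        : acceptor_(svc, tcp::endpoint(tcp::v4(), port)), socket_(svc) {}

    void do_accept() {
        acceptor_.async_accept(socket_, [this](boost::system::error_code ec) {
            std::cout << "async_accept -> " << ec.message() << "\n";
            if (!ec) {
                std::make_shared<Connection>(std::move(socket_))->start();
                do_accept(); // re-arm the accept so the next client is taken too
            }
        });
    }

  private:
    tcp::acceptor acceptor_;
    tcp::socket   socket_;
};

Construct a Listener, call do_accept() once, and then run the io_service; each accepted socket is moved into its own Connection while the listener immediately waits for the next client.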

If this doesn't help, please include some code we can work from.

This uses just standard library features for the non-network part. The network part is as described before:

#include <boost/asio.hpp>
#include <boost/asio/high_resolution_timer.hpp>
#include <istream>
// standard library headers used by the Shared/Network/Service parts and by main below
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <iterator>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

using namespace std::chrono_literals;
using Clock = std::chrono::high_resolution_clock;

namespace Shared {
    using PostRequest = std::function<void(std::istream& is)>;
}

namespace Network {
    namespace ba = boost::asio;
    using ba::ip::tcp;
    using error_code = boost::system::error_code;
    using Shared::PostRequest;

    struct Connection : std::enable_shared_from_this<Connection> {
        Connection(tcp::socket&& s, PostRequest poster) : _s(std::move(s)), _poster(poster) {}

        void process() {
            auto self = shared_from_this();
            ba::async_read(_s, _request, [this, self](error_code ec, size_t) {
                if (!ec || ec == ba::error::eof) {
                    std::istream reader(&_request);
                    _poster(reader);
                }
            });
        }

      private:
        tcp::socket   _s;
        ba::streambuf _request;
        PostRequest   _poster;
    };

    struct Server {
        Server(unsigned port, PostRequest poster) : _port(port), _poster(poster) {}

        void run_for(Clock::duration d = 30s) {
            _stop.expires_from_now(d);
            _stop.async_wait([this](error_code ec) {
                if (!ec) _svc.post([this] { _a.close(); });
            });

            _a.listen();
            do_accept();

            _svc.run();
        }

      private:
        void do_accept() {
            _a.async_accept(_s, [this](error_code ec) {
                if (!ec) {
                    std::make_shared<Connection>(std::move(_s), _poster)->process();
                    do_accept();
                }
            });
        }

        unsigned short            _port;
        PostRequest               _poster;
        ba::io_service            _svc;
        ba::high_resolution_timer _stop { _svc };
        tcp::acceptor             _a { _svc, tcp::endpoint {{}, _port } };
        tcp::socket               _s { _svc };
    };
}

The only "connection" to the work service part is the PostRequest handler that is passed to the server at construction:

    Network::Server server(6767, handler);
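Any callable matching Shared::PostRequest will do as the handler. Purely as an illustration (this lambda is hypothetical, not part of the answer; the real handler that feeds the queue appears in the main driver below), a handler that just logs what arrived could look like:

    // hypothetical handler: log the payload instead of queuing it
    auto handler = [](std::istream& is) {
        std::string payload(std::istreambuf_iterator<char>(is), {});
        std::cout << "received " << payload.size() << " bytes: " << payload << "\n";
    };

    Network::Server server(6767, handler);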

I've also opted for async operations, so we can have a timer to stop the service, even though we do not use any threads:

    server.run_for(3s); // this blocks

The Work Part

This is completely separate, and will use threads. First, let's define a Request and a thread-safe Queue:

namespace Service {
    struct Request {
        std::vector<char> data; // or whatever you read from the sockets...
    };

    Request parse_request(std::istream& is) {
        Request result;
        result.data.assign(std::istream_iterator<char>(is), {});
        return result;
    }

    struct Queue {
        Queue(size_t max = 50) : _max(max) {}

        void enqueue(Request req) {
            std::unique_lock<std::mutex> lk(mx);
            cv.wait(lk, [this] { return _queue.size() < _max; });
            _queue.push_back(std::move(req));

            cv.notify_one();
        }

        Request dequeue(Clock::time_point deadline) {
            Request req;

            {
                std::unique_lock<std::mutex> lk(mx);
                _peak = std::max(_peak, _queue.size());
                if (cv.wait_until(lk, deadline, [this] { return _queue.size() > 0; })) {
                    req = std::move(_queue.front());
                    _queue.pop_front();
                    cv.notify_one();
                } else {
                    throw std::range_error("dequeue deadline");
                }
            }

            return req;
        }

        size_t peak_depth() const {
            std::lock_guard<std::mutex> lk(mx);
            return _peak;
        }

      private:
        mutable std::mutex mx;
        mutable std::condition_variable cv;
        size_t _max = 50;
        size_t _peak = 0;
        std::deque<Request> _queue;
    };

This is nothing special, and doesn't actually use threads yet. Let's make a worker function that accepts a reference to a queue (more than one worker can be started if so desired):

    void worker(std::string name, Queue& queue, Clock::duration d = 30s) {
        auto const deadline = Clock::now() + d;

        while (true) try {
            auto r = queue.dequeue(deadline);
            (std::cout << "Worker " << name << " handling request '").write(r.data.data(), r.data.size()) << "'\n";
        } catch (std::exception const& e) {
            std::cout << "Worker " << name << " got " << e.what() << "\n";
            break;
        }
    }
}
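The worker relies on dequeue throwing once the deadline passes; that is what ends its loop. As a quick standalone illustration of that deadline semantics (a hypothetical snippet assuming the program above, not part of the answer):

    // hypothetical single-threaded use of Service::Queue, e.g. inside a small main()
    Service::Queue q;
    q.enqueue(Service::Request{{'h', 'i'}});

    auto deadline = Clock::now() + 100ms;
    auto first = q.dequeue(deadline);      // queue is non-empty: returns right away
    try {
        auto second = q.dequeue(deadline); // queue is now empty: waits until the deadline...
    } catch (std::range_error const& e) {
        std::cout << e.what() << "\n";     // ...then throws "dequeue deadline"
    }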

The main Driver

Here's where the Queue gets instantiated, and both the network server and some worker threads are started:

int main() {
    Service::Queue queue;

    auto handler = [&](std::istream& is) {
        queue.enqueue(Service::parse_request(is));
    };

    Network::Server server(6767, handler);

    std::vector<std::thread> pool;
    pool.emplace_back([&queue] { Service::worker("one", queue, 6s); });
    pool.emplace_back([&queue] { Service::worker("two", queue, 6s); });

    server.run_for(3s); // this blocks

    for (auto& thread : pool)
        if (thread.joinable())
            thread.join();

    std::cout << "Maximum queue depth was " << queue.peak_depth() << "\n";
}

Live Demo

Live On Coliru

With a test load as follows:

for a in "hello world" "the quick" "brown fox" "jumped over" "the pangram" "bye world"
do
     netcat 127.0.0.1 6767 <<< "$a" || echo "not sent: '$a'"&
done
wait

It prints something like the following (the mangled lines are expected: both workers write to std::cout concurrently, so their output can interleave):

Worker one handling request 'brownfox'
Worker one handling request 'thepangram'
Worker one handling request 'jumpedover'
Worker two handling request 'Worker helloworldone handling request 'byeworld'
Worker one handling request 'thequick'
'
Worker one got dequeue deadline
Worker two got dequeue deadline
Maximum queue depth was 6
