Boost asio,单个TCP服务器,许多客户端

我正在创建一个将使用boost asio的TCP服务器,该服务器将接受来自许多客户端的连接,接收数据并发送确认.问题是我希望能够接受所有客户,但是我一次只能与一个客户一起工作.我希望所有其他事务都保持在队列中.

I am creating a TCP server that will use boost asio which will accept connections from many clients, receive data, and send confirmations. The thing is that I want to be able to accept all the clients but I want to work only with one at a time. I want all the other transactions to be kept in a queue.


  • Client1连接
  • Client2连接
  • Client1发送数据并要求答复
  • Client2发送数据并要求答复
  • Client2的请求放入队列
  • 读取Client1的数据,服务器回复,交易结束
  • Client2的请求从队列中获取,服务器读取数据,并回复事务结束.
  • 所以这是异步服务器和阻塞服务器之间的事情.我想一次只做一件事情,但同时我希望能够将所有客户端套接字及其需求存储在队列中.

    So this is something between asynchronous server and blocking server. I want to do just 1 thing at once but at the same time I want to be able to store all client sockets and their demands in the queue.


    I was able to create server-client communication with all the functionality that I need but only on single thread. Once client disconnects server is terminated as well. I don't really know how to start implementing what I have mentioned above. Should I open new thread each time connection is accepted? Should I use async_accept or blocking accept?

    我已经阅读了boost :: asio聊天示例,其中许多客户端连接这么一台服务器,但是这里没有我需要的排队机制.

    I have read boost::asio chat example, where many clients connect so single server, but there is no queuing mechanism that I need here.


    I am aware that this post might be a bit confusing but TCP servers are new to me so I am not familiar enough with the terminology. There is also no source code to post because I am asking only for help with concept of this project.




    You show no code, but it typically looks like

    void do_accept() { acceptor_.async_accept(socket_, [this](boost::system::error_code ec) { std::cout << "async_accept -> " << ec.message() << "\n"; if (!ec) { std::make_shared<Connection>(std::move(socket_))->start(); do_accept(); // THIS LINE } }); }

    如果您不包括标记为// THIS LINE的行,则确实不会接受超过1个连接.

    If you don't include the line marked // THIS LINE you will indeed not accept more than 1 connection.


    If this doesn't help, please include some code we can work from.


    This uses just standard library features for the non-network part.


    #include <boost/asio.hpp> #include <boost/asio/high_resolution_timer.hpp> #include <istream> using namespace std::chrono_literals; using Clock = std::chrono::high_resolution_clock; namespace Shared { using PostRequest = std::function<void(std::istream& is)>; } namespace Network { namespace ba = boost::asio; using ba::ip::tcp; using error_code = boost::system::error_code; using Shared::PostRequest; struct Connection : std::enable_shared_from_this<Connection> { Connection(tcp::socket&& s, PostRequest poster) : _s(std::move(s)), _poster(poster) {} void process() { auto self = shared_from_this(); ba::async_read(_s, _request, [this,self](error_code ec, size_t) { if (!ec || ec == ba::error::eof) { std::istream reader(&_request); _poster(reader); } }); } private: tcp::socket _s; ba::streambuf _request; PostRequest _poster; }; struct Server { Server(unsigned port, PostRequest poster) : _port(port), _poster(poster) {} void run_for(Clock::duration d = 30s) { _stop.expires_from_now(d); _stop.async_wait([this](error_code ec) { if (!ec)[this] { _a.close(); }); }); _a.listen(); do_accept();; } private: void do_accept() { _a.async_accept(_s, [this](error_code ec) { if (!ec) { std::make_shared<Connection>(std::move(_s), _poster)->process(); do_accept(); } }); } unsigned short _port; PostRequest _poster; ba::io_service _svc; ba::high_resolution_timer _stop { _svc }; tcp::acceptor _a { _svc, tcp::endpoint {{}, _port } }; tcp::socket _s { _svc }; }; }


    The only "connection" to the work service part is the PostRequest handler that is passed to the server at construction:

    Network::Server server(6767, handler);


    I've also opted for async operations, so we can have a timer to stop the service, even though we do not use any threads:

    server.run_for(3s); // this blocks



    The Work Part

    This is completely separate, and will use threads. First, let's define a Request, and a thread-safe Queue:

    namespace Service { struct Request { std::vector<char> data; // or whatever you read from the sockets... }; Request parse_request(std::istream& is) { Request result;<char>(is), {}); return result; } struct Queue { Queue(size_t max = 50) : _max(max) {} void enqueue(Request req) { std::unique_lock<std::mutex> lk(mx); cv.wait(lk, [this] { return _queue.size() < _max; }); _queue.push_back(std::move(req)); cv.notify_one(); } Request dequeue(Clock::time_point deadline) { Request req; { std::unique_lock<std::mutex> lk(mx); _peak = std::max(_peak, _queue.size()); if (cv.wait_until(lk, deadline, [this] { return _queue.size() > 0; })) { req = std::move(_queue.front()); _queue.pop_front(); cv.notify_one(); } else { throw std::range_error("dequeue deadline"); } } return req; } size_t peak_depth() const { std::lock_guard<std::mutex> lk(mx); return _peak; } private: mutable std::mutex mx; mutable std::condition_variable cv; size_t _max = 50; size_t _peak = 0; std::deque<Request> _queue; };


    This is nothing special, and doesn't actually use threads yet. Let's make a worker function that accepts a reference to a queue (more than 1 worker can be started if so desired):

    void worker(std::string name, Queue& queue, Clock::duration d = 30s) { auto const deadline = Clock::now() + d; while(true) try { auto r = queue.dequeue(deadline); (std::cout << "Worker " << name << " handling request '").write(, << "'\n"; } catch(std::exception const& e) { std::cout << "Worker " << name << " got " << e.what() << "\n"; break; } } }



    The main Driver

    Here's where the Queue gets instantiated and both the network server as well as some worker threads are started:

    int main() { Service::Queue queue; auto handler = [&](std::istream& is) { queue.enqueue(Service::parse_request(is)); }; Network::Server server(6767, handler); std::vector<std::thread> pool; pool.emplace_back([&queue] { Service::worker("one", queue, 6s); }); pool.emplace_back([&queue] { Service::worker("two", queue, 6s); }); server.run_for(3s); // this blocks for (auto& thread : pool) if (thread.joinable()) thread.join(); std::cout << "Maximum queue depth was " << queue.peak_depth() << "\n"; }




    for a in "hello world" "the quick" "brown fox" "jumped over" "the pangram" "bye world" do netcat 6767 <<< "$a" || echo "not sent: '$a'"& done wait


    Worker one handling request 'brownfox' Worker one handling request 'thepangram' Worker one handling request 'jumpedover' Worker two handling request 'Worker helloworldone handling request 'byeworld' Worker one handling request 'thequick' ' Worker one got dequeue deadline Worker two got dequeue deadline Maximum queue depth was 6


