云原生小课堂

编程入门 行业动态 更新时间:2024-10-10 13:20:06

云原生小<a href=https://www.elefans.com/category/jswz/34/1769265.html style=课堂"/>

云原生小课堂

前言

Envoy 是一款面向 Service Mesh 的高性能网络代理服务。它与应用程序并行运行,通过以平台无关的方式提供通用功能来抽象网络。当基础架构中的所有服务流量都通过 Envoy 网格时,通过一致的可观测性,很容易地查看问题区域,调整整体性能。

Envoy也是istio的核心组件之一,以 sidecar 的方式与服务运行在一起,对服务的流量进行拦截转发,具有路由,流量控制等等强大特性。本系列文章,我们将不局限于istio,envoy的官方文档,从源码级别切入,分享Envoy启动、流量劫持、http 请求处理流程的进阶应用实例,深度分析Envoy架构。

本篇是Envoy请求流程源码解析的第二篇,主要分享Envoy的outbound方向上篇,包含启动监听和建立连接。注:本文中所讨论的issue和pr基于21年12月。

envoy当中基于libevent进行封装了各种文件,定时器事件等操作,以及dispatch对象的分发,和延迟析构,worker启动,worker listener绑定等部分不在这里作解读,后续有空可以单独再进行分析。跳过envoy当中的事件循环模型,这里以请求触发开始。

outbound方向

filter解析

启动监听

  1. 通过xDS或者静态配置,获得Envoy代理的监听器信息

  2. 如果监听器bind_to_port,则直接调用libevent的接口,绑定监听,回调函数设置为ListenerImpl::listenCallback

    void ListenerManagerImpl::addListenerToWorker(Worker& worker,absl::optional<uint64_t> overridden_listener,ListenerImpl& listener,ListenerCompletionCallback completion_callback) {if (overridden_listener.has_value()) {ENVOY_LOG(debug, "replacing existing listener {}", overridden_listener.value());worker.addListener(overridden_listener, listener, [this, completion_callback](bool) -> void {server_.dispatcher().post([this, completion_callback]() -> void {stats_.listener_create_success_.inc();if (completion_callback) {completion_callback();}});});return;}worker.addListener(overridden_listener, listener, [this, &listener, completion_callback](bool success) -> void {// The add listener completion runs on the worker thread. Post back to the main thread to// avoid locking.server_.dispatcher().post([this, success, &listener, completion_callback]() -> void {void ListenSocketImpl::setupSocket(const Network::Socket::OptionsSharedPtr& options,bool bind_to_port) {setListenSocketOptions(options);if (bind_to_port) {bind(address_provider_->localAddress());}}ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent,Network::ListenerConfig& config): ActiveTcpListener(parent,parent.dispatcher().createListener(config.listenSocketFactory().getListenSocket(), *this,config.bindToPort(), config.tcpBacklogSize()),config) {}class ActiveTcpListener : public Network::TcpListenerCallbacks,public ActiveListenerImplBase,public Network::BalancedConnectionHandler,Logger::Loggable<Logger::Id::conn_handler> {public:ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerConfig& config);ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerPtr&& listener,Network::ListenerConfig& config);~ActiveTcpListener() override;bool listenerConnectionLimitReached() const {// TODO(tonya11en): Delegate enforcement of per-listener connection limits to overload// manager.return !config_->openConnections().canCreate();}void decNumConnections() {ASSERT(num_listener_connections_ > 0);--num_listener_connections_;config_->openConnections().dec();}// Network::TcpListenerCallbacksvoid onAccept(Network::ConnectionSocketPtr&& socket) override;void onReject(RejectCause) override;listener_.reset(// libevent的base                      当前对象方法                   套接字的文件描述符evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.ioHandle().fd()));if (!listener_) {throw CreateListenerException(fmt::format("cannot listen on socket: {}", socket.localAddress()->asString()));}if (!Network::Socket::applyOptions(socket.options(), socket,envoy::api::v3::core::SocketOption::STATE_LISTENING)) {throw CreateListenerException(fmt::format("cannot set post-listen socket option on socket: {}",socket.localAddress()->asString()));}evconnlistener_set_error_cb(listener_.get(), errorCallback);

关于reuseport

  1. /

多个 server socket 监听相同的端口。每个 server socket 对应一个监听线程。内核 TCP 栈接收到客户端建立连接请求(SYN)时,按 TCP 4 元组(srcIP,srcPort,destIP,destPort) hash 算法,选择一个监听线程,唤醒之。新连接绑定到被唤醒的线程。所以相对于非SO_REUSEPORT, 连接更为平均地分布到线程中(hash 算法不是绝对平均)

envoy当中是支持在listener去设置开启这个特性,但是热重启场景时,对内核版本有一定要求(4.19-rc1)

.18.3/api-v3/config/listener/v3/listener.proto

验证观察

默认未开启,通过envoyfilter进行开启后,可见15001的端口被开启

apiVersion: networking.istio.io/v1alpha3
kind: EnvoyFilter
metadata:name: reuseportnamespace: testhl
spec:workloadSelector:labels:app: asm-0configPatches:- applyTo: LISTENERmatch:context: SIDECAR_OUTBOUNDlistener:portNumber: 15001name: "virtualOutbound"patch:operation: MERGEvalue:reuse_port: true

需要重启 POD

而对于没有应用reuseport

大致的平均

关于绝对的链接平衡, 可以试试 Listener 的配置connection_balance_config:exact_balance,不过由于有锁,对高频新连接应该有一定的性能损耗。目前只适用于 TCP 监听器

Network::BalancedConnectionHandlerOptRef new_listener;if (hand_off_restored_destination_connections_ &&socket_->addressProvider().localAddressRestored()) {// Find a listener associated with the original destination address.new_listener =listener_.parent_.getBalancedHandlerByAddress(*socket_->addressProvider().localAddress());}if (!rebalanced) {Network::BalancedConnectionHandler& target_handler =config_->connectionBalancer().pickTargetHandler(*this);if (&target_handler != this) {target_handler.post(std::move(socket));return;}}auto active_socket = std::make_unique<ActiveTcpSocket>(*this, std::move(socket),hand_off_restored_destination_connections);// Create and run the filtersconfig_->filterChainFactory().createListenerFilterChain(*active_socket);active_socket->continueFilterChain(true);Network::BalancedConnectionHandlerOptRef
ConnectionHandlerImpl::getBalancedHandlerByAddress(const Network::Address::Instance& address) {// This is a linear operation, may need to add a map<address, listener> to improve performance.// However, linear performance might be adequate since the number of listeners is small.// We do not return stopped listeners.auto listener_it =std::find_if(listeners_.begin(), listeners_.end(),[&address](std::pair<Network::Address::InstanceConstSharedPtr,ConnectionHandlerImpl::ActiveListenerDetails>& p) {return p.second.tcpListener().has_value() &&p.second.listener_->listener() != nullptr &&p.first->type() == Network::Address::Type::Ip && *(p.first) == address;});// If there is exact address match, return the corresponding listener.if (listener_it != listeners_.end()) {return Network::BalancedConnectionHandlerOptRef(listener_it->second.tcpListener().value().get());}// Otherwise, we need to look for the wild card match, i.e., 0.0.0.0:[address_port].// We do not return stopped listeners.// TODO(wattli): consolidate with previous search for more efficiency.if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.listener_wildcard_match_ip_family")) {listener_it =std::find_if(listeners_.begin(), listeners_.end(),[&address](const std::pair<Network::Address::InstanceConstSharedPtr,ConnectionHandlerImpl::ActiveListenerDetails>& p) {

建立连接

  1. DispatcherImpl通过libevent,接收到请求,调用ListenerImpl::listenCallback

  2. client向envoy发起连接,envoy的worker接收eventloop的callback, 触发 Envoy::Network::ListenerImpl::listenCallback(port: 15001)

  3. 15001的useOriginalDst": true,accept_filters_中会带有OriginalDstFilter

  4. OriginalDstFilter.OnAccept中用os_syscalls.getsockopt(fd, SOL_IP, SO_ORIGINAL_DST, &orig_addr, &addr_len)获取在iptables修改之前dst ip  iptables与getsockopt

  Network::Address::InstanceConstSharedPtr OriginalDstFilter::getOriginalDst(Network::Socket& sock) {return Network::Utility::getOriginalDst(sock);}sockaddr_storage orig_addr;memset(&orig_addr, 0, sizeof(orig_addr));socklen_t addr_len = sizeof(sockaddr_storage);int status;if (*ipVersion == Address::IpVersion::v4) {status = sock.getSocketOption(SOL_IP, SO_ORIGINAL_DST, &orig_addr, &addr_len).rc_;} else {status = sock.getSocketOption(SOL_IPV6, IP6T_SO_ORIGINAL_DST, &orig_addr, &addr_len).rc_;}if (status != 0) {return nullptr;}return Address::addressFromSockAddr(orig_addr, 0, true /* default for v6 constructor */);
  1. 在newconnection当中,还会通过 getBalancedHandlerByAddress寻找到实际的虚拟listener

    void ActiveTcpSocket::newConnection() {connected_ = true;// Check if the socket may need to be redirected to another listener.Network::BalancedConnectionHandlerOptRef new_listener;if (hand_off_restored_destination_connections_ &&socket_->addressProvider().localAddressRestored()) {// Find a listener associated with the original destination address.new_listener =listener_.parent_.getBalancedHandlerByAddress(*socket_->addressProvider().localAddress());}
  1. 通过ConnectionHandlerImpl::findActiveListenerByTag

  Network::BalancedConnectionHandlerOptRefConnectionHandlerImpl::getBalancedHandlerByAddress(const Network::Address::Instance& address) {// This is a linear operation, may need to add a map<address, listener> to improve performance.// However, linear performance might be adequate since the number of listeners is small.// We do not return stopped listeners.auto listener_it =std::find_if(listeners_.begin(), listeners_.end(),[&address](std::pair<Network::Address::InstanceConstSharedPtr,ConnectionHandlerImpl::ActiveListenerDetails>& p) {return p.second.tcpListener().has_value() &&p.second.listener_->listener() != nullptr &&p.first->type() == Network::Address::Type::Ip && *(p.first) == address;});// If there is exact address match, return the corresponding listener.if (listener_it != listeners_.end()) {return Network::BalancedConnectionHandlerOptRef(listener_it->second.tcpListener().value().get());}

查到addr对应的Listener

  • 先查找Listener.IP==addr.ip && Listener.Port==addr.port的Listener

  • 再查找Listener.IP==0.0.0.0 && Listener.Port==addr.port的Listener (对于tcp服务,ip会有值,对于http服务,ip为4个0)

  1. dispatcher.createServerConnection传入accept到的fd 创建Server连接对象ConnectionImpl, 并把onFileEvent注册到eventloop,等待读写事件的到来,因为socket是由一个non-blocking listening socket创建而来,所以也是non-blocking

  2. 且注册的触发方式为epoll的边缘触发

    auto server_conn_ptr = parent_.dispatcher().createServerConnection(std::move(socket), std::move(transport_socket), *stream_info);Network::ServerConnectionPtrDispatcherImpl::createServerConnection(Network::ConnectionSocketPtr&& socket,Network::TransportSocketPtr&& transport_socket,StreamInfo::StreamInfo& stream_info) {ASSERT(isThreadSafe());return std::make_unique<Network::ServerConnectionImpl>(*this, std::move(socket), std::move(transport_socket), stream_info, true);}class ServerConnectionImpl : public ConnectionImpl, virtual public ServerConnection {public:ServerConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,TransportSocketPtr&& transport_socket, StreamInfo::StreamInfo& stream_info,bool connected);// ServerConnection implvoid setTransportSocketConnectTimeout(std::chrono::milliseconds timeout) override;void raiseEvent(ConnectionEvent event) override;private:void onTransportSocketConnectTimeout();bool transport_connect_pending_{true};// Implements a timeout for the transport socket signaling connection. The timer is enabled by a// call to setTransportSocketConnectTimeout and is reset when the connection is established.Event::TimerPtr transport_socket_connect_timer_;};Event::FileTriggerType trigger = Event::PlatformDefaultTriggerType;// We never ask for both early close and read at the same time. If we are reading, we want to// consume all available data.socket_->ioHandle().initializeFileEvent(dispatcher_, [this](uint32_t events) -> void { onFileEvent(events); }, trigger,Event::FileReadyType::Read | Event::FileReadyType::Write);transport_socket_->setTransportSocketCallbacks(*this);constexpr FileTriggerType determinePlatformPreferredEventType() {#if defined(WIN32) || defined(FORCE_LEVEL_EVENTS)return FileTriggerType::EmulatedEdge;#elsereturn FileTriggerType::Edge;#endif}static constexpr FileTriggerType PlatformDefaultTriggerType = determinePlatformPreferredEventType();
  1. http的listener里filters为envoy.http_connection_managerbuildFilterChain里会把HTTP::ConnectionManagerImpl加入到upstream_filters_(list)中,这样在请求数据到达的时候,就可以使用http_connection_manager的on_read方法

     void FilterManagerImpl::addReadFilter(ReadFilterSharedPtr filter) {ASSERT(connection_.state() == Connection::State::Open);ActiveReadFilterPtr new_filter(new ActiveReadFilter{*this, filter});filter->initializeReadFilterCallbacks(*new_filter);LinkedList::moveIntoListBack(std::move(new_filter), upstream_filters_);}CodecClient::CodecClient(Type type, Network::ClientConnectionPtr&& connection,Upstream::HostDescriptionConstSharedPtr host,Event::Dispatcher& dispatcher): type_(type), host_(host), connection_(std::move(connection)),idle_timeout_(host_->cluster().idleTimeout()) {if (type_ != Type::HTTP3) {// Make sure upstream connections process data and then the FIN, rather than processing// TCP disconnects immediately. (see  for// details)connection_->detectEarlyCloseWhenReadDisabled(false);}connection_->addConnectionCallbacks(*this);connection_->addReadFilter(Network::ReadFilterSharedPtr{new CodecReadFilter(*this)});connection_->noDelay(true);
  1. 当连接刚刚加入eventloop的时候, Write Event会被立即触发,但因为write_buffer_没有数据,所以不会写入任何数据

   void CodecClient::onEvent(Network::ConnectionEvent event) {if (event == Network::ConnectionEvent::Connected) {ENVOY_CONN_LOG(debug, "connected", *connection_);connection_->streamInfo().setDownstreamSslConnection(connection_->ssl());connected_ = true;}if (event == Network::ConnectionEvent::RemoteClose) {remote_closed_ = true;}// HTTP/1 can signal end of response by disconnecting. We need to handle that case.if (type_ == Type::HTTP1 && event == Network::ConnectionEvent::RemoteClose &&!active_requests_.empty()) {Buffer::OwnedImpl empty;onData(empty);}if (event == Network::ConnectionEvent::RemoteClose ||event == Network::ConnectionEvent::LocalClose) {ENVOY_CONN_LOG(debug, "disconnect. resetting {} pending requests", *connection_,active_requests_.size());disableIdleTimer();idle_timer_.reset();StreamResetReason reason = StreamResetReason::ConnectionFailure;if (connected_) {reason = StreamResetReason::ConnectionTermination;if (protocol_error_) {if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.return_502_for_upstream_protocol_errors")) {reason = StreamResetReason::ProtocolError;connection_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamProtocolError);}}}while (!active_requests_.empty()) {// Fake resetting all active streams so that reset() callbacks get invoked.active_requests_.front()->encoder_->getStream().resetStream(reason);}}}

ASM试用申请

Envoy是Istio中的Sidecar官方标配,是一个面向Service Mesh的高性能网络代理服务。

当前Service Mesh是Kubernetes上微服务治理的最佳实践,灵雀云微服务治理平台Alauda Service Mesh(简称:ASM)可完整覆盖微服务落地所需要的基础设施,让开发者真正聚焦业务。

点击此处,深入体验ASM!

关于【云原生小课堂】

【云原生小课堂】是由灵雀云、Kube-OVN社区、云原生技术社区联合开设的公益性技术分享类专题,将以丰富详实的精品内容和灵活多样的呈现形式,持续为您分享云原生前沿技术,带您了解更多云原生实践干货。

在数字化转型的背景下,云原生已经成为企业创新发展的核心驱动力。作为国内最早将 Kubernetes 产品化的厂商之一,灵雀云从出生便携带“云原生基因”,致力于通过革命性的技术帮助企业完成数字化转型,我们期待着云原生给这个世界带来更多改变。

关注我们,学习更多云原生知识,一起让改变发生。

相关阅读:

云原生小课堂 | Envoy请求流程源码解析(一):流量劫持

更多推荐

云原生小课堂

本文发布于:2024-02-14 08:48:11,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1762934.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:课堂

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!