分布式架构底层磐石

编程入门 行业动态 更新时间:2024-10-24 06:34:41

分布式架构底层<a href=https://www.elefans.com/category/jswz/34/1697548.html style=磐石"/>

分布式架构底层磐石

一、BIO(Blocked IO  阻塞IO):

客户端多并发请求时,服务端的TPS是1,如何提升服务端的并发数??

 

解决上面的单线程问题,提升服务端的TPS,做以下改良:

这个版本,依然存在巨大的问题,例如线程池的最大线程数是1000,那么有可能80%的线程是阻塞在read(),极大的浪费系统资源,继续改良,如果read时候,发现没有就绪,是否可以不用阻塞??

package com.jason.thread.socket.newsocket.bio.pool;import java.io.IOException;
import java.InetSocketAddress;
import java.Socket;import static java.lang.Thread.sleep;/*** @program: JasonSpringMybatis* @description* @author: 大龄程序猿* @create: 2020-06-15 23:36**/
public class MyBioClientSocket  implements Runnable {public static void main(String[] args) {Thread   thread1=new Thread(new MyBioClientSocket());Thread   thread2=new Thread(new MyBioClientSocket());Thread   thread3=new Thread(new MyBioClientSocket());thread1.start();thread2.start();thread3.start();}@Overridepublic void run() {Socket   socket=new Socket();try {socket.connect(new InetSocketAddress(8080));try {sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}socket.getOutputStream().write("helloword!".getBytes());} catch (IOException e) {e.printStackTrace();}}
}package com.jason.thread.socket.newsocket.bio.pool;import java.io.IOException;
import java.InetSocketAddress;
import java.ServerSocket;
import java.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** @program: JasonSpringMybatis* @description* @author: 大龄程序猿* @create: 2020-06-15 23:27**/
public class MyBioServerSocket {private static ExecutorService   pool= Executors.newFixedThreadPool(20);public static void main(String[] args) {try {ServerSocket  serverSocket=new ServerSocket();serverSocket.bind(new InetSocketAddress(8080));while (true){byte[]  buffer=new byte[1024];long time1=System.currentTimeMillis();System.out.println("等待客户端连接");Socket socket=serverSocket.accept();//连接阻塞long time2=System.currentTimeMillis();System.out.println("有连接添加进线程池处理,等待连接耗时="+(time2-time1));pool.execute(new MyThread(socket));}} catch (IOException e) {e.printStackTrace();}}
}
package com.jason.thread.socket.newsocket.bio.pool;import java.Socket;/*** @program: JasonSpringMybatis* @description* @author: 大龄程序猿* @create: 2020-06-16 00:34**/
public class MyThread  implements Runnable {private  Socket  socket;public MyThread(Socket socket) {this.socket = socket;}@Overridepublic void run() {byte[]  buffer=new byte[1024];try{long time1=System.currentTimeMillis();socket.getInputStream().read(buffer);//读阻塞long time2=System.currentTimeMillis();System.out.println(Thread.currentThread().getName()+"-->read:"+new String(buffer)+"-->用时="+(time2-time1));}catch (Exception e){e.printStackTrace();}}
}

运行结果:

通过运行结果,可以看出:线程6,线程7,在accept()环节,没有等待,直接进入read()方法,性能相比单线程已经提升很大。

有连接添加进线程池处理,等待连接耗时=157943
等待客户端连接
有连接添加进线程池处理,等待连接耗时=0
等待客户端连接
有连接添加进线程池处理,等待连接耗时=0
等待客户端连接
pool-1-thread-5-->read:helloword!                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      -->用时=2006
pool-1-thread-7-->read:helloword!                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      -->用时=2006
pool-1-thread-6-->read:helloword!

二、NIO(New IO  非阻塞IO  或者称为 No  Blocked IO):

      顾名思义,针对BIO的连接阻塞、IO阻塞,NIO提供了一套非阻塞的IO组件,同时还引入了ByteBuffer,分配的内存使用的是本机内存而不是Java堆上的内存,这也进一步说明每次分配内存时会调用操作系统的os::malloc函数。另外一方面直接ByteBuffer产生的数据如果和网络或者磁盘交互都在操作系统的内核空间中发生……

调试过程的一段错误日志,连上的客户端,写完数据后,在服务端读数据之前退出,服务端报以下异常。

这个版本还是有个缺点,需要用个线程轮询各个通道的数据就绪情况,接下来我们重点聊一下NIO(底层Epoll的封装)

java.io.IOException: 远程主机强迫关闭了一个现有的连接。at sun.nio.ch.SocketDispatcher.read0(Native Method)at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)at sun.nio.ch.IOUtil.read(IOUtil.java:197)at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)at com.jason.thread.socket.newsocket.nio.MyBioServerSocket.main(MyBioServerSocket.java:39)

 

连接上,但是连接还没有完成,所以accept()之后,判断连接是否OK:

Exception in thread "main" java.nio.channels.NotYetConnectedExceptionat sun.nio.ch.SocketChannelImpl.ensureWriteOpen(SocketChannelImpl.java:274)at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:461)at com.jason.thread.socket.newsocket.nio.selector.MyBioClientSocket.doConnect(MyBioClientSocket.java:55)at com.jason.thread.socket.newsocket.nio.selector.MyBioClientSocket.main(MyBioClientSocket.java:40)

源码:

package com.jason.thread.socket.newsocket.nio;import java.io.IOException;
import java.InetSocketAddress;
import java.ServerSocket;
import java.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;/*** @program: JasonSpringMybatis* @description* @author: 大龄程序猿* @create: 2020-06-15 23:27**/
public class MyBioServerSocket {private  static List<SocketChannel> socketChannelList=new LinkedList<>();public static void main(String[] args) {try {ServerSocketChannel  serverSocketChannel=ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);//设置不阻塞serverSocketChannel.bind( new InetSocketAddress(8080));while(true){doWaitingSocketChannel();//轮询已经建立连接,但是没有读取到数据的SocketChannelSystem.out.println("等待客户端连接:");SocketChannel  socketChannel=serverSocketChannel.accept();if(socketChannel!=null){Thread.sleep(10000);socketChannel.configureBlocking(false);ByteBuffer  buffer=ByteBuffer.allocate(1024);int i=socketChannel.read(buffer);//i表示读取到的字节数if(i<=0){socketChannelList.add(socketChannel);}else{System.out.println(new String(buffer.array()));}}Thread.sleep(1000);}} catch (IOException | InterruptedException e) {e.printStackTrace();}}public static void doWaitingSocketChannel() throws IOException {ByteBuffer  buffer=ByteBuffer.allocate(1024);for(SocketChannel socketChannel:socketChannelList){buffer.clear();int i=socketChannel.read(buffer);if(i>0){System.out.println(new String(buffer.array()));socketChannelList.remove(socketChannel);}}}
}package com.jason.thread.socket.newsocket.nio;import java.io.IOException;
import java.InetSocketAddress;
import java.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;import static java.lang.Thread.sleep;/*** @program: JasonSpringMybatis* @description* @author: 大龄程序猿* @create: 2020-06-15 23:36**/
public class MyBioClientSocket {public static void main(String[] args) {try {SocketChannel    socket=SocketChannel.open();socket.connect(new InetSocketAddress(8080));
//          socket.configureBlocking(false);//设置不阻塞while(true){if(socket.isConnected()){socket.finishConnect();socket.write(ByteBuffer.wrap("helloword!".getBytes()));try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}break;}}} catch (IOException e) {e.printStackTrace();}}
}

 三、NIO多路复用:

      我们重点聊一下Selector,多路选择器,Selector允许单线程处理多个 Channel。如果你的应用打开了多个连接(通道),但每个连接的流量都很低,使用Selector就会很方便。例如,下图是一个单线程中使用一个Selector处理3个Channel的图示:

                                   

  • 什么是NIO多路复用机制?

        采用一个线程维护多个TCP连接的IO操作;线程安全,支持高并发;

  •  原理:NIO的选择器将多个不同的channel统一交给selector选择器进行管理;
  • NIO多路复用机制在不用的系统中存在差别,在window中选择器是select去轮循channel获取信息,Linux中选择器是epoll通过注册事件回调通知获取信息; 
  • select中会存在空连接,就是只连接但不发送信息,且select中有数量限制,不能很好的支持高并发; linux操作就出现epoll实现事件驱动回调形式通知,不会存在空轮训的情况,只是对活跃的socket实现主动回调【有信息会主动通知】,这样的性能有很大的提升,避免了空连接,所以时间复杂度为是o(1)
  • NIO核心设计思想是什么

        非阻塞式IO,选择器实现IO多路复用机制,缓冲区提高读写效率; 

源码:

package com.jason.thread.socket.newsocket.nio.selector;import org.springframework.expression.spel.ast.Selection;import java.io.IOException;
import java.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;/*** @program: JasonSpringMybatis* @description* @author: 大龄程序猿* @create: 2020-06-15 23:27**/
public class MyBioServerSocket {//epoll底层有实现,wait列表,wait列表中的依赖事件触发回调,移到就绪列表中。//private  static List<SocketChannel> socketChannelList=new LinkedList<>();public static Selector selector;public static void main(String[] args) {try {selector= Selector.open();ServerSocketChannel  serverSocketChannel=ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);//设置不阻塞serverSocketChannel.bind( new InetSocketAddress(8080));serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while(true){System.out.println("等待客户端连接:");selector.select();//阻塞,也可以设置超时时间Set<SelectionKey> selectionKeySet=selector.selectedKeys();for(SelectionKey key:selectionKeySet){try{if(key.isAcceptable()){doAccept(key);}else if(key.isReadable()){doRead(key);}}finally {selectionKeySet.remove(key);}}Thread.sleep(1000);}} catch (IOException | InterruptedException e) {e.printStackTrace();}}public  static void doAccept(SelectionKey  selectionKey) throws IOException {ServerSocketChannel  serverSocketChannel= (ServerSocketChannel) selectionKey.channel();SocketChannel socketChannel=serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.write(ByteBuffer.wrap("NIO Server request: I am NIO Server".getBytes()));System.out.println("服务端发送读事件");socketChannel.register(selector,SelectionKey.OP_READ);//注册读事件}public  static void doRead(SelectionKey  selectionKey) throws IOException {SocketChannel   socketChannel=(SocketChannel) selectionKey.channel();ByteBuffer   buffer=ByteBuffer.allocate(1024);socketChannel.read(buffer);System.out.println("收到:客户端发送消息,内容:"+(new String(buffer.array())));}
}package com.jason.thread.socket.newsocket.nio.selector;import java.io.IOException;
import java.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;/*** @program: JasonSpringMybatis* @description* @author: 大龄程序猿* @create: 2020-06-15 23:36**/
public class MyBioClientSocket {public static Selector selector;public static void main(String[] args) {try {selector=selector.open();SocketChannel    socketChannel=SocketChannel.open();socketChannel.configureBlocking(false);//设置不阻塞socketChannel.connect(new InetSocketAddress("localhost",8080));socketChannel.register(selector, SelectionKey.OP_CONNECT);while(true){System.out.println("客户端等待连接事件");selector.select();//阻塞读System.out.println("捕获到事件:");Set<SelectionKey> selectionKeySet=selector.selectedKeys();Iterator iterator=selectionKeySet.iterator();while (iterator.hasNext()){SelectionKey key=(SelectionKey) iterator.next();iterator.remove();if(key.isConnectable()){doConnect(key);}else if(key.isReadable()){doRead(key);}}}} catch (IOException e) {e.printStackTrace();}}private static void doConnect(SelectionKey selectionKey) throws IOException {SocketChannel   socketChannel=(SocketChannel) selectionKey.channel();if(socketChannel.isConnectionPending()){socketChannel.finishConnect();}socketChannel.configureBlocking(false);socketChannel.write(ByteBuffer.wrap(("收到客户端发来消息:"+"Hello  word!").getBytes()));socketChannel.register(selector,SelectionKey.OP_READ);}public  static void doRead(SelectionKey  selectionKey) throws IOException {SocketChannel   socketChannel=(SocketChannel) selectionKey.channel();ByteBuffer   buffer=ByteBuffer.allocate(1024);socketChannel.read(buffer);System.out.println("收到:服务端发送消息,内容:"+(new String(buffer.array())));}
}

 

更多推荐

分布式架构底层磐石

本文发布于:2024-02-25 02:52:19,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1697544.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:磐石   分布式   底层   架构

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!