磐石"/>
分布式架构底层磐石
一、BIO(Blocked IO 阻塞IO):
客户端多并发请求时,服务端的TPS是1,如何提升服务端的并发数??
解决上面的单线程问题,提升服务端的TPS,做以下改良:
这个版本,依然存在巨大的问题,例如线程池的最大线程数是1000,那么有可能80%的线程是阻塞在read(),极大的浪费系统资源,继续改良,如果read时候,发现没有就绪,是否可以不用阻塞??
package com.jason.thread.socket.newsocket.bio.pool;import java.io.IOException;
import java.InetSocketAddress;
import java.Socket;import static java.lang.Thread.sleep;/*** @program: JasonSpringMybatis* @description* @author: 大龄程序猿* @create: 2020-06-15 23:36**/
public class MyBioClientSocket implements Runnable {public static void main(String[] args) {Thread thread1=new Thread(new MyBioClientSocket());Thread thread2=new Thread(new MyBioClientSocket());Thread thread3=new Thread(new MyBioClientSocket());thread1.start();thread2.start();thread3.start();}@Overridepublic void run() {Socket socket=new Socket();try {socket.connect(new InetSocketAddress(8080));try {sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}socket.getOutputStream().write("helloword!".getBytes());} catch (IOException e) {e.printStackTrace();}}
}package com.jason.thread.socket.newsocket.bio.pool;import java.io.IOException;
import java.InetSocketAddress;
import java.ServerSocket;
import java.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** @program: JasonSpringMybatis* @description* @author: 大龄程序猿* @create: 2020-06-15 23:27**/
public class MyBioServerSocket {private static ExecutorService pool= Executors.newFixedThreadPool(20);public static void main(String[] args) {try {ServerSocket serverSocket=new ServerSocket();serverSocket.bind(new InetSocketAddress(8080));while (true){byte[] buffer=new byte[1024];long time1=System.currentTimeMillis();System.out.println("等待客户端连接");Socket socket=serverSocket.accept();//连接阻塞long time2=System.currentTimeMillis();System.out.println("有连接添加进线程池处理,等待连接耗时="+(time2-time1));pool.execute(new MyThread(socket));}} catch (IOException e) {e.printStackTrace();}}
}
package com.jason.thread.socket.newsocket.bio.pool;import java.Socket;/*** @program: JasonSpringMybatis* @description* @author: 大龄程序猿* @create: 2020-06-16 00:34**/
public class MyThread implements Runnable {private Socket socket;public MyThread(Socket socket) {this.socket = socket;}@Overridepublic void run() {byte[] buffer=new byte[1024];try{long time1=System.currentTimeMillis();socket.getInputStream().read(buffer);//读阻塞long time2=System.currentTimeMillis();System.out.println(Thread.currentThread().getName()+"-->read:"+new String(buffer)+"-->用时="+(time2-time1));}catch (Exception e){e.printStackTrace();}}
}
运行结果:
通过运行结果,可以看出:线程6,线程7,在accept()环节,没有等待,直接进入read()方法,性能相比单线程已经提升很大。
有连接添加进线程池处理,等待连接耗时=157943
等待客户端连接
有连接添加进线程池处理,等待连接耗时=0
等待客户端连接
有连接添加进线程池处理,等待连接耗时=0
等待客户端连接
pool-1-thread-5-->read:helloword! -->用时=2006
pool-1-thread-7-->read:helloword! -->用时=2006
pool-1-thread-6-->read:helloword!
二、NIO(New IO 非阻塞IO 或者称为 No Blocked IO):
顾名思义,针对BIO的连接阻塞、IO阻塞,NIO提供了一套非阻塞的IO组件,同时还引入了ByteBuffer,分配的内存使用的是本机内存而不是Java堆上的内存,这也进一步说明每次分配内存时会调用操作系统的os::malloc函数。另外一方面直接ByteBuffer产生的数据如果和网络或者磁盘交互都在操作系统的内核空间中发生……
调试过程的一段错误日志,连上的客户端,写完数据后,在服务端读数据之前退出,服务端报以下异常。
这个版本还是有个缺点,需要用个线程轮询各个通道的数据就绪情况,接下来我们重点聊一下NIO(底层Epoll的封装)
java.io.IOException: 远程主机强迫关闭了一个现有的连接。at sun.nio.ch.SocketDispatcher.read0(Native Method)at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)at sun.nio.ch.IOUtil.read(IOUtil.java:197)at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)at com.jason.thread.socket.newsocket.nio.MyBioServerSocket.main(MyBioServerSocket.java:39)
连接上,但是连接还没有完成,所以accept()之后,判断连接是否OK:
Exception in thread "main" java.nio.channels.NotYetConnectedExceptionat sun.nio.ch.SocketChannelImpl.ensureWriteOpen(SocketChannelImpl.java:274)at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:461)at com.jason.thread.socket.newsocket.nio.selector.MyBioClientSocket.doConnect(MyBioClientSocket.java:55)at com.jason.thread.socket.newsocket.nio.selector.MyBioClientSocket.main(MyBioClientSocket.java:40)
源码:
package com.jason.thread.socket.newsocket.nio;import java.io.IOException;
import java.InetSocketAddress;
import java.ServerSocket;
import java.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;/*** @program: JasonSpringMybatis* @description* @author: 大龄程序猿* @create: 2020-06-15 23:27**/
public class MyBioServerSocket {private static List<SocketChannel> socketChannelList=new LinkedList<>();public static void main(String[] args) {try {ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);//设置不阻塞serverSocketChannel.bind( new InetSocketAddress(8080));while(true){doWaitingSocketChannel();//轮询已经建立连接,但是没有读取到数据的SocketChannelSystem.out.println("等待客户端连接:");SocketChannel socketChannel=serverSocketChannel.accept();if(socketChannel!=null){Thread.sleep(10000);socketChannel.configureBlocking(false);ByteBuffer buffer=ByteBuffer.allocate(1024);int i=socketChannel.read(buffer);//i表示读取到的字节数if(i<=0){socketChannelList.add(socketChannel);}else{System.out.println(new String(buffer.array()));}}Thread.sleep(1000);}} catch (IOException | InterruptedException e) {e.printStackTrace();}}public static void doWaitingSocketChannel() throws IOException {ByteBuffer buffer=ByteBuffer.allocate(1024);for(SocketChannel socketChannel:socketChannelList){buffer.clear();int i=socketChannel.read(buffer);if(i>0){System.out.println(new String(buffer.array()));socketChannelList.remove(socketChannel);}}}
}package com.jason.thread.socket.newsocket.nio;import java.io.IOException;
import java.InetSocketAddress;
import java.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;import static java.lang.Thread.sleep;/*** @program: JasonSpringMybatis* @description* @author: 大龄程序猿* @create: 2020-06-15 23:36**/
public class MyBioClientSocket {public static void main(String[] args) {try {SocketChannel socket=SocketChannel.open();socket.connect(new InetSocketAddress(8080));
// socket.configureBlocking(false);//设置不阻塞while(true){if(socket.isConnected()){socket.finishConnect();socket.write(ByteBuffer.wrap("helloword!".getBytes()));try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}break;}}} catch (IOException e) {e.printStackTrace();}}
}
三、NIO多路复用:
我们重点聊一下Selector,多路选择器,Selector允许单线程处理多个 Channel。如果你的应用打开了多个连接(通道),但每个连接的流量都很低,使用Selector就会很方便。例如,下图是一个单线程中使用一个Selector处理3个Channel的图示:
- 什么是NIO多路复用机制?
采用一个线程维护多个TCP连接的IO操作;线程安全,支持高并发;
- 原理:NIO的选择器将多个不同的channel统一交给selector选择器进行管理;
- NIO多路复用机制在不用的系统中存在差别,在window中选择器是select去轮循channel获取信息,Linux中选择器是epoll通过注册事件回调通知获取信息;
- select中会存在空连接,就是只连接但不发送信息,且select中有数量限制,不能很好的支持高并发; linux操作就出现epoll实现事件驱动回调形式通知,不会存在空轮训的情况,只是对活跃的socket实现主动回调【有信息会主动通知】,这样的性能有很大的提升,避免了空连接,所以时间复杂度为是o(1)
- NIO核心设计思想是什么
非阻塞式IO,选择器实现IO多路复用机制,缓冲区提高读写效率;
源码:
package com.jason.thread.socket.newsocket.nio.selector;import org.springframework.expression.spel.ast.Selection;import java.io.IOException;
import java.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;/*** @program: JasonSpringMybatis* @description* @author: 大龄程序猿* @create: 2020-06-15 23:27**/
public class MyBioServerSocket {//epoll底层有实现,wait列表,wait列表中的依赖事件触发回调,移到就绪列表中。//private static List<SocketChannel> socketChannelList=new LinkedList<>();public static Selector selector;public static void main(String[] args) {try {selector= Selector.open();ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);//设置不阻塞serverSocketChannel.bind( new InetSocketAddress(8080));serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while(true){System.out.println("等待客户端连接:");selector.select();//阻塞,也可以设置超时时间Set<SelectionKey> selectionKeySet=selector.selectedKeys();for(SelectionKey key:selectionKeySet){try{if(key.isAcceptable()){doAccept(key);}else if(key.isReadable()){doRead(key);}}finally {selectionKeySet.remove(key);}}Thread.sleep(1000);}} catch (IOException | InterruptedException e) {e.printStackTrace();}}public static void doAccept(SelectionKey selectionKey) throws IOException {ServerSocketChannel serverSocketChannel= (ServerSocketChannel) selectionKey.channel();SocketChannel socketChannel=serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.write(ByteBuffer.wrap("NIO Server request: I am NIO Server".getBytes()));System.out.println("服务端发送读事件");socketChannel.register(selector,SelectionKey.OP_READ);//注册读事件}public static void doRead(SelectionKey selectionKey) throws IOException {SocketChannel socketChannel=(SocketChannel) selectionKey.channel();ByteBuffer buffer=ByteBuffer.allocate(1024);socketChannel.read(buffer);System.out.println("收到:客户端发送消息,内容:"+(new String(buffer.array())));}
}package com.jason.thread.socket.newsocket.nio.selector;import java.io.IOException;
import java.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;/*** @program: JasonSpringMybatis* @description* @author: 大龄程序猿* @create: 2020-06-15 23:36**/
public class MyBioClientSocket {public static Selector selector;public static void main(String[] args) {try {selector=selector.open();SocketChannel socketChannel=SocketChannel.open();socketChannel.configureBlocking(false);//设置不阻塞socketChannel.connect(new InetSocketAddress("localhost",8080));socketChannel.register(selector, SelectionKey.OP_CONNECT);while(true){System.out.println("客户端等待连接事件");selector.select();//阻塞读System.out.println("捕获到事件:");Set<SelectionKey> selectionKeySet=selector.selectedKeys();Iterator iterator=selectionKeySet.iterator();while (iterator.hasNext()){SelectionKey key=(SelectionKey) iterator.next();iterator.remove();if(key.isConnectable()){doConnect(key);}else if(key.isReadable()){doRead(key);}}}} catch (IOException e) {e.printStackTrace();}}private static void doConnect(SelectionKey selectionKey) throws IOException {SocketChannel socketChannel=(SocketChannel) selectionKey.channel();if(socketChannel.isConnectionPending()){socketChannel.finishConnect();}socketChannel.configureBlocking(false);socketChannel.write(ByteBuffer.wrap(("收到客户端发来消息:"+"Hello word!").getBytes()));socketChannel.register(selector,SelectionKey.OP_READ);}public static void doRead(SelectionKey selectionKey) throws IOException {SocketChannel socketChannel=(SocketChannel) selectionKey.channel();ByteBuffer buffer=ByteBuffer.allocate(1024);socketChannel.read(buffer);System.out.println("收到:服务端发送消息,内容:"+(new String(buffer.array())));}
}
更多推荐
分布式架构底层磐石
发布评论