在写了非阻塞IO的基础知识之后,决定学习一下常规的非阻塞IO运行模式。
所谓运行模式,就是指以怎样的代码来实现非阻塞IO服务。为了比较和说明,先从阻塞IO的线程池化的服务器开始。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
package in.xnnyygn.nio; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class BlockingIOEchoServer { private class EchoHandler implements Runnable { private final Socket socket; EchoHandler(Socket socket) { this.socket = socket; } @Override public void run() { System.out.println(“handle requests from “ + socket.getRemoteSocketAddress()); try { BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); String line; while (!“exit”.equals(line = reader.readLine())) { socket.getOutputStream().write(line.getBytes()); socket.getOutputStream().write(‘\n’); } } catch (IOException e) { throw new IllegalStateException(e); } finally { System.out.println(“close socket”); try { socket.close(); } catch (IOException ignored) { } } } } private final ServerSocket serverSocket; private final ExecutorService executorService = Executors.newFixedThreadPool(4); public BlockingIOEchoServer(int port) throws IOException { this.serverSocket = new ServerSocket(port); } public void run() throws IOException { while (!Thread.interrupted()) { Socket socket = this.serverSocket.accept(); executorService.submit(new EchoHandler(socket)); } } public static void main(String[] args) throws Exception { int port = args.length > 0 ? Integer.parseInt(args[0]) : 8888; BlockingIOEchoServer server = new BlockingIOEchoServer(port); System.out.println(“server started at port “ + port); server.run(); } } |
这是一个简单的Echo服务。ServerSocket在accept得到一个Socket之后,由线程池中的某个线程来处理这个Socket。
使用线程池的服务端设计和实现都很简单,单个请求处理内都是线程安全的,在非高并发的环境下比较推荐。
基于非阻塞IO的服务器,通常使用reactor模式。这里的reactor模式是事件驱动设计模式的一种,不是某个JS框架。reactor从字面上比较难以理解,部分文章建议用dispatcher来理解这个模式。实际的非阻塞IO的reactor模式有三种:
- 单线程EventLoop
- 单线程EventLoop,多工作线程
- 单线程EventLoop(Acceptor),多个子Reactor(处理Read/Write),多工作线程
单线程EventLoop是最简单,也是学习非阻塞IO最简单的代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
package in.xnnyygn.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; public class SimpleNonBlockingIoServer { private class TimeHandler { private String id; private ByteBuffer buffer; private int time = 0; TimeHandler(String id) { this.id = id; buffer = ByteBuffer.allocate(2048); } void readFromChannel(SelectionKey selectionKey, SocketChannel socketChannel) throws IOException { int length = socketChannel.read(buffer); if (length == –1) { System.out.println(id + “: close”); selectionKey.cancel(); socketChannel.close(); return; } if (length == 0) return; buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); buffer.clear(); int time = Integer.parseInt(new String(bytes).trim()); System.out.println(id + “: got time “ + time); if (time > 0 && time < 10000) { try { Thread.sleep(time); } catch (InterruptedException ignored) { } this.time = time; } } void writeToChannel(SocketChannel socketChannel) throws IOException { if (time == 0) return; System.out.println(id + “: reply with time “ + time); buffer.put(String.valueOf(time).getBytes()); buffer.putChar(‘\n’); buffer.flip(); socketChannel.write(buffer); buffer.clear(); time = 0; } } private final Selector selector; private final ServerSocketChannel serverSocketChannel; public SimpleNonBlockingIoServer(int port) throws IOException { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); } public void run() throws IOException { serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (!Thread.interrupted()) { selector.select(); Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey selectionKey = it.next(); if (selectionKey.isAcceptable()) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel(); SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); String clientId = socketChannel.getRemoteAddress().toString(); System.out.println(“accept connection from “ + clientId); socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, new TimeHandler(clientId)); } else if (selectionKey.isReadable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); TimeHandler timeHandler = (TimeHandler) selectionKey.attachment(); timeHandler.readFromChannel(selectionKey, socketChannel); } else if (selectionKey.isWritable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); TimeHandler timeHandler = (TimeHandler) selectionKey.attachment(); timeHandler.writeToChannel(socketChannel); } it.remove(); } } } public static void main(String[] args) throws Exception { int port = args.length > 0 ? Integer.parseInt(args[0]) : 8888; SimpleNonBlockingIoServer server = new SimpleNonBlockingIoServer(port); server.run(); } } |
这里实现了一个自定义服务,输入一行数字,sleep指定时间。下面逐个讲解这段代码。
首先是SimpleNonBlockingIoServer的构造函数。像阻塞IO一样,需要一个ServerSocket的Channel,还有核心的Selector。
SimpleNonBlockingIoServer的run代码中,注册了ServerSocketChannel的ACCEPT事件。接下来就是所谓的EventLoop。Selector#select会阻塞(是的,这里的select是阻塞的)直到有ACCPET事件,然后代码遍历Selector的SelectionKey,分别对ACCEPT/READ/WRITE做处理。READ和WRITE是在ACCEPT得到一个Socket之后注册的。注册时附带了一个TimeHandler。这里有服务的核心逻辑。
代码中要注意的是SocketChannel#configureBlocking(false)是必须的,否则你会得到错误,因为你想把一个不是非阻塞的SocketChannel注册到Selector上。
使用非阻塞IO写代码,代码风格和阻塞IO大不相同,虽然外层都有一个while循环,但是你需要关注各种事件。还有因为IO多路复用的关系,你需要一个上下文来联系事件。幸好Java的NIO API中有attachment,你可以尝试使用它。
另外,关于Java NIO一个很有名的bug,想象一下Selector#select不阻塞,你的代码一直在循环,很有可能造成某个CPU一直在100%的状态。这个bug据说已经被修复,不过类似Netty等框架还是加了一层保护防止突然间的这种100%CPU的bug。
有哪些框架使用这种reactor模式么,有,比如Redis。Redis一直被提到的一点单线程,就是指这里的EventLoop。如果你要说这种reactor模式有什么问题,很显然,如果某个client执行了很长时间的操作,比如在上面的代码中输入5000,即sleep(5000ms)的话,另外一个client输入的命令要等着5秒过去之后才会被执行。这是因为只有一个EventLoop的线程。Redis就是这样一种情况,如果你在某个client里面执行了keys *,其他client有可能会被卡住。当然Redis选择单线程,更多地可能是因为数据一致性方面的原因,多线程下的数据操作很难等等。
回到正题,单EventLoop的设计基本上是不可取的,那么你可以选择第二个单EventLoop,多工作线程的方式。这种reactor模式与很多地方的模式很像,比如
- GUI的后台操作(GUI的EventLoop也是单线程的)
- 阻塞IO的服务器设计
等等,具体示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
package in.xnnyygn.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class MultiWorkerNonBlockingIoServer { private class TimeHandler { private final ExecutorService executorService; private final ByteBuffer buffer; private int time = 0; TimeHandler(ExecutorService executorService) { this.executorService = executorService; buffer = ByteBuffer.allocate(2048); } void readFromChannel(SelectionKey selectionKey, SocketChannel socketChannel) { executorService.submit(() –> { try { doReadFromChannel(selectionKey, socketChannel); } catch (IOException e) { throw new IllegalStateException(e); } }); } private void doReadFromChannel(SelectionKey selectionKey, SocketChannel socketChannel) throws IOException { int length = socketChannel.read(buffer); if (length == –1) { socketChannel.close(); return; } if (length == 0) return; buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); buffer.clear(); int time = Integer.parseInt(new String(bytes).trim()); if (time > 0 && time < 10000) { try { Thread.sleep(time); } catch (InterruptedException ignored) { } this.time = time; } } void writeToChannel(SocketChannel socketChannel) throws IOException { if (time == 0) return; buffer.put(String.valueOf(time).getBytes()); buffer.putChar(‘\n’); buffer.flip(); socketChannel.write(buffer); buffer.clear(); time = 0; } } private final Selector selector; private final ServerSocketChannel serverSocketChannel; private final ExecutorService executorService; public MultiWorkerNonBlockingIoServer(int port) throws IOException { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); executorService = Executors.newFixedThreadPool(4); } public void run() throws IOException { serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (!Thread.interrupted()) { selector.select(); Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey selectionKey = it.next(); it.remove(); if (selectionKey.isAcceptable()) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel(); SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); System.out.println(“accept connection from “ + socketChannel.getRemoteAddress()); socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, new TimeHandler(executorService)); } if (selectionKey.isReadable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); TimeHandler timeHandler = (TimeHandler) selectionKey.attachment(); timeHandler.readFromChannel(selectionKey, socketChannel); } if (selectionKey.isWritable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); TimeHandler timeHandler = (TimeHandler) selectionKey.attachment(); timeHandler.writeToChannel(socketChannel); } } } } public static void main(String[] args) throws Exception { MultiWorkerNonBlockingIoServer server = new MultiWorkerNonBlockingIoServer(8888); server.run(); } } |
具体代码上变化不大,只有在readFromChannel的地方用线程池来执行了。实际运行中,你可以尝试某个client执行长时间操作,另外一个client不会受到影响。
当然,上面的代码存在一些问题,我只对read操作执行了异步,这会造成write的竞态条件等问题。比较好的解决方法是把工作线程作为attachment。但是那样的话,你必须自己实现ThreadPool。
这种reactor模式基本可以作为一个常规非阻塞IO的基础了。典型的比如NodeJS。(虽然NodeJS一直宣称自己是Single Thread EventLoop,而且这句话也没错,但是这不代表NodeJS是一个Single Thread程序。一方面NodeJS不是浏览器JS,另一方面NodeJS的工作线程有他们自己的线程池,callback不代表用了异步IO,只是由线程池在执行而已)如果没有记错的话,Nginx应该也是这种模型。
要满足C10K的话,理论上这种reactor模型就足够了,因为C10K只关注连接,不关注处理,哈哈。
不过也看到,在高并发的时候,假如5000个连接的话,单个Selector要处理的fd数量会非常多,虽然epoll没有fd数量限制,也有比较好的数据通知方法,从程序员的角度来说,用多个Selector来分开处理可能会更稳定一些。这也就是第三种reactor模型,即多Selector的模式。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
package in.xnnyygn.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.nio.channels.spi.SelectorProvider; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class MultiSelectorNonBlockingIoServer { private class TimeProtocol { private ByteBuffer buffer; private int time = 0; TimeProtocol() { buffer = ByteBuffer.allocate(2048); } void onReadable(SocketChannel socketChannel) throws IOException { int length = socketChannel.read(buffer); if (length == –1) { socketChannel.close(); return; } if (length == 0) return; buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); buffer.clear(); int time = Integer.parseInt(new String(bytes).trim()); System.out.println(“got time “ + time); if (time > 0 && time < 20000) { try { Thread.sleep(time); } catch (InterruptedException ignored) { } this.time = time; } } void onWriteable(SocketChannel socketChannel) throws IOException { if (time == 0) return; buffer.put(String.valueOf(time).getBytes()); buffer.putChar(‘\n’); buffer.flip(); socketChannel.write(buffer); buffer.clear(); time = 0; } } private class SubReactorPool { private int current; private SubReactor[] subReactors; SubReactorPool(int nSubReactor) { subReactors = new SubReactor[nSubReactor]; } SubReactor getSubReactor(ExecutorService workerExecutorService) throws IOException { int index = (++current) % subReactors.length; SubReactor subReactor = subReactors[index]; if (subReactor == null) { System.out.println(“create new sub-reactor”); subReactor = new SubReactor(workerExecutorService); subReactor.start(); subReactors[index] = subReactor; } return subReactor; } } private class SubReactor extends Thread { private final Selector selector; private final ExecutorService executorService; SubReactor(ExecutorService executorService) throws IOException { this.selector = SelectorProvider.provider().openSelector(); this.executorService = executorService; } void addSocketChannel(SocketChannel socketChannel) throws IOException { System.out.println(“add socket “ + socketChannel.getRemoteAddress()); socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, new TimeProtocol()); } public void run() { System.out.println(“start sub-reactor thread”); try { while (!Thread.interrupted()) { if (selector.selectNow() <= 0) continue; Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey selectionKey = it.next(); it.remove(); if (selectionKey.isReadable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); TimeProtocol timeProtocol = (TimeProtocol) selectionKey.attachment(); executorService.submit(() –> { try { timeProtocol.onReadable(socketChannel); } catch (ClosedChannelException e) { selectionKey.cancel(); } catch (IOException e) { e.printStackTrace(); } }); } if (selectionKey.isWritable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); TimeProtocol timeProtocol = (TimeProtocol) selectionKey.attachment(); timeProtocol.onWriteable(socketChannel); } } } } catch (IOException e) { e.printStackTrace(); } } } private final Selector mainSelector; private final ServerSocketChannel serverSocketChannel; private final SubReactorPool subReactorPool; private final ExecutorService workerExecutorService; public MultiSelectorNonBlockingIoServer(int port) throws IOException { mainSelector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); subReactorPool = new SubReactorPool(2); workerExecutorService = Executors.newFixedThreadPool(4); } public void run() throws IOException { serverSocketChannel.register(mainSelector, SelectionKey.OP_ACCEPT); while (!Thread.interrupted()) { mainSelector.select(); Iterator<SelectionKey> it = mainSelector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey selectionKey = it.next(); it.remove(); if (selectionKey.isAcceptable()) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel(); SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); subReactorPool.getSubReactor(workerExecutorService).addSocketChannel(socketChannel); } } } } public static void main(String[] args) throws Exception { MultiSelectorNonBlockingIoServer server = new MultiSelectorNonBlockingIoServer(8888); server.run(); } } |
SubReactor里面也有EventLoop,所以是单独的线程。
请注意某个地方我并没有用Selector#select,而是用了Selector#selectNow,selectNow是非阻塞版本。其原因是SocketChannel注册时候的竞态条件,估计改一下处理方式可能更好些。
SubReactor主要负责SocketChannel的读写事件,而主EventLoop只关心Accept事件,这样其实分得更加清楚了。
使用这种模式的典型的如Netty的Boss/Worker模型。
除了第一种reactor模型之外,第二第三都是常用的模型,如果你像从头开始写的话,可以参考一下。
另外,示例代码中并没有对于Socket的close很好的处理,还有数据的组包问题也没有解决,后者也是Socket开发的老问题了。
希望我的代码以及说明对各位有用。
参考