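After writing about the basics of non-blocking IO, I decided to look into the usual ways of structuring a non-blocking IO server.
By "structure" I mean the shape of the code that implements a non-blocking IO service. For comparison, let's start with a thread-pooled server based on blocking IO.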
```java
package in.xnnyygn.nio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingIOEchoServer {

    // Handles one connection on a pool thread: echoes every line until "exit" or EOF
    private class EchoHandler implements Runnable {
        private final Socket socket;

        EchoHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            System.out.println("handle requests from " + socket.getRemoteSocketAddress());
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                OutputStream out = socket.getOutputStream();
                String line;
                // readLine() returns null when the client closes the connection
                while ((line = reader.readLine()) != null && !"exit".equals(line)) {
                    out.write(line.getBytes());
                    out.write('\n');
                }
            } catch (IOException e) {
                throw new IllegalStateException(e);
            } finally {
                System.out.println("close socket");
                try {
                    socket.close();
                } catch (IOException ignored) {
                }
            }
        }
    }

    private final ServerSocket serverSocket;
    private final ExecutorService executorService = Executors.newFixedThreadPool(4);

    public BlockingIOEchoServer(int port) throws IOException {
        this.serverSocket = new ServerSocket(port);
    }

    public void run() throws IOException {
        while (!Thread.interrupted()) {
            // accept() blocks until a client connects; the connection goes to the pool
            Socket socket = this.serverSocket.accept();
            executorService.submit(new EchoHandler(socket));
        }
    }

    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 8888;
        BlockingIOEchoServer server = new BlockingIOEchoServer(port);
        System.out.println("server started at port " + port);
        server.run();
    }
}
```
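This is a simple Echo service. Once ServerSocket#accept returns a Socket, one thread from the pool handles that Socket.
A thread-pool server is simple to design and implement, and each request is handled entirely by one thread, so the handling code is naturally thread-safe. The catch is that each connection holds a pool thread for as long as it stays open: with newFixedThreadPool(4), at most four clients are served at once and the fifth waits. That is why this design is recommended only when concurrency is not high.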
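Servers based on non-blocking IO usually use the reactor pattern. "Reactor" here is an event-driven design pattern, not some JS framework. The name is hard to grasp literally, and some articles suggest reading it as "dispatcher". In practice, the reactor pattern for non-blocking IO comes in three variants:
- Single-threaded EventLoop
- Single-threaded EventLoop plus multiple worker threads
- Single-threaded EventLoop (Acceptor) plus multiple sub-reactors (handling Read/Write) plus multiple worker threads
The single-threaded EventLoop is the simplest variant, and also the simplest code to learn non-blocking IO from.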
```java
package in.xnnyygn.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

public class SimpleNonBlockingIoServer {

    // Per-connection state, stored as the SelectionKey attachment
    private class TimeHandler {
        private String id;
        private ByteBuffer buffer;
        private int time = 0;

        TimeHandler(String id) {
            this.id = id;
            buffer = ByteBuffer.allocate(2048);
        }

        void readFromChannel(SelectionKey selectionKey, SocketChannel socketChannel) throws IOException {
            int length = socketChannel.read(buffer);
            if (length == -1) { // the client closed the connection
                System.out.println(id + ": close");
                selectionKey.cancel();
                socketChannel.close();
                return;
            }
            if (length == 0) return;
            buffer.flip();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            buffer.clear();
            int time;
            try {
                time = Integer.parseInt(new String(bytes).trim());
            } catch (NumberFormatException e) { // bad input must not kill the only EventLoop
                System.out.println(id + ": not a number");
                return;
            }
            System.out.println(id + ": got time " + time);
            if (time > 0 && time < 10000) {
                try {
                    Thread.sleep(time); // blocks the single EventLoop thread on purpose
                } catch (InterruptedException ignored) {
                }
                this.time = time;
            }
        }

        void writeToChannel(SocketChannel socketChannel) throws IOException {
            if (time == 0) return; // nothing to reply yet
            System.out.println(id + ": reply with time " + time);
            buffer.put(String.valueOf(time).getBytes());
            buffer.put((byte) '\n'); // putChar('\n') would write two bytes (UTF-16)
            buffer.flip();
            socketChannel.write(buffer); // may be a partial write; ignored for simplicity
            buffer.clear();
            time = 0;
        }
    }

    private final Selector selector;
    private final ServerSocketChannel serverSocketChannel;

    public SimpleNonBlockingIoServer(int port) throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
    }

    public void run() throws IOException {
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (!Thread.interrupted()) {
            selector.select(); // blocks until at least one registered channel is ready
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey selectionKey = it.next();
                it.remove(); // the Selector never clears the selected-key set by itself
                if (selectionKey.isAcceptable()) {
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    String clientId = socketChannel.getRemoteAddress().toString();
                    System.out.println("accept connection from " + clientId);
                    // OP_WRITE is ready almost all the time, so with it in the interest set
                    // select() keeps returning immediately and this loop busy-spins
                    socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE,
                            new TimeHandler(clientId));
                } else if (selectionKey.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    TimeHandler timeHandler = (TimeHandler) selectionKey.attachment();
                    timeHandler.readFromChannel(selectionKey, socketChannel);
                } else if (selectionKey.isWritable()) {
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    TimeHandler timeHandler = (TimeHandler) selectionKey.attachment();
                    timeHandler.writeToChannel(socketChannel);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 8888;
        SimpleNonBlockingIoServer server = new SimpleNonBlockingIoServer(port);
        server.run();
    }
}
```
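This implements a custom service: the client sends a line containing a number, the server sleeps for that many milliseconds and then replies with the number. Let's walk through the code.
First, the SimpleNonBlockingIoServer constructor. As with blocking IO, we need a channel for the server socket (ServerSocketChannel), plus the core component, the Selector.
In SimpleNonBlockingIoServer#run, the ServerSocketChannel is registered for ACCEPT events. What follows is the so-called EventLoop. Selector#select blocks (yes, this select really does block) until some registered channel has an event ready; at the start, that can only be an ACCEPT. The code then iterates over the selected SelectionKeys and handles ACCEPT, READ and WRITE separately. READ and WRITE are registered once an ACCEPT yields a SocketChannel, and the registration carries a TimeHandler attachment, which holds the core logic of the service.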
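The loop above calls `it.remove()` for every key. A small self-contained check of why that matters: the Selector adds keys to its selected-key set but never removes them, so a key you have already handled is still there on the next pass. This sketch uses a `Pipe` instead of a socket so it runs without a network; the class and method names are made up for the illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class SelectedKeysDemo {

    /**
     * Makes a pipe readable, selects it, drains it completely, selects again,
     * and returns the size of the selected-key set. Nothing ever calls remove().
     */
    public static int staleSelectedKeyCount() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source(); Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);
                sink.write(ByteBuffer.wrap("1000\n".getBytes()));

                selector.select(1000); // the source key is added to selectedKeys()
                ByteBuffer buf = ByteBuffer.allocate(64);
                while (source.read(buf) > 0) {
                    buf.clear(); // drain everything that was written
                }
                selector.selectNow(); // nothing is readable any more...

                // ...but the key from the first select() is still in the set
                return selector.selectedKeys().size();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(staleSelectedKeyCount()); // prints 1
    }
}
```

Without `it.remove()`, the event loop would "handle" that stale key again and call `read` on a channel that has nothing to read.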
Note that SocketChannel#configureBlocking(false) is mandatory: registering a channel that is still in blocking mode with a Selector throws IllegalBlockingModeException.
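A minimal sketch of that failure, using only the JDK: an unconnected `SocketChannel` is enough, because the blocking-mode check happens at registration time, before any I/O. The class name is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class BlockingModeDemo {

    /** Registers a fresh SocketChannel for OP_READ and reports what happened. */
    public static String tryRegister(boolean nonBlocking) {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            if (nonBlocking) {
                channel.configureBlocking(false); // the call the servers above rely on
            }
            channel.register(selector, SelectionKey.OP_READ);
            return "registered";
        } catch (IllegalBlockingModeException e) {
            return "IllegalBlockingModeException"; // channel was still in blocking mode
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(tryRegister(false)); // prints IllegalBlockingModeException
        System.out.println(tryRegister(true));  // prints registered
    }
}
```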
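Code written with non-blocking IO looks very different from blocking IO code. Both have an outer while loop, but here you have to react to individual events. And because of IO multiplexing, one thread serves many connections, so you need a per-connection context that ties related events together. Fortunately, Java's NIO API provides the attachment on SelectionKey for exactly this; give it a try.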
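A sketch of the attachment idea, again with a `Pipe` standing in for a client connection. `ConnectionContext` is a hypothetical stand-in for `TimeHandler`: it is passed as the third argument of `register` and retrieved with `SelectionKey#attachment()` when the event fires, so the loop never needs its own channel-to-state map.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class AttachmentDemo {

    /** Hypothetical per-connection context, playing the role of TimeHandler. */
    static final class ConnectionContext {
        final String id;
        final ByteBuffer buffer = ByteBuffer.allocate(64);

        ConnectionContext(String id) {
            this.id = id;
        }
    }

    public static String readViaAttachment() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source(); Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                // the context travels with the registration
                source.register(selector, SelectionKey.OP_READ, new ConnectionContext("client-1"));
                sink.write(ByteBuffer.wrap("42".getBytes(StandardCharsets.US_ASCII)));

                selector.select(1000);
                StringBuilder out = new StringBuilder();
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    // the event hands us back the context we attached
                    ConnectionContext ctx = (ConnectionContext) key.attachment();
                    ((ReadableByteChannel) key.channel()).read(ctx.buffer);
                    ctx.buffer.flip();
                    out.append(ctx.id).append(':')
                       .append(StandardCharsets.US_ASCII.decode(ctx.buffer));
                    ctx.buffer.clear();
                }
                return out.toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readViaAttachment()); // prints client-1:42
    }
}
```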
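There is also a well-known Java NIO bug (the epoll spin bug on Linux): Selector#select stops blocking and returns immediately with nothing selected, so your loop spins and can keep one CPU core at 100%. The bug is reportedly fixed, but frameworks such as Netty still add a layer of protection against this sudden 100% CPU spin.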
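A sketch of the kind of protection meant here, along the lines of what Netty is known for (rebuilding the Selector), not Netty's actual code. It assumes the spin shows up as `select(timeout)` returning 0 long before the timeout, many times in a row; when that happens, the loop opens a fresh Selector and moves every registration over. `SpinDetector`, its threshold and the other names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class SelectorRebuild {

    /**
     * Moves every valid registration from oldSelector to a new Selector, keeping
     * interest ops and attachments, then closes oldSelector.
     * Call it only from the event-loop thread that owns oldSelector.
     */
    public static Selector rebuild(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            if (!key.isValid()) continue;
            int ops = key.interestOps();
            Object attachment = key.attachment();
            key.cancel(); // a channel may be registered with two selectors at once
            key.channel().register(newSelector, ops, attachment);
        }
        oldSelector.close();
        return newSelector;
    }

    /**
     * Hypothetical detector: counts consecutive select(timeout) calls that returned 0
     * before the timeout. Selector#wakeup() also causes such returns, so a real loop
     * would need to tell those apart.
     */
    public static final class SpinDetector {
        private final int threshold;
        private int prematureReturns;

        public SpinDetector(int threshold) {
            this.threshold = threshold;
        }

        /** Returns true when the caller should rebuild its Selector. */
        public boolean record(int selected, long elapsedMillis, long timeoutMillis) {
            if (selected > 0 || elapsedMillis >= timeoutMillis) {
                prematureReturns = 0;
                return false;
            }
            if (++prematureReturns >= threshold) {
                prematureReturns = 0;
                return true;
            }
            return false;
        }
    }

    /** Registers a pipe with an attachment, rebuilds, and checks nothing was lost. */
    public static boolean selfCheck() {
        try {
            Selector oldSelector = Selector.open();
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source(); Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                source.register(oldSelector, SelectionKey.OP_READ, "ctx");
                Selector newSelector = rebuild(oldSelector);
                SelectionKey moved = source.keyFor(newSelector);
                boolean ok = !oldSelector.isOpen()
                        && newSelector.keys().size() == 1
                        && moved != null
                        && moved.interestOps() == SelectionKey.OP_READ
                        && "ctx".equals(moved.attachment());
                newSelector.close();
                return ok;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(selfCheck()); // prints true
    }
}
```

In an event loop you would time each `select(timeout)`, feed the result to `record`, and swap in `rebuild(selector)` when it returns true.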
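Does anything use this reactor variant? Yes, Redis for example. When people say Redis is single-threaded, this EventLoop is what they mean. The problem with this variant is obvious: if one client triggers a long-running operation, for example by sending 5000 to the code above, i.e. sleep(5000 ms), a command from another client can only run after those 5 seconds have passed, because there is only one EventLoop thread. Redis behaves exactly this way: running KEYS * in one client may stall the other clients. That said, Redis probably chose a single thread mainly for data consistency, since manipulating data from multiple threads is much harder.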
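This head-of-line blocking can be reproduced without sockets. In the sketch below, a thread pool stands in for the EventLoop (an assumption made to keep it self-contained): with one thread, client B's trivial command has to wait for client A's 1000 ms command; with two threads it does not. The values are timing measurements, so only their rough size is meaningful; the names are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class HeadOfLineBlockingDemo {

    /**
     * Client A submits a 1000 ms command, then client B submits a trivial one.
     * Returns how many ms B waited before it started running.
     */
    public static long waitOfQuickTask(int threads) {
        ExecutorService loop = Executors.newFixedThreadPool(threads);
        try {
            long submitted = System.nanoTime();
            loop.submit(() -> sleepQuietly(1000)); // client A, like sending "1000"
            Future<Long> clientB = loop.submit(
                    () -> (System.nanoTime() - submitted) / 1_000_000); // client B
            return clientB.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdownNow(); // interrupts A if it is still sleeping
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("1 thread:  client B waited " + waitOfQuickTask(1) + " ms");
        System.out.println("2 threads: client B waited " + waitOfQuickTask(2) + " ms");
    }
}
```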
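Back to the main topic. A lone EventLoop is basically not a viable design, so you can pick the second variant: a single EventLoop with multiple worker threads. This variant resembles patterns found in many other places, for example:
- background work in GUIs (a GUI's EventLoop is also single-threaded)
- blocking IO server designs
and so on. Example code: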
```java
package in.xnnyygn.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiWorkerNonBlockingIoServer {

    private class TimeHandler {
        private final ExecutorService executorService;
        private final ByteBuffer buffer;
        // written by a worker thread, read by the EventLoop thread without
        // synchronization: this is the race discussed below
        private int time = 0;

        TimeHandler(ExecutorService executorService) {
            this.executorService = executorService;
            buffer = ByteBuffer.allocate(2048);
        }

        // Called on the EventLoop thread: hands the (possibly slow) work to the pool
        void readFromChannel(SelectionKey selectionKey, SocketChannel socketChannel) {
            executorService.submit(() -> {
                try {
                    doReadFromChannel(selectionKey, socketChannel);
                } catch (IOException e) {
                    throw new IllegalStateException(e);
                }
            });
        }

        private void doReadFromChannel(SelectionKey selectionKey, SocketChannel socketChannel) throws IOException {
            int length = socketChannel.read(buffer);
            if (length == -1) {
                socketChannel.close(); // also cancels selectionKey
                return;
            }
            if (length == 0) return;
            buffer.flip();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            buffer.clear();
            int time = Integer.parseInt(new String(bytes).trim());
            if (time > 0 && time < 10000) {
                try {
                    Thread.sleep(time); // blocks a worker thread, not the EventLoop
                } catch (InterruptedException ignored) {
                }
                this.time = time;
            }
        }

        void writeToChannel(SocketChannel socketChannel) throws IOException {
            if (time == 0) return;
            buffer.put(String.valueOf(time).getBytes());
            buffer.put((byte) '\n'); // one byte; putChar would write two
            buffer.flip();
            socketChannel.write(buffer);
            buffer.clear();
            time = 0;
        }
    }

    private final Selector selector;
    private final ServerSocketChannel serverSocketChannel;
    private final ExecutorService executorService;

    public MultiWorkerNonBlockingIoServer(int port) throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        executorService = Executors.newFixedThreadPool(4);
    }

    public void run() throws IOException {
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (!Thread.interrupted()) {
            selector.select();
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey selectionKey = it.next();
                it.remove();
                try {
                    if (selectionKey.isAcceptable()) {
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        System.out.println("accept connection from " + socketChannel.getRemoteAddress());
                        socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE,
                                new TimeHandler(executorService));
                    }
                    if (selectionKey.isReadable()) {
                        // the channel stays readable until a worker actually reads it,
                        // so the same connection may be submitted more than once
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        TimeHandler timeHandler = (TimeHandler) selectionKey.attachment();
                        timeHandler.readFromChannel(selectionKey, socketChannel);
                    }
                    if (selectionKey.isWritable()) {
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        TimeHandler timeHandler = (TimeHandler) selectionKey.attachment();
                        timeHandler.writeToChannel(socketChannel);
                    }
                } catch (CancelledKeyException | ClosedChannelException e) {
                    // a worker closed the channel (EOF) while this key was being handled
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        MultiWorkerNonBlockingIoServer server = new MultiWorkerNonBlockingIoServer(8888);
        server.run();
    }
}
```
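The code barely changes: only readFromChannel now runs on the thread pool. If you run it, you can have one client perform a long operation while another client stays unaffected.
Of course, the code above has problems. Only the read is asynchronous, which causes race conditions on write and elsewhere: a worker thread fills buffer and time while the EventLoop thread reads them in writeToChannel, and because the channel stays readable until a worker actually reads it, the same connection can be submitted to the pool more than once. A better solution is to use the worker thread as the attachment, but then you have to implement your own ThreadPool.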
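One way to get "the worker as attachment" without writing a thread pool from scratch (an alternative to the suggestion above, not the author's code) is to keep an array of single-thread executors and pin each connection to one of them at accept time. Every task of that connection, read and write alike, then runs on the same thread in submission order, so `buffer` and `time` are never touched concurrently. The EventLoop would submit the write to that executor too instead of calling `writeToChannel` itself. `StickyWorkers` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class StickyWorkers {
    private final ExecutorService[] workers;
    private int next; // only used by the accepting thread in this sketch

    public StickyWorkers(int n) {
        workers = new ExecutorService[n];
        for (int i = 0; i < n; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Pick a worker once per connection, e.g. at accept time, and keep it in the key's attachment. */
    public ExecutorService assign() {
        ExecutorService worker = workers[next];
        next = (next + 1) % workers.length;
        return worker;
    }

    public void shutdown() {
        for (ExecutorService worker : workers) {
            worker.shutdown();
        }
    }

    /** Demo: interleaved "read" and "write" tasks of one connection run strictly in order. */
    public static List<String> demo() {
        StickyWorkers pool = new StickyWorkers(4);
        ExecutorService connectionWorker = pool.assign();
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        try {
            Future<?> last = null;
            for (int i = 0; i < 3; i++) {
                final int n = i;
                connectionWorker.submit(() -> { log.add("read" + n); });
                last = connectionWorker.submit(() -> { log.add("write" + n); });
            }
            last.get(); // single thread: everything before it has finished too
            return new ArrayList<>(log);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [read0, write0, read1, write1, read2, write2]
    }
}
```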
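This variant can basically serve as the foundation of a typical non-blocking IO server. A classic example is NodeJS. (NodeJS has always advertised itself as a Single Thread EventLoop, and that is true, but it does not make NodeJS a single-threaded program. For one thing, NodeJS is not browser JS; for another, NodeJS runs its worker threads in its own thread pool, and a callback does not necessarily mean asynchronous IO was used: the work may simply have been executed on that thread pool.) If I remember correctly, Nginx also uses this model.
To handle C10K, this reactor variant should in theory be enough, because C10K is only about the number of connections, not about processing them, haha.
However, under high concurrency, say 5000 connections, a single Selector has to track a very large number of fds. Although epoll has no fd limit and a relatively efficient readiness notification mechanism, from a programmer's point of view, splitting the work across several Selectors may be more robust. That is the third variant, the multi-Selector model. Example code: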
```java
package in.xnnyygn.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiSelectorNonBlockingIoServer {

    // Protocol state for one connection, stored as the SelectionKey attachment
    private class TimeProtocol {
        private ByteBuffer buffer;
        private int time = 0;

        TimeProtocol() {
            buffer = ByteBuffer.allocate(2048);
        }

        // Runs on a worker thread
        void onReadable(SocketChannel socketChannel) throws IOException {
            int length = socketChannel.read(buffer);
            if (length == -1) {
                socketChannel.close();
                return;
            }
            if (length == 0) return;
            buffer.flip();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            buffer.clear();
            int time = Integer.parseInt(new String(bytes).trim());
            System.out.println("got time " + time);
            if (time > 0 && time < 20000) {
                try {
                    Thread.sleep(time);
                } catch (InterruptedException ignored) {
                }
                this.time = time;
            }
        }

        // Runs on the SubReactor thread
        void onWriteable(SocketChannel socketChannel) throws IOException {
            if (time == 0) return;
            buffer.put(String.valueOf(time).getBytes());
            buffer.put((byte) '\n'); // one byte; putChar would write two
            buffer.flip();
            socketChannel.write(buffer);
            buffer.clear();
            time = 0;
        }
    }

    // Hands out SubReactors round-robin and starts each one lazily
    private class SubReactorPool {
        private int current;
        private SubReactor[] subReactors;

        SubReactorPool(int nSubReactor) {
            subReactors = new SubReactor[nSubReactor];
        }

        SubReactor getSubReactor(ExecutorService workerExecutorService) throws IOException {
            int index = (++current) % subReactors.length;
            SubReactor subReactor = subReactors[index];
            if (subReactor == null) {
                System.out.println("create new sub-reactor");
                subReactor = new SubReactor(workerExecutorService);
                subReactor.start();
                subReactors[index] = subReactor;
            }
            return subReactor;
        }
    }

    // A SubReactor owns its own Selector and runs its own EventLoop thread
    private class SubReactor extends Thread {
        private final Selector selector;
        private final ExecutorService executorService;

        SubReactor(ExecutorService executorService) throws IOException {
            this.selector = SelectorProvider.provider().openSelector();
            this.executorService = executorService;
        }

        // Called from the main (acceptor) thread
        void addSocketChannel(SocketChannel socketChannel) throws IOException {
            System.out.println("add socket " + socketChannel.getRemoteAddress());
            socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, new TimeProtocol());
        }

        @Override
        public void run() {
            System.out.println("start sub-reactor thread");
            try {
                while (!Thread.interrupted()) {
                    // selectNow() never blocks, so register() from the acceptor thread is
                    // not held up; the price is a busy loop while idle
                    if (selector.selectNow() <= 0) continue;
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey selectionKey = it.next();
                        it.remove();
                        try {
                            if (selectionKey.isReadable()) {
                                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                                TimeProtocol timeProtocol = (TimeProtocol) selectionKey.attachment();
                                executorService.submit(() -> {
                                    try {
                                        timeProtocol.onReadable(socketChannel);
                                    } catch (ClosedChannelException e) {
                                        selectionKey.cancel();
                                    } catch (IOException e) {
                                        e.printStackTrace();
                                    }
                                });
                            }
                            if (selectionKey.isWritable()) {
                                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                                TimeProtocol timeProtocol = (TimeProtocol) selectionKey.attachment();
                                timeProtocol.onWriteable(socketChannel);
                            }
                        } catch (CancelledKeyException | ClosedChannelException e) {
                            // a worker closed the channel (EOF) while this key was being handled
                            selectionKey.cancel();
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private final Selector mainSelector;
    private final ServerSocketChannel serverSocketChannel;
    private final SubReactorPool subReactorPool;
    private final ExecutorService workerExecutorService;

    public MultiSelectorNonBlockingIoServer(int port) throws IOException {
        mainSelector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        subReactorPool = new SubReactorPool(2);
        workerExecutorService = Executors.newFixedThreadPool(4);
    }

    // The main EventLoop only accepts and hands each connection to a SubReactor
    public void run() throws IOException {
        serverSocketChannel.register(mainSelector, SelectionKey.OP_ACCEPT);
        while (!Thread.interrupted()) {
            mainSelector.select();
            Iterator<SelectionKey> it = mainSelector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey selectionKey = it.next();
                it.remove();
                if (selectionKey.isAcceptable()) {
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    subReactorPool.getSubReactor(workerExecutorService).addSocketChannel(socketChannel);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        MultiSelectorNonBlockingIoServer server = new MultiSelectorNonBlockingIoServer(8888);
        server.run();
    }
}
```
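A SubReactor also runs an EventLoop, so each one is a separate thread.
Note that in one place I did not use Selector#select but Selector#selectNow, its non-blocking version. The reason is a race condition when registering the SocketChannel: the acceptor thread calls register() while the SubReactor thread may be blocked in select() on the same Selector, and, at least on older JDKs, register() can block until that select() returns. The downside is that calling selectNow in a loop busy-spins and keeps a CPU core busy even when idle, so handling registration differently would probably be better.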
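A common alternative to the `selectNow` loop is to never call `register` from another thread: the acceptor puts the channel on a concurrent queue and calls `Selector#wakeup()`, and the SubReactor thread drains the queue and registers the channels itself right after `select()` returns. `wakeup()` also covers the case where `select()` has not started yet, because the next `select()` then returns immediately. A minimal sketch, assuming a `Pipe` in place of an accepted `SocketChannel` for the demo; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.*;

public class QueuedSubReactor extends Thread {
    private final Selector selector;
    // Channels handed over by the acceptor thread, registered later by this thread
    private final Queue<SelectableChannel> pending = new ConcurrentLinkedQueue<>();
    // Demo only: decoded reads end up here
    final BlockingQueue<String> received = new LinkedBlockingQueue<>();

    QueuedSubReactor() throws IOException {
        this.selector = Selector.open();
    }

    /** Safe to call from any thread: it never touches the Selector's key set. */
    void addChannel(SelectableChannel channel) {
        pending.add(channel);
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            while (!isInterrupted()) {
                selector.select(); // blocking again: no busy spin while idle
                SelectableChannel ch;
                while ((ch = pending.poll()) != null) {
                    ch.configureBlocking(false);
                    ch.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(256));
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (!key.isValid() || !key.isReadable()) continue;
                    ByteBuffer buf = (ByteBuffer) key.attachment();
                    if (((ReadableByteChannel) key.channel()).read(buf) == -1) {
                        key.channel().close(); // EOF
                        continue;
                    }
                    buf.flip();
                    received.add(StandardCharsets.UTF_8.decode(buf).toString());
                    buf.clear();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try { selector.close(); } catch (IOException ignored) { }
        }
    }

    /** Demo: hand a pipe's read end to the reactor from this thread, then write to it. */
    public static String demo() {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                QueuedSubReactor reactor = new QueuedSubReactor();
                reactor.start();
                try {
                    reactor.addChannel(source);
                    sink.write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
                    return reactor.received.poll(2, TimeUnit.SECONDS);
                } finally {
                    reactor.interrupt(); // also makes a blocked select() return
                    reactor.join();
                }
            }
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints hello
    }
}
```

With this in place, the SubReactor in the example above could go back to a blocking `select()`.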
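The SubReactors handle read/write events on the SocketChannels, while the main EventLoop only cares about Accept events, which gives a much cleaner separation of responsibilities.
A typical user of this model is Netty, with its Boss/Worker model.
Apart from the first variant, the second and third are both commonly used models; if you want to write one from scratch, they are worth using as a reference.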
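Also, the example code does not handle closing the Socket well, and it does not solve the message framing problem either: a single read may return half a message or several messages at once. The latter is a long-standing issue in socket programming.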
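For the framing problem, a minimal sketch of one common approach for a line-based protocol like the time service: keep the bytes of an unfinished line across reads and emit a frame only when `'\n'` arrives, which handles both half lines and several lines per read. The class name and the fixed maximum frame size are assumptions.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LineFrameDecoder {
    // Bytes of the current, not yet terminated line
    private final ByteBuffer pending;

    public LineFrameDecoder(int maxFrameBytes) {
        pending = ByteBuffer.allocate(maxFrameBytes);
    }

    /** Feed whatever one read() returned (already flipped); returns zero or more complete lines. */
    public List<String> feed(ByteBuffer chunk) {
        List<String> frames = new ArrayList<>();
        while (chunk.hasRemaining()) {
            byte b = chunk.get();
            if (b == '\n') {
                pending.flip();
                // decode the whole line at once so multi-byte characters split across reads survive
                String line = StandardCharsets.UTF_8.decode(pending).toString();
                frames.add(line.endsWith("\r") ? line.substring(0, line.length() - 1) : line);
                pending.clear();
            } else {
                if (!pending.hasRemaining()) {
                    throw new IllegalStateException("frame longer than " + pending.capacity() + " bytes");
                }
                pending.put(b);
            }
        }
        return frames;
    }

    public static ByteBuffer ascii(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.US_ASCII));
    }

    public static void main(String[] args) {
        LineFrameDecoder decoder = new LineFrameDecoder(64);
        System.out.println(decoder.feed(ascii("10")));      // prints []
        System.out.println(decoder.feed(ascii("00\n20")));  // prints [1000]
        System.out.println(decoder.feed(ascii("00\n3\n"))); // prints [2000, 3]
    }
}
```

In a handler you would `read` into a buffer, `flip` it, pass it to `feed`, then `clear` it, and act on each returned line.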
I hope the code and explanations are useful to you.
References