在写了非阻塞IO的基础知识之后,决定学习一下常规的非阻塞IO运行模式。
所谓运行模式,就是指以怎样的代码来实现非阻塞IO服务。为了比较和说明,先从阻塞IO的线程池化的服务器开始。
package in.xnnyygn.nio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BlockingIOEchoServer {
private class EchoHandler implements Runnable {
private final Socket socket;
EchoHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
System.out.println("handle requests from " + socket.getRemoteSocketAddress());
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String line;
while (!"exit".equals(line = reader.readLine())) {
socket.getOutputStream().write(line.getBytes());
socket.getOutputStream().write('\n');
}
} catch (IOException e) {
throw new IllegalStateException(e);
} finally {
System.out.println("close socket");
try {
socket.close();
} catch (IOException ignored) {
}
}
}
}
private final ServerSocket serverSocket;
private final ExecutorService executorService = Executors.newFixedThreadPool(4);
public BlockingIOEchoServer(int port) throws IOException {
this.serverSocket = new ServerSocket(port);
}
public void run() throws IOException {
while (!Thread.interrupted()) {
Socket socket = this.serverSocket.accept();
executorService.submit(new EchoHandler(socket));
}
}
public static void main(String[] args) throws Exception {
int port = args.length > 0 ? Integer.parseInt(args[0]) : 8888;
BlockingIOEchoServer server = new BlockingIOEchoServer(port);
System.out.println("server started at port " + port);
server.run();
}
}
这是一个简单的Echo服务。ServerSocket在accept得到一个Socket之后,由线程池中的某个线程来处理这个Socket。
使用线程池的服务端设计和实现都很简单,单个请求处理内都是线程安全的,在非高并发的环境下比较推荐。
基于非阻塞IO的服务器,通常使用reactor模式。这里的reactor模式是事件驱动设计模式的一种,不是某个JS框架。reactor从字面上比较难以理解,部分文章建议用dispatcher来理解这个模式。实际的非阻塞IO的reactor模式有三种:
- 单线程EventLoop
- 单线程EventLoop,多工作线程
- 单线程EventLoop(Acceptor),多个子Reactor(处理Read/Write),多工作线程
单线程EventLoop是最简单,也是学习非阻塞IO最简单的代码。
package in.xnnyygn.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
public class SimpleNonBlockingIoServer {
private class TimeHandler {
private String id;
private ByteBuffer buffer;
private int time = 0;
TimeHandler(String id) {
this.id = id;
buffer = ByteBuffer.allocate(2048);
}
void readFromChannel(SelectionKey selectionKey, SocketChannel socketChannel) throws IOException {
int length = socketChannel.read(buffer);
if (length == -1) {
System.out.println(id + ": close");
selectionKey.cancel();
socketChannel.close();
return;
}
if (length == 0) return;
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
buffer.clear();
int time = Integer.parseInt(new String(bytes).trim());
System.out.println(id + ": got time " + time);
if (time > 0 && time < 10000) {
try {
Thread.sleep(time);
} catch (InterruptedException ignored) {
}
this.time = time;
}
}
void writeToChannel(SocketChannel socketChannel) throws IOException {
if (time == 0) return;
System.out.println(id + ": reply with time " + time);
buffer.put(String.valueOf(time).getBytes());
buffer.putChar('\n');
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
time = 0;
}
}
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
public SimpleNonBlockingIoServer(int port) throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
}
public void run() throws IOException {
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (!Thread.interrupted()) {
selector.select();
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey selectionKey = it.next();
if (selectionKey.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
String clientId = socketChannel.getRemoteAddress().toString();
System.out.println("accept connection from " + clientId);
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, new TimeHandler(clientId));
} else if (selectionKey.isReadable()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
TimeHandler timeHandler = (TimeHandler) selectionKey.attachment();
timeHandler.readFromChannel(selectionKey, socketChannel);
} else if (selectionKey.isWritable()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
TimeHandler timeHandler = (TimeHandler) selectionKey.attachment();
timeHandler.writeToChannel(socketChannel);
}
it.remove();
}
}
}
public static void main(String[] args) throws Exception {
int port = args.length > 0 ? Integer.parseInt(args[0]) : 8888;
SimpleNonBlockingIoServer server = new SimpleNonBlockingIoServer(port);
server.run();
}
}
这里实现了一个自定义服务,输入一行数字,sleep指定时间。下面逐个讲解这段代码。
首先是SimpleNonBlockingIoServer的构造函数。像阻塞IO一样,需要一个ServerSocket的Channel,还有核心的Selector。
SimpleNonBlockingIoServer的run代码中,注册了ServerSocketChannel的ACCEPT事件。接下来就是所谓的EventLoop。Selector#select会阻塞(是的,这里的select是阻塞的)直到有ACCPET事件,然后代码遍历Selector的SelectionKey,分别对ACCEPT/READ/WRITE做处理。READ和WRITE是在ACCEPT得到一个Socket之后注册的。注册时附带了一个TimeHandler。这里有服务的核心逻辑。
代码中要注意的是SocketChannel#configureBlocking(false)是必须的,否则你会得到错误,因为你想把一个不是非阻塞的SocketChannel注册到Selector上。
使用非阻塞IO写代码,代码风格和阻塞IO大不相同,虽然外层都有一个while循环,但是你需要关注各种事件。还有因为IO多路复用的关系,你需要一个上下文来联系事件。幸好Java的NIO API中有attachment,你可以尝试使用它。
另外,关于Java NIO一个很有名的bug,想象一下Selector#select不阻塞,你的代码一直在循环,很有可能造成某个CPU一直在100%的状态。这个bug据说已经被修复,不过类似Netty等框架还是加了一层保护防止突然间的这种100%CPU的bug。
有哪些框架使用这种reactor模式么,有,比如Redis。Redis一直被提到的一点单线程,就是指这里的EventLoop。如果你要说这种reactor模式有什么问题,很显然,如果某个client执行了很长时间的操作,比如在上面的代码中输入5000,即sleep(5000ms)的话,另外一个client输入的命令要等着5秒过去之后才会被执行。这是因为只有一个EventLoop的线程。Redis就是这样一种情况,如果你在某个client里面执行了keys *,其他client有可能会被卡住。当然Redis选择单线程,更多地可能是因为数据一致性方面的原因,多线程下的数据操作很难等等。
回到正题,单EventLoop的设计基本上是不可取的,那么你可以选择第二个单EventLoop,多工作线程的方式。这种reactor模式与很多地方的模式很像,比如
- GUI的后台操作(GUI的EventLoop也是单线程的)
- 阻塞IO的服务器设计
等等,具体示例代码如下:
package in.xnnyygn.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MultiWorkerNonBlockingIoServer {
private class TimeHandler {
private final ExecutorService executorService;
private final ByteBuffer buffer;
private int time = 0;
TimeHandler(ExecutorService executorService) {
this.executorService = executorService;
buffer = ByteBuffer.allocate(2048);
}
void readFromChannel(SelectionKey selectionKey, SocketChannel socketChannel) {
executorService.submit(() -> {
try {
doReadFromChannel(selectionKey, socketChannel);
} catch (IOException e) {
throw new IllegalStateException(e);
}
});
}
private void doReadFromChannel(SelectionKey selectionKey, SocketChannel socketChannel) throws IOException {
int length = socketChannel.read(buffer);
if (length == -1) {
socketChannel.close();
return;
}
if (length == 0) return;
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
buffer.clear();
int time = Integer.parseInt(new String(bytes).trim());
if (time > 0 && time < 10000) {
try {
Thread.sleep(time);
} catch (InterruptedException ignored) {
}
this.time = time;
}
}
void writeToChannel(SocketChannel socketChannel) throws IOException {
if (time == 0) return;
buffer.put(String.valueOf(time).getBytes());
buffer.putChar('\n');
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
time = 0;
}
}
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
private final ExecutorService executorService;
public MultiWorkerNonBlockingIoServer(int port) throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
executorService = Executors.newFixedThreadPool(4);
}
public void run() throws IOException {
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (!Thread.interrupted()) {
selector.select();
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey selectionKey = it.next();
it.remove();
if (selectionKey.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
System.out.println("accept connection from " + socketChannel.getRemoteAddress());
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, new TimeHandler(executorService));
}
if (selectionKey.isReadable()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
TimeHandler timeHandler = (TimeHandler) selectionKey.attachment();
timeHandler.readFromChannel(selectionKey, socketChannel);
}
if (selectionKey.isWritable()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
TimeHandler timeHandler = (TimeHandler) selectionKey.attachment();
timeHandler.writeToChannel(socketChannel);
}
}
}
}
public static void main(String[] args) throws Exception {
MultiWorkerNonBlockingIoServer server = new MultiWorkerNonBlockingIoServer(8888);
server.run();
}
}
具体代码上变化不大,只有在readFromChannel的地方用线程池来执行了。实际运行中,你可以尝试某个client执行长时间操作,另外一个client不会受到影响。
当然,上面的代码存在一些问题,我只对read操作执行了异步,这会造成write的竞态条件等问题。比较好的解决方法是把工作线程作为attachment。但是那样的话,你必须自己实现ThreadPool。
这种reactor模式基本可以作为一个常规非阻塞IO的基础了。典型的比如NodeJS。(虽然NodeJS一直宣称自己是Single Thread EventLoop,而且这句话也没错,但是这不代表NodeJS是一个Single Thread程序。一方面NodeJS不是浏览器JS,另一方面NodeJS的工作线程有他们自己的线程池,callback不代表用了异步IO,只是由线程池在执行而已)如果没有记错的话,Nginx应该也是这种模型。
要满足C10K的话,理论上这种reactor模型就足够了,因为C10K只关注连接,不关注处理,哈哈。
不过也看到,在高并发的时候,假如5000个连接的话,单个Selector要处理的fd数量会非常多,虽然epoll没有fd数量限制,也有比较好的数据通知方法,从程序员的角度来说,用多个Selector来分开处理可能会更稳定一些。这也就是第三种reactor模型,即多Selector的模式。示例代码如下:
package in.xnnyygn.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MultiSelectorNonBlockingIoServer {
private class TimeProtocol {
private ByteBuffer buffer;
private int time = 0;
TimeProtocol() {
buffer = ByteBuffer.allocate(2048);
}
void onReadable(SocketChannel socketChannel) throws IOException {
int length = socketChannel.read(buffer);
if (length == -1) {
socketChannel.close();
return;
}
if (length == 0) return;
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
buffer.clear();
int time = Integer.parseInt(new String(bytes).trim());
System.out.println("got time " + time);
if (time > 0 && time < 20000) {
try {
Thread.sleep(time);
} catch (InterruptedException ignored) {
}
this.time = time;
}
}
void onWriteable(SocketChannel socketChannel) throws IOException {
if (time == 0) return;
buffer.put(String.valueOf(time).getBytes());
buffer.putChar('\n');
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
time = 0;
}
}
private class SubReactorPool {
private int current;
private SubReactor[] subReactors;
SubReactorPool(int nSubReactor) {
subReactors = new SubReactor[nSubReactor];
}
SubReactor getSubReactor(ExecutorService workerExecutorService) throws IOException {
int index = (++current) % subReactors.length;
SubReactor subReactor = subReactors[index];
if (subReactor == null) {
System.out.println("create new sub-reactor");
subReactor = new SubReactor(workerExecutorService);
subReactor.start();
subReactors[index] = subReactor;
}
return subReactor;
}
}
private class SubReactor extends Thread {
private final Selector selector;
private final ExecutorService executorService;
SubReactor(ExecutorService executorService) throws IOException {
this.selector = SelectorProvider.provider().openSelector();
this.executorService = executorService;
}
void addSocketChannel(SocketChannel socketChannel) throws IOException {
System.out.println("add socket " + socketChannel.getRemoteAddress());
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, new TimeProtocol());
}
public void run() {
System.out.println("start sub-reactor thread");
try {
while (!Thread.interrupted()) {
if (selector.selectNow() <= 0) continue;
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey selectionKey = it.next();
it.remove();
if (selectionKey.isReadable()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
TimeProtocol timeProtocol = (TimeProtocol) selectionKey.attachment();
executorService.submit(() -> {
try {
timeProtocol.onReadable(socketChannel);
} catch (ClosedChannelException e) {
selectionKey.cancel();
} catch (IOException e) {
e.printStackTrace();
}
});
}
if (selectionKey.isWritable()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
TimeProtocol timeProtocol = (TimeProtocol) selectionKey.attachment();
timeProtocol.onWriteable(socketChannel);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private final Selector mainSelector;
private final ServerSocketChannel serverSocketChannel;
private final SubReactorPool subReactorPool;
private final ExecutorService workerExecutorService;
public MultiSelectorNonBlockingIoServer(int port) throws IOException {
mainSelector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
subReactorPool = new SubReactorPool(2);
workerExecutorService = Executors.newFixedThreadPool(4);
}
public void run() throws IOException {
serverSocketChannel.register(mainSelector, SelectionKey.OP_ACCEPT);
while (!Thread.interrupted()) {
mainSelector.select();
Iterator<SelectionKey> it = mainSelector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey selectionKey = it.next();
it.remove();
if (selectionKey.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
subReactorPool.getSubReactor(workerExecutorService).addSocketChannel(socketChannel);
}
}
}
}
public static void main(String[] args) throws Exception {
MultiSelectorNonBlockingIoServer server = new MultiSelectorNonBlockingIoServer(8888);
server.run();
}
}
SubReactor里面也有EventLoop,所以是单独的线程。
请注意某个地方我并没有用Selector#select,而是用了Selector#selectNow,selectNow是非阻塞版本。其原因是SocketChannel注册时候的竞态条件,估计改一下处理方式可能更好些。
SubReactor主要负责SocketChannel的读写事件,而主EventLoop只关心Accept事件,这样其实分得更加清楚了。
使用这种模式的典型的如Netty的Boss/Worker模型。
除了第一种reactor模型之外,第二第三都是常用的模型,如果你像从头开始写的话,可以参考一下。
另外,示例代码中并没有对于Socket的close很好的处理,还有数据的组包问题也没有解决,后者也是Socket开发的老问题了。
希望我的代码以及说明对各位有用。
参考