非阻塞IO与reactor模式


在写了非阻塞IO的基础知识之后,决定学习一下常规的非阻塞IO运行模式。

所谓运行模式,就是指以怎样的代码来实现非阻塞IO服务。为了比较和说明,先从阻塞IO的线程池化的服务器开始。

package in.xnnyygn.nio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Blocking echo server: the calling thread accepts connections and hands each socket to a fixed
 * pool of 4 worker threads, so at most 4 clients are served concurrently.
 */
public class BlockingIOEchoServer {

    /** Echoes every line received from one client until it sends "exit" or closes the connection. */
    private class EchoHandler implements Runnable {

        private final Socket socket;

        EchoHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            System.out.println("handle requests from " + socket.getRemoteSocketAddress());
            try {
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(socket.getInputStream(), java.nio.charset.StandardCharsets.UTF_8));
                java.io.OutputStream out = socket.getOutputStream();
                String line;
                // readLine() returns null when the client closes the connection without sending
                // "exit"; the loop must stop there instead of dereferencing null.
                while ((line = reader.readLine()) != null && !"exit".equals(line)) {
                    out.write((line + "\n").getBytes(java.nio.charset.StandardCharsets.UTF_8));
                }
            } catch (IOException e) {
                // The handler runs via submit(), so a thrown exception would only be stored in the
                // discarded Future and never be seen; report it instead.
                System.err.println("error while serving " + socket.getRemoteSocketAddress() + ": " + e);
            } finally {
                System.out.println("close socket");
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // best effort: the connection is being abandoned anyway
                }
            }
        }
    }

    private final ServerSocket serverSocket;
    private final ExecutorService executorService = Executors.newFixedThreadPool(4);

    /**
     * Binds the listening socket.
     *
     * @param port TCP port to listen on
     * @throws IOException if the port cannot be bound
     */
    public BlockingIOEchoServer(int port) throws IOException {
        this.serverSocket = new ServerSocket(port);
    }

    /**
     * Accepts connections on the calling thread until it is interrupted. The interrupt flag is only
     * checked between accepts; a blocking {@link ServerSocket#accept()} does not normally return
     * because of an interrupt.
     *
     * @throws IOException if accepting a connection fails
     */
    public void run() throws IOException {
        try {
            while (!Thread.interrupted()) {
                Socket socket = this.serverSocket.accept();
                executorService.submit(new EchoHandler(socket));
            }
        } finally {
            // Let running handlers finish, but do not let the non-daemon pool threads keep the
            // JVM alive once the server loop has ended.
            executorService.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 8888;
        BlockingIOEchoServer server = new BlockingIOEchoServer(port);
        System.out.println("server started at port " + port);
        server.run();
    }
}

这是一个简单的Echo服务。ServerSocket在accept得到一个Socket之后,由线程池中的某个线程来处理这个Socket。

使用线程池的服务端设计和实现都很简单,单个请求处理内都是线程安全的,在非高并发的环境下比较推荐。

基于非阻塞IO的服务器,通常使用reactor模式。这里的reactor模式是事件驱动设计模式的一种,不是某个JS框架。reactor从字面上比较难以理解,部分文章建议用dispatcher来理解这个模式。实际的非阻塞IO的reactor模式有三种:

  1. 单线程EventLoop
  2. 单线程EventLoop,多工作线程
  3. 单线程EventLoop(Acceptor),多个子Reactor(处理Read/Write),多工作线程

单线程EventLoop是最简单的模式,也是学习非阻塞IO时最容易上手的代码。

package in.xnnyygn.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

/**
 * Single-threaded reactor: one thread runs the selector loop and handles accept, read and write
 * events itself.
 *
 * <p>Protocol: a client sends a decimal number of milliseconds. If it lies in (0, 10000) the
 * server sleeps that long on the event-loop thread and then replies with the number followed by
 * {@code '\n'}. Because all work happens on the single loop thread, one client's sleep delays
 * every other client. Message framing is not handled: each read is parsed as one number.
 */
public class SimpleNonBlockingIoServer {

    /** Per-connection state, stored as the attachment of the connection's {@link SelectionKey}. */
    private class TimeHandler {

        private final String id;
        private final ByteBuffer buffer;
        /** Reply value produced by the last read; 0 means no reply is pending. */
        private int time = 0;
        /** Encoded reply that is still being written, or null when no write is in progress. */
        private ByteBuffer reply;

        TimeHandler(String id) {
            this.id = id;
            buffer = ByteBuffer.allocate(2048);
        }

        /**
         * Reads what is available and parses it as a number. For values in (0, 10000) it sleeps
         * that many milliseconds and switches the key to OP_WRITE so the reply is sent next.
         * Closes the connection on end of stream; malformed input is logged and ignored.
         */
        void readFromChannel(SelectionKey selectionKey, SocketChannel socketChannel) throws IOException {
            int length = socketChannel.read(buffer);
            if (length == -1) {
                System.out.println(id + ": close");
                selectionKey.cancel();
                socketChannel.close();
                return;
            }

            if (length == 0) return;

            buffer.flip();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            buffer.clear();

            int time;
            try {
                time = Integer.parseInt(new String(bytes, java.nio.charset.StandardCharsets.US_ASCII).trim());
            } catch (NumberFormatException e) {
                // An exception escaping here would leave run() and stop the server for every client.
                System.out.println(id + ": invalid input");
                return;
            }
            System.out.println(id + ": got time " + time);
            if (time > 0 && time < 10000) {
                try {
                    Thread.sleep(time);
                } catch (InterruptedException e) {
                    // Keep the request visible; run() checks the flag and stops the loop.
                    Thread.currentThread().interrupt();
                }

                this.time = time;
                selectionKey.interestOps(SelectionKey.OP_WRITE);
            }
        }

        /**
         * Writes the pending reply ({@code "<time>\n"}). Keeps OP_WRITE while part of it is still
         * unsent and switches back to OP_READ once it has been written completely.
         */
        void writeToChannel(SelectionKey selectionKey, SocketChannel socketChannel) throws IOException {
            if (reply == null) {
                if (time == 0) {
                    selectionKey.interestOps(SelectionKey.OP_READ);
                    return;
                }
                System.out.println(id + ": reply with time " + time);
                // One byte per character; ByteBuffer#putChar('\n') would emit two bytes (0x00 0x0A).
                reply = ByteBuffer.wrap((time + "\n").getBytes(java.nio.charset.StandardCharsets.US_ASCII));
                time = 0;
            }
            socketChannel.write(reply);
            if (!reply.hasRemaining()) {
                reply = null;
                selectionKey.interestOps(SelectionKey.OP_READ);
            }
        }
    }

    private final Selector selector;
    private final ServerSocketChannel serverSocketChannel;

    /**
     * Opens the selector and binds a non-blocking listening channel.
     *
     * @param port TCP port to listen on
     * @throws IOException if the selector cannot be opened or the port cannot be bound
     */
    public SimpleNonBlockingIoServer(int port) throws IOException {
        selector = Selector.open();

        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
    }

    /**
     * Runs the event loop on the calling thread until the thread is interrupted.
     *
     * @throws IOException if the selector or the listening channel fails
     */
    public void run() throws IOException {
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (!Thread.interrupted()) {
            selector.select();
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey selectionKey = it.next();
                it.remove();
                if (!selectionKey.isValid()) {
                    continue;
                }
                if (selectionKey.isAcceptable()) {
                    accept((ServerSocketChannel) selectionKey.channel());
                } else {
                    handleClient(selectionKey);
                }
            }
        }
    }

    /** Accepts one pending connection, if there is one, and registers it for reads. */
    private void accept(ServerSocketChannel server) throws IOException {
        SocketChannel socketChannel = server.accept();
        if (socketChannel == null) {
            return; // non-blocking accept: no connection was pending after all
        }
        socketChannel.configureBlocking(false);

        String clientId = socketChannel.getRemoteAddress().toString();
        System.out.println("accept connection from " + clientId);
        // Start with OP_READ only. A socket is almost always writable, so a permanent OP_WRITE
        // interest would make select() return immediately and spin the CPU.
        socketChannel.register(selector, SelectionKey.OP_READ, new TimeHandler(clientId));
    }

    /** Dispatches a read or write event; an I/O error closes only the affected connection. */
    private void handleClient(SelectionKey selectionKey) {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        TimeHandler timeHandler = (TimeHandler) selectionKey.attachment();
        try {
            if (selectionKey.isReadable()) {
                timeHandler.readFromChannel(selectionKey, socketChannel);
            } else if (selectionKey.isWritable()) {
                timeHandler.writeToChannel(selectionKey, socketChannel);
            }
        } catch (IOException e) {
            System.out.println(timeHandler.id + ": " + e);
            selectionKey.cancel();
            try {
                socketChannel.close();
            } catch (IOException ignored) {
                // the connection is already failing; nothing more to do
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 8888;
        SimpleNonBlockingIoServer server = new SimpleNonBlockingIoServer(port);
        server.run();
    }
}

这里实现了一个自定义服务:客户端输入一行数字,服务端sleep指定的毫秒数,然后把这个数字回写给客户端。下面逐个讲解这段代码。

首先是SimpleNonBlockingIoServer的构造函数。像阻塞IO一样,需要一个ServerSocket的Channel,还有核心的Selector。

SimpleNonBlockingIoServer的run代码中,注册了ServerSocketChannel的ACCEPT事件。接下来就是所谓的EventLoop。Selector#select会阻塞(是的,这里的select是阻塞的),直到至少有一个已注册的事件就绪(一开始只注册了ACCEPT)。然后代码遍历Selector选中的SelectionKey,分别对ACCEPT/READ/WRITE做处理。READ和WRITE事件是在ACCEPT得到一个Socket之后才注册的,注册时附带了一个TimeHandler,服务的核心逻辑就在这里。

代码中要注意的是,SocketChannel#configureBlocking(false)是必须的:Selector只接受非阻塞模式的Channel,把阻塞模式的SocketChannel注册到Selector上会抛出IllegalBlockingModeException。

使用非阻塞IO写代码,代码风格和阻塞IO大不相同,虽然外层都有一个while循环,但是你需要关注各种事件。还有因为IO多路复用的关系,你需要一个上下文来联系事件。幸好Java的NIO API中有attachment,你可以尝试使用它。

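另外,Java NIO有一个很有名的bug(即epoll空轮询bug):Selector#select在没有任何事件就绪时也不阻塞,而是立即返回,于是你的代码一直在空转循环,很有可能造成某个CPU一直处于100%的状态。这个bug据说已经被修复,不过Netty等框架还是加了一层保护:检测到连续的空轮询时就重建Selector,以防止这种突然出现的100% CPU问题。类似的空转也可能由代码本身引起:socket的发送缓冲区几乎总是可写的,如果一直关注OP_WRITE,select同样会立即返回。所以OP_WRITE一般只在有数据待写时才注册。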

有哪些软件使用这种reactor模式吗?有,比如Redis。Redis一直被人提到的单线程,指的就是这里的EventLoop。如果你要说这种reactor模式有什么问题,很显然,如果某个client执行了很长时间的操作,比如在上面的代码中输入5000,即sleep(5000ms),另外一个client输入的命令要等5秒过去之后才会被执行。这是因为只有一个EventLoop线程。Redis也是这种情况,如果你在某个client里面执行了keys *,其他client有可能会被卡住。当然Redis选择单线程,更多的可能是出于数据一致性方面的考虑,多线程下的数据操作很难做对等等。

回到正题,对于存在耗时操作的服务来说,单EventLoop的设计基本上是不可取的。这时你可以选择第二种模型:单EventLoop,多工作线程。这种reactor模式与很多地方的模式很像,比如

  • GUI的后台操作(GUI的EventLoop也是单线程的)
  • 阻塞IO的服务器设计

等等,具体示例代码如下:

package in.xnnyygn.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Reactor with a single event-loop thread plus a fixed pool of 4 worker threads.
 *
 * <p>Protocol: a client sends a decimal number of milliseconds. For values in (0, 10000) a worker
 * sleeps that long and the server replies with the number followed by {@code '\n'}. Because the
 * sleep runs on a worker, a slow client does not delay the others.
 *
 * <p>Threading: the event-loop thread owns the selector. It accepts connections, changes interest
 * sets and performs writes. While a worker reads a connection, that connection's interest set is
 * 0, so the loop cannot submit a second task for the same buffer. When the worker is done, it
 * hands the key back through {@link #pendingTasks} and wakes the selector up.
 */
public class MultiWorkerNonBlockingIoServer {

    /** Marker returned by a read when the connection has been closed. */
    private static final int CLOSED = -1;

    /** Per-connection state, stored as the attachment of the connection's key. */
    private class TimeHandler {

        private final ExecutorService executorService;
        private final ByteBuffer buffer;
        /** Reply value set by a worker; 0 means no reply is pending. */
        private int time = 0;
        /** Encoded reply being written by the event-loop thread, or null. */
        private ByteBuffer reply;

        TimeHandler(ExecutorService executorService) {
            this.executorService = executorService;

            buffer = ByteBuffer.allocate(2048);
        }

        /**
         * Event-loop thread: suspends interest in the key and lets a worker perform the read. When
         * the worker is done, it restores the interest set through the event loop.
         */
        void readFromChannel(SelectionKey selectionKey, SocketChannel socketChannel) {
            // Without this, every select() would report the channel as readable again until the
            // worker drained it. That would submit duplicate tasks that race on the same buffer.
            selectionKey.interestOps(0);
            executorService.submit(() -> {
                int nextOps = readOnWorker(socketChannel);
                if (nextOps != CLOSED) {
                    runOnEventLoop(() -> {
                        if (selectionKey.isValid()) {
                            selectionKey.interestOps(nextOps);
                        }
                    });
                }
            });
        }

        /** Worker thread: runs {@link #doReadFromChannel}, closing the connection on failure. */
        private int readOnWorker(SocketChannel socketChannel) {
            try {
                return doReadFromChannel(socketChannel);
            } catch (IOException | RuntimeException e) {
                // The interest set is 0 at this point, so an open channel would never be served again.
                System.out.println("close connection after error: " + e);
                closeQuietly(socketChannel);
                return CLOSED;
            }
        }

        /**
         * Worker thread: reads one request and, for values in (0, 10000), sleeps that many
         * milliseconds and records the reply. Malformed input is ignored.
         *
         * @return {@code OP_WRITE} if a reply is pending, {@code OP_READ} otherwise, or
         *     {@code CLOSED} if the peer closed the connection (the channel is closed here)
         */
        private int doReadFromChannel(SocketChannel socketChannel) throws IOException {
            int length = socketChannel.read(buffer);
            if (length == -1) {
                socketChannel.close();
                return CLOSED;
            }

            if (length == 0) return SelectionKey.OP_READ;

            buffer.flip();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            buffer.clear();

            int time;
            try {
                time = Integer.parseInt(new String(bytes, java.nio.charset.StandardCharsets.US_ASCII).trim());
            } catch (NumberFormatException e) {
                return SelectionKey.OP_READ; // ignore malformed input, keep the connection
            }
            if (time > 0 && time < 10000) {
                try {
                    Thread.sleep(time);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }

                this.time = time;
                return SelectionKey.OP_WRITE;
            }
            return SelectionKey.OP_READ;
        }

        /**
         * Event-loop thread: writes the pending reply ({@code "<time>\n"}). Keeps OP_WRITE while
         * part of it is unsent and returns to OP_READ once it has been written completely.
         */
        void writeToChannel(SelectionKey selectionKey, SocketChannel socketChannel) throws IOException {
            if (reply == null) {
                if (time == 0) {
                    selectionKey.interestOps(SelectionKey.OP_READ);
                    return;
                }
                // One byte per character; ByteBuffer#putChar('\n') would emit two bytes (0x00 0x0A).
                reply = ByteBuffer.wrap((time + "\n").getBytes(java.nio.charset.StandardCharsets.US_ASCII));
                time = 0;
            }
            socketChannel.write(reply);
            if (!reply.hasRemaining()) {
                reply = null;
                selectionKey.interestOps(SelectionKey.OP_READ);
            }
        }
    }

    private final Selector selector;
    private final ServerSocketChannel serverSocketChannel;
    private final ExecutorService executorService;
    /** Work that worker threads hand to the event-loop thread, the only thread touching interest sets. */
    private final java.util.Queue<Runnable> pendingTasks = new java.util.concurrent.ConcurrentLinkedQueue<>();

    /**
     * Opens the selector, binds a non-blocking listening channel and creates the worker pool.
     *
     * @param port TCP port to listen on
     * @throws IOException if the selector cannot be opened or the port cannot be bound
     */
    public MultiWorkerNonBlockingIoServer(int port) throws IOException {
        selector = Selector.open();

        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);

        executorService = Executors.newFixedThreadPool(4);
    }

    /** Queues a task for the event-loop thread and wakes it up if it is blocked in select(). */
    private void runOnEventLoop(Runnable task) {
        pendingTasks.offer(task);
        selector.wakeup();
    }

    /** Closes a channel, ignoring errors: used only on connections that are already failing. */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // nothing more can be done for this connection
        }
    }

    /**
     * Runs the event loop on the calling thread until it is interrupted, then shuts the worker
     * pool down.
     *
     * @throws IOException if the selector or the listening channel fails
     */
    public void run() throws IOException {
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        try {
            while (!Thread.interrupted()) {
                selector.select();

                Runnable task;
                while ((task = pendingTasks.poll()) != null) {
                    task.run();
                }

                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey selectionKey = it.next();
                    it.remove();
                    if (!selectionKey.isValid()) {
                        continue;
                    }

                    if (selectionKey.isAcceptable()) {
                        accept((ServerSocketChannel) selectionKey.channel());
                    } else if (selectionKey.isReadable()) {
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        TimeHandler timeHandler = (TimeHandler) selectionKey.attachment();
                        timeHandler.readFromChannel(selectionKey, socketChannel);
                    } else if (selectionKey.isWritable()) {
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        TimeHandler timeHandler = (TimeHandler) selectionKey.attachment();
                        try {
                            timeHandler.writeToChannel(selectionKey, socketChannel);
                        } catch (IOException e) {
                            // A failing client must not take the whole event loop down.
                            System.out.println("close connection after error: " + e);
                            closeQuietly(socketChannel);
                        }
                    }
                }
            }
        } finally {
            executorService.shutdown();
        }
    }

    /** Accepts one pending connection, if there is one, and registers it for reads. */
    private void accept(ServerSocketChannel server) throws IOException {
        SocketChannel socketChannel = server.accept();
        if (socketChannel == null) {
            return; // non-blocking accept: no connection was pending after all
        }
        socketChannel.configureBlocking(false);

        System.out.println("accept connection from " + socketChannel.getRemoteAddress());

        // Start with OP_READ only. A socket is almost always writable, so a permanent OP_WRITE
        // interest would make select() return immediately and spin the CPU.
        socketChannel.register(selector, SelectionKey.OP_READ, new TimeHandler(executorService));
    }

    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 8888;
        MultiWorkerNonBlockingIoServer server = new MultiWorkerNonBlockingIoServer(port);
        server.run();
    }
}

具体代码上变化不大,只有在readFromChannel的地方用线程池来执行了。实际运行中,你可以尝试某个client执行长时间操作,另外一个client不会受到影响。

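当然,这里有一个需要注意的竞态问题。如果只是简单地把read交给线程池,而EventLoop线程同时还在处理write,同一个连接的buffer和状态就可能被多个线程同时访问。而且在工作线程读完数据之前,select会反复报告该连接可读,导致同一个连接被重复提交到线程池。一种常见的解决方法是:在工作线程处理期间暂时清空该连接关注的事件(interestOps(0)),处理完成后通过任务队列加Selector#wakeup把结果交回EventLoop线程,由它来修改关注的事件并执行write。另一种方法是把工作线程作为attachment,但那样的话,你必须自己实现ThreadPool。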

这种reactor模式基本可以作为一个常规非阻塞IO服务的基础了,典型的比如NodeJS。(虽然NodeJS一直宣称自己是Single Thread EventLoop,而且这句话也没错,但这不代表NodeJS是一个Single Thread程序。一方面NodeJS不是浏览器JS;另一方面NodeJS有它自己的工作线程池,callback不代表用了异步IO,有些操作(比如文件IO)只是由线程池在执行而已。)至于Nginx,它采用的是多进程模型:master进程负责管理,多个worker进程各自运行自己的单线程EventLoop。

要满足C10K的话,理论上这种reactor模型就足够了,因为C10K只关注连接,不关注处理,哈哈。

不过也可以看到,在高并发的时候,假如有5000个连接,单个Selector要处理的fd数量会非常多。虽然epoll没有fd数量限制,也有比较好的事件通知机制,但从程序员的角度来说,用多个Selector来分开处理可能会更稳定一些。这也就是第三种reactor模型,即多Selector的模式。示例代码如下:

package in.xnnyygn.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Main/sub reactor server: the calling thread only accepts connections and hands each one, round
 * robin, to one of 2 sub-reactor threads. Each sub-reactor runs its own selector for the read and
 * write events of its connections. It delegates the (possibly slow) request handling to a shared
 * pool of 4 worker threads.
 *
 * <p>Only a sub-reactor's own thread registers channels with its selector, changes interest sets
 * and writes. Other threads (the acceptor, the workers) hand such work to it through its task
 * queue plus {@link Selector#wakeup()}. This lets the sub-reactor block in {@code select()}
 * instead of busy-polling with {@code selectNow()}.
 */
public class MultiSelectorNonBlockingIoServer {

    /** Marker returned by a read when the connection has been closed. */
    private static final int CLOSED = -1;

    /** Per-connection protocol state, stored as the attachment of the connection's key. */
    private class TimeProtocol {

        private final ByteBuffer buffer;
        /** Reply value set by a worker; 0 means no reply is pending. */
        private int time = 0;
        /** Encoded reply being written by the sub-reactor thread, or null. */
        private ByteBuffer reply;

        TimeProtocol() {
            buffer = ByteBuffer.allocate(2048);
        }

        /**
         * Worker thread: reads one request and, for values in (0, 20000), sleeps that many
         * milliseconds and records the reply. Malformed input is ignored.
         *
         * @return {@code OP_WRITE} if a reply is pending, {@code OP_READ} otherwise, or
         *     {@code CLOSED} if the peer closed the connection (the channel is closed here)
         */
        int onReadable(SocketChannel socketChannel) throws IOException {
            int length = socketChannel.read(buffer);
            if (length == -1) {
                socketChannel.close();
                return CLOSED;
            }

            if (length == 0) return SelectionKey.OP_READ;

            buffer.flip();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            buffer.clear();

            int time;
            try {
                time = Integer.parseInt(new String(bytes, java.nio.charset.StandardCharsets.US_ASCII).trim());
            } catch (NumberFormatException e) {
                return SelectionKey.OP_READ; // ignore malformed input, keep the connection
            }
            System.out.println("got time " + time);
            if (time > 0 && time < 20000) {
                try {
                    Thread.sleep(time);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }

                this.time = time;
                return SelectionKey.OP_WRITE;
            }
            return SelectionKey.OP_READ;
        }

        /**
         * Sub-reactor thread: writes the pending reply ({@code "<time>\n"}). Keeps OP_WRITE while
         * part of it is unsent and returns to OP_READ once it has been written completely.
         */
        void onWriteable(SelectionKey selectionKey, SocketChannel socketChannel) throws IOException {
            if (reply == null) {
                if (time == 0) {
                    selectionKey.interestOps(SelectionKey.OP_READ);
                    return;
                }
                // One byte per character; ByteBuffer#putChar('\n') would emit two bytes (0x00 0x0A).
                reply = ByteBuffer.wrap((time + "\n").getBytes(java.nio.charset.StandardCharsets.US_ASCII));
                time = 0;
            }
            socketChannel.write(reply);
            if (!reply.hasRemaining()) {
                reply = null;
                selectionKey.interestOps(SelectionKey.OP_READ);
            }
        }
    }

    /** Hands out sub-reactors round robin, starting each one lazily; used by the acceptor thread only. */
    private class SubReactorPool {

        private int current;
        private final SubReactor[] subReactors;

        SubReactorPool(int nSubReactor) {
            subReactors = new SubReactor[nSubReactor];
        }

        SubReactor getSubReactor(ExecutorService workerExecutorService) throws IOException {
            // Wrap explicitly: an ever-growing counter would overflow into negative indexes.
            current = (current + 1) % subReactors.length;
            SubReactor subReactor = subReactors[current];
            if (subReactor == null) {
                System.out.println("create new sub-reactor");
                subReactor = new SubReactor(workerExecutorService);
                subReactor.start();
                subReactors[current] = subReactor;
            }
            return subReactor;
        }
    }

    /** Thread running one selector for the read/write events of the connections assigned to it. */
    private class SubReactor extends Thread {

        private final Selector selector;
        private final ExecutorService executorService;
        /** Work posted by other threads; executed by this thread right after each select(). */
        private final java.util.Queue<Runnable> pendingTasks = new java.util.concurrent.ConcurrentLinkedQueue<>();

        SubReactor(ExecutorService executorService) throws IOException {
            this.selector = SelectorProvider.provider().openSelector();
            this.executorService = executorService;
        }

        /** Called from the acceptor thread; the registration itself runs on this reactor's thread. */
        void addSocketChannel(SocketChannel socketChannel) throws IOException {
            System.out.println("add socket " + socketChannel.getRemoteAddress());
            // SelectableChannel#register can block while another thread sits in select() on the
            // same selector (older JDKs), so the reactor thread performs the registration itself.
            runOnReactor(() -> {
                try {
                    socketChannel.register(selector, SelectionKey.OP_READ, new TimeProtocol());
                } catch (ClosedChannelException e) {
                    // the client disconnected before it could be registered; nothing to do
                }
            });
        }

        /** Queues a task for this reactor's thread and wakes it up if it is blocked in select(). */
        private void runOnReactor(Runnable task) {
            pendingTasks.offer(task);
            selector.wakeup();
        }

        @Override
        public void run() {
            System.out.println("start sub-reactor thread");
            try {
                while (!Thread.interrupted()) {
                    selector.select();

                    Runnable task;
                    while ((task = pendingTasks.poll()) != null) {
                        task.run();
                    }

                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey selectionKey = it.next();
                        it.remove();
                        if (!selectionKey.isValid()) {
                            continue;
                        }
                        if (selectionKey.isReadable()) {
                            dispatchRead(selectionKey);
                        } else if (selectionKey.isWritable()) {
                            handleWrite(selectionKey);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /** Suspends interest in the key and lets a worker read; the worker hands the key back here. */
        private void dispatchRead(SelectionKey selectionKey) {
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            TimeProtocol timeProtocol = (TimeProtocol) selectionKey.attachment();
            // Keep select() from reporting the same readable channel again while a worker reads it;
            // otherwise several workers would race on the same buffer.
            selectionKey.interestOps(0);
            executorService.submit(() -> {
                int nextOps = readOnWorker(timeProtocol, socketChannel);
                if (nextOps != CLOSED) {
                    runOnReactor(() -> {
                        if (selectionKey.isValid()) {
                            selectionKey.interestOps(nextOps);
                        }
                    });
                }
            });
        }

        /** Worker thread: runs the protocol read, closing the connection if it fails. */
        private int readOnWorker(TimeProtocol timeProtocol, SocketChannel socketChannel) {
            try {
                return timeProtocol.onReadable(socketChannel);
            } catch (IOException | RuntimeException e) {
                // The interest set is 0 here, so an open channel would never be served again.
                e.printStackTrace();
                closeQuietly(socketChannel);
                return CLOSED;
            }
        }

        /** Writes the pending reply; a failing connection is closed without stopping the reactor. */
        private void handleWrite(SelectionKey selectionKey) {
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            TimeProtocol timeProtocol = (TimeProtocol) selectionKey.attachment();
            try {
                timeProtocol.onWriteable(selectionKey, socketChannel);
            } catch (IOException e) {
                e.printStackTrace();
                closeQuietly(socketChannel);
            }
        }
    }

    private final Selector mainSelector;
    private final ServerSocketChannel serverSocketChannel;

    private final SubReactorPool subReactorPool;
    private final ExecutorService workerExecutorService;

    /**
     * Opens the acceptor selector, binds a non-blocking listening channel and prepares 2
     * (lazily started) sub-reactors and a pool of 4 workers.
     *
     * @param port TCP port to listen on
     * @throws IOException if the selector cannot be opened or the port cannot be bound
     */
    public MultiSelectorNonBlockingIoServer(int port) throws IOException {
        mainSelector = Selector.open();

        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);

        subReactorPool = new SubReactorPool(2);
        workerExecutorService = Executors.newFixedThreadPool(4);
    }

    /** Closes a channel, ignoring errors: used only on connections that are already failing. */
    private static void closeQuietly(Channel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // nothing more can be done for this connection
        }
    }

    /**
     * Runs the acceptor loop on the calling thread until it is interrupted, handing every accepted
     * connection to a sub-reactor.
     *
     * @throws IOException if the acceptor selector or the listening channel fails
     */
    public void run() throws IOException {
        serverSocketChannel.register(mainSelector, SelectionKey.OP_ACCEPT);
        while (!Thread.interrupted()) {
            mainSelector.select();
            Iterator<SelectionKey> it = mainSelector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey selectionKey = it.next();
                it.remove();
                if (selectionKey.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
                    SocketChannel socketChannel = server.accept();
                    if (socketChannel == null) {
                        continue; // non-blocking accept: no connection was pending after all
                    }
                    socketChannel.configureBlocking(false);
                    subReactorPool.getSubReactor(workerExecutorService).addSocketChannel(socketChannel);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 8888;
        MultiSelectorNonBlockingIoServer server = new MultiSelectorNonBlockingIoServer(port);
        server.run();
    }

}

SubReactor里面也有EventLoop,所以是单独的线程。

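请注意,SubReactor的EventLoop不能不加处理地阻塞在Selector#select上。这里存在注册时的竞态条件:主线程调用SocketChannel#register把连接注册到SubReactor的Selector时,如果SubReactor线程正阻塞在同一个Selector的select中,register会一直被阻塞(在JDK 8等较早的实现中如此)。一种简单的绕过方式是改用非阻塞版本的Selector#selectNow轮询,但这会让SubReactor线程一直空转、占满CPU。更好的处理方式是把注册操作放进一个队列,调用Selector#wakeup唤醒select,再由SubReactor线程自己在select返回后完成注册。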

SubReactor主要负责SocketChannel的读写事件,而主EventLoop只关心Accept事件,这样其实分得更加清楚了。

使用这种模式的典型的如Netty的Boss/Worker模型。

除了第一种reactor模型之外,第二、第三种都是常用的模型,如果你想从头开始写的话,可以参考一下。

另外,示例代码中并没有很好地处理Socket的close,数据的组包(粘包/拆包)问题也没有解决,后者也是Socket开发的老问题了。

希望我的代码以及说明对各位有用。

参考