前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

  篇将重点介绍基于网络编程NIO(异步IO)。

WX20190222-171710@2x

Selector 通道管理器

  在我的JavaNIO详解(一)中已经详细介绍了Java NIO三个核心对象中的Buffer和Channel,现在我们就重点介绍一下第三个核心对象Selector。

  Selector是一个对象,它可以被多个Channel注册,这样就可以监听各个Channel上发生的事件,并且能够根据事件情况决定Channel读写。这样,通过一个线程管理多个Channel,就可以处理大量网络连接了。

  有了Selector,我们就可以利用一个线程来处理所有的channels。线程之间的切换对操作系统来说代价是很高的,并且每个线程也会占用一定的系统资源。所以,对系统来说使用的线程越少越好。
  但是,需要记住,现代的操作系统和CPU在多任务方面表现的越来越好,所以多线程的开销随着时间的推移,变得越来越小了。实际上,如果一个CPU有多个内核,不使用多任务可能是在浪费CPU能力。        WX20180328-110213@2x

1、创建一个Selector

  异步 I/O 中的核心对象名为 Selector。Selector 就是您注册对各种 I/O 事件兴趣的地方,而且当那些事件发生时,就是这个对象告诉您所发生的事件。


Selector selector = Selector.open();

2、注册Channel到Selector

  为了能让Channel和Selector配合使用,将Channel注册到Selector上。通过调用 channel.register()方法来实现注册:


//设置通道为 非阻塞 ,,注册的Channel 必须设置成异步模式 才可以,,否则异步IO就无法工作,这就意味着我们不能把一个FileChannel注册到Selector,因为`FileChannel`没有异步模式,但是网络编程中的`SocketChannel`是可以的。  
channel.configureBlocking(false);

 //第二个参数为,selector对那些事件感兴趣 ,即指定我们想要监听accept事件,也就是新的连接发生时所产生的事件,对于ServerSocketChannel通道来说,我们唯一可以指定的参数就是OP_ACCEPT。
//  将通道管理器和该通道绑定,并为该通道注册selectionKey.OP_ACCEPT事件
//  注册该事件后,当事件到达的时候,selector.select()会返回, 如果事件没有到达selector.select()会一直阻塞
SelectionKey key =channel.register(selector,SelectionKey.OP_ACCEPT);

通道触发了一个事件,该事件已经 Ready(就绪)。

  1. Connect 某个Channel成功连接到另一个服务器称Connect Ready。(作用到客户端)
  2. Accept 一个ServerSocketChannel准备好接收新连接称为 Accept Ready,也就是说服务端有新连接的时候(作用到服务端)
  3. Read 一个有数据可读的通道可以说是 Read Ready
  4. Write 等待写数据的通道可以说是Write Ready

    3、SelectionKey(上面的4个事件)

3.1、channel.register()的调用的返回值是一个SelectionKey。

  SelectionKey 代表这某个channel在 Selector 上注册。当某个 Selector 通知您某个事件发生的时候,是通过提供对应于该事件的 SelectionKey 来进行的。SelectionKey 还可以用于取消通道的注册。   

3.1.1、SelectionKey中包含如下属性:

1. SelectionKey.OP_CONNECT
2. SelectionKey.OP_ACCEPT
3. SelectionKey.OP_READ
4. SelectionKey.OP_WRITE

3.1.2、如果你对多个事件感兴趣,可以通过or操作符来连接这些常量:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE; 


3.1.3、检测Channel中什么事件或操作已经就绪


int readySet = selectionKey.readyOps();


selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

3.2、selector.select() (表示有注册到selector的事件发生了)

一旦调用了select()方法,它就会返回一个数值,表示一个或多个channel通道已经就绪,然后你就可以通过调用selector.selectedKeys()

 
int select()  当注册事件到达时方法返回否则该方法会一直阻塞阻塞到至少有一个通道在你注册的事件上就绪方法将返回所发生的事件的数量


int select(long timeout)select()一样除了最长会阻塞timeout毫秒(参数);如果自从前一次选择操作后没有通道变成可选择的则此方法直接返回零

int selectNow() 不会阻塞不管什么通道就绪都立刻返回此方法执行非阻塞的选择操作

select()方法返回的int值表示有多少通道已经就绪。亦即,自上次调用select()方法后有多少通道变成就绪状态。
如果调用select()方法,因为有一个通道变成就绪状态,返回了1,
若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。
如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,

3.3、selector.selectedKeys()

一旦调用了select()方法,它就会返回一个数值,表示一个或多个通道已经就绪出发了监听事件,然后你就可以通过调用selector.selectedKeys()方法返回的SelectionKey集合来获得就绪的Channel。请看演示方法:

Set<SelectionKey> selectedKeys = selector.selectedKeys();

当你通过Selector注册一个Channel时,channel.register()方法会返回一个SelectionKey对象,这个对象就代表了你注册的Channel。这些对象可以通过selectedKeys()方法获得。你可以通过迭代这些selected key来获得就绪的Channel,下面是演示代码:


Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) { 
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
keyIterator.remove();
}

3.4、SelectionKey获取Channel 和Selector

我们可以通过SelectionKey获得Selector和注册的Channel:


Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector(); 

4、客户端和服务端说明

用到的两个buffer的方法

ByteBuffer.array : ByteBuffer转byte数组

ByteBuffer.array()  返回的 array 长度为 ByteBuffer allocate的长度,并不是里面所含的内容的长度


byte[] data = buffer.array();

ByteBuffer.wrap :byte数组转ByteBuffer

ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());


    private void read(SelectionKey key) throws Exception {
        SocketChannel channel = (SocketChannel) key.channel();
        // 穿件读取的缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(10);
        channel.read(buffer);
        byte[] data = buffer.array();
        String msg = new String(data).trim();
        System.out.println("client receive msg from server:" + msg);
        ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
        channel.write(outBuffer);

    }

4.1、服务端

4.1.1、将ServerSocketChannel注册到服务端的selector上,事件类型为SelectionKey.OP_ACCEPT

4.1.2、通过服务端通道获取客户端通道,开始和客户端进行交互


ServerSocketChannel server = (ServerSocketChannel)  
// 客户端请求连接事件成功
 if (key.isAcceptable()) {
     ServerSocketChannel server = (ServerSocketChannel) key.channel();
     // 获得客户端连接的通道
     SocketChannel channel = server.accept();
     // 设置成非阻塞
     channel.configureBlocking(false);
     // 在这里可以发送消息给客户端
     channel.write(ByteBuffer.wrap(new String("hello client").getBytes()));
     // 在客户端 连接成功之后,注册到 服务端的通道管理器中,接收到客户端的信息,第二个是监听读的兴趣,
     channel.register(this.selector, SelectionKey.OP_READ);
     // 获得了可读的事件
}
   
          
package com.hlj.nio.D02Nio网络编程;
import org.junit.Test;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
 * @Description
 * @Author HealerJean
 * @Date 2018/3/28  下午3:14.
 */



public class NIOServer {

    // 通道管理器
    private Selector selector;

    public void initServer(int port) throws Exception {
        // 1、获得通道管理器
        this.selector = Selector.open();

        // 2、获得一个ServerSocket通道
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        //    2.1、设置通道为 非阻塞 ,注册的Channel 必须设置成异步模式 才可以,,否则异步IO就无法工作,
        //                  这就意味着我们不能把一个FileChannel注册到Selector,因为`FileChannel`没有异步模式,
        //                  但是网络编程中的`SocketChannel`是可以的。
        serverChannel.configureBlocking(false);

        //    2.2、将该通道对于的serverSocket绑定到port端口
        ServerSocket serverSocket =  serverChannel.socket() ;
        serverSocket.bind(new InetSocketAddress(port));

        // 3、channel注册到selector
        //      第二个参数为,selector对那些事件感兴趣 ,即指定我们想要监听accept事件,也就是新的连接发生时所产生的事件,对于ServerSocketChannel通道来说,我们唯一可以指定的参数就是OP_ACCEPT。
        //  将通道管理器和该通道绑定,并为该通道注册selectionKey.OP_ACCEPT事件
        //  注册该事件后,当事件到达的时候,selector.select()会返回, 如果事件没有到达selector.select()会一直阻塞

        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    // 采用轮训的方式监听selector上是否有需要处理的事件,如果有,进行处理
    public void listen() throws Exception {

        //启动服务器
        System.out.println("start server");

        // 轮询访问selector
        while (true) {
            // 当注册事件到达时,方法返回,否则该方法会一直阻塞,阻塞到至少有一个通道在你注册的事件上就绪,方法将返回所发生的事件的数量。
            selector.select();
            // 获得selector中选中的相的迭代器,选中的相为注册的事件
            Iterator ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = (SelectionKey) ite.next();
                // 删除已使用过的key ,ite删除的是上一个key 以防重负处理
                ite.remove();
                // 客户端请求连接事件成功
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    // 获得客户端连接的通道
                    SocketChannel channel = server.accept();
                    // 设置成非阻塞
                    channel.configureBlocking(false);
                    // 在这里可以发送消息给客户端
                    channel.write(ByteBuffer.wrap(new String("hello client").getBytes()));
                    // 在客户端 连接成功之后,注册到 服务端的通道管理器中,接收到客户端的信息,第二个是监听读的兴趣,
                    channel.register(this.selector, SelectionKey.OP_READ);
                    // 获得了可读的事件

                 //获取客户端的channel的数据
                } else if (key.isReadable()) {
                    read(key);
                }

            }
        }
    }

    // 处理 读取客户端发来的信息事件
    private void read(SelectionKey key) throws Exception {
        // 服务器可读消息,得到事件发生的socket通道
        SocketChannel channel = (SocketChannel) key.channel();
        // 穿件读取的缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(10);
        channel.read(buffer);

        byte[] data = buffer.array();
        String msg = new String(data).trim();
        System.out.println("server receive from client: " + msg);

        //服务端给客户端回复内容
        ByteBuffer outBuffer = ByteBuffer.wrap(("来自于服务端的回复"+msg).getBytes());
        channel.write(outBuffer);
    }

    @Test
    public void startServer(){
        try {
            initServer(8989);
            listen();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

4.4、客户端

4.4.1、客户端通道SocketChannel注册到客户端的通道管理器上,注册事件为SelectionKey.OP_CONNECT

4.4.2、客户端接收服务器发来的消息,以及客户端发消息给服务端


 // 在这里可以给服务端发送信息哦
channel.write(ByteBuffer.wrap(new String("hello server!").getBytes()));


package com.hlj.nio.D02Nio网络编程;

import org.junit.Test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * @Description
 * @Author HealerJean
 * @Date 2018/3/28  下午3:14.
 */

public class NIOClient {

    // 通道管理器
    private Selector selector;

    /**
     * 初始化客户端
     * @param ip 客户端Ip
     * @param port 服务端暴露出的端口
     */
    public void initClient(String ip, int port) throws IOException { // 获得一个Socket通道

        // 1、获得通道管理器
        this.selector = Selector.open(); // 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调

        //2、客户端获得通道
        SocketChannel channel = SocketChannel.open();
        // 设置通道为非阻塞
        channel.configureBlocking(false);

        // 3、连接服务端 ,能够得到服务器的响应,isConnectionPending 正在链接
        //  此时还没有完全建立 连接,相当于和服务器打了个招呼,说要建立链接
        //  后面使用,用channel.finishConnect();才能完成连接,
        channel.connect(new InetSocketAddress(ip, port));

        // 4、将该通道绑定的到通道管理器selector  ,注册的事件为连接事件(连接到服务器成功)
        channel.register(selector, SelectionKey.OP_CONNECT);
    }

    @SuppressWarnings("unchecked")
    public void listen() throws Exception { // 轮询访问selector
        while (true) {
            // 当注册事件到达时,方法返回,否则该方法会一直阻塞,阻塞到至少有一个通道在你注册的事件上就绪,方法将返回所发生的事件的数量。
            selector.select();
            Iterator ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = (SelectionKey) ite.next();
                // 删除已使用过的key ,ite删除的是上一个key 以防重负处理
                ite.remove();
                if (key.isConnectable()) { //表示已经连接上服务器了
                    SocketChannel channel = (SocketChannel) key.channel();
                    // 如果正在连接,则完成连接,完成真正的连接
                    if (channel.isConnectionPending()) {
                        channel.finishConnect();
                    } // 设置成非阻塞
                    channel.configureBlocking(false);
                    // 在这里可以给服务端发送信息哦
                    channel.write(ByteBuffer.wrap(new String("hello server!").getBytes()));
                    // 在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。
                    channel.register(this.selector, SelectionKey.OP_READ); // 获得了可读的事件
                } else if (key.isReadable()) {
                    read(key);
                }
            }
        }
    }

    private void read(SelectionKey key) throws Exception {
        SocketChannel channel = (SocketChannel) key.channel();
        // 穿件读取的缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(10);
        channel.read(buffer);
        byte[] data = buffer.array();
        String msg = new String(data).trim();
        System.out.println("client receive msg from server:" + msg);
        ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
        channel.write(outBuffer);

    }


    /**
     * 启动客户端测试
     */
    @Test
    public void startServer(){
        try {
            initClient("localhost",8989);
            listen();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

ContactAuthor