2、NIO详解之网络编程
前言
Github:https://github.com/HealerJean
篇将重点介绍基于网络编程NIO(异步IO)。
Selector 通道管理器
在我的JavaNIO详解(一)中已经详细介绍了Java NIO三个核心对象中的Buffer和Channel,现在我们就重点介绍一下第三个核心对象Selector。
Selector是一个对象,它可以被多个Channel注册,这样就可以监听各个Channel上发生的事件,并且能够根据事件情况决定Channel读写。这样,通过一个线程管理多个Channel,就可以处理大量网络连接了。
有了Selector,我们就可以利用一个线程来处理所有的channels。线程之间的切换对操作系统来说代价是很高的,并且每个线程也会占用一定的系统资源。所以,对系统来说使用的线程越少越好。
但是,需要记住,现代的操作系统和CPU在多任务方面表现的越来越好,所以多线程的开销随着时间的推移,变得越来越小了。实际上,如果一个CPU有多个内核,不使用多任务可能是在浪费CPU能力。
1、创建一个Selector
异步 I/O 中的核心对象名为 Selector。Selector 就是您注册对各种 I/O 事件兴趣的地方,而且当那些事件发生时,就是这个对象告诉您所发生的事件。
Selector selector = Selector.open();
2、注册Channel到Selector
为了能让Channel和Selector配合使用,将Channel注册到Selector上。通过调用 channel.register()方法来实现注册:
//设置通道为 非阻塞 ,,注册的Channel 必须设置成异步模式 才可以,,否则异步IO就无法工作,这就意味着我们不能把一个FileChannel注册到Selector,因为`FileChannel`没有异步模式,但是网络编程中的`SocketChannel`是可以的。
channel.configureBlocking(false);
//第二个参数为,selector对那些事件感兴趣 ,即指定我们想要监听accept事件,也就是新的连接发生时所产生的事件,对于ServerSocketChannel通道来说,我们唯一可以指定的参数就是OP_ACCEPT。
// 将通道管理器和该通道绑定,并为该通道注册selectionKey.OP_ACCEPT事件
// 注册该事件后,当事件到达的时候,selector.select()会返回, 如果事件没有到达selector.select()会一直阻塞
SelectionKey key =channel.register(selector,SelectionKey.OP_ACCEPT);
通道触发了一个事件,该事件已经 Ready(就绪)。
- Connect 某个Channel成功连接到另一个服务器称为
Connect Ready
。(作用到客户端) - Accept 一个
ServerSocketChannel
准备好接收新连接称为Accept Ready
,也就是说服务端有新连接的时候(作用到服务端) - Read 一个有数据可读的通道可以说是
Read Ready
-
Write 等待写数据的通道可以说是
Write Ready
。
3、SelectionKey(上面的4个事件)
3.1、channel.register()的调用的返回值是一个SelectionKey。
SelectionKey 代表这某个channel在 Selector 上注册。当某个 Selector 通知您某个事件发生的时候,是通过提供对应于该事件的 SelectionKey 来进行的。SelectionKey 还可以用于取消通道的注册。3.1.1、SelectionKey中包含如下属性:
1. SelectionKey.OP_CONNECT
2. SelectionKey.OP_ACCEPT
3. SelectionKey.OP_READ
4. SelectionKey.OP_WRITE
3.1.2、如果你对多个事件感兴趣,可以通过or操作符来连接这些常量:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
3.1.3、检测Channel中什么事件或操作已经就绪
int readySet = selectionKey.readyOps();
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
3.2、selector.select() (表示有注册到selector的事件发生了)
一旦调用了select()方法,它就会返回一个数值,表示一个或多个channel通道已经就绪,然后你就可以通过调用selector.selectedKeys()
int select() 当注册事件到达时,方法返回,否则该方法会一直阻塞,阻塞到至少有一个通道在你注册的事件上就绪,方法将返回所发生的事件的数量。
int select(long timeout):select()一样,除了最长会阻塞timeout毫秒(参数);如果自从前一次选择操作后,没有通道变成可选择的,则此方法直接返回零。
int selectNow(): 不会阻塞,不管什么通道就绪都立刻返回,此方法执行非阻塞的选择操作。
select()方法返回的int值表示有多少通道已经就绪。亦即,自上次调用select()方法后有多少通道变成就绪状态。
如果调用select()方法,因为有一个通道变成就绪状态,返回了1,
若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。
如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,
3.3、selector.selectedKeys()
一旦调用了select()方法,它就会返回一个数值,表示一个或多个通道已经就绪出发了监听事件,然后你就可以通过调用selector.selectedKeys()方法返回的SelectionKey集合来获得就绪的Channel。请看演示方法:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
当你通过Selector注册一个Channel时,channel.register()方法会返回一个SelectionKey对象,这个对象就代表了你注册的Channel。这些对象可以通过selectedKeys()方法获得。你可以通过迭代这些selected key来获得就绪的Channel,下面是演示代码:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
3.4、SelectionKey获取Channel 和Selector
我们可以通过SelectionKey获得Selector和注册的Channel:
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
4、客户端和服务端说明
用到的两个buffer的方法
ByteBuffer.array : ByteBuffer转byte数组
ByteBuffer.array() 返回的 array 长度为 ByteBuffer allocate的长度,并不是里面所含的内容的长度
byte[] data = buffer.array();
ByteBuffer.wrap :byte数组转ByteBuffer
ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
private void read(SelectionKey key) throws Exception {
SocketChannel channel = (SocketChannel) key.channel();
// 穿件读取的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(10);
channel.read(buffer);
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("client receive msg from server:" + msg);
ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
channel.write(outBuffer);
}
4.1、服务端
4.1.1、将ServerSocketChannel
注册到服务端的selector上,事件类型为SelectionKey.OP_ACCEPT
4.1.2、通过服务端通道获取客户端通道,开始和客户端进行交互
ServerSocketChannel server = (ServerSocketChannel)
// 客户端请求连接事件成功
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 获得客户端连接的通道
SocketChannel channel = server.accept();
// 设置成非阻塞
channel.configureBlocking(false);
// 在这里可以发送消息给客户端
channel.write(ByteBuffer.wrap(new String("hello client").getBytes()));
// 在客户端 连接成功之后,注册到 服务端的通道管理器中,接收到客户端的信息,第二个是监听读的兴趣,
channel.register(this.selector, SelectionKey.OP_READ);
// 获得了可读的事件
}
package com.hlj.nio.D02Nio网络编程;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
* @Description
* @Author HealerJean
* @Date 2018/3/28 下午3:14.
*/
public class NIOServer {
// 通道管理器
private Selector selector;
public void initServer(int port) throws Exception {
// 1、获得通道管理器
this.selector = Selector.open();
// 2、获得一个ServerSocket通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 2.1、设置通道为 非阻塞 ,注册的Channel 必须设置成异步模式 才可以,,否则异步IO就无法工作,
// 这就意味着我们不能把一个FileChannel注册到Selector,因为`FileChannel`没有异步模式,
// 但是网络编程中的`SocketChannel`是可以的。
serverChannel.configureBlocking(false);
// 2.2、将该通道对于的serverSocket绑定到port端口
ServerSocket serverSocket = serverChannel.socket() ;
serverSocket.bind(new InetSocketAddress(port));
// 3、channel注册到selector
// 第二个参数为,selector对那些事件感兴趣 ,即指定我们想要监听accept事件,也就是新的连接发生时所产生的事件,对于ServerSocketChannel通道来说,我们唯一可以指定的参数就是OP_ACCEPT。
// 将通道管理器和该通道绑定,并为该通道注册selectionKey.OP_ACCEPT事件
// 注册该事件后,当事件到达的时候,selector.select()会返回, 如果事件没有到达selector.select()会一直阻塞
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
// 采用轮训的方式监听selector上是否有需要处理的事件,如果有,进行处理
public void listen() throws Exception {
//启动服务器
System.out.println("start server");
// 轮询访问selector
while (true) {
// 当注册事件到达时,方法返回,否则该方法会一直阻塞,阻塞到至少有一个通道在你注册的事件上就绪,方法将返回所发生的事件的数量。
selector.select();
// 获得selector中选中的相的迭代器,选中的相为注册的事件
Iterator ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
// 删除已使用过的key ,ite删除的是上一个key 以防重负处理
ite.remove();
// 客户端请求连接事件成功
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 获得客户端连接的通道
SocketChannel channel = server.accept();
// 设置成非阻塞
channel.configureBlocking(false);
// 在这里可以发送消息给客户端
channel.write(ByteBuffer.wrap(new String("hello client").getBytes()));
// 在客户端 连接成功之后,注册到 服务端的通道管理器中,接收到客户端的信息,第二个是监听读的兴趣,
channel.register(this.selector, SelectionKey.OP_READ);
// 获得了可读的事件
//获取客户端的channel的数据
} else if (key.isReadable()) {
read(key);
}
}
}
}
// 处理 读取客户端发来的信息事件
private void read(SelectionKey key) throws Exception {
// 服务器可读消息,得到事件发生的socket通道
SocketChannel channel = (SocketChannel) key.channel();
// 穿件读取的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(10);
channel.read(buffer);
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("server receive from client: " + msg);
//服务端给客户端回复内容
ByteBuffer outBuffer = ByteBuffer.wrap(("来自于服务端的回复"+msg).getBytes());
channel.write(outBuffer);
}
@Test
public void startServer(){
try {
initServer(8989);
listen();
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.4、客户端
4.4.1、客户端通道SocketChannel
注册到客户端的通道管理器上,注册事件为SelectionKey.OP_CONNECT
4.4.2、客户端接收服务器发来的消息,以及客户端发消息给服务端
// 在这里可以给服务端发送信息哦
channel.write(ByteBuffer.wrap(new String("hello server!").getBytes()));
package com.hlj.nio.D02Nio网络编程;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
* @Description
* @Author HealerJean
* @Date 2018/3/28 下午3:14.
*/
public class NIOClient {
// 通道管理器
private Selector selector;
/**
* 初始化客户端
* @param ip 客户端Ip
* @param port 服务端暴露出的端口
*/
public void initClient(String ip, int port) throws IOException { // 获得一个Socket通道
// 1、获得通道管理器
this.selector = Selector.open(); // 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调
//2、客户端获得通道
SocketChannel channel = SocketChannel.open();
// 设置通道为非阻塞
channel.configureBlocking(false);
// 3、连接服务端 ,能够得到服务器的响应,isConnectionPending 正在链接
// 此时还没有完全建立 连接,相当于和服务器打了个招呼,说要建立链接
// 后面使用,用channel.finishConnect();才能完成连接,
channel.connect(new InetSocketAddress(ip, port));
// 4、将该通道绑定的到通道管理器selector ,注册的事件为连接事件(连接到服务器成功)
channel.register(selector, SelectionKey.OP_CONNECT);
}
@SuppressWarnings("unchecked")
public void listen() throws Exception { // 轮询访问selector
while (true) {
// 当注册事件到达时,方法返回,否则该方法会一直阻塞,阻塞到至少有一个通道在你注册的事件上就绪,方法将返回所发生的事件的数量。
selector.select();
Iterator ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
// 删除已使用过的key ,ite删除的是上一个key 以防重负处理
ite.remove();
if (key.isConnectable()) { //表示已经连接上服务器了
SocketChannel channel = (SocketChannel) key.channel();
// 如果正在连接,则完成连接,完成真正的连接
if (channel.isConnectionPending()) {
channel.finishConnect();
} // 设置成非阻塞
channel.configureBlocking(false);
// 在这里可以给服务端发送信息哦
channel.write(ByteBuffer.wrap(new String("hello server!").getBytes()));
// 在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。
channel.register(this.selector, SelectionKey.OP_READ); // 获得了可读的事件
} else if (key.isReadable()) {
read(key);
}
}
}
}
private void read(SelectionKey key) throws Exception {
SocketChannel channel = (SocketChannel) key.channel();
// 穿件读取的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(10);
channel.read(buffer);
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("client receive msg from server:" + msg);
ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
channel.write(outBuffer);
}
/**
* 启动客户端测试
*/
@Test
public void startServer(){
try {
initClient("localhost",8989);
listen();
} catch (Exception e) {
e.printStackTrace();
}
}
}