这是NIO学习笔记的第3篇,也是最后一篇了,还剩下最后一个核心类——Selector,此外会补充一些前面略过的点。

Selector

在使用BIO时,为了避免阻塞,我们可能会给每个I/O流开辟一个线程(高端一点,整个线程池啥的),但大量的线程仍可能占用掉过多的资源。启用NIO中Channel的非阻塞特性吧,上一篇末尾通过死循环来判断结果的逻辑是我不能接受的,甚至也达不到提升性能的目的,这时就需要Selector登场了。

Selector的作用是检查指定的Channel是否有某些的事件发生,比如有数据可以读/写了。如果有,我们才去调用相应的非阻塞方法,通过这种操作规避了在死循环中无意义地调用。

在使用Selector之前,我们需要打开它,并将关注的Channel事件注册给它

  • java.nio.channels.Selector#open
  • java.nio.channels.SelectableChannel#register(java.nio.channels.Selector, int)

注意:注册给Selector的Channel必须是非阻塞的,否则会抛出java.nio.channels.IllegalBlockingModeException异常

register方法的第二个int型参数实际上是java.nio.channels.SelectionKey中的常量,共有4个,表明Selector感兴趣的事件(读/写/建立连接/接受连接)

  • java.nio.channels.SelectionKey#OP_READ
  • java.nio.channels.SelectionKey#OP_WRITE
  • java.nio.channels.SelectionKey#OP_CONNECT
  • java.nio.channels.SelectionKey#OP_ACCEPT

注意:一个Selector中可以注册多个Channel,但一个Channel只能在一个Selector里注册一次,第二次及以后的注册只相当于更新SelectionKey

接着就可以从Selector中选取就绪的Channel了,对应的方法是

  • java.nio.channels.Selector#select()
  • java.nio.channels.Selector#selectNow

两者的区别是前者会阻塞直到至少有一个Channel可以被选中(不一定就绪哦~所以它很有可能返回“0个Channel已就绪”),而后者是非阻塞的,会立即返回当前就绪的Channel个数,此外select还有一个支持超时的重载版本。

当有Channel就绪以后,通过迭代器迭代下面方法返回的集合,就可以知道当前哪些Channel就绪了

  • java.nio.channels.Selector#selectedKeys

注意:处理过的Channel不会自动从集合中移除,需要手动remove或clear

再对这些就绪的Channel,用下面的方法判断是否有感兴趣的事件发生

  • java.nio.channels.SelectionKey#isReadable
  • java.nio.channels.SelectionKey#isWritable
  • java.nio.channels.SelectionKey#isConnectable
  • java.nio.channels.SelectionKey#isAcceptable

如果有,取出并强制造型相应的Channel作进一步处理

  • java.nio.channels.SelectionKey#channel

最后附上一个功能非常简单的非阻塞服务器程序,它使用TCP监听本机8888端口,接受任何传入的连接,不论这个连接向服务器发生什么,都回复当前时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public class SelectorDemo {
public static void main(String[] args) throws IOException {
try (Selector selector = Selector.open();
ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8888));
//channel必须是非阻塞的
channel.configureBlocking(false);
//向selector注册关注的事件
channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
if (selector.select() == 0) {
//没有channel为ready(能被选中)
continue;
}
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
System.out.println("SELECT: " + key);
//处理关注的事件
if (key.isAcceptable()) {
//当前channel已经可以接受连接(那就接受连接呗!)
accept(((ServerSocketChannel) key.channel()).accept(), selector);
}
if (key.isReadable()) {
//当前channel已经可读(读取数据,然后关注写事件)
read((SocketChannel) key.channel());
key.interestOps(SelectionKey.OP_WRITE);
}
if (key.isWritable()) {
//当前channel已经可写(写入数据,然后关注读事件)
write((SocketChannel) key.channel());
key.interestOps(SelectionKey.OP_READ);
}
//selectionKey处理完成后要手动从selectedKeys中移除
keys.remove();
}
}
}
}
private static void accept(SocketChannel channel, Selector selector) throws IOException {
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
System.out.println("ACCEPT: " + channel);
}
private static void read(SocketChannel channel) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (channel.read(buffer) > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
System.out.println("READ: " + new String(bytes, Charset.forName("utf-8")));
}
}
}
private static void write(SocketChannel channel) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
String string = new Date().toString();
System.out.println("WRITE: " + string);
buffer.put(string.getBytes(Charset.forName("utf-8")));
buffer.flip();
while (buffer.hasRemaining()) {
channel.write(buffer);
}
}
}

Paths和Files

这两位,是Java7以后新增的工具类,分别用于操作路径和文件,提供类似下面的几个方法

  • java.nio.file.Paths#get(java.lang.String, java.lang.String…)
  • java.nio.file.Files#copy(java.nio.file.Path, java.nio.file.Path, java.nio.file.CopyOption…)
  • java.nio.file.Files#move
  • java.nio.file.Files#delete
  • ……

AsynchronousFileChannel

之前提过,FileChannel是几个常见Channel中唯一一个不支持设置成非阻塞模式的Channel,但某些情况下我们确实有非阻塞操作文件的需求,这该怎么办呢?固然我们可以自己用多线程封装这么一个工具,很可能也已经有三方库实现了类似功能,但Java7以后对此提供了原生的支持——java.nio.channels.AsynchronousFileChannel。

首先依然是打开AsynchronousFileChannel

  • java.nio.channels.AsynchronousFileChannel#open(java.nio.file.Path, java.nio.file.OpenOption…)

第2个参数类似于打开RandomAccessFile时的第2个参数,OpenOption的实现类提供了若干个常量表示以读/写等模式打开文件。

随后就可以进行异步的读/写操作了。无论读还是写,都提供2个重载版本

  • java.nio.channels.AsynchronousFileChannel#read(java.nio.ByteBuffer, long)
  • java.nio.channels.AsynchronousFileChannel#read(java.nio.ByteBuffer, long, A, java.nio.channels.CompletionHandler)
  • java.nio.channels.AsynchronousFileChannel#write(java.nio.ByteBuffer, long)
  • java.nio.channels.AsynchronousFileChannel#write(java.nio.ByteBuffer, long, A, java.nio.channels.CompletionHandler)

其中2个参数的方法,返回java.util.concurrent.Future供后期获取执行结果,而4个参数的版本,第4个参数直接是一个java.nio.channels.CompletionHandler类型的、当对文件异步读/写完成的回调。

以读为例,测试代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static void readFuture() throws IOException, ExecutionException, InterruptedException {
try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("nio-async-file.txt"), StandardOpenOption.READ)) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
Future<Integer> future = channel.read(buffer, 0);
int length = future.get();
//处理读取结果
}
}
private static void readCallback() throws IOException {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("nio-async-file.txt"), StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
//读取成功回调
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
//读取失败回调
}
});
}

分了三部分记的NIO学习笔记暂告一段,可能涉及的内容基本上只是NIO如何使用,没有过多地深入其细节。