NIO基础

selector将非阻塞模式改进为无事件阻塞,有事件非阻塞。

网络编程

非阻塞,阻塞和多路复用

  1. 阻塞:线程没有监听到指定的操作执行时,就会停止运行。如ServerSocketChannel.accept()和channel.read(buffer)都是阻塞方法。在单线程的情况下,服务端只能执行一轮accpet一个线程并且read客户端传来的数据,但是会阻塞在下一次的accept。所以阻塞方法只能适用在多线程的情况下。但是在多线程的情况下也会有线程太多,频繁上下文切换导致性能降低.
  2. 非阻塞:各个操作之间没有影响,可以监听多个线程的多个操作。问题在于一直循环cpu占用率很高.
  3. 多路复用:单线程情况下通过配合Selector完成对多个channel可读写事件的监控,就是多路复用。
    Selector保证了有可连接事件时才去连接,有可读事件时才去读取,有可写事件时才去写入.

Selector(基于事件驱动)(多路复用)

1. 核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public static void main(String[] args) throws IOException {
//1.定义Selector,管理多个channel
Selector selector = Selector.open();

//服务端的channel
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8282));

//2.将ServerSocketChannel注册到selector下,参数0表示不监听任何事件,通过下一个方法进行监听
SelectionKey sscKey = ssc.register(selector,0,null);

//3.定义SelectionKey的监听事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);

while(true){
//4.select方法
//当没有任何事件发生则阻塞,任一事件发生了就继续执行。
//避免了无效的空转
selector.select();

//5.处理事件,方法返回所有可用的集合事件
//利用迭代器遍历
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey curKey = iterator.next();

//非常重要,解决空指针异常,当一个selectionKey上的事件都处理完了之后,nio并不会将该selectionKey从集合中除去,下次执行就会爆空指针异常,所以要手动除去。
iterator.remove();

//区分事件类型
if(curKey.isAcceptable()){
log.debug("连接事件...");

//6.通过SelectionKey获取到关联的channel
ServerSocketChannel channel = (ServerSocketChannel)curKey.channel();

//执行对应的事件
SocketChannel sc = channel.accept();
sc.configureBlocking(false);

//7.将SocketChannel注册到Selector中,并将buffer作为附件关联到SelectionKey,使之能够一一对应,一个channel维护一个独立的buffer,避免多线程情况下buffer中内容混乱。
ByteBuffer buffer = ByteBuffer.allocate(16);
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
}else if(curKey.isReadable()){

try {
log.debug("读取事件...");

//6.通过SelectionKey获取到关联的channel
SocketChannel channel = (SocketChannel)curKey.channel();
//7.获取附件的buffer
ByteBuffer buffer = (ByteBuffer)curKey.attachment();
int read = channel.read(buffer);

//如果客户端正常断开,返回值拿到-1,需要将事件取消
if(read == -1) {
curKey.cancel();
continue;
}

split(buffer);
//如果position和limit相同,则说明buffer已经满了
if(buffer.position()==buffer.limit()){
System.out.println("经过一次扩容");
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer);
curKey.attach(newBuffer);
}else{
for (int j = buffer.position(); j < buffer.limit(); j++) {
System.out.print((char)buffer.get(j));
}
System.out.println();
}


} catch (IOException e) {
//如果强制断开,进入异常,需要将事件取消
e.printStackTrace();
curKey.cancel();
}
}
}
}
}

2. 理解

  • Selector类似于注册中心,通过selector可以获取到所有的channel。
  • Selector模式下有两个集合,分别为channel集合(key为selectionKey,value为channel)和事件集合(key为selectionKey,value为事件队列)。当select()方法监听到事件后,会同时添加两个到两个集合中,而当事件执行完成后,会将事件集合中当前键值对的value去除掉已经执行的事件,但是即使value为空了当前键值对也不会删除,所以每次都需要手动地去迭代器中删除key,来解决空指针异常。
  • selector.select()等待的是所有事件(包括未处理事件),所以不会造成一个事件阻塞而导致另外事件无法监听到的问题。问题在于事件必须处理,或者执行cancel方法,否则会一直轮询。
  • selector将非阻塞模式改进为无事件阻塞,有事件非阻塞。
  • channel通过register注册到selector,注册的同时绑定一个selectionKey,selectionKey通过interestOps监听事件,并且可以通过attach绑定buffer。

3. 处理消息边界

  • 拆包粘包:LTV或者TLV协议传输
  • attachment附件:channel注册时绑定对应的buffer,将一个buffer作为附件关联到selectionKey上。
  • channel容量不足:扩容然后作为新的附件关联到selectionKey上。

4. buffer大小分配

  1. 先分配小的buffer,不断两倍扩容。消息连续易于处理,但是需要拷贝耗费性能。
  2. 多个数组组成buffer,一个数组不够就把多的内容写入新的数组。不连续解析复杂,但是避免了拷贝。

5. Select()何时不阻塞

  1. 客户端发起连接,触发accept
  2. 客户端发数据,客户端正常,异常关闭或者发送的数据大于buffer缓冲区触发1到n次读取事件。
  3. channel可写,触发1到n次写入事件。
  4. 调用selector的wakeup()和close()方法。

    ByteBuffer 和 Channel 常用方法

  • 读取
    从channel读取数据填充ByteBuffer。
    int readBytes = channel.read(buffer)
  • 写入
    1
    2
    3
    4
    5
    6
    7
    8
    ByteBuffer buffer = ...
    buffer.put(..);
    buffer.flip();//切换读模式

    while(buffer.hasRemaining()) {
    channel.write(buffer);
    //channel.write(Charset.defaultCharset().encode("hello"));
    }
  • 文件传输
    from.transferTo(position,size,to)

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!