NIO概念和多线程优化

所谓的零拷贝是指用户态和内核态之间不存在拷贝。存在用户态到内核态的切换,但是拷贝都是发生在内核态。

网络编程

IO模型

  1. 同步阻塞IO:客户端发送请求,当服务端在等待某个操作的时候,客户端就会被阻塞,直到服务端完成请求。
  2. 同步非阻塞IO:客户端发送请求,当服务端在等待某个操作时,不会阻塞,此时客户端不断轮询去发送请求,直到服务端能够接受然后去执行。但是在执行过程中客户端依旧是阻塞的。并且循环请求消耗很多的资源。
  3. (也是同步)多路复用:客户端在服务端未准备好执行事件的时候先阻塞,当服务端准备好后,才去发送请求。无事件阻塞,有事件执行。客户端在服务端准备期间和执行期间都是堵塞的。
  4. 同步:线程自己去获取结果(一个线程)
  5. 异步:线程自己不去获取结果,而是由其他线程送结果(至少两个线程)
  6. 异步非阻塞:客户端发送请求,通知服务端完成事件后,再通过回调方法来返回结果。

零拷贝优化(并非真正的0拷贝)

  1. 所谓的零拷贝是指用户态和内核态之间不存在拷贝。存在用户态到内核态的切换,但是拷贝都是发生在内核态。
  2. 问题:JAVA的IO实际不是物理设备级别的读写,而是缓存的复制,底层还是调用操作系统的读写方法,所以JAVA的每次读写都需要进行用户态到内核态,再从内核态到用户态的切换以及缓冲区的拷贝。
  3. 优化1:可以通过DirectByteBuffer进行优化。将堆外内存映射到JVM内存中来直接访问使用。减少一次数据的拷贝。但是用户态和内核态的切换次数没有减少。
  4. 优化2:通过channel调用transferTo/transferFrom方式拷贝数据。底层调用sendFile方法,直接将数据从内核缓冲区传输到socket缓冲区不经过JAVA,减少了用户态和内核态之间的切换。
  5. 优化3:也是调用transferTo方式,相比较优化2直接将内核缓冲区的数据发送到网卡

多线程优化方向

  1. 一个boss只监听accept事件,多个woker(数量最好与cpu核数保持一致)监听读写事件。
  2. 每个selector和一个线程对应。

核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8282));

Selector boss = Selector.open();
ssc.register(boss, SelectionKey.OP_ACCEPT);

//创建固定数量的work,而不是每一次连接都创建一个work
//数量固定为cpu核数
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("word-"+i);
}
AtomicInteger size = new AtomicInteger();

//boss线程循环监听连接事件,无连接则阻塞。
while(true){
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if(key.isAcceptable()){

SocketChannel sc = ssc.accept();
log.debug("监听到连接{}",sc.getRemoteAddress());
sc.configureBlocking(false);

//负载均衡,循环将channel注册到不同的worker上,并执行监听读写事件。
//固定了worker的数量后,每个worker就局部变成了单线程一个selector管理多个channel。
workers[size.getAndIncrement() % workers.length].register(sc);

}
}

}
}

static class Worker implements Runnable{
private Thread thread;
private Selector workSelector;
private String name;
private volatile boolean isStart = false; //判断是否已经启动线程

//任务队列,用于两个不同的线程情况下,同步不同的任务之间的顺序
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

public Worker(String name) { this.name = name; }

public void register(SocketChannel socketChannel) throws IOException {
if(!isStart){
thread = new Thread(this,name);
workSelector = Selector.open();
thread.start();
isStart = true;
}
//将一个线程任务放到任务队列中,等到另一个线程中再执行
queue.add(()->{
try {
socketChannel.register(workSelector,SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
workSelector.wakeup();
}
@Override
public void run() {
//boss线程循环监听读写事件,无连接则阻塞。
while(true){
try {
workSelector.select();
//将任务队列中的任务取出执行
Runnable task = queue.poll();
if(task!=null){
task.run();
}
Iterator<SelectionKey> iterator = workSelector.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();

if(key.isReadable()){
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
int read = channel.read(buffer);
if(read==-1){
key.cancel();
}
System.out.println(read);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!