所谓的零拷贝是指用户态和内核态之间不存在拷贝。存在用户态到内核态的切换,但是拷贝都是发生在内核态。
网络编程
IO模型
- 同步阻塞IO:客户端发送请求,当服务端在等待某个操作的时候,客户端就会被阻塞,直到服务端完成请求。
- 同步非阻塞IO:客户端发送请求,当服务端在等待某个操作时,不会阻塞,此时客户端不断轮询去发送请求,直到服务端能够接受然后去执行。但是在执行过程中客户端依旧是阻塞的。并且循环请求消耗很多的资源。
- (也是同步)多路复用:客户端在服务端未准备好执行事件的时候先阻塞,当服务端准备好后,才去发送请求。无事件阻塞,有事件执行。客户端在服务端准备期间和执行期间都是堵塞的。
- 同步:线程自己去获取结果(一个线程)
- 异步:线程自己不去获取结果,而是由其他线程送结果(至少两个线程)
- 异步非阻塞:客户端发送请求,通知服务端完成事件后,再通过回调方法来返回结果。
零拷贝优化(并非真正的0拷贝)
- 所谓的零拷贝是指用户态和内核态之间不存在拷贝。存在用户态到内核态的切换,但是拷贝都是发生在内核态。
- 问题:JAVA的IO实际不是物理设备级别的读写,而是缓存的复制,底层还是调用操作系统的读写方法,所以JAVA的每次读写都需要进行用户态到内核态,再从内核态到用户态的切换以及缓冲区的拷贝。
- 优化1:可以通过DirectByteBuffer进行优化。将堆外内存映射到JVM内存中来直接访问使用。减少一次数据的拷贝。但是用户态和内核态的切换次数没有减少。
- 优化2:通过channel调用transferTo/transferFrom方式拷贝数据。底层调用sendFile方法,直接将数据从内核缓冲区传输到socket缓冲区不经过JAVA,减少了用户态和内核态之间的切换。
- 优化3:也是调用transferTo方式,相比较优化2直接将内核缓冲区的数据发送到网卡。
多线程优化方向

- 一个boss只监听accept事件,多个woker(数量最好与cpu核数保持一致)监听读写事件。
- 每个selector和一个线程对应。
核心代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
| public static void main(String[] args) throws IOException { Thread.currentThread().setName("boss"); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); ssc.bind(new InetSocketAddress(8282));
Selector boss = Selector.open(); ssc.register(boss, SelectionKey.OP_ACCEPT);
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()]; for (int i = 0; i < workers.length; i++) { workers[i] = new Worker("word-"+i); } AtomicInteger size = new AtomicInteger();
while(true){ boss.select(); Iterator<SelectionKey> iterator = boss.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey key = iterator.next(); iterator.remove(); if(key.isAcceptable()){
SocketChannel sc = ssc.accept(); log.debug("监听到连接{}",sc.getRemoteAddress()); sc.configureBlocking(false);
workers[size.getAndIncrement() % workers.length].register(sc);
} }
} }
static class Worker implements Runnable{ private Thread thread; private Selector workSelector; private String name; private volatile boolean isStart = false;
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
public Worker(String name) { this.name = name; }
public void register(SocketChannel socketChannel) throws IOException { if(!isStart){ thread = new Thread(this,name); workSelector = Selector.open(); thread.start(); isStart = true; } queue.add(()->{ try { socketChannel.register(workSelector,SelectionKey.OP_READ); } catch (ClosedChannelException e) { e.printStackTrace(); } }); workSelector.wakeup(); } @Override public void run() { while(true){ try { workSelector.select(); Runnable task = queue.poll(); if(task!=null){ task.run(); } Iterator<SelectionKey> iterator = workSelector.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey key = iterator.next(); iterator.remove();
if(key.isReadable()){ ByteBuffer buffer = ByteBuffer.allocate(16); SocketChannel channel = (SocketChannel) key.channel(); int read = channel.read(buffer); if(read==-1){ key.cancel(); } System.out.println(read); } } } catch (IOException e) { e.printStackTrace(); } } } }
|