池化可以重用ByteBuf实例,高并发下,池化功能更节约内存,减少内存溢出的可能。
网络编程
核心代码
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler( new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline().addLast(new StringDecoder()); nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); } }); } })
.bind(8282);
|
客户端(同步去处理结果)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| EventLoopGroup group = new NioEventLoopGroup(); Channel channel = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost", 8282)) .sync() .channel();
while (true){ Scanner scanner = new Scanner(System.in); String s = scanner.nextLine(); if(s.equals("q")){ ChannelFuture closeFuture = channel.close(); closeFuture.sync(); break; }else{ channel.writeAndFlush(s); } }
group.shutdownGracefully();
|
理解
- handler为一道工序,pipeline为流水线,多道工序合在一起就是一个流水线。
- 每次有一个事件发生,pipeLine就会负责发布事件,传播给每个handler,handler对自己感兴趣的事件进行处理。
- eventLoop为处理数据的工人,负责管理多个channel的io操作,并且一旦工人负责了某个channel就要负责到底,即绑定。(能够保证线程安全)
- eventLoop除了io操作还可以进行任务处理,并且有任务队列。
- eventLoop按照pipeline里的handler顺序去处理数据。
ByteBuf
直接内存和堆内存
- 直接内存创建和销毁的代价昂贵,但是读写性能高,适合配合池化功能。
- 直接内存对GC压力小,不受JVM垃圾回收管理,但要注意主动释放。
池化和非池化
- 池化可以重用ByteBuf实例,高并发下,池化功能更节约内存,减少内存溢出的可能。
- 没有池化,每次都要创建新的实例,这种方式对直接内存代价太高,对于堆内存会增加GC压力。
内存回收
- UnpooledHeapByteBuf使用JVM内存,只需要等待GC回收内存。
- UnpooledDirectByteBuf,需要特殊的方法来回收。
- PooledByteBuf采用的池化的机制,需要更复杂的规则来回收。
- 因为pipeline的存在,一般需要将ByteBuf传递给下一个handler,如果在finally中release了,就失去了传递性,所以谁是ByteBuf(如果某个handler对ByteBuf进行了转换也算)最后的使用者,谁来负责release。
零拷贝的体现
- slice方法,划分为多个ByteBuf分开进行读和修改(不能进行写操作,切片后的ByteBuf如果进行写,会影响原有的ByteBuf),但是多个ByteBuf共用的同一片内存,并没有进行数据复制。
- duplicate方法,相当于截取了原始的ByteBuf的全部内容,但是用的是同一块底层内存,只是读写指针式独立的。
- Unpooled.wrappedBuffer(buf1,buf2)将两个buf组合为一个buf。
相比较ByteBuffer的优势
- 池化,可以重用ByteBuf实例,节约内存,减少内存溢出的可能。
- 读写指针分离,不需要切换读写模式。
- 自动扩容。
- 支持链式调用。
- 许多方法体现零拷贝。
双向通信的实现
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug(msg.toString()); ByteBuf buffer = ctx.alloc().buffer(); buffer.writeBytes(msg.toString().getBytes(StandardCharsets.UTF_8)); ctx.writeAndFlush(buffer); super.channelRead(ctx, msg);
} }); } }) .bind(8282);
|
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| EventLoopGroup group = new NioEventLoopGroup(); ChannelFuture future = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug(msg.toString()); } }); } }) .connect(new InetSocketAddress("localhost", 8282)); Channel channel = future.sync().channel();
while(true){ Scanner scanner = new Scanner(System.in); String s = scanner.nextLine(); if(s.equals("q")){ break; }else{ channel.writeAndFlush(s); } } ChannelFuture channelFuture = channel.close(); channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { group.shutdownGracefully(); } });
|
粘包半包
原因
- 本质上是因为TCP是流式协议,消息无边界。
- 为了减少传输中的消耗,需要用算法(nagle算法,尽可能多的传输数据)进行处理而导致粘包,因为TCP传输时需要给报文加上若干字节的头部信息,即使是1个字节的信息。
- MSS限制,当发送的数据超过MSS限制后,会将数据切分发送,造成半包。
- 应用层:接收方的ByteBuf设置的太大太小。
滑动窗口
- 设置一个滑动窗口大小,在窗口以内的数据可以不等待服务器响应的结果就可以直接发送。在应答未到达之前,窗口必须停止滑动。接收方也要维护一个窗口,只有落在窗口内的数据才能被接受。
- 滑动窗口起着缓冲区的作用和流量控制的作用。
解决
- 短连接,发送了完整数据后就断开连接。将连接建立到连接断开作为消息边界。能解决粘包但不能解决半包,并且效率低。
- 定长解码器,FixedLengthFrameDecoder(和客户端约定的长度)。会造成很多内存的浪费。
- 分隔符来界定消息的边界,LineBasedFrameDecoder(maxLength),用\n和\r\n定界。DelimiterBasedFrameDecoder自定义分隔符。底层实现还是一个个字节去找分隔符,效率较低。
- 基于长度字段的LTC解码器。LengthFieldBasedFrameDecoder(最大值,长度偏移量,长度字段的长度,跳过多少字节,去掉头部多少字节)。
协议的设计与解析
redis协议
1 2 3 4 5 6 7 8 9 10 11 12
| final static byte[] LINE = {13,10}; public static void send(ByteBuf buf,String command){ String[] words = command.split(" "); buf.writeBytes(("*"+words.length).getBytes()); buf.writeBytes(LINE); for(String word:words){ buf.writeBytes(("$"+word.length()).getBytes()); buf.writeBytes(LINE); buf.writeBytes(word.getBytes()); buf.writeBytes(LINE); } }
|
http协议
自定义协议
要素
- 魔数: 用来第一时间判定是否为无效数据包。
- 版本号: 可以支持协议的升级
- 序列化算法: 消息正文到底采用哪种序列化反序列化方式,如json,protobuf。
- 指令类型:登陆,注册,单聊,群聊等业务相关。
- 请求序号:为了双工通信,提供异步能力。
- 正文长度。
- 消息正文:Json,xml,对象流等。