Netty入门教程:Netty拆包粘包技术讲解
Netty编解码技术是什么意思呢?所谓的编解码技术,说白了就是java序列化技术。序列化有两个目的:
1、进行网络传输
2、对象持久化
虽然我们可以使用java进行序列化,Netty去传输。但是java序列化的硬伤太多,比如java的序列化无法跨平台、序列化后码流太大、序列化性能非常低等等…
码流太大是什么意思呢?比如说原先的我一篇文档,比如说大小是1M,序列化完了之后可能0.5M,序列化减少二分之一的码,比较大。然后0.5M去网络传输这个不太好。你比如说用其它的一些主流序列化的话可能就0.01M,非常小。性能非常好。
性能太低就是说,我用java序列化的过程可能需要10s,而用其它的高性能序列化可能0.1s。差距就是这么的大。
序列化的目的无非就是网络传输。而目前主流的序列化框架有以下几种:
1、JBoss的Marshalling
2、Google的Protobuf
3、基于Protobuf的Kyro
4、MessagePack框架
其实我们主要是讲Marshalling和Google的Protobuf。这两个是业界非常好用的框架。其中JBoss的Marshalling速度还要比Google的Protobuf要快,原因是因为Marshalling不是跨语言,两端都是java与java之间相互传输的。因此,在这种情况下我们就用它就行了。但如果你想实现跨语言,比如这边是c#,另一边是java。这种跨语言进行通信传输的话,那你就需要用到Google的Protobuf来进行跨语言的传输。性能也非常高。而且它自己有一些大端小端的优化机制。
下面开始Marshalling编码实现。
首先新建一个java工程,导入netty和jboss-marshalling的jar包,导入几张图片到sources文件夹以便测试。
新建一个Req类,并编写相关代码
1 package com.it448.serial; 2 3 import java.io.Serializable; 4 5 public class Req implements Serializable{ 6 private static final long serialVersionUID = 1L; 7 8 private String id ; 9 private String name ; 10 private String requestMessage ; 11 private byte[] attachment; 12 13 public String getId() { 14 return id; 15 } 16 public void setId(String id) { 17 this.id = id; 18 } 19 public String getName() { 20 return name; 21 } 22 public void setName(String name) { 23 this.name = name; 24 } 25 public String getRequestMessage() { 26 return requestMessage; 27 } 28 public void setRequestMessage(String requestMessage) { 29 this.requestMessage = requestMessage; 30 } 31 public byte[] getAttachment() { 32 return attachment; 33 } 34 public void setAttachment(byte[] attachment) { 35 this.attachment = attachment; 36 } 37 }
新建一个Resp类,并编写相关代码
1 package com.it448.serial; 2 3 import java.io.Serializable; 4 5 public class Resp implements Serializable{ 6 7 private static final long serialVersionUID = 1L; 8 9 private String id; 10 private String name; 11 private String responseMessage; 12 13 public String getId() { 14 return id; 15 } 16 public void setId(String id) { 17 this.id = id; 18 } 19 public String getName() { 20 return name; 21 } 22 public void setName(String name) { 23 this.name = name; 24 } 25 public String getResponseMessage() { 26 return responseMessage; 27 } 28 public void setResponseMessage(String responseMessage) { 29 this.responseMessage = responseMessage; 30 } 31 }
新建一个工具类GzipUtils,方便调用
1 package com.it448.utils; 2 3 import java.io.ByteArrayInputStream; 4 import java.io.ByteArrayOutputStream; 5 import java.io.File; 6 import java.io.FileInputStream; 7 import java.io.FileOutputStream; 8 import java.util.zip.GZIPInputStream; 9 import java.util.zip.GZIPOutputStream; 10 11 public class GzipUtils { 12 public static byte[] gzip(byte[] data) throws Exception{ 13 ByteArrayOutputStream bos = new ByteArrayOutputStream(); 14 GZIPOutputStream gzip = new GZIPOutputStream(bos); 15 gzip.write(data); 16 gzip.finish(); 17 gzip.close(); 18 byte[] ret = bos.toByteArray(); 19 bos.close(); 20 return ret; 21 } 22 23 public static byte[] ungzip(byte[] data) throws Exception{ 24 ByteArrayInputStream bis = new ByteArrayInputStream(data); 25 GZIPInputStream gzip = new GZIPInputStream(bis); 26 byte[] buf = new byte[1024]; 27 int num = -1; 28 ByteArrayOutputStream bos = new ByteArrayOutputStream(); 29 while((num = gzip.read(buf, 0 , buf.length)) != -1 ){ 30 bos.write(buf, 0, num); 31 } 32 gzip.close(); 33 bis.close(); 34 byte[] ret = bos.toByteArray(); 35 bos.flush(); 36 bos.close(); 37 return ret; 38 } 39 40 public static void main(String[] args) throws Exception{ 41 42 // 读取文件 43 String readPath = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar + "006.jpg"; 44 File file = new File(readPath); 45 FileInputStream in = new FileInputStream(file); 46 byte[] data = new byte[in.available()]; 47 in.read(data); 48 in.close(); 49 50 System.out.println("文件原始大小:" + data.length); 51 // 测试压缩 52 53 byte[] ret1 = GzipUtils.gzip(data); 54 System.out.println("压缩之后大小:" + ret1.length); 55 56 byte[] ret2 = GzipUtils.ungzip(ret1); 57 System.out.println("还原之后大小:" + ret2.length); 58 59 // 写出文件 60 String writePath = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "006.jpg"; 61 FileOutputStream fos = new FileOutputStream(writePath); 62 fos.write(ret2); 63 fos.close(); 64 } 65 }
新建一个Marshalling工厂类MarshallingCodeCFactory.java
1 package com.it448.serial; 2 3 import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; 4 import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; 5 import io.netty.handler.codec.marshalling.MarshallerProvider; 6 import io.netty.handler.codec.marshalling.MarshallingDecoder; 7 import io.netty.handler.codec.marshalling.MarshallingEncoder; 8 import io.netty.handler.codec.marshalling.UnmarshallerProvider; 9 10 import org.jboss.marshalling.MarshallerFactory; 11 import org.jboss.marshalling.Marshalling; 12 import org.jboss.marshalling.MarshallingConfiguration; 13 14 /** 15 * Marshalling工厂 16 * @author(xyh) 17 * @since 2019-06-12 18 */ 19 public final class MarshallingCodeCFactory { 20 21 /** 22 * 创建Jboss Marshalling解码器MarshallingDecoder 23 * @return MarshallingDecoder 24 */ 25 public static MarshallingDecoder buildMarshallingDecoder() { 26 // 首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。 27 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); 28 // 创建了MarshallingConfiguration对象,配置了版本号为5 29 final MarshallingConfiguration configuration = new MarshallingConfiguration(); 30 configuration.setVersion(5); 31 // 根据marshallerFactory和configuration创建provider 32 UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); 33 // 构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度 34 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1); 35 return decoder; 36 } 37 38 /** 39 * 创建Jboss Marshalling编码器MarshallingEncoder 40 * @return MarshallingEncoder 41 */ 42 public static MarshallingEncoder buildMarshallingEncoder() { 43 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); 44 final MarshallingConfiguration configuration = new MarshallingConfiguration(); 45 configuration.setVersion(5); 46 MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); 47 // 构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组 48 MarshallingEncoder encoder = new MarshallingEncoder(provider); 49 return encoder; 50 } 51 }
新建一个服务端的Handler类ServerHandler.java
1 package com.it448.serial; 2 3 import java.io.File; 4 import java.io.FileOutputStream; 5 6 import com.it448.utils.GzipUtils; 7 import io.netty.channel.ChannelHandlerContext; 8 import io.netty.channel.ChannelInboundHandlerAdapter; 9 10 public class ServerHandler extends ChannelInboundHandlerAdapter{ 11 @Override 12 public void channelActive(ChannelHandlerContext ctx) throws Exception { 13 } 14 15 @Override 16 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 17 Req req = (Req)msg; 18 System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage()); 19 byte[] attachment = GzipUtils.ungzip(req.getAttachment()); 20 21 String path = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "001.jpg"; 22 FileOutputStream fos = new FileOutputStream(path); 23 fos.write(attachment); 24 fos.close(); 25 26 Resp resp = new Resp(); 27 resp.setId(req.getId()); 28 resp.setName("resp" + req.getId()); 29 resp.setResponseMessage("响应内容" + req.getId()); 30 ctx.writeAndFlush(resp); 31 } 32 33 @Override 34 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 35 } 36 37 @Override 38 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 39 ctx.close(); 40 } 41 }
新建一个服务端类Server.java
1 package com.it448.serial; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel; 11 import io.netty.handler.logging.LogLevel; 12 import io.netty.handler.logging.LoggingHandler; 13 14 public class Server { 15 public static void main(String[] args) throws Exception{ 16 EventLoopGroup pGroup = new NioEventLoopGroup(); 17 EventLoopGroup cGroup = new NioEventLoopGroup(); 18 19 ServerBootstrap b = new ServerBootstrap(); 20 b.group(pGroup, cGroup) 21 .channel(NioServerSocketChannel.class) 22 .option(ChannelOption.SO_BACKLOG, 1024) 23 // 设置日志 24 .handler(new LoggingHandler(LogLevel.INFO)) 25 .childHandler(new ChannelInitializer<SocketChannel>() { 26 protected void initChannel(SocketChannel sc) throws Exception { 27 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); 28 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); 29 sc.pipeline().addLast(new ServerHandler()); 30 } 31 }); 32 33 ChannelFuture cf = b.bind(8765).sync(); 34 35 cf.channel().closeFuture().sync(); 36 pGroup.shutdownGracefully(); 37 cGroup.shutdownGracefully(); 38 } 39 }
新建一个客户端Handler类ClientHandler.java
1 package com.it448.serial; 2 3 import io.netty.channel.ChannelHandlerContext; 4 import io.netty.channel.ChannelInboundHandlerAdapter; 5 import io.netty.util.ReferenceCountUtil; 6 7 public class ClientHandler extends ChannelInboundHandlerAdapter{ 8 @Override 9 public void channelActive(ChannelHandlerContext ctx) throws Exception { 10 } 11 12 @Override 13 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 14 try { 15 Resp resp = (Resp)msg; 16 System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage()); 17 } finally { 18 ReferenceCountUtil.release(msg); 19 } 20 } 21 22 @Override 23 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 24 } 25 26 @Override 27 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 28 ctx.close(); 29 } 30 }
新建一个客户端类Client.java
1 package com.it448.serial; 2 3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.EventLoopGroup; 7 import io.netty.channel.nio.NioEventLoopGroup; 8 import io.netty.channel.socket.SocketChannel; 9 import io.netty.channel.socket.nio.NioSocketChannel; 10 11 import java.io.File; 12 import java.io.FileInputStream; 13 14 import com.it448.utils.GzipUtils; 15 16 public class Client { 17 public static void main(String[] args) throws Exception{ 18 EventLoopGroup group = new NioEventLoopGroup(); 19 Bootstrap b = new Bootstrap(); 20 b.group(group) 21 .channel(NioSocketChannel.class) 22 .handler(new ChannelInitializer<SocketChannel>() { 23 @Override 24 protected void initChannel(SocketChannel sc) throws Exception { 25 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); 26 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); 27 sc.pipeline().addLast(new ClientHandler()); 28 } 29 }); 30 31 ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); 32 33 for(int i = 0; i < 1000; i++ ){ 34 Req req = new Req(); 35 req.setId("" + i); 36 req.setName("pro" + i); 37 req.setRequestMessage("数据信息" + i); 38 String path = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar + "001.jpg"; 39 File file = new File(path); 40 FileInputStream in = new FileInputStream(file); 41 byte[] data = new byte[in.available()]; 42 in.read(data); 43 in.close(); 44 req.setAttachment(GzipUtils.gzip(data)); 45 cf.channel().writeAndFlush(req); 46 } 47 48 cf.channel().closeFuture().sync(); 49 group.shutdownGracefully(); 50 } 51 }
代码测试
首先启动服务端,也就是运行Server类的main方法。
然后启用客户端,也就是运行Client类的main方法。
测试结果
从图中可以看到,receive文件夹多了一张001.jpg的图片。说明图片已经传输过来了。
好了,这部分内容就讲到这里,送上今天的福利:三套Netty系列教程【价值600】,加wxhaox就可以领取。当然了,对应netty有任何疑问也都可以咨询!!
end — 1560313059
– 学而不思则罔,思而不学则殆
相关文章