Netty Beginner Tutorial: Netty Packet Splitting and Sticking Explained

2019-08-08 00:00:00 Tags: technology, explanation, beginner tutorial

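What does "codec technology" mean in Netty? Put simply, encoding and decoding here come down to object serialization and deserialization. Serialization serves two purposes:

1. Network transmission
2. Object persistence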

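We could serialize objects with Java's built-in serialization and let Netty transmit them. But Java serialization has too many serious weaknesses: it cannot be used across languages, the serialized byte stream is too large, and serialization performance is very low, among others.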

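What does "the byte stream is too large" mean? Say I have a document that is 1 MB. After Java serialization it might still be around 0.5 MB, which is a lot to push over the network. With one of the other mainstream serialization frameworks it might come out at around 0.01 MB, which is tiny and transmits far better. (These figures are only illustrative, not measurements.)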

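Low performance means that a job that takes, say, 10 s with Java serialization might take 0.1 s with a high-performance serializer. Again the numbers are only illustrative, but the gap can be that large.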
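To make the size overhead concrete, here is a minimal sketch that writes the same small object twice: once with built-in Java serialization and once with a hand-written binary layout. The `User` class and both helper methods are hypothetical names made up for this comparison, and the hand-written layout is not what Marshalling or Protobuf produce. It only shows how much class metadata Java serialization adds.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializedSizeDemo {
    // Hypothetical message type used only for this comparison.
    public static class User implements Serializable {
        private static final long serialVersionUID = 1L;
        final int id;
        final String name;

        public User(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    /** Size of the object when written with built-in Java serialization. */
    public static int javaSerializedSize(User u) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(u);
        }
        return bos.size();
    }

    /** Size of a hand-written encoding: a 4-byte id followed by a length-prefixed string. */
    public static int compactSize(User u) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bos)) {
            dos.writeInt(u.id);
            dos.writeUTF(u.name);
        }
        return bos.size();
    }

    public static void main(String[] args) throws IOException {
        User u = new User(1, "netty");
        System.out.println("Java serialization: " + javaSerializedSize(u) + " bytes");
        // 4 bytes for the id + 2 bytes of string length + 5 bytes of text = 11 bytes
        System.out.println("Compact encoding:   " + compactSize(u) + " bytes");
    }
}
```

The Java-serialized form carries the class name, field names and type descriptors along with the two values, which is why it comes out several times larger for such a small object.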

In this tutorial the purpose of serialization is network transmission. The mainstream serialization frameworks at the moment include:

1. JBoss Marshalling
2. Google Protobuf
3. Kryo
4. MessagePack

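We will mainly cover JBoss Marshalling and Google Protobuf, both of which are widely used in the industry. In Java-to-Java scenarios, JBoss Marshalling can be even faster than Google Protobuf, because Marshalling is not cross-language: both ends are Java. So when both ends are Java, Marshalling is all we need. If you need cross-language communication, for example C# on one side and Java on the other, use Google Protobuf instead. Its performance is also very high, and it has its own optimizations around byte order (big-endian versus little-endian).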
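For readers who have not met big-endian and little-endian before, here is a small sketch of why byte order matters once two different platforms exchange binary data. It uses only `java.nio.ByteBuffer` and does not reproduce Protobuf's actual wire format; the class and method names are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

public class ByteOrderDemo {
    /** Encodes an int into 4 bytes using the given byte order. */
    public static byte[] encode(int value, ByteOrder order) {
        return ByteBuffer.allocate(4).order(order).putInt(value).array();
    }

    /** Decodes 4 bytes into an int using the given byte order. */
    public static int decode(byte[] bytes, ByteOrder order) {
        return ByteBuffer.wrap(bytes).order(order).getInt();
    }

    public static void main(String[] args) {
        int value = 0x01020304;
        byte[] big = encode(value, ByteOrder.BIG_ENDIAN);
        byte[] little = encode(value, ByteOrder.LITTLE_ENDIAN);
        System.out.println(Arrays.toString(big));    // [1, 2, 3, 4]
        System.out.println(Arrays.toString(little)); // [4, 3, 2, 1]

        // Reading with the wrong order silently yields a different number.
        System.out.println(Integer.toHexString(decode(big, ByteOrder.LITTLE_ENDIAN))); // 4030201
    }
}
```

A cross-language format has to fix the byte order in its specification so that a C# writer and a Java reader agree on it. When both ends are Java, this is never an issue in practice.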

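Now let's implement encoding and decoding with Marshalling.

First, create a new Java project, add the netty and jboss-marshalling jars to it, and copy a few images into the sources folder for testing.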


Create a Req class with the following code

package com.it448.serial;

import java.io.Serializable;

// Request object sent from the client to the server.
// It implements Serializable so that Marshalling can encode it.
public class Req implements Serializable {
    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String requestMessage;
    // GZIP-compressed file content carried with the request
    private byte[] attachment;

    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getRequestMessage() {
        return requestMessage;
    }
    public void setRequestMessage(String requestMessage) {
        this.requestMessage = requestMessage;
    }
    public byte[] getAttachment() {
        return attachment;
    }
    public void setAttachment(byte[] attachment) {
        this.attachment = attachment;
    }
}

 

Create a Resp class with the following code

package com.it448.serial;

import java.io.Serializable;

// Response object sent from the server back to the client.
public class Resp implements Serializable {

    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String responseMessage;

    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getResponseMessage() {
        return responseMessage;
    }
    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }
}

 

Create a utility class, GzipUtils, so compression is easy to call from both ends

package com.it448.utils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipUtils {
    /** Compresses the given bytes with GZIP. */
    public static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the GZIP stream finishes the compression and writes the trailer
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(data);
        }
        return bos.toByteArray();
    }

    /** Decompresses GZIP data back into the original bytes. */
    public static byte[] ungzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buf = new byte[1024];
            int num;
            while ((num = gzip.read(buf, 0, buf.length)) != -1) {
                bos.write(buf, 0, num);
            }
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws Exception {

        // Read the file. Files.readAllBytes reads the whole file, whereas
        // available() plus a single read() is not guaranteed to.
        String readPath = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar + "006.jpg";
        byte[] data = Files.readAllBytes(new File(readPath).toPath());

        System.out.println("Original file size: " + data.length);

        // Test compression
        byte[] ret1 = GzipUtils.gzip(data);
        System.out.println("Size after compression: " + ret1.length);

        byte[] ret2 = GzipUtils.ungzip(ret1);
        System.out.println("Size after decompression: " + ret2.length);

        // Write the file, creating the receive folder if it does not exist yet
        String writePath = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "006.jpg";
        File out = new File(writePath);
        out.getParentFile().mkdirs();
        Files.write(out.toPath(), ret2);
    }
}
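If you want to try the compression logic without any image files, here is a self-contained sketch of the same gzip round trip on in-memory data. The class name and the two sample-data helpers are hypothetical. The pseudo-random bytes stand in for data that is already compressed, such as a JPEG, which GZIP can barely shrink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Random;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipRoundTripDemo {
    /** Compresses the given bytes with GZIP. */
    public static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(data);
        }
        return bos.toByteArray();
    }

    /** Decompresses GZIP data back into the original bytes. */
    public static byte[] ungzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = gzip.read(buf)) != -1) {
                bos.write(buf, 0, n);
            }
        }
        return bos.toByteArray();
    }

    /** Highly repetitive text, which compresses very well. */
    public static byte[] repetitive() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 1000; i++) {
            sb.append("netty marshalling ");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    /** Pseudo-random bytes standing in for already-compressed data such as a JPEG. */
    public static byte[] randomBytes() {
        byte[] bytes = new byte[18000];
        new Random(42).nextBytes(bytes);
        return bytes;
    }

    public static void main(String[] args) throws IOException {
        for (byte[] input : new byte[][] {repetitive(), randomBytes()}) {
            byte[] packed = gzip(input);
            boolean intact = Arrays.equals(input, ungzip(packed));
            System.out.println(input.length + " -> " + packed.length + " bytes, round trip intact: " + intact);
        }
    }
}
```

The practical takeaway for this tutorial: gzipping a JPEG attachment exercises the code path, but do not expect the payload to get much smaller.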

 

Create a Marshalling factory class, MarshallingCodeCFactory.java

package com.it448.serial;

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Marshalling factory
 * @author xyh
 * @since 2019-06-12
 */
public final class MarshallingCodeCFactory {

    /**
     * Creates the JBoss Marshalling decoder, MarshallingDecoder.
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        // Obtain a MarshallerFactory through the static method of the Marshalling utility class.
        // The "serial" argument selects the Java serialization factory.
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        // Create a MarshallingConfiguration and set its version to 5
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        // Build the provider from marshallerFactory and configuration
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        // Build Netty's MarshallingDecoder. The two arguments are the provider and the
        // maximum length of a single serialized message (1 MB here).
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
        return decoder;
    }

    /**
     * Creates the JBoss Marshalling encoder, MarshallingEncoder.
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        // Build Netty's MarshallingEncoder, which serializes POJOs that implement
        // Serializable into a byte array
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}
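This pair of codecs is also where the packet splitting and sticking problem from the title gets handled. As far as I know, Netty's MarshallingEncoder writes the size of each serialized object in front of it, and MarshallingDecoder builds on LengthFieldBasedFrameDecoder to cut the TCP byte stream back into whole messages, rejecting anything over the configured maximum. The sketch below shows that length-prefix idea in plain Java. It is not Netty's code: the class, the `Decoder` helper and `decodeInChunks` are hypothetical names, and it throws IllegalStateException where Netty uses its own exception type.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LengthPrefixFraming {

    /** Prepends a 4-byte big-endian length to the payload. */
    public static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Collects incoming chunks and returns every frame that is complete so far. */
    public static class Decoder {
        private final int maxFrameLength;
        private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

        public Decoder(int maxFrameLength) {
            this.maxFrameLength = maxFrameLength;
        }

        public List<byte[]> feed(byte[] chunk) {
            pending.write(chunk, 0, chunk.length);
            ByteBuffer buf = ByteBuffer.wrap(pending.toByteArray());
            List<byte[]> frames = new ArrayList<>();
            while (buf.remaining() >= 4) {
                buf.mark();
                int length = buf.getInt();
                if (length < 0 || length > maxFrameLength) {
                    throw new IllegalStateException("Frame length " + length + " is outside the limit " + maxFrameLength);
                }
                if (buf.remaining() < length) {
                    buf.reset(); // split packet: wait for the rest of this frame
                    break;
                }
                byte[] frame = new byte[length];
                buf.get(frame);
                frames.add(frame); // sticky packets: keep looping for further frames
            }
            // Keep the unread tail for the next call
            byte[] rest = new byte[buf.remaining()];
            buf.get(rest);
            pending.reset();
            pending.write(rest, 0, rest.length);
            return frames;
        }
    }

    /** Feeds the stream to a decoder in pieces of chunkSize bytes and returns the decoded strings. */
    public static List<String> decodeInChunks(byte[] stream, int chunkSize, int maxFrameLength) {
        Decoder decoder = new Decoder(maxFrameLength);
        List<String> messages = new ArrayList<>();
        for (int from = 0; from < stream.length; from += chunkSize) {
            int to = Math.min(stream.length, from + chunkSize);
            for (byte[] frame : decoder.feed(Arrays.copyOfRange(stream, from, to))) {
                messages.add(new String(frame, StandardCharsets.UTF_8));
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        byte[] first = encode("hello".getBytes(StandardCharsets.UTF_8));
        byte[] second = encode("world!".getBytes(StandardCharsets.UTF_8));
        // Two messages written back to back, as TCP might deliver them
        byte[] stream = ByteBuffer.allocate(first.length + second.length).put(first).put(second).array();

        System.out.println(decodeInChunks(stream, 6, 1024));   // [hello, world!]
        System.out.println(decodeInChunks(stream, 100, 1024)); // [hello, world!]
    }
}
```

Whether the bytes arrive six at a time or all at once, the receiver recovers the same two messages. This is also why the 1 MB limit passed to MarshallingDecoder matters: a request whose serialized form is larger than that limit will be rejected rather than decoded.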

 

Create the server-side handler class, ServerHandler.java

package com.it448.serial;

import java.io.File;
import java.io.FileOutputStream;

import com.it448.utils.GzipUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The MarshallingDecoder earlier in the pipeline has already turned the bytes into a Req
        Req req = (Req) msg;
        System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage());
        byte[] attachment = GzipUtils.ungzip(req.getAttachment());

        // Every request is written to the same file, so later requests overwrite earlier ones
        String path = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "001.jpg";
        // Create the receive folder if it does not exist yet
        new File(path).getParentFile().mkdirs();
        try (FileOutputStream fos = new FileOutputStream(path)) {
            fos.write(attachment);
        }

        Resp resp = new Resp();
        resp.setId(req.getId());
        resp.setName("resp" + req.getId());
        resp.setResponseMessage("response content " + req.getId());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Print the cause before closing; otherwise failures disappear silently
        cause.printStackTrace();
        ctx.close();
    }
}

 

Create the server class, Server.java

package com.it448.serial;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class Server {
    public static void main(String[] args) throws Exception {
        // pGroup accepts incoming connections; cGroup handles I/O on the accepted channels
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 1024)
             // Enable logging
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    sc.pipeline().addLast(new ServerHandler());
                }
            });

            ChannelFuture cf = b.bind(8765).sync();

            // Block until the server channel is closed
            cf.channel().closeFuture().sync();
        } finally {
            // Release the threads even if binding or waiting fails
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }
    }
}

 

Create the client-side handler class, ClientHandler.java

package com.it448.serial;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            // The MarshallingDecoder has already turned the bytes into a Resp
            Resp resp = (Resp) msg;
            System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Print the cause before closing; otherwise failures disappear silently
        cause.printStackTrace();
        ctx.close();
    }
}

 

 

Create the client class, Client.java

package com.it448.serial;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.File;
import java.nio.file.Files;

import com.it448.utils.GzipUtils;

public class Client {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    sc.pipeline().addLast(new ClientHandler());
                }
            });

            ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

            // Read and compress the image once; every request carries the same attachment.
            // Files.readAllBytes reads the whole file, whereas available() plus a single
            // read() is not guaranteed to.
            String path = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar + "001.jpg";
            byte[] attachment = GzipUtils.gzip(Files.readAllBytes(new File(path).toPath()));

            for (int i = 0; i < 1000; i++) {
                Req req = new Req();
                req.setId("" + i);
                req.setName("pro" + i);
                req.setRequestMessage("request data " + i);
                req.setAttachment(attachment);
                cf.channel().writeAndFlush(req);
            }

            // Block until the channel is closed
            cf.channel().closeFuture().sync();
        } finally {
            // Release the threads even if connecting or sending fails
            group.shutdownGracefully();
        }
    }
}

 

Testing the code

First start the server, that is, run the main method of the Server class.


Then start the client, that is, run the main method of the Client class.


Test results

[Screenshot: test result]

 

As the screenshot shows, a new image, 001.jpg, has appeared in the receive folder, which means the image was transferred successfully.
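Seeing the file appear shows that something arrived. To confirm that it arrived intact, you can compare checksums of the sent and received files. Below is a minimal sketch, assuming the same sources/001.jpg and receive/001.jpg paths used in the code above; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class TransferCheck {
    /** Returns the SHA-256 digest of a file as a lowercase hex string. */
    public static String sha256(Path file) throws IOException, NoSuchAlgorithmException {
        byte[] digest = MessageDigest.getInstance("SHA-256").digest(Files.readAllBytes(file));
        StringBuilder hex = new StringBuilder();
        for (byte b : digest) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    /** True when both files have the same SHA-256 digest. */
    public static boolean sameContent(Path a, Path b) throws IOException, NoSuchAlgorithmException {
        return sha256(a).equals(sha256(b));
    }

    public static void main(String[] args) throws Exception {
        Path base = Paths.get(System.getProperty("user.dir"));
        Path sent = base.resolve("sources").resolve("001.jpg");
        Path received = base.resolve("receive").resolve("001.jpg");
        System.out.println(sameContent(sent, received) ? "Files are identical" : "Files differ");
    }
}
```

Run it after the client has finished sending. If the digests match, the gzip, Marshalling and Netty round trip preserved every byte of the image.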

That is all for this part.
