微言Netty:手写分布式服务框架
public class NettyServer {
/**
* 服务端带参构造
* @param serverAddress
* @param serviceRegistry
* @param serverBeans
*/
public NettyServer(String serverAddress, ServerRegistry serviceRegistry, Map<String, Object> serverBeans) {
this.serverAddress = serverAddress;
this.serviceRegistry = serviceRegistry;
this.serverBeans = serverBeans;
}
/**
* 日志记录
*/
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
/**
* 服务端绑定地址
*/
private String serverAddress;
/**
* 服务注册
*/
private ServerRegistry serviceRegistry;
/**
* 服务端加载的bean列表
*/
private Map<String, Object> serverBeans;
/**
* 主事件池
*/
private EventLoopGroup bossGroup = new NioEventLoopGroup();
/**
* 副事件池
*/
private EventLoopGroup workerGroup = new NioEventLoopGroup();
/**
* 服务端通道
*/
private Channel serverChannel;
/**
* 绑定本机监听
*
* @throws Exception
*/
public void bind() throws Exception {
//启动器
ServerBootstrap serverBootstrap = new ServerBootstrap();
//为Acceptor设置事件池,为客户端接收设置事件池
serverBootstrap.group(bossGroup, workerGroup)
//工厂模式,创建NioServerSocketChannel类对象
.channel(NioServerSocketChannel.class)
//等待队列大小
.option(ChannelOption.SO_BACKLOG, 100)
//地址复用
.option(ChannelOption.SO_REUSEADDR, true)
//开启Nagle算法,
//网络好的时候:对响应要求比较高的业务,不建议开启,比如玩游戏,键盘数据,鼠标响应等,需要实时呈现;
// 对响应比较低的业务,建议开启,可以有效减少小数据包传输。
//网络差的时候:不建议开启,否则会导致整体效果更差。
.option(ChannelOption.TCP_NODELAY, true)
//日志记录组件的level
.handler(new LoggingHandler(LogLevel.INFO))
//各种业务处理handler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//空闲检测handler,用于检测通道空闲状态
channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(45, 45, 120));
//编码器
channel.pipeline().addLast("nettyMessageDecoder", new NettyMessageDecoder(1024, 4, 4));
//解码器
channel.pipeline().addLast("nettyMessageEncoder", new NettyMessageEncoder());
//心跳包业务处理,一般需要配置idleStateHandler一起使用
channel.pipeline().addLast("heartBeatHandler", new HeartBeatResponseHandler());
//服务端先进行鉴权,然后处理业务
channel.pipeline().addLast("loginAuthResponseHandler", new LoginAuthResponseHandler());
//业务处理handler
channel.pipeline().addLast("nettyHandler", new ServerHandler(serverBeans));
}
});
//获取ip和端口
String[] array = serverAddress.split(":");
String host = array[];
int port = Integer.parseInt(array[1]);
//绑定端口,同步等待成功
ChannelFuture future = serverBootstrap.bind(host, port).sync();
//注册连接事件监听器
future.addListener(cfl -> {
if (cfl.isSuccess()) {
logger.info("服务端[" + host + ":" + port + "]已上线...");
serverChannel = future.channel();
}
});
//注册关闭事件监听器
future.channel().closeFuture().addListener(cfl -> {
//关闭服务端
close();
logger.info("服务端[" + host + ":" + port + "]已下线...");
});
//注册服务地址
if (serviceRegistry != null) {
serviceRegistry.register(serverBeans.keySet(), host, port);
}
}
/**
* 关闭server
*/
public void close() {
//关闭套接字
if(serverChannel!=null){
serverChannel.close();
}
//关闭主线程组
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
//关闭副线程组
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
}
public class NettyClient { /** * 日志记录 */ private static final Logger logger = LoggerFactory.getLogger(NettyClient.class); /** * 客户端请求Future列表 */ private Map<String, TinyWhaleFuture> clientFutures = new ConcurrentHashMap<>(); /** * 客户端业务处理handler */ private ClientHandler clientHandler = new ClientHandler(clientFutures); /** * 事件池 */ private EventLoopGroup group = new NioEventLoopGroup(); /** * 启动器 */ private Bootstrap bootstrap = new Bootstrap(); /** * 客户端通道 */ private Channel clientChannel; /** * 客户端连接 * @param host * @param port * @throws InterruptedException */ public NettyClient(String host, int port) throws InterruptedException { bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { //通道空闲检测 channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(45, 45, 120)); //解码器 channel.pipeline().addLast("nettyMessageDecoder", new NettyMessageDecoder(1024 * 1024, 4, 4)); //编码器 channel.pipeline().addLast("nettyMessageEncoder", new NettyMessageEncoder()); //心跳处理 channel.pipeline().addLast("heartBeatHandler", new HeartBeatRequestHandler()); //业务处理 channel.pipeline().addLast("clientHandler", clientHandler); //鉴权处理 channel.pipeline().addLast("loginAuthHandler", new LoginAuthRequestHandler()); } }); //发起同步连接操作 ChannelFuture channelFuture = bootstrap.connect(host, port); //注册连接事件 channelFuture.addListener((ChannelFutureListener)future -> { //如果连接成功 if (future.isSuccess()) { logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已连接..."); clientChannel = channelFuture.channel(); } //如果连接失败,尝试重新连接 else{ logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]连接失败,重新连接中..."); future.channel().close(); bootstrap.connect(host, port); } }); //注册关闭事件 channelFuture.channel().closeFuture().addListener(cfl -> { close(); logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已断开..."); }); } /** * 客户端关闭 */ private void close() { //关闭客户端套接字 if(clientChannel!=null){ clientChannel.close(); } //关闭客户端线程组 if (group != null) { group.shutdownGracefully(); } } /** * 客户端发送消息,将获取的Future句柄保存到clientFutures列表 * @return * @throws InterruptedException * @throws ExecutionException */ public TinyWhaleFuture send(NettyMessage<NettyRequest> request) { TinyWhaleFuture rpcFuture = new TinyWhaleFuture(request); rpcFuture.addCallback(new TinyWhaleAsyncCallback() { @Override public void success(Object result) { } @Override public void fail(Exception e) { logger.error("发送失败", e); } }); clientFutures.put(request.getBody().getRequestId(), rpcFuture); clientHandler.sendMessage(request); return rpcFuture; }}
朕已阅
相关文章