Netty简介

Netty是高性能的网络通信框架,底层对Java的NIO进行了封装,特点是简单易用易上手,又能为应用提供高性能的网络通信能力。

在网络通信场景中,我们一般使用Netty进行开发而不是NIO,因为Java的NIO使用非常的复杂,代码不易于维护。并且在网络通信中,我们需要对协议编解码、断线重连、心跳检测等问题进行处理,这些在NIO中都是不具备的,需要我们手动编码实现。

而Netty则对NIO进行了封装,使得我们无需编写复杂的NIO代码,又能实现高性能的网络通信。

Netty中的Reactor线程模型

我们在使用NIO编写网络通信的代码时,需要在while循环中通过Selector调用select()方法监听注册到Selector中的Channel,获取事件就绪的selectionKey,然后进行相应的处理。这个循环监听并处理的过程就是事件循环。而Netty通过EventLoop封装了这个事件循环,我们无需在手动编写这个事件循环的逻辑。

在这里插入图片描述

EventLoop的事件循环中对于监听到的就绪事件的处理,是交给该Channel对应的ChannelPipeline处理的。Netty中的Channel就是对NIO中的Channel进行封装,而ChannelPipeline是Netty中责任链模式的一种实现,ChannelPipeline里面是一个ChannelHandler组成的双向链表,我们可以定制处理Channel就绪事件的ChannelHandler链表,就绪的事件就由这些ChannelHandler按顺序进行处理。

在这里插入图片描述

那么,显然我们利用Netty应用开发时,只需要专注于ChannelHandler的编写,实在是大大简化了网络通信处理的复杂度。

EventLoop并不是直接裸奔的,而是在EventLoopGroup里面。如果把EventLoop视作一个线程,那么EventLoopGroup就相当于是一个EventLoop线程池。一个EventLoopGroup中可以有一个或多个EventLoop。

在这里插入图片描述

比如在主从Reactor模型中,代表主Reactor的EventLoopGroup只需一个EventLoop即可,该EventLoop专门处理连接请求,而代表子Reactor的EventLoopGroup则需要多个EventLoop处理IO请求。主EventLoopGroup的EventLoop对于连接请求的处理,则是由一个特殊的ChannelHandler把通过连接获取到的SocketChannel注册到子EventLoopGroup的其中一个EventLoop上。

在这里插入图片描述

这个特殊的ChannelHandler是Netty内部提供的,并且会自动组装进主EventLoopGroup那边的Channel对应的的ChannelPipeline中,无需我们操心。

EventLoopGroup和EventLoop有许多实现类,我们一般使用的时NioEventLoopGroup和NioEventLoop。然后NioEventLoop中会使用Selector的select()方法进行阻塞监听,当有就绪事件触发时,NioEventLoop会在processSelectedkeys方法中处理就绪事件。

在这里插入图片描述

NioEventLoop内部还有一个TaskQueue,我们可以往里面提交一些异步任务,NioEventLoop一次事件循环中,在processSelectedkeys方法调用完毕之后,会调用runAllTasks方法处理TaskQueue中的异步任务。

在这里插入图片描述

Netty入门案例

有了上面对Netty的宏观理解,再来看代码,就比较好理解了。

服务端代码

首先是服务端的启动类

public class NettyServer {
	public static void main(String[] args) throws Exception {
		EventLoopGroup bossGroup = new NioEventLoopGroup(1);
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			//创建服务器端的启动对象
			ServerBootstrap bootstrap = new ServerBootstrap();
			//使用链式编程来配置参数
			bootstrap.group(bossGroup, workerGroup)
			//使用NioServerSocketChannel作为服务器的通道实现
			.channel(NioServerSocketChannel.class) 
			// 初始化服务器连接队列大小
			.option(ChannelOption.SO_BACKLOG, 1024)
			.childHandler(new ChannelInitializer<SocketChannel>() {

				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					//对workerGroup的SocketChannel设置处理器
					ch.pipeline().addLast(new NettyServerHandler());
				}
			});
			System.out.println("netty server start。。");
			//绑定一个端口, bind是异步操作,sync方法是等待异步操作执行完毕,然后生成了一个ChannelFuture异步对象
			ChannelFuture cf = bootstrap.bind(9000).sync();
			//对通道关闭进行监听,closeFuture是异步操作,监听通道关闭,通过sync方法同步等待通道关闭处理完毕
			cf.channel().closeFuture().sync();
		} finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
}

bossGroup就是我们上面说的代表主Reactor的EventLoopGroup,而workerGroup就是代表子Reactor的EventLoopGroup。

在这里插入图片描述

ServerBootstrap是服务端的启动引导器,我们通过它来对服务端的Netty程序进行配置。

在这里插入图片描述

我们设置NioServerSocketChannel作为服务端的通道实现,这个是bossGroup中的EventLoop监听的通道,关注的是连接就绪事件,NioServerSocketChannel对应的ChannelPipeline不需要我们进行组装,Netty会把我们上面说的那个处理连接建立事件的特殊ChannelHandler组装进去。

在这里插入图片描述

我们设置了一个ChannelInitializer,这个ChannelInitializer的作用就是当一个新的NioSocketChannel注册到workerGroup中的某个EventLoop时,会通过该ChannelInitializer对新注册的NioSocketChannel对应的ChannelPipeline进行初始化。

在这里插入图片描述

ChannelInitializer中我们设置了NettyServerHandler到ChannelPipeline中,这个NettyServerHandler是我们自定义的ChannelHandler,我们看一下NettyServerHandler里面的代码。

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

	/**
	 * 读取客户端发送的数据
	 * @param ctx 上下文对象, 含有通道channel,管道pipeline
	 * @param msg 就是客户端发送的数据
	 * @throws Exception
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		System.out.println("服务器读取线程 " + Thread.currentThread().getName());
		ByteBuf buf = (ByteBuf) msg;
		System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
	}

	/**
	 * 数据读取完毕处理方法
	 * @param ctx
	 * @throws Exception
	 */
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ByteBuf buf = Unpooled.copiedBuffer("HelloClient", CharsetUtil.UTF_8);
		ctx.writeAndFlush(buf);
	}

	/**
	 * 处理异常, 一般是需要关闭通道
	 * @param ctx
	 * @param cause
	 * @throws Exception
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.close();
	}
}

我们这个NettyServerHandler继承了Netty提供的适配器类ChannelInboundHandlerAdapter,然后实现它的channelRead方法,在channelRead方法中接收的参数msg就是客户端发来的数据,我们就可以在channelRead方法中对客户端发来的数据进行处理。

ChannelInboundHandlerAdapter是专门处理入站事件的适配器类,所谓入站事件就是运动方向是从对方到自己的事件。比如对于服务端来说,连接事件就是入站事件,因为连接是由客户端发起的,读就是事件也是入站事件,因为读取的数据是由对方发送的。

与ChannelInboundHandlerAdapter对应的是ChannelOutboundHandlerAdapter,专门处理出站事件。

入站事件在ChannelPipeline的处理顺序是从链表头部到链表尾部,出站事件在ChannelPipeline中的处理顺序是从链表尾部到链表头部。因此,我们组装workerGroup的ChannelPipeline时要注意ChannelHandler在链表中的排列顺序。

在这里插入图片描述

客户端代码

下面是Netty客户端的启动类。

public class NettyClient {
	public static void main(String[] args) throws Exception {
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			//创建客户端启动对象
			Bootstrap bootstrap = new Bootstrap();
			//设置相关参数
			bootstrap.group(group)
			// 使用 NioSocketChannel 作为客户端的通道实现
			.channel(NioSocketChannel.class) 
			.handler(new ChannelInitializer<SocketChannel>() {
				@Override
				protected void initChannel(SocketChannel channel) throws Exception {
					//加入处理器
					channel.pipeline().addLast(new NettyClientHandler());
				}
			});
			System.out.println("netty client start");
			//连接服务器端
			ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
			//对关闭通道进行监听
			channelFuture.channel().closeFuture().sync();
		} finally {
			group.shutdownGracefully();
		}
	}
}

Netty客户端只需要一个EventLoopGroup即可,因为客户端是请求方,不需要提供服务,因此不需要搞主从Reactor。

Netty服务端使用的是ServerBootstrap作为启动引导器,而Netty客户端使用的是Bootstrap作为启动引导器。

Netty客户端使用的是NioSocketChannel作为通道,该通道中的ChannelPipeline同样是由我们定义的ChannelInitializer进行初始化。

ChannelInitializer中向ChannelPipeline安装一个NettyClientHandler,是我们自己实现的Netty客户端的ChannelHandler。

bootstrap通过connect(“127.0.0.1”, 9000)连接服务端,但是由于connect方法也是一个异步方法,所以还要紧随其后调用sync()方法转成同步。

在这里插入图片描述

我们看看NettyClientHandler的代码。

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

	/**
	 * 当客户端连接服务器完成就会触发该方法
	 * @param ctx
	 * @throws Exception
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		ByteBuf buf = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8);
		ctx.writeAndFlush(buf);
	}

	/**
	 * 当服务端发送数据给客户端时,通道有读取事件时会触发该方法
	 * @param ctx
	 * @param msg
	 * @throws Exception
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf buf = (ByteBuf) msg;
		System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));
		System.out.println("服务端的地址: " + ctx.channel().remoteAddress());
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}
}

NettyClientHandler也是继承了ChannelInboundHandlerAdapter类,重写了channelActive和channelRead两个方法。

channelActive方法表示通道被激活,在channelActive方法中调用ChannelHandlerContext的writeAndFlush方法即可往服务端发送数据,writeAndFlush方法传入的参数是一个ByteBuf对象,我们把需要发送到服务端的数据写入ByteBuf,然后把ByteBuf传递给ChannelHandlerContext的writeAndFlush方法即可。

在这里插入图片描述

channelRead方法是在收到服务端返回的数据时会调用的方法。

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部