netty 学习
netty 是一款高性能的java 网络应用框架,好多RPC框架也都是基于netty进行开发的,比如dubbo。netty中使用了阻塞和非阻塞的api,用户可以根据需求来选择。此外还提供了多种协议的支持,比如:http,ssl,websocket等,用户也可以根据自己的需求,实现自己的协议。
以下内容为我看《Netty 实战》进行总结
组件
Channel
Channel 可以把它当作是一个Socket连接,可以用它来进行读和写操作
生命周期状态:
- ChannelUnregistered 从EventLoop中取消注册
- ChannelRegistered 已经注册到EventLoop
- ChannelActive 已经连接到远程节点,处于活动状态,可以用来发送和接收数据了
- ChannelInactive 失去远程节点的连接
状态变化流程:ChannelRegistered -> ChannelActive -> ChannelInactive -> ChannelUnregistered
当这些状态发生变化时,会通过ChannelPipeline去调用ChannelHandler中对应的事件
如果你要进行这些状态变化的测试,可以在服务端自定义ChannelHandler,并重写对应的方法,添加日志的打印。此时创建一个客户端并连接到服务器,发送完数据之后,关闭客户端,查看对应的日志信息。
Future
Future 是JDK中内置的一个api,可以用来获取执行的结果,判断是否执行完成,但是是阻塞的。为此,netty提供了ChannelFutrue,它允许你注册一个监听器,在执行完毕时会自动调用监听器,通知结果。
回调
这里说的回调其实就是一个方法,我的理解是它更像一个事件。比如ChannelHandler的channelActive的方法,在一个新的连接被建立时进行回调。
事件和ChannelHandler
ChannelHandler 我认为是内容处理器,所有的接收的数据都由ChannelHandler进行处理,包括出站和入站处理数据。事件也主要指的是ChannelHandler中的各种的方法。ChannelHander在netty中是链形进行处理,如下图所示:
ChannelHandler 生命周期方法:
- handlerAdded ChannelHandler 添加到 ChannelPipeline 中调用
- handlerRemoved ChannelHandler 从 ChannelPipeline 中移除
- exceptionCaught 处理过程中ChannelPipeline中发生错误时调用,如果我们没有处理错误,会一直按照链式来往下进行传递的。如果到最后也没有处理,netty会进行日志的打印。
ChannelPipeline
ChannelPipeline 是 ChannelHandler 的容器,ChannelHandler 在 ChannelPipeline 中是链式存储的,并且被添加到 ChannelPipeline 中时都会创建一个对应的 ChannelHandlerContext 来保存 ChannelHandler 和 ChannelPipeline 的对应关系。我们看下添加一个 ChannelHandler 到 ChannelPipeline 尾部是怎么样的操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| ChannelHandler handler = ...; ChannelPipeline pipeline = ...;
pipeline.addLast(handler);
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx);
if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; }
EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor); return this; } } callHandlerAdded0(newCtx); return this; }
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) { return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); }
private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; }
|
EventLoop
EventLoop 用于处理Channel的生命周期中所发生的事情,也是Netty多线程的核心。EventLoop 同时继承和实现了 AbstractExecutorService 和 ScheduledExecutorService ,是在jdk提供的线程并发的基础之上提供的多线程。
Channel 创建之后就会注册到一个EventLoop中,一个EventLoop可能会有多个Channel。
演示
我们是用netty来创建一个echo服务端和客户端,在客户端发送信息到服务端。因为是演示代码,对异常就不进行处理了,直接抛出
服务端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class EchoServer {
public static void main(String[] args) throws Exception { NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); ChannelFuture f = bootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class).childHandler( new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { System.out.println(ch.pipeline().getClass()); ch.pipeline().addLast(new MyHandler()); } } ).bind(8082).sync(); f.channel().closeFuture().sync(); } finally { eventLoopGroup.shutdownGracefully().sync(); }
} static class MyHandler extends SimpleChannelInboundHandler<ByteBuf> {
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { ByteBuf buf = Unpooled.copiedBuffer("hello,this is server !", CharsetUtil.UTF_8); ctx.writeAndFlush(buf); }
} }
|
group 方法,用来配置线程池。
channel 方法用来指定我们需要使用的Socket
childHandler 用户指定处理客户端Channel 的Handler。需要注意的是handler方法。服务端的Channel在接收到客户端连接之后会创建一个子channel,我们指定的handler就是处理客户端的channel。而handler方法指定的是处理服务端Channel的。
大家可以看到,在调用childHandler方法的时候,我们并不是直接使用我们自定义的MyHandler
,而是使用了ChannelInitializer
,这是因为当需要有指定多个hanlder的时候,多次调用childHandler会比较麻烦,可以直接在ChannelInitializer
的初始化方法中指定多个。
客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class EchoClient {
public static void main(String[] args) throws Exception { NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap();
ChannelFuture f = bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress("127.0.0.1", 8082)) .handler(new ChannelInitializer() {
protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new MyChannelHandler()); } }).connect().sync(); f.channel().closeFuture().sync();
}
static class MyChannelHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); ByteBuf buf = Unpooled.copiedBuffer("hello ,this is echo client", CharsetUtil.UTF_8); ctx.writeAndFlush(buf); } protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("client received :" + msg.toString(CharsetUtil.UTF_8));
} } }
|
客户端的建立跟服务端代码基本是一致的,就是可能使用的引导类不一样,需要指定远程的ip地址信息。
运行代码
运行程序之后,我们能看到客户端控制台打印出了如下的信息,说明我们已经成功接收到服务端的信息。
1
| client received :hello,this is server !
|
小知识点
- 需要注意的是,channel 和 ChannelPipeline 调用write方法,事件流会通过整个ChannelPipeline,但是如果使用 ChannelHandlerContext 进行操作,只会从当前节点向后传递,之前的节点就不会再次经过了。