netty 学习

netty 是一款高性能的java 网络应用框架,好多RPC框架也都是基于netty进行开发的,比如dubbo。netty中使用了阻塞和非阻塞的api,用户可以根据需求来选择。此外还提供了多种协议的支持,比如:http,ssl,websocket等,用户也可以根据自己的需求,实现自己的协议。

以下内容为我看《Netty 实战》进行总结

组件

Channel

Channel 可以把它当作是一个Socket连接,可以用它来进行读和写操作

生命周期状态:

  • ChannelUnregistered 从EventLoop中取消注册
  • ChannelRegistered 已经注册到EventLoop
  • ChannelActive 已经连接到远程节点,处于活动状态,可以用来发送和接收数据了
  • ChannelInactive 失去远程节点的连接

状态变化流程:ChannelRegistered -> ChannelActive -> ChannelInactive -> ChannelUnregistered

当这些状态发生变化时,会通过ChannelPipeline去调用ChannelHandler中对应的事件

如果你要进行这些状态变化的测试,可以在服务端自定义ChannelHandler,并重写对应的方法,添加日志的打印。此时创建一个客户端并连接到服务器,发送完数据之后,关闭客户端,查看对应的日志信息。

Future

Future 是JDK中内置的一个api,可以用来获取执行的结果,判断是否执行完成,但是是阻塞的。为此,netty提供了ChannelFutrue,它允许你注册一个监听器,在执行完毕时会自动调用监听器,通知结果。

回调

这里说的回调其实就是一个方法,我的理解是它更像一个事件。比如ChannelHandler的channelActive的方法,在一个新的连接被建立时进行回调。

事件和ChannelHandler

ChannelHandler 我认为是内容处理器,所有的接收的数据都由ChannelHandler进行处理,包括出站和入站处理数据。事件也主要指的是ChannelHandler中的各种的方法。ChannelHander在netty中是链形进行处理,如下图所示:

事件流程图

ChannelHandler 生命周期方法:

  • handlerAdded ChannelHandler 添加到 ChannelPipeline 中调用
  • handlerRemoved ChannelHandler 从 ChannelPipeline 中移除
  • exceptionCaught 处理过程中ChannelPipeline中发生错误时调用,如果我们没有处理错误,会一直按照链式来往下进行传递的。如果到最后也没有处理,netty会进行日志的打印。

ChannelPipeline

ChannelPipeline 是 ChannelHandler 的容器,ChannelHandler 在 ChannelPipeline 中是链式存储的,并且被添加到 ChannelPipeline 中时都会创建一个对应的 ChannelHandlerContext 来保存 ChannelHandler 和 ChannelPipeline 的对应关系。我们看下添加一个 ChannelHandler 到 ChannelPipeline 尾部是怎么样的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 创建一个ChannelHandler
ChannelHandler handler = ...;
ChannelPipeline pipeline = ...;
//添加到尾部
pipeline.addLast(handler);

//来看下添加方法具体的实现
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
// 这里创建了一个 ChannelHandlerContext 对象
newCtx = newContext(group, filterName(name, handler), handler);
// 添加到 ChannelHandlerContext 链的尾部
addLast0(newCtx);

if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}

//来看下 newContext 方法
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
// 第一个参数传入了当前对象,也就是ChannelPipeline的一个实例对象
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

// 添加方法
private void addLast0(AbstractChannelHandlerContext newCtx) {
// 这里的 tail 是TailContext 的实例对象,是一个特殊的ChannelHandlerContext,只是用来标记尾部的
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
//

EventLoop

EventLoop 用于处理Channel的生命周期中所发生的事情,也是Netty多线程的核心。EventLoop 同时继承和实现了 AbstractExecutorService 和 ScheduledExecutorService ,是在jdk提供的线程并发的基础之上提供的多线程。

Channel 创建之后就会注册到一个EventLoop中,一个EventLoop可能会有多个Channel。

演示

我们是用netty来创建一个echo服务端和客户端,在客户端发送信息到服务端。因为是演示代码,对异常就不进行处理了,直接抛出

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class EchoServer {

public static void main(String[] args) throws Exception {
//创建NioEventLoopGroup,可以认为是线程池
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
//启动引导类
ServerBootstrap bootstrap = new ServerBootstrap();
//配置服务端信息
ChannelFuture f = bootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class).childHandler(
new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
System.out.println(ch.pipeline().getClass());
ch.pipeline().addLast(new MyHandler());
}
}
).bind(8082).sync();
//关闭
f.channel().closeFuture().sync();
} finally {
//释放资源
eventLoopGroup.shutdownGracefully().sync();
}

}
//自定义handler,收到客户端信息的时候,返回固定的信息
static class MyHandler extends SimpleChannelInboundHandler<ByteBuf> {

protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("hello,this is server !", CharsetUtil.UTF_8);
ctx.writeAndFlush(buf);
}

}
}

group 方法,用来配置线程池。

channel 方法用来指定我们需要使用的Socket

childHandler 用户指定处理客户端Channel 的Handler。需要注意的是handler方法。服务端的Channel在接收到客户端连接之后会创建一个子channel,我们指定的handler就是处理客户端的channel。而handler方法指定的是处理服务端Channel的。
大家可以看到,在调用childHandler方法的时候,我们并不是直接使用我们自定义的MyHandler,而是使用了ChannelInitializer,这是因为当需要有指定多个hanlder的时候,多次调用childHandler会比较麻烦,可以直接在ChannelInitializer的初始化方法中指定多个。

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

public class EchoClient {

public static void main(String[] args) throws Exception {
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();

ChannelFuture f = bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress("127.0.0.1", 8082))
.handler(new ChannelInitializer() {

protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new MyChannelHandler());
}
}).connect().sync();
f.channel().closeFuture().sync();


}

static class MyChannelHandler extends SimpleChannelInboundHandler<ByteBuf> {

//表示连接已经建立,可以发送数据
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ByteBuf buf = Unpooled.copiedBuffer("hello ,this is echo client", CharsetUtil.UTF_8);
ctx.writeAndFlush(buf);
}
//接收到服务端的信息,并打印
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("client received :" + msg.toString(CharsetUtil.UTF_8));

}
}
}

客户端的建立跟服务端代码基本是一致的,就是可能使用的引导类不一样,需要指定远程的ip地址信息。

运行代码

运行程序之后,我们能看到客户端控制台打印出了如下的信息,说明我们已经成功接收到服务端的信息。

1
client received :hello,this is server !

小知识点

  • 需要注意的是,channel 和 ChannelPipeline 调用write方法,事件流会通过整个ChannelPipeline,但是如果使用 ChannelHandlerContext 进行操作,只会从当前节点向后传递,之前的节点就不会再次经过了。