netty客户端启动源码分析

一个使用netty创建的Echo客户端的启动程序的源码分析

源码启动程序

先把客户端的代码列上,不懂的可以先看netty学习

1
2
3
4
5
6
7
8
9
10
11
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
ChannelFuture f = bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress("127.0.0.1", 8082))
.handler(new ChannelInitializer() {

protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new MyChannelHandler());
}
}).connect().sync();
f.channel().closeFuture().sync();

配置信息

客户端启动必需的四个参数信息

  • group 用来管理多线程和进行io操作
  • channel 用来指定channel的创建工厂
  • handler 用来指定数据的处理器
  • remoteAdderss 远程连接的地址信息,包含一个ip和端口号信息

这参数都只是简单的进行的赋值操作,没有太复杂的逻辑,可以自己看下源码,这里我们重点分析connect方法。

连接

connect

同样的我们先上代码,根据代码进行分析

1
2
3
4
5
6
7
// 校验 group 、 channelFactory、handler 是否为空
validate();
SocketAddress remoteAddress = this.remoteAddress;
if (remoteAddress == null) {
throw new IllegalStateException("remoteAddress not set");
}
return doResolveAndConnect(remoteAddress, config.localAddress());

connect方法先是校验了必须的参数信息是否为空,紧接着就调用了真正的处理连接方法doResolveAndConnect。传入的两个参数,一个是远程的地址信息,一个是本地的地址信息,此时本地的地址信息为空,没有数据。

doResolveAndConnect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();

if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Directly obtain the cause and do a null check so we only need one volatile read in case of a
// failure.
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}

initAndRegister 方法,调用channelFactory创建了一个Channel实例对象channel,并进行了初始化操作。在初始化的过程中,把handler添加到了channel的ChannelPipeline中。

代码:

1
2
3
4
5
6
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());

//.. 下面省略了一些指定属性的操作
}

此外,还把channel注册到了一个EventLoop实例对象了上,我们看下完整的initAndRegister方法的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
//... 此处省略了代码
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
//获取了我们配置的group方法,并调用了注册方法,内部是获取了一个EventLoop并注册
ChannelFuture regFuture = config().group().register(channel);
//... 此处省略了代码
return regFuture;
}

初始化完毕之后,我们获得了一个Futrue对象,通过判断isDone走了两个不同的分支,通过调试代码,我知道它走的是else分支的代码。

在分支代码中,创建了一个PendingRegistrationPromise的实例对象promise,这个对象是我们最后的返回值。

同时,对regFuture添加了一个监听器,在监听器的操作完成的方法中,我们设置的promise的注册值,同时调用了doResolveAndConnect0方法,doResolveAndConnect0方法中包含真正的连接操作。

在看这块代码的时候,刚开始有一个疑问就是先定义了PendingRegistrationPromise,后添加了监听器,如果已经结束了通知了所有的监听器,不是就没有作用了吗?感觉自己有点蠢,让我们直接来看一下实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// DefaultPromise 类中的实现
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");

synchronized (this) {
addListener0(listener);
}

if (isDone()) {
notifyListeners();
}

return this;
}

可以看到,在添加监听器的时候,就会直接判断如果已经结束了,就会直接调用通知监听器方法。

接下来我们看下doResolveAndConnect0方法,我们直接上源码分析下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}

可以看到,我们向EventLoop提交了一个任务,这个任务的内容就是跟远程服务器创建连接。我们知道,我们并没有给本地连接赋值,所以肯定会走两个参数的connect方法。我们再去看该方法的实现。

1
2
3
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}

我们可以看到,是调用了DefaultChannelPipeline的connect方法。其内部实现有点复杂,我这里就截取部分的代码来说明原理。

在pipeline内部是调用了tail的connect方法,tail 是一个ChannelHandlerContext对象,这个connect方法使用的是AbstractChannelHandlerContext的实现,在该实现中是通过tail找到一个AbstractChannelHandlerContext为next,调用了next的invokeConnect方法。注意此处,如果是同一个线程之内,就会立即执行,如果不是,就提交一个任务到EventExecutor,也就是EventLoop。

1
2
3
4
5
6
7
8
9
10
11
12
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
// 1
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
}
}

在invokeConnect中会调用handler方法,获取到一个ChannelOutboundHandler类型的Handler实例对象,这里获取到的是DefaultChannelPipeline类内的一个内部类HeadContext。HeadContext继承了AbstractChannelHandlerContext,也实现了ChannelOutboundHandler, ChannelInboundHandler。connect方法内部是掉用了属性unsafe的connect方法。unsafe 是一个NioSocketChannel的一个内部类NioSocketChannelUnsafe。真正的实现是unsafe父类AbstractNioUnsafe的connect方法,此处确实是有点绕。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}

try {
if (connectPromise != null) {
// Already a connect in process.
throw new ConnectionPendingException();
}

boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
//... 省略
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}