Appearance
Netty - 06 补充知识
本文介绍一些补充知识,例如如何处理客户端退出、空闲检测与心跳机制、HTTP请求头与请求体合并。
如何处理客户端退出
客户端退出分为两种:正常退出和异常退出
- 正常退出:客户端调用了
channel.close()方法,在服务端会触发入站处理器中的channelInactive()方法; - 异常退出:客户端异常退出(如网络中断、进程崩溃)时,服务端会捕获异常,并触发入站处理器中的
exceptionCaught()方法; 所以我们可以定义一个退出处理器,实现上述两种方法:
java
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端正常退出");
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("客户端异常退出: " + cause.getMessage());
ctx.close();
}
}在服务端流水线添加上述处理器:
java
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new QuitHandler());
}
})
.bind(8888);
}java
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
Channel channel = null;
try {
channel = new Bootstrap()
.group(group)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000) // 设置超时时间为1秒
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
})
.connect(new InetSocketAddress("127.0.0.1", 8888))
.sync()
.channel();
}catch (Exception e){
log.error(e.getMessage(), e);
group.shutdownGracefully();
}
if(channel != null && channel.isActive()) {
Scanner scanner = new Scanner(System.in);
while (true) {
String input = scanner.nextLine();
if ("q".equals(input)) {
channel.close().sync();
group.shutdownGracefully();
break;
}
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes(input.getBytes(StandardCharsets.UTF_8));
channel.writeAndFlush(buffer);
}
}
}分两次测试,第一次在客户端输入q触发正常退出,第二次直接关闭客户端,测试结果如下
txt
客户端正常退出
客户端异常退出: Connection reset
客户端正常退出可以发现,在异常退出时,调用ctx.close()会再次触发正常退出。
2. 空闲检测与心跳机制
连接假死通常指的是客户端和服务端之间的连接看似正常(TCP 连接仍然存在),但实际上无法正常通信,表现为数据无法发送或接收。
可能原因:
网络超时或丢包
如果网络存在高丢包率或延迟,数据包可能无法及时到达,Netty 的事件循环可能感知不到连接已失效,导致假死。
TCP 连接未正确关闭
如果客户端或服务端所在的操作系统、网络设备(如防火墙、路由器)未正确处理 TCP 连接的关闭,可能会导致连接处于“半死”状态。例如,客户端进程崩溃但未发送 FIN 包,服务端仍在等待数据。
事件循环线程阻塞
Netty 使用
EventLoop线程处理 I/O 事件。如果业务逻辑中存在耗时操作(例如阻塞 I/O、复杂计算)且未正确交给其他线程池处理,会导致EventLoop线程阻塞,无法及时处理网络事件,从而表现为连接假死。
我们可是使用IdleStateHandler进行空闲检测,构造函数如下:
java
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)- readerIdleTime: 读空闲时间(单位由 unit 指定)。如果在这个时间内没有收到任何数据(即没有读事件),会触发
READER_IDLE状态的事件。 - writerIdleTime: 写空闲时间。如果在这个时间内没有发送任何数据(即没有写事件),会触发
WRITER_IDLE状态的事件。 - allIdleTime: 读写空闲时间。如果在这个时间内既没有读事件也没有写事件,会触发
ALL_IDLE状态的事件。 - unit: 时间单位,例如
TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)等。
注意: 如果某个参数设置为 0,则表示禁用对应的空闲检测。例如,new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS) 只检测读空闲。
然后,自定义处理器,处理userEventTriggered()事件:
java
class ServerIdleHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
switch (event.state()) {
case READER_IDLE:
System.out.println("客户端 " + ctx.channel().remoteAddress() + " 5 秒内未发送数据,关闭连接");
ctx.close();
break;
case WRITER_IDLE:
System.out.println("服务端 5 秒内未发送数据");
break;
case ALL_IDLE:
System.out.println("读写均空闲");
break;
}
}
}
}之后将上面两个处理器加入到流水线:
java
@Slf4j
public class Server {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new QuitHandler());
nioSocketChannel.pipeline().addLast(new IdleStateHandler(5,0,0));
nioSocketChannel.pipeline().addLast(new ServerIdleHandler());
}
})
.bind(8888);
}
}启动客户端但是不发送数据,结果如下:
txt
客户端 /127.0.0.1:9484 5秒内未发送数据,关闭连接
客户端正常退出在客户端,为了保活,同样可以使用IdleStateHandler监听写事件,当一定时间内没有发送数据,则自动发送一次消息:
java
@Slf4j
public class Client {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
Channel channel = null;
try {
channel = new Bootstrap()
.group(group)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000) // 设置超时时间为1秒
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
// 3秒内没有发送数据,触发 IdleStateEvent.WRITER_IDLE事件
nioSocketChannel.pipeline().addLast(new IdleStateHandler(0,3,0, TimeUnit.SECONDS));
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
switch (event.state()) {
case WRITER_IDLE:
System.out.println("客户端3秒内没发送数据,自动发送一个心跳包");
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes("ping".getBytes(StandardCharsets.UTF_8));
ctx.channel().writeAndFlush(buffer);
break;
}
}
}
});
}
})
.connect(new InetSocketAddress("127.0.0.1", 8888))
.sync()
.channel();
}catch (Exception e){
log.error(e.getMessage(), e);
group.shutdownGracefully();
}
if(channel != null && channel.isActive()) {
Scanner scanner = new Scanner(System.in);
while (true) {
String input = scanner.nextLine();
if ("q".equals(input)) {
channel.close().sync();
group.shutdownGracefully();
break;
}
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes(input.getBytes(StandardCharsets.UTF_8));
channel.writeAndFlush(buffer);
}
}
}
}3. HTTP请求头与请求体合并
Netty为我们提供了HttpServerCodec处理器,可以方便轻松地处理HTTP请求,但是该处理器会把请求分为HttpRequest和HttpResponse,对于使用上有所不便。为了将请求头和请求体合并起来,我们可以使用HttpObjectAggregator处理器:
java
nioSocketChannel.pipeline().addLast(new HttpServerCodec());
nioSocketChannel.pipeline().addLast(new HttpObjectAggregator(65536));HttpObjectAggregator会自动将HttpRequest和HttpContent合并为一个FullHttpRequest。在
HttpObjectAggregator(int maxContentLength)构造方法中参数maxContentLength指定最大允许的请求体长度(字节),超过则会抛出异常。这种方式适用于请求体较小、不需要分块处理的场景。
之后,我们可以实现入站处理器,处理FullHttpRequest实体:
java
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
// FullHttpRequest 包含完整的请求(头部 + 体)
System.out.println("请求方法: " + fullHttpRequest.method());
System.out.println("请求 URI: " + fullHttpRequest.uri());
String body = fullHttpRequest.content().toString(StandardCharsets.UTF_8);
System.out.println("请求体: " + body);
// 构造响应
FullHttpResponse response = new DefaultFullHttpResponse(
fullHttpRequest.protocolVersion(),
HttpResponseStatus.OK,
Unpooled.copiedBuffer("请求已处理", StandardCharsets.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().array().length);
channelHandlerContext.writeAndFlush(response);
}
}然后在服务端使用:
java
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
NioEventLoopGroup businessGroup = new NioEventLoopGroup();
new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new QuitHandler());
nioSocketChannel.pipeline().addLast(new IdleStateHandler(5,0,0));
nioSocketChannel.pipeline().addLast(new ServerIdleHandler());
nioSocketChannel.pipeline().addLast(new HttpServerCodec());
nioSocketChannel.pipeline().addLast(new HttpObjectAggregator(65536)); // 最大聚合内容长度
nioSocketChannel.pipeline().addLast(new HttpRequestHandler());
}
})
.bind(8888);
}使用HttpObjectAggregator只适合请求体较小、不需要分块处理,如果对于大文件上传,则不适用,应该如何处理呢?