Skip to content

Netty - 06 补充知识

本文介绍一些补充知识,例如如何处理客户端退出、空闲检测与心跳机制、HTTP请求头与请求体合并。

如何处理客户端退出

客户端退出分为两种:正常退出和异常退出

  • 正常退出:客户端调用了channel.close()方法,在服务端会触发入站处理器中的channelInactive()方法;
  • 异常退出:客户端异常退出(如网络中断、进程崩溃)时,服务端会捕获异常,并触发入站处理器中的exceptionCaught()方法; 所以我们可以定义一个退出处理器,实现上述两种方法:
java
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端正常退出");
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("客户端异常退出: " + cause.getMessage());
        ctx.close();
    }
}

在服务端流水线添加上述处理器:

java
public static void main(String[] args) {

    NioEventLoopGroup bossGroup = new NioEventLoopGroup();
    NioEventLoopGroup workerGroup = new NioEventLoopGroup();

    new ServerBootstrap()
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new QuitHandler());
                }
            })
            .bind(8888);
}
java
public static void main(String[] args) throws InterruptedException {
    NioEventLoopGroup group = new NioEventLoopGroup();

    Channel channel = null;
    try {
        channel = new Bootstrap()
                .group(group)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)  // 设置超时时间为1秒
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    }
                })
                .connect(new InetSocketAddress("127.0.0.1", 8888))
                .sync()
                .channel();
    }catch (Exception e){
        log.error(e.getMessage(), e);
        group.shutdownGracefully();
    }

    if(channel != null && channel.isActive()) {
        Scanner scanner = new Scanner(System.in);
        while (true) {
            String input = scanner.nextLine();
            if ("q".equals(input)) {
                channel.close().sync();
                group.shutdownGracefully();
                break;
            }

            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
            buffer.writeBytes(input.getBytes(StandardCharsets.UTF_8));

            channel.writeAndFlush(buffer);
        }
    }
}

分两次测试,第一次在客户端输入q触发正常退出,第二次直接关闭客户端,测试结果如下

txt
客户端正常退出
客户端异常退出: Connection reset
客户端正常退出

可以发现,在异常退出时,调用ctx.close()会再次触发正常退出。

2. 空闲检测与心跳机制

连接假死通常指的是客户端和服务端之间的连接看似正常(TCP 连接仍然存在),但实际上无法正常通信,表现为数据无法发送或接收。

可能原因:

  • 网络超时或丢包

    如果网络存在高丢包率或延迟,数据包可能无法及时到达,Netty 的事件循环可能感知不到连接已失效,导致假死。

  • TCP 连接未正确关闭

    如果客户端或服务端所在的操作系统、网络设备(如防火墙、路由器)未正确处理 TCP 连接的关闭,可能会导致连接处于“半死”状态。例如,客户端进程崩溃但未发送 FIN 包,服务端仍在等待数据。

  • 事件循环线程阻塞

    Netty 使用 EventLoop 线程处理 I/O 事件。如果业务逻辑中存在耗时操作(例如阻塞 I/O、复杂计算)且未正确交给其他线程池处理,会导致 EventLoop 线程阻塞,无法及时处理网络事件,从而表现为连接假死。

我们可是使用IdleStateHandler进行空闲检测,构造函数如下:

java
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)
  • readerIdleTime: 读空闲时间(单位由 unit 指定)。如果在这个时间内没有收到任何数据(即没有读事件),会触发 READER_IDLE 状态的事件。
  • writerIdleTime: 写空闲时间。如果在这个时间内没有发送任何数据(即没有写事件),会触发 WRITER_IDLE 状态的事件。
  • allIdleTime: 读写空闲时间。如果在这个时间内既没有读事件也没有写事件,会触发 ALL_IDLE 状态的事件。
  • unit: 时间单位,例如 TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)等。

注意: 如果某个参数设置为 0,则表示禁用对应的空闲检测。例如,new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS) 只检测读空闲。

然后,自定义处理器,处理userEventTriggered()事件:

java
class ServerIdleHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            switch (event.state()) {
                case READER_IDLE:
                    System.out.println("客户端 " + ctx.channel().remoteAddress() + " 5 秒内未发送数据,关闭连接");
                    ctx.close();
                    break;
                case WRITER_IDLE:
                    System.out.println("服务端 5 秒内未发送数据");
                    break;
                case ALL_IDLE:
                    System.out.println("读写均空闲");
                    break;
            }
        }
    }
}

之后将上面两个处理器加入到流水线:

java
@Slf4j
public class Server {
    public static void main(String[] args) {

        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        nioSocketChannel.pipeline().addLast(new QuitHandler());

                        nioSocketChannel.pipeline().addLast(new IdleStateHandler(5,0,0));
                        nioSocketChannel.pipeline().addLast(new ServerIdleHandler());
                    }
                })
                .bind(8888);
    }
}

启动客户端但是不发送数据,结果如下:

txt
客户端 /127.0.0.1:9484 5秒内未发送数据,关闭连接
客户端正常退出

在客户端,为了保活,同样可以使用IdleStateHandler监听写事件,当一定时间内没有发送数据,则自动发送一次消息:

java
@Slf4j
public class Client {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();

        Channel channel = null;
        try {
            channel = new Bootstrap()
                    .group(group)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)  // 设置超时时间为1秒
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));

                            // 3秒内没有发送数据,触发 IdleStateEvent.WRITER_IDLE事件
                            nioSocketChannel.pipeline().addLast(new IdleStateHandler(0,3,0, TimeUnit.SECONDS));
                            nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                                    if (evt instanceof IdleStateEvent) {
                                        IdleStateEvent event = (IdleStateEvent) evt;
                                        switch (event.state()) {
                                            case WRITER_IDLE:
                                                System.out.println("客户端3秒内没发送数据,自动发送一个心跳包");
                                                ByteBuf buffer = ctx.alloc().buffer();
                                                buffer.writeBytes("ping".getBytes(StandardCharsets.UTF_8));
                                                ctx.channel().writeAndFlush(buffer);
                                                break;
                                        }
                                    }
                                }
                            });
                        }
                    })
                    .connect(new InetSocketAddress("127.0.0.1", 8888))
                    .sync()
                    .channel();
        }catch (Exception e){
            log.error(e.getMessage(), e);
            group.shutdownGracefully();
        }

        if(channel != null && channel.isActive()) {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String input = scanner.nextLine();
                if ("q".equals(input)) {
                    channel.close().sync();
                    group.shutdownGracefully();
                    break;
                }

                ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
                buffer.writeBytes(input.getBytes(StandardCharsets.UTF_8));

                channel.writeAndFlush(buffer);
            }
        }
    }
}

3. HTTP请求头与请求体合并

Netty为我们提供了HttpServerCodec处理器,可以方便轻松地处理HTTP请求,但是该处理器会把请求分为HttpRequestHttpResponse,对于使用上有所不便。为了将请求头和请求体合并起来,我们可以使用HttpObjectAggregator处理器:

java
nioSocketChannel.pipeline().addLast(new HttpServerCodec());
nioSocketChannel.pipeline().addLast(new HttpObjectAggregator(65536));
  • HttpObjectAggregator 会自动将 HttpRequestHttpContent合并为一个 FullHttpRequest

  • HttpObjectAggregator(int maxContentLength)构造方法中参数 maxContentLength 指定最大允许的请求体长度(字节),超过则会抛出异常。

  • 这种方式适用于请求体较小、不需要分块处理的场景。

之后,我们可以实现入站处理器,处理FullHttpRequest实体:

java
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
        // FullHttpRequest 包含完整的请求(头部 + 体)
        System.out.println("请求方法: " + fullHttpRequest.method());
        System.out.println("请求 URI: " + fullHttpRequest.uri());
        String body = fullHttpRequest.content().toString(StandardCharsets.UTF_8);
        System.out.println("请求体: " + body);

        // 构造响应
        FullHttpResponse response = new DefaultFullHttpResponse(
                fullHttpRequest.protocolVersion(),
                HttpResponseStatus.OK,
                Unpooled.copiedBuffer("请求已处理", StandardCharsets.UTF_8));
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().array().length);
        channelHandlerContext.writeAndFlush(response);
    }
}

然后在服务端使用:

java
public static void main(String[] args) {

    NioEventLoopGroup bossGroup = new NioEventLoopGroup();
    NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    NioEventLoopGroup businessGroup = new NioEventLoopGroup();

    new ServerBootstrap()
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new QuitHandler());

                    nioSocketChannel.pipeline().addLast(new IdleStateHandler(5,0,0));
                    nioSocketChannel.pipeline().addLast(new ServerIdleHandler());

                    nioSocketChannel.pipeline().addLast(new HttpServerCodec());
                    nioSocketChannel.pipeline().addLast(new HttpObjectAggregator(65536)); // 最大聚合内容长度
                    nioSocketChannel.pipeline().addLast(new HttpRequestHandler());


                }
            })
            .bind(8888);
}

使用HttpObjectAggregator只适合请求体较小、不需要分块处理,如果对于大文件上传,则不适用,应该如何处理呢?