Skip to content

Netty - 03 黏包半包分析

在Netty中仍然存在黏包半包现象,但是Netty提供了一些处理器可以帮助我们方便快捷地处理黏包半包现象。

1. 短连接

短连接方案不是Netty提供的,但在这里仍作为一种方案展示。 短连接方案是指客户端发送完一条消息就断开连接,以连接作为消息之间的分隔符,所以我们可以把发送消息抽取为一个方法:

java
public static void main(String[] args) throws InterruptedException {
    for (int i = 0; i < 10; i++) {
        sendMessage("abcdefgh");
    }
}

private static void sendMessage(String message) throws InterruptedException {
    NioEventLoopGroup group = new NioEventLoopGroup();
    Channel channel = new Bootstrap()
            .group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new StringEncoder());
                }
            })
            .connect(new InetSocketAddress("127.0.0.1", 8080))
            .sync()
            .channel();

    channel.writeAndFlush(message);

    channel.close().sync();

    group.shutdownGracefully();
}

短连接方案可以解决黏包问题,但是无法解决半包问题!

2. 定长消息

Netty提供了定长消息处理器FixedLengthFrameDecoder,我们可以使用它来指定一条消息的长度:

java
public static void main(String[] args) {
    new ServerBootstrap()
            .group(new NioEventLoopGroup(1), new NioEventLoopGroup())
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    // 指定消息长度为8字节
                    nioSocketChannel.pipeline().addLast(new FixedLengthFrameDecoder(8));
                    nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                }
            })
            .bind(8080);
}

3. 基于分隔符分隔消息

Netty提供了基于特殊符号分隔消息的处理器,我们可以指定特殊符号作为消息的分隔符以切分消息。Netty也提供了现成的基于换行符分隔消息的处理器。

  • DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter):第一个参数指定了消息的最大长度,第二个参数指定了特殊符。
  • LineBasedFrameDecoder(final int maxLength):指定换行符(\n\r\n)作为消息分隔符;

为什么要指定消息的最大长度?如果没有指定消息的最大长度,那么服务端在没有遇到分隔符时会一直接收下去,这显然会导致内存溢出。

下面以换行符作为例子演示基于分隔符分隔消息:

java
public static void main(String[] args) {
    new ServerBootstrap()
            .group(new NioEventLoopGroup(1), new NioEventLoopGroup())
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    // 以换行分割符分隔消息,消息最大长度为1024
                    nioSocketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                }
            })
            .bind(8080);
}
java
public static void main(String[] args) throws InterruptedException {
    NioEventLoopGroup group = new NioEventLoopGroup();
    Channel channel = new Bootstrap()
            .group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new StringEncoder());
                }
            })
            .connect(new InetSocketAddress("127.0.0.1", 8080))
            .sync()
            .channel();

    for (int i = 0; i < 10; i++) {
        channel.writeAndFlush(makeString());
    }

    channel.close().sync();

    group.shutdownGracefully();
}


/**
 * 创建随机长度的消息
 * @return
 */
private static String makeString(){
    int i = new Random().nextInt(100) + 1;
    StringBuilder stringBuilder = new StringBuilder();
    char c = '0';
    for (int j = 0; j < i; j++) {
        stringBuilder.append(c);
    }
    stringBuilder.append("\n");

    return stringBuilder.toString();
}

4. 基于长度字段分隔消息

Netty提供了LengthFieldBasedFrameDecoder,用于提供基于长度字段的消息分隔方案。

java
LengthFieldBasedFrameDecoder(
            int maxFrameLength,
            int lengthFieldOffset, 
    		int lengthFieldLength,
            int lengthAdjustment, 
    		int initialBytesToStrip)

LengthFieldBasedFrameDecoder构造方法有五个参数:

  1. maxFrameLength:指定了消息的最大长度;
  2. lengthFieldOffset:指定了长度字段的偏移量;
  3. lengthFieldLength:指定了长度字段的长度;
  4. lengthAdjustment:指定了内容字段基于长度字段的偏移量;
  5. initialBytesToStrip:指定消息分隔后,需要移除的头部字节数;

现在给出几个LengthFieldBasedFrameDecoder示例说明上述参数的作用:

txt
 * lengthFieldOffset   = 0   // 长度字段偏移量为0
 * lengthFieldLength   = 2   // 长度字段长度为2
 * lengthAdjustment    = 0   // 内容字段基于长度字段的偏移量为0
 * initialBytesToStrip = 0   // 头部字节移除量为0,表示不移除任何字节
 *
 * BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
 * +--------+----------------+      +--------+----------------+
 * | Length | Actual Content |----->| Length | Actual Content |
 * | 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
 * +--------+----------------+      +--------+----------------+
txt
 * lengthFieldOffset   = 0
 * lengthFieldLength   = 2
 * lengthAdjustment    = 0
 * initialBytesToStrip = 2 
 *
 * BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
 * +--------+----------------+      +----------------+
 * | Length | Actual Content |----->| Actual Content |
 * | 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |
 * +--------+----------------+      +----------------+
txt
 * lengthFieldOffset   = 2   // 在最开始增加头部,所以长度字段偏移量为2
 * lengthFieldLength   = 3
 * lengthAdjustment    = 0
 * initialBytesToStrip = 0
 *
 * BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
 * +----------+----------+----------------+      +----------+----------+----------------+
 * | Header 1 |  Length  | Actual Content |----->| Header 1 |  Length  | Actual Content |
 * |  0xCAFE  | 0x00000C | "HELLO, WORLD" |      |  0xCAFE  | 0x00000C | "HELLO, WORLD" |
 * +----------+----------+----------------+      +----------+----------+----------------+
txt
 * lengthFieldOffset   = 0
 * lengthFieldLength   = 3
 * lengthAdjustment    = 2 (= the length of Header 1)  // 头部字段在长度和内容之间,内容字段基于长度字段偏移量为2
 * initialBytesToStrip = 0
 *
 * BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
 * +----------+----------+----------------+      +----------+----------+----------------+
 * |  Length  | Header 1 | Actual Content |----->|  Length  | Header 1 | Actual Content |
 * | 0x00000C |  0xCAFE  | "HELLO, WORLD" |      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |
 * +----------+----------+----------------+      +----------+----------+----------------+
txt
 * lengthFieldOffset   = 1 (= the length of HDR1)
 * lengthFieldLength   = 2
 * lengthAdjustment    = 1 (= the length of HDR2)
 * initialBytesToStrip = 3 (= the length of HDR1 + LEN)
 *
 * BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
 * +------+--------+------+----------------+      +------+----------------+
 * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
 * | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
 * +------+--------+------+----------------+      +------+----------------+

下面给出代码示例:

java
public static void main(String[] args) {
    new ServerBootstrap()
            .group(new NioEventLoopGroup(1), new NioEventLoopGroup())
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    // 基于长度字段的分隔符
                    nioSocketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(
                            1024,1,4,1,6
                    ));
                    nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                }
            })
            .bind(8080);
}
java
public static void main(String[] args) throws InterruptedException {
    NioEventLoopGroup group = new NioEventLoopGroup();
    Channel channel = new Bootstrap()
            .group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                }
            })
            .connect(new InetSocketAddress("127.0.0.1", 8080))
            .sync()
            .channel();

    for (int i = 0; i < 10; i++) {
        String content = makeString();

        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
        buffer.writeByte(1);  // 头部1
        buffer.writeInt(content.getBytes(StandardCharsets.UTF_8).length);  // 长度
        buffer.writeByte(1);  // 头部2
        buffer.writeBytes(content.getBytes(StandardCharsets.UTF_8));  // 内容

        channel.writeAndFlush(buffer);
    }

    channel.close().sync();

    group.shutdownGracefully();
}


/**
 * 创建随机长度的消息
 * @return
 */
private static String makeString(){
    int i = new Random().nextInt(100) + 1;
    StringBuilder stringBuilder = new StringBuilder();
    char c = '0';
    for (int j = 0; j < i; j++) {
        stringBuilder.append(c);
    }

    return stringBuilder.toString();
}