Skip to content

2019 09 13 netty案例,netty4.1源码分析篇四《ByteBuf的数据结构在使用方式中的剖析》

fuzhengwei edited this page May 16, 2020 · 1 revision

作者:小傅哥
博客:https://bugstack.cn - 原创系列专题

沉淀、分享、成长,让自己和他人都能有所收获!

前言介绍

在Netty中ByteBuf是一个非常重要的类,它可以以高效易用的数据结构方式来满足网络通信过程中处理数据包内字节码序列的移动。

数据结构

 +-------------------+------------------+------------------+
 | discardable bytes |  readable bytes  |  writable bytes  |
 |                   |     (CONTENT)    |                  |
 +-------------------+------------------+------------------+
 |                   |                  |                  |
 0      <=      readerIndex   <=   writerIndex    <=    capacity

那么这种数据结构之所以能高效的处理数据传输处理并解决半包粘包,主要得益于ByteBuf中有两个不同的索引;读索引(readIndex)、写索引(writerIndex)。 读索引:当我们从ByteBuf读取数据时,readerIndex指针位置也会指向到读取字节位置 写索引:当我们向ByteBuf写入数据时,writerIndex指针位置也会指向到写入字节位置

discardable bytes:当从ByteBuf读取一部分数据后,这部分数据就属于discardable,他们是可以被废弃的。 readable bytes:剩余可以继续读取的内容区域,就像一个长条的鸡蛋卡槽,一边放一边拿。从已经拿完到剩余的鸡蛋位置属于可拿区域。 writable bytes:同上一样,这一部分就是可以继续放置鸡蛋的位置。

功能案例

// 62 75 67 73 74 61 63 6B B3 E6 B6 B4 D5 BB
public static void main(String[] args) throws UnsupportedEncodingException {
	// 1.创建一个非池化的ByteBuf,大小为14个字节
	ByteBuf buffer = Unpooled.buffer(14);
	System.out.println("1.创建一个非池化的ByteBuf,大小为14个字节");
	System.out.println("ByteBuf空间大小:" + buffer.capacity());
	// 2.写入3个字节
	buffer.writeByte(62);
	buffer.writeByte(75);
	buffer.writeByte(67);
	System.out.println("\r\n2.写入3个字节");
	System.out.println("readerIndex位置:" + buffer.readerIndex());
	System.out.println("writerIndex位置:" + buffer.writerIndex());
	// 3.写入一段字节
	byte[] bytes = {73, 74, 61, 63, 0x6B};
	buffer.writeBytes(bytes);
	System.out.println("\r\n3.写入一段字节");
	System.out.println("readerIndex位置:" + buffer.readerIndex());
	System.out.println("writerIndex位置:" + buffer.writerIndex());
	// 4.读取全部内容
	byte[] allBytes = new byte[buffer.readableBytes()];
	buffer.readBytes(allBytes);
	System.out.println("\r\n4.读取全部内容");
	System.out.println("readerIndex位置:" + buffer.readerIndex());
	System.out.println("writerIndex位置:" + buffer.writerIndex());
	System.out.println("读取全部内容:" + Arrays.toString(allBytes));
	// 5.重置指针位置
	buffer.resetReaderIndex();
	System.out.println("\r\n5.重置指针位置");
	System.out.println("readerIndex位置:" + buffer.readerIndex());
	System.out.println("writerIndex位置:" + buffer.writerIndex());
	// 6.读取3个字节
	byte b0 = buffer.readByte();
	byte b1 = buffer.readByte();
	byte b2 = buffer.readByte();
	System.out.println("\r\n6.读取3个字节");
	System.out.println("readerIndex位置:" + buffer.readerIndex());
	System.out.println("writerIndex位置:" + buffer.writerIndex());
	System.out.println("读取3个字节:" + Arrays.toString(new byte[]{b0, b1, b2}));
	// 7.读取一段字节
	ByteBuf byteBuf = buffer.readBytes(5);
	byte[] dst = new byte[5];
	byteBuf.readBytes(dst);
	System.out.println("\r\n7.读取一段字节");
	System.out.println("readerIndex位置:" + buffer.readerIndex());
	System.out.println("writerIndex位置:" + buffer.writerIndex());
	System.out.println("读取一段字节:" + Arrays.toString(dst));
	// 8.丢弃已读内容
	buffer.discardReadBytes();
	System.out.println("\r\n8.丢弃已读内容");
	System.out.println("readerIndex位置:" + buffer.readerIndex());
	System.out.println("writerIndex位置:" + buffer.writerIndex());
	// 9.清空指针位置
	buffer.clear();
	System.out.println("\r\n9.清空指针位置");
	System.out.println("readerIndex位置:" + buffer.readerIndex());
	System.out.println("writerIndex位置:" + buffer.writerIndex());
	// 10.ByteBuf中还有很多其他方法;拷贝、标记、跳过字节,多用于自定义解码器进行半包粘包处理
}
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {

	//基础长度不足,我们设定基础长度为4
	if (in.readableBytes() < BASE_LENGTH) {
		return;
	}

	int beginIdx; //记录包头位置

	while (true) {
		// 获取包头开始的index
		beginIdx = in.readerIndex();
		// 标记包头开始的index
		in.markReaderIndex();
		// 读到了协议的开始标志,结束while循环
		if (in.readByte() == 0x02) {
			break;
		}
		// 未读到包头,略过一个字节
		// 每次略过,一个字节,去读取,包头信息的开始标记
		in.resetReaderIndex();
		in.readByte();
		// 当略过,一个字节之后,
		// 数据包的长度,又变得不满足
		// 此时,应该结束。等待后面的数据到达
		if (in.readableBytes() < BASE_LENGTH) {
			return;
		}

	}

	//剩余长度不足可读取数量[没有内容长度位]
	int readableCount = in.readableBytes();
	if (readableCount <= 1) {
		in.readerIndex(beginIdx);
		return;
	}

	//长度域占4字节,读取int
	ByteBuf byteBuf = in.readBytes(1);
	String msgLengthStr = byteBuf.toString(Charset.forName("GBK"));
	int msgLength = Integer.parseInt(msgLengthStr);

	//剩余长度不足可读取数量[没有结尾标识]
	readableCount = in.readableBytes();
	if (readableCount < msgLength + 1) {
		in.readerIndex(beginIdx);
		return;
	}

	ByteBuf msgContent = in.readBytes(msgLength);

	//如果没有结尾标识,还原指针位置[其他标识结尾]
	byte end = in.readByte();
	if (end != 0x03) {
		in.readerIndex(beginIdx);
		return;
	}

	out.add(msgContent.toString(Charset.forName("GBK")));
}

内存模型

** 1、堆内内存(JVM堆空间内) ** 最常用的ByteBuf模式是将数据存储在JVM的堆空间中。它能在没有使用池化的情况下提供快速的分配和释放。

** 2、堆外内存(本机直接内存)** JDK允许JVM实现通过本地调用来分配内存。主要是为了避免每次调用本地I/O操作之前(或者之后)将缓冲区的内容复制到一个中间缓冲区(或者从中间缓冲区把内容复制到缓冲区)。

** 3、复合缓冲区(以上2种缓冲区多个混合)** 常用类:CompositeByteBuf,它为多个ByteBuf提供一个聚合视图,将多个缓冲区表示为单个合并缓冲区的虚拟表示。 比如:HTTP协议:头部和主体这两部分由应用程序的不同模块产生。这个时候把这两部分合并的话,选择CompositeByteBuf是比较好的。

源码解读

ByteBuf实现了ReferenceCounted与Comparable两个接口

public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf>

创建一个指定容量大小堆缓冲区,并按需扩充容量{与list集和很像},指针位置都是从0开始

Unpooled.buffer(14);

/**
 * Creates a new big-endian Java heap buffer with the specified {@code capacity}, which
 * expands its capacity boundlessly on demand.  The new buffer's {@code readerIndex} and
 * {@code writerIndex} are {@code 0}.
 */
public static ByteBuf buffer(int initialCapacity) {
	return ALLOC.heapBuffer(initialCapacity);
}

跟进代码的创建过程会发现,UnpooledHeapByteBuf.UnpooledHeapByteBuf用来创建非池化堆Buf

**
 * Creates a new heap buffer with an existing byte array.
 *
 * @param initialArray the initial underlying byte array
 * @param maxCapacity the max capacity of the underlying byte array
 */
protected UnpooledHeapByteBuf(ByteBufAllocator alloc, byte[] initialArray, int maxCapacity) {
	super(maxCapacity);

	checkNotNull(alloc, "alloc");
	checkNotNull(initialArray, "initialArray");

	if (initialArray.length > maxCapacity) {
		throw new IllegalArgumentException(String.format(
				"initialCapacity(%d) > maxCapacity(%d)", initialArray.length, maxCapacity));
	}

	this.alloc = alloc;
	setArray(initialArray);
	setIndex(0, initialArray.length);
}

内容总结

  • ByteBuf提供了两个指针;读、写,分别用来标记“可读”、“可写”、“可丢弃”的字节
  • 通过调用write*方法写入数据后,写指针将会向后移动
  • 通过调用read*方法读取数据后,读指针将会向后移动
  • 写入数据或读取数据时会检查是否有足够多的空间可以写入和是否有数据可以读取
  • 写入数据之前会进行容量检查,当剩余可写的容量小于需要写入的容量时,需要执行扩容操作
  • clear等修改读写指针的方法,只会更改读写指针的值,并不会影响ByteBuf中已有的内容

上一篇:netty案例,netty4.1源码分析篇三《Netty服务端初始化过程以及反射工厂的作用》

下一篇:netty案例,netty4.1源码分析篇五《一行简单的writeAndFlush都做了哪些事》

微信搜索「bugstack虫洞栈」公众号,关注后回复「rpc案例源码」获取本文源码&更多原创专题案例!

📝 首页

🌏 知识星球码农会锁

实战项目:「DDD+RPC分布式抽奖系统」、专属小册、问题解答、简历指导、架构图稿、视频课程

🐲 头条

⛳ 目录

  1. 源码 - :octocat: 公众号:bugstack虫洞栈 文章所涉及到的全部开源代码
  2. Java
  3. Spring
  4. 面向对象
  5. 中间件
  6. Netty 4.x
  7. 字节码编程
  8. 💯实战项目
  9. 部署 Dev-Ops
  10. 📚PDF 下载
  11. 关于

💋 精选

🐾 友链

建立本开源项目的初衷是基于个人学习与工作中对 Java 相关技术栈的总结记录,在这里也希望能帮助一些在学习 Java 过程中遇到问题的小伙伴,如果您需要转载本仓库的一些文章到自己的博客,请按照以下格式注明出处,谢谢合作。

作者:小傅哥
链接:https://bugstack.cn
来源:bugstack虫洞栈

2021年10月24日,小傅哥 的文章全部开源到代码库 CodeGuide 中,与同好同行,一起进步,共同维护。

这里我提供 3 种方式:

  1. 提出 Issue :在 Issue 中指出你觉得需要改进/完善的地方(能够独立解决的话,可以在提出 Issue 后再提交 PR )。
  2. 处理 Issue : 帮忙处理一些待处理的 Issue
  3. 提交 PR: 对于错别字/笔误这类问题可以直接提交PR,无需提交Issue 确认。

详细参考:CodeGuide 贡献指南 - 非常感谢你的支持,这里会留下你的足迹

  • 加群交流 本群的宗旨是给大家提供一个良好的技术学习交流平台,所以杜绝一切广告!由于微信群人满 100 之后无法加入,请扫描下方二维码先添加作者 “小傅哥” 微信(fustack),备注:加群。
微信:fustack

  • 公众号(bugstack虫洞栈) - 沉淀、分享、成长,专注于原创专题案例,以最易学习编程的方式分享知识,让自己和他人都能有所收获。
公众号:bugstack虫洞栈

感谢以下人员对本仓库做出的贡献或者对小傅哥的赞赏,当然不仅仅只有这些贡献者,这里就不一一列举了。如果你希望被添加到这个名单中,并且提交过 Issue 或者 PR,请与我联系。

Clone this wiki locally