diff --git a/examples/zinx_decoder/README.MD b/examples/zinx_decoder/README.MD
new file mode 100644
index 00000000..bed08762
--- /dev/null
+++ b/examples/zinx_decoder/README.MD
@@ -0,0 +1,209 @@
+# LengthFieldFrameDecoder使用详解
+
+
+>LengthFieldFrameDecoder是一个基于长度字段的解码器,比较难理解的解码器,它主要有5个核心的参数配置:
+
+>maxFrameLength: 数据包最大长度
+
+>lengthFieldOffset: 长度字段偏移量
+
+>lengthFieldLength: 长度字段所占的字节数
+
+>lengthAdjustment: 长度的调整值
+
+>initialBytesToStrip:解码后跳过的字节数
+
+
+
+
+## 示例讲解
+
+#### TLV格式协议
+
+TLV,即Tag(Type)—Length—Value,是一种简单实用的数据传输方案。在TLV的定义中,可以知道它包括三个域,分别为:标签域(Tag),长度域(Length),内容域(Value)。这里的长度域的值实际上就是内容域的长度。
+
+```
+解码前 (20 bytes) 解码后 (20 bytes)
++------------+------------+-----------------+ +------------+------------+-----------------+
+| Tag | Length | Value |----->| Tag | Length | Value |
+| 0x00000001 | 0x0000000C | "HELLO, WORLD" | | 0x00000001 | 0x0000000C | "HELLO, WORLD" |
++------------+------------+-----------------+ +------------+------------+-----------------+
+```
+> Tag: uint32类型,占4字节,Tag作为MsgId,暂定为1
+> Length:uint32类型,占4字节,Length标记Value长度12(hex:0x0000000C)
+> Value: 共12个字符,占12字节
+
+```
+说明:
+lengthFieldOffset = 4 (Length的字节位索引下标是4) 长度字段的偏差
+lengthFieldLength = 4 (Length是4个byte) 长度字段占的字节数
+lengthAdjustment = 0 (Length只表示Value长度,程序只会读取Length个字节就结束,后面没有来,故为0,若Value后面还有crc占2字节的话,那么此处就是2。若Length标记的是Tag+Length+Value总长度,那么此处是-8)
+initialBytesToStrip = 0 (这个0表示返回完整的协议内容Tag+Length+Value,如果只想返回Value内容,去掉Tag的4字节和Length的4字节,此处就是8) 从解码帧中第一次去除的字节数
+maxFrameLength = 2^32 + 4 + 4 (Length为uint类型,故2^32次方表示Value最大长度,此外Tag和Length各占4字节)
+```
+
+
+#### HTLV+CRC格式协议
+
+HTLV+CRC,H头码,T功能码,L数据长度,V数据内容
+
+
+```
+
++------+-------+---------+--------+--------+
+| 头码 | 功能码 | 数据长度 | 数据内容 | CRC校验 |
+| 1字节 | 1字节 | 1字节 | N字节 | 2字节 |
++------+-------+---------+--------+--------+
+
+```
+
+数据示例
+
+```
+头码 功能码 数据长度 Body CRC
+A2 10 0E 0102030405060708091011121314 050B
+```
+
+```
+
+说明:
+ 1.数据长度len是14(0E),这里的len仅仅指Body长度;
+
+ lengthFieldOffset = 2 (len的索引下标是2,下标从0开始) 长度字段的偏差
+ lengthFieldLength = 1 (len是1个byte) 长度字段占的字节数
+ lengthAdjustment = 2 (len只表示Body长度,程序只会读取len个字节就结束,但是CRC还有2byte没读呢,所以为2)
+ initialBytesToStrip = 0 (这个0表示完整的协议内容,如果不想要A2,那么这里就是1) 从解码帧中第一次去除的字节数
+ maxFrameLength = 255 + 4(起始码、功能码、CRC) (len是1个byte,所以最大长度是无符号1个byte的最大值)
+
+```
+
+
+## 案例分析
+以下7种案例足以满足所有协议,只处理断粘包,并不能处理错包,包的完整性需要依靠协议自身定义CRC来校验
+
+#### 案例1:
+```
+lengthFieldOffset =0 长度字段从0开始
+lengthFieldLength =2 长度字段本身占2个字节
+lengthAdjustment =0 需要调整0字节
+initialBytesToStrip=0 解码后跳过0字节
+
+
+
+解码前 (14 bytes) 解码后 (14 bytes)
++--------+----------------+ +--------+----------------+
+| Length | Actual Content |----->| Length | Actual Content |
+| 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
++--------+----------------+ +--------+----------------+
+```
+
+> Length为0x000C,这个是十六进制,0x000C转化十进制就是14
+
+
+#### 案例2:
+```
+lengthFieldOffset =0 长度字段从0开始
+lengthFieldLength =2 长度字段本身占2个字节
+lengthAdjustment =0 需要调整0字节
+initialBytesToStrip=2 解码后跳过2字节
+
+解码前 (14 bytes) 解码后 (12 bytes)
++--------+----------------+ +----------------+
+| Length | Actual Content |----->| Actual Content |
+| 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
++--------+----------------+ +----------------+
+```
+>这时initialBytesToStrip字段起作用了,在解码后会将前面的2字节跳过,所以解码后就只剩余了数据部分。
+
+#### 案例3:
+```
+lengthFieldOffset =0 长度字段从0开始
+lengthFieldLength =2 长度字段本身占2个字节
+lengthAdjustment =-2 需要调整 -2 字节
+initialBytesToStrip=0 解码后跳过2字节
+
+
+解码前 (14 bytes) 解码后 (14 bytes)
++--------+----------------+ +--------+----------------+
+| Length | Actual Content |----->| Length | Actual Content |
+| 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
++--------+----------------+ +--------+----------------+
+```
+
+>这时lengthAdjustment起作用了,因为长度字段的值包含了长度字段本身的2字节,
+如果要获取数据的字节数,需要加上lengthAdjustment的值,就是 14+(-2)=12,这样才算出来数据的长度。
+
+
+#### 案例4:
+
+```
+lengthFieldOffset =2 长度字段从第2个字节开始
+lengthFieldLength =3 长度字段本身占3个字节
+lengthAdjustment =0 需要调整0字节
+initialBytesToStrip=0 解码后跳过0字节
+
+
+解码前 (17 bytes) 解码后 (17 bytes)
++----------+----------+----------------+ +----------+----------+----------------+
+| Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content |
+| 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
++----------+----------+----------------+ +----------+----------+----------------+
+```
+>由于数据包最前面加了2个字节的Header,所以lengthFieldOffset为2,
+说明长度字段是从第2个字节开始的。然后lengthFieldLength为3,说明长度字段本身占了3个字节。
+
+
+#### 案例5:
+```
+lengthFieldOffset =0 长度字段从第0个字节开始
+lengthFieldLength =3 长度字段本身占3个字节
+lengthAdjustment =2 需要调整2字节
+initialBytesToStrip=0 解码后跳过0字节
+
+
+解码前 (17 bytes) 解码后 (17 bytes)
++----------+----------+----------------+ +----------+----------+----------------+
+| Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content |
+| 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" |
++----------+----------+----------------+ +----------+----------+----------------+
+```
+>lengthFieldOffset为0,所以长度字段从0字节开始。lengthFieldLength为3,长度总共占3字节。
+因为长度字段后面还剩余14字节的总数据,但是长度字段的值为12,只表示了数据的长度,不包含头的长度,
+所以lengthAdjustment为2,就是12+2=14,计算出Header+Content的总长度。
+
+
+#### 案例6:
+
+```
+lengthFieldOffset =1 长度字段从第1个字节开始
+lengthFieldLength =2 长度字段本身占2个字节
+lengthAdjustment =1 需要调整1字节
+initialBytesToStrip=3 解码后跳过3字节
+
+解码前 (16 bytes) 解码后 (13 bytes)
++------+--------+------+----------------+ +------+----------------+
+| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
+| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
++------+--------+------+----------------+ +------+----------------+
+```
+>这一次将Header分为了两个1字节的部分,lengthFieldOffset为1表示长度从第1个字节开始,lengthFieldLength为2表示长度字段占2个字节。
+因为长度字段的值为12,只表示了数据的长度,所以lengthAdjustment为1,12+1=13,
+表示Header的第二部分加上数据的总长度为13。因为initialBytesToStrip为3,所以解码后跳过前3个字节。
+
+
+#### 案例7:
+```
+lengthFieldOffset =1 长度字段从第1个字节开始
+lengthFieldLength =2 长度字段本身占2个字节
+lengthAdjustment =-3 需要调整 -3 字节
+initialBytesToStrip=3 解码后跳过3字节
+
+解码前 (16 bytes) 解码后 (13 bytes)
++------+--------+------+----------------+ +------+----------------+
+| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
+| 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
++------+--------+------+----------------+ +------+----------------+
+```
+>这一次长度字段的值为16,表示包的总长度,所以lengthAdjustment为 -3 ,16+ (-3)=13,
+表示Header的第二部分加数据部分的总长度为13字节。initialBytesToStrip为3,解码后跳过前3个字节。
+
diff --git a/examples/zinx_decoder/bili/README.MD b/examples/zinx_decoder/bili/README.MD
new file mode 100644
index 00000000..4d5fbfe0
--- /dev/null
+++ b/examples/zinx_decoder/bili/README.MD
@@ -0,0 +1,23 @@
+# 一款水机设备服务端模拟程序
+
+
+```azure
+HTLV+CRC,H头码,T功能码,L数据长度,V数据内容
++------+-------+---------+--------+--------+
+| 头码 | 功能码 | 数据长度 | 数据内容 | CRC校验 |
+| 1字节 | 1字节 | 1字节 | N字节 | 2字节 |
++------+-------+---------+--------+--------+
+
+头码 功能码 数据长度 Body CRC
+A2 10 0E 0102030405060708091011121314 050B
+
+
+说明:
+ 1.数据长度len是14(0E),这里的len仅仅指Body长度;
+
+ lengthFieldOffset = 2 (len的索引下标是2,下标从0开始) 长度字段的偏差
+ lengthFieldLength = 1 (len是1个byte) 长度字段占的字节数
+ lengthAdjustment = 2 (len只表示Body长度,程序只会读取len个字节就结束,但是CRC还有2byte没读呢,所以为2)
+ initialBytesToStrip = 0 (这个0表示完整的协议内容,如果不想要A2,那么这里就是1) 从解码帧中第一次去除的字节数
+ maxFrameLength = 255 + 4(起始码、功能码、CRC) (len是1个byte,所以最大长度是无符号1个byte的最大值)
+```
\ No newline at end of file
diff --git a/examples/zinx_decoder/bili/main.go b/examples/zinx_decoder/bili/main.go
new file mode 100644
index 00000000..f711aef8
--- /dev/null
+++ b/examples/zinx_decoder/bili/main.go
@@ -0,0 +1,37 @@
+package main
+
+import (
+ "github.com/aceld/zinx/examples/zinx_decoder/bili/router"
+ "github.com/aceld/zinx/examples/zinx_decoder/decode"
+ "github.com/aceld/zinx/ziface"
+ "github.com/aceld/zinx/znet"
+ "math"
+)
+
+func DoConnectionBegin(conn ziface.IConnection) {
+}
+
+func DoConnectionLost(conn ziface.IConnection) {
+}
+
+func main() {
+ server := znet.NewServer(func(s *znet.Server) {
+ s.Port = 9090
+ s.LengthField = ziface.LengthField{
+ MaxFrameLength: math.MaxUint8 + 4,
+ LengthFieldOffset: 2,
+ LengthFieldLength: 1,
+ LengthAdjustment: 2,
+ InitialBytesToStrip: 0,
+ }
+ })
+ server.SetOnConnStart(DoConnectionBegin)
+ server.SetOnConnStop(DoConnectionLost)
+ server.AddInterceptor(&decode.HtlvCrcDecoder{})
+ server.AddRouter(0x10, &router.Data0x10Router{})
+ server.AddRouter(0x13, &router.Data0x13Router{})
+ server.AddRouter(0x14, &router.Data0x14Router{})
+ server.AddRouter(0x15, &router.Data0x15Router{})
+ server.AddRouter(0x16, &router.Data0x16Router{})
+ server.Serve()
+}
diff --git a/examples/zinx_decoder/bili/router/bili0x10router.go b/examples/zinx_decoder/bili/router/bili0x10router.go
new file mode 100644
index 00000000..6dd0149e
--- /dev/null
+++ b/examples/zinx_decoder/bili/router/bili0x10router.go
@@ -0,0 +1,56 @@
+package router
+
+import (
+ "bytes"
+ "encoding/hex"
+ "github.com/aceld/zinx/examples/zinx_decoder/bili/utils"
+ "github.com/aceld/zinx/examples/zinx_decoder/decode"
+ "github.com/aceld/zinx/ziface"
+ "github.com/aceld/zinx/zlog"
+ "github.com/aceld/zinx/znet"
+)
+
+type Data0x10Router struct {
+ znet.BaseRouter
+}
+
+func (this *Data0x10Router) Handle(request ziface.IRequest) {
+ zlog.Ins().DebugF("Data0x10Router Handle %s \n", hex.EncodeToString(request.GetMessage().GetData()))
+ _response := request.GetResponse()
+ if _response != nil {
+ switch _response.(type) {
+ case decode.HtlvCrcData:
+ _data := _response.(decode.HtlvCrcData)
+ //zlog.Ins().DebugF("Data0x10Router %v \n", _data)
+ buffer := pack10(_data)
+ request.GetConnection().Send(buffer)
+ }
+ }
+}
+
+// 头码 功能码 数据长度 Body CRC
+// A2 10 0E 0102030405060708091011121314 050B
+func pack10(_data decode.HtlvCrcData) []byte {
+ buffer := bytes.NewBuffer([]byte{})
+ buffer.WriteByte(0xA1)
+ buffer.WriteByte(_data.Funcode)
+ buffer.WriteByte(0x1E)
+ //3~9:唯一设备码 将IMEI码转换为16进制
+ buffer.Write(_data.Body[:7])
+ //10~14:园区代码 后台根据幼儿园生成的唯一代码
+ buffer.Write([]byte{10, 11, 12, 13, 14})
+ //15~18:时间戳 实际当前北京时间的时间戳,转换为16进制
+ buffer.Write([]byte{15, 16, 17, 18})
+ //19:RFID模块工作模式 0x01-离线工作模式(默认工作模式)0x02-在线工作模式
+ buffer.WriteByte(0x02)
+ //20~27:通讯密匙 预留,全填0x00
+ buffer.Write([]byte{20, 21, 22, 23, 24, 25, 26, 27})
+ //28:出水方式 0x00-放杯出水,取杯停止出水 0x01-刷一下卡出水,再刷停止出水【数联默认】
+ buffer.WriteByte(0x01)
+ //29~32:预留 全填0x00
+ buffer.Write([]byte{29, 30, 31, 32})
+ crc := utils.GetCrC(buffer.Bytes())
+ buffer.Write(crc)
+ return buffer.Bytes()
+
+}
diff --git a/examples/zinx_decoder/bili/router/bili0x13router.go b/examples/zinx_decoder/bili/router/bili0x13router.go
new file mode 100644
index 00000000..c616a0a3
--- /dev/null
+++ b/examples/zinx_decoder/bili/router/bili0x13router.go
@@ -0,0 +1,53 @@
+package router
+
+import (
+ "bytes"
+ "fmt"
+ "github.com/aceld/zinx/examples/zinx_decoder/bili/utils"
+ "github.com/aceld/zinx/examples/zinx_decoder/decode"
+ "github.com/aceld/zinx/ziface"
+ "github.com/aceld/zinx/znet"
+)
+
+type Data0x13Router struct {
+ znet.BaseRouter
+}
+
+func (this *Data0x13Router) Handle(request ziface.IRequest) {
+ fmt.Println("Data0x13Router Handle", request.GetMessage().GetData())
+ _response := request.GetResponse()
+ if _response != nil {
+ switch _response.(type) {
+ case decode.HtlvCrcData:
+ _data := _response.(decode.HtlvCrcData)
+ fmt.Println("Data0x13Router", _data)
+ buffer := pack13(_data)
+ request.GetConnection().Send(buffer)
+ }
+ }
+}
+
+// 头码 功能码 数据长度 Body CRC
+// A2 10 0E 0102030405060708091011121314 050B
+func pack13(_data decode.HtlvCrcData) []byte {
+ buffer := bytes.NewBuffer([]byte{})
+ buffer.WriteByte(0xA1)
+ buffer.WriteByte(_data.Funcode)
+ buffer.WriteByte(0x0E)
+ //3~9:3~6:用户卡号 用户IC卡卡号
+ buffer.Write(_data.Body[:4])
+ //7:卡状态: 0x00-未绑定(如服务器未查询到该IC卡时)
+ //0x01-已绑定
+ //0x02-解除绑定(如服务器查询到该IC卡解除绑定时下发)
+ buffer.WriteByte(0x01)
+ //8~9:剩余使用天数 该用户的剩余流量天数
+ buffer.Write([]byte{8, 9})
+ //10~11:每次最大出水量 单位mL,实际出水量
+ buffer.Write([]byte{10, 11})
+ //12~16:预留 全填0x00
+ buffer.Write([]byte{12, 13, 14, 15, 16})
+ crc := utils.GetCrC(buffer.Bytes())
+ buffer.Write(crc)
+ return buffer.Bytes()
+
+}
diff --git a/examples/zinx_decoder/bili/router/bili0x14router.go b/examples/zinx_decoder/bili/router/bili0x14router.go
new file mode 100644
index 00000000..22b1fc70
--- /dev/null
+++ b/examples/zinx_decoder/bili/router/bili0x14router.go
@@ -0,0 +1,39 @@
+package router
+
+import (
+ "bytes"
+ "fmt"
+ "github.com/aceld/zinx/examples/zinx_decoder/bili/utils"
+ "github.com/aceld/zinx/examples/zinx_decoder/decode"
+ "github.com/aceld/zinx/ziface"
+ "github.com/aceld/zinx/znet"
+)
+
+type Data0x14Router struct {
+ znet.BaseRouter
+}
+
+func (this *Data0x14Router) Handle(request ziface.IRequest) {
+ fmt.Println("Data0x14Router Handle", request.GetMessage().GetData())
+ _response := request.GetResponse()
+ if _response != nil {
+ switch _response.(type) {
+ case decode.HtlvCrcData:
+ _data := _response.(decode.HtlvCrcData)
+ fmt.Println("Data0x14Router", _data)
+ buffer := pack14(_data)
+ request.GetConnection().Send(buffer)
+ }
+ }
+}
+
+// 头码 功能码 数据长度 Body CRC
+// A2 10 0E 0102030405060708091011121314 050B
+func pack14(_data decode.HtlvCrcData) []byte {
+ _data.Data[0] = 0xA1
+ buffer := bytes.NewBuffer(_data.Data[:len(_data.Data)-2])
+ crc := utils.GetCrC(buffer.Bytes())
+ buffer.Write(crc)
+ return buffer.Bytes()
+
+}
diff --git a/examples/zinx_decoder/bili/router/bili0x15router.go b/examples/zinx_decoder/bili/router/bili0x15router.go
new file mode 100644
index 00000000..4926356f
--- /dev/null
+++ b/examples/zinx_decoder/bili/router/bili0x15router.go
@@ -0,0 +1,140 @@
+package router
+
+import (
+ "bytes"
+ "fmt"
+ "github.com/aceld/zinx/examples/zinx_decoder/bili/utils"
+ "github.com/aceld/zinx/examples/zinx_decoder/decode"
+ "github.com/aceld/zinx/ziface"
+ "github.com/aceld/zinx/znet"
+)
+
+type Data0x15Router struct {
+ znet.BaseRouter
+}
+
+func (this *Data0x15Router) Handle(request ziface.IRequest) {
+ fmt.Println("Data0x15Router Handle", request.GetMessage().GetData())
+ _response := request.GetResponse()
+ if _response != nil {
+ switch _response.(type) {
+ case decode.HtlvCrcData:
+ _data := _response.(decode.HtlvCrcData)
+ fmt.Println("Data0x15Router", _data)
+ buffer := pack15(_data)
+ request.GetConnection().Send(buffer)
+ }
+ }
+}
+
+// 头码 功能码 数据长度 Body CRC
+// A2 10 0E 0102030405060708091011121314 050B
+func pack15(_data decode.HtlvCrcData) []byte {
+ buffer := bytes.NewBuffer([]byte{})
+ buffer.WriteByte(0xA1)
+ buffer.WriteByte(_data.Funcode)
+ buffer.WriteByte(0x26)
+ //3~9:设备代码 将IMEI码转换为16进制
+ buffer.Write(_data.Body[:7])
+ //10:机型代码 A8(即热式幼儿园机)
+ buffer.WriteByte(0xA8)
+ //11:主机状态1 Bit0:0-待机中,1-运行中
+ //Bit1:0-非智控,1-智控【本设备按智控】
+ //Bit2:0-不能饮用,1-可以饮用
+ //Bit3:0-无人用水,1-有人用水
+ //Bit4:0-上电进水中,1-正常工作中
+ //Bit5:0-消毒未启动,1-消毒进行中
+ //Bit6:0-低压开关断开(无水),1-低压开关接通(有水)
+ //Bit7:0-主机不带RO,1-主机带RO
+ buffer.WriteByte(0x01)
+ //12:主机状态2 Bit0:0-RO机不允许启动水泵,1-RO机允许启动水泵
+ //Bit1:0-开水无人用,1-开水有人用
+ //Bit2:0-高压开关断开(满水),1-高压开关接通(缺水)
+ //Bit3:0-冰水无人用,1-冰水有人用【本设备无意义】
+ //Bit4:0-无漏水信号,1-有漏水信号
+ //Bit5:0-紫外灯未启动,1-紫外线灯杀菌中
+ //Bit6:预留
+ //Bit7:预留
+ buffer.WriteByte(0x01)
+ //13:水位状态
+ //(即热水位) Bit0:低水位,0-代表无水,1-代表有水【本设备低水位有水即表示水满】
+ //Bit1:中水位,0-代表无水,1-代表有水【本设备无意义】
+ //Bit2:高水位,0-代表无水,1-代表有水【本设备无意义】
+ //Bit3:溢出水位,0-代表无水,1-代表有水【本设备无意义】
+ //Bit4:预留
+ //Bit5:预留
+ //Bit6:预留
+ //Bit7:预留
+ buffer.WriteByte(0x01)
+ //14:开水温度 0℃~100℃,表示当前开水温度
+ buffer.WriteByte(0x1A)
+ //15:当前系统的停止加热温度 30~98℃,实际数值
+ buffer.WriteByte(0x27)
+ //16:负载状态 Bit0:加热,0-未加热,1-加热中
+ //Bit1:进水,0-未进水,1-进水中
+ //Bit2:换水或消毒,0-未换水,1-换水或消毒
+ //Bit3:冲洗,0-未冲洗,1-冲洗中
+ //Bit4:增压泵和RO进水阀,0-未启动,1-启动增压泵和RO进水阀
+ //Bit5:RO进水阀2,0-未启动,1-启动中【本设备无意义】
+ //Bit6:开水出水阀1,0-未启动,1-启动中
+ //Bit7:净化水出水阀1,0-未启动,1-启动中【本设备无意义】
+ buffer.WriteByte(0x01)
+ //17:负载状态2 预留,填0x00
+ buffer.WriteByte(0x00)
+ //18:故障状态 Bit0:故障1,0-无故障,1-有故障
+ //Bit1:故障2,0-无故障,1-有故障
+ //Bit2:障保3,0-无故障,1-有故障
+ //Bit3:故障4 ,0-无故障,1-有故障
+ //Bit4:故障5 ,0-无故障,1-有故障
+ //Bit5:故障6,0-无故障,1-有故障
+ //Bit6:故障7,0-无故障,1-有故障
+ //Bit7:故障8,0-无故障,1-有故障
+ buffer.WriteByte(0x00)
+ //19:故障状态2 Bit0:故障A,0-无故障,1-有故障
+ //Bit1:故障B,0-无故障,1-有故障
+ //Bit2:障保C,0-无故障,1-有故障
+ //Bit3:故障9,0-无故障,1-有故障
+ //Bit4:故障D,0-无故障,1-有故障
+ //Bit5:故障E,0-无故障,1-有故障
+ //Bit6:预留
+ //Bit7:预留
+ buffer.WriteByte(0x00)
+ //20:主板软件版本 实际数值1~255
+ buffer.WriteByte(0x01)
+ //21:水位状态2 Bit0:纯水箱低水位,0-代表无水,1-代表有水【本设备无意义】
+ //Bit1:纯水箱中水位,0-代表无水,1-代表有水【本设备无意义】
+ //Bit2:纯水箱高水位,0-代表无水,1-代表有水【本设备无意义】
+ //Bit3:保温箱低水位,0-代表无水,1-代表有水
+ //Bit4:保温箱中水位,0-代表无水,1-代表有水【本设备无意义】
+ //Bit5:保温箱高水位,0-代表无水,1-代表有水
+ //Bit6:保温箱溢水位,0-代表无水,1-代表有水
+ //Bit7:预留
+ buffer.WriteByte(0x01)
+ //22:温开水温度 0~100℃
+ buffer.WriteByte(0x30)
+ //23~24:剩余滤芯寿命 单位:小时,实际数值
+ buffer.Write([]byte{23, 24})
+ //25~26:剩余紫外线灯寿命
+ buffer.Write([]byte{25, 26})
+ //27~28:源水TDS值 0x0000-无此功能
+ //实际数值,单位,ppm
+ buffer.Write([]byte{27, 28})
+ //29:净水TDS值 0x00-无此功能
+ //实际数值,单位,ppm
+ buffer.WriteByte(0x00)
+ //30~33:耗电量 0xFFFFFFFF-无此功能
+ //实际数值,高位在前,低位在后,单位wh
+ buffer.Write([]byte{30, 31, 32, 33})
+ //34:信号强度 0x01~0x28
+ //0x01~0x0A对应:-81~-90dbm=极差
+ //0x0B~0x14对应:-71~-80dbm=差
+ //0x15~0x1E对应-61~-70dbm=好
+ //0x1F~0x28对应:-41以上~-50dbm=良好
+ buffer.WriteByte(0x30)
+ //35~40:预留 全填0x00
+ buffer.Write([]byte{0x00, 0x00})
+ crc := utils.GetCrC(buffer.Bytes())
+ buffer.Write(crc)
+ return buffer.Bytes()
+
+}
diff --git a/examples/zinx_decoder/bili/router/bili0x16router.go b/examples/zinx_decoder/bili/router/bili0x16router.go
new file mode 100644
index 00000000..e3d5fda1
--- /dev/null
+++ b/examples/zinx_decoder/bili/router/bili0x16router.go
@@ -0,0 +1,39 @@
+package router
+
+import (
+ "bytes"
+ "fmt"
+ "github.com/aceld/zinx/examples/zinx_decoder/bili/utils"
+ "github.com/aceld/zinx/examples/zinx_decoder/decode"
+ "github.com/aceld/zinx/ziface"
+ "github.com/aceld/zinx/znet"
+)
+
+type Data0x16Router struct {
+ znet.BaseRouter
+}
+
+func (this *Data0x16Router) Handle(request ziface.IRequest) {
+ fmt.Println("Data0x16Router Handle", request.GetMessage().GetData())
+ _response := request.GetResponse()
+ if _response != nil {
+ switch _response.(type) {
+ case decode.HtlvCrcData:
+ _data := _response.(decode.HtlvCrcData)
+ fmt.Println("Data0x16Router", _data)
+ buffer := pack16(_data)
+ request.GetConnection().Send(buffer)
+ }
+ }
+}
+
+// 头码 功能码 数据长度 Body CRC
+// A2 10 0E 0102030405060708091011121314 050B
+func pack16(_data decode.HtlvCrcData) []byte {
+ _data.Data[0] = 0xA1
+ buffer := bytes.NewBuffer(_data.Data[:len(_data.Data)-2])
+ crc := utils.GetCrC(buffer.Bytes())
+ buffer.Write(crc)
+ return buffer.Bytes()
+
+}
diff --git a/examples/zinx_decoder/bili/utils/crc.go b/examples/zinx_decoder/bili/utils/crc.go
new file mode 100644
index 00000000..19bb67d9
--- /dev/null
+++ b/examples/zinx_decoder/bili/utils/crc.go
@@ -0,0 +1,37 @@
+package utils
+
+var crc16_h = []byte{0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40, 0x01, 0xC0, 0x80, 0x41, 0x01, 0xC0, 0x80, 0x41, 0x00, 0xC1, 0x81, 0x40}
+var crc16_l = []byte{0x00, 0xC0, 0xC1, 0x01, 0xC3, 0x03, 0x02, 0xC2, 0xC6, 0x06, 0x07, 0xC7, 0x05, 0xC5, 0xC4, 0x04, 0xCC, 0x0C, 0x0D, 0xCD, 0x0F, 0xCF, 0xCE, 0x0E, 0x0A, 0xCA, 0xCB, 0x0B, 0xC9, 0x09, 0x08, 0xC8, 0xD8, 0x18, 0x19, 0xD9, 0x1B, 0xDB, 0xDA, 0x1A, 0x1E, 0xDE, 0xDF, 0x1F, 0xDD, 0x1D, 0x1C, 0xDC, 0x14, 0xD4, 0xD5, 0x15, 0xD7, 0x17, 0x16, 0xD6, 0xD2, 0x12, 0x13, 0xD3, 0x11, 0xD1, 0xD0, 0x10, 0xF0, 0x30, 0x31, 0xF1, 0x33, 0xF3, 0xF2, 0x32, 0x36, 0xF6, 0xF7, 0x37, 0xF5, 0x35, 0x34, 0xF4, 0x3C, 0xFC, 0xFD, 0x3D, 0xFF, 0x3F, 0x3E, 0xFE, 0xFA, 0x3A, 0x3B, 0xFB, 0x39, 0xF9, 0xF8, 0x38, 0x28, 0xE8, 0xE9, 0x29, 0xEB, 0x2B, 0x2A, 0xEA, 0xEE, 0x2E, 0x2F, 0xEF, 0x2D, 0xED, 0xEC, 0x2C, 0xE4, 0x24, 0x25, 0xE5, 0x27, 0xE7, 0xE6, 0x26, 0x22, 0xE2, 0xE3, 0x23, 0xE1, 0x21, 0x20, 0xE0, 0xA0, 0x60, 0x61, 0xA1, 0x63, 0xA3, 0xA2, 0x62, 0x66, 0xA6, 0xA7, 0x67, 0xA5, 0x65, 0x64, 0xA4, 0x6C, 0xAC, 0xAD, 0x6D, 0xAF, 0x6F, 0x6E, 0xAE, 0xAA, 0x6A, 0x6B, 0xAB, 0x69, 0xA9, 0xA8, 0x68, 0x78, 0xB8, 0xB9, 0x79, 0xBB, 0x7B, 0x7A, 0xBA, 0xBE, 0x7E, 0x7F, 0xBF, 0x7D, 0xBD, 0xBC, 0x7C, 0xB4, 0x74, 0x75, 0xB5, 0x77, 0xB7, 0xB6, 0x76, 0x72, 0xB2, 0xB3, 0x73, 0xB1, 0x71, 0x70, 0xB0, 0x50, 0x90, 0x91, 0x51, 0x93, 0x53, 0x52, 0x92, 0x96, 0x56, 0x57, 0x97, 0x55, 0x95, 0x94, 0x54, 0x9C, 0x5C, 0x5D, 0x9D, 0x5F, 0x9F, 0x9E, 0x5E, 0x5A, 0x9A, 0x9B, 0x5B, 0x99, 0x59, 0x58, 0x98, 0x88, 0x48, 0x49, 0x89, 0x4B, 0x8B, 0x8A, 0x4A, 0x4E, 0x8E, 0x8F, 0x4F, 0x8D, 0x4D, 0x4C, 0x8C, 0x44, 0x84, 0x85, 0x45, 0x87, 0x47, 0x46, 0x86, 0x82, 0x42, 0x43, 0x83, 0x41, 0x81, 0x80, 0x40}
+
+func GetCrC(buff []byte) []byte {
+ var hi uint16 = 0x00ff
+ var low uint16 = 0x00ff
+ var pos uint16 = 0
+
+ for i := 0; i < len(buff); i++ {
+ pos = (low ^ uint16(buff[i])) & 0x00ff
+ low = hi ^ uint16(crc16_h[pos])
+ hi = uint16(crc16_l[pos])
+ }
+
+ var d_crc = ((hi & 0x00ff) << 8) | (low&0x00ff)&0xffff
+ //var d_crcArr = [2]byte{byte((d_crc & 0xff)), byte((d_crc >> 8) & 0xff)}
+ d_crcArr := make([]byte, 0)
+ d_crcArr = append(d_crcArr, byte((d_crc & 0xff)), byte((d_crc>>8)&0xff))
+ return d_crcArr
+}
+
+func IsComplete(src []byte, dst []byte) bool {
+ if src != nil && dst != nil {
+ if len(src) == 2 && len(dst) == 2 {
+ if src[0] == dst[0] && src[1] == dst[1] {
+ return true
+ }
+ }
+ }
+ return false
+}
+
+func CheckCRC(src []byte, crc []byte) bool {
+ return IsComplete(GetCrC(src), crc)
+}
diff --git a/examples/zinx_decoder/client.go b/examples/zinx_decoder/client.go
new file mode 100644
index 00000000..30ddbfbc
--- /dev/null
+++ b/examples/zinx_decoder/client.go
@@ -0,0 +1,107 @@
+package main
+
+import (
+ "encoding/binary"
+ "encoding/hex"
+ "fmt"
+ "github.com/aceld/zinx/ziface"
+ "github.com/aceld/zinx/zlog"
+ "github.com/aceld/zinx/znet"
+ "os"
+ "os/signal"
+ "time"
+)
+
+// 使用该方法生成模拟数据
+func getTLVPackData() []byte {
+ msgID := 1
+ tag := make([]byte, 4)
+ binary.BigEndian.PutUint32(tag, uint32(msgID))
+
+ str := "HELLO, WORLD"
+ var value = []byte(str)
+
+ length := make([]byte, 4)
+ binary.BigEndian.PutUint32(length, uint32(len(value)))
+
+ _data := make([]byte, 0)
+ _data = append(_data, tag...)
+ _data = append(_data, length...)
+ _data = append(_data, value...)
+ fmt.Println("--->", len(_data), hex.EncodeToString(_data))
+ return _data
+}
+
+func getTLVData(index int) []byte {
+ //通过 getTLVPackData()方法,获得一段完整的TLV模拟数据包:000000010000000c48454c4c4f2c20574f524c44
+ tlvPackData := []string{
+ "000000010000000c48454c4c4f2c20574f524c44000000010000000c", //一包半
+ "48454c4c4f2c20574f524c44", //剩下的半包
+ "000000010000000c48454c4c4f2c20574f524c44000000010000000c48454c4c4f2c20574f524c44", //两包
+ }
+ //此处模拟顺序如:两包一包半剩下的半包
+ index = index % 3
+ if index == 0 {
+ fmt.Println("模拟-粘包")
+ index = 2 //模拟粘包情况,两包数据一起
+ } else {
+ index = index / 2 % 2 //模拟断包情况,一包半+剩下的半包
+ fmt.Println("模拟-断包")
+ }
+ arr, _ := hex.DecodeString(tlvPackData[index])
+ return arr
+}
+
+func getHTLVCRCData(index int) []byte {
+ //一段完整的HTLVCRC模拟数据包:A2100E0102030405060708091011121314050B
+ tlvPackData := []string{
+ "A2100E0102030405060708091011121314050BA2100E01020304050607", //一包半
+ "08091011121314050B", //剩下的半包
+ "A2100E0102030405060708091011121314050BA2100E0102030405060708091011121314050B", //两包
+ }
+ //此处模拟顺序如:两包一包半剩下的半包
+ index = index % 3
+ if index == 0 {
+ fmt.Println("模拟-粘包")
+ index = 2 //模拟粘包情况,两包数据一起
+ } else {
+ index = index / 2 % 2 //模拟断包情况,一包半+剩下的半包
+ fmt.Println("模拟-断包")
+ }
+ arr, _ := hex.DecodeString(tlvPackData[index])
+ return arr
+}
+
+// 客户端自定义业务
+func business(conn ziface.IConnection) {
+ var i int
+ for {
+ buffer := getTLVData(i)
+ //buffer := getHTLVCRCData(i)
+ conn.Send(buffer)
+ i++
+ time.Sleep(1 * time.Second)
+ }
+}
+
+// 创建连接的时候执行
+func DoClientConnectedBegin(conn ziface.IConnection) {
+ zlog.Debug("DoConnecionBegin is Called ... ")
+ go business(conn)
+}
+
+func main() {
+ //创建一个Client句柄,使用Zinx的API
+ client := znet.NewClient("127.0.0.1", 8999)
+ //添加首次建立链接时的业务
+ client.SetOnConnStart(DoClientConnectedBegin)
+ //启动客户端client
+ client.Start()
+
+ // close
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt, os.Kill)
+ sig := <-c
+ fmt.Println("===exit===", sig)
+
+}
diff --git a/examples/zinx_decoder/decode/htlvcrcdecoder.go b/examples/zinx_decoder/decode/htlvcrcdecoder.go
new file mode 100644
index 00000000..f6af5779
--- /dev/null
+++ b/examples/zinx_decoder/decode/htlvcrcdecoder.go
@@ -0,0 +1,76 @@
+// HTLV+CRC,H头码,T功能码,L数据长度,V数据内容
+//+------+-------+---------+--------+--------+
+//| 头码 | 功能码 | 数据长度 | 数据内容 | CRC校验 |
+//| 1字节 | 1字节 | 1字节 | N字节 | 2字节 |
+//+------+-------+---------+--------+--------+
+
+//头码 功能码 数据长度 Body CRC
+//A2 10 0E 0102030405060708091011121314 050B
+//
+//
+// 说明:
+// 1.数据长度len是14(0E),这里的len仅仅指Body长度;
+//
+//
+// lengthFieldOffset = 2 (len的索引下标是2,下标从0开始) 长度字段的偏差
+// lengthFieldLength = 1 (len是1个byte) 长度字段占的字节数
+// lengthAdjustment = 2 (len只表示Body长度,程序只会读取len个字节就结束,但是CRC还有2byte没读呢,所以为2)
+// initialBytesToStrip = 0 (这个0表示完整的协议内容,如果不想要A2,那么这里就是1) 从解码帧中第一次去除的字节数
+// maxFrameLength = 255 + 4(起始码、功能码、CRC) (len是1个byte,所以最大长度是无符号1个byte的最大值)
+
+package decode
+
+import (
+ "encoding/hex"
+ "fmt"
+ "github.com/aceld/zinx/examples/zinx_decoder/bili/utils"
+ "github.com/aceld/zinx/ziface"
+)
+
+const HEADER_SIZE = 5
+
+type HtlvCrcData struct {
+ Data []byte //数据内容
+ Head byte //头码
+ Funcode byte //功能码
+ Length byte //数据长度
+ Body []byte //数据内容
+ Crc []byte //CRC校验
+}
+
+type HtlvCrcDecoder struct {
+}
+
+func (this *HtlvCrcDecoder) Intercept(chain ziface.Chain) ziface.Response {
+ request := chain.Request()
+ if request != nil {
+ switch request.(type) {
+ case ziface.IRequest:
+ iRequest := request.(ziface.IRequest)
+ iMessage := iRequest.GetMessage()
+ if iMessage != nil {
+ data := iMessage.GetData()
+ fmt.Println("1htlvData", data)
+ datasize := len(data)
+ htlvData := HtlvCrcData{
+ Data: data,
+ }
+ if datasize >= HEADER_SIZE {
+ htlvData.Head = data[0]
+ htlvData.Funcode = data[1]
+ htlvData.Length = data[2]
+ htlvData.Body = data[3 : datasize-2]
+ htlvData.Crc = data[datasize-2 : datasize]
+ if !utils.CheckCRC(data[:datasize-2], htlvData.Crc) {
+ fmt.Println("crc校验失败", hex.EncodeToString(data), hex.EncodeToString(htlvData.Crc))
+ return nil
+ }
+ iMessage.SetMsgID(uint32(htlvData.Funcode))
+ iRequest.SetResponse(htlvData)
+ //zlog.Ins().DebugF("2htlvData %s \n", hex.EncodeToString(htlvData.data))
+ }
+ }
+ }
+ }
+ return chain.Proceed(chain.Request())
+}
diff --git a/examples/zinx_decoder/decode/tlvdecoder.go b/examples/zinx_decoder/decode/tlvdecoder.go
new file mode 100644
index 00000000..4588b65d
--- /dev/null
+++ b/examples/zinx_decoder/decode/tlvdecoder.go
@@ -0,0 +1,63 @@
+// TLV,即Tag(Type)—Length—Value,是一种简单实用的数据传输方案。
+//在TLV的定义中,可以知道它包括三个域,分别为:标签域(Tag),长度域(Length),内容域(Value)。这里的长度域的值实际上就是内容域的长度。
+//
+//解码前 (20 bytes) 解码后 (20 bytes)
+//+------------+------------+-----------------+ +------------+------------+-----------------+
+//| Tag | Length | Value |----->| Tag | Length | Value |
+//| 0x00000001 | 0x0000000C | "HELLO, WORLD" | | 0x00000001 | 0x0000000C | "HELLO, WORLD" |
+//+------------+------------+-----------------+ +------------+------------+-----------------+
+// Tag: uint32类型,占4字节,Tag作为MsgId,暂定为1
+// Length:uint32类型,占4字节,Length标记Value长度12(hex:0x0000000C)
+// Value: 共12个字符,占12字节
+//
+// 说明:
+// lengthFieldOffset = 4 (Length的字节位索引下标是4) 长度字段的偏差
+// lengthFieldLength = 4 (Length是4个byte) 长度字段占的字节数
+// lengthAdjustment = 0 (Length只表示Value长度,程序只会读取Length个字节就结束,后面没有来,故为0,若Value后面还有crc占2字节的话,那么此处就是2。若Length标记的是Tag+Length+Value总长度,那么此处是-8)
+// initialBytesToStrip = 0 (这个0表示返回完整的协议内容Tag+Length+Value,如果只想返回Value内容,去掉Tag的4字节和Length的4字节,此处就是8) 从解码帧中第一次去除的字节数
+// maxFrameLength = 2^32 + 4 + 4 (Length为uint类型,故2^32次方表示Value最大长度,此外Tag和Length各占4字节)
+
+package decode
+
+import (
+ "encoding/binary"
+ "fmt"
+ "github.com/aceld/zinx/ziface"
+)
+
+const TLV_HEADER_SIZE = 8 //表示TLV空包长度
+
+type TlvData struct {
+ Tag uint32
+ Length uint32
+ Value string
+}
+
+type TLVDecoder struct {
+}
+
+func (this *TLVDecoder) Intercept(chain ziface.Chain) ziface.Response {
+ request := chain.Request()
+ if request != nil {
+ switch request.(type) {
+ case ziface.IRequest:
+ iRequest := request.(ziface.IRequest)
+ iMessage := iRequest.GetMessage()
+ if iMessage != nil {
+ data := iMessage.GetData()
+ fmt.Println("1-TLV", len(data), data)
+ datasize := len(data)
+ _data := TlvData{}
+ if datasize >= TLV_HEADER_SIZE {
+ _data.Tag = binary.BigEndian.Uint32(data[0:4])
+ _data.Length = binary.BigEndian.Uint32(data[4:8])
+ _data.Value = string(data[8 : 8+_data.Length])
+ iMessage.SetMsgID(_data.Tag)
+ iRequest.SetResponse(_data)
+ fmt.Println("2-TLV", _data)
+ }
+ }
+ }
+ }
+ return chain.Proceed(chain.Request())
+}
diff --git a/examples/zinx_decoder/router/htlvcrcbusinessrouter.go b/examples/zinx_decoder/router/htlvcrcbusinessrouter.go
new file mode 100644
index 00000000..4c14f578
--- /dev/null
+++ b/examples/zinx_decoder/router/htlvcrcbusinessrouter.go
@@ -0,0 +1,27 @@
+package router
+
+import (
+ "fmt"
+ "github.com/aceld/zinx/examples/zinx_decoder/decode"
+ "github.com/aceld/zinx/ziface"
+ "github.com/aceld/zinx/znet"
+)
+
+type HtlvCrcBusinessRouter struct {
+ znet.BaseRouter
+}
+
+func (this *HtlvCrcBusinessRouter) Handle(request ziface.IRequest) {
+ fmt.Println("Call HtlvCrcBusinessRouter Handle", request.GetMessage().GetMsgID(), request.GetMessage().GetData())
+ msgID := request.GetMessage().GetMsgID()
+ if msgID == 0x10 {
+ _response := request.GetResponse()
+ if _response != nil {
+ switch _response.(type) {
+ case decode.HtlvCrcData:
+ tlvData := _response.(decode.HtlvCrcData)
+ fmt.Println("do msgid=0x10 data business", tlvData)
+ }
+ }
+ }
+}
diff --git a/examples/zinx_decoder/router/tlvbusinessrouter.go b/examples/zinx_decoder/router/tlvbusinessrouter.go
new file mode 100644
index 00000000..67cdcc3a
--- /dev/null
+++ b/examples/zinx_decoder/router/tlvbusinessrouter.go
@@ -0,0 +1,28 @@
+package router
+
+import (
+ "fmt"
+ "github.com/aceld/zinx/examples/zinx_decoder/decode"
+ "github.com/aceld/zinx/ziface"
+ "github.com/aceld/zinx/znet"
+)
+
+type TLVBusinessRouter struct {
+ znet.BaseRouter
+}
+
+func (this *TLVBusinessRouter) Handle(request ziface.IRequest) {
+ fmt.Println("Call TLVRouter Handle", request.GetMessage().GetMsgID(), request.GetMessage().GetData())
+ msgID := request.GetMessage().GetMsgID()
+ if msgID == 0x00000001 {
+ _response := request.GetResponse()
+ if _response != nil {
+ switch _response.(type) {
+ case decode.TlvData:
+ tlvData := _response.(decode.TlvData)
+ fmt.Println("do msgid=0x00000001 data business", tlvData)
+ }
+ }
+ }
+
+}
diff --git a/examples/zinx_decoder/server.go b/examples/zinx_decoder/server.go
new file mode 100644
index 00000000..c7ebdd34
--- /dev/null
+++ b/examples/zinx_decoder/server.go
@@ -0,0 +1,37 @@
+package main
+
+import (
+ "github.com/aceld/zinx/examples/zinx_decoder/decode"
+ "github.com/aceld/zinx/examples/zinx_decoder/router"
+ "github.com/aceld/zinx/ziface"
+ "github.com/aceld/zinx/zlog"
+ "github.com/aceld/zinx/znet"
+)
+
+func DoConnectionBegin(conn ziface.IConnection) {
+ zlog.Ins().InfoF("DoConnecionBegin is Called ...")
+}
+
+func DoConnectionLost(conn ziface.IConnection) {
+ zlog.Ins().InfoF("Conn is Lost")
+}
+
+func main() {
+ //创建一个server句柄
+ s := znet.NewServer()
+
+ //注册链接hook回调函数
+ s.SetOnConnStart(DoConnectionBegin)
+ s.SetOnConnStop(DoConnectionLost)
+
+ //处理TLV协议数据
+ s.AddInterceptor(&decode.TLVDecoder{}) //TVL协议解码器
+ s.AddRouter(0x00000001, &router.TLVBusinessRouter{}) //TLV协议对应业务功能
+
+ //处理HTLVCRC协议数据
+ s.AddInterceptor(&decode.HtlvCrcDecoder{}) //TVL协议解码器
+ s.AddRouter(0x10, &router.HtlvCrcBusinessRouter{}) //TLV协议对应业务功能,因为client.go中模拟数据funcode字段为0x10
+
+ //开启服务
+ s.Serve()
+}
diff --git a/zcode/interceptorchain.go b/zcode/interceptorchain.go
new file mode 100644
index 00000000..9fc507b8
--- /dev/null
+++ b/zcode/interceptorchain.go
@@ -0,0 +1,36 @@
+/**
+ * @author uuxia
+ * @date 15:57 2023/3/10
+ * @description 拦截器管理
+ **/
+
+package zcode
+
+import "github.com/aceld/zinx/ziface"
+
+// InterceptorChain
+// HTLV+CRC,H头码,T功能码,L数据长度,V数据内容
+// +------+-------+---------+--------+--------+
+// | 头码 | 功能码 | 数据长度 | 数据内容 | CRC校验 |
+// | 1字节 | 1字节 | 1字节 | N字节 | 2字节 |
+// +------+-------+---------+--------+--------+
+type InterceptorChain struct {
+ interceptors []ziface.Interceptor
+ request ziface.Request
+}
+
+func NewInterceptorBuilder() ziface.InterceptorBuilder {
+ return &InterceptorChain{
+ interceptors: make([]ziface.Interceptor, 0),
+ }
+}
+
+func (this *InterceptorChain) AddInterceptor(interceptor ziface.Interceptor) {
+ this.interceptors = append(this.interceptors, interceptor)
+}
+
+func (this *InterceptorChain) Execute(request ziface.Request) ziface.Response {
+ this.request = request
+ chain := NewRealInterceptorChain(this.interceptors, 0, request)
+ return chain.Proceed(this.request)
+}
diff --git a/zcode/lengthfieldframedecoder.go b/zcode/lengthfieldframedecoder.go
new file mode 100644
index 00000000..42935eb1
--- /dev/null
+++ b/zcode/lengthfieldframedecoder.go
@@ -0,0 +1,424 @@
+/**
+ * @author uuxia
+ * @date 15:57 2023/3/10
+ * @description 通用解码器
+ **/
+
+package zcode
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "github.com/aceld/zinx/ziface"
+ "math"
+ "sync"
+)
+
+// EncoderData
+// A decoder that splits the received {@link ByteBuf}s dynamically by the
+// value of the length field in the message. It is particularly useful when you
+// decode a binary message which has an integer header field that represents the
+// length of the message body or the whole message.
+//
+// {@link LengthFieldBasedFrameDecoder} has many configuration parameters so +// that it can decode any message with a length field, which is often seen in +// proprietary client-server protocols. Here are some example that will give +// you the basic idea on which option does what. +// +//
+// LengthFieldOffset = 0 +// LengthFieldLength = 2 +// LengthAdjustment = 0 +// InitialBytesToStrip = 0 (= do not strip header) +// +// BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes) +// +--------+----------------+ +--------+----------------+ +// | Length | Actual Content |----->| Length | Actual Content | +// | 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" | +// +--------+----------------+ +--------+----------------+ +//+// +//
+// LengthFieldOffset = 0 +// LengthFieldLength = 2 +// LengthAdjustment = 0 +// InitialBytesToStrip = 2 (= the length of the Length field) +// +// BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes) +// +--------+----------------+ +----------------+ +// | Length | Actual Content |----->| Actual Content | +// | 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" | +// +--------+----------------+ +----------------+ +//+// +//
+// LengthFieldOffset = 0 +// LengthFieldLength = 2 +// LengthAdjustment = -2 (= the length of the Length field) +// InitialBytesToStrip = 0 +// +// BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes) +// +--------+----------------+ +--------+----------------+ +// | Length | Actual Content |----->| Length | Actual Content | +// | 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" | +// +--------+----------------+ +--------+----------------+ +//+// +//
+// LengthFieldOffset = 2 (= the length of Header 1) +// LengthFieldLength = 3 +// LengthAdjustment = 0 +// InitialBytesToStrip = 0 +// +// BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes) +// +----------+----------+----------------+ +----------+----------+----------------+ +// | Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content | +// | 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" | +// +----------+----------+----------------+ +----------+----------+----------------+ +//+// +//
+// LengthFieldOffset = 0 +// LengthFieldLength = 3 +// LengthAdjustment = 2 (= the length of Header 1) +// InitialBytesToStrip = 0 +// +// BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes) +// +----------+----------+----------------+ +----------+----------+----------------+ +// | Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content | +// | 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" | +// +----------+----------+----------------+ +----------+----------+----------------+ +//+// +//
+// LengthFieldOffset = 1 (= the length of HDR1) +// LengthFieldLength = 2 +// LengthAdjustment = 1 (= the length of HDR2) +// InitialBytesToStrip = 3 (= the length of HDR1 + LEN) +// +// BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes) +// +------+--------+------+----------------+ +------+----------------+ +// | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content | +// | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" | +// +------+--------+------+----------------+ +------+----------------+ +//+// +//
+// LengthFieldOffset = 1 +// LengthFieldLength = 2 +// LengthAdjustment = -3 (= the length of HDR1 + LEN, negative) +// InitialBytesToStrip = 3 +// +// BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes) +// +------+--------+------+----------------+ +------+----------------+ +// | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content | +// | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" | +// +------+--------+------+----------------+ +------+----------------+ +// https://blog.csdn.net/weixin_45271492/article/details/125347939 + +type EncoderData struct { + lengthField ziface.LengthField + LengthFieldEndOffset int //长度字段结束位置的偏移量 LengthFieldOffset+LengthFieldLength + failFast bool //快速失败 + discardingTooLongFrame bool //true 表示开启丢弃模式,false 正常工作模式 + tooLongFrameLength int64 //当某个数据包的长度超过maxLength,则开启丢弃模式,此字段记录需要丢弃的数据长度 + bytesToDiscard int64 //记录还剩余多少字节需要丢弃 + in []byte + lock sync.Mutex +} + +func NewLengthFieldFrameDecoderByLengthField(lengthField ziface.LengthField) ziface.IDecoder { + return &EncoderData{ + lengthField: lengthField, + LengthFieldEndOffset: lengthField.LengthFieldOffset + lengthField.LengthFieldLength, + in: make([]byte, 0), + } +} + +func NewLengthFieldFrameDecoder(maxFrameLength int64, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip int) ziface.IDecoder { + return &EncoderData{ + lengthField: ziface.LengthField{ + MaxFrameLength: maxFrameLength, + LengthFieldOffset: lengthFieldOffset, + LengthFieldLength: lengthFieldLength, + LengthAdjustment: lengthAdjustment, + InitialBytesToStrip: initialBytesToStrip, + Order: binary.BigEndian, + }, + LengthFieldEndOffset: lengthFieldOffset + lengthFieldLength, + in: make([]byte, 0), + } +} + +func (this *EncoderData) fail(frameLength int64) { + //丢弃完成或未完成都抛异常 + //if frameLength > 0 { + // msg := fmt.Sprintf("Adjusted frame length exceeds %d : %d - discarded", this.MaxFrameLength, frameLength) + // panic(msg) + //} else { + // msg := fmt.Sprintf("Adjusted frame length exceeds %d - discarded", this.MaxFrameLength) + // panic(msg) + //} +} + +func (this *EncoderData) discardingTooLongFrameFunc(buffer *bytes.Buffer) { + //保存还需丢弃多少字节 + bytesToDiscard := this.bytesToDiscard + //获取当前可以丢弃的字节数,有可能出现半包 + localBytesToDiscard := math.Min(float64(bytesToDiscard), float64(buffer.Len())) + //fmt.Println("--->", bytesToDiscard, buffer.Len(), localBytesToDiscard) + localBytesToDiscard = 2 + //丢弃 + buffer.Next(int(localBytesToDiscard)) + //更新还需丢弃的字节数 + bytesToDiscard -= int64(localBytesToDiscard) + this.bytesToDiscard = bytesToDiscard + //是否需要快速失败,回到上面的逻辑 + this.failIfNecessary(false) +} + +func (this *EncoderData) getUnadjustedFrameLength(buf *bytes.Buffer, offset int, length int, order binary.ByteOrder) int64 { + //长度字段的值 + var frameLength int64 + arr := buf.Bytes() + arr = arr[offset : offset+length] + buffer := bytes.NewBuffer(arr) + switch length { + case 1: + //byte + var value byte + binary.Read(buffer, order, &value) + frameLength = int64(value) + case 2: + //short + var value int16 + binary.Read(buffer, order, &value) + frameLength = int64(value) + case 3: + //int占32位,这里取出后24位,返回int类型 + if order == binary.LittleEndian { + n := int(uint(arr[0]) | uint(arr[1])<<8 | uint(arr[2])<<16) + frameLength = int64(n) + } else { + n := int(uint(arr[2]) | uint(arr[1])<<8 | uint(arr[0])<<16) + frameLength = int64(n) + } + case 4: + //int + var value int32 + binary.Read(buffer, order, &value) + frameLength = int64(value) + case 8: + //long + binary.Read(buffer, order, &frameLength) + default: + panic(fmt.Sprintf("unsupported LengthFieldLength: %d (expected: 1, 2, 3, 4, or 8)", this.lengthField.LengthFieldLength)) + } + return frameLength +} + +func (this *EncoderData) failOnNegativeLengthField(in *bytes.Buffer, frameLength int64, lengthFieldEndOffset int) { + in.Next(lengthFieldEndOffset) + panic(fmt.Sprintf("negative pre-adjustment length field: %d", frameLength)) +} + +func (this *EncoderData) failIfNecessary(firstDetectionOfTooLongFrame bool) { + if this.bytesToDiscard == 0 { + //说明需要丢弃的数据已经丢弃完成 + //保存一下被丢弃的数据包长度 + tooLongFrameLength := this.tooLongFrameLength + this.tooLongFrameLength = 0 + //关闭丢弃模式 + this.discardingTooLongFrame = false + //failFast:默认true + //firstDetectionOfTooLongFrame:传入true + if !this.failFast || firstDetectionOfTooLongFrame { + //快速失败 + this.fail(tooLongFrameLength) + } + } else { + //说明还未丢弃完成 + if this.failFast && firstDetectionOfTooLongFrame { + //快速失败 + this.fail(this.tooLongFrameLength) + } + } +} + +// frameLength:数据包的长度 +func (this *EncoderData) exceededFrameLength(in *bytes.Buffer, frameLength int64) { + //数据包长度-可读的字节数 两种情况 + //1. 数据包总长度为100,可读的字节数为50,说明还剩余50个字节需要丢弃但还未接收到 + //2. 数据包总长度为100,可读的字节数为150,说明缓冲区已经包含了整个数据包 + discard := frameLength - int64(in.Len()) + //记录一下最大的数据包的长度 + this.tooLongFrameLength = frameLength + if discard < 0 { + //说明是第二种情况,直接丢弃当前数据包 + in.Next(int(frameLength)) + } else { + //说明是第一种情况,还有部分数据未接收到 + //开启丢弃模式 + this.discardingTooLongFrame = true + //记录下次还需丢弃多少字节 + this.bytesToDiscard = discard + //丢弃缓冲区所有数据 + in.Next(in.Len()) + } + //跟进去 + this.failIfNecessary(true) +} + +func (this *EncoderData) failOnFrameLengthLessThanInitialBytesToStrip(in *bytes.Buffer, frameLength int64, initialBytesToStrip int) { + in.Next(int(frameLength)) + panic(fmt.Sprintf("Adjusted frame length (%d) is less than InitialBytesToStrip: %d", frameLength, initialBytesToStrip)) +} + +// https://blog.csdn.net/qq_39280718/article/details/125762004 +func (this *EncoderData) decode(buf []byte) []byte { + in := bytes.NewBuffer(buf) + //丢弃模式 + if this.discardingTooLongFrame { + this.discardingTooLongFrameFunc(in) + } + ////判断缓冲区中可读的字节数是否小于长度字段的偏移量 + if in.Len() < this.LengthFieldEndOffset { + //说明长度字段的包都还不完整,半包 + return nil + } + //执行到这,说明可以解析出长度字段的值了 + + //计算出长度字段的开始偏移量 + actualLengthFieldOffset := this.lengthField.LengthFieldOffset + //获取长度字段的值,不包括lengthAdjustment的调整值 + frameLength := this.getUnadjustedFrameLength(in, actualLengthFieldOffset, this.lengthField.LengthFieldLength, this.lengthField.Order) + //如果数据帧长度小于0,说明是个错误的数据包 + if frameLength < 0 { + //内部会跳过这个数据包的字节数,并抛异常 + this.failOnNegativeLengthField(in, frameLength, this.LengthFieldEndOffset) + } + + //套用前面的公式:长度字段后的数据字节数=长度字段的值+lengthAdjustment + //frameLength就是长度字段的值,加上lengthAdjustment等于长度字段后的数据字节数 + //lengthFieldEndOffset为lengthFieldOffset+lengthFieldLength + //那说明最后计算出的framLength就是整个数据包的长度 + frameLength += int64(this.lengthField.LengthAdjustment) + int64(this.LengthFieldEndOffset) + //丢弃模式就是在这开启的 + //如果数据包长度大于最大长度 + if frameLength > int64(this.lengthField.MaxFrameLength) { + //对超过的部分进行处理 + this.exceededFrameLength(in, frameLength) + return nil + } + + //执行到这说明是正常模式 + //数据包的大小 + frameLengthInt := int(frameLength) + //判断缓冲区可读字节数是否小于数据包的字节数 + if in.Len() < frameLengthInt { + //半包,等会再来解析 + return nil + } + + //执行到这说明缓冲区的数据已经包含了数据包 + + //跳过的字节数是否大于数据包长度 + if this.lengthField.InitialBytesToStrip > frameLengthInt { + this.failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, this.lengthField.InitialBytesToStrip) + } + //跳过initialBytesToStrip个字节 + in.Next(this.lengthField.InitialBytesToStrip) + //解码 + //获取跳过后的真实数据长度 + actualFrameLength := frameLengthInt - this.lengthField.InitialBytesToStrip + //提取真实的数据 + buff := make([]byte, actualFrameLength) + in.Read(buff) + //bytes.NewBuffer([]byte{}) + //_in := bytes.NewBuffer(buff) + return buff +} + +func (this *EncoderData) Decode(buff []byte) [][]byte { + this.lock.Lock() + defer this.lock.Unlock() + this.in = append(this.in, buff...) + resp := make([][]byte, 0) + for { + arr := this.decode(this.in) + if arr != nil { + //证明已经解析出一个完整包 + resp = append(resp, arr) + _size := len(arr) + //_len := len(this.in) + //fmt.Println(_len) + if _size > 0 { + this.in = this.in[_size:] + } + } else { + return resp + } + } + return nil + +} diff --git a/zcode/lengthfieldframeinterceptor.go b/zcode/lengthfieldframeinterceptor.go new file mode 100644 index 00000000..0ff0510c --- /dev/null +++ b/zcode/lengthfieldframeinterceptor.go @@ -0,0 +1,54 @@ +/** + * @author uuxia + * @date 15:58 2023/3/10 + * @description 通过拦截,处理数据,任务向下传递 + **/ + +package zcode + +import ( + "github.com/aceld/zinx/ziface" +) + +type LengthFieldFrameInterceptor struct { + decoder ziface.IDecoder +} + +func NewLengthFieldFrameInterceptor(maxFrameLength int64, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip int) *LengthFieldFrameInterceptor { + return &LengthFieldFrameInterceptor{ + decoder: NewLengthFieldFrameDecoder(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip), + } +} + +func (this *LengthFieldFrameInterceptor) Intercept(chain ziface.Chain) ziface.Response { + request := chain.Request() + if request != nil { + switch request.(type) { + case ziface.IRequest: + iRequest := request.(ziface.IRequest) + iMessage := iRequest.GetMessage() + if iMessage != nil { + data := iMessage.GetData() + if this.decoder != nil { + bytebuffers := this.decoder.Decode(data) + size := len(bytebuffers) + if size == 0 { //半包,或者其他情况,任务就不要往下再传递了 + return nil + } + for i := 0; i < size; i++ { + buffer := bytebuffers[i] + if buffer != nil { + bufferSize := len(buffer) + iMessage.SetData(buffer) + iMessage.SetDataLen(uint32(bufferSize)) + if i < size-1 { + chain.Proceed(chain.Request()) + } + } + } + } + } + } + } + return chain.Proceed(chain.Request()) +} diff --git a/zcode/realinterceptorchain.go b/zcode/realinterceptorchain.go new file mode 100644 index 00000000..d3ed9fe5 --- /dev/null +++ b/zcode/realinterceptorchain.go @@ -0,0 +1,37 @@ +/** + * @author uuxia + * @date 15:56 2023/3/10 + * @description 责任链模式 + **/ + +package zcode + +import "github.com/aceld/zinx/ziface" + +type RealInterceptorChain struct { + request ziface.Request + position int + interceptors []ziface.Interceptor +} + +func (this *RealInterceptorChain) Request() ziface.Request { + return this.request +} + +func (this *RealInterceptorChain) Proceed(request ziface.Request) ziface.Response { + if this.position < len(this.interceptors) { + chain := NewRealInterceptorChain(this.interceptors, this.position+1, request) + interceptor := this.interceptors[this.position] + response := interceptor.Intercept(chain) + return response + } + return request +} + +func NewRealInterceptorChain(list []ziface.Interceptor, pos int, request ziface.Request) ziface.Chain { + return &RealInterceptorChain{ + request: request, + position: pos, + interceptors: list, + } +} diff --git a/ziface/iDecoder.go b/ziface/iDecoder.go new file mode 100644 index 00000000..78b059d9 --- /dev/null +++ b/ziface/iDecoder.go @@ -0,0 +1,20 @@ +package ziface + +import "encoding/binary" + +type IDecoder interface { + Decode(buff []byte) [][]byte +} + +type LengthField struct { + //大小端排序 + //大端模式:是指数据的高字节保存在内存的低地址中,而数据的低字节保存在内存的高地址中,地址由小向大增加,而数据从高位往低位放; + //小端模式:是指数据的高字节保存在内存的高地址中,而数据的低字节保存在内存的低地址中,高地址部分权值高,低地址部分权值低,和我们的日常逻辑方法一致。 + //不了解的自行查阅一下资料 + Order binary.ByteOrder + MaxFrameLength int64 //最大帧长度 + LengthFieldOffset int //长度字段偏移量 + LengthFieldLength int //长度域字段的字节数 + LengthAdjustment int //长度调整 + InitialBytesToStrip int //需要跳过的字节数 +} diff --git a/ziface/iclient.go b/ziface/iclient.go index 38a9fac7..13ab5438 100644 --- a/ziface/iclient.go +++ b/ziface/iclient.go @@ -1,12 +1,13 @@ // Package ziface 主要提供zinx全部抽象层接口定义. // 包括: -// IServer 服务mod接口 -// IRouter 路由mod接口 -// IConnection 连接mod层接口 -// IMessage 消息mod接口 -// IDataPack 消息拆解接口 -// IMsgHandler 消息处理及协程池接口 -// IClient 客户端接口 +// +// IServer 服务mod接口 +// IRouter 路由mod接口 +// IConnection 连接mod层接口 +// IMessage 消息mod接口 +// IDataPack 消息拆解接口 +// IMsgHandler 消息处理及协程池接口 +// IClient 客户端接口 // // 当前文件描述: // @Title iclient.go @@ -30,4 +31,6 @@ type IClient interface { GetMsgHandler() IMsgHandle //获取Client绑定的消息处理模块 StartHeartBeat(time.Duration) //启动心跳检测 StartHeartBeatWithOption(time.Duration, *HeartBeatOption) //启动心跳检测(自定义回调) + AddInterceptor(interceptor Interceptor) //添加协议解析拦截器 + GetLengthField() LengthField } diff --git a/ziface/iconnection.go b/ziface/iconnection.go index 88592963..b69dc948 100644 --- a/ziface/iconnection.go +++ b/ziface/iconnection.go @@ -1,11 +1,12 @@ // Package ziface 主要提供zinx全部抽象层接口定义. // 包括: -// IServer 服务mod接口 -// IRouter 路由mod接口 -// IConnection 连接mod层接口 -// IMessage 消息mod接口 -// IDataPack 消息拆解接口 -// IMsgHandler 消息处理及协程池接口 +// +// IServer 服务mod接口 +// IRouter 路由mod接口 +// IConnection 连接mod层接口 +// IMessage 消息mod接口 +// IDataPack 消息拆解接口 +// IMsgHandler 消息处理及协程池接口 // // 当前文件描述: // @Title iconnection.go @@ -18,7 +19,7 @@ import ( "net" ) -//定义连接接口 +// 定义连接接口 type IConnection interface { Start() //启动连接,让当前连接开始工作 Stop() //停止连接,结束当前连接状态 @@ -29,6 +30,8 @@ type IConnection interface { RemoteAddr() net.Addr //获取链接远程地址信息 LocalAddr() net.Addr //获取链接本地地址信息 + Send(data []byte) error + SendToQueue(data []byte) error SendMsg(msgID uint32, data []byte) error //直接将Message数据发送数据给远程的TCP客户端(无缓冲) SendBuffMsg(msgID uint32, data []byte) error //直接将Message数据发送给远程的TCP客户端(有缓冲) @@ -38,10 +41,10 @@ type IConnection interface { IsAlive() bool //判断当前连接是否存活 } -//用户自定义的心跳检测消息处理方法 +// 用户自定义的心跳检测消息处理方法 type HeartBeatMsgFunc func(IConnection) []byte -//用户自定义的远程连接不存活时的处理方法 +// 用户自定义的远程连接不存活时的处理方法 type OnRemoteNotAlive func(IConnection) type HeartBeatOption struct { diff --git a/ziface/imsghandler.go b/ziface/imsghandler.go index f9a30406..887d15be 100644 --- a/ziface/imsghandler.go +++ b/ziface/imsghandler.go @@ -1,11 +1,12 @@ // Package ziface 主要提供zinx全部抽象层接口定义. // 包括: -// IServer 服务mod接口 -// IRouter 路由mod接口 -// IConnection 连接mod层接口 -// IMessage 消息mod接口 -// IDataPack 消息拆解接口 -// IMsgHandler 消息处理及协程池接口 +// +// IServer 服务mod接口 +// IRouter 路由mod接口 +// IConnection 连接mod层接口 +// IMessage 消息mod接口 +// IDataPack 消息拆解接口 +// IMsgHandler 消息处理及协程池接口 // // 当前文件描述: // @Title imsghandler.go @@ -14,7 +15,7 @@ package ziface /* - 消息管理抽象层 +消息管理抽象层 */ type IMsgHandle interface { DoMsgHandler(request IRequest) //马上以非阻塞方式处理消息 @@ -22,4 +23,7 @@ type IMsgHandle interface { AddRouter(msgID uint32, router IRouter) StartWorkerPool() //启动worker工作池 SendMsgToTaskQueue(request IRequest) //将消息交给TaskQueue,由worker进行处理 + + Decode(request IRequest) // + AddInterceptor(interceptor Interceptor) //注册责任链任务入口,每个拦截器处理完后,数据都会传递至下一个拦截器,使得消息可以层层处理层层传递,顺序取决于注册顺序 } diff --git a/ziface/interceptor.go b/ziface/interceptor.go new file mode 100644 index 00000000..d9c715ea --- /dev/null +++ b/ziface/interceptor.go @@ -0,0 +1,28 @@ +/** + * @author uuxia + * @date 15:54 2023/3/10 + * @description //TODO + **/ + +package ziface + +// 请求父类,定义空接口,用于扩展支持任意类型 + +type Request interface { +} + +// 回复父类,定义空接口,用于扩展支持任意类型 + +type Response interface { +} +type Interceptor interface { + Intercept(Chain) Response +} +type Chain interface { + Request() Request + Proceed(Request) Response +} +type InterceptorBuilder interface { + AddInterceptor(interceptor Interceptor) + Execute(request Request) Response +} diff --git a/ziface/irequest.go b/ziface/irequest.go index b231db46..1b819874 100644 --- a/ziface/irequest.go +++ b/ziface/irequest.go @@ -1,11 +1,12 @@ // Package ziface 主要提供zinx全部抽象层接口定义. // 包括: -// IServer 服务mod接口 -// IRouter 路由mod接口 -// IConnection 连接mod层接口 -// IMessage 消息mod接口 -// IDataPack 消息拆解接口 -// IMsgHandler 消息处理及协程池接口 +// +// IServer 服务mod接口 +// IRouter 路由mod接口 +// IConnection 连接mod层接口 +// IMessage 消息mod接口 +// IDataPack 消息拆解接口 +// IMsgHandler 消息处理及协程池接口 // // 当前文件描述: // @Title irequest.go @@ -16,14 +17,19 @@ package ziface type HandleStep int /* - IRequest 接口: - 实际上是把客户端请求的链接信息 和 请求的数据 包装到了 Request里 +IRequest 接口: +实际上是把客户端请求的链接信息 和 请求的数据 包装到了 Request里 */ type IRequest interface { GetConnection() IConnection //获取请求连接信息 GetData() []byte //获取请求消息的数据 GetMsgID() uint32 //获取请求的消息ID + GetMessage() IMessage //获取请求消息的原始数据 add by uuxia 2023-03-10 + + GetResponse() Response //获取解析完后序列化数据 + SetResponse(Response) //设置解析完后序列化数据 + BindRouter(router IRouter) //绑定这次请求由哪个路由处理 Call() //转进到下一个处理器开始执行 但是调用此方法的函数会根据先后顺序逆序执行 Abort() //终止处理函数的运行 但调用此方法的函数会执行完毕 diff --git a/ziface/iserver.go b/ziface/iserver.go index ac81401e..2c53fff0 100644 --- a/ziface/iserver.go +++ b/ziface/iserver.go @@ -1,11 +1,12 @@ // Package ziface 主要提供zinx全部抽象层接口定义. // 包括: -// IServer 服务mod接口 -// IRouter 路由mod接口 -// IConnection 连接mod层接口 -// IMessage 消息mod接口 -// IDataPack 消息拆解接口 -// IMsgHandler 消息处理及协程池接口 +// +// IServer 服务mod接口 +// IRouter 路由mod接口 +// IConnection 连接mod层接口 +// IMessage 消息mod接口 +// IDataPack 消息拆解接口 +// IMsgHandler 消息处理及协程池接口 // // 当前文件描述: // @Title iserver.go @@ -13,9 +14,11 @@ // @Author Aceld - Thu Mar 11 10:32:29 CST 2019 package ziface -import "time" +import ( + "time" +) -//定义服务接口 +// 定义服务接口 type IServer interface { Start() //启动服务器方法 Stop() //停止服务器方法 @@ -31,4 +34,6 @@ type IServer interface { SetPacket(IDataPack) //设置Server绑定的数据协议封包方式 StartHeartBeat(time.Duration) //启动心跳检测 StartHeartBeatWithOption(time.Duration, *HeartBeatOption) //启动心跳检测(自定义回调) + AddInterceptor(interceptor Interceptor) //添加协议解析拦截器 + GetLengthField() LengthField } diff --git a/znet/client.go b/znet/client.go index b1fd0799..1dbbd645 100644 --- a/znet/client.go +++ b/znet/client.go @@ -5,6 +5,7 @@ import ( "github.com/aceld/zinx/ziface" "github.com/aceld/zinx/zlog" "github.com/aceld/zinx/zpack" + "math" "net" "time" ) @@ -26,6 +27,8 @@ type Client struct { exitChan chan struct{} //消息管理模块 msgHandler ziface.IMsgHandle + //断粘包解码器 + LengthField ziface.LengthField } func NewClient(ip string, port int, opts ...ClientOption) ziface.IClient { @@ -35,6 +38,28 @@ func NewClient(ip string, port int, opts ...ClientOption) ziface.IClient { Port: port, msgHandler: NewMsgHandle(), packet: zpack.Factory().NewPack(ziface.ZinxDataPack), //默认使用zinx的TLV封包方式 + // +---------------+---------------+---------------+ + // | Tag | Length | Value | + // | uint32(4byte) | uint32(4byte) | n byte | + // +---------------+---------------+---------------+ + // Tag: uint32类型,占4字节 + // Length:uint32类型,占4字节,Length标记Value长度 + // Value: 占n字节 + // + //说明: + // lengthFieldOffset = 4 (Length的字节位索引下标是4) 长度字段的偏差 + // lengthFieldLength = 4 (Length是4个byte) 长度字段占的字节数 + // lengthAdjustment = 0 (Length只表示Value长度,程序只会读取Length个字节就结束,后面没有来,故为0,若Value后面还有crc占2字节的话,那么此处就是2。若Length标记的是Tag+Length+Value总长度,那么此处是-8) + // initialBytesToStrip = 0 (这个0表示返回完整的协议内容Tag+Length+Value,如果只想返回Value内容,去掉Tag的4字节和Length的4字节,此处就是8) 从解码帧中第一次去除的字节数 + // maxFrameLength = 2^32 + 4 + 4 (Length为uint32类型,故2^32次方表示Value最大长度,此外Tag和Length各占4字节) + //默认使用TLV封包方式 + LengthField: ziface.LengthField{ + MaxFrameLength: math.MaxUint32 + 4 + 4, + LengthFieldOffset: 4, + LengthFieldLength: 4, + LengthAdjustment: 0, + InitialBytesToStrip: 0, + }, } //应用Option设置 @@ -45,7 +70,15 @@ func NewClient(ip string, port int, opts ...ClientOption) ziface.IClient { return c } -//启动客户端,发送请求且建立链接 +func (this *Client) AddInterceptor(interceptor ziface.Interceptor) { + this.msgHandler.AddInterceptor(interceptor) +} + +func (this *Client) GetLengthField() ziface.LengthField { + return this.LengthField +} + +// 启动客户端,发送请求且建立链接 func (c *Client) Start() { c.exitChan = make(chan struct{}) @@ -82,7 +115,7 @@ func (c *Client) Start() { }() } -//启动心跳检测 +// 启动心跳检测 func (c *Client) StartHeartBeat(interval time.Duration) { checker := NewHeartbeatCheckerC(interval, c) @@ -92,7 +125,7 @@ func (c *Client) StartHeartBeat(interval time.Duration) { go checker.Start() } -//启动心跳检测(自定义回调) +// 启动心跳检测(自定义回调) func (c *Client) StartHeartBeatWithOption(interval time.Duration, option *ziface.HeartBeatOption) { checker := NewHeartbeatCheckerC(interval, c) @@ -121,32 +154,32 @@ func (c *Client) Conn() ziface.IConnection { return c.conn } -//设置该Client的连接创建时Hook函数 +// 设置该Client的连接创建时Hook函数 func (c *Client) SetOnConnStart(hookFunc func(ziface.IConnection)) { c.onConnStart = hookFunc } -//设置该Client的连接断开时的Hook函数 +// 设置该Client的连接断开时的Hook函数 func (c *Client) SetOnConnStop(hookFunc func(ziface.IConnection)) { c.onConnStop = hookFunc } -//GetOnConnStart 得到该Server的连接创建时Hook函数 +// GetOnConnStart 得到该Server的连接创建时Hook函数 func (c *Client) GetOnConnStart() func(ziface.IConnection) { return c.onConnStart } -//得到该Server的连接断开时的Hook函数 +// 得到该Server的连接断开时的Hook函数 func (c *Client) GetOnConnStop() func(ziface.IConnection) { return c.onConnStop } -//获取Client绑定的数据协议封包方式 +// 获取Client绑定的数据协议封包方式 func (c *Client) GetPacket() ziface.IDataPack { return c.packet } -//设置Client绑定的数据协议封包方式 +// 设置Client绑定的数据协议封包方式 func (c *Client) SetPacket(packet ziface.IDataPack) { c.packet = packet } diff --git a/znet/connection.go b/znet/connection.go index 41fb3baa..d7034b53 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -1,14 +1,15 @@ /* - 服务端Server的链接模块 +服务端Server的链接模块 */ package znet import ( "context" + "encoding/hex" "errors" + "github.com/aceld/zinx/zcode" "github.com/aceld/zinx/zlog" "github.com/aceld/zinx/zpack" - "io" "net" "sync" "time" @@ -17,8 +18,8 @@ import ( "github.com/aceld/zinx/ziface" ) -//Connection Tcp连接模块 -//用于处理Tcp连接的读写业务 一个连接对应一个Connection +// Connection Tcp连接模块 +// 用于处理Tcp连接的读写业务 一个连接对应一个Connection type Connection struct { //当前连接的socket TCP套接字 conn net.Conn @@ -49,18 +50,21 @@ type Connection struct { packet ziface.IDataPack //最后一次活动时间 lastActivityTime time.Time + //断粘包解码器 + lengthFieldDecoder ziface.IDecoder } -//newServerConn :for Server, 创建一个Server服务端特性的连接的方法 -//Note: 名字由 NewConnection 更变 +// newServerConn :for Server, 创建一个Server服务端特性的连接的方法 +// Note: 名字由 NewConnection 更变 func newServerConn(server ziface.IServer, conn net.Conn, connID uint32) *Connection { //初始化Conn属性 c := &Connection{ - conn: conn, - connID: connID, - isClosed: false, - msgBuffChan: nil, - property: nil, + conn: conn, + connID: connID, + isClosed: false, + msgBuffChan: nil, + property: nil, + lengthFieldDecoder: zcode.NewLengthFieldFrameDecoderByLengthField(server.GetLengthField()), } //从server继承过来的属性 @@ -77,14 +81,15 @@ func newServerConn(server ziface.IServer, conn net.Conn, connID uint32) *Connect return c } -//newClientConn :for Client, 创建一个Client服务端特性的连接的方法 +// newClientConn :for Client, 创建一个Client服务端特性的连接的方法 func newClientConn(client ziface.IClient, conn net.Conn) *Connection { c := &Connection{ - conn: conn, - connID: 0, //client ignore - isClosed: false, - msgBuffChan: nil, - property: nil, + conn: conn, + connID: 0, //client ignore + isClosed: false, + msgBuffChan: nil, + property: nil, + lengthFieldDecoder: zcode.NewLengthFieldFrameDecoderByLengthField(client.GetLengthField()), } //从client继承过来的属性 @@ -96,7 +101,7 @@ func newClientConn(client ziface.IClient, conn net.Conn) *Connection { return c } -//StartWriter 写消息Goroutine, 用户将数据发送给客户端 +// StartWriter 写消息Goroutine, 用户将数据发送给客户端 func (c *Connection) StartWriter() { zlog.Ins().InfoF("Writer Goroutine is running") defer zlog.Ins().InfoF("%s [conn Writer exit!]", c.RemoteAddr().String()) @@ -123,7 +128,7 @@ func (c *Connection) StartWriter() { } } -//StartReader 读消息Goroutine,用于从客户端中读取数据 +// StartReader 读消息Goroutine,用于从客户端中读取数据 func (c *Connection) StartReader() { zlog.Ins().InfoF("[Reader Goroutine is running]") defer zlog.Ins().InfoF("%s [conn Reader exit!]", c.RemoteAddr().String()) @@ -137,7 +142,7 @@ func (c *Connection) StartReader() { default: //读取客户端的Msg head - headData := make([]byte, c.packet.GetHeadLen()) + /*headData := make([]byte, c.packet.GetHeadLen()) if _, err := io.ReadFull(c.conn, headData); err != nil { zlog.Ins().ErrorF("read msg head error %s", err) return @@ -175,12 +180,35 @@ func (c *Connection) StartReader() { } else { //从绑定好的消息和对应的处理方法中执行对应的Handle方法 go c.msgHandler.DoMsgHandler(req) + }*/ + + //add by uuxia 2023-02-03 + buffer := make([]byte, 1024) + n, err := c.conn.Read(buffer[:]) + if err != nil { + zlog.Ins().ErrorF("read msg head error%d %s", n, err) + return } + zlog.Ins().DebugF("read buffer %s \n", hex.EncodeToString(buffer[0:n])) + + if c.lengthFieldDecoder != nil { + bufArrays := c.lengthFieldDecoder.Decode(buffer[0:n]) + if bufArrays != nil { + for _, bytes := range bufArrays { + zlog.Ins().DebugF("read buffer %s \n", hex.EncodeToString(bytes)) + msg := &zpack.Message{DataLen: uint32(len(bytes)), Data: bytes} + //得到当前客户端请求的Request数据 + req := NewRequest(c, msg) + c.msgHandler.Decode(req) + } + } + } + } } } -//Start 启动连接,让当前连接开始工作 +// Start 启动连接,让当前连接开始工作 func (c *Connection) Start() { c.ctx, c.cancel = context.WithCancel(context.Background()) //按照用户传递进来的创建连接时需要处理的业务,执行钩子方法 @@ -195,7 +223,7 @@ func (c *Connection) Start() { } } -//Stop 停止连接,结束当前连接状态M +// Stop 停止连接,结束当前连接状态M func (c *Connection) Stop() { c.cancel() } @@ -204,22 +232,74 @@ func (c *Connection) GetConnection() net.Conn { return c.conn } -//GetConnID 获取当前连接ID +// GetConnID 获取当前连接ID func (c *Connection) GetConnID() uint32 { return c.connID } -//RemoteAddr 获取链接远程地址信息 +// RemoteAddr 获取链接远程地址信息 func (c *Connection) RemoteAddr() net.Addr { return c.conn.RemoteAddr() } -//LocalAddr 获取链接本地地址信息 +// LocalAddr 获取链接本地地址信息 func (c *Connection) LocalAddr() net.Addr { return c.conn.LocalAddr() } -//SendMsg 直接将Message数据发送数据给远程的TCP客户端 +func (c *Connection) Send(data []byte) error { + c.msgLock.RLock() + defer c.msgLock.RUnlock() + if c.isClosed == true { + return errors.New("connection closed when send msg") + } + + //写回客户端 + _, err := c.conn.Write(data) + if err != nil { + zlog.Ins().ErrorF("SendMsg err data = %+v, err = %+v", data, err) + return err + } + + //写对端成功, 更新链接活动时间 + c.updateActivity() + + return nil +} + +func (c *Connection) SendToQueue(data []byte) error { + c.msgLock.RLock() + defer c.msgLock.RUnlock() + + if c.msgBuffChan == nil { + c.msgBuffChan = make(chan []byte, utils.GlobalObject.MaxMsgChanLen) + //开启用于写回客户端数据流程的Goroutine + //此方法只读取MsgBuffChan中的数据没调用SendBuffMsg可以分配内存和启用协程 + go c.StartWriter() + } + + idleTimeout := time.NewTimer(5 * time.Millisecond) + defer idleTimeout.Stop() + + if c.isClosed == true { + return errors.New("Connection closed when send buff msg") + } + + if data == nil { + zlog.Ins().ErrorF("Pack data is nil") + return errors.New("Pack data is nil") + } + + // 发送超时 + select { + case <-idleTimeout.C: + return errors.New("send buff msg timeout") + case c.msgBuffChan <- data: + return nil + } +} + +// SendMsg 直接将Message数据发送数据给远程的TCP客户端 func (c *Connection) SendMsg(msgID uint32, data []byte) error { c.msgLock.RLock() defer c.msgLock.RUnlock() @@ -247,7 +327,7 @@ func (c *Connection) SendMsg(msgID uint32, data []byte) error { return nil } -//SendBuffMsg 发生BuffMsg +// SendBuffMsg 发生BuffMsg func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error { c.msgLock.RLock() defer c.msgLock.RUnlock() @@ -282,7 +362,7 @@ func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error { } } -//SetProperty 设置链接属性 +// SetProperty 设置链接属性 func (c *Connection) SetProperty(key string, value interface{}) { c.propertyLock.Lock() defer c.propertyLock.Unlock() @@ -293,7 +373,7 @@ func (c *Connection) SetProperty(key string, value interface{}) { c.property[key] = value } -//GetProperty 获取链接属性 +// GetProperty 获取链接属性 func (c *Connection) GetProperty(key string) (interface{}, error) { c.propertyLock.Lock() defer c.propertyLock.Unlock() @@ -305,7 +385,7 @@ func (c *Connection) GetProperty(key string) (interface{}, error) { return nil, errors.New("no property found") } -//RemoveProperty 移除链接属性 +// RemoveProperty 移除链接属性 func (c *Connection) RemoveProperty(key string) { c.propertyLock.Lock() defer c.propertyLock.Unlock() @@ -313,7 +393,7 @@ func (c *Connection) RemoveProperty(key string) { delete(c.property, key) } -//返回ctx,用于用户自定义的go程获取连接退出状态 +// 返回ctx,用于用户自定义的go程获取连接退出状态 func (c *Connection) Context() context.Context { return c.ctx } @@ -348,7 +428,7 @@ func (c *Connection) finalizer() { zlog.Ins().InfoF("Conn Stop()...ConnID = %d", c.connID) } -//callOnConnStart 调用连接OnConnStart Hook函数 +// callOnConnStart 调用连接OnConnStart Hook函数 func (c *Connection) callOnConnStart() { if c.onConnStart != nil { zlog.Ins().InfoF("ZINX CallOnConnStart....") @@ -356,7 +436,7 @@ func (c *Connection) callOnConnStart() { } } -//callOnConnStart 调用连接OnConnStop Hook函数 +// callOnConnStart 调用连接OnConnStop Hook函数 func (c *Connection) callOnConnStop() { if c.onConnStop != nil { zlog.Ins().InfoF("ZINX CallOnConnStop....") diff --git a/znet/msghandler.go b/znet/msghandler.go index 8b9fa9a6..3b7b9dc3 100644 --- a/znet/msghandler.go +++ b/znet/msghandler.go @@ -1,8 +1,10 @@ package znet import ( + "encoding/hex" "fmt" "github.com/aceld/zinx/utils" + "github.com/aceld/zinx/zcode" "github.com/aceld/zinx/ziface" "github.com/aceld/zinx/zlog" ) @@ -12,19 +14,45 @@ type MsgHandle struct { Apis map[uint32]ziface.IRouter //存放每个MsgID 所对应的处理方法的map属性 WorkerPoolSize uint32 //业务工作Worker池的数量 TaskQueue []chan ziface.IRequest //Worker负责取任务的消息队列 + builder ziface.InterceptorBuilder //责任链构造器 } -//NewMsgHandle 创建MsgHandle +// NewMsgHandle 创建MsgHandle func NewMsgHandle() *MsgHandle { return &MsgHandle{ Apis: make(map[uint32]ziface.IRouter), WorkerPoolSize: utils.GlobalObject.WorkerPoolSize, //一个worker对应一个queue TaskQueue: make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize), + builder: zcode.NewInterceptorBuilder(), } } -//SendMsgToTaskQueue 将消息交给TaskQueue,由worker进行处理 +func (this *MsgHandle) Intercept(chain ziface.Chain) ziface.Response { + request := chain.Request() + if request != nil { + switch request.(type) { + case ziface.IRequest: + iRequest := request.(ziface.IRequest) + if utils.GlobalObject.WorkerPoolSize > 0 { + //已经启动工作池机制,将消息交给Worker处理 + this.SendMsgToTaskQueue(iRequest) + } else { + //从绑定好的消息和对应的处理方法中执行对应的Handle方法 + go this.DoMsgHandler(iRequest) + } + } + } + return chain.Proceed(chain.Request()) +} + +func (this *MsgHandle) AddInterceptor(interceptor ziface.Interceptor) { + if this.builder != nil { + this.builder.AddInterceptor(interceptor) + } +} + +// SendMsgToTaskQueue 将消息交给TaskQueue,由worker进行处理 func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) { //根据ConnID来分配当前的连接应该由哪个worker负责处理 //轮询的平均分配法则 @@ -34,22 +62,28 @@ func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) { //zlog.Ins().DebugF("Add ConnID=%d request msgID=%d to workerID=%d", request.GetConnection().GetConnID(), request.GetMsgID(), workerID) //将请求消息发送给任务队列 mh.TaskQueue[workerID] <- request + zlog.Ins().ErrorF("SendMsgToTaskQueue-->%s", hex.EncodeToString(request.GetData())) } -//DoMsgHandler 马上以非阻塞方式处理消息 +// DoMsgHandler 马上以非阻塞方式处理消息 func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) { handler, ok := mh.Apis[request.GetMsgID()] if !ok { zlog.Ins().ErrorF("api msgID = %d is not FOUND!", request.GetMsgID()) return } + //Request请求绑定Router对应关系 request.BindRouter(handler) //执行对应处理方法 request.Call() } -//AddRouter 为消息添加具体的处理逻辑 +func (mh *MsgHandle) Decode(request ziface.IRequest) { + mh.builder.Execute(request) //将消息丢到责任链,通过责任链里拦截器层层处理层层传递 +} + +// AddRouter 为消息添加具体的处理逻辑 func (mh *MsgHandle) AddRouter(msgID uint32, router ziface.IRouter) { //1 判断当前msg绑定的API处理方法是否已经存在 if _, ok := mh.Apis[msgID]; ok { @@ -61,7 +95,7 @@ func (mh *MsgHandle) AddRouter(msgID uint32, router ziface.IRouter) { zlog.Ins().InfoF("Add api msgID = %d", msgID) } -//StartOneWorker 启动一个Worker工作流程 +// StartOneWorker 启动一个Worker工作流程 func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) { zlog.Ins().InfoF("Worker ID = %d is started.", workerID) //不断的等待队列中的消息 @@ -74,8 +108,10 @@ func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest } } -//StartWorkerPool 启动worker工作池 +// StartWorkerPool 启动worker工作池 func (mh *MsgHandle) StartWorkerPool() { + //此处必须把 msghandler 添加到责任链中,并且是责任链最后一环,在msghandler中进行解码后由router做数据分发 + mh.AddInterceptor(mh) //遍历需要启动worker的数量,依此启动 for i := 0; i < int(mh.WorkerPoolSize); i++ { //一个worker被启动 diff --git a/znet/request.go b/znet/request.go index 79f7a22f..8095b2c2 100644 --- a/znet/request.go +++ b/znet/request.go @@ -13,8 +13,9 @@ const ( HANDLE_OVER ) -//Request 请求 +// Request 请求 type Request struct { + response ziface.Response conn ziface.IConnection //已经和客户端建立好的 链接 msg ziface.IMessage //客户端请求的数据 router ziface.IRouter //请求处理的函数 @@ -23,6 +24,14 @@ type Request struct { needNext bool } +func (r *Request) GetResponse() ziface.Response { + return r.response +} + +func (r *Request) SetResponse(response ziface.Response) { + r.response = response +} + func NewRequest(conn ziface.IConnection, msg ziface.IMessage) *Request { req := new(Request) req.steps = PRE_HANDLE @@ -34,17 +43,22 @@ func NewRequest(conn ziface.IConnection, msg ziface.IMessage) *Request { return req } -//GetConnection 获取请求连接信息 +// GetMessage 获取消息实体 +func (r *Request) GetMessage() ziface.IMessage { + return r.msg +} + +// GetConnection 获取请求连接信息 func (r *Request) GetConnection() ziface.IConnection { return r.conn } -//GetData 获取请求消息的数据 +// GetData 获取请求消息的数据 func (r *Request) GetData() []byte { return r.msg.GetData() } -//GetMsgID 获取请求的消息的ID +// GetMsgID 获取请求的消息的ID func (r *Request) GetMsgID() uint32 { return r.msg.GetMsgID() } @@ -89,6 +103,8 @@ func (r *Request) Call() { r.next() } + + r.steps = PRE_HANDLE } func (r *Request) Abort() { diff --git a/znet/server.go b/znet/server.go index 700a12ab..d986591c 100644 --- a/znet/server.go +++ b/znet/server.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "github.com/aceld/zinx/zlog" + "math" "net" "os" "os/signal" @@ -27,7 +28,7 @@ var topLine = `┌──────────────────── var borderLine = `│` var bottomLine = `└──────────────────────────────────────────────────────┘` -//Server 接口实现,定义一个Server服务类 +// Server 接口实现,定义一个Server服务类 type Server struct { //服务器的名称 Name string @@ -49,9 +50,15 @@ type Server struct { packet ziface.IDataPack //异步捕获链接关闭状态 exitChan chan struct{} + //断粘包解码器 + LengthField ziface.LengthField } -//NewServer 创建一个服务器句柄 +func (this *Server) GetLengthField() ziface.LengthField { + return this.LengthField +} + +// NewServer 创建一个服务器句柄 func NewServer(opts ...Option) ziface.IServer { printLogo() @@ -65,6 +72,28 @@ func NewServer(opts ...Option) ziface.IServer { exitChan: nil, //默认使用zinx的TLV封包方式 packet: zpack.Factory().NewPack(ziface.ZinxDataPack), + // +---------------+---------------+---------------+ + // | Tag | Length | Value | + // | uint32(4byte) | uint32(4byte) | n byte | + // +---------------+---------------+---------------+ + // Tag: uint32类型,占4字节 + // Length:uint32类型,占4字节,Length标记Value长度 + // Value: 占n字节 + // + //说明: + // lengthFieldOffset = 4 (Length的字节位索引下标是4) 长度字段的偏差 + // lengthFieldLength = 4 (Length是4个byte) 长度字段占的字节数 + // lengthAdjustment = 0 (Length只表示Value长度,程序只会读取Length个字节就结束,后面没有来,故为0,若Value后面还有crc占2字节的话,那么此处就是2。若Length标记的是Tag+Length+Value总长度,那么此处是-8) + // initialBytesToStrip = 0 (这个0表示返回完整的协议内容Tag+Length+Value,如果只想返回Value内容,去掉Tag的4字节和Length的4字节,此处就是8) 从解码帧中第一次去除的字节数 + // maxFrameLength = 2^32 + 4 + 4 (Length为uint32类型,故2^32次方表示Value最大长度,此外Tag和Length各占4字节) + //默认使用TLV封包方式 + LengthField: ziface.LengthField{ + MaxFrameLength: math.MaxUint32 + 4 + 4, + LengthFieldOffset: 4, + LengthFieldLength: 4, + LengthAdjustment: 0, + InitialBytesToStrip: 0, + }, } for _, opt := range opts { @@ -77,7 +106,7 @@ func NewServer(opts ...Option) ziface.IServer { return s } -//NewServer 创建一个服务器句柄 +// NewServer 创建一个服务器句柄 func NewUserConfServer(config *utils.Config, opts ...Option) ziface.IServer { //打印logo printLogo() @@ -91,6 +120,14 @@ func NewUserConfServer(config *utils.Config, opts ...Option) ziface.IServer { ConnMgr: NewConnManager(), exitChan: nil, packet: zpack.Factory().NewPack(ziface.ZinxDataPack), + //默认使用TLV封包方式 + LengthField: ziface.LengthField{ + MaxFrameLength: math.MaxUint32 + 4 + 4, + LengthFieldOffset: 4, + LengthFieldLength: 4, + LengthAdjustment: 0, + InitialBytesToStrip: 0, + }, } //更替打包方式 for _, opt := range opts { @@ -107,7 +144,11 @@ func NewUserConfServer(config *utils.Config, opts ...Option) ziface.IServer { //============== 实现 ziface.IServer 里的全部接口方法 ======== -//Start 开启网络服务 +func (this *Server) AddInterceptor(interceptor ziface.Interceptor) { + this.msgHandler.AddInterceptor(interceptor) +} + +// Start 开启网络服务 func (s *Server) Start() { zlog.Ins().InfoF("[START] Server name: %s,listenner at IP: %s, Port %d is starting", s.Name, s.IP, s.Port) s.exitChan = make(chan struct{}) @@ -181,7 +222,7 @@ func (s *Server) Start() { }() } -//Stop 停止服务 +// Stop 停止服务 func (s *Server) Stop() { zlog.Ins().InfoF("[STOP] Zinx server , name %s", s.Name) @@ -191,7 +232,7 @@ func (s *Server) Stop() { close(s.exitChan) } -//Serve 运行服务 +// Serve 运行服务 func (s *Server) Serve() { s.Start() @@ -206,32 +247,32 @@ func (s *Server) Serve() { zlog.Ins().InfoF("[SERVE] Zinx server , name %s, Serve Interrupt, signal = %v", s.Name, sig) } -//AddRouter 路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用 +// AddRouter 路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用 func (s *Server) AddRouter(msgID uint32, router ziface.IRouter) { s.msgHandler.AddRouter(msgID, router) } -//GetConnMgr 得到链接管理 +// GetConnMgr 得到链接管理 func (s *Server) GetConnMgr() ziface.IConnManager { return s.ConnMgr } -//SetOnConnStart 设置该Server的连接创建时Hook函数 +// SetOnConnStart 设置该Server的连接创建时Hook函数 func (s *Server) SetOnConnStart(hookFunc func(ziface.IConnection)) { s.onConnStart = hookFunc } -//SetOnConnStop 设置该Server的连接断开时的Hook函数 +// SetOnConnStop 设置该Server的连接断开时的Hook函数 func (s *Server) SetOnConnStop(hookFunc func(ziface.IConnection)) { s.onConnStop = hookFunc } -//GetOnConnStart 得到该Server的连接创建时Hook函数 +// GetOnConnStart 得到该Server的连接创建时Hook函数 func (s *Server) GetOnConnStart() func(ziface.IConnection) { return s.onConnStart } -//得到该Server的连接断开时的Hook函数 +// 得到该Server的连接断开时的Hook函数 func (s *Server) GetOnConnStop() func(ziface.IConnection) { return s.onConnStop }