数据包结构
- 它的核心思想是按照约定的自定义规则先把数据进行打包,在使用数据的时候再按照规则进行拆包;
- 本例使用长度编码的方式约定通信双方的数据传输方式;
- 首先将被传输的消息分为定长的消息头(以 header 表示)和不定长的消息体(以 body 表示)两个部分;
- 同时再将 header 分为序列号和消息长度两个部分,序列号作为区分不同消息包的编号,消息长度用来确定每次提取的内容长度;
数据传输过程:
- encode:人为进行数据编码, 获取二进制数据包,然后封装成上面的数据包;
- decode:接收端接收到这个数据包后按照规则拆解数据,获取指定长度的数据;
Buffer 数据读写:
- 上面的过程需要使用 NodeJS 的 Buffer,主要是两个读写操作:
- writeInt16BE:将数据从指定位置写入到内存;
- readInt16BE:从内存指定位置开始读取数据;
- PS:还有 32 位的方法,本例使用 16 位就足够了;
示例代码
- 实现
class MyTransform { constructor() { this.packageHeaderLen = 4 // 规定 header 的字节总长度 this.serialNum = 0 // 序列号 this.serialLen = 2 // 消息字节长度 } /** * 编码 * @param {*} data 数据 * @param {*} serialNum 指定序列号 * @returns */ encode(data, serialNum) { // 消息体:将数据转化为二进制 const bodyBuf = Buffer.from(data) // 消息头 // 01 先按照指定的长度申请一片内存空间作为 header 使用 const headerBuf = Buffer.alloc(this.packageHeaderLen) // 02 写入消息头信息 headerBuf.writeInt16BE(serialNum || this.serialNum, 0) headerBuf.writeInt16BE(bodyBuf.length, this.serialLen) // 如果未指定序列号 if (serialNum === undefined) this.serialNum++ return Buffer.concat([headerBuf, bodyBuf]) } // 解码 decode(buffer) { const headerBuf = buffer.slice(0, this.packageHeaderLen) const bodyBuf = buffer.slice(this.packageHeaderLen) return { serialNum: headerBuf.readInt16BE(), // readInt16BE 内部会自动读取有效的值 bodyLength: headerBuf.readInt16BE(this.serialLen), body: bodyBuf.toString() } } // 获取包的长度 getPackageLen(buffer) { if (buffer.length < this.packageHeaderLen) { // 数据不完整,还不应该取数据 return 0 } else { return this.packageHeaderLen + buffer.readInt16BE(this.serialLen) } } } module.exports = MyTransform
- 测试
const MyTransformCode = require('./myTransform') const ts = new MyTransformCode() const str1 = '你好张三' const encodeBuf = ts.encode(str1, 1) // 前两个字节是序列号,后两个字节是消息体长度,后面是消息体数据 console.log(encodeBuf) // <Buffer 00 01 00 0c e4 bd a0 e5 a5 bd e5 bc a0 e4 b8 89> console.log(Buffer.from(str1)) // <Buffer e4 bd a0 e5 a5 bd e5 bc a0 e4 b8 89> const decodeBuf = ts.decode(encodeBuf) console.log(decodeBuf) // { serialNum: 1, bodyLength: 12, body: '你好张三' } const len = ts.getPackageLen(encodeBuf) console.log(len) // 16
解决粘包
- server.js
const net = require('net') const MyTransform = require('./myTransform') const server = net.createServer() const ts = new MyTransform() // 存储未处理的不完整的数据包 let overageBuffer = null server.listen('1234', 'localhost') server.on('listening', () => console.log('服务端已经开启,地址:localhost:1234')) server.on('connection', socket => { socket.on('data', chunk => { // 拼接之前未处理的数据 if (overageBuffer) chunk = Buffer.concat([overageBuffer, chunk]) // 每次提取的数据包的长度 let packageLen = 0 // 循环提取数据包 while ((packageLen = ts.getPackageLen(chunk))) { // 提取一个数据包 const packageCon = chunk.slice(0, packageLen) // 更新 chunk,将已读取的数据删除 chunk = chunk.slice(packageLen) // 拆解数据包 const ret = ts.decode(packageCon) // 回送给客户端 socket.write(ts.encode('你好,' + ret.body, ret.serialNum)) } // 将未处理 / 不完整的数据包进行存储 overageBuffer = chunk }) })
- client.js
const net = require('net') const MyTransform = require('./myTransform') const client = net.createConnection({ host: 'localhost', port: 1234 }) const ts = new MyTransform() // 存储未处理的不完整的数据包 let overageBuffer = null client.write(ts.encode('张三')) client.write(ts.encode('张三2')) client.write(ts.encode('张三3')) client.write(ts.encode('张三4')) client.on('data', chunk => { // 拼接之前未处理的数据 if (overageBuffer) chunk = Buffer.concat([overageBuffer, chunk]) // 每次提取的数据包的长度 let packageLen = 0 // 循环提取数据包 while ((packageLen = ts.getPackageLen(chunk))) { // 提取一个数据包 const packageCon = chunk.slice(0, packageLen) // 更新 chunk chunk = chunk.slice(packageLen) // 拆解数据包 const ret = ts.decode(packageCon) console.log(ret) } // 将未处理的不完整的数据包进行存储 overageBuffer = chunk })
打赏作者
您的打赏是我前进的动力
微信
支付宝
TCP 数据粘包
上一篇
评论