数据包结构

  1. 它的核心思想是按照约定的自定义规则先把数据进行打包,在使用数据的时候再按照规则进行拆包;
  2. 本例使用长度编码的方式约定通信双方的数据传输方式;
    • 首先将被传输的消息分为定长的消息头(以 header 表示)和不定长的消息体(以 body 表示)两个部分;
    • 同时再将 header 分为序列号和消息长度两个部分,序列号作为区分不同消息包的编号,消息长度用来确定每次提取的内容长度;

数据传输过程:

  1. encode:人为进行数据编码, 获取二进制数据包,然后封装成上面的数据包;
  2. decode:接收端接收到这个数据包后按照规则拆解数据,获取指定长度的数据;

Buffer 数据读写:

  1. 上面的过程需要使用 NodeJS 的 Buffer,主要是两个读写操作:
    • writeInt16BE:将数据从指定位置写入到内存;
    • readInt16BE:从内存指定位置开始读取数据;
  2. PS:还有 32 位的方法,本例使用 16 位就足够了;

示例代码

  1. 实现
    class MyTransform {
      constructor() {
        this.packageHeaderLen = 4 // 规定 header 的字节总长度
        this.serialNum = 0 // 序列号
        this.serialLen = 2 // 消息字节长度
      }
    
    
      /**
       * 编码
        * @param {*} data 数据
        * @param {*} serialNum 指定序列号
        * @returns 
        */
      encode(data, serialNum) {
        // 消息体:将数据转化为二进制
        const bodyBuf = Buffer.from(data)
    
        // 消息头
        // 01 先按照指定的长度申请一片内存空间作为 header 使用
        const headerBuf = Buffer.alloc(this.packageHeaderLen)
        // 02 写入消息头信息
        headerBuf.writeInt16BE(serialNum || this.serialNum, 0)
        headerBuf.writeInt16BE(bodyBuf.length, this.serialLen)
    
        // 如果未指定序列号
        if (serialNum === undefined) this.serialNum++
    
        return Buffer.concat([headerBuf, bodyBuf])
      }
    
      // 解码
      decode(buffer) {
        const headerBuf = buffer.slice(0, this.packageHeaderLen)
        const bodyBuf = buffer.slice(this.packageHeaderLen)
    
        return {
          serialNum: headerBuf.readInt16BE(), // readInt16BE 内部会自动读取有效的值
          bodyLength: headerBuf.readInt16BE(this.serialLen),
          body: bodyBuf.toString()
        }
      }
    
      // 获取包的长度
      getPackageLen(buffer) {
        if (buffer.length < this.packageHeaderLen) {
          // 数据不完整,还不应该取数据
          return 0
        } else {
          return this.packageHeaderLen + buffer.readInt16BE(this.serialLen)
        }
      }
    }
    
    module.exports = MyTransform
    
  2. 测试
    const MyTransformCode = require('./myTransform')
    
    const ts = new MyTransformCode()
    const str1 = '你好张三'
    
    const encodeBuf = ts.encode(str1, 1)
    // 前两个字节是序列号,后两个字节是消息体长度,后面是消息体数据
    console.log(encodeBuf) // <Buffer 00 01 00 0c e4 bd a0 e5 a5 bd e5 bc a0 e4 b8 89>
    console.log(Buffer.from(str1)) // <Buffer e4 bd a0 e5 a5 bd e5 bc a0 e4 b8 89>
    
    const decodeBuf = ts.decode(encodeBuf)
    console.log(decodeBuf) // { serialNum: 1, bodyLength: 12, body: '你好张三' }
    
    const len = ts.getPackageLen(encodeBuf)
    console.log(len) // 16
    

解决粘包

  1. server.js
    const net = require('net')
    
    const MyTransform = require('./myTransform')
    const server = net.createServer()
    const ts = new MyTransform()
    
    // 存储未处理的不完整的数据包
    let overageBuffer = null
    
    server.listen('1234', 'localhost')
    
    server.on('listening', () => console.log('服务端已经开启,地址:localhost:1234'))
    
    server.on('connection', socket => {
      socket.on('data', chunk => {
        // 拼接之前未处理的数据
        if (overageBuffer) chunk = Buffer.concat([overageBuffer, chunk])
    
        // 每次提取的数据包的长度
        let packageLen = 0
        // 循环提取数据包
        while ((packageLen = ts.getPackageLen(chunk))) {
          // 提取一个数据包
          const packageCon = chunk.slice(0, packageLen)
          // 更新 chunk,将已读取的数据删除
          chunk = chunk.slice(packageLen)
    
          // 拆解数据包
          const ret = ts.decode(packageCon)
          // 回送给客户端
          socket.write(ts.encode('你好,' + ret.body, ret.serialNum))
        }
    
        // 将未处理 / 不完整的数据包进行存储
        overageBuffer = chunk
      })
    })
    
  2. client.js
    const net = require('net')
    
    const MyTransform = require('./myTransform')
    const client = net.createConnection({ host: 'localhost', port: 1234 })
    const ts = new MyTransform()
    
    // 存储未处理的不完整的数据包
    let overageBuffer = null
    
    client.write(ts.encode('张三'))
    client.write(ts.encode('张三2'))
    client.write(ts.encode('张三3'))
    client.write(ts.encode('张三4'))
    
    client.on('data', chunk => {
      // 拼接之前未处理的数据
      if (overageBuffer) chunk = Buffer.concat([overageBuffer, chunk])
    
      // 每次提取的数据包的长度
      let packageLen = 0
      // 循环提取数据包
      while ((packageLen = ts.getPackageLen(chunk))) {
        // 提取一个数据包
        const packageCon = chunk.slice(0, packageLen)
        // 更新 chunk
        chunk = chunk.slice(packageLen)
    
        // 拆解数据包
        const ret = ts.decode(packageCon)
        console.log(ret)
      }
    
      // 将未处理的不完整的数据包进行存储
      overageBuffer = chunk
    })
    

打赏作者
您的打赏是我前进的动力
微信
支付宝
评论

中午好👏🏻,我是 ✍🏻   疯狂 codding 中...

粽子

这有关于产品、设计、开发的问题和看法,还有技术文档和你分享。

相信你可以在这里找到对你有用的知识和教程

了解更多

目录

  1. 1. 数据包结构
  2. 2. 数据传输过程:
  3. 3. Buffer 数据读写:
  4. 4. 示例代码
  5. 5. 解决粘包