什么是流

  1. 流是一组有序的,有起点和终点的字节数据传输手段,而且有不错的效率,借助事件和非阻塞 I/O 库,流模块允许在其可用的时候动态处理,在其不需要的时候释放掉;

  2. 流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface),stream 模块提供了基础的 API;使用这些 API 可以很容易地来构建实现流接口的对象,比如 HTTP 服务器 requestresponse 对象都是流;

  3. 流可以是可读的、可写的,或是可读写的,所有的流都是 EventEmitter 的实例;

为什么需要流

  1. 如果读取一个文件,使用 fs.readFileSync 同步读取,程序会被阻塞,然后所有数据被写到内存中,使用 fs.readFile 读取,程序不会阻塞,但是所有数据依旧会一次性全被写到内存,然后再让消费者去读取;如果文件很大,内存使用便会成为问题;这种情况下流就比较有优势,流相比一次性写到内存中,它会先写到到一个缓冲区,然后再由消费者去读取,不用将整个文件写进内存,节省了内存空间;

  2. 不使用流时:会发现当文件很大会导致内存占用也非常大;

  3. 使用流时:使用流会发现内存占用会很小;

Node.js 中的流

  1. Readable 可读流:能够实现数据的读取;

  2. Writable 可写流:能够实现数据的写操作;

  3. Duplex 双工流:可读可写(ReadableWritable),例如 net 模块中的 Socket

  4. Transform 转换流:可读可写,还能实现数据修改或转换(可以在读写数据时修改或转换数据的 Duplex 流);

文件流-可读流

文件的可读流操作实际上就是继承了 ReadableEventEmitter 类的内置 API ,可以通过 fs 创建使用

可读流创建

const fs = require('fs');

// 参数 1:是底层数据来源
// 参数 2:是可选的选项对象
const rs = fs.createReadStream('test.txt', {
  flags: 'r', // 以什么模式打开文件,`r` 表示可读模式
  encoding: null, // 编码,默认 `null,表示 Buffer
  fd: null, // 文件描述符,默认 null,从 `3` 开始
  mode: 0o66, // 权限,默认 438(十进制)或 0o66(八进制)
  autoClose: true, // 是否自动关闭文件
  start: 0, // 读取的起始位置,包前又包后
  // end: 3, // 读取的截至位置
  highWaterMark: 4 // 水位线,表示每次读取多少字节的数据
});

rs.on("data", chunk => {
  console.log("读到了一部分数据:", chunk);
});

可读流事件

// 文件打开
// 在创建或实例化可读流后就会触发,并不需要数据被消费时才会触发
rs.on('open', fd => console.log(fd, '文件打开了'))

// 文件关闭
// 默认情况下,可读流是一个暂停模式,所以 close 只能在数据被消费完才会触发
rs.on('close', () => console.log('文件关闭了'))

// 消费数据
rs.on('data', chunk => console.log(chunk.toString()))

// 当数据被消费完成之后,可读流关闭之前触发
rs.on('end', () => console.log('当数据被清空之后触发'))

// 可尝试修改文件路径抛出错误
rs.on('error', err => console.log('出错了'))

// 暂停读取数据
rs.on("pause", () => {
  console.log("暂停了");
  setTimeout(() => {
    rs.resume();
  }, 1000);
});

// 取消暂停,继续读取数据
rs.on("resume", () => {
  console.log("恢复了");
});

可读流消费

  1. 第一种方式:(需按需读取一定量的数据)

    rs.on('data', chunk => {
      console.log(chunk.toString());
    
      // 可以进入暂停模式
      rs.pause();
      // 可以进入流动模式
      setTimeout(() => rs.resume(), 1000);
    })
    
  2. 第二种方式:(需要源源不断的将底层数据全部读出)

    • 可读流首先内部调用 _read 读取 4 个字节(highWaterMark)的数据放入缓冲区,触发 readable 事件;readable 事件回调中通过调用 read 方法读取 1 个字节的数据;
    • 由于缓冲区中还存在数据,可以继续被消费,于是会一直被消费;
    • 直到缓冲区清空,可读流又会调用 _read 从底层数据源读取数据,循环往复直到底层数据被消费完;
    rs.on('readable', () => {
      let data = null
      while ((data = rs.read(1)) !== null) {
        // 获取缓冲区存储的数据的长度
        const len = rs._readableState.length
        console.log(data.toString(), '---', len)
      }
    })
    

手写文件可读流

const fs = require('fs')
const EventEmitter = require('events')

class MyFileReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super()
    this.fd = null
    this.path = path
    this.flags = options.flags || 'r' // 文件打开模式,默认可读模式
    this.mode = options.mode || 438 // 权限位,默认438 (wr权限)
    this.autoClose = options.autoClose || true // 读取完是否自动关闭文件
    this.start = options.start || 0 // 读取的起始位置
    this.end = options.end // 读取的结束位置(包含结束位置的数据)
    this.highWaterMark = options.highWaterMark || 64 * 1024 // 缓存区的水位线(KB)
    this.readOffset = 0 // 每次读取的起始位置
    this.open()

    // 注册事件触发事件
    this.on('newListener', type => {
      if (type === 'data') {
        this.read()
      }
    })
  }

  open() {
    // 原生 open 方法打开指定位置上的文件
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) return this.emit('error', err)

      this.fd = fd
      this.emit('open', fd)
    })
  }

  read() {
    // 注册 data 事件是同步代码,
    // 注册 data 事件的时机可能早于 fs.open 的回调,此时 fd 还未赋值
    // 当文件打开后,调用 read 方法
    if (typeof this.fd !== 'number') return this.once('open', this.read)

    // 申请指定大小的缓存空间
    const buf = Buffer.alloc(this.highWaterMark)
    // 要读取的数据量
    const howMuchToRead = this.end ? Math.min(this.end - this.readOffset + 1, this.highWaterMark) : this.highWaterMark

    // 原生 read 方法读取文件数据
    fs.read(this.fd, buf, 0, howMuchToRead, this.readOffset, (err, readBytes) => {
      console.log(readBytes)
      if (readBytes) {
        // 更新偏移量
        this.readOffset += readBytes
        // 触发 data 事件
        this.emit('data', buf.slice(0, readBytes))
        // 继续读取
        this.read()
      } else {
        // 没有数据可读
        // 先触发 end 再触发 close
        this.emit('end')
        if (this.autoClose) {
          this.close()
        }
      }
    })
  }

  close() {
    fs.close(this.fd, () => this.emit('close'))
  }
}

const rs = new MyFileReadStream('test.txt', {
  highWaterMark: 3,
  end: 7
})
// rs.on('open', fd => console.log('open', fd))
// rs.on('error', err => console.log('error', err))
// 与原生 fs 一样,如果不绑定 data 事件就不会触发 end 和 close 事件
// rs.on('end', () => console.log('end'))
// rs.on('close', () => console.log('close'))
rs.on('data', chunk => console.log(chunk))

文件流-可写流

文件的可写流操作实际上就是继承了 WriteableEventEmitter 类的内置 API ,可以通过 fs 创建使用

可写流创建

const fs = require('fs')

// 参数 1 是写入数据的目标文件
// 参数 2 是可选的选项对象
const ws = fs.createWriteStream('test.txt', {
  flags: 'w', // 以什么模式打开文件,`w` 表示写入模式
  mode: 438, // 权限
  fd: null,
  encoding: 'utf-8',
  start: 0,
  highWaterMark: 3 // 1 个汉字占 3 个字节
})
ws.write("a");

可写流方法

write

  1. 写入一组数据,data 可以是字符串或 Buffer

  2. 根据 highWaterMark 返回一个 boolean 值;

    • true:写入通道没有被填满,接下来的数据可以直接写入,无须排队;
    • false:写入通道目前已被填满,接下来的数据将进入写入队列; (要特别注意背压问题,因为写入队列是内存中的数据,是有限的)
  3. 当写入队列清空时,会触发 drain 事件;

end

  1. 结束写入,将自动关闭文件(是否自动关闭取决于 autoClose 配置)

  2. data 是可选的,表示关闭前的最后一次写入;

可写流事件

// 可写流被创建就会触发 open 事件
ws.on('open', fd => console.log('open', fd))

// close 是在数据写入操作全部完成后触发
ws.on('close', () => console.log('close'))

ws.on('error', err => console.log('在 end 之后不允许执行写操作'))

// 写操作并不能触发 close 事件
ws.write('1')
// 执行写入
// end 执行意味着写操作结束,从而触发 close 事件
// end 可以接收参数,会将参数和缓冲区里的数据执行写入,如果不传参数则只会写入缓冲区里的数据
ws.end()
ws.write('2')

手写文件可写流

  1. 单链表实现

    // 节点
    class Node {
        constructor(element, next) {
            this.element = element
            this.next = next
        }
    }
    
    // 链表
    class LinkedList {
        constructor() {
            this.head = null
            this.size = 0
        }
    
        /**
         * 截取指定位置的节点
        * @param {*} index 
        * @returns 
        */
        _getNode(index) {
            // 处理边界
            if (index < 0 || index >= this.size) throw new Error('越界了')
            // 遍历获取节点
            let currentNode = this.head
            for (let i = 0; i < index; i++) {
                currentNode = currentNode.next
            }
            return currentNode
        }
    
        /**
         * 增加
        * @param {number} index [可选] 增加节点的位置
        * @param {*} element 节点的数据
        */
        add(index, element) {
            if (arguments.length === 1) {
                element = index // 第一个参数为节点数据
                index = this.size // 添加到末尾
            }
    
            // 处理边界
            if (index < 0 || index > this.size) throw new Error('越界了')
    
            if (index === 0) {
                // 添加到首部
                // 保留原有的 head 指向,作为新增节点的 next 指向
                const head = this.head
                // 新的 head 指向新增节点
                this.head = new Node(element, head)
            } else {
                // 添加到中间或尾部
                // 将链表从指定位置截断,获取添加位置前面的节点
                // 节点的 next 指向新增节点
                // 节点之前 next 指向的引用存入新增节点的 next
                const prevNode = this._getNode(index - 1)
                prevNode.next = new Node(element, prevNode.next)
            }
    
            // 更新计数
            this.size++
        }
    
        // 删除指定位置的节点
        remove(index) {
            let rmNode = null
            if (index === 0) {
                rmNode = this.head
                if (!rmNode) {
                    return undefined
                }
                this.head = rmNode.next
            } else {
                const prev = this._getNode(index - 1)
                rmNode = prev.next
                prev.next = rmNode.next
            }
            this.size--
            return rmNode
        }
    
        // 修改
        set(index, element) {
            const node = this._getNode(index)
            node.element = element
        }
    
        // 查询
        get(index) {
            return this._getNode(index)
        }
    
        // 清空
        clear() {
            this.head = null
        }
    }
    
    
    const l1 = new LinkedList()
    l1.add('node1')
    l1.add('node2')
    l1.add(1, 'node3')
    console.log(l1)
    // l1.remove(1)
    l1.set(1, 'node4')
    console.log(l1)
    console.log(l1.get(1))
    l1.clear()
    console.log(l1);
    
  2. 单链表实现队列

    // 节点
    class Node {
        ...
    }
    
    // 链表
    class LinkedList {
        ...
    }
    
    class Queue {
        constructor() {
            this.linkedList = new LinkedList()
        }
        // 入列
        enQueue(data) {
            this.linkedList.add(data)
        }
        // 出列
        deQueue() {
            return this.linkedList.remove(0)
        }
    }
    
    
    const q = new Queue()
    console.log(q)
    q.enQueue('node1')
    q.enQueue('node2')
    console.log(q)
    
    let a = q.deQueue()
    console.log(a)
    a = q.deQueue()
    console.log(a)
    a = q.deQueue()
    console.log(a)
    
  3. 实现代码

    const fs = require('fs')
    const EventEmitter = require('events')
    // 自定义的单向链表队列
    const Queue = require('./linked-queue')
    
    class MyFileWriteStream extends EventEmitter {
        constructor(path, options) {
            super()
            this.path = path
            this.flags = options.flags || 'w'
            this.mode = options.mode || 438
            this.autoClose = options.autoClose || true
            this.start = options.start || 0
            this.encoding = options.encoding || 'utf8'
            this.highWaterMark = options.highWaterMark || 16 * 1024
            this.open()
    
            this.writeOffset = this.start // 执行写入的偏移量
            this.writing = false // 当前是否正在执行写入
            this.length = 0 // 累计待写入量
            this.needDrain = false // 是否需要触发 drain 事件
            this.cache = new Queue()
        }
    
        open() {
            fs.open(this.path, this.flags, (err, fd) => {
                if (err) return this.emit('error', err)
    
                // 正常打开文件
                this.fd = fd
                this.emit('open', fd)
            })
        }
    
        write(chunk, encoding, cb) {
            // 仅作了简单判断:字符串 或 buffer
            chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)
            this.length += chunk.length
            const flag = this.length < this.highWaterMark
            this.needDrain = !flag
    
            // 利用队列来一个一个处理 多个写入操作
            if (this.writing) {
                // 当前正在执行写入,内容应该排队
                this.cache.enQueue({ chunk, encoding, cb })
            } else {
                // 当前不是正在写入,执行写入
                this.writing = true
                this._write(chunk, encoding, cb)
            }
    
            return flag
        }
    
        _write(chunk, encoding, cb) {
            // 保证在 open 后执行
            if (typeof this.fd !== 'number') {
                // 在执行回调的同时处理缓存队列
                return this.once('open', () => this._write(chunk, encoding, cb))
            }
    
            fs.write(this.fd, chunk, this.start, chunk.length, this.writeOffset, (err, written) => {
                // 更新写入偏移量
                this.writeOffset += written
                // 更新累计待写入量
                this.length -= written
                // 执行回调
                cb && cb()
                // 清空排队的内容
                this._clearBuffer()
            })
        }
    
        _clearBuffer() {
            const data = this.cache.deQueue()
            if (data) {
                this._write(data.element.chunk, data.element.encoding, data.element.cb)
            } else {
                if (this.needDrain) {
                    // 重置写入写入状态(否则第一次 drain 后无法继续写入)
                    this.writing = false
                    // 重置 drain 状态
                    this.needDrain = false
                    this.emit('drain')
                }
            }
        }
    }
    
    
    const ws = new MyFileWriteStream('./test.txt', { highWaterMark: 3 })
    ws.on('open', fd => console.log('open----', fd))
    
    let flag = ws.write('1', 'utf8', () => console.log('ok1'))
    console.log(flag)
    
    flag = ws.write('2', 'utf8', () => console.log('ok2'))
    console.log(flag)
    
    flag = ws.write('3', 'utf8', () => console.log('ok3'))
    console.log(flag)
    
    flag = ws.write('4', 'utf8', () => console.log('ok4'))
    console.log(flag)
    
    ws.on('drain', () => console.log('drain'))
    

pipe 解决背压问题

bad code

const fs = require("fs");
const path = require("path");

async function method() {
  const from = path.resolve(__dirname, "./temp/abc.txt");
  const to = path.resolve(__dirname, "./temp/abc2.txt");

  console.time("方式1");
  const content = await fs.promises.readFile(from);
  await fs.promises.writeFile(to, content); // 一次性写入,占用大量的内存
  console.timeEnd("方式1");
}

method();

good code

const fs = require("fs");
const path = require("path");

async function method2() {
  const from = path.resolve(__dirname, "./temp/abc.txt");
  const to = path.resolve(__dirname, "./temp/abc2.txt");

  console.time("方式2");
  const rs = fs.createReadStream(from);
  const ws = fs.createWriteStream(to);
  rs.on("data", chunk => {
    // 读到一部分数据
    const flag = ws.write(chunk);
    if (!flag) {
      // 表示下一次写入,会造成背压,暂停读取
      rs.pause();
    }
  });

  ws.on("drain", () => {
    // 可以继续写了
    rs.resume();
  });

  rs.on("close", () => {
    ws.end(); //完毕写入流
    console.timeEnd("方式2");
  });
}

method2();

better code

pipe 是 方式2 的语法糖,使用起来更轻松

const fs = require("fs");
const path = require("path");

async function method3() {
  const from = path.resolve(__dirname, "./temp/abc.txt");
  const to = path.resolve(__dirname, "./temp/abc2.txt");

  console.time("方式3");
  const rs = fs.createReadStream(from);
  const ws = fs.createWriteStream(to);
  rs.pipe(ws);

  rs.on("close", () => {
    console.timeEnd("方式3");
  });
}

method3();
打赏作者
您的打赏是我前进的动力
微信
支付宝
评论

中午好👏🏻,我是 ✍🏻   疯狂 codding 中...

粽子

这有关于前端开发的技术文档和你分享。

相信你可以在这里找到对你有用的知识和教程。

了解更多

目录

  1. 1. 什么是流
  2. 2. 为什么需要流
  3. 3. Node.js 中的流
  4. 4. 文件流-可读流
    1. 4.1. 可读流创建
    2. 4.2. 可读流事件
    3. 4.3. 可读流消费
    4. 4.4. 手写文件可读流
  5. 5. 文件流-可写流
    1. 5.1. 可写流创建
    2. 5.2. 可写流方法
      1. 5.2.1. write
      2. 5.2.2. end
    3. 5.3. 可写流事件
    4. 5.4. 手写文件可写流
  6. 6. pipe 解决背压问题
    1. 6.1. bad code
    2. 6.2. good code
    3. 6.3. better code