什么是流
流是一组有序的,有起点和终点的字节数据传输手段,而且有不错的效率,借助事件和非阻塞 I/O 库,流模块允许在其可用的时候动态处理,在其不需要的时候释放掉;
流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface),stream 模块提供了基础的 API;使用这些 API 可以很容易地来构建实现流接口的对象,比如 HTTP 服务器 request 和 response 对象都是流;
流可以是可读的、可写的,或是可读写的,所有的流都是 EventEmitter 的实例;
为什么需要流
如果读取一个文件,使用 fs.readFileSync 同步读取,程序会被阻塞,然后所有数据被写到内存中,使用 fs.readFile 读取,程序不会阻塞,但是所有数据依旧会一次性全被写到内存,然后再让消费者去读取;如果文件很大,内存使用便会成为问题;这种情况下流就比较有优势,流相比一次性写到内存中,它会先写到到一个缓冲区,然后再由消费者去读取,不用将整个文件写进内存,节省了内存空间;
不使用流时:会发现当文件很大会导致内存占用也非常大;
使用流时:使用流会发现内存占用会很小;
Node.js 中的流
Readable 可读流:能够实现数据的读取;
Writable 可写流:能够实现数据的写操作;
Duplex 双工流:可读可写(Readable 和 Writable),例如 net 模块中的 Socket;
Transform 转换流:可读可写,还能实现数据修改或转换(可以在读写数据时修改或转换数据的 Duplex 流);
文件流-可读流
文件的可读流操作实际上就是继承了 Readable 和 EventEmitter 类的内置 API ,可以通过 fs 创建使用
可读流创建
const fs = require('fs');
// 参数 1:是底层数据来源
// 参数 2:是可选的选项对象
const rs = fs.createReadStream('test.txt', {
flags: 'r', // 以什么模式打开文件,`r` 表示可读模式
encoding: null, // 编码,默认 `null,表示 Buffer
fd: null, // 文件描述符,默认 null,从 `3` 开始
mode: 0o66, // 权限,默认 438(十进制)或 0o66(八进制)
autoClose: true, // 是否自动关闭文件
start: 0, // 读取的起始位置,包前又包后
// end: 3, // 读取的截至位置
highWaterMark: 4 // 水位线,表示每次读取多少字节的数据
});
rs.on("data", chunk => {
console.log("读到了一部分数据:", chunk);
});
可读流事件
// 文件打开
// 在创建或实例化可读流后就会触发,并不需要数据被消费时才会触发
rs.on('open', fd => console.log(fd, '文件打开了'))
// 文件关闭
// 默认情况下,可读流是一个暂停模式,所以 close 只能在数据被消费完才会触发
rs.on('close', () => console.log('文件关闭了'))
// 消费数据
rs.on('data', chunk => console.log(chunk.toString()))
// 当数据被消费完成之后,可读流关闭之前触发
rs.on('end', () => console.log('当数据被清空之后触发'))
// 可尝试修改文件路径抛出错误
rs.on('error', err => console.log('出错了'))
// 暂停读取数据
rs.on("pause", () => {
console.log("暂停了");
setTimeout(() => {
rs.resume();
}, 1000);
});
// 取消暂停,继续读取数据
rs.on("resume", () => {
console.log("恢复了");
});
可读流消费
-
第一种方式:(需按需读取一定量的数据)
rs.on('data', chunk => { console.log(chunk.toString()); // 可以进入暂停模式 rs.pause(); // 可以进入流动模式 setTimeout(() => rs.resume(), 1000); })
-
第二种方式:(需要源源不断的将底层数据全部读出)
- 可读流首先内部调用 _read 读取 4 个字节(highWaterMark)的数据放入缓冲区,触发 readable 事件;readable 事件回调中通过调用 read 方法读取 1 个字节的数据;
- 由于缓冲区中还存在数据,可以继续被消费,于是会一直被消费;
- 直到缓冲区清空,可读流又会调用 _read 从底层数据源读取数据,循环往复直到底层数据被消费完;
rs.on('readable', () => { let data = null while ((data = rs.read(1)) !== null) { // 获取缓冲区存储的数据的长度 const len = rs._readableState.length console.log(data.toString(), '---', len) } })
手写文件可读流
const fs = require('fs')
const EventEmitter = require('events')
class MyFileReadStream extends EventEmitter {
constructor(path, options = {}) {
super()
this.fd = null
this.path = path
this.flags = options.flags || 'r' // 文件打开模式,默认可读模式
this.mode = options.mode || 438 // 权限位,默认438 (wr权限)
this.autoClose = options.autoClose || true // 读取完是否自动关闭文件
this.start = options.start || 0 // 读取的起始位置
this.end = options.end // 读取的结束位置(包含结束位置的数据)
this.highWaterMark = options.highWaterMark || 64 * 1024 // 缓存区的水位线(KB)
this.readOffset = 0 // 每次读取的起始位置
this.open()
// 注册事件触发事件
this.on('newListener', type => {
if (type === 'data') {
this.read()
}
})
}
open() {
// 原生 open 方法打开指定位置上的文件
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) return this.emit('error', err)
this.fd = fd
this.emit('open', fd)
})
}
read() {
// 注册 data 事件是同步代码,
// 注册 data 事件的时机可能早于 fs.open 的回调,此时 fd 还未赋值
// 当文件打开后,调用 read 方法
if (typeof this.fd !== 'number') return this.once('open', this.read)
// 申请指定大小的缓存空间
const buf = Buffer.alloc(this.highWaterMark)
// 要读取的数据量
const howMuchToRead = this.end ? Math.min(this.end - this.readOffset + 1, this.highWaterMark) : this.highWaterMark
// 原生 read 方法读取文件数据
fs.read(this.fd, buf, 0, howMuchToRead, this.readOffset, (err, readBytes) => {
console.log(readBytes)
if (readBytes) {
// 更新偏移量
this.readOffset += readBytes
// 触发 data 事件
this.emit('data', buf.slice(0, readBytes))
// 继续读取
this.read()
} else {
// 没有数据可读
// 先触发 end 再触发 close
this.emit('end')
if (this.autoClose) {
this.close()
}
}
})
}
close() {
fs.close(this.fd, () => this.emit('close'))
}
}
const rs = new MyFileReadStream('test.txt', {
highWaterMark: 3,
end: 7
})
// rs.on('open', fd => console.log('open', fd))
// rs.on('error', err => console.log('error', err))
// 与原生 fs 一样,如果不绑定 data 事件就不会触发 end 和 close 事件
// rs.on('end', () => console.log('end'))
// rs.on('close', () => console.log('close'))
rs.on('data', chunk => console.log(chunk))
文件流-可写流
文件的可写流操作实际上就是继承了 Writeable 和 EventEmitter 类的内置 API ,可以通过 fs 创建使用
可写流创建
const fs = require('fs')
// 参数 1 是写入数据的目标文件
// 参数 2 是可选的选项对象
const ws = fs.createWriteStream('test.txt', {
flags: 'w', // 以什么模式打开文件,`w` 表示写入模式
mode: 438, // 权限
fd: null,
encoding: 'utf-8',
start: 0,
highWaterMark: 3 // 1 个汉字占 3 个字节
})
ws.write("a");
可写流方法
write
-
写入一组数据,data 可以是字符串或 Buffer;
-
根据 highWaterMark 返回一个 boolean 值;
true
:写入通道没有被填满,接下来的数据可以直接写入,无须排队;
false
:写入通道目前已被填满,接下来的数据将进入写入队列; (要特别注意背压问题,因为写入队列是内存中的数据,是有限的)
-
当写入队列清空时,会触发 drain 事件;
end
结束写入,将自动关闭文件(是否自动关闭取决于 autoClose 配置)
data 是可选的,表示关闭前的最后一次写入;
可写流事件
// 可写流被创建就会触发 open 事件
ws.on('open', fd => console.log('open', fd))
// close 是在数据写入操作全部完成后触发
ws.on('close', () => console.log('close'))
ws.on('error', err => console.log('在 end 之后不允许执行写操作'))
// 写操作并不能触发 close 事件
ws.write('1')
// 执行写入
// end 执行意味着写操作结束,从而触发 close 事件
// end 可以接收参数,会将参数和缓冲区里的数据执行写入,如果不传参数则只会写入缓冲区里的数据
ws.end()
ws.write('2')
手写文件可写流
-
单链表实现
// 节点 class Node { constructor(element, next) { this.element = element this.next = next } } // 链表 class LinkedList { constructor() { this.head = null this.size = 0 } /** * 截取指定位置的节点 * @param {*} index * @returns */ _getNode(index) { // 处理边界 if (index < 0 || index >= this.size) throw new Error('越界了') // 遍历获取节点 let currentNode = this.head for (let i = 0; i < index; i++) { currentNode = currentNode.next } return currentNode } /** * 增加 * @param {number} index [可选] 增加节点的位置 * @param {*} element 节点的数据 */ add(index, element) { if (arguments.length === 1) { element = index // 第一个参数为节点数据 index = this.size // 添加到末尾 } // 处理边界 if (index < 0 || index > this.size) throw new Error('越界了') if (index === 0) { // 添加到首部 // 保留原有的 head 指向,作为新增节点的 next 指向 const head = this.head // 新的 head 指向新增节点 this.head = new Node(element, head) } else { // 添加到中间或尾部 // 将链表从指定位置截断,获取添加位置前面的节点 // 节点的 next 指向新增节点 // 节点之前 next 指向的引用存入新增节点的 next const prevNode = this._getNode(index - 1) prevNode.next = new Node(element, prevNode.next) } // 更新计数 this.size++ } // 删除指定位置的节点 remove(index) { let rmNode = null if (index === 0) { rmNode = this.head if (!rmNode) { return undefined } this.head = rmNode.next } else { const prev = this._getNode(index - 1) rmNode = prev.next prev.next = rmNode.next } this.size-- return rmNode } // 修改 set(index, element) { const node = this._getNode(index) node.element = element } // 查询 get(index) { return this._getNode(index) } // 清空 clear() { this.head = null } } const l1 = new LinkedList() l1.add('node1') l1.add('node2') l1.add(1, 'node3') console.log(l1) // l1.remove(1) l1.set(1, 'node4') console.log(l1) console.log(l1.get(1)) l1.clear() console.log(l1);
-
单链表实现队列
// 节点 class Node { ... } // 链表 class LinkedList { ... } class Queue { constructor() { this.linkedList = new LinkedList() } // 入列 enQueue(data) { this.linkedList.add(data) } // 出列 deQueue() { return this.linkedList.remove(0) } } const q = new Queue() console.log(q) q.enQueue('node1') q.enQueue('node2') console.log(q) let a = q.deQueue() console.log(a) a = q.deQueue() console.log(a) a = q.deQueue() console.log(a)
-
实现代码
const fs = require('fs') const EventEmitter = require('events') // 自定义的单向链表队列 const Queue = require('./linked-queue') class MyFileWriteStream extends EventEmitter { constructor(path, options) { super() this.path = path this.flags = options.flags || 'w' this.mode = options.mode || 438 this.autoClose = options.autoClose || true this.start = options.start || 0 this.encoding = options.encoding || 'utf8' this.highWaterMark = options.highWaterMark || 16 * 1024 this.open() this.writeOffset = this.start // 执行写入的偏移量 this.writing = false // 当前是否正在执行写入 this.length = 0 // 累计待写入量 this.needDrain = false // 是否需要触发 drain 事件 this.cache = new Queue() } open() { fs.open(this.path, this.flags, (err, fd) => { if (err) return this.emit('error', err) // 正常打开文件 this.fd = fd this.emit('open', fd) }) } write(chunk, encoding, cb) { // 仅作了简单判断:字符串 或 buffer chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) this.length += chunk.length const flag = this.length < this.highWaterMark this.needDrain = !flag // 利用队列来一个一个处理 多个写入操作 if (this.writing) { // 当前正在执行写入,内容应该排队 this.cache.enQueue({ chunk, encoding, cb }) } else { // 当前不是正在写入,执行写入 this.writing = true this._write(chunk, encoding, cb) } return flag } _write(chunk, encoding, cb) { // 保证在 open 后执行 if (typeof this.fd !== 'number') { // 在执行回调的同时处理缓存队列 return this.once('open', () => this._write(chunk, encoding, cb)) } fs.write(this.fd, chunk, this.start, chunk.length, this.writeOffset, (err, written) => { // 更新写入偏移量 this.writeOffset += written // 更新累计待写入量 this.length -= written // 执行回调 cb && cb() // 清空排队的内容 this._clearBuffer() }) } _clearBuffer() { const data = this.cache.deQueue() if (data) { this._write(data.element.chunk, data.element.encoding, data.element.cb) } else { if (this.needDrain) { // 重置写入写入状态(否则第一次 drain 后无法继续写入) this.writing = false // 重置 drain 状态 this.needDrain = false this.emit('drain') } } } } const ws = new MyFileWriteStream('./test.txt', { highWaterMark: 3 }) ws.on('open', fd => console.log('open----', fd)) let flag = ws.write('1', 'utf8', () => console.log('ok1')) console.log(flag) flag = ws.write('2', 'utf8', () => console.log('ok2')) console.log(flag) flag = ws.write('3', 'utf8', () => console.log('ok3')) console.log(flag) flag = ws.write('4', 'utf8', () => console.log('ok4')) console.log(flag) ws.on('drain', () => console.log('drain'))
pipe 解决背压问题
bad code
const fs = require("fs");
const path = require("path");
async function method() {
const from = path.resolve(__dirname, "./temp/abc.txt");
const to = path.resolve(__dirname, "./temp/abc2.txt");
console.time("方式1");
const content = await fs.promises.readFile(from);
await fs.promises.writeFile(to, content); // 一次性写入,占用大量的内存
console.timeEnd("方式1");
}
method();
good code
const fs = require("fs");
const path = require("path");
async function method2() {
const from = path.resolve(__dirname, "./temp/abc.txt");
const to = path.resolve(__dirname, "./temp/abc2.txt");
console.time("方式2");
const rs = fs.createReadStream(from);
const ws = fs.createWriteStream(to);
rs.on("data", chunk => {
// 读到一部分数据
const flag = ws.write(chunk);
if (!flag) {
// 表示下一次写入,会造成背压,暂停读取
rs.pause();
}
});
ws.on("drain", () => {
// 可以继续写了
rs.resume();
});
rs.on("close", () => {
ws.end(); //完毕写入流
console.timeEnd("方式2");
});
}
method2();
better code
pipe 是 方式2 的语法糖,使用起来更轻松
const fs = require("fs");
const path = require("path");
async function method3() {
const from = path.resolve(__dirname, "./temp/abc.txt");
const to = path.resolve(__dirname, "./temp/abc2.txt");
console.time("方式3");
const rs = fs.createReadStream(from);
const ws = fs.createWriteStream(to);
rs.pipe(ws);
rs.on("close", () => {
console.timeEnd("方式3");
});
}
method3();
node👉 文件 IO
上一篇