Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node基础篇之stream流 #39

Open
kekobin opened this issue Sep 20, 2019 · 0 comments
Open

node基础篇之stream流 #39

kekobin opened this issue Sep 20, 2019 · 0 comments

Comments

@kekobin
Copy link
Owner

kekobin commented Sep 20, 2019

简介

流是数据的集合 - 就像数组或者字符串。差异就是流可能不是一次性获取到的,它们不需要匹配内存。这让流在处理大容量数据,或者从一个额外的源每次获取一块数据的时候变得非常强大。
然而,流不仅可以处理大容量的数据。它们也给了我们在代码中组合的能力。类似linux的管道命令一样:

const grep = ... // grep 输出流
const wc = ... // wc 输入流
grep.pipe(wc)

很多在 Node.js 内置的模块都实现了流接口:
image

上面的TCP sockets,zlib,crypto 流即是可读也是可写流。

Stream分类

在nodejs中,有四种stream类型:

  • Readable:用来读取数据,比如 fs.createReadStream()。
  • Writable:用来写数据,比如 fs.createWriteStream()。
  • Duplex:可读+可写,比如 net.Socket()。
  • Transform:在读写的过程中,可以对数据进行修改,比如 zlib.createDeflate()(数据压缩/解压)。

所有的流都是 EventEmitter 的实例。触发它们的事件可以读或者写入数据,然而,我们可以使用 pipe 方法消费流的数据。

消费流的方式一:pipe 方法

readableSrc
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWrtitableDest)

pipe 方法是消费流最简单的方法,类似linux中的管道,好处在于,如果源文件较大,对于降低内存占用有好处。

消费流的方式二: Stream 事件

pipe方式随便很简单和方便,但是在某些场景下,比如需要对读取的流进程某些操作之后再写出时,就必须得使用事件的方式了:

# readable.pipe(writable)
readable.on('data', (chunk) => {
  writable.write(chunk);
});
readable.on('end', () => {
  writable.end();
});

可读流和可写流使用的重要事件和函数列表:
image

上面的drain 事件指的是: 当可写流可以接受更多的数据时的一个标志。
finish 事件指的是: 当所有的数据都写入到底层系统中时会触发。

实现一个可写流

可写流基于以下api实现:

const { Writable } = require('stream');

完整的一个示例:

const { Writable } = require('stream');
const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);

这等价于内部实现的 process.stdout, 即下面的代码与上面的功能等价:

process.stdin.pipe(process.stdout);

实现一个可读流

可读流基于以下api实现:

const { Readable } = require('stream');
const inStream = new Readable({});

简单的完整示例:

const { Readable } = require('stream'); 
const inStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);

实现双向流和转换流

双向流也就是将上面的可读可写流的实现进行结合即可:

const { Duplex } = require('stream');

const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },

  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);

转换流

对于转换流,们没必要实现 read 或者 write 方法,我们仅仅需要实现 transform 方法,它结合了它们两个。它有一个 write 方法的特性并且我们也可以使用它来 push 数据:

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
//将每个 chunk 转换到大写的格式,并且作为可读的一部分被 push 到可读流里面了
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

Node 的内置转换流

主要是zlib 和 crypto 流。

例一:使用 zlib.crreateGzip() 流与 fs readable/writable 流结合起来创建一个文件压缩的脚本

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'));

也可以结合各种事件进行监听:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .on('data', () => process.stdout.write('.'))
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));

大文件读取与写入示例

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  // res 是可写流,所以可以由可读流直接流入
  src.pipe(res);
});

server.listen(8000);

image

参考

Node.js 流(stream):你需要知道的一切

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant