Node.js中的Stream

2021-03-02 20:28

阅读:482

标签:自己的   catch   color   await   txt   二进制   从服务器   状态   output   

  Stream主要用于序列化地数据处理(read or write input into output sequentially),比如文件读写,网络数据传输, 或任何端到端的数据交换。Stream在处理数据的时候,与传统方式有所不同,传统方式是把数据作为一个整体进行处理,而stream则是把数据分割成一块一块的进行处理,它不是整个数据一起处理,而是一块数据一块数据地处理。以文件读写为例,文件读写的时候,stream并不是一次性地把一个文件中的所有内容都读取到内存中再进行处理(就是再写入到另外一个文件中),而是一块数据一块数据的进行读取,读取完一块数据就处理一块数据(把这块数据写入到另外一个文件中),而不会让它一直在内存中。相比于传统方式,使用stream来处理数据,可以高效的使用内存,更有可能来处理大文件。再以网络数据传输(网上看视频)为例。我们并不是把整个电影都从服务器上下载下来才开始播放,而是一块一块地下载,下载一块,播放一块。服务器一块一块地写数据,浏览器一块一块的读数据。用流处理数据,时间上也比较高效。

  在Node.js中,有以下4种流

  输入流,就是流中有数据,我们从里面读取数据。输入流负责读取数据,我们只需要从输入流中读取数据。

  输出流:它负责向目的地写入数据,而我们只需要向输出流中写入数据。

  双向流: 即可以从它里面读取数据,也可以向它里面写入数据

  转换流,给它一个流,它把流里面的内容转换一下,然后再把流输出,流的性质是不变的,流的内容发生了变化,通常转化的是输入流
  输入流和输出流是相对于计算机内存而言的,输入流,就是把数据读取到内存中,输出流,则是把内存中的数据写入到目的地。
  我们既可以使用Node.js提供的流,比如,fs.createReadStream 就创建了读取文件的输入流,也可以创建自己的流。通常来说,都是先使用Node.js提供的流,那就使用fs模块提供的输入,输出流。fs.createReadStream() 创建输入流,输入流负责读取数据,所以它接受一个必须的参数,要读取的数据。fs.createReadStream(‘./data.txt‘) 就是从当前文件下的data.txt中读取数据。我们程序需要做的就是从输入流中读取想要的数据,在Node.js中从输入流中读取数据,也有多种方式。这要从输入流的两种模式说起。输入流有两种模式,pause模式,flow模式,pause模式是默认模式,就是创建输入流后,它处于暂停状态,程序不会从输入流中读取数据,我们需要手动地从输入流中读取数据。flow模式,则是程序自动地从输入流中读取数据,我们只需要决定读取到数据后再怎么处理数据就可以了。这时,flow模式,可能是我们想要的。怎么从pause模式切换到flow模式呢?给输入流注册data事件,流就自动转化成flow模式,那再data事件注册一个事件处理函数,处理数据,就可以实现从输入流中读取数据了。
  创建一个文件夹,新建一个data.txt文件和read.js 文件,data.txt文件写一些文字。read.js文件如下
const fs = require(fs);
const readable = fs.createReadStream(./data.txt);
readable.on(data, data => {
    console.log(data);
})

  node read.js 执行程序,控制台输出了buffer, 就是一些二进制数据,默认情况下,从输入流中读取到的内容都是buffer,我们需要手动转化成字符串,调用toString()方法就可以了。console.log(data.toString()) 。假设想用pause模式来读取数据,那就要手动调用输入流的read() 方法了,但还要注意,只有在输入流中有数据的时候,才能调用read() 方法,所以要在readable 事件处理函数中调用read()事件, read()方法,如果读取不到数据,就会返回null

const fs = require(fs);
const readableStream = fs.createReadStream(./data.txt);
readableStream.on(readable, () => {
    let chunk;
    while (null !== (chunk = readableStream.read())) {
        console.log(chunk.toString());
    }
})

  除了使用事件的方式来从输入流中读取数据,还可以使用异步迭代器来消费输入流(从输入流中读取数据)

const fs = require(fs);
const readableStream = fs.createReadStream(./data.txt);
async function logChunks(readable) {
    for await (const chunk of readable) {
        console.log(chunk.toString());
    }
}
logChunks(readableStream);

  当然它的底层还是监听readable 事件。除了在异步迭代器,直接处理数据,也可以把流中的数据暂时存储起来,以便日后消费

const fs = require(fs);
const readableStream = fs.createReadStream(./data.txt);

readableStream.setEncoding(utf-8);

async function readableToString(readable) {
    let result = ‘‘;
    for await (const chunk of readable) {
        result += chunk;
    }
    return result;
}
readableToString(readableStream).then(console.log);

  如果要处理异常,可以用try/catch 把for await 的处理包起来。说完了输入流,再说输出流。fs.createWriteStream 创建一个输出流,输出流,就是把数据输出到什么地方,因此,它也接受一个参数,就是输出的目的地。我们要做的就是向输出流中写放数据,要调用write() 方法

const fs = require(fs);
const readableStream = fs.createReadStream(./data.txt);
const writeStream = fs.createWriteStream(./result.txt);

readableStream.on(data, data => {
    writeStream.write(data);
})
  write() 返回true or false, true表示写入成功,你可以继续写入数据。false则是,写入出错了,你不能继续写入了。至于什么时候能再写,输出流触发drain事件。
只要输出流触发了drain事件,就证明,可以继续向输出流中写入数据了。所以真正安全的做法,向输出流写入数据的时候,还要判断true or false, 并监听 drain 事件。
const fs = require(fs);
const util = require(util);
const stream = require(stream);
const {once} = require(events);

const readableStream = fs.createReadStream(./data.txt);
const writeStream = fs.createWriteStream(./result.txt);

const finished = util.promisify(stream.finished);

async function writeIterableToFile(readable, writable) {
    for await (const chunk of readable) {
        if (!writable.write(chunk)) {
            await once(writable, drain);
        }
    }
    writable.end();
    // Wait until done. Throws if there are errors.
    await finished(writable);
}

writeIterableToFile(readableStream, writeStream)

  end() 方法,就表示,向输出流中写完数据了,不会再写了。finished 事件,则中输出流,把所有的数据都写入到的目的地中。当我们手动去读取和写入文件时候,处理有点麻烦,这就用到pipe()方法。上面的可以写成

const fs = require(fs);

const readableStream = fs.createReadStream(./data.txt);
const writeStream = fs.createWriteStream(./result.txt);

readableStream.pipe(writeStream);

  pipe() 操作,前一个的输出变成后面一个的输入。readableStream输入流的输出,就是读取到的数据,我们就是要把这些数据写入到输出流中,所以它正好是输出流的输入,因此,就可以用pipe把这两个链接起来。pipe()的操作就相当于

readableStream.on("data", chunk => {
    // 自动处理了drain事件。
    writeStream.write(chunk);
});

readableStream.on("end", () => {
    writeStream.end();
});

  

  
  

Node.js中的Stream

标签:自己的   catch   color   await   txt   二进制   从服务器   状态   output   

原文地址:https://www.cnblogs.com/SamWeb/p/14316896.html


评论


亲,登录后才可以留言!