我需要从 MongoDB 读取数据,对其进行转换,然后将其作为文件发送到远程 SFTP 服务器。由于数据库中的文档非常大,我尝试将其作为流来执行。我有一个可行但效率不高的解决方案。欣赏如何改进它的任何想法。到目前为止,它是这样工作的:
- 从 Mongo 读取数据作为流
const aggregationStream = mongoConnection.db.collection.aggregate.stream();
- 创建一个新的转换流来对数据进行一些操作
const csvStream = new Transform({ ... some csv transformation });
- 通过管道传输两个流
require('util').promisify(stream.pipeline)
await streamPipeline(
aggregationStream,
csvStream,
);
到目前为止一切正常。
- 将数据放入 SFTP:
await sftp.connect();
await sftp.put(csvStream, targetPath, { encoding: null });
这就是问题所在。我在 Mongo 和 transform 之间使用流,但流并没有进一步作为流进入 SFTP。相反,它会等待所有对象从 Mongo 发送到转换流,并且仅在 csvStream 完成转换 Mongo 数据后才开始写入 SFTP。需要想法如何立即流式传输数据 mongo -> transform -> sftp。