3

我正在 node.js 中编写一个AWS Lambda执行脚本,它响应 S3 put 事件(日志文件),读取文件并通过 COPY 命令插入 Postgres 表。似乎除了写入数据库(logToPostgres在下面的脚本中)之外的所有内容都按预期工作。

一些注意事项:

  • 我已经删除了它检查以验证它是 S3 put 事件的部分,以及其他错误处理代码,因为它在这里不相关。
  • 数据库用户INSERT对表和数据库具有权限,ALL并且可以从任何 IP 访问(均已验证)。
  • secrets.js是导出数据库凭据的同一目录中的模块
  • 在本地运行脚本时,我可以很好地写入数据库。
  • 我没有达到任何 AWS Lambda 的限制——从 S3 下载的文件是 521 字节,超时设置为最大 60 秒(在测试和写入同一个数据库时,它的运行时间低于 300 毫秒)。

Cloud watch 中没有错误,并且通过在每一步添加日志记录,我能够将其缩小到stream.pipe(query)...代码的一部分。由于某种原因,这部分没有被 AWS Lambda 执行,但它在本地运行良好。它没有发出'finished'and'end'事件,所以我的猜测是它仍然未执行。

关于问题可能出在哪里的任何想法?

var async = require('async');
var fs = require('fs');
var aws = require('aws-sdk');
var s3 = new aws.S3();
var pg = require('pg');
var pgCopy = require('pg-copy-streams').from;
var secrets = require('./secrets.js');

exports.handler = function(event, context) {
    var bucket = event.Records[0].s3.bucket.name;
    var key = event.Records[0].s3.object.key;

    async.waterfall(
        [
            function downloadWebhook(next) {
                s3.getObject({Bucket: bucket, Key: key}, next);
            },
            function saveToDisk(response, next) {
                var file = fs.createWriteStream('/tmp/foo_' + Date.now());
                file.write(response.Body);
                file.close();
                next(null, file.path);
            },
            function createStdinStream(path, next) {
                next(null, fs.createReadStream(path));
            },
            function logToPostgres(stream, next) {
                var client = new pg.Client('pg://' + secrets.user + ':' +
                    secrets.password + '@' + secrets.host + ':' +
                    secrets.port + '/' + secrets.database);
                client.connect(function (error) {
                    if (error) console.error(error);
                    var query = client.query(pgCopy('COPY my_table FROM STDIN'));
                    stream.pipe(query)
                        .on('finish', function () {
                            client.end();
                            next(null, null);
                        });
                });
            }
        ],
        function (error) {
            if (error) console.error(error);
            context.done();
        }
    });
};

更新:

事实证明,可写流现在会发出'finish'事件,因此将其更改为'finish'并包含答案中的建议使其运行时不会出现错误。但是,在 lambda 运行后,数据库中仍然没有一行。我怀疑事务正在回滚,但无法查明原因或位置。我什至尝试明确开始并提交交易,但没有骰子。

4

2 回答 2

0

context.done()立即停止您的 Lambda 函数的执行。因此,当您调用 时next(null, null),瀑布将完成,并且因为您的 Postgres 查询是异步的,所以它不会运行完成。

请尝试:

stream.pipe(query)
    .on('end', function() {
        client.end();
        next(null, null);
    })
    .on('error', context.fail);

请注意,我们仅在流结束后才解决瀑布。

于 2015-04-10T23:28:26.757 回答
0

不确定您的解决方案出了什么问题。这是我经过测试且更紧凑的解决方案。请让我知道这是否有效!

var aws = require('aws-sdk');
var s3 = new aws.S3();
var S3S = require('s3-streams');
var pg = require('pg');
var pgCopy = require('pg-copy-streams').from;
var secrets = require('./secrets.js');

exports.handler = function(event, context) {
    var bucket = event.Records[0].s3.bucket.name;
    var key = event.Records[0].s3.object.key;

    var stream = S3S.ReadStream(s3, {Bucket: bucket, Key: key});
    pg.connect(secrets.connector, function(err, client) {
        if (err) console.log(err);
        var query = client.query(pgCopy(
            "COPY event_log(user_id, event, ...) FROM STDIN CSV"
        ));
        stream.pipe(query)
            .on('end', function () {
                client.end();
                context.done();
            })
            .on('error', function(error) {
                console.log(error);
            });
    });
};
于 2015-07-23T18:56:06.243 回答