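I am trying to run a batch Apache Beam job (through the TensorFlow Extended (TFX) library). It is a batch job that should only read some CSV files from S3, convert them to TFRecords format (written back to S3), and collect statistics about the dataset. The pipeline runs fine on a very small dataset (a few MB), but when I try to run it on a larger dataset (~400 MB), the job seems to get stuck (the records/bytes metrics in the Flink UI stop increasing), and I see this error repeated in the TaskManager logs: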

2021-08-06 12:53:06,019 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Partition (1/4)#0 (b8941b4b27b50d9c1d275b3e9755471a) switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkException: Disconnect from JobManager responsible for f63c7fd0c9674a5faabdb45c96ba2c91.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManagerConnection(TaskExecutor.java:1660)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.establishJobManagerConnection(TaskExecutor.java:1602)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1600(TaskExecutor.java:181)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$0(TaskExecutor.java:2173)
    at java.util.Optional.ifPresent(Optional.java:159)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerGainedLeadership$1(TaskExecutor.java:2171)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Found new job leader for job id f63c7fd0c9674a5faabdb45c96ba2c91.
    ... 28 more

The Flink cluster is version 1.13.1 and is deployed as a native Kubernetes cluster on AWS EKS.

I have set the process memory of both the TaskManager and the JobManager to 26 GB, so I assume there is no memory pressure here.
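
For reference, a minimal sketch of what that memory setting looks like, assuming it is applied through Flink's standard process-memory options. The two keys are Flink's documented options; whether they live in flink-conf.yaml or are passed as -D flags to the native Kubernetes deployment is an assumption here, and no other settings are shown.

```yaml
# Sketch only: total process memory for each component, as described above.
# Assumption: set via flink-conf.yaml (equivalently, -Dkey=value on the CLI).
jobmanager.memory.process.size: 26g
taskmanager.memory.process.size: 26g
```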

Thanks, Gorjan
