
I'm submitting a PySpark task along with an environment for that task.

I need --archives to ship a zip package containing the full Python environment.

The working spark-submit command looks like this:

/my/spark/home/spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 10G \
--executor-memory 8G \
--executor-cores 4 \
--queue rnd \
--num-executors 8 \
--archives /data/me/ld_env.zip#prediction_env \
--conf spark.pyspark.python=./prediction_env/ld_env/bin/python \
--conf spark.pyspark.driver.python=./prediction_env/ld_env/bin/python \
--conf spark.executor.memoryOverhead=4096 \
--py-files dist/mylib-0.1.0-py3-none-any.whl my_task.py

I'm trying to launch the Spark application programmatically using SparkLauncher:

String pyPath = "my_task.py";
String archives = "/data/me/ld_env.zip#prediction_env";
SparkAppHandle handle = new SparkLauncher()
        .setSparkHome(sparkHome)
        .setAppResource(jarPath)
        .setMaster("yarn")
        .setDeployMode("cluster")
        .setConf(SparkLauncher.EXECUTOR_MEMORY, "8G")
        .setConf(SparkLauncher.EXECUTOR_CORES, "2")
        .setConf("spark.executor.instances", "8")
        .setConf("spark.yarn.queue", "rnd")
        .setConf("spark.pyspark.python", "./prediction_env/ld_env/bin/python")
        .setConf("spark.pyspark.driver.python", "./prediction_env/ld_env/bin/python")
        .setConf("spark.executor.memoryOverhead", "4096")
        .addPyFile(pyPath)
        // .addPyFile(archives) 
        // .addFile(archives)
        .addAppArgs("--inputPath",
                inputPath,
                "--outputPath",
                outputPath,
                "--option",
                option)
        .startApplication(taskListener);
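
For context, taskListener above is a SparkAppHandle.Listener whose implementation isn't shown in the snippet; a minimal sketch, assuming you only want to log state transitions, could look like this:

import org.apache.spark.launcher.SparkAppHandle;

// Minimal listener sketch: logs each state change reported by the launcher.
// The name "taskListener" comes from the snippet above; its real
// implementation is not shown there, so this is only an assumption.
SparkAppHandle.Listener taskListener = new SparkAppHandle.Listener() {
    @Override
    public void stateChanged(SparkAppHandle handle) {
        // getAppId() may be null until YARN has accepted the application.
        System.out.println("Spark app " + handle.getAppId()
                + " changed state to " + handle.getState());
    }

    @Override
    public void infoChanged(SparkAppHandle handle) {
        // Called when other application info (e.g. the app ID) is updated.
    }
};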

I need somewhere to put my zip file so that it gets unpacked on YARN, but I don't see any archives functionality on SparkLauncher.


1 Answer


Use the config spark.yarn.dist.archives, as documented in the Running on YARN guide:

String pyPath = "my_task.py";
String archives = "/data/me/ld_env.zip#prediction_env";
SparkAppHandle handle = new SparkLauncher()
        .setSparkHome(sparkHome)
        .setAppResource(jarPath)
        .setMaster("yarn")
        .setDeployMode("cluster")
        .setConf(SparkLauncher.EXECUTOR_MEMORY, "8G")
        .setConf(SparkLauncher.EXECUTOR_CORES, "2")
        .setConf("spark.executor.instances", "8")
        .setConf("spark.yarn.queue", "rnd")
        .setConf("spark.pyspark.python", "./prediction_env/ld_env/bin/python")
        .setConf("spark.pyspark.driver.python", "./prediction_env/ld_env/bin/python")
        .setConf("spark.executor.memoryOverhead", "4096")
        .setConf("spark.yarn.dist.archives", archives)
        .addPyFile(pyPath)
        .addAppArgs("--inputPath",
                inputPath,
                "--outputPath",
                outputPath,
                "--option",
                option)
        .startApplication(taskListener);

So adding .setConf("spark.yarn.dist.archives", archives) solves the problem: on YARN, each archive listed there is shipped to every container and unpacked into the working directory under the alias given after #, which is why the ./prediction_env/ld_env/bin/python paths resolve.
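
An alternative that should be equivalent (a sketch, not verified here): SparkLauncher also exposes addSparkArg, so you can pass the flag exactly as on the spark-submit command line; on YARN, --archives maps to spark.yarn.dist.archives.

// Sketch: pass --archives through addSparkArg instead of setConf.
// On YARN this should be equivalent to setting spark.yarn.dist.archives.
SparkAppHandle handle = new SparkLauncher()
        .setSparkHome(sparkHome)
        .setAppResource(jarPath)
        .setMaster("yarn")
        .setDeployMode("cluster")
        .addSparkArg("--archives", archives)
        .addPyFile(pyPath)
        .startApplication(taskListener);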

Answered 2021-07-08T04:37:03.407