我有一个脚本可以处理一批 20MiB 的 CSV 文件,可以选择 gzip 压缩到大约 4MiB。有成千上万个文件,每个单独处理大约需要 30 秒;读取未压缩文件或压缩文件并解压缩“几乎是即时的”,这强烈表明该进程可以在进程级别上并行化。事实上,这就是使用复杂的 Ruby 管道所做的事情。但是,我正在尝试使用 bash 将 Ruby 代码分解成更小的部分。对于工作控制,我想出了这个 bash 功能
wait_until_job_available() {
maximum_jobs=${MAXIMUM_JOBS}
[ $# -eq 0 ] || maximum_jobs="${1}"
exit_status=0
RUNNING_JOBS=( $(jobs -p) )
while [ ${maximum_jobs} -le ${#RUNNING_JOBS[@]} ] && [ 0 -eq "${exit_status}" ]
do
# `wait -n` requires bash 4.3 which is unfortunately not available on several recent RHEL-based Linux distributions such as Oracle Linux
wait -n
exit_status=$?
RUNNING_JOBS=( $(jobs -p) )
done
return ${exit_status}
}
这允许我在后台运行 bash 管道之前调用wait_until_job_available
,并允许运行作业的可选最小数量(如果省略,则默认为机器上可用的内核数)。
所以我可能会像这样使用它:
while read file
do
CAT_COMMAND=cat
# if input file is gzip-compressed, pipe zcat instead of cat
if [ "${INFILE: -3}" == ".gz" ]
then
CAT_COMMAND=zcat
fi
# wait for a job to become available
wait_until_job_available
# read the uncompressed file, write processed data to file.out
process_file -i <(${CAT_COMMAND} ${file}) -o ${file}.out &
# while searching for filesystem paths of type _f_ile
done < <(find ${search_path} -type f)
# wait for all background jobs to finish
wait
如您所见,这应该找到其中的所有文件search_path
并将其传递给process_file
命令。在这样做时,我使用进程替换来对文件进行分类或即时解压缩文件;输入文件名被替换为一个进程,该进程将发出未压缩文件的内容,输出文件是原始文件名,附加了“.out”。的调用process_file
被后台处理并发送到作业控制。看起来花花公子,对吧?
除了我注意到某些文件没有完全正确处理。
我注意到报告要处理的文件process_file
总是报告为/dev/fd/63
,即使对于process_file
. 另一方面,当我将文件单独复制或解压缩到临时文件并将临时文件的名称传递给 时process_file
,执行正常并且所有文件似乎都得到了正确处理。
我想避免创建一个临时文件,特别是在触摸磁盘(性能)和需要在处理后清理(删除)临时文件方面;有这个问题阻碍了这一点。所以我很好奇替代流程管道的伪文件名称是否存在某种竞争条件?还是我似乎误解了流程替代或工作控制?
作为参考,我使用的是 Ubuntu Server 14.04、linux 3.19.0-59 Bash 4.3.11 gzip 1.6