35

我正在寻找 C/CPP 中工作窃取队列的正确实现。我环顾了谷歌,但没有发现任何有用的东西。

也许有人熟悉一个好的开源实现?(我不喜欢实现取自原始学术论文的伪代码)。

4

10 回答 10

16

没有免费的午餐。

请看一下偷纸的原作。这篇论文很难理解。我知道那篇论文包含理论证明而不是伪代码。然而,没有比 TBB更简单的版本了。如果有的话,它不会提供最佳性能。工作窃取本身会产生一些开销,因此优化和技巧非常重要。特别是,出队必须是线程安全的。实现高度可扩展和低开销的同步具有挑战性。

我真的很想知道你为什么需要它。我认为正确的实施意味着像 TBB 和 Cilk 这样的东西。同样,偷工减料很难实施。

于 2010-01-26T06:49:30.157 回答
15

实施“偷工减料”在理论上并不难。您需要一组包含任务的队列,这些任务通过组合计算和生成其他任务来完成更多工作。您需要对队列进行原子访问,以将新生成的任务放入这些队列中。最后,您需要每个任务最后调用的过程,以便为执行任务的线程找到更多工作;该程序需要在工作队列中查找工作。

大多数此类工作窃取系统都假设存在少量线程(通常由真实处理器内核支持),并且每个线程只有一个工作队列。然后你首先尝试从自己的队列中窃取工作,如果它是空的,则尝试从其他人那里窃取。棘手的是知道要查看哪些队列;连续扫描它们以寻找工作非常昂贵,并且可能在寻找工作的线程之间产生大量争用。

到目前为止,这都是非常通用的东西,只有两个主要例外:1) 不能在纯 C 或 C++ 中说明切换上下文(例如,设置处理器上下文寄存器,例如“堆栈”)。您可以通过同意在目标平台特定的机器代码中编写包的一部分来解决此问题。2) 对多处理器队列的原子访问不能纯粹在 C 或 C++ 中完成(忽略 Dekker 算法),因此您需要使用汇编语言同步原语(如 X86 LOCK XCH 或比较和交换)对这些进行编码。现在,一旦您拥有安全访问权限,更新队列所涉及的代码并不是很复杂,您可以轻松地用几行 C 语言编写它。

但是,我认为您会发现,尝试使用混合汇编程序在 C 和 C++ 中编写这样的包仍然是相当低效的,并且最终您最终还是会在汇编程序中编写整个代码。剩下的都是 C/C++ 兼容的入口点:-}

我为我们的PARLANSE并行编程语言做了这个,它提供了任意大量并行计算在任何时刻实时和交互(同步)的想法。它是在 X86 上的幕后实现的,每个 CPU 一个线程,并且完全在汇编程序中实现。窃取工作的代码总共可能有 1000 行,而且它的代码很棘手,因为您希望它在非争用情况下非常快。

C 和 C++ 真正的美中不足是,当你创建一个代表工作的任务时,你分配了多少堆栈空间?串行 C/C++ 程序通过简单地过度分配大量(例如,10Mb)线性堆栈,没有人关心浪费了多少堆栈空间。但是,如果您可以创建数千个任务并让它们都在特定时刻运行,那么您就无法合理地为每个任务分配 10Mb。所以现在你要么需要静态确定一个任务需要多少堆栈空间(图灵硬),要么你需要分配堆栈块(例如,每个函数调用),这是广泛可用的 C/C++ 编译器不做的(例如,您可能使用的那个)。最后的出路是限制任务创建,使其在任何时候限制在几百个,并在实时任务中多路复用几百个非常巨大的堆栈。如果任务可以互锁/暂停状态,则不能执行最后一个,因为您会遇到阈值。所以你只能在任务只有做计算。这似乎是一个非常严格的约束。

对于 PARLANSE,我们构建了一个编译器,为每个函数调用在堆上分配激活记录。

于 2010-01-31T00:00:13.627 回答
2

有一种工具可以以一种非常优雅的方式简单地完成它。这是在很短的时间内并行化您的程序的一种非常有效的方法。

Cilk项目

高性能计算挑战奖

我们的 Cilk 参赛作品的 HPC Challenge Class 2 奖获得了 2006 年“优雅与性能的最佳结合”奖。该奖项于 2006 年 11 月 14 日在坦帕的 SC'06 颁发。

于 2010-01-27T14:47:49.670 回答
2

如果您正在寻找基于 pthread 或 boost::thread 构建的 C++ 中的独立工作窃取队列类实现,那么祝您好运,据我所知没有。

然而,正如其他人所说,Cilk、TBB 和 Microsoft 的 PPL 都在幕后实现了工作窃取。

问题是您想使用工作窃取队列还是实施一个?如果您只想使用一个,那么上面的选择是很好的起点,只需在其中任何一个中安排一个“任务”就足够了。

正如BlueRaja 所说,PPL 中的task_group 和structured_task_group 将执行此操作,还请注意,这些类也可以在最新版本的英特尔TBB 中使用。并行循环(parallel_for、parallel_for_each)也通过工作窃取来实现。

如果您必须查看源代码而不是使用实现,TBB 是开源的,Microsoft 为其 CRT 提供源代码,因此您可以进行探索。

您还可以查看 Joe Duffy 的博客以了解 C# 实现(但它是 C# 并且内存模型不同)。

-瑞克

于 2010-01-31T03:42:05.470 回答
2

这个开源库https://github.com/cpp-taskflow/cpp-taskflow自 2018 年 12 月起支持工作窃取线程池。

看看WorkStealingQueue实现工作窃取队列的类,如论文“Dynamic Circular Work-stealing Deque”,SPAA,2015 中所述。

于 2018-12-31T02:02:07.083 回答
1

PPL结构化任务组类使用工作窃取队列来实现。如果您需要 WSQ 进行线程处理,我建议您这样做。 如果你真的在找源码,我不知道ppl.h中是否给出了代码,或者是否有预编译的对象;我今晚回家时必须检查一下。

于 2010-01-25T17:10:03.080 回答
1

我发现这个工作窃取算法最接近的实现是Karl-Filip Faxén 的Woolsrc /报告/比较

于 2014-01-20T16:59:00.903 回答
1

OpenMP 可能很好地支持工作窃取,尽管它被称为递归并行

OpenMP 论坛帖子

OpenMP 规范定义了任务构造(可以嵌套,因此非常适合递归并行),但没有详细说明它们是如何实现的。OpenMP 实现,包括 gcc,通常使用某种形式的工作窃取任务,尽管确切的算法(以及由此产生的性能)可能会有所不同!

#pragma omp task#pragma omp taskwait

更新

C++ Concurrency in Action一书的第 9 章描述了如何实现“池线程的工作窃取”。我自己没有阅读/实现它,但它看起来并不太难。

于 2015-11-17T15:30:45.247 回答
0

我已将此 C 项目移植到 C++。

扩展数组时,原始Steal数据可能会遇到脏读。我试图修复这个错误,但最终让步了,因为我实际上并不需要动态增长的堆栈。该方法不尝试分配空间,而是Push简单地返回false. 调用者然后可以执行旋转等待,即while(!stack->Push(value)){}

#pragma once
#include <atomic>

  // A lock-free stack.
  // Push = single producer
  // Pop = single consumer (same thread as push)
  // Steal = multiple consumer

  // All methods, including Push, may fail. Re-issue the request
  // if that occurs (spinwait).

  template<class T, size_t capacity = 131072>
  class WorkStealingStack {

  public:
    inline WorkStealingStack() {
      _top = 1;
      _bottom = 1;
    }

    WorkStealingStack(const WorkStealingStack&) = delete;

    inline ~WorkStealingStack()
    {

    }

    // Single producer
    inline bool Push(const T& item) {
      auto oldtop = _top.load(std::memory_order_relaxed);
      auto oldbottom = _bottom.load(std::memory_order_relaxed);
      auto numtasks = oldbottom - oldtop;

      if (
        oldbottom > oldtop && // size_t is unsigned, validate the result is positive
        numtasks >= capacity - 1) {
        // The caller can decide what to do, they will probably spinwait.
        return false;
      }

      _values[oldbottom % capacity].store(item, std::memory_order_relaxed);
      _bottom.fetch_add(1, std::memory_order_release);
      return true;
    }

    // Single consumer
    inline bool Pop(T& result) {

      size_t oldtop, oldbottom, newtop, newbottom, ot;

      oldbottom = _bottom.fetch_sub(1, std::memory_order_release);
      ot = oldtop = _top.load(std::memory_order_acquire);
      newtop = oldtop + 1;
      newbottom = oldbottom - 1;

      // Bottom has wrapped around.
      if (oldbottom < oldtop) {
        _bottom.store(oldtop, std::memory_order_relaxed);
        return false;
      }

      // The queue is empty.
      if (oldbottom == oldtop) {
        _bottom.fetch_add(1, std::memory_order_release);
        return false;
      }

      // Make sure that we are not contending for the item.
      if (newbottom == oldtop) {
        auto ret = _values[newbottom % capacity].load(std::memory_order_relaxed);
        if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
          _bottom.fetch_add(1, std::memory_order_release);
          return false;
        }
        else {
          result = ret;
          _bottom.store(newtop, std::memory_order_release);
          return true;
        }
      }

      // It's uncontended.
      result = _values[newbottom % capacity].load(std::memory_order_acquire);
      return true;
    }

    // Multiple consumer.
    inline bool Steal(T& result) {
      size_t oldtop, newtop, oldbottom;

      oldtop = _top.load(std::memory_order_acquire);
      oldbottom = _bottom.load(std::memory_order_relaxed);
      newtop = oldtop + 1;

      if (oldbottom <= oldtop)
        return false;

      // Make sure that we are not contending for the item.
      if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
        return false;
      }

      result = _values[oldtop % capacity].load(std::memory_order_relaxed);
      return true;
    }

  private:

    // Circular array
    std::atomic<T> _values[capacity];
    std::atomic<size_t> _top; // queue
    std::atomic<size_t> _bottom; // stack
  };

完整要点(包括单元测试)。我只在强架构(x86/64)上运行了测试,所以就弱架构而言,如果您尝试在 Neon/PPC 上使用它,您的里程可能会有所不同。

于 2014-12-30T10:34:10.070 回答
-1

我不认为JobSwarm使用工作窃取,但这是第一步。我不知道其他用于此目的的开源库。

于 2010-01-20T15:18:21.583 回答