6

我有一个自定义线程池类,它创建一些线程,每个线程都等待自己的事件(信号)。当新作业添加到线程池时,它会唤醒第一个空闲线程,以便执行该作业。

问题如下:我有大约 1000 个循环,每个循环大约 10'000 次迭代。这些循环必须按顺序执行,但我有 4 个 CPU 可用。我尝试做的是将 10'000 个迭代循环拆分为 4 个 2'500 个迭代循环,即每个线程一个。但是我必须等待 4 个小循环完成,然后才能进行下一个“大”迭代。这意味着我不能捆绑工作。

我的问题是使用线程池和 4 个线程比按顺序执行作业要慢得多(由单独的线程执行一个循环比直接在主线程中按顺序执行要慢得多)。

我在 Windows 上,所以我使用创建事件,CreateEvent()然后等待其中一个使用WaitForMultipleObjects(2, handles, false, INFINITE),直到主线程调用SetEvent().

看来整个事件的事情(以及使用关键部分的线程之间的同步)非常昂贵!

我的问题是:使用事件需要“很多”时间是否正常?如果是这样,是否有另一种我可以使用的机制并且会更省时?

这是一些代码来说明(从我的线程池类复制的一些相关部分):

// thread function
unsigned __stdcall ThreadPool::threadFunction(void* params) {
    // some housekeeping
    HANDLE signals[2];
    signals[0] = waitSignal;
    signals[1] = endSignal;

    do {
        // wait for one of the signals
        waitResult = WaitForMultipleObjects(2, signals, false, INFINITE);

        // try to get the next job parameters;
        if (tp->getNextJob(threadId, data)) {
            // execute job
            void* output = jobFunc(data.params);

            // tell thread pool that we're done and collect output
            tp->collectOutput(data.ID, output);
        }

        tp->threadDone(threadId);
    }
    while (waitResult - WAIT_OBJECT_0 == 0);

    // if we reach this point, endSignal was sent, so we are done !

    return 0;
}

// create all threads
for (int i = 0; i < nbThreads; ++i) {
    threadData data;
    unsigned int threadId = 0;
    char eventName[20];

    sprintf_s(eventName, 20, "WaitSignal_%d", i);

    data.handle = (HANDLE) _beginthreadex(NULL, 0, ThreadPool::threadFunction,
        this, CREATE_SUSPENDED, &threadId);
    data.threadId = threadId;
    data.busy = false;
    data.waitSignal = CreateEvent(NULL, true, false, eventName);

    this->threads[threadId] = data;

    // start thread
    ResumeThread(data.handle);
}

// add job
void ThreadPool::addJob(int jobId, void* params) {
    // housekeeping
    EnterCriticalSection(&(this->mutex));

    // first, insert parameters in the list
    this->jobs.push_back(job);

    // then, find the first free thread and wake it
    for (it = this->threads.begin(); it != this->threads.end(); ++it) {
        thread = (threadData) it->second;

        if (!thread.busy) {
            this->threads[thread.threadId].busy = true;

            ++(this->nbActiveThreads);

            // wake thread such that it gets the next params and runs them
            SetEvent(thread.waitSignal);
            break;
        }
    }

    LeaveCriticalSection(&(this->mutex));
}
4

9 回答 9

3

是的,WaitForMultipleObjects相当昂贵。如您所见,如果您的工作很小,同步开销将开始超过实际完成工作的成本。

解决此问题的一种方法是将多个作业捆绑为一个:如果您获得一份“小”作业(无论您如何评估此类事情),请将其存储在某个地方,直到您有足够的小作业一起完成一项合理大小的作业。然后将它们全部发送到工作线程进行处理。

或者,您可以使用多读取器单写入器队列来存储您的作业,而不是使用信号。在这个模型中,每个工作线程都试图从队列中获取作业。当它找到一个时,它就完成了工作;如果没有,它会休眠一小段时间,然后醒来并再次尝试。这将降低您的每个任务的开销,但即使没有工作要做,您的线程也会占用 CPU。这完全取决于问题的确切性质。

于 2009-08-20T13:27:11.763 回答
3

这在我看来是一种生产者消费者模式,可以用两个信号量来实现,一个保护队列溢出,另一个保护空队列。

你可以在这里找到一些细节。

于 2009-08-20T13:28:31.920 回答
2

请注意,发出 endSignal 后,您仍在要求下一份工作。

for( ;; ) {
    // wait for one of the signals
    waitResult = WaitForMultipleObjects(2, signals, false, INFINITE);
    if( waitResult - WAIT_OBJECT_0 != 0 )
        return;
    //....
}
于 2009-08-20T13:35:38.477 回答
2

由于您说并行执行比顺序执行慢得多,因此我假设您内部 2500 次循环迭代的处理时间很小(在几微秒范围内)。然后除了查看算法以拆分更大的进动块之外,您无能为力;OpenMP 无济于事,其他所有同步技术也无济于事,因为它们基本上都依赖于事件(自旋循环不符合条件)。

另一方面,如果 2500 次循环迭代的处理时间大于 100 微秒(在当前 PC 上),您可能会遇到硬件限制。如果您的处理使用大量内存带宽,将其拆分为四个处理器不会给您更多的带宽,实际上会因为冲突而给您更少的带宽。您还可能遇到缓存循环问题,其中前 1000 次迭代中的每一个都会刷新并重新加载 4 个内核的缓存。那么就没有一个解决方案,并且根据您的目标硬件,可能没有。

于 2009-08-27T23:19:06.560 回答
1

线程之间的上下文切换也可能很昂贵。在某些情况下,开发一个框架很有趣,您可以使用该框架通过一个线程或多个线程按顺序处理您的作业。这样,您就可以两全其美。

顺便问一下,你的问题到底是什么?我将能够用更精确的问题更准确地回答:)

编辑:

在某些情况下,事件部分可能比您的处理消耗更多,但不应该那么昂贵,除非您的处理真的很快就可以实现。在这种情况下,在 thredas 之间切换也很昂贵,因此我的回答第一部分是按顺序做事......

您应该寻找线程间同步瓶颈。您可以跟踪线程等待时间开始...

编辑:经过更多提示...

如果我猜对了,你的问题是有效地使用你所有的计算机内核/处理器来并行化一些基本顺序的处理。

假设您有 4 个内核和 10000 个循环来计算,如您的示例(在评论中)。您说您需要等待 4 个线程结束才能继续。然后,您可以简化同步过程。你只需要给你的四个线程 thr nth、nth+1、nth+2、nth+3 循环,等待四个线程完成然后继续。您应该使用集合点或屏障(等待 n 个线程完成的同步机制)。Boost就有这样的机制。您可以查看 windows 实现以提高效率。您的线程池并不真正适合该任务。在关键部分中搜索可用线程会消耗您的 CPU 时间。不是事件部分。

于 2009-08-20T13:25:50.207 回答
1

它不应该那么昂贵,但是如果您的工作几乎不需要任何时间,那么线程和同步对象的开销就会变得很大。像这样的线程池对于处理时间较长的作业或那些使用大量 IO 而不是 CPU 资源的作业要好得多。如果您在处理作业时受 CPU 限制,请确保每个 CPU 只有 1 个线程。

可能还有其他问题,getNextJob 是如何获取它的数据来处理的呢?如果有大量数据复制,那么您的开销又会显着增加。

我会通过让每个线程不断从队列中拉出作业直到队列为空来优化它。这样,您可以将一百个作业传递给线程池,并且同步对象将只使用一次来启动线程。我还将作业存储在队列中,并将指向它们的指针、引用或迭代器传递给线程,而不是复制数据。

于 2009-08-20T13:26:49.143 回答
1

看来整个事件的事情(以及使用关键部分的线程之间的同步)非常昂贵!

“昂贵”是一个相对术语。喷气机很贵吗?是汽车吗?还是自行车……鞋子……?

在这种情况下,问题是:相对于 JobFunction 执行所花费的时间,事件是否“昂贵”?发布一些绝对数字会有所帮助:“无线程”时该过程需要多长时间?是几个月,还是几飞秒?

当您增加线程池大小时,时间会发生什么变化?尝试池大小为 1,然后是 2,然后是 4,依此类推。

另外,由于您过去在线程池方面遇到过一些问题,我建议您进行一些调试以计算您的线程函数实际被调用的次数......它是否符合您的预期?

从空中挑选一个数字(不了解您的目标系统,并假设您没有在未显示的代码中做任何“巨大”的事情),我预计每个“工作”的“事件开销”以微秒为单位进行测量。也许一百左右。如果在 JobFunction 中执行算法所花费的时间没有明显超过这个时间,那么您的线程可能会花费您的时间而不是节省时间。

于 2009-08-20T19:38:45.077 回答
1

如果您只是并行化循环并使用 vs 2008,我建议您查看 OpenMP。如果您使用的是 Visual Studio 2010 beta 1,我建议您查看并行模式库,尤其是“并行”/“每个并行”api“任务组类”,因为这些可能会满足您的需求尝试这样做,只需要更少的代码。

关于你关于性能的问题,这真的取决于。您需要查看在迭代期间计划了多少工作以及成本是多少。如果您经常使用 WaitForMultipleObjects 并且您的工作量很小,那么它可能会非常昂贵,这就是我建议使用已经构建的实现的原因。您还需要确保您没有在调试模式下、在调试器下运行,并且任务本身没有阻塞锁、I/O 或内存分配,并且您没有遇到错误共享。这些中的每一个都有可能破坏可扩展性。

我建议在诸如xperf和 Visual Studio 2010 beta 1 中的 f1 分析器(它有 2 种有助于查看争用的新并发模式)或 Intel 的 vtune 之类的分析器下查看这个。

您还可以共享您在任务中运行的代码,这样人们就可以更好地了解您在做什么,因为我总是得到的关于性能问题的答案首先是“它取决于”,其次是“你有吗?描述了它。”

祝你好运

-瑞克

于 2009-08-24T00:33:15.653 回答
0

如前所述,线程增加的开销取决于执行您定义的“作业”所花费的相对时间量。因此,重要的是要在工作块的大小上找到一个平衡点,以最小化块的数量,但不会让处理器空闲等待最后一组计算完成。

您的编码方法通过主动寻找空闲线程来提供新工作,从而增加了开销工作量。操作系统已经在跟踪并更有效地执行此操作。此外,您的函数 ThreadPool::addJob() 可能会发现所有线程都在使用中并且无法委派工作。但它没有提供与该问题相关的任何返回码。如果您没有以某种方式检查这种情况并且没有注意到结果中的错误,则意味着始终存在空闲处理器。我建议重新组织代码,以便 addJob() 执行它的名称 - 仅添加一个工作(不查找甚至不关心谁来完成工作),而每个工作线程在完成现有工作后积极地获得新工作。

于 2017-09-03T17:47:23.787 回答