我有一个自定义线程池类,它创建一些线程,每个线程都等待自己的事件(信号)。当新作业添加到线程池时,它会唤醒第一个空闲线程,以便执行该作业。
问题如下:我有大约 1000 个循环,每个循环大约 10'000 次迭代。这些循环必须按顺序执行,但我有 4 个 CPU 可用。我尝试做的是将 10'000 个迭代循环拆分为 4 个 2'500 个迭代循环,即每个线程一个。但是我必须等待 4 个小循环完成,然后才能进行下一个“大”迭代。这意味着我不能捆绑工作。
我的问题是使用线程池和 4 个线程比按顺序执行作业要慢得多(由单独的线程执行一个循环比直接在主线程中按顺序执行要慢得多)。
我在 Windows 上,所以我使用创建事件,CreateEvent()
然后等待其中一个使用WaitForMultipleObjects(2, handles, false, INFINITE)
,直到主线程调用SetEvent()
.
看来整个事件的事情(以及使用关键部分的线程之间的同步)非常昂贵!
我的问题是:使用事件需要“很多”时间是否正常?如果是这样,是否有另一种我可以使用的机制并且会更省时?
这是一些代码来说明(从我的线程池类复制的一些相关部分):
// thread function
unsigned __stdcall ThreadPool::threadFunction(void* params) {
// some housekeeping
HANDLE signals[2];
signals[0] = waitSignal;
signals[1] = endSignal;
do {
// wait for one of the signals
waitResult = WaitForMultipleObjects(2, signals, false, INFINITE);
// try to get the next job parameters;
if (tp->getNextJob(threadId, data)) {
// execute job
void* output = jobFunc(data.params);
// tell thread pool that we're done and collect output
tp->collectOutput(data.ID, output);
}
tp->threadDone(threadId);
}
while (waitResult - WAIT_OBJECT_0 == 0);
// if we reach this point, endSignal was sent, so we are done !
return 0;
}
// create all threads
for (int i = 0; i < nbThreads; ++i) {
threadData data;
unsigned int threadId = 0;
char eventName[20];
sprintf_s(eventName, 20, "WaitSignal_%d", i);
data.handle = (HANDLE) _beginthreadex(NULL, 0, ThreadPool::threadFunction,
this, CREATE_SUSPENDED, &threadId);
data.threadId = threadId;
data.busy = false;
data.waitSignal = CreateEvent(NULL, true, false, eventName);
this->threads[threadId] = data;
// start thread
ResumeThread(data.handle);
}
// add job
void ThreadPool::addJob(int jobId, void* params) {
// housekeeping
EnterCriticalSection(&(this->mutex));
// first, insert parameters in the list
this->jobs.push_back(job);
// then, find the first free thread and wake it
for (it = this->threads.begin(); it != this->threads.end(); ++it) {
thread = (threadData) it->second;
if (!thread.busy) {
this->threads[thread.threadId].busy = true;
++(this->nbActiveThreads);
// wake thread such that it gets the next params and runs them
SetEvent(thread.waitSignal);
break;
}
}
LeaveCriticalSection(&(this->mutex));
}