如果您希望您的 5 个线程准确地每 5 行读取一次,则必须同步读取,因此每个线程必须知道前一个线程已完成读取其部分。此要求可能会造成巨大的效率低下,因为某些线程可能会等待前一个线程很长时间才能运行。
概念代码,未经测试,使用风险自负。
让我们首先创建一个默认类来处理原子锁。我们将其对齐以避免伪共享(false sharing)和相关的缓存乒乓。
constexpr size_t CACHELINESIZE = 64; // could differ on your architecture

// A single atomic "turn" counter, aligned to its own cache line so that
// adjacent lockstep objects do not false-share (and cause cache ping-pong).
template<class dType>
class alignas(CACHELINESIZE) lockstep {
    std::atomic<dType> lock = dType(0);

    // CPU-friendly wait hint. The original called a non-existent pause();
    // std::this_thread::yield() is the portable stand-in (on x86 a lighter
    // alternative would be _mm_pause()).
    static void pause() { std::this_thread::yield(); }
public:
    // Spin until the counter equals `prev`, then atomically advance it to
    // `next`; restart the CAS if the value changes under us. Returns the
    // value observed on success (always == prev). The inner relaxed-load
    // loop keeps the cache line in shared state while waiting, so we only
    // take one coherence miss per ownership change.
    dType Spinlock(dType prev = dType(0), dType next = dType(1)) {
        dType expected = prev;
        while (!lock.compare_exchange_weak(expected, next)) { // request for locked-exclusive, ~100 cycles?
            expected = prev; // CAS clobbered `expected`; restore the value we wait for
            do {
                pause(); // on intel waits roughly one L2 latency time
            } while (lock.load(std::memory_order_relaxed) != prev); // only one cache miss per change
        }
        return expected;
    }

    // Unconditionally publish `value`, releasing whichever thread spins on it.
    void store(dType value) {
        lock.store(value);
    }
};
// Shared turn counter: holds the number of the next line allowed to be read.
// NOTE: `lockstep<int> lock { 0 };` does not compile — the class is not an
// aggregate (private member) and has no converting constructor; default
// construction already starts the counter at 0.
lockstep<int> lock;
constexpr int NoThreads = 5;
std::ifstream infile("test.txt"); // shared stream; access is serialized via `lock`
// Thread body: reads every NoThreads-th line (id_thread, id_thread+NoThreads,
// ...) in strict global line order. `lock` acts as a ticket counter holding
// the number of the next line that may be read.
void read(int id_thread) {
    int next = id_thread; // first line number this thread owns
    for (;;) {
        // Wait until the global counter reaches our line number.
        // (The original called lock.spinlock(...) — wrong case — and also
        // assigned locks[id_thread].lock, which referenced a non-existent
        // array and a private member; that line is removed.)
        lock.Spinlock(next, next);
        std::string sLine;
        // Test the read itself rather than infile.good(): good() can still be
        // true at EOF (file with trailing newline), so the old pre-check let
        // one bogus empty line through before detecting the end.
        if (!std::getline(infile, sLine)) {
            lock.store(next + 1); // release next thread so it can also end its run
            return;
        }
        // Release the next line number before doing our (possibly slow) work.
        lock.store(next + 1);
        // do work asynchronously
        // ...
        // Debug log: hopefully the whole line is written in one go, but the
        // order relative to other threads' output is unspecified.
        std::cout << "Read by thread: " << id_thread << " line no. " << next
                  << " text:" << sLine << std::endl; // endl flushes cout, implicit sync?
        next += NoThreads; // our next expected line to process
    }
}
void main() {
std::vector<std::thread> threads;
for(int i = 0; i < NoThreads; i++) {
threads.push_back(std::thread(parallelFun, i));
}
for(auto& thread : threads){
thread.join();
}
return 0;
}