4

我想从输出文件中逐行并行读取。每个线程读取一行然后处理数据。同时,下一个线程必须读取下一行。

std::ifstream infile("test.txt");
std::mutex mtx;

void read(int id_thread){
   while(infile.good()){
     mtx.lock();
     std::string sLine;
     getline(infile, sLine);
     std::cout << "Read by thread: " << id_thread;
     std::cout << sLine << std::endl;
     mtx.unlock();
   }
}

void main(){
  std::vector<std::thread> threads;
  for(int i = 0; i < num; i++){
     threads.push_back(std::thread(parallelFun, i));
  }

  for(auto& thread : threads){
      thread.join();
  }
  return 0;
}

当我运行此代码时,我得到:第一个线程读取所有行。我怎样才能让每个线程都读取一行?

在此处输入图像描述

编辑

正如评论中提到的,我需要做的就是更大的测试文件。多谢你们!

4

3 回答 3

6

我会将循环更改为

while(infile.good()){
     mtx.lock();
     std::string sLine;
     getline(infile, sLine);
     mtx.unlock();
     std::cout << "Read by thread: " << id_thread;
     std::cout << sLine << std::endl;
   }

你的 std::cout 东西是你测试循环的繁忙部分,你想稍后交换真正的代码。这使其他线程有时间启动。此外,使您的测试文件变。线程初始化需要一些时间而第一个线程吃掉所有数据的情况并不少见。

于 2014-11-06T13:07:57.970 回答
2

如果您希望您的 5 个线程准确地每 5 行读取一次,则必须同步读取,因此每个线程必须知道前一个线程已完成读取其部分。此要求可能会造成巨大的效率低下,因为某些线程可能会等待前一个线程很长时间才能运行。

概念代码,未经测试使用自担风险。

让我们首先创建一个默认类来处理原子锁。我们将其对齐以避免错误共享和相关的缓存乒乓。

constexpr size_t CACHELINESIZE = 64; // could differ on your architecture
template<class dType>
class alignas(CACHELINESIZE) lockstep {
  std::atomic<dType> lock = dType(0);

public:
  // spinlock spins until the previous value is prev and then tries to set lock to value
  // until success, restart the spin if prev changes.
  dType Spinlock(dType prev = dType(0), dType next = dType(1)) {
     dType expected = prev;
     while (!lock.compare_exchange_weak(expected, next)) { // request for locked-exclusiv ~100 cycles?
       expected = prev;  // we wish to continue to wait for expected
       do {
         pause(); // on intel waits roughly one L2 latency time.
       } while(lock.load(std::memory_order_relaxed) != prev);  // only one cache miss per change
     }
     return expected;
  }

  void store(dType value) {
    lock.store(value);
  }
};

lockstep<int> lock { 0 };

constexpr int NoThreads = 5;

std::ifstream infile("test.txt");

void read(int id_thread) {
   locks[id_thread].lock = id_thread;
   bool izNoGood = false;
   int next = id_thread;

   while(!izNoGood){
     // get lock for next iteration
     lock.spinlock(next, next); // wait on our number

     // moved file check into locked region     
     izNoGood = !infile.good();
     if (izNoGood) {
       lock.store(next+1); // release next thread to end run.
       return;
     }

     std::string sLine;
     getline(infile, sLine);

     // release next thread
     lock.store(next+1);

     // do work asynchronous
     // ...

     // debug log, hopefully the whole line gets written in one go (atomic)
     // but can be in "random" order relative to other lines.
     std::cout << "Read by thread: " << id_thread << " line no. " << next
               << " text:" << sLine << std::endl;  // endl flushes cout, implicit sync?
     next += NoThreads;  // our next expected line to process
   }
}

void main() {
  std::vector<std::thread> threads;
  for(int i = 0; i < NoThreads; i++) {
     threads.push_back(std::thread(parallelFun, i));
  }

  for(auto& thread : threads){
      thread.join();
  }
  return 0;
}
于 2014-11-06T15:22:04.680 回答
1

万一您希望每个线程读取一行(从您的描述中可以明显看出),请删除 while 循环,然后您需要确保线程数与文件中的行数相同。

要摆脱上述约束,您可以使用 boost 线程池。

于 2014-11-06T13:04:20.007 回答