/* * barrier.c * * This file implements the "barrier" synchronization construct. * * A barrier causes threads to wait until a set of threads has * all "reached" the barrier. The number of threads required is * set when the barrier is initialized, and cannot be changed * except by reinitializing. * * The barrier_init() and barrier_destroy() functions, * respectively, allow you to initialize and destroy the * barrier. * * The barrier_wait() function allows a thread to wait for a * barrier to be completed. One thread (the one that happens to * arrive last) will return from barrier_wait() with the status * -1 on success -- others will return with 0. The special * status makes it easy for the calling code to cause one thread * to do something in a serial region before entering another * parallel section of code. */ #include #include "Error.h" #include "Barrier.h" namespace TNet { /* * Initialize a barrier for use. */ Barrier::Barrier(int count) : threshold_(count), counter_(count), cycle_(0) { if(0 != pthread_mutex_init(&mutex_, NULL)) KALDI_ERR << "Cannot initialize mutex"; if(0 != pthread_cond_init(&cv_, NULL)) { pthread_mutex_destroy(&mutex_); KALDI_ERR << "Cannot initilize condv"; } } /* * Destroy a barrier when done using it. */ Barrier::~Barrier() { if(0 != pthread_mutex_lock(&mutex_)) KALDI_ERR << "Cannot lock mutex"; /* * Check whether any threads are known to be waiting; report * "BUSY" if so. */ if(counter_ != threshold_) { pthread_mutex_unlock (&mutex_); KALDI_ERR << "Cannot destroy barrier with waiting thread"; } if(0 != pthread_mutex_unlock(&mutex_)) KALDI_ERR << "Cannot unlock barrier"; /* * If unable to destroy either 1003.1c synchronization * object, halt */ if(0 != pthread_mutex_destroy(&mutex_)) KALDI_ERR << "Cannot destroy mutex"; if(0 != pthread_cond_destroy(&cv_)) KALDI_ERR << "Cannot destroy condv"; } void Barrier::SetThreshold(int thr) { if(counter_ != threshold_) KALDI_ERR << "Cannot set threshold, while a thread is waiting"; threshold_ = thr; counter_ = thr; } /* * Wait for all members of a barrier to reach the barrier. When * the count (of remaining members) reaches 0, broadcast to wake * all threads waiting. */ int Barrier::Wait() { int status, cancel, tmp, cycle; if(threshold_ == 0) KALDI_ERR << "Cannot wait when Threshold value was not set"; if(0 != pthread_mutex_lock(&mutex_)) KALDI_ERR << "Cannot lock mutex"; cycle = cycle_; /* Remember which cycle we're on */ if(--counter_ == 0) { cycle_ = !cycle_; counter_ = threshold_; status = pthread_cond_broadcast(&cv_); /* * The last thread into the barrier will return status * -1 rather than 0, so that it can be used to perform * some special serial code following the barrier. */ if(status == 0) status = -1; } else { /* * Wait with cancellation disabled, because barrier_wait * should not be a cancellation point. */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel); /* * Wait until the barrier's cycle changes, which means * that it has been broadcast, and we don't want to wait * anymore. */ while (cycle == cycle_) { status = pthread_cond_wait(&cv_, &mutex_); if (status != 0) break; } pthread_setcancelstate(cancel, &tmp); } /* * Ignore an error in unlocking. It shouldn't happen, and * reporting it here would be misleading -- the barrier wait * completed, after all, whereas returning, for example, * EINVAL would imply the wait had failed. The next attempt * to use the barrier *will* return an error, or hang, due * to whatever happened to the mutex. */ pthread_mutex_unlock (&mutex_); return status; /* error, -1 for waker, or 0 */ } }//namespace TNet