| // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef SHARED_QUEUE_H |
| #define SHARED_QUEUE_H |
| |
| #include <pthread.h> |
| #include <cassert> |
| #include <deque> |
| |
| #include "thread_safe_ref_count.h" |
| |
| namespace event_queue { |
| |
| // This file provides a queue that uses a mutex and condition variable so that |
| // one thread can put pointers into the queue and another thread can pull items |
| // out of the queue. |
| |
// Value that pthread calls report on success (POSIX specifies zero). Provided
// so call sites can compare return codes against a named constant rather
// than a bare literal.
static const int kPthreadMutexSuccess = 0;
| |
// Tells LockingQueue::GetItem() what to do when the queue is empty.
enum QueueWaitingFlag {
  // Block until an item is available or the queue is cancelled.
  kWait = 0,
  // Return immediately instead of blocking on an empty queue.
  kDontWait = 1
};
| |
// Outcome reported by LockingQueue::GetItem().
enum QueueGetResult {
  // An item was removed from the queue and handed back to the caller.
  kReturnedItem = 0,
  // The queue was empty and the caller asked not to wait.
  kDidNotWait = 1,
  // The queue has been cancelled; no item was returned.
  kQueueWasCancelled = 2
};
| |
| // LockingQueue contains a collection of <T>, such as a collection of |
| // objects or pointers. The Push() method is used to add items to the |
| // queue in a thread-safe manner. The GetItem() is used to retrieve |
| // items from the queue in a thread-safe manner. |
| template <class T> |
| class LockingQueue { |
| public: |
| LockingQueue() : quit_(false) { |
| int result = pthread_mutex_init(&queue_mutex_, NULL); |
| assert(result == 0); |
| result = pthread_cond_init(&queue_condition_var_, NULL); |
| assert(result == 0); |
| } |
| ~LockingQueue() { |
| pthread_mutex_destroy(&queue_mutex_); |
| } |
| |
| // The producer (who instantiates the queue) calls this to tell the |
| // consumer that the queue is no longer being used. |
| void CancelQueue() { |
| ScopedLock scoped_mutex(&queue_mutex_); |
| quit_ = true; |
| // Signal the condition var so that if a thread is waiting in |
| // GetItem the thread will wake up and see that the queue has |
| // been cancelled. |
| pthread_cond_signal(&queue_condition_var_); |
| } |
| |
| // The consumer calls this to see if the queue has been cancelled by |
| // the producer. If so, the thread should not call GetItem and may |
| // need to terminate -- i.e. in a case where the producer created |
| // the consumer thread. |
| bool IsCancelled() { |
| ScopedLock scoped_mutex(&queue_mutex_); |
| return quit_; |
| } |
| |
| // Grabs the mutex and pushes a new item to the end of the queue if the |
| // queue is not full. Signals the condition variable so that a thread |
| // that is waiting will wake up and grab the item. |
| void Push(const T& item) { |
| ScopedLock scoped_mutex(&queue_mutex_); |
| the_queue_.push_back(item); |
| pthread_cond_signal(&queue_condition_var_); |
| } |
| |
| // Tries to pop the front element from the queue; returns an enum: |
| // kReturnedItem if an item is returned in |item_ptr|, |
| // kDidNotWait if |wait| was kDontWait and the queue was empty, |
| // kQueueWasCancelled if the producer called CancelQueue(). |
| // If |wait| is kWait, GetItem will wait to return until the queue |
| // contains an item (unless the queue is cancelled). |
| QueueGetResult GetItem(T* item_ptr, QueueWaitingFlag wait) { |
| // Because we use both pthread_mutex_lock and pthread_cond_wait, |
| // we directly use the mutex instead of using ScopedLock. |
| ScopedLock scoped_mutex(&queue_mutex_); |
| // Use a while loop to get an item. If the user does not want to wait, |
| // we will exit from the loop anyway, unlocking the mutex. |
| // If the user does want to wait, we will wait for pthread_cond_wait, |
| // and the while loop will check is_empty_no_locking() one more |
| // time so that a spurious wake-up of pthread_cond_wait is handled. |
| // If |quit_| has been set, break out of the loop. |
| while (!quit_ && is_empty_no_locking()) { |
| // If user doesn't want to wait, return... |
| if (kDontWait == wait) { |
| return kDidNotWait; |
| } |
| // Wait for signal to occur. |
| pthread_cond_wait(&queue_condition_var_, &queue_mutex_); |
| } |
| // Check to see if quit_ woke us up |
| if (quit_) { |
| return kQueueWasCancelled; |
| } |
| |
| // At this point, the queue was either not empty or, if it was empty, |
| // we called pthread_cond_wait (which released the mutex, waited for the |
| // signal to occur, and then atomically reacquired the mutex). |
| // Thus, if we are here, the queue cannot be empty because we either |
| // had the mutex and verified it was not empty, or we waited for the |
| // producer to put an item in and signal a single thread (us). |
| T& item = the_queue_.front(); |
| *item_ptr = item; |
| the_queue_.pop_front(); |
| return kReturnedItem; |
| } |
| |
| private: |
| std::deque<T> the_queue_; |
| bool quit_; |
| pthread_mutex_t queue_mutex_; |
| pthread_cond_t queue_condition_var_; |
| |
| // This is used by methods that already have the lock. |
| bool is_empty_no_locking() const { |
| return the_queue_.empty(); |
| } |
| }; |
| |
}  // namespace event_queue
| |
| #endif // SHARED_QUEUE_H |
| |