Share buffers between data sinks.

Instead of copying the data out of the shared_ptr into a
BufferQueue, use SharedBufferQueue (which is a std::queue of
std::shared_ptr) within each sink to hold a reference on the
original sink input buffer.

Change-Id: I45a69129aa07330003665d7cee90738cba02c0af
diff --git a/encoder/buffer_util.h b/encoder/buffer_util.h
index 6eb1ab0..30394c7 100644
--- a/encoder/buffer_util.h
+++ b/encoder/buffer_util.h
@@ -42,7 +42,6 @@
   size_t GetNumBuffers();
 
  private:
-  bool locked_;
   std::mutex mutex_;
   std::queue<Buffer*> buffer_q_;
 };
diff --git a/encoder/data_sink.cc b/encoder/data_sink.cc
index f055442..f8eaea8 100644
--- a/encoder/data_sink.cc
+++ b/encoder/data_sink.cc
@@ -14,6 +14,37 @@
 
 namespace webmlive {
 
+//
+// SharedBufferQueue
+//
+bool SharedBufferQueue::EnqueueBuffer(const SharedDataSinkBuffer& buffer) {
+  if (buffer.get() == NULL) {
+    LOG(ERROR) << "Empty SharedDataSinkBuffer.";
+    return false;
+  }
+  std::lock_guard<std::mutex> lock(mutex_);
+  buffer_q_.push(buffer);
+  return true;
+}
+
+SharedDataSinkBuffer SharedBufferQueue::DequeueBuffer() {
+  SharedDataSinkBuffer buffer;
+  std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
+  if (lock.owns_lock() && !buffer_q_.empty()) {
+    buffer = buffer_q_.front();
+    buffer_q_.pop();
+  }
+  return buffer;
+}
+
+size_t SharedBufferQueue::GetNumBuffers() {
+  std::lock_guard<std::mutex> lock(mutex_);
+  return buffer_q_.size();
+}
+
+//
+// DataSink
+//
 void DataSink::AddDataSink(DataSinkInterface* data_sink) {
   std::lock_guard<std::mutex> lock(mutex_);
   data_sinks_.push_back(data_sink);
@@ -22,7 +53,7 @@
 bool DataSink::WriteData(const std::string& id,
                          const uint8* ptr_data, int data_length) {
   std::lock_guard<std::mutex> lock(mutex_);
-  DataSinkInterface::SharedDataSinkBuffer buffer;
+  SharedDataSinkBuffer buffer;
   buffer.reset(new (std::nothrow) DataSinkBuffer);
 
   if (!buffer.get()) {
diff --git a/encoder/data_sink.h b/encoder/data_sink.h
index aaa3331..ccf4146 100644
--- a/encoder/data_sink.h
+++ b/encoder/data_sink.h
@@ -10,6 +10,7 @@
 
 #include <memory>
 #include <mutex>
+#include <queue>
 #include <string>
 #include <vector>
 
@@ -21,12 +22,33 @@
   std::string id;
   std::vector<uint8> data;
 };
+typedef std::shared_ptr<DataSinkBuffer> SharedDataSinkBuffer;
+
+class SharedBufferQueue {
+ public:
+  SharedBufferQueue() {}
+  ~SharedBufferQueue() {}
+
+  // Enqueues |buffer| and returns true. Returns false upon failure. Blocks on
+  // |mutex_|.
+  bool EnqueueBuffer(const SharedDataSinkBuffer& buffer);
+
+  // Returns a buffer if one is available. Does not block waiting on |mutex_|;
+  // gives up and returns empty |std::shared_ptr| when unable to obtain lock.
+  SharedDataSinkBuffer DequeueBuffer();
+
+  // Returns number of buffers queued. Blocks on |mutex_| acquisition.
+  size_t GetNumBuffers();
+
+ private:
+  std::mutex mutex_;
+  std::queue<const SharedDataSinkBuffer> buffer_q_;
+};
 
 class DataSinkInterface {
  public:
-  typedef std::shared_ptr<DataSinkBuffer> SharedDataSinkBuffer;
   virtual ~DataSinkInterface() {}
-  virtual bool WriteData(SharedDataSinkBuffer buffer) = 0;
+  virtual bool WriteData(const SharedDataSinkBuffer& buffer) = 0;
   virtual std::string Name() const = 0;
 };
 
diff --git a/encoder/file_writer.cc b/encoder/file_writer.cc
index 66003bb..cfb4407 100644
--- a/encoder/file_writer.cc
+++ b/encoder/file_writer.cc
@@ -58,13 +58,9 @@
   return true;
 }
 
-// Copies data into |buffer_q_| and returns true.
-bool FileWriter::WriteData(DataSinkInterface::SharedDataSinkBuffer buffer) {
-  // TODO(tomfinegan): Copying data is not necessary; SharedDataBufferQueue or
-  // something should be provided by data_sink.h. Abusing BufferQueue
-  // temporarily since it just works and incoming buffers aren't that large.
-  if (!buffer_q_.EnqueueBuffer(buffer->id,
-                               &buffer->data[0], buffer->data.size())) {
+// Stores data in |buffer_q_| and returns true.
+bool FileWriter::WriteData(const SharedDataSinkBuffer& buffer) {
+  if (!buffer_q_.EnqueueBuffer(buffer)) {
     LOG(ERROR) << "Write buffer enqueue failed.";
     return false;
   }
@@ -75,7 +71,7 @@
 }
 
 // Try to obtain lock on |mutex_|, and return the value of |stop_| if lock is
-// obtained.  Returns false if unable to obtain the lock.
+// obtained. Returns false if unable to obtain the lock.
 bool FileWriter::StopRequested() {
   bool stop_requested = false;
   std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
@@ -93,10 +89,10 @@
 }
 
 // Writes |data| contents to file and returns true upon success.
-bool FileWriter::WriteFile(const BufferQueue::Buffer& data) const {
+bool FileWriter::WriteFile(const SharedDataSinkBuffer& buffer) const {
   std::string file_name;
   if (dash_mode_) {
-    file_name = directory_ + data.id;
+    file_name = directory_ + buffer->id;
   } else {
     file_name = directory_ + file_name_;
   }
@@ -106,22 +102,22 @@
     return false;
   }
   const size_t bytes_written =
-      fwrite(reinterpret_cast<const void*>(&data.data[0]),
-             1, data.data.size(), file);
+      fwrite(reinterpret_cast<const void*>(&buffer->data[0]),
+             1, buffer->data.size(), file);
   fclose(file);
-  return (bytes_written == data.data.size());
+  return (bytes_written == buffer->data.size());
 }
 
 // Runs until StopRequested() returns true.
 void FileWriter::WriterThread() {
   while (!StopRequested() || buffer_q_.GetNumBuffers() > 0) {
-    const BufferQueue::Buffer* buffer = buffer_q_.DequeueBuffer();
-    if (!buffer) {
+    SharedDataSinkBuffer buffer = buffer_q_.DequeueBuffer();
+    if (buffer.get() == NULL) {
       // Wait for a buffer.
       WaitForUserData();
       continue;
     }
-    if (!WriteFile(*buffer)) {
+    if (!WriteFile(buffer)) {
       LOG(ERROR) << "Write failed for id: " << buffer->id;
     }
   }
diff --git a/encoder/file_writer.h b/encoder/file_writer.h
index 3c4ea4f..ba472e0 100644
--- a/encoder/file_writer.h
+++ b/encoder/file_writer.h
@@ -40,13 +40,13 @@
   bool Stop();
 
   // DataSinkInferface methods.
-  bool WriteData(SharedDataSinkBuffer buffer) override;
+  bool WriteData(const SharedDataSinkBuffer& buffer) override;
   std::string Name() const override { return "FileWriter"; }
 
  private:
   bool StopRequested();
   void WaitForUserData();
-  bool WriteFile(const BufferQueue::Buffer& data) const;
+  bool WriteFile(const SharedDataSinkBuffer& buffer) const;
   void WriterThread();
 
   bool dash_mode_;
@@ -56,7 +56,7 @@
   std::mutex mutex_;
   std::condition_variable wake_condition_;
   std::shared_ptr<std::thread> thread_;
-  BufferQueue buffer_q_;
+  SharedBufferQueue buffer_q_;
 };
 
 }  // namespace webmlive
diff --git a/encoder/http_uploader.cc b/encoder/http_uploader.cc
index acf0dd7..48121ee 100644
--- a/encoder/http_uploader.cc
+++ b/encoder/http_uploader.cc
@@ -53,9 +53,8 @@
   // Runs |UploadThread|, and starts waiting for user data.
   bool Run();
 
-  // Uploads user data.
-  bool UploadBuffer(const std::string& id,
-                    const uint8* ptr_buffer, int32 length);
+  // Enqueues user data for upload.
+  bool EnqueueBuffer(const SharedDataSinkBuffer& buffer);
 
   // Stops the uploader.
   bool Stop();
@@ -78,7 +77,7 @@
   bool SetupPost(const uint8* const ptr_buffer, int32 length);
 
   // Upload user data with libcurl.
-  bool Upload(BufferQueue::Buffer* buffer);
+  bool Upload(const SharedDataSinkBuffer& buffer);
 
   // Wakes up |UploadThread| when users pass data through |UploadBuffer|.
   void WaitForUserData();
@@ -150,7 +149,7 @@
   // |Upload|.  This second locking mechanism is in place to allow |mutex_| to
   // be unlocked while uploads are in progress (which prevents public methods
   // from blocking).
-  BufferQueue upload_buffer_;
+  SharedBufferQueue buffer_q_;
 
   // The name of the file on the local system.  Note that it is not being read,
   // it's information included within the form data contained within the HTTP
@@ -201,9 +200,8 @@
 }
 
 // Return result of |UploadBuffer| on |ptr_uploader_|.
-bool HttpUploader::UploadBuffer(const std::string& id,
-                                const uint8* ptr_buffer, int length) {
-  return ptr_uploader_->UploadBuffer(id, ptr_buffer, length);
+bool HttpUploader::WriteData(const SharedDataSinkBuffer& buffer) {
+  return ptr_uploader_->EnqueueBuffer(buffer);
 }
 
 ///////////////////////////////////////////////////////////////////////////////
@@ -303,14 +301,13 @@
 
 // Enqueue the user buffer. Does not lock |mutex_|; relies on
 // |upload_buffer_|'s internal lock.
-bool HttpUploaderImpl::UploadBuffer(const std::string& id,
-                                    const uint8* ptr_buf, int length) {
-  if (!upload_buffer_.EnqueueBuffer(id, ptr_buf, length)) {
+bool HttpUploaderImpl::EnqueueBuffer(const SharedDataSinkBuffer& buffer) {
+  if (!buffer_q_.EnqueueBuffer(buffer)) {
     LOG(ERROR) << "Upload buffer enqueue failed.";
     return false;
   }
   // Wake |UploadThread|.
-  LOG(INFO) << "waking uploader with " << length << " bytes";
+  LOG(INFO) << "waking uploader with " << buffer->data.size() << " bytes";
   wake_condition_.notify_one();
   return true;
 }
@@ -484,7 +481,7 @@
 }
 
 // Upload data using libcurl.
-bool HttpUploaderImpl::Upload(BufferQueue::Buffer* buffer) {
+bool HttpUploaderImpl::Upload(const SharedDataSinkBuffer& buffer) {
   LOG(INFO) << "upload buffer size=" << buffer->data.size();
   CURLcode err = curl_easy_setopt(ptr_curl_, CURLOPT_URL,
                                   settings_.target_url.c_str());
@@ -596,9 +593,9 @@
 // Upload thread.  Wakes when user provides a buffer via call to
 // |UploadBuffer|.
 void HttpUploaderImpl::UploadThread() {
-  while (!StopRequested() || upload_buffer_.GetNumBuffers() > 0) {
-    BufferQueue::Buffer* buffer = upload_buffer_.DequeueBuffer();
-    if (!buffer) {
+  while (!StopRequested() || buffer_q_.GetNumBuffers() > 0) {
+    SharedDataSinkBuffer buffer = buffer_q_.DequeueBuffer();
+    if (buffer.get() == NULL) {
       VLOG(1) << "upload thread waiting for buffer...";
       WaitForUserData();
       continue;
diff --git a/encoder/http_uploader.h b/encoder/http_uploader.h
index ecdd6e8..0e01e76 100644
--- a/encoder/http_uploader.h
+++ b/encoder/http_uploader.h
@@ -94,14 +94,8 @@
   // Stops the uploader thread.
   bool Stop();
 
-  // Sends a buffer to the uploader thread.
-  bool UploadBuffer(const std::string& id,
-                    const uint8* ptr_buffer, int length);
-
   // DataSinkInterface methods.
-  bool WriteData(DataSinkInterface::SharedDataSinkBuffer buffer) override {
-    return UploadBuffer(buffer->id, &buffer->data[0], buffer->data.size());
-  }
+  bool WriteData(const SharedDataSinkBuffer& buffer) override;
   std::string Name() const override { return "HttpUploader"; }
 
  private: