Restore upload support.

Upload works again-- all runs currently require settings
for local file i/o and uploading. Neither can be disabled.

Change-Id: Ia6cb641d592aaaa5ac0051cd7fd245ce8d31e7df
diff --git a/encoder/data_sink.h b/encoder/data_sink.h
index d0df9ff..31f8625 100644
--- a/encoder/data_sink.h
+++ b/encoder/data_sink.h
@@ -18,13 +18,9 @@
  public:
   virtual ~DataSinkInterface() {}
 
-  // Returns true when the class implementing |DataSinkInterface| is ready to
-  // receive data via a call to |WriteData()|.
-  virtual bool Ready() const = 0;
-
-  // Writes data to the sink and returns true when successful.
-  virtual bool WriteData(const uint8* ptr_data, int32 data_length,
-                         const std::string& id) = 0;
+  // Writes data to the sink and returns true when successful. Must not block.
+  virtual bool WriteData(const std::string& id,
+                         const uint8* ptr_data, int data_length) = 0;
 };
 
 }  // namespace webmlive
diff --git a/encoder/http_uploader.cc b/encoder/http_uploader.cc
index 5012578..591c4e6 100644
--- a/encoder/http_uploader.cc
+++ b/encoder/http_uploader.cc
@@ -32,10 +32,11 @@
 
 namespace webmlive {
 
-static const char* kExpectHeader = "Expect:";
-static const char* kContentTypeHeader = "Content-Type: video/webm";
-static const char* kFormName = "webm_file";
-static const char* kWebmMimeType = "video/webm";
+static const char kExpectHeader[] = "Expect:";
+static const char kContentTypeHeader[] = "Content-Type: video/webm";
+static const char kFormName[] = "webm_file";
+static const char kWebmMimeType[] = "video/webm";
+static const char kContentIdHeader[] = "X-Content-Id: ";
 static const int kUnknownFileSize = -1;
 static const int kBytesRequiredForResume = 32*1024;
 
@@ -54,20 +55,11 @@
     // Constant value used to stop libcurl when |StopRequested| returns true
     // in |ProgressCallback|.
     kProgressCallbackStopRequest = 1,
-
-    // Returned by |Upload| when |WaitForUserData| was notified with an
-    // unlocked |upload_buffer_|, which means |Stop| is waiting for
-    // |UploadThread| to exit.
-    kStopping = 2,
   };
 
   HttpUploaderImpl();
   ~HttpUploaderImpl();
 
-  // Returns true when the uploader is ready to start an upload. Always returns
-  // true when no uploads have been attempted.
-  bool UploadComplete() const;
-
   // Copies user settings and configures libcurl.
   int Init(const HttpUploaderSettings& settings);
 
@@ -78,15 +70,12 @@
   int Run();
 
   // Uploads user data.
-  int UploadBuffer(const uint8* ptr_buffer, int32 length);
+  bool UploadBuffer(const std::string& id,
+                    const uint8* ptr_buffer, int32 length);
 
   // Stops the uploader.
   int Stop();
 
-  // Adds |target_url| to |url_queue_|. Each time |UploadBuffer| is called, an
-  // URL is popped off the queue and assigned to |target_url_|
-  void EnqueueTargetUrl(const std::string& target_url);
-
  private:
   // Used by |UploadThread|. Returns true if user has called |Stop|.
   bool StopRequested();
@@ -95,7 +84,7 @@
   CURLcode SetCurlCallbacks();
 
   // Pass user HTTP headers to libcurl, and disable HTTP 100 responses.
-  CURLcode SetHeaders();
+  CURLcode SetHeaders(const std::string& content_id);
 
   // Configures libcurl to POST data buffers as file data in a form/multipart
   // HTTP POST.
@@ -105,7 +94,7 @@
   int SetupPost(const uint8* const ptr_buffer, int32 length);
 
   // Upload user data with libcurl.
-  int Upload();
+  int Upload(BufferQueue::Buffer* buffer);
 
   // Wakes up |UploadThread| when users pass data through |UploadBuffer|.
   int WaitForUserData();
@@ -113,7 +102,8 @@
   // Libcurl progress callback function.  Acquires |mutex_| and updates
   // |stats_|.
   static int ProgressCallback(void* ptr_this,
-                              double, double,  // we ignore download progress
+                              double /*download_total*/,
+                              double /*download_current*/,
                               double upload_total, double upload_current);
 
   // Logs HTTP response data received by libcurl.
@@ -128,6 +118,9 @@
   // using libcurl.
   void UploadThread();
 
+  // Frees HTTP header list.
+  void FreeHeaders();
+
   // Stop flag. Internal callers use |StopRequested| to allow for
   // synchronization via |mutex_|.  Set by |Stop|, and responded to in
   // |UploadThread|.
@@ -139,7 +132,7 @@
 
   // Condition variable used to wake |UploadThread| when a user code passes a
   // buffer to |UploadBuffer|.
-  std::condition_variable buffer_ready_;
+  std::condition_variable wake_condition_;
 
   // Mutex for synchronization of public method calls with |UploadThread|
   // activity. Mutable so |UploadComplete()| can be a const method.
@@ -173,7 +166,7 @@
   // |Upload|.  This second locking mechanism is in place to allow |mutex_| to
   // be unlocked while uploads are in progress (which prevents public methods
   // from blocking).
-  LockableBuffer upload_buffer_;
+  BufferQueue upload_buffer_;
 
   // The name of the file on the local system.  Note that it is not being read,
   // it's information included within the form data contained within the HTTP
@@ -193,11 +186,6 @@
 HttpUploader::~HttpUploader() {
 }
 
-// Return result of |UploadeComplete| on |ptr_uploader_|.
-bool HttpUploader::UploadComplete() const {
-  return ptr_uploader_->UploadComplete();
-}
-
 // Copy user settings, and setup the internal uploader object.
 int HttpUploader::Init(const HttpUploaderSettings& settings) {
   ptr_uploader_.reset(new (std::nothrow) HttpUploaderImpl());  // NOLINT
@@ -229,8 +217,9 @@
 }
 
 // Return result of |UploadBuffer| on |ptr_uploader_|.
-int HttpUploader::UploadBuffer(const uint8* ptr_buffer, int32 length) {
-  return ptr_uploader_->UploadBuffer(ptr_buffer, length);
+bool HttpUploader::UploadBuffer(const std::string& id,
+                                const uint8* ptr_buffer, int length) {
+  return ptr_uploader_->UploadBuffer(id, ptr_buffer, length);
 }
 
 ///////////////////////////////////////////////////////////////////////////////
@@ -256,26 +245,12 @@
     ptr_form_ = NULL;
     ptr_form_end_ = NULL;
   }
-  if (ptr_headers_) {
-    curl_slist_free_all(ptr_headers_);
-    ptr_headers_ = NULL;
-  }
-}
-
-// Obtain lock on |mutex_| and return value of |upload_complete_|.
-bool HttpUploaderImpl::UploadComplete() const {
-  bool complete = false;
-  std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
-  if (lock.owns_lock()) {
-    complete = upload_complete_;
-  }
-  return complete;
+  FreeHeaders();
 }
 
 // Initializes the upload:
 // - copies user settings
 // - sets basic libcurl settings (progress and write callbacks)
-// - calls SetHeaders to pass user headers to libcurl
 int HttpUploaderImpl::Init(const HttpUploaderSettings& settings) {
   if (settings.target_url.empty()) {
     LOG(ERROR) << "Empty target URL.";
@@ -306,13 +281,6 @@
     return kLibCurlError;
   }
 
-  // Disable HTTP 100 responses, and set user HTTP headers.
-  curl_ret = SetHeaders();
-  if (curl_ret) {
-    LOG_CURL_ERR(curl_ret, "unable to set headers.");
-    return HttpUploader::kHeaderError;
-  }
-
   local_file_name_ = settings_.local_file;
   ResetStats();
   return kSuccess;
@@ -345,55 +313,32 @@
   return kSuccess;
 }
 
-// Try to obtain lock on |mutex_|, and upload the user buffer stored in
-// |upload_buffer_| if the buffer is unlocked.  If the lock is obtained and the
-// buffer is unlocked, |UploadBuffer| locks the buffer and notifies the upload
-// thread through call to |notify_one| on the |buffer_ready_| condition
-// variable.
-int HttpUploaderImpl::UploadBuffer(const uint8* ptr_buf, int32 length) {
-  int status = HttpUploader::kUploadInProgress;
-  std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
-  if (lock.owns_lock() && !upload_buffer_.IsLocked()) {
-    // Lock obtained; (re)initialize |upload_buffer_| with the user data...
-    status = upload_buffer_.Init(ptr_buf, length);
-    if (status) {
-      LOG(ERROR) << "upload_buffer_ Init failed, status=" << status;
-      return status;
-    }
-
-    // Lock |upload_buffer_|; it's unlocked by |UploadThread| once libcurl
-    // finishes its run.
-    status = upload_buffer_.Lock();
-    if (status) {
-      LOG(ERROR) << "upload_buffer_ Lock failed, status=" << status;
-      return status;
-    }
-    upload_complete_ = false;
-
-    // Wake |UploadThread|.
-    LOG(INFO) << "waking uploader with " << length << " bytes";
-    buffer_ready_.notify_one();
-  }
-  return status;
+// Enqueue the user buffer. Does not lock |mutex_|; relies on
+// |upload_buffer_|'s internal lock.
+bool HttpUploaderImpl::UploadBuffer(const std::string& id,
+                                    const uint8* ptr_buf, int length) {
+  upload_buffer_.EnqueueBuffer(id, ptr_buf, length);
+  // Wake |UploadThread|.
+  LOG(INFO) << "waking uploader with " << length << " bytes";
+  wake_condition_.notify_one();
+  return true;
 }
 
-// Stops |UploadThread|. First it wakes the thread by calling |notify_one| on
-// the |buffer_ready_| condition variable without locking |upload_buffer_|,
-// which causes |Upload| to return |kStopping| to |UploadThread|, breaking the
-// loop. This takes care of stopping if the uploader was waiting for user data
-// in |WaitForUserData|.
-// It then obtains lock on |mutex_|, sets |stop_| to true, and releases lock to
-// ensure a running upload stops when |StopRequested| is called within the
-// libcurl callbacks.
+// Stops UploadThread() by obtaining lock on |mutex_| and setting |stop_| to
+// true, and then waking the upload thread by calling notify_one() on
+// |wake_condition_|.
+// The lock on |mutex_| is released before calling notify_one() to ensure that
+// a running upload stops when StopRequested() is called within the libcurl
+// callbacks.
 int HttpUploaderImpl::Stop() {
   assert(upload_thread_);
-  if (UploadComplete()) {
-    // Wake up the upload thread
-    buffer_ready_.notify_one();
-  }
   mutex_.lock();
   stop_ = true;
   mutex_.unlock();
+
+  // Wake up the upload thread.
+  wake_condition_.notify_one();
+  // And wait for it to exit.
   upload_thread_->join();
   return kSuccess;
 }
@@ -444,7 +389,8 @@
 
 // Disable HTTP 100 responses (send empty Expect header), and pass user HTTP
 // headers into lib curl.
-CURLcode HttpUploaderImpl::SetHeaders() {
+CURLcode HttpUploaderImpl::SetHeaders(const std::string& content_id) {
+  FreeHeaders();
   // Tell libcurl to omit "Expect: 100-continue" from requests
   ptr_headers_ = curl_slist_append(ptr_headers_, kExpectHeader);
   if (settings_.post_mode == webmlive::HTTP_POST) {
@@ -460,7 +406,11 @@
     header << header_iter->first.c_str() << ":" << header_iter->second.c_str();
     ptr_headers_ = curl_slist_append(ptr_headers_, header.str().c_str());
   }
-  CURLcode err = curl_easy_setopt(ptr_curl_, CURLOPT_HTTPHEADER, ptr_headers_);
+  // add |content_id|.
+  const std::string content_id_header = kContentIdHeader + content_id;
+  ptr_headers_ = curl_slist_append(ptr_headers_, content_id_header.c_str());
+  const CURLcode err = curl_easy_setopt(ptr_curl_,
+                                        CURLOPT_HTTPHEADER, ptr_headers_);
   if (err != CURLE_OK) {
     LOG_CURL_ERR(err, "setopt CURLOPT_HTTPHEADER failed err=");
   }
@@ -470,7 +420,7 @@
 // Sets necessary curl options for form based file upload, and adds the user
 // form variables.
 int HttpUploaderImpl::SetupFormPost(const uint8* const ptr_buffer,
-                                    int32 length) {
+                                    int length) {
   if (ptr_form_) {
     curl_formfree(ptr_form_);
     ptr_form_ = NULL;
@@ -513,7 +463,7 @@
 }
 
 // Configures libcurl to POST data buffers as HTTP POST content-data.
-int HttpUploaderImpl::SetupPost(const uint8* const ptr_buffer, int32 length) {
+int HttpUploaderImpl::SetupPost(const uint8* const ptr_buffer, int length) {
   CURLcode err_setopt = curl_easy_setopt(ptr_curl_, CURLOPT_POST, ptr_form_);
   if (err_setopt != CURLE_OK) {
     LOG_CURL_ERR(err_setopt, "setopt CURLOPT_HTTPPOST failed.");
@@ -537,40 +487,32 @@
 }
 
 // Upload data using libcurl.
-int HttpUploaderImpl::Upload() {
-  if (!upload_buffer_.IsLocked()) {
-    LOG(INFO) << "woke with unlocked buffer, stopping.";
-    return kStopping;
-  }
-
-  uint8* ptr_data = NULL;
-  int32 length = 0;
-  int status = upload_buffer_.GetBuffer(&ptr_data, &length);
-  if (status) {
-    LOG(ERROR) << "error, could not get buffer pointer, status=" << status;
-    return HttpUploader::kRunFailed;
-  }
-
-  LOG(INFO) << "upload buffer size=" << length;
+int HttpUploaderImpl::Upload(BufferQueue::Buffer* buffer) {
+  LOG(INFO) << "upload buffer size=" << buffer->data.size();
   CURLcode err = curl_easy_setopt(ptr_curl_, CURLOPT_URL,
                                   settings_.target_url.c_str());
   if (err != CURLE_OK) {
     LOG_CURL_ERR(err, "could not pass URL to curl.");
     return HttpUploader::kUrlConfigError;
   }
-
   if (settings_.post_mode == webmlive::HTTP_FORM_POST) {
-    if (SetupFormPost(ptr_data, length)) {
+    if (SetupFormPost(&buffer->data[0], buffer->data.size())) {
       LOG(ERROR) << "SetupFormPost failed!";
       return HttpUploader::kRunFailed;
     }
   } else {
-    if (SetupPost(ptr_data, length)) {
+    if (SetupPost(&buffer->data[0], buffer->data.size())) {
       LOG(ERROR) << "SetupPost failed!";
       return HttpUploader::kRunFailed;
     }
   }
 
+  // Disable HTTP 100 responses, and set user HTTP headers.
+  err = SetHeaders(buffer->id);
+  if (err) {
+    LOG_CURL_ERR(err, "unable to set headers.");
+    return HttpUploader::kHeaderError;
+  }
   err = curl_easy_perform(ptr_curl_);
   if (err != CURLE_OK) {
     LOG_CURL_ERR(err, "curl_easy_perform failed.");
@@ -590,29 +532,26 @@
     stats_.bytes_sent_current = 0;
     stats_.total_bytes_uploaded += static_cast<int64>(bytes_uploaded);
   }
-
+  VLOG(1) << "upload complete.";
   return kSuccess;
 }
 
 // Idle the upload thread while awaiting user data.
 int HttpUploaderImpl::WaitForUserData() {
   std::unique_lock<std::mutex> lock(mutex_);
-  buffer_ready_.wait(lock);  // Unlock |mutex_| and idle the thread while we
-                             // wait for the next chunk of user data.
+  wake_condition_.wait(lock);  // Unlock |mutex_| and idle the thread while we
+                               // wait for the next chunk of user data.
   return kSuccess;
 }
 
 // Handle libcurl progress updates.
 int HttpUploaderImpl::ProgressCallback(void* ptr_this,
-                                       double download_total,
-                                       double download_current,
+                                       double /*download_total*/,
+                                       double /*download_current*/,
                                        double upload_total,
                                        double upload_current) {
-  // Ignore the download progress variables.
-  download_total;
-  download_current;
   HttpUploaderImpl* ptr_uploader_ =
-    reinterpret_cast<HttpUploaderImpl*>(ptr_this);
+      reinterpret_cast<HttpUploaderImpl*>(ptr_this);
   if (ptr_uploader_->StopRequested()) {
     LOG(ERROR) << "stop requested.";
     return kProgressCallbackStopRequest;
@@ -660,31 +599,29 @@
 // Upload thread.  Wakes when user provides a buffer via call to
 // |UploadBuffer|.
 void HttpUploaderImpl::UploadThread() {
-  LOG(INFO) << "upload thread running...";
   while (!StopRequested()) {
-    LOG(INFO) << "upload thread waiting for buffer...";
-    WaitForUserData();
-    LOG(INFO) << "uploading buffer...";
-    int status = Upload();
-    if (status == kStopping) {
-      break;
+    BufferQueue::Buffer* buffer = upload_buffer_.DequeueBuffer();
+    if (!buffer) {
+      VLOG(1) << "upload thread waiting for buffer...";
+      WaitForUserData();
+      continue;
     }
+    VLOG(1) << "uploading buffer...";
+    const int status = Upload(buffer);
     if (status) {
       LOG(ERROR) << "buffer upload failed, status=" << status;
       // TODO(tomfinegan): Report upload failure, and provide access to
       //                   response code and data.
-    } else {
-      std::lock_guard<std::mutex> lock(mutex_);
-      LOG(INFO) << "unlocking upload buffer...";
-      status = upload_buffer_.Unlock();
-      if (status) {
-        LOG(ERROR) << "unable to unlock buffer, status=" << status;
-        // keep spinning, for now...
-      }
-      upload_complete_ = true;
     }
   }
   LOG(INFO) << "thread done";
 }
 
+void HttpUploaderImpl::FreeHeaders() {
+  if (ptr_headers_) {
+    curl_slist_free_all(ptr_headers_);
+    ptr_headers_ = NULL;
+  }
+}
+
 }  // namespace webmlive
diff --git a/encoder/http_uploader.h b/encoder/http_uploader.h
index c47139a..d0668f8 100644
--- a/encoder/http_uploader.h
+++ b/encoder/http_uploader.h
@@ -117,13 +117,13 @@
   int Stop();
 
   // Sends a buffer to the uploader thread.
-  int UploadBuffer(const uint8* ptr_buffer, int32 length);
+  bool UploadBuffer(const std::string& id,
+                    const uint8* ptr_buffer, int length);
 
   // DataSinkInterface methods.
-  virtual bool Ready() const { return UploadComplete(); }
-  virtual bool WriteData(const uint8* ptr_buffer, int32 length,
-                         const std::string& /*id*/) {
-    return (UploadBuffer(ptr_buffer, length) == kSuccess);
+  virtual bool WriteData(const std::string& id,
+                         const uint8* ptr_buffer, int length) override {
+    return UploadBuffer(id, ptr_buffer, length);
   }
 
  private:
diff --git a/encoder/webm_encoder.cc b/encoder/webm_encoder.cc
index 57bc3fa..9e14f4b 100644
--- a/encoder/webm_encoder.cc
+++ b/encoder/webm_encoder.cc
@@ -386,14 +386,14 @@
     LOG(ERROR) << "DashWriter::WriteManifest failed.";
   }
 
-#if 0
   ptr_data_sink_->WriteData(
+      config_.dash_name + ".mpd",
       reinterpret_cast<const uint8*>(dash_manifest.data()),
-      dash_manifest.length(), "manifest");
-#endif
+      dash_manifest.length());
 
-  // HACK: HERE BE DRAGONS
-  CHECK(WriteManifest(config_.dash_dir + "webmlive.mpd", dash_manifest));
+  // TODO(tomfinegan): Support multiple data sinks/create a local file sink.
+  CHECK(WriteManifest(config_.dash_dir + config_.dash_name + ".mpd",
+                      dash_manifest));
 
   // Wait for an input sample from each input stream-- this sets the
   // |timestamp_offset_| value when one or both streams starts with a negative
@@ -820,29 +820,27 @@
 
 int WebmEncoder::WriteMuxerChunkToDataSink(
     std::unique_ptr<LiveWebmMuxer>* muxer) {
-  if (ptr_data_sink_->Ready()) {
-    int32 chunk_length = 0;
-    const bool chunk_ready = (*muxer)->ChunkReady(&chunk_length);
-    if (chunk_ready) {
-      const int64 chunk_num = (*muxer)->chunks_read();
-      std::string id = NextChunkId((*muxer)->muxer_id(), chunk_num);
-      // A complete chunk is waiting in |muxer|'s buffer.
-      if (!ReadChunkFromMuxer(muxer, chunk_length)) {
-        LOG(ERROR) << "cannot read WebM chunk from muxer_id: "
-                   << (*muxer)->muxer_id();
-        return kWebmMuxerError;
-      }
-#if 0
-      // Pass the chunk to |ptr_data_sink_|.
-      if (!ptr_data_sink_->WriteData(chunk_buffer_.get(), chunk_length, id)) {
-        LOG(ERROR) << "data sink write failed!";
-        return kDataSinkWriteFail;
-      }
-#endif
-      // HACK: HERE BE DRAGONS
-      CHECK(WriteChunkFile(config_.dash_dir + id,
-                           chunk_buffer_.get(), chunk_length));
+  int32 chunk_length = 0;
+  const bool chunk_ready = (*muxer)->ChunkReady(&chunk_length);
+  if (chunk_ready) {
+    const int64 chunk_num = (*muxer)->chunks_read();
+    const std::string id = NextChunkId((*muxer)->muxer_id(), chunk_num);
+    // A complete chunk is waiting in |muxer|'s buffer.
+    if (!ReadChunkFromMuxer(muxer, chunk_length)) {
+      LOG(ERROR) << "cannot read WebM chunk from muxer_id: "
+                 << (*muxer)->muxer_id();
+      return kWebmMuxerError;
     }
+
+    // Pass the chunk to |ptr_data_sink_|.
+    if (!ptr_data_sink_->WriteData(id, chunk_buffer_.get(), chunk_length)) {
+      LOG(ERROR) << "data sink write failed!";
+      return kDataSinkWriteFail;
+    }
+
+    // TODO(tomfinegan): Support multiple data sinks/create a local file sink.
+    CHECK(WriteChunkFile(config_.dash_dir + id,
+                         chunk_buffer_.get(), chunk_length));
   }
   return kSuccess;
 }
@@ -859,23 +857,19 @@
   if ((*muxer)->ChunkReady(&chunk_length)) {
     LOG(INFO) << "mkvmuxer Finalize produced a chunk.";
     const int64 chunk_num = (*muxer)->chunks_read();
-    std::string id = NextChunkId((*muxer)->muxer_id(), chunk_num);
-
-    while (!ptr_data_sink_->Ready())
-      std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    const std::string id = NextChunkId((*muxer)->muxer_id(), chunk_num);
 
     if (ReadChunkFromMuxer(muxer, chunk_length)) {
-#if 0
       const bool sink_write_ok =
-          ptr_data_sink_->WriteData(chunk_buffer_.get(), chunk_length, id);
+          ptr_data_sink_->WriteData(id, chunk_buffer_.get(), chunk_length);
       if (!sink_write_ok) {
         LOG(ERROR) << "data sink write fail on final chunk for muxer_id:"
                    << (*muxer)->muxer_id();
       } else {
         LOG(INFO) << "Final chunk upload initiated.";
       }
-#endif
-      // HACK: HERE BE DRAGONS
+
+      // TODO(tomfinegan): Support multiple data sinks/create a local file sink.
       CHECK(WriteChunkFile(config_.dash_dir + id,
                            chunk_buffer_.get(), chunk_length));
     }