Restore upload support.
Upload works again-- all runs currently require settings
for local file i/o and uploading. Neither can be disabled.
Change-Id: Ia6cb641d592aaaa5ac0051cd7fd245ce8d31e7df
diff --git a/encoder/data_sink.h b/encoder/data_sink.h
index d0df9ff..31f8625 100644
--- a/encoder/data_sink.h
+++ b/encoder/data_sink.h
@@ -18,13 +18,9 @@
public:
virtual ~DataSinkInterface() {}
- // Returns true when the class implementing |DataSinkInterface| is ready to
- // receive data via a call to |WriteData()|.
- virtual bool Ready() const = 0;
-
- // Writes data to the sink and returns true when successful.
- virtual bool WriteData(const uint8* ptr_data, int32 data_length,
- const std::string& id) = 0;
+ // Writes data to the sink and returns true when successful. Must not block.
+ virtual bool WriteData(const std::string& id,
+ const uint8* ptr_data, int data_length) = 0;
};
} // namespace webmlive
diff --git a/encoder/http_uploader.cc b/encoder/http_uploader.cc
index 5012578..591c4e6 100644
--- a/encoder/http_uploader.cc
+++ b/encoder/http_uploader.cc
@@ -32,10 +32,11 @@
namespace webmlive {
-static const char* kExpectHeader = "Expect:";
-static const char* kContentTypeHeader = "Content-Type: video/webm";
-static const char* kFormName = "webm_file";
-static const char* kWebmMimeType = "video/webm";
+static const char kExpectHeader[] = "Expect:";
+static const char kContentTypeHeader[] = "Content-Type: video/webm";
+static const char kFormName[] = "webm_file";
+static const char kWebmMimeType[] = "video/webm";
+static const char kContentIdHeader[] = "X-Content-Id: ";
static const int kUnknownFileSize = -1;
static const int kBytesRequiredForResume = 32*1024;
@@ -54,20 +55,11 @@
// Constant value used to stop libcurl when |StopRequested| returns true
// in |ProgressCallback|.
kProgressCallbackStopRequest = 1,
-
- // Returned by |Upload| when |WaitForUserData| was notified with an
- // unlocked |upload_buffer_|, which means |Stop| is waiting for
- // |UploadThread| to exit.
- kStopping = 2,
};
HttpUploaderImpl();
~HttpUploaderImpl();
- // Returns true when the uploader is ready to start an upload. Always returns
- // true when no uploads have been attempted.
- bool UploadComplete() const;
-
// Copies user settings and configures libcurl.
int Init(const HttpUploaderSettings& settings);
@@ -78,15 +70,12 @@
int Run();
// Uploads user data.
- int UploadBuffer(const uint8* ptr_buffer, int32 length);
+ bool UploadBuffer(const std::string& id,
+ const uint8* ptr_buffer, int32 length);
// Stops the uploader.
int Stop();
- // Adds |target_url| to |url_queue_|. Each time |UploadBuffer| is called, an
- // URL is popped off the queue and assigned to |target_url_|
- void EnqueueTargetUrl(const std::string& target_url);
-
private:
// Used by |UploadThread|. Returns true if user has called |Stop|.
bool StopRequested();
@@ -95,7 +84,7 @@
CURLcode SetCurlCallbacks();
// Pass user HTTP headers to libcurl, and disable HTTP 100 responses.
- CURLcode SetHeaders();
+ CURLcode SetHeaders(const std::string& content_id);
// Configures libcurl to POST data buffers as file data in a form/multipart
// HTTP POST.
@@ -105,7 +94,7 @@
int SetupPost(const uint8* const ptr_buffer, int32 length);
// Upload user data with libcurl.
- int Upload();
+ int Upload(BufferQueue::Buffer* buffer);
// Wakes up |UploadThread| when users pass data through |UploadBuffer|.
int WaitForUserData();
@@ -113,7 +102,8 @@
// Libcurl progress callback function. Acquires |mutex_| and updates
// |stats_|.
static int ProgressCallback(void* ptr_this,
- double, double, // we ignore download progress
+ double /*download_total*/,
+ double /*download_current*/,
double upload_total, double upload_current);
// Logs HTTP response data received by libcurl.
@@ -128,6 +118,9 @@
// using libcurl.
void UploadThread();
+ // Frees HTTP header list.
+ void FreeHeaders();
+
// Stop flag. Internal callers use |StopRequested| to allow for
// synchronization via |mutex_|. Set by |Stop|, and responded to in
// |UploadThread|.
@@ -139,7 +132,7 @@
// Condition variable used to wake |UploadThread| when a user code passes a
// buffer to |UploadBuffer|.
- std::condition_variable buffer_ready_;
+ std::condition_variable wake_condition_;
// Mutex for synchronization of public method calls with |UploadThread|
// activity. Mutable so |UploadComplete()| can be a const method.
@@ -173,7 +166,7 @@
// |Upload|. This second locking mechanism is in place to allow |mutex_| to
// be unlocked while uploads are in progress (which prevents public methods
// from blocking).
- LockableBuffer upload_buffer_;
+ BufferQueue upload_buffer_;
// The name of the file on the local system. Note that it is not being read,
// it's information included within the form data contained within the HTTP
@@ -193,11 +186,6 @@
HttpUploader::~HttpUploader() {
}
-// Return result of |UploadeComplete| on |ptr_uploader_|.
-bool HttpUploader::UploadComplete() const {
- return ptr_uploader_->UploadComplete();
-}
-
// Copy user settings, and setup the internal uploader object.
int HttpUploader::Init(const HttpUploaderSettings& settings) {
ptr_uploader_.reset(new (std::nothrow) HttpUploaderImpl()); // NOLINT
@@ -229,8 +217,9 @@
}
// Return result of |UploadBuffer| on |ptr_uploader_|.
-int HttpUploader::UploadBuffer(const uint8* ptr_buffer, int32 length) {
- return ptr_uploader_->UploadBuffer(ptr_buffer, length);
+bool HttpUploader::UploadBuffer(const std::string& id,
+ const uint8* ptr_buffer, int length) {
+ return ptr_uploader_->UploadBuffer(id, ptr_buffer, length);
}
///////////////////////////////////////////////////////////////////////////////
@@ -256,26 +245,12 @@
ptr_form_ = NULL;
ptr_form_end_ = NULL;
}
- if (ptr_headers_) {
- curl_slist_free_all(ptr_headers_);
- ptr_headers_ = NULL;
- }
-}
-
-// Obtain lock on |mutex_| and return value of |upload_complete_|.
-bool HttpUploaderImpl::UploadComplete() const {
- bool complete = false;
- std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
- if (lock.owns_lock()) {
- complete = upload_complete_;
- }
- return complete;
+ FreeHeaders();
}
// Initializes the upload:
// - copies user settings
// - sets basic libcurl settings (progress and write callbacks)
-// - calls SetHeaders to pass user headers to libcurl
int HttpUploaderImpl::Init(const HttpUploaderSettings& settings) {
if (settings.target_url.empty()) {
LOG(ERROR) << "Empty target URL.";
@@ -306,13 +281,6 @@
return kLibCurlError;
}
- // Disable HTTP 100 responses, and set user HTTP headers.
- curl_ret = SetHeaders();
- if (curl_ret) {
- LOG_CURL_ERR(curl_ret, "unable to set headers.");
- return HttpUploader::kHeaderError;
- }
-
local_file_name_ = settings_.local_file;
ResetStats();
return kSuccess;
@@ -345,55 +313,32 @@
return kSuccess;
}
-// Try to obtain lock on |mutex_|, and upload the user buffer stored in
-// |upload_buffer_| if the buffer is unlocked. If the lock is obtained and the
-// buffer is unlocked, |UploadBuffer| locks the buffer and notifies the upload
-// thread through call to |notify_one| on the |buffer_ready_| condition
-// variable.
-int HttpUploaderImpl::UploadBuffer(const uint8* ptr_buf, int32 length) {
- int status = HttpUploader::kUploadInProgress;
- std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
- if (lock.owns_lock() && !upload_buffer_.IsLocked()) {
- // Lock obtained; (re)initialize |upload_buffer_| with the user data...
- status = upload_buffer_.Init(ptr_buf, length);
- if (status) {
- LOG(ERROR) << "upload_buffer_ Init failed, status=" << status;
- return status;
- }
-
- // Lock |upload_buffer_|; it's unlocked by |UploadThread| once libcurl
- // finishes its run.
- status = upload_buffer_.Lock();
- if (status) {
- LOG(ERROR) << "upload_buffer_ Lock failed, status=" << status;
- return status;
- }
- upload_complete_ = false;
-
- // Wake |UploadThread|.
- LOG(INFO) << "waking uploader with " << length << " bytes";
- buffer_ready_.notify_one();
- }
- return status;
+// Enqueue the user buffer. Does not lock |mutex_|; relies on
+// |upload_buffer_|'s internal lock.
+bool HttpUploaderImpl::UploadBuffer(const std::string& id,
+ const uint8* ptr_buf, int length) {
+ upload_buffer_.EnqueueBuffer(id, ptr_buf, length);
+ // Wake |UploadThread|.
+ LOG(INFO) << "waking uploader with " << length << " bytes";
+ wake_condition_.notify_one();
+ return true;
}
-// Stops |UploadThread|. First it wakes the thread by calling |notify_one| on
-// the |buffer_ready_| condition variable without locking |upload_buffer_|,
-// which causes |Upload| to return |kStopping| to |UploadThread|, breaking the
-// loop. This takes care of stopping if the uploader was waiting for user data
-// in |WaitForUserData|.
-// It then obtains lock on |mutex_|, sets |stop_| to true, and releases lock to
-// ensure a running upload stops when |StopRequested| is called within the
-// libcurl callbacks.
+// Stops UploadThread() by obtaining lock on |mutex_| and setting |stop_| to
+// true, and then waking the upload thread by calling notify_one() on
+// |wake_condition_|.
+// The lock on |mutex_| is released before calling notify_one() to ensure that
+// a running upload stops when StopRequested() is called within the libcurl
+// callbacks.
int HttpUploaderImpl::Stop() {
assert(upload_thread_);
- if (UploadComplete()) {
- // Wake up the upload thread
- buffer_ready_.notify_one();
- }
mutex_.lock();
stop_ = true;
mutex_.unlock();
+
+ // Wake up the upload thread.
+ wake_condition_.notify_one();
+ // And wait for it to exit.
upload_thread_->join();
return kSuccess;
}
@@ -444,7 +389,8 @@
// Disable HTTP 100 responses (send empty Expect header), and pass user HTTP
// headers into lib curl.
-CURLcode HttpUploaderImpl::SetHeaders() {
+CURLcode HttpUploaderImpl::SetHeaders(const std::string& content_id) {
+ FreeHeaders();
// Tell libcurl to omit "Expect: 100-continue" from requests
ptr_headers_ = curl_slist_append(ptr_headers_, kExpectHeader);
if (settings_.post_mode == webmlive::HTTP_POST) {
@@ -460,7 +406,11 @@
header << header_iter->first.c_str() << ":" << header_iter->second.c_str();
ptr_headers_ = curl_slist_append(ptr_headers_, header.str().c_str());
}
- CURLcode err = curl_easy_setopt(ptr_curl_, CURLOPT_HTTPHEADER, ptr_headers_);
+ // add |content_id|.
+ const std::string content_id_header = kContentIdHeader + content_id;
+ ptr_headers_ = curl_slist_append(ptr_headers_, content_id_header.c_str());
+ const CURLcode err = curl_easy_setopt(ptr_curl_,
+ CURLOPT_HTTPHEADER, ptr_headers_);
if (err != CURLE_OK) {
LOG_CURL_ERR(err, "setopt CURLOPT_HTTPHEADER failed err=");
}
@@ -470,7 +420,7 @@
// Sets necessary curl options for form based file upload, and adds the user
// form variables.
int HttpUploaderImpl::SetupFormPost(const uint8* const ptr_buffer,
- int32 length) {
+ int length) {
if (ptr_form_) {
curl_formfree(ptr_form_);
ptr_form_ = NULL;
@@ -513,7 +463,7 @@
}
// Configures libcurl to POST data buffers as HTTP POST content-data.
-int HttpUploaderImpl::SetupPost(const uint8* const ptr_buffer, int32 length) {
+int HttpUploaderImpl::SetupPost(const uint8* const ptr_buffer, int length) {
CURLcode err_setopt = curl_easy_setopt(ptr_curl_, CURLOPT_POST, ptr_form_);
if (err_setopt != CURLE_OK) {
LOG_CURL_ERR(err_setopt, "setopt CURLOPT_HTTPPOST failed.");
@@ -537,40 +487,32 @@
}
// Upload data using libcurl.
-int HttpUploaderImpl::Upload() {
- if (!upload_buffer_.IsLocked()) {
- LOG(INFO) << "woke with unlocked buffer, stopping.";
- return kStopping;
- }
-
- uint8* ptr_data = NULL;
- int32 length = 0;
- int status = upload_buffer_.GetBuffer(&ptr_data, &length);
- if (status) {
- LOG(ERROR) << "error, could not get buffer pointer, status=" << status;
- return HttpUploader::kRunFailed;
- }
-
- LOG(INFO) << "upload buffer size=" << length;
+int HttpUploaderImpl::Upload(BufferQueue::Buffer* buffer) {
+ LOG(INFO) << "upload buffer size=" << buffer->data.size();
CURLcode err = curl_easy_setopt(ptr_curl_, CURLOPT_URL,
settings_.target_url.c_str());
if (err != CURLE_OK) {
LOG_CURL_ERR(err, "could not pass URL to curl.");
return HttpUploader::kUrlConfigError;
}
-
if (settings_.post_mode == webmlive::HTTP_FORM_POST) {
- if (SetupFormPost(ptr_data, length)) {
+ if (SetupFormPost(&buffer->data[0], buffer->data.size())) {
LOG(ERROR) << "SetupFormPost failed!";
return HttpUploader::kRunFailed;
}
} else {
- if (SetupPost(ptr_data, length)) {
+ if (SetupPost(&buffer->data[0], buffer->data.size())) {
LOG(ERROR) << "SetupPost failed!";
return HttpUploader::kRunFailed;
}
}
+ // Disable HTTP 100 responses, and set user HTTP headers.
+ err = SetHeaders(buffer->id);
+ if (err) {
+ LOG_CURL_ERR(err, "unable to set headers.");
+ return HttpUploader::kHeaderError;
+ }
err = curl_easy_perform(ptr_curl_);
if (err != CURLE_OK) {
LOG_CURL_ERR(err, "curl_easy_perform failed.");
@@ -590,29 +532,26 @@
stats_.bytes_sent_current = 0;
stats_.total_bytes_uploaded += static_cast<int64>(bytes_uploaded);
}
-
+ VLOG(1) << "upload complete.";
return kSuccess;
}
// Idle the upload thread while awaiting user data.
int HttpUploaderImpl::WaitForUserData() {
std::unique_lock<std::mutex> lock(mutex_);
- buffer_ready_.wait(lock); // Unlock |mutex_| and idle the thread while we
- // wait for the next chunk of user data.
+ wake_condition_.wait(lock); // Unlock |mutex_| and idle the thread while we
+ // wait for the next chunk of user data.
return kSuccess;
}
// Handle libcurl progress updates.
int HttpUploaderImpl::ProgressCallback(void* ptr_this,
- double download_total,
- double download_current,
+ double /*download_total*/,
+ double /*download_current*/,
double upload_total,
double upload_current) {
- // Ignore the download progress variables.
- download_total;
- download_current;
HttpUploaderImpl* ptr_uploader_ =
- reinterpret_cast<HttpUploaderImpl*>(ptr_this);
+ reinterpret_cast<HttpUploaderImpl*>(ptr_this);
if (ptr_uploader_->StopRequested()) {
LOG(ERROR) << "stop requested.";
return kProgressCallbackStopRequest;
@@ -660,31 +599,29 @@
// Upload thread. Wakes when user provides a buffer via call to
// |UploadBuffer|.
void HttpUploaderImpl::UploadThread() {
- LOG(INFO) << "upload thread running...";
while (!StopRequested()) {
- LOG(INFO) << "upload thread waiting for buffer...";
- WaitForUserData();
- LOG(INFO) << "uploading buffer...";
- int status = Upload();
- if (status == kStopping) {
- break;
+ BufferQueue::Buffer* buffer = upload_buffer_.DequeueBuffer();
+ if (!buffer) {
+ VLOG(1) << "upload thread waiting for buffer...";
+ WaitForUserData();
+ continue;
}
+ VLOG(1) << "uploading buffer...";
+ const int status = Upload(buffer);
if (status) {
LOG(ERROR) << "buffer upload failed, status=" << status;
// TODO(tomfinegan): Report upload failure, and provide access to
// response code and data.
- } else {
- std::lock_guard<std::mutex> lock(mutex_);
- LOG(INFO) << "unlocking upload buffer...";
- status = upload_buffer_.Unlock();
- if (status) {
- LOG(ERROR) << "unable to unlock buffer, status=" << status;
- // keep spinning, for now...
- }
- upload_complete_ = true;
}
}
LOG(INFO) << "thread done";
}
+void HttpUploaderImpl::FreeHeaders() {
+ if (ptr_headers_) {
+ curl_slist_free_all(ptr_headers_);
+ ptr_headers_ = NULL;
+ }
+}
+
} // namespace webmlive
diff --git a/encoder/http_uploader.h b/encoder/http_uploader.h
index c47139a..d0668f8 100644
--- a/encoder/http_uploader.h
+++ b/encoder/http_uploader.h
@@ -117,13 +117,13 @@
int Stop();
// Sends a buffer to the uploader thread.
- int UploadBuffer(const uint8* ptr_buffer, int32 length);
+ bool UploadBuffer(const std::string& id,
+ const uint8* ptr_buffer, int length);
// DataSinkInterface methods.
- virtual bool Ready() const { return UploadComplete(); }
- virtual bool WriteData(const uint8* ptr_buffer, int32 length,
- const std::string& /*id*/) {
- return (UploadBuffer(ptr_buffer, length) == kSuccess);
+ virtual bool WriteData(const std::string& id,
+ const uint8* ptr_buffer, int length) override {
+ return UploadBuffer(id, ptr_buffer, length);
}
private:
diff --git a/encoder/webm_encoder.cc b/encoder/webm_encoder.cc
index 57bc3fa..9e14f4b 100644
--- a/encoder/webm_encoder.cc
+++ b/encoder/webm_encoder.cc
@@ -386,14 +386,14 @@
LOG(ERROR) << "DashWriter::WriteManifest failed.";
}
-#if 0
ptr_data_sink_->WriteData(
+ config_.dash_name + ".mpd",
reinterpret_cast<const uint8*>(dash_manifest.data()),
- dash_manifest.length(), "manifest");
-#endif
+ dash_manifest.length());
- // HACK: HERE BE DRAGONS
- CHECK(WriteManifest(config_.dash_dir + "webmlive.mpd", dash_manifest));
+ // TODO(tomfinegan): Support multiple data sinks/create a local file sink.
+ CHECK(WriteManifest(config_.dash_dir + config_.dash_name + ".mpd",
+ dash_manifest));
// Wait for an input sample from each input stream-- this sets the
// |timestamp_offset_| value when one or both streams starts with a negative
@@ -820,29 +820,27 @@
int WebmEncoder::WriteMuxerChunkToDataSink(
std::unique_ptr<LiveWebmMuxer>* muxer) {
- if (ptr_data_sink_->Ready()) {
- int32 chunk_length = 0;
- const bool chunk_ready = (*muxer)->ChunkReady(&chunk_length);
- if (chunk_ready) {
- const int64 chunk_num = (*muxer)->chunks_read();
- std::string id = NextChunkId((*muxer)->muxer_id(), chunk_num);
- // A complete chunk is waiting in |muxer|'s buffer.
- if (!ReadChunkFromMuxer(muxer, chunk_length)) {
- LOG(ERROR) << "cannot read WebM chunk from muxer_id: "
- << (*muxer)->muxer_id();
- return kWebmMuxerError;
- }
-#if 0
- // Pass the chunk to |ptr_data_sink_|.
- if (!ptr_data_sink_->WriteData(chunk_buffer_.get(), chunk_length, id)) {
- LOG(ERROR) << "data sink write failed!";
- return kDataSinkWriteFail;
- }
-#endif
- // HACK: HERE BE DRAGONS
- CHECK(WriteChunkFile(config_.dash_dir + id,
- chunk_buffer_.get(), chunk_length));
+ int32 chunk_length = 0;
+ const bool chunk_ready = (*muxer)->ChunkReady(&chunk_length);
+ if (chunk_ready) {
+ const int64 chunk_num = (*muxer)->chunks_read();
+ const std::string id = NextChunkId((*muxer)->muxer_id(), chunk_num);
+ // A complete chunk is waiting in |muxer|'s buffer.
+ if (!ReadChunkFromMuxer(muxer, chunk_length)) {
+ LOG(ERROR) << "cannot read WebM chunk from muxer_id: "
+ << (*muxer)->muxer_id();
+ return kWebmMuxerError;
}
+
+ // Pass the chunk to |ptr_data_sink_|.
+ if (!ptr_data_sink_->WriteData(id, chunk_buffer_.get(), chunk_length)) {
+ LOG(ERROR) << "data sink write failed!";
+ return kDataSinkWriteFail;
+ }
+
+ // TODO(tomfinegan): Support multiple data sinks/create a local file sink.
+ CHECK(WriteChunkFile(config_.dash_dir + id,
+ chunk_buffer_.get(), chunk_length));
}
return kSuccess;
}
@@ -859,23 +857,19 @@
if ((*muxer)->ChunkReady(&chunk_length)) {
LOG(INFO) << "mkvmuxer Finalize produced a chunk.";
const int64 chunk_num = (*muxer)->chunks_read();
- std::string id = NextChunkId((*muxer)->muxer_id(), chunk_num);
-
- while (!ptr_data_sink_->Ready())
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ const std::string id = NextChunkId((*muxer)->muxer_id(), chunk_num);
if (ReadChunkFromMuxer(muxer, chunk_length)) {
-#if 0
const bool sink_write_ok =
- ptr_data_sink_->WriteData(chunk_buffer_.get(), chunk_length, id);
+ ptr_data_sink_->WriteData(id, chunk_buffer_.get(), chunk_length);
if (!sink_write_ok) {
LOG(ERROR) << "data sink write fail on final chunk for muxer_id:"
<< (*muxer)->muxer_id();
} else {
LOG(INFO) << "Final chunk upload initiated.";
}
-#endif
- // HACK: HERE BE DRAGONS
+
+ // TODO(tomfinegan): Support multiple data sinks/create a local file sink.
CHECK(WriteChunkFile(config_.dash_dir + id,
chunk_buffer_.get(), chunk_length));
}