| // Copyright 2010 The Goma Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| |
| #include "http_rpc.h" |
| |
| #include <memory> |
| #include <sstream> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "absl/memory/memory.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/time/clock.h" |
| #include "absl/time/time.h" |
| #include "autolock_timer.h" |
| #include "callback.h" |
| #include "compiler_specific.h" |
| #include "glog/logging.h" |
| MSVC_PUSH_DISABLE_WARNING_FOR_PROTO() |
| #include "google/protobuf/message.h" |
| #include "google/protobuf/io/gzip_stream.h" |
| #include "google/protobuf/io/zero_copy_stream.h" |
| #include "google/protobuf/io/zero_copy_stream_impl.h" |
| #include "google/protobuf/io/zero_copy_stream_impl_lite.h" |
| MSVC_POP_WARNING() |
| #include "http_util.h" |
| MSVC_PUSH_DISABLE_WARNING_FOR_PROTO() |
| #include "prototmp/goma_data.pb.h" |
| MSVC_POP_WARNING() |
| #include "scoped_fd.h" |
| #include "simple_timer.h" |
| #include "worker_thread_manager.h" |
| |
| using std::string; |
| |
| namespace devtools_goma { |
| |
| class HttpRPC::Request : public HttpClient::Request { |
| public: |
| Request(const google::protobuf::Message* req, |
| HttpRPC::Status* status) : |
| req_(req), |
| status_(status) { |
| } |
| ~Request() override {} |
| |
| std::unique_ptr<google::protobuf::io::ZeroCopyInputStream> NewStream() |
| const override = 0; |
| |
| std::unique_ptr<HttpClient::Request> Clone() const override = 0; |
| |
| protected: |
| const google::protobuf::Message* req_; |
| HttpRPC::Status* status_; |
| |
| private: |
| DISALLOW_ASSIGN(Request); |
| }; |
| |
| class HttpRPC::CallRequest : public HttpRPC::Request { |
| public: |
| CallRequest(const google::protobuf::Message* req, HttpRPC::Status* status); |
| ~CallRequest() override {} |
| void EnableCompression(int level, const string& accept_encoding) { |
| compression_level_ = level; |
| accept_encoding_ = accept_encoding; |
| } |
| std::unique_ptr<google::protobuf::io::ZeroCopyInputStream> |
| NewStream() const override; |
| |
| std::unique_ptr<HttpClient::Request> Clone() const override { |
| return std::unique_ptr<HttpClient::Request>( |
| new HttpRPC::CallRequest(*this)); |
| } |
| |
| private: |
| int compression_level_; |
| string accept_encoding_; |
| DISALLOW_ASSIGN(CallRequest); |
| }; |
| |
| class HttpRPC::Response : public HttpClient::Response { |
| public: |
| Response(google::protobuf::Message* resp, |
| HttpRPC::Status* status) |
| : resp_(resp), |
| status_(status) { |
| } |
| ~Response() override {} |
| |
| HttpClient::Response::Body* NewBody( |
| size_t content_length, bool is_chunked, |
| EncodingType encoding_type) override { |
| response_body_ = |
| absl::make_unique<HttpResponse::Body>( |
| content_length, is_chunked, encoding_type); |
| return response_body_.get(); |
| } |
| |
| protected: |
| void ParseBody() override; |
| |
| google::protobuf::Message* resp_; |
| HttpRPC::Status* status_; |
| |
| private: |
| std::unique_ptr<HttpResponse::Body> response_body_; |
| DISALLOW_COPY_AND_ASSIGN(Response); |
| }; |
| |
| class HttpRPC::CallResponse : public HttpRPC::Response { |
| public: |
| CallResponse(google::protobuf::Message* resp, |
| HttpRPC::Status* status) |
| : Response(resp, status) {} |
| ~CallResponse() override {} |
| |
| private: |
| DISALLOW_COPY_AND_ASSIGN(CallResponse); |
| }; |
| |
| class HttpRPC::CallData { |
| public: |
| CallData(std::unique_ptr<HttpRPC::Request> req, |
| std::unique_ptr<HttpRPC::Response> resp, |
| OneshotClosure* callback) |
| : req_(std::move(req)), |
| resp_(std::move(resp)), |
| callback_(callback) { |
| } |
| ~CallData() { |
| if (callback_) { |
| callback_->Run(); |
| } |
| } |
| |
| HttpRPC::Request* req() const { return req_.get(); } |
| HttpRPC::Response* resp() const { return resp_.get(); } |
| |
| private: |
| std::unique_ptr<HttpRPC::Request> req_; |
| std::unique_ptr<HttpRPC::Response> resp_; |
| OneshotClosure* callback_; |
| DISALLOW_COPY_AND_ASSIGN(CallData); |
| }; |
| |
| HttpRPC::Options::Options() |
| : compression_level(0), |
| start_compression(false) { |
| } |
| |
| string HttpRPC::Options::DebugString() const { |
| std::ostringstream ss; |
| ss << " compression_level=" << compression_level; |
| if (start_compression) |
| ss << " start_compression"; |
| ss << " accept_encoding=" << accept_encoding; |
| ss << " content_type_for_protobuf=" << content_type_for_protobuf; |
| return ss.str(); |
| } |
| |
| HttpRPC::HttpRPC(HttpClient* client, |
| const Options& options) |
| : client_(client), |
| options_(options), |
| compression_enabled_(options.start_compression) { |
| LOG(INFO) << options_.DebugString(); |
| CHECK(!options_.content_type_for_protobuf.empty()); |
| CHECK(options_.content_type_for_protobuf.find_first_of("\r\n") |
| == string::npos) |
| << "content_type_for_protobuf must not contain CR LF:" |
| << options_.content_type_for_protobuf; |
| } |
| |
| HttpRPC::~HttpRPC() { |
| LOG(INFO) << "HttpRPC terminated."; |
| } |
| |
| int HttpRPC::Ping(WorkerThreadManager* wm, |
| const string& path, |
| Status* status) { |
| std::unique_ptr<Status> ping_status(new Status); |
| DCHECK(status); |
| *ping_status = *status; |
| if (ping_status->trace_id.empty()) { |
| ping_status->trace_id = "ping"; |
| } |
| long long timeout_secs = -1; |
| if (!ping_status->timeout_secs.empty()) { |
| timeout_secs = ping_status->timeout_secs.front(); |
| LOG(INFO) << "ping " << path << " timeout=" << timeout_secs; |
| } else { |
| LOG(INFO) << "ping " << path << " no timeout"; |
| } |
| DCHECK(wm) << "There isn't any worker thread to send to"; |
| // Make client active until PingDone is called. |
| // Without this, client could shutdown after ping rpc is finished |
| // and before it calls Wait in PingDone. |
| client_->IncNumActive(); |
| std::unique_ptr<SimpleTimer> timer(new SimpleTimer); |
| // Ping may be called on the thread not in the worker thread manager. |
| wm->RunClosure( |
| FROM_HERE, |
| NewCallback( |
| this, &HttpRPC::DoPing, path, ping_status.get()), |
| WorkerThreadManager::PRIORITY_LOW); |
| // We can't use Wait() since wm->Dispatch() can be called |
| // on a thread in the worker thread manager only. |
| // TODO: use conditional variable to wait? |
| while (!ping_status->finished) { |
| absl::SleepFor(absl::Milliseconds(100)); |
| if (timeout_secs > 0 && |
| timer->GetInNanoseconds() > timeout_secs * 1000000000) { |
| // TODO: fix HttpRPC's timeout. |
| LOG(ERROR) << "ping timed out, but not finished yet." |
| << "timer=" << timer->GetInMilliseconds() << " [ms]"; |
| break; |
| } |
| } |
| if (ping_status->finished) { |
| *status = *ping_status; |
| } else { |
| status->err = ERR_TIMEOUT; |
| } |
| wm->RunClosure( |
| FROM_HERE, |
| NewCallback(this, &HttpRPC::PingDone, |
| std::move(ping_status), std::move(timer)), |
| WorkerThreadManager::PRIORITY_LOW); |
| int status_code = client_->UpdateHealthStatusMessageForPing( |
| static_cast<const HttpClient::Status&>(*status), -1); |
| const string& health_status = client_->GetHealthStatusMessage(); |
| if (health_status != "ok") { |
| LOG(WARNING) << "Update health status:" << health_status; |
| } |
| return status_code; |
| } |
| |
| void HttpRPC::DoPing(string path, Status* status) { |
| CallWithCallback(path, nullptr, nullptr, status, nullptr); |
| } |
| |
| void HttpRPC::PingDone(std::unique_ptr<Status> status, |
| std::unique_ptr<SimpleTimer> timer) { |
| LOG(INFO) << "Wait ping status " << status.get(); |
| Wait(status.get()); |
| int round_trip_time = timer->GetInIntMilliseconds(); |
| LOG_IF(WARNING, !status->connect_success) |
| << "failed to connect to backend servers"; |
| LOG_IF(WARNING, status->err == ERR_TIMEOUT) |
| << "timed out to send request to backend servers"; |
| LOG_IF(WARNING, status->http_return_code != 200) |
| << "http=" << status->http_return_code; |
| LOG_IF(WARNING, !status->err_message.empty()) |
| << "http err_message=" << status->err_message; |
| LOG_IF(WARNING, !status->response_header.empty()) |
| << "http response header=" << status->response_header; |
| LOG_IF(WARNING, status->err != OK) |
| << "http status err=" << status->err; |
| const string old_health_status = client_->GetHealthStatusMessage(); |
| client_->UpdateHealthStatusMessageForPing( |
| static_cast<const HttpClient::Status&>(*status), round_trip_time); |
| const string new_health_status = client_->GetHealthStatusMessage(); |
| if (old_health_status != new_health_status) { |
| if (new_health_status == "ok") { |
| LOG(INFO) << "Update health status:" << old_health_status |
| << " to " << new_health_status; |
| } else { |
| LOG(WARNING) << "Update health status:" << old_health_status |
| << " to " << new_health_status; |
| } |
| } |
| LOG(INFO) << "Release ping status " << status.get(); |
| client_->DecNumActive(); |
| } |
| |
| int HttpRPC::Call(const string& path, |
| const google::protobuf::Message* req, |
| google::protobuf::Message* resp, |
| Status* status) { |
| DCHECK(status); |
| CallWithCallback(path, req, resp, status, nullptr); |
| Wait(status); |
| return status->err; |
| } |
| |
| void HttpRPC::Wait(Status* status) { |
| client_->Wait(static_cast<HttpClient::Status*>(status)); |
| } |
| |
| void HttpRPC::CallWithCallback( |
| const string& path, |
| const google::protobuf::Message* req, |
| google::protobuf::Message* resp, |
| Status* status, |
| OneshotClosure* callback) { |
| std::unique_ptr<CallRequest> call_req(new CallRequest(req, status)); |
| if (IsCompressionEnabled()) { |
| VLOG(2) << "compression enabled level=" << options_.compression_level |
| << " accept_encoding=" << options_.accept_encoding; |
| call_req->EnableCompression( |
| options_.compression_level, options_.accept_encoding); |
| } else { |
| VLOG(2) << "compression is not enabled"; |
| } |
| std::unique_ptr<Request> http_req = std::move(call_req); |
| client_->InitHttpRequest(http_req.get(), "POST", path); |
| std::unique_ptr<Response> http_resp(new CallResponse(resp, status)); |
| http_req->SetContentType(options_.content_type_for_protobuf); |
| std::unique_ptr<CallData> call( |
| new CallData(std::move(http_req), std::move(http_resp), callback)); |
| |
| // Take pointers before call is moved to pass these addresses DoAsync. |
| const auto* ptr_req = call->req(); |
| auto* ptr_resp = call->resp(); |
| |
| VLOG(3) << "Call async " << call.get(); |
| OneshotClosure* done = |
| NewCallback(this, &HttpRPC::CallDone, std::move(call)); |
| |
| client_->DoAsync(ptr_req, ptr_resp, |
| static_cast<HttpClient::Status*>(status), |
| done); |
| return; |
| } |
| |
| void HttpRPC::CallDone(std::unique_ptr<CallData> call) { |
| VLOG(3) << "CallDone " << call.get(); |
| if (call->resp()->status_code() != 200) { |
| // Apiary returns 415 to reject Content-Encoding. |
| if (call->resp()->status_code() == 400 || |
| call->resp()->status_code() == 415 || |
| call->resp()->result() == FAIL) { |
| DisableCompression(); |
| } |
| } else { |
| EnableCompression(call->resp()->Header()); |
| } |
| // destructor runs call->callback_ |
| } |
| |
| string HttpRPC::DebugString() const { |
| AUTOLOCK(lock, &mu_); |
| std::ostringstream ss; |
| ss << "Compression:"; |
| if (compression_enabled_) { |
| ss << "enabled"; |
| } else { |
| ss << "disabled"; |
| } |
| ss << std::endl; |
| ss << "Accept-Encoding:" << options_.accept_encoding << std::endl; |
| ss << "Content-Type:" << options_.content_type_for_protobuf << std::endl; |
| ss << std::endl; |
| return ss.str(); |
| } |
| |
| void HttpRPC::DumpToJson(Json::Value* json) const { |
| client_->DumpToJson(json); |
| AUTOLOCK(lock, &mu_); |
| (*json)["compression"] = (compression_enabled_ ? "enabled" : "disabled"); |
| (*json)["accept_encoding"] = options_.accept_encoding; |
| (*json)["content_type"] = options_.content_type_for_protobuf; |
| } |
| |
| void HttpRPC::DumpStatsToProto(HttpRPCStats* stats) const { |
| client_->DumpStatsToProto(stats); |
| } |
| |
| void HttpRPC::DisableCompression() { |
| AUTOLOCK(lock, &mu_); |
| if (compression_enabled_) |
| LOG(WARNING) << "Compression disabled"; |
| compression_enabled_ = false; |
| } |
| |
| void HttpRPC::EnableCompression(absl::string_view header) { |
| AUTOLOCK(lock, &mu_); |
| absl::string_view::size_type accept_encoding = |
| header.find(absl::StrCat(kAcceptEncoding, ": deflate")); |
| if (accept_encoding != absl::string_view::npos) { |
| if (!compression_enabled_) |
| LOG(INFO) << "Compression enabled"; |
| compression_enabled_ = true; |
| } |
| } |
| |
| bool HttpRPC::IsCompressionEnabled() const { |
| AUTOLOCK(lock, &mu_); |
| if (!compression_enabled_) |
| return false; |
| if (options_.compression_level == 0) |
| return false; |
| return true; |
| } |
| |
| HttpRPC::CallRequest::CallRequest( |
| const google::protobuf::Message* req, |
| HttpRPC::Status* status) |
| : Request(req, status), |
| compression_level_(0) { |
| } |
| |
| std::unique_ptr<google::protobuf::io::ZeroCopyInputStream> |
| HttpRPC::CallRequest::NewStream() const { |
| std::vector<std::unique_ptr<google::protobuf::io::ZeroCopyInputStream>> |
| streams; |
| std::vector<string> headers; |
| if (compression_level_ > 0 && accept_encoding_ != "" && req_) { |
| string compressed; |
| headers.push_back(CreateHeader(kAcceptEncoding, accept_encoding_)); |
| SimpleTimer compression_timer; |
| google::protobuf::io::StringOutputStream stream(&compressed); |
| google::protobuf::io::GzipOutputStream::Options options; |
| options.format = google::protobuf::io::GzipOutputStream::ZLIB; |
| options.compression_level = compression_level_; |
| google::protobuf::io::GzipOutputStream gzip_stream(&stream, options); |
| req_->SerializeToZeroCopyStream(&gzip_stream); |
| if (!gzip_stream.Close()) { |
| LOG(ERROR) << "GzipOutputStream error:" |
| << gzip_stream.ZlibErrorMessage(); |
| } else if (compressed.size() > 1 && (compressed[1] >> 5 & 1)) { |
| LOG(WARNING) << "response has FDICT, which should not be supported"; |
| } else { |
| headers.push_back(CreateHeader(kContentEncoding, "deflate")); |
| status_->raw_req_size = gzip_stream.ByteCount(); |
| absl::string_view body(compressed); |
| // Omit zlib header (since server assumes no zlib header). |
| body.remove_prefix(2); |
| streams.reserve(2); |
| streams.push_back( |
| absl::make_unique<StringInputStream>( |
| BuildHeader(headers, body.size()))); |
| streams.push_back( |
| absl::make_unique<StringInputStream>(string(body))); |
| return absl::make_unique<ChainedInputStream>(std::move(streams)); |
| } |
| } else { |
| VLOG(1) << "compression unavailable."; |
| } |
| |
| // Fallback if compression is not supported or failed. |
| string raw_body; |
| if (req_) { |
| req_->SerializeToString(&raw_body); |
| } |
| status_->raw_req_size = raw_body.size(); |
| streams.reserve(2); |
| streams.push_back( |
| absl::make_unique<StringInputStream>( |
| BuildHeader(headers, raw_body.size()))); |
| streams.push_back( |
| absl::make_unique<StringInputStream>(std::move(raw_body))); |
| return absl::make_unique<ChainedInputStream>(std::move(streams)); |
| } |
| |
| void HttpRPC::Response::ParseBody() { |
| if (resp_) { |
| std::unique_ptr<google::protobuf::io::ZeroCopyInputStream> input = |
| response_body_->ParsedStream(); |
| if (input == nullptr) { |
| err_message_ = "failed to create parsed stream"; |
| result_ = FAIL; |
| return; |
| } |
| if (!resp_->ParseFromZeroCopyStream(input.get())) { |
| LOG(WARNING) << trace_id_ << " Parse response failed"; |
| err_message_ = "Parse response failed"; |
| result_ = FAIL; |
| return; |
| } |
| status_->raw_resp_size = resp_->ByteSize(); |
| } |
| result_ = OK; |
| return; |
| } |
| |
| ExecServiceClient::ExecServiceClient(HttpRPC* http_rpc, string path) |
| : http_rpc_(http_rpc), path_(std::move(path)) {} |
| |
| void ExecServiceClient::ExecAsync(const ExecReq* req, ExecResp* resp, |
| HttpClient::Status* status, |
| OneshotClosure* callback) { |
| http_rpc_->CallWithCallback(path_, req, resp, status, callback); |
| } |
| |
| void ExecServiceClient::Exec(const ExecReq* req, ExecResp* resp, |
| HttpClient::Status* status) { |
| http_rpc_->Call(path_, req, resp, status); |
| } |
| |
| } // namespace devtools_goma |