blob: 0850eec8b63d8273c02bb2849d74a611f6ad428e [file] [log] [blame]
// Copyright 2014 The Goma Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "http.h"
#ifndef _WIN32
#include <fcntl.h>
#include <netdb.h>
#include <stdio.h>
#include <sys/socket.h>
#include <sys/types.h>
#endif
#include <time.h>
#include <algorithm>
#include <iostream>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include "absl/memory/memory.h"
#include "absl/strings/numbers.h"
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "autolock_timer.h"
#include "callback.h"
#include "compiler_proxy_info.h"
#include "compiler_specific.h"
#include "compress_util.h"
#include "descriptor.h"
#include "env_flags.h"
#include "fileflag.h"
#include "glog/logging.h"
MSVC_PUSH_DISABLE_WARNING_FOR_PROTO()
#include "google/protobuf/message.h"
#include "google/protobuf/io/zero_copy_stream.h"
MSVC_POP_WARNING()
#include "histogram.h"
#include "http_util.h"
#include "oauth2.h"
#include "oauth2_token.h"
#include "openssl_engine.h"
MSVC_PUSH_DISABLE_WARNING_FOR_PROTO()
#include "prototmp/goma_stats.pb.h"
MSVC_POP_WARNING()
#include "scoped_fd.h"
#include "simple_timer.h"
#include "socket_descriptor.h"
#include "socket_factory.h"
#include "socket_pool.h"
#include "tls_descriptor.h"
#include "worker_thread_manager.h"
using std::string;
namespace devtools_goma {
// Note: we can't use X-Goma-Content-Length, because
// FindContentLengthAndBodyOffset in file.cc would confuse with Content-Length.
const char HttpClient::kGomaLength[] = "X-Goma-Length: ";
const int kDefaultThrottleTimeoutMilliSec = 600 * 1000;
const int kDefaultTimeoutSec = 900;
const size_t kMaxTrafficHistory = 120U;
const int kMaxQPS = 700;
const int kRampUpDurationSec = 600; // 10 min
const int kMaxConnectionFailure = 5;
const int kDefaultErrorThresholdPercent = 30;
static bool IsFatalNetworkErrorCode(int status_code) {
return status_code == 302 || status_code == 401 || status_code == 403;
}
static time_t CalculateEnabledFrom(int status_code, time_t enabled_from) {
const int kMinDisableDurationSec = 600; // 10 min
const int kMaxDisableDurationSec = 1200; // 20 min
if (IsFatalNetworkErrorCode(status_code)) {
// status code for blocking by dos server.
time_t t = time(nullptr) + kMinDisableDurationSec +
(rand() % (kMaxDisableDurationSec - kMinDisableDurationSec));
if (t > enabled_from) {
LOG(INFO) << "status=" << status_code
<< " extend enabled from: " << enabled_from
<< " to " << t;
enabled_from = t;
}
return enabled_from;
}
// status_code == 200; success
// status_code == 204; no response
// status_code == 400; bad request (app error)
// status_code == 408; timeout
// status_code == 415; unsupported media type (disable compression)
// status_code == 5xx; server error
if ((status_code / 100) != 2) {
// no update of enabled_from for other than 2xx.
return enabled_from;
}
if (enabled_from == 0) {
return 0;
}
time_t now = time(nullptr);
if (now < enabled_from) {
// ramp up from now to now+kRampUpDurationSec.
LOG(INFO) << "got 200 respose in enabled_from=" << enabled_from
<< " start ramp up from " << now;
enabled_from = now;
} else if (enabled_from <= now && now < enabled_from + kRampUpDurationSec) {
// nothing to do in ramp up period:
} else if (enabled_from + kRampUpDurationSec <= now) {
LOG(INFO) << "got 200 response. finish ramp up period";
enabled_from = 0;
}
return enabled_from;
}
HttpClient::Options::Options()
: dest_port(0), proxy_port(0),
capture_response_header(false),
use_ssl(false), ssl_crl_max_valid_duration(-1),
socket_read_timeout_sec(1.0),
min_retry_backoff_ms(500), max_retry_backoff_ms(5000),
fail_fast(false), network_error_margin(0),
network_error_threshold_percent(kDefaultErrorThresholdPercent),
allow_throttle(true), reuse_connection(true) {
}
bool HttpClient::Options::InitFromURL(absl::string_view url) {
size_t pos = url.find("://");
if (pos == string::npos) {
return false;
}
absl::string_view scheme = url.substr(0, pos);
if (scheme == "http") {
use_ssl = false;
dest_port = 80;
} else if (scheme == "https") {
use_ssl = true;
dest_port = 443;
} else {
return false;
}
absl::string_view hostport = url.substr(pos + 3);
pos = hostport.find("/");
if (pos != string::npos) {
url_path_prefix = string(hostport.substr(pos));
hostport = hostport.substr(0, pos);
} else {
url_path_prefix = "/";
}
pos = hostport.find(":");
if (pos != string::npos) {
dest_host_name = string(hostport.substr(0, pos));
dest_port = atoi(string(hostport.substr(pos+1)).c_str());
} else {
dest_host_name = string(hostport);
}
return true;
}
string HttpClient::Options::SocketHost() const {
if (!proxy_host_name.empty()) {
return proxy_host_name;
}
return dest_host_name;
}
int HttpClient::Options::SocketPort() const {
if (!proxy_host_name.empty()) {
return proxy_port;
}
return dest_port;
}
string HttpClient::Options::RequestURL(absl::string_view path) const {
std::ostringstream url;
if ((dest_host_name != SocketHost()
|| dest_port != SocketPort())
&& !use_ssl) {
// without SSL and with proxy, send request with absolute-form.
url << "http://" << dest_host_name << ':' << dest_port;
}
url << url_path_prefix << path;
url << extra_params;
return url.str();
}
string HttpClient::Options::Host() const {
if (!http_host_name.empty()) {
return http_host_name;
}
if ((dest_host_name != SocketHost()
|| dest_port != SocketPort())
&& use_ssl) {
return dest_host_name;
}
return SocketHost();
}
string HttpClient::Options::DebugString() const {
std::ostringstream ss;
ss << "dest=" << dest_host_name << ":" << dest_port;
if (!http_host_name.empty())
ss << " http_host=" << http_host_name;
if (!url_path_prefix.empty())
ss << " url_path_prefix=" << url_path_prefix;
if (!proxy_host_name.empty())
ss << " proxy=" << proxy_host_name << ":" << proxy_port;
if (!extra_params.empty())
ss << " extra=" << extra_params;
if (!authorization.empty())
ss << " authorization:enabled";
if (!cookie.empty())
ss << " cookie=" << cookie;
if (oauth2_config.enabled())
ss << " oauth2:enabled";
if (!service_account_json_filename.empty())
ss << " service_account:" << service_account_json_filename;
if (!gce_service_account.empty())
ss << " gce_service_account:" << gce_service_account;
if (capture_response_header)
ss << " capture_response_header";
if (use_ssl)
ss << " use_ssl";
if (!ssl_extra_cert.empty())
ss << " ssl_extra_cert=" << ssl_extra_cert;
if (!ssl_extra_cert_data.empty())
ss << " ssl_extra_cert_data:set";
ss << " socket_read_timeout_sec=" << socket_read_timeout_sec;
ss << " retry_backoff_ms="
<< min_retry_backoff_ms << " .. " << max_retry_backoff_ms;
if (fail_fast) {
ss << " fail_fast";
}
return ss.str();
}
void HttpClient::Options::ClearAuthConfig() {
gce_service_account.clear();
service_account_json_filename.clear();
oauth2_config.clear();
luci_context_auth.clear();
}
// This object is created when asynchronously waiting for
// HttpClient. The object is deleted by RunCallback,DoCallback.
class HttpClient::Task {
public:
Task(HttpClient* client,
const HttpClient::Request* req,
HttpClient::Response* resp,
Status* status,
WorkerThreadManager* wm,
OneshotClosure* callback)
: client_(client),
req_(req),
resp_(resp),
status_(status),
wm_(wm),
thread_id_(wm_->GetCurrentThreadId()),
d_(nullptr),
active_(false),
close_state_(HttpClient::ERROR_CLOSE),
auth_status_(OK),
is_ping_(status_->trace_id == "ping"),
callback_(callback) {
if (status_->timeout_secs.empty())
status_->timeout_secs.push_back(kDefaultTimeoutSec);
client_->IncNumActive();
resp_->SetRequestPath(req_->request_path());
resp_->SetTraceId(status_->trace_id);
}
void Start() {
CHECK(!status_->finished);
CHECK(!active_);
if (client_->failnow()) {
status_->enabled = false;
RunCallback(FAIL, "http fail now");
return;
}
// TODO: rethink the way refreshing OAuth2 access token.
// Refreshing OAuth2 access token is a bit complex operation, and
// difficult to track the behavior. Refactoring must be needed.
if (auth_status_ == NEED_REFRESH) {
const string& authorization = client_->GetOAuth2Authorization();
if (authorization.empty()) {
RunCallback(FAIL, "authorization not available");
return;
}
cloned_req_ = req_->Clone();
cloned_req_->SetAuthorization(authorization);
auth_status_ = OK;
req_ = cloned_req_.get();
LOG(INFO) << status_->trace_id
<< " cloned HttpClient::Request to set authorization.";
}
if (client_->ShouldRefreshOAuth2AccessToken()) {
LOG(INFO) << status_->trace_id
<< " authorization is not ready, going to run after refresh.";
auth_status_ = NEED_REFRESH;
client_->RunAfterOAuth2AccessTokenGetReady(
wm_->GetCurrentThreadId(),
NewCallback(this, &HttpClient::Task::Start));
return;
}
int throttle_time = timer_.GetInIntMilliseconds();
status_->throttle_time += throttle_time;
int backoff = client_->TryStart();
if (backoff > 0) {
if (status_->num_throttled == 0) { // only increment first time.
DCHECK_EQ(Status::INIT, status_->state);
status_->state = Status::PENDING;
client_->IncNumPending();
}
++status_->num_throttled;
if (status_->throttle_time > kDefaultThrottleTimeoutMilliSec) {
LOG(WARNING) << status_->trace_id
<< " Timeout in throttled. throttle_time="
<< status_->throttle_time;
RunCallback(ERR_TIMEOUT, "Time-out in throttled");
return;
}
LOG(WARNING) << status_->trace_id
<< " Throttled backoff=" << backoff << "msec"
<< " remaining="
<< (kDefaultThrottleTimeoutMilliSec - status_->throttle_time)
<< "ms";
// TODO: might need to cancel this on shutdown?
wm_->RunDelayedClosureInThread(
FROM_HERE,
wm_->GetCurrentThreadId(),
backoff,
NewCallback(this, &HttpClient::Task::Start));
timer_.Start();
return;
}
LOG_IF(INFO, status_->num_throttled > 0)
<< status_->trace_id << " http: Start throttled req. "
<< status_->num_throttled
<< " time=" << status_->throttle_time
<< " [last throttle=" << throttle_time << "]";
if (status_->timeout_secs.empty()) {
LOG(WARNING) << status_->trace_id
<< " Time-out in connect";
RunCallback(ERR_TIMEOUT, "Time-out in connect");
return;
}
// TODO: make connect async.
d_ = client_->NewDescriptor();
if (d_ == nullptr) {
++status_->num_connect_failed;
// Note we do not retry if handling ping because its scenario
// does not match what we expect.
//
// As written below, this code's goal is mitigating temporary
// network failure while several requests are on-flight concurrently.
// Since we usually run only one ping request, it does not meet
// the scenario below.
if (is_ping_ || status_->num_connect_failed > kMaxConnectionFailure) {
RunCallback(FAIL, "Can't establish connection to server");
return;
}
// Note that goal of this backoff and retry is mitigating a temporary
// network failure suggested in: b/36575944#comment6
// The scenario like:
// 1. send request A
// 2. send request B
// 3. got error as response A or B
// 4. send request C, need to connect -> fail. no address available
// 5. got success as response A or B
// (Considered elapsed time from Step 3 to Step 5 is expected to be small,
// say less than 1 second)
//
// Since we expect the address is marked as success again in Step 5.
// we do not retry for long time. (e.g. 60 seconds to error address
// become available in socket_pool.)
int start_backoff = client_->GetRandomizeBackoffTimeInMs();
LOG(WARNING) << status_->trace_id
<< " Can't establish connection to server"
<< " retry after backoff=" << start_backoff;
// TODO: might need to cancel this on shutdown?
wm_->RunDelayedClosureInThread(
FROM_HERE,
wm_->GetCurrentThreadId(),
start_backoff,
NewCallback(this, &HttpClient::Task::Start));
timer_.Start();
return;
}
if (status_->state == Status::PENDING) {
client_->DecNumPending();
}
DCHECK(status_->state == Status::INIT || status_->state == Status::PENDING)
<< status_->trace_id << " state=" << status_->state;
status_->state = Status::SENDING_REQUEST;
resp_->Reset();
active_ = true;
status_->connect_success = true;
double t = static_cast<double>(status_->timeout_secs.front());
status_->timeout_secs.pop_front();
timer_.Start();
request_stream_ = req_->NewStream();
status_->req_build_time = timer_.GetInIntMilliseconds();
d_->NotifyWhenWritable(
NewPermanentCallback(this, &HttpClient::Task::DoWrite));
d_->NotifyWhenTimedout(
t, NewCallback(this, &HttpClient::Task::DoTimeout));
timer_.Start();
}
private:
enum AuthorizationStatus {
OK,
NEED_REFRESH,
};
~Task() {
CHECK(!active_);
}
void DoWrite() {
if (!active_) {
LOG(WARNING) << "Already finished?";
RunCallback(FAIL, "Writable, but already inactive");
return;
}
if (client_->failnow()) {
status_->enabled = false;
RunCallback(FAIL, "http fail now");
return;
}
CHECK(d_);
VLOG(7) << "DoWrite " << d_;
const void* data = nullptr;
int size = 0;
if (!request_stream_->Next(&data, &size)) {
// Request has been sent.
DCHECK_EQ(Status::SENDING_REQUEST, status_->state);
status_->req_size = request_stream_->ByteCount();
status_->state = Status::REQUEST_SENT;
d_->StopWrite();
wm_->RunClosureInThread(
FROM_HERE,
thread_id_,
NewCallback(this, &HttpClient::Task::DoRequestDone),
WorkerThreadManager::PRIORITY_IMMEDIATE);
return;
}
int n = d_->Write(data, size);
VLOG(3) << status_->trace_id << " DoWrite "
<< size << " -> " << n;
if (n < 0 && d_->NeedRetry()) {
request_stream_->BackUp(size);
return;
}
if (n <= 0) {
LOG(WARNING) << status_->trace_id
<< " Write failed " << n
<< " err=" << d_->GetLastErrorMessage();
std::ostringstream err_message;
err_message << status_->trace_id
<< " Write failed ret=" << n
<< " @" << request_stream_->ByteCount()
<< " : " << d_->GetLastErrorMessage();
RunCallback(FAIL, err_message.str());
return;
}
request_stream_->BackUp(size - n);
client_->IncWriteByte(n);
}
void DoRead() {
if (!active_) {
LOG(WARNING) << "Already finished?";
RunCallback(FAIL, "Readable, but already inactive");
return;
}
if (client_->failnow()) {
status_->enabled = false;
RunCallback(FAIL, "http fail now");
return;
}
if (status_->state != Status::RECEIVING_RESPONSE) {
DCHECK_EQ(Status::REQUEST_SENT, status_->state);
status_->state = Status::RECEIVING_RESPONSE;
}
CHECK(d_);
char* buf;
int buf_size;
resp_->Buffer(&buf, &buf_size);
int r = d_->Read(buf, buf_size);
VLOG(7) << "DoRead " << d_ << " buf_size=" << buf_size << " r=" << r;
if (r < 0 && d_->NeedRetry()) {
return;
}
if (r < 0) { // error
LOG(WARNING) << status_->trace_id
<< " Read failed " << r
<< " err=" << d_->GetLastErrorMessage();
std::ostringstream err_message;
err_message << status_->trace_id
<< " Read failed ret=" << r
<< " @" << resp_->len()
<< " of " << resp_->buffer_size()
<< " : " << d_->GetLastErrorMessage();
err_message << " : received=" << resp_->Header();
RunCallback(FAIL, err_message.str());
return;
}
if (status_->wait_time == 0 && resp_->len() == 0) {
status_->wait_time = timer_.GetInIntMilliseconds();
timer_.Start();
d_->ChangeTimeout(client_->options().socket_read_timeout_sec);
}
client_->IncReadByte(r);
if (resp_->Recv(r)) {
VLOG(1) << status_->trace_id << " response\n"
<< resp_->Header();
status_->resp_recv_time = timer_.GetInIntMilliseconds();
timer_.Start();
resp_->Parse();
status_->resp_parse_time = timer_.GetInIntMilliseconds();
status_->resp_size = resp_->len();
if (resp_->status_code() != 200 || resp_->result() == FAIL) {
DCHECK_EQ(close_state_, HttpClient::ERROR_CLOSE);
CaptureResponseHeader();
} else {
DCHECK_EQ(resp_->result(), OK);
DCHECK_EQ(resp_->status_code(), 200);
if (resp_->HasConnectionClose() ||
!client_->options().reuse_connection) {
close_state_ = HttpClient::NORMAL_CLOSE;
} else {
close_state_ = HttpClient::NO_CLOSE;
}
}
status_->http_return_code = resp_->status_code();
DCHECK_EQ(Status::RECEIVING_RESPONSE, status_->state);
status_->state = Status::RESPONSE_RECEIVED;
RunCallback(resp_->result(), resp_->err_message());
return;
}
if (client_->options().capture_response_header &&
resp_->HasHeader()) {
CaptureResponseHeader();
}
d_->ChangeTimeout(
client_->options().socket_read_timeout_sec +
client_->EstimatedRecvTime(kNetworkBufSize));
}
void DoTimeout() {
if (!active_) {
LOG(WARNING) << "Already finished?";
return;
}
if (client_->failnow()) {
status_->enabled = false;
RunCallback(FAIL, "http fail now");
return;
}
if (status_->timeout_secs.empty()) {
std::ostringstream err_message;
err_message << "Timed out: ";
if (request_stream_) {
err_message << "sending request header "
<< request_stream_->ByteCount()
<< " " << timer_.GetInMilliseconds() << "ms";
} else if (resp_->len() == 0) {
err_message << "waiting response "
<< " " << timer_.GetInMilliseconds() << "ms";
} else {
err_message << "receiving response "
<< resp_->len()
<< " of " << resp_->buffer_size()
<< " " << timer_.GetInMilliseconds() << "ms";
}
LOG(WARNING) << status_->trace_id << " " << err_message.str();
RunCallback(ERR_TIMEOUT, err_message.str());
return;
}
d_->StopRead();
d_->StopWrite();
wm_->RunClosureInThread(
FROM_HERE,
thread_id_,
NewCallback(this, &HttpClient::Task::DoRetry),
WorkerThreadManager::PRIORITY_MED);
}
void RunCallback(int err, const string& err_message) {
VLOG(2) << status_->trace_id
<< " RunCallback"
<< " err=" << err
<< " msg=" << err_message;
if (d_) {
d_->StopRead();
d_->StopWrite();
}
active_ = false;
status_->err = err;
status_->err_message = err_message;
if (status_->state == Status::PENDING) {
client_->DecNumPending();
}
// We MUST use lower priority than Descriptor to ensure the TLS write
// closure stopped.
wm_->RunClosureInThread(
FROM_HERE,
thread_id_,
NewCallback(this, &HttpClient::Task::DoCallback),
WorkerThreadManager::PRIORITY_MED);
}
void DoRetry() {
LOG(INFO) << status_->trace_id << " DoRetry ";
if (!active_)
return;
Descriptor* d = d_;
d_ = nullptr;
client_->ReleaseDescriptor(d, HttpClient::ERROR_CLOSE);
active_ = false;
request_stream_.reset();
resp_->Reset();
++status_->num_retry;
Start();
}
void DoRequestDone() {
VLOG(3) << status_->trace_id << " DoWrite " << " done";
if (!active_)
return;
status_->req_send_time = timer_.GetInIntMilliseconds();
request_stream_.reset();
d_->ClearWritable();
d_->NotifyWhenReadable(
NewPermanentCallback(this, &HttpClient::Task::DoRead));
timer_.Start();
}
void DoCallback() {
VLOG(3) << status_->trace_id << " DoCallback"
<< " close_state=" << close_state_;
CHECK(!active_);
Descriptor* d = d_;
d_ = nullptr;
// once callback_ is called, it is not safe to touch status_.
status_->finished = true;
// Since status for ping would be updated in
// UpdateHealthStatusMessageForPing, we do not need to update it here.
// (b/26701852)
if (!is_ping_) {
client_->UpdateStats(*status_);
} else {
LOG(INFO) << "We will not update status for ping.";
}
OneshotClosure* callback = callback_;
callback_ = nullptr;
if (callback)
callback->Run();
client_->ReleaseDescriptor(d, close_state_);
client_->DecNumActive();
delete this;
}
void CaptureResponseHeader() {
if (!status_->response_header.empty())
return;
status_->response_header = string(resp_->Header());
}
HttpClient* client_;
const HttpClient::Request* req_;
std::unique_ptr<HttpClient::Request> cloned_req_;
HttpClient::Response* resp_;
Status* status_;
WorkerThreadManager* wm_;
WorkerThreadManager::ThreadId thread_id_;
Descriptor* d_;
bool active_;
HttpClient::ConnectionCloseState close_state_;
AuthorizationStatus auth_status_;
std::unique_ptr<google::protobuf::io::ZeroCopyInputStream> request_stream_;
const bool is_ping_;
SimpleTimer timer_;
// Callback that is called when RPC is received and has completed.
OneshotClosure* callback_;
DISALLOW_COPY_AND_ASSIGN(Task);
};
/* static */
absl::string_view HttpClient::Status::StateName(State state) {
switch (state) {
case INIT:
return "INIT";
case PENDING:
return "PENDING";
case SENDING_REQUEST:
return "SENDING_REQUEST";
case REQUEST_SENT:
return "REQUEST_SENT";
case RECEIVING_RESPONSE:
return "RECEIVING_RESPONSE";
case RESPONSE_RECEIVED:
return "RESPONSE_RECEIVED";
default:
return "invalid HttpClient::Status::State";
}
}
HttpClient::Status::Status()
: state(Status::INIT),
timeout_should_be_http_error(true),
connect_success(false),
finished(false),
err(0),
enabled(true),
http_return_code(0),
req_size(0),
resp_size(0),
raw_req_size(0),
raw_resp_size(0),
throttle_time(0),
pending_time(0),
req_build_time(0),
req_send_time(0),
wait_time(0),
resp_recv_time(0),
resp_parse_time(0),
num_retry(0),
num_throttled(0),
num_connect_failed(0) {
}
string HttpClient::Status::DebugString() const {
std::ostringstream ss;
ss << "state=" << state
<< " timeout_should_be_http_error=" << timeout_should_be_http_error
<< " connect_success=" << connect_success
<< " finished=" << finished
<< " err=" << err
<< " http_return_code=" << http_return_code
<< " req_size=" << req_size
<< " resp_size=" << resp_size
<< " raw_req_size=" << raw_req_size
<< " raw_resp_size=" << raw_resp_size
<< " throttle_time=" << throttle_time
<< " pending_time=" << pending_time
<< " req_build_time=" << req_build_time
<< " req_send_time=" << req_send_time
<< " wait_time=" << wait_time
<< " resp_recv_time=" << resp_recv_time
<< " resp_parse_time=" << resp_parse_time
<< " num_retry=" << num_retry
<< " num_throttled=" << num_throttled
<< " num_connect_failed=" << num_connect_failed;
return ss.str();
}
HttpClient::TrafficStat::TrafficStat()
: read_byte(0), write_byte(0), query(0), http_err(0) {
}
/* static */
std::unique_ptr<SocketFactory> HttpClient::NewSocketFactoryFromOptions(
const Options& options) {
return std::unique_ptr<SocketFactory>(
new SocketPool(options.SocketHost(), options.SocketPort()));
}
std::unique_ptr<TLSEngineFactory> HttpClient::NewTLSEngineFactoryFromOptions(
const Options& options) {
if (options.use_ssl) {
std::unique_ptr<OpenSSLEngineCache> ssl_engine_fact(new OpenSSLEngineCache);
if (!options.ssl_extra_cert.empty())
ssl_engine_fact->AddCertificateFromFile(options.ssl_extra_cert);
if (!options.ssl_extra_cert_data.empty())
ssl_engine_fact->AddCertificateFromString(options.ssl_extra_cert_data);
ssl_engine_fact->SetHostname(options.dest_host_name);
if (!options.proxy_host_name.empty()) {
ssl_engine_fact->SetProxy(options.proxy_host_name, options.proxy_port);
}
ssl_engine_fact->SetCRLMaxValidDurationInSeconds(
options.ssl_crl_max_valid_duration);
return std::unique_ptr<TLSEngineFactory>(std::move(ssl_engine_fact));
}
return nullptr;
}
HttpClient::HttpClient(std::unique_ptr<SocketFactory> socket_factory,
std::unique_ptr<TLSEngineFactory> tls_engine_factory,
const Options& options,
WorkerThreadManager* wm)
: options_(options),
tls_engine_factory_(std::move(tls_engine_factory)),
socket_pool_(std::move(socket_factory)),
wm_(wm),
health_status_("initializing"),
shutting_down_(false),
bad_status_num_in_recent_http_(0),
network_error_status_(options.network_error_margin),
num_query_(0),
num_active_(0),
total_pending_(0),
peak_pending_(0),
num_pending_(0),
num_http_retry_(0),
num_http_timeout_(0),
num_http_error_(0),
total_write_byte_(0),
total_read_byte_(0),
num_writable_(0),
num_readable_(0),
read_size_(new Histogram),
write_size_(new Histogram),
total_resp_byte_(0),
total_resp_time_(0),
ping_http_return_code_(-1),
ping_round_trip_time_ms_(-1),
traffic_history_closure_id_(kInvalidPeriodicClosureId),
retry_backoff_ms_(options.min_retry_backoff_ms),
enabled_from_(0),
num_network_error_(0),
num_network_recovered_(0) {
LOG(INFO) << options_.DebugString();
CHECK_GT(retry_backoff_ms_, 0);
CHECK_LT(options.min_retry_backoff_ms, options.max_retry_backoff_ms);
read_size_->SetName("read size distribution");
write_size_->SetName("write size distribution");
if (!options_.authorization.empty()) {
CHECK(options_.authorization.find_first_of("\r\n") == string::npos)
<< "authorization must not contain CR LF:" << options_.authorization;
}
if (!options_.cookie.empty()) {
CHECK(options_.cookie.find_first_of("\r\n") == string::npos)
<< "cookie must not contain CR LF:" << options_.cookie;
}
LOG_IF(ERROR, !socket_pool_->IsInitialized())
<< "socket pool is not initialized yet.";
traffic_history_.push_back(TrafficStat());
traffic_history_closure_id_ = wm_->RegisterPeriodicClosure(
FROM_HERE, 1000, NewPermanentCallback(
this, &HttpClient::UpdateTrafficHistory));
if (options_.use_ssl) {
DCHECK(tls_engine_factory_.get() != nullptr);
socket_pool_->SetObserver(tls_engine_factory_.get());
}
HttpClient::Options oauth2_options;
oauth2_options.proxy_host_name = options.proxy_host_name;
oauth2_options.proxy_port = options.proxy_port;
oauth2_options.gce_service_account = options.gce_service_account;
oauth2_options.service_account_json_filename =
options.service_account_json_filename;
oauth2_options.oauth2_config = options.oauth2_config;
oauth2_options.luci_context_auth = options.luci_context_auth;
oauth_refresh_task_ = OAuth2AccessTokenRefreshTask::New(
wm_, oauth2_options);
}
HttpClient::~HttpClient() {
{
AUTOLOCK(lock, &mu_);
shutting_down_ = true;
LOG(INFO) << "wait all tasks num_active=" << num_active_;
while (num_active_ > 0)
cond_.Wait(&mu_);
}
if (oauth_refresh_task_.get()) {
oauth_refresh_task_->Shutdown();
oauth_refresh_task_->Wait();
}
if (traffic_history_closure_id_ != kInvalidPeriodicClosureId) {
wm_->UnregisterPeriodicClosure(traffic_history_closure_id_);
traffic_history_closure_id_ = kInvalidPeriodicClosureId;
}
LOG(INFO) << "HttpClient terminated.";
}
void HttpClient::InitHttpRequest(
Request* req, const string& method, const string& path) const {
req->Init(method, path, options_);
const string& auth = GetOAuth2Authorization();
if (!auth.empty()) {
req->SetAuthorization(auth);
LOG_IF(WARNING, !options_.authorization.empty())
<< "authorization option is given but ignored.";
}
}
void HttpClient::Do(const Request* req, Response* resp, Status* status) {
DCHECK(status);
DCHECK(wm_);
DoAsync(req, resp, status, nullptr);
Wait(status);
}
void HttpClient::DoAsync(
const Request* req, Response* resp,
Status* status, OneshotClosure* callback) {
if (failnow()) {
status->enabled = false;
status->connect_success = false;
status->finished = true;
status->err = FAIL;
status->err_message = "http disabled";
status->http_return_code = 403;
// once callback_ is called, it is not safe to touch status.
if (callback)
callback->Run();
return;
}
DCHECK(wm_) << "There isn't any worker thread to send to";
Task* task = new Task(this, req, resp, status, wm_, callback);
task->Start();
return;
}
void HttpClient::Wait(Status* status) {
while (!status->finished) {
CHECK(wm_->Dispatch());
}
}
void HttpClient::Shutdown() {
{
AUTOLOCK(lock, &mu_);
LOG(INFO) << "shutdown";
shutting_down_ = true;
health_status_ = "shutting down";
}
if (oauth_refresh_task_.get()) {
oauth_refresh_task_->Shutdown();
}
}
bool HttpClient::shutting_down() const {
AUTOLOCK(lock, &mu_);
return shutting_down_;
}
Descriptor* HttpClient::NewDescriptor() {
ScopedSocket fd(socket_pool_->NewSocket());
// Note that unlike our past implementation, even on seeing previous network
// error we can get at least one socket if getaddrinfo succeeds.
// Thus, invalid fd means no address found by getaddrinfo.
if (!fd.valid()) {
{
AUTOLOCK(lock, &mu_);
NetworkErrorDetectedUnlocked();
}
return nullptr;
}
if (options_.use_ssl) {
TLSEngine *engine = tls_engine_factory_->NewTLSEngine(fd.get());
TLSDescriptor::Options tls_desc_options;
if (!options_.proxy_host_name.empty()) {
tls_desc_options.use_proxy = true;
tls_desc_options.dest_host_name = options_.dest_host_name;
tls_desc_options.dest_port = options_.dest_port;
}
TLSDescriptor* d = new TLSDescriptor(
wm_->RegisterSocketDescriptor(std::move(fd),
WorkerThreadManager::PRIORITY_MED),
engine, tls_desc_options, wm_);
d->Init();
return d;
}
return wm_->RegisterSocketDescriptor(std::move(fd),
WorkerThreadManager::PRIORITY_MED);
}
void HttpClient::ReleaseDescriptor(
Descriptor* d, ConnectionCloseState close_state) {
if (d == nullptr)
return;
bool reuse_socket = (close_state == NO_CLOSE) && d->CanReuse();
SocketDescriptor* sd = d->socket_descriptor();
DCHECK(!reuse_socket || !sd->IsClosed())
<< "should not reuse the socket if it has already been closed."
<< " fd=" << sd->fd()
<< " reuse_socket=" << reuse_socket
<< " close_state=" << close_state
<< " is_closed=" << sd->IsClosed()
<< " can_reuse=" << d->CanReuse();
if (options_.use_ssl) {
TLSDescriptor* tls_desc = static_cast<TLSDescriptor*>(d);
delete tls_desc;
}
ScopedSocket fd(wm_->DeleteSocketDescriptor(sd));
VLOG(3) << "Release fd=" << fd.get()
<< " reuse_socket=" << reuse_socket
<< " close_state=" << close_state;
if (fd.valid()) {
if (reuse_socket) {
socket_pool_->ReleaseSocket(std::move(fd));
} else {
socket_pool_->CloseSocket(std::move(fd), close_state == ERROR_CLOSE);
}
}
}
bool HttpClient::failnow() const {
AUTOLOCK(lock, &mu_);
if (shutting_down_) {
return true;
}
if (enabled_from_ == 0) {
return false;
}
return time(nullptr) < enabled_from_;
}
int HttpClient::ramp_up() const {
AUTOLOCK(lock, &mu_);
if (enabled_from_ == 0) {
return 100;
}
time_t now = time(nullptr);
if (now < enabled_from_) {
return 0;
}
return std::min<int>(100, (now - enabled_from_) * 100 / kRampUpDurationSec);
}
string HttpClient::GetHealthStatusMessage() const {
AUTOLOCK(lock, &mu_);
return health_status_;
}
void HttpClient::UpdateStatusCodeHistoryUnlocked() {
const int kHTTPStatusCodeHistoryHoldingSec = 3;
time_t now = time(nullptr);
while (!recent_http_status_code_.empty() &&
recent_http_status_code_.front().first <
now - kHTTPStatusCodeHistoryHoldingSec) {
if (recent_http_status_code_.front().second != 200) {
--bad_status_num_in_recent_http_;
}
recent_http_status_code_.pop_front();
}
}
void HttpClient::AddStatusCodeHistoryUnlocked(int status_code) {
UpdateStatusCodeHistoryUnlocked();
time_t now = time(nullptr);
if (status_code != 200) {
++bad_status_num_in_recent_http_;
}
recent_http_status_code_.emplace_back(now, status_code);
}
bool HttpClient::IsHealthyRecently() {
AUTOLOCK(lock, &mu_);
UpdateStatusCodeHistoryUnlocked();
return bad_status_num_in_recent_http_ <=
recent_http_status_code_.size() *
options_.network_error_threshold_percent / 100;
}
bool HttpClient::IsHealthy() const {
AUTOLOCK(lock, &mu_);
return health_status_ == "ok";
}
string HttpClient::GetAccount() {
if (oauth_refresh_task_.get() == nullptr) {
return "";
}
return oauth_refresh_task_->GetAccount();
}
bool HttpClient::GetOAuth2Config(OAuth2Config* config) const {
if (oauth_refresh_task_.get() == nullptr) {
return false;
}
return oauth_refresh_task_->GetOAuth2Config(config);
}
bool HttpClient::SetOAuth2Config(const OAuth2Config& config) {
if (oauth_refresh_task_.get() == nullptr) {
return false;
}
if (oauth_refresh_task_->SetOAuth2Config(config)) {
AUTOLOCK(lock, &mu_);
// if disabled by 401 error, could try now with new oauth2 config.
LOG(INFO) << "new oauth2 config: reset enabled_from_=" << enabled_from_
<< " to 0";
enabled_from_ = 0;
return true;
}
return false;
}
string HttpClient::DebugString() const {
AUTOLOCK(lock, &mu_);
std::ostringstream ss;
ss << "Status:" << health_status_ << std::endl;
ss << "Remote host: " << socket_pool_->DestName();
if (!options_.url_path_prefix.empty()) {
ss << " " << options_.url_path_prefix;
}
if (!options_.extra_params.empty()) {
ss << ": " << options_.extra_params;
}
if (!options_.proxy_host_name.empty()) {
ss << " to "
<< "http://" << options_.dest_host_name << ":" << options_.dest_port;
}
ss << std::endl;
ss << "User-Agent: " << kUserAgentString << std::endl;
ss << "SocketPool: " << socket_pool_->DebugString() << std::endl;
if (!options_.http_host_name.empty())
ss << "Host: " << options_.http_host_name << std::endl;
if (!options_.authorization.empty())
ss << "Authorization: enabled" << std::endl;
if (!options_.cookie.empty())
ss << "Cookie: " << options_.cookie << std::endl;
if (options_.oauth2_config.enabled()) {
ss << "OAuth2: enabled";
if (!options_.service_account_json_filename.empty())
ss << " service_account:" << options_.service_account_json_filename;
if (!options_.gce_service_account.empty())
ss << " gce service_account:" << options_.gce_service_account;
ss << std::endl;
}
ss << std::endl;
if (options_.capture_response_header)
ss << "Capture response header: enabled" << std::endl;
ss << std::endl;
ss << "http status:" << std::endl;
for (const auto& iter : num_http_status_code_) {
ss << " " << iter.first << ": " << iter.second
<< " (" << (iter.second * 100.0 / num_query_) << "%)" << std::endl;
}
ss << " Retry: " << num_http_retry_;
if (num_query_ > 0)
ss << " (" << (num_http_retry_ * 100.0 / num_query_) << "%)";
ss << std::endl;
ss << " Timeout: " << num_http_timeout_;
if (num_query_ > 0)
ss << " (" << (num_http_timeout_ * 100.0 / num_query_) << "%)";
ss << std::endl;
ss << " Error: " << num_http_error_;
if (num_query_ > 0)
ss << " (" << (num_http_error_ * 100.0 / num_query_) << "%)";
ss << std::endl;
ss << " Pending: " << total_pending_;
if (num_query_ > 0)
ss << " (" << (total_pending_ * 100.0 / num_query_) << "%)";
ss << " peek " << peak_pending_;
ss << std::endl;
ss << std::endl;
ss << "Backoff: " << retry_backoff_ms_ << "msec" << std::endl;
if (enabled_from_ > 0) {
ss << "Disabled for " << (enabled_from_ - time(nullptr)) << " sec"
<< std::endl;
}
ss << std::endl;
ss << "Write: " << total_write_byte_ << "bytes "
<< num_writable_ << "calls" << std::endl;
ss << "Read: " << total_read_byte_ << "bytes "
<< num_readable_ << "calls "
<< "(" << total_resp_byte_ << "bytes in " << total_resp_time_ << "msec)";
ss << std::endl;
ss << std::endl;
ss << write_size_->DebugString() << std::endl;
ss << read_size_->DebugString() << std::endl;
ss << std::endl;
if (options_.use_ssl) {
ss << "SSL enabled" << std::endl;
ss << "Certificate(s) and CRLs:" << std::endl;
ss << tls_engine_factory_->GetCertsInfo();
} else {
ss << "SSL disabled" << std::endl;
}
ss << std::endl;
ss << "Network: " << std::endl
<< " Error Count: " << num_network_error_ << std::endl
<< " Recovered Count: " << num_network_recovered_ << std::endl;
return ss.str();
}
void HttpClient::DumpToJson(Json::Value* json) const {
AUTOLOCK(lock, &mu_);
(*json)["health_status"] = health_status_;
if (!options_.http_host_name.empty()) {
(*json)["http_host_name"] = options_.http_host_name;
}
if (!options_.url_path_prefix.empty()) {
(*json)["url_path_prefix"] = options_.url_path_prefix;
}
if (!options_.extra_params.empty()) {
(*json)["extra_params"] = options_.extra_params;
}
(*json)["user_agent"] = kUserAgentString;
(*json)["socket_pool"] = socket_pool_->DebugString();
(*json)["authorization"] = (
options_.authorization.empty() ? "none" : "enabled");
(*json)["cookie"] = options_.cookie;
(*json)["oauth2"] = (!options_.oauth2_config.enabled() ? "none" : "enabled");
(*json)["capture_response_header"] = (
options_.capture_response_header ? "enabled" : "disabled");
(*json)["ssl"] = (options_.use_ssl ? "enabled" : "disabled");
if (!options_.ssl_extra_cert.empty()) {
(*json)["ssl_extra_cert"] = options_.ssl_extra_cert;
}
if (!options_.ssl_extra_cert_data.empty()) {
(*json)["ssl_extra_cert_data"] = "set";
}
(*json)["socket_read_timeout_sec"] = options_.socket_read_timeout_sec;
(*json)["num_query"] = num_query_;
(*json)["num_active"] = num_active_;
(*json)["num_http_retry"] = num_http_retry_;
(*json)["num_http_timeout"] = num_http_timeout_;
(*json)["num_http_error"] = num_http_error_;
(*json)["write_byte"] = Json::Int64(total_write_byte_);
(*json)["read_byte"] = Json::Int64(total_read_byte_);
(*json)["num_writable"] = Json::Int64(num_writable_);
(*json)["num_readable"] = Json::Int64(num_readable_);
(*json)["resp_byte"] = Json::Int64(total_resp_byte_);
(*json)["resp_time"] = Json::Int64(total_resp_time_);
{
TrafficHistory::const_reverse_iterator iter = traffic_history_.rbegin();
++iter;
if (iter != traffic_history_.rend()) {
(*json)["read_bps"] = iter->read_byte;
(*json)["write_bps"] = iter->write_byte;
} else {
(*json)["read_bps"] = 0;
(*json)["write_bps"] = 0;
}
}
double byte_max = 0.0;
double q_max = 0.0;
std::vector<double> read_value;
std::vector<double> write_value;
std::vector<double> qps;
std::vector<double> http_err;
for (size_t i = 0; i < kMaxTrafficHistory - traffic_history_.size(); ++i) {
read_value.push_back(-1.0);
write_value.push_back(-1.0);
qps.push_back(-1.0);
http_err.push_back(-1.0);
}
for (TrafficHistory::const_iterator iter = traffic_history_.begin();
iter != traffic_history_.end();
++iter) {
byte_max = std::max<double>(iter->read_byte, byte_max);
read_value.push_back(static_cast<double>(iter->read_byte));
byte_max = std::max<double>(iter->write_byte, byte_max);
write_value.push_back(static_cast<double>(iter->write_byte));
q_max = std::max<double>(iter->query, q_max);
qps.push_back(static_cast<double>(iter->query));
q_max = std::max<double>(iter->http_err, q_max);
http_err.push_back(static_cast<double>(iter->http_err));
}
byte_max = byte_max * 1.1;
q_max = q_max * 1.1;
}
void HttpClient::DumpStatsToProto(HttpRPCStats* stats) const {
AUTOLOCK(lock, &mu_);
stats->set_ping_status_code(ping_http_return_code_);
stats->set_ping_round_trip_time_ms(ping_round_trip_time_ms_);
stats->set_query(num_query_);
stats->set_retry(num_http_retry_);
stats->set_timeout(num_http_timeout_);
stats->set_error(num_http_error_);
stats->set_network_error(num_network_error_);
stats->set_network_recovered(num_network_recovered_);
stats->set_current_pending(num_pending_);
stats->set_peak_pending(peak_pending_);
stats->set_total_pending(total_pending_);
for (const auto& iter : num_http_status_code_) {
HttpRPCStats_HttpStatus* http_status = stats->add_status_code();
http_status->set_status_code(iter.first);
http_status->set_count(iter.second);
}
}
int HttpClient::UpdateHealthStatusMessageForPing(const Status& status,
int round_trip_time) {
LOG(INFO) << "Ping status:"
<< " http_return_code=" << status.http_return_code
<< " throttle_time=" << status.throttle_time
<< " pending_time=" << status.pending_time
<< " req_build_time=" << status.req_build_time
<< " req_send_time=" << status.req_send_time
<< " wait_time=" << status.wait_time
<< " resp_recv_time=" << status.resp_recv_time
<< " resp_parse_time=" << status.resp_parse_time
<< " round_trip_time=" << round_trip_time;
AUTOLOCK(lock, &mu_);
AddStatusCodeHistoryUnlocked(status.http_return_code);
if (shutting_down_) {
health_status_ = "shutting down";
ping_http_return_code_ = 0;
return ping_http_return_code_;
}
// Under race condition of initial ping, good ping status could be
// overridden by bad ping status. (b/26701852)
if (ping_http_return_code_ == 200 && status.http_return_code != 200) {
LOG(INFO) << "We do not update status with bad status."
<< " ping_http_return_code_=" << ping_http_return_code_
<< " status.http_return_code=" << status.http_return_code;
return ping_http_return_code_;
}
if (!status.finished) {
health_status_ = "error: ping no response";
ping_http_return_code_ = 408; // status timeout.
return ping_http_return_code_;
}
if (!status.connect_success) {
health_status_ = "error: failed to connect to backend servers";
ping_http_return_code_ = 0;
return ping_http_return_code_;
}
if (status.err == ERR_TIMEOUT) {
health_status_ = "error: timed out to send request to backend servers";
ping_http_return_code_ = 408;
return ping_http_return_code_;
}
ping_http_return_code_ = status.http_return_code;
if (round_trip_time > 0)
ping_round_trip_time_ms_ = round_trip_time;
const string running = options_.fail_fast ? "error:" : "running:";
if (status.http_return_code != 200) {
int status_code = status.http_return_code;
enabled_from_ =
CalculateEnabledFrom(status.http_return_code, enabled_from_);
if (IsFatalNetworkErrorCode(status.http_return_code)) {
NetworkErrorDetectedUnlocked();
}
if (status.http_return_code == 401) {
// TODO: make it error, so goma_ctl abort "start"?
health_status_ = running + " access to backend servers was rejected.";
} else if (status.http_return_code == 302
|| status.http_return_code == 403) {
std::ostringstream ss;
ss << running << " access to backend servers was blocked:"
<< status.http_return_code;
health_status_ = ss.str();
} else if (status.http_return_code == 0 && status.err < 0) {
health_status_ = running + " failed to send request to backend servers";
status_code = 500;
} else {
std::ostringstream ss;
ss << running << " access to backend servers was failed:"
<< status.http_return_code;
health_status_ = ss.str();
}
return status_code;
}
health_status_ = "ok";
return status.http_return_code;
}
double HttpClient::EstimatedRecvTime(size_t bytes) {
AUTOLOCK(lock, &mu_);
double t = 0.0;
// total_resp_time_ is millisec.
if (total_resp_byte_ > 0) {
t += bytes * (static_cast<double>(total_resp_time_) /
(1000.0 * total_resp_byte_));
}
return t;
}
/* static */
int HttpClient::BackoffMsec(
const Options& options, int prev_backoff_msec, bool in_error) {
// Multiply factor used in chromium.
// URLRequestThrottlerEntry::kDefaultMultiplyFactor
// in net/url_request/url_request_throttler_entry.cc
const double kBackoffBase = 1.4;
CHECK_GT(prev_backoff_msec, 0);
double uncapped_backoff = static_cast<double>(prev_backoff_msec);
if (in_error) {
uncapped_backoff *= kBackoffBase;
return static_cast<int>(
std::min<double>(uncapped_backoff, options.max_retry_backoff_ms));
}
uncapped_backoff /= kBackoffBase;
return static_cast<int>(
std::max<double>(uncapped_backoff, options.min_retry_backoff_ms));
}
void HttpClient::UpdateBackoffUnlocked(bool in_error) {
const int orig_backoff = retry_backoff_ms_;
CHECK_GT(retry_backoff_ms_, 0);
retry_backoff_ms_ = BackoffMsec(options_, retry_backoff_ms_, in_error);
if (in_error) {
LOG(INFO) << "UpdateBackoff error "
<< orig_backoff << " -> " << retry_backoff_ms_;
} else {
VLOG(2) << "UpdateBackoff ok "
<< orig_backoff << " -> " << retry_backoff_ms_;
}
}
string HttpClient::GetOAuth2Authorization() const {
if (!oauth_refresh_task_.get()) {
return "";
}
// TODO: disable http on error.
return oauth_refresh_task_->GetAuthorization();
}
bool HttpClient::ShouldRefreshOAuth2AccessToken() const {
if (!oauth_refresh_task_.get()) {
return false;
}
return oauth_refresh_task_->ShouldRefresh();
}
void HttpClient::RunAfterOAuth2AccessTokenGetReady(
WorkerThreadManager::ThreadId thread_id, OneshotClosure* closure) {
CHECK(oauth_refresh_task_.get());
oauth_refresh_task_->RunAfterRefresh(thread_id, closure);
}
// Randomizes backoff by subtracting 40%, so it returns
// [backoff_ms*0.6, backoff_ms].
int RandomizeBackoff(int backoff_ms) {
const double kRandomizedRatio = 0.4;
int randomize_backoff = static_cast<int>(
static_cast<double>(backoff_ms) * kRandomizedRatio);
if (randomize_backoff == 0)
randomize_backoff = 1;
backoff_ms -= (rand() % (randomize_backoff + 1));
return std::max(1, backoff_ms);
}
int HttpClient::GetRandomizeBackoffTimeInMs() {
return RandomizeBackoff(retry_backoff_ms_);
}
int HttpClient::TryStart() {
AUTOLOCK(lock, &mu_);
if ((traffic_history_.back().http_err > 0 ||
traffic_history_.back().query >= kMaxQPS) &&
options_.allow_throttle) {
LOG(WARNING) << "Throttled. queries=" << traffic_history_.back().query
<< " err=" << traffic_history_.back().http_err
<< " retry_backoff_ms=" << retry_backoff_ms_;
return GetRandomizeBackoffTimeInMs();
}
++num_query_;
++traffic_history_.back().query;
return 0;
}
void HttpClient::IncNumActive() {
AUTOLOCK(lock, &mu_);
++num_active_;
}
void HttpClient::DecNumActive() {
AUTOLOCK(lock, &mu_);
--num_active_;
DCHECK_GE(num_active_, 0);
if (num_active_ == 0)
cond_.Signal();
}
void HttpClient::WaitNoActive() {
AUTOLOCK(lock, &mu_);
while (num_active_ > 0)
cond_.Wait(&mu_);
}
void HttpClient::IncNumPending() {
AUTOLOCK(lock, &mu_);
++num_pending_;
++total_pending_;
peak_pending_ = std::max(peak_pending_, num_pending_);
}
void HttpClient::DecNumPending() {
AUTOLOCK(lock, &mu_);
--num_pending_;
DCHECK_GE(num_pending_, 0);
}
void HttpClient::IncReadByte(int n) {
AUTOLOCK(lock, &mu_);
traffic_history_.back().read_byte += n;
total_read_byte_ += n;
++num_readable_;
read_size_->Add(n);
}
void HttpClient::IncWriteByte(int n) {
AUTOLOCK(lock, &mu_);
traffic_history_.back().write_byte += n;
total_write_byte_ += n;
++num_writable_;
write_size_->Add(n);
}
void HttpClient::UpdateStats(const Status& status) {
AUTOLOCK(lock, &mu_);
AddStatusCodeHistoryUnlocked(status.http_return_code);
++num_http_status_code_[status.http_return_code];
if (status.err != OK) {
UpdateBackoffUnlocked(true);
if (status.err == ERR_TIMEOUT) {
++num_http_timeout_;
if (status.timeout_should_be_http_error) {
++traffic_history_.back().http_err;
}
} else {
++num_http_error_;
if (status.err == FAIL && status.http_return_code == 408) {
if (status.timeout_should_be_http_error) {
++traffic_history_.back().http_err;
}
} else {
++traffic_history_.back().http_err;
}
}
} else {
UpdateBackoffUnlocked(false);
}
enabled_from_ = CalculateEnabledFrom(status.http_return_code, enabled_from_);
if (IsFatalNetworkErrorCode(status.http_return_code)) {
NetworkErrorDetectedUnlocked();
}
num_http_retry_ += status.num_retry;
total_resp_byte_ += status.resp_size;
total_resp_time_ += status.resp_recv_time;
// clear network_error_started_time_ in 2xx response.
if (status.http_return_code / 100 == 2) {
NetworkRecoveredUnlocked();
}
}
void HttpClient::UpdateTrafficHistory() {
AUTOLOCK(lock, &mu_);
if (!shutting_down_) {
if (traffic_history_.back().query > 0 && total_resp_time_ > 0) {
if (traffic_history_.back().http_err == 0) {
if (health_status_ != "ok") {
LOG(INFO) << "Update health status:" << health_status_ << " to ok";
}
health_status_ = "ok";
} else {
const string running = options_.fail_fast ? "error:" : "running:";
if (health_status_ == "ok") {
LOG(WARNING) << "Update health status: ok to "
<< running
<< " had some http errors from backend servers";
}
health_status_ = running + " had some http errors from backend servers";
}
}
}
traffic_history_.push_back(TrafficStat());
if (traffic_history_.size() >= kMaxTrafficHistory) {
traffic_history_.pop_front();
}
}
void HttpClient::NetworkErrorDetectedUnlocked() {
// set network error started time if it is not set.
time_t now = time(nullptr);
if (!network_error_status_.OnNetworkErrorDetected(now)) {
LOG(INFO) << "Network error continues from "
<< network_error_status_.NetworkErrorStartedTime();
return;
}
LOG(INFO) << "Network error started: time=" << now;
++num_network_error_;
if (monitor_.get())
monitor_->OnNetworkErrorDetected();
}
void HttpClient::NetworkRecoveredUnlocked() {
time_t now = time(nullptr);
time_t network_error_started_time =
network_error_status_.NetworkErrorStartedTime();
if (!network_error_status_.OnNetworkRecovered(now)) {
LOG_IF(INFO, network_error_started_time > 0)
<< "Waiting network recover until "
<< network_error_status_.NetworkErrorUntil();
return;
}
LOG(INFO) << "Network recovered"
<< " started=" << network_error_started_time
<< " recovered=" << now
<< " duration=" << (now - network_error_started_time);
++num_network_recovered_;
if (monitor_.get())
monitor_->OnNetworkRecovered();
}
void HttpClient::SetMonitor(
std::unique_ptr<HttpClient::NetworkErrorMonitor> monitor) {
AUTOLOCK(lock, &mu_);
monitor_ = std::move(monitor);
}
time_t HttpClient::NetworkErrorStartedTime() const {
AUTOLOCK(lock, &mu_);
return network_error_status_.NetworkErrorStartedTime();
}
HttpClient::Request::Request()
: content_type_("application/octet-stream") {
}
HttpClient::Request::~Request() {
}
void HttpClient::Request::Init(
const string& method, const string& path,
const HttpClient::Options& options) {
SetMethod(method);
SetRequestPath(options.RequestURL(path));
SetHost(options.Host());
if (!options.authorization.empty()) {
SetAuthorization(options.authorization);
}
if (!options.cookie.empty()) {
SetCookie(options.cookie);
}
}
void HttpClient::Request::SetMethod(const string& method) {
method_ = method;
}
void HttpClient::Request::SetRequestPath(const string& path) {
request_path_ = path;
}
void HttpClient::Request::SetHost(const string& host) {
host_ = host;
}
void HttpClient::Request::SetContentType(const string& content_type) {
content_type_ = content_type;
}
void HttpClient::Request::SetAuthorization(const string& authorization) {
authorization_ = authorization;
}
void HttpClient::Request::SetCookie(const string& cookie) {
cookie_ = cookie;
}
void HttpClient::Request::AddHeader(const string& key, const string& value) {
headers_.push_back(CreateHeader(key, value));
}
/* static */
string HttpClient::Request::CreateHeader(
absl::string_view key, absl::string_view value) {
std::ostringstream line;
line << key << ": " << value;
return line.str();
}
string HttpClient::Request::BuildHeader(
const std::vector<string>& headers,
int content_length) const {
std::ostringstream msg;
msg << method_ << " " << request_path_ << " HTTP/1.1\r\n";
if (host_ != "") {
msg << kHost << ": " << host_ << "\r\n";
}
msg << kUserAgent << ": " << kUserAgentString << "\r\n";
msg << kContentType << ": " << content_type_ << "\r\n";
if (content_length >= 0) {
msg << kContentLength << ": " << content_length << "\r\n";
}
if (authorization_ != "") {
msg << kAuthorization << ": " << authorization_ << "\r\n";
}
if (cookie_ != "") {
msg << kCookie << ": " << cookie_ << "\r\n";
}
bool chunked = false;
for (const auto& header : headers_) {
msg << header << "\r\n";
if (absl::StartsWith(header, absl::StrCat(kTransferEncoding, ":")) &&
absl::StrContains(header, "chunked")) {
chunked = true;
}
}
for (const auto& header : headers) {
msg << header << "\r\n";
if (absl::StartsWith(header, absl::StrCat(kTransferEncoding, ":")) &&
absl::StrContains(header, "chunked")) {
chunked = true;
}
}
if (content_length < 0) {
CHECK(chunked) << "content-length is not give, but not chunked encoding";
}
// TODO: request_stream_ should provide chunked-body.
msg << "\r\n";
VLOG(1) << "request\n" << msg.str();
return msg.str();
}
HttpRequest::HttpRequest() {
}
HttpRequest::~HttpRequest() {
}
void HttpRequest::SetBody(const string& body) {
body_ = body;
}
std::unique_ptr<google::protobuf::io::ZeroCopyInputStream>
HttpRequest::NewStream() const {
std::vector<std::unique_ptr<google::protobuf::io::ZeroCopyInputStream>> s;
s.reserve(2);
s.push_back(absl::make_unique<StringInputStream>(
BuildHeader(std::vector<string>(), body_.size())));
s.push_back(absl::make_unique<google::protobuf::io::ArrayInputStream>(
body_.data(), body_.size()));
return absl::make_unique<ChainedInputStream>(std::move(s));
}
// GetConentEncoding reports EncodingType specified in header.
// not http_util because it depends lib/compress_util EncodingType.
static EncodingType GetContentEncoding(absl::string_view header) {
absl::string_view content_encoding =
ExtractHeaderField(header, kContentEncoding);
return GetEncodingFromHeader(content_encoding);
}
HttpClient::Response::Response()
// to initialize in class definition, http.h needs to include
// scoped_fd.h for FAIL.
: result_(FAIL) {
}
HttpClient::Response::~Response() {
}
void HttpClient::Response::SetRequestPath(const string& path) {
request_path_ = path;
}
void HttpClient::Response::SetTraceId(const string& trace_id) {
trace_id_ = trace_id;
}
void HttpClient::Response::Reset() {
result_ = FAIL;
len_ = 0;
body_offset_ = 0;
status_code_ = 0;
body_ = nullptr;
}
bool HttpClient::Response::HasHeader() const {
return body_offset_ > 0;
}
absl::string_view HttpClient::Response::Header() const {
if (body_offset_ > 0) {
return absl::string_view(buffer_.data(), body_offset_);
}
absl::string_view::size_type header_size = buffer_.find("\r\n\r\n");
if (header_size == string::npos) {
header_size = len_;
}
return absl::string_view(buffer_.data(), header_size);
}
void HttpClient::Response::Buffer(char** buf, int* buf_size) {
if (!body_) {
*buf_size = buffer_.size() - len_;
if (*buf_size < kNetworkBufSize / 2) {
buffer_.resize(buffer_.size() + kNetworkBufSize);
}
*buf = &buffer_[len_];
*buf_size = buffer_.size() - len_;
} else {
body_->Next(buf, buf_size);
}
CHECK_GT(*buf_size, 0)
<< " response len=" << len_
<< " size=" << buffer_.size()
<< " body_offset=" << body_offset_;
}
bool HttpClient::Response::Recv(int r) {
if (body_) {
return BodyRecv(r);
}
// header
if (r == 0) { // EOF
LOG(WARNING) << trace_id_ <<
" not received a header but connection closed by a peer.";
err_message_ = "connection closed before receiving a header.";
result_ = FAIL;
body_offset_ = len_;
return true;
}
len_ += r;
absl::string_view resp(buffer_.data(), len_);
size_t content_length = string::npos;
bool is_chunked = false;
if (!ParseHttpResponse(resp, &status_code_, &body_offset_,
&content_length,
&is_chunked)) {
// still reading header.
return false;
}
VLOG(2) << "header ready " << status_code_
<< " offset=" << body_offset_
<< " content_length=" << content_length
<< " is_chunked=" << is_chunked
<< " len=" << len_;
// Apiary returns 204 No Content for SaveLog.
if (status_code_ == 204 && body_offset_ == len_) {
// Go to next step quickly since Status 204 has nothing to parse.
result_ = OK;
return true;
}
if (status_code_ != 200) {
// heder found and error code.
LOG(WARNING) << trace_id_ << " read "
<< " http=" << status_code_
<< " path=" << request_path_
<< " Details:" << resp;
std::ostringstream err;
err << "Got HTTP error:" << status_code_;
err_message_ = err.str();
result_ = FAIL;
return true;
}
if (body_offset_ == len_ && content_length == 0) {
// nothing to parse for body.
result_ = OK;
return true;
}
EncodingType encoding = GetContentEncoding(Header());
body_ = NewBody(content_length, is_chunked, encoding);
if (body_offset_ < len_) {
// header buffer_ has head of body.
absl::string_view body(buffer_.data(), len_);
body.remove_prefix(body_offset_);
VLOG(3) << trace_id_ << " body " << body.size() << " after header";
do {
char* buf = nullptr;
int buf_size = 0;
body_->Next(&buf, &buf_size);
if (body.size() <= buf_size) {
buf_size = body.size();
}
memcpy(buf, body.data(), buf_size);
body.remove_prefix(buf_size);
if (BodyRecv(buf_size)) {
return true;
}
} while (!body.empty());
}
return false;
}
bool HttpClient::Response::BodyRecv(int r) {
VLOG(3) << trace_id_ << " body receive=" << r;
switch (body_->Process(r)) {
case Body::State::Error:
if (r == 0) {
LOG(WARNING) << trace_id_
<< " connection closed before receiving all data at "
<< body_->ByteCount();
err_message_ = "connection closed before receiving all data.";
result_ = FAIL;
return true;
}
LOG(WARNING) << trace_id_
<< " body receive failed @" << body_->ByteCount()
<< " size=" << r;
err_message_ = "body receive failed";
result_ = FAIL;
return true;
case Body::State::Ok:
CHECK_GE(r, 0);
VLOG(3) << trace_id_ << " received full content";
return true;
case Body::State::Incomplete:
CHECK_GT(r, 0);
VLOG(3) << trace_id_ << " need more data";
return false;
}
}
bool HttpClient::Response::HasConnectionClose() const {
return ExtractHeaderField(Header(), kConnection) == "close";
}
void HttpClient::Response::Parse() {
if (result_ == OK) {
return;
}
if (!err_message_.empty()) {
return;
}
if (!body_) {
return;
}
ParseBody();
}
HttpResponse::Body::Body(size_t content_length,
bool is_chunked,
EncodingType encoding_type)
: content_length_(content_length),
encoding_type_(encoding_type) {
if (is_chunked) {
chunk_parser_ = absl::make_unique<HttpChunkParser>();
}
}
void HttpResponse::Body::Next(char** buf, int* buf_size) {
size_t allocated = buffer_.size() * kNetworkBufSize;
if (len_ == allocated) {
VLOG(3) << "allocate resp body buffer len=" << len_;
buffer_.emplace_back(absl::make_unique<char[]>(kNetworkBufSize));
allocated += kNetworkBufSize;
}
*buf_size = allocated - len_;
*buf = buffer_.back().get() + len_ % kNetworkBufSize;
CHECK_GT(*buf_size, 0)
<< " body len=" << len_
<< " allocated=" << allocated;
}
HttpClient::Response::Body::State
HttpResponse::Body::Process(int data_size) {
VLOG(3) << "body process " << data_size
<< " len=" << len_
<< " content_length=" << content_length_
<< " is_chunked=" << (chunk_parser_ ? true : false);
if (data_size < 0) {
return State::Error;
}
if (data_size == 0) { // EOF
if (!chunk_parser_) {
if (content_length_ == string::npos) {
VLOG(3) << "content finished with EOF";
return State::Ok;
}
if (content_length_ == len_) {
// empty body's case
VLOG(3) << "empty content";
return State::Ok;
}
}
VLOG(3) << "unexpected EOF at " << len_;
return State::Error;
}
DCHECK_LE(data_size, kNetworkBufSize);
CHECK_LE(len_ + data_size, buffer_.size() * kNetworkBufSize);
absl::string_view data(buffer_.back().get() + len_ % kNetworkBufSize,
data_size);
len_ += data_size;
if (chunk_parser_) {
if (!chunk_parser_->Parse(data, &chunks_)) {
VLOG(3) << "failed to parse chunk";
return State::Error;
}
if (!chunk_parser_->done()) {
VLOG(3) << "chunk not fully received yet";
return State::Incomplete;
}
VLOG(3) << "all chunk finished";
return State::Ok;
}
chunks_.emplace_back(data);
if (content_length_ == string::npos) {
// read until EOF.
return State::Incomplete;
}
if (len_ > content_length_) {
LOG(WARNING) << "received extra data?? len=" << len_
<< " content_length=" << content_length_;
return State::Error;
}
if (len_ == content_length_) {
VLOG(3) << "content finished at " << content_length_;
return State::Ok;
}
return State::Incomplete;
}
std::unique_ptr<google::protobuf::io::ZeroCopyInputStream>
HttpResponse::Body::ParsedStream() const {
std::vector<std::unique_ptr<google::protobuf::io::ZeroCopyInputStream>>
chunk_streams;
for (const auto& chunk : chunks_) {
chunk_streams.push_back(
absl::make_unique<google::protobuf::io::ArrayInputStream>(
chunk.data(), chunk.size()));
}
std::unique_ptr<google::protobuf::io::ZeroCopyInputStream> input
= absl::make_unique<ChainedInputStream>(std::move(chunk_streams));
switch (encoding_type_) {
case ENCODING_DEFLATE:
return absl::make_unique<InflateInputStream>(std::move(input));
case ENCODING_LZMA2:
#ifdef ENABLE_LZMA
return absl::make_unique<LZMAInputStream>(std::move(input));
#else
LOG(WARNING) << "unsuported encoding: lzma2. need ENABLE_LZMA";
return nullptr;
#endif
default:
VLOG(1) << "encoding: not specified";
break;
}
return input;
}
HttpResponse::HttpResponse() {
}
HttpResponse::~HttpResponse() {
}
HttpClient::Response::Body* HttpResponse::NewBody(
size_t content_length, bool is_chunked, EncodingType encoding_type) {
response_body_ =
absl::make_unique<Body>(content_length, is_chunked, encoding_type);
return response_body_.get();
}
void HttpResponse::ParseBody() {
std::unique_ptr<google::protobuf::io::ZeroCopyInputStream> input =
response_body_->ParsedStream();
if (input == nullptr) {
err_message_ = "failed to create parsed stream";
result_ = FAIL;
return;
}
std::ostringstream ss;
const void* buffer;
int size;
while (input->Next(&buffer, &size)) {
ss.write(static_cast<const char*>(buffer), size);
}
parsed_body_ = ss.str();
result_ = OK;
}
bool HttpClient::NetworkErrorStatus::OnNetworkErrorDetected(
time_t now) {
if (error_started_time_ > 0) {
error_until_ = now + error_recover_margin_;
return false;
}
error_started_time_ = now;
error_until_ = now + error_recover_margin_;
return true;
}
bool HttpClient::NetworkErrorStatus::OnNetworkRecovered(
time_t now) {
if (error_started_time_ == 0)
return false;
// We don't consider the network is recovered until error_until_.
if (now < error_until_) {
return false;
}
// Here, we consider the network error is really recovered.
error_started_time_ = 0;
error_until_ = 0;
return true;
}
StringInputStream::StringInputStream(string data)
: data_(std::move(data)),
stream_(absl::make_unique<google::protobuf::io::ArrayInputStream>(
data_.data(), data_.size())) {
}
ChainedInputStream::ChainedInputStream(
std::vector<std::unique_ptr<google::protobuf::io::ZeroCopyInputStream>>
streams)
: streams_(std::move(streams)) {
streams_array_.reserve(streams_.size());
for (const auto& s : streams_) {
streams_array_.push_back(s.get());
}
stream_ = absl::make_unique<
google::protobuf::io::ConcatenatingInputStream>(
streams_array_.data(), streams_array_.size());
}
} // namespace devtools_goma