// Copyright 2011 The Goma Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "compile_task.h"
#ifndef _WIN32
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#endif
#include <algorithm>
#include <iomanip>
#include <memory>
#include <sstream>
#include <unordered_map>
#include <utility>
#include <google/protobuf/text_format.h>
#include <json/json.h>
#include "absl/base/call_once.h"
#include "absl/memory/memory.h"
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
#include "absl/time/clock.h"
#include "autolock_timer.h"
#include "callback.h"
#include "clang_tidy_flags.h"
#include "compilation_database_reader.h"
#include "compile_service.h"
#include "compile_stats.h"
#include "compiler_flag_type_specific.h"
#include "compiler_flags.h"
#include "compiler_flags_parser.h"
#include "compiler_flags_util.h"
#include "compiler_info.h"
#include "compiler_proxy_info.h"
#include "compiler_specific.h"
#include "compiler_type_specific_collection.h"
#include "cxx/include_processor/cpp_include_processor.h"
#include "cxx/include_processor/include_file_utils.h"
#include "file_dir.h"
#include "file_hash_cache.h"
#include "file_helper.h"
#include "filesystem.h"
#include "flat_map.h"
#include "gcc_flags.h"
#include "glog/logging.h"
#include "glog/stl_logging.h"
#include "goma_blob.h"
#include "goma_data_util.h"
#include "goma_file.h"
#include "goma_file_dump.h"
#include "goma_file_http.h"
#include "http_rpc.h"
#include "ioutil.h"
#include "java/jar_parser.h"
#include "java_flags.h"
#include "linker/linker_input_processor/linker_input_processor.h"
#include "linker/linker_input_processor/thinlto_import_processor.h"
#include "local_output_cache.h"
#include "lockhelper.h"
#include "multi_http_rpc.h"
#include "mypath.h"
#include "path.h"
#include "path_resolver.h"
#include "path_util.h"
#include "simple_timer.h"
#include "subprocess_task.h"
#include "util.h"
#include "vc_flags.h"
#include "worker_thread_manager.h"
MSVC_PUSH_DISABLE_WARNING_FOR_PROTO()
#include "prototmp/goma_data.pb.h"
#include "prototmp/subprocess.pb.h"
MSVC_POP_WARNING()
#ifdef _WIN32
# include "posix_helper_win.h"
#endif
namespace devtools_goma {
static const int kMaxExecRetry = 4;
static string GetLastErrorMessage() {
char error_message[1024];
#ifndef _WIN32
// The meaning of the return value of strerror_r differs between XSI and
// GNU, so ignore it.
(void)strerror_r(errno, error_message, sizeof(error_message));
#else
FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM, 0, GetLastError(), 0,
error_message, sizeof error_message, 0);
#endif
return error_message;
}
static bool IsFatalError(ExecResp::ExecError error_code) {
return error_code == ExecResp::BAD_REQUEST;
}
static void DumpSubprograms(
const google::protobuf::RepeatedPtrField<SubprogramSpec>& subprogram_specs,
std::ostringstream* ss) {
for (int i = 0; i < subprogram_specs.size(); ++i) {
const SubprogramSpec& spec = subprogram_specs.Get(i);
if (i > 0)
*ss << ", ";
*ss << "path=" << spec.path() << " hash=" << spec.binary_hash();
}
}
static void LogCompilerOutput(
const string& trace_id, const string& name, absl::string_view out) {
LOG(INFO) << trace_id << " " << name << ": size=" << out.size();
static const int kMaxLines = 32;
static const size_t kMaxCols = 200;
static const char* kClExeShowIncludePrefix = "Note: including file:";
if (out.size() == 0)
return;
if (out.size() < kMaxCols) {
LOG(INFO) << trace_id << " " << name << ":" << out;
return;
}
for (int i = 0; out.size() > 0 && i < kMaxLines;) {
size_t end = out.find_first_of("\r\n");
absl::string_view line;
if (end == string::npos) {
line = out;
out = absl::string_view();
} else if (end == 0) {
out.remove_prefix(1);
continue;
} else {
line = out.substr(0, end);
out.remove_prefix(end + 1);
}
if (line.size() == 0)
continue;
if (absl::StartsWith(line, kClExeShowIncludePrefix))
continue;
size_t found = line.find("error");
if (found == string::npos)
found = line.find("warning");
if (found != string::npos) {
++i;
if (line.size() > kMaxCols) {
LOG(INFO) << trace_id << " " << name << ":"
<< line.substr(0, kMaxCols) << "...";
} else {
LOG(INFO) << trace_id << " " << name << ":" << line;
}
}
}
}
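// Releases the memory held by the input fields of |req|.
// Simply calling req->clear_input() keeps the allocated buffers inside the
// message for reuse; swapping into a fresh ExecReq and copying the remaining
// fields back actually frees them.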
static void ReleaseMemoryForExecReqInput(ExecReq* req) {
ExecReq new_req;
new_req.Swap(req);
new_req.clear_input();
*req = new_req;
}
absl::once_flag CompileTask::init_once_;
Lock CompileTask::global_mu_;
std::deque<CompileTask*>* CompileTask::link_file_req_tasks_ = nullptr;
static string CreateCommandVersionString(const CommandSpec& spec) {
return spec.name() + ' ' + spec.version() + " (" + spec.binary_hash() + ")";
}
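// Human-readable name of a CompileTask state. The remote path typically
// walks INIT -> SETUP -> FILE_REQ -> CALL_EXEC -> FILE_RESP -> FINISHED;
// LOCAL_RUN / LOCAL_FINISHED are used when the task falls back to a local
// subprocess, and LOCAL_OUTPUT when LocalOutputCache serves the result.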
static string StateName(CompileTask::State state) {
static const char* names[] = {
"INIT",
"SETUP",
"FILE_REQ",
"CALL_EXEC",
"LOCAL_OUTPUT",
"FILE_RESP",
"FINISHED",
"LOCAL_RUN",
"LOCAL_FINISHED",
};
static_assert(CompileTask::NUM_STATE == arraysize(names),
"CompileTask::NUM_STATE and arraysize(names) is not matched");
CHECK_GE(state, 0);
CHECK_LT(state, CompileTask::NUM_STATE);
return names[state];
}
template <typename Iter>
static void NormalizeSystemIncludePaths(const string& home, const string& cwd,
Iter path_begin, Iter path_end) {
if (home.empty())
return;
for (Iter it = path_begin; it != path_end; ++it) {
if (HasPrefixDir(*it, home)) {
it->assign(PathResolver::WeakRelativePath(*it, cwd));
}
}
}
// Returns true if |buf| is a bigobj format header.
// |buf| must contain at least 32 bytes.
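// The layout checked below matches ANON_OBJECT_HEADER_BIGOBJ (offsets in
// bytes): 0: Sig1 = 0x0000, 2: Sig2 = 0xFFFF, 4: Version (1 or 2),
// 6: Machine (0x014C = x86, 0x8664 = x64), 12: 16-byte ClassID (UUID).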
static bool IsBigobjFormat(const unsigned char* buf) {
static const unsigned char kV1UUID[16] = {
0x38, 0xFE, 0xB3, 0x0C, 0xA5, 0xD9, 0xAB, 0x4D,
0xAC, 0x9B, 0xD6, 0xB6, 0x22, 0x26, 0x53, 0xC2,
};
static const unsigned char kV2UUID[16] = {
0xC7, 0xA1, 0xBA, 0xD1, 0xEE, 0xBA, 0xA9, 0x4B,
0xAF, 0x20, 0xFA, 0xF6, 0x6A, 0xA4, 0xDC, 0xB8
};
if (*reinterpret_cast<const unsigned short*>(buf) != 0)
return false;
if (*reinterpret_cast<const unsigned short*>(buf + 2) != 0xFFFF)
return false;
// The UUID differs depending on the bigobj version.
const unsigned char* uuid = nullptr;
if (*reinterpret_cast<const unsigned short*>(buf + 4) == 0x0001) {
uuid = kV1UUID;
} else if (*reinterpret_cast<const unsigned short*>(buf + 4) == 0x0002) {
uuid = kV2UUID;
} else {
// Unknown bigobj version
return false;
}
unsigned short magic = *reinterpret_cast<const unsigned short*>(buf + 6);
if (!(magic == 0x014C || magic == 0x8664))
return false;
for (int i = 0; i < 16; ++i) {
if (buf[12 + i] != uuid[i])
return false;
}
return true;
}
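// InputFileTask uploads (or computes the hash key of) a single input file.
// Tasks are shared across concurrent CompileTasks via task_by_filename_, so
// each file is processed only once at a time; the task deletes itself after
// every registered CompileTask has called Done().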
class CompileTask::InputFileTask {
public:
// Gets the InputFileTask for |filename|.
// If an InputFileTask for the same filename already exists, that instance
// is reused.
static InputFileTask* NewInputFileTask(
WorkerThreadManager* wm,
std::unique_ptr<BlobClient::Uploader> blob_uploader,
FileHashCache* file_hash_cache,
const FileStat& file_stat,
const string& filename,
bool missed_content,
bool linking,
bool is_new_file,
const string& old_hash_key,
CompileTask* task,
ExecReq_Input* input) {
DCHECK(file::IsAbsolutePath(filename)) << filename;
absl::call_once(init_once_,
&CompileTask::InputFileTask::InitializeStaticOnce);
InputFileTask* input_file_task = nullptr;
{
AUTOLOCK(lock, &global_mu_);
std::pair<std::unordered_map<string, InputFileTask*>::iterator, bool> p =
task_by_filename_->insert(std::make_pair(filename, input_file_task));
if (p.second) {
p.first->second = new InputFileTask(
wm, std::move(blob_uploader), file_hash_cache, file_stat,
filename, missed_content, linking, is_new_file, old_hash_key);
}
input_file_task = p.first->second;
DCHECK(input_file_task != nullptr);
input_file_task->SetTaskInput(task, input);
}
DCHECK_GT(input_file_task->num_tasks(), 0U);
VLOG(1) << task->trace_id_ << " start input "
<< task->num_input_file_task_ << " " << filename;
task->StartInputFileTask();
return input_file_task;
}
void Run(CompileTask* task, OneshotClosure* closure) {
WorkerThreadManager::ThreadId thread_id = task->thread_id_;
{
AUTOLOCK(lock, &mu_);
switch (state_) {
case INIT: // first run.
state_ = RUN;
break;
case RUN:
VLOG(1) << task->trace_id() << " input running ("
<< tasks_.size() << " tasks)";
callbacks_.emplace_back(thread_id, closure);
return;
case DONE:
VLOG(1) << task->trace_id() << " input done";
wm_->RunClosureInThread(FROM_HERE, thread_id, closure,
WorkerThreadManager::PRIORITY_LOW);
return;
}
}
if (missed_content_) {
LOG(INFO) << task->trace_id() << " (" << num_tasks() << " tasks)"
<< " input " << filename_ << " [missed content]";
} else {
VLOG(1) << task->trace_id() << " (" << num_tasks() << " tasks)"
<< " input " << filename_;
}
bool uploaded_in_side_channel = false;
// TODO: use string_view in file_hash_cache methods.
string hash_key = old_hash_key_;
if (need_to_compute_key()) {
VLOG(1) << task->trace_id()
<< " (" << num_tasks() << " tasks)"
<< " compute hash key:" << filename_
<< " size:" << file_stat_.size;
success_ = blob_uploader_->ComputeKey();
if (success_) {
hash_key = blob_uploader_->hash_key();
new_cache_key_ = !file_hash_cache_->IsKnownCacheKey(hash_key);
}
}
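// Upload strategy: when the content must be uploaded, hash-only requests
// (linking) and files larger than 2MB go through the side channel, and
// everything else is embedded in the ExecReq. Files smaller than 512 bytes
// are embedded even if only the hash key was requested; the remaining files
// are sent as a hash key only.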
if (need_to_upload_content(hash_key)) {
if (need_hash_only_ || file_stat_.size > 2*1024*1024) {
// upload in side channel.
LOG(INFO) << task->trace_id()
<< "(" << num_tasks() << " tasks)"
<< " upload:" << filename_
<< " size:" << file_stat_.size
<< " reason:" << upload_reason(hash_key);
success_ = blob_uploader_->Upload();
if (success_) {
uploaded_in_side_channel = true;
}
} else {
// upload embedded.
LOG(INFO) << task->trace_id()
<< " (" << num_tasks() << " tasks)"
<< " embed:" << filename_
<< " size:" << file_stat_.size
<< " reason:" << upload_reason(hash_key);
success_ = blob_uploader_->Embed();
}
} else if (file_stat_.size < 512) {
// For a small file blob, embed the content even if the compile task
// requested hash key only.
LOG(INFO) << task->trace_id()
<< " (" << num_tasks() << " tasks)"
<< " embed:" << filename_
<< " size:" << file_stat_.size
<< " reason:small";
need_hash_only_ = false;
success_ = blob_uploader_->Embed();
} else {
VLOG(1) << task->trace_id()
<< " (" << num_tasks() << " tasks)"
<< " hash only:" << filename_
<< " size:" << file_stat_.size
<< " missed_content:" << missed_content_
<< " is_new_file:" << is_new_file_
<< " new_cache_key:" << new_cache_key_
<< " success:" << success_;
}
if (!success_) {
LOG(WARNING) << task->trace_id()
<< " (" << num_tasks() << " tasks)"
<< " input file failed:" << filename_;
} else {
hash_key = blob_uploader_->hash_key();
CHECK(!hash_key.empty())
<< task->trace_id()
<< " (" << num_tasks() << " tasks)"
<< " no hash key?" << filename_;
// Store the file cache key only if we have already uploaded the blob
// in the side channel, or if we assume the blob has already been
// uploaded because it is old enough.
// When we decide to upload the blob by embedding it in the request,
// we must store the file cache key only after the compile request
// finishes without a missing-inputs error. If a missing-inputs error
// happens, it is safer to resend the blob, since the second request
// might go to a different cluster that does not have the cache.
// If the blob is old enough, we assume the file has already been
// uploaded, and register the file hash key in |file_hash_cache_|.
// See b/11261931
// b/12087209
if (uploaded_in_side_channel || !is_new_file_) {
// Set upload_timestamp_ms only if we have uploaded the content.
absl::optional<absl::Time> upload_timestamp_ms;
if (uploaded_in_side_channel) {
upload_timestamp_ms = absl::Now();
}
new_cache_key_ = file_hash_cache_->StoreFileCacheKey(
filename_, hash_key, upload_timestamp_ms, file_stat_);
VLOG(1) << task->trace_id() << " (" << num_tasks() << " tasks)"
<< " input file ok: " << filename_
<< (uploaded_in_side_channel ? " upload" : " hash only");
} else {
VLOG(1) << task->trace_id() << " (" << num_tasks() << " tasks)"
<< " input file ok: " << filename_
<< (new_cache_key_ ? " embedded upload"
: " already uploaded");
}
}
{
AUTOLOCK(lock, &global_mu_);
std::unordered_map<string, InputFileTask*>::iterator found =
task_by_filename_->find(filename_);
DCHECK(found != task_by_filename_->end());
DCHECK(found->second == this);
task_by_filename_->erase(found);
VLOG(1) << task->trace_id() << " (" << num_tasks() << " tasks)"
<< " clear task by filename" << filename_;
}
std::vector<std::pair<WorkerThreadManager::ThreadId,
OneshotClosure*>> callbacks;
{
AUTOLOCK(lock, &mu_);
DCHECK_EQ(RUN, state_);
state_ = DONE;
callbacks.swap(callbacks_);
}
wm_->RunClosureInThread(FROM_HERE, thread_id, closure,
WorkerThreadManager::PRIORITY_LOW);
for (const auto& callback : callbacks)
wm_->RunClosureInThread(FROM_HERE,
callback.first, callback.second,
WorkerThreadManager::PRIORITY_LOW);
}
void Done(CompileTask* task) {
bool all_finished = false;
{
AUTOLOCK(lock, &mu_);
std::map<CompileTask*, ExecReq_Input*>::iterator found =
tasks_.find(task);
CHECK(found != tasks_.end());
tasks_.erase(found);
all_finished = tasks_.empty();
}
task->MaybeRunInputFileCallback(true);
if (all_finished)
delete this;
}
const string& filename() const { return filename_; }
bool missed_content() const { return missed_content_; }
bool need_hash_only() const { return need_hash_only_; }
time_t mtime() const { return file_stat_.mtime; }
const SimpleTimer& timer() const { return timer_; }
ssize_t file_size() const { return file_stat_.size; }
const string& old_hash_key() const { return old_hash_key_; }
const string& hash_key() const { return blob_uploader_->hash_key(); }
bool success() const { return success_; }
bool new_cache_key() const { return new_cache_key_; }
size_t num_tasks() const {
AUTOLOCK(lock, &mu_);
return tasks_.size();
}
bool UpdateInputInTask(CompileTask* task) const {
ExecReq_Input* input = GetInputForTask(task);
CHECK(input != nullptr) << task->trace_id() << " filename:" << filename_;
return blob_uploader_->GetInput(input);
}
ExecReq_Input* GetInputForTask(CompileTask* task) const {
AUTOLOCK(lock, &mu_);
std::map<CompileTask*, ExecReq_Input*>::const_iterator found =
tasks_.find(task);
if (found != tasks_.end()) {
return found->second;
}
return nullptr;
}
bool need_to_compute_key() const {
if (need_to_upload_content(old_hash_key_)) {
// we'll calculate hash key during uploading.
return false;
}
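// Files smaller than 512 bytes are embedded in Run() anyway, so computing
// the key in advance is unnecessary for them.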
return file_stat_.size >= 512;
}
bool need_to_upload_content(absl::string_view hash_key) const {
if (missed_content_) {
return true;
}
if (absl::EndsWith(filename_, ".rsp")) {
return true;
}
if (is_new_file_) {
if (new_cache_key_) {
return true;
}
}
if (old_hash_key_.empty()) {
// Old file and first check; we assume the file was already uploaded.
return false;
}
return old_hash_key_ != hash_key;
}
const char* upload_reason(absl::string_view hash_key) const {
if (missed_content_) {
return "missed content";
}
if (absl::EndsWith(filename_, ".rsp")) {
return "rsp file";
}
if (is_new_file_) {
if (new_cache_key_) {
return "new file cache_key";
}
}
if (old_hash_key_.empty()) {
return "no need to upload - maybe already in cache.";
}
if (old_hash_key_ != hash_key) {
return "update cache_key";
}
return "no need to upload - cache_key matches";
}
const HttpClient::Status& http_status() const {
// TODO: blob_uploader should support this API?
return blob_uploader_->http_status();
}
private:
enum State {
INIT,
RUN,
DONE,
};
InputFileTask(WorkerThreadManager* wm,
std::unique_ptr<BlobClient::Uploader> blob_uploader,
FileHashCache* file_hash_cache,
const FileStat& file_stat,
string filename,
bool missed_content,
bool linking,
bool is_new_file,
string old_hash_key)
: wm_(wm),
blob_uploader_(std::move(blob_uploader)),
file_hash_cache_(file_hash_cache),
file_stat_(file_stat),
filename_(std::move(filename)),
state_(INIT),
missed_content_(missed_content),
need_hash_only_(linking), // we need hash key only in linking.
is_new_file_(is_new_file),
old_hash_key_(std::move(old_hash_key)),
success_(false),
new_cache_key_(false) {
timer_.Start();
}
~InputFileTask() {
CHECK(tasks_.empty());
}
void SetTaskInput(CompileTask* task, ExecReq_Input* input) {
AUTOLOCK(lock, &mu_);
tasks_.insert(std::make_pair(task, input));
}
static void InitializeStaticOnce() {
AUTOLOCK(lock, &global_mu_);
task_by_filename_ = new std::unordered_map<string, InputFileTask*>;
}
WorkerThreadManager* wm_;
std::unique_ptr<BlobClient::Uploader> blob_uploader_;
FileHashCache* file_hash_cache_;
const FileStat file_stat_;
const string filename_;
State state_;
mutable Lock mu_;
std::map<CompileTask*, ExecReq_Input*> tasks_ GUARDED_BY(mu_);
std::vector<std::pair<WorkerThreadManager::ThreadId, OneshotClosure*>>
callbacks_ GUARDED_BY(mu_);
// true if goma servers couldn't find the content, so we must upload it.
const bool missed_content_;
// true if we'll use the hash key only in ExecReq to keep it from bloating.
// false to embed the content in ExecReq.
bool need_hash_only_;
// true if the file is considered a new file, so it might not be in the
// goma cache yet.
// false means the file is old enough that we can assume someone else has
// already uploaded the content to the goma cache.
const bool is_new_file_;
// hash key stored in file_hash_cache.
const string old_hash_key_;
SimpleTimer timer_;
// true if the goma file operation succeeded.
bool success_;
// true if hash_key_ was newly inserted into the file hash cache.
bool new_cache_key_;
static absl::once_flag init_once_;
static Lock global_mu_;
static std::unordered_map<string, InputFileTask*>* task_by_filename_
GUARDED_BY(global_mu_);
DISALLOW_COPY_AND_ASSIGN(InputFileTask);
};
absl::once_flag CompileTask::InputFileTask::init_once_;
Lock CompileTask::InputFileTask::global_mu_;
std::unordered_map<string, CompileTask::InputFileTask*>*
CompileTask::InputFileTask::task_by_filename_;
// Returns true if all outputs are FILE blob (so no need of further http_rpc).
bool IsOutputFileEmbedded(const ExecResult& result) {
for (const auto& output : result.output()) {
if (output.blob().blob_type() != FileBlob::FILE)
return false;
}
return true;
}
struct CompileTask::OutputFileInfo {
OutputFileInfo() : mode(0666), size(0) {}
// actual output filename.
string filename;
// file mode/permission.
int mode;
size_t size;
// tmp_filename is the filename written by OutputFileTask.
// tmp_filename may be the same as the output filename (when !need_rename),
// or it is renamed to the real output filename in CommitOutput().
// If no tmp file was written in OutputFileTask because the content is held
// in the content field, tmp_filename will be "".
string tmp_filename;
// hash_key is the hash of the output file content. It will be stored in the
// file hash cache once the output file is committed.
string hash_key;
// content is the output content.
// It holds the output content in memory while the output file task runs.
// It is used iff tmp_filename == "".
string content;
};
class CompileTask::OutputFileTask {
public:
// Takes ownership of |file_service|.
// Doesn't take ownership of |info|.
OutputFileTask(WorkerThreadManager* wm,
std::unique_ptr<FileServiceHttpClient> file_service,
CompileTask* task,
int output_index,
const ExecResult_Output& output,
OutputFileInfo* info)
: wm_(wm),
thread_id_(wm->GetCurrentThreadId()),
file_service_(std::move(file_service)),
task_(task),
output_index_(output_index),
output_(output),
output_size_(output.blob().file_size()),
info_(info),
success_(false) {
timer_.Start();
task_->StartOutputFileTask();
}
~OutputFileTask() {
task_->MaybeRunOutputFileCallback(output_index_, true);
}
void Run(OneshotClosure* closure) {
VLOG(1) << task_->trace_id() << " output " << info_->filename;
std::unique_ptr<FileServiceClient::Output> dest(OpenOutput());
// TODO: We might want to restrict paths this program may write?
success_ = file_service_->OutputFileBlob(output_.blob(), dest.get());
if (success_) {
info_->hash_key = FileServiceClient::ComputeHashKey(output_.blob());
} else {
LOG(WARNING) << task_->trace_id()
<< " " << (task_->cache_hit() ? "cached" : "no-cached")
<< " output file failed:" << info_->filename;
}
wm_->RunClosureInThread(FROM_HERE, thread_id_, closure,
WorkerThreadManager::PRIORITY_LOW);
}
CompileTask* task() const { return task_; }
const ExecResult_Output& output() const { return output_; }
const SimpleTimer& timer() const { return timer_; }
bool success() const { return success_; }
bool IsInMemory() const {
return info_->tmp_filename.empty();
}
int num_rpc() const {
return file_service_->num_rpc();
}
const HttpRPC::Status& http_rpc_status() const {
return file_service_->http_rpc_status();
}
private:
std::unique_ptr<FileServiceClient::Output> OpenOutput() {
if (info_->tmp_filename.empty()) {
return FileServiceClient::StringOutput(info_->filename, &info_->content);
}
remove(info_->tmp_filename.c_str());
return FileServiceClient::FileOutput(info_->tmp_filename, info_->mode);
}
WorkerThreadManager* wm_;
WorkerThreadManager::ThreadId thread_id_;
std::unique_ptr<FileServiceHttpClient> file_service_;
CompileTask* task_;
int output_index_;
const ExecResult_Output& output_;
size_t output_size_;
OutputFileInfo* info_;
SimpleTimer timer_;
bool success_;
DISALLOW_COPY_AND_ASSIGN(OutputFileTask);
};
class CompileTask::LocalOutputFileTask {
public:
LocalOutputFileTask(WorkerThreadManager* wm,
std::unique_ptr<FileServiceClient> file_service,
FileHashCache* file_hash_cache,
const FileStat& file_stat,
CompileTask* task,
string filename)
: wm_(wm),
thread_id_(wm_->GetCurrentThreadId()),
file_service_(std::move(file_service)),
file_hash_cache_(file_hash_cache),
file_stat_(file_stat),
task_(task),
filename_(std::move(filename)),
success_(false) {
timer_.Start();
task_->StartLocalOutputFileTask();
}
~LocalOutputFileTask() {
task_->MaybeRunLocalOutputFileCallback(true);
}
void Run(OneshotClosure* closure) {
// Store the hash_key of the output file. This file will be used in the
// link phase.
VLOG(1) << task_->trace_id() << " local output " << filename_;
success_ = file_service_->CreateFileBlob(
filename_, true, &blob_);
if (success_) {
CHECK(FileServiceClient::IsValidFileBlob(blob_)) << filename_;
string hash_key = FileServiceClient::ComputeHashKey(blob_);
bool new_cache_key = file_hash_cache_->StoreFileCacheKey(
filename_, hash_key, absl::Now(), file_stat_);
if (new_cache_key) {
LOG(INFO) << task_->trace_id()
<< " local output store:" << filename_
<< " size=" << blob_.file_size();
success_ = file_service_->StoreFileBlob(blob_);
}
}
if (!success_) {
LOG(WARNING) << task_->trace_id()
<< " local output read failed:" << filename_;
}
wm_->RunClosureInThread(FROM_HERE, thread_id_, closure,
WorkerThreadManager::PRIORITY_LOW);
}
CompileTask* task() const { return task_; }
const string& filename() const { return filename_; }
const FileBlob& blob() const { return blob_; }
const SimpleTimer& timer() const { return timer_; }
bool success() const { return success_; }
private:
WorkerThreadManager* wm_;
WorkerThreadManager::ThreadId thread_id_;
std::unique_ptr<FileServiceClient> file_service_;
FileHashCache* file_hash_cache_;
const FileStat file_stat_;
CompileTask* task_;
const string filename_;
FileBlob blob_;
SimpleTimer timer_;
bool success_;
DISALLOW_COPY_AND_ASSIGN(LocalOutputFileTask);
};
/* static */
void CompileTask::InitializeStaticOnce() {
AUTOLOCK(lock, &global_mu_);
link_file_req_tasks_ = new std::deque<CompileTask*>;
}
CompileTask::CompileTask(CompileService* service, int id)
: service_(service),
id_(id),
rpc_(nullptr),
caller_thread_id_(service->wm()->GetCurrentThreadId()),
done_(nullptr),
stats_(new CompileStats),
responsecode_(0),
state_(INIT),
abort_(false),
finished_(false),
req_(new ExecReq),
linking_(false),
precompiling_(false),
compiler_type_specific_(nullptr),
gomacc_pid_(SubProcessState::kInvalidPid),
canceled_(false),
resp_(new ExecResp),
exit_status_(0),
delayed_setup_subproc_(nullptr),
subproc_(nullptr),
subproc_weight_(SubProcessReq::LIGHT_WEIGHT),
subproc_exit_status_(0),
want_fallback_(false),
should_fallback_(false),
verify_output_(false),
fail_fallback_(false),
local_run_(false),
local_killed_(false),
depscache_used_(false),
gomacc_revision_mismatched_(false),
input_file_callback_(nullptr),
num_input_file_task_(0),
input_file_success_(false),
output_file_callback_(nullptr),
num_output_file_task_(0),
output_file_success_(false),
local_output_file_callback_(nullptr),
num_local_output_file_task_(0),
refcnt_(0) {
thread_id_ = GetCurrentThreadId();
absl::call_once(init_once_, InitializeStaticOnce);
Ref();
std::ostringstream ss;
ss << "Task:" << id_;
trace_id_ = ss.str();
time_t start_time;
time(&start_time);
stats_->set_start_time(start_time);
stats_->set_compiler_proxy_user_agent(kUserAgentString);
}
void CompileTask::Ref() {
AUTOLOCK(lock, &refcnt_mu_);
refcnt_++;
}
void CompileTask::Deref() {
int refcnt;
{
AUTOLOCK(lock, &refcnt_mu_);
refcnt_--;
refcnt = refcnt_;
}
if (refcnt == 0)
delete this;
}
void CompileTask::Init(CompileService::RpcController* rpc,
const ExecReq* req,
ExecResp* resp,
OneshotClosure* done) {
VLOG(1) << trace_id_ << " init";
CHECK_EQ(INIT, state_);
CHECK(service_ != nullptr);
CHECK_EQ(caller_thread_id_, service_->wm()->GetCurrentThreadId());
rpc_ = rpc;
rpc_resp_ = resp;
done_ = done;
*req_ = *req;
#ifdef _WIN32
pathext_ = GetEnvFromEnvIter(req->env().begin(), req->env().end(),
"PATHEXT", true);
#endif
}
void CompileTask::Start() {
VLOG(1) << trace_id_ << " start";
CHECK_EQ(INIT, state_);
stats_->set_pending_time(handler_timer_.GetInIntMilliseconds());
// We switched to new thread.
DCHECK(!BelongsToCurrentThread());
thread_id_ = GetCurrentThreadId();
input_file_stat_cache_ = absl::make_unique<FileStatCache>();
output_file_stat_cache_ = absl::make_unique<FileStatCache>();
rpc_->NotifyWhenClosed(NewCallback(this, &CompileTask::GomaccClosed));
int api_version = req_->requester_info().api_version();
if (api_version != RequesterInfo::CURRENT_VERSION) {
LOG(ERROR) << trace_id_ << " unexpected api_version=" << api_version
<< " want=" << RequesterInfo::CURRENT_VERSION;
}
#if defined(ENABLE_REVISION_CHECK)
if (req_->requester_info().has_goma_revision() &&
req_->requester_info().goma_revision() != kBuiltRevisionString) {
LOG(WARNING) << trace_id_ << " goma revision mismatch:"
<< " gomacc=" << req_->requester_info().goma_revision()
<< " compiler_proxy=" << kBuiltRevisionString;
gomacc_revision_mismatched_ = true;
}
#endif
CopyEnvFromRequest();
InitCompilerFlags();
if (flags_.get() == nullptr) {
LOG(ERROR) << trace_id_ << " Start error: CompilerFlags is nullptr";
AddErrorToResponse(TO_USER, "Unsupported command", true);
ProcessFinished("Unsupported command");
return;
}
if (!IsLocalCompilerPathValid(trace_id_, *req_, flags_.get())) {
LOG(ERROR) << trace_id_ << " Start error: invalid local compiler."
<< " path=" << req_->command_spec().local_compiler_path();
AddErrorToResponse(TO_USER, "Invalid command", true);
ProcessFinished("Invalid command");
return;
}
if (!flags_->is_successful()) {
LOG(WARNING) << trace_id_ << " Start error:" << flags_->fail_message();
// It should fall back.
} else if (precompiling_) {
LOG(INFO) << trace_id_ << " Start precompile "
<< (flags_->input_filenames().empty() ? "(no input)" :
flags_->input_filenames()[0])
<< " gomacc_pid=" << gomacc_pid_;
if (!flags_->input_filenames().empty() && !flags_->output_files().empty()) {
DCHECK_EQ(1U, flags_->input_filenames().size()) << trace_id_;
const string& input_filename =
file::JoinPathRespectAbsolute(flags_->cwd(),
flags_->input_filenames()[0]);
string output_filename;
for (const auto& output_file : flags_->output_files()) {
if (absl::EndsWith(output_file, ".gch")) {
int output_filelen = output_file.size();
// Full path and strip ".gch".
output_filename =
file::JoinPathRespectAbsolute(
flags_->cwd(),
output_file.substr(0, output_filelen - 4));
break;
}
}
// Copy the header file iff precompiling header to *.gch.
if (!output_filename.empty()) {
LOG(INFO) << trace_id_ << " copy " << input_filename
<< " " << output_filename;
if (input_filename != output_filename) {
if (file::Copy(input_filename, output_filename, file::Overwrite())
.ok()) {
VLOG(1) << trace_id_ << " copy ok";
resp_->mutable_result()->set_exit_status(0);
} else {
AddErrorToResponse(TO_USER,
"Failed to copy " + input_filename + " to " +
output_filename, true);
}
}
} else {
AddErrorToResponse(TO_LOG, "Precompile to no *.gch output", false);
}
}
} else if (linking_) {
// build_dir will be used to infer the build directory
// in `goma_ctl.py report`. See b/25487955.
LOG(INFO) << trace_id_ << " Start linking "
<< (flags_->output_files().empty() ? "(no output)" :
flags_->output_files()[0])
<< " gomacc_pid=" << gomacc_pid_
<< " build_dir=" << flags_->cwd();
} else {
// build_dir will be used to infer the build directory
// in `goma_ctl.py report`. See b/25487955.
LOG(INFO) << trace_id_ << " Start "
<< (flags_->input_filenames().empty() ? "(no input)" :
flags_->input_filenames()[0])
<< " gomacc_pid=" << gomacc_pid_
<< " build_dir=" << flags_->cwd();
}
if (!FindLocalCompilerPath()) {
// Unable to fallback.
LOG(ERROR) << trace_id_ << " Failed to find local compiler path:"
<< req_->DebugString()
<< " env:" << requester_env_.DebugString();
AddErrorToResponse(TO_USER, "Failed to find local compiler path", true);
ProcessFinished("fail to find local compiler");
return;
}
VLOG(1) << "local_compiler:" << req_->command_spec().local_compiler_path();
local_compiler_path_ = req_->command_spec().local_compiler_path();
verify_output_ = ShouldVerifyOutput();
should_fallback_ = ShouldFallback();
subproc_weight_ = GetTaskWeight();
int ramp_up = service_->http_client()->ramp_up();
if (verify_output_) {
VLOG(1) << trace_id_ << " verify_output";
SetupSubProcess();
RunSubProcess("verify output");
service_->RecordForcedFallbackInSetup(CompileService::kRequestedByUser);
// we run both local and goma backend.
return;
}
if (should_fallback_) {
VLOG(1) << trace_id_ << " should fallback";
SetupSubProcess();
RunSubProcess("should fallback");
// we don't call goma rpc.
return;
}
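// ramp_up is a percentage in [0, 100]; while the HTTP client is ramping up,
// only that fraction of tasks is sent to goma and the rest falls back to
// local compilation.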
if ((rand() % 100) >= ramp_up) {
LOG(WARNING) << trace_id_ << " http disabled "
<< " ramp_up=" << ramp_up;
should_fallback_ = true;
service_->RecordForcedFallbackInSetup(CompileService::kHTTPDisabled);
SetupSubProcess();
RunSubProcess("http disabled");
// we don't call goma rpc.
return;
}
if (precompiling_ && service_->enable_gch_hack()) {
VLOG(1) << trace_id_ << " gch hack";
SetupSubProcess();
RunSubProcess("gch hack");
// we run both local and goma backend in parallel.
} else if (!requester_env_.fallback()) {
stats_->set_local_run_reason("should not run under GOMA_FALLBACK=false");
LOG(INFO) << trace_id_ << " GOMA_FALLBACK=false";
} else if (subproc_weight_ == SubProcessReq::HEAVY_WEIGHT) {
stats_->set_local_run_reason("should not start running heavy subproc.");
} else if (requester_env_.use_local()) {
int num_pending_subprocs = SubProcessTask::NumPending();
bool is_failed_input = false;
if (service_->local_run_for_failed_input()) {
is_failed_input = service_->ContainFailedInput(flags_->input_filenames());
}
int delay_subproc_ms =
absl::ToInt64Milliseconds(service_->GetEstimatedSubprocessDelayTime());
if (num_pending_subprocs == 0) {
stats_->set_local_run_reason("local idle");
SetupSubProcess();
} else if (is_failed_input) {
stats_->set_local_run_reason("previous failed");
SetupSubProcess();
// TODO: RunSubProcess to run it soon?
} else if (delay_subproc_ms <= 0) {
stats_->set_local_run_reason("slow goma");
SetupSubProcess();
} else if (!service_->http_client()->IsHealthyRecently()) {
stats_->set_local_run_reason("goma unhealthy");
SetupSubProcess();
} else {
stats_->set_local_run_reason("should not run while delaying subproc");
stats_->set_local_delay_time(delay_subproc_ms);
VLOG(1) << trace_id_ << " delay subproc " << delay_subproc_ms << "msec";
DCHECK(delayed_setup_subproc_ == nullptr) << trace_id_ << " subproc";
delayed_setup_subproc_ =
service_->wm()->RunDelayedClosureInThread(
FROM_HERE,
thread_id_,
delay_subproc_ms,
NewCallback(
this,
&CompileTask::SetupSubProcess));
}
} else {
stats_->set_local_run_reason("should not run under GOMA_USE_LOCAL=false");
LOG(INFO) << trace_id_ << " GOMA_USE_LOCAL=false";
}
if (subproc_ != nullptr && ShouldStopGoma()) {
state_ = LOCAL_RUN;
stats_->set_local_run_reason("slow goma, local run started in INIT");
return;
}
ProcessSetup();
}
CompileTask::~CompileTask() {
CHECK_EQ(0, refcnt_);
CHECK(output_file_.empty());
}
bool CompileTask::BelongsToCurrentThread() const {
return THREAD_ID_IS_SELF(thread_id_);
}
bool CompileTask::IsGomaccRunning() {
if (gomacc_pid_ == SubProcessState::kInvalidPid)
return false;
#ifndef _WIN32
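// kill() with signal 0 performs only the existence and permission check;
// ESRCH means the gomacc process no longer exists.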
int ret = kill(gomacc_pid_, 0);
if (ret != 0) {
if (errno == ESRCH) {
gomacc_pid_ = SubProcessState::kInvalidPid;
} else {
PLOG(ERROR) << trace_id_ << " kill 0 failed with unexpected errno."
<< " gomacc_pid=" << gomacc_pid_;
}
}
#else
SimpleTimer timer;
bool running = false;
{
ScopedFd proc(OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, FALSE,
gomacc_pid_));
running = proc.valid();
}
int ms = timer.GetInIntMilliseconds();
LOG_IF(WARNING, ms > 100) << trace_id_
<< " SLOW IsGomaccRunning in " << ms << " msec";
if (!running) {
gomacc_pid_ = SubProcessState::kInvalidPid;
}
#endif
return gomacc_pid_ != SubProcessState::kInvalidPid;
}
void CompileTask::GomaccClosed() {
LOG(INFO) << trace_id_ << " gomacc closed "
<< "at state=" << StateName(state_)
<< " subproc pid="
<< (subproc_ != nullptr ? subproc_->started().pid() : 0);
canceled_ = true;
gomacc_pid_ = SubProcessState::kInvalidPid;
// Kill the subprocess whether it is running or pending.
if (subproc_ != nullptr) {
KillSubProcess();
}
}
bool CompileTask::IsSubprocRunning() const {
return subproc_ != nullptr &&
subproc_->started().pid() != SubProcessState::kInvalidPid;
}
void CompileTask::ProcessSetup() {
VLOG(1) << trace_id_ << " setup";
CHECK(BelongsToCurrentThread());
CHECK_EQ(INIT, state_);
CHECK(!abort_);
CHECK(!should_fallback_);
state_ = SETUP;
if (ShouldStopGoma()) {
state_ = LOCAL_RUN;
stats_->set_local_run_reason("slow goma, local run started in SETUP");
return;
}
FillCompilerInfo();
}
void CompileTask::TryProcessFileRequest() {
file_request_timer_.Start();
if (linking_) {
AUTOLOCK(lock, &global_mu_);
DCHECK(link_file_req_tasks_ != nullptr);
link_file_req_tasks_->push_back(this);
if (link_file_req_tasks_->front() != this) {
VLOG(1) << trace_id_ << " pending file req "
<< link_file_req_tasks_->size();
return;
}
}
ProcessFileRequest();
}
void CompileTask::ProcessFileRequest() {
VLOG(1) << trace_id_ << " file req";
CHECK(BelongsToCurrentThread());
// SETUP: first pass
// FILE_REQ: failed in input file task, and retry
// FILE_RESP: failed with missing inputs, and retry
CHECK(state_ == SETUP || state_ == FILE_REQ || state_ == FILE_RESP)
<< trace_id_ << " " << StateName(state_);
stats_->add_include_fileload_pending_time(
file_request_timer_.GetInIntMilliseconds());
file_request_timer_.Start();
if (abort_) {
ProcessPendingFileRequest();
ProcessFinished("aborted before file req");
return;
}
state_ = FILE_REQ;
if (ShouldStopGoma()) {
ProcessPendingFileRequest();
state_ = LOCAL_RUN;
stats_->set_local_run_reason("slow goma, local run started in FILE_REQ");
return;
}
VLOG(1) << trace_id_
<< " start processing of input files "
<< required_files_.size();
std::set<string> missed_content_files;
for (const auto& filename : resp_->missing_input()) {
missed_content_files.insert(filename);
VLOG(2) << trace_id_ << " missed content: " << filename;
if (interleave_uploaded_files_.find(filename) !=
interleave_uploaded_files_.end()) {
LOG(WARNING) << trace_id_ << " interleave-uploaded file missing:"
<< filename;
}
}
// InputFileTask assumes that each filename is unique within a single
// compile task.
RemoveDuplicateFiles(flags_->cwd(), &required_files_);
// TODO: We don't need to clear the input when we are retrying.
req_->clear_input();
interleave_uploaded_files_.clear();
SetInputFileCallback();
std::vector<OneshotClosure*> closures;
time_t now = time(nullptr);
stats_->set_num_total_input_file(required_files_.size());
for (const string& filename : required_files_) {
ExecReq_Input* input = req_->add_input();
input->set_filename(filename);
const std::string abs_filename =
file::JoinPathRespectAbsolute(flags_->cwd(), filename);
bool missed_content =
missed_content_files.find(filename) != missed_content_files.end();
time_t mtime = 0;
string hash_key;
bool hash_key_is_ok = false;
absl::optional<absl::Time> missed_timestamp;
if (missed_content) {
missed_timestamp = last_req_timestamp_;
}
// If the file was reported as missing, we need to send the file content.
//
// Otherwise,
// if hash_key_is_ok is true, we can trust that hash_key is valid,
// so we use hash_key only (no content uploading here).
//
// If hash_key_is_ok is false, we're not sure whether hash_key is valid,
// so we try reading the content. InputFileTaskFinished determines whether
// we should upload the content or not, based on mtime and hash_key.
// If the content's hash matches this hash_key, we can trust that hash_key
// is valid, so we don't upload the content in this session.
//
// If we believed hash_key was valid but the goma servers couldn't find the
// content, it is reported in missing_inputs_ and we set missed_content to
// true in the retry session.
// Even in this case, we need to consider the race between upload and
// execution. If the file is uploaded by another task while this task is
// receiving missing_inputs_, we do not have to upload the file again. We
// use the timestamps of the file upload and the execution to detect this
// condition: if the upload time is later than the execution time
// (last_req_timestamp_), we can assume the file was uploaded by others.
const FileStat& input_file_stat = input_file_stat_cache_->Get(abs_filename);
if (input_file_stat.IsValid()) {
mtime = input_file_stat.mtime;
}
hash_key_is_ok = service_->file_hash_cache()->GetFileCacheKey(
abs_filename, missed_timestamp, input_file_stat, &hash_key);
if (missed_content) {
if (hash_key_is_ok) {
VLOG(2) << trace_id_ << " interleave uploaded: "
<< " filename=" << abs_filename;
// TODO: warn if interleave uploaded file is missing.
interleave_uploaded_files_.insert(filename);
} else {
LOG(INFO) << trace_id_ << " missed content:" << abs_filename;
}
}
if (mtime > stats_->latest_input_mtime()) {
stats_->set_latest_input_filename(abs_filename);
stats_->set_latest_input_mtime(mtime);
}
if (hash_key_is_ok) {
input->set_hash_key(hash_key);
continue;
}
// In linking, we'll use hash_key instead of content in ExecReq to keep
// ExecReq from bloating.
VLOG(1) << trace_id_ << " input file:" << abs_filename
<< (linking_ ? " [linking]" : "");
bool is_new_file = false;
if (mtime > 0) {
if (linking_) {
// For linking, we assume an input file is old if it is older than the
// compiler_proxy start time (i.e., it was built in a previous build
// session, so the file was generated by the goma backends or uploaded
// by a previous compiler_proxy).
is_new_file = mtime > absl::ToTimeT(service_->start_time());
} else {
is_new_file = ((now - mtime) < service_->new_file_threshold());
}
}
// If need_to_send_content is set to true, we treat every file as a new file.
if (service_->need_to_send_content())
is_new_file = true;
InputFileTask* input_file_task = InputFileTask::NewInputFileTask(
service_->wm(),
service_->blob_client()->NewUploader(
abs_filename, requester_info_, trace_id_),
service_->file_hash_cache(), input_file_stat_cache_->Get(abs_filename),
abs_filename, missed_content, linking_, is_new_file, hash_key, this,
input);
closures.push_back(
NewCallback(
input_file_task,
&InputFileTask::Run,
this,
NewCallback(
this,
&CompileTask::InputFileTaskFinished,
input_file_task)));
DCHECK_EQ(closures.size(), static_cast<size_t>(num_input_file_task_));
}
DCHECK_EQ(closures.size(), static_cast<size_t>(num_input_file_task_));
stats_->add_num_uploading_input_file(closures.size());
stats_->add_num_file_uploaded_during_exec_failure(
interleave_uploaded_files_.size());
if (closures.empty()) {
MaybeRunInputFileCallback(false);
return;
}
for (auto* closure : closures)
service_->wm()->RunClosure(
FROM_HERE, closure, WorkerThreadManager::PRIORITY_LOW);
}
void CompileTask::ProcessFileRequestDone() {
VLOG(1) << trace_id_ << " file req done";
CHECK(BelongsToCurrentThread());
CHECK_EQ(FILE_REQ, state_);
stats_->add_include_fileload_run_time(
file_request_timer_.GetInIntMilliseconds());
stats_->set_include_fileload_time(include_timer_.GetInIntMilliseconds() -
stats_->include_preprocess_time());
VLOG(1) << trace_id_
<< " input files processing preprocess "
<< stats_->include_preprocess_time() << "ms"
<< ", loading " << stats_->include_fileload_time() << "ms";
ProcessPendingFileRequest();
if (abort_) {
ProcessFinished("aborted in file req");
return;
}
if (!input_file_success_) {
if (IsSubprocRunning()) {
VLOG(1) << trace_id_ << " file request failed,"
<< " but subprocess running";
state_ = LOCAL_RUN;
stats_->set_local_run_reason("fail goma, local run started in FILE_REQ");
return;
}
AddErrorToResponse(TO_LOG, "Failed to process file request", true);
if (service_->http_client()->IsHealthyRecently() &&
stats_->num_uploading_input_file_size() > 0 &&
stats_->num_uploading_input_file(
stats_->num_uploading_input_file_size() - 1) > 0) {
// TODO: don't retry for permanent error (no such file, etc).
stats_->set_exec_request_retry(stats_->exec_request_retry() + 1);
if (stats_->exec_request_retry() <= kMaxExecRetry) {
std::ostringstream ss;
ss << "Failed to upload "
<< stats_->num_uploading_input_file(
stats_->num_uploading_input_file_size() - 1)
<< " files";
stats_->add_exec_request_retry_reason(ss.str());
LOG(INFO) << trace_id_ << " retry in FILE_REQ";
resp_->clear_error_message();
service_->wm()->RunClosureInThread(
FROM_HERE,
thread_id_,
NewCallback(this, &CompileTask::TryProcessFileRequest),
WorkerThreadManager::PRIORITY_LOW);
return;
}
}
ProcessFinished("fail in file request");
return;
}
// Fix for GOMA_GCH.
// We're sending *.gch.goma from the local disk, but it must appear as *.gch
// on the backend.
if (service_->enable_gch_hack()) {
for (auto& input : *req_->mutable_input()) {
if (absl::EndsWith(input.filename(), GOMA_GCH_SUFFIX)) {
input.mutable_filename()->resize(
input.filename().size() - strlen(".goma"));
}
}
}
// Here, |req_| is all prepared.
// TODO: Instead of here, maybe we need to call this
// at the end of ProcessFileRequest?
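// Consult LocalOutputCache before issuing the Exec RPC; on a hit, the
// cached ExecResp is reused and the task jumps straight to the file
// response step.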
if (LocalOutputCache::IsEnabled()) {
local_output_cache_key_ = LocalOutputCache::MakeCacheKey(*req_);
if (LocalOutputCache::instance()->Lookup(local_output_cache_key_,
resp_.get(),
trace_id_)) {
LOG(INFO) << trace_id_ << " lookup succeeded";
stats_->set_cache_hit(true);
stats_->set_cache_source(ExecLog::LOCAL_OUTPUT_CACHE);
ReleaseMemoryForExecReqInput(req_.get());
state_ = LOCAL_OUTPUT;
ProcessFileResponse();
return;
}
}
ProcessCallExec();
}
void CompileTask::ProcessPendingFileRequest() {
if (!linking_)
return;
CompileTask* pending_task = nullptr;
{
AUTOLOCK(lock, &global_mu_);
DCHECK_EQ(this, link_file_req_tasks_->front());
link_file_req_tasks_->pop_front();
if (!link_file_req_tasks_->empty()) {
pending_task = link_file_req_tasks_->front();
}
}
if (pending_task != nullptr) {
VLOG(1) << pending_task->trace_id_ << " start file req";
service_->wm()->RunClosureInThread(
FROM_HERE,
pending_task->thread_id_,
NewCallback(pending_task, &CompileTask::ProcessFileRequest),
WorkerThreadManager::PRIORITY_LOW);
}
}
void CompileTask::ProcessCallExec() {
VLOG(1) << trace_id_ << " call exec";
CHECK(BelongsToCurrentThread());
CHECK_EQ(FILE_REQ, state_);
if (abort_) {
ProcessFinished("aborted before call exec");
return;
}
CHECK(!requester_env_.verify_command().empty() ||
req_->input_size() > 0) << trace_id_ << " call exec";
state_ = CALL_EXEC;
if (ShouldStopGoma()) {
state_ = LOCAL_RUN;
stats_->set_local_run_reason("slow goma, local run started in CALL_EXEC");
return;
}
if (req_->trace()) LOG(INFO) << trace_id_ << " requesting remote trace";
rpc_call_timer_.Start();
req_->mutable_requester_info()->set_retry(stats_->exec_request_retry());
VLOG(2) << trace_id_
<< " request string to send:" << req_->DebugString();
{
AUTOLOCK(lock, &mu_);
http_rpc_status_ = absl::make_unique<HttpRPC::Status>();
http_rpc_status_->trace_id = trace_id_;
const auto& timeouts = service_->timeouts();
for (const auto& timeout : timeouts) {
// TODO: convert |http_rpc_status_| to absl::Duration.
http_rpc_status_->timeout_secs.push_back(
absl::ToInt64Milliseconds(timeout));
}
}
exec_resp_ = absl::make_unique<ExecResp>();
service_->exec_service_client()->ExecAsync(
req_.get(), exec_resp_.get(), http_rpc_status_.get(),
NewCallback(this, &CompileTask::ProcessCallExecDone));
last_req_timestamp_ = absl::Now();
if (requester_env_.use_local() &&
(subproc_weight_ == SubProcessReq::HEAVY_WEIGHT) &&
subproc_ == nullptr) {
// now, it's ok to run subprocess.
stats_->set_local_run_reason("slow goma linking");
SetupSubProcess();
}
}
void CompileTask::ProcessCallExecDone() {
VLOG(1) << trace_id_ << " call exec done";
CHECK(BelongsToCurrentThread());
CHECK_EQ(CALL_EXEC, state_);
exit_status_ = exec_resp_->result().exit_status();
resp_->Swap(exec_resp_.get());
exec_resp_.reset();
string retry_reason;
for (const auto& msg : resp_->error_message()) {
exec_error_message_.push_back(msg);
if (!retry_reason.empty()) {
retry_reason += "\n";
}
retry_reason += msg;
}
// Clear error_message from the server.
// The server error message is logged, but not sent back to the user.
resp_->clear_error_message();
stats_->add_rpc_call_time(rpc_call_timer_.GetInIntMilliseconds());
if (http_rpc_status_->master_trace_id.empty() ||
http_rpc_status_->master_trace_id == http_rpc_status_->trace_id) {
stats_->add_rpc_req_size(http_rpc_status_->req_size);
stats_->add_rpc_resp_size(http_rpc_status_->resp_size);
stats_->add_rpc_raw_req_size(http_rpc_status_->raw_req_size);
stats_->add_rpc_raw_resp_size(http_rpc_status_->raw_resp_size);
stats_->add_rpc_throttle_time(http_rpc_status_->throttle_time);
stats_->add_rpc_pending_time(http_rpc_status_->pending_time);
stats_->add_rpc_req_build_time(http_rpc_status_->req_build_time);
stats_->add_rpc_req_send_time(http_rpc_status_->req_send_time);
stats_->add_rpc_wait_time(http_rpc_status_->wait_time);
stats_->add_rpc_resp_recv_time(http_rpc_status_->resp_recv_time);
stats_->add_rpc_resp_parse_time(http_rpc_status_->resp_parse_time);
}
stats_->add_rpc_master_trace_id(http_rpc_status_->master_trace_id);
stats_->set_cache_hit(resp_->cache_hit() == ExecResp::LOCAL_OUTPUT_CACHE ||
(http_rpc_status_->finished && resp_->has_cache_hit() &&
resp_->cache_hit() != ExecResp::NO_CACHE));
if (stats_->cache_hit()) {
if (!resp_->has_cache_hit()) {
// for old backends.
stats_->set_cache_source(ExecLog::UNKNOWN_CACHE);
} else {
switch (resp_->cache_hit()) {
case ExecResp::NO_CACHE:
LOG(ERROR) << trace_id_ << " cache_hit, but NO_CACHE";
break;
case ExecResp::MEM_CACHE:
stats_->set_cache_source(ExecLog::MEM_CACHE);
break;
case ExecResp::STORAGE_CACHE:
stats_->set_cache_source(ExecLog::STORAGE_CACHE);
break;
case ExecResp::LOCAL_OUTPUT_CACHE:
stats_->set_cache_source(ExecLog::LOCAL_OUTPUT_CACHE);
break;
default:
LOG(ERROR) << trace_id_ << " unknown cache_source="
<< resp_->cache_hit();
stats_->set_cache_source(ExecLog::UNKNOWN_CACHE);
}
}
}
if (resp_->has_cache_key())
resp_cache_key_ = resp_->cache_key();
if (abort_) {
ProcessFinished("aborted in call exec");
return;
}
if (!http_rpc_status_->enabled) {
stats_->set_network_failure_type(ExecLog::DISABLED);
} else if (http_rpc_status_->err == 0) {
stats_->set_network_failure_type(ExecLog::NO_NETWORK_ERROR);
} else { // i.e. http_rpc_status_->err != 0.
stats_->set_network_failure_type(ExecLog::UNKNOWN_NETWORK_ERROR);
switch (http_rpc_status_->state) {
case HttpClient::Status::INIT: FALLTHROUGH_INTENDED;
case HttpClient::Status::PENDING:
stats_->set_network_failure_type(ExecLog::CONNECT_FAILED);
break;
case HttpClient::Status::SENDING_REQUEST:
stats_->set_network_failure_type(ExecLog::SEND_FAILED);
break;
case HttpClient::Status::REQUEST_SENT:
stats_->set_network_failure_type(ExecLog::TIMEDOUT_AFTER_SEND);
break;
case HttpClient::Status::RECEIVING_RESPONSE:
stats_->set_network_failure_type(ExecLog::RECEIVE_FAILED);
break;
case HttpClient::Status::RESPONSE_RECEIVED:
if (http_rpc_status_->http_return_code != 200) {
stats_->set_network_failure_type(ExecLog::BAD_HTTP_STATUS_CODE);
}
break;
}
}
const int err = http_rpc_status_->err;
if (err < 0) {
LOG(WARNING) << trace_id_ << " rpc err=" << err << " "
<< (err == ERR_TIMEOUT ? " timed out" : " failed")
<< " " << http_rpc_status_->err_message;
if (IsSubprocRunning() &&
http_rpc_status_->state != HttpClient::Status::RECEIVING_RESPONSE) {
// If the RPC failed while receiving the response, goma should retry the Exec
// call because the response will likely be served from cache.
LOG(WARNING) << trace_id_ << " goma failed, but subprocess running.";
state_ = LOCAL_RUN;
stats_->set_local_run_reason("fail goma, local run started in CALL_EXEC");
return;
}
AddErrorToResponse(TO_LOG, "", true);
// Don't retry if it is a client error: 3xx or 4xx.
// Retry if it is a server error: 5xx (e.g. 502 error from GFE).
//
// Also, it is OK to retry on a socket timeout that occurred while receiving
// the response.
if (((http_rpc_status_->http_return_code / 100) == 5) ||
(http_rpc_status_->state == HttpClient::Status::RECEIVING_RESPONSE)) {
std::ostringstream ss;
ss << "RPC failed http=" << http_rpc_status_->http_return_code
<< ": " << http_rpc_status_->err_message;
if (!retry_reason.empty()) {
retry_reason += "\n";
}
retry_reason += ss.str();
} else {
// No retry for client error: 3xx, 4xx (302, 403 for dos block,
// 401 for auth error, etc).
LOG(WARNING) << trace_id_ << " RPC failed http="
<< http_rpc_status_->http_return_code
<< " state="
<< HttpClient::Status::StateName(http_rpc_status_->state)
<< ": " << http_rpc_status_->err_message
<< ": no retry";
}
}
if (err == OK && resp_->missing_input_size() > 0) {
// Missing inputs will be handled in ProcessFileResponse, and
// ProcessFileRequest will retry the request, uploading the contents of
// the missing inputs.
// Just retrying the request here would not upload the contents and would
// probably fail with missing inputs again, so don't retry here.
LOG_IF(WARNING, !retry_reason.empty())
<< trace_id_ << " missing inputs:" << resp_->missing_input_size()
<< " but retry_reason set:" << retry_reason;
} else if (!retry_reason.empty()) {
if (service_->http_client()->IsHealthyRecently()) {
LOG(INFO) << trace_id_ << " exec retry:"
<< stats_->exec_request_retry()
<< " error=" << resp_->error()
<< " " << retry_reason;
stats_->set_exec_request_retry(stats_->exec_request_retry() + 1);
if (stats_->exec_request_retry() <= kMaxExecRetry &&
!(resp_->has_error() && IsFatalError(resp_->error()))) {
stats_->add_exec_request_retry_reason(retry_reason);
LOG(INFO) << trace_id_ << " retry in CALL_EXEC";
resp_->clear_error_message();
resp_->clear_error();
state_ = FILE_REQ;
service_->wm()->RunClosureInThread(
FROM_HERE,
thread_id_,
NewCallback(this, &CompileTask::ProcessCallExec),
WorkerThreadManager::PRIORITY_LOW);
return;
}
if (service_->should_fail_for_unsupported_compiler_flag() &&
resp_->bad_request_reason_code() ==
ExecResp::UNSUPPORTED_COMPILER_FLAGS) {
// TODO: Make a simple test for this after goma server has
// started returning bad request reason code.
string msg = "compile request was rejected by goma server. "
"The request might contain unsupported compiler flag.\n"
"If you want to continue compile with local fallback, set "
"environment variable "
"GOMA_FAIL_FOR_UNSUPPORTED_COMPILER_FLAGS=false and "
"restart the compiler_proxy.\n";
AddErrorToResponse(TO_USER, msg, true);
want_fallback_ = false;
} else {
LOG(WARNING) << trace_id_ << " exec error:"
<< resp_->error()
<< " " << retry_reason
<< " but http is healthy";
}
} else {
LOG(WARNING) << trace_id_
<< " won't retry because http client is not healthy.";
}
CheckNoMatchingCommandSpec(retry_reason);
ProcessFinished("fail in call exec");
return;
}
if (err < 0) {
ProcessFinished("fail in call exec");
return;
}
// Saves embedded upload information. We have to call this before
// clearing inputs.
StoreEmbeddedUploadInformationIfNeeded();
ReleaseMemoryForExecReqInput(req_.get());
if (resp_->missing_input_size() == 0) {
// Check the command spec only when the response has no missing inputs.
CheckCommandSpec();
}
ProcessFileResponse();
}
void CompileTask::ProcessFileResponse() {
VLOG(1) << trace_id_ << " file resp";
CHECK(BelongsToCurrentThread());
CHECK(state_ == CALL_EXEC || state_ == LOCAL_OUTPUT) << state_;
if (abort_) {
ProcessFinished("aborted before file resp");
return;
}
state_ = FILE_RESP;
if (ShouldStopGoma()) {
state_ = LOCAL_RUN;
stats_->set_local_run_reason("slow goma, local run started in FILE_RESP");
return;
}
file_response_timer_.Start();
if (resp_->missing_input_size() > 0) {
stats_->add_num_missing_input_file(resp_->missing_input_size());
LOG(WARNING) << trace_id_
<< " request didn't have full content:"
<< resp_->missing_input_size()
<< " in "
<< required_files_.size()
<< " : retry=" << stats_->exec_request_retry();
for (const auto& filename : resp_->missing_input()) {
std::ostringstream ss;
ss << "Required file not on goma cache:" << filename;
if (interleave_uploaded_files_.find(filename)
!= interleave_uploaded_files_.end()) {
ss << " (interleave uploaded)";
}
AddErrorToResponse(TO_LOG, ss.str(), true);
}
for (const auto& reason : resp_->missing_reason()) {
AddErrorToResponse(TO_LOG, reason, true);
}
int need_to_send_content_threshold = required_files_.size() / 2;
if (!service_->need_to_send_content()
&& (resp_->missing_input_size() > need_to_send_content_threshold)) {
LOG(WARNING) << trace_id_
<< " Lots of missing files. Will send file contents"
<< " even if it's old enough.";
service_->SetNeedToSendContent(true);
}
output_file_success_ = false;
ProcessFileResponseDone();
return;
}
if (stats_->exec_request_retry() == 0 && service_->need_to_send_content()) {
LOG(INFO) << trace_id_ << " no missing files."
<< " Turn off to force sending old file contents";
service_->SetNeedToSendContent(false);
}
// No missing input files.
if (!IsGomaccRunning()) {
PLOG(WARNING) << trace_id_
<< " pid:" << gomacc_pid_ << " does not receive signal 0 "
<< " abort=" << abort_;
// The user may not receive the error message, because gomacc was already
// killed.
AddErrorToResponse(TO_LOG, "gomacc killed?", true);
// If the requesting process was already dead, we should not write output
// files.
ProcessFinished("gomacc killed");
return;
}
// Decide whether in-memory output can be used and whether the output
// should be written to a tmp file.
bool want_in_memory_output = true;
string need_rename_reason;
if (verify_output_) {
VLOG(1) << trace_id_ << " output need_rename for verify_output";
want_in_memory_output = false;
need_rename_reason = "verify_output";
} else if (!success()) {
VLOG(1) << trace_id_ << " output need_rename for fail exec";
// TODO: we don't need to write remote output for fail exec?
want_in_memory_output = false;
need_rename_reason = "fail exec";
} else {
// resp_ contains the whole output data, and no more http_rpc is needed to
// fetch output file data, so there is no need to run the local compiler
// anymore.
if (delayed_setup_subproc_ != nullptr) {
delayed_setup_subproc_->Cancel();
delayed_setup_subproc_ = nullptr;
}
if (subproc_ != nullptr) {
// Racing between remote and local.
// Even if subproc_->started().pid() == kInvalidPid, the subproc might
// have started (because compiler_proxy and the subproc are async).
// The compile task wants in-memory output by default, but when it
// couldn't use in-memory output because of lack of memory, it
// should write the output to a tmp file (i.e. it needs to rename).
// TODO: cancel subproc if it was not started yet,
// or use local subproc if it has already started.
VLOG(1) << trace_id_ << " output need_rename for local_subproc "
<< subproc_->started().pid();
std::ostringstream ss;
ss << "local_subproc pid=" << subproc_->started().pid();
need_rename_reason = ss.str();
}
}
exec_output_file_.clear();
ClearOutputFile();
output_file_.resize(resp_->result().output_size());
SetOutputFileCallback();
std::vector<OneshotClosure*> closures;
for (int i = 0; i < resp_->result().output_size(); ++i) {
const string& output_filename = resp_->result().output(i).filename();
CheckOutputFilename(output_filename);
exec_output_file_.push_back(output_filename);
string filename = file::JoinPathRespectAbsolute(
stats_->cwd(), output_filename);
// TODO: check output paths matches with flag's output filenames?
if (service_->enable_gch_hack() && absl::EndsWith(filename, ".gch"))
filename += ".goma";
OutputFileInfo* output_info = &output_file_[i];
output_info->filename = filename;
bool try_acquire_output_buffer = want_in_memory_output;
if (FileServiceClient::IsValidFileBlob(resp_->result().output(i).blob())) {
output_info->size = resp_->result().output(i).blob().file_size();
} else {
LOG(ERROR) << trace_id_ << " output is invalid:"
<< filename;
try_acquire_output_buffer = false;
}
if (try_acquire_output_buffer && service_->AcquireOutputBuffer(
output_info->size, &output_info->content)) {
output_info->tmp_filename.clear();
VLOG(1) << trace_id_ << " output in buffer:"
<< filename
<< " size="
<< output_info->size;
} else {
if (!need_rename_reason.empty()) {
std::ostringstream ss;
ss << filename << ".tmp." << id();
output_info->tmp_filename = ss.str();
LOG(INFO) << trace_id_ << " output in tmp file:"
<< output_info->tmp_filename
<< " for " << need_rename_reason;
} else {
// no need to rename, so write output directly to the output file.
output_info->tmp_filename = filename;
LOG(INFO) << trace_id_ << " output in file:" << filename;
}
}
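// Propagate the executable bit from the response and apply the requester's
// umask to the output file mode.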
if (resp_->result().output(i).is_executable())
output_info->mode = 0777;
if (requester_env_.has_umask()) {
output_info->mode &= ~requester_env_.umask();
VLOG(1) << trace_id_ << " output file mode is updated."
<< " filename=" << filename
<< " mode=" << std::oct << output_info->mode;
}
std::unique_ptr<OutputFileTask> output_file_task(
new OutputFileTask(
service_->wm(),
service_->file_service()->WithRequesterInfoAndTraceId(
requester_info_, trace_id_),
this, i, resp_->result().output(i),
output_info));
OutputFileTask* output_file_task_pointer = output_file_task.get();
closures.push_back(
NewCallback(
output_file_task_pointer,
&OutputFileTask::Run,
NewCallback(
this,
&CompileTask::OutputFileTaskFinished,
std::move(output_file_task))));
}
stats_->set_num_output_file(closures.size());
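// With nothing to download, finish the FILE_RESP step right away; otherwise
// run each OutputFileTask on a low-priority worker thread.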
if (closures.empty()) {
MaybeRunOutputFileCallback(-1, false);
} else {
for (auto* closure : closures) {
service_->wm()->RunClosure(
FROM_HERE, closure, WorkerThreadManager::PRIORITY_LOW);
}
}
}
void CompileTask::ProcessFileResponseDone() {
VLOG(1) << trace_id_ << " file resp done";
CHECK(BelongsToCurrentThread());
CHECK_EQ(FILE_RESP, state_);
stats_->set_file_response_time(file_response_timer_.GetInIntMilliseconds());
if (abort_) {
ProcessFinished("aborted in file resp");
return;
}
if (!output_file_success_) {
if (!abort_) {
if (!(precompiling_ && service_->enable_gch_hack()) &&
IsSubprocRunning()) {
VLOG(1) << trace_id_ << " failed to process file response,"
<< " but subprocess running";
state_ = LOCAL_RUN;
stats_->set_local_run_reason(
"fail goma, local run started in FILE_RESP");
return;
}
// For a missing input error, we emit a warning instead of an error
// when this is the first try, since we will retry later.
bool should_error = stats_->exec_request_retry() > 0;
std::ostringstream ss;
ss << "Try:" << stats_->exec_request_retry() << ": ";
if (resp_->missing_input_size() > 0) {
// The goma server replied with missing inputs.
// On retry, the list of missing files in the response is used to fill
// in the needed files.
ss << "Missing " << resp_->missing_input_size() << " input files.";
} else {
should_error = true;
ss << "Failed to download "
<< stats_->num_output_file()
<< " files"
<< " in " << (cache_hit() ? "cached" : "no-cached") << "result";
}
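// Decide whether to retry the exec request. No retry if the compiler has
// been disabled or HTTP has been unhealthy recently; otherwise retry up to
// kMaxExecRetry times.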
bool do_retry = false;
std::ostringstream no_retry_reason;
if (compiler_info_state_.disabled()) {
no_retry_reason << "compiler disabled. no retry."
<< " disabled_reason="
<< compiler_info_state_.GetDisabledReason();
} else if (!service_->http_client()->IsHealthyRecently()) {
no_retry_reason << "http is unhealthy. no retry."
<< " health_status="
<< service_->http_client()->GetHealthStatusMessage();
} else {
stats_->set_exec_request_retry(stats_->exec_request_retry() + 1);
do_retry = stats_->exec_request_retry() <= kMaxExecRetry;
if (!do_retry) {
no_retry_reason << "too many retry";
}
}
if (!do_retry)
should_error = true;
AddErrorToResponse(TO_LOG, ss.str(), should_error);
if (do_retry) {
if (!service_->http_client()->IsHealthy()) {
LOG(WARNING) << trace_id_ << " http is unhealthy, but retry."
<< " health_status="
<< service_->http_client()->GetHealthStatusMessage();
}
VLOG(2) << trace_id_
<< " Failed to process file response (we will retry):"
<< resp_->DebugString();
stats_->add_exec_request_retry_reason(ss.str());
LOG(INFO) << trace_id_ << " retry in FILE_RESP";
resp_->clear_error_message();
TryProcessFileRequest();
return;
}
AddErrorToResponse(TO_LOG, no_retry_reason.str(), true);
}
VLOG(2) << trace_id_
<< " Failed to process file response (second time):"
<< resp_->DebugString();
ProcessFinished("failed in file response");
return;
}
if (verify_output_) {
CHECK(subproc_ == nullptr);
CHECK(delayed_setup_subproc_ == nullptr);
for (const auto& info : output_file_) {
const string& filename = info.filename;
const string& tmp_filename = info.tmp_filename;
if (!VerifyOutput(filename, tmp_filename)) {
output_file_success_ = false;
}
}
output_file_.clear();
ProcessFinished("verify done");
return;
}
if (success()) {
ProcessFinished("");
} else {
ClearOutputFile();
ProcessFinished("fail exec");
}
}
void CompileTask::ProcessFinished(const string& msg) {
if (abort_ || !msg.empty()) {
LOG(INFO) << trace_id_ << " finished " << msg
<< " state=" << StateName(state_)
<< " abort=" << abort_;
} else {
VLOG(1) << trace_id_ << " finished " << msg
<< " state=" << StateName(state_);
DCHECK(success()) << trace_id_ << " finished";
DCHECK_EQ(FILE_RESP, state_) << trace_id_ << " finished";
}
CHECK(BelongsToCurrentThread());
CHECK_LT(state_, FINISHED);
DCHECK(!finished_);
finished_ = true;
if (state_ == INIT) {
// Failed to find the local compiler path.
// It also happens if the user uses an old gomacc.
LOG(ERROR) << trace_id_ << " failed in INIT.";
CHECK(subproc_ == nullptr);
CHECK(delayed_setup_subproc_ == nullptr);
CHECK(!abort_);
state_ = FINISHED;
ReplyResponse("failed in INIT");
return;
}
if (!abort_)
state_ = FINISHED;
if (verify_output_) {
VLOG(2) << trace_id_ << " verify response:" << resp_->DebugString();
CHECK(subproc_ == nullptr);
CHECK(delayed_setup_subproc_ == nullptr);
ReplyResponse("verify done");
return;
}
if (precompiling_ && service_->enable_gch_hack()) {
// In gch hack mode, we'll run both local and remote simultaneously.
if (subproc_ != nullptr) {
// subprocess still running.
// we'll reply response when subprocess is finished.
return;
}
// subprocess finished first.
CHECK(delayed_setup_subproc_ == nullptr);
VLOG(1) << trace_id_ << " gch hack: local and goma finished.";
ProcessReply();
return;
}
if (!requester_env_.fallback()) {
VLOG(1) << trace_id_ << " goma finished and no fallback.";
CHECK(subproc_ == nullptr);
CHECK(delayed_setup_subproc_ == nullptr);
ProcessReply();
return;
}
if (abort_) {
// local finished first (race or verify output).
if (local_output_file_callback_ == nullptr)
Done();
// If local_output_file_callback_ is not nullptr, uploading the local
// output file is still in flight, so ProcessLocalFileOutputDone() will be
// called later.
return;
}
CHECK_EQ(FINISHED, state_);
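// Remote compile has finished. If it succeeded, gomacc is gone, or fallback
// is not wanted, use the remote result (killing any racing subprocess).
// Otherwise fall through to the fail-fallback path below.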
if (success() || !IsGomaccRunning() || !want_fallback_) {
if (!success() && !want_fallback_) {
LOG(INFO) << trace_id_ << " failed and no need to fallback";
} else {
VLOG(1) << trace_id_ << " success or gomacc killed.";
}
stats_->clear_local_run_reason();
if (delayed_setup_subproc_ != nullptr) {
delayed_setup_subproc_->Cancel();
delayed_setup_subproc_ = nullptr;
}
if (subproc_ != nullptr) {
LOG(INFO) << trace_id_ << " goma finished, killing subproc pid="
<< subproc_->started().pid();
KillSubProcess(); // FinishSubProcess will be called.
} else {
ProcessReply(); // GOMA_FALLBACK=false or GOMA_USE_LOCAL=false
}
return;
}
LOG(INFO) << trace_id_ << " fail fallback"
<< " exit=" << resp_->result().exit_status()
<< " cache_key=" << resp_->cache_key()
<< " flag=" << flag_dump_;
DCHECK(requester_env_.fallback());
DCHECK(!fail_fallback_);
stdout_ = resp_->result().stdout_buffer();
stderr_ = resp_->result().stderr_buffer();
LogCompilerOutput(trace_id_, "stdout", stdout_);
LogCompilerOutput(trace_id_, "stderr", stderr_);
fail_fallback_ = true;
// TODO: active fail fallback only for http error?
// b/36576025 b/36577821
if (!service_->IncrementActiveFailFallbackTasks()) {
AddErrorToResponse(
TO_USER, "reached max number of active fail fallbacks", true);
if (delayed_setup_subproc_ != nullptr) {
delayed_setup_subproc_->Cancel();
delayed_setup_subproc_ = nullptr;
}
if (subproc_ != nullptr) {
LOG(INFO) << trace_id_ << " killing subproc pid="
<< subproc_->started().pid();
KillSubProcess(); // FinishSubProcess will be called.
} else {
ProcessReply(); // GOMA_FALLBACK=false or GOMA_USE_LOCAL=false
}
return;
}
if (subproc_ == nullptr) {
// subproc_ might be nullptr (e.g. GOMA_USE_LOCAL=false).
SetupSubProcess();
}
RunSubProcess(msg);
}
void CompileTask::ProcessReply() {
VLOG(1) << trace_id_ << " process reply";
DCHECK(BelongsToCurrentThread());
CHECK_EQ(FINISHED, state_);
CHECK(subproc_ == nullptr);
CHECK(delayed_setup_subproc_ == nullptr);
CHECK(!abort_);
string msg;
if (IsGomaccRunning()) {
VLOG(2) << trace_id_ << " goma result:" << resp_->DebugString();
if (local_run_ && service_->dont_kill_subprocess()) {
// If we ran the local process and dont_kill_subprocess is true, we just
// use the local results, so we don't need to rename remote outputs.
CommitOutput(false);
msg = "goma success, but local used";
} else {
CommitOutput(true);
if (local_cache_hit()) {
msg = "goma success (local cache hit)";
} else if (cache_hit()) {
msg = "goma success (cache hit)";
} else {
msg = "goma success";
}
}
if (LocalOutputCache::IsEnabled()) {
if (!local_cache_hit() && !local_output_cache_key_.empty() && success()) {
// Here, either local or remote output has been produced, and the
// output cache key exists.
// Note: we need to save the output before ReplyResponse. Otherwise,
// the output file might be removed by ninja.
if (!LocalOutputCache::instance()->SaveOutput(local_output_cache_key_,
req_.get(),
resp_.get(),
trace_id_)) {
LOG(ERROR) << trace_id_ << " failed to save localoutputcache";
}
}
}
} else {
msg = "goma canceled";
}
if (!subproc_stdout_.empty()) remove(subproc_stdout_.c_str());
if (!subproc_stderr_.empty()) remove(subproc_stderr_.c_str());
ReplyResponse(msg);
}
struct CompileTask::RenameParam {
string oldpath;
string newpath;
};
void CompileTask::RenameCallback(RenameParam* param, string* err) {
err->clear();
int r = rename(param->oldpath.c_str(), param->newpath.c_str());
if (r == 0) {
return;
}
// If errno != EEXIST, we could log, call AddErrorToResponse, and return
// without setting *err (so DoOutput would not retry), since a non-EEXIST
// error might not be worth retrying?
std::ostringstream ss;
ss << "rename error:" << param->oldpath << " " << param->newpath
<< " errno=" << errno;
*err = ss.str();
}
struct CompileTask::ContentOutputParam {
ContentOutputParam() : info(nullptr) {}
string filename;
OutputFileInfo* info;
};
void CompileTask::ContentOutputCallback(
ContentOutputParam* param, string* err) {
err->clear();
remove(param->filename.c_str());
std::unique_ptr<FileServiceClient::Output> fout(
FileServiceClient::FileOutput(param->filename, param->info->mode));
if (!fout->IsValid()) {
std::ostringstream ss;
ss << "open for write error:" << param->filename;
*err = ss.str();
return;
}
if (!fout->WriteAt(0L, param->info->content) || !fout->Close()) {
std::ostringstream ss;
ss << "write error:" << param->filename;
*err = ss.str();
return;
}
}
#ifdef _WIN32
void CompileTask::DoOutput(const string& opname, const string& filename,
PermanentClosure* closure, string* err) {
static const int kMaxDeleteRetryForDoOutput = 5;
// A large sleep time will not harm a normal user.
// The following runs after the child process has terminated, and
// deletion usually succeeds without retrying.
static const int kInitialRetrySleepInMs = 100;
// On POSIX, rename succeeds even if the target file already exists, as
// long as it is in a writable directory.
// On Win32, rename fails if the target file already exists, so we need
// to delete it explicitly before the rename.
// In this code, we assume a file is temporarily locked by a process such
// as an antivirus scanner, and the lock will be released after a while.
//
// You may consider using MoveFileEx with MOVEFILE_REPLACE_EXISTING.
// Calling it may take forever and stall compiler_proxy if the process
// holding the lock is misbehaving. As a result, we do not use it.
int sleep_in_ms = kInitialRetrySleepInMs;
for (int retry = 0; retry < kMaxDeleteRetryForDoOutput; ++retry) {
closure->Run();
if (err->empty()) {
return;
}
LOG(WARNING) << trace_id_ << " DoOutput operation failed."
<< " opname=" << opname
<< " filename=" << filename
<< " err=" << *err;
// TODO: identify a process that has a file lock.
// As far as I know, people seem to use NtQueryInformationProcess,
// which is an obsolete function, to list processes.
// http://msdn.microsoft.com/en-us/library/windows/desktop/aa364944(v=vs.85).aspx
DWORD attr = GetFileAttributesA(filename.c_str());
if (attr == INVALID_FILE_ATTRIBUTES) {
LOG_SYSRESULT(GetLastError());
std::ostringstream ss;
ss << opname << " failed but GetFileAttributes "
<< "returns INVALID_FILE_ATTRIBUTES"
<< " filename=" << filename
<< " attr=" << attr;
AddErrorToResponse(TO_USER, ss.str(), true);
return;
}
LOG(INFO) << trace_id_ << " "
<< "The file exists. We need to remove."
<< " filename=" << filename
<< " attr=" << attr;
if (remove(filename.c_str()) == 0) {
LOG(INFO) << trace_id_ << " "
<< "Delete succeeds."
<< " filename=" << filename;
continue;
}
LOG(WARNING) << trace_id_ << " "
<< "Failed to delete file:"
<< " filename=" << filename
<< " retry=" << retry
<< " sleep_in_ms=" << sleep_in_ms;
Sleep(sleep_in_ms);
sleep_in_ms *= 2;
}
if (err->empty()) {
std::ostringstream ss;
ss << opname << " failed but err is empty?";
*err = ss.str();
}
PLOG(ERROR) << trace_id_ << " " << *err;
AddErrorToResponse(TO_USER, *err, true);
}
#else
void CompileTask::DoOutput(const string& opname,
const string& filename,
PermanentClosure* closure,
string* err) {
closure->Run();
if (!err->empty()) {
PLOG(ERROR) << trace_id_ << " DoOutput operation failed."
<< " opname=" << opname
<< " filename=" << filename
<< " err=" << *err;
AddErrorToResponse(TO_USER, *err, true);
}
}
#endif
void CompileTask::RewriteCoffTimestamp(const string& filename) {
absl::string_view ext = file::Extension(filename);
if (ext != "obj")
return;
ScopedFd fd(ScopedFd::OpenForRewrite(filename));
if (!fd.valid()) {
LOG(ERROR) << trace_id_ << " failed to open file for coff rewrite: "
<< filename;
return;
}
// Check the COFF file header. The COFF header looks like this.
// c.f. http://delorie.com/djgpp/doc/coff/
// 0-1 machine. must be 0x014c for x86, 0x8664 for x64
// 2-3 number of sections (not necessary for us)
// 4-7 timestamp
// ...
//
// All numeric fields are stored in host native order.
// Since we only accept the x86 or x64 magic, all numeric fields
// should be little endian here.
//
// When /bigobj is specified for cl.exe, Microsoft extends the COFF file
// format to accept more sections.
// In this case, the file header is like this:
// 0-1 0x0000 (IMAGE_FILE_MACHINE_UNKNOWN)
// 2-3 0xFFFF
// 4-5 version (0x0001 or 0x0002)
// 6-7 machine (0x014c or 0x8664)
// 8-11 timestamp
// 12-27 uuid: 38feb30ca5d9ab4dac9bd6b6222653c2 for version 0x0001
// c7a1bad1eebaa94baf20faf66aa4dcb8 for version 0x0002
//
// TODO: Find bigobj version 1 document and add link here.
unsigned char buf[32];
ssize_t read_byte = fd.Read(buf, sizeof(buf));
if (read_byte != sizeof(buf)) {
LOG(ERROR) << trace_id_
<< " couldn't read the first " << sizeof(buf)
<< " byte. file is too small?"
<< " filename=" << filename
<< " read_byte=" << read_byte;
return;
}
unsigned short magic = *reinterpret_cast<unsigned short*>(buf);
int offset = 0;
if (magic == 0x014c || magic == 0x8664) {
offset = 4;
} else if (IsBigobjFormat(buf)) {
offset = 8;
}
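// offset now points at the 4-byte timestamp field; 0 means the header was
// recognized neither as plain COFF nor as bigobj.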
if (offset > 0) {
unsigned int old = *reinterpret_cast<unsigned int*>(buf + offset);
unsigned int now = time(nullptr);
fd.Seek(offset, ScopedFd::SeekAbsolute);
fd.Write(&now, 4);
LOG(INFO) << trace_id_
<< " Rewriting timestamp:" << " file=" << filename
<< " offset=" << offset
<< " old=" << old << " new=" << now;
return;
}
std::stringstream ss;
for (size_t i = 0; i < sizeof(buf); ++i) {
ss << std::hex << std::setw(2) << std::setfill('0')
<< (static_cast<unsigned int>(buf[i]) & 0xFF);
}
LOG(ERROR) << trace_id_
<< " Unknown COFF header."
<< " filename=" << filename
<< " first " << sizeof(buf) << "byte=" << ss.str();
return;
}
void CompileTask::CommitOutput(bool use_remote) {
VLOG(1) << trace_id_ << " commit output " << use_remote;
DCHECK(BelongsToCurrentThread());
CHECK(state_ == FINISHED);
CHECK(!abort_);
CHECK(subproc_ == nullptr);
CHECK(delayed_setup_subproc_ == nullptr);
std::vector<string> output_bases;
bool has_obj = false;
for (auto& info : output_file_) {
SimpleTimer timer;
const string& filename = info.filename;
const string& tmp_filename = info.tmp_filename;
const string& hash_key = info.hash_key;
DCHECK(!hash_key.empty()) << filename;
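// use_content: the remote output was kept in an in-memory buffer.
// need_rename: the remote output was written to a tmp file and must be
// renamed to the final path.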
const bool use_content = tmp_filename.empty();
bool need_rename = !tmp_filename.empty() && tmp_filename != filename;
if (!use_remote) {
// If use_remote is false, we should have the outputs of the local process.
VLOG(1) << trace_id_ << " commit output (use local) in "
<< filename;
if (access(filename.c_str(), R_OK) == 0) {
if (need_rename) {
// We might have written a tmp file for the remote output, but decided
// to use the local output.
// In this case, we want to remove the remote output's tmp file.
remove(tmp_filename.c_str());
}
} else {
// !use_remote, but local output doesn't exist?
PLOG(ERROR) << trace_id_ << " " << filename;
}
if (use_content) {
VLOG(1) << trace_id_ << " release buffer of remote output";
service_->ReleaseOutputBuffer(info.size, &info.content);
}
need_rename = false;
} else if (use_content) {
// If use_remote is true and use_content is true,
// write the content (remote output) to filename.
VLOG(1) << trace_id_ << " commit output (use remote content) to "
<< filename;
ContentOutputParam param;
param.filename = filename;
param.info = &info;
string err;
std::unique_ptr<PermanentClosure> callback(
NewPermanentCallback(
this,
&CompileTask::ContentOutputCallback,
&param, &err));
DoOutput("content_output", filename, callback.get(), &err);
service_->ReleaseOutputBuffer(info.size, &info.content);
need_rename = false;
} else if (need_rename) {
// If use_remote is true, use_content is false, and
// need_rename is true, we wrote remote output in
// tmp_filename, and we need to rename tmp_filename
// to filename.
VLOG(1) << trace_id_ << " commit output (use remote tmp file) "
<< "rename " << tmp_filename << " => " << filename;
RenameParam param;
param.oldpath = tmp_filename;
param.newpath = filename;
string err;
std::unique_ptr<PermanentClosure> callback(
NewPermanentCallback(
this, &CompileTask::RenameCallback, &param, &err));
DoOutput("rename", filename, callback.get(), &err);
} else {
// If use_remote is true, use_content is false, and
// need_rename is false, we wrote remote output in
// filename, so do nothing here.
VLOG(1) << trace_id_ << " commit output (use remote file) in "
<< filename;
}
// Incremental link doesn't work well if the object file timestamp is wrong.
// If it's a Windows object file (.obj) from remote, we'd like to rewrite
// the timestamp when the content comes from the remote cache.
// According to our measurement, this doesn't have a measurable performance
// penalty.
// see b/24388745
if (use_remote && stats_->cache_hit() &&
flags_->type() == CompilerFlagType::Clexe) {
// We should not rewrite coff if /Brepro or something similar is set.
// See b/72768585
const VCFlags& vc_flag = static_cast<const VCFlags&>(*flags_);
if (!vc_flag.has_Brepro()) {
RewriteCoffTimestamp(filename);
}
}
service_->RecordOutputRename(need_rename);
// The output file is generated in the goma cache, so we believe the
// cache_key is valid. It will be used in the link phase.
service_->file_hash_cache()->StoreFileCacheKey(
filename, hash_key, absl::Now(),
output_file_stat_cache_->Get(filename));
VLOG(1) << trace_id_ << " "
<< tmp_filename << " -> " << filename
<< " " << hash_key;
LOG_IF(ERROR, !info.content.empty())
<< trace_id_ << " content was not released: " << filename;
int ms = timer.GetInIntMilliseconds();
LOG_IF(WARNING, ms > 100) << trace_id_
<< " CommitOutput " << ms << " msec"
<< " size=" << info.size
<< " filename=" << info.filename;
absl::string_view output_base = file::Basename(info.filename);
output_bases.push_back(string(output_base));
absl::string_view ext = file::Extension(output_base);
if (flags_->type() == CompilerFlagType::Gcc && ext == "o") {
has_obj = true;
} else if (flags_->type() == CompilerFlagType::Clexe && ext == "obj") {
has_obj = true;
} else if (flags_->type() == CompilerFlagType::Javac && ext == "class") {
has_obj = true;
}
}
output_file_.clear();
// TODO: For clang-tidy, maybe we don't need to output
// no obj warning?
if (has_obj) {
LOG(INFO) << trace_id_ << " CommitOutput num=" << output_bases.size()
<< " cache_key=" << resp_->cache_key()
<< ": " << output_bases;
} else {
LOG(WARNING) << trace_id_ << " CommitOutput num=" << output_bases.size()
<< " no obj: cache_key=" << resp_->cache_key()
<< ": " << output_bases;
}
}
void CompileTask::ReplyResponse(const string& msg) {
LOG(INFO) << trace_id_ << " ReplyResponse: " << msg;
DCHECK(BelongsToCurrentThread());
CHECK(state_ == FINISHED || state_ == LOCAL_FINISHED || abort_);
CHECK(rpc_ != nullptr);
CHECK(rpc_resp_ != nullptr);
CHECK(subproc_ == nullptr);
CHECK(delayed_setup_subproc_ == nullptr);
if (failed() || fail_fallback_) {
auto allowed_error_duration = service_->AllowedNetworkErrorDuration();
time_t error_start_time =
service_->http_client()->NetworkErrorStartedTime();
if (allowed_error_duration.has_value() && error_start_time > 0) {
time_t now = time(nullptr);
// TODO: Convert to absl::Duration.
const int allowed_error_duration_sec =
absl::ToInt64Seconds(*allowed_error_duration);
if (now > error_start_time + allowed_error_duration_sec) {
AddErrorToResponse(
TO_USER, "network error continued for a long time", true);
}
}
}
if (resp_->has_result()) {
VLOG(1) << trace_id_ << " exit=" << resp_->result().exit_status();
stats_->set_exec_exit_status(resp_->result().exit_status());
} else {
LOG(WARNING) << trace_id_ << " empty result";
stats_->set_exec_exit_status(-256);
}
if (service_->local_run_for_failed_input() && flags_.get() != nullptr) {
service_->RecordInputResult(flags_->input_filenames(),
stats_->exec_exit_status() == 0);
}
if (resp_->error_message_size() != 0) {
std::vector<string> errs(resp_->error_message().begin(),
resp_->error_message().end());
LOG_IF(ERROR, resp_->result().exit_status() == 0)
<< trace_id_ << " should not have error message on exit_status=0."
<< " errs=" << errs;
service_->RecordErrorsToUser(errs);
}
UpdateStats();
*rpc_resp_ = *resp_;
// Here, rpc_resp_ has been created, so we can set gomacc_resp_size. b/109783082
stats_->gomacc_resp_size = rpc_resp_->ByteSize();
OneshotClosure* done = done_;
done_ = nullptr;
rpc_resp_ = nullptr;
rpc_ = nullptr;
if (done) {
service_->wm()->RunClosureInThread(
FROM_HERE,
caller_thread_id_, done, WorkerThreadManager::PRIORITY_IMMEDIATE);
}
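// If we report a non-zero exit status even though neither the remote result
// nor the local subprocess reported a failure, the error came from
// compiler_proxy itself.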
if (!canceled_ && stats_->exec_exit_status() != 0) {
if (exit_status_ == 0 && subproc_exit_status_ == 0) {
stats_->set_compiler_proxy_error(true);
LOG(ERROR) << trace_id_ << " compilation failure "
<< "due to compiler_proxy error.";
}
}
responsecode_ = 200;
stats_->set_handler_time(handler_timer_.GetInIntMilliseconds());
gomacc_pid_ = SubProcessState::kInvalidPid;
static const int kSlowTaskInMs = 5 * 60 * 1000; // 5 mins
if (stats_->handler_time() > kSlowTaskInMs) {
ExecLog stats = *stats_;
// clear non-stats fields.
stats.clear_username();
stats.clear_nodename();
stats.clear_port();
stats.clear_compiler_proxy_start_time();
stats.clear_task_id();
stats.clear_compiler_proxy_user_agent();
stats.clear_start_time();
stats.clear_arg();
stats.clear_env();
stats.clear_cwd();
stats.clear_expanded_arg();
stats.clear_command_version();
stats.clear_command_target();
LOG(ERROR) << trace_id_ << " SLOW:" << stats.DebugString();
}
// If abort_, the remote process is still in flight.
// Done() will be called later in ProcessFinished.
if (abort_)
CHECK(!finished_);
// If local_output_file_callback_ is not nullptr, uploading the local output
// file is still in flight, so ProcessLocalFileOutputDone() will be called
// later.
if (finished_ && local_output_file_callback_ == nullptr) {
CHECK_GE(state_, FINISHED);
CHECK_EQ(0, num_local_output_file_task_);
Done();
}
}
void CompileTask::ProcessLocalFileOutput() {
VLOG(1) << trace_id_ << " local output";
CHECK(BelongsToCurrentThread());
CHECK(local_output_file_callback_ == nullptr);
CHECK_EQ(0, num_local_output_file_task_);
if (!service_->store_local_run_output())
return;
SetLocalOutputFileCallback();
std::vector<OneshotClosure*> closures;
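// Store the outputs of the local run (only *.o files) in the file service;
// files whose cache key is already known in file_hash_cache are skipped.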
for (const auto& output_file : flags_->output_files()) {
const string& filename =
file::JoinPathRespectAbsolute(flags_->cwd(), output_file);
// Only upload *.o files.
if (!absl::EndsWith(filename, ".o"))
continue;
string hash_key;
const FileStat& output_file_stat = output_file_stat_cache_->Get(filename);
bool found_in_cache = service_->file_hash_cache()->GetFileCacheKey(
filename, absl::nullopt, output_file_stat, &hash_key);
if (found_in_cache) {
VLOG(1) << "file:" << filename << " already on cache: " << hash_key;
continue;
}
LOG(INFO) << trace_id_ << " local output:" << filename;
std::unique_ptr<LocalOutputFileTask> local_output_file_task(
new LocalOutputFileTask(
service_->wm(),
service_->file_service()->WithRequesterInfoAndTraceId(
requester_info_, trace_id_),
service_->file_hash_cache(), output_file_stat_cache_->Get(filename),
this, filename));
LocalOutputFileTask* local_output_file_task_pointer =
local_output_file_task.get();
closures.push_back(
NewCallback(
local_output_file_task_pointer,
&LocalOutputFileTask::Run,
NewCallback(
this,
&CompileTask::LocalOutputFileTaskFinished,
std::move(local_output_file_task))));
}
if (closures.empty()) {
VLOG(1) << trace_id_ << " no local output upload";
service_->wm()->RunClosureInThread(
FROM_HERE,
thread_id_,
NewCallback(
this,
&CompileTask::MaybeRunLocalOutputFileCallback, false),
WorkerThreadManager::PRIORITY_LOW);
return;
}
for (auto* closure : closures)
service_->wm()->RunClosure(
FROM_HERE, closure, WorkerThreadManager::PRIORITY_LOW);
}
void CompileTask::ProcessLocalFileOutputDone() {
VLOG(1) << trace_id_ << " local output done";
CHECK(BelongsToCurrentThread());
local_output_file_callback_ = nullptr;
if (finished_) {
CHECK(subproc_ == nullptr);
CHECK(delayed_setup_subproc_ == nullptr);
Done();
return;
}
// If !finished_, the remote call is still in flight; eventually
// ProcessFinished will be called, and then Done will be called
// because local_output_file_callback_ is already nullptr.
}
void CompileTask::Done() {
VLOG(1) << trace_id_ << " Done";
// FINISHED: normal case.
// LOCAL_FINISHED: fallback by should_fallback_.
// abort_: idle fallback.
if (!abort_)
CHECK_GE(state_, FINISHED);
CHECK(rpc_ == nullptr) << trace_id_
<< " " << StateName(state_) << " abort:" << abort_;
CHECK(rpc_resp_ == nullptr);
CHECK(done_ == nullptr);
CHECK(subproc_ == nullptr);
CHECK(delayed_setup_subproc_ == nullptr);
CHECK(input_file_callback_ == nullptr);
CHECK(output_file_callback_ == nullptr);
CHECK(local_output_file_callback_ == nullptr);
ClearOutputFile();
// If compile failed, delete deps cache entry here.
if (DepsCache::IsEnabled()) {
if ((failed() || fail_fallback_) && deps_identifier_.has_value()) {
DepsCache::instance()->RemoveDependency(deps_identifier_);
LOG(INFO) << trace_id_ << " remove deps cache entry.";
}
}
SaveInfoFromInputOutput();
service_->CompileTaskDone(this);
VLOG(1) << trace_id_ << " finalized.";
}
void CompileTask::DumpToJson(bool need_detail, Json::Value* root) const {
SubProcessState::State subproc_state = SubProcessState::NUM_STATE;
pid_t subproc_pid = static_cast<pid_t>(SubProcessState::kInvalidPid);
{
AUTOLOCK(lock, &mu_);
if (subproc_ != nullptr) {
subproc_state = subproc_->state();
subproc_pid = subproc_->started().pid();
}
}
(*root)["id"] = id_;
if ((state_ < FINISHED && !abort_) || state_ == LOCAL_RUN) {
// Total elapsed time for the currently running task.
(*root)["elapsed"] = handler_timer_.GetInIntMilliseconds();
}
if (stats_->handler_time()) (*root)["time"] = stats_->handler_time();
if (gomacc_pid_ != SubProcessState::kInvalidPid)
(*root)["pid"] = gomacc_pid_;
if (!flag_dump_.empty()) (*root)["flag"] = flag_dump_;
if (local_cache_hit()) {
(*root)["cache"] = "local hit";
} else if (stats_->cache_hit()) {
(*root)["cache"] = "hit";
}
(*root)["state"] = StateName(state_);
if (abort_) (*root)["abort"] = 1;
if (subproc_pid != static_cast<pid_t>(SubProcessState::kInvalidPid)) {
(*root)["subproc_state"] =
SubProcessState::State_Name(subproc_state);
(*root)["subproc_pid"] = Json::Value::Int64(subproc_pid);
}
string major_factor_str = stats_->major_factor();
if (!major_factor_str.empty())
(*root)["major_factor"] = major_factor_str;
if (stats_->has_exec_command_version_mismatch()) {
(*root)["command_version_mismatch"] =
stats_->exec_command_version_mismatch();
}
if (stats_->has_exec_command_binary_hash_mismatch()) {
(*root)["command_binary_hash_mismatch"] =
stats_->exec_command_binary_hash_mismatch();
}
if (stats_->has_exec_command_subprograms_mismatch()) {
(*root)["command_subprograms_mismatch"] =
stats_->exec_command_subprograms_mismatch();
}
// for task color.
if (responsecode_) (*root)["http"] = responsecode_;
if (stats_->exec_exit_status())
(*root)["exit"] = stats_->exec_exit_status();
if (stats_->exec_request_retry())
(*root)["retry"] = stats_->exec_request_retry();
if (fail_fallback_) (*root)["fail_fallback"]= 1;
if (stats_->goma_error())
(*root)["goma_error"] = 1;
if (stats_->compiler_proxy_error())
(*root)["compiler_proxy_error"] = 1;
if (canceled_)
(*root)["canceled"] = 1;
// additional message
if (gomacc_revision_mismatched_) {
(*root)["gomacc_revision_mismatch"] = 1;
}
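// Detailed per-task stats are only dumped when need_detail is set;
// otherwise mark this entry as summary only.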
if (need_detail) {
struct tm local_start_time;
char timebuf[64];
const time_t start_time = static_cast<time_t>(stats_->start_time());
#ifndef _WIN32
localtime_r(&start_time, &local_start_time);
strftime(timebuf, sizeof timebuf, "%Y-%m-%d %H:%M:%S %z",
&local_start_time);
#else
localtime_s(&local_start_time, &start_time);
strftime(timebuf, sizeof timebuf, "%Y-%m-%d %H:%M:%S ",
&local_start_time);
long tzoff = 0;
_get_timezone(&tzoff);
char tzsign = tzoff >= 0 ? '-' : '+';
tzoff = abs(tzoff);
sprintf_s(timebuf + strlen(timebuf), sizeof timebuf - strlen(timebuf),
"%c%02d%02d",
tzsign, tzoff / 3600, (tzoff % 3600) / 60);
#endif
(*root)["start_time"] = timebuf;
if (stats_->has_latest_input_filename()) {
(*root)["latest_input_filename"] =
stats_->latest_input_filename();
}
if (stats_->has_latest_input_mtime()) {
(*root)["input_wait"] =
stats_->start_time() - stats_->latest_input_mtime();
}
if (stats_->num_total_input_file())
(*root)["total_input"] = stats_->num_total_input_file();
if (stats_->num_uploading_input_file_size() > 0) {
(*root)["uploading_input"] = Json::Value::Int64(
SumRepeatedInt32(stats_->num_uploading_input_file()));
}
if (num_input_file_task_ > 0) {
(*root)["num_input_file_task"] = num_input_file_task_;
}
if (stats_->num_missing_input_file_size() > 0) {
(*root)["missing_input"] = Json::Value::Int64(
SumRepeatedInt32(stats_->num_missing_input_file()));
}
if (stats_->compiler_info_process_time()) {
(*root)["compiler_info_process_time"] =
stats_->compiler_info_process_time();
}
// When depscache_used() is true, we ran the include preprocessor but its
// processing time may be 0ms; we'd still like to show it.
if (stats_->include_preprocess_time() || stats_->depscache_used()) {
(*root)["include_preprocess_time"] = stats_->include_preprocess_time();
}
if (stats_->depscache_used()) {
(*root)["depscache_used"] =
(stats_->depscache_used() ? "true" : "false");
}
if (stats_->include_fileload_time()) {
(*root)["include_fileload_time"] = stats_->include_fileload_time();
}
if (stats_->include_fileload_pending_time_size()) {
int64_t sum = SumRepeatedInt32(stats_->include_fileload_pending_time());
if (sum) {
(*root)["include_fileload_pending_time"] = Json::Value::Int64(sum);
}
}
if (stats_->include_fileload_run_time_size()) {
int64_t sum = SumRepeatedInt32(stats_->include_fileload_run_time());
if (sum) {
(*root)["include_fileload_run_time"] = Json::Value::Int64(sum);
}
}
if (stats_->rpc_call_time_size()) {
(*root)["rpc_call_time"] = Json::Value::Int64(
SumRepeatedInt32(stats_->rpc_call_time()));
}
if (stats_->file_response_time())
(*root)["file_response_time"] = stats_->file_response_time();
if (stats_->gomacc_req_size)
(*root)["gomacc_req_size"] = Json::Value::Int64(stats_->gomacc_req_size);
if (stats_->gomacc_resp_size)
(*root)["gomacc_resp_size"] =
Json::Value::Int64(stats_->gomacc_resp_size);
{
AUTOLOCK(lock, &mu_);
if (http_rpc_status_.get()) {
if (!http_rpc_status_->response_header.empty()) {
(*root)["response_header"] =
http_rpc_status_->response_header;
}
}
}
if (stats_->rpc_req_size_size() > 0) {
(*root)["exec_req_size"] =
Json::Value::Int64(SumRepeatedInt32(stats_->rpc_req_size()));
}
if (stats_->rpc_master_trace_id_size() > 0) {
string masters = absl::StrJoin(stats_->rpc_master_trace_id(), " ");
(*root)["exec_rpc_master"] = masters;
}
if (stats_->rpc_throttle_time_size() > 0) {
(*root)["exec_throttle_time"] =
Json::Value::Int64(SumRepeatedInt32(stats_->rpc_throttle_time()));
}
if (stats_->rpc_pending_time_size() > 0) {
(*root)["exec_pending_time"] =
Json::Value::Int64(SumRepeatedInt32(stats_->rpc_pending_time()));
}
if (stats_->rpc_req_build_time_size() > 0) {
(*root)["exec_req_build_time"] =
Json::Value::Int64(SumRepeatedInt32(stats_->rpc_req_build_time()));
}
if (stats_->rpc_req_send_time_size() > 0) {
(*root)["exec_req_send_time"] =
Json::Value::Int64(SumRepeatedInt32(stats_->rpc_req_send_time()));
}
if (stats_->rpc_wait_time_size() > 0) {
(*root)["exec_wait_time"] =
Json::Value::Int64(SumRepeatedInt32(stats_->rpc_wait_time()));
}
if (stats_->rpc_resp_size_size() > 0) {
(*root)["exec_resp_size"] =
Json::Value::Int64(SumRepeatedInt32(stats_->rpc_resp_size()));
}
if (stats_->rpc_resp_recv_time_size() > 0) {
(*root)["exec_resp_recv_time"] =
Json::Value::Int64(SumRepeatedInt32(stats_->rpc_resp_recv_time()));
}
if (stats_->rpc_resp_parse_time_size() > 0) {
(*root)["exec_resp_parse_time"] =
Json::Value::Int64(SumRepeatedInt32(stats_->rpc_resp_parse_time()));
}
if (stats_->has_local_run_reason()) {
(*root)["local_run_reason"] =
stats_->local_run_reason();
}
if (stats_->local_delay_time() > 0)
(*root)["local_delay_ms"] = stats_->local_delay_time();
if (stats_->local_pending_time() > 0)
(*root)["local_pending_ms"] = stats_->local_pending_time();
if (stats_->local_run_time() > 0)
(*root)["local_run_ms"] = stats_->local_run_time();
if (stats_->local_mem_kb() > 0)
(*root)["local_mem_kb"] = Json::Value::Int64(stats_->local_mem_kb());
if (stats_->local_output_file_time_size() > 0) {
(*root)["local_output_file_time"] = Json::Value::Int64(
SumRepeatedInt32(stats_->local_output_file_time()));
}
if (stats_->local_output_file_size_size() > 0) {
(*root)["local_output_file_size"] = Json::Value::Int64(
SumRepeatedInt32(stats_->local_output_file_size()));
}
if (stats_->output_file_size_size() > 0) {
(*root)["output_file_size"] =
Json::Value::Int64(SumRepeatedInt32(stats_->output_file_size()));
}
if (stats_->chunk_resp_size_size() > 0) {
(*root)["chunk_resp_size"] =
Json::Value::Int64(SumRepeatedInt32(stats_->chunk_resp_size()));
}
if (stats_->output_file_rpc)
(*root)["output_file_rpc"] = Json::Value::Int64(stats_->output_file_rpc);
if (stats_->output_file_rpc_req_build_time) {
(*root)["output_file_rpc_req_build_time"] =
Json::Value::Int64(stats_->output_file_rpc_req_build_time);
}
if (stats_->output_file_rpc_req_send_time) {
(*root)["output_file_rpc_req_send_time"] =
Json::Value::Int64(stats_->output_file_rpc_req_send_time);
}
if (stats_->output_file_rpc_wait_time) {
(*root)["output_file_rpc_wait_time"] =
Json::Value::Int64(stats_->output_file_rpc_wait_time);
}
if (stats_->output_file_rpc_resp_recv_time) {
(*root)["output_file_rpc_resp_recv_time"] =
Json::Value::Int64(stats_->output_file_rpc_resp_recv_time);
}
if (stats_->output_file_rpc_resp_parse_time) {
(*root)["output_file_rpc_resp_parse_time"] =
Json::Value::Int64(stats_->output_file_rpc_resp_parse_time);
}
if (exec_output_file_.size() > 0) {
Json::Value exec_output_file(Json::arrayValue);
for (size_t i = 0; i < exec_output_file_.size(); ++i) {
exec_output_file.append(exec_output_file_[i]);
}
(*root)["exec_output_file"] = exec_output_file;
}
if (!resp_cache_key_.empty())
(*root)["cache_key"] = resp_cache_key_;
if (stats_->exec_request_retry_reason_size() > 0) {
Json::Value exec_output_retry_reason(Json::arrayValue);
for (int i = 0; i < stats_->exec_request_retry_reason_size(); ++i) {
exec_output_retry_reason.append(
stats_->exec_request_retry_reason(i));
}
(*root)["exec_request_retry_reason"] = exec_output_retry_reason;
}
if (exec_error_message_.size() > 0) {
Json::Value error_message(Json::arrayValue);
for (size_t i = 0; i < exec_error_message_.size(); ++i) {
error_message.append(exec_error_message_[i]);
}
(*root)["error_message"] = error_message;
}
if (!stats_->cwd().empty())
(*root)["cwd"] = stats_->cwd();
if (!orig_flag_dump_.empty())
(*root)["orig_flag"] = orig_flag_dump_;
if (stats_->env_size() > 0) {
Json::Value env(Json::arrayValue);
for (int i = 0; i < stats_->env_size(); ++i) {
env.append(stats_->env(i));
}
(*root)["env"] = env;
}
if (!stdout_.empty())
(*root)["stdout"] = stdout_;
if (!stderr_.empty())
(*root)["stderr"] = stderr_;
Json::Value inputs(Json::arrayValue);
for (std::set<string>::const_iterator iter = required_files_.begin();
iter != required_files_.end();
++iter) {
inputs.append(*iter);
}
(*root)["inputs"] = inputs;
if (system_library_paths_.size() > 0) {
Json::Value system_library_paths(Json::arrayValue);
for (size_t i = 0; i < system_library_paths_.size(); ++i) {
system_library_paths.append(system_library_paths_[i]);
}
(*root)["system_library_paths"] = system_library_paths;
}
} else {
(*root)["summaryOnly"] = 1;
}
}
// ----------------------------------------------------------------
// state_: INIT
void CompileTask::CopyEnvFromRequest() {
CHECK_EQ(INIT, state_);
requester_env_ = req_->requester_env();
want_fallback_ = requester_env_.fallback();
req_->clear_requester_env();
for (const auto& arg : req_->arg())
stats_->add_arg(arg);
for (const auto& env : req_->env())
stats_->add_env(env);
stats_->set_cwd(req_->cwd());
gomacc_pid_ = req_->requester_info().pid();
if (service_->CanSendUserInfo()) {
if (!service_->username().empty())
req_->mutable_requester_info()->set_username(service_->username());
stats_->set_username(req_->requester_info().username());
stats_->set_nodename(service_->nodename());
}
req_->mutable_requester_info()->set_compiler_proxy_id(
GenerateCompilerProxyId());
stats_->set_port(rpc_->server_port());
// TODO: Update stats_ to use absl/time.
stats_->set_compiler_proxy_start_time(absl::ToTimeT(service_->start_time()));
stats_->set_task_id(id_);
requester_info_ = req_->requester_info();
}
string CompileTask::GenerateCompilerProxyId() const {
std::ostringstream s;
s << service_->compiler_proxy_id_prefix() << id_;
return s.str();
}
// static
bool CompileTask::IsLocalCompilerPathValid(
const string& trace_id,
const ExecReq& req, const CompilerFlags* flags) {
// compiler_proxy will resolve local_compiler_path if gomacc is masqueraded
// or the prepended compiler is a basename.
// No need to treat this as an error.
if (!req.command_spec().has_local_compiler_path()) {
return true;
}
// If local_compiler_path exists, its compiler name must match the one from
// the flags.
const string name = CompilerFlagTypeSpecific::GetCompilerNameFromArg(
req.command_spec().local_compiler_path());
if (req.command_spec().has_name() &&
req.command_spec().name() != name) {
LOG(ERROR) << trace_id << " compiler name mismatches."
<< " command_spec.name=" << req.command_spec().name()
<< " name=" << name;
return false;
}
if (flags && flags->compiler_name() != name) {
LOG(ERROR) << trace_id << " compiler name mismatches."
<< " flags.compiler_name=" << flags->compiler_name()
<< " name=" << name;
return false;
}
return true;
}
// static
void CompileTask::RemoveDuplicateFiles(const std::string& cwd,
std::set<std::string>* filenames) {
FlatMap<std::string, std::string> path_map;
path_map.reserve(filenames->size());
std::set<std::string> unique_files;
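// Deduplicate by absolute path: for each absolute path, keep the shortest
// relative spelling, breaking ties lexicographically.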
for (const auto& filename : *filenames) {
std::string abs_filename = file::JoinPathRespectAbsolute(cwd, filename);
auto p = path_map.emplace(std::move(abs_filename), filename);
if (p.second) {
unique_files.insert(filename);
continue;
}
// If a filename is already registered for this path, compare and take the
// shorter one. If the lengths are the same, take the lexicographically
// smaller one.
const std::string& existing_filename = p.first->second;
if (filename.size() < existing_filename.size() ||
(filename.size() == existing_filename.size() &&
filename < existing_filename)) {
unique_files.erase(existing_filename);
unique_files.insert(filename);
p.first->second = filename;
}
}
*filenames = std::move(unique_files);
}
void CompileTask::InitCompilerFlags() {
CHECK_EQ(INIT, state_);
std::vector<string> args(req_->arg().begin(), req_->arg().end());
VLOG(1) << trace_id_ << " " << args;
flags_ = CompilerFlagsParser::New(args, req_->cwd());
if (flags_.get() == nullptr) {
return;
}
compiler_type_specific_ =
service_->compiler_type_specific_collection()->Get(flags_->type());
flag_dump_ = flags_->DebugString();
if (flags_->type() == CompilerFlagType::Gcc) {
const GCCFlags& gcc_flag = static_cast<const GCCFlags&>(*flags_);
linking_ = (gcc_flag.mode() == GCCFlags::LINK);
precompiling_ = gcc_flag.is_precompiling_header();
} else if (flags_->type() == CompilerFlagType::Clexe) {
// TODO: check linking_ etc.
} else if (flags_->type() == CompilerFlagType::ClangTidy) {
// Sets the actual gcc_flags for clang_tidy_flags here.
ClangTidyFlags& clang_tidy_flags = static_cast<ClangTidyFlags&>(*flags_);
if (clang_tidy_flags.input_filenames().size() != 1) {
LOG(WARNING) << trace_id_ << " Input file is not unique.";
clang_tidy_flags.set_is_successful(false);
return;
}
const string& input_file = clang_tidy_flags.input_filenames()[0];
const string input_file_abs =
file::JoinPathRespectAbsolute(clang_tidy_flags.cwd(), input_file);
string compdb_path = CompilationDatabaseReader::FindCompilationDatabase(
clang_tidy_flags.build_path(), file::Dirname(input_file_abs));
std::vector<string> clang_args;
string build_dir;
if (!CompilationDatabaseReader::MakeClangArgs(clang_tidy_flags,
compdb_path,
&clang_args,
&build_dir)) {
// Failed to make clang args, so mark CompilerFlags as unsuccessful.
LOG(WARNING) << trace_id_
<< " Failed to make clang args. local fallback.";
clang_tidy_flags.set_is_successful(false);
return;
}
DCHECK(!build_dir.empty());
clang_tidy_flags.SetCompilationDatabasePath(compdb_path);
clang_tidy_flags.SetClangArgs(clang_args, build_dir);
}
}
bool CompileTask::FindLocalCompilerPath() {
CHECK_EQ(INIT, state_);
CHECK(flags_.get());
// If gomacc sets local_compiler_path, just use it.
if (!req_->command_spec().local_compiler_path().empty()) {
string local_compiler = PathResolver::PlatformConvert(
req_->command_spec().local_compiler_path());
// TODO: confirm why local_compiler_path should not be
// basename, and remove the code if possible.
// local_compiler_path should not be basename only.
if (local_compiler.find(PathResolver::kPathSep) == string::npos) {
LOG(ERROR) << trace_id_ << " local_compiler_path should not be basename:"
<< local_compiler;
} else if (service_->FindLocalCompilerPath(
requester_env_.gomacc_path(),
local_compiler,
stats_->cwd(),
requester_env_.local_path(),
pathext_,
&local_compiler,
&local_path_)) {
// Since compiler_info resolves a relative path to an absolute path,
// we no longer need to convert local_compiler_path to an absolute path.
// (b/6340137, b/28088682)
if (!pathext_.empty() &&
!absl::EndsWith(local_compiler,
req_->command_spec().local_compiler_path())) {
// PATHEXT has been resolved on Windows, so use the resolved path.
req_->mutable_command_spec()->set_local_compiler_path(local_compiler);
}
return true;
}
return false;
}
if (!requester_env_.has_local_path() ||
requester_env_.local_path().empty()) {
LOG(ERROR) << "no PATH in requester env." << requester_env_.DebugString();
AddErrorToResponse(TO_USER,
"no PATH in requester env. Using old gomacc?", true);
return false;
}
if (!requester_env_.has_gomacc_path()) {
LOG(ERROR) << "no gomacc path in requester env."
<< requester_env_.DebugString();
AddErrorToResponse(TO_USER,
"no gomacc in requester env. Using old gomacc?", true);
return false;
}
string local_compiler_path;
if (service_->FindLocalCompilerPath(
requester_env_.gomacc_path(),
flags_->compiler_base_name(),
stats_->cwd(),
requester_env_.local_path(),
pathext_,
&local_compiler_path,
&local_path_)) {
req_->mutable_command_spec()->set_local_compiler_path(
local_compiler_path);
return true;
}
return false;
}
bool CompileTask::ShouldFallback() const {
CHECK_EQ(INIT, state_);
CHECK(flags_.get());
if (!requester_env_.verify_command().empty())
return false;
if (!flags_->is_successful()) {
service_->RecordForcedFallbackInSetup(CompileService::kFailToParseFlags);
LOG(INFO) << trace_id_
<< " force fallback. failed to parse compiler flags.";
return true;
}
if (flags_->input_filenames().empty()) {
service_->RecordForcedFallbackInSetup(
CompileService::kNoRemoteCompileSupported);
LOG(INFO) << trace_id_
<< " force fallback. no input files give.";
return true;
}
if (flags_->type() == CompilerFlagType::Gcc) {
const GCCFlags& gcc_flag = static_cast<const GCCFlags&>(*flags_);
if (gcc_flag.is_stdin_input()) {
service_->RecordForcedFallbackInSetup(
CompileService::kNoRemoteCompileSupported);
LOG(INFO) << trace_id_
<< " force fallback."
<< " cannot use stdin as input in goma backend.";
return true;
}
if (gcc_flag.has_wrapper()) {
service_->RecordForcedFallbackInSetup(
CompileService::kNoRemoteCompileSupported);
LOG(INFO) << trace_id_
<< " force fallback. -wrapper is not supported";
return true;
}
if (!verify_output_ && gcc_flag.mode() == GCCFlags::PREPROCESS) {
service_->RecordForcedFallbackInSetup(
CompileService::kNoRemoteCompileSupported);
LOG(INFO) << trace_id_
<< " force fallback. preprocess is usually light-weight.";
return true;
}
if (!service_->enable_gch_hack() && precompiling_) {
service_->RecordForcedFallbackInSetup(
CompileService::kNoRemoteCompileSupported);
LOG(INFO) << trace_id_
<< " force fallback. gch hack is not enabled and precompiling.";
return true;
}
if (!service_->enable_remote_link() && linking_) {
service_->RecordForcedFallbackInSetup(
CompileService::kNoRemoteCompileSupported);
LOG(INFO) << trace_id_
<< " force fallback linking.";
return true;
}
absl::string_view ext = file::Extension(flags_->input_filenames()[0]);
if (ext == "s" || ext == "S") {
service_->RecordForcedFallbackInSetup(
CompileService::kNoRemoteCompileSupported);
LOG(INFO) << trace_id_
<< " force fallback. assembler should be light-weight.";
return true;
}
} else if (flags_->type() == CompilerFlagType::Clexe) {
const VCFlags& vc_flag = static_cast<const VCFlags&>(*flags_);
// GOMA doesn't work with PCH so we generate it only for local builds.
if (!vc_flag.creating_pch().empty()) {
service_->RecordForcedFallbackInSetup(
CompileService::kNoRemoteCompileSupported);
LOG(INFO) << trace_id_
<< " force fallback. cannot create pch in goma backend.";
return true;
}
if (vc_flag.require_mspdbserv()) {
service_->RecordForcedFallbackInSetup(
CompileService::kNoRemoteCompileSupported);
LOG(INFO) << trace_id_
<< " force fallback. cannot run mspdbserv in goma backend.";
return true;
}
} else if (flags_->type() == CompilerFlagType::Javac) {
const JavacFlags& javac_flag = static_cast<const JavacFlags&>(*flags_);
// TODO: remove the following code when the goma backend is ready.
// Force fallback for a compile request with -processor (b/38215808).
if (!javac_flag.processors().empty()) {
service_->RecordForcedFallbackInSetup(
CompileService::kNoRemoteCompileSupported);
LOG(INFO) << trace_id_
<< " force fallback to avoid running annotation processor in"
<< " goma backend (b/38215808)";
return true;
}
} else if (flags_->type() == CompilerFlagType::Java) {
LOG(INFO) << trace_id_
<< " force fallback to avoid running java program in"
<< " goma backend";
return true;
}
#ifndef _WIN32
// TODO: check "NUL", "CON", "AUX" on windows?
for (const auto & input_filename : flags_->input_filenames()) {
const string input = file::JoinPathRespectAbsolute(
flags_->cwd(), input_filename);
struct stat st;
if (stat(input.c_str(), &st) != 0) {
PLOG(INFO) << trace_id_ << " " << input << ": stat error";
service_->RecordForcedFallbackInSetup(
CompileService::kNoRemoteCompileSupported);
return true;
}
if (!S_ISREG(st.st_mode)) {
LOG(INFO) << trace_id_ << " " << input << " not regular file";
service_->RecordForcedFallbackInSetup(
CompileService::kNoRemoteCompileSupported);
return true;
}
}
#endif
// TODO: fallback input file should be flag of compiler proxy?
if (requester_env_.fallback_input_file_size() == 0)
return false;
std::vector<string> fallback_input_files(
requester_env_.fallback_input_file().begin(),
requester_env_.fallback_input_file().end());
std::sort(fallback_input_files.begin(), fallback_input_files.end());
for (const auto& input_filename : flags_->input_filenames()) {
if (binary_search(fallback_input_files.begin(),
fallback_input_files.end(),
input_filename)) {
service_->RecordForcedFallbackInSetup(CompileService::kRequestedByUser);
return true;
}
}
return false;
}
bool CompileTask::ShouldVerifyOutput() const {
CHECK_EQ(INIT, state_);
return requester_env_.verify_output();
}
SubProcessReq::Weight CompileTask::GetTaskWeight() const {
CHECK_EQ(INIT, state_);
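// Heuristic: the weight grows with the number of args, and linking counts
// 10x; scores above 1000 are treated as heavy-weight subprocesses.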
int weight_score = req_->arg_size();
if (linking_)
weight_score *= 10;
if (weight_score > 1000)
return SubProcessReq::HEAVY_WEIGHT;
return SubProcessReq::LIGHT_WEIGHT;
}
bool CompileTask::ShouldStopGoma() const {
if (verify_output_)
return false;
if (precompiling_ && service_->enable_gch_hack())
return false;
if (subproc_ == nullptr) {
DCHECK(!abort_);
return false;
}
if (IsSubprocRunning()) {
if (service_->dont_kill_subprocess()) {
// When dont_kill_subprocess is true, we'll ignore remote results and
// always use local results, so calling remote is useless once the
// subprocess is already running.
return true;
}
if (service_->local_run_preference() >= state_)
return true;
}
if (stats_->exec_request_retry() > 1) {
int num_pending = SubProcessTask::NumPending();
// Prefer local when few subprocesses are pending.
return num_pending <= service_->max_subprocs_pending();
}
if (service_->http_client()->ramp_up() == 0) {
// If http blocked (i.e. got 302, 403 error), stop calling remote.
LOG(INFO) << trace_id_ << " stop goma. http disabled";
return true;
}
return false;
}
// ----------------------------------------------------------------
// state_: SETUP
void CompileTask::FillCompilerInfo() {
CHECK_EQ(SETUP, state_);
compiler_info_timer_.Start();
std::vector<string> key_envs(stats_->env().begin(), stats_->env().end());
std::vector<string> run_envs(key_envs);
if (!local_path_.empty())
run_envs.push_back("PATH=" + local_path_);
#ifdef _WIN32
if (!pathext_.empty())
run_envs.push_back("PATHEXT=" + pathext_);
if (flags_->type() == CompilerFlagType::Clexe) {
run_envs.push_back("TMP=" + service_->tmp_dir());
run_envs.push_back("TEMP=" + service_->tmp_dir());
}
#endif
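// Build the cache key from the flags, the local compiler path, and the key
// environment, then ask the service for CompilerInfo; the result arrives
// asynchronously in FillCompilerInfoDone.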
std::unique_ptr<CompileService::GetCompilerInfoParam> param(
new CompileService::GetCompilerInfoParam);
param->thread_id = service_->wm()->GetCurrentThreadId();
param->trace_id = trace_id_;
DCHECK_NE(
req_->command_spec().local_compiler_path().find(PathResolver::kPathSep),
string::npos)
<< trace_id_ << " expect local_compiler_path is relative path"
" or absolute path but " << req_->command_spec().local_compiler_path();
param->key = CompilerInfoCache::CreateKey(
*flags_,
req_->command_spec().local_compiler_path(),
key_envs);
param->flags = flags_.get();
param->run_envs = run_envs;
CompileService::GetCompilerInfoParam* param_pointer = param.get();
service_->GetCompilerInfo(
param_pointer,
NewCallback(
this, &CompileTask::FillCompilerInfoDone, std::move(param)));
}
void CompileTask::FillCompilerInfoDone(
std::unique_ptr<CompileService::GetCompilerInfoParam> param) {
CHECK_EQ(SETUP, state_);
int msec = compiler_info_timer_.GetInIntMilliseconds();
stats_->set_compiler_info_process_time(msec);
std::ostringstream ss;
ss << " cache_hit=" << param->cache_hit
<< " updated=" << param->updated
<< " state=" << param->state.get()
<< " in " << msec << " msec";
if (msec > 1000) {
LOG(WARNING) << trace_id_ << " SLOW fill compiler info"
<< ss.str();
} else {
LOG(INFO) << trace_id_ << " fill compiler info"
<< ss.str();
}
if (param->state.get() == nullptr) {
AddErrorToResponse(TO_USER,
"something wrong trying to get compiler info.", true);
service_->RecordForcedFallbackInSetup(
CompileService::kFailToGetCompilerInfo);
SetupRequestDone(false);
return;
}
compiler_info_state_ = std::move(param->state);
DCHECK(compiler_info_state_.get() != nullptr);
if (compiler_info_state_.get()->info().HasError()) {
// In this case, it found local compiler, but failed to get necessary
// information, such as system include paths.
// It would happen when multiple -arch options are used.
if (requester_env_.fallback()) {
// Force to fallback mode to handle this case.
should_fallback_ = true;
service_->RecordForcedFallbackInSetup(
CompileService::kFailToGetCompilerInfo);
}
AddErrorToResponse(should_fallback_ ? TO_LOG : TO_USER,
compiler_info_state_.get()->info().error_message(),
true);
SetupRequestDone(false);
return;
}
if (compiler_info_state_.disabled()) {
// In this case, it found local compiler, but not in server side
// (by past compile task).
if (service_->hermetic_fallback() || requester_env_.fallback()) {
should_fallback_ = true;
service_->RecordForcedFallbackInSetup(CompileService::kCompilerDisabled);
}
// We already responded "<local compiler path> is disabled" when the
// compiler info was disabled, so we won't show the same error message
// to the user.
AddErrorToResponse(TO_LOG, "compiler is disabled", true);
SetupRequestDone(false);
return;
}
if (service_->hermetic()) {
req_->set_hermetic_mode(true);
}
#ifndef _WIN32
if (service_->use_relative_paths_in_argv()) {
MakeWeakRelativeInArgv();
}
#endif
MayUpdateSubprogramSpec();
UpdateExpandedArgs();
ModifyRequestArgs();
ModifyRequestEnvs();
UpdateCommandSpec();
stats_->set_command_version(req_->command_spec().version());
stats_->set_command_target(req_->command_spec().target());
UpdateRequiredFiles();
}
void CompileTask::UpdateRequiredFiles() {
CHECK_EQ(SETUP, state_);
include_timer_.Start();
include_wait_timer_.Start();
// TODO: Move compiler-specific code to CompilerTypeSpecific.
if (flags_->type() == CompilerFlagType::Gcc) {
const GCCFlags& gcc_flag = static_cast<const GCCFlags&>(*flags_);
if (gcc_flag.lang() == "ir") {
if (gcc_flag.thinlto_index().empty()) {
// No need to read .imports file.
UpdateRequiredFilesDone(true);
return;
}
// ThinLTO backend phase.
GetThinLTOImports();
return;
}
if (gcc_flag.mode() != GCCFlags::LINK) {
CHECK(!linking_);
GetIncludeFiles();
return;
}
if (gcc_flag.args().size() == 2 &&
gcc_flag.args()[1] == "--version") {
// for requester_env_.verify_command()
VLOG(1) << trace_id_ << " --version";
UpdateRequiredFilesDone(true);
return;
}
// TODO: if input files are not obj/ar, check include files as well?
VLOG(1) << trace_id_ << " link mode";
CHECK(linking_);
GetLinkRequiredFiles();
return;
}
if (flags_->type() == CompilerFlagType::Clexe) {
// TODO: fix for linking_ mode.
GetIncludeFiles();
return;
}
if (flags_->type() == CompilerFlagType::ClangTidy) {
GetIncludeFiles();
return;
}
// Go to the general include processor phase.
StartIncludeProcessor();
}
void CompileTask::UpdateRequiredFilesDone(bool ok) {
if (!ok) {
// Failed to update required_files.
if (requester_env_.verify_command().empty()) {
LOG(INFO) << trace_id_ << " failed to update required files. ";
should_fallback_ = true;
SetupRequestDone(false);
return;
}
VLOG(1) << trace_id_ << "verify_command="
<< requester_env_.verify_command();
}
// Add the input files as well.
for (const auto& input_filename : flags_->input_filenames()) {
required_files_.insert(input_filename);
}
for (const auto& opt_input_filename: flags_->optional_input_filenames()) {
const string& abs_filename = file::JoinPathRespectAbsolute(
stats_->cwd(), opt_input_filename);
if (access(abs_filename.c_str(), R_OK) == 0) {
required_files_.insert(opt_input_filename);
} else {
LOG(WARNING) << trace_id_ << " optional file not found:" << abs_filename;
}
}
// If gomacc sets input files, add them as well.
for (const auto& input : req_->input()) {
required_files_.insert(input.filename());
}
if (VLOG_IS_ON(2)) {
for (const auto& required_file : required_files_) {
LOG(INFO) << trace_id_ << " required files:" << required_file;
}
}
req_->clear_input();
stats_->set_include_preprocess_time(include_timer_.GetInIntMilliseconds());
stats_->set_depscache_used(depscache_used_);
LOG_IF(WARNING, stats_->include_processor_run_time() > 1000)
<< trace_id_ << " SLOW run IncludeProcessor"
<< " required_files=" << required_files_.size()
<< " depscache=" << depscache_used_
<< " in " << stats_->include_processor_run_time() << " msec";
SetupRequestDone(true);
}
void CompileTask::SetupRequestDone(bool ok) {
CHECK_EQ(SETUP, state_);
if (abort_) {
// The subprocess for local idle fallback has already finished.
ProcessFinished("aborted in setup");
return;
}
if (!ok) {
if (should_fallback_) {
VLOG(1) << trace_id_ << " should fallback by setup failure";
// When should_fallback_ is set, CompileTask::FinishSubProcess() expects
// the task to be in the INIT state when the subprocess finishes.
state_ = INIT;
if (subproc_ == nullptr)
SetupSubProcess();
RunSubProcess("fallback by setup failure");
return;
}
// no fallback.
AddErrorToResponse(TO_USER, "Failed to setup request", true);
ProcessFinished("fail in setup");
return;
}
TryProcessFileRequest();
}
#ifndef _WIN32
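// Rewrites absolute paths in argv as cwd-relative paths where it is safe to
// do so (C/C++ only).  Returns true if any argument was changed.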
bool CompileTask::MakeWeakRelativeInArgv() {
CHECK_EQ(SETUP, state_);
DCHECK(compiler_info_state_.get() != nullptr);
// Support only C/C++.
if (compiler_info_state_.get()->info().type() != CompilerInfoType::Cxx) {
return false;
}
orig_flag_dump_ = flag_dump_;
// If cwd is under a temp directory, we cannot tell whether the output path
// is ./path/to/output or $TMP/path/to/output.
// In the latter case, making the path relative would produce a wrong output
// file.
if (HasPrefixDir(req_->cwd(), "/tmp") || HasPrefixDir(req_->cwd(), "/var")) {
LOG(WARNING) << "GOMA_USE_RELATIVE_PATHS_IN_ARGV=true, but cwd may be "
<< "under temp directory: " << req_->cwd() << ". "
<< "Use original args.";
orig_flag_dump_ = "";
return false;
}
bool changed = false;
std::ostringstream ss;
const std::vector<string>& parsed_args = CompilerFlagsUtil::MakeWeakRelative(
flags_->args(), req_->cwd(),
ToCxxCompilerInfo(compiler_info_state_.get()->info()));
for (size_t i = 0; i < parsed_args.size(); ++i) {
if (req_->arg(i) != parsed_args[i]) {
VLOG(1) << "Arg[" << i << "]: " << req_->arg(i) << " => "
<< parsed_args[i];
req_->set_arg(i, parsed_args[i]);
changed = true;
}
ss << req_->arg(i) << " ";
}
flag_dump_ = ss.str();
if (!changed) {
VLOG(1) << "GOMA_USE_RELATIVE_PATHS_IN_ARGV=true, "
<< "but no argv changed";
orig_flag_dump_ = "";
}
return changed;
}
#endif
static void FixCommandSpec(const CompilerInfo& compiler_info,
const CompilerFlags& flags,
CommandSpec* command_spec) {
// Overwrites name in command_spec if possible.
// The name is used for selecting a compiler in the goma backend.
// The name set by gomacc could be wrong if the given compiler, especially
// when it is cc or c++, is a symlink to a non-gcc compiler. Since
// compiler_info knows more details about the compiler, we overwrite the name
// with the value that comes from it.
//
// You may think we could use realpath(3) in gomacc. We do not do that for
// two reasons:
// 1. compiler_info is cached.
// 2. we can get more detailed info there.
if (compiler_info.HasName())
command_spec->set_name(compiler_info.name());
if (!command_spec->has_version())
command_spec->set_version(compiler_info.version());
if (!command_spec->has_target())
command_spec->set_target(compiler_info.target());
command_spec->set_binary_hash(compiler_info.request_compiler_hash());
command_spec->clear_system_include_path();
command_spec->clear_cxx_system_include_path();
command_spec->clear_system_framework_path();
command_spec->clear_system_library_path();
// A C++ program should only send C++ include paths; otherwise, the include
// order might be wrong. For a C program, cxx_system_include_paths would be
// empty.
// c.f. b/25675250
if (compiler_info.type() == CompilerInfoType::Cxx) {
bool is_cplusplus = false;
if (flags.type() == CompilerFlagType::Gcc) {
is_cplusplus = static_cast<const GCCFlags&>(flags).is_cplusplus();
} else if (flags.type() == CompilerFlagType::Clexe) {
is_cplusplus = static_cast<const VCFlags&>(flags).is_cplusplus();
} else if (flags.type() == CompilerFlagType::ClangTidy) {
is_cplusplus = static_cast<const ClangTidyFlags&>(flags).is_cplusplus();
}
const CxxCompilerInfo& cxxci = ToCxxCompilerInfo(compiler_info);
if (!is_cplusplus) {
for (const auto& path : cxxci.system_include_paths())
command_spec->add_system_include_path(path);
}
for (const auto& path : cxxci.cxx_system_include_paths())
command_spec->add_cxx_system_include_path(path);
for (const auto& path : cxxci.system_framework_paths())
command_spec->add_system_framework_path(path);
}
}
static void FixSystemLibraryPath(const std::vector<string>& library_paths,
CommandSpec* command_spec) {
for (const auto& path : library_paths)
command_spec->add_system_library_path(path);
}
void CompileTask::UpdateExpandedArgs() {
for (const auto& expanded_arg : flags_->expanded_args()) {
req_->add_expanded_arg(expanded_arg);
stats_->add_expanded_arg(expanded_arg);
}
}
void CompileTask::ModifyRequestArgs() {
DCHECK(compiler_info_state_.get() != nullptr);
const CompilerInfo& compiler_info = compiler_info_state_.get()->info();
for (const auto& r : compiler_info.resource()) {
const string& path = r.name;
req_->add_input()->set_filename(path);
LOG(INFO) << trace_id_ << " input automatically added: " << path;
}
bool modified_args = false;
bool use_expanded_args = (req_->expanded_arg_size() > 0);
for (const auto& flag : compiler_info.additional_flags()) {
req_->add_arg(flag);
if (use_expanded_args) {
req_->add_expanded_arg(flag);
}
modified_args = true;
}
if (flags_->type() == CompilerFlagType::Clexe) {
// If /Yu is specified, we add /Y- to tell the backend compiler not
// to try using PCH. We add this here because we don't want to show
// this flag in compiler_proxy's console.
const string& using_pch = static_cast<const VCFlags&>(*flags_).using_pch();
if (!using_pch.empty()) {
req_->add_arg("/Y-");
if (use_expanded_args) {
req_->add_expanded_arg("/Y-");
}
modified_args = true;
}
}
LOG_IF(INFO, modified_args) << trace_id_ << " modified args: "
<< absl::StrJoin(req_->arg(), " ");
}
void CompileTask::ModifyRequestEnvs() {
std::vector<string> envs;
for (const auto& env : req_->env()) {
if (flags_->IsServerImportantEnv(env.c_str())) {
envs.push_back(env);
}
}
if (envs.size() == (size_t)req_->env_size()) {
return;
}
req_->clear_env();
for (const auto& env : envs) {
req_->add_env(env);
}
LOG(INFO) << trace_id_ << " modified env: " << envs;
}
void CompileTask::UpdateCommandSpec() {
CHECK_EQ(SETUP, state_);
command_spec_ = req_->command_spec();
CommandSpec* command_spec = req_->mutable_command_spec();
if (compiler_info_state_.get() == nullptr)
return;
const CompilerInfo& compiler_info = compiler_info_state_.get()->info();
FixCommandSpec(compiler_info, *flags_, command_spec);
}
void CompileTask::MayFixSubprogramSpec(
google::protobuf::RepeatedPtrField<SubprogramSpec>* subprogram_specs)
const {
std::set<string> used_subprogram_name;
subprogram_specs->Clear();
if (compiler_info_state_.get() == nullptr) {
return;
}
for (const auto& info : compiler_info_state_.get()->info().subprograms()) {
DCHECK(file::IsAbsolutePath(info.name))
<< "filename of subprogram is expected to be absolute path."
<< " info.name=" << info.name
<< " info.hash=" << info.hash;
if (!used_subprogram_name.insert(info.name).second) {
LOG(ERROR) << "The same subprogram is added twice. Ignoring."
<< " info.name=" << info.name
<< " info.hash=" << info.hash;
continue;
}
SubprogramSpec* subprog_spec = subprogram_specs->Add();
subprog_spec->set_path(info.name);
subprog_spec->set_binary_hash(info.hash);
}
}
void CompileTask::MayUpdateSubprogramSpec() {
CHECK_EQ(SETUP, state_);
MayFixSubprogramSpec(req_->mutable_subprogram());
if (VLOG_IS_ON(3)) {
for (const auto& subprog_spec : req_->subprogram()) {
LOG(INFO) << trace_id_ << " update subprogram spec:"
<< " path=" << subprog_spec.path()
<< " hash=" << subprog_spec.binary_hash();
}
}
}
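// Parameter object passed between the CompileTask thread and the include
// processor worker pool; the request fields are filled before dispatch and
// the response fields are filled by RunCppIncludeProcessor().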
struct CompileTask::RunCppIncludeProcessorParam {
RunCppIncludeProcessorParam()
: result_status(false), total_files(0), skipped_files(0) {}
// request
string input_filename;
string abs_input_filename;
// response
bool result_status;
std::set<string> required_files;
int total_files;
int skipped_files;
std::unique_ptr<FileStatCache> file_stat_cache;
private:
DISALLOW_COPY_AND_ASSIGN(RunCppIncludeProcessorParam);
};
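// C/C++ include scan.  Tries DepsCache first; on a cache miss, runs
// CppIncludeProcessor on the include processor worker pool.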
void CompileTask::GetIncludeFiles() {
CHECK_EQ(SETUP, state_);
DCHECK(flags_->type() == CompilerFlagType::Gcc ||
flags_->type() == CompilerFlagType::Clexe ||
flags_->type() == CompilerFlagType::ClangTidy);
DCHECK(compiler_info_state_.get() != nullptr);
// We don't support multiple input files.
if (flags_->input_filenames().size() != 1U) {
LOG(ERROR) << trace_id_ << " multiple inputs? "
<< flags_->input_filenames().size()
<< " " << flags_->input_filenames();
AddErrorToResponse(TO_USER, "multiple inputs are not supported. ", true);
UpdateRequiredFilesDone(false);
return;
}
const string& input_filename = flags_->input_filenames()[0];
const string& abs_input_filename =
file::JoinPathRespectAbsolute(flags_->cwd(), input_filename);
if (DepsCache::IsEnabled()) {
DepsCache* dc = DepsCache::instance();
deps_identifier_ = DepsCache::MakeDepsIdentifier(
compiler_info_state_.get()->info(), *flags_);
if (deps_identifier_.has_value() &&
dc->GetDependencies(deps_identifier_, flags_->cwd(), abs_input_filename,
&required_files_, input_file_stat_cache_.get())) {
LOG(INFO) << trace_id_ << " use deps cache. required_files="
<< required_files_.size();
depscache_used_ = true;
UpdateRequiredFilesDone(true);
return;
}
}
std::unique_ptr<RunCppIncludeProcessorParam> param(
new RunCppIncludeProcessorParam);
param->input_filename = input_filename;
param->abs_input_filename = abs_input_filename;
input_file_stat_cache_->ReleaseOwner();
param->file_stat_cache = std::move(input_file_stat_cache_);
OneshotClosure* closure =
NewCallback(this, &CompileTask::RunCppIncludeProcessor, std::move(param));
service_->wm()->RunClosureInPool(
FROM_HERE, service_->include_processor_pool(),
closure,
WorkerThreadManager::PRIORITY_LOW);
}
void CompileTask::RunCppIncludeProcessor(
std::unique_ptr<RunCppIncludeProcessorParam> param) {
DCHECK(compiler_info_state_.get() != nullptr);
// Pass ownership temporarily to the IncludeProcessor thread.
param->file_stat_cache->AcquireOwner();
stats_->set_include_processor_wait_time(
include_wait_timer_.GetInIntMilliseconds());
LOG_IF(WARNING, stats_->include_processor_wait_time() > 1000)
<< trace_id_ << " SLOW start IncludeProcessor"
<< " in " << stats_->include_processor_wait_time() << " msec";
SimpleTimer include_timer(SimpleTimer::START);
CppIncludeProcessor include_processor;
param->result_status = include_processor.GetIncludeFiles(
param->input_filename, flags_->cwd_for_include_processor(), *flags_,
ToCxxCompilerInfo(compiler_info_state_.get()->info()),
&param->required_files, param->file_stat_cache.get());
stats_->set_include_processor_run_time(include_timer.GetInIntMilliseconds());
if (!param->result_status) {
LOG(WARNING) << trace_id_
<< " Unsupported feature detected "
<< "in our pseudo includer! "
<< flags_->DebugString();
}
param->total_files = include_processor.total_files();
param->skipped_files = include_processor.skipped_files();
// Return ownership from the IncludeProcessor thread to the CompileTask thread.
param->file_stat_cache->ReleaseOwner();
service_->wm()->RunClosureInThread(
FROM_HERE, thread_id_,
NewCallback(this, &CompileTask::RunCppIncludeProcessorDone,
std::move(param)),
WorkerThreadManager::PRIORITY_LOW);
}
void CompileTask::RunCppIncludeProcessorDone(
std::unique_ptr<RunCppIncludeProcessorParam> param) {
DCHECK(BelongsToCurrentThread());
input_file_stat_cache_ = std::move(param->file_stat_cache);
input_file_stat_cache_->AcquireOwner();
required_files_.swap(param->required_files);
stats_->set_include_preprocess_total_files(param->total_files);
stats_->set_include_preprocess_skipped_files(param->skipped_files);
if (DepsCache::IsEnabled()) {
if (param->result_status && deps_identifier_.has_value()) {
DepsCache* dc = DepsCache::instance();
if (!dc->SetDependencies(deps_identifier_, flags_->cwd(),
param->abs_input_filename, required_files_,
input_file_stat_cache_.get())) {
LOG(INFO) << trace_id_ << " failed to save dependencies.";
}
}
}
UpdateRequiredFilesDone(param->result_status);
}
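// Link mode: collect input files and system library paths with
// LinkerInputProcessor on the worker pool.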
struct CompileTask::RunLinkerInputProcessorParam {
RunLinkerInputProcessorParam() : result_status(false) {}
// request
// response
bool result_status;
std::set<string> required_files;
std::vector<string> system_library_paths;
private:
DISALLOW_COPY_AND_ASSIGN(RunLinkerInputProcessorParam);
};
void CompileTask::GetLinkRequiredFiles() {
CHECK_EQ(SETUP, state_);
DCHECK(compiler_info_state_.get() != nullptr);
std::unique_ptr<RunLinkerInputProcessorParam> param(
new RunLinkerInputProcessorParam);
OneshotClosure* closure =
NewCallback(
this, &CompileTask::RunLinkerInputProcessor, std::move(param));
service_->wm()->RunClosureInPool(
FROM_HERE, service_->include_processor_pool(),
closure,
WorkerThreadManager::PRIORITY_LOW);
}
void CompileTask::RunLinkerInputProcessor(
std::unique_ptr<RunLinkerInputProcessorParam> param) {
DCHECK(compiler_info_state_.get() != nullptr);
LinkerInputProcessor linker_input_processor(
flags_->args(), flags_->cwd());
param->result_status = linker_input_processor.GetInputFilesAndLibraryPath(
compiler_info_state_.get()->info(),
req_->command_spec(),
&param->required_files,
&param->system_library_paths);
if (!param->result_status) {
LOG(WARNING) << trace_id_
<< " Failed to get input files "
<< flags_->DebugString();
}
service_->wm()->RunClosureInThread(
FROM_HERE, thread_id_,
NewCallback(
this, &CompileTask::RunLinkerInputProcessorDone, std::move(param)),
WorkerThreadManager::PRIORITY_LOW);
}
void CompileTask::RunLinkerInputProcessorDone(
std::unique_ptr<RunLinkerInputProcessorParam> param) {
DCHECK(BelongsToCurrentThread());
required_files_.swap(param->required_files);
system_library_paths_.swap(param->system_library_paths);
FixSystemLibraryPath(system_library_paths_, req_->mutable_command_spec());
UpdateRequiredFilesDone(param->result_status);
}
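// ThinLTO backend phase: ThinLTOImportProcessor reads the imports for the
// given index file on the worker pool to find additional required files.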
struct CompileTask::ReadThinLTOImportsParam {
ReadThinLTOImportsParam() {}
ReadThinLTOImportsParam(ReadThinLTOImportsParam&&) = delete;
ReadThinLTOImportsParam(const ReadThinLTOImportsParam&) = delete;
ReadThinLTOImportsParam& operator=(const ReadThinLTOImportsParam&) = delete;
ReadThinLTOImportsParam& operator=(ReadThinLTOImportsParam&&) = delete;
// request
// response
std::set<string> required_files;
};
void CompileTask::GetThinLTOImports() {
CHECK_EQ(SETUP, state_);
std::unique_ptr<ReadThinLTOImportsParam> param(new ReadThinLTOImportsParam);
OneshotClosure* closure =
NewCallback(this, &CompileTask::ReadThinLTOImports, std::move(param));
service_->wm()->RunClosureInPool(
FROM_HERE, service_->include_processor_pool(),
closure,
WorkerThreadManager::PRIORITY_LOW);
}
void CompileTask::ReadThinLTOImports(
std::unique_ptr<ReadThinLTOImportsParam> param) {
DCHECK_EQ(CompilerFlagType::Gcc, flags_->type());
const GCCFlags& gcc_flags = static_cast<const GCCFlags&>(*flags_);
ThinLTOImportProcessor processor;
if (!processor.GetIncludeFiles(gcc_flags.thinlto_index(), gcc_flags.cwd(),
&param->required_files)) {
LOG(ERROR) << trace_id_ << " failed to get ThinLTO imports";
}
service_->wm()->RunClosureInThread(
FROM_HERE, thread_id_,
NewCallback(this, &CompileTask::ReadThinLTOImportsDone, std::move(param)),
WorkerThreadManager::PRIORITY_LOW);
}
void CompileTask::ReadThinLTOImportsDone(
std::unique_ptr<ReadThinLTOImportsParam> param) {
DCHECK(BelongsToCurrentThread());
required_files_.swap(param->required_files);
UpdateRequiredFilesDone(true);
}
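// Generic include processor path: CompilerTypeSpecific::RunIncludeProcessor
// is run on the worker pool and its result is handed back to the CompileTask
// thread.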
struct CompileTask::IncludeProcessorRequestParam {
std::unique_ptr<FileStatCache> file_stat_cache;
};
struct CompileTask::IncludeProcessorResponseParam {
CompilerTypeSpecific::IncludeProcessorResult result;
// Move file_stat_cache from request.
std::unique_ptr<FileStatCache> file_stat_cache;
};
void CompileTask::StartIncludeProcessor() {
VLOG(1) << "StartIncludeProcessor";
CHECK_EQ(SETUP, state_);
// TODO: DepsCache handling here.
auto request_param = absl::make_unique<IncludeProcessorRequestParam>();
input_file_stat_cache_->ReleaseOwner();
request_param->file_stat_cache = std::move(input_file_stat_cache_);
OneshotClosure* closure = NewCallback(this, &CompileTask::RunIncludeProcessor,
std::move(request_param));
service_->wm()->RunClosureInPool(
FROM_HERE, service_->include_processor_pool(),
closure,
WorkerThreadManager::PRIORITY_LOW);
}
void CompileTask::RunIncludeProcessor(
std::unique_ptr<IncludeProcessorRequestParam> request_param) {
VLOG(1) << "RunIncludeProcessor";
DCHECK(compiler_info_state_.get() != nullptr);
// Pass ownership temporarily to the IncludeProcessor thread.
request_param->file_stat_cache->AcquireOwner();
stats_->set_include_processor_wait_time(
include_wait_timer_.GetInIntMilliseconds());
LOG_IF(WARNING, stats_->include_processor_wait_time() > 1000)
<< trace_id_ << " SLOW start IncludeProcessor"
<< " in " << stats_->include_processor_wait_time() << " msec";
SimpleTimer include_timer(SimpleTimer::START);
CompilerTypeSpecific::IncludeProcessorResult result =
compiler_type_specific_->RunIncludeProcessor(
trace_id_, *flags_, compiler_info_state_.get()->info(),
req_->command_spec(), request_param->file_stat_cache.get());
stats_->set_include_processor_run_time(include_timer.GetInIntMilliseconds());
auto response_param = absl::make_unique<IncludeProcessorResponseParam>();
response_param->result = std::move(result);
response_param->file_stat_cache = std::move(request_param->file_stat_cache);
response_param->file_stat_cache->ReleaseOwner();
service_->wm()->RunClosureInThread(
FROM_HERE, thread_id_,
NewCallback(this, &CompileTask::RunIncludeProcessorDone,
std::move(response_param)),
WorkerThreadManager::PRIORITY_LOW);
}
void CompileTask::RunIncludeProcessorDone(
std::unique_ptr<IncludeProcessorResponseParam> response_param) {
VLOG(1) << "RunIncludeProcessorDone";
DCHECK(BelongsToCurrentThread());
DCHECK(response_param->file_stat_cache.get() != nullptr);
input_file_stat_cache_ = std::move(response_param->file_stat_cache);
input_file_stat_cache_->AcquireOwner();
required_files_ = std::move(response_param->result.required_files);
if (!response_param->result.system_library_paths.empty()) {
system_library_paths_ =
std::move(response_param->result.system_library_paths);
FixSystemLibraryPath(system_library_paths_, req_->mutable_command_spec());
}
if (response_param->result.total_files) {
stats_->set_include_preprocess_total_files(
*response_param->result.total_files);
}
if (response_param->result.skipped_files) {
stats_->set_include_preprocess_skipped_files(
*response_param->result.skipped_files);
}
if (!response_param->result.ok) {
LOG(WARNING) << trace_id_ << "include processor failed"
<< " error_reason=" << response_param->result.error_reason
<< " flags=" << flags_->DebugString();
if (response_param->result.error_to_user) {
AddErrorToResponse(TO_USER, response_param->result.error_reason, true);
}
}
// TODO: DepsCache handling here.
UpdateRequiredFilesDone(response_param->result.ok);
}
// ----------------------------------------------------------------
// state_: FILE_REQ.
void CompileTask::SetInputFileCallback() {
CHECK(BelongsToCurrentThread());
CHECK_EQ(FILE_REQ, state_);
CHECK(!input_file_callback_);
input_file_callback_ = NewCallback(
this, &CompileTask::ProcessFileRequestDone);
num_input_file_task_ = 0;
input_file_success_ = true;
}
void CompileTask::StartInputFileTask() {
CHECK(BelongsToCurrentThread());
CHECK_EQ(FILE_REQ, state_);
++num_input_file_task_;
}
void CompileTask::InputFileTaskFinished(InputFileTask* input_file_task) {
CHECK(BelongsToCurrentThread());
CHECK_EQ(FILE_REQ, state_);
if (abort_) {
VLOG(1) << trace_id_ << "aborted ";
input_file_success_ = false;
input_file_task->Done(this);
return;
}
const string& filename = input_file_task->filename();
const string& hash_key = input_file_task->hash_key();
const ssize_t file_size = input_file_task->file_size();
const time_t mtime = input_file_task->mtime();
VLOG(1) << trace_id_ << " input done:" << filename;
if (mtime > stats_->latest_input_mtime()) {
stats_->set_latest_input_filename(filename);
stats_->set_latest_input_mtime(mtime);
}
if (!input_file_task->success()) {
AddErrorToResponse(TO_LOG, "Create file blob failed for:" + filename, true);
input_file_success_ = false;
input_file_task->Done(this);
return;
}
DCHECK(!hash_key.empty()) << filename;
stats_->add_input_file_time(input_file_task->timer().GetInIntMilliseconds());
stats_->add_input_file_size(file_size);
if (!input_file_task->UpdateInputInTask(this)) {
LOG(ERROR) << trace_id_ << " bad input data "
<< filename;
input_file_success_ = false;
}
const HttpClient::Status& http_status =
input_file_task->http_status();
stats_->input_file_rpc_size += http_status.req_size;
stats_->input_file_rpc_raw_size += http_status.raw_req_size;
input_file_task->Done(this);
}
void CompileTask::MaybeRunInputFileCallback(bool task_finished) {
CHECK(BelongsToCurrentThread());
CHECK_EQ(FILE_REQ, state_);
OneshotClosure* closure = nullptr;
if (task_finished) {
--num_input_file_task_;
VLOG(1) << trace_id_ << " input remain=" << num_input_file_task_;
if (num_input_file_task_ > 0)
return;
}
CHECK_EQ(0, num_input_file_task_);
if (input_file_callback_) {
closure = input_file_callback_;
input_file_callback_ = nullptr;
}
if (closure)
closure->Run();
}
// ----------------------------------------------------------------
// state_: CALL_EXEC.
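// Compares the CommandSpec we sent with the one the backend actually used and
// records any mismatch.  In hermetic mode a mismatch becomes a user-visible
// error (and may disable the compiler); otherwise the behavior depends on
// command_check_level and verify_command.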
void CompileTask::CheckCommandSpec() {
CHECK_EQ(CALL_EXEC, state_);
if (!resp_->result().has_command_spec()) {
return;
}
// Check all mismatches first, then decide the behavior later.
bool is_name_mismatch = false;
bool is_target_mismatch = false;
bool is_binary_hash_mismatch = false;
bool is_version_mismatch = false;
bool is_subprograms_mismatch = false;
const CommandSpec& req_command_spec = req_->command_spec();
const CommandSpec& resp_command_spec = resp_->result().command_spec();
const string message_on_mismatch(
"local:" + CreateCommandVersionString(req_command_spec) +
" but remote:" +
CreateCommandVersionString(resp_command_spec));
if (req_command_spec.name() != resp_command_spec.name()) {
is_name_mismatch = true;
std::ostringstream ss;
ss << trace_id_ << " compiler name mismatch:"
<< " local:" << req_command_spec.name()
<< " remote:" << resp_command_spec.name();
AddErrorToResponse(TO_LOG, ss.str(), false);
stats_->set_exec_command_name_mismatch(message_on_mismatch);
}
if (req_command_spec.target() != resp_command_spec.target()) {
is_target_mismatch = true;
std::ostringstream ss;
ss << trace_id_ << " compiler target mismatch:"
<< " local:" << req_command_spec.target()
<< " remote:" << resp_command_spec.target();
AddErrorToResponse(TO_LOG, ss.str(), false);
stats_->set_exec_command_target_mismatch(message_on_mismatch);
}
if (req_command_spec.binary_hash() != resp_command_spec.binary_hash()) {
is_binary_hash_mismatch = true;
LOG(WARNING) << trace_id_ << " compiler binary hash mismatch:"
<< " local:" << req_command_spec.binary_hash()
<< " remote:" << resp_command_spec.binary_hash();
stats_->set_exec_command_binary_hash_mismatch(message_on_mismatch);
}
if (req_command_spec.version() != resp_command_spec.version()) {
is_version_mismatch = true;
LOG(WARNING) << trace_id_ << " compiler version mismatch:"
<< " local:" << req_command_spec.version()
<< " remote:" << resp_command_spec.version();
stats_->set_exec_command_version_mismatch(message_on_mismatch);
}
if (!IsSameSubprograms(*req_, *resp_)) {
is_subprograms_mismatch = true;
std::ostringstream local_subprograms;
DumpSubprograms(req_->subprogram(), &local_subprograms);
std::ostringstream remote_subprograms;
DumpSubprograms(resp_->result().subprogram(), &remote_subprograms);
LOG(WARNING) << trace_id_ << " compiler subprograms mismatch:"
<< " local:" << local_subprograms.str()
<< " remote:" << remote_subprograms.str();
std::ostringstream ss;
ss << "local:" << CreateCommandVersionString(req_command_spec)
<< " subprogram:" << local_subprograms.str()
<< " but remote:" << CreateCommandVersionString(resp_command_spec)
<< " subprogram:" << remote_subprograms.str();
stats_->set_exec_command_subprograms_mismatch(ss.str());
}
if (service_->hermetic()) {
bool mismatch = false;
// Check if remote used the same command spec.
if (is_name_mismatch) {
mismatch = true;
AddErrorToResponse(TO_USER, "compiler name mismatch", true);
}
if (is_target_mismatch) {
mismatch = true;
AddErrorToResponse(TO_USER, "compiler target mismatch", true);
}
if (is_binary_hash_mismatch) {
mismatch = true;
AddErrorToResponse(TO_USER, "compiler binary hash mismatch", true);
}
if (is_version_mismatch) {
AddErrorToResponse(TO_USER, "compiler version mismatch", true);
mismatch = true;
}
if (is_subprograms_mismatch) {
AddErrorToResponse(TO_USER, "subprograms mismatch", true);
mismatch = true;
}
if (mismatch) {
if (service_->DisableCompilerInfo(compiler_info_state_.get(),
"hermetic mismatch")) {
AddErrorToResponse(
TO_USER,
req_->command_spec().local_compiler_path() + " is disabled.",
true);
}
want_fallback_ = service_->hermetic_fallback();
if (want_fallback_ != requester_env_.fallback()) {
LOG(INFO) << trace_id_ << " hermetic mismatch: fallback changed from "
<< requester_env_.fallback()
<< " to " << want_fallback_;
}
}
return;
}
if (is_name_mismatch || is_target_mismatch) {
AddErrorToResponse(TO_USER, "compiler name or target mismatch", true);
if (service_->DisableCompilerInfo(compiler_info_state_.get(),
"compiler name or target mismatch")) {
AddErrorToResponse(
TO_USER,
req_->command_spec().local_compiler_path() + " is disabled.",
true);
}
return;
}
// TODO: drop command_check_level support in the future.
// GOMA_HERMETIC should be recommended.
if (is_binary_hash_mismatch) {
string error_message;
bool set_error = false;
if (service_->RecordCommandSpecBinaryHashMismatch(
stats_->exec_command_binary_hash_mismatch())) {
error_message = "compiler binary hash mismatch: " +
stats_->exec_command_binary_hash_mismatch();
}
if (service_->command_check_level() == "checksum") {
set_error = true;
}
if (!requester_env_.verify_command().empty()) {
if (requester_env_.verify_command() == "checksum" ||
requester_env_.verify_command() == "all") {
AddErrorToResponse(TO_LOG, "", true);
resp_->mutable_result()->set_stderr_buffer(
"compiler binary hash mismatch: " +
stats_->exec_command_binary_hash_mismatch() + "\n" +
resp_->mutable_result()->stderr_buffer());
}
// Ignored for other verify command modes.
} else if (!error_message.empty()) {
error_message =
(set_error ? "Error: " : "Warning: ") + error_message;
AddErrorToResponse(TO_USER, error_message, set_error);
}
}
if (is_version_mismatch) {
string error_message;
bool set_error = false;
if (service_->RecordCommandSpecVersionMismatch(
stats_->exec_command_version_mismatch())) {
error_message = "compiler version mismatch: " +
stats_->exec_command_version_mismatch();
}
if (service_->command_check_level() == "version") {
set_error = true;
}
if (!requester_env_.verify_command().empty()) {
if (requester_env_.verify_command() == "version" ||
requester_env_.verify_command() == "all") {
AddErrorToResponse(TO_LOG, "", true);
resp_->mutable_result()->set_stderr_buffer(
"compiler version mismatch: " +
stats_->exec_command_version_mismatch() + "\n" +
resp_->mutable_result()->stderr_buffer());
}
// Ignored for other verify command modes.
} else if (!error_message.empty()) {
error_message =
(set_error ? "Error: " : "Warning: ") + error_message;
AddErrorToResponse(TO_USER, error_message, set_error);
}
}
if (is_subprograms_mismatch) {
std::ostringstream error_message;
bool set_error = false;
std::set<string> remote_hashes;
for (const auto& subprog : resp_->result().subprogram()) {
remote_hashes.insert(subprog.binary_hash());
}
for (const auto& subprog : req_->subprogram()) {
if (remote_hashes.find(subprog.binary_hash()) != remote_hashes.end()) {
continue;
}
std::ostringstream ss;
ss << subprog.path() << " " << subprog.binary_hash();
if (service_->RecordSubprogramMismatch(ss.str())) {
if (!error_message.str().empty()) {
error_message << std::endl;
}
error_message << "subprogram mismatch: "
<< ss.str();
}
}
if (service_->command_check_level() == "checksum") {
set_error = true;
}
if (!requester_env_.verify_command().empty()) {
if (requester_env_.verify_command() == "checksum" ||
requester_env_.verify_command() == "all") {
AddErrorToResponse(TO_LOG, "", true);
resp_->mutable_result()->set_stderr_buffer(
error_message.str() + "\n" +
resp_->mutable_result()->stderr_buffer());
}
// Ignored for other verify command modes.
} else if (!error_message.str().empty()) {
AddErrorToResponse(
TO_USER,
(set_error ? "Error: " : "Warning: ") + error_message.str(),
set_error);
}
}
}
void CompileTask::CheckNoMatchingCommandSpec(const string& retry_reason) {
CHECK_EQ(CALL_EXEC, state_);
// If ExecResult does not have CommandSpec, goma backend did not try
// to find the compiler. No need to check mismatches.
if (!resp_->result().has_command_spec()) {
return;
}
bool is_compiler_missing = false;
bool is_subprogram_missing = false;
// If ExecResult has incomplete CommandSpec, it means that goma backend
// tried to select a matching compiler but failed.
if (!resp_->result().command_spec().has_binary_hash()) {
is_compiler_missing = true;
}
if (!IsSameSubprograms(*req_, *resp_)) {
is_subprogram_missing = true;
}
// Nothing is missing.
if (!is_compiler_missing && !is_subprogram_missing) {
return;
}
std::ostringstream local_subprograms;
std::ostringstream remote_subprograms;
DumpSubprograms(req_->subprogram(), &local_subprograms);
DumpSubprograms(resp_->result().subprogram(), &remote_subprograms);
std::ostringstream what_missing;
if (is_compiler_missing) {
LOG(WARNING) << trace_id_
<< " compiler not found:"
<< " local: "
<< CreateCommandVersionString(req_->command_spec())
<< " remote: none";
what_missing << "compiler("
<< CreateCommandVersionString(req_->command_spec())
<< ")";
}
if (is_subprogram_missing) {
LOG(WARNING) << trace_id_
<< " subprogram not found:"
<< " local: " << local_subprograms.str()
<< " remote: " << remote_subprograms.str();
if (!what_missing.str().empty())
what_missing << "/";
what_missing << "subprograms("
<< local_subprograms.str()
<< ")";
}
std::ostringstream ss;
ss << "local: " << CreateCommandVersionString(req_->command_spec())
<< " subprogram: " << local_subprograms.str()
<< " but remote: ";
if (is_compiler_missing) {
ss << "none";
} else {
ss << CreateCommandVersionString(resp_->result().command_spec());
}
ss << " subprogram: " << remote_subprograms.str();
stats_->set_exec_command_not_found(ss.str());
if (service_->hermetic() && !what_missing.str().empty()) {
std::ostringstream msg;
msg << "No matching " << what_missing.str() << " found in server";
AddErrorToResponse(TO_USER, msg.str(), true);
if (is_compiler_missing &&
service_->DisableCompilerInfo(compiler_info_state_.get(),
"no matching compiler found in server")) {
AddErrorToResponse(
TO_USER, req_->command_spec().local_compiler_path() +
" is disabled.",
true);
}
want_fallback_ = service_->hermetic_fallback();
if (want_fallback_ != requester_env_.fallback()) {
LOG(INFO) << trace_id_
<< " hermetic miss "
<< what_missing.str()
<< ": fallback changed from "
<< requester_env_.fallback()
<< " to " << want_fallback_;
}
}
}
void CompileTask::StoreEmbeddedUploadInformationIfNeeded() {
// We save embedded upload information only if the missing input size is 0.
// Consider the situation where we are using cluster A and cluster B.
// When we send a compile request to cluster A, cluster A might report
// missing inputs. Then we retry sending the compile request.
// However, we might send it to another cluster B, and cluster B might
// report a missing input error again.
// So we would like to save the embedded upload information only if no
// missing input error happened.
// TODO: Although this can reduce the number of missing inputs, they can
// still occur. After uploading a file to cluster B succeeded, we might send
// another compile request to cluster A. If cluster A does not have the file
// cached, a missing inputs error will occur.
if (resp_->missing_input_size() > 0)
return;
// TODO: What time should we use here?
const absl::Time upload_timestamp_ms = absl::Now();
for (const auto& input : req_->input()) {
// If content does not exist, it's not embedded upload.
if (!input.has_content())
continue;
const std::string& abs_filename = file::JoinPathRespectAbsolute(
flags_->cwd(), input.filename());
bool new_cache_key = service_->file_hash_cache()->StoreFileCacheKey(
abs_filename, input.hash_key(), upload_timestamp_ms,
input_file_stat_cache_->Get(abs_filename));
VLOG(1) << trace_id_
<< " store file cache key for embedded upload: "
<< abs_filename
<< " : is new cache key? = " << new_cache_key;
}
}
// ----------------------------------------------------------------
// state_: FILE_RESP.
void CompileTask::SetOutputFileCallback() {
CHECK(BelongsToCurrentThread());
CHECK_EQ(FILE_RESP, state_);
CHECK(!output_file_callback_);
output_file_callback_ = NewCallback(
this, &CompileTask::ProcessFileResponseDone);
num_output_file_task_ = 0;
output_file_success_ = true;
}
void CompileTask::CheckOutputFilename(const string& filename) {
CHECK_EQ(FILE_RESP, state_);
if (filename[0] == '/') {
if (HasPrefixDir(filename, service_->tmp_dir()) ||
HasPrefixDir(filename, "/var")) {
VLOG(1) << "Output to temp directory:" << filename;
} else if (service_->use_relative_paths_in_argv()) {
// If FLAGS_USE_RELATIVE_PATHS_IN_ARGV is false, the output path may
// legitimately be an absolute path specified by -o or similar, so we
// only treat an absolute output path as fatal when it is true.
Json::Value json;
DumpToJson(true, &json);
LOG(ERROR) << trace_id_ << " " << json;
LOG(FATAL) << "Absolute output filename:"
<< filename;
}
}
}
void CompileTask::StartOutputFileTask() {
CHECK(BelongsToCurrentThread());
CHECK_EQ(FILE_RESP, state_);
++num_output_file_task_;
}
void CompileTask::OutputFileTaskFinished(
std::unique_ptr<OutputFileTask> output_file_task) {
CHECK(BelongsToCurrentThread());
CHECK_EQ(FILE_RESP, state_);
DCHECK_EQ(this, output_file_task->task());
const ExecResult_Output& output = output_file_task->output();
const string& filename = output.filename();
if (abort_) {
output_file_success_ = false;
return;
}
if (!output_file_task->success()) {
AddErrorToResponse(TO_LOG,
"Failed to write file blob:" + filename + " (" +
(cache_hit() ? "cached" : "no-cached") + ")",
true);
output_file_success_ = false;
// If writing the file fails, goma has the ExecResult in cache but might
// have lost the output file. It would be better to retry with STORE_ONLY
// to recreate the output file and store it in the cache.
ExecReq::CachePolicy cache_policy = req_->cache_policy();
if (cache_policy == ExecReq::LOOKUP_AND_STORE ||
cache_policy == ExecReq::LOOKUP_AND_STORE_SUCCESS) {
LOG(WARNING) << trace_id_
<< " will retry with STORE_ONLY";
req_->set_cache_policy(ExecReq::STORE_ONLY);
}
return;
}
int output_file_time = output_file_task->timer().GetInIntMilliseconds();
LOG_IF(WARNING, output_file_time > 60 * 1000)
<< trace_id_
<< " SLOW output file:"
<< " filename=" << filename
<< " http_rpc=" << output_file_task->http_rpc_status().DebugString()
<< " num_rpc=" << output_file_task->num_rpc()
<< " in_memory=" << output_file_task->IsInMemory()
<< " in " << output_file_time << " msec";
stats_->add_output_file_time(output_file_time);
LOG_IF(WARNING,
output.blob().blob_type() != FileBlob::FILE &&
output.blob().blob_type() != FileBlob::FILE_META)
<< "Invalid blob type: " << output.blob().blob_type();
stats_->add_output_file_size(output.blob().file_size());
stats_->output_file_rpc += output_file_task->num_rpc();
const HttpRPC::Status& http_rpc_status =
output_file_task->http_rpc_status();
stats_->add_chunk_resp_size(http_rpc_status.resp_size);
stats_->output_file_rpc_req_build_time += http_rpc_status.req_build_time;
stats_->output_file_rpc_req_send_time += http_rpc_status.req_send_time;
stats_->output_file_rpc_wait_time += http_rpc_status.wait_time;
stats_->output_file_rpc_resp_recv_time += http_rpc_status.resp_recv_time;
stats_->output_file_rpc_resp_parse_time += http_rpc_status.resp_parse_time;
stats_->output_file_rpc_size += http_rpc_status.resp_size;
stats_->output_file_rpc_raw_size += http_rpc_status.raw_resp_size;
}
void CompileTask::MaybeRunOutputFileCallback(int index, bool task_finished) {
CHECK(BelongsToCurrentThread());
CHECK_EQ(FILE_RESP, state_);
OneshotClosure* closure = nullptr;
if (task_finished) {
DCHECK_NE(-1, index);
// Once output.blob has been written on disk, we don't need it
// any more.
resp_->mutable_result()->mutable_output(index)->clear_blob();
--num_output_file_task_;
if (num_output_file_task_ > 0)
return;
} else {
CHECK_EQ(-1, index);
}
CHECK_EQ(0, num_output_file_task_);
if (output_file_callback_) {
closure = output_file_callback_;
output_file_callback_ = nullptr;
}
if (closure)
closure->Run();
}
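// Byte-compares the locally produced output with the goma output in
// 1024-byte chunks; any difference is reported to the user as an error.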
bool CompileTask::VerifyOutput(
const string& local_output_path,
const string& goma_output_path) {
CHECK_EQ(FILE_RESP, state_);
LOG(INFO) << "Verify Output: "
<< " local:" << local_output_path
<< " goma:" << goma_output_path;
std::ostringstream error_message;
static const int kSize = 1024;
char local_buf[kSize];
char goma_buf[kSize];
ScopedFd local_fd(ScopedFd::OpenForRead(local_output_path));
if (!local_fd.valid()) {
error_message << "Not found: local file:" << local_output_path;
AddErrorToResponse(TO_USER, error_message.str(), true);
return false;
}
ScopedFd goma_fd(ScopedFd::OpenForRead(goma_output_path));
if (!goma_fd.valid()) {
error_message << "Not found: goma file:" << goma_output_path;
AddErrorToResponse(TO_USER, error_message.str(), true);
return false;
}
int local_len;
int goma_len;
for (size_t len = 0; ; len += local_len) {
local_len = local_fd.Read(local_buf, kSize);
if (local_len < 0) {
error_message << "read error local:" << local_output_path
<< " @" << len << " " << GetLastErrorMessage();
AddErrorToResponse(TO_USER, error_message.str(), true);
return false;
}
goma_len = goma_fd.Read(goma_buf, kSize);
if (goma_len < 0) {
error_message << "read error goma:" << goma_output_path
<< " @" << len << " " << GetLastErrorMessage();
AddErrorToResponse(TO_USER, error_message.str(), true);
return false;
}
if (local_len != goma_len) {
error_message << "read len: " << local_len << "!=" << goma_len
<< " " << local_output_path << " @" << len;
AddErrorToResponse(TO_USER, error_message.str(), true);
return false;
}
if (local_len == 0) {
LOG(INFO) << trace_id_
<< " Verify OK: " << local_output_path
<< " size=" << len;
return true;
}
if (memcmp(local_buf, goma_buf, local_len) != 0) {
error_message << "output mismatch: "
<< " local:" << local_output_path
<< " goma:" << goma_output_path
<< " @[" << len << "," << local_len << ")";
AddErrorToResponse(TO_USER, error_message.str(), true);
return false;
}
VLOG(2) << "len:" << len << "+" << local_len;
}
return true;
}
void CompileTask::ClearOutputFile() {
for (auto& iter : output_file_) {
if (!iter.content.empty()) {
LOG(INFO) << trace_id_ << " clear output, but content is not empty";
service_->ReleaseOutputBuffer(iter.size, &iter.content);
continue;
}
// Remove the tmp file if we wrote one for the output.
// Don't remove |filename|, which is the actual output filename;
// a local run might have written its output to that file.
const string& filename = iter.filename;
const string& tmp_filename = iter.tmp_filename;
if (!tmp_filename.empty() && tmp_filename != filename) {
remove(tmp_filename.c_str());
}
}
output_file_.clear();
}
// ----------------------------------------------------------------
// local run finished.
void CompileTask::SetLocalOutputFileCallback() {
CHECK(BelongsToCurrentThread());
CHECK(!local_output_file_callback_);
local_output_file_callback_ = NewCallback(
this, &CompileTask::ProcessLocalFileOutputDone);
num_local_output_file_task_ = 0;
}
void CompileTask::StartLocalOutputFileTask() {
CHECK(BelongsToCurrentThread());
++num_local_output_file_task_;
}
void CompileTask::LocalOutputFileTaskFinished(
std::unique_ptr<LocalOutputFileTask> local_output_file_task) {
CHECK(BelongsToCurrentThread());
DCHECK_EQ(this, local_output_file_task->task());
const string& filename = local_output_file_task->filename();
if (!local_output_file_task->success()) {
LOG(WARNING) << trace_id_
<< " Create file blob failed for local output:" << filename;
return;
}
const FileBlob& blob = local_output_file_task->blob();
stats_->add_local_output_file_time(
local_output_file_task->timer().GetInIntMilliseconds());
stats_->add_local_output_file_size(blob.file_size());
}
void CompileTask::MaybeRunLocalOutputFileCallback(bool task_finished) {
CHECK(BelongsToCurrentThread());
OneshotClosure* closure = nullptr;
if (task_finished) {
--num_local_output_file_task_;
if (num_local_output_file_task_ > 0)
return;
}
CHECK_EQ(0, num_local_output_file_task_);
if (local_output_file_callback_) {
closure = local_output_file_callback_;
local_output_file_callback_ = nullptr;
}
if (closure)
closure->Run();
}
// ----------------------------------------------------------------
// state_: FINISHED/LOCAL_FINISHED or abort_
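// Copies timing stats into the ExecResp, converting milliseconds to seconds.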
void CompileTask::UpdateStats() {
CHECK(state_ >= FINISHED || abort_);
resp_->set_compiler_proxy_time(
handler_timer_.GetInIntMilliseconds() / 1000.0);
resp_->set_compiler_proxy_include_preproc_time(
stats_->include_preprocess_time() / 1000.0);
resp_->set_compiler_proxy_include_fileload_time(
stats_->include_fileload_time() / 1000.0);
resp_->set_compiler_proxy_rpc_call_time(
SumRepeatedInt32(stats_->rpc_call_time()) / 1000.0);
resp_->set_compiler_proxy_file_response_time(
stats_->file_response_time() / 1000.0);
resp_->set_compiler_proxy_rpc_build_time(
SumRepeatedInt32(stats_->rpc_req_build_time()) / 1000.0);
resp_->set_compiler_proxy_rpc_send_time(
SumRepeatedInt32(stats_->rpc_req_send_time()) / 1000.0);
resp_->set_compiler_proxy_rpc_wait_time(
SumRepeatedInt32(stats_->rpc_wait_time()) / 1000.0);
resp_->set_compiler_proxy_rpc_recv_time(
SumRepeatedInt32(stats_->rpc_resp_recv_time()) / 1000.0);
resp_->set_compiler_proxy_rpc_parse_time(
SumRepeatedInt32(stats_->rpc_resp_parse_time()) / 1000.0);
resp_->set_compiler_proxy_local_pending_time(
stats_->local_pending_time() / 1000.0);
resp_->set_compiler_proxy_local_run_time(stats_->local_run_time() / 1000.0);
// TODO: similar logic is found in CompileService::CompileTaskDone, so
// it would be better to merge them. Note that ExecResp is not available
// in CompileService::CompileTaskDone.
switch (state_) {
case FINISHED:
resp_->set_compiler_proxy_goma_finished(true);
if (stats_->cache_hit())
resp_->set_compiler_proxy_goma_cache_hit(true);
break;
case LOCAL_FINISHED:
resp_->set_compiler_proxy_local_finished(true);
break;
default:
resp_->set_compiler_proxy_goma_aborted(true);
break;
}
if (stats_->goma_error())
resp_->set_compiler_proxy_goma_error(true);
if (local_run_)
resp_->set_compiler_proxy_local_run(true);
if (local_killed_)
resp_->set_compiler_proxy_local_killed(true);
resp_->set_compiler_proxy_exec_request_retry(
stats_->exec_request_retry());
}
void CompileTask::SaveInfoFromInputOutput() {
DCHECK(BelongsToCurrentThread());
CHECK(state_ >= FINISHED || abort_);
CHECK(req_.get());
CHECK(resp_.get());
CHECK(!exec_resp_.get());
if (failed() || fail_fallback_) {
if (!fail_fallback_) {
// In the fail-fallback case, the remote outputs were already stored in
// stdout_ and stderr_, and resp_ holds the local process output.
stdout_ = resp_->result().stdout_buffer();
stderr_ = resp_->result().stderr_buffer();
}
}
// arg, env and expanded_arg are used for dumping ExecReq.
// We should keep what we actually used instead of what came from gomacc.
*stats_->mutable_arg() = std::move(*req_->mutable_arg());
*stats_->mutable_env() = std::move(*req_->mutable_env());
*stats_->mutable_expanded_arg() = std::move(*req_->mutable_expanded_arg());
req_.reset();
resp_.reset();
flags_.reset();
input_file_stat_cache_.reset();
output_file_stat_cache_.reset();
}
// ----------------------------------------------------------------
// subprocess handling.
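// Builds the SubProcessReq for the local compiler: argv from stats_->arg(),
// env from stats_->env() plus PATH (and TMP/TEMP/PATHEXT on Windows), with
// stdout/stderr redirected to temporary files under tmp_dir.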
void CompileTask::SetupSubProcess() {
VLOG(1) << trace_id_ << " SetupSubProcess "
<< SubProcessReq::Weight_Name(subproc_weight_);
CHECK(BelongsToCurrentThread());
CHECK(subproc_ == nullptr) << trace_id_ << " " << StateName(state_)
<< " pid=" << subproc_->started().pid()
<< stats_->local_run_reason();
CHECK(!req_->command_spec().local_compiler_path().empty())
<< req_->DebugString();
if (delayed_setup_subproc_ != nullptr) {
delayed_setup_subproc_->Cancel();
delayed_setup_subproc_ = nullptr;
}
std::vector<const char*> argv;
argv.push_back(req_->command_spec().local_compiler_path().c_str());
for (int i = 1; i < stats_->arg_size(); ++i) {
argv.push_back(stats_->arg(i).c_str());
}
argv.push_back(nullptr);
subproc_ = new SubProcessTask(
trace_id_,
req_->command_spec().local_compiler_path().c_str(),
const_cast<char**>(&argv[0]));
SubProcessReq* req = subproc_->mutable_req();
req->set_cwd(req_->cwd());
if (requester_env_.has_umask()) {
req->set_umask(requester_env_.umask());
}
if (flags_->type() == CompilerFlagType::Gcc) {
const GCCFlags& gcc_flag = static_cast<const GCCFlags&>(*flags_);
if (gcc_flag.is_stdin_input()) {
CHECK_GE(req_->input_size(), 1) << req_->DebugString();
req->set_stdin_filename(req_->input(0).filename());
}
} else if (flags_->type() == CompilerFlagType::Clexe) {
// TODO: handle the stdin-input case for VC++?
}
{
std::ostringstream filenamebuf;
filenamebuf << "gomacc." << id_ << ".out";
subproc_stdout_ = file::JoinPath(service_->tmp_dir(), filenamebuf.str());
req->set_stdout_filename(subproc_stdout_);
}
{
std::ostringstream filenamebuf;
filenamebuf << "gomacc." << id_ << ".err";
subproc_stderr_ = file::JoinPath(service_->tmp_dir(), filenamebuf.str());
req->set_stderr_filename(subproc_stderr_);
}
for (const auto& env : stats_->env()) {
req->add_env(env);
}
if (local_path_.empty()) {
LOG(WARNING) << "Empty PATH: " << req_->DebugString();
} else {
req->add_env("PATH=" + local_path_);
}
#ifdef _WIN32
req->add_env("TMP=" + service_->tmp_dir());
req->add_env("TEMP=" + service_->tmp_dir());
if (pathext_.empty()) {
LOG(WARNING) << "Empty PATHEXT: " << req_->DebugString();
} else {
req->add_env("PATHEXT=" + pathext_);
}
#endif
req->set_weight(subproc_weight_);
subproc_->Start(
NewCallback(
this,
&CompileTask::FinishSubProcess));
}
void CompileTask::RunSubProcess(const string& reason) {
VLOG(1) << trace_id_ << " RunSubProcess " << reason;
CHECK(!abort_);
if (subproc_ == nullptr) {
LOG(WARNING) << trace_id_ << " subproc already finished.";
return;
}
stats_->set_local_run_reason(reason);
subproc_->RequestRun();
VLOG(1) << "Run " << reason << " " << subproc_->req().DebugString();
}
void CompileTask::KillSubProcess() {
// TODO: support the case where the subprocess is killed by FAIL_FAST.
VLOG(1) << trace_id_ << " KillSubProcess";
CHECK(subproc_ != nullptr);
SubProcessState::State state = subproc_->state();
local_killed_ = subproc_->Kill(); // Will call FinishSubProcess().
VLOG(1) << trace_id_ << " kill pid=" << subproc_->started().pid()
<< " " << local_killed_
<< " " << SubProcessState::State_Name(state)
<< "->" << SubProcessState::State_Name(subproc_->state());
if (local_killed_) {
if (service_->dont_kill_subprocess()) {
stats_->set_local_run_reason("fast goma, but wait for local.");
} else {
stats_->set_local_run_reason("killed by fast goma");
}
} else if (subproc_->started().pid() != SubProcessState::kInvalidPid) {
// subproc was signaled but not waited yet.
stats_->set_local_run_reason("fast goma, local signaled");
} else {
// subproc was initialized, but not yet started.
stats_->set_local_run_reason("fast goma, local not started");
}
}
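// Callback invoked when the local subprocess terminates.  Collects its exit
// status and output, then decides whether to reply with the local result,
// fall back, or keep waiting for the remote result (see the cases listed
// below).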
void CompileTask::FinishSubProcess() {
VLOG(1) << trace_id_ << " FinishSubProcess";
CHECK(BelongsToCurrentThread());
CHECK(!abort_);
SubProcessTask* subproc = nullptr;
{
AUTOLOCK(lock, &mu_);
subproc = subproc_;
subproc_ = nullptr;
}
CHECK(subproc);
LOG(INFO) << trace_id_ << " finished subprocess."
<< " pid=" << subproc->started().pid()
<< " status=" << subproc->terminated().status()
<< " pending_ms=" << subproc->started().pending_ms()
<< " run_ms=" << subproc->terminated().run_ms()
<< " mem_kb=" << subproc->terminated().mem_kb()
<< " local_killed=" << local_killed_;
bool local_run_failed = false;
bool local_run_goma_failure = false;
if (subproc->started().pid() != SubProcessState::kInvalidPid) {
local_run_ = true;
if (!local_killed_) {
subproc_exit_status_ = subproc->terminated().status();
// Something failed after the subprocess started, e.g. kill failed.
if (subproc_exit_status_ < 0) {
stats_->set_compiler_proxy_error(true);
LOG(ERROR) << trace_id_ << " subproc exec failure by goma"
<< " pid=" << subproc->started().pid()
<< " status=" << subproc_exit_status_
<< " error=" << SubProcessTerminated_ErrorTerminate_Name(
subproc->terminated().error());
local_run_goma_failure = true;
}
if (subproc_exit_status_ != 0) {
local_run_failed = true;
}
}
stats_->set_local_pending_time(subproc->started().pending_ms());
stats_->set_local_run_time(subproc->terminated().run_ms());
stats_->set_local_mem_kb(subproc->terminated().mem_kb());
VLOG(1) << trace_id_ << " subproc finished"
<< " pid=" << subproc->started().pid();
} else {
// pid is kInvalidPid
if (subproc->terminated().status() ==
SubProcessTerminated::kInternalError) {
std::ostringstream ss;
ss << "failed to run compiler locally."
<< " pid=" << subproc->started().pid()
<< " error=" << SubProcessTerminated_ErrorTerminate_Name(
subproc->terminated().error())
<< " status=" << subproc->terminated().status();
AddErrorToResponse(TO_USER, ss.str(), true);
local_run_failed = true;
local_run_goma_failure = true;
}
}
if (state_ == FINISHED && !fail_fallback_) {
ProcessReply();
return;
}
// This subprocess would be
// - gch hack (state_ < FINISHED, goma service was slower than local).
// - verify output. (state_ == INIT) -> SETUP
// - should fallback. (state_ == INIT) -> LOCAL_FINISHED.
// - fail fallback. (state_ = FINISHED, fail_fallback_ == true)
// - fallback only (state_ == LOCAL_RUN)
// - idle fallback (state_ < FINISHED, goma service was slower than local).
// - might be killed because gomacc closed the ipc.
string orig_stdout = resp_->result().stdout_buffer();
string orig_stderr = resp_->result().stderr_buffer();
CHECK(resp_.get() != nullptr) << trace_id_ << " state=" << state_;
ExecResult* result = resp_->mutable_result();
CHECK(result != nullptr) << trace_id_ << " state=" << state_;
if (fail_fallback_ && local_run_ &&
result->exit_status() != subproc->terminated().status())
stats_->set_goma_error(true);
result->set_exit_status(subproc->terminated().status());
if (result->exit_status() == 0) {
resp_->clear_error_message();
}
if (subproc->terminated().has_term_signal()) {
std::ostringstream ss;
ss << "child process exited unexpectedly with signal."
<< " signal=" << subproc->terminated().term_signal();
exec_error_message_.push_back(ss.str());
CHECK(result->exit_status() != 0)
<< trace_id_ << " if term signal is not 0, exit status must not be 0."
<< ss.str();
}
string stdout_buffer;
CHECK(!subproc_stdout_.empty()) << trace_id_ << " state=" << state_;
ReadFileToString(subproc_stdout_.c_str(), &stdout_buffer);
remove(subproc_stdout_.c_str());
if (fail_fallback_ && local_run_ && orig_stdout != stdout_buffer)
stats_->set_goma_error(true);
result->set_stdout_buffer(stdout_buffer);
string stderr_buffer;
CHECK(!subproc_stderr_.empty()) << trace_id_ << " state=" << state_;
ReadFileToString(subproc_stderr_.c_str(), &stderr_buffer);
remove(subproc_stderr_.c_str());
if (fail_fallback_ && local_run_ && orig_stderr != stderr_buffer)
stats_->set_goma_error(true);
result->set_stderr_buffer(stderr_buffer);
if (verify_output_) {
CHECK_EQ(INIT, state_);
// local runs done, start remote.
ProcessSetup();
return;
}
if (precompiling_ && service_->enable_gch_hack()) {
CHECK_LT(state_, FINISHED) << trace_id_ << " finish subproc";
CHECK(subproc_ == nullptr) << trace_id_ << " finish subproc";
// local runs done, not yet goma.
return;
}
// Upload output files asynchronously so that these files can be
// used in the link phase.
if (!local_run_failed) {
ProcessLocalFileOutput();
// The callback must be called asynchronously.
if (service_->store_local_run_output())
CHECK(local_output_file_callback_ != nullptr);
}
if (should_fallback_) {
CHECK_EQ(INIT, state_);
state_ = LOCAL_FINISHED;
finished_ = true;
// reply fallback response.
VLOG(2) << trace_id_ << " should fallback:" << resp_->DebugString();
if (!local_run_failed) {
ReplyResponse("should fallback");
} else {
ReplyResponse("should fallback but local run failed");
}
return;
}
if (fail_fallback_) {
CHECK_EQ(FINISHED, state_);
VLOG(2) << trace_id_ << " fail fallback:" << resp_->DebugString();
if (!local_run_failed) {
ReplyResponse("fail fallback");
} else {
// If both remote and local failed, it is a real compile failure.
// We must not preserve goma's error message then. (b/27889459)
resp_->clear_error_message();
ReplyResponse("fail fallback and local run also failed");
}
return;
}
if (state_ == LOCAL_RUN) {
VLOG(2) << trace_id_ << " local run finished:" << resp_->DebugString();
state_ = LOCAL_FINISHED;
finished_ = true;
if (!local_run_goma_failure) {
resp_->clear_error_message();
}
ReplyResponse("local finish, no goma");
// TODO: restart from the beginning.
// Since no remote compile is running here, it would be nice to start a
// remote compile in this case. However, let me postpone the
// implementation until I understand the procedure of CompileTask well.
return;
}
// otherwise, local finishes earlier than remote, or setup.
if (!local_run_goma_failure) {
abort_ = true;
VLOG(2) << trace_id_ << " idle fallback:" << resp_->DebugString();
resp_->clear_error_message();
ReplyResponse("local finish, abort goma");
return;
}
// In this case, the remote compile should be running and we expect it to
// succeed.
LOG(INFO) << trace_id_ << " local compile failed because of goma."
<< " waiting for remote result.";
}
// ----------------------------------------------------------------
bool CompileTask::failed() const {
return stats_->exec_exit_status() != 0;
}
bool CompileTask::canceled() const {
return canceled_;
}
bool CompileTask::cache_hit() const {
return stats_->cache_hit();
}
bool CompileTask::local_cache_hit() const {
return stats_->has_cache_source() &&
stats_->cache_source() == ExecLog::LOCAL_OUTPUT_CACHE;
}
void CompileTask::AddErrorToResponse(
ErrDest dest, const string& error_message, bool set_error) {
if (!error_message.empty()) {
if (set_error)
LOG(ERROR) << trace_id_ << " " << error_message;
else
LOG(WARNING) << trace_id_ << " " << error_message;
std::ostringstream msg;
msg << "compiler_proxy:";
msg << handler_timer_.GetInIntMilliseconds() << "ms: ";
msg << error_message;
if (dest == TO_USER) {
DCHECK(set_error) << trace_id_
<< " user error should always set error."
<< " msg=" << error_message;
resp_->add_error_message(msg.str());
} else {
service_->RecordErrorToLog(error_message, set_error);
}
exec_error_message_.push_back(msg.str());
}
if (set_error &&
(!resp_->has_result() || resp_->result().exit_status() == 0)) {
resp_->mutable_result()->set_exit_status(1);
}
}
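// Dumps the ExecReq (and the blobs of all required input files) of a finished
// task under <tmp_dir>/task_request_<id>, both as a serialized proto and as a
// text proto, for debugging.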
void CompileTask::DumpRequest() const {
if (!frozen_timestamp_.has_value()) {
LOG(ERROR) << trace_id_ << " DumpRequest called on active task";
return;
}
LOG(INFO) << trace_id_ << " DumpRequest";
string filename = "exec_req.data";
ExecReq req;
CommandSpec* command_spec = req.mutable_command_spec();
*command_spec = command_spec_;
command_spec->set_local_compiler_path(local_compiler_path_);
if (compiler_info_state_.get() != nullptr) {
const CompilerInfo& compiler_info = compiler_info_state_.get()->info();
std::vector<string> args(stats_->arg().begin(), stats_->arg().end());
std::unique_ptr<CompilerFlags> flags(
CompilerFlagsParser::New(args, stats_->cwd()));
FixCommandSpec(compiler_info, *flags, command_spec);
FixSystemLibraryPath(system_library_paths_, command_spec);
MayFixSubprogramSpec(req.mutable_subprogram());
} else {
// If compiler_info_state_ is nullptr, should_fallback_ is expected to be set.
LOG_IF(ERROR, !should_fallback_)
<< trace_id_ << " DumpRequest compiler_info_state_ is nullptr.";
filename = "local_exec_req.data";
}
for (const auto& arg : stats_->arg())
req.add_arg(arg);
for (const auto& env : stats_->env())
req.add_env(env);
for (const auto& expanded_arg : stats_->expanded_arg())
req.add_expanded_arg(expanded_arg);
req.set_cwd(stats_->cwd());
*req.mutable_requester_info() = requester_info_;
std::ostringstream ss;
ss << "task_request_" << id_;
const string task_request_dir = file::JoinPath(service_->tmp_dir(), ss.str());
file::RecursivelyDelete(task_request_dir, file::Defaults());
#ifndef _WIN32
PCHECK(mkdir(task_request_dir.c_str(), 0755) == 0);
#else
if (!CreateDirectoryA(task_request_dir.c_str(), nullptr)) {
DWORD err = GetLastError();
LOG_SYSRESULT(err);
LOG_IF(FATAL, FAILED(err)) << "CreateDirectoryA " << task_request_dir;
}
#endif
for (const auto& input_filename : required_files_) {
ExecReq_Input* input = req.add_input();
input->set_filename(input_filename);
FileServiceDumpClient fs;
const string abs_input_filename = file::JoinPathRespectAbsolute(
req.cwd(), input_filename);
if (!fs.CreateFileBlob(abs_input_filename, true,
input->mutable_content())) {
LOG(ERROR) << trace_id_ << " DumpRequest failed to create fileblob:"
<< input_filename;
} else {
input->set_hash_key(FileServiceClient::ComputeHashKey(input->content()));
if (!fs.Dump(file::JoinPath(task_request_dir, input->hash_key()))) {
LOG(ERROR) << trace_id_ << " DumpRequest failed to store fileblob:"
<< input_filename
<< " hash:" << input->hash_key();
}
}
}
string r;
req.SerializeToString(&r);
filename = file::JoinPath(task_request_dir, filename);
if (!WriteStringToFile(r, filename)) {
LOG(ERROR) << trace_id_ << " DumpRequest failed to write: " << filename;
} else {
LOG(INFO) << trace_id_ << " DumpRequest wrote serialized proto: "
<< filename;
}
// Only keep file hashes (not contents) for the text format dump.
for (auto& input : *req.mutable_input()) {
input.clear_content();
}
string text_req;
google::protobuf::TextFormat::PrintToString(req, &text_req);
filename += ".txt";
if (!WriteStringToFile(text_req, filename)) {
LOG(ERROR) << trace_id_ << " DumpRequest failed to write: " << filename;
} else {
LOG(INFO) << trace_id_ << " DumpRequest wrote text proto: " << filename;
}
LOG(INFO) << trace_id_ << " DumpRequest done";
}
} // namespace devtools_goma