blob: c18820d4a940391d640733fdf0b03733ad3109ae [file] [log] [blame]
# Copyright 2015 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""This module facilitates conversion from dictionaries to ProtoRPC messages.
Given a dictionary whose keys' names and values' types comport with the
fields defined for a protorpc.messages.Message subclass, this module tries to
generate a Message instance that corresponds to the provided dictionary. The
"normal" use case is for ndb.Models which need to be represented as a
ProtoRPC.
"""
import datetime
import json
import logging
import swarming_rpcs
from components import utils
from server import task_pack
from server import task_request
from server import task_result
### Private API.
def _string_pairs_from_dict(dictionary):
# For key: value items like env.
return [
swarming_rpcs.StringPair(key=k, value=v)
for k, v in sorted((dictionary or {}).items())
]
def _duplicate_string_pairs_from_dict(dictionary):
# For compatibility due to legacy swarming_rpcs.TaskProperties.dimensions.
out = []
for k, values in (dictionary or {}).items():
assert isinstance(values, (list, tuple)), dictionary
for v in values:
out.append(swarming_rpcs.StringPair(key=k, value=v))
return out
def _string_list_pairs_from_dict(dictionary):
# For key: values items like bot dimensions.
return [
swarming_rpcs.StringListPair(key=k, value=v)
for k, v in sorted((dictionary or {}).items())
]
def _ndb_to_rpc(cls, entity, **overrides):
members = (f.name for f in cls.all_fields())
kwargs = {m: getattr(entity, m) for m in members if not m in overrides}
kwargs.update(overrides)
return cls(**{k: v for k, v in kwargs.items() if v is not None})
def _rpc_to_ndb(cls, entity, **overrides):
kwargs = {
m: getattr(entity, m) for m in cls._properties if not m in overrides
}
kwargs.update(overrides)
return cls(**{k: v for k, v in kwargs.items() if v is not None})
def _taskproperties_from_rpc(props):
"""Converts a swarming_rpcs.TaskProperties to a task_request.TaskProperties.
"""
cipd_input = None
if props.cipd_input:
client_package = None
if props.cipd_input.client_package:
client_package = _rpc_to_ndb(
task_request.CipdPackage, props.cipd_input.client_package)
cipd_input = _rpc_to_ndb(
task_request.CipdInput,
props.cipd_input,
client_package=client_package,
packages=[
_rpc_to_ndb(task_request.CipdPackage, p)
for p in props.cipd_input.packages
])
containment = task_request.Containment()
if props.containment:
containment = _rpc_to_ndb(
task_request.Containment, props.containment,
containment_type=int(props.containment.containment_type or 0))
inputs_ref = None
if props.inputs_ref:
inputs_ref = _rpc_to_ndb(task_request.FilesRef, props.inputs_ref)
cas_input_root = None
if props.cas_input_root:
digest = _rpc_to_ndb(task_request.Digest, props.cas_input_root.digest)
cas_input_root = _rpc_to_ndb(
task_request.CASReference, props.cas_input_root, digest=digest)
secret_bytes = None
if props.secret_bytes:
secret_bytes = task_request.SecretBytes(secret_bytes=props.secret_bytes)
if len(set(i.key for i in props.env)) != len(props.env):
raise ValueError('same environment variable key cannot be specified twice')
if len(set(i.key for i in props.env_prefixes)) != len(props.env_prefixes):
raise ValueError('same environment prefix key cannot be specified twice')
dims = {}
for i in props.dimensions:
dims.setdefault(i.key, []).append(i.value)
out = _rpc_to_ndb(
task_request.TaskProperties,
props,
caches=[_rpc_to_ndb(task_request.CacheEntry, c) for c in props.caches],
cipd_input=cipd_input,
# Passing command=None is supported at API level but not at NDB level.
command=props.command or [],
containment=containment,
has_secret_bytes=secret_bytes is not None,
secret_bytes=None, # ignore this, it's handled out of band
dimensions=None, # it's named dimensions_data
dimensions_data=dims,
env={i.key: i.value for i in props.env},
env_prefixes={i.key: i.value for i in props.env_prefixes},
inputs_ref=inputs_ref,
cas_input_root=cas_input_root)
return out, secret_bytes
def _taskproperties_to_rpc(props):
"""Converts a task_request.TaskProperties to a swarming_rpcs.TaskProperties.
"""
cipd_input = None
if props.cipd_input:
client_package = None
if props.cipd_input.client_package:
client_package = _ndb_to_rpc(
swarming_rpcs.CipdPackage,
props.cipd_input.client_package)
cipd_input = _ndb_to_rpc(
swarming_rpcs.CipdInput,
props.cipd_input,
client_package=client_package,
packages=[
_ndb_to_rpc(swarming_rpcs.CipdPackage, p)
for p in props.cipd_input.packages
])
containment = swarming_rpcs.Containment()
if props.containment:
containment = _ndb_to_rpc(
swarming_rpcs.Containment,
props.containment,
containment_type=swarming_rpcs.ContainmentType(
props.containment.containment_type or 0))
inputs_ref = None
if props.inputs_ref:
inputs_ref = _ndb_to_rpc(swarming_rpcs.FilesRef, props.inputs_ref)
cas_input_root = None
if props.cas_input_root:
digest = _ndb_to_rpc(swarming_rpcs.Digest, props.cas_input_root.digest)
cas_input_root = _ndb_to_rpc(
swarming_rpcs.CASReference, props.cas_input_root, digest=digest)
return _ndb_to_rpc(
swarming_rpcs.TaskProperties,
props,
caches=[_ndb_to_rpc(swarming_rpcs.CacheEntry, c) for c in props.caches],
cipd_input=cipd_input,
containment=containment,
secret_bytes='<REDACTED>' if props.has_secret_bytes else None,
dimensions=_duplicate_string_pairs_from_dict(props.dimensions),
env=_string_pairs_from_dict(props.env),
env_prefixes=_string_list_pairs_from_dict(props.env_prefixes or {}),
inputs_ref=inputs_ref,
cas_input_root=cas_input_root)
def _taskslice_from_rpc(msg):
"""Converts a swarming_rpcs.TaskSlice to a task_request.TaskSlice."""
props, secret_bytes = _taskproperties_from_rpc(msg.properties)
out = _rpc_to_ndb(task_request.TaskSlice, msg, properties=props)
return out, secret_bytes
### Public API.
def epoch_to_datetime(value):
"""Converts a messages.FloatField that represents a timestamp since epoch in
seconds to a datetime.datetime.
Returns None when input is 0 or None.
"""
if not value:
return None
try:
return utils.timestamp_to_datetime(value*1000000.)
except OverflowError as e:
raise ValueError(e)
def bot_info_to_rpc(entity, deleted=False):
""""Returns a swarming_rpcs.BotInfo from a bot.BotInfo."""
return _ndb_to_rpc(
swarming_rpcs.BotInfo,
entity,
bot_id=entity.id,
deleted=deleted,
dimensions=_string_list_pairs_from_dict(entity.dimensions),
is_dead=entity.is_dead,
# Deprecated. TODO(crbug/897355): Remove.
machine_type=entity.machine_type,
state=json.dumps(entity.state, sort_keys=True, separators=(',', ':')))
def bot_event_to_rpc(entity):
""""Returns a swarming_rpcs.BotEvent from a bot.BotEvent."""
return _ndb_to_rpc(
swarming_rpcs.BotEvent,
entity,
dimensions=_string_list_pairs_from_dict(entity.dimensions),
state=json.dumps(entity.state, sort_keys=True, separators=(',', ':')),
task_id=entity.task_id if entity.task_id else None)
def task_request_to_rpc(entity):
""""Returns a swarming_rpcs.TaskRequest from a task_request.TaskRequest."""
assert entity.__class__ is task_request.TaskRequest
slices = []
for i in range(entity.num_task_slices):
t = entity.task_slice(i)
slices.append(
_ndb_to_rpc(
swarming_rpcs.TaskSlice,
t,
properties=_taskproperties_to_rpc(t.properties)))
resultdb = None
if entity.resultdb:
resultdb = _ndb_to_rpc(swarming_rpcs.ResultDBCfg, entity.resultdb)
return _ndb_to_rpc(
swarming_rpcs.TaskRequest,
entity,
authenticated=entity.authenticated.to_bytes(),
# For some amount of time, the properties will be copied into the
# task_slices and vice-versa, to give time to the clients to update.
properties=slices[0].properties,
task_slices=slices,
resultdb=resultdb)
def new_task_request_from_rpc(msg, now):
""""Returns a (task_request.TaskRequest, task_request.SecretBytes,
task_request.TemplateApplyEnum) from a swarming_rpcs.NewTaskRequest.
If secret_bytes were not included in the rpc, the SecretBytes entity will be
None.
"""
assert msg.__class__ is swarming_rpcs.NewTaskRequest
if msg.task_slices and msg.properties:
raise ValueError('Specify one of properties or task_slices, not both')
if msg.properties:
logging.info('Properties is still used')
if not msg.expiration_secs:
raise ValueError('missing expiration_secs')
props, secret_bytes = _taskproperties_from_rpc(msg.properties)
slices = [
task_request.TaskSlice(
properties=props, expiration_secs=msg.expiration_secs),
]
elif msg.task_slices:
if msg.expiration_secs:
raise ValueError(
'When using task_slices, do not specify a global expiration_secs')
secret_bytes = None
slices = []
for t in (msg.task_slices or []):
sl, se = _taskslice_from_rpc(t)
slices.append(sl)
if se:
if secret_bytes and se != secret_bytes:
raise ValueError(
'When using secret_bytes multiple times, all values must match')
secret_bytes = se
else:
raise ValueError('Specify one of properties or task_slices')
pttf = swarming_rpcs.PoolTaskTemplateField
template_apply = {
pttf.AUTO: task_request.TEMPLATE_AUTO,
pttf.CANARY_NEVER: task_request.TEMPLATE_CANARY_NEVER,
pttf.CANARY_PREFER: task_request.TEMPLATE_CANARY_PREFER,
pttf.SKIP: task_request.TEMPLATE_SKIP,
}[msg.pool_task_template]
resultdb = None
if msg.resultdb:
resultdb = _rpc_to_ndb(task_request.ResultDBCfg, msg.resultdb)
req = _rpc_to_ndb(
task_request.TaskRequest,
msg,
created_ts=now,
expiration_ts=None,
# It is set in task_request.init_new_request().
authenticated=None,
properties=None,
task_slices=slices,
# 'tags' is now generated from manual_tags plus automatic tags.
tags=None,
manual_tags=msg.tags,
# This is internal field not settable via RPC.
service_account_token=None,
realms_enabled=None,
resultdb_update_token=None,
resultdb=resultdb,
pool_task_template=None) # handled out of band
return req, secret_bytes, template_apply
def task_result_to_rpc(entity, send_stats):
""""Returns a swarming_rpcs.TaskResult from a task_result.TaskResultSummary or
task_result.TaskRunResult.
"""
outputs_ref = (
_ndb_to_rpc(swarming_rpcs.FilesRef, entity.outputs_ref)
if entity.outputs_ref else None)
cas_output_root = None
if entity.cas_output_root:
digest = _ndb_to_rpc(swarming_rpcs.Digest, entity.cas_output_root.digest)
cas_output_root = _ndb_to_rpc(
swarming_rpcs.CASReference, entity.cas_output_root, digest=digest)
cipd_pins = None
if entity.cipd_pins:
cipd_pins = swarming_rpcs.CipdPins(
client_package=(_ndb_to_rpc(swarming_rpcs.CipdPackage,
entity.cipd_pins.client_package)
if entity.cipd_pins.client_package else None),
packages=[
_ndb_to_rpc(swarming_rpcs.CipdPackage, pkg)
for pkg in entity.cipd_pins.packages
] if entity.cipd_pins.packages else [])
resultdb_info = None
if entity.resultdb_info:
resultdb_info = swarming_rpcs.ResultDBInfo(
hostname=entity.resultdb_info.hostname,
invocation=entity.resultdb_info.invocation,
)
performance_stats = None
if send_stats and entity.performance_stats.is_valid:
def op(entity):
if entity:
return _ndb_to_rpc(swarming_rpcs.OperationStats, entity)
return None
performance_stats = _ndb_to_rpc(
swarming_rpcs.PerformanceStats,
entity.performance_stats,
isolated_download=op(entity.performance_stats.isolated_download),
isolated_upload=op(entity.performance_stats.isolated_upload))
kwargs = {
'bot_dimensions':
_string_list_pairs_from_dict(entity.bot_dimensions or {}),
'cipd_pins':
cipd_pins,
'outputs_ref':
outputs_ref,
'cas_output_root':
cas_output_root,
'performance_stats':
performance_stats,
'state':
swarming_rpcs.TaskState(entity.state),
'resultdb_info':
resultdb_info,
}
if entity.__class__ is task_result.TaskRunResult:
kwargs['costs_usd'] = []
if entity.cost_usd is not None:
kwargs['costs_usd'].append(entity.cost_usd)
kwargs['tags'] = []
kwargs['user'] = None
kwargs['run_id'] = entity.task_id
else:
assert entity.__class__ is task_result.TaskResultSummary, entity
# This returns the right value for deduped tasks too.
k = entity.run_result_key
kwargs['run_id'] = task_pack.pack_run_result_key(k) if k else None
return _ndb_to_rpc(
swarming_rpcs.TaskResult,
entity,
**kwargs)