# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module integrates buildbucket with swarming.
A bucket config may have "swarming" field that specifies how a builder
is mapped to a recipe. If build is scheduled for a bucket/builder
with swarming configuration, the integration overrides the default behavior.
Prior adding Build to the datastore, a swarming task is created. The definition
of the task definition is rendered from a global template. The parameters of the
template are defined by the bucket config and build parameters.
A build may have "swarming" parameter which is a JSON object with keys:
recipe: JSON object
revision: revision of the recipe. Will be available in the task template
as $revision parameter.
When creating a task, a PubSub topic is specified. Swarming will notify on
task status updates to the topic and buildbucket will sync its state.
Eventually both swarming task and buildbucket build will complete.
Swarming does not guarantee notification delivery, so there is also a cron job
that checks task results of all incomplete builds every 10 min.
"""
import base64
import copy
import datetime
import hashlib
import json
import logging
import posixpath
import random
import string
from components import config as component_config
from components import decorators
from components import net
from components import protoutil
from components import utils
from components.config import validation
from google.appengine.api import app_identity
from google.appengine.api import memcache
from google.appengine.api import taskqueue
from google.appengine.ext import ndb
from google.protobuf import json_format
import webapp2
from third_party import annotations_pb2
from . import flatten_swarmingcfg
from . import isolate
from . import swarmingcfg as swarmingcfg_module
from proto import build_pb2
from proto import launcher_pb2
from proto.config import project_config_pb2
from v2 import tokens
import annotations
import api_common
import buildtags
import config
import errors
import events
import gae_ts_mon
import logdog
import model
import user
_PUBSUB_TOPIC = 'swarming'
_PARAM_PROPERTIES = 'properties'
_PARAM_SWARMING = 'swarming'
_PARAM_CHANGES = 'changes'
_BUILD_RUN_RESULT_FILENAME = 'build-run-result.json'
_BUILD_RUN_RESULT_CORRUPTED = 'corrupted'
# Note: we do not store build-run-result.json as is.
# Instead we convert it to a list of buildbucket.v2.Step and store it
# in binary form in model.BuildSteps, which has its own limit.
# A limit is still needed here to avoid OOMs.
_BUILD_RUN_RESULT_MAX_SIZE_MB = 2
_BUILD_RUN_RESULT_MAX_SIZE = _BUILD_RUN_RESULT_MAX_SIZE_MB * (1e6)
_BUILD_RUN_RESULT_TOO_LARGE = '>= %d MB' % _BUILD_RUN_RESULT_MAX_SIZE_MB
_BUILD_RUN_RESULT_SIZE_METRIC = gae_ts_mon.CumulativeDistributionMetric(
'buildbucket/build_run_result_size',
'Size of the build result JSON file fetched from isolate',
[gae_ts_mon.StringField('bucket'),
gae_ts_mon.StringField('builder')],
units=gae_ts_mon.MetricsDataUnits.KILOBYTES,
)
_BUILD_STEPS_SIZE_METRIC = gae_ts_mon.CumulativeDistributionMetric(
'buildbucket/build_steps_size',
'Size of the build steps',
[gae_ts_mon.StringField('bucket'),
gae_ts_mon.StringField('builder')],
units=gae_ts_mon.MetricsDataUnits.KILOBYTES,
)
# The default percentage of builds that use the canary swarming task template.
# This number is relatively high so that we treat the canary seriously and
# have a strong signal if the canary is broken.
# If it is, the template must be reverted to a stable version ASAP.
_DEFAULT_CANARY_TEMPLATE_PERCENTAGE = 10
# This is the path, relative to the swarming run dir, to the directory that
# contains the mounted swarming named caches. It will be prepended to paths of
# caches defined in swarmbucket configs.
_CACHE_DIR = 'cache'
# This is the path, relative to the swarming run dir, where the recipes
# are either checked out or installed via CIPD package.
_KITCHEN_CHECKOUT = 'kitchen-checkout'
################################################################################
# Creation/cancellation of tasks.
class Error(Exception):
"""Base class for swarmbucket-specific errors."""
class TemplateNotFound(Error):
"""Raised when a task template is not found."""
class CanaryTemplateNotFound(TemplateNotFound):
"""Raised when canary template is explicitly requested, but not found."""
@ndb.tasklet
def _get_settings_async(): # pragma: no cover
settings = yield config.get_settings_async()
raise ndb.Return(settings.swarming)
@ndb.tasklet
def _get_task_template_async(canary, canary_required=True):
"""Gets a tuple (template_revision, template_dict, canary_bool).
Args:
    canary (bool): specifies whether the canary template should be returned.
    canary_required (bool): controls the behavior if |canary| is True and
      the canary template is not found. If False, fall back to the non-canary
      template; otherwise raise CanaryTemplateNotFound.
      Ignored if canary is False.
Returns:
Tuple (template_revision, template_dict, canary):
template_revision (str): revision of the template, e.g. commit hash.
template_dict (dict): parsed template, or None if not found.
May contain $parameters that must be expanded using format_obj().
canary (bool): True if the returned template is a canary template.
"""
text = None
revision = None
if canary:
logging.warning('using canary swarming task template')
revision, text = yield component_config.get_self_config_async(
'swarming_task_template_canary.json', store_last_good=True
)
canary = bool(text)
if not text:
if canary_required:
raise CanaryTemplateNotFound(
'canary swarming task template is requested, '
'but the canary template is not found'
)
logging.warning(
'canary swarming task template is not found. using the default one'
)
if not text:
revision, text = yield component_config.get_self_config_async(
'swarming_task_template.json', store_last_good=True
)
template = None
if text:
template = json.loads(text)
template.pop('__comment__', None)
raise ndb.Return(revision, template, canary)
def validate_build_parameters(builder_name, params):
"""Raises errors.InvalidInputError if build parameters are invalid."""
params = copy.deepcopy(params)
def bad(fmt, *args):
raise errors.InvalidInputError(fmt % args)
params.pop(model.BUILDER_PARAMETER) # already validated
def assert_object(name, value):
if not isinstance(value, dict):
      bad('%s parameter must be an object', name)
changes = params.pop(_PARAM_CHANGES, None)
if changes is not None:
if not isinstance(changes, list):
bad('changes param must be an array')
for c in changes: # pragma: no branch
if not isinstance(c, dict):
bad('changes param must contain only objects')
repo_url = c.get('repo_url')
if repo_url is not None and not isinstance(repo_url, basestring):
bad('change repo_url must be a string')
author = c.get('author')
if not isinstance(author, dict):
bad('change author must be an object')
email = author.get('email')
if not isinstance(email, basestring):
bad('change author email must be a string')
if not email:
bad('change author email not specified')
properties = params.pop(_PARAM_PROPERTIES, None)
if properties is not None:
assert_object('properties', properties)
if properties.pop('buildername', builder_name) != builder_name:
bad('inconsistent builder name')
expected_emails = [c['author']['email'] for c in (changes or [])]
if properties.pop('blamelist', None) not in (None, expected_emails):
bad(
'inconsistent blamelist property; blamelist must not be set or '
'it must match the emails in the "changes" build parameter'
)
# Validate the rest of the properties using common logic.
ctx = validation.Context.raise_on_error(exc_type=errors.InvalidInputError)
for k, v in properties.iteritems():
with ctx.prefix('property %r:', k):
swarmingcfg_module.validate_recipe_property(k, v, ctx)
swarming = params.pop(_PARAM_SWARMING, None)
if swarming is not None:
assert_object('swarming', swarming)
swarming = copy.deepcopy(swarming)
override_builder_cfg_data = swarming.pop('override_builder_cfg', None)
if override_builder_cfg_data is not None:
assert_object('swarming.override_builder_cfg', override_builder_cfg_data)
if 'build_numbers' in override_builder_cfg_data:
bad(
'swarming.override_builder_cfg parameter '
'cannot override build_numbers'
)
override_builder_cfg = project_config_pb2.Builder()
try:
protoutil.merge_dict(override_builder_cfg_data, override_builder_cfg)
except TypeError as ex:
bad('swarming.override_builder_cfg parameter: %s', ex)
if override_builder_cfg.name:
bad('swarming.override_builder_cfg cannot override builder name')
if override_builder_cfg.mixins:
bad('swarming.override_builder_cfg cannot use mixins')
if 'pool:' in override_builder_cfg.dimensions:
bad('swarming.override_builder_cfg cannot remove pool dimension')
ctx = validation.Context.raise_on_error(
exc_type=errors.InvalidInputError,
prefix='swarming.override_builder_cfg parameter: '
)
swarmingcfg_module.validate_builder_cfg(
override_builder_cfg, [], False, ctx
)
if swarming:
bad('unrecognized keys in swarming param: %r', swarming.keys())
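# Illustrative sketch only (builder, property and override values invented):
# parameters of the following shape pass validate_build_parameters, assuming
# the caller has already put the builder name under model.BUILDER_PARAMETER:
#   {
#       model.BUILDER_PARAMETER: 'linux-rel',
#       'properties': {'category': 'cq'},
#       'changes': [{'author': {'email': 'dev@example.com'},
#                    'repo_url': 'https://chromium.googlesource.com/src'}],
#       'swarming': {'override_builder_cfg': {'priority': 40}},
#   }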
# Mocked in tests.
def _should_use_canary_template(percentage): # pragma: no cover
"""Returns True if a canary template should be used.
  This function is non-deterministic.
"""
return random.randint(0, 99) < percentage
def _prepare_builder_config(builder_cfg, swarming_param):
"""Returns final version of builder config to use for |build|.
Expects arguments to be valid.
"""
# Builders are already flattened in the datastore.
result = builder_cfg
# Apply overrides in the swarming parameter.
override_builder_cfg_data = swarming_param.get('override_builder_cfg', {})
if override_builder_cfg_data:
override_builder_cfg = project_config_pb2.Builder()
    protoutil.merge_dict(override_builder_cfg_data, override_builder_cfg)
ctx = validation.Context.raise_on_error(
exc_type=errors.InvalidInputError,
prefix='swarming.override_builder_cfg parameter: '
)
flatten_swarmingcfg.merge_builder(result, override_builder_cfg)
swarmingcfg_module.validate_builder_cfg(result, [], True, ctx)
return result
def _buildbucket_property(build):
"""Returns value for 'buildbucket' build property.
  The format of the returned value corresponds to the one used in
  buildbot-buildbucket integration [1], with two exceptions:
  - it is not encoded in JSON
  - the list of tags contains initial tags only;
    it does not include auto-generated tags.
[1]:
https://chromium.googlesource.com/chromium/tools/build/+/82373bb503dca5f91cd0988d49df38394fdf8b0b/scripts/master/buildbucket/integration.py#329
"""
return {
'hostname': app_identity.get_default_version_hostname(),
'build': {
'project': build.project,
'bucket': api_common.format_luci_bucket(build.bucket_id),
'created_by': build.created_by.to_bytes(),
'created_ts': utils.datetime_to_timestamp(build.create_time),
'id': str(build.key.id()),
# Note: this includes only user-specified tags.
# It does not include auto-generated tags, such as "swarming_tag".
# This is a bit different from Buildbot-Buildbucket integration.
# In practice, however, only "buildset" tag is read from this list.
'tags': build.initial_tags,
},
}
def _apply_if_tags(task):
"""Filters a task based on '#if-tag's on JSON objects.
JSON objects containing a property '#if-tag' will be checked to see if the
given value is one of the task's swarming tags. If the tag is present in the
swarming tags of the task, the object is included and the '#if-tag' property
is dropped.
If the JSON object does not contain '#if-tag', it will be unconditionally
included.
It is not possible to filter the entire (top-level) task :).
This returns a copy of the task.
"""
tags = set(task.get('tags', ()))
tag_string = '#if-tag'
def keep(obj):
if isinstance(obj, dict) and tag_string in obj:
return obj[tag_string] in tags
return True
def walk(obj):
if isinstance(obj, dict):
return {
k: walk(v) for k, v in obj.iteritems() if k != tag_string and keep(v)
}
if isinstance(obj, list):
return [walk(i) for i in obj if keep(i)]
return obj
return walk(task)
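# A minimal sketch of the '#if-tag' filtering (all values invented):
#   task = {
#       'tags': ['canary:1'],
#       'task_slices': [
#           {'#if-tag': 'canary:1', 'expiration_secs': '60'},
#           {'#if-tag': 'stable:1', 'expiration_secs': '60'},
#       ],
#   }
#   _apply_if_tags(task) keeps only the first slice, with its '#if-tag' key
#   dropped, because 'canary:1' is among the task's tags and 'stable:1' is not.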
@ndb.tasklet
def _is_migrating_builder_prod_async(builder_cfg, build):
"""Returns True if the builder is prod according to the migration app.
See also 'luci_migration_host' in the project config.
If unknown, returns None.
On failures, logs them and returns None.
"""
ret = None
master = None
props_list = (
build.parameters.get(_PARAM_PROPERTIES) or {},
flatten_swarmingcfg.read_properties(builder_cfg.recipe),
)
for prop_name in ('luci_migration_master_name', 'mastername'):
for props in props_list:
master = props.get(prop_name)
if master:
break
if master: # pragma: no branch
break
host = swarmingcfg_module.clear_dash(builder_cfg.luci_migration_host)
if master and host:
try:
url = 'https://%s/masters/%s/builders/%s/' % (
host, master, builder_cfg.name
)
res = yield net.json_request_async(
url, params={'format': 'json'}, scopes=net.EMAIL_SCOPE
)
ret = res.get('luci_is_prod')
except net.NotFoundError:
logging.warning(
'missing migration status for %r/%r', master, builder_cfg.name
)
except net.Error:
logging.exception(
'failed to get migration status for %r/%r', master, builder_cfg.name
)
raise ndb.Return(ret)
@ndb.tasklet
def _create_task_def_async(
swarming_cfg, builder_cfg, build, build_number, fake_build
):
"""Creates a swarming task definition for the |build|.
Supports build properties that are supported by Buildbot-Buildbucket
integration. See
https://chromium.googlesource.com/chromium/tools/build/+/eff4ceb/scripts/master/buildbucket/README.md#Build-parameters
Sets build.swarming_hostname and build.canary attributes.
Raises:
errors.InvalidInputError if build.parameters are invalid.
"""
assert isinstance(swarming_cfg,
project_config_pb2.Swarming), type(swarming_cfg)
assert isinstance(builder_cfg, project_config_pb2.Builder), type(builder_cfg)
assert isinstance(build, model.Build), type(build)
assert build.key and build.key.id(), build.key
assert build.url, 'build.url should have been initialized'
assert isinstance(build_number,
int) or build_number is None, type(build_number)
assert isinstance(fake_build, bool), type(fake_build)
params = copy.deepcopy(build.parameters) or {}
build.parameters_actual = params
# params is an alias for build.parameters_actual.
validate_build_parameters(builder_cfg.name, params)
swarming_param = params.get(_PARAM_SWARMING) or {}
# Use canary template?
assert isinstance(build.canary_preference, model.CanaryPreference)
if build.canary_preference == model.CanaryPreference.AUTO:
canary_percentage = _DEFAULT_CANARY_TEMPLATE_PERCENTAGE
if swarming_cfg.HasField( # pragma: no branch
'task_template_canary_percentage'):
canary_percentage = swarming_cfg.task_template_canary_percentage.value
build.canary = _should_use_canary_template(canary_percentage)
else:
build.canary = build.canary_preference == model.CanaryPreference.CANARY
builder_cfg = _prepare_builder_config(builder_cfg, swarming_param)
try:
task_template_rev, task_template, build.canary = (
yield _get_task_template_async(
build.canary,
build.canary_preference == model.CanaryPreference.CANARY
)
)
except CanaryTemplateNotFound as ex:
raise errors.InvalidInputError(ex.message)
if not task_template:
raise TemplateNotFound('task template is not configured')
build.swarming_hostname = swarming_cfg.hostname
if not build.swarming_hostname: # pragma: no cover
raise Error('swarming hostname is not configured')
h = hashlib.sha256('%s/%s' % (build.bucket_id, builder_cfg.name)).hexdigest()
task_template_params = {
'builder_hash': h,
'build_id': build.key.id(),
'build_result_filename': _BUILD_RUN_RESULT_FILENAME,
'build_url': build.url,
'builder': builder_cfg.name,
'cache_dir': _CACHE_DIR,
'hostname': app_identity.get_default_version_hostname(),
'project': build.project,
'swarming_hostname': build.swarming_hostname,
}
extra_swarming_tags = []
extra_cipd_packages = []
if builder_cfg.HasField('recipe'): # pragma: no branch
(extra_swarming_tags, extra_cipd_packages, extra_task_template_params
) = _setup_recipes(build, builder_cfg, build_number, params)
task_template_params.update(extra_task_template_params)
# Render task template.
# Format is
# https://cs.chromium.org/chromium/infra/luci/appengine/swarming/swarming_rpcs.py?q=NewTaskRequest
task_template_params = {
k: v or '' for k, v in task_template_params.iteritems()
}
task = format_obj(task_template, task_template_params)
# Set 'pool_task_template' to match our build's canary status.
# This can be made unconditional after crbug.com/823434 is closed. Right now
# we override this with "SKIP" in the templates to allow an atomic transition.
task.setdefault(
'pool_task_template', 'CANARY_PREFER' if build.canary else 'CANARY_NEVER'
)
task['priority'] = _calc_priority(build, builder_cfg, task.get('priority'))
if builder_cfg.service_account: # pragma: no branch
# Don't pass it if not defined, for backward compatibility.
task['service_account'] = builder_cfg.service_account
task['tags'] = _calc_tags(
build, builder_cfg, extra_swarming_tags, task_template_rev,
task.get('tags')
)
task = _apply_if_tags(task)
_setup_swarming_request_task_slices(
build, builder_cfg, extra_cipd_packages, task
)
if not fake_build: # pragma: no branch | covered by swarmbucketapi_test.py
_setup_swarming_request_pubsub(task, build)
raise ndb.Return(task)
def _setup_recipes(build, builder_cfg, build_number, params):
"""Initializes a build request using recipes.
Mutates params.
Returns:
extra_swarming_tags, extra_cipd_packages, extra_task_template_params
"""
# Properties specified in build parameters must override those in builder
# config.
build_properties = flatten_swarmingcfg.read_properties(builder_cfg.recipe)
build_properties.update(params.get(_PARAM_PROPERTIES) or {})
params[_PARAM_PROPERTIES] = build_properties
# build_properties is an alias for params[_PARAM_PROPERTIES]
# In order to allow some builders to behave like other builders, we allow
# builders to explicitly set buildername.
if 'buildername' not in build_properties:
build_properties['buildername'] = builder_cfg.name
build_properties.update(buildbucket=_buildbucket_property(build),)
assert isinstance(build.experimental, bool)
build_properties.setdefault('$recipe_engine/runtime', {}).update(
is_luci=True,
is_experimental=build.experimental,
)
if build_number is not None: # pragma: no branch
build_properties['buildnumber'] = build_number
changes = params.get(_PARAM_CHANGES)
if changes: # pragma: no branch
# Buildbucket-Buildbot integration passes repo_url of the first change in
# build parameter "changes" as "repository" attribute of SourceStamp.
# https://chromium.googlesource.com/chromium/tools/build/+/2c6023d
# /scripts/master/buildbucket/changestore.py#140
# Buildbot passes repository of the build source stamp as "repository"
    # build property. Recipes, in particular the bot_update recipe module,
    # rely on the "repository" property, and it is an almost sane property to
    # support in swarmbucket.
repo_url = changes[0].get('repo_url')
if repo_url: # pragma: no branch
build_properties['repository'] = repo_url
# Buildbot-Buildbucket integration converts emails in changes to blamelist
# property.
emails = [c.get('author', {}).get('email') for c in changes]
build_properties['blamelist'] = filter(None, emails)
extra_task_template_params = {
'recipe': builder_cfg.recipe.name,
'properties_json': json.dumps(build_properties, sort_keys=True),
'checkout_dir': _KITCHEN_CHECKOUT,
}
extra_swarming_tags = [
'recipe_name:%s' % builder_cfg.recipe.name,
]
extra_cipd_packages = []
if builder_cfg.recipe.cipd_package:
extra_task_template_params.update({
'repository': '',
'revision': '',
})
extra_swarming_tags.append(
'recipe_package:' + builder_cfg.recipe.cipd_package
)
extra_cipd_packages.append({
'path': _KITCHEN_CHECKOUT,
'package_name': builder_cfg.recipe.cipd_package,
'version': builder_cfg.recipe.cipd_version or 'refs/heads/master',
})
else:
extra_task_template_params.update({
'repository': builder_cfg.recipe.repository,
'revision': 'HEAD',
})
extra_swarming_tags.append(
'recipe_repository:' + builder_cfg.recipe.repository
)
return extra_swarming_tags, extra_cipd_packages, extra_task_template_params
def _calc_priority(build, builder_cfg, priority):
"""Calculates the Swarming task request priority to use."""
priority = int(priority or 0)
if builder_cfg.priority > 0: # pragma: no branch
priority = builder_cfg.priority
if build.experimental:
priority = min(255, priority * 2)
# Swarming accepts priority as a string
return str(priority)
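# For example (numbers invented): with builder_cfg.priority == 100, a
# non-experimental build gets '100', while an experimental build gets
# str(min(255, 100 * 2)) == '200'.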
def _calc_tags(
build, builder_cfg, extra_swarming_tags, task_template_rev, tags
):
"""Calculates the Swarming task request tags to use."""
tags = set(tags or [])
tags.add('buildbucket_bucket:%s' % build.bucket_id)
tags.add('buildbucket_build_id:%s' % build.key.id())
tags.add(
'buildbucket_hostname:%s' % app_identity.get_default_version_hostname()
)
tags.add('buildbucket_template_canary:%s' % ('1' if build.canary else '0'))
tags.add('buildbucket_template_revision:%s' % task_template_rev)
tags.update(extra_swarming_tags)
tags.update(builder_cfg.swarming_tags)
tags.update(build.tags)
return sorted(tags)
def _setup_swarming_request_task_slices(
build, builder_cfg, extra_cipd_packages, task
):
"""Mutate the task request with named cache, CIPD packages and (soon) expiring
dimensions.
"""
# For now, refuse a task template with more than one TaskSlice. Otherwise
# it would be much harder to rationalize what's happening while reading the
# Swarming task template.
if len(task[u'task_slices']) != 1:
raise errors.InvalidInputError(
        'base swarming task template can only have one task slice'
)
if build.key: # pragma: no branch
secrets = launcher_pb2.BuildSecrets(
build_token=tokens.generate_build_token(build.key.id()),
)
task[u'task_slices'][0][u'properties'][u'secret_bytes'] = base64.b64encode(
secrets.SerializeToString()
)
if builder_cfg.expiration_secs > 0:
task[u'task_slices'][0][u'expiration_secs'] = str(
builder_cfg.expiration_secs
)
# Now take a look to generate a fallback! This is done by inspecting the
# Builder named caches for the flag "wait_for_warm_cache_secs".
dims = _setup_swarming_props(
build,
builder_cfg,
extra_cipd_packages,
task[u'task_slices'][0][u'properties'],
)
if dims:
if len(dims) > 6:
raise errors.InvalidInputError(
'Too many (%d > 6) TaskSlice fallbacks' % len(dims)
)
# Create a fallback by copying the original task slice, each time adding the
# corresponding expiration.
base_task_slice = task[u'task_slices'].pop()
base_task_slice.setdefault(u'wait_for_capacity', False)
last_exp = 0
for expiration_secs in sorted(dims):
t = {
u'expiration_secs': str(expiration_secs - last_exp),
u'properties': copy.deepcopy(base_task_slice[u'properties']),
u'wait_for_capacity': base_task_slice[u'wait_for_capacity'],
}
last_exp = expiration_secs
task[u'task_slices'].append(t)
# Tweak expiration on the base_task_slice, which is the last slice.
exp = max(int(base_task_slice[u'expiration_secs']) - last_exp, 60)
base_task_slice[u'expiration_secs'] = str(exp)
task[u'task_slices'].append(base_task_slice)
assert len(task[u'task_slices']) == len(dims) + 1
# Now add the actual fallback dimensions. They could be either from optional
# named caches or from buildercfg dimensions in the form
# "<expiration_secs>:<key>:<value>".
extra_dims = []
for i, (_expiration_secs, kv) in enumerate(sorted(dims.iteritems(),
reverse=True)):
# Now mutate each TaskProperties to have the desired dimensions.
extra_dims.extend(kv)
props = task[u'task_slices'][-2 - i][u'properties']
props[u'dimensions'].extend(extra_dims)
props[u'dimensions'].sort(key=lambda x: (x[u'key'], x[u'value']))
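# A worked sketch of the fallback expansion above (values invented): given a
# base slice with expiration_secs '3600' and
#   dims == {60: [{u'key': u'caches', u'value': u'git'}]}
# the request ends up with two slices:
#   1. expiration_secs '60', dimensions including caches:git
#   2. expiration_secs '3540' (the remainder), original dimensions only
# so a bot with the warm cache is preferred for the first 60 seconds.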
def _setup_swarming_props(build, builder_cfg, extra_cipd_packages, props):
"""Fills a TaskProperties.
Updates props; a python format of TaskProperties.
  Returns:
    dict {expiration_secs: list of {u'key', u'value'} dimension dicts} to
    support caches and expiring dimensions. This is different from the
    format in flatten_swarmingcfg.parse_dimensions().
"""
props.setdefault('env', []).append({
'key': 'BUILDBUCKET_EXPERIMENTAL',
'value': str(build.experimental).upper(),
})
props.setdefault('cipd_input', {}).setdefault('packages',
[]).extend(extra_cipd_packages)
if builder_cfg.execution_timeout_secs > 0:
props['execution_timeout_secs'] = str(builder_cfg.execution_timeout_secs)
cache_fallbacks = _setup_named_caches(builder_cfg, props)
# Add in all of the non-fallback swarming dimensions to the task properties.
dims = swarmingcfg_module.read_dimensions(builder_cfg)
# Reconstruct dims as the actual list of dimensions needed. The challenge here
# is that repeated values are valid!
out = {}
for expirations_secs, items in cache_fallbacks.iteritems():
out.setdefault(expirations_secs, []).extend(
{u'key': u'caches', u'value': item} for item in items
)
for key, (value, expirations_secs) in dims.iteritems():
out.setdefault(expirations_secs, []).append({u'key': key, u'value': value})
props['dimensions'] = out.pop(0, [])
props[u'dimensions'].sort(key=lambda x: (x[u'key'], x[u'value']))
return out
def _setup_named_caches(builder_cfg, props):
"""Adds/replaces named caches to/in the Swarming TaskProperties.
Mutates props.
Returns:
dict {expiration_secs: list(caches)}
"""
template_caches = props.get(u'caches', [])
props[u'caches'] = []
names = set()
paths = set()
cache_fallbacks = {}
# Look for builder specific named caches.
for c in builder_cfg.caches:
if c.path.startswith(u'cache/'): # pragma: no cover
      # TODO(nodir): remove this code path once clients remove "cache/" from
      # their configs.
cache_path = c.path
else:
cache_path = posixpath.join(_CACHE_DIR, c.path)
names.add(c.name)
paths.add(cache_path)
props[u'caches'].append({u'path': cache_path, u'name': c.name})
if c.wait_for_warm_cache_secs:
cache_fallbacks.setdefault(c.wait_for_warm_cache_secs, []).append(c.name)
# Look for named cache fallback from the swarming task template itself.
for c in template_caches:
# Only process the caches that were not overridden.
if c.get(u'path') not in paths and c.get(u'name') not in names:
props[u'caches'].append({u'name': c[u'name'], u'path': c[u'path']})
v = c.get(u'wait_for_warm_cache_secs')
if v:
cache_fallbacks.setdefault(v, []).append(c[u'name'])
props[u'caches'].sort(key=lambda p: p.get(u'path'))
return cache_fallbacks
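# Sketch (cache names invented): a builder cache entry
#   {name: 'git', path: 'git', wait_for_warm_cache_secs: 60}
# becomes {u'path': u'cache/git', u'name': u'git'} in props[u'caches'] and
# contributes {60: ['git']} to the returned cache_fallbacks dict.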
def _setup_swarming_request_pubsub(task, build):
"""Mutates Swarming task request to add pubsub topic."""
task['pubsub_topic'] = 'projects/%s/topics/%s' % (
app_identity.get_application_id(), _PUBSUB_TOPIC
)
task['pubsub_userdata'] = json.dumps(
{
'build_id': build.key.id(),
'created_ts': utils.datetime_to_timestamp(utils.utcnow()),
'swarming_hostname': build.swarming_hostname,
},
sort_keys=True,
)
@ndb.tasklet
def _get_builder_async(build):
"""Returns builder info of the build.
Raises:
errors.InvalidInputError if build has no builder name.
errors.BuilderNotFoundError if builder is not found.
Returns:
(project_config_pb2.Bucket, project_config_pb2.Builder) tuple future.
"""
if not build.parameters:
raise errors.InvalidInputError(
'A build for bucket %r must have parameters' % build.bucket_id
)
builder_name = build.parameters.get(model.BUILDER_PARAMETER)
if not isinstance(builder_name, basestring):
raise errors.InvalidInputError('Invalid builder name %r' % builder_name)
project_id, bucket_cfg = yield config.get_bucket_async(build.bucket_id)
assert bucket_cfg, 'if there is no bucket, this code should not have run'
assert project_id == build.project, '%r != %r' % (project_id, build.project)
if not bucket_cfg.HasField('swarming'):
raise errors.InvalidInputError(
'bucket %r is not a swarming bucket' % bucket_cfg.name
)
for builder_cfg in bucket_cfg.swarming.builders: # pragma: no branch
if builder_cfg.name == builder_name: # pragma: no branch
raise ndb.Return(bucket_cfg, builder_cfg)
raise errors.BuilderNotFoundError(
'Builder %r is not found in bucket %r' % (builder_name, build.bucket_id)
)
@ndb.tasklet
def prepare_task_def_async(build, build_number=None, fake_build=False):
settings = yield _get_settings_async()
bucket_cfg, builder_cfg = yield _get_builder_async(build)
ret = yield _prepare_task_def_async(
build, build_number, bucket_cfg, builder_cfg, settings, fake_build
)
raise ndb.Return(ret)
@ndb.tasklet
def _prepare_task_def_async(
build, build_number, bucket_cfg, builder_cfg, settings, fake_build
):
"""Prepares a swarming task definition.
Validates the new build.
If configured, generates a build number and updates the build.
Creates a swarming task definition.
Sets build attributes: swarming_hostname, canary and url.
May add "build_address" tag.
Returns a task_def dict.
"""
if build.lease_key:
raise errors.InvalidInputError(
'Swarming buckets do not support creation of leased builds'
)
if build_number is not None:
build.tags.append(
buildtags.build_address_tag(
# TODO(crbug.com/851036): migrate build address to use short
# bucket names.
api_common.format_luci_bucket(build.bucket_id),
builder_cfg.name,
build_number,
)
)
build.url = _generate_build_url(settings.milo_hostname, build)
if build.experimental is None:
build.experimental = (builder_cfg.experimental == project_config_pb2.YES)
is_prod = yield _is_migrating_builder_prod_async(builder_cfg, build)
if is_prod is not None:
build.experimental = not is_prod
task_def = yield _create_task_def_async(
bucket_cfg.swarming, builder_cfg, build, build_number, fake_build
)
raise ndb.Return(task_def)
@ndb.tasklet
def create_task_async(build, build_number=None):
"""Creates a swarming task for the build and mutates the build.
May be called only if build's bucket is configured for swarming.
Raises:
errors.InvalidInputError if build attribute values are invalid.
"""
settings = yield _get_settings_async()
bucket_cfg, builder_cfg = yield _get_builder_async(build)
task_def = yield _prepare_task_def_async(
build, build_number, bucket_cfg, builder_cfg, settings, False
)
assert build.swarming_hostname
res = yield _call_api_async(
impersonate=True,
hostname=build.swarming_hostname,
path='tasks/new',
method='POST',
payload=task_def,
      # Let Swarming know what bucket the task belongs to. Swarming uses
      # this to authorize access to pools assigned to specific buckets only.
delegation_tag='buildbucket:bucket:%s' % build.bucket_id,
# Higher timeout than normal because if the task creation request
# fails, but the task is actually created, later we will receive a
# notification that the task is completed, but we won't have a build
# for that task, which results in errors in the log.
deadline=30,
# This code path is executed by put and put_batch request handlers.
# Clients should retry these requests on transient errors, so
# do not retry requests to swarming.
max_attempts=1
)
task_id = res['task_id']
logging.info('Created a swarming task %s', task_id)
build.swarming_task_id = task_id
build.tags.extend([
'swarming_hostname:%s' % build.swarming_hostname,
'swarming_task_id:%s' % task_id,
])
task_req = res.get('request', {})
for t in task_req.get('tags', []):
key, value = buildtags.parse(t)
if key == 'log_location':
host, project, prefix, _ = logdog.parse_url(value)
build.logdog_hostname = host
build.logdog_project = project
build.logdog_prefix = prefix
build.tags.append(buildtags.unparse(buildtags.SWARMING_TAG_KEY, t))
task_slices = task_req.get('task_slices') or [{}]
for d in task_slices[0].get('properties', {}).get('dimensions', []):
dt = buildtags.unparse(d['key'], d['value'])
build.tags.append(buildtags.unparse(buildtags.SWARMING_DIMENSION_KEY, dt))
build.service_account = task_req.get('service_account')
# Mark the build as leased.
exp = sum(int(t['expiration_secs']) for t in task_def['task_slices'])
# This is not exactly true but #closeenough.
exp += int(
task_def['task_slices'][-1]['properties']['execution_timeout_secs']
)
  # Adding a day definitely helps with the #closeenough.
exp += 24 * 60 * 60
build.lease_expiration_date = utils.utcnow() + datetime.timedelta(seconds=exp)
build.regenerate_lease_key()
build.leasee = user.self_identity()
build.never_leased = False
def _generate_build_url(milo_hostname, build):
if not milo_hostname:
return (
'https://%s/task?id=%s' %
(build.swarming_hostname, build.swarming_task_id)
)
return 'https://%s/b/%d' % (milo_hostname, build.key.id())
@ndb.tasklet
def cancel_task_async(hostname, task_id):
"""Cancels a swarming task.
Noop if the task started running.
"""
res = yield _call_api_async(
impersonate=False,
hostname=hostname,
path='task/%s/cancel' % task_id,
method='POST',
payload={
'kill_running': True,
},
)
if res.get('ok'):
logging.info('response: %r', res)
else:
logging.warning('response: %r', res)
def cancel_task(hostname, task_id):
"""Sync version of cancel_task_async.
Noop if the task started running.
"""
cancel_task_async(hostname, task_id).get_result()
def cancel_task_transactionally_async(hostname, task_id): # pragma: no cover
"""Transactionally schedules a push task to cancel a swarming task.
Swarming task cancelation is noop if the task started running.
"""
url = (
'/internal/task/buildbucket/cancel_swarming_task/%s/%s' %
(hostname, task_id)
)
task = taskqueue.Task(url=url)
return task.add_async(queue_name='backend-default', transactional=True)
################################################################################
# Update builds.
def _load_task_result_async(hostname, task_id): # pragma: no cover
return _call_api_async(
impersonate=False,
hostname=hostname,
path='task/%s/result' % task_id,
)
@ndb.tasklet
def _load_build_run_result_async(task_result, bucket_id, builder):
"""Fetches _BUILD_RUN_RESULT_FILENAME from swarming task output.
Logs errors.
Returns (build_run_result dict, error_string), where error_string is None
if there is no error.
"""
outputs_ref = task_result.get('outputs_ref')
if not outputs_ref:
raise ndb.Return(None, None)
server_prefix = 'https://'
if not outputs_ref['isolatedserver'].startswith(server_prefix):
logging.error(
'Bad isolatedserver %r read from task %s',
outputs_ref['isolatedserver'], task_result['id']
)
raise ndb.Return(None, _BUILD_RUN_RESULT_CORRUPTED)
hostname = outputs_ref['isolatedserver'][len(server_prefix):]
@ndb.tasklet
def fetch_json_async(loc):
try:
raw = yield isolate.fetch_async(loc)
raise ndb.Return(json.loads(raw))
except ValueError:
      logging.exception('could not load %s', loc.human_url)
raise ndb.Return(None)
isolated_loc = isolate.Location(
hostname, outputs_ref['namespace'], outputs_ref['isolated']
)
isolated = yield fetch_json_async(isolated_loc)
if isolated is None:
raise ndb.Return(None, _BUILD_RUN_RESULT_CORRUPTED)
  # Assume the isolated file format: 'files' maps file paths to entries
  # carrying the file size ('s') and digest ('h').
result_entry = isolated['files'].get(_BUILD_RUN_RESULT_FILENAME)
if not result_entry:
raise ndb.Return(None, None)
result_size = int(result_entry['s'])
if result_size >= _BUILD_RUN_RESULT_MAX_SIZE:
raise ndb.Return(None, _BUILD_RUN_RESULT_TOO_LARGE)
_BUILD_RUN_RESULT_SIZE_METRIC.add(
result_size / 1000,
{
'bucket': bucket_id,
'builder': builder,
},
)
result_loc = isolated_loc._replace(digest=result_entry['h'])
build_result = yield fetch_json_async(result_loc)
raise ndb.Return(
build_result, None if build_result else _BUILD_RUN_RESULT_CORRUPTED
)
def _sync_build_in_memory(
build, task_result, build_run_result, build_run_result_error
):
"""Syncs buildbucket |build| state with swarming task |result|."""
# Task result docs:
# https://github.com/luci/luci-py/blob/985821e9f13da2c93cb149d9e1159c68c72d58da/appengine/swarming/server/task_result.py#L239
#
# build_run_result is dict parsed from BuildRunResult JSONPB. May be None.
# https://chromium.googlesource.com/infra/infra/+/924a4fa83c1c1018635544e5783f18eeb2ea2edc/go/src/infra/tools/kitchen/proto/result.proto
if build.status == model.BuildStatus.COMPLETED: # pragma: no cover
# Completed builds are immutable.
return False
now = utils.utcnow()
old_status = build.status
build.status = None
build.result = None
build.failure_reason = None
build.cancelation_reason = None
build.result_details = {'swarming': {'bot_dimensions': {}}}
bot_dimensions = build.result_details['swarming']['bot_dimensions']
for d in (task_result or {}).get('bot_dimensions', []):
bot_dimensions[d['key']] = d['value'] # this is a list of values
# error message to include in result_details. Used only if build is complete.
errmsg = ''
terminal_states = {
'EXPIRED',
'TIMED_OUT',
'BOT_DIED',
'CANCELED',
'COMPLETED',
'KILLED',
'NO_RESOURCE',
}
state = (task_result or {}).get('state')
if build_run_result_error:
build.status = model.BuildStatus.COMPLETED
build.result = model.BuildResult.FAILURE
build.failure_reason = model.FailureReason.INFRA_FAILURE
errmsg = '%s returned by the swarming task is bad: %s.' % (
_BUILD_RUN_RESULT_FILENAME, build_run_result_error
)
elif state is None:
build.status = model.BuildStatus.COMPLETED
build.result = model.BuildResult.FAILURE
build.failure_reason = model.FailureReason.INFRA_FAILURE
errmsg = (
'Swarming task %s on %s unexpectedly disappeared' %
(build.swarming_task_id, build.swarming_hostname)
)
elif state == 'PENDING':
if build.status == model.BuildStatus.STARTED: # pragma: no cover
      # Most probably a race between the PubSub push handler and the cron job.
# With swarming, a build cannot go from STARTED back to PENDING,
# so ignore this.
return False
build.status = model.BuildStatus.SCHEDULED
elif state == 'RUNNING':
build.status = model.BuildStatus.STARTED
elif state in terminal_states:
build.status = model.BuildStatus.COMPLETED
if state in ('CANCELED', 'KILLED'):
build.result = model.BuildResult.CANCELED
build.cancelation_reason = model.CancelationReason.CANCELED_EXPLICITLY
elif state in ('EXPIRED', 'NO_RESOURCE'):
# Task did not start.
build.result = model.BuildResult.CANCELED
build.cancelation_reason = model.CancelationReason.TIMEOUT
elif state == 'TIMED_OUT':
# Task started, but timed out.
build.result = model.BuildResult.FAILURE
build.failure_reason = model.FailureReason.INFRA_FAILURE
elif state == 'BOT_DIED' or task_result.get('internal_failure'):
build.result = model.BuildResult.FAILURE
build.failure_reason = model.FailureReason.INFRA_FAILURE
elif build_run_result is None:
# There must be a build_run_result, otherwise it is an infra failure.
build.result = model.BuildResult.FAILURE
build.failure_reason = model.FailureReason.INFRA_FAILURE
elif task_result.get('failure'):
build.result = model.BuildResult.FAILURE
if build_run_result.get('infraFailure'):
build.failure_reason = model.FailureReason.INFRA_FAILURE
else:
build.failure_reason = model.FailureReason.BUILD_FAILURE
else:
assert state == 'COMPLETED'
build.result = model.BuildResult.SUCCESS
else: # pragma: no cover
assert False, 'Unexpected task state: %s' % state
if build.status == old_status: # pragma: no cover
return False
build.status_changed_time = now
logging.info(
'Build %s status: %s -> %s', build.key.id(), old_status, build.status
)
def ts(key):
v = (task_result or {}).get(key)
return _parse_ts(v) if v else None
if build.status == model.BuildStatus.STARTED:
build.start_time = ts('started_ts') or now
elif build.status == model.BuildStatus.COMPLETED: # pragma: no branch
logging.info('Build %s result: %s', build.key.id(), build.result)
build.clear_lease()
build.start_time = ts('started_ts') or build.start_time
build.complete_time = ts('completed_ts') or ts('abandoned_ts') or now
if build_run_result:
ann = build_run_result.get('annotations') or {}
build.result_details['ui'] = {
'info': '\n'.join(ann.get('text', [])),
}
build.result_details['properties'] = _extract_properties(ann)
if errmsg:
build.result_details['error'] = {'message': errmsg}
return True
def _extract_properties(annotation_step):
"""Extracts properties from an annotation step"""
ret = {}
def extract(step):
for p in step.get('property') or ():
ret[p['name']] = json.loads(p['value'])
for s in step.get('substep') or ():
extract(s.get('step') or {})
extract(annotation_step)
return ret
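# Illustrative input/output (values invented; 'value' fields are JSON):
#   _extract_properties({
#       'property': [{'name': 'got_revision', 'value': '"deadbeef"'}],
#       'substep': [{'step': {'property': [{'name': 'n', 'value': '1'}]}}],
#   }) == {'got_revision': 'deadbeef', 'n': 1}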
def _extract_build_steps(build_run_result):
"""Extracts a list of buildbucket.v2.Step from build_run_result."""
# TODO(crbug.com/853450): remove, accept build steps from kitchen directly.
build_run_result = build_run_result or {}
ann_dict = build_run_result.get('annotations')
ann_url = build_run_result.get('annotationUrl')
if not ann_dict or not ann_url: # pragma: no cover
return []
ann_step = annotations_pb2.Step()
json_format.Parse(json.dumps(ann_dict), ann_step, ignore_unknown_fields=True)
host, project, prefix, _ = logdog.parse_url(ann_url)
parser = annotations.StepParser(
default_logdog_host=host,
default_logdog_prefix='%s/%s' % (project, prefix),
)
return parser.parse_substeps(ann_step.substep)
@ndb.tasklet
def _sync_build_async(build_id, task_result, bucket_id, builder):
"""Syncs Build entity in the datastore with the swarming task."""
build_run_result = None
build_run_result_error = False
if task_result:
build_run_result, build_run_result_error = yield (
_load_build_run_result_async(task_result, bucket_id, builder)
)
build_key = ndb.Key(model.Build, build_id)
# TODO(nodir): accept build steps via a separate RPC.
step_container = build_pb2.Build(steps=_extract_build_steps(build_run_result))
step_byte_size = step_container.ByteSize()
_BUILD_STEPS_SIZE_METRIC.add(
      step_byte_size / 1000,  # convert to KB
{
'bucket': bucket_id,
'builder': builder,
},
)
too_large = step_byte_size > model.BuildSteps.MAX_STEPS_LEN
build_steps = None
  if too_large:  # pragma: no cover
    # Piggyback on the existing error handling mechanism. Log the details,
    # since _BUILD_RUN_RESULT_TOO_LARGE does not include the actual size.
    logging.error(
        'build steps are %d bytes which is more than %d',
        step_byte_size, model.BuildSteps.MAX_STEPS_LEN
    )
    build_run_result = None
    build_run_result_error = _BUILD_RUN_RESULT_TOO_LARGE
else:
# Do not set build_steps.step_container unless we are sure it is under the
# size limit.
build_steps = model.BuildSteps(
key=model.BuildSteps.key_for(build_key),
step_container=step_container,
)
@ndb.transactional_tasklet
def txn_async():
build = yield build_key.get_async()
if not build: # pragma: no cover
raise ndb.Return(None)
made_change = _sync_build_in_memory(
build, task_result, build_run_result, build_run_result_error
)
if not made_change:
raise ndb.Return(None)
futures = [build.put_async()]
if build.status == model.BuildStatus.STARTED:
futures.append(events.on_build_starting_async(build))
elif build.status == model.BuildStatus.COMPLETED: # pragma: no cover
      # This code is covered by tests, but pycover reports coverage incorrectly!
futures.append(events.on_build_completing_async(build))
if build_steps:
futures.append(build_steps.put_async())
yield futures
raise ndb.Return(build)
build = yield txn_async()
if build:
if build.status == model.BuildStatus.STARTED:
events.on_build_started(build)
elif build.status == model.BuildStatus.COMPLETED: # pragma: no branch
events.on_build_completed(build)
class SubNotify(webapp2.RequestHandler):
"""Handles PubSub messages from swarming.
Assumes unprivileged users cannot send requests to this handler.
"""
bad_message = False
def unpack_msg(self, msg):
"""Extracts swarming hostname, creation time, task id and build id from msg.
Aborts if |msg| is malformed.
"""
data_b64 = msg.get('data')
if not data_b64:
self.stop('no message data')
try:
data_json = base64.b64decode(data_b64)
except ValueError as ex: # pragma: no cover
self.stop('cannot decode message data as base64: %s', ex)
data = self.parse_json_obj(data_json, 'message data')
userdata = self.parse_json_obj(data.get('userdata'), 'userdata')
hostname = userdata.get('swarming_hostname')
if not hostname:
self.stop('swarming hostname not found in userdata')
if not isinstance(hostname, basestring):
self.stop('swarming hostname is not a string')
created_ts = userdata.get('created_ts')
if not created_ts:
self.stop('created_ts not found in userdata')
try:
created_time = utils.timestamp_to_datetime(created_ts)
except ValueError as ex:
self.stop('created_ts in userdata is invalid: %s', ex)
build_id = userdata.get('build_id')
if not isinstance(build_id, (int, long)):
self.stop('invalid build_id %r', build_id)
task_id = data.get('task_id')
if not task_id:
self.stop('task_id not found in message data')
return hostname, created_time, task_id, build_id
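  # A sketch of the expected message shape (values invented; the userdata
  # payload matches what _setup_swarming_request_pubsub sends):
  #   msg = {
  #       'messageId': '1',
  #       'data': base64.b64encode(json.dumps({
  #           'task_id': 'deadbeef0',
  #           'userdata': json.dumps({
  #               'build_id': 8921983748201,
  #               'created_ts': 1539354610000000,
  #               'swarming_hostname': 'chromium-swarm.appspot.com',
  #           }),
  #       })),
  #   }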
def post(self):
msg = self.request.json['message']
logging.info('Received message: %r', msg)
    # Try not to process the same message more than once.
nc = 'swarming-pubsub-msg-id'
if memcache.get(msg['messageId'], namespace=nc):
logging.info('seen this message before, ignoring')
else:
self._process_msg(msg)
memcache.set(msg['messageId'], 1, namespace=nc, time=10 * 60)
def _process_msg(self, msg):
hostname, created_time, task_id, build_id = self.unpack_msg(msg)
task_url = 'https://%s/task?id=%s' % (hostname, task_id)
# Load build.
logging.info('Build id: %s', build_id)
build = model.Build.get_by_id(build_id)
if not build:
if utils.utcnow() < created_time + datetime.timedelta(minutes=1):
self.stop(
'Build for a swarming task not found yet\nBuild: %s\nTask: %s',
build_id,
task_url,
redeliver=True
)
self.stop(
'Build for a swarming task not found\nBuild: %s\nTask: %s', build_id,
task_url
)
# Ensure the loaded build is associated with the task.
if build.swarming_hostname != hostname:
self.stop(
'swarming_hostname %s of build %s does not match %s',
build.swarming_hostname, build_id, hostname
)
if build.swarming_task_id != task_id:
self.stop(
'swarming_task_id %s of build %s does not match %s',
build.swarming_task_id, build_id, task_id
)
assert build.parameters
# Update build.
result = _load_task_result_async(hostname, task_id).get_result()
_sync_build_async(
build_id,
result,
build.bucket_id,
build.parameters[model.BUILDER_PARAMETER],
).get_result()
def stop(self, msg, *args, **kwargs):
"""Logs error and stops request processing.
Args:
msg: error message
args: format args for msg.
kwargs:
redeliver: True to process this message later.
"""
self.bad_message = True
if args:
msg = msg % args
redeliver = kwargs.get('redeliver')
logging.log(logging.WARNING if redeliver else logging.ERROR, msg)
self.response.write(msg)
self.abort(400 if redeliver else 200)
def parse_json_obj(self, text, name):
"""Parses a JSON object from |text| if possible. Otherwise stops."""
try:
result = json.loads(text or '')
if not isinstance(result, dict):
raise ValueError()
return result
except ValueError:
self.stop('%s is not a valid JSON object: %r', name, text)
class CronUpdateBuilds(webapp2.RequestHandler):
"""Updates builds that are associated with swarming tasks."""
@ndb.tasklet
def update_build_async(self, build):
result = yield _load_task_result_async(
build.swarming_hostname, build.swarming_task_id
)
if not result:
logging.error(
'Task %s/%s referenced by build %s is not found',
build.swarming_hostname, build.swarming_task_id, build.key.id()
)
yield _sync_build_async(
build.key.id(), result, build.bucket_id,
build.parameters[model.BUILDER_PARAMETER]
)
@decorators.require_cronjob
def get(self): # pragma: no cover
q = model.Build.query(
model.Build.swarming_task_id != None,
# We cannot have a second negation filter, so use IN.
# This will result in two datastore queries, which is fine.
model.Build.status.IN([
model.BuildStatus.SCHEDULED, model.BuildStatus.STARTED
])
)
q.map_async(self.update_build_async).get_result()
def get_backend_routes(): # pragma: no cover
return [
webapp2.Route(r'/internal/cron/swarming/update_builds', CronUpdateBuilds),
webapp2.Route(r'/_ah/push-handlers/swarming/notify', SubNotify),
]
################################################################################
# Utility functions
@ndb.tasklet
def _call_api_async(
impersonate,
hostname,
path,
method='GET',
payload=None,
delegation_tag=None,
deadline=None,
max_attempts=None,
):
"""Calls Swarming API."""
delegation_token = None
if impersonate:
delegation_token = yield user.delegate_async(hostname, delegation_tag)
url = 'https://%s/_ah/api/swarming/v1/%s' % (hostname, path)
res = yield net.json_request_async(
url,
method=method,
payload=payload,
scopes=net.EMAIL_SCOPE,
deadline=deadline,
max_attempts=max_attempts,
delegation_token=delegation_token,
)
raise ndb.Return(res)
def format_obj(obj, params):
"""Evaluates all strings in a JSON-like object as a template."""
def transform(obj):
if isinstance(obj, list):
return map(transform, obj)
elif isinstance(obj, dict):
return {k: transform(v) for k, v in obj.iteritems()}
elif isinstance(obj, basestring):
return string.Template(obj).safe_substitute(params)
else:
return obj
return transform(obj)
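# For example (values invented):
#   format_obj({'cmd': ['recipes', '$recipe']}, {'recipe': 'chromium'})
#   == {'cmd': ['recipes', 'chromium']}
# Unknown $parameters are left intact by safe_substitute.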
def _parse_ts(ts):
"""Parses Swarming API's timestamp, which is RFC3339 without time zone."""
# time-secfrac part of RFC3339 format is optional
# https://tools.ietf.org/html/rfc3339#section-5.6
# strptime cannot handle optional parts.
# HACK: add the time-secfrac part if it is missing.
# P(time-secfrac is missing) = 1e-6.
if '.' not in ts:
ts += '.0'
return datetime.datetime.strptime(ts, '%Y-%m-%dT%H:%M:%S.%f')
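# For example (values invented), both of these parse successfully:
#   _parse_ts('2018-01-02T03:04:05.123456')
#   _parse_ts('2018-01-02T03:04:05')  # time-secfrac added by the hack above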