| # Copyright 2018 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| import base64 |
| import contextlib |
| import copy |
| |
| from collections import namedtuple |
| |
| from state import TaskState |
| |
| from recipe_engine import recipe_api |
| |
| |
| DEFAULT_CIPD_VERSION = 'git_revision:fd7d55c05dac7486ba163c0d08827a0901afaa7b' |
| |
| |
| # TODO(iannucci): Investigate whether slices can be made invisible to clients |
| # who only wish to specify a request with a single slice. |
| class TaskRequest(object): |
| """Describes a single Swarming request for a new task. |
| |
| A TaskRequest object is immutable and building it up follows the 'constructor' |
| pattern. The with_* and add_* methods set the associated value on a copy of |
| the object, and return that updated copy. |
| |
| A new request has a single empty TaskSlice (see below). |
| |
| Example: |
| ``` |
| request = (api.swarming.task_request(). |
| with_name('my-name'). |
| with_priority(100). |
| with_service_account("my-service-account"). |
| with_slice(0, (request[0]. |
| # ... |
| # Building up of a TaskSlice, following the same pattern; see below. |
| ) |
| ) |
| ) |
| ``` |
| |
| For more details on what goes into a Swarming task, see the user guide: |
| https://chromium.googlesource.com/infra/luci/luci-py/+/master/appengine/swarming/doc/User-Guide.md#task |
| """ |
| def __init__(self, api): |
| self._name = '' |
| self._priority = 200 |
| self._service_account = '' |
| self._api = api |
| self._slices = [self.TaskSlice(api)] |
| |
| def _copy(self): |
| return copy.copy(self) |
| |
| def __getitem__(self, idx): |
| """Returns task slice of the given index.""" |
| return self._slices[idx] |
| |
| def __len__(self): |
| """Returns the number of task slices comprising the request.""" |
| return len(self._slices) |
| |
| def add_slice(self, slice_obj): |
| """Returns the request with the given slice appendedd. |
| |
| Args: |
| * slice (TaskSlice) - The slice to append. |
| """ |
| ret = self._copy() |
| ret._slices.append(slice_obj) |
| return ret |
| |
| def with_slice(self, idx, slice_obj): |
| """Returns the request with the given slice set at the given index. |
| |
| Args: |
| * idx (int) - The index at which to set the slice. |
| * slice (TaskRequest) - The slice to set. |
| """ |
| assert isinstance(slice_obj, self.TaskSlice) |
| assert 0 <= idx < len(self._slices) |
| ret = self._copy() |
| ret._slices[idx] = slice_obj |
| return ret |
| |
| @property |
| def name(self): |
| """Returns the name of the task.""" |
| return self._name |
| |
| def with_name(self, name): |
| """Returns the request with the given name set. |
| |
| Args: |
| * name (str) - The name of the task. |
| """ |
| assert isinstance(name, str) |
| ret = self._copy() |
| ret._name = name |
| return ret |
| |
| @property |
| def priority(self): |
| """Returns the priority of the task. |
| |
| Priority is a numerical priority between 0 and 255 where a higher number |
| corresponds to a lower priority. Tasks are scheduled by swarming in order |
| of their priority (e.g. if both a task of priority 1 and a task of |
| priority 2 are waiting for resources to free up for execution, the task |
| with priority 1 will take precedence). |
| """ |
| return self._priority |
| |
| def with_priority(self, priority): |
| """Returns the request with the given priority set. |
| |
| Args: |
| * priority (int) - The priority of the task. |
| """ |
| assert isinstance(priority, int) |
| ret = self._copy() |
| ret._priority = priority |
| return ret |
| |
| @property |
| def service_account(self): |
| """Returns the service account with which the task will run.""" |
| return self._service_account |
| |
| def with_service_account(self, account): |
| """Returns the request with the given service account attached. |
| |
| Args: |
| * service_account (str) - The service account to attach to the task. |
| """ |
| assert isinstance(account, str) |
| ret = self._copy() |
| ret._service_account = account |
| return ret |
| |
| def to_jsonish(self): |
| """Renders the task request as a JSON-serializable dict. |
| |
| The format follows the schema given by the NewTaskRequest class found here: |
| https://cs.chromium.org/chromium/infra/luci/appengine/swarming/swarming_rpcs.py?q=NewTaskRequest |
| """ |
| return { |
| 'name': self.name, |
| 'priority': str(self.priority), |
| 'service_account': self.service_account, |
| 'task_slices': [task_slice.to_jsonish() for task_slice in self._slices], |
| } |
| |
| class TaskSlice(object): |
| """Describes a specification of a Swarming task slice. |
| |
| A TaskSlice object is immutable and building it up follows the 'constructor' |
| pattern. |
| |
| Example: |
| ``` |
| slice = (request[-1]. |
| with_command(['echo', 'hello']). |
| with_dimensions({'pool': 'my.pool', 'os': 'Debian'}). |
| with_isolated('606d94add94223636ee516c6bc9918f937823ccc'). |
| with_expiration_secs(3600). |
| with_io_timeout_secs(600) |
| ) |
| """ |
| def __init__(self, api): |
| self._command = [] |
| self._isolated = '' |
| self._dimensions = {} |
| self._cipd_ensure_file = api.cipd.EnsureFile() |
| self._outputs = [] |
| self._env_vars = {} |
| self._env_prefixes = {} |
| self._expiration_secs = 300 |
| self._io_timeout_secs = 60 |
| self._execution_timeout_secs = 1200 |
| self._grace_period_secs = 30 |
| self._idempotent = False |
| self._secret_bytes = '' |
| |
| # Containment |
| self._lower_priority = False |
| self._containment_type = 'NONE' |
| self._limit_processes = 0 |
| self._limit_total_committed_memory = 0 |
| |
| self._api = api |
| |
| def _copy(self): |
| # Warning: the copy may have its member mutated. |
| return copy.copy(self) |
| |
| @property |
| def command(self): |
| """Returns the command (list(str)) the task will run.""" |
| return self._command[:] |
| |
| def with_command(self, cmd): |
| """Returns the slice with the given command set. |
| |
| Args: |
| cmd (str) - The command the task will run. |
| """ |
| assert isinstance(cmd, list) and all(isinstance(s, str) for s in cmd) |
| ret = self._copy() |
| ret._command = cmd |
| return ret |
| |
| @property |
| def isolated(self): |
| """Returns the hash of an isolated on the default isolated server. |
| |
| The default isolated server is the one set in IsolatedApi. |
| """ |
| return self._isolated |
| |
| def with_isolated(self, isolated): |
| """Returns the slice with the given isolated hash set. |
| |
| Args: |
| isolated (str) - The hash of an isolated on the default isolated server. |
| """ |
| assert isinstance(isolated, str) |
| ret = self._copy() |
| ret._isolated = isolated |
| return ret |
| |
| @property |
| def dimensions(self): |
| """Returns the dimensions (dict[str]str) on which to filter swarming |
| bots. |
| """ |
| return copy.deepcopy(self._dimensions) |
| |
| def with_dimensions(self, **kwargs): |
| """Returns the slice with the given dimensions set. |
| |
| A key with a value of None will be interpreted as a directive to unset the |
| associated dimension. |
| |
| Example: |
| ``` |
| slice = request[-1].with_dimensions( |
| SOME_DIM='stuff', OTHER_DIM='other stuff') |
| |
| # ... |
| |
| if condition: |
| slice = slice.with_dimensions(SOME_DIM=None) |
| ) |
| ``` |
| """ |
| ret = self._copy() |
| # Make a copy. |
| ret._dimensions = self.dimensions |
| for k, v in kwargs.iteritems(): |
| assert isinstance(k, str) and (isinstance(v, str) or v == None) |
| if v is None: |
| ret._dimensions.pop(k, None) |
| else: |
| ret._dimensions[k] = v |
| return ret |
| |
| @property |
| def cipd_ensure_file(self): |
| """Returns the CIPD ensure file (api.cipd.EnsureFile) of packages to |
| install. |
| """ |
| return copy.deepcopy(self._cipd_ensure_file) |
| |
| def with_cipd_ensure_file(self, ensure_file): |
| """Returns the slice with the given CIPD packages set. |
| |
| Args: |
| ensure_file (api.cipd.EnsureFile) - The CIPD ensure file of the packages |
| to install. |
| """ |
| assert isinstance(ensure_file, self._api.cipd.EnsureFile) |
| ret = self._copy() |
| ret._cipd_ensure_file = ensure_file |
| return ret |
| |
| @property |
| def outputs(self): |
| """Returns the list of files to be isolated on task exit.""" |
| return copy.copy(self._outputs) |
| |
| def with_outputs(self, outputs): |
| """Returns the slice with given outputs set. |
| |
| Args: |
| outputs (list(str)) - Files relative to the swarming task's root |
| directory; they are symlinked into $ISOLATED_OUTDIR and isolated upon |
| exit of the task. |
| """ |
| assert isinstance(outputs, list) |
| assert all(isinstance(output, basestring) for output in outputs) |
| ret = self._copy() |
| ret._outputs = outputs |
| return ret |
| |
| @property |
| def env_vars(self): |
| """Returns the mapping (dict) of an environment variable to its value.""" |
| return copy.deepcopy(self._env_vars) |
| |
| def with_env_vars(self, **kwargs): |
| """Returns the slice with the given environment variables set. |
| |
| A key with a value of None will be interpreted as a directive to unset the |
| associated environment variable. |
| |
| Example: |
| ``` |
| slice = request[-1].with_env_vars( |
| SOME_VARNAME='stuff', OTHER_VAR='more stuff', UNSET_ME=None, |
| ) |
| ``` |
| """ |
| ret = self._copy() |
| # Make a copy. |
| ret._env_vars = self.env_vars |
| for k, v in kwargs.iteritems(): |
| assert ( |
| isinstance(k, basestring) and |
| (isinstance(v, basestring) or v is None)) |
| if v is None: |
| ret._env_vars.pop(k, None) |
| else: |
| ret._env_vars[k] = v |
| return ret |
| |
| @property |
| def env_prefixes(self): |
| """Returns a mapping (dict) of an environment variable to the list of |
| paths to be prepended.""" |
| return copy.deepcopy(self._env_prefixes) |
| |
| def with_env_prefixes(self, **kwargs): |
| """Returns the slice with the given environment prefixes set. |
| |
| The given paths are interpeted as relative to the Swarming root directory. |
| |
| Successive calls to this method is additive with respect to prefixes: a |
| call that sets FOO=[a,...] chained with a call with FOO=[b,...] is |
| equivalent to a single call that sets FOO=[a,...,b,...]. |
| |
| A key with a value of None will be interpreted as a directive to unset the |
| associated environment variable. |
| |
| Example: |
| ``` |
| slice = request[-1].with_env_prefixes( |
| PATH=['path/to/bin/dir', 'path/to/other/bin/dir'], UNSET_ME=None, |
| ) |
| ``` |
| """ |
| ret = self._copy() |
| # Make a copy. |
| ret._env_prefixes = self.env_prefixes |
| for k, v in kwargs.iteritems(): |
| assert ( |
| isinstance(k, basestring) and (isinstance(v, list) or v is None)), ( |
| '%r must be a string and %r None or a list of strings' % (k, v)) |
| if v is None: |
| ret._env_prefixes.pop(k, None) |
| else: |
| assert all(isinstance(prefix, basestring) for prefix in v) |
| ret._env_prefixes.setdefault(k, []).extend(v) |
| return ret |
| |
| @property |
| def expiration_secs(self): |
| """Returns the seconds before this task expires.""" |
| return self._expiration_secs |
| |
| def with_expiration_secs(self, secs): |
| """Returns the slice with the given expiration timeout set. |
| |
| Args: |
| secs (int) - The seconds before the task expires. |
| """ |
| assert isinstance(secs, int) and secs >= 0 |
| ret = self._copy() |
| ret._expiration_secs = secs |
| return ret |
| |
| @property |
| def io_timeout_secs(self): |
| """Returns the seconds for which the task may be silent (no i/o).""" |
| return self._io_timeout_secs |
| |
| def with_io_timeout_secs(self, secs): |
| """Returns the slice with the given i/o timeout set. |
| |
| Args: |
| secs (int) - The seconds for which the task the may be silent (no i/o). |
| """ |
| assert isinstance(secs, int) and secs >= 0 |
| ret = self._copy() |
| ret._io_timeout_secs = secs |
| return ret |
| |
| @property |
| def execution_timeout_secs(self): |
| """Returns the seconds before Swarming should kill the task.""" |
| return self._execution_timeout_secs |
| |
| def with_execution_timeout_secs(self, secs): |
| """Returns the slice with the given hard timeout set. |
| |
| Args: |
| secs (int) - The seconds before which Swarming should kill the task. |
| """ |
| assert isinstance(secs, int) and secs >= 0 |
| ret = self._copy() |
| ret._execution_timeout_secs = secs |
| return ret |
| |
| @property |
| def grace_period_secs(self): |
| """Returns the grace period for the slice. |
| |
| When a Swarming task is killed, the grace period is the amount of time |
| to wait before a SIGKILL is issued to the process, allowing it to |
| perform any clean-up operations. |
| """ |
| return self._grace_period_secs |
| |
| def with_grace_period_secs(self, secs): |
| """Returns the slice with the given grace period set. |
| |
| Args: |
| secs (int) - The seconds giving the grace period. |
| """ |
| assert isinstance(secs, int) and secs >= 0 |
| ret = self._copy() |
| ret._grace_period_secs = secs |
| return ret |
| |
| @property |
| def idempotent(self): |
| """Returns whether the task is idempotent. |
| |
| A task is idempotent if for another task is executed with identical |
| properties, we can short-circuit execution and just return the other |
| latter's results. |
| """ |
| return self._idempotent |
| |
| def with_idempotent(self, idempotent): |
| """Returns the slice with the given idempotency set. |
| |
| Args: |
| idempotent (bool) - Whether the task is idempotent. |
| """ |
| assert isinstance(idempotent, bool) |
| ret = self._copy() |
| ret._idempotent = idempotent |
| return ret |
| |
| @property |
| def secret_bytes(self): |
| """Returns the data to be passed as secret bytes. |
| |
| Secret bytes are base64-encoded data that may be securely passed to the |
| task. This returns the raw, unencoded data initially passed. |
| """ |
| return self._secret_bytes |
| |
| def with_secret_bytes(self, data): |
| """Returns the slice with the given data set as secret bytes. |
| |
| Args: |
| data (str) - The data to be written to secret bytes. |
| """ |
| assert isinstance(data, str) |
| ret = self._copy() |
| ret._secret_bytes = data |
| return ret |
| |
| @property |
| def lower_priority(self): |
| """Returns whether the task process is run with a low priority.""" |
| return self._lower_priority |
| |
| def with_lower_priority(self, lower_priority): |
| """Returns the slice with the given lower_priority boolean set. |
| |
| Args: |
| lower_priority (bool) - Whether the task is run with low process |
| priority. |
| """ |
| assert isinstance(lower_priority, bool) |
| ret = self._copy() |
| ret._lower_priority = lower_priority |
| return ret |
| |
| @property |
| def containment_type(self): |
| """Returns whether the task process is contained.""" |
| return self._containment_type |
| |
| def with_containment_type(self, containment_type): |
| """Returns the slice with the given containment_type set. |
| |
| Args: |
| containment_type (str) - One of the supported containment types. |
| """ |
| assert containment_type in ('NONE', 'AUTO', 'JOB_OJBECT') |
| ret = self._copy() |
| ret._containment_type = containment_type |
| return ret |
| |
| @property |
| def limit_processes(self): |
| """Returns the maximum number of concurrent processes a task can run.""" |
| return self._limit_processes |
| |
| def with_limit_processes(self, limit_processes): |
| """Returns the slice with the maximum number of concurrent processes a |
| task can run set. |
| |
| Args: |
| limit_processes (int) - 0 or the maximum number of concurrent processes |
| """ |
| assert isinstance(limit_processes, int) |
| ret = self._copy() |
| ret._limit_processes = limit_processes |
| return ret |
| |
| @property |
| def limit_total_committed_memory(self): |
| """Returns the maximum amount of committed memory the processes in this |
| task can use. |
| """ |
| return self._limit_total_committed_memory |
| |
| def with_limit_total_committed_memory(self, limit_total_committed_memory): |
| """Returns the slice with the maximum amount of RAM all processes can |
| commit set. |
| |
| Args: |
| limit_total_committed_memory (int) - 0 or the maximum amount of RAM |
| committed |
| """ |
| assert isinstance(limit_total_committed_memory, int) |
| ret = self._copy() |
| ret._limit_total_committed_memory = limit_total_committed_memory |
| return ret |
| |
| def to_jsonish(self): |
| """Renders the task request as a JSON-serializable dict. |
| |
| The format follows the schema given by the TaskSlice class found here: |
| https://cs.chromium.org/chromium/infra/luci/appengine/swarming/swarming_rpcs.py?q=TaskSlice\( |
| """ |
| dims = self.dimensions |
| assert len(dims) >= 1 and dims['pool'] |
| |
| properties = { |
| 'command': self.command, |
| 'dimensions': [{'key': k, 'value': v} for k, v in dims.iteritems()], |
| 'outputs' : self.outputs, |
| 'env' : [{'key': k , 'value': v} for k, v in self.env_vars.iteritems()], |
| 'env_prefixes' : [ |
| {'key': k , 'value' : v} for k, v in self.env_prefixes.iteritems() |
| ], |
| 'execution_timeout_secs': str(self.execution_timeout_secs), |
| 'io_timeout_secs': str(self.io_timeout_secs), |
| 'grace_period_secs': str(self.grace_period_secs), |
| 'idempotent': self.idempotent, |
| 'containment': { |
| 'lower_priority': self.lower_priority, |
| 'containment_type': self.containment_type, |
| 'limit_processes': str(self.limit_processes), |
| 'limit_total_committed_memory': str(self.limit_total_committed_memory), |
| }, |
| } |
| |
| if self.isolated: |
| properties['inputs_ref'] = { |
| 'isolated': self.isolated, |
| 'namespace': self._api.isolated.namespace, |
| 'isolatedserver': self._api.isolated.isolate_server, |
| } |
| if self.secret_bytes: |
| properties['secret_bytes'] = base64.b64encode(self.secret_bytes) |
| if len(self.cipd_ensure_file.packages) > 0: |
| properties['cipd_input'] = { |
| 'packages': [ |
| { |
| 'package_name': pkg.name, |
| 'path': path or '.', |
| 'version': pkg.version, |
| } |
| for path in sorted(self.cipd_ensure_file.packages) |
| for pkg in self.cipd_ensure_file.packages[path] |
| ] |
| } |
| |
| return { |
| 'expiration_secs': str(self.expiration_secs), |
| 'properties': properties, |
| } |
| |
| class TaskRequestMetadata(object): |
| """Metadata of a requested task.""" |
| def __init__(self, swarming_server, task_json): |
| self._task_json = task_json |
| self._swarming_server = swarming_server |
| |
| @property |
| def name(self): |
| """Returns the name of the associated task.""" |
| return self._task_json['request']['name'] |
| |
| @property |
| def id(self): |
| """Returns the id of the associated task.""" |
| return self._task_json['task_id'] |
| |
| @property |
| def task_ui_link(self): |
| """Returns the URL of the associated task in the Swarming UI.""" |
| return '%s/task?id=%s' % (self._swarming_server, self.id) |
| |
| class TaskResult(object): |
| """Result of a Swarming task.""" |
| |
| # A tuple giving the isolated output refs of a task. |
| IsolatedOutputs = namedtuple( |
| 'IsolatedOutputs', ['hash', 'server', 'namespace']) |
| |
| def __init__(self, api, id, raw_results, output_dir): |
| """ |
| Args: |
| api (recipe_api.RecipeApi): a recipe API. |
| id (str): The task's id. |
| raw_results (dict): The jsonish summary output from a `collect` call. |
| output_dir (Path|None): Where the task's outputs were downloaded to. |
| """ |
| self._api = api |
| self._id = id |
| self._outputs = {} |
| self._isolated_outputs = None |
| if 'error' in raw_results: |
| self._output = raw_results['error'] |
| self._name = None |
| self._state = None |
| self._success = None |
| self._duration = None |
| else: |
| results = raw_results['results'] |
| self._name = results['name'] |
| self._state = TaskState[results['state']] |
| |
| assert self._state not in [ |
| TaskState.INVALID, TaskState.PENDING, TaskState.RUNNING, |
| ], 'state %s is either invalid or non-final' % self._state.name |
| |
| self._success = False |
| if self._state == TaskState.COMPLETED: |
| # If 0, a default value, exit_code may be omitted by the cloud |
| # endpoint's response. |
| self._success = results.get('exit_code', 0) == 0 |
| |
| self._duration = results.get('duration', 0) |
| |
| outputs_ref = results.get('outputs_ref') |
| if outputs_ref: |
| self._isolated_outputs = self.IsolatedOutputs( |
| hash=outputs_ref['isolated'], |
| server=outputs_ref['isolatedserver'], |
| namespace=outputs_ref['namespace'], |
| ) |
| |
| self._output = raw_results['output'] |
| if output_dir and raw_results.get('outputs'): |
| self._outputs = { |
| output: api.path.join(output_dir, output) |
| for output in raw_results['outputs'] |
| } |
| |
| @property |
| def name(self): |
| """The name (str) of the task.""" |
| return self._name |
| |
| @property |
| def id(self): |
| """The ID (str) of the task.""" |
| return self._id |
| |
| @property |
| def state(self): |
| """The final state (TaskState|None) of the task. |
| |
| Returns None if there was a client-side, RPC-level error in determining the |
| result, in which case the task is in an unknown state. |
| """ |
| return self._state |
| |
| @property |
| def success(self): |
| """Returns whether the task completed successfully (bool|None). |
| |
| If None, then the task is in an unknown state due to a client-side, |
| RPC-level failure. |
| """ |
| return self._success |
| |
| @property |
| def output(self): |
| """The output (str) streamed from the task.""" |
| return self._output |
| |
| @property |
| def outputs(self): |
| """A map (dict[str]Path) of the files, relative to absolute paths, output |
| from the task. |
| |
| This dictionary is comprised of the files found in $ISOLATED_OUTDIR upon |
| exiting the task, mapping to the paths on disk to where they were |
| downloaded. |
| |
| There will be no outputs fetched unless api.swarming.collect() was called |
| with output_dir set. |
| """ |
| return self._outputs |
| |
| @property |
| def isolated_outputs(self): |
| """Returns the isolated output refs (IsolatedOutputs|None) of the task.""" |
| return self._isolated_outputs |
| |
| def analyze(self): |
| """Raises a step failure if the task was unsuccessful.""" |
| if self.state == None: |
| raise self._api.step.InfraFailure('Failed to collect: %s' % self.output) |
| elif self.state == TaskState.EXPIRED: |
| raise self._api.step.InfraFailure('Timed out waiting for a bot to run on') |
| elif self.state == TaskState.TIMED_OUT: |
| output_lines = self.output.rsplit('\n', 11) |
| timeout = int(self._duration) |
| failure_lines = [ |
| 'Timed out after %s seconds. Last 10 lines of output:' % timeout, |
| ] + output_lines[-10:] |
| raise self._api.step.StepFailure('\n'.join(failure_lines)) |
| elif self.state == TaskState.BOT_DIED: |
| raise self._api.step.InfraFailure('The bot running this task died') |
| elif self.state == TaskState.CANCELED: |
| raise self._api.step.InfraFailure( |
| 'The task was canceled before it could run') |
| elif self.state == TaskState.COMPLETED: |
| if not self.success: |
| raise self._api.step.InfraFailure( |
| 'Swarming task failed:\n%s' % self.output) |
| elif self.state == TaskState.KILLED: |
| raise self._api.step.InfraFailure('The task was killed mid-execution') |
| elif self.state == TaskState.NO_RESOURCE: |
| raise self._api.step.InfraFailure('Found no bots to run this task') |
| else: |
| assert False, 'unknown state %s; a case needs to be added above' % ( |
| self.state.name # pragma: no cover |
| ) |
| |
| class SwarmingApi(recipe_api.RecipeApi): |
| """API for interacting with swarming. |
| |
| The tool's source lives at |
| http://go.chromium.org/luci/client/cmd/swarming. |
| |
| This module will deploy the client to [CACHE]/swarming_client/; users should |
| add this path to the named cache for their builder. |
| """ |
| TaskState = TaskState |
| |
| def __init__(self, swarming_properties, *args, **kwargs): |
| super(SwarmingApi, self).__init__(*args, **kwargs) |
| self._server = swarming_properties.get('server', None) |
| self._version = swarming_properties.get('version', DEFAULT_CIPD_VERSION) |
| self._client_dir = None |
| self._client = None |
| |
| def initialize(self): |
| if self._test_data.enabled: |
| self._server = 'https://example.swarmingserver.appspot.com' |
| if self.m.runtime.is_experimental: |
| self._version = 'latest' |
| self._client_dir = self.m.path['cache'].join('swarming_client') |
| |
| def _ensure_swarming(self): |
| """Ensures that swarming client is installed.""" |
| if not self._client: |
| with self.m.step.nest('ensure swarming'): |
| with self.m.context(infra_steps=True): |
| pkgs = self.m.cipd.EnsureFile() |
| pkgs.add_package('infra/tools/luci/swarming/${platform}', |
| self._version) |
| self.m.cipd.ensure(self._client_dir, pkgs) |
| self._client = self._client_dir.join('swarming') |
| |
| def _run(self, name, cmd, step_test_data=None): |
| """Return an swarming command step. |
| |
| Args: |
| name: (str): name of the step. |
| cmd (list(str|Path)): swarming client subcommand to run. |
| """ |
| self._ensure_swarming() |
| return self.m.step(name, |
| [self._client] + list(cmd), |
| step_test_data=step_test_data, |
| infra_step=True) |
| |
| @contextlib.contextmanager |
| def on_path(self): |
| """This context manager ensures the go swarming client is available on |
| $PATH. |
| |
| Example: |
| |
| with api.swarming.on_path(): |
| # do your steps which require the swarming binary on path |
| """ |
| self._ensure_swarming() |
| with self.m.context(env_prefixes={'PATH': [self._client_dir]}): |
| yield |
| |
| @contextlib.contextmanager |
| def with_server(self, server): |
| """This context sets the server for Swarming calls. |
| |
| Example: |
| |
| with api.swarming.server('new-swarming-server.com'): |
| # perform swarming calls |
| |
| Args: |
| server (str): The swarming server to call within context. |
| """ |
| old_server = self._server |
| self._server = server |
| yield |
| |
| self._server = old_server |
| |
| def task_request(self): |
| """Creates a new TaskRequest object. |
| |
| See documentation for TaskRequest/TaskSlice to see how to build this up into |
| a full task. |
| |
| Once your TaskRequest is complete, you can pass it to `trigger` in order to |
| have it start running on the swarming server. |
| """ |
| return TaskRequest(self.m) |
| |
| def trigger(self, step_name, requests): |
| """Triggers a set of Swarming tasks. |
| |
| Args: |
| step_name (str): The name of the step. |
| tasks (seq[TaskRequest]): A sequence of task request objects representing |
| the tasks we want to trigger. |
| |
| Returns: |
| A list of TaskRequestMetadata objects. |
| """ |
| assert len(requests) > 0 |
| assert self._server |
| |
| trigger_resp = self._run( |
| step_name, |
| [ |
| 'spawn-tasks', |
| '-server', self._server, |
| '-json-input', self.m.json.input({ |
| 'requests': [ req.to_jsonish() for req in requests ] |
| }), |
| '-json-output', self.m.json.output(), |
| ], |
| step_test_data=lambda: self.test_api.trigger( |
| task_names=tuple(map(lambda req: req.name, requests)), |
| ) |
| ).json.output |
| |
| metadata_objs = [] |
| presented_links = self.m.step.active_result.presentation.links |
| for task_json in trigger_resp['tasks']: |
| metadata_obj = TaskRequestMetadata(self._server, task_json) |
| presented_links['Swarming task UI: %s' % metadata_obj.name] = ( |
| metadata_obj.task_ui_link) |
| metadata_objs.append(metadata_obj) |
| |
| return metadata_objs |
| |
| def collect(self, name, tasks, output_dir=None, timeout=None): |
| """Waits on a set of Swarming tasks. |
| |
| Args: |
| name (str): The name of the step. |
| tasks ((list(str|TaskRequestMetadata)): A list of ids or metadata objects |
| corresponding to tasks to wait |
| output_dir (Path|None): Where to download the tasks' isolated outputs. If |
| set to None, they will not be downloades; else, a given task's outputs |
| will be downloaded to output_dir/<task id>/. |
| timeout (str|None): The duration for which to wait on the tasks to finish. |
| If set to None, there will be no timeout; else, timeout follows the |
| format described by https://golang.org/pkg/time/#ParseDuration. |
| |
| Returns: |
| A list of TaskResult objects. |
| """ |
| assert self._server |
| assert isinstance(tasks, list) |
| cmd = [ |
| 'collect', |
| '-server', self._server, |
| '-task-summary-json', self.m.json.output(), |
| '-task-output-stdout', 'json', |
| ] |
| if output_dir: |
| cmd.extend(['-output-dir', output_dir]) |
| if timeout: |
| cmd.extend(['-timeout', timeout]) |
| |
| test_data = [] |
| for idx, task in enumerate(tasks): |
| if isinstance(task, str): |
| cmd.append(task) |
| test_data.append( |
| self.test_api.task_result(id=task, name='my_task_%d' % idx)) |
| elif isinstance(task, TaskRequestMetadata): |
| cmd.append(task.id) |
| test_data.append(self.test_api.task_result(id=task.id, name=task.name)) |
| else: |
| raise ValueError("%s must be a string or TaskRequestMetadata object" % |
| task.__repr__()) # pragma: no cover |
| step_result = self._run( |
| name, |
| cmd, |
| step_test_data=lambda: self.test_api.collect(test_data), |
| ) |
| parsed_results = [ |
| TaskResult(self.m, |
| id, |
| task, |
| self.m.path.join(output_dir, id) if output_dir else None) |
| for id, task in step_result.json.output.iteritems() |
| ] |
| # Update presentation on collect to reflect bot results. |
| for result in parsed_results: |
| if result.output: |
| log_name = 'Swarming task output: %s' % result.name |
| step_result.presentation.logs[log_name] = result.output.splitlines() |
| return parsed_results |