| # Copyright 2014 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import datetime |
| import logging |
| import urlparse |
| |
| from google.appengine.api import taskqueue |
| from google.appengine.api import modules |
| from google.appengine.ext import deferred |
| from google.appengine.ext import ndb |
| from google.protobuf import struct_pb2 |
| |
| from components import auth |
| from components import utils |
| import gae_ts_mon |
| |
| from go.chromium.org.luci.buildbucket.proto import common_pb2 |
| import buildtags |
| import config |
| import errors |
| import events |
| import model |
| import search |
| import swarming |
| import user |
| |
| MAX_RETURN_BUILDS = 100 |
| DEFAULT_LEASE_DURATION = datetime.timedelta(minutes=1) |
| |
| # A cumlative counter of access denied errors in peek() method. |
| # This metric exists because defining it on the buildbucket server is easier |
| # than modifying Buildbot. It is very specific intentionally. |
| PEEK_ACCESS_DENIED_ERROR_COUNTER = gae_ts_mon.CounterMetric( |
| 'buildbucket/peek_access_denied_errors', 'Number of errors in peek API', |
| [gae_ts_mon.StringField('bucket')] |
| ) |
| |
| |
| def validate_lease_key(lease_key): |
| if lease_key is None: |
| raise errors.InvalidInputError('Lease key is not provided') |
| |
| |
| def validate_url(url): |
| if url is None: |
| return |
| if not isinstance(url, basestring): |
| raise errors.InvalidInputError('url must be string') |
| parsed = urlparse.urlparse(url) |
| if not parsed.netloc: |
| raise errors.InvalidInputError('url must be absolute') |
| if parsed.scheme.lower() not in ('http', 'https'): |
| raise errors.InvalidInputError( |
| 'Unexpected url scheme: "%s"' % parsed.scheme |
| ) |
| |
| |
| def unregister_builders(): |
| """Unregisters builders that didn't have builds for 4 weeks.""" |
| threshold = utils.utcnow() - model.BUILDER_EXPIRATION_DURATION |
| q = model.Builder.query(model.Builder.last_scheduled < threshold) |
| keys = q.fetch(keys_only=True) |
| if keys: # pragma: no branch |
| logging.warning('unregistered builders: %s', [k.id() for k in keys]) |
| ndb.delete_multi(keys) |
| |
| |
| @ndb.tasklet |
| def get_async(build_id): |
| """Gets a build by |build_id|. |
| |
| Requires the current user to have permissions to view the build. |
| """ |
| build = yield model.Build.get_by_id_async(build_id) |
| if not build: |
| raise ndb.Return(None) |
| if not (yield user.has_perm_async(user.PERM_BUILDS_GET, build.bucket_id)): |
| raise user.current_identity_cannot('view build %s', build.key.id()) |
| raise ndb.Return(build) |
| |
| |
| def peek(bucket_ids, max_builds=None, start_cursor=None): |
| """Returns builds available for leasing in the specified |bucket_ids|. |
| |
| Builds are sorted by creation time, oldest first. |
| |
| Args: |
| bucket_ids (list of string): fetch only builds in any of |bucket_ids|. |
| max_builds (int): maximum number of builds to return. Defaults to 10. |
| start_cursor (string): a value of "next" cursor returned by previous |
| peek call. If not None, return next builds in the query. |
| |
| Returns: |
| A tuple: |
| builds (list of Builds): available builds. |
| next_cursor (str): cursor for the next page. |
| None if there are no more builds. |
| """ |
| if not bucket_ids: |
| raise errors.InvalidInputError('No buckets specified') |
| bucket_ids = sorted(set(bucket_ids)) |
| search.check_acls_async( |
| bucket_ids, inc_metric=PEEK_ACCESS_DENIED_ERROR_COUNTER |
| ).get_result() |
| for bid in bucket_ids: |
| _reject_swarming_bucket(bid) |
| max_builds = search.fix_max_builds(max_builds) |
| |
| # Prune any buckets that are paused. |
| bucket_states = _get_bucket_states(bucket_ids) |
| active_buckets = [] |
| for b in bucket_ids: |
| if bucket_states[b].is_paused: # pragma: no cover |
| logging.warning('Ignoring paused bucket: %s.', b) |
| continue |
| active_buckets.append(b) |
| |
| # Short-circuit: if there are no remaining buckets to query, then we're done. |
| if not active_buckets: # pragma: no cover |
| return ([], None) |
| |
| q = model.Build.query( |
| model.Build.status_legacy == model.BuildStatus.SCHEDULED, |
| model.Build.is_leased == False, |
| model.Build.bucket_id.IN(active_buckets), |
| ) |
| q = q.order(-model.Build.key) # oldest first. |
| |
| # Check once again locally because an ndb query may return an entity not |
| # satisfying the query. |
| def local_predicate(b): |
| return ( |
| b.status_legacy == model.BuildStatus.SCHEDULED and not b.is_leased and |
| b.bucket_id in active_buckets |
| ) |
| |
| return search.fetch_page_async( |
| q, max_builds, start_cursor, predicate=local_predicate |
| ).get_result() |
| |
| |
| def _get_leasable_build(build_id, perm): |
| assert perm in (user.PERM_BUILDS_LEASE, user.PERM_BUILDS_RESET), perm |
| build = model.Build.get_by_id(build_id) |
| if build is None: |
| raise errors.BuildNotFoundError() |
| if not user.has_perm(perm, build.bucket_id): |
| action = 'reset' if perm == user.PERM_BUILDS_RESET else 'lease' |
| raise user.current_identity_cannot('%s build %s', action, build.key.id()) |
| if build.is_luci: |
| raise errors.InvalidInputError('cannot lease a swarmbucket build') |
| return build |
| |
| |
| def lease(build_id, lease_expiration_date=None): |
| """Leases the build, makes it unavailable for the leasing. |
| |
| Changes lease_key to a different value. |
| |
| After the lease expires, a cron task will make the build leasable again. |
| |
| Args: |
| build_id (int): build id. |
| lease_expiration_date (datetime.datetime): lease expiration date. |
| Defaults to 10 seconds from now. |
| |
| Returns: |
| Tuple: |
| success (bool): True if the build was leased |
| build (ndb.Build) |
| """ |
| errors.validate_lease_expiration_date(lease_expiration_date) |
| if lease_expiration_date is None: |
| lease_expiration_date = utils.utcnow() + DEFAULT_LEASE_DURATION |
| |
| @ndb.transactional |
| def try_lease(): |
| build = _get_leasable_build(build_id, user.PERM_BUILDS_LEASE) |
| |
| ok = False |
| if not (build.proto.status != common_pb2.SCHEDULED or build.is_leased): |
| ok = True |
| build.lease_expiration_date = lease_expiration_date |
| build.regenerate_lease_key() |
| build.leasee = auth.get_current_identity() |
| build.never_leased = False |
| |
| # we do a put() unconditionally so that the legacy status fields can |
| # be updated in the pre-put hooks. |
| build.put() |
| return ok, build |
| |
| updated, build = try_lease() |
| if updated: |
| events.on_build_leased(build) |
| return updated, build |
| |
| |
| def _check_lease(build, lease_key): |
| if lease_key != build.lease_key: |
| raise errors.LeaseExpiredError( |
| 'lease_key for build %s is incorrect. Your lease might be expired.' % |
| build.key.id() |
| ) |
| |
| |
| def start(build_id, lease_key, url): |
| """Marks build as STARTED. Idempotent. |
| |
| Args: |
| build_id: id of the started build. |
| lease_key: current lease key. |
| url (str): a URL to a build-system-specific build, viewable by a human. |
| |
| Returns: |
| The updated Build. |
| """ |
| validate_lease_key(lease_key) |
| validate_url(url) |
| |
| @ndb.transactional |
| def txn(): |
| build = _get_leasable_build(build_id, user.PERM_BUILDS_LEASE) |
| |
| if build.proto.status == common_pb2.STARTED: |
| if build.url == url: |
| return False, build |
| build.url = url |
| build.put() |
| return True, build |
| |
| if build.is_ended: |
| raise errors.BuildIsCompletedError('Cannot start a completed build') |
| |
| assert build.proto.status == common_pb2.SCHEDULED |
| |
| _check_lease(build, lease_key) |
| |
| now = utils.utcnow() |
| build.proto.start_time.FromDatetime(now) |
| build.proto.status = common_pb2.STARTED |
| build.status_changed_time = now |
| build.url = url |
| _fut_results(build.put_async(), events.on_build_starting_async(build)) |
| return True, build |
| |
| updated, build = txn() |
| if updated: |
| events.on_build_started(build) |
| return build |
| |
| |
| def _get_bucket_states(bucket_ids): |
| """Returns the list of bucket states for all named buckets. |
| |
| Args: |
| bucket_ids (list): A list of bucket id strings. |
| Assumed to have already been validated. |
| |
| Returns (dict): |
| A map of bucket id to BucketState for that bucket. |
| """ |
| # Get bucket keys and deduplicate. |
| default_states = [model.BucketState(id=b) for b in bucket_ids] |
| states = ndb.get_multi(state.key for state in default_states) |
| for i, state in enumerate(states): # pragma: no cover |
| if not state: |
| states[i] = default_states[i] |
| return dict(zip(bucket_ids, states)) |
| |
| |
| @ndb.tasklet |
| def heartbeat_async(build_id, lease_key, lease_expiration_date): |
| """Extends build lease. |
| |
| Args: |
| build_id: id of the build. |
| lease_key: current lease key. |
| lease_expiration_date (datetime.timedelta): new lease expiration date. |
| |
| Returns: |
| The updated Build as Future. |
| """ |
| |
| @ndb.transactional_tasklet |
| def txn(): |
| validate_lease_key(lease_key) |
| if lease_expiration_date is None: |
| raise errors.InvalidInputError('Lease expiration date not specified') |
| errors.validate_lease_expiration_date(lease_expiration_date) |
| build = yield model.Build.get_by_id_async(build_id) |
| if build is None: |
| raise errors.BuildNotFoundError() |
| if build.is_ended: |
| msg = '' |
| if (build.result == model.BuildResult.CANCELED and |
| build.cancelation_reason == model.CancelationReason.TIMEOUT): |
| msg = ( |
| 'Build was marked as timed out ' |
| 'because it did not complete for %s' % model.BUILD_TIMEOUT |
| ) |
| raise errors.BuildIsCompletedError(msg) |
| _check_lease(build, lease_key) |
| build.lease_expiration_date = lease_expiration_date |
| yield build.put_async() |
| raise ndb.Return(build) |
| |
| try: |
| build = yield txn() |
| except Exception as ex: |
| events.on_heartbeat_failure(build_id, ex) |
| raise |
| raise ndb.Return(build) |
| |
| |
| def heartbeat(build_id, lease_key, lease_expiration_date): |
| future = heartbeat_async(build_id, lease_key, lease_expiration_date) |
| return future.get_result() |
| |
| |
| def heartbeat_batch(heartbeats): |
| """Extends build leases in a batch. |
| |
| Args: |
| heartbeats (list of dict): list of builds to update. Each dict is kwargs |
| for heartbeat() method. |
| |
| Returns: |
| List of (build_id, build, exception) tuples. |
| """ |
| build_ids = [h['build_id'] for h in heartbeats] |
| logging.info('Batch heartbeat: %s', build_ids) |
| futures = [(h, heartbeat_async(**h)) for h in heartbeats] |
| |
| def get_result(hb, future): |
| build_id = hb['build_id'] |
| exc = future.get_exception() |
| if not exc: |
| return build_id, future.get_result(), None |
| else: |
| return build_id, None, exc |
| |
| return [get_result(h, f) for h, f in futures] |
| |
| |
| @ndb.tasklet |
| def _put_output_properties_async(build_key, legacy_result_details): |
| prop_dict = (legacy_result_details or {}).get('properties') |
| if isinstance(prop_dict, dict): |
| props = struct_pb2.Struct() |
| props.update(prop_dict) |
| yield model.BuildOutputProperties( |
| key=model.BuildOutputProperties.key_for(build_key), |
| properties=props.SerializeToString(), |
| ).put_async() |
| |
| |
| def _complete( |
| build_id, lease_key, status, result_details, url=None, new_tags=None |
| ): |
| """Marks a build as completed. Used by succeed and fail methods.""" |
| validate_lease_key(lease_key) |
| validate_url(url) |
| buildtags.validate_tags(new_tags, 'append') |
| assert model.is_terminal_status(status), status |
| |
| @ndb.transactional |
| def txn(): |
| build = _get_leasable_build(build_id, user.PERM_BUILDS_LEASE) |
| |
| if build.is_ended: |
| if (build.proto.status == status and |
| build.result_details == result_details and build.url == url): |
| return False, build |
| raise errors.BuildIsCompletedError( |
| 'Build %s has already completed' % build_id |
| ) |
| _check_lease(build, lease_key) |
| |
| now = utils.utcnow() |
| build.proto.status = status |
| build.status_changed_time = now |
| build.proto.end_time.FromDatetime(now) |
| if url is not None: # pragma: no branch |
| build.url = url |
| build.result_details = result_details |
| if new_tags: |
| build.tags.extend(new_tags) |
| build.tags = sorted(set(build.tags)) |
| build.clear_lease() |
| |
| _fut_results( |
| build.put_async(), |
| events.on_build_completing_async(build), |
| _put_output_properties_async(build.key, result_details), |
| ) |
| return True, build |
| |
| updated, build = txn() |
| if updated: |
| events.on_build_completed(build) |
| return build |
| |
| |
| def succeed(build_id, lease_key, result_details=None, url=None, new_tags=None): |
| """Marks a build as succeeded. Idempotent. |
| |
| Args: |
| build_id: id of the build to complete. |
| lease_key: current lease key. |
| result_details (dict): build result description. |
| new_tags (list of str): list of new tags to add to the Build. |
| |
| Returns: |
| The succeeded Build. |
| """ |
| return _complete( |
| build_id, |
| lease_key, |
| common_pb2.SUCCESS, |
| result_details, |
| url=url, |
| new_tags=new_tags |
| ) |
| |
| |
| def fail( |
| build_id, |
| lease_key, |
| result_details=None, |
| failure_reason=None, |
| url=None, |
| new_tags=None |
| ): |
| """Marks a build as failed. Idempotent. |
| |
| Args: |
| build_id: id of the build to complete. |
| lease_key: current lease key. |
| failure_reason (model.FailureReason): why the build failed. |
| Defaults to model.FailureReason.BUILD_FAILURE. |
| result_details (dict): build result description. |
| new_tags (list of str): list of new tags to add to the Build. |
| |
| Returns: |
| The failed Build. |
| """ |
| if not failure_reason or failure_reason == model.FailureReason.BUILD_FAILURE: |
| status = common_pb2.FAILURE |
| else: |
| status = common_pb2.INFRA_FAILURE |
| return _complete( |
| build_id, |
| lease_key, |
| status, |
| result_details, |
| url=url, |
| new_tags=new_tags, |
| ) |
| |
| |
| @ndb.tasklet |
| def cancel_async(build_id, summary_markdown='', result_details=None): |
| """Cancels build. Does not require a lease key. |
| |
| The current user has to have a permission to cancel a build in the |
| bucket. |
| |
| Cancelling an already ended build will be a no-op and the latest |
| BuildBundle for the provided build id will be returned |
| |
| Args: |
| build_id: id of the build to cancel. |
| summary_markdown (basestring): human-readable explanation. |
| result_details (dict): build result description. |
| |
| Returns: |
| Canceled Build. |
| """ |
| identity_str = auth.get_current_identity().to_bytes() |
| |
| @ndb.tasklet |
| def get_bundle_async(check_access): |
| bundle = yield model.BuildBundle.get_async(build_id, infra=True) |
| if not bundle: |
| raise errors.BuildNotFoundError() |
| build = bundle.build |
| if check_access: |
| has = yield user.has_perm_async(user.PERM_BUILDS_CANCEL, build.bucket_id) |
| if not has: |
| raise user.current_identity_cannot('cancel build %s', build.key.id()) |
| # If the build is ended, do not update the build |
| raise ndb.Return(bundle, not build.is_ended) |
| |
| @ndb.transactional_tasklet |
| def txn_async(): |
| bundle, should_update = yield get_bundle_async(False) |
| if not should_update: # pragma: no cover |
| raise ndb.Return(bundle, False) |
| now = utils.utcnow() |
| build = bundle.build |
| build.proto.status = common_pb2.CANCELED |
| build.status_changed_time = now |
| build.result_details = result_details |
| build.proto.summary_markdown = summary_markdown |
| build.proto.canceled_by = identity_str |
| build.proto.end_time.FromDatetime(now) |
| build.clear_lease() |
| futs = [ |
| build.put_async(), |
| events.on_build_completing_async(build), |
| _put_output_properties_async(build.key, result_details), |
| model.BuildSteps.cancel_incomplete_steps_async( |
| build.key.id(), build.proto.end_time |
| ) |
| ] |
| |
| sw = bundle.infra.parse().swarming |
| # TODO(nodir): remove, in favor of swarming.TaskSyncBuild. |
| if sw.hostname and sw.task_id: # pragma: no branch |
| futs.append(swarming.cancel_task_transactionally_async(build, sw)) |
| yield futs |
| raise ndb.Return(bundle, True) |
| |
| bundle, should_update = yield get_bundle_async(True) |
| if should_update: |
| bundle, updated = yield txn_async() |
| if updated: # pragma: no branch |
| events.on_build_completed(bundle.build) |
| raise ndb.Return(bundle.build) |
| |
| |
| def _reject_swarming_bucket(bucket_id): |
| config.validate_bucket_id(bucket_id) |
| _, cfg = config.get_bucket(bucket_id) |
| assert cfg, 'permission check should have failed' |
| if config.is_swarming_config(cfg): # pragma: no cover |
| raise errors.InvalidInputError('Invalid operation on a Swarming bucket') |
| |
| |
| def _fut_results(*futures): |
| return [f.get_result() for f in futures] |