blob: 9a1336a6861d1c2f95784dbc86feb490c412fef8 [file] [log] [blame]
# Copyright 2018 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Handles triggering and reporting flake swarming task results."""
import logging
from common import exceptions
from dto.flake_swarming_task_output import FlakeSwarmingTaskOutput
from gae_libs.pipelines import AsynchronousPipeline
from gae_libs.pipelines import pipeline
from libs.structured_object import StructuredObject
from services.flake_failure import flake_swarming
class RunFlakeSwarmingTaskInput(StructuredObject):
# The name of the master on which to find a reference task.
master_name = basestring
# The name of the builder on which to find a reference task.
builder_name = basestring
# A reference build number from which find a reference task.
reference_build_number = int
# The name of the step that contains the test to run.
step_name = basestring
# The name of the test to run.
test_name = basestring
# The commit position to run the flake swarming task against.
commit_position = int
# The isolate sha pointing to the binaries to test.
isolate_sha = basestring
# The number of iterations to run.
iterations = int
# The number of seconds the task must complete within.
timeout_seconds = int
class RunFlakeSwarmingTaskPipeline(AsynchronousPipeline):
"""Triggers, waits for, and returns results of a flake swarming task."""
input_type = RunFlakeSwarmingTaskInput
output_type = FlakeSwarmingTaskOutput
def TimeoutSeconds(self):
return 24 * 60 * 60 # 24 hours. This will enable a timeout callback.
def OnTimeout(self, pipeline_parameters, callback_parameters):
# TODO(crbug.com/835066): Capture metrics for pipeline timeouts.
super(RunFlakeSwarmingTaskPipeline, self).OnTimeout(pipeline_parameters,
callback_parameters)
task_id = callback_parameters.get('task_id')
return flake_swarming.OnSwarmingTaskTimeout(pipeline_parameters, task_id)
def RunImpl(self, pipeline_parameters):
if self.GetCallbackParameters().get('task_id'):
# For idempotent operation.
logging.warning(
'RunImpl invoked again after swarming task was already triggered.')
return
task_id = flake_swarming.TriggerSwarmingTask(
master_name=pipeline_parameters.master_name,
builder_name=pipeline_parameters.builder_name,
reference_build_number=pipeline_parameters.reference_build_number,
step_name=pipeline_parameters.step_name,
test_name=pipeline_parameters.test_name,
isolate_sha=pipeline_parameters.isolate_sha,
iterations=pipeline_parameters.iterations,
timeout_seconds=pipeline_parameters.timeout_seconds,
runner_id=self.pipeline_id)
if not task_id:
# Retry upon failure.
raise pipeline.Retry('Failed to schedule a swarming task')
self.SaveCallbackParameters({'task_id': task_id})
def CallbackImpl(self, pipeline_parameters, callback_parameters):
"""Returns the results of the swarming task."""
if not callback_parameters.get('task_id'):
# Task_id is not saved in callback parameters yet, retries the callback.
return 'Task_id not found for pipeline %s' % self.pipeline_id, None
task_id = callback_parameters['task_id']
try:
results = flake_swarming.OnSwarmingTaskStateChanged(
pipeline_parameters, task_id)
if not results:
# No task state, further callback is needed.
return None
return None, results
except exceptions.RetryException as e: # Indicate an error to retry.
return ('Error getting swarming task result: {}'.format(e.error_message),
None)