blob: 856baf0dceb8bc1b708da273b1fea7f6e00c35e7 [file] [log] [blame]
# Copyright 2018 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Functions for operating on tests run in swarming."""
import json
import logging
import time
from google.appengine.ext import ndb
from common.findit_http_client import FinditHttpClient
from dto import swarming_task_error
from dto.swarming_task_error import SwarmingTaskError
from dto.test_location import TestLocation
from infra_api_clients.swarming import swarming_util
from libs.test_results import test_results_util
from libs.test_results.resultdb_test_results import ResultDBTestResults
from services import constants
from services import resultdb
from services import swarming
from waterfall import waterfall_config
_FINDIT_HTTP_CLIENT = FinditHttpClient()
def GetSwarmingTaskDataAndResult(task_id, http_client=_FINDIT_HTTP_CLIENT):
"""Gets information about a swarming task.
Returns: A tuple of 3 elements
Data of swarming task: dict
Test results for swarming task: subclass of BaseTestResults
Error for swarming task: dict
"""
data, error = swarming_util.GetSwarmingTaskResultById(swarming.SwarmingHost(),
task_id, http_client)
error = SwarmingTaskError.FromSerializable(error)
if not data:
return None, None, error
task_state = data['state']
test_results = None
if task_state not in constants.STATE_NOT_STOP:
if task_state == constants.STATE_COMPLETED:
inv_name = swarming_util.GetInvocationNameForSwarmingTask(
swarming.SwarmingHost(), task_id)
res = resultdb.query_resultdb(
inv_name, only_variants_with_unexpected_results=False)
if res:
test_results = ResultDBTestResults(res)
else:
# The swarming task did not complete successfully.
logging.error('Swarming task %s stopped with status: %s', task_id,
task_state)
error = SwarmingTaskError.GenerateError(
swarming_task_error.STATES_NOT_RUNNING_TO_ERROR_CODES[task_state])
return data, test_results, error
def GetTestResultForSwarmingTask(task_id, http_client=_FINDIT_HTTP_CLIENT):
"""Get test results object for a swarming task based on it's id."""
_data, test_results, _error = GetSwarmingTaskDataAndResult(
task_id, http_client)
return test_results
def GetTestLocation(task_id, test_name):
"""Gets the filepath and line number of a test from swarming.
Args:
task_id (str): The swarming task id to query.
test_name (str): The name of the test whose location to return.
Returns:
(TestLocation): The file path and line number of the test, or None
if the test location was not be retrieved.
"""
test_results = GetTestResultForSwarmingTask(task_id, _FINDIT_HTTP_CLIENT)
test_location, error = test_results.GetTestLocation(
test_name) if test_results else (None, constants.WRONG_FORMAT_LOG)
if error:
return None
return TestLocation.FromSerializable(test_location or {})
def GetTaskIdFromSwarmingTaskEntity(urlsafe_task_key):
"""Gets swarming task id from SwarmingTask. Waits and polls if needed."""
swarming_settings = waterfall_config.GetSwarmingSettings()
wait_seconds = swarming_settings.get('get_swarming_task_id_wait_seconds')
timeout_seconds = swarming_settings.get(
'get_swarming_task_id_timeout_seconds')
deadline = time.time() + timeout_seconds
while time.time() < deadline:
swarming_task = ndb.Key(urlsafe=urlsafe_task_key).get()
if not swarming_task:
raise Exception('Swarming task was deleted unexpectedly!')
if swarming_task.task_id:
return swarming_task.task_id
# Wait for the existing pipeline to start the Swarming task.
time.sleep(wait_seconds)
raise Exception('Timed out waiting for task_id.')