| # Copyright 2019 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import dataclasses |
| import typing |
| |
| from google.protobuf import message |
| from google.protobuf import any_pb2 |
| from google.cloud import datastore |
| |
| from chromeperf.pinpoint.models import change as change_module |
| from chromeperf.engine import evaluator |
| from chromeperf.engine import event as event_module |
| from chromeperf.engine import task_pb2 |
| from chromeperf.pinpoint.models import task as task_module |
| from chromeperf.pinpoint.actions import updates |
| from chromeperf.pinpoint import find_isolate_task_payload_pb2 |
| from chromeperf.pinpoint import test_runner_payload_pb2 |
| |
| |
| @dataclasses.dataclass |
| class UpdateWrapper: |
| datastore_client: datastore.Client |
| job: typing.Any |
| task: task_module.Task |
| new_state: str |
| payload: message.Message |
| |
| def __call__(self, _): |
| encoded_payload = any_pb2.Any() |
| encoded_payload.Pack(self.payload) |
| return updates.update_task( |
| self.datastore_client, |
| self.job, |
| self.task.id, |
| new_state=self.new_state, |
| payload=encoded_payload, |
| ) |
| |
| |
| def FakeNotFoundIsolate(datastore_client, job, task, *_): |
| if task.state == 'completed': |
| return None |
| |
| return [ |
| UpdateWrapper(datastore_client, job, task, 'completed', task.payload) |
| ] |
| |
| |
| @dataclasses.dataclass |
| class FakeFoundIsolate(task_module.PayloadUnpackingMixin): |
| datastore_client: datastore.Client |
| job: typing.Any |
| |
| def __call__(self, task, *_): |
| if task.task_type != 'find_isolate': |
| return None |
| |
| if task.state == 'completed': |
| return None |
| |
| task_payload = self.unpack( |
| find_isolate_task_payload_pb2.FindIsolateTaskPayload, task.payload) |
| task_payload.buildbucket_build.id = '345982437987234' |
| task_payload.buildbucket_build.url = 'https://builbucket/url' |
| task_payload.buildbucket_build.status = 'COMPLETED' |
| task_payload.buildbucket_build.result = 'SUCCESS' |
| task_payload.buildbucket_build.result_details_json = '{}' |
| task_payload.isolate_server = 'some-isolate-server' |
| task_payload.isolate_hash = '14aaaaaaaaaaa514' |
| return [ |
| UpdateWrapper( |
| self.datastore_client, |
| self.job, |
| task, |
| 'completed', |
| task_payload, |
| ) |
| ] |
| |
| |
| @dataclasses.dataclass |
| class FakeFindIsolateFailed(task_module.PayloadUnpackingMixin): |
| datastore_client: datastore.Client |
| job: typing.Any |
| |
| def __call__(self, task, *_): |
| if task.task_type != 'find_isolate': |
| return None |
| |
| if task.state == 'failed': |
| return None |
| |
| task_payload = self.unpack( |
| find_isolate_task_payload_pb2.FindIsolateTaskPayload, task.payload) |
| task_payload.buildbucket_build.status = 'COMPLETED' |
| task_payload.buildbucket_build.result = 'FAILURE' |
| task_payload.buildbucket_build.result_details_json = '{}' |
| task_payload.tries = 1 |
| return [ |
| UpdateWrapper( |
| self.datastore_client, |
| self.job, |
| task, |
| 'failed', |
| task_payload, |
| ) |
| ] |
| |
| |
| @dataclasses.dataclass |
| class FakeSuccessfulRunTest(task_module.PayloadUnpackingMixin): |
| datastore_client: datastore.Client |
| job: typing.Any |
| |
| def __call__(self, task, *_): |
| if task.task_type != 'run_test': |
| return None |
| |
| if task.state == 'completed': |
| return None |
| |
| task_payload = self.unpack(test_runner_payload_pb2.TestRunnerPayload, |
| task.payload) |
| task_payload.output.swarming_response.bot_id = 'some_bot' |
| task_payload.output.swarming_response.task_id = 'some_task_id' |
| task_payload.output.task_output.isolate_server = 'https://isolate.server' |
| task_payload.output.task_output.isolate_hash = '12334981aad2304ff1243458' |
| return [ |
| UpdateWrapper( |
| self.datastore_client, |
| self.job, |
| task, |
| 'completed', |
| task_payload, |
| ) |
| ] |
| |
| |
| @dataclasses.dataclass |
| class FakeFailedRunTest(task_module.PayloadUnpackingMixin): |
| datastore_client: datastore.Client |
| job: typing.Any |
| |
| def __call__(self, task, *_): |
| if task.task_type != 'run_test': |
| return None |
| |
| if task.state == 'failed': |
| return None |
| |
| task_payload = self.unpack(test_runner_payload_pb2.TestRunnerPayload, |
| task.payload) |
| task_payload.errors.append( |
| task_pb2.ErrorMessage( |
| reason='SomeReason', |
| message='There is some message here.', |
| )) |
| return [ |
| UpdateWrapper( |
| self.datastore_client, |
| self.job, |
| task, |
| 'failed', |
| task_payload, |
| ) |
| ] |