| # Copyright 2019 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| """API for interacting with the ResultDB service. |
| |
| Requires `rdb` command in `$PATH`: |
| https://godoc.org/go.chromium.org/luci/resultdb/cmd/rdb |
| """ |
| from google.protobuf import field_mask_pb2 |
| from google.protobuf import json_format |
| from google.protobuf import timestamp_pb2 |
| from recipe_engine import recipe_api |
| |
| from PB.go.chromium.org.luci.resultdb.proto.v1 import artifact |
| from PB.go.chromium.org.luci.resultdb.proto.v1 import common as common_v1 |
| from PB.go.chromium.org.luci.resultdb.proto.v1 import invocation as invocation_pb2 |
| from PB.go.chromium.org.luci.resultdb.proto.v1 import predicate |
| from PB.go.chromium.org.luci.resultdb.proto.v1 import recorder |
| from PB.go.chromium.org.luci.resultdb.proto.v1 import resultdb |
| from PB.go.chromium.org.luci.resultdb.proto.v1 import test_variant |
| |
| from . import common |
| |
# Number of seconds in one day (24 * 60 * 60).
_SECONDS_PER_DAY = 86400
| |
| |
class ResultDBAPI(recipe_api.RecipeApi):
    """A module for interacting with ResultDB.

    RPCs and queries are performed by running the `rdb` command-line tool as
    recipe steps.
    """

    # Maximum number of requests in a batch RPC. Larger workloads are split
    # into several RPCs of at most this many requests each.
    _BATCH_SIZE = 500

    # Prefix of an invocation name: a name is this prefix followed by the
    # invocation ID.
    _INVOCATION_NAME_PREFIX = 'invocations/'

    # Expose the serialize/deserialize functions and the Invocation type of
    # the sibling `common` module as attributes of the API object.
    serialize = staticmethod(common.serialize)
    deserialize = staticmethod(common.deserialize)
    Invocation = common.Invocation
| |
@property
def current_invocation(self):
    """Name of the ResultDB invocation recorded in the current context."""
    context = self.m.context
    return context.resultdb_invocation_name
| |
@property
def enabled(self):
    """Whether the current invocation name is anything but the empty string."""
    invocation_name = self.current_invocation
    return invocation_name != ''
| |
def assert_enabled(self):
    """Raises AssertionError if ResultDB is not enabled for this build.

    The check is an explicit raise rather than an `assert` statement so that
    it is still enforced when Python runs with assertions disabled (-O).

    Raises:
        AssertionError: if `self.enabled` is false.
    """
    if not self.enabled:
        raise AssertionError(
            'ResultDB integration was not enabled for this build. '
            'See go/lucicfg#luci.builder and go/lucicfg#resultdb.settings')
| |
def include_invocations(self, invocations, step_name=None):
    """Adds `invocations` to the inclusions of the current invocation.

    Shortcut for resultdb.update_included_invocations() with only
    `add_invocations` set.
    """
    kwargs = {'add_invocations': invocations, 'step_name': step_name}
    return self.update_included_invocations(**kwargs)
| |
def exclude_invocations(self, invocations, step_name=None):
    """Removes `invocations` from the inclusions of the current invocation.

    Shortcut for resultdb.update_included_invocations() with only
    `remove_invocations` set.
    """
    kwargs = {'remove_invocations': invocations, 'step_name': step_name}
    return self.update_included_invocations(**kwargs)
| |
def update_included_invocations(self,
                                add_invocations=None,
                                remove_invocations=None,
                                step_name=None):
    """Add and/or remove included invocations to/from the current invocation.

    This updates the inclusions of the current invocation specified in the
    LUCI_CONTEXT. When there is nothing to add or remove, no RPC is made.

    Args:
        add_invocations (list of str): invocation IDs to add to the current
            invocation.
        remove_invocations (list of str): invocation IDs to remove from the
            current invocation.
        step_name (str): name of the step. Defaults to
            'resultdb.update_included_invocations'.

    Raises:
        AssertionError: if ResultDB is not enabled for this build.
    """
    self.assert_enabled()

    if not (add_invocations or remove_invocations):
        # Nothing to do.
        return

    def to_names(inv_ids):
        # Invocation IDs -> invocation names; None is treated as empty.
        prefix = self._INVOCATION_NAME_PREFIX
        return [f'{prefix}{inv_id}' for inv_id in inv_ids or []]

    req = recorder.UpdateIncludedInvocationsRequest(
        including_invocation=self.current_invocation,
        add_invocations=to_names(add_invocations),
        remove_invocations=to_names(remove_invocations),
    )

    self._rpc(
        step_name or 'resultdb.update_included_invocations',
        'luci.resultdb.v1.Recorder',
        'UpdateIncludedInvocations',
        json_format.MessageToDict(req),
        include_update_token=True,
        step_test_data=lambda: self.m.json.test_api.output_stream({}))
| |
def get_included_invocations(self, inv_name=None, step_name=None):
    """Returns names of included invocations of the input invocation.

    Makes a GetInvocation RPC and reads `included_invocations` from the
    response.

    Args:
        inv_name (str): the name of the input invocation. If input is None,
            will use current invocation.
        step_name (str): name of the step. Defaults to
            'get_included_invocations'.

    Returns:
        A list of invocation name strs (empty if there are none).
    """
    req = resultdb.GetInvocationRequest(
        name=inv_name or self.current_invocation)
    res = self._rpc(
        step_name or 'get_included_invocations',
        'luci.resultdb.v1.ResultDB',
        'GetInvocation',
        json_format.MessageToDict(req),
        include_update_token=True,
        step_test_data=lambda: self.m.json.test_api.output_stream({}))

    inv_msg = json_format.ParseDict(
        res, invocation_pb2.Invocation(), ignore_unknown_fields=True)
    # Copy into a plain list so callers always get the documented type rather
    # than a protobuf repeated-field container tied to the parsed message.
    return list(inv_msg.included_invocations)
| |
def exonerate(self, test_exonerations, step_name=None):
    """Exonerates test variants in the current invocation.

    Exonerations that fit in one batch (at most _BATCH_SIZE) are sent with a
    single BatchCreateTestExonerations RPC. Larger inputs are split into
    batches; each batch RPC is spawned through the futures module as a
    'batch (<index>)' step nested under `step_name`.

    Args:
        test_exonerations (list): A list of test_result_pb2.TestExoneration.
        step_name (str): name of the step. Defaults to 'resultdb.exonerate'.
    """

    def build_rpc_args(exonerations, name):
        # Builds the positional arguments of self._rpc for one batch.
        request = recorder.BatchCreateTestExonerationsRequest(
            invocation=self.current_invocation,
            request_id=self.m.uuid.random(),
        )
        for exoneration in exonerations:
            request.requests.add(test_exoneration=exoneration)

        return [
            name,
            'luci.resultdb.v1.Recorder',
            'BatchCreateTestExonerations',
            json_format.MessageToDict(request),
            True,  # include_update_token
            lambda: self.m.json.test_api.output_stream({}),
        ]

    if not test_exonerations:
        return

    self.assert_enabled()
    step_name = step_name or 'resultdb.exonerate'
    batch_size = self._BATCH_SIZE
    total = len(test_exonerations)

    # Everything fits in a single RPC.
    if total <= batch_size:
        self._rpc(*build_rpc_args(test_exonerations, step_name))
        return

    # Sends requests in batches.
    with self.m.step.nest(step_name):
        for index, start in enumerate(range(0, total, batch_size)):
            batch = test_exonerations[start:start + batch_size]
            self.m.futures.spawn(
                self._rpc, *build_rpc_args(batch, 'batch (%d)' % index))
| |
def invocation_ids(self, inv_names):
    """Returns invocation IDs by parsing invocation names.

    Every name must be a str starting with the invocation name prefix; the ID
    is whatever follows that prefix.

    Args:
        inv_names (list of str): ResultDB invocation names.

    Returns:
        A list of invocation_ids.
    """
    prefix = self._INVOCATION_NAME_PREFIX
    assert all(isinstance(inv_name, str) for inv_name in inv_names), inv_names
    assert all(
        inv_name.startswith(prefix) for inv_name in inv_names), inv_names

    prefix_len = len(prefix)
    return [inv_name[prefix_len:] for inv_name in inv_names]
| |
def query(self,
          inv_ids,
          variants_with_unexpected_results=False,
          merge=False,
          limit=None,
          step_name=None,
          tr_fields=None,
          test_invocations=None,
          test_regex=None):
    """Returns test results in the invocations.

    Most users will be interested only in results of test variants that had
    unexpected results. This can be achieved by passing
    variants_with_unexpected_results=True. This significantly reduces output
    size and latency.

    Example:
        results = api.resultdb.query(
            [
                # Invocation ID for a Swarming task.
                'task-chromium-swarm.appspot.com-deadbeef',
                # Invocation ID for a Buildbucket build.
                'build-234298374982'
            ],
            variants_with_unexpected_results=True,
        )

    Args:
        inv_ids (list of str): IDs of the invocations. Must be a non-empty
            collection of strings, not a single string.
        variants_with_unexpected_results (bool): if True, return only test
            results from variants that have unexpected results.
        merge (bool): if True, return test results as if all invocations
            are one, otherwise, results will be ordered by invocation.
        limit (int): maximum number of test results to return.
            Unlimited if 0. Defaults to 1000.
        step_name (str): name of the step.
        tr_fields (list of str): test result fields in the response.
            Test result name will always be included regardless of this param
            value.
        test_invocations (dict {invocation_id: api.Invocation}): Default test
            data to be used to simulate the step in tests. The format is the
            same as what this method returns.
        test_regex (str): A regular expression of the relevant test variants
            to query for.

    Returns:
        A dict {invocation_id: api.Invocation}.

    Raises:
        AssertionError: if an argument has an invalid type or value.
    """
    # A bare string is itself a non-empty iterable of one-character strings,
    # so it would pass the checks below and then be split into bogus IDs.
    assert not isinstance(inv_ids, (str, bytes)), inv_ids
    assert len(inv_ids) > 0
    assert all(isinstance(inv_id, str) for inv_id in inv_ids), inv_ids
    assert limit is None or limit >= 0
    assert tr_fields is None or all(
        isinstance(field, str) for field in tr_fields), tr_fields
    assert test_regex is None or isinstance(test_regex, str)
    limit = 1000 if limit is None else limit

    args = [
        '-json',
        '-n', str(limit),
    ]
    if variants_with_unexpected_results:
        args += ['-u']
    if merge:
        args += ['-merge']
    if tr_fields:
        args += ['-tr-fields', ','.join(tr_fields)]
    if test_regex:
        args += ['-test', test_regex]

    # Invocation IDs are positional and come after all flags.
    args += list(inv_ids)

    step_res = self._run_rdb(
        subcommand='query',
        args=args,
        step_name=step_name,
        stdout=self.m.raw_io.output_text(add_output_log=True),
        step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
            common.serialize(test_invocations or {})),
    )
    return common.deserialize(step_res.stdout)
| |
def query_test_result_statistics(self, invocations=None, step_name=None):
    """Retrieve stats of test results for the given invocations.

    Makes a call to the QueryTestResultStatistics API. Returns stats for all
    given invocations, including those included indirectly.

    Args:
        invocations (list): A list of the invocations to query statistics
            for. If None, the current invocation will be used.
        step_name (str): name of the step. Defaults to
            'query_test_result_statistics'.

    Returns:
        A QueryTestResultStatisticsResponse proto message with statistics for
        the queried invocations.
    """
    targets = [self.current_invocation] if invocations is None else invocations
    request = resultdb.QueryTestResultStatisticsRequest(invocations=targets)

    response = self._rpc(
        step_name or 'query_test_result_statistics',
        'luci.resultdb.v1.ResultDB',
        'QueryTestResultStatistics',
        req=json_format.MessageToDict(request),
        step_test_data=lambda: self.m.json.test_api.output_stream({}))

    # Unknown fields are tolerated when parsing the response.
    stats = resultdb.QueryTestResultStatisticsResponse()
    return json_format.ParseDict(response, stats, ignore_unknown_fields=True)
| |
def upload_invocation_artifacts(
        self, artifacts, parent_inv=None, step_name=None):
    """Create artifacts with the given content type and contents or gcs_uri.

    Makes a call to the BatchCreateArtifacts API. Returns the created
    artifacts.

    Args:
        artifacts (dict): a collection of artifacts to create. Each key is an
            artifact ID, with the corresponding value being a dict containing:
                'content_type' (optional)
                one of 'contents' (binary string) or 'gcs_uri' (str)
        parent_inv (str): the name of the invocation to create the artifacts
            under. If None, the current invocation will be used.
        step_name (str): name of the step. Defaults to
            'upload_invocation_artifacts'.

    Returns:
        A BatchCreateArtifactsResponse proto message listing the artifacts
        that were created.
    """

    # TODO: mohrr - Transition artifacts argument from dict[str, dict] to
    # specific types like dict[str, ContentsArtifact | GcsUriArtifact].

    def as_bytes(value: str | bytes) -> bytes:
        # 'contents' may be given as text; str values are encoded to bytes.
        return value if isinstance(value, bytes) else value.encode()

    create_requests = []
    for art_id, art in artifacts.items():
        new_artifact = artifact.Artifact(
            artifact_id=art_id,
            content_type=art.get('content_type', ''),
            contents=as_bytes(art.get('contents', b'')),
            gcs_uri=art.get('gcs_uri', ''),
        )
        create_requests.append(
            recorder.CreateArtifactRequest(
                parent=parent_inv or self.current_invocation,
                artifact=new_artifact,
            ))
    request = recorder.BatchCreateArtifactsRequest(requests=create_requests)

    response = self._rpc(
        step_name or 'upload_invocation_artifacts',
        'luci.resultdb.v1.Recorder',
        'BatchCreateArtifacts',
        req=json_format.MessageToDict(request),
        include_update_token=True,
        step_test_data=lambda: self.m.json.test_api.output_stream({}))

    created = recorder.BatchCreateArtifactsResponse()
    return json_format.ParseDict(response, created, ignore_unknown_fields=True)
| |
def query_test_results(self,
                       invocations,
                       test_id_regexp=None,
                       variant_predicate=None,
                       field_mask_paths=None,
                       page_size=100,
                       page_token=None,
                       step_name=None):
    """Retrieve test results from an invocation, recursively.

    Makes a call to QueryTestResults rpc. Returns a list of test results for
    the invocations and matching the given filters.

    Args:
        invocations (list of str): retrieve the test results included in
            these invocations.
        test_id_regexp (str): the subset of test IDs to return results for.
            Default to None.
        variant_predicate (resultdb.proto.v1.predicate.VariantPredicate):
            the subset of test variants to return results for. Defaults to
            None, but specifying will improve runtime.
        field_mask_paths (list of str): test result fields in the response.
            Test result name will always be included regardless of this param
            value.
        page_size (int): the maximum number of test results to return. The
            service may return fewer than this value. The maximum value is
            1000; values above 1000 will be coerced to 1000. Defaults to 100.
        page_token (str): for instances in which the results span multiple
            pages, each response will contain a page token for the next page,
            which can be passed in to the next request. Defaults to None,
            which returns the first page.
        step_name (str): name of the step. Defaults to 'query_test_results'.

    Returns:
        A QueryTestResultsResponse proto message with test_results and
        next_page_token.

        For value format, see [`QueryTestResultsResponse` message]
        (https://bit.ly/3dsChbo)
    """

    req = resultdb.QueryTestResultsRequest(
        invocations=invocations,
        predicate=predicate.TestResultPredicate(
            test_id_regexp=test_id_regexp,
            variant=variant_predicate,
        ),
        page_size=page_size,
        page_token=page_token,
        read_mask=field_mask_pb2.FieldMask(paths=field_mask_paths),
    )

    res = self._rpc(
        step_name or 'query_test_results',
        'luci.resultdb.v1.ResultDB',
        'QueryTestResults',
        req=json_format.MessageToDict(req),
        # Default simulation output (an empty response), matching the other
        # RPC wrappers in this module, so the step works in recipe tests that
        # supply no step data of their own.
        step_test_data=lambda: self.m.json.test_api.output_stream({}))

    return json_format.ParseDict(
        res,
        resultdb.QueryTestResultsResponse(),
        # Do not fail the build because recipe's proto copy is stale.
        ignore_unknown_fields=True)
| |
def query_test_variants(self,
                        invocations,
                        test_variant_status=None,
                        field_mask_paths=None,
                        page_size=100,
                        page_token=None,
                        step_name=None):
    """Retrieve test variants from an invocation, recursively.

    Makes a call to QueryTestVariants rpc. Returns a list of test variants for
    the invocations and matching the given filters.

    Args:
        invocations (list of str): retrieve the test variants included in
            these invocations.
        test_variant_status (str or
            resultdb.proto.v1.test_variant.TestVariantStatus): either the
            name of a TestVariantStatus value (e.g. 'UNEXPECTED_MASK') or the
            enum value itself. Use the UNEXPECTED_MASK status to retrieve
            only variants with non-EXPECTED status. No status filter is
            applied when this is None (or otherwise falsy).
        field_mask_paths (list of str): test variant fields in the response.
            Test id, variantHash and status will always be included. Example:
            use ["test_id", "variant", "status", "sources_id"] to exclude
            results from the response. (Note that test_id and status are
            still specified for clarity.)
        page_size (int): the maximum number of variants to return. The
            service may return fewer than this value. The maximum value is
            1000; values above 1000 will be coerced to 1000. Defaults to 100.
        page_token (str): for instances in which the results span multiple
            pages, each response will contain a page token for the next page,
            which can be passed in to the next request. Defaults to None,
            which returns the first page.
        step_name (str): name of the step. Defaults to 'query_test_variants'.

    Returns:
        A QueryTestVariantsResponse proto message with test_variants and
        next_page_token.

        For value format, see [`QueryTestVariantsResponse` message]
        (http://shortn/_hv3edsXidO)
    """
    # Named so that it does not shadow the imported `predicate` module.
    status_predicate = None
    if test_variant_status:
        status = test_variant_status
        if isinstance(status, str):
            # TestVariantStatus.Value() maps an enum name to its number; an
            # already-numeric enum value is passed through unchanged.
            status = test_variant.TestVariantStatus.Value(status)
        status_predicate = test_variant.TestVariantPredicate(status=status)

    req = resultdb.QueryTestVariantsRequest(
        invocations=invocations,
        predicate=status_predicate,
        page_size=page_size,
        page_token=page_token,
        read_mask=field_mask_pb2.FieldMask(paths=field_mask_paths),
    )

    res = self._rpc(
        step_name or 'query_test_variants',
        'luci.resultdb.v1.ResultDB',
        'QueryTestVariants',
        req=json_format.MessageToDict(req),
        # Default simulation output (an empty response), matching the other
        # RPC wrappers in this module, so the step works in recipe tests that
        # supply no step data of their own.
        step_test_data=lambda: self.m.json.test_api.output_stream({}))

    return json_format.ParseDict(
        res,
        resultdb.QueryTestVariantsResponse(),
        # Do not fail the build because recipe's proto copy is stale.
        ignore_unknown_fields=True)
| |
def query_new_test_variants(
        self,
        invocation: str,
        baseline: str,
        step_name: str | None = None,
        step_test_data: dict | None = None,
) -> resultdb.QueryNewTestVariantsResponse:
    """Query ResultDB for new tests.

    Makes a QueryNewTestVariants rpc.

    Args:
        invocation: Name of the invocation, e.g. "invocations/{id}".
        baseline: The baseline to compare test variants against, to determine
            if they are new. e.g.
            "projects/{project}/baselines/{baseline_id}".
        step_name: name of the step. Defaults to 'query_new_test_variants'.
        step_test_data: dict form of the response used to simulate the step
            in tests. Defaults to an empty response.

    Returns:
        A QueryNewTestVariantsResponse proto message with is_baseline_ready
        and new_test_variants.
    """
    req = resultdb.QueryNewTestVariantsRequest(
        invocation=invocation,
        baseline=baseline,
    )

    res = self._rpc(
        step_name or 'query_new_test_variants',
        'luci.resultdb.v1.ResultDB',
        'QueryNewTestVariants',
        req=json_format.MessageToDict(req),
        step_test_data=(
            lambda: self.m.json.test_api.output_stream(step_test_data or {})),
    )
    return json_format.ParseDict(
        res,
        resultdb.QueryNewTestVariantsResponse(),
        # Do not fail the build because recipe's proto copy is stale.
        ignore_unknown_fields=True)
| |
def update_invocation(self,
                      parent_inv='',
                      step_name=None,
                      source_spec=None,
                      baseline_id=None):
    """Makes a call to the UpdateInvocation API to update the invocation.

    Only the fields that are actually provided (truthy) are listed in the
    update mask of the request.

    Args:
        parent_inv (str): the name of the invocation to be updated. The
            current invocation is used when empty.
        step_name (str): name of the step. Defaults to 'update_invocations'.
        source_spec (luci.resultdb.v1.SourceSpec): The source information
            to apply to the given invocation.
        baseline_id (str): Baseline identifier for this invocation, usually
            of the format {buildbucket bucket}:{buildbucket builder name}.
            For example, 'try:linux-rel'. Baselines are used to detect new
            tests in invocations.
    """
    candidate_fields = (
        ('source_spec', source_spec),
        ('baseline_id', baseline_id),
    )
    mask_paths = [path for path, value in candidate_fields if value]

    target = invocation_pb2.Invocation(
        name=parent_inv or self.current_invocation,
        source_spec=source_spec,
        baseline_id=baseline_id)
    request = recorder.UpdateInvocationRequest(
        invocation=target,
        update_mask=field_mask_pb2.FieldMask(paths=mask_paths),
    )

    self._rpc(
        step_name or 'update_invocations',
        'luci.resultdb.v1.Recorder',
        'UpdateInvocation',
        req=json_format.MessageToDict(request),
        include_update_token=True,
        step_test_data=lambda: self.m.json.test_api.output_stream({}))
| |
| |
| ############################################################################## |
| # Implementation details. |
| |
def _rpc(self,
         step_name,
         service,
         method,
         req,
         include_update_token=False,
         step_test_data=None):
    """Makes a ResultDB RPC.

    Runs `rdb rpc <service> <method>` with the request passed as JSON on
    stdin and the response read as JSON from stdout. The request is also
    attached to the step as a 'json.input' log.

    Args:
        step_name (str): name of the step.
        service (string): the full name of a service, e.g.
            "luci.resultdb.v1.ResultDB".
        method (string): the name of the method, e.g. "GetInvocation".
        req (dict): request message.
        include_update_token (bool): A flag to indicate if the RPC requires
            the update token of the invocation.
        step_test_data (callable): passed through to the step as its default
            test data.

    Returns:
        A dict representation of the response message.
    """
    token_flag = ['-include-update-token'] if include_update_token else []
    rdb_args = [service, method] + token_flag

    step_result = self._run_rdb(
        subcommand='rpc',
        step_name=step_name,
        args=rdb_args,
        stdin=self.m.json.input(req),
        stdout=self.m.json.output(),
        step_test_data=step_test_data,
    )

    request_log = self.m.json.dumps(req, indent=2)
    step_result.presentation.logs['json.input'] = request_log

    return step_result.stdout
| |
def _run_rdb(self,
             subcommand,
             step_name=None,
             args=None,
             stdin=None,
             stdout=None,
             step_test_data=None,
             timeout=None):
    """Runs rdb tool.

    Executes `rdb <subcommand> [args...]` as an infra step and returns the
    step result. The step is named 'rdb <subcommand>' unless `step_name` is
    given.
    """
    command = ['rdb', subcommand]
    if args:
        command = command + args

    name = step_name if step_name else 'rdb ' + subcommand

    return self.m.step(
        name,
        command,
        infra_step=True,
        stdin=stdin,
        stdout=stdout,
        step_test_data=step_test_data,
        timeout=timeout,
    )
| |
def wrap(
        self,
        cmd,
        test_id_prefix='',
        base_variant=None,
        test_location_base='',
        base_tags=None,
        coerce_negative_duration=False,
        include=False,
        realm='',
        location_tags_file='',
        require_build_inv=True,
        exonerate_unexpected_pass=False,
        inv_properties='',
        inv_properties_file='',
        inherit_sources=False,
        sources='',
        sources_file='',
        baseline_id='',
):
    """Wraps the command with ResultSink.

    Returns a command that, when executed, runs cmd in a go/result-sink
    environment. For example:

        api.step('test', api.resultdb.wrap(['./my_test']))

    Args:
        cmd (list of strings): the command line to run.
        test_id_prefix (str): a prefix to prepend to test IDs of test results
            reported by cmd.
        base_variant (dict): variant key-value pairs to attach to all test
            results reported by cmd. If both base_variant and a reported
            variant have a value for the same key, the reported one wins.
            Example:
                base_variant={
                    'bucket': api.buildbucket.build.builder.bucket,
                    'builder': api.buildbucket.builder_name,
                }
        test_location_base (str): the base path to prepend to the test
            location file name with a relative path. The value must start
            with "//".
        base_tags (list of (string, string)): tags to attach to all test
            results reported by cmd. Each element is a tuple of (key, value),
            and a key may be repeated.
        coerce_negative_duration (bool): If true, negative duration values
            will be coerced to 0. If false, tests results with negative
            duration values will be rejected with an error.
        include (bool): If true, a new invocation will be created and
            included in the parent invocation.
        realm (str): realm used for the new invocation created if
            `include=True`. Default is the current realm used in buildbucket.
        location_tags_file (str): path to the file that contains test
            location tags in JSON format.
        require_build_inv (bool): flag to control if the build is required to
            have an invocation.
        exonerate_unexpected_pass (bool): flag to control if automatically
            exonerate unexpected passes.
        inv_properties (str): stringified JSON object that contains
            structured, domain-specific properties of the invocation. When
            not specified, invocation-level properties will not be updated.
        inv_properties_file (str): Similar to inv_properties but takes a path
            to the file that contains the JSON object. Cannot be used when
            inv_properties is specified.
        inherit_sources (bool): flag to enable inheriting sources from the
            parent invocation.
        sources (str): JSON-serialized luci.resultdb.v1.Sources object that
            contains information about the code sources tested by the
            invocation. Cannot be used when inherit_sources or sources_file
            is specified.
        sources_file (str): Similar to sources, but takes a path to the
            file that contains the JSON object. Cannot be used when
            inherit_sources or sources is specified.
        baseline_id (str): Baseline identifier for this invocation, usually
            of the format {buildbucket bucket}:{buildbucket builder name}.
            For example, 'try:linux-rel'.

    Returns:
        A list of strings: the `rdb stream` command line with the requested
        flags, followed by '--' and the elements of cmd.

    Raises:
        AssertionError: if an argument has an invalid type or value, if
            mutually exclusive options are combined, or if
            require_build_inv is set and ResultDB is not enabled.
    """
    if require_build_inv:
        self.assert_enabled()
    assert isinstance(test_id_prefix, (type(None), str)), test_id_prefix
    assert isinstance(base_variant, (type(None), dict)), base_variant
    assert isinstance(cmd, (tuple, list)), cmd
    assert isinstance(test_location_base, (type(None), str)), test_location_base
    assert not test_location_base or test_location_base.startswith(
        '//'), test_location_base
    assert isinstance(base_tags, (type(None), list)), base_tags
    assert isinstance(coerce_negative_duration, bool), coerce_negative_duration
    assert isinstance(include, bool), include
    assert isinstance(realm, (type(None), str)), realm
    assert isinstance(location_tags_file, (type(None), str)), location_tags_file
    assert isinstance(
        exonerate_unexpected_pass, bool), exonerate_unexpected_pass
    assert isinstance(inv_properties, (type(None), str)), inv_properties
    assert isinstance(
        inv_properties_file, (type(None), str)), inv_properties_file
    assert not (inv_properties and inv_properties_file), inv_properties_file
    assert isinstance(inherit_sources, bool), inherit_sources
    assert isinstance(sources, (type(None), str)), sources
    assert isinstance(sources_file, (type(None), str)), sources_file
    # Enforce the documented rule that the three ways of specifying sources
    # are mutually exclusive, as is already done for inv_properties above.
    source_options = (inherit_sources, sources, sources_file)
    assert sum(1 for opt in source_options if opt) <= 1, (
        'at most one of inherit_sources, sources and sources_file may be '
        'specified: %r' % (source_options,))
    assert isinstance(baseline_id, (type(None), str)), baseline_id

    ret = ['rdb', 'stream']

    if test_id_prefix:
        ret += ['-test-id-prefix', test_id_prefix]

    # Sorted so that the generated command line is deterministic.
    for k, v in sorted((base_variant or {}).items()):
        ret += ['-var', '%s:%s' % (k, v)]

    if test_location_base:
        ret += ['-test-location-base', test_location_base]

    for k, v in sorted(base_tags or []):
        ret += ['-tag', '%s:%s' % (k, v)]

    if coerce_negative_duration:
        ret += ['-coerce-negative-duration']

    if include:
        ret += [
            '-new', '-realm', realm or self.m.context.realm,
            '-include'
        ]

    if location_tags_file:
        ret += ['-location-tags-file', location_tags_file]

    if exonerate_unexpected_pass:
        ret += ['-exonerate-unexpected-pass']

    if inv_properties:
        ret += ['-inv-properties', inv_properties]

    if inv_properties_file:
        ret += ['-inv-properties-file', inv_properties_file]

    if inherit_sources:
        ret += ['-inherit-sources']

    if sources:
        ret += ['-sources', sources]

    if sources_file:
        ret += ['-sources-file', sources_file]

    if baseline_id:
        ret += ['-baseline-id', baseline_id]

    # Everything after '--' is the wrapped command itself.
    ret += ['--'] + list(cmd)
    return ret
| |
def config_test_presentation(self, column_keys=(), grouping_keys=('status',)):
    """Specifies how the test results should be rendered.

    Nothing is emitted when both arguments have their default values.
    Otherwise an empty step is run and the configuration is stored in its
    '$recipe_engine/resultdb/test_presentation' output property.

    Args:
        column_keys:
            A list of keys that will be rendered as 'columns'. status is
            always the first column and name is always the last column (you
            don't need to specify them). A key must be one of the following:
            1. 'v.{variant_key}': variant.def[variant_key] of the test
               variant (e.g. v.gpu).

        grouping_keys:
            A list of keys that will be used for grouping tests. A key must
            be one of the following:
            1. 'status': status of the test variant.
            2. 'name': name of the test variant.
            3. 'v.{variant_key}': variant.def[variant_key] of the test
               variant (e.g. v.gpu).
            Caveat: test variants with only expected results are not affected
            by this setting and are always in their own group.
    """

    # To be consistent with the lucicfg implementation, set the test
    # presentation config only when it's not the default value.
    is_default = not list(column_keys) and list(grouping_keys) == ['status']
    if is_default:
        return

    # Column keys may only refer to variant keys.
    assert all(key.startswith('v.') for key in column_keys)

    # Grouping keys may be 'status', 'name' or a variant key.
    assert all(
        key in ('status', 'name') or key.startswith('v.')
        for key in grouping_keys)

    # The fact that it sets a property value is an implementation detail.
    step_result = self.m.step('set test presentation config', cmd=None)
    presentation_config = {
        'column_keys': column_keys,
        'grouping_keys': grouping_keys,
    }
    properties = step_result.presentation.properties
    properties['$recipe_engine/resultdb/test_presentation'] = presentation_config