| # Copyright 2017 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Importer implementation for tko_jobs/tko_tests database.""" |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| from ci_results_archiver.importers import abstract_importer |
| |
| |
| class TkoJobImporter(abstract_importer.AbstractImporter): |
| """Importer implementation for tko_jobs/tko_tests database.""" |
| |
| def __init__(self, tko, max_entries, grace_period): |
| """Initializes the importer. |
| |
| Args: |
| tko: TkoConnection object. |
| max_entries: Maximum number of entries to return. |
| grace_period: GracePeriod object. |
| """ |
| self._tko = tko |
| self._max_entries = max_entries |
| self._grace_period = grace_period |
| |
| def ImportEntries(self, next_id): |
| """Imports TKO job entries.""" |
| job_map = self._QueryJobMap(next_id) |
| if not job_map: |
| return [], [], next_id |
| |
| job_keyvals_map = self._QueryJobKeyvalsMap(job_map.keys()) |
| tests_map = self._QueryTestsMap(job_map.keys()) |
| |
| # Join to jobs. |
| for job_id, job in job_map.iteritems(): |
| job['keyvals'] = job_keyvals_map[job_id] |
| job['tests'] = tests_map[job_id] |
| |
| jobs = job_map.values() |
| invalid_tko_test_ids = self._ScanAndApplyInvalidation(jobs) |
| new_next_id = max(j['tko_job_id'] for j in jobs) + 1 if jobs else next_id |
| return jobs, invalid_tko_test_ids, new_next_id |
| |
| def _QueryJobMap(self, next_id): |
| """Retrieves TKO job entries from the TKO database. |
| |
| Args: |
| next_id: ID to start importing. |
| |
| Returns: |
| A dictionary whose key is a job ID and value is a job dictionary. |
| |
| Raises: |
| MySQLdb.Error: On MySQL errors. |
| """ |
| jobs = self._tko.QueryJobs( |
| tko_job_id_start=next_id, limit=self._max_entries) |
| |
| jobs = self._grace_period.FilterEntries(jobs) |
| |
| return {job['tko_job_id']: job for job in jobs} |
| |
| def _QueryJobKeyvalsMap(self, job_ids): |
| """Retrieves TKO job keyvals from the TKO database. |
| |
| Args: |
| job_ids: A list of TKO job IDs to retrieve keyvals of. |
| |
| Returns: |
| A dictionary whose key is a job ID and value is a list of keyvals. |
| |
| Raises: |
| MySQLdb.Error: On MySQL errors. |
| """ |
| all_keyvals = self._tko.QueryJobKeyvals(tko_job_ids=job_ids) |
| |
| job_keyvals_map = {job_id: [] for job_id in job_ids} |
| for keyval in all_keyvals: |
| job_id = keyval.pop('tko_job_id') |
| job_keyvals_map[job_id].append(keyval) |
| |
| for keyvals in job_keyvals_map.itervalues(): |
| keyvals.sort(key=lambda keyval: keyval['key']) |
| |
| return job_keyvals_map |
| |
| def _QueryTestsMap(self, job_ids): |
| """Retrieves TKO tests from the TKO database. |
| |
| Args: |
| job_ids: A list of TKO job IDs to retrieve tests of. |
| |
| Returns: |
| A dictionary whose key is a job ID and value is a list of tests. |
| |
| Raises: |
| MySQLdb.Error: On MySQL errors. |
| """ |
| all_tests = self._tko.QueryTests(tko_job_ids=job_ids) |
| |
| tests_map = {job_id: [] for job_id in job_ids} |
| for test in all_tests: |
| job_id = test.pop('tko_job_id') |
| tests_map[job_id].append(test) |
| |
| for tests in tests_map.itervalues(): |
| tests.sort(key=lambda t: t['tko_test_id']) |
| |
| return tests_map |
| |
| def _ScanAndApplyInvalidation(self, jobs): |
| """Scans test entries and applies invalidation. |
| |
| Rows in TKO tko_tests table can be invalidated when a test result |
| is overridden by a retry run (see tko/parse.py). In this case, |
| `invalid` column of an overridden test is set to true, and also |
| `invalidates_test_idx` column of an overriding test is set to the |
| ID of an overridden test. |
| |
| This function scans |jobs| and invalidates tests referenced in |
| `invalidates_test_idx`. If a test referenced in the column is not |
| found in |jobs|, it means we already processed it in an earlier run, |
| possibly leaving it as "not invalid". In this case we need to |
| update existing rows in BigQuery tables to invalidate such entries. |
| |
| Args: |
| jobs: A list of dictionaries representing TKO jobs. |
| |
| Returns: |
| A list of TKO test IDs to be invalidated, or None. |
| """ |
| all_tests = [] |
| for job in jobs: |
| all_tests.extend(job['tests']) |
| |
| invalid_test_id_set = set() |
| for test in all_tests: |
| invalid_test_id = test['invalidating_tko_test_id'] |
| if invalid_test_id: |
| invalid_test_id_set.add(invalid_test_id) |
| |
| # Apply invalidations in the current test set. |
| for test in all_tests: |
| if test['tko_test_id'] in invalid_test_id_set: |
| test['invalid'] = 1 |
| invalid_test_id_set.discard(test['tko_test_id']) |
| |
| # Some tests were processed in last iteration so we need to update them. |
| return sorted(invalid_test_id_set) |