blob: 1b9257af71dedfb4b75142e64641b0c55b0776d4 [file]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Importer implementation for afe_jobs database."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ci_results_archiver.importers import abstract_importer
# For each enum-valued job field, the table translating the stored integer
# value into its symbolic name.
_JOB_ENUM_MAPPINGS = {
    'control_type': {1: 'SERVER', 2: 'CLIENT'},
    'reboot_before': {0: 'NEVER', 1: 'IF_DIRTY', 2: 'ALWAYS'},
    'reboot_after': {0: 'NEVER', 1: 'IF_ALL_TESTS_PASSED', 2: 'ALWAYS'},
}
class AfeJobImporter(abstract_importer.AbstractImporter):
    """Importer implementation for afe_jobs database.

    Each import pass reads a batch of jobs starting at a given job ID, then
    attaches every job's keyvals and dependency labels to its job dictionary.
    """

    # Control files are cut down to at most this many leading elements of the
    # string before being returned.
    _MAX_CONTROL_FILE_LENGTH = 8192

    def __init__(self, afe, max_entries, grace_period):
        """Initializes the importer.

        Args:
          afe: AfeConnection object.
          max_entries: Maximum number of entries to return.
          grace_period: GracePeriod object.
        """
        self._afe = afe
        self._max_entries = max_entries
        self._grace_period = grace_period

    def ImportEntries(self, next_id):
        """Imports AFE job entries.

        Args:
          next_id: ID to start importing.

        Returns:
          A tuple (jobs, [], new_next_id). jobs is a list of job
          dictionaries, each extended with 'keyvals' and
          'dependency_labels' entries. The second element is always an empty
          list for this importer. new_next_id is one past the largest
          imported job ID, or next_id unchanged when nothing was imported.

        Raises:
          MySQLdb.Error: On MySQL errors.
        """
        job_map = self._QueryJobMap(next_id)
        if not job_map:
            return [], [], next_id

        # Materialize the IDs as a real list: on Python 3 dict.keys() is only
        # a view, while the helpers below are documented to take a list.
        job_ids = list(job_map)
        job_keyvals_map = self._QueryJobKeyvalsMap(job_ids)
        job_dependency_labels_map = self._QueryJobDependencyLabelsMap(job_ids)

        # Join to jobs.
        for job_id, job in job_map.items():
            job['keyvals'] = job_keyvals_map[job_id]
            job['dependency_labels'] = job_dependency_labels_map[job_id]

        jobs = list(job_map.values())
        # job_map is known to be non-empty here, so max() always has input.
        new_next_id = max(job['afe_job_id'] for job in jobs) + 1
        return jobs, [], new_next_id

    def _QueryJobMap(self, next_id):
        """Retrieves AFE job entries from the AFE database.

        The queried jobs are passed through the grace period filter. Each
        remaining job has its control file truncated and its enum-valued
        fields replaced by their names from _JOB_ENUM_MAPPINGS.

        Args:
          next_id: ID to start importing.

        Returns:
          A dictionary whose key is a job ID and value is a job dictionary.

        Raises:
          MySQLdb.Error: On MySQL errors.
          KeyError: If an enum-valued field holds a value that has no entry
              in _JOB_ENUM_MAPPINGS.
        """
        jobs = self._afe.QueryJobs(
            afe_job_id_start=next_id, limit=self._max_entries)
        jobs = self._grace_period.FilterEntries(jobs)
        for job in jobs:
            # Truncate too long control files.
            job['control_file'] = (
                job['control_file'][:self._MAX_CONTROL_FILE_LENGTH])
            # Replace raw enum integers with their symbolic names.
            for key, mapping in _JOB_ENUM_MAPPINGS.items():
                job[key] = mapping[job[key]]
        return {job['afe_job_id']: job for job in jobs}

    def _QueryJobKeyvalsMap(self, job_ids):
        """Retrieves AFE job keyvals from the AFE database.

        Args:
          job_ids: A list of AFE job IDs to retrieve keyvals of.

        Returns:
          A dictionary whose key is a job ID and value is a list of keyvals.
          Every requested job ID is present, with an empty list when the job
          has no keyvals. Each keyval dictionary has its 'afe_job_id' entry
          removed, and each list is sorted by the keyval's 'key' entry.

        Raises:
          MySQLdb.Error: On MySQL errors.
        """
        all_keyvals = self._afe.QueryJobKeyvals(afe_job_ids=job_ids)
        job_keyvals_map = {job_id: [] for job_id in job_ids}
        for keyval in all_keyvals:
            # The job ID becomes the map key, so drop it from the keyval.
            job_id = keyval.pop('afe_job_id')
            job_keyvals_map[job_id].append(keyval)
        # Sort in place so the result does not depend on query row order.
        for keyvals in job_keyvals_map.values():
            keyvals.sort(key=lambda keyval: keyval['key'])
        return job_keyvals_map

    def _QueryJobDependencyLabelsMap(self, job_ids):
        """Retrieves AFE job dependency labels from the AFE database.

        Args:
          job_ids: A list of AFE job IDs to retrieve dependency labels of.

        Returns:
          A dictionary whose key is a job ID and value is a sorted list of
          dependency labels. Every requested job ID is present, with an empty
          list when the job has no dependency labels.

        Raises:
          MySQLdb.Error: On MySQL errors.
        """
        all_dependency_labels = self._afe.QueryJobDependencyLabels(
            afe_job_ids=job_ids)
        job_dependency_labels_map = {job_id: [] for job_id in job_ids}
        for label in all_dependency_labels:
            job_id = label['afe_job_id']
            job_dependency_labels_map[job_id].append(label['label'])
        # Sort in place so the result does not depend on query row order.
        for dependency_labels in job_dependency_labels_map.values():
            dependency_labels.sort()
        return job_dependency_labels_map