| import abc |
| import datetime |
| import glob |
| import json |
| import logging |
| import os |
| import re |
| import shutil |
| |
| import common |
| from client.common_lib import time_utils |
| from client.common_lib import utils |
| from server.cros.dynamic_suite import constants |
| from server.cros.dynamic_suite import frontend_wrappers |
| |
| metrics = utils.metrics_mock |
| |
| |
| SPECIAL_TASK_PATTERN = '.*/hosts/[^/]+/(\d+)-[^/]+' |
| |
| def is_job_expired(age_limit, timestamp): |
| """Check whether a job timestamp is older than an age limit. |
| |
| @param age_limit: Minimum age, measured in days. If the value is |
| not positive, the job is always expired. |
| @param timestamp: Timestamp of the job whose age we are checking. |
| The format must match time_utils.TIME_FMT. |
| |
| @returns True iff the job is old enough to be expired. |
| """ |
| if age_limit <= 0: |
| return True |
| job_time = time_utils.time_string_to_datetime(timestamp) |
| expiration = job_time + datetime.timedelta(days=age_limit) |
| return datetime.datetime.now() >= expiration |
| |
| |
| def get_job_id_or_task_id(result_dir): |
| """Extract job id or special task id from result_dir |
| |
| @param result_dir: path to the result dir. |
| For test job: |
| /usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6 |
| The hostname at the end is optional. |
| For special task: |
| /usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup |
| |
| @returns: str representing the job id or task id. Returns None if fail |
| to parse job or task id from the result_dir. |
| """ |
| if not result_dir: |
| return |
| result_dir = os.path.abspath(result_dir) |
| # Result folder for job running inside container has only job id. |
| ssp_job_pattern = '.*/(\d+)$' |
| # Try to get the job ID from the last pattern of number-text. This avoids |
| # issue with path like 123-results/456-debug_user, in which 456 is the real |
| # job ID. |
| m_job = re.findall('.*/(\d+)-[^/]+', result_dir) |
| if m_job: |
| return m_job[-1] |
| m_special_task = re.match(SPECIAL_TASK_PATTERN, result_dir) |
| if m_special_task: |
| return m_special_task.group(1) |
| m_ssp_job_pattern = re.match(ssp_job_pattern, result_dir) |
| if m_ssp_job_pattern and utils.is_in_container(): |
| return m_ssp_job_pattern.group(1) |
| return _get_swarming_run_id(result_dir) |
| |
| |
| def _get_swarming_run_id(path): |
| """Extract the Swarming run_id for a Skylab task from the result path.""" |
| # Legacy swarming results are in directories like |
| # .../results/swarming-3e4391423c3a4311 |
| # In particular, the ending digit is never 0 |
| m_legacy_path = re.match('.*/swarming-([0-9a-fA-F]*[1-9a-fA-F])$', path) |
| if m_legacy_path: |
| return m_legacy_path.group(1) |
| # New style swarming results are in directories like |
| # .../results/swarming-3e4391423c3a4310/1 |
| # - Results are one directory deeper. |
| # - Ending digit of first directory is always 0. |
| m_path = re.match('.*/swarming-([0-9a-fA-F]*)0/([1-9a-fA-F])$', path) |
| if m_path: |
| return m_path.group(1) + m_path.group(2) |
| return None |
| |
| |
| class _JobDirectory(object, metaclass=abc.ABCMeta): |
| """State associated with a job to be offloaded. |
| |
| The full life-cycle of a job (including failure events that |
| normally don't occur) looks like this: |
| 1. The job's results directory is discovered by |
| `get_job_directories()`, and a job instance is created for it. |
| 2. Calls to `offload()` have no effect so long as the job |
| isn't complete in the database and the job isn't expired |
| according to the `age_limit` parameter. |
| 3. Eventually, the job is both finished and expired. The next |
| call to `offload()` makes the first attempt to offload the |
| directory to GS. Offload is attempted, but fails to complete |
| (e.g. because of a GS problem). |
| 4. Finally, a call to `offload()` succeeds, and the directory no |
| longer exists. Now `is_offloaded()` is true, so the job |
| instance is deleted, and future failures will not mention this |
| directory any more. |
| |
| Only steps 1. and 4. are guaranteed to occur. The others depend |
| on the timing of calls to `offload()`, and on the reliability of |
| the actual offload process. |
| |
| """ |
| |
| GLOB_PATTERN = None # must be redefined in subclass |
| |
| def __init__(self, resultsdir): |
| self.dirname = resultsdir |
| self._id = get_job_id_or_task_id(resultsdir) |
| self.offload_count = 0 |
| self.first_offload_start = 0 |
| |
| @classmethod |
| def get_job_directories(cls): |
| """Return a list of directories of jobs that need offloading.""" |
| return [d for d in glob.glob(cls.GLOB_PATTERN) if os.path.isdir(d)] |
| |
| @abc.abstractmethod |
| def get_timestamp_if_finished(self): |
| """Return this job's timestamp from the database. |
| |
| If the database has not marked the job as finished, return |
| `None`. Otherwise, return a timestamp for the job. The |
| timestamp is to be used to determine expiration in |
| `is_job_expired()`. |
| |
| @return Return `None` if the job is still running; otherwise |
| return a string with a timestamp in the appropriate |
| format. |
| """ |
| raise NotImplementedError("_JobDirectory.get_timestamp_if_finished") |
| |
| def process_gs_instructions(self): |
| """Process any gs_offloader instructions for this special task. |
| |
| @returns True/False if there is anything left to offload. |
| """ |
| # Default support is to still offload the directory. |
| return True |
| |
| |
| NO_OFFLOAD_README = """These results have been deleted rather than offloaded. |
| This is the expected behavior for passing jobs from the Commit Queue.""" |
| |
| |
| class RegularJobDirectory(_JobDirectory): |
| """Subclass of _JobDirectory for regular test jobs.""" |
| |
| GLOB_PATTERN = '[0-9]*-*' |
| |
| def process_gs_instructions(self): |
| """Process any gs_offloader instructions for this job. |
| |
| @returns True/False if there is anything left to offload. |
| """ |
| # Go through the gs_offloader instructions file for each test in this job. |
| for path in glob.glob( |
| os.path.join(self.dirname, '*', |
| constants.GS_OFFLOADER_INSTRUCTIONS)): |
| with open(path, 'r') as f: |
| gs_off_instructions = json.load(f) |
| if gs_off_instructions.get(constants.GS_OFFLOADER_NO_OFFLOAD): |
| dirname = os.path.dirname(path) |
| _remove_log_directory_contents(dirname) |
| |
| # Finally check if there's anything left to offload. |
| if os.path.exists(self.dirname) and not os.listdir(self.dirname): |
| shutil.rmtree(self.dirname) |
| return False |
| return True |
| |
| def get_timestamp_if_finished(self): |
| """Get the timestamp to use for finished jobs. |
| |
| @returns the latest hqe finished_on time. If the finished_on times are null |
| returns the job's created_on time. |
| """ |
| entry = _cached_afe().get_jobs(id=self._id, finished=True) |
| if not entry: |
| return None |
| hqes = _cached_afe().get_host_queue_entries(finished_on__isnull=False, |
| job_id=self._id) |
| if not hqes: |
| return entry[0].created_on |
| # While most Jobs have 1 HQE, some can have multiple, so check them all. |
| return max([hqe.finished_on for hqe in hqes]) |
| |
| |
| def _remove_log_directory_contents(dirpath): |
| """Remove log directory contents. |
| |
| Leave a note explaining what has happened to the logs. |
| |
| @param dirpath: Path to log directory. |
| """ |
| shutil.rmtree(dirpath) |
| os.mkdir(dirpath) |
| breadcrumb_name = os.path.join(dirpath, 'logs-removed-readme.txt') |
| with open(breadcrumb_name, 'w') as f: |
| f.write(NO_OFFLOAD_README) |
| |
| |
| class SpecialJobDirectory(_JobDirectory): |
| """Subclass of _JobDirectory for special (per-host) jobs.""" |
| |
| GLOB_PATTERN = 'hosts/*/[0-9]*-*' |
| |
| def __init__(self, resultsdir): |
| super(SpecialJobDirectory, self).__init__(resultsdir) |
| |
| def get_timestamp_if_finished(self): |
| entry = _cached_afe().get_special_tasks(id=self._id, is_complete=True) |
| return entry[0].time_finished if entry else None |
| |
| |
| def _find_results_dir(dirname): |
| subdirs = [] |
| for root, dirs, files in os.walk(dirname, topdown=True): |
| for f in files: |
| if f == _OFFLOAD_MARKER: |
| subdirs.append(root) |
| return subdirs |
| |
| |
| _OFFLOAD_MARKER = ".ready_for_offload" |
| _marker_parse_error_metric = metrics.Counter( |
| 'chromeos/autotest/gs_offloader/offload_marker_parse_errors', |
| description='Errors parsing the offload marker file') |
| |
| |
| class SwarmingJobDirectory(_JobDirectory): |
| """Subclass of _JobDirectory for Skylab swarming jobs.""" |
| |
| @classmethod |
| def get_job_directories(cls): |
| """Return a list of directories of jobs that need offloading.""" |
| # Legacy swarming results are in directories like |
| # .../results/swarming-3e4391423c3a4311 |
| # In particular, the ending digit is never 0 |
| jobdirs = [ |
| d for d in glob.glob('swarming-[0-9a-f]*[1-9a-f]') |
| if os.path.isdir(d) |
| ] |
| # New style swarming results are in directories like |
| # .../results/swarming-3e4391423c3a4310/1 |
| # - Results are one directory deeper. |
| # - Ending digit of first directory is always 0. |
| new_style_topdir = [ |
| d for d in glob.glob('swarming-[0-9a-f]*0') if os.path.isdir(d) |
| ] |
| # When there are multiple tests run in one test_runner build, |
| # the results will be one level deeper with the test_id |
| # as one further subdirectory. |
| # Example: .../results/swarming-3e4391423c3a4310/1/test_id |
| for topdir in new_style_topdir: |
| for d in glob.glob('%s/[1-9a-f]*' % topdir): |
| subdirs = _find_results_dir(d) |
| jobdirs += subdirs |
| |
| return jobdirs |
| |
| def get_timestamp_if_finished(self): |
| """Get the timestamp to use for finished jobs. |
| |
| @returns the latest hqe finished_on time. If the finished_on times are null |
| returns the job's created_on time. |
| """ |
| marker_path = os.path.join(self.dirname, _OFFLOAD_MARKER) |
| try: |
| with open(marker_path) as f: |
| ts_string = f.read().strip() |
| except: |
| return None |
| try: |
| ts = int(ts_string) |
| return time_utils.epoch_time_to_date_string(ts) |
| except ValueError as e: |
| logging.debug('Error parsing %s for %s: %s', _OFFLOAD_MARKER, |
| self.dirname, e) |
| _marker_parse_error_metric.increment() |
| return None |
| |
| |
| _AFE = None |
| def _cached_afe(): |
| global _AFE |
| if _AFE is None: |
| _AFE = frontend_wrappers.RetryingAFE() |
| return _AFE |