| #!/usr/bin/env python |
| # -*- coding: utf-8 -*- |
| # Copyright 2019 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| # Service that runs on moblab and checks to see if any tests are to be |
| # scheduled for this device. |
| """Service that regularly checks for remote commands and executes them.""" |
| |
| |
| import argparse |
| import datetime |
| import logging |
| import os |
| import sys |
| import time |
| import functools |
| from collections import defaultdict |
| |
| # pylint: disable=no-name-in-module, import-error |
| from google.cloud import exceptions as cloud_exceptions, storage |
| |
| # pylint: disable=no-name-in-module, import-error |
| from google.auth import exceptions as auth_exceptions |
| |
| from moblab_common import afe_connector |
| from moblab_common import config_connector |
| from moblab_common import devserver_connector |
| from moblab_common import versioned_upload |
| |
| import remote_requests |
| import scheduler_common |
| |
# Point the Google client libraries at the moblab service account unless the
# environment already names a credentials file; an existing value is left
# untouched.
if "GOOGLE_APPLICATION_CREDENTIALS" not in os.environ:
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = (
        "/home/moblab/.service_account.json"
    )

# Module-wide logger; it has no handlers of its own until _setup_logging()
# attaches one.
_LOGGER = logging.getLogger(__name__)
| |
| |
class MoblabTestScheduler(object):
    """Service that regularly checks for remote commands and executes them.

    On every tick the scheduler loads the remote requests stored in the
    moblab cloud bucket, drops the ones that were already executed or that
    cannot run on the currently attached devices, and executes the first
    remaining request after creating a lock object for it in the bucket.
    """

    def __init__(self):
        """Create the cloud storage, AFE, config and devserver connectors."""
        self.storage_client = storage.Client()
        self.afe_connector = afe_connector.AFEConnector()
        self.config_connector = config_connector.MoblabConfigConnector(
            self.afe_connector
        )
        self.requests = remote_requests.MoblabRemoteRequests()
        self.moblab_bucket_name = self.config_connector.get_cloud_bucket()
        _LOGGER.info("Using bucket: %s", self.moblab_bucket_name)
        self.devserver_connector = devserver_connector.DevserverConnector(
            self.moblab_bucket_name
        )
        # moblab_id stays a list of the lines in the id file (its historical
        # type); the context manager makes sure the file handle is closed.
        with open(os.environ["HOME"] + "/.moblab_id") as id_file:
            self.moblab_id = id_file.readlines()

    def get_executed_commands(self):
        """Get the list of commands that have already been executed.

        Returns:
            list: A list of strings, each string being a unique id of an
                executed command.
        """
        return scheduler_common.get_executed_commands(
            self.storage_client, self.moblab_bucket_name
        )

    def obtain_lock(self, request):
        """Create lock file for the remote command request.

        By successfully creating an object in cloud storage with version 0
        you are guaranteed to be the only process able to create that file.

        The contents of the file are just for debugging, possibly future
        reporting.

        Args:
            request: a MoblabRemoteRequest.

        Returns:
            bool: True if the lock was successfully obtained otherwise False.
        """
        _LOGGER.debug("Obtaining lock %s", request.unique_id)
        blob = self.storage_client.bucket(self.moblab_bucket_name).blob(
            "%s/%s" % (scheduler_common.SCHEDULE_DIR, request.unique_id)
        )
        # Cheap early out; the versioned upload below is what actually
        # decides the race between competing schedulers.
        if blob.exists():
            _LOGGER.debug("Lock already %s exists", request.unique_id)
            return False
        contents = "Lock date %s\nRequest %s\nMoblab Id: %s" % (
            datetime.datetime.utcnow(),
            request,
            self.moblab_id,
        )
        try:
            versioned_upload.upload_from_string(blob, contents, 0)
        except cloud_exceptions.NotFound:
            # NOTE(review): a lost generation-0 race is normally reported by
            # cloud storage as a failed precondition; confirm that
            # versioned_upload surfaces it as NotFound.
            return False
        return True

    def get_available_hosts(self):
        """Get the board.model string for DUT's that can run tests.

        Make a list of the configured DUT's on this device that are working
        and available to run tests. Hosts whose status is not "Ready" or
        that are locked are skipped. When a host has no "model:" label its
        board name is used as the model.

        Returns:
            dict: mapping of board.model to number of working configured
                devices connected to this moblab.
        """
        available_hosts = defaultdict(int)
        for host in self.afe_connector.get_connected_devices():
            if host["status"] != "Ready" or host["locked"]:
                _LOGGER.debug("Host not ready or locked")
                continue

            board = None
            model = None
            for label in host["labels"]:
                if label.startswith("model:"):
                    model = label.split(":")[1]
                elif label.startswith("board:"):
                    board = label.split(":")[1]
            if not model:
                model = board
            available_hosts["%s.%s" % (board, model)] += 1
        _LOGGER.debug(available_hosts)
        return available_hosts

    def get_next_command_to_run(self):
        """Go through the list of commands and figure out the best one to run.

        Returns:
            object: Object describing the request to be executed or None if
                there is no request able to be scheduled.
        """
        attached_boards = self.get_available_hosts()
        if not attached_boards:
            return None
        blob = storage.Blob(
            scheduler_common.SCHEDULE_FILENAME,
            self.storage_client.bucket(self.moblab_bucket_name),
        )
        self.requests.load_requests_from_gcs(blob)
        executed = self.get_executed_commands()

        def not_yet_executed(request):
            """Keep only requests whose unique id has not been executed."""
            if request.unique_id in executed:
                _LOGGER.debug(
                    "Rejecting request %s as executed", request.unique_id
                )
                return False
            return True

        def runnable_here(request):
            """Keep only requests the attached devices can execute."""
            return request.can_be_executed(attached_boards)

        # Filter out all commands that can or should not run on this moblab.
        self.requests.filter_requests(not_yet_executed)
        self.requests.filter_requests(runnable_here)
        # TODO(haddowk) Filter out / remove expired tasks.
        self.requests.sort_requests()

        # return the first highest priority command to run.
        return self.requests.get_request(index=0)

    def parse_arguments(self, argv):
        """Parse the command line arguments.

        Args:
            argv (list): Arguments passed in from the command line, without
                the program name.

        Returns:
            argparse.Namespace: has boolean attributes single_run and
                verbose.
        """
        parser = argparse.ArgumentParser(description=__doc__)

        parser.add_argument(
            "-s",
            "--single_run",
            action="store_true",
            help="Run the scheduler one time and then exit.",
        )
        parser.add_argument(
            "-v",
            "--verbose",
            action="store_true",
            help="Turn on debug logging.",
        )
        return parser.parse_args(argv)

    def execute_remote_commands(self, argv):
        """Loop forever, waiting for new commands to be executed on this device.

        Args:
            argv (list): Arguments passed in from the command line.

        Returns:
            bool: False if the scheduler is not enabled on this device.
                When --single_run is given the loop ends after one pass and
                the method returns None; otherwise it never returns.
        """
        is_enabled = self.config_connector.is_remote_task_scheduler_enabled()
        delay_mins = (
            self.config_connector.get_remote_task_scheduler_tick_delay()
        )
        debug_enabled = self.config_connector.is_remote_task_debug_enabled()

        options = self.parse_arguments(argv)

        # Debug logging can be requested on the command line or in the
        # moblab configuration.
        logging_severity = logging.INFO
        if options.verbose or debug_enabled:
            logging_severity = logging.DEBUG
        _setup_logging(logging_severity)

        _LOGGER.info(
            "enabled %s delay %d debug %s",
            is_enabled,
            delay_mins,
            debug_enabled,
        )

        if not is_enabled:
            _LOGGER.info("The scheduler is not enabled")
            return False

        _LOGGER.info("Starting scheduler: tick delay: %s", delay_mins)

        while True:
            request = self.get_next_command_to_run()
            if request:
                # Failing to get the lock is silent here: obtain_lock()
                # returned False and the request is simply not executed
                # by this device on this tick.
                if self.obtain_lock(request):
                    _LOGGER.info("Executing request. %s", request.unique_id)
                    # TODO - return an enum SUCCESS/FAIL/RETRY.
                    request.execute(
                        self.devserver_connector, self.afe_connector
                    )
            else:
                _LOGGER.debug("No requests to run.")
            if options.single_run:
                break
            _LOGGER.debug("Sleeping for %d mins", delay_mins)
            time.sleep(delay_mins * 60)
| |
| |
def _setup_logging(
    level, log_path="/var/log/moblab/moblab_remote_schedule.log"
):
    """Send this module's log output to a file at the requested level.

    Any handlers already attached to the module logger are removed and
    closed first, so calling this more than once neither duplicates output
    nor leaks open log files.

    Args:
        level (int): One of the predefined logging levels, e.g.
            logging.DEBUG.
        log_path (str): File the log records are appended to.
    """
    # Iterate over a copy: removeHandler() mutates _LOGGER.handlers.
    for stale_handler in list(_LOGGER.handlers):
        _LOGGER.removeHandler(stale_handler)
        stale_handler.close()

    file_handler = logging.FileHandler(log_path)
    file_handler.setLevel(level)
    file_handler.setFormatter(
        logging.Formatter(
            "%(asctime)s %(filename)s:%(lineno)d %(levelname)s: %(message)s"
        )
    )
    _LOGGER.addHandler(file_handler)
    _LOGGER.setLevel(level)
| |
| |
if __name__ == "__main__":
    # Building the scheduler creates the cloud storage client, which is
    # where missing or unusable credentials surface.
    try:
        scheduler = MoblabTestScheduler()
    except auth_exceptions.DefaultCredentialsError as err:
        _LOGGER.exception("Failed to start remote scheduler, %s", str(err))
        # NOTE(review): exit status 0 is kept as-is; presumably so a device
        # without credentials is not reported as a failed service - confirm.
        sys.exit(0)
    else:
        scheduler.execute_remote_commands(sys.argv[1:])