| # Copyright 2016 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """ChromeOS Parnter Concole remote actions.""" |
| |
| from __future__ import print_function |
| |
| import base64 |
| import logging |
| |
| import common |
| |
| from autotest_lib.client.common_lib import global_config |
| from autotest_lib.client.common_lib import utils |
| from autotest_lib.server.hosts import moblab_host |
| from autotest_lib.site_utils import pubsub_utils |
| from autotest_lib.site_utils import cloud_console_pb2 as cpcon |
| |
| |
| _PUBSUB_TOPIC = global_config.global_config.get_config_value( |
| 'CROS', 'cloud_notification_topic', default=None) |
| |
| # Current notification version. |
| CURRENT_MESSAGE_VERSION = '1' |
| |
| # Test upload pubsub notification attributes |
| LEGACY_ATTR_VERSION = 'version' |
| LEGACY_ATTR_GCS_URI = 'gcs_uri' |
| LEGACY_ATTR_MOBLAB_MAC = 'moblab_mac_address' |
| LEGACY_ATTR_MOBLAB_ID = 'moblab_id' |
| # the message data for new test result notification. |
| LEGACY_TEST_OFFLOAD_MESSAGE = 'NEW_TEST_RESULT' |
| |
| |
| def is_cloud_notification_enabled(): |
| """Checks if cloud pubsub notification is enabled. |
| |
| @returns: True if cloud pubsub notification is enabled. Otherwise, False. |
| """ |
| return global_config.global_config.get_config_value( |
| 'CROS', 'cloud_notification_enabled', type=bool, default=False) |
| |
| |
| def _get_message_type_name(message_type_enum): |
| """Gets the message type name from message type enum. |
| |
| @param message_type_enum: The message type enum. |
| |
| @return The corresponding message type name as string, or 'MSG_UNKNOWN'. |
| """ |
| return cpcon.MessageType.Name(message_type_enum) |
| |
| |
| def _get_attribute_name(attribute_enum): |
| """Gets the message attribute name from attribte enum. |
| |
| @param attribute_enum: The attribute enum. |
| |
| @return The corresponding attribute name as string, or 'ATTR_INVALID'. |
| """ |
| return cpcon.MessageAttribute.Name(attribute_enum) |
| |
| |
| class CloudConsoleClient(object): |
| """The remote interface to the Cloud Console.""" |
| def send_heartbeat(self): |
| """Sends a heartbeat. |
| |
| @returns True if the notification is successfully sent. |
| Otherwise, False. |
| """ |
| pass |
| |
| def send_event(self, event_type=None, event_data=None): |
| """Sends an event notification to the remote console. |
| |
| @param event_type: The event type that is defined in the protobuffer |
| file 'cloud_console.proto'. |
| @param event_data: The event data. |
| |
| @returns True if the notification is successfully sent. |
| Otherwise, False. |
| """ |
| pass |
| |
| def send_test_job_offloaded_message(self, gcs_uri): |
| """Sends a test job offloaded message to the remote console. |
| |
| @param gcs_uri: The test result Google Cloud Storage URI. |
| |
| @returns True if the notification is successfully sent. |
| Otherwise, False. |
| """ |
| pass |
| |
| |
| # Make it easy to mock out |
| def _create_pubsub_client(credential): |
| return pubsub_utils.PubSubClient(credential) |
| |
| |
| class PubSubBasedClient(CloudConsoleClient): |
| """A Cloud PubSub based implementation of the CloudConsoleClient interface. |
| """ |
| def __init__( |
| self, |
| credential=moblab_host.MOBLAB_SERVICE_ACCOUNT_LOCATION, |
| pubsub_topic=_PUBSUB_TOPIC): |
| """Constructor. |
| |
| @param credential: The service account credential filename. Default to |
| '/home/moblab/.service_account.json'. |
| @param pubsub_topic: The cloud pubsub topic name to use. |
| """ |
| super(PubSubBasedClient, self).__init__() |
| self._pubsub_client = _create_pubsub_client(credential) |
| self._pubsub_topic = pubsub_topic |
| |
| |
| def _create_message(self, data, msg_attributes): |
| """Creates a cloud pubsub notification object. |
| |
| @param data: The message data as a string. |
| @param msg_attributes: The message attribute map. |
| |
| @returns: A pubsub message object with data and attributes. |
| """ |
| message = {} |
| if data: |
| message['data'] = data |
| if msg_attributes: |
| message['attributes'] = msg_attributes |
| return message |
| |
| def _create_message_attributes(self, message_type_enum): |
| """Creates a cloud pubsub notification message attribute map. |
| |
| Fills in the version, moblab mac address, and moblab id information |
| as attributes. |
| |
| @param message_type_enum The message type enum. |
| |
| @returns: A pubsub messsage attribute map. |
| """ |
| msg_attributes = {} |
| msg_attributes[_get_attribute_name(cpcon.ATTR_MESSAGE_TYPE)] = ( |
| _get_message_type_name(message_type_enum)) |
| msg_attributes[_get_attribute_name(cpcon.ATTR_MESSAGE_VERSION)] = ( |
| CURRENT_MESSAGE_VERSION) |
| msg_attributes[_get_attribute_name(cpcon.ATTR_MOBLAB_MAC_ADDRESS)] = ( |
| utils.get_moblab_serial_number()) |
| msg_attributes[_get_attribute_name(cpcon.ATTR_MOBLAB_ID)] = ( |
| utils.get_moblab_id()) |
| return msg_attributes |
| |
| def _create_test_job_offloaded_message(self, gcs_uri): |
| """Construct a test result notification. |
| |
| TODO(ntang): switch LEGACY to new message format. |
| @param gcs_uri: The test result Google Cloud Storage URI. |
| |
| @returns The notification message. |
| """ |
| data = base64.b64encode(LEGACY_TEST_OFFLOAD_MESSAGE) |
| msg_attributes = {} |
| msg_attributes[LEGACY_ATTR_VERSION] = CURRENT_MESSAGE_VERSION |
| msg_attributes[LEGACY_ATTR_MOBLAB_MAC] = ( |
| utils.get_moblab_serial_number()) |
| msg_attributes[LEGACY_ATTR_MOBLAB_ID] = utils.get_moblab_id() |
| msg_attributes[LEGACY_ATTR_GCS_URI] = gcs_uri |
| |
| return self._create_message(data, msg_attributes) |
| |
| |
| def send_test_job_offloaded_message(self, gcs_uri): |
| """Notify the cloud console a test job is offloaded. |
| |
| @param gcs_uri: The test result Google Cloud Storage URI. |
| |
| @returns True if the notification is successfully sent. |
| Otherwise, False. |
| """ |
| logging.info('Notification on gcs_uri %s', gcs_uri) |
| message = self._create_test_job_offloaded_message(gcs_uri) |
| return self._publish_notification(message) |
| |
| |
| def _publish_notification(self, message): |
| msg_ids = self._pubsub_client.publish_notifications( |
| self._pubsub_topic, [message]) |
| |
| if msg_ids: |
| logging.debug('Successfully sent out a notification') |
| return True |
| logging.warning('Failed to send notification %s', str(message)) |
| return False |
| |
| def send_heartbeat(self): |
| """Sends a heartbeat. |
| |
| @returns True if the heartbeat notification is successfully sent. |
| Otherwise, False. |
| """ |
| logging.info('Sending a heartbeat') |
| |
| event = cpcon.Heartbeat() |
| # Don't sent local timestamp for now. |
| data = event.SerializeToString() |
| try: |
| attributes = self._create_message_attributes( |
| cpcon.MSG_MOBLAB_HEARTBEAT) |
| message = self._create_message(data, attributes) |
| except ValueError: |
| logging.exception('Failed to create message.') |
| return False |
| return self._publish_notification(message) |
| |
| def send_event(self, event_type=None, event_data=None): |
| """Sends an event notification to the remote console. |
| |
| @param event_type: The event type that is defined in the protobuffer |
| file 'cloud_console.proto'. |
| @param event_data: The event data. |
| |
| @returns True if the notification is successfully sent. |
| Otherwise, False. |
| """ |
| logging.info('Send an event.') |
| if not event_type: |
| logging.info('Failed to send event without a type.') |
| return False |
| |
| event = cpcon.RemoteEventMessage() |
| if event_data: |
| event.data = event_data |
| else: |
| event.data = '' |
| event.type = event_type |
| data = event.SerializeToString() |
| try: |
| attributes = self._create_message_attributes( |
| cpcon.MSG_MOBLAB_REMOTE_EVENT) |
| message = self._create_message(data, attributes) |
| except ValueError: |
| logging.exception('Failed to create message.') |
| return False |
| return self._publish_notification(message) |