# blob: 19d6383823df48619e0ad9e0e4db742a16f0af2b [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Moblab pubsub client library."""
import base64
import logging # pylint: disable=cros-logging-import
# pylint: disable=no-name-in-module, import-error
from google.cloud import pubsub_v1
# Topic that send_test_job_offloaded_message publishes to.
_PUBSUB_TOPIC = "moblab-notification"
# Module-level logger shared by every function and class in this file.
_LOGGER = logging.getLogger(__name__)
# NOTE(review): this configures the root logger as a side effect of importing
# this module - confirm that is intended for a library module.
logging.basicConfig(level=logging.INFO)
# Current notification version.
CURRENT_MESSAGE_VERSION = "1"
# Test upload pubsub notification attributes.
# Attribute key carrying CURRENT_MESSAGE_VERSION.
LEGACY_ATTR_VERSION = "version"
# Attribute key carrying the Google Cloud Storage URI of the test result.
LEGACY_ATTR_GCS_URI = "gcs_uri"
# Attribute key that _create_test_job_offloaded_message fills with the
# serial_number argument, despite the "mac address" wording of the key.
LEGACY_ATTR_MOBLAB_MAC = "moblab_mac_address"
# Attribute key carrying the moblab id.
LEGACY_ATTR_MOBLAB_ID = "moblab_id"
# The message data for new test result notification; it is base64-encoded
# before being placed in the message.
LEGACY_TEST_OFFLOAD_MESSAGE = b"NEW_TEST_RESULT"
class PubSubException(Exception):
    """Raised when publishing a Moblab pubsub notification fails."""
def callback(message_future):
    """Done-callback attached to the future returned by a publish call.

    Logs the future's result at INFO level when the publish succeeded.

    Args:
        message_future: The future to inspect; it must provide
            exception() and result().

    Raises:
        PubSubException: if the future reports an exception.
    """
    # When timeout is unspecified, the exception method waits indefinitely.
    if not message_future.exception(timeout=30):
        _LOGGER.info(message_future.result())
        return

    raise PubSubException(
        "Publishing message on {} threw an Exception {}.".format(
            "Moblab notifications", message_future.exception()
        )
    )
class PubSubClient(object):
    """A generic pubsub client.

    Wraps a pubsub_v1.PublisherClient and publishes messages to topics of
    the "chromeos-partner-moblab" project.
    """

    def __init__(self, batch_settings=()):
        """Creates the underlying publisher client.

        Args:
            batch_settings: Optional batch settings passed positionally to
                pubsub_v1.PublisherClient. When empty (the default) the
                publisher is created with the library defaults.
        """
        if batch_settings:
            self.publisher = pubsub_v1.PublisherClient(batch_settings)
        else:
            self.publisher = pubsub_v1.PublisherClient()

    def publish_notifications(self, topic, messages=None):
        """Publishes notification messages to a given pubsub topic.

        Each message is published and its future is resolved before the
        next message is sent. A publish whose future fails is logged and
        skipped, so its id is simply missing from the returned list.

        Args:
            topic: The Cloud pubsub topic.
            messages: A list of notification messages. Each message is a
                dict with a "data" entry and an optional "attributes"
                dict. None or an empty list publishes nothing.

        Returns:
            A list of pubsub message ids for the messages that were
            published successfully; empty if nothing was published.

        Raises:
            KeyError: if a message has no "data" entry.
        """
        # The default for messages is None, which cannot be iterated;
        # treat "no messages" as a no-op instead of raising TypeError.
        if not messages:
            return []

        topic_path = self.publisher.topic_path(
            "chromeos-partner-moblab", topic
        )
        message_ids = []
        for message in messages:
            # Messages built without attributes carry no "attributes" key
            # at all, so fall back to publishing without any attributes.
            attributes = message.get("attributes") or {}
            resp = self.publisher.publish(
                topic_path, message["data"], **attributes
            )
            resp.add_done_callback(callback)
            try:
                message_ids.append(resp.result())
            except Exception:
                # Best effort: record the failure and keep publishing the
                # remaining messages.
                _LOGGER.exception("Failed to publish notification: %s", resp)
        return message_ids
class PubSubBasedClient(object):
    """Cloud PubSub backed implementation of the CloudConsoleClient interface."""

    def __init__(self, batch_settings=()):
        """Constructor.

        Args:
            batch_settings: Batch settings forwarded unchanged to the
                PubSubClient this object publishes through. Empty by
                default.
        """
        self.pubsub_client = PubSubClient(batch_settings)

    def _create_message(self, data, msg_attributes):
        """Builds the dict that represents one pubsub notification.

        Args:
            data: The message payload.
            msg_attributes: The message attribute map.

        Returns:
            A dict holding "data" and "attributes". An entry is left out
            entirely when its value is None or empty.
        """
        candidates = (("data", data), ("attributes", msg_attributes))
        return {key: value for key, value in candidates if value}

    def _create_test_job_offloaded_message(
        self, gcs_uri, serial_number, moblab_id
    ):
        """Builds the notification announcing a new test result.

        TODO(ntang): switch LEGACY to new message format.

        Args:
            gcs_uri: The test result Google Cloud Storage URI.
            serial_number: Value stored under the LEGACY_ATTR_MOBLAB_MAC
                attribute.
            moblab_id: Value stored under the LEGACY_ATTR_MOBLAB_ID
                attribute.

        Returns:
            The notification message: the base64-encoded
            LEGACY_TEST_OFFLOAD_MESSAGE as data plus the version, serial
            number, moblab id and GCS URI attributes.
        """
        msg_attributes = {
            LEGACY_ATTR_VERSION: CURRENT_MESSAGE_VERSION,
            LEGACY_ATTR_MOBLAB_MAC: serial_number,
            LEGACY_ATTR_MOBLAB_ID: moblab_id,
            LEGACY_ATTR_GCS_URI: gcs_uri,
        }
        return self._create_message(
            base64.b64encode(LEGACY_TEST_OFFLOAD_MESSAGE), msg_attributes
        )

    def send_test_job_offloaded_message(
        self, gcs_uri, serial_number, moblab_id
    ):
        """Notifies the cloud console that a test job has been offloaded.

        Args:
            gcs_uri: The test result Google Cloud Storage URI.
            serial_number: Serial number reported in the notification.
            moblab_id: Moblab id reported in the notification.

        Returns:
            Whatever publish_notifications returns for the single
            notification: a list of pubsub message ids, empty when the
            publish failed.
        """
        _LOGGER.info("Notification on gcs_uri %s", gcs_uri)
        notifications = [
            self._create_test_job_offloaded_message(
                gcs_uri, serial_number, moblab_id
            )
        ]
        return self.pubsub_client.publish_notifications(
            _PUBSUB_TOPIC, notifications
        )

    def send_messages_with_attributes(
        self, pubsub_topic, msg_attributes_list, data
    ):
        """Publishes one message per attribute map to the cloud console.

        Every message carries the same 'data' field; only the 'attributes'
        field differs from message to message.

        Args:
            pubsub_topic: The cloud pubsub topic.
            msg_attributes_list: A list of pubsub message attribute maps.
            data: The message data; can't be none or empty.

        Returns:
            A list of pubsub message ids, and empty if fails.

        Raises:
            KeyError: raised while publishing when a message lacks its
                'data' entry, i.e. when data is none or empty.
        """
        messages = []
        for attributes in msg_attributes_list:
            messages.append(self._create_message(data, attributes))
        return self.pubsub_client.publish_notifications(pubsub_topic, messages)