# Copyright 2015 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Helper functions for working with Cloud Pub/Sub."""
import base64
import contextlib
import copy
import logging
import re
from google.appengine.ext import ndb
import webapp2
from components import net
# From Pub/Sub docs: `{subscription}` must start with a letter, and contain only
# letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`), underscores (`_`),
# periods (`.`), tildes (`~`), plus (`+`) or percent signs (`%`). It must be
# between 3 and 255 characters in length, and it must not start with `"goog"`.
# The "goog" prefix check is done in _validate_name to keep the regexp simple.
# The same rules apply to topic names.
#
# The pattern is one leading letter followed by 2..254 allowed characters,
# which gives the 3..255 total length.
_NAME_RE = re.compile(r'^[A-Za-z][A-Za-z0-9\-_\.~\+%]{2,254}$')
class Error(Exception):
  """Fatal (non-retriable) failure.

  Attributes:
    inner: The wrapped cause exactly as passed to the constructor. The
        exception message is str(inner).
  """

  def __init__(self, inner):
    message = str(inner)
    super(Error, self).__init__(message)
    self.inner = inner
class TransientError(Exception):
  """Failure that may go away on retry. Results in 500 HTTP code.

  Deliberately NOT a subclass of Error, so that an "except Error" block lets
  transient errors pass through.

  Attributes:
    inner: The wrapped cause exactly as passed to the constructor. The
        exception message is str(inner).
  """

  def __init__(self, inner):
    message = str(inner)
    super(TransientError, self).__init__(message)
    self.inner = inner
def validate_project(project):
  """Ensures the given project is valid for Cloud Pub/Sub.

  Args:
    project: Project ID string (may be None or empty).

  Returns:
    True if the project is non-empty and contains no '/', False otherwise.
  """
  # Technically, there are more restrictions for project names than we check
  # here, but the API will reject anything that doesn't match. We only check /
  # in case the user is trying to manipulate the topic into posting somewhere
  # else (e.g. by setting the project as ../../<some other project>).
  #
  # bool() so that falsy input yields False rather than leaking None or ''.
  return bool(project) and '/' not in project
def validate_topic(topic):
  """Checks a short topic name against the Pub/Sub naming rules.

  Args:
    topic: Short topic name (the last segment of the full topic name).

  Returns:
    Result of _validate_name: truthy if the name is acceptable.
  """
  verdict = _validate_name(topic)
  return verdict
def validate_subscription(subscription):
  """Checks a short subscription name against the Pub/Sub naming rules.

  Args:
    subscription: Short subscription name (the last segment of the full
        subscription name).

  Returns:
    Result of _validate_name: truthy if the name is acceptable.
  """
  verdict = _validate_name(subscription)
  return verdict
def full_topic_name(project, topic):
  """Builds "projects/<project>/topics/<topic>".

  Args:
    project: Project ID; must pass validate_project.
    topic: Short topic name; must pass validate_topic.

  Returns:
    The full topic name.

  Raises:
    AssertionError if either component is invalid.
  """
  assert validate_project(project), project
  assert validate_topic(topic), topic
  components = (project, topic)
  return 'projects/%s/topics/%s' % components
def full_subscription_name(project, subscription):
  """Builds "projects/<project>/subscriptions/<subscription>".

  Args:
    project: Project ID; must pass validate_project.
    subscription: Short subscription name; must pass validate_subscription.

  Returns:
    The full subscription name.

  Raises:
    AssertionError if either component is invalid.
  """
  assert validate_project(project), project
  assert validate_subscription(subscription), subscription
  components = (project, subscription)
  return 'projects/%s/subscriptions/%s' % components
def validate_full_name(name, kind):
  """Returns True if name has form "projects/<project-id>/<kind>/<id>".

  Args:
    name: Candidate full resource name.
    kind: Expected collection segment, e.g. 'topics' or 'subscriptions'.

  Returns:
    True if the name has exactly four '/'-separated segments and each one is
    well-formed, False otherwise.
  """
  chunks = name.split('/')
  # NOTE(review): the last clause was cut off in the reviewed text (dangling
  # `and`). The only segment left unchecked is <id>, so it is validated with
  # the shared topic/subscription name rules - confirm.
  return bool(
      len(chunks) == 4 and
      chunks[0] == 'projects' and
      validate_project(chunks[1]) and
      chunks[2] == kind and
      _validate_name(chunks[3]))
def _validate_name(name):
  """Returns True if the name matches rules for topic and subscription names.

  Args:
    name: Short topic or subscription name.

  Returns:
    True if the name does not start with 'goog' and matches _NAME_RE.
  """
  # The 'goog' prefix is rejected here rather than in _NAME_RE to keep the
  # regexp simple. bool() hides the match object from callers.
  return not name.startswith('goog') and bool(_NAME_RE.match(name))
def _call_async(method, endpoint, payload=None):
  """Makes HTTP request to Pub/Sub service.

  Args:
    method: HTTP verb, such as 'GET' or 'PUT'.
    endpoint: URL of the endpoint, relative to the Pub/Sub v1 API root.
    payload: Body of the request to send as JSON.

  Returns:
    Whatever net.json_request_async returns; callers in this module call
    .get_result() on it.
  """
  # NOTE(review): the base URL literal was blank and the rest of the call was
  # missing in the reviewed text. The URL and OAuth scope below follow the
  # public Cloud Pub/Sub REST reference - confirm against the original file.
  return net.json_request_async(
      url='https://pubsub.googleapis.com/v1/' + endpoint,
      method=method,
      payload=payload,
      scopes=['https://www.googleapis.com/auth/pubsub'])
def _call(method, endpoint, payload=None, accepted_http_statuses=None):
  """Makes HTTP request to Pub/Sub service and waits for the result.

  Args:
    method: HTTP verb, such as 'GET' or 'PUT'.
    endpoint: URL of the endpoint, relative to the Pub/Sub v1 API root.
    payload: Body of the request to send as JSON.
    accepted_http_statuses: List of additional status codes to treat as success.

  Returns:
    Result of the request, or None if it failed with one of
    accepted_http_statuses.

  Raises:
    Error or TransientError.
  """
  try:
    return _call_async(method, endpoint, payload=payload).get_result()
  except net.Error as e:
    if accepted_http_statuses and e.status_code in accepted_http_statuses:
      return None
    # Server-side failures (5xx) are worth retrying; everything else is fatal.
    if e.status_code >= 500:
      raise TransientError(e)
    raise Error(e)
def ensure_topic_exists(topic):
  """Creates the given Cloud Pub/Sub topic unless it is already there.

  Args:
    topic: Full topic name, as returned by `full_topic_name`.

  Raises:
    Error or TransientError.
  """
  assert validate_full_name(topic, 'topics'), topic
  # 409 is the status code when the topic already exists.
  already_exists = [409]
  _call('PUT', topic, accepted_http_statuses=already_exists)
def _with_existing_topic(topic, callback):
  """Calls callback, catches 404, creates the topic, calls callback again.

  Used by 'publish' and 'ensure_subscription_exists'.

  Args:
    topic: Full name of the topic to create if the first attempt hits a 404.
    callback: Zero-argument callable performing the Pub/Sub operation.

  Returns:
    Whatever callback returns.

  Raises:
    Error or TransientError.
  """
  try:
    return callback()
  except Error as e:
    # Error may wrap a plain string (no status_code); treat that as "not 404".
    if getattr(e.inner, 'status_code', None) != 404:
      raise
  # The first attempt failed with 404: create the topic, as the docstring
  # promises, then retry once.
  ensure_topic_exists(topic)
  return callback()
def publish_multi(topic, messages):
  """Publish messages to Cloud Pub/Sub. Creates the topic if it doesn't exist.

  Args:
    topic: Full name of the topic to publish to.
    messages: Dict mapping the content of each message to publish to the
        attributes to send with it (None or empty for no attributes).

  Raises:
    Error or TransientError.
  """
  assert validate_full_name(topic, 'topics'), topic
  # The payload carries message bodies base64-encoded.
  encoded = [
      {'attributes': attributes or {}, 'data': base64.b64encode(message)}
      for message, attributes in messages.iteritems()
  ]

  def call_publish():
    _call('POST', '%s:publish' % topic, payload={'messages': encoded})

  _with_existing_topic(topic, call_publish)
def publish(topic, message, attributes):
  """Publishes one message to Cloud Pub/Sub, creating the topic if needed.

  Args:
    topic: Full name of the topic to publish to.
    message: Content of the message to publish.
    attributes: Any attributes to send with the message.

  Raises:
    Error or TransientError.
  """
  single = {message: attributes}
  publish_multi(topic, single)
def modify_ack_deadline_async(subscription, deadline, *ack_ids):
  """Modifies acknowledgement deadline of messages.

  Args:
    subscription: Full name of the subscription.
    deadline: New deadline (in seconds).
    *ack_ids: List of IDs of messages to extend the ack deadline of.

  Returns:
    Whatever _call_async returns for the request.
  """
  # NOTE(review): the HTTP verb and the payload wrapper were missing in the
  # reviewed text; 'POST' mirrors ack_async and the Pub/Sub REST reference.
  return _call_async(
      'POST',
      '%s:modifyAckDeadline' % subscription,
      payload={
          'ackDeadlineSeconds': deadline,
          'ackIds': ack_ids,
      })
def ack_async(subscription, *ack_ids):
  """Starts acknowledging receipt of messages.

  Args:
    subscription: Full name of the subscription.
    *ack_ids: List of IDs of messages to ack.

  Returns:
    Whatever _call_async returns for the request.
  """
  endpoint = '%s:acknowledge' % subscription
  body = {'ackIds': ack_ids}
  return _call_async('POST', endpoint, payload=body)
def ack(*args, **kwargs):
  """Blocking variant of ack_async; takes the same arguments.

  Returns:
    The result obtained by calling get_result() on ack_async's return value.
  """
  pending = ack_async(*args, **kwargs)
  return pending.get_result()
def pull(subscription, max_messages=100):
  """Polls a pull subscription for messages.

  Args:
    subscription: Full name of the subscription.
    max_messages: Maximum number of messages to return.

  Returns:
    Result of the pull request as returned by _call.

  Raises:
    Error or TransientError.
  """
  # NOTE(review): the HTTP verb and the payload wrapper were missing in the
  # reviewed text; 'POST' matches the pull request in SubscriptionHandler.get.
  return _call(
      'POST',
      '%s:pull' % subscription,
      payload={
          'maxMessages': max_messages,
          'returnImmediately': True,
      })
def get_subscription(subscription):
  """Fetches a subscription, or None if there is no such subscription.

  Args:
    subscription: Full name of the subscription.

  Returns:
    The result of the GET request, or None on HTTP 404.

  Raises:
    Error or TransientError.
  """
  not_found = [404]
  return _call('GET', subscription, accepted_http_statuses=not_found)
def ensure_subscription_exists(
    subscription, topic, push_config=None, ack_deadline_seconds=None):
  """Ensures given subscription exists.

  Will register new subscription or change push config of an existing one
  if necessary, but won't change a topic or default ack_deadline_seconds if they
  don't match requested values (there's no API to do that).

  Will also try to create the topic if it's missing.

  Args:
    subscription: Full name of the subscription.
    topic: Full name of the topic to subscribe to.
    push_config: Dict with push configuration, or None to not touch it.
    ack_deadline_seconds: How long to wait for ack.

  Raises:
    Error or TransientError.
  """
  assert validate_full_name(subscription, 'subscriptions'), subscription
  assert validate_full_name(topic, 'topics'), topic

  # Need to use GET to ensure the subscription is actually subscribed to
  # the given topic and not to something else.
  existing = _call('GET', subscription, accepted_http_statuses=[404])
  if existing:
    if existing['topic'] != topic:
      raise Error('Can\'t change topic of an existing subscription')
    if (ack_deadline_seconds is not None and
        ack_deadline_seconds != existing['ackDeadlineSeconds']):
      raise Error('Can\'t change ack deadline of an existing subscription')
    if push_config is not None and push_config != existing['pushConfig']:
      _call(
          'POST', '%s:modifyPushConfig' % subscription,
          payload={'pushConfig': push_config})
    # NOTE(review): the subscription already exists, so there is nothing to
    # create; this early return was not visible in the reviewed text - confirm.
    return

  def create_subscription():
    _call(
        'PUT', subscription,
        payload={
            'topic': topic,
            'pushConfig': push_config or {},
            'ackDeadlineSeconds': ack_deadline_seconds or 60,
        })

  _with_existing_topic(topic, create_subscription)
def ensure_topic_deleted(topic):
  """Deletes a topic; a missing topic is not an error.

  Args:
    topic: Full name of the topic.

  Raises:
    Error or TransientError.
  """
  assert validate_full_name(topic, 'topics'), topic
  # 404 means the topic is already gone.
  already_gone = [404]
  _call('DELETE', topic, accepted_http_statuses=already_gone)
def ensure_subscription_deleted(subscription):
  """Deletes a subscription; a missing subscription is not an error.

  Args:
    subscription: Full name of the subscription.

  Raises:
    Error or TransientError.
  """
  assert validate_full_name(subscription, 'subscriptions'), subscription
  # 404 means the subscription is already gone.
  already_gone = [404]
  _call('DELETE', subscription, accepted_http_statuses=already_gone)
@contextlib.contextmanager
def iam_policy(object_name):
  """Changes IAM policy for an existing subscription or topic.

  Reads current policy, invokes the context manager body, writes modified policy
  back. Uses etags for concurrency control. Autocreates a topic if object_name
  is referencing a topic that doesn't exist.

  Usage example:
    with pubsub.iam_policy(...) as policy:
      policy.add_member('roles/viewer', '<member id>')

  Args:
    object_name: Full subscription or topic name.

  Yields:
    IAMPolicy wrapping a deep copy of the current policy.

  Raises:
    Error or TransientError.
  """
  is_sub = validate_full_name(object_name, 'subscriptions')
  is_topic = validate_full_name(object_name, 'topics')
  assert is_sub or is_topic, object_name

  def get_policy():
    return _call('GET', '%s:getIamPolicy' % object_name)

  # We can create topics on the fly (but not subscriptions).
  if is_topic:
    policy = _with_existing_topic(object_name, get_policy)
  else:
    policy = get_policy()

  # Hand out a copy so it can be compared with the original afterwards.
  copied = IAMPolicy(copy.deepcopy(policy))
  yield copied
  # Write back only if the body actually changed something.
  if copied.policy != policy:
    _call(
        'POST', '%s:setIamPolicy' % object_name,
        payload={'policy': copied.policy})
class IAMPolicy(object):
  """Wrapper around IAM policy dict for simpler modification.

  Attributes:
    policy: The wrapped policy dict; add_member and remove_member modify it
        in place.
  """

  def __init__(self, policy):
    assert isinstance(policy, dict), policy
    self.policy = policy

  def _find_binding(self, role):
    """Returns the first binding dict for the given role, or None."""
    for binding in self.policy.get('bindings', []):
      if binding.get('role') == role:
        return binding
    return None

  def members(self, role):
    """Returns list of members with given role, or empty list if none.

    Args:
      role: Role name, such as 'roles/viewer'.

    Returns:
      A new list; modifying it does not affect the policy.
    """
    binding = self._find_binding(role)
    if binding is None:
      return []
    return list(binding.get('members', []))

  def add_member(self, role, member):
    """Adds a member to some role.

    Args:
      role: Role name, such as 'roles/viewer'.
      member: ID of the member to add.

    Returns:
      True if added, False if was already there.
    """
    binding = self._find_binding(role)
    if binding is None:
      # No binding for this role yet: create one.
      self.policy.setdefault('bindings', []).append({
          'role': role,
          'members': [member],
      })
      return True
    if member in binding.get('members', []):
      return False
    binding.setdefault('members', []).append(member)
    return True

  def remove_member(self, role, member):
    """Removes a member from some role.

    Args:
      role: Role name, such as 'roles/viewer'.
      member: ID of the member to remove.

    Returns:
      True if removed, False if wasn't there.
    """
    binding = self._find_binding(role)
    if binding is None:
      return False
    members = binding.get('members')
    if not members or member not in members:
      return False
    # NOTE(review): the actual removal was missing in the reviewed text. A
    # binding left with no members is kept as is - confirm whether it should
    # be dropped from the policy instead.
    binding['members'] = [m for m in members if m != member]
    return True
class SubscriptionHandler(webapp2.RequestHandler):
  """Base class for defining Pub/Sub subscription handlers.

  Subclasses set the class attributes below and override process_message.
  GET pulls messages from the subscription; POST handles a pushed message.
  """
  # TODO(smut): Keep in datastore. See components/datastore_utils.
  # NOTE(review): only TOPIC was declared in the reviewed text. The other
  # attributes are read by the methods below, so they are declared here;
  # confirm the intended defaults (MAX_MESSAGES mirrors pull()'s default).
  ENDPOINT = None
  MAX_MESSAGES = 100
  SUBSCRIPTION = None
  SUBSCRIPTION_PROJECT = None
  TOPIC = None
  TOPIC_PROJECT = None

  @classmethod
  def get_subscription_name(cls):
    """Returns full name of the subscription."""
    return full_subscription_name(cls.SUBSCRIPTION_PROJECT, cls.SUBSCRIPTION)

  @classmethod
  def get_topic_name(cls):
    """Returns full name of the topic."""
    return full_topic_name(cls.TOPIC_PROJECT, cls.TOPIC)

  @classmethod
  def unsubscribe(cls):
    """Unsubscribes from a Cloud Pub/Sub project."""
    # NOTE(review): the body was missing in the reviewed text; deleting the
    # subscription is the presumed intent - confirm.
    ensure_subscription_deleted(cls.get_subscription_name())

  @classmethod
  def ensure_subscribed(cls, push=False):
    """Ensures a Cloud Pub/Sub subscription exists.

    Can also be used to change subscription type between push and pull.

    Args:
      push: Whether or not to create a push subscription. Defaults to pull.
    """
    # NOTE(review): only the push_config argument was visible in the reviewed
    # text; the call target and first two arguments are reconstructed.
    ensure_subscription_exists(
        cls.get_subscription_name(),
        cls.get_topic_name(),
        push_config={'pushEndpoint': cls.ENDPOINT} if push else {})

  @classmethod
  def is_subscribed(cls):
    """Returns whether or not a Cloud Pub/Sub subscription exists.

    Returns:
      True if the subscription exists, False otherwise.
    """
    return bool(get_subscription(cls.get_subscription_name()))

  def get(self):
    """Queries for Pub/Sub messages."""
    subscription = self.get_subscription_name()
    response = _call(
        'POST', '%s:pull' % subscription,
        payload={
            'maxMessages': self.MAX_MESSAGES,
            'returnImmediately': True,
        })

    message_ids = []
    for received_message in response.get('receivedMessages', []):
      envelope = received_message.get('message', {})
      attributes = envelope.get('attributes', {})
      message = envelope.get('data', '')
      # NOTE(review): the logging call's name was missing in the reviewed
      # text; info level is assumed.
      logging.info(
          'Received Pub/Sub message:\n%s\nAttributes:\n%s', message, attributes)
      # TODO(smut): Process messages in parallel.
      self.process_message(message, attributes)
      # NOTE(review): collecting ack ids was missing in the reviewed text;
      # 'ackId' is the field name used by the Pub/Sub REST API.
      ack_id = received_message.get('ackId')
      if ack_id:
        message_ids.append(ack_id)

    if message_ids:
      _call(
          'POST', '%s:acknowledge' % subscription,
          payload={'ackIds': message_ids})

  def post(self):
    """Handles a Pub/Sub push message."""
    # TODO(smut): Ensure message came from Cloud Pub/Sub.
    # Since anyone can post to this endpoint, we need to ensure the message
    # actually came from Cloud Pub/Sub. Unfortunately, there aren't any
    # useful headers set that can guarantee this.
    body = self.request.json
    envelope = body.get('message', {})
    attributes = envelope.get('attributes', {})
    message = envelope.get('data', '')
    subscription = body.get('subscription')

    if subscription != self.get_subscription_name():
      self.response.headers['Content-Type'] = 'text/plain; charset=utf-8'
      logging.error('Ignoring unexpected subscription: %s', subscription)
      self.abort(403, 'Unexpected subscription: %s' % subscription)

    # NOTE(review): logging level assumed, as in get().
    logging.info(
        'Received Pub/Sub message:\n%s\nAttributes:\n%s', message, attributes)
    return self.process_message(message, attributes)

  def process_message(self, message, attributes):
    """Process a Pub/Sub message. Must be overridden by subclasses.

    Args:
      message: The message string.
      attributes: A dict of key/value pairs representing attributes associated
        with this message.

    Returns:
      A webapp2.Response instance, or None.
    """
    raise NotImplementedError()