blob: 9307c0802f5697c56d6f3faf4b6c1845101457da [file] [log] [blame]
# Copyright 2017 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Common gRPC implementation for Swarming and Isolate"""
import logging
import os
import re
import time
import types
import urlparse
from utils import net
# gRPC may not be installed on the worker machine. This is fine, as long as
# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
# Full external requirements are: grpcio, certifi.
try:
import grpc
from google import auth as google_auth
from google.auth.transport import grpc as google_auth_transport_grpc
from google.auth.transport import requests as google_auth_transport_requests
except ImportError as err:
grpc = None
# If gRPC was successfully imported, try to import certifi as well. This is not
# actually used anywhere in this module, but if certifi is missing,
# google.auth.transport will fail (see
# https://stackoverflow.com/questions/24973326). So checking it here allows us
# to print out a somewhat-sane error message.
certifi = None
if grpc is not None:
try:
import certifi
except ImportError:
# Will print out error messages later (ie when we have a logger)
pass
# How many times to retry a gRPC call
MAX_GRPC_ATTEMPTS = 30
# Longest time to sleep between gRPC calls
MAX_GRPC_SLEEP = 10.
# Start the timeout at three minutes.
GRPC_TIMEOUT_SEC = 3 * 60
def available():
"""Returns true if gRPC can be used on this host."""
return grpc != None
class Proxy(object):
"""Represents a gRPC proxy.
If the proxy begins with 'https', the returned channel will be secure and
authorized using default application credentials - see
https://developers.google.com/identity/protocols/application-default-credentials.
Currently, we're using Cloud Container Builder scopes for testing; this may
change in the future to allow different scopes to be passed in for different
channels.
To use the returned channel to call methods directly, say:
proxy = grpc_proxy.Proxy('https://grpc.luci.org/resource/prefix',
myapi_pb2.MyApiStub)
To make a unary call with retries (recommended):
proto_output = proxy.call_unary('MyMethod', proto_input)
To make a unary call without retries, or to pass in a client side stream
(proto_input can be an iterator here):
proto_output = proxy.call_no_retries('MyMethod', proto_input)
You can also call the stub directly (not recommended, since no errors will be
caught or logged):
proto_output = proxy.stub.MyMethod(proto_input)
To make a call to a server-side streaming call (these are not retried):
for response in proxy.get_stream('MyStreaminingMethod', proto_input):
<process response>
To retrieve the prefix:
prefix = proxy.prefix # returns "prefix/for/resource/names"
All exceptions are logged using "logging.warning."
"""
def __init__(self, proxy, stub_class):
self._verbose = os.environ.get('LUCI_GRPC_PROXY_VERBOSE')
if self._verbose:
logging.info('Enabled verbose mode for %s with stub %s',
proxy, stub_class.__name__)
# NB: everything in url is unicode; convert to strings where
# needed.
url = urlparse.urlparse(proxy)
if self._verbose:
logging.info('Parsed URL for proxy is %r', url)
if url.scheme == 'http':
self._secure = False
elif url.scheme == 'https':
self._secure = True
else:
raise ValueError('gRPC proxy %s must use http[s], not %s' % (
proxy, url.scheme))
if url.netloc == '':
raise ValueError('gRPC proxy is missing hostname: %s' % proxy)
self._host = url.netloc
self._prefix = url.path
if self._prefix.endswith('/'):
self._prefix = self._prefix[:-1]
if self._prefix.startswith('/'):
self._prefix = self._prefix[1:]
if url.params != '' or url.fragment != '':
raise ValueError('gRPC proxy may not contain params or fragments: %s' %
proxy)
self._debug_info = ['full proxy name: ' + proxy]
self._channel = self._create_channel()
self._stub = stub_class(self._channel)
logging.info('%s: initialized', self.name)
if self._verbose:
self._dump_proxy_info()
@property
def prefix(self):
return self._prefix
@property
def channel(self):
return self._channel
@property
def stub(self):
return self._stub
@property
def name(self):
security = 'insecure'
if self._secure:
security = 'secure'
return 'gRPC %s proxy %s/%s' % (
security, self._host, self._stub.__class__.__name__)
def call_unary(self, name, request):
"""Calls a method, waiting if the service is not available.
Note that it stores the request of generator type into a list for future
retries, so it is not memory-friendly for streaming requests of large size.
Usage: proto_output = proxy.call_unary('MyMethod', proto_input)
"""
# If the request is a generator, store it to a list which can be reused in
# retries.
is_generator = False
if isinstance(request, types.GeneratorType):
request = list(request)
is_generator = True
for attempt in range(1, MAX_GRPC_ATTEMPTS+1):
try:
return self.call_no_retries(name, request
if not is_generator else iter(request))
except grpc.RpcError as g:
if g.code() is not grpc.StatusCode.UNAVAILABLE:
raise
logging.warning('%s: call_grpc - proxy is unavailable (attempt %d/%d)',
self.name, attempt, MAX_GRPC_ATTEMPTS)
# Save the error in case we need to return it
grpc_error = g
time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP))
# If we get here, it must be because we got (and saved) an error
assert grpc_error is not None
raise grpc_error
def get_stream(self, name, request):
"""Calls a server-side streaming method, returning an iterator.
Usage: for resp in proxy.get_stream('MyMethod', proto_input'):
"""
stream = self.call_no_retries(name, request)
while True:
# The lambda "next(stream, 1)" will return a protobuf on success, or the
# integer 1 if the stream has ended. This allows us to avoid attempting
# to catch StopIteration, which gets logged by _wrap_grpc_operation.
response = self._wrap_grpc_operation(name + ' pull from stream',
lambda: next(stream, 1))
if isinstance(response, int):
# Iteration is finished
return
yield response
def call_no_retries(self, name, request):
"""Calls a method without any retries.
Recommended for client-side streaming or nonidempotent unary calls.
"""
method = getattr(self._stub, name)
if method is None:
raise NameError('%s: "%s" is not a valid method name', self.name, name)
return self._wrap_grpc_operation(
name, lambda: method(request, timeout=GRPC_TIMEOUT_SEC))
def _wrap_grpc_operation(self, name, fn):
"""Wraps a gRPC operation (call or iterator increment) for logging."""
if self._verbose:
logging.info('%s/%s - starting gRPC operation', self.name, name)
try:
return fn()
except grpc.RpcError as g:
logging.warning('\n\nFailure in %s/%s: gRPC error %s', self.name, name, g)
self._dump_proxy_info()
raise g
except Exception as e:
logging.warning('\n\nFailure in %s/%s: exception %s', self.name, name, e)
self._dump_proxy_info()
raise e
finally:
if self._verbose:
logging.info('%s/%s - finished gRPC operation', self.name, name)
def _dump_proxy_info(self):
logging.warning('DETAILED PROXY INFO')
logging.warning('prefix = %s', self.prefix)
logging.warning('debug info:\n\t%s\n\n',
'\n\t'.join(self._debug_info))
def _create_channel(self):
# Make sure grpc was successfully imported
assert available()
if not self._secure:
return grpc.insecure_channel(self._host)
# Authenticate the host.
#
# You're allowed to override the root certs and server if necessary. For
# example, if you're running your proxy on localhost, you'll need to set
# GRPC_PROXY_TLS_ROOTS to the "roots.crt" file specifying the certificate
# for the root CA that your localhost server has used to certify itself, and
# the GRPC_PROXY_TLS_OVERRIDE to the name that your server is using to
# identify itself. For example, the ROOTS env var might be
# "/path/to/roots.crt" while the OVERRIDE env var might be "test_server," if
# this is what's used by the server you're running.
#
# If you're connecting to a real server with real SSL, none of this should
# be used.
if not certifi:
self._debug_info.append('CERTIFI IS NOT PRESENT;' +
' gRPC HTTPS CONNECTIONS MAY FAIL')
root_certs = None
roots = os.environ.get('LUCI_GRPC_PROXY_TLS_ROOTS')
if roots:
self._debug_info.append('Overridden root CA: %s' % roots)
with open(roots) as f:
root_certs = f.read()
else:
self._debug_info.append('Using default root CAs from certifi')
overd = os.environ.get('LUCI_GRPC_PROXY_TLS_OVERRIDE')
options = ()
if overd:
options=(('grpc.ssl_target_name_override', overd),)
ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
# Authenticate the user.
scopes = ('https://www.googleapis.com/auth/cloud-source-tools',)
self._debug_info.append('Scopes are: %r' % scopes)
user_creds, _ = google_auth.default(scopes=scopes)
# Create the channel.
request = google_auth_transport_requests.Request()
if len(options) > 0:
self._debug_info.append('Options are: %r' % options)
else:
self._debug_info.append('No options used')
return google_auth_transport_grpc.secure_authorized_channel(
user_creds, request, self._host, ssl_creds, options=options)