blob: 49092d089f40b31c5b5b8f9fa7d14f6ab4b4d0ef [file] [log] [blame]
# Copyright 2013 Google Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Util functions and classes for cloudstorage_api."""
__all__ = ['set_default_retry_params',
import copy
import httplib
import logging
import math
import os
import threading
import time
import urllib
from google.appengine.api import urlfetch
from google.appengine.datastore import datastore_rpc
from google.appengine.ext.ndb import eventloop
from google.appengine.ext.ndb import utils
from google.appengine import runtime
from google.appengine.runtime import apiproxy_errors
except ImportError:
from google.appengine.api import urlfetch
from google.appengine.datastore import datastore_rpc
from google.appengine import runtime
from google.appengine.runtime import apiproxy_errors
from google.appengine.ext.ndb import eventloop
from google.appengine.ext.ndb import utils
_RETRIABLE_EXCEPTIONS = (urlfetch.DownloadError,
_thread_local_settings = threading.local()
_thread_local_settings.default_retry_params = None
def set_default_retry_params(retry_params):
"""Set a default RetryParams for current thread current request."""
_thread_local_settings.default_retry_params = copy.copy(retry_params)
def _get_default_retry_params():
"""Get default RetryParams for current request and current thread.
A new instance of the default RetryParams.
default = getattr(_thread_local_settings, 'default_retry_params', None)
if default is None or not default.belong_to_current_request():
return RetryParams()
return copy.copy(default)
def _quote_filename(filename):
"""Quotes filename to use as a valid URI path.
filename: user provided filename. /bucket/filename.
The filename properly quoted to use as URI's path component.
return urllib.quote(filename)
def _unquote_filename(filename):
"""Unquotes a valid URI path back to its filename.
This is the opposite of _quote_filename.
filename: a quoted filename. /bucket/some%20filename.
The filename unquoted.
return urllib.unquote(filename)
def _should_retry(resp):
"""Given a urlfetch response, decide whether to retry that request."""
return (resp.status_code == httplib.REQUEST_TIMEOUT or
(resp.status_code >= 500 and
resp.status_code < 600))
class RetryParams(object):
"""Retry configuration parameters."""
def __init__(self,
This object is unique per request per thread.
Library will retry according to this setting when App Engine Server
can't call urlfetch, urlfetch timed out, or urlfetch got a 408 or
500-600 response.
backoff_factor: exponential backoff multiplier.
initial_delay: seconds to delay for the first retry.
max_delay: max seconds to delay for every retry.
min_retries: min number of times to retry. This value is automatically
capped by max_retries.
max_retries: max number of times to retry. Set this to 0 for no retry.
max_retry_period: max total seconds spent on retry. Retry stops when
this period passed AND min_retries has been attempted.
urlfetch_timeout: timeout for urlfetch in seconds. Could be None,
in which case the value will be chosen by urlfetch module.
save_access_token: persist access token to datastore to avoid
excessive usage of GetAccessToken API. Usually the token is cached
in process and in memcache. In some cases, memcache isn't very
self.backoff_factor = self._check('backoff_factor', backoff_factor)
self.initial_delay = self._check('initial_delay', initial_delay)
self.max_delay = self._check('max_delay', max_delay)
self.max_retry_period = self._check('max_retry_period', max_retry_period)
self.max_retries = self._check('max_retries', max_retries, True, int)
self.min_retries = self._check('min_retries', min_retries, True, int)
if self.min_retries > self.max_retries:
self.min_retries = self.max_retries
self.urlfetch_timeout = None
if urlfetch_timeout is not None:
self.urlfetch_timeout = self._check('urlfetch_timeout', urlfetch_timeout)
self.save_access_token = self._check('save_access_token', save_access_token,
True, bool)
self._request_id = os.getenv('REQUEST_LOG_ID')
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
return self.__dict__ == other.__dict__
def __ne__(self, other):
return not self.__eq__(other)
def _check(cls, name, val, can_be_zero=False, val_type=float):
"""Check init arguments.
name: name of the argument. For logging purpose.
val: value. Value has to be non negative number.
can_be_zero: whether value can be zero.
val_type: Python type of the value.
The value.
ValueError: when invalid value is passed in.
TypeError: when invalid value type is passed in.
valid_types = [val_type]
if val_type is float:
if type(val) not in valid_types:
raise TypeError(
'Expect type %s for parameter %s' % (val_type.__name__, name))
if val < 0:
raise ValueError(
'Value for parameter %s has to be greater than 0' % name)
if not can_be_zero and val == 0:
raise ValueError(
'Value for parameter %s can not be 0' % name)
return val
def belong_to_current_request(self):
return os.getenv('REQUEST_LOG_ID') == self._request_id
def delay(self, n, start_time):
"""Calculate delay before the next retry.
n: the number of current attempt. The first attempt should be 1.
start_time: the time when retry started in unix time.
Number of seconds to wait before next retry. -1 if retry should give up.
if (n > self.max_retries or
(n > self.min_retries and
time.time() - start_time > self.max_retry_period)):
return -1
return min(
math.pow(self.backoff_factor, n-1) * self.initial_delay,
def _retry_fetch(url, retry_params, **kwds):
"""A blocking fetch function similar to urlfetch.fetch.
This function should be used when a urlfetch has timed out or the response
shows http request timeout. This function will put current thread to
sleep between retry backoffs.
url: url to fetch.
retry_params: an instance of RetryParams.
**kwds: keyword arguments for urlfetch. If deadline is specified in kwds,
it precedes the one in RetryParams. If none is specified, it's up to
urlfetch to use its own default.
A urlfetch response from the last retry. None if no retry was attempted.
Whatever exception encountered during the last retry.
n = 1
start_time = time.time()
delay = retry_params.delay(n, start_time)
if delay <= 0:
return'Will retry request to %s.', url)
while delay > 0:
resp = None
try:'Retry in %s seconds.', delay)
resp = urlfetch.fetch(url, **kwds)
except runtime.DeadlineExceededError:
'Urlfetch retry %s will exceed request deadline '
'after %s seconds total', n, time.time() - start_time)
n += 1
delay = retry_params.delay(n, start_time)
if resp and not _should_retry(resp):
elif resp:
'Got status %s from GCS.', resp.status_code)
'Got exception "%r" while contacting GCS.', e)
if resp:
return resp'Urlfetch failed after %s retries and %s seconds in total.',
n - 1, time.time() - start_time)
def _run_until_rpc():
"""Eagerly evaluate tasklets until it is blocking on some RPC.
Usually ndb eventloop el isn't run until some code calls future.get_result().
When an async tasklet is called, the tasklet wrapper evaluates the tasklet
code into a generator, enqueues a callback _help_tasklet_along onto
the el.current queue, and returns a future.
_help_tasklet_along, when called by the el, will
get one yielded value from the generator. If the value if another future,
set up a callback _on_future_complete to invoke _help_tasklet_along
when the dependent future fulfills. If the value if a RPC, set up a
callback _on_rpc_complete to invoke _help_tasklet_along when the RPC fulfills.
Thus _help_tasklet_along drills down
the chain of futures until some future is blocked by RPC. El runs
all callbacks and constantly check pending RPC status.
el = eventloop.get_event_loop()
while el.current:
def _eager_tasklet(tasklet):
"""Decorator to turn tasklet to run eagerly."""
def eager_wrapper(*args, **kwds):
fut = tasklet(*args, **kwds)
return fut
return eager_wrapper