blob: 64714891bfe27552b9a35a5c5d334340446438c7 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Tool for deploying apps to an app server.
Currently, the application only uploads new appversions. To do this, it first
walks the directory tree rooted at the path the user specifies, adding all the
files it finds to a list. It then uploads the application configuration
(app.yaml) to the server using HTTP, followed by uploading each of the files.
It then commits the transaction with another request.
The bulk of this work is handled by the AppVersionUpload class, which exposes
methods to add to the list of files, fetch a list of modified files, upload
files, and commit or rollback the transaction.
"""
from __future__ import with_statement
import calendar
import contextlib
import copy
import datetime
import errno
import getpass
import hashlib
import logging
import mimetypes
import optparse
import os
import random
import re
import shutil
import subprocess
import sys
import tempfile
import time
import urllib
import urllib2
import google
import yaml
from google.appengine.cron import groctimespecification
from google.appengine.api import appinfo
from google.appengine.api import appinfo_includes
from google.appengine.api import backendinfo
from google.appengine.api import client_deployinfo
from google.appengine.api import croninfo
from google.appengine.api import dispatchinfo
from google.appengine.api import dosinfo
from google.appengine.api import queueinfo
from google.appengine.api import yaml_errors
from google.appengine.api import yaml_object
from google.appengine.datastore import datastore_index
from google.appengine.tools import appengine_rpc
try:
from google.appengine.tools import appengine_rpc_httplib2
except ImportError:
appengine_rpc_httplib2 = None
if sys.version_info[:2] >= (2, 7):
from google.appengine.tools import appcfg_java
else:
appcfg_java = None
from google.appengine.tools import augment_mimetypes
from google.appengine.tools import bulkloader
from google.appengine.tools import sdk_update_checker
# Delimiters used when marshalling per-file metadata for the clone APIs
# (see BuildClonePostBody).
LIST_DELIMITER = '\n'
TUPLE_DELIMITER = '|'

BACKENDS_ACTION = 'backends'

# Deprecation warning shown when an application still uses Backends.
BACKENDS_MESSAGE = ('Warning: This application uses Backends, a deprecated '
                    'feature that has been replaced by Modules, which '
                    'offers additional functionality. Please convert your '
                    'backends to modules as described at: ')
_CONVERTING_URL = (
    'https://developers.google.com/appengine/docs/%s/modules/converting')

MAX_LOG_LEVEL = 4

# Upload batching parameters: caps on total batch payload, file count per
# batch, per-file size within a batch, and per-file bookkeeping overhead.
MAX_BATCH_SIZE = 3200000
MAX_BATCH_COUNT = 100
MAX_BATCH_FILE_SIZE = 200000
BATCH_OVERHEAD = 500

# Global output verbosity; PrintUpdate() is silent when this is 0.
verbosity = 1

PREFIXED_BY_ADMIN_CONSOLE_RE = '^(?:admin-console)(.*)'

SDK_PRODUCT = 'appcfg_py'

# Seconds per day, and weekday index of Sunday, for date arithmetic.
DAY = 24*3600
SUNDAY = 6

SUPPORTED_RUNTIMES = (
    'contrib-dart', 'dart', 'go', 'php', 'python', 'python27', 'java', 'java7',
    'vm', 'custom')

MEGA = 1024 * 1024
MILLION = 1000 * 1000

# Fallback limits applied when the server does not report its own values
# (see GetResourceLimits / _GetRemoteResourceLimits).
DEFAULT_RESOURCE_LIMITS = {
    'max_file_size': 32 * MILLION,
    'max_blob_size': 32 * MILLION,
    'max_files_to_clone': 100,
    'max_total_file_size': 150 * MEGA,
    'max_file_count': 10000,
}

# Client ID and secrets are managed in the Google API console.
APPCFG_CLIENT_ID = '550516889912.apps.googleusercontent.com'
APPCFG_CLIENT_NOTSOSECRET = 'ykPq-0UYfKNprLRjVx1hBBar'
APPCFG_SCOPES = ('https://www.googleapis.com/auth/appengine.admin',
                 'https://www.googleapis.com/auth/cloud-platform')

STATIC_FILE_PREFIX = '__static__'

# Compute Engine metadata server, used to obtain service-account credentials
# when deploying from inside a GCE instance.
METADATA_BASE = 'http://metadata.google.internal'
SERVICE_ACCOUNT_BASE = (
    'computeMetadata/v1beta1/instance/service-accounts/default')

APP_YAML_FILENAME = 'app.yaml'

# Path to the Go toolchain's app builder; Windows binaries carry '.exe'.
GO_APP_BUILDER = os.path.join('goroot', 'bin', 'go-app-builder')
if sys.platform.startswith('win'):
  GO_APP_BUILDER += '.exe'

# Register App Engine specific MIME type mappings with the mimetypes module.
augment_mimetypes.init()
class Error(Exception):
  """Base class for all errors raised by this tool."""
class OAuthNotAvailable(Error):
  """Raised when the appengine_rpc_httplib2 module could not be imported."""
class CannotStartServingError(Error):
  """Raised when the version being uploaded could not start serving."""
def PrintUpdate(msg, error_fh=sys.stderr):
  """Writes a timestamped message to stderr or the given file-like object.

  The message is suppressed entirely when the module-level 'verbosity' is 0
  or less.

  Args:
    msg: The string to print.
    error_fh: Where to send the message.
  """
  if verbosity <= 0:
    return
  stamp = datetime.datetime.now().strftime('%I:%M %p')
  error_fh.write('%s %s\n' % (stamp, msg))
def StatusUpdate(msg, error_fh=sys.stderr):
  """Reports a status message via PrintUpdate, honoring global verbosity."""
  PrintUpdate(msg, error_fh)
def BackendsStatusUpdate(runtime, error_fh=sys.stderr):
  """Print the Backends deprecation message for the current runtime.

  The warning is only emitted for the Python and Java runtimes (including
  their versioned aliases), since only those have a Modules migration guide.

  Args:
    runtime: String name of current runtime.
    error_fh: Where to send the message.
  """
  # Map versioned runtime names onto their base language.
  language = {'python27': 'python', 'java7': 'java'}.get(runtime, runtime)
  if language in ('python', 'java'):
    StatusUpdate(BACKENDS_MESSAGE + (_CONVERTING_URL % language), error_fh)
def ErrorUpdate(msg, error_fh=sys.stderr):
  """Reports an error message via PrintUpdate (stderr by default)."""
  PrintUpdate(msg, error_fh)
def _PrintErrorAndExit(stream, msg, exit_code=2):
"""Prints the given error message and exists the program.
Args:
stream: The stream (e.g. StringIO or file) to write the message to.
msg: The error message to display as a string.
exit_code: The integer code to pass to sys.exit().
"""
stream.write(msg)
sys.exit(exit_code)
def JavaSupported():
  """True if Java is supported by this SDK.

  Returns:
    True if the Java tooling is importable and its bundled 'java' directory
    exists on disk, False otherwise.
  """
  # appcfg_java is set to None by the import guard at the top of this file
  # when running on Python < 2.7; the unguarded attribute access previously
  # raised AttributeError in that case instead of reporting "not supported".
  if appcfg_java is None:
    return False
  tools_java_dir = os.path.join(os.path.dirname(appcfg_java.__file__), 'java')
  return os.path.isdir(tools_java_dir)
@contextlib.contextmanager
def TempChangeField(obj, field_name, new_value):
  """Context manager to change a field value on an object temporarily.

  Args:
    obj: The object to change the field on.
    field_name: The field name to change.
    new_value: The new value.

  Yields:
    The old value.
  """
  old_value = getattr(obj, field_name)
  setattr(obj, field_name, new_value)
  try:
    yield old_value
  finally:
    # Restore the original value even when the managed block raises; the
    # previous implementation skipped restoration on exceptions, leaving
    # new_value permanently set on the object.
    setattr(obj, field_name, old_value)
class FileClassification(object):
  """A class to hold a file's classification.

  This class both abstracts away the details of how we determine
  whether a file is a regular, static or error file as well as acting
  as a container for various metadata about the file.
  """

  def __init__(self, config, filename, error_fh=sys.stderr):
    """Initializes a FileClassification instance.

    Args:
      config: The app.yaml object to check the filename against.
      filename: The name of the file.
      error_fh: Where to send status and error messages.
    """
    self.__error_fh = error_fh
    # Classification is computed eagerly, once, at construction time; the
    # accessor methods below only read these cached results.
    self.__static_mime_type = self.__GetMimeTypeIfStaticFile(config, filename)
    self.__static_app_readable = self.__GetAppReadableIfStaticFile(config,
                                                                   filename)
    self.__error_mime_type, self.__error_code = self.__LookupErrorBlob(config,
                                                                       filename)

  def __GetMimeTypeIfStaticFile(self, config, filename):
    """Looks up the mime type for 'filename'.

    Uses the handlers in 'config' to determine if the file should
    be treated as a static file.

    Args:
      config: The app.yaml object to check the filename against.
      filename: The name of the file.

    Returns:
      The mime type string.  For example, 'text/plain' or 'image/gif'.
      None if this is not a static file.
    """
    # Files staged under a __static__ directory (Java apps) are always static.
    if self.__FileNameImpliesStaticFile(filename):
      return self.__MimeType(filename)
    for handler in config.handlers:
      handler_type = handler.GetHandlerType()
      if handler_type in ('static_dir', 'static_files'):
        if handler_type == 'static_dir':
          regex = os.path.join(re.escape(handler.GetHandler()), '.*')
        else:
          regex = handler.upload
        if re.match(regex, filename):
          # An explicit mime_type on the handler wins over extension guessing.
          return handler.mime_type or self.__MimeType(filename)
    return None

  @staticmethod
  def __FileNameImpliesStaticFile(filename):
    """True if the name of a file implies that it is a static resource.

    For Java applications specified with web.xml and appengine-web.xml, we
    create a staging directory that includes a __static__ hierarchy containing
    links to all files that are implied static by the contents of those XML
    files.  So if a file has been copied into that directory then we can
    assume it is static.

    Args:
      filename: The full path to the file.

    Returns:
      True if the file should be considered a static resource based on its
      name.
    """
    static = '__static__' + os.sep
    return static in filename

  @staticmethod
  def __GetAppReadableIfStaticFile(config, filename):
    """Looks up whether a static file is readable by the application.

    Uses the handlers in 'config' to determine if the file should
    be treated as a static file and if so, if the file should be readable by
    the application.

    Args:
      config: The AppInfoExternal object to check the filename against.
      filename: The name of the file.

    Returns:
      True if the file is static and marked as app readable, False otherwise.
    """
    # Same handler-matching logic as __GetMimeTypeIfStaticFile, but reading
    # the application_readable flag instead of the mime type.
    for handler in config.handlers:
      handler_type = handler.GetHandlerType()
      if handler_type in ('static_dir', 'static_files'):
        if handler_type == 'static_dir':
          regex = os.path.join(re.escape(handler.GetHandler()), '.*')
        else:
          regex = handler.upload
        if re.match(regex, filename):
          return handler.application_readable
    return False

  def __LookupErrorBlob(self, config, filename):
    """Looks up the mime type and error_code for 'filename'.

    Uses the error handlers in 'config' to determine if the file should
    be treated as an error blob.

    Args:
      config: The app.yaml object to check the filename against.
      filename: The name of the file.

    Returns:
      A tuple of (mime_type, error_code), or (None, None) if this is not an
      error blob.  For example, ('text/plain', default) or ('image/gif',
      timeout) or (None, None).
    """
    if not config.error_handlers:
      return (None, None)
    for error_handler in config.error_handlers:
      if error_handler.file == filename:
        # An unset error_code means this file serves as the catch-all page.
        error_code = error_handler.error_code
        error_code = error_code or 'default'
        if error_handler.mime_type:
          return (error_handler.mime_type, error_code)
        else:
          return (self.__MimeType(filename), error_code)
    return (None, None)

  def __MimeType(self, filename, default='application/octet-stream'):
    """Guesses the MIME type from the file extension, warning on fallback."""
    guess = mimetypes.guess_type(filename)[0]
    if guess is None:
      print >>self.__error_fh, ('Could not guess mimetype for %s. Using %s.'
                                % (filename, default))
      return default
    return guess

  def IsApplicationFile(self):
    # App-readable static files also count as application files; error
    # pages never do.
    return bool((not self.IsStaticFile() or self.__static_app_readable) and
                not self.IsErrorFile())

  def IsStaticFile(self):
    return bool(self.__static_mime_type)

  def StaticMimeType(self):
    return self.__static_mime_type

  def IsErrorFile(self):
    return bool(self.__error_mime_type)

  def ErrorMimeType(self):
    return self.__error_mime_type

  def ErrorCode(self):
    return self.__error_code
def BuildClonePostBody(file_tuples):
  """Build the post body for the /api/clone{files,blobs,errorblobs} urls.

  Args:
    file_tuples: A list of tuples.  Each tuple should contain the entries
      appropriate for the endpoint in question.

  Returns:
    A string containing the properly delimited tuples.
  """
  # Each entry drops the tuple's first element, promotes the path (second
  # element) to the front, and joins the remainder with the tuple delimiter.
  entries = [TUPLE_DELIMITER.join([tup[1]] + list(tup[2:]))
             for tup in file_tuples]
  return LIST_DELIMITER.join(entries)
def _GetRemoteResourceLimits(logging_context):
"""Get the resource limit as reported by the admin console.
Get the resource limits by querying the admin_console/appserver. The
actual limits returned depends on the server we are talking to and
could be missing values we expect or include extra values.
Args:
logging_context: The _ClientDeployLoggingContext for this upload.
Returns:
A dictionary.
"""
try:
yaml_data = logging_context.Send('/api/appversion/getresourcelimits')
except urllib2.HTTPError, err:
if err.code != 404:
raise
return {}
return yaml.safe_load(yaml_data)
def GetResourceLimits(logging_context, error_fh=sys.stderr):
  """Gets the resource limits.

  Gets the resource limits that should be applied to apps.  Any values
  that the server does not know about will have their default value
  reported (although it is also possible for the server to report
  values we don't know about).

  Args:
    logging_context: The _ClientDeployLoggingContext for this upload.
    error_fh: Where to send status and error messages.

  Returns:
    A dictionary.
  """
  # Start from the local defaults, then let the server's values override.
  limits = dict(DEFAULT_RESOURCE_LIMITS)
  StatusUpdate('Getting current resource limits.', error_fh)
  limits.update(_GetRemoteResourceLimits(logging_context))
  logging.debug('Using resource limits: %s', limits)
  return limits
def RetryWithBackoff(callable_func, retry_notify_func,
                     initial_delay=1, backoff_factor=2,
                     max_delay=60, max_tries=20):
  """Calls a function multiple times, backing off more and more each time.

  Args:
    callable_func: A function that performs some operation that should be
      retried a number of times upon failure.  Signature: () -> (done, value)
      If 'done' is True, we'll immediately return (True, value).
      If 'done' is False, we'll delay a bit and try again, unless we've
      hit the 'max_tries' limit, in which case we'll return (False, value).
    retry_notify_func: This function will be called immediately before the
      next retry delay.  Signature: (value, delay) -> None.
      'value' is the value returned by the last call to 'callable_func';
      'delay' is the retry delay, in seconds.
    initial_delay: Initial delay after first try, in seconds.
    backoff_factor: Delay will be multiplied by this factor after each try.
    max_delay: Maximum delay, in seconds.
    max_tries: Maximum number of tries (the first one counts).

  Returns:
    What the last call to 'callable_func' returned, which is of the form
    (done, value).  If 'done' is True, you know 'callable_func' returned True
    before we ran out of retries.  If 'done' is False, you know 'callable_func'
    kept returning False and we ran out of retries.

  Raises:
    Whatever the function raises--an exception will immediately stop retries.
  """
  delay = initial_delay
  tries_left = max_tries
  while True:
    done, opaque_value = callable_func()
    tries_left -= 1
    if done:
      return True, opaque_value
    if tries_left <= 0:
      return False, opaque_value
    # Notify the caller, sleep, then grow the delay (capped at max_delay).
    retry_notify_func(opaque_value, delay)
    time.sleep(delay)
    delay = min(delay * backoff_factor, max_delay)
def RetryNoBackoff(callable_func,
                   retry_notify_func,
                   delay=5,
                   max_tries=200):
  """Calls a function multiple times, with the same delay each time.

  Args:
    callable_func: A function that performs some operation that should be
      retried a number of times upon failure.  Signature: () -> (done, value)
      If 'done' is True, we'll immediately return (True, value).
      If 'done' is False, we'll delay a bit and try again, unless we've
      hit the 'max_tries' limit, in which case we'll return (False, value).
    retry_notify_func: This function will be called immediately before the
      next retry delay.  Signature: (value, delay) -> None.
      'value' is the value returned by the last call to 'callable_func';
      'delay' is the retry delay, in seconds.
    delay: Delay between tries, in seconds.
    max_tries: Maximum number of tries (the first one counts).

  Returns:
    What the last call to 'callable_func' returned, which is of the form
    (done, value).  If 'done' is True, you know 'callable_func' returned True
    before we ran out of retries.  If 'done' is False, you know 'callable_func'
    kept returning False and we ran out of retries.

  Raises:
    Whatever the function raises--an exception will immediately stop retries.
  """
  # A backoff factor of 1 with max_delay == initial_delay yields a constant
  # delay between tries.
  return RetryWithBackoff(callable_func,
                          retry_notify_func,
                          initial_delay=delay,
                          backoff_factor=1,
                          max_delay=delay,
                          max_tries=max_tries)
def MigratePython27Notice():
  """Tells the user that the Python 2.5 runtime is deprecated.

  Encourages the user to migrate from Python 2.5 to Python 2.7.

  Prints a message to sys.stdout.  The caller should have tested that the
  user is using Python 2.5, so as not to spuriously display this message.
  """
  notice = (
      'WARNING: This application is using the Python 2.5 runtime, which is '
      'deprecated! It should be updated to the Python 2.7 runtime as soon as '
      'possible, which offers performance improvements and many new features. '
      'Learn how simple it is to migrate your application to Python 2.7 at '
      'https://developers.google.com/appengine/docs/python/python25/migrate27.')
  print(notice)
class IndexDefinitionUpload(object):
  """Provides facilities to upload index definitions to the hosting service."""

  def __init__(self, rpcserver, definitions, error_fh=sys.stderr):
    """Creates a new DatastoreIndexUpload.

    Args:
      rpcserver: The RPC server to use.  Should be an instance of
        HttpRpcServer or TestRpcServer.
      definitions: An IndexDefinitions object.
      error_fh: Where to send status and error messages.
    """
    self.rpcserver = rpcserver
    self.definitions = definitions
    self.error_fh = error_fh

  def DoUpload(self):
    """Uploads the index definitions."""
    StatusUpdate('Uploading index definitions.', self.error_fh)
    # The application field is blanked out while serializing; the app id
    # travels as a separate form parameter rather than inside the YAML.
    with TempChangeField(self.definitions, 'application', None) as app_id:
      payload = self.definitions.ToYAML()
      self.rpcserver.Send('/api/datastore/index/add',
                          app_id=app_id,
                          payload=payload)
class CronEntryUpload(object):
  """Provides facilities to upload cron entries to the hosting service."""

  def __init__(self, rpcserver, cron, error_fh=sys.stderr):
    """Creates a new CronEntryUpload.

    Args:
      rpcserver: The RPC server to use.  Should be an instance of a subclass
        of AbstractRpcServer.
      cron: The CronInfoExternal object loaded from the cron.yaml file.
      error_fh: Where to send status and error messages.
    """
    self.rpcserver = rpcserver
    self.cron = cron
    self.error_fh = error_fh

  def DoUpload(self):
    """Uploads the cron entries."""
    StatusUpdate('Uploading cron entries.', self.error_fh)
    # The application field is blanked out while serializing; the app id
    # travels as a separate form parameter rather than inside the YAML.
    with TempChangeField(self.cron, 'application', None) as app_id:
      payload = self.cron.ToYAML()
      self.rpcserver.Send('/api/cron/update',
                          app_id=app_id,
                          payload=payload)
class QueueEntryUpload(object):
  """Provides facilities to upload task queue entries to the hosting service."""

  def __init__(self, rpcserver, queue, error_fh=sys.stderr):
    """Creates a new QueueEntryUpload.

    Args:
      rpcserver: The RPC server to use.  Should be an instance of a subclass
        of AbstractRpcServer.
      queue: The QueueInfoExternal object loaded from the queue.yaml file.
      error_fh: Where to send status and error messages.
    """
    self.rpcserver = rpcserver
    self.queue = queue
    self.error_fh = error_fh

  def DoUpload(self):
    """Uploads the task queue entries."""
    StatusUpdate('Uploading task queue entries.', self.error_fh)
    # The application field is blanked out while serializing; the app id
    # travels as a separate form parameter rather than inside the YAML.
    with TempChangeField(self.queue, 'application', None) as app_id:
      payload = self.queue.ToYAML()
      self.rpcserver.Send('/api/queue/update',
                          app_id=app_id,
                          payload=payload)
class DispatchEntryUpload(object):
  """Provides facilities to upload dispatch entries to the hosting service."""

  def __init__(self, rpcserver, dispatch, error_fh=sys.stderr):
    """Creates a new DispatchEntryUpload.

    Args:
      rpcserver: The RPC server to use.  Should be an instance of a subclass
        of AbstractRpcServer.
      dispatch: The DispatchInfoExternal object loaded from the dispatch.yaml
        file.
      error_fh: Where to send status and error messages.
    """
    self.rpcserver = rpcserver
    self.dispatch = dispatch
    self.error_fh = error_fh

  def DoUpload(self):
    """Uploads the dispatch entries."""
    StatusUpdate('Uploading dispatch entries.', self.error_fh)
    # Unlike the other uploaders, the application field stays in the payload
    # here and is also sent explicitly as the app_id parameter.
    payload = self.dispatch.ToYAML()
    self.rpcserver.Send('/api/dispatch/update',
                        app_id=self.dispatch.application,
                        payload=payload)
class DosEntryUpload(object):
  """Provides facilities to upload dos entries to the hosting service."""

  def __init__(self, rpcserver, dos, error_fh=sys.stderr):
    """Creates a new DosEntryUpload.

    Args:
      rpcserver: The RPC server to use.  Should be an instance of a subclass
        of AbstractRpcServer.
      dos: The DosInfoExternal object loaded from the dos.yaml file.
      error_fh: Where to send status and error messages.
    """
    self.rpcserver = rpcserver
    self.dos = dos
    self.error_fh = error_fh

  def DoUpload(self):
    """Uploads the dos entries."""
    StatusUpdate('Uploading DOS entries.', self.error_fh)
    # The application field is blanked out while serializing; the app id
    # travels as a separate form parameter rather than inside the YAML.
    with TempChangeField(self.dos, 'application', None) as app_id:
      payload = self.dos.ToYAML()
      self.rpcserver.Send('/api/dos/update',
                          app_id=app_id,
                          payload=payload)
class PagespeedEntryUpload(object):
"""Provides facilities to upload pagespeed configs to the hosting service."""
def __init__(self, rpcserver, config, pagespeed, error_fh=sys.stderr):
"""Creates a new PagespeedEntryUpload.
Args:
rpcserver: The RPC server to use. Should be an instance of a subclass of
AbstractRpcServer.
config: The AppInfoExternal object derived from the app.yaml file.
pagespeed: The PagespeedEntry object from config.
error_fh: Where to send status and error messages.
"""
self.rpcserver = rpcserver
self.config = config
self.pagespeed = pagespeed
self.error_fh = error_fh
def DoUpload(self):
"""Uploads the pagespeed entries."""
pagespeed_yaml = ''
if self.pagespeed:
StatusUpdate('Uploading PageSpeed configuration.', self.error_fh)
pagespeed_yaml = self.pagespeed.ToYAML()
try:
self.rpcserver.Send('/api/appversion/updatepagespeed',
app_id=self.config.application,
version=self.config.version,
payload=pagespeed_yaml)
except urllib2.HTTPError, err:
if err.code != 404 or self.pagespeed is not None:
raise
class DefaultVersionSet(object):
  """Provides facilities to set the default (serving) version."""

  def __init__(self, rpcserver, app_id, module, version, error_fh=sys.stderr):
    """Creates a new DefaultVersionSet.

    Args:
      rpcserver: The RPC server to use.  Should be an instance of a subclass
        of AbstractRpcServer.
      app_id: The application to make the change to.
      module: The module to set the default version of (if any).
      version: The version to set as the default.
      error_fh: Where to send status and error messages.
    """
    self.rpcserver = rpcserver
    self.app_id = app_id
    self.module = module
    self.version = version
    self.error_fh = error_fh

  def SetVersion(self):
    """Sets the default version."""
    modules = self.module.split(',') if self.module else []
    if len(modules) > 1:
      # Multiple modules must be passed as repeated query parameters, so the
      # request is issued as a GET-style URL rather than keyword arguments.
      StatusUpdate('Setting the default version of modules %s of application '
                   '%s to %s.' % (', '.join(modules),
                                  self.app_id,
                                  self.version),
                   self.error_fh)
      params = [('app_id', self.app_id), ('version', self.version)]
      params.extend(('module', m) for m in modules)
      url = '/api/appversion/setdefault?' + urllib.urlencode(sorted(params))
      self.rpcserver.Send(url)
      return
    if self.module:
      StatusUpdate('Setting default version of module %s of application %s '
                   'to %s.' % (self.module, self.app_id, self.version),
                   self.error_fh)
    else:
      StatusUpdate('Setting default version of application %s to %s.'
                   % (self.app_id, self.version), self.error_fh)
    self.rpcserver.Send('/api/appversion/setdefault',
                        app_id=self.app_id,
                        module=self.module,
                        version=self.version)
class TrafficMigrator(object):
  """Provides facilities to migrate traffic."""

  def __init__(self, rpcserver, app_id, version, error_fh=sys.stderr):
    """Creates a new TrafficMigrator.

    Args:
      rpcserver: The RPC server to use.  Should be an instance of a subclass
        of AbstractRpcServer.
      app_id: The application to make the change to.
      version: The version to set as the default.
      error_fh: Where to send status and error messages.
    """
    self.rpcserver = rpcserver
    self.app_id = app_id
    self.version = version
    self.error_fh = error_fh

  def MigrateTraffic(self):
    """Migrates traffic."""
    msg = ('Migrating traffic of application %s to %s.'
           % (self.app_id, self.version))
    StatusUpdate(msg, self.error_fh)
    self.rpcserver.Send('/api/appversion/migratetraffic',
                        app_id=self.app_id,
                        version=self.version)
class IndexOperation(object):
  """Provide facilities for writing Index operation commands."""

  def __init__(self, rpcserver, error_fh=sys.stderr):
    """Creates a new IndexOperation.

    Args:
      rpcserver: The RPC server to use.  Should be an instance of
        HttpRpcServer or TestRpcServer.
      error_fh: Where to send status and error messages.
    """
    self.rpcserver = rpcserver
    self.error_fh = error_fh

  def DoDiff(self, definitions):
    """Retrieve diff file from the server.

    Args:
      definitions: datastore_index.IndexDefinitions as loaded from the user's
        index.yaml file.

    Returns:
      A pair of datastore_index.IndexDefinitions objects.  The first record
      is the set of indexes that are present in the index.yaml file but
      missing from the server.  The second record is the set of indexes that
      are present on the server but missing from the index.yaml file
      (indicating that these indexes should probably be vacuumed).
    """
    StatusUpdate('Fetching index definitions diff.', self.error_fh)
    # The application field is blanked out while serializing; the app id
    # travels as a separate form parameter rather than inside the YAML.
    with TempChangeField(definitions, 'application', None) as app_id:
      response = self.rpcserver.Send('/api/datastore/index/diff',
                                     app_id=app_id,
                                     payload=definitions.ToYAML())
    return datastore_index.ParseMultipleIndexDefinitions(response)

  def DoDelete(self, definitions, app_id):
    """Delete indexes from the server.

    Args:
      definitions: Index definitions to delete from datastore.
      app_id: The application id.

    Returns:
      A single datastore_index.IndexDefinitions containing indexes that were
      not deleted, probably because they were already removed.  This may
      be normal behavior as there is a potential race condition between
      fetching the index-diff and sending deletion confirmation through.
    """
    StatusUpdate('Deleting selected index definitions.', self.error_fh)
    payload = definitions.ToYAML()
    response = self.rpcserver.Send('/api/datastore/index/delete',
                                   app_id=app_id,
                                   payload=payload)
    return datastore_index.ParseIndexDefinitions(response)
class VacuumIndexesOperation(IndexOperation):
  """Provide facilities to request the deletion of datastore indexes."""

  def __init__(self, rpcserver, force, confirmation_fn=raw_input,
               error_fh=sys.stderr):
    """Creates a new VacuumIndexesOperation.

    Args:
      rpcserver: The RPC server to use.  Should be an instance of
        HttpRpcServer or TestRpcServer.
      force: True to force deletion of indexes, else False.
      confirmation_fn: Function used for getting input form user.
      error_fh: Where to send status and error messages.
    """
    super(VacuumIndexesOperation, self).__init__(rpcserver, error_fh)
    self.force = force
    self.confirmation_fn = confirmation_fn

  def GetConfirmation(self, index):
    """Get confirmation from user to delete an index.

    This method will enter an input loop until the user provides a
    response it is expecting.  Valid input is one of three responses:

      y: Confirm deletion of index.
      n: Do not delete index.
      a: Delete all indexes without asking for further confirmation.

    If the user enters nothing at all, the default action is to skip
    that index and do not delete.

    If the user selects 'a', as a side effect, the 'force' flag is set.

    Args:
      index: Index to confirm.

    Returns:
      True if user enters 'y' or 'a'.  False if user enter 'n'.
    """
    while True:
      print 'This index is no longer defined in your index.yaml file.'
      print
      print index.ToYAML()
      print
      confirmation = self.confirmation_fn(
          'Are you sure you want to delete this index? (N/y/a): ')
      confirmation = confirmation.strip().lower()
      if confirmation == 'y':
        return True
      elif confirmation == 'n' or not confirmation:
        return False
      elif confirmation == 'a':
        # 'a' (all) flips the force flag so no further prompts appear for
        # the remaining indexes in this vacuum run.
        self.force = True
        return True
      else:
        # Unrecognized input: loop and ask again.
        print 'Did not understand your response.'

  def DoVacuum(self, definitions):
    """Vacuum indexes in datastore.

    This method will query the server to determine which indexes are not
    being used according to the user's local index.yaml file.  Once it has
    made this determination, it confirms with the user which unused indexes
    should be deleted.  Once confirmation for each index is receives, it
    deletes those indexes.

    Because another user may in theory delete the same indexes at the same
    time as the user, there is a potential race condition.  In this rare
    cases, some of the indexes previously confirmed for deletion will not be
    found.  The user is notified which indexes these were.

    Args:
      definitions: datastore_index.IndexDefinitions as loaded from users
        index.yaml file.
    """
    # Only the second half of the diff (server-side indexes missing from
    # index.yaml) matters for vacuuming.
    unused_new_indexes, notused_indexes = self.DoDiff(definitions)
    # Build the deletion list, prompting per index unless forced.
    deletions = datastore_index.IndexDefinitions(indexes=[])
    if notused_indexes.indexes is not None:
      for index in notused_indexes.indexes:
        if self.force or self.GetConfirmation(index):
          deletions.indexes.append(index)
    if deletions.indexes:
      not_deleted = self.DoDelete(deletions, definitions.application)
      # Warn about indexes that were already gone by the time the server
      # processed the deletion (lost race with another administrator).
      if not_deleted.indexes:
        not_deleted_count = len(not_deleted.indexes)
        if not_deleted_count == 1:
          warning_message = ('An index was not deleted. Most likely this is '
                             'because it no longer exists.\n\n')
        else:
          warning_message = ('%d indexes were not deleted. Most likely this '
                             'is because they no longer exist.\n\n'
                             % not_deleted_count)
        for index in not_deleted.indexes:
          warning_message += index.ToYAML()
        logging.warning(warning_message)
class LogsRequester(object):
"""Provide facilities to export request logs."""
def __init__(self,
rpcserver,
app_id,
module,
version_id,
output_file,
num_days,
append,
severity,
end,
vhost,
include_vhost,
include_all=None,
time_func=time.time,
error_fh=sys.stderr):
"""Constructor.
Args:
rpcserver: The RPC server to use. Should be an instance of HttpRpcServer
or TestRpcServer.
app_id: The application to fetch logs from.
module: The module of the app to fetch logs from, optional.
version_id: The version of the app to fetch logs for.
output_file: Output file name.
num_days: Number of days worth of logs to export; 0 for all available.
append: True if appending to an existing file.
severity: App log severity to request (0-4); None for no app logs.
end: date object representing last day of logs to return.
vhost: The virtual host of log messages to get. None for all hosts.
include_vhost: If true, the virtual host is included in log messages.
include_all: If true, we add to the log message everything we know
about the request.
time_func: A time.time() compatible function, which can be overridden for
testing.
error_fh: Where to send status and error messages.
"""
self.rpcserver = rpcserver
self.app_id = app_id
self.output_file = output_file
self.append = append
self.num_days = num_days
self.severity = severity
self.vhost = vhost
self.include_vhost = include_vhost
self.include_all = include_all
self.error_fh = error_fh
self.module = module
self.version_id = version_id
self.sentinel = None
self.write_mode = 'w'
if self.append:
self.sentinel = FindSentinel(self.output_file)
self.write_mode = 'a'
self.skip_until = False
now = PacificDate(time_func())
if end < now:
self.skip_until = end
else:
end = now
self.valid_dates = None
if self.num_days:
start = end - datetime.timedelta(self.num_days - 1)
self.valid_dates = (start, end)
def DownloadLogs(self):
"""Download the requested logs.
This will write the logs to the file designated by
self.output_file, or to stdout if the filename is '-'.
Multiple roundtrips to the server may be made.
"""
if self.module:
StatusUpdate('Downloading request logs for app %s module %s version %s.' %
(self.app_id, self.module, self.version_id), self.error_fh)
else:
StatusUpdate('Downloading request logs for app %s version %s.' %
(self.app_id, self.version_id), self.error_fh)
tf = tempfile.TemporaryFile()
last_offset = None
try:
while True:
try:
new_offset = self.RequestLogLines(tf, last_offset)
if not new_offset or new_offset == last_offset:
break
last_offset = new_offset
except KeyboardInterrupt:
StatusUpdate('Keyboard interrupt; saving data downloaded so far.',
self.error_fh)
break
StatusUpdate('Copying request logs to %r.' % self.output_file,
self.error_fh)
if self.output_file == '-':
of = sys.stdout
else:
try:
of = open(self.output_file, self.write_mode)
except IOError, err:
StatusUpdate('Can\'t write %r: %s.' % (self.output_file, err))
sys.exit(1)
try:
line_count = CopyReversedLines(tf, of)
finally:
of.flush()
if of is not sys.stdout:
of.close()
finally:
tf.close()
StatusUpdate('Copied %d records.' % line_count, self.error_fh)
  def RequestLogLines(self, tf, offset):
    """Make a single roundtrip to the server.

    Args:
      tf: Writable binary stream to which the log lines returned by
        the server are written, stripped of headers, and excluding
        lines skipped due to self.sentinel or self.valid_dates filtering.
      offset: Offset string for a continued request; None for the first.

    Returns:
      The offset string to be used for the next request, if another
      request should be issued; or None, if not.
    """
    logging.info('Request with offset %r.', offset)
    kwds = {'app_id': self.app_id,
            'version': self.version_id,
            'limit': 1000,
            'no_header': 1,
            }
    if self.module:
      kwds['module'] = self.module
    if offset:
      kwds['offset'] = offset
    if self.severity is not None:
      kwds['severity'] = str(self.severity)
    if self.vhost is not None:
      kwds['vhost'] = str(self.vhost)
    if self.include_vhost is not None:
      kwds['include_vhost'] = str(self.include_vhost)
    if self.include_all is not None:
      kwds['include_all'] = str(self.include_all)
    response = self.rpcserver.Send('/api/request_logs', payload=None, **kwds)
    # Carriage returns separate sub-lines within a record; encode them as
    # NUL bytes so splitlines() keeps each record on one line.
    response = response.replace('\r', '\0')
    lines = response.splitlines()
    logging.info('Received %d bytes, %d records.', len(response), len(lines))
    offset = None
    # Hoist instance attributes into locals for the per-line loop below.
    valid_dates = self.valid_dates
    sentinel = self.sentinel
    skip_until = self.skip_until
    len_sentinel = None
    if sentinel:
      len_sentinel = len(sentinel)
    for line in lines:
      if line.startswith('#'):
        # Comment record; the only one we use carries the next offset.
        match = re.match(r'^#\s*next_offset=(\S+)\s*$', line)
        if match and match.group(1) != 'None':
          offset = match.group(1)
        continue
      # In append mode, stop for good once we reach the sentinel line
      # written by a previous run; the match must end at a record boundary
      # ('' = end of line, '\0' = encoded sub-line separator).
      if (sentinel and
          line.startswith(sentinel) and
          line[len_sentinel : len_sentinel+1] in ('', '\0')):
        return None
      linedate = DateOfLogLine(line)
      if not linedate:
        continue
      if skip_until:
        if linedate > skip_until:
          # Still newer than the requested end date; keep skipping.
          continue
        else:
          # Reached the requested end date; stop skipping from now on.
          self.skip_until = skip_until = False
      # Records arrive newest-first, so the first out-of-range date means
      # everything older is out of range too.
      if valid_dates and not valid_dates[0] <= linedate <= valid_dates[1]:
        return None
      tf.write(line + '\n')
    if not lines:
      return None
    return offset
def DateOfLogLine(line):
  """Returns a date object representing the log line's timestamp.

  Args:
    line: a log line string.

  Returns:
    A date object representing the timestamp or None if parsing fails.
  """
  # Apache-style log lines carry the timestamp as '[dd/Mon/yyyy:...' after
  # the request metadata.
  match = re.compile(r'[^[]+\[(\d+/[A-Za-z]+/\d+):[^\d]*').match(line)
  if match is None:
    return None
  try:
    parsed = time.strptime(match.group(1), '%d/%b/%Y')
  except ValueError:
    return None
  return datetime.date(*parsed[:3])
def PacificDate(now):
  """For a UTC timestamp, return the date in the US/Pacific timezone.

  Args:
    now: A posix timestamp giving current UTC time.

  Returns:
    A date object representing what day it is in the US/Pacific timezone.
  """
  # Shift the timestamp into pseudo-Pacific time, then read the date
  # fields off the corresponding UTC tuple.
  pacific_tuple = time.gmtime(PacificTime(now))
  return datetime.date(*pacific_tuple[:3])
def PacificTime(now):
  """Helper to return the number of seconds between UTC and Pacific time.

  This is needed to compute today's date in Pacific time (more
  specifically: Mountain View local time), which is how request logs
  are reported.  (Google servers always report times in Mountain View
  local time, regardless of where they are physically located.)

  This takes (post-2006) US DST into account.  Pacific time is either
  8 hours or 7 hours west of UTC, depending on whether DST is in
  effect.  Since 2007, US DST starts on the second Sunday in March
  and ends on the first Sunday in November.  (Reference:
  http://aa.usno.navy.mil/faq/docs/daylight_time.php.)

  Note that the server doesn't report its local time (the HTTP Date
  header uses UTC), and the client's local time is irrelevant.

  Args:
    now: A posix timestamp giving current UTC time.

  Returns:
    A pseudo-posix timestamp giving current Pacific time.  Passing
    this through time.gmtime() will produce a tuple in Pacific local
    time.
  """
  # Start from PST (UTC-8), then add back the DST hour when applicable.
  pst = now - 8 * 3600
  if IsPacificDST(pst):
    return pst + 3600
  return pst
def IsPacificDST(now):
  """Helper for PacificTime to decide whether now is Pacific DST (PDT).

  Args:
    now: A pseudo-posix timestamp giving current time in PST.

  Returns:
    True if now falls within the range of DST, False otherwise.
  """
  year = time.gmtime(now)[0]
  assert year >= 2007

  def FirstSundayAtOrAfter(timestamp):
    # Advance a day at a time until the timestamp lands on a Sunday.
    while time.gmtime(timestamp).tm_wday != SUNDAY:
      timestamp += DAY
    return timestamp

  # DST begins at 2:00 on the second Sunday in March (March 8 is the
  # earliest day that Sunday can fall on) and ends at 2:00 on the first
  # Sunday in November.
  begin = FirstSundayAtOrAfter(calendar.timegm((year, 3, 8, 2, 0, 0, 0, 0, 0)))
  end = FirstSundayAtOrAfter(calendar.timegm((year, 11, 1, 2, 0, 0, 0, 0, 0)))
  return begin <= now < end
def CopyReversedLines(instream, outstream, blocksize=2**16):
  r"""Copy lines from input stream to output stream in reverse order.

  As a special feature, null bytes in the input are turned into
  newlines followed by tabs in the output, but these 'sub-lines'
  separated by null bytes are not reversed.  E.g. If the input is
  'A\0B\nC\0D\n', the output is 'C\n\tD\nA\n\tB\n'.

  Args:
    instream: A seekable stream open for reading in binary mode.
    outstream: A stream open for writing; doesn't have to be seekable or binary.
    blocksize: Optional block size for buffering, for unit testing.

  Returns:
    The number of lines copied.
  """
  line_count = 0
  instream.seek(0, 2)  # Seek to EOF to learn the stream's size.
  last_block = instream.tell() // blocksize
  spillover = ''  # Partial first line carried over from the block after.
  # Walk the blocks from last to first.  The extra iteration at
  # last_block + 1 reads the (possibly empty) final partial block first.
  for iblock in xrange(last_block + 1, -1, -1):
    instream.seek(iblock * blocksize)
    data = instream.read(blocksize)
    lines = data.splitlines(True)
    # Join this block's trailing partial line with the spillover from the
    # following block, re-splitting in case the joined text has newlines.
    lines[-1:] = ''.join(lines[-1:] + [spillover]).splitlines(True)
    if lines and not lines[-1].endswith('\n'):
      # The file's very last line may lack a newline; add one.
      lines[-1] += '\n'
    lines.reverse()
    if lines and iblock > 0:
      # The first line of this block may be incomplete; defer it to the
      # preceding block as spillover.
      spillover = lines.pop()
    if lines:
      line_count += len(lines)
      # Expand NUL sub-line separators (see docstring) without reversing
      # the sub-lines themselves.
      data = ''.join(lines).replace('\0', '\n\t')
      outstream.write(data)
  return line_count
def FindSentinel(filename, blocksize=2**16, error_fh=sys.stderr):
"""Return the sentinel line from the output file.
Args:
filename: The filename of the output file. (We'll read this file.)
blocksize: Optional block size for buffering, for unit testing.
error_fh: Where to send status and error messages.
Returns:
The contents of the last line in the file that doesn't start with
a tab, with its trailing newline stripped; or None if the file
couldn't be opened or no such line could be found by inspecting
the last 'blocksize' bytes of the file.
"""
if filename == '-':
StatusUpdate('Can\'t combine --append with output to stdout.',
error_fh)
sys.exit(2)
try:
fp = open(filename, 'rb')
except IOError, err:
StatusUpdate('Append mode disabled: can\'t read %r: %s.' % (filename, err),
error_fh)
return None
try:
fp.seek(0, 2)
fp.seek(max(0, fp.tell() - blocksize))
lines = fp.readlines()
del lines[:1]
sentinel = None
for line in lines:
if not line.startswith('\t'):
sentinel = line
if not sentinel:
StatusUpdate('Append mode disabled: can\'t find sentinel in %r.' %
filename, error_fh)
return None
return sentinel.rstrip('\n')
finally:
fp.close()
class UploadBatcher(object):
  """Helper to batch file uploads."""

  def __init__(self, what, logging_context):
    """Constructor.

    Args:
      what: Either 'file' or 'blob' or 'errorblob' indicating what kind of
        objects this batcher uploads. Used in messages and URLs.
      logging_context: The _ClientDeployLoggingContext for this upload.
    """
    assert what in ('file', 'blob', 'errorblob'), repr(what)
    self.what = what
    self.logging_context = logging_context
    # Single-object endpoint, e.g. /api/appversion/addfile; the batch
    # endpoint is the same URL with a trailing 's'.
    self.single_url = '/api/appversion/add' + what
    self.batch_url = self.single_url + 's'
    # Batching stays enabled until the server 404s the batch URL; see
    # Flush() for the fallback to one-at-a-time uploads.
    self.batching = True
    self.batch = []  # Pending (path, payload, mime_type) tuples.
    self.batch_size = 0  # Approximate byte size of the pending batch.

  def SendBatch(self):
    """Send the current batch on its way.

    If successful, resets self.batch and self.batch_size.

    Raises:
      HTTPError with code=404 if the server doesn't support batching.
    """
    boundary = 'boundary'
    parts = []
    for path, payload, mime_type in self.batch:
      # Grow the boundary until it doesn't occur in this payload.  Earlier
      # payloads were checked against a prefix of the final boundary, so
      # they cannot contain the longer boundary either.
      while boundary in payload:
        boundary += '%04x' % random.randint(0, 0xffff)
        assert len(boundary) < 80, 'Unexpected error, please try again.'
      part = '\n'.join(['',
                        'X-Appcfg-File: %s' % urllib.quote(path),
                        'X-Appcfg-Hash: %s' % _Hash(payload),
                        'Content-Type: %s' % mime_type,
                        'Content-Length: %d' % len(payload),
                        'Content-Transfer-Encoding: 8bit',
                        '',
                        payload,
                        ])
      parts.append(part)
    # Preamble before the first boundary; MIME readers ignore it.
    parts.insert(0,
                 'MIME-Version: 1.0\n'
                 'Content-Type: multipart/mixed; boundary="%s"\n'
                 '\n'
                 'This is a message with multiple parts in MIME format.' %
                 boundary)
    # Joined with the delimiter below, this forms the closing
    # '--boundary--' terminator.
    parts.append('--\n')
    delimiter = '\n--%s' % boundary
    payload = delimiter.join(parts)
    logging.info('Uploading batch of %d %ss to %s with boundary="%s".',
                 len(self.batch), self.what, self.batch_url, boundary)
    self.logging_context.Send(self.batch_url,
                              payload=payload,
                              content_type='message/rfc822')
    self.batch = []
    self.batch_size = 0

  def SendSingleFile(self, path, payload, mime_type):
    """Send a single file on its way."""
    logging.info('Uploading %s %s (%s bytes, type=%s) to %s.',
                 self.what, path, len(payload), mime_type, self.single_url)
    self.logging_context.Send(self.single_url,
                              payload=payload,
                              content_type=mime_type,
                              path=path)

  def Flush(self):
    """Flush the current batch.

    This first attempts to send the batch as a single request; if that
    fails because the server doesn't support batching, the files are
    sent one by one, and self.batching is reset to False.

    At the end, self.batch and self.batch_size are reset.
    """
    if not self.batch:
      return
    try:
      self.SendBatch()
    except urllib2.HTTPError, err:
      if err.code != 404:
        raise
      # 404 on the batch URL means an old server; fall back to uploading
      # each pending file individually, permanently.
      logging.info('Old server detected; turning off %s batching.', self.what)
      self.batching = False
      for path, payload, mime_type in self.batch:
        self.SendSingleFile(path, payload, mime_type)
      self.batch = []
      self.batch_size = 0

  def AddToBatch(self, path, payload, mime_type):
    """Batch a file, possibly flushing first, or perhaps upload it directly.

    Args:
      path: The name of the file.
      payload: The contents of the file.
      mime_type: The MIME Content-type of the file, or None.

    If mime_type is None, application/octet-stream is substituted.
    """
    if not mime_type:
      mime_type = 'application/octet-stream'
    size = len(payload)
    # Oversized files, and all files once batching has been disabled,
    # are uploaded individually.
    if size <= MAX_BATCH_FILE_SIZE:
      if (len(self.batch) >= MAX_BATCH_COUNT or
          self.batch_size + size > MAX_BATCH_SIZE):
        self.Flush()
      if self.batching:
        logging.info('Adding %s %s (%s bytes, type=%s) to batch.',
                     self.what, path, size, mime_type)
        self.batch.append((path, payload, mime_type))
        self.batch_size += size + BATCH_OVERHEAD
        return
    self.SendSingleFile(path, payload, mime_type)
def _FormatHash(h):
  """Return a string representation of a hash.

  The hash is a sha1 hash. It is computed both for files that need to be
  pushed to App Engine and for data payloads of requests made to App Engine.

  Args:
    h: The hash

  Returns:
    The string representation of the hash.
  """
  # Split the 40-char hex digest into five underscore-separated 8-char groups.
  return '_'.join([h[i:i + 8] for i in range(0, 40, 8)])
def _Hash(content):
  """Compute the sha1 hash of the content.

  Args:
    content: The data to hash as a string.

  Returns:
    The string representation of the hash.
  """
  return _FormatHash(hashlib.sha1(content).hexdigest())
def _HashFromFileHandle(file_handle):
  """Compute the hash of the content of the file pointed to by file_handle.

  Args:
    file_handle: File-like object which provides seek, read and tell.

  Returns:
    The string representation of the hash.
  """
  # Remember the read position so the caller's cursor is left unchanged.
  saved_pos = file_handle.tell()
  content_hash = _Hash(file_handle.read())
  file_handle.seek(saved_pos, 0)
  return content_hash
def EnsureDir(path):
"""Makes sure that a directory exists at the given path.
If a directory already exists at that path, nothing is done.
Otherwise, try to create a directory at that path with os.makedirs.
If that fails, propagate the resulting OSError exception.
Args:
path: The path that you want to refer to a directory.
"""
try:
os.makedirs(path)
except OSError, exc:
if not (exc.errno == errno.EEXIST and os.path.isdir(path)):
raise
def DoDownloadApp(rpcserver, out_dir, app_id, module, app_version,
                  error_fh=sys.stderr):
  """Downloads the files associated with a particular app version.

  Args:
    rpcserver: The RPC server to use to download.
    out_dir: The directory the files should be downloaded to.
    app_id: The app ID of the app whose files we want to download.
    module: The module we want to download from.  Can be:
      - None: We'll download from the default module.
      - <module>: We'll download from the specified module.
    app_version: The version number we want to download.  Can be:
      - None: We'll download the latest default version.
      - <major>: We'll download the latest minor version.
      - <major>/<minor>: We'll download that exact version.
    error_fh: Where to send status and error messages.
  """
  StatusUpdate('Fetching file list...', error_fh)

  url_args = {'app_id': app_id}
  if module:
    url_args['module'] = module
  if app_version is not None:
    url_args['version_match'] = app_version

  result = rpcserver.Send('/api/files/list', **url_args)

  StatusUpdate('Fetching files...', error_fh)

  # The response is the full version on the first line followed by one
  # '<id>|<size>|<path>' record per file.
  lines = result.splitlines()
  if len(lines) < 1:
    logging.error('Invalid response from server: empty')
    return

  full_version = lines[0]
  file_lines = lines[1:]
  current_file_number = 0
  num_files = len(file_lines)
  num_errors = 0

  for line in file_lines:
    parts = line.split('|', 2)
    if len(parts) != 3:
      logging.error('Invalid response from server: expecting '
                    '"<id>|<size>|<path>", found: "%s"\n', line)
      return

    current_file_number += 1

    file_id, size_str, path = parts
    try:
      size = int(size_str)
    except ValueError:
      logging.error('Invalid file list entry from server: invalid size: '
                    '"%s"', size_str)
      return

    StatusUpdate('[%d/%d] %s' % (current_file_number, num_files, path),
                 error_fh)

    def TryGet():
      """A request to /api/files/get which works with the RetryWithBackoff."""
      try:
        contents = rpcserver.Send('/api/files/get', app_id=app_id,
                                  version=full_version, id=file_id)
        return True, contents
      except urllib2.HTTPError, exc:
        # 503 means the server is busy; signal RetryWithBackoff to retry.
        if exc.code == 503:
          return False, exc
        else:
          raise

    def PrintRetryMessage(_, delay):
      StatusUpdate('Server busy. Will try again in %d seconds.' % delay,
                   error_fh)

    success, contents = RetryWithBackoff(TryGet, PrintRetryMessage)
    if not success:
      logging.error('Unable to download file "%s".', path)
      num_errors += 1
      continue

    if len(contents) != size:
      # NOTE(review): unlike the other error cases, a size mismatch does
      # not 'continue'; the truncated/oversized contents are still written
      # out below.  Presumably intentional (best-effort download) — confirm.
      logging.error('File "%s": server listed as %d bytes but served '
                    '%d bytes.', path, size, len(contents))
      num_errors += 1

    full_path = os.path.join(out_dir, path)

    if os.path.exists(full_path):
      logging.error('Unable to create file "%s": path conflicts with '
                    'an existing file or directory', path)
      num_errors += 1
      continue

    full_dir = os.path.dirname(full_path)
    try:
      EnsureDir(full_dir)
    except OSError, exc:
      logging.error('Couldn\'t create directory "%s": %s', full_dir, exc)
      num_errors += 1
      continue

    try:
      out_file = open(full_path, 'wb')
    except IOError, exc:
      logging.error('Couldn\'t open file "%s": %s', full_path, exc)
      num_errors += 1
      continue

    try:
      try:
        out_file.write(contents)
      except IOError, exc:
        logging.error('Couldn\'t write to file "%s": %s', full_path, exc)
        num_errors += 1
        continue
    finally:
      out_file.close()

  if num_errors > 0:
    logging.error('Number of errors: %d. See output for details.', num_errors)
class _ClientDeployLoggingContext(object):
"""Context for sending and recording server rpc requests.
Attributes:
rpcserver: The AbstractRpcServer to use for the upload.
requests: A list of client_deployinfo.Request objects to include
with the client deploy log.
time_func: Function to get the current time in milliseconds.
request_params: A dictionary with params to append to requests
"""
def __init__(self,
rpcserver,
request_params,
usage_reporting,
time_func=time.time):
"""Creates a new AppVersionUpload.
Args:
rpcserver: The RPC server to use. Should be an instance of HttpRpcServer
or TestRpcServer.
request_params: A dictionary with params to append to requests
usage_reporting: Whether to actually upload data.
time_func: Function to return the current time in millisecods
(default time.time).
"""
self.rpcserver = rpcserver
self.request_params = request_params
self.usage_reporting = usage_reporting
self.time_func = time_func
self.requests = []
def Send(self, url, payload='', **kwargs):
"""Sends a request to the server, with common params."""
start_time_usec = self.GetCurrentTimeUsec()
request_size_bytes = len(payload)
try:
logging.info('Send: %s, params=%s', url, self.request_params)
kwargs.update(self.request_params)
result = self.rpcserver.Send(url, payload=payload, **kwargs)
self._RegisterReqestForLogging(url, 200, start_time_usec,
request_size_bytes)
return result
except urllib2.HTTPError, e:
self._RegisterReqestForLogging(url, e.code, start_time_usec,
request_size_bytes)
raise e
def GetCurrentTimeUsec(self):
"""Returns the current time in microseconds."""
return int(round(self.time_func() * 1000 * 1000))
def GetSdkVersion(self):
"""Returns the current SDK Version."""
sdk_version = sdk_update_checker.GetVersionObject()
return sdk_version.get('release', '?') if sdk_version else '?'
def _RegisterReqestForLogging(self, path, response_code, start_time_usec,
request_size_bytes):
"""Registers a request for client deploy logging purposes."""
end_time_usec = self.GetCurrentTimeUsec()
self.requests.append(client_deployinfo.Request(
path=path,
response_code=response_code,
start_time_usec=start_time_usec,
end_time_usec=end_time_usec,
request_size_bytes=request_size_bytes))
def LogClientDeploy(self, runtime, start_time_usec, success):
"""Logs a client deployment attempt.
Args:
runtime: The runtime for the app being deployed.
start_time_usec: The start time of the deployment in micro seconds.
success: True if the deployment succeeded otherwise False.
"""
if not self.usage_reporting:
logging.info('Skipping usage reporting.')
return
end_time_usec = self.GetCurrentTimeUsec()
try:
info = client_deployinfo.ClientDeployInfoExternal(
runtime=runtime,
start_time_usec=start_time_usec,
end_time_usec=end_time_usec,
requests=self.requests,
success=success,
sdk_version=self.GetSdkVersion())
self.Send('/api/logclientdeploy', info.ToYAML())
except BaseException, e:
logging.debug('Exception logging deploy info continuing - %s', e)
class EndpointsState(object):
  """Canonical states reported for an app's Google Cloud Endpoints config."""

  SERVING = 'serving'
  PENDING = 'pending'
  FAILED = 'failed'
  _STATES = frozenset((SERVING, PENDING, FAILED))

  @classmethod
  def Parse(cls, value):
    """Converts a server-supplied state string to its canonical form.

    Args:
      value: The state string, in any case.

    Returns:
      The lowercase canonical state string.

    Raises:
      ValueError: value does not name a known Endpoints state.
    """
    normalized = value.lower()
    if normalized in cls._STATES:
      return normalized
    ordered = sorted(cls._STATES)
    pretty_states = ', '.join(ordered[:-1]) + ', or ' + ordered[-1]
    raise ValueError('Unexpected Endpoints state "%s"; should be %s.' %
                     (value, pretty_states))
class AppVersionUpload(object):
"""Provides facilities to upload a new appversion to the hosting service.
Attributes:
rpcserver: The AbstractRpcServer to use for the upload.
config: The AppInfoExternal object derived from the app.yaml file.
app_id: The application string from 'config'.
version: The version string from 'config'.
backend: The backend to update, if any.
files: A dictionary of files to upload to the rpcserver, mapping path to
hash of the file contents.
in_transaction: True iff a transaction with the server has started.
An AppVersionUpload can do only one transaction at a time.
deployed: True iff the Deploy method has been called.
started: True iff the StartServing method has been called.
logging_context: The _ClientDeployLoggingContext for this upload.
"""
  def __init__(self, rpcserver, config, module_yaml_path='app.yaml',
               backend=None,
               error_fh=None,
               get_version=sdk_update_checker.GetVersionObject,
               usage_reporting=False):
    """Creates a new AppVersionUpload.

    Args:
      rpcserver: The RPC server to use. Should be an instance of HttpRpcServer
        or TestRpcServer.
      config: An AppInfoExternal object that specifies the configuration for
        this application.
      module_yaml_path: The (string) path to the yaml file corresponding to
        <config>, relative to the bundle directory.
      backend: If specified, indicates the update applies to the given backend.
        The backend name must match an entry in the backends: stanza.
      error_fh: Unexpected HTTPErrors are printed to this file handle.
      get_version: Method for determining the current SDK version. The override
        is used for testing.
      usage_reporting: Whether or not to report usage.
    """
    self.rpcserver = rpcserver
    self.config = config
    self.app_id = self.config.application
    self.module = self.config.module
    self.backend = backend
    self.error_fh = error_fh or sys.stderr

    self.version = self.config.version

    # Common URL params sent with every RPC; 'backend' takes precedence
    # over 'version' when both are available.
    self.params = {}
    if self.app_id:
      self.params['app_id'] = self.app_id
    if self.module:
      self.params['module'] = self.module
    if self.backend:
      self.params['backend'] = self.backend
    elif self.version:
      self.params['version'] = self.version

    # Maps upload path -> content hash; pruned by Begin()/UploadFile().
    self.files = {}

    # Set of all file paths ever added, regardless of upload status.
    self.all_files = set()

    self.in_transaction = False
    self.deployed = False
    self.started = False
    self.batching = True
    self.logging_context = _ClientDeployLoggingContext(rpcserver,
                                                       self.params,
                                                       usage_reporting)
    self.file_batcher = UploadBatcher('file', self.logging_context)
    self.blob_batcher = UploadBatcher('blob', self.logging_context)
    self.errorblob_batcher = UploadBatcher('errorblob', self.logging_context)
    # Record the module yaml path (and, for VM runtimes, a default image
    # derived from the SDK release) in the config's vm_settings.
    if not self.config.vm_settings:
      self.config.vm_settings = appinfo.VmSettings()
    self.config.vm_settings['module_yaml_path'] = module_yaml_path

    if not self.config.vm_settings.get('image'):
      sdk_version = get_version()
      if sdk_version and sdk_version.get('release'):
        self.config.vm_settings['image'] = sdk_version['release']

    if not self.config.auto_id_policy:
      self.config.auto_id_policy = appinfo.DATASTORE_ID_POLICY_DEFAULT
def AddFile(self, path, file_handle):
"""Adds the provided file to the list to be pushed to the server.
Args:
path: The path the file should be uploaded as.
file_handle: A stream containing data to upload.
"""
assert not self.in_transaction, 'Already in a transaction.'
assert file_handle is not None
reason = appinfo.ValidFilename(path)
if reason:
logging.error(reason)
return
content_hash = _HashFromFileHandle(file_handle)
self.files[path] = content_hash
self.all_files.add(path)
def Describe(self):
"""Returns a string describing the object being updated."""
result = 'app: %s' % self.app_id
if self.module is not None and self.module != appinfo.DEFAULT_MODULE:
result += ', module: %s' % self.module
if self.backend:
result += ', backend: %s' % self.backend
elif self.version:
result += ', version: %s' % self.version
return result
@staticmethod
def _ValidateBeginYaml(resp):
"""Validates the given /api/appversion/create response string."""
response_dict = yaml.safe_load(resp)
if not response_dict or 'warnings' not in response_dict:
return False
return response_dict
  def Begin(self):
    """Begins the transaction, returning a list of files that need uploading.

    All calls to AddFile must be made before calling Begin().

    Returns:
      A list of pathnames for files that should be uploaded using UploadFile()
      before Commit() can be called.
    """
    assert not self.in_transaction, 'Already in a transaction.'

    # Work on a deep copy so path rewrites below don't alter self.config,
    # which is still consulted for file classification.
    config_copy = copy.deepcopy(self.config)
    for url in config_copy.handlers:
      handler_type = url.GetHandlerType()
      if url.application_readable:
        # Rewrite application-readable static paths into the
        # STATIC_FILE_PREFIX namespace used on the server.
        if handler_type == 'static_dir':
          url.static_dir = '%s/%s' % (STATIC_FILE_PREFIX, url.static_dir)
        elif handler_type == 'static_files':
          url.static_files = '%s/%s' % (STATIC_FILE_PREFIX, url.static_files)
          url.upload = '%s/%s' % (STATIC_FILE_PREFIX, url.upload)

    response = self.logging_context.Send(
        '/api/appversion/create',
        payload=config_copy.ToYAML())

    result = self._ValidateBeginYaml(response)
    if result:
      warnings = result.get('warnings')
      for warning in warnings:
        StatusUpdate('WARNING: %s' % warning, self.error_fh)

    self.in_transaction = True

    files_to_clone = []
    blobs_to_clone = []
    errorblobs = {}
    for path, content_hash in self.files.iteritems():
      file_classification = FileClassification(
          self.config, path, error_fh=self.error_fh)

      if file_classification.IsStaticFile():
        upload_path = path
        if file_classification.IsApplicationFile():
          # Application-readable static files live under the prefix.
          upload_path = '%s/%s' % (STATIC_FILE_PREFIX, path)
        blobs_to_clone.append((path, upload_path, content_hash,
                               file_classification.StaticMimeType()))

      if file_classification.IsErrorFile():
        # Error files are always uploaded; no cloning is attempted.
        errorblobs[path] = content_hash

      if file_classification.IsApplicationFile():
        files_to_clone.append((path, path, content_hash))

    files_to_upload = {}

    def CloneFiles(url, files, file_type):
      """Sends files to the given url.

      Args:
        url: the server URL to use.
        files: a list of files
        file_type: the type of the files
      """
      if not files:
        return

      StatusUpdate('Cloning %d %s file%s.' %
                   (len(files), file_type, len(files) != 1 and 's' or ''),
                   self.error_fh)
      # NOTE(review): self.resource_limits is set outside this view
      # (presumably before Begin() is called) — confirm against callers.
      max_files = self.resource_limits['max_files_to_clone']
      for i in xrange(0, len(files), max_files):
        if i > 0 and i % max_files == 0:
          StatusUpdate('Cloned %d files.' % i, self.error_fh)

        chunk = files[i:min(len(files), i + max_files)]
        result = self.logging_context.Send(url,
                                           payload=BuildClonePostBody(chunk))
        if result:
          # The server replies with the upload paths it could NOT clone;
          # map them back to real paths and schedule them for upload.
          to_upload = {}
          for f in result.split(LIST_DELIMITER):
            for entry in files:
              real_path, upload_path = entry[:2]
              if f == upload_path:
                to_upload[real_path] = self.files[real_path]
                break
          files_to_upload.update(to_upload)

    CloneFiles('/api/appversion/cloneblobs', blobs_to_clone, 'static')
    CloneFiles('/api/appversion/clonefiles', files_to_clone, 'application')

    logging.debug('Files to upload: %s', files_to_upload)

    for (path, content_hash) in errorblobs.iteritems():
      files_to_upload[path] = content_hash
    # From here on self.files only tracks what still needs uploading.
    self.files = files_to_upload
    return sorted(files_to_upload.iterkeys())
def UploadFile(self, path, file_handle):
"""Uploads a file to the hosting service.
Must only be called after Begin().
The path provided must be one of those that were returned by Begin().
Args:
path: The path the file is being uploaded as.
file_handle: A file-like object containing the data to upload.
Raises:
KeyError: The provided file is not amongst those to be uploaded.
"""
assert self.in_transaction, 'Begin() must be called before UploadFile().'
if path not in self.files:
raise KeyError('File \'%s\' is not in the list of files to be uploaded.'
% path)
del self.files[path]
file_classification = FileClassification(
self.config, path, error_fh=self.error_fh)
payload = file_handle.read()
if file_classification.IsStaticFile():
upload_path = path
if file_classification.IsApplicationFile():
upload_path = '%s/%s' % (STATIC_FILE_PREFIX, path)
self.blob_batcher.AddToBatch(upload_path, payload,
file_classification.StaticMimeType())
if file_classification.IsErrorFile():
self.errorblob_batcher.AddToBatch(file_classification.ErrorCode(),
payload,
file_classification.ErrorMimeType())
if file_classification.IsApplicationFile():
self.file_batcher.AddToBatch(path, payload, None)
def Precompile(self):
"""Handle precompilation."""
StatusUpdate('Compilation starting.', self.error_fh)
files = []
if self.config.GetEffectiveRuntime() == 'go':
for f in self.all_files:
if f.endswith('.go') and not self.config.nobuild_files.match(f):
files.append(f)
while True:
if files:
StatusUpdate('Compilation: %d files left.' % len(files), self.error_fh)
files = self.PrecompileBatch(files)
if not files:
break
StatusUpdate('Compilation completed.', self.error_fh)
def PrecompileBatch(self, files):
"""Precompile a batch of files.
Args:
files: Either an empty list (for the initial request) or a list
of files to be precompiled.
Returns:
Either an empty list (if no more files need to be precompiled)
or a list of files to be precompiled subsequently.
"""
payload = LIST_DELIMITER.join(files)
response = self.logging_context.Send('/api/appversion/precompile',
payload=payload)
if not response:
return []
return response.split(LIST_DELIMITER)
  def Commit(self):
    """Commits the transaction, making the new app version available.

    All the files returned by Begin() must have been uploaded with UploadFile()
    before Commit() can be called.

    This tries the new 'deploy' method; if that fails it uses the old 'commit'.

    Returns:
      An appinfo.AppInfoSummary if one was returned from the Deploy, None
      otherwise.

    Raises:
      RuntimeError: Some required files were not uploaded.
      CannotStartServingError: Another operation is in progress on this version.
    """
    assert self.in_transaction, 'Begin() must be called before Commit().'
    if self.files:
      raise RuntimeError('Not all required files have been uploaded.')

    def PrintRetryMessage(_, delay):
      StatusUpdate('Will check again in %s seconds.' % delay, self.error_fh)

    app_summary = self.Deploy()

    # Poll with backoff until the deployed version reports ready.
    success, unused_contents = RetryWithBackoff(
        lambda: (self.IsReady(), None), PrintRetryMessage, 1, 2, 60, 20)
    if not success:
      logging.warning('Version still not ready to serve, aborting.')
      raise RuntimeError('Version not ready.')

    result = self.StartServing()
    if not result:
      # This is an old-style response that does not support serving polling;
      # the transaction is complete as soon as startserving returns.
      self.in_transaction = False
    else:
      if result == '0':
        raise CannotStartServingError(
            'Another operation on this version is in progress.')
      success, response = RetryNoBackoff(self.IsServing, PrintRetryMessage)
      if not success:
        logging.warning('Version still not serving, aborting.')
        raise RuntimeError('Version not ready.')

      # The isserving response tells us whether an Endpoints configuration
      # push also needs to be confirmed before the commit counts as done.
      check_config_updated = response.get('check_endpoints_config')
      if check_config_updated:
        unused_done, last_state = RetryWithBackoff(
            self.IsEndpointsConfigUpdated,
            PrintRetryMessage, 1, 2, 60, 20)
        if last_state != EndpointsState.SERVING:
          error_message = (
              'Failed to update Endpoints configuration (last result %s). '
              'Check the app\'s AppEngine logs for errors: %s' %
              (last_state, self.GetLogUrl()))
          StatusUpdate(error_message, self.error_fh)
          logging.warning(error_message)
          raise RuntimeError(error_message)
      self.in_transaction = False

    return app_summary
def Deploy(self):
"""Deploys the new app version but does not make it default.
All the files returned by Begin() must have been uploaded with UploadFile()
before Deploy() can be called.
Returns:
An appinfo.AppInfoSummary if one was returned from the Deploy, None
otherwise.
Raises:
RuntimeError: Some required files were not uploaded.
"""
assert self.in_transaction, 'Begin() must be called before Deploy().'
if self.files:
raise RuntimeError('Not all required files have been uploaded.')
StatusUpdate('Starting deployment.', self.error_fh)
result = self.logging_context.Send('/api/appversion/deploy')
self.deployed = True
if result:
return yaml_object.BuildSingleObject(appinfo.AppInfoSummary, result)
else:
return None
def IsReady(self):
"""Check if the new app version is ready to serve traffic.
Raises:
RuntimeError: Deploy has not yet been called.
Returns:
True if the server returned the app is ready to serve.
"""
assert self.deployed, 'Deploy() must be called before IsReady().'
StatusUpdate('Checking if deployment succeeded.', self.error_fh)
result = self.logging_context.Send('/api/appversion/isready')
return result == '1'
def StartServing(self):
"""Start serving with the newly created version.
Raises:
RuntimeError: Deploy has not yet been called.
Returns:
The response body, as a string.
"""
assert self.deployed, 'Deploy() must be called before StartServing().'
StatusUpdate('Deployment successful.', self.error_fh)
self.params['willcheckserving'] = '1'
result = self.logging_context.Send('/api/appversion/startserving')
del self.params['willcheckserving']
self.started = True
return result
@staticmethod
def _ValidateIsServingYaml(resp):
"""Validates the given /isserving YAML string.
Args:
resp: the response from an RPC to a URL such as /api/appversion/isserving.
Returns:
The resulting dictionary if the response is valid, or None otherwise.
"""
response_dict = yaml.safe_load(resp)
if 'serving' not in response_dict:
return None
return response_dict
def IsServing(self):
"""Check if the new app version is serving.
Raises:
RuntimeError: Deploy has not yet been called.
CannotStartServingError: A bad response was received from the isserving
API call.
Returns:
(serving, response) Where serving is True if the deployed app version is
serving, False otherwise. response is a dict containing the parsed
response from the server, or an empty dict if the server's response was
an old style 0/1 response.
"""
assert self.started, 'StartServing() must be called before IsServing().'
StatusUpdate('Checking if updated app version is serving.', self.error_fh)
self.params['new_serving_resp'] = '1'
result = self.logging_context.Send('/api/appversion/isserving')
del self.params['new_serving_resp']
if result in ['0', '1']:
return result == '1', {}
result = AppVersionUpload._ValidateIsServingYaml(result)
if not result:
raise CannotStartServingError(
'Internal error: Could not parse IsServing response.')
message = result.get('message')
fatal = result.get('fatal')
if message:
StatusUpdate(message, self.error_fh)
if fatal:
raise CannotStartServingError(message or 'Unknown error.')
return result['serving'], result
@staticmethod
def _ValidateIsEndpointsConfigUpdatedYaml(resp):
"""Validates the YAML string response from an isconfigupdated request.
Args:
resp: A string containing the response from the server.
Returns:
The dictionary with the parsed response if the response is valid.
Otherwise returns False.
"""
response_dict = yaml.safe_load(resp)
if 'updated' not in response_dict and 'updatedDetail' not in response_dict:
return None
return response_dict
def GetLogUrl(self):
"""Get the URL for the app's logs."""
module = '%s:' % self.module if self.module else ''
return ('https://appengine.google.com/logs?' +
urllib.urlencode((('app_id', self.app_id),
('version_id', module + self.version))))
def IsEndpointsConfigUpdated(self):
"""Check if the Endpoints configuration for this app has been updated.
This should only be called if the app has a Google Cloud Endpoints
handler, or if it's removing one. The server performs the check to see
if Endpoints support is added/updated/removed, and the response to the
isserving call indicates whether IsEndpointsConfigUpdated should be called.
Raises:
AssertionError: Deploy has not yet been called.
CannotStartServingError: There was an unexpected error with the server
response.
Returns:
(done, updated_state), where done is False if this function should
be called again to retry, True if not. updated_state is an
EndpointsState value indicating whether the Endpoints configuration has
been updated on the server.
"""
assert self.started, ('StartServing() must be called before '
'IsEndpointsConfigUpdated().')
StatusUpdate('Checking if Endpoints configuration has been updated.',
self.error_fh)
result = self.logging_context.Send('/api/isconfigupdated')
result = AppVersionUpload._ValidateIsEndpointsConfigUpdatedYaml(result)
if result is None:
raise CannotStartServingError(
'Internal error: Could not parse IsEndpointsConfigUpdated response.')
if 'updatedDetail' in result:
updated_state = EndpointsState.Parse(result['updatedDetail'])
else:
updated_state = (EndpointsState.SERVING if result['updated']
else EndpointsState.PENDING)
return updated_state != EndpointsState.PENDING, updated_state
def Rollback(self, force_rollback=False):
"""Rolls back the transaction if one is in progress."""
if not self.in_transaction:
return
msg = 'Rolling back the update.'
if self.config.vm and not force_rollback:
msg += (' This can sometimes take a while since a VM version is being '
'rolled back.')
StatusUpdate(msg, self.error_fh)
self.logging_context.Send('/api/appversion/rollback',
force_rollback='1' if force_rollback else '0')
self.in_transaction = False
self.files = {}
  def DoUpload(self, paths, openfunc):
    """Uploads a new appversion with the given config and files to the server.

    Drives the whole deploy: scans and registers local files, begins the
    server-side transaction, uploads missing files, optionally precompiles,
    and commits. On any failure after Begin(), the transaction is rolled
    back before the exception is re-raised.

    Args:
      paths: An iterator that yields the relative paths of the files to upload.
      openfunc: A function that takes a path and returns a file-like object.

    Returns:
      An appinfo.AppInfoSummary if one was returned from the server, None
      otherwise.
    """
    # Timestamp used to report deploy duration to the client-deploy log.
    start_time_usec = self.logging_context.GetCurrentTimeUsec()
    logging.info('Reading app configuration.')
    StatusUpdate('\nStarting update of %s' % self.Describe(), self.error_fh)
    # NOTE(review): 'path' is only referenced by the error message below and
    # is never reassigned, so a failure always reports '' rather than the
    # offending file -- confirm whether the per-file path was meant here.
    path = ''
    try:
      self.resource_limits = GetResourceLimits(self.logging_context,
                                               self.error_fh)
      self._AddFilesThatAreSmallEnough(paths, openfunc)
    except KeyboardInterrupt:
      logging.info('User interrupted. Aborting.')
      raise
    except EnvironmentError, e:
      # Only system-style failures are registered in the client deploy log;
      # user errors and cancellations are excluded.
      if self._IsExceptionClientDeployLoggable(e):
        self.logging_context.LogClientDeploy(self.config.runtime,
                                             start_time_usec, False)
      logging.error('An error occurred processing file \'%s\': %s. Aborting.',
                    path, e)
      raise
    try:
      # Begin() opens the server-side transaction and returns the subset of
      # files the server does not already have (matched by hash).
      missing_files = self.Begin()
      self._UploadMissingFiles(missing_files, openfunc)
      if (self.config.derived_file_type and
          appinfo.PYTHON_PRECOMPILED in self.config.derived_file_type):
        try:
          self.Precompile()
        except urllib2.HTTPError, e:
          ErrorUpdate('Error %d: --- begin server output ---\n'
                      '%s\n--- end server output ---' %
                      (e.code, e.read().rstrip('\n')))
          # 422 (unprocessable) and the Go runtime treat precompilation as
          # mandatory; all other cases degrade to a warning and continue.
          if e.code == 422 or self.config.GetEffectiveRuntime() == 'go':
            raise
          print >>self.error_fh, (
              'Precompilation failed. Your app can still serve but may '
              'have reduced startup performance. You can retry the update '
              'later to retry the precompilation step.')
      app_summary = self.Commit()
      StatusUpdate('Completed update of %s' % self.Describe(), self.error_fh)
      self.logging_context.LogClientDeploy(self.config.runtime, start_time_usec,
                                           True)
    except BaseException, e:
      # Roll back the server transaction first; whether or not rollback
      # succeeds, still record the failed deploy, then re-raise.
      try:
        self._LogDoUploadException(e)
        self.Rollback()
      finally:
        if self._IsExceptionClientDeployLoggable(e):
          self.logging_context.LogClientDeploy(self.config.runtime,
                                               start_time_usec, False)
      raise
    logging.info('Done!')
    return app_summary
def _IsExceptionClientDeployLoggable(self, exception):
"""Determines if an exception qualifes for client deploy log reistration.
Args:
exception: The exception to check.
Returns:
True iff exception qualifies for client deploy logging - basically a
system error rather than a user or error or cancellation.
"""
if isinstance(exception, KeyboardInterrupt):
return False
if (isinstance(exception, urllib2.HTTPError)
and 400 <= exception.code <= 499):
return False
return True
def _AddFilesThatAreSmallEnough(self, paths, openfunc):
"""Calls self.AddFile on files that are small enough.
By small enough, we mean that their size is within
self.resource_limits['max_file_size'] for application files, and
'max_blob_size' otherwise. Files that are too large are logged as errors,
and dropped (not sure why this isn't handled by raising an exception...).
Args:
paths: List of paths, relative to the app's base path.
openfunc: A function that takes a paths element, and returns a file-like
object.
"""
StatusUpdate('Scanning files on local disk.', self.error_fh)
num_files = 0
for path in paths:
file_handle = openfunc(path)
try:
file_length = GetFileLength(file_handle)
file_classification = FileClassification(
self.config, path, self.error_fh)
if file_classification.IsApplicationFile():
max_size = self.resource_limits['max_file_size']
else:
max_size = self.resource_limits['max_blob_size']
if file_length > max_size:
extra_msg = (' Consider --enable_jar_splitting.'
if JavaSupported() and path.endswith('jar')
else '')
logging.error('Ignoring file \'%s\': Too long '
'(max %d bytes, file is %d bytes).%s',
path, max_size, file_length, extra_msg)
else:
logging.info('Processing file \'%s\'', path)
self.AddFile(path, file_handle)
finally:
file_handle.close()
num_files += 1
if num_files % 500 == 0:
StatusUpdate('Scanned %d files.' % num_files, self.error_fh)
def _UploadMissingFiles(self, missing_files, openfunc):
"""DoUpload helper to upload files that need to be uploaded.
Args:
missing_files: List of files that need to be uploaded. Begin returns such
a list. Design note: we don't call Begin here, because we want DoUpload
to call it directly so that Begin/Commit are more clearly paired.
openfunc: Function that takes a path relative to the app's base path, and
returns a file-like object.
"""
if not missing_files:
return
StatusUpdate('Uploading %d files and blobs.' % len(missing_files),
self.error_fh)
num_files = 0
for missing_file in missing_files:
file_handle = openfunc(missing_file)
try:
self.UploadFile(missing_file, file_handle)
finally:
file_handle.close()
num_files += 1
if num_files % 500 == 0:
StatusUpdate('Processed %d out of %s.' %
(num_files, len(missing_files)), self.error_fh)
self.file_batcher.Flush()
self.blob_batcher.Flush()
self.errorblob_batcher.Flush()
StatusUpdate('Uploaded %d files and blobs' % num_files, self.error_fh)
@staticmethod
def _LogDoUploadException(exception):
"""Helper that logs exceptions that occurred during DoUpload.
Args:
exception: An exception that was thrown during DoUpload.
"""
def InstanceOf(tipe):
return isinstance(exception, tipe)
if InstanceOf(KeyboardInterrupt):
logging.info('User interrupted. Aborting.')
elif InstanceOf(urllib2.HTTPError):
logging.info('HTTP Error (%s)', exception)
elif InstanceOf(CannotStartServingError):
logging.error(exception.message)
else:
logging.exception('An unexpected error occurred. Aborting.')
class DoLockAction(object):
  """Locks/unlocks a particular vm app version and shows state."""

  def __init__(
      self, url, rpcserver, app_id, version, module, instance, file_handle):
    self.url = url
    self.rpcserver = rpcserver
    self.app_id = app_id
    self.version = version
    self.module = module
    self.instance = instance
    self.file_handle = file_handle

  def GetState(self):
    """Polls the debug state once; returns (done, message) for the retry loop."""
    response_yaml = self.rpcserver.Send('/api/vms/debugstate',
                                        app_id=self.app_id,
                                        version_match=self.version,
                                        module=self.module)
    parsed = yaml.safe_load(response_yaml)
    finished = parsed['state'] != 'PENDING'
    if finished:
      print >> self.file_handle, parsed['message']
    return (finished, parsed['message'])

  def PrintRetryMessage(self, msg, delay):
    """Reports that GetState will be retried after the given delay."""
    StatusUpdate('%s. Will try again in %d seconds.' % (msg, delay),
                 self.file_handle)

  def Do(self):
    """Sends the lock/unlock request and polls until the state settles."""
    request_args = {'app_id': self.app_id,
                    'version_match': self.version,
                    'module': self.module}
    if self.instance:
      request_args['instance'] = self.instance
    response = self.rpcserver.Send(self.url, **request_args)
    print >> self.file_handle, response
    RetryWithBackoff(self.GetState, self.PrintRetryMessage, 1, 2, 5, 20)
def FileIterator(base, skip_files, runtime, separator=os.path.sep):
  """Walks a directory tree, returning all the files. Follows symlinks.

  Args:
    base: The base path to search for files under.
    skip_files: A regular expression object for files/directories to skip.
    runtime: The name of the runtime e.g. "python". If "python27" then .pyc
      files with matching .py files will be skipped.
    separator: Path separator used by the running system's platform.

  Yields:
    Paths of files found, relative to base.
  """
  # Depth-first traversal driven by an explicit stack of relative dir paths.
  pending_dirs = ['']
  while pending_dirs:
    rel_dir = pending_dirs.pop()
    dir_entries = set(os.listdir(os.path.join(base, rel_dir)))
    for entry in sorted(dir_entries):
      rel_name = os.path.join(rel_dir, entry)
      abs_name = os.path.join(base, rel_name)

      # Normalize Windows paths so skip_files patterns and yielded names
      # always use forward slashes.
      if separator == '\\':
        rel_name = rel_name.replace('\\', '/')

      # On python27, never upload a .pyc alongside its .py source.
      if runtime == 'python27' and not skip_files.match(rel_name):
        stem, ext = os.path.splitext(entry)
        if ext == '.pyc' and (stem + '.py') in dir_entries:
          logging.warning('Ignoring file \'%s\': Cannot upload both '
                          '<filename>.py and <filename>.pyc', rel_name)
          continue

      if os.path.isfile(abs_name):
        if skip_files.match(rel_name):
          logging.info('Ignoring file \'%s\': File matches ignore regex.',
                       rel_name)
        else:
          yield rel_name
      elif os.path.isdir(abs_name):
        if skip_files.match(rel_name):
          logging.info(
              'Ignoring directory \'%s\': Directory matches ignore regex.',
              rel_name)
        else:
          pending_dirs.append(rel_name)
def GetFileLength(fh):
  """Returns the length of the file represented by fh.

  This function is capable of finding the length of any seekable stream,
  unlike os.fstat, which only works on file streams.

  Args:
    fh: The stream to get the length of.

  Returns:
    The length of the stream.
  """
  original_offset = fh.tell()
  fh.seek(0, 2)  # Jump to end-of-file to read the total size.
  try:
    return fh.tell()
  finally:
    # Always restore the caller's read position.
    fh.seek(original_offset, 0)
def GetUserAgent(get_version=sdk_update_checker.GetVersionObject,
                 get_platform=appengine_rpc.GetPlatformToken,
                 sdk_product=SDK_PRODUCT):
  """Determines the value of the 'User-agent' header to use for HTTP requests.

  If the 'APPCFG_SDK_NAME' environment variable is present, that will be
  used as the first product token in the user-agent.

  Args:
    get_version: Used for testing.
    get_platform: Used for testing.
    sdk_product: Used as part of sdk/version product token.

  Returns:
    String containing the 'user-agent' header value, which includes the SDK
    version, the platform information, and the version of Python;
    e.g., 'appcfg_py/1.0.1 Darwin/9.2.0 Python/2.5.2'.
  """
  tokens = []

  # An explicit APPCFG_SDK_NAME replaces the sdk/version product token.
  override_name = os.environ.get('APPCFG_SDK_NAME')
  if override_name:
    tokens.append(override_name)
  else:
    version_info = get_version()
    release = 'unknown' if version_info is None else version_info['release']
    tokens.append('%s/%s' % (sdk_product, release))

  tokens.append(get_platform())

  python_version = '.'.join(str(part) for part in sys.version_info)
  tokens.append('Python/%s' % python_version)

  return ' '.join(tokens)
def GetSourceName(get_version=sdk_update_checker.GetVersionObject):
  """Gets the name of this source version."""
  version_info = get_version()
  release = 'unknown' if version_info is None else version_info['release']
  return 'Google-appcfg-%s' % (release,)
def _ReadUrlContents(url):
  """Reads the contents of a URL into a string.

  Args:
    url: a string that is the URL to read.

  Returns:
    A string that is the contents read from the URL.

  Raises:
    urllib2.URLError: If the URL cannot be read.
  """
  return urllib2.urlopen(urllib2.Request(url)).read()
class AppCfgApp(object):
"""Singleton class to wrap AppCfg tool functionality.
This class is responsible for parsing the command line and executing
the desired action on behalf of the user. Processing files and
communicating with the server is handled by other classes.
Attributes:
actions: A dictionary mapping action names to Action objects.
action: The Action specified on the command line.
parser: An instance of optparse.OptionParser.
options: The command line options parsed by 'parser'.
argv: The original command line as a list.
args: The positional command line args left over after parsing the options.
raw_input_fn: Function used for getting raw user input, like email.
password_input_fn: Function used for getting user password.
error_fh: Unexpected HTTPErrors are printed to this file handle.
Attributes for testing:
parser_class: The class to use for parsing the command line. Because
OptionsParser will exit the program when there is a parse failure, it
is nice to subclass OptionsParser and catch the error before exiting.
read_url_contents: A function to read the contents of a URL.
"""
def __init__(self, argv, parser_class=optparse.OptionParser,
rpc_server_class=None,
raw_input_fn=raw_input,
password_input_fn=getpass.getpass,
out_fh=sys.stdout,
error_fh=sys.stderr,
update_check_class=sdk_update_checker.SDKUpdateChecker,
throttle_class=None,
opener=open,
file_iterator=FileIterator,
time_func=time.time,
wrap_server_error_message=True,
oauth_client_id=APPCFG_CLIENT_ID,
oauth_client_secret=APPCFG_CLIENT_NOTSOSECRET,
oauth_scopes=APPCFG_SCOPES):
"""Initializer. Parses the cmdline and selects the Action to use.
Initializes all of the attributes described in the class docstring.
Prints help or error messages if there is an error parsing the cmdline.
Args:
argv: The list of arguments passed to this program.
parser_class: Options parser to use for this application.
rpc_server_class: RPC server class to use for this application.
raw_input_fn: Function used for getting user email.
password_input_fn: Function used for getting user password.
out_fh: All normal output is printed to this file handle.
error_fh: Unexpected HTTPErrors are printed to this file handle.
update_check_class: sdk_update_checker.SDKUpdateChecker class (can be
replaced for testing).
throttle_class: A class to use instead of ThrottledHttpRpcServer
(only used in the bulkloader).
opener: Function used for opening files.
file_iterator: Callable that takes (basepath, skip_files, file_separator)
and returns a generator that yields all filenames in the file tree
rooted at that path, skipping files that match the skip_files compiled
regular expression.
time_func: A time.time() compatible function, which can be overridden for
testing.
wrap_server_error_message: If true, the error messages from
urllib2.HTTPError exceptions in Run() are wrapped with
'--- begin server output ---' and '--- end server output ---',
otherwise the error message is printed as is.
oauth_client_id: The client ID of the project providing Auth. Defaults to
the SDK default project client ID, the constant APPCFG_CLIENT_ID.
oauth_client_secret: The client secret of the project providing Auth.
Defaults to the SDK default project client secret, the constant
APPCFG_CLIENT_NOTSOSECRET.
oauth_scopes: The scope or set of scopes to be accessed by the OAuth2
token retrieved. Defaults to APPCFG_SCOPES. Can be a string or
iterable of strings, representing the scope(s) to request.
"""
self.parser_class = parser_class
self.argv = argv
self.rpc_server_class = rpc_server_class
self.raw_input_fn = raw_input_fn
self.password_input_fn = password_input_fn
self.out_fh = out_fh
self.error_fh = error_fh
self.update_check_class = update_check_class
self.throttle_class = throttle_class
self.time_func = time_func
self.wrap_server_error_message = wrap_server_error_message
self.oauth_client_id = oauth_client_id
self.oauth_client_secret = oauth_client_secret
self.oauth_scopes = oauth_scopes
self.read_url_contents = _ReadUrlContents
self.parser = self._GetOptionParser()
for action in self.actions.itervalues():
action.options(self, self.parser)
self.options, self.args = self.parser.parse_args(argv[1:])
if len(self.args) < 1:
self._PrintHelpAndExit()
if not self.options.allow_any_runtime:
if self.options.runtime:
if self.options.runtime not in SUPPORTED_RUNTIMES:
_PrintErrorAndExit(self.error_fh,
'"%s" is not a supported runtime\n' %
self.options.runtime)
else:
appinfo.AppInfoExternal.ATTRIBUTES[appinfo.RUNTIME] = (
'|'.join(SUPPORTED_RUNTIMES))
action = self.args.pop(0)
def RaiseParseError(actionname, action):
self.parser, self.options = self._MakeSpecificParser(action)
error_desc = action.error_desc
if not error_desc:
error_desc = "Expected a <directory> argument after '%s'." % (
actionname.split(' ')[0])
self.parser.error(error_desc)
if action == BACKENDS_ACTION:
if len(self.args) < 1:
RaiseParseError(action, self.actions[BACKENDS_ACTION])
backend_action_first = BACKENDS_ACTION + ' ' + self.args[0]
if backend_action_first in self.actions:
self.args.pop(0)
action = backend_action_first
elif len(self.args) > 1:
backend_directory_first = BACKENDS_ACTION + ' ' + self.args[1]
if backend_directory_first in self.actions:
self.args.pop(1)
action = backend_directory_first
if len(self.args) < 1 or action == BACKENDS_ACTION:
RaiseParseError(action, self.actions[action])
if action not in self.actions:
self.parser.error("Unknown action: '%s'\n%s" %
(action, self.parser.get_description()))