#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Used render templates for datastore admin."""
import base64
import datetime
import logging
import os
import random
from google.appengine.datastore import entity_pb
from google.appengine.api import datastore
from google.appengine.api import memcache
from google.appengine.api import users
from google.appengine.datastore import datastore_rpc
from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.datastore_admin import config
from google.appengine.ext.db import stats
from google.appengine.ext.webapp import _template
try:
from google.appengine.ext.mapreduce import context
from google.appengine.ext.mapreduce import control
from google.appengine.ext.mapreduce import model
from google.appengine.ext.mapreduce import operation as mr_operation
from google.appengine.ext.mapreduce import util
except ImportError:
from google.appengine._internal.mapreduce import context
from google.appengine._internal.mapreduce import control
from google.appengine._internal.mapreduce import model
from google.appengine._internal.mapreduce import operation as mr_operation
from google.appengine._internal.mapreduce import util
MEMCACHE_NAMESPACE = '_ah-datastore_admin'
XSRF_VALIDITY_TIME = 600
KINDS_AND_SIZES_VAR = 'kinds_and_sizes'
MAPREDUCE_MIN_SHARDS = 8
MAPREDUCE_DEFAULT_SHARDS = 32
MAPREDUCE_MAX_SHARDS = 256
RESERVE_KEY_POOL_MAX_SIZE = 1000
DATASTORE_ADMIN_OPERATION_KIND = '_AE_DatastoreAdmin_Operation'
BACKUP_INFORMATION_KIND = '_AE_Backup_Information'
BACKUP_INFORMATION_FILES_KIND = '_AE_Backup_Information_Kind_Files'
BACKUP_INFORMATION_KIND_TYPE_INFO = '_AE_Backup_Information_Kind_Type_Info'
DATASTORE_ADMIN_KINDS = (DATASTORE_ADMIN_OPERATION_KIND,
BACKUP_INFORMATION_KIND,
BACKUP_INFORMATION_FILES_KIND,
BACKUP_INFORMATION_KIND_TYPE_INFO)
def IsKindNameVisible(kind_name):
return not (kind_name.startswith('__') or
kind_name in DATASTORE_ADMIN_KINDS or
kind_name in model._MAP_REDUCE_KINDS)
def RenderToResponse(handler, template_file, template_params):
"""Render the given template_file using template_vals and write to response.
Args:
handler: the handler whose response we should render to
template_file: the file name only of the template file we are using
template_params: the parameters used to render the given template
"""
template_params = _GetDefaultParams(template_params)
handler.response.headers['X-FRAME-OPTIONS'] = ('ALLOW-FROM %s' %
config.ADMIN_CONSOLE_URL)
template_params['admin_console_url'] = config.ADMIN_CONSOLE_URL
rendered = _template.render(_GetTemplatePath(template_file), template_params)
handler.response.out.write(rendered)
def _GetTemplatePath(template_file):
"""Return the expected path for the template to render.
Args:
template_file: simple file name of template to render.
Returns:
path of template to render.
"""
return os.path.join(
os.path.dirname(__file__), 'templates', template_file)
def _GetDefaultParams(template_params):
"""Update template_params to always contain necessary paths and never None."""
if not template_params:
template_params = {}
template_params.update({
'base_path': config.BASE_PATH,
'mapreduce_path': config.MAPREDUCE_PATH,
})
return template_params
def CreateXsrfToken(action):
"""Generate a token to be passed with a form for XSRF protection.
Args:
action: action to restrict token to
Returns:
suitably random token which is only valid for ten minutes and, if the user
is authenticated, is only valid for the user that generated it.
"""
user_str = _MakeUserStr()
  # Use a cryptographically secure source for the token bytes.
  token = base64.b64encode(os.urandom(64))
memcache.set(token,
(user_str, action),
time=XSRF_VALIDITY_TIME,
namespace=MEMCACHE_NAMESPACE)
return token
def ValidateXsrfToken(token, action):
"""Validate a given XSRF token by retrieving it from memcache.
If the token has not been evicted from memcache (past ten minutes) and the
user strings are equal, then this is a valid token.
Args:
token: token to validate from memcache.
action: action that token should correspond to
Returns:
True if the token exists in memcache and the user strings are equal,
False otherwise.
"""
user_str = _MakeUserStr()
token_obj = memcache.get(token, namespace=MEMCACHE_NAMESPACE)
if not token_obj:
return False
token_str, token_action = token_obj
if user_str != token_str or action != token_action:
return False
return True
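# A minimal sketch of the intended token round trip (the handler, template,
# and form field names here are hypothetical, not part of this module):
#
#   class ConfirmDeleteHandler(webapp.RequestHandler):
#     def get(self):
#       RenderToResponse(self, 'confirm_delete.html',
#                        {'xsrf_token': CreateXsrfToken('delete')})
#     def post(self):
#       if not ValidateXsrfToken(self.request.get('xsrf_token'), 'delete'):
#         self.response.set_status(403)
#         return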
def CacheStats(formatted_results):
"""Cache last retrieved kind size values in memcache.
Args:
    formatted_results: list of dictionaries of the form returned by
main._PresentableKindStats.
"""
kinds_and_sizes = dict((kind['kind_name'], kind['total_bytes'])
for kind in formatted_results)
memcache.set(KINDS_AND_SIZES_VAR,
kinds_and_sizes,
namespace=MEMCACHE_NAMESPACE)
def RetrieveCachedStats():
"""Retrieve cached kind sizes from last datastore stats call.
Returns:
Dictionary mapping kind names to total bytes.
"""
return memcache.get(KINDS_AND_SIZES_VAR, namespace=MEMCACHE_NAMESPACE)
def _MakeUserStr():
"""Make a user string to use to represent the user. 'noauth' by default."""
user = users.get_current_user()
return user.nickname() if user else 'noauth'
def GetPrettyBytes(bytes_num, significant_digits=0):
"""Get a pretty print view of the given number of bytes.
This will give a string like 'X MBytes'.
Args:
bytes_num: the original number of bytes to pretty print.
significant_digits: number of digits to display after the decimal point.
Returns:
A string that has the pretty print version of the given bytes.
    If bytes_num is too big, the string 'Alot' will be returned.
"""
byte_prefixes = ['', 'K', 'M', 'G', 'T', 'P', 'E']
for i in range(0, 7):
exp = i * 10
if bytes_num < 1<<(exp + 10):
if i == 0:
formatted_bytes = str(bytes_num)
else:
formatted_bytes = '%.*f' % (significant_digits,
(bytes_num * 1.0 / (1<<exp)))
if formatted_bytes != '1':
plural = 's'
else:
plural = ''
return '%s %sByte%s' % (formatted_bytes, byte_prefixes[i], plural)
logging.error('Number too high to convert: %d', bytes_num)
return 'Alot'
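# For example (values follow directly from the loop above):
#   GetPrettyBytes(1) == '1 Byte'
#   GetPrettyBytes(1023) == '1023 Bytes'
#   GetPrettyBytes(1536, 1) == '1.5 KBytes'
#   GetPrettyBytes(5 << 20) == '5 MBytes'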
def FormatThousands(value):
"""Format a numerical value, inserting commas as thousands separators.
Args:
value: An integer, float, or string representation thereof.
If the argument is a float, it is converted to a string using '%.2f'.
Returns:
A string with groups of 3 digits before the decimal point (if any)
separated by commas.
NOTE: We don't deal with whitespace, and we don't insert
commas into long strings of digits after the decimal point.
"""
if isinstance(value, float):
value = '%.2f' % value
else:
value = str(value)
if '.' in value:
head, tail = value.split('.', 1)
tail = '.' + tail
elif 'e' in value:
head, tail = value.split('e', 1)
tail = 'e' + tail
else:
head = value
tail = ''
sign = ''
if head.startswith('-'):
sign = '-'
head = head[1:]
while len(head) > 3:
tail = ',' + head[-3:] + tail
head = head[:-3]
return sign + head + tail
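# For example:
#   FormatThousands(1234567) == '1,234,567'
#   FormatThousands(-9876.5) == '-9,876.50'
#   FormatThousands('1e10') == '1e10'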
def TruncDelta(delta):
"""Strips microseconds from a timedelta."""
return datetime.timedelta(days=delta.days, seconds=delta.seconds)
def GetPrintableStrs(namespace, kinds):
"""Returns tuples describing affected kinds and namespace.
Args:
namespace: namespace being targeted.
kinds: list of kinds being targeted.
Returns:
(namespace_str, kind_str) tuple used for display to user.
"""
namespace_str = namespace or ''
if kinds:
kind_str = 'all %s entities' % ', '.join(kinds)
else:
kind_str = ''
return (namespace_str, kind_str)
def ParseKindsAndSizes(kinds):
"""Parses kind|size list and returns template parameters.
Args:
kinds: list of kinds to process.
Returns:
sizes_known: whether or not all kind objects have known sizes.
size_total: total size of objects with known sizes.
    len(kinds) - 2: used by templates when rendering more than three kinds.
"""
sizes_known = True
size_total = 0
kinds_and_sizes = RetrieveCachedStats()
if kinds_and_sizes:
for kind in kinds:
if kind in kinds_and_sizes:
size_total += kinds_and_sizes[kind]
else:
sizes_known = False
else:
sizes_known = False
if size_total:
size_total = GetPrettyBytes(size_total)
return sizes_known, size_total, len(kinds) - 2
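# For example, with cached stats of {'Foo': 2048, 'Bar': 1024}:
#   ParseKindsAndSizes(['Foo', 'Bar']) == (True, '3 KBytes', 0)
# With no cached stats, sizes_known is False and size_total stays 0.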
def _CreateDatastoreConfig():
"""Create datastore config for use during datastore admin operations."""
return datastore_rpc.Configuration(force_writes=True)
def GenerateHomeUrl(request):
"""Generates a link to the Datastore Admin main page.
Primarily intended to be used for cancel buttons or links on error pages. To
avoid any XSS security vulnerabilities the URL should not use any
user-defined strings (unless proper precautions are taken).
Args:
request: the webapp.Request object (to determine if certain query
parameters need to be used).
Returns:
domain-relative URL for the main Datastore Admin page.
"""
datastore_admin_home = config.BASE_PATH
if request and request.get('run_as_a_service'):
datastore_admin_home += '?run_as_a_service=True'
return datastore_admin_home
class MapreduceDoneHandler(webapp.RequestHandler):
"""Handler to delete data associated with successful MapReduce jobs."""
SUFFIX = 'mapreduce_done'
def post(self):
"""Mapreduce done callback to delete job data if it was successful."""
if 'Mapreduce-Id' in self.request.headers:
mapreduce_id = self.request.headers['Mapreduce-Id']
mapreduce_state = model.MapreduceState.get_by_job_id(mapreduce_id)
mapreduce_params = mapreduce_state.mapreduce_spec.params
db_config = _CreateDatastoreConfig()
if mapreduce_state.result_status == model.MapreduceState.RESULT_SUCCESS:
operation_key = mapreduce_params.get(
DatastoreAdminOperation.PARAM_DATASTORE_ADMIN_OPERATION)
if operation_key is None:
logging.error('Done callback for job %s without operation key.',
mapreduce_id)
else:
def tx():
operation = DatastoreAdminOperation.get(operation_key)
if mapreduce_id in operation.active_job_ids:
operation.active_jobs -= 1
operation.completed_jobs += 1
operation.active_job_ids.remove(mapreduce_id)
if not operation.active_jobs:
if operation.status == DatastoreAdminOperation.STATUS_ACTIVE:
operation.status = DatastoreAdminOperation.STATUS_COMPLETED
db.delete(DatastoreAdminOperationJob.all().ancestor(operation),
config=db_config)
operation.put(config=db_config)
if 'done_callback_handler' in mapreduce_params:
done_callback_handler = util.for_name(
mapreduce_params['done_callback_handler'])
if done_callback_handler:
done_callback_handler(operation, mapreduce_id, mapreduce_state)
else:
                logging.error('done_callback_handler %s was not found',
mapreduce_params['done_callback_handler'])
db.run_in_transaction(tx)
if config.CLEANUP_MAPREDUCE_STATE:
          keys = model.ShardState.calculate_keys_by_mapreduce_state(
              mapreduce_state)
keys.append(model.MapreduceControl.get_key_by_job_id(mapreduce_id))
db.delete(keys, config=db_config)
db.delete(mapreduce_state, config=db_config)
logging.info('State for successful job %s was deleted.', mapreduce_id)
else:
logging.info('Job %s was not successful so no state was deleted.',
mapreduce_id)
else:
logging.error('Done callback called without Mapreduce Id.')
class Error(Exception):
"""Base DatastoreAdmin error type."""
class DatastoreAdminOperation(db.Model):
"""An entity to keep progress and status of datastore admin operation."""
STATUS_CREATED = 'Created'
STATUS_ACTIVE = 'Active'
STATUS_COMPLETED = 'Completed'
STATUS_FAILED = 'Failed'
STATUS_ABORTED = 'Aborted'
PARAM_DATASTORE_ADMIN_OPERATION = 'datastore_admin_operation'
DEFAULT_LAST_UPDATED_VALUE = datetime.datetime(1970, 1, 1)
description = db.TextProperty()
status = db.StringProperty(default=STATUS_CREATED)
active_jobs = db.IntegerProperty(default=0)
active_job_ids = db.StringListProperty()
completed_jobs = db.IntegerProperty(default=0)
last_updated = db.DateTimeProperty(default=DEFAULT_LAST_UPDATED_VALUE,
auto_now=True)
status_info = db.StringProperty(default='', indexed=False)
service_job_id = db.StringProperty()
@classmethod
def kind(cls):
return DATASTORE_ADMIN_OPERATION_KIND
class DatastoreAdminOperationJob(db.Model):
"""An entity to keep track of started jobs to ensure idempotency.
  This entity can be used when spawning additional jobs. It is
  always stored as a child entity of DatastoreAdminOperation.
  The entity key name is the job's unique id.
"""
def StartOperation(description):
"""Start datastore admin operation.
Args:
description: operation description to be displayed to user.
Returns:
an instance of DatastoreAdminOperation.
"""
operation = DatastoreAdminOperation(
description=description,
id=db.allocate_ids(
db.Key.from_path(DatastoreAdminOperation.kind(), 1), 1)[0])
operation.put(config=_CreateDatastoreConfig())
return operation
@db.non_transactional(allow_existing=False)
def StartMap(operation_key,
job_name,
handler_spec,
reader_spec,
writer_spec,
mapper_params,
mapreduce_params=None,
queue_name=None,
shard_count=MAPREDUCE_DEFAULT_SHARDS):
"""Start map as part of datastore admin operation.
Will increase number of active jobs inside the operation and start new map.
Args:
operation_key: Key of the DatastoreAdminOperation for current operation.
job_name: Map job name.
handler_spec: Map handler specification.
reader_spec: Input reader specification.
writer_spec: Output writer specification.
mapper_params: Custom mapper parameters.
mapreduce_params: Custom mapreduce parameters.
queue_name: the name of the queue that will be used by the M/R.
shard_count: the number of shards the M/R will try to use.
Returns:
resulting map job id as string.
"""
if not mapreduce_params:
mapreduce_params = {}
mapreduce_params[DatastoreAdminOperation.PARAM_DATASTORE_ADMIN_OPERATION] = (
str(operation_key))
mapreduce_params['done_callback'] = '%s/%s' % (config.BASE_PATH,
MapreduceDoneHandler.SUFFIX)
if queue_name is not None:
mapreduce_params['done_callback_queue'] = queue_name
mapreduce_params['force_writes'] = 'True'
def tx(is_xg_transaction):
"""Start MapReduce job and update datastore admin state.
Args:
is_xg_transaction: True if we are running inside a xg-enabled
transaction, else False if we are running inside a non-xg-enabled
transaction (which means the datastore admin state is updated in one
        transaction and the MapReduce job in an independent transaction).
Returns:
result MapReduce job id as a string.
"""
job_id = control.start_map(
job_name, handler_spec, reader_spec,
mapper_params,
output_writer_spec=writer_spec,
mapreduce_parameters=mapreduce_params,
base_path=config.MAPREDUCE_PATH,
shard_count=shard_count,
in_xg_transaction=is_xg_transaction,
queue_name=queue_name)
operation = DatastoreAdminOperation.get(operation_key)
operation.status = DatastoreAdminOperation.STATUS_ACTIVE
operation.active_jobs += 1
operation.active_job_ids = list(set(operation.active_job_ids + [job_id]))
operation.put(config=_CreateDatastoreConfig())
return job_id
datastore_type = datastore_rpc._GetDatastoreType()
if datastore_type != datastore_rpc.BaseConnection.MASTER_SLAVE_DATASTORE:
return db.run_in_transaction_options(
db.create_transaction_options(xg=True), tx, True)
else:
return db.run_in_transaction(tx, False)
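# A minimal sketch of starting one map job under a fresh operation (the
# handler spec, reader spec, and kind below are hypothetical; callers in
# this package normally go through RunMapForKinds instead):
#
#   operation = StartOperation('Process all Foo entities')
#   job_id = StartMap(
#       operation.key(), 'Process Foo',
#       'my_module.ProcessEntity',
#       'google.appengine.ext.mapreduce.input_readers.DatastoreInputReader',
#       None,
#       {'entity_kind': 'Foo'})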
def RunMapForKinds(operation_key,
kinds,
job_name_template,
handler_spec,
reader_spec,
writer_spec,
mapper_params,
mapreduce_params=None,
queue_name=None,
max_shard_count=None):
"""Run mapper job for all entities in specified kinds.
Args:
operation_key: The key of the DatastoreAdminOperation to record all jobs.
kinds: list of entity kinds as strings.
job_name_template: template for naming individual mapper jobs. Can
reference %(kind)s and %(namespace)s formatting variables.
handler_spec: mapper handler specification.
reader_spec: reader specification.
writer_spec: writer specification.
mapper_params: custom parameters to pass to mapper.
mapreduce_params: dictionary parameters relevant to the whole job.
queue_name: the name of the queue that will be used by the M/R.
max_shard_count: maximum value for shards count.
Returns:
Ids of all started mapper jobs as list of strings.
"""
jobs = []
try:
for kind in kinds:
mapper_params['entity_kind'] = kind
job_name = job_name_template % {'kind': kind, 'namespace':
mapper_params.get('namespace', '')}
shard_count = GetShardCount(kind, max_shard_count)
jobs.append(StartMap(operation_key, job_name, handler_spec, reader_spec,
writer_spec, mapper_params, mapreduce_params,
queue_name=queue_name, shard_count=shard_count))
return jobs
  except BaseException as ex:
AbortAdminOperation(operation_key,
_status=DatastoreAdminOperation.STATUS_FAILED,
_status_info='%s: %s' % (ex.__class__.__name__, ex))
raise
def GetShardCount(kind, max_shard_count=None):
  """Return the shard count to use when mapping over the given kind.
  Targets roughly one shard per 32 MB of stored data based on datastore
  statistics, clamped to [MAPREDUCE_MIN_SHARDS, MAPREDUCE_MAX_SHARDS] and
  optionally capped at max_shard_count. Falls back to
  MAPREDUCE_DEFAULT_SHARDS when no statistics exist for the kind.
  """
stat = stats.KindStat.all().filter('kind_name =', kind).get()
if stat:
shard_count = min(max(MAPREDUCE_MIN_SHARDS,
stat.bytes // (32 * 1024 * 1024)),
MAPREDUCE_MAX_SHARDS)
if max_shard_count and max_shard_count < shard_count:
shard_count = max_shard_count
return shard_count
return MAPREDUCE_DEFAULT_SHARDS
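# For example, a kind whose KindStat reports 1 GB of data gets
# (1 GB) / (32 MB) = 32 shards; 8 GB or more is clamped to
# MAPREDUCE_MAX_SHARDS (256), and a tiny kind still gets
# MAPREDUCE_MIN_SHARDS (8).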
def AbortAdminOperation(operation_key,
_status=DatastoreAdminOperation.STATUS_ABORTED,
_status_info=''):
"""Aborts active jobs."""
operation = DatastoreAdminOperation.get(operation_key)
operation.status = _status
operation.status_info = _status_info
operation.put(config=_CreateDatastoreConfig())
for job in operation.active_job_ids:
logging.info('Aborting Job %s', job)
model.MapreduceControl.abort(job, config=_CreateDatastoreConfig())
def get_kind_from_entity_pb(entity):
  """Returns the kind of the given EntityProto, or None if it has no path."""
element_list = entity.key().path().element_list()
return element_list[-1].type() if element_list else None
def FixKeys(entity_proto, app_id):
"""Go over keys in the given entity and update the application id.
Args:
entity_proto: An EntityProto to be fixed up. All identifiable keys in the
proto will have the 'app' field reset to match app_id.
app_id: The desired application id, typically os.getenv('APPLICATION_ID').
"""
def FixKey(mutable_key):
mutable_key.set_app(app_id)
def FixPropertyList(property_list):
for prop in property_list:
prop_value = prop.mutable_value()
if prop_value.has_referencevalue():
FixKey(prop_value.mutable_referencevalue())
elif prop.meaning() == entity_pb.Property.ENTITY_PROTO:
embedded_entity_proto = entity_pb.EntityProto()
try:
embedded_entity_proto.ParsePartialFromString(prop_value.stringvalue())
except Exception:
logging.exception('Failed to fix-keys for property %s of %s',
prop.name(),
entity_proto.key())
else:
FixKeys(embedded_entity_proto, app_id)
prop_value.set_stringvalue(
embedded_entity_proto.SerializePartialToString())
if entity_proto.has_key() and entity_proto.key().path().element_size():
FixKey(entity_proto.mutable_key())
FixPropertyList(entity_proto.property_list())
FixPropertyList(entity_proto.raw_property_list())
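# For example, a restore mapper copying a backup into a different application
# might rewrite every key before writing the entity back (APPLICATION_ID is
# the standard runtime environment variable):
#
#   FixKeys(entity_proto, os.getenv('APPLICATION_ID'))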
class ReserveKeyPool(object):
"""Mapper pool which buffers keys with ids to reserve.
Runs v4 AllocateIds rpc(s) when flushed.
"""
def __init__(self):
self.keys = []
def reserve_key(self, key):
for id_or_name in key.to_path()[1::2]:
if isinstance(id_or_name, (int, long)):
self.keys.append(key)
if len(self.keys) >= RESERVE_KEY_POOL_MAX_SIZE:
self.flush()
return
def flush(self):
datastore._GetConnection()._reserve_keys(self.keys)
self.keys = []
class ReserveKey(mr_operation.Operation):
"""Mapper operation to reserve key ids."""
def __init__(self, key):
self.key = key
self.app_id = key.app()
self.pool_id = 'reserve_key_%s_pool' % self.app_id
def __call__(self, ctx):
pool = ctx.get_pool(self.pool_id)
if not pool:
pool = ReserveKeyPool()
ctx.register_pool(self.pool_id, pool)
pool.reserve_key(self.key)
class PutPool(context.Pool):
"""A trimmed copy of the MutationPool class.
Properties:
puts: a list of entities to put to datastore.
max_entity_count: maximum number of entities before flushing it to db.
"""
POOL_NAME = 'put_pool'
def __init__(self, max_entity_count=context.MAX_ENTITY_COUNT):
"""Constructor.
Args:
max_entity_count: maximum number of entities before flushing it to db.
"""
self.max_entity_count = max_entity_count
self.puts = []
def Put(self, entity):
"""Registers entity to put to datastore.
Args:
entity: The EntityProto for the entity to be put.
"""
if len(self.puts) >= self.max_entity_count:
self.flush()
self.puts.append(entity)
def flush(self):
"""Flush all puts to datastore."""
if self.puts:
datastore_rpc.Connection(
config=datastore_rpc.Configuration(deadline=60)).put(self.puts)
self.puts = []
class Put(mr_operation.Operation):
"""Mapper operation to batch puts."""
def __init__(self, entity):
"""Constructor.
Args:
entity: The EntityProto of the entity to put.
"""
self.entity = entity
def __call__(self, ctx):
pool = ctx.get_pool(PutPool.POOL_NAME)
if not pool:
pool = PutPool(
max_entity_count=(context.MAX_ENTITY_COUNT/(2**ctx.task_retry_count)))
ctx.register_pool(PutPool.POOL_NAME, pool)
pool.Put(self.entity)
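# A minimal sketch of how ReserveKey and Put are typically consumed: a mapper
# yields them as mapreduce operations, and the framework calls each one with
# the current context so the pools above can batch the work. The mapper below
# is a hypothetical illustration, not part of this module:
#
#   def RestoreEntity(record):
#     entity_proto = entity_pb.EntityProto(contents=record)
#     FixKeys(entity_proto, os.getenv('APPLICATION_ID'))
#     yield ReserveKey(datastore.Entity.FromPb(entity_proto).key())
#     yield Put(entity_proto)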