blob: 59e9a3f6fad7d98b7f684b59cc361cf88e414ade [file]
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Handler for data backup operation.
The generic datastore admin console transfers control to ConfirmBackupHandler
after the user selects entities. ConfirmBackupHandler asks the user to confirm
the selection and to enter a backup name, then transfers control to
DoBackupHandler. DoBackupHandler starts the backup mappers and displays a
confirmation page.
This module also contains the actual mapper code for backing data up.
"""
import datetime
import logging
import re
import time
import urllib
from google.appengine.datastore import entity_pb
from google.appengine.api import apiproxy_stub_map
from google.appengine.api import blobstore as blobstore_api
from google.appengine.api import capabilities
from google.appengine.api import datastore
from google.appengine.api import files
from google.appengine.api import taskqueue
from google.appengine.api.taskqueue import taskqueue_service_pb
from google.appengine.datastore import datastore_rpc
from google.appengine.ext import blobstore
from google.appengine.ext import db
from google.appengine.ext import deferred
from google.appengine.ext import webapp
from google.appengine.ext.datastore_admin import utils
from google.appengine.ext.mapreduce import input_readers
from google.appengine.ext.mapreduce import operation as op
from google.appengine.ext.mapreduce import output_writers
# Action name passed to utils.CreateXsrfToken / utils.ValidateXsrfToken by
# every handler in this module.
XSRF_ACTION = 'backup'
class ConfirmBackupHandler(webapp.RequestHandler):
  """Renders the confirmation page shown before a backup is started."""

  SUFFIX = 'confirm_backup'

  @classmethod
  def Render(cls, handler):
    """Rendering method that can be called by main.py.

    Gathers the requested kinds, the size information parsed by
    utils.ParseKindsAndSizes and the two warning flags, then renders
    confirm_backup.html with a form that posts to DoBackupHandler.

    Args:
      handler: the webapp.RequestHandler invoking the method
    """
    request = handler.request
    namespace = request.get('namespace', None)
    kinds = request.get('kind', allow_multiple=True)
    sizes_known, size_total, remainder = utils.ParseKindsAndSizes(kinds)

    write_capability = capabilities.CapabilitySet(
        'datastore_v3', capabilities=['write'])
    # Set when the datastore 'write' capability is reported as enabled.
    notreadonly_warning = write_capability.is_enabled()
    # count(1) is capped at one result; only existence of a blob matters.
    blob_warning = bool(blobstore.BlobInfo.all().count(1))
    app_id = request.get('app_id')

    template_params = dict(
        form_target=DoBackupHandler.SUFFIX,
        kind_list=kinds,
        remainder=remainder,
        sizes_known=sizes_known,
        size_total=size_total,
        app_id=app_id,
        queues=None,
        cancel_url=request.get('cancel_url'),
        has_namespace=namespace is not None,
        namespace=namespace,
        xsrf_token=utils.CreateXsrfToken(XSRF_ACTION),
        notreadonly_warning=notreadonly_warning,
        blob_warning=blob_warning,
        backup_name='datastore_backup_%s' % time.strftime('%Y_%m_%d'),
    )
    utils.RenderToResponse(handler, 'confirm_backup.html', template_params)
class ConfirmDeleteBackupHandler(webapp.RequestHandler):
  """Handler to confirm admin console requests to delete a backup copy."""

  SUFFIX = 'confirm_delete_backup'

  @classmethod
  def Render(cls, handler):
    """Rendering method that can be called by main.py.

    Looks up the backups named by the 'backup_id' request parameters and
    renders confirm_delete_backup.html with their keys and names. Ids that
    no longer resolve to an entity are left out of the page instead of
    failing the request.

    Args:
      handler: the webapp.RequestHandler invoking the method
    """
    requested_ids = handler.request.get_all('backup_id')
    backups = []
    if requested_ids:
      # db.get() on a list yields None for every key without an entity
      # (e.g. a backup deleted in another tab); skip those entries.
      backups = [backup for backup in db.get(requested_ids)
                 if backup is not None]
    backup_ids = [backup.key() for backup in backups]
    backup_names = [backup.name for backup in backups]
    template_params = {
        'form_target': DoBackupDeleteHandler.SUFFIX,
        'app_id': handler.request.get('app_id'),
        'cancel_url': handler.request.get('cancel_url'),
        'backup_ids': backup_ids,
        'backup_names': backup_names,
        'xsrf_token': utils.CreateXsrfToken(XSRF_ACTION),
    }
    utils.RenderToResponse(handler, 'confirm_delete_backup.html',
                           template_params)
class ConfirmRestoreFromBackupHandler(webapp.RequestHandler):
  """Renders the confirmation page shown before restoring from a backup."""

  SUFFIX = 'confirm_restore_from_backup'

  @classmethod
  def Render(cls, handler):
    """Rendering method that can be called by main.py.

    Loads the backup named by the 'backup_id' request parameter (None when
    the parameter is empty) and renders confirm_restore_from_backup.html
    with a form that posts to DoBackupRestoreHandler.

    Args:
      handler: the webapp.RequestHandler invoking the method
    """
    request = handler.request
    backup_id = request.get('backup_id')
    if backup_id:
      backup = db.get(backup_id)
    else:
      backup = None

    write_capability = capabilities.CapabilitySet(
        'datastore_v3', capabilities=['write'])
    # Set when the datastore 'write' capability is reported as enabled.
    notreadonly_warning = write_capability.is_enabled()
    app_id = request.get('app_id')

    template_params = dict(
        form_target=DoBackupRestoreHandler.SUFFIX,
        app_id=app_id,
        queues=None,
        cancel_url=request.get('cancel_url'),
        backup=backup,
        xsrf_token=utils.CreateXsrfToken(XSRF_ACTION),
        notreadonly_warning=notreadonly_warning,
    )
    utils.RenderToResponse(handler, 'confirm_restore_from_backup.html',
                           template_params)
class BaseDoHandler(webapp.RequestHandler):
  """Common GET/POST plumbing shared by the Do*Handler classes.

  Subclasses supply _get_html_page, _get_post_html_page and
  _ProcessPostRequest; the defaults here raise NotImplementedError.
  """

  MAPREDUCE_DETAIL = utils.config.MAPREDUCE_PATH + '/detail?mapreduce_id='

  def get(self):
    """Handler for get requests to datastore_admin backup operations.

    Displays the status of executed jobs, using the 'job', 'task', 'error'
    and 'xsrf_error' request parameters.
    """
    request = self.request
    template_params = dict(
        job_list=request.get('job', allow_multiple=True),
        task_list=request.get('task', allow_multiple=True),
        mapreduce_detail=self.MAPREDUCE_DETAIL,
        error=request.get('error', ''),
        xsrf_error=request.get('xsrf_error', ''),
        datastore_admin_home=utils.config.BASE_PATH,
    )
    utils.RenderToResponse(self, self._get_html_page, template_params)

  @property
  def _get_html_page(self):
    """Return the name of the HTML page for HTTP/GET requests."""
    raise NotImplementedError

  @property
  def _get_post_html_page(self):
    """Return the name of the HTML page for HTTP/POST requests."""
    raise NotImplementedError

  def _ProcessPostRequest(self):
    """Process the HTTP/POST request and return the result as parameters."""
    raise NotImplementedError

  def _GetBasicMapperParams(self):
    """Return the mapper parameters common to all jobs (the namespace)."""
    namespace = self.request.get('namespace', None)
    return {'namespace': namespace}

  def post(self):
    """Handler for post requests to datastore_admin/backup.do.

    Validates the XSRF token, runs _ProcessPostRequest and redirects to the
    get handler with the outcome encoded in the query string.
    """
    token = self.request.get('xsrf_token')
    if utils.ValidateXsrfToken(token, XSRF_ACTION):
      try:
        parameters = self._ProcessPostRequest()
      except Exception as e:
        # Any failure is reported to the user through the 'error' parameter.
        parameters = [('error', self._HandleException(e))]
    else:
      parameters = [('xsrf_error', '1')]
    query = urllib.urlencode(parameters)
    target = '%s/%s?%s' % (utils.config.BASE_PATH,
                           self._get_post_html_page,
                           query)
    self.redirect(target)

  def _HandleException(self, e):
    """Make exception handling overrideable by tests.

    Args:
      e: The exception to handle.

    Returns:
      The exception error string.
    """
    return ': '.join([str(type(e)), str(e)])
class DoBackupHandler(BaseDoHandler):
  """Handler to deal with requests from the admin console to backup data."""

  SUFFIX = 'backup.do'
  BACKUP_HANDLER = __name__ + '.BackupEntity.map'
  BACKUP_COMPLETE_HANDLER = __name__ + '.BackupCompleteHandler'
  INPUT_READER = input_readers.__name__ + '.DatastoreEntityInputReader'
  OUTPUT_WRITER = output_writers.__name__ + '.BlobstoreRecordsOutputWriter'
  _get_html_page = 'do_backup.html'
  _get_post_html_page = SUFFIX

  def _ProcessPostRequest(self):
    """Triggers backup mapper jobs and returns their ids.

    Returns:
      A list of (name, value) pairs for the redirect query string:
      ('error', message) when the backup name is missing or already used,
      ('job', id) per started job when at most 10 kinds were requested, or
      a single ('task', name) when the jobs are started from a deferred
      task.

    Raises:
      Exception: whatever starting the operation or the jobs raised; the
        operation, if it was created, is marked as failed first.
    """
    backup = self.request.get('backup_name').strip()
    if not backup:
      return [('error', 'Unspecified Backup name.')]
    if BackupInformation.name_exists(backup):
      return [('error', 'Backup "%s" already exists.' % backup)]

    kinds = self.request.get('kind', allow_multiple=True)
    queue = self.request.get('queue')
    # '%%(kind)s' is left as a '%(kind)s' placeholder in the job name.
    job_name = 'datastore_backup_%s_%%(kind)s' % re.sub(r'[^\w]', '_', backup)
    # Bound before the try so the except clause can test it even when
    # utils.StartOperation itself raises; otherwise an UnboundLocalError
    # would replace the real failure.
    job_operation = None
    try:
      job_operation = utils.StartOperation('Backup: %s' % backup)
      backup_info = BackupInformation(parent=job_operation)
      backup_info.name = backup
      backup_info.kinds = kinds
      backup_info.put(config=datastore_rpc.Configuration(force_writes=True))
      mapreduce_params = {
          'done_callback_handler': self.BACKUP_COMPLETE_HANDLER,
          'backup_info_pk': str(backup_info.key()),
          'force_ops_writes': True
      }
      if len(kinds) <= 10:
        # Few kinds: start the map jobs within this request.
        return [('job', job) for job in _run_map_jobs(
            job_operation.key(), backup_info.key(), kinds, job_name,
            self.BACKUP_HANDLER, self.INPUT_READER, self.OUTPUT_WRITER,
            self._GetBasicMapperParams(), mapreduce_params, queue)]
      else:
        # Many kinds: hand the job start-up to a deferred task.
        retry_options = taskqueue.TaskRetryOptions(task_retry_limit=1)
        return [('task', deferred.defer(_run_map_jobs, job_operation.key(),
                                        backup_info.key(), kinds, job_name,
                                        self.BACKUP_HANDLER, self.INPUT_READER,
                                        self.OUTPUT_WRITER,
                                        self._GetBasicMapperParams(),
                                        mapreduce_params,
                                        queue, _queue=queue,
                                        _url=utils.ConfigDefaults.DEFERRED_PATH,
                                        _retry_options=retry_options).name)]
    except Exception:
      logging.exception('Failed to start a datastore backup job[s] for "%s".',
                        job_name)
      if job_operation:
        job_operation.status = utils.DatastoreAdminOperation.STATUS_FAILED
        job_operation.put(config=datastore_rpc.Configuration(force_writes=True))
      raise
def _run_map_jobs(job_operation_key, backup_info_key, kinds, job_name,
                  backup_handler, input_reader, output_writer, mapper_params,
                  mapreduce_params, queue):
  """Starts the map jobs for a backup and records their ids on it.

  Args:
    job_operation_key: key of the operation the jobs belong to.
    backup_info_key: key of the BackupInformation entity to update.
    kinds: the kinds to run a job for.
    job_name: job name template forwarded to utils.RunMapForKinds.
    backup_handler: mapper handler spec forwarded to utils.RunMapForKinds.
    input_reader: input reader spec forwarded to utils.RunMapForKinds.
    output_writer: output writer spec forwarded to utils.RunMapForKinds.
    mapper_params: mapper parameters forwarded to utils.RunMapForKinds.
    mapreduce_params: mapreduce parameters forwarded to utils.RunMapForKinds.
    queue: name of the task queue the jobs run on.

  Returns:
    The job ids returned by utils.RunMapForKinds, or an empty list when the
    BackupInformation entity cannot be found.
  """
  backup_info = BackupInformation.get(backup_info_key)
  if backup_info:
    started_jobs = utils.RunMapForKinds(
        job_operation_key, kinds, job_name, backup_handler, input_reader,
        output_writer, mapper_params, mapreduce_params, queue_name=queue)
    backup_info.active_jobs = started_jobs
    write_config = datastore_rpc.Configuration(force_writes=True)
    backup_info.put(config=write_config)
    return started_jobs
  return []
class DoBackupDeleteHandler(BaseDoHandler):
  """Handler to deal with datastore admin requests to delete backup data."""

  SUFFIX = 'backup_delete.do'

  def get(self):
    """Handles GET requests exactly like POST requests."""
    self.post()

  def post(self):
    """Handler for post requests to datastore_admin/backup_delete.do.

    Deletes the blobs and the BackupInformation entity of every backup named
    by a 'backup_id' parameter, provided the XSRF token is valid, then
    redirects to the base-path handler. A failure is logged and reported
    through a URL-encoded 'error' query parameter.
    """
    backup_ids = self.request.get_all('backup_id')
    token = self.request.get('xsrf_token')
    error = None
    if backup_ids and utils.ValidateXsrfToken(token, XSRF_ACTION):
      try:
        for backup_info in db.get(backup_ids):
          # db.get() yields None for ids that no longer exist.
          if backup_info:
            blobstore_api.delete([files.blobstore.get_blob_key(filename)
                                  for filename in backup_info.blob_files])
            backup_info.delete()
      except Exception as e:
        logging.exception('Failed to delete datastore backup.')
        error = str(e)

    if error:
      # The message is arbitrary exception text; encode it so characters
      # such as '&', '#' or spaces cannot corrupt the redirect URL.
      query = urllib.urlencode([('error', error)])
      self.redirect('%s?%s' % (utils.config.BASE_PATH, query))
    else:
      self.redirect(utils.config.BASE_PATH)
class DoBackupRestoreHandler(BaseDoHandler):
  """Handler for admin console requests to restore data from a backup."""

  SUFFIX = 'backup_restore.do'
  BACKUP_RESTORE_HANDLER = __name__ + '.RestoreEntity.map'
  INPUT_READER = input_readers.__name__ + '.RecordsReader'
  _get_html_page = 'do_restore_from_backup.html'
  _get_post_html_page = SUFFIX

  def _ProcessPostRequest(self):
    """Triggers backup restore mapper jobs and returns their ids.

    Returns:
      A list of (name, value) pairs for the redirect query string: either a
      single ('error', message) or a single ('job', id).

    Raises:
      Exception: whatever starting the operation or the job raised; the
        operation, if it was created, is marked as failed first.
    """
    backup_id = self.request.get('backup_id')
    if not backup_id:
      return [('error', 'Unspecified Backup.')]
    backup = db.get(db.Key(backup_id))
    if not backup:
      return [('error', 'Invalid Backup id.')]

    queue = self.request.get('queue')
    sanitized_name = re.sub(r'[^\w]', '_', backup.name)
    job_name = 'datastore_backup_%s' % sanitized_name
    job_operation = None
    try:
      description = 'Restore from backup: %s' % backup.name
      job_operation = utils.StartOperation(description)
      mapper_params = self._GetBasicMapperParams()
      mapper_params['files'] = backup.blob_files
      mapreduce_params = {
          'backup_name': backup.name,
          'force_ops_writes': True
      }
      # The restore job has no output writer, hence the None argument.
      job_id = utils.StartMap(
          job_operation.key(),
          job_name,
          self.BACKUP_RESTORE_HANDLER,
          self.INPUT_READER,
          None,
          mapper_params,
          mapreduce_params,
          queue_name=queue)
      return [('job', job_id)]
    except Exception:
      logging.exception('Failed to start a restore from backup job "%s".',
                        job_name)
      if job_operation:
        job_operation.status = utils.DatastoreAdminOperation.STATUS_FAILED
        job_operation.put(config=datastore_rpc.Configuration(force_writes=True))
      raise
class BackupInformation(db.Model):
  """Datastore record describing one backup and the jobs that produce it."""

  # User-supplied backup name.
  name = db.StringProperty()
  # Kinds included in the backup.
  kinds = db.StringListProperty()
  # Set automatically when the entity is first created.
  start_time = db.DateTimeProperty(auto_now_add=True)
  # Ids of mapper jobs that were started and have not reported completion.
  active_jobs = db.StringListProperty()
  # Ids of mapper jobs that reported completion.
  completed_jobs = db.StringListProperty()
  # Set once the backup has been finalized.
  complete_time = db.DateTimeProperty()
  # Names of the blobstore files holding the backup data.
  blob_files = db.StringListProperty()

  @classmethod
  def kind(cls):
    """Returns the fixed datastore kind used for backup records."""
    return '_AE_Backup_Information'

  @classmethod
  def name_exists(cls, backup_name):
    """Tells whether a backup record with the given name is stored.

    Args:
      backup_name: the backup name to look for.

    Returns:
      True if a keys-only query finds a record with that name, else False.
    """
    matches = BackupInformation.all(keys_only=True)
    matches.filter('name =', backup_name)
    first_key = matches.get()
    return first_key is not None
@db.transactional
def BackupCompleteHandler(operation, job_id, mapreduce_state):
  """Updates BackupInformation record for a completed mapper job.

  Merges the job's output filenames into the backup record, moves the job
  from the active to the completed list and, once the whole operation is
  completed, schedules finalize_backup_info.

  Args:
    operation: the admin operation the job belongs to; its status decides
      whether the backup is finalized.
    job_id: id of the mapper job that has completed.
    mapreduce_state: state of the completed job; supplies the mapreduce spec
      parameters and the writer state with the output filenames.
  """
  mapreduce_spec = mapreduce_state.mapreduce_spec
  backup_info_pk = mapreduce_spec.params['backup_info_pk']
  backup_info = BackupInformation.get(backup_info_pk)
  if backup_info:
    filenames = mapreduce_state.writer_state['filenames']
    # set() removes duplicates in case the callback runs more than once.
    backup_info.blob_files = list(set(backup_info.blob_files + filenames))
    if job_id in backup_info.active_jobs:
      backup_info.active_jobs.remove(job_id)
      backup_info.completed_jobs = list(
          set(backup_info.completed_jobs + [job_id]))
    backup_info.put(config=datastore_rpc.Configuration(force_writes=True))
    if operation.status == utils.DatastoreAdminOperation.STATUS_COMPLETED:
      # Enqueue as part of this transaction so the finalize task exists only
      # if the update above commits, and is not duplicated on retries.
      deferred.defer(finalize_backup_info, backup_info.key(),
                     _url=utils.ConfigDefaults.DEFERRED_PATH,
                     _transactional=True)
  else:
    logging.warning('BackupInfo was not found for %s', backup_info_pk)
def finalize_backup_info(backup_info_key):
  """Marks a backup as complete and prunes its empty output files.

  Args:
    backup_info_key: key of the BackupInformation entity to finalize.
  """
  backup_info = BackupInformation.get(backup_info_key)
  if backup_info is None:
    # The record may have been deleted before this deferred task ran;
    # raising here would only make the task fail and be retried.
    logging.warning('Backup %s could not be found', backup_info_key)
    return
  backup_info.complete_time = datetime.datetime.now()
  backup_info.blob_files = drop_empty_files(backup_info.blob_files)
  backup_info.put(config=datastore_rpc.Configuration(force_writes=True))
  logging.info('Backup %s completed', backup_info.name)
def drop_empty_files(filenames):
  """Deletes empty files and returns filenames minus the deleted ones.

  Args:
    filenames: blobstore file names to inspect.

  Returns:
    The names whose BlobInfo exists and has a size greater than zero, in
    their original order. Names without a BlobInfo are dropped as well.
  """
  blob_keys = [files.blobstore.get_blob_key(name) for name in filenames]
  blob_infos = blobstore.BlobInfo.get(blob_keys)

  kept_names = []
  keys_to_delete = []
  for name, info in zip(filenames, blob_infos):
    if not info:
      continue
    if info.size > 0:
      kept_names.append(name)
    else:
      keys_to_delete.append(info.key())

  blobstore_api.delete(keys_to_delete)
  return kept_names
class BackupEntity(object):
  """Mapper that serializes each entity for the output writer."""

  def map(self, entity):
    """Backup entity map handler.

    Args:
      entity: An instance of datastore.Entity.

    Yields:
      A serialized entity_pb.EntityProto as a string
    """
    entity_proto = entity.ToPb()
    serialized = entity_proto.SerializeToString()
    yield serialized
class RestoreEntity(object):
  """Mapper that writes each backed-up record back to the datastore."""

  def map(self, record):
    """Restore entity map handler.

    Args:
      record: A serialized entity_pb.EntityProto.

    Yields:
      A operation.db.Put for the mapped entity
    """
    entity_proto = entity_pb.EntityProto(contents=record)
    restored = datastore.Entity._FromPb(entity_proto)
    put_operation = op.db.Put(restored)
    yield put_operation
def get_queue_names(app_id=None):
  """Returns a list with all non-special queue names for app_id.

  The list always starts with 'default'; every other push queue whose name
  does not start with '__' is appended. If fetching the queues fails, the
  failure is logged and the names collected so far are returned.

  Args:
    app_id: optional application id to fetch the queues for.

  Returns:
    A list of queue names.
  """
  rpc = apiproxy_stub_map.UserRPC('taskqueue')
  request = taskqueue_service_pb.TaskQueueFetchQueuesRequest()
  response = taskqueue_service_pb.TaskQueueFetchQueuesResponse()
  if app_id:
    request.set_app_id(app_id)
  request.set_max_rows(100)

  queues = ['default']
  try:
    rpc.make_call('FetchQueues', request, response)
    rpc.check_success()
    push_mode = taskqueue_service_pb.TaskQueueMode.PUSH
    for queue in response.queue_list():
      if queue.mode() != push_mode:
        continue
      if queue.queue_name().startswith('__'):
        continue
      # 'default' is already first in the list.
      if queue.queue_name() == 'default':
        continue
      queues.append(queue.queue_name())
  except Exception as e:
    logging.exception('Failed to get queue names: %s', str(e))
  return queues
def handlers_list(base_path):
  """Returns the (url pattern, handler class) routes defined by this module.

  Args:
    base_path: URL prefix under which the handlers are mounted.

  Returns:
    A list of (pattern, handler class) tuples, one per routed handler, with
    each pattern built as base_path + '/' + the handler's SUFFIX.
  """
  routed_handlers = (
      ConfirmBackupHandler,
      DoBackupHandler,
      DoBackupRestoreHandler,
      DoBackupDeleteHandler,
  )
  return [(r'%s/%s' % (base_path, handler_class.SUFFIX), handler_class)
          for handler_class in routed_handlers]