blob: 849d1de19ae235cb9e09654e657030af1524fa6e [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Handler for data backup operation.
Generic datastore admin console transfers control to ConfirmBackupHandler
after selection of entities. The ConfirmBackupHandler confirms with user
his choice, enters a backup name and transfers control to
DoBackupHandler. DoBackupHandler starts backup mappers and displays confirmation
page.
This module also contains actual mapper code for backing data over.
"""
from __future__ import with_statement
import cStringIO
import datetime
import itertools
import logging
import os
import random
import re
import time
import urllib
import xml.dom.minidom
from google.appengine.datastore import entity_pb
from google.appengine.api import apiproxy_stub_map
from google.appengine.api import app_identity
from google.appengine.api import blobstore as blobstore_api
from google.appengine.api import capabilities
from google.appengine.api import datastore
from google.appengine.api import datastore_types
from google.appengine.api import files
from google.appengine.api import taskqueue
from google.appengine.api import urlfetch
from google.appengine.api.files import records
from google.appengine.api.taskqueue import taskqueue_service_pb
from google.appengine.datastore import datastore_query
from google.appengine.datastore import datastore_rpc
from google.appengine.ext import blobstore
from google.appengine.ext import db
from google.appengine.ext import deferred
from google.appengine.ext import webapp
from google.appengine.ext.datastore_admin import backup_pb2
from google.appengine.ext.datastore_admin import config
from google.appengine.ext.datastore_admin import utils
from google.appengine.runtime import apiproxy_errors
try:
from google.appengine.ext.mapreduce import context
from google.appengine.ext.mapreduce import datastore_range_iterators as db_iters
from google.appengine.ext.mapreduce import input_readers
from google.appengine.ext.mapreduce import json_util
from google.appengine.ext.mapreduce import operation as op
from google.appengine.ext.mapreduce import output_writers
except ImportError:
from google.appengine._internal.mapreduce import context
from google.appengine._internal.mapreduce import datastore_range_iterators as db_iters
from google.appengine._internal.mapreduce import input_readers
from google.appengine._internal.mapreduce import json_util
from google.appengine._internal.mapreduce import operation as op
from google.appengine._internal.mapreduce import output_writers
try:
from google.appengine.ext.datastore_admin import services_client
except ImportError:
pass
XSRF_ACTION = 'backup'
BUCKET_PATTERN = (r'^([a-zA-Z0-9]+([\-_]+[a-zA-Z0-9]+)*)'
r'(\.([a-zA-Z0-9]+([\-_]+[a-zA-Z0-9]+)*))*$')
MAX_BUCKET_LEN = 222
MIN_BUCKET_LEN = 3
MAX_BUCKET_SEGMENT_LEN = 63
NUM_KINDS_DEFERRED_THRESHOLD = 10
MAX_BLOBS_PER_DELETE = 500
TEST_WRITE_FILENAME_PREFIX = 'datastore_backup_write_test'
MAX_KEYS_LIST_SIZE = 100
MAX_TEST_FILENAME_TRIES = 10
MEANING_TO_PRIMITIVE_TYPE = {
entity_pb.Property.GD_WHEN: backup_pb2.EntitySchema.DATE_TIME,
entity_pb.Property.GD_RATING: backup_pb2.EntitySchema.RATING,
entity_pb.Property.ATOM_LINK: backup_pb2.EntitySchema.LINK,
entity_pb.Property.ATOM_CATEGORY: backup_pb2.EntitySchema.CATEGORY,
entity_pb.Property.GD_PHONENUMBER: backup_pb2.EntitySchema.PHONE_NUMBER,
entity_pb.Property.GD_POSTALADDRESS: backup_pb2.EntitySchema.POSTAL_ADDRESS,
entity_pb.Property.GD_EMAIL: backup_pb2.EntitySchema.EMAIL,
entity_pb.Property.GD_IM: backup_pb2.EntitySchema.IM_HANDLE,
entity_pb.Property.BLOBKEY: backup_pb2.EntitySchema.BLOB_KEY,
entity_pb.Property.TEXT: backup_pb2.EntitySchema.TEXT,
entity_pb.Property.BLOB: backup_pb2.EntitySchema.BLOB,
entity_pb.Property.BYTESTRING: backup_pb2.EntitySchema.SHORT_BLOB
}
class ConfirmBackupHandler(webapp.RequestHandler):
"""Handler to deal with requests from the admin console to backup data."""
SUFFIX = 'confirm_backup'
@classmethod
def Render(cls, handler):
"""Rendering method that can be called by main.py.
Args:
handler: the webapp.RequestHandler invoking the method
"""
kinds = handler.request.get_all('kind')
sizes_known, size_total, remainder = utils.ParseKindsAndSizes(kinds)
notreadonly_warning = capabilities.CapabilitySet(
'datastore_v3', capabilities=['write']).is_enabled()
blob_warning = bool(blobstore.BlobInfo.all().count(1))
template_params = {
'run_as_a_service': handler.request.get('run_as_a_service'),
'form_target': DoBackupHandler.SUFFIX,
'kind_list': kinds,
'remainder': remainder,
'sizes_known': sizes_known,
'size_total': size_total,
'queues': None,
'datastore_admin_home': utils.GenerateHomeUrl(handler.request),
'namespaces': get_namespaces(handler.request.get('namespace', None)),
'xsrf_token': utils.CreateXsrfToken(XSRF_ACTION),
'notreadonly_warning': notreadonly_warning,
'blob_warning': blob_warning,
'backup_name': 'datastore_backup_%s' % time.strftime('%Y_%m_%d')
}
utils.RenderToResponse(handler, 'confirm_backup.html', template_params)
def get_namespaces(selected_namespace):
namespaces = [('--All--', '*', selected_namespace is None)]
for ns in datastore.Query('__namespace__', keys_only=True).Run():
ns_name = ns.name() or ''
namespaces.append((ns_name or '--Default--',
ns_name,
ns_name == selected_namespace))
return namespaces
class ConfirmDeleteBackupHandler(webapp.RequestHandler):
"""Handler to confirm admin console requests to delete a backup copy."""
SUFFIX = 'confirm_delete_backup'
@classmethod
def Render(cls, handler):
"""Rendering method that can be called by main.py.
Args:
handler: the webapp.RequestHandler invoking the method
"""
requested_backup_ids = handler.request.get_all('backup_id')
backups = []
gs_warning = False
if requested_backup_ids:
for backup in db.get(requested_backup_ids):
if backup:
backups.append(backup)
gs_warning |= backup.filesystem == files.GS_FILESYSTEM
template_params = {
'form_target': DoBackupDeleteHandler.SUFFIX,
'datastore_admin_home': utils.GenerateHomeUrl(handler.request),
'backups': backups,
'xsrf_token': utils.CreateXsrfToken(XSRF_ACTION),
'gs_warning': gs_warning,
'run_as_a_service': handler.request.get('run_as_a_service'),
}
utils.RenderToResponse(handler, 'confirm_delete_backup.html',
template_params)
class ConfirmAbortBackupHandler(webapp.RequestHandler):
"""Handler to confirm admin console requests to abort a backup copy."""
SUFFIX = 'confirm_abort_backup'
@classmethod
def Render(cls, handler):
"""Rendering method that can be called by main.py.
Args:
handler: the webapp.RequestHandler invoking the method
"""
requested_backup_ids = handler.request.get_all('backup_id')
backups = []
if requested_backup_ids:
for backup in db.get(requested_backup_ids):
if backup:
backups.append(backup)
template_params = {
'form_target': DoBackupAbortHandler.SUFFIX,
'datastore_admin_home': utils.GenerateHomeUrl(handler.request),
'backups': backups,
'xsrf_token': utils.CreateXsrfToken(XSRF_ACTION),
'run_as_a_service': handler.request.get('run_as_a_service'),
}
utils.RenderToResponse(handler, 'confirm_abort_backup.html',
template_params)
class ConfirmRestoreFromBackupHandler(webapp.RequestHandler):
"""Handler to confirm admin console requests to restore from backup."""
SUFFIX = 'confirm_restore_from_backup'
@classmethod
def Render(cls, handler, default_backup_id=None,
default_delete_backup_after_restore=False):
"""Rendering method that can be called by main.py.
Args:
handler: the webapp.RequestHandler invoking the method
default_backup_id: default value for handler.request
default_delete_backup_after_restore: default value for handler.request
"""
backup_id = handler.request.get('backup_id', default_backup_id)
backup = db.get(backup_id) if backup_id else None
notreadonly_warning = capabilities.CapabilitySet(
'datastore_v3', capabilities=['write']).is_enabled()
original_app_warning = backup.original_app
if os.getenv('APPLICATION_ID') == original_app_warning:
original_app_warning = None
template_params = {
'form_target': DoBackupRestoreHandler.SUFFIX,
'queues': None,
'datastore_admin_home': utils.GenerateHomeUrl(handler.request),
'backup': backup,
'delete_backup_after_restore': handler.request.get(
'delete_backup_after_restore', default_delete_backup_after_restore),
'xsrf_token': utils.CreateXsrfToken(XSRF_ACTION),
'notreadonly_warning': notreadonly_warning,
'original_app_warning': original_app_warning,
'run_as_a_service': handler.request.get('run_as_a_service'),
}
utils.RenderToResponse(handler, 'confirm_restore_from_backup.html',
template_params)
class ConfirmBackupImportHandler(webapp.RequestHandler):
"""Handler to import backup information."""
SUFFIX = 'backup_information'
@classmethod
def Render(cls, handler):
"""Rendering method that can be called by main.py.
Args:
handler: the webapp.RequestHandler invoking the method
"""
gs_handle = handler.request.get('gs_handle')
error = None if gs_handle else 'Google Cloud Storage path is missing'
other_backup_info_files = []
selected_backup_info_file = None
backup_info_specified = False
if not error:
try:
gs_handle = gs_handle.rstrip()
bucket_name, prefix = parse_gs_handle(gs_handle)
validate_gs_bucket_name(bucket_name)
if not is_accessible_bucket_name(bucket_name):
raise BackupValidationError(
'Bucket "%s" is not accessible' % bucket_name)
if prefix.endswith('.backup_info'):
prefix = prefix[0:prefix.rfind('/')]
backup_info_specified = True
elif prefix and not prefix.endswith('/'):
prefix += '/'
for backup_info_file in list_bucket_files(bucket_name, prefix):
backup_info_path = '/gs/%s/%s' % (bucket_name, backup_info_file)
if backup_info_specified and backup_info_path == gs_handle:
selected_backup_info_file = backup_info_path
elif (backup_info_file.endswith('.backup_info')
and backup_info_file.count('.') == 1):
other_backup_info_files.append(backup_info_path)
except Exception, ex:
error = 'Failed to read bucket: %s' % ex.message
logging.exception(ex.message)
template_params = {
'error': error,
'form_target': DoBackupImportHandler.SUFFIX,
'datastore_admin_home': utils.GenerateHomeUrl(handler.request),
'selected_backup_info_file': selected_backup_info_file,
'other_backup_info_files': other_backup_info_files,
'backup_info_specified': backup_info_specified,
'xsrf_token': utils.CreateXsrfToken(XSRF_ACTION),
'run_as_a_service': handler.request.get('run_as_a_service'),
}
utils.RenderToResponse(handler, 'confirm_backup_import.html',
template_params)
class BackupInformationHandler(webapp.RequestHandler):
"""Handler to display backup information."""
SUFFIX = 'backup_information'
@classmethod
def Render(cls, handler):
"""Rendering method that can be called by main.py.
Args:
handler: the webapp.RequestHandler invoking the method
"""
backup_ids = handler.request.get_all('backup_id')
template_params = {
'backups': db.get(backup_ids),
'datastore_admin_home': utils.GenerateHomeUrl(handler.request),
'run_as_a_service': handler.request.get('run_as_a_service'),
}
utils.RenderToResponse(handler, 'backup_information.html', template_params)
class BaseDoHandler(webapp.RequestHandler):
"""Base class for all Do*Handlers."""
MAPREDUCE_DETAIL = config.MAPREDUCE_PATH + '/detail?mapreduce_id='
def get(self):
"""Handler for get requests to datastore_admin backup operations.
Status of executed jobs is displayed.
"""
jobs = self.request.get_all('job')
remote_job = self.request.get('remote_job')
tasks = self.request.get_all('task')
error = self.request.get('error', '')
xsrf_error = self.request.get('xsrf_error', '')
template_params = {
'job_list': jobs,
'remote_job': remote_job,
'task_list': tasks,
'mapreduce_detail': self.MAPREDUCE_DETAIL,
'error': error,
'xsrf_error': xsrf_error,
'datastore_admin_home': utils.GenerateHomeUrl(self.request),
}
utils.RenderToResponse(self, self._get_html_page, template_params)
@property
def _get_html_page(self):
"""Return the name of the HTML page for HTTP/GET requests."""
raise NotImplementedError
@property
def _get_post_html_page(self):
"""Return the name of the HTML page for HTTP/POST requests."""
raise NotImplementedError
def _ProcessPostRequest(self):
"""Process the HTTP/POST request and return the result as parametrs."""
raise NotImplementedError
def _GetBasicMapperParams(self):
namespace = self.request.get('namespace', None)
if namespace == '*':
namespace = None
return {'namespace': namespace}
def SendRedirect(self, path=None, params=()):
"""Send a redirect response."""
run_as_a_service = self.request.get('run_as_a_service')
if run_as_a_service:
params = list(params)
params.append(('run_as_a_service', True))
dest = config.BASE_PATH
if path:
dest = '%s/%s' % (dest, path)
if params:
dest = '%s?%s' % (dest, urllib.urlencode(params))
self.redirect(dest)
def post(self):
"""Handler for post requests to datastore_admin/backup.do.
Redirects to the get handler after processing the request.
"""
token = self.request.get('xsrf_token')
if not utils.ValidateXsrfToken(token, XSRF_ACTION):
parameters = [('xsrf_error', '1')]
else:
try:
parameters = self._ProcessPostRequest()
except Exception, e:
error = self._HandleException(e)
parameters = [('error', error)]
self.SendRedirect(self._get_post_html_page, parameters)
def _HandleException(self, e):
"""Make exception handling overridable by tests.
Args:
e: The exception to handle.
Returns:
The exception error string.
"""
logging.exception(e.message)
return '%s: %s' % (type(e), e.message)
class BackupValidationError(utils.Error):
"""Raised upon backup request validation."""
def _perform_backup(run_as_a_service, kinds, selected_namespace,
filesystem, gs_bucket_name, backup,
queue, mapper_params, max_jobs):
"""Triggers backup mapper jobs.
Args:
run_as_a_service: True if backup should be done via admin-jobs
kinds: a sequence of kind names
selected_namespace: The selected namespace or None for all
filesystem: files.BLOBSTORE_FILESYSTEM or files.GS_FILESYSTEM
or None to default to blobstore
gs_bucket_name: the GS file system bucket in which to store the backup
when using the GS file system, and otherwise ignored
backup: the backup name
queue: the task queue for the backup task
mapper_params: the mapper parameters
max_jobs: if backup needs more jobs than this, defer them
Returns:
The job or task ids.
Raises:
BackupValidationError: On validation error.
Exception: On other error.
"""
BACKUP_COMPLETE_HANDLER = __name__ + '.BackupCompleteHandler'
BACKUP_HANDLER = __name__ + '.BackupEntity.map'
INPUT_READER = __name__ + '.DatastoreEntityProtoInputReader'
OUTPUT_WRITER = output_writers.__name__ + '.FileRecordsOutputWriter'
if run_as_a_service:
if not gs_bucket_name:
raise BackupValidationError('Bucket name missing.')
gs_bucket_name = validate_and_canonicalize_gs_bucket(gs_bucket_name)
datastore_admin_service = services_client.DatastoreAdminClient()
description = 'Remote backup job: %s' % backup
remote_job_id = datastore_admin_service.create_backup(
description, backup, gs_bucket_name, selected_namespace, kinds)
return [('remote_job', remote_job_id)]
queue = queue or os.environ.get('HTTP_X_APPENGINE_QUEUENAME', 'default')
if queue[0] == '_':
queue = 'default'
if not filesystem:
filesystem = files.BLOBSTORE_FILESYSTEM
if filesystem == files.GS_FILESYSTEM:
if not gs_bucket_name:
raise BackupValidationError('Bucket name missing.')
gs_bucket_name = validate_and_canonicalize_gs_bucket(gs_bucket_name)
elif filesystem == files.BLOBSTORE_FILESYSTEM:
pass
else:
raise BackupValidationError('Unknown filesystem "%s".' % filesystem)
job_name = 'datastore_backup_%s_%%(kind)s' % re.sub(r'[^\w]', '_', backup)
try:
job_operation = utils.StartOperation('Backup: %s' % backup)
backup_info = BackupInformation(parent=job_operation)
backup_info.filesystem = filesystem
backup_info.name = backup
backup_info.kinds = kinds
if selected_namespace is not None:
backup_info.namespaces = [selected_namespace]
backup_info.put(force_writes=True)
mapreduce_params = {
'done_callback_handler': BACKUP_COMPLETE_HANDLER,
'backup_info_pk': str(backup_info.key()),
'force_ops_writes': True,
}
mapper_params = dict(mapper_params)
mapper_params['filesystem'] = filesystem
if filesystem == files.GS_FILESYSTEM:
mapper_params['gs_bucket_name'] = gs_bucket_name
if len(kinds) <= max_jobs:
return [('job', job) for job in _run_map_jobs(
job_operation.key(), backup_info.key(), kinds, job_name,
BACKUP_HANDLER, INPUT_READER, OUTPUT_WRITER,
mapper_params, mapreduce_params, queue)]
else:
retry_options = taskqueue.TaskRetryOptions(task_retry_limit=1)
deferred_task = deferred.defer(_run_map_jobs_deferred,
backup, job_operation.key(),
backup_info.key(), kinds, job_name,
BACKUP_HANDLER, INPUT_READER,
OUTPUT_WRITER, mapper_params,
mapreduce_params, queue, _queue=queue,
_url=config.DEFERRED_PATH,
_retry_options=retry_options)
return [('task', deferred_task.name)]
except Exception:
logging.exception('Failed to start a datastore backup job[s] for "%s".',
backup)
if backup_info:
delete_backup_info(backup_info)
if job_operation:
job_operation.status = utils.DatastoreAdminOperation.STATUS_FAILED
job_operation.put(force_writes=True)
raise
class BackupLinkHandler(webapp.RequestHandler):
"""Handler to deal with requests to the backup link to backup data."""
SUFFIX = 'backup.create'
def get(self):
"""Handler for get requests to datastore_admin/backup.create."""
self.post()
def post(self):
"""Handler for post requests to datastore_admin/backup.create."""
try:
if ('X-AppEngine-TaskName' not in self.request.headers and
'X-AppEngine-Cron' not in self.request.headers):
logging.critical('Scheduled backups must be started via task queue or '
'cron.')
self.response.set_status(403)
return
backup_prefix = self.request.get('name')
if not backup_prefix:
if self.request.headers.get('X-AppEngine-Cron'):
backup_prefix = 'cron-'
else:
backup_prefix = 'link-'
backup_prefix_with_date = backup_prefix + time.strftime('%Y_%m_%d')
backup_name = backup_prefix_with_date
backup_suffix_counter = 1
while BackupInformation.name_exists(backup_name):
backup_suffix_counter += 1
backup_name = backup_prefix_with_date + '-' + str(backup_suffix_counter)
kinds = self.request.get_all('kind')
if not kinds:
self.errorResponse('Backup must include at least one kind.')
return
for kind in kinds:
if not utils.IsKindNameVisible(kind):
self.errorResponse('Invalid kind %s.' % kind)
return
namespace = self.request.get('namespace', None)
if namespace == '*':
namespace = None
mapper_params = {'namespace': namespace}
_perform_backup(self.request.get('run_as_a_service', False),
kinds,
namespace,
self.request.get('filesystem'),
self.request.get('gs_bucket_name'),
backup_name,
self.request.get('queue'),
mapper_params,
1000000)
except Exception, e:
self.errorResponse(e.message)
def errorResponse(self, message):
logging.error('Could not create backup via link: %s', message)
self.response.set_status(400, message)
class DatastoreEntityProtoInputReader(input_readers.RawDatastoreInputReader):
"""An input reader which yields datastore entity proto for a kind."""
_KEY_RANGE_ITER_CLS = db_iters.KeyRangeEntityProtoIterator
class DoBackupHandler(BaseDoHandler):
"""Handler to deal with requests from the admin console to backup data."""
SUFFIX = 'backup.do'
_get_html_page = 'do_backup.html'
_get_post_html_page = SUFFIX
def _ProcessPostRequest(self):
"""Triggers backup mapper jobs and returns their ids."""
try:
backup = self.request.get('backup_name').strip()
if not backup:
raise BackupValidationError('Unspecified backup name.')
if BackupInformation.name_exists(backup):
raise BackupValidationError('Backup "%s" already exists.' % backup)
mapper_params = self._GetBasicMapperParams()
backup_result = _perform_backup(self.request.get('run_as_a_service',
False),
self.request.get_all('kind'),
mapper_params.get('namespace'),
self.request.get('filesystem'),
self.request.get('gs_bucket_name'),
backup,
self.request.get('queue'),
mapper_params,
10)
return backup_result
except Exception, e:
logging.exception(e.message)
return [('error', e.message)]
def _run_map_jobs_deferred(backup_name, job_operation_key, backup_info_key,
kinds, job_name, backup_handler, input_reader,
output_writer, mapper_params, mapreduce_params,
queue):
backup_info = BackupInformation.get(backup_info_key)
if backup_info:
try:
_run_map_jobs(job_operation_key, backup_info_key, kinds, job_name,
backup_handler, input_reader, output_writer, mapper_params,
mapreduce_params, queue)
except BaseException:
logging.exception('Failed to start a datastore backup job[s] for "%s".',
backup_name)
delete_backup_info(backup_info)
else:
logging.info('Missing backup info, can not start backup jobs for "%s"',
backup_name)
def _run_map_jobs(job_operation_key, backup_info_key, kinds, job_name,
backup_handler, input_reader, output_writer, mapper_params,
mapreduce_params, queue):
"""Creates backup/restore MR jobs for the given operation.
Args:
job_operation_key: a key of utils.DatastoreAdminOperation entity.
backup_info_key: a key of BackupInformation entity.
kinds: a list of kinds to run the M/R for.
job_name: the M/R job name prefix.
backup_handler: M/R job completion handler.
input_reader: M/R input reader.
output_writer: M/R output writer.
mapper_params: custom parameters to pass to mapper.
mapreduce_params: dictionary parameters relevant to the whole job.
queue: the name of the queue that will be used by the M/R.
Returns:
Ids of all started mapper jobs as list of strings.
"""
backup_info = BackupInformation.get(backup_info_key)
if not backup_info:
return []
jobs = utils.RunMapForKinds(
job_operation_key,
kinds,
job_name,
backup_handler,
input_reader,
output_writer,
mapper_params,
mapreduce_params,
queue_name=queue)
backup_info.active_jobs = jobs
backup_info.put(force_writes=True)
return jobs
def get_backup_files(backup_info, selected_kinds=None):
"""Returns the backup filenames for selected kinds or all if None/Empty."""
if backup_info.blob_files:
return backup_info.blob_files
else:
kinds_backup_files = backup_info.get_kind_backup_files(selected_kinds)
return list(itertools.chain(*(
kind_backup_files.files for kind_backup_files in kinds_backup_files)))
def delete_backup_files(filesystem, backup_files):
if backup_files:
if filesystem == files.BLOBSTORE_FILESYSTEM:
blob_keys = []
for fname in backup_files:
blob_key = files.blobstore.get_blob_key(fname)
if blob_key:
blob_keys.append(blob_key)
if len(blob_keys) == MAX_BLOBS_PER_DELETE:
blobstore_api.delete(blob_keys)
blob_keys = []
if blob_keys:
blobstore_api.delete(blob_keys)
def delete_backup_info(backup_info, delete_files=True):
"""Deletes a backup including its associated files and other metadata."""
if backup_info.blob_files:
delete_backup_files(backup_info.filesystem, backup_info.blob_files)
backup_info.delete(force_writes=True)
else:
kinds_backup_files = tuple(backup_info.get_kind_backup_files())
if delete_files:
delete_backup_files(backup_info.filesystem, itertools.chain(*(
kind_backup_files.files for kind_backup_files in kinds_backup_files)))
db.delete(kinds_backup_files + (backup_info,), force_writes=True)
class DoBackupDeleteHandler(BaseDoHandler):
"""Handler to deal with datastore admin requests to delete backup data."""
SUFFIX = 'backup_delete.do'
def get(self):
self.post()
def post(self):
"""Handler for post requests to datastore_admin/backup_delete.do.
Deletes are executed and user is redirected to the base-path handler.
"""
backup_ids = self.request.get_all('backup_id')
token = self.request.get('xsrf_token')
params = ()
if backup_ids and utils.ValidateXsrfToken(token, XSRF_ACTION):
try:
for backup_info in db.get(backup_ids):
if backup_info:
delete_backup_info(backup_info)
except Exception, e:
logging.exception('Failed to delete datastore backup.')
params = [('error', e.message)]
self.SendRedirect(params=params)
class DoBackupAbortHandler(BaseDoHandler):
"""Handler to deal with datastore admin requests to abort pending backups."""
SUFFIX = 'backup_abort.do'
def get(self):
self.post()
def post(self):
"""Handler for post requests to datastore_admin/backup_abort.do.
Abort is executed and user is redirected to the base-path handler.
"""
backup_ids = self.request.get_all('backup_id')
token = self.request.get('xsrf_token')
params = ()
if backup_ids and utils.ValidateXsrfToken(token, XSRF_ACTION):
try:
for backup_info in db.get(backup_ids):
if backup_info:
operation = backup_info.parent()
if operation.parent_key():
job_id = str(operation.parent_key())
datastore_admin_service = services_client.DatastoreAdminClient()
datastore_admin_service.abort_backup(job_id)
else:
utils.AbortAdminOperation(operation.key())
delete_backup_info(backup_info)
except Exception, e:
logging.exception('Failed to abort pending datastore backup.')
params = [('error', e.message)]
self.SendRedirect(params=params)
class DoBackupRestoreHandler(BaseDoHandler):
"""Handler to restore backup data.
Deals with requests from the admin console.
"""
SUFFIX = 'backup_restore.do'
BACKUP_RESTORE_HANDLER = __name__ + '.RestoreEntity.map'
RESTORE_COMPLETE_HANDLER = __name__ + '.RestoreCompleteHandler'
INPUT_READER = input_readers.__name__ + '.RecordsReader'
_get_html_page = 'do_restore_from_backup.html'
_get_post_html_page = SUFFIX
def _ProcessPostRequest(self):
"""Triggers backup restore mapper jobs and returns their ids."""
backup_id = self.request.get('backup_id')
if not backup_id:
return [('error', 'Unspecified Backup.')]
backup = db.get(db.Key(backup_id))
if not backup:
return [('error', 'Invalid Backup id.')]
if backup.gs_handle:
if not is_readable_gs_handle(backup.gs_handle):
return [('error', 'Backup not readable')]
kinds = set(self.request.get_all('kind'))
if not (backup.blob_files or kinds):
return [('error', 'No kinds were selected')]
backup_kinds = set(backup.kinds)
difference = kinds.difference(backup_kinds)
if difference:
return [('error', 'Backup does not have kind[s] %s' %
', '.join(difference))]
if self.request.get('run_as_a_service', False):
if backup.filesystem != files.GS_FILESYSTEM:
return [('error',
'Restore as a service is only available for GS backups')]
datastore_admin_service = services_client.DatastoreAdminClient()
description = 'Remote restore job: %s' % backup.name
remote_job_id = datastore_admin_service.restore_from_backup(
description, backup_id, list(kinds))
return [('remote_job', remote_job_id)]
queue = self.request.get('queue')
job_name = 'datastore_backup_restore_%s' % re.sub(r'[^\w]', '_',
backup.name)
job_operation = None
try:
operation_name = 'Restoring %s from backup: %s' % (
', '.join(kinds) if kinds else 'all', backup.name)
job_operation = utils.StartOperation(operation_name)
mapper_params = self._GetBasicMapperParams()
kinds = list(kinds) if len(backup_kinds) != len(kinds) else []
mapper_params['files'] = get_backup_files(backup, kinds)
mapper_params['kind_filter'] = kinds
mapper_params['original_app'] = backup.original_app
mapreduce_params = {
'backup_name': backup.name,
'force_ops_writes': True,
}
shard_count = min(max(utils.MAPREDUCE_MIN_SHARDS,
len(mapper_params['files'])),
utils.MAPREDUCE_MAX_SHARDS)
job = utils.StartMap(job_operation.key(), job_name,
self.BACKUP_RESTORE_HANDLER, self.INPUT_READER, None,
mapper_params, mapreduce_params, queue_name=queue,
shard_count=shard_count)
return [('job', job)]
except Exception:
logging.exception('Failed to start a restore from backup job "%s".',
job_name)
if job_operation:
job_operation.status = utils.DatastoreAdminOperation.STATUS_FAILED
job_operation.put(force_writes=True)
raise
class DoBackupImportHandler(BaseDoHandler):
"""Handler to deal with datastore admin requests to import backup info."""
SUFFIX = 'import_backup.do'
def get(self):
self.post()
def post(self):
"""Handler for post requests to datastore_admin/import_backup.do.
Import is executed and user is redirected to the base-path handler.
"""
gs_handle = self.request.get('gs_handle')
token = self.request.get('xsrf_token')
error = None
if gs_handle and utils.ValidateXsrfToken(token, XSRF_ACTION):
try:
bucket_name, path = parse_gs_handle(gs_handle)
file_content = get_gs_object(bucket_name, path)
entities = parse_backup_info_file(file_content)
original_backup_info = entities.next()
entity = datastore.Entity(BackupInformation.kind())
entity.update(original_backup_info)
backup_info = BackupInformation.from_entity(entity)
if original_backup_info.key().app() != os.getenv('APPLICATION_ID'):
backup_info.original_app = original_backup_info.key().app()
def tx():
backup_info.put(force_writes=True)
kind_files_models = []
for entity in entities:
kind_files = backup_info.create_kind_backup_files(
entity.key().name(), entity['files'])
kind_files_models.append(kind_files)
db.put(kind_files_models, force_writes=True)
db.run_in_transaction(tx)
backup_id = str(backup_info.key())
except Exception, e:
logging.exception('Failed to Import datastore backup information.')
error = e.message
if error:
self.SendRedirect(params=[('error', error)])
elif self.request.get('Restore'):
ConfirmRestoreFromBackupHandler.Render(
self, default_backup_id=backup_id,
default_delete_backup_after_restore=True)
else:
self.SendRedirect()
class BackupInformation(db.Model):
"""An entity to keep information on a datastore backup."""
name = db.StringProperty()
kinds = db.StringListProperty()
namespaces = db.StringListProperty()
filesystem = db.StringProperty(default=files.BLOBSTORE_FILESYSTEM)
start_time = db.DateTimeProperty(auto_now_add=True)
active_jobs = db.StringListProperty()
completed_jobs = db.StringListProperty()
complete_time = db.DateTimeProperty(default=None)
blob_files = db.StringListProperty()
original_app = db.StringProperty(default=None)
gs_handle = db.TextProperty(default=None)
destination = db.StringProperty()
@classmethod
def kind(cls):
return utils.BACKUP_INFORMATION_KIND
@classmethod
def name_exists(cls, backup_name):
query = BackupInformation.all(keys_only=True)
query.filter('name =', backup_name)
return query.get() is not None
def create_kind_backup_files_key(self, kind):
return db.Key.from_path(KindBackupFiles.kind(), kind, parent=self.key())
def create_kind_backup_files(self, kind, kind_files):
return KindBackupFiles(key=self.create_kind_backup_files_key(kind),
files=kind_files)
def get_kind_backup_files(self, kinds=None):
if kinds:
return db.get([self.create_kind_backup_files_key(kind) for kind in kinds])
else:
return KindBackupFiles.all().ancestor(self).run()
class KindBackupFiles(db.Model):
"""An entity to keep files information per kind for a backup.
A key for this model should created using kind as a name and the associated
BackupInformation as a parent.
"""
files = db.StringListProperty(indexed=False)
@property
def backup_kind(self):
return self.key().name()
@classmethod
def kind(cls):
return utils.BACKUP_INFORMATION_FILES_KIND
def BackupCompleteHandler(operation, job_id, mapreduce_state):
"""Updates BackupInformation record for a completed mapper job."""
mapreduce_spec = mapreduce_state.mapreduce_spec
filenames = mapreduce_spec.mapper.output_writer_class().get_filenames(
mapreduce_state)
_perform_backup_complete(operation,
job_id,
mapreduce_spec.mapper.params['entity_kind'],
mapreduce_spec.params['backup_info_pk'],
mapreduce_spec.mapper.params.get('gs_bucket_name'),
filenames,
mapreduce_spec.params.get('done_callback_queue'))
@db.transactional
def _perform_backup_complete(
operation, job_id, kind, backup_info_pk, gs_bucket_name, filenames, queue):
backup_info = BackupInformation.get(backup_info_pk)
if backup_info:
if job_id in backup_info.active_jobs:
backup_info.active_jobs.remove(job_id)
backup_info.completed_jobs = list(
set(backup_info.completed_jobs + [job_id]))
if backup_info.filesystem == files.BLOBSTORE_FILESYSTEM:
filenames = drop_empty_files(filenames)
kind_backup_files = backup_info.get_kind_backup_files([kind])[0]
if kind_backup_files:
kind_backup_files.files = list(set(kind_backup_files.files + filenames))
else:
kind_backup_files = backup_info.create_kind_backup_files(kind, filenames)
db.put((backup_info, kind_backup_files), force_writes=True)
if operation.status == utils.DatastoreAdminOperation.STATUS_COMPLETED:
deferred.defer(finalize_backup_info, backup_info.key(),
gs_bucket_name,
_url=config.DEFERRED_PATH,
_queue=queue,
_transactional=True)
else:
logging.warn('BackupInfo was not found for %s', backup_info_pk)
def finalize_backup_info(backup_info_pk, gs_bucket):
"""Finalize the state of BackupInformation and creates info file for GS."""
def get_backup_info():
return BackupInformation.get(backup_info_pk)
backup_info = db.run_in_transaction(get_backup_info)
if backup_info:
complete_time = datetime.datetime.now()
backup_info.complete_time = complete_time
gs_handle = None
if backup_info.filesystem == files.GS_FILESYSTEM:
gs_handle = BackupInfoWriter(gs_bucket).write(backup_info)[0]
def set_backup_info_with_finalize_info():
backup_info = get_backup_info()
backup_info.complete_time = complete_time
backup_info.gs_handle = gs_handle
backup_info.put(force_writes=True)
db.run_in_transaction(set_backup_info_with_finalize_info)
logging.info('Backup %s completed', backup_info.name)
else:
logging.warn('Backup %s could not be found', backup_info_pk)
def parse_backup_info_file(content):
"""Returns entities iterator from a backup_info file content."""
reader = records.RecordsReader(cStringIO.StringIO(content))
version = reader.read()
if version != '1':
raise IOError('Unsupported version')
return (datastore.Entity.FromPb(record) for record in reader)
@db.non_transactional
def drop_empty_files(filenames):
"""Deletes empty files and returns filenames minus the deleted ones."""
non_empty_filenames = []
empty_file_keys = []
blobs_info = blobstore.BlobInfo.get(
[files.blobstore.get_blob_key(fn) for fn in filenames])
for filename, blob_info in itertools.izip(filenames, blobs_info):
if blob_info:
if blob_info.size > 0:
non_empty_filenames.append(filename)
else:
empty_file_keys.append(blob_info.key())
blobstore_api.delete(empty_file_keys)
return non_empty_filenames
class BackupInfoWriter(object):
"""A class for writing Datastore backup metadata files."""
def __init__(self, gs_bucket):
"""Construct a BackupInfoWriter.
Args:
gs_bucket: Required string for the target GS bucket.
"""
self.__gs_bucket = gs_bucket
def write(self, backup_info):
"""Write the metadata files for the given backup_info.
As a side effect, updates the backup_info in-memory entity object with the
gs_handle to the Backup info filename. This is not saved to the datastore.
Args:
backup_info: Required BackupInformation.
Returns:
A list with Backup info filename followed by Kind info filenames.
"""
fn = self._write_backup_info(backup_info)
return [fn] + self._write_kind_info(backup_info)
def _generate_filename(self, backup_info, suffix):
key_str = str(backup_info.key()).replace('/', '_')
return '/gs/%s/%s%s' % (self.__gs_bucket, key_str, suffix)
def _write_backup_info(self, backup_info):
"""Writes a backup_info_file.
Args:
backup_info: Required BackupInformation.
Returns:
Backup info filename.
"""
filename = self._generate_filename(backup_info, '.backup_info')
backup_info.gs_handle = filename
info_file = files.open(files.gs.create(filename), 'a', exclusive_lock=True)
try:
with records.RecordsWriter(info_file) as writer:
writer.write('1')
writer.write(db.model_to_protobuf(backup_info).SerializeToString())
for kind_files in backup_info.get_kind_backup_files():
writer.write(db.model_to_protobuf(kind_files).SerializeToString())
finally:
info_file.close(finalize=True)
return filename
def _write_kind_info(self, backup_info):
"""Writes type information schema for each kind in backup_info.
Args:
backup_info: Required BackupInformation.
Returns:
A list with all created filenames.
"""
def get_backup_files_tx():
kind_backup_files_list = []
for kind_backup_files in backup_info.get_kind_backup_files():
kind_backup_files_list.append(kind_backup_files)
return kind_backup_files_list
kind_backup_files_list = db.run_in_transaction(get_backup_files_tx)
filenames = []
for kind_backup_files in kind_backup_files_list:
backup = self._create_kind_backup(backup_info, kind_backup_files)
filename = self._generate_filename(
backup_info, '.%s.backup_info' % kind_backup_files.backup_kind)
self._write_kind_backup_info_file(filename, backup)
filenames.append(filename)
return filenames
def _create_kind_backup(self, backup_info, kind_backup_files):
"""Creates and populate a backup_pb2.Backup."""
backup = backup_pb2.Backup()
backup.backup_info.backup_name = backup_info.name
backup.backup_info.start_timestamp = datastore_types.DatetimeToTimestamp(
backup_info.start_time)
backup.backup_info.end_timestamp = datastore_types.DatetimeToTimestamp(
backup_info.complete_time)
kind = kind_backup_files.backup_kind
kind_info = backup.kind_info.add()
kind_info.kind = kind
kind_info.entity_schema.kind = kind
kind_info.file.extend(kind_backup_files.files)
entity_type_info = EntityTypeInfo(kind=kind)
for sharded_aggregation in SchemaAggregationResult.load(
backup_info.key(), kind):
if sharded_aggregation.is_partial:
kind_info.is_partial = True
if sharded_aggregation.entity_type_info:
entity_type_info.merge(sharded_aggregation.entity_type_info)
entity_type_info.populate_entity_schema(kind_info.entity_schema)
return backup
@classmethod
def _write_kind_backup_info_file(cls, filename, backup):
"""Writes a kind backup_info.
Args:
filename: The name of the file to be created as string.
backup: apphosting.ext.datastore_admin.Backup proto.
"""
f = files.open(files.gs.create(filename), 'a', exclusive_lock=True)
try:
f.write(backup.SerializeToString())
finally:
f.close(finalize=True)
class PropertyTypeInfo(json_util.JsonMixin):
"""Type information for an entity property."""
def __init__(self, name, is_repeated=False, primitive_types=None,
embedded_entities=None):
"""Construct a PropertyTypeInfo instance.
Args:
name: The name of the property as a string.
is_repeated: A boolean that indicates if the property is repeated.
primitive_types: Optional list of PrimitiveType integer values.
embedded_entities: Optional list of EntityTypeInfo.
"""
self.__name = name
self.__is_repeated = is_repeated
self.__primitive_types = set(primitive_types) if primitive_types else set()
self.__embedded_entities = {}
for entity in embedded_entities or ():
if entity.kind in self.__embedded_entities:
self.__embedded_entities[entity.kind].merge(entity)
else:
self.__embedded_entities[entity.kind] = entity
@property
def name(self):
return self.__name
@property
def is_repeated(self):
return self.__is_repeated
@property
def primitive_types(self):
return self.__primitive_types
def embedded_entities_kind_iter(self):
return self.__embedded_entities.iterkeys()
def get_embedded_entity(self, kind):
return self.__embedded_entities.get(kind)
def merge(self, other):
"""Merge a PropertyTypeInfo with this instance.
Args:
other: Required PropertyTypeInfo to merge.
Returns:
True if anything was changed. False otherwise.
Raises:
ValueError: if property names do not match.
TypeError: if other is not instance of PropertyTypeInfo.
"""
if not isinstance(other, PropertyTypeInfo):
raise TypeError('Expected PropertyTypeInfo, was %r' % (other,))
if other.__name != self.__name:
raise ValueError('Property names mismatch (%s, %s)' %
(self.__name, other.__name))
changed = False
if other.__is_repeated and not self.__is_repeated:
self.__is_repeated = True
changed = True
if not other.__primitive_types.issubset(self.__primitive_types):
self.__primitive_types = self.__primitive_types.union(
other.__primitive_types)
changed = True
for kind, other_embedded_entity in other.__embedded_entities.iteritems():
embedded_entity = self.__embedded_entities.get(kind)
if embedded_entity:
changed = embedded_entity.merge(other_embedded_entity) or changed
else:
self.__embedded_entities[kind] = other_embedded_entity
changed = True
return changed
def populate_entity_schema_field(self, entity_schema):
"""Add an populate a Field to the given entity_schema.
Args:
entity_schema: apphosting.ext.datastore_admin.EntitySchema proto.
"""
if not (self.__primitive_types or self.__embedded_entities):
return
field = entity_schema.field.add()
field.name = self.__name
field_type = field.type.add()
field_type.is_list = self.__is_repeated
field_type.primitive_type.extend(self.__primitive_types)
for embedded_entity in self.__embedded_entities.itervalues():
embedded_entity_schema = field_type.embedded_schema.add()
embedded_entity.populate_entity_schema(embedded_entity_schema)
def to_json(self):
json = dict()
json['name'] = self.__name
json['is_repeated'] = self.__is_repeated
json['primitive_types'] = list(self.__primitive_types)
json['embedded_entities'] = [e.to_json() for e in
self.__embedded_entities.itervalues()]
return json
@classmethod
def from_json(cls, json):
return cls(json['name'], json['is_repeated'], json.get('primitive_types'),
[EntityTypeInfo.from_json(entity_json) for entity_json
in json.get('embedded_entities')])
class EntityTypeInfo(json_util.JsonMixin):
"""Type information for an entity."""
def __init__(self, kind=None, properties=None):
"""Construct an EntityTypeInfo instance.
Args:
kind: An optional kind name as string.
properties: An optional list of PropertyTypeInfo.
"""
self.__kind = kind
self.__properties = {}
for property_type_info in properties or ():
if property_type_info.name in self.__properties:
self.__properties[property_type_info.name].merge(property_type_info)
else:
self.__properties[property_type_info.name] = property_type_info
@property
def kind(self):
return self.__kind
def properties_name_iter(self):
return self.__properties.iterkeys()
def get_property(self, name):
return self.__properties.get(name)
def merge(self, other):
"""Merge an EntityTypeInfo with this instance.
Args:
other: Required EntityTypeInfo to merge.
Returns:
True if anything was changed. False otherwise.
Raises:
ValueError: if kinds do not match.
TypeError: if other is not instance of EntityTypeInfo.
"""
if not isinstance(other, EntityTypeInfo):
raise TypeError('Expected EntityTypeInfo, was %r' % (other,))
if other.__kind != self.__kind:
raise ValueError('Kinds mismatch (%s, %s)' % (self.__kind, other.__kind))
changed = False
for name, other_property in other.__properties.iteritems():
self_property = self.__properties.get(name)
if self_property:
changed = self_property.merge(other_property) or changed
else:
self.__properties[name] = other_property
changed = True
return changed
def populate_entity_schema(self, entity_schema):
"""Populates the given entity_schema with values from this instance.
Args:
entity_schema: apphosting.ext.datastore_admin.EntitySchema proto.
"""
if self.__kind:
entity_schema.kind = self.__kind
for property_type_info in self.__properties.itervalues():
property_type_info.populate_entity_schema_field(entity_schema)
def to_json(self):
return {
'kind': self.__kind,
'properties': [p.to_json() for p in self.__properties.itervalues()]
}
@classmethod
def from_json(cls, json):
kind = json.get('kind')
properties_json = json.get('properties')
if properties_json:
return cls(kind, [PropertyTypeInfo.from_json(p) for p in properties_json])
else:
return cls(kind)
@classmethod
def create_from_entity_proto(cls, entity_proto):
"""Creates and populates an EntityTypeInfo from an EntityProto."""
properties = [cls.__get_property_type_info(property_proto) for
property_proto in itertools.chain(
entity_proto.property_list(),
entity_proto.raw_property_list())]
kind = utils.get_kind_from_entity_pb(entity_proto)
return cls(kind, properties)
@classmethod
def __get_property_type_info(cls, property_proto):
"""Returns the type mapping for the provided property."""
name = property_proto.name()
is_repeated = bool(property_proto.multiple())
primitive_type = None
entity_type = None
if property_proto.has_meaning():
primitive_type = MEANING_TO_PRIMITIVE_TYPE.get(property_proto.meaning())
if primitive_type is None:
value = property_proto.value()
if value.has_int64value():
primitive_type = backup_pb2.EntitySchema.INTEGER
elif value.has_booleanvalue():
primitive_type = backup_pb2.EntitySchema.BOOLEAN
elif value.has_stringvalue():
if property_proto.meaning() == entity_pb.Property.ENTITY_PROTO:
entity_proto = entity_pb.EntityProto()
try:
entity_proto.ParsePartialFromString(value.stringvalue())
except Exception:
pass
else:
entity_type = EntityTypeInfo.create_from_entity_proto(entity_proto)
else:
primitive_type = backup_pb2.EntitySchema.STRING
elif value.has_doublevalue():
primitive_type = backup_pb2.EntitySchema.FLOAT
elif value.has_pointvalue():
primitive_type = backup_pb2.EntitySchema.GEO_POINT
elif value.has_uservalue():
primitive_type = backup_pb2.EntitySchema.USER
elif value.has_referencevalue():
primitive_type = backup_pb2.EntitySchema.REFERENCE
return PropertyTypeInfo(
name, is_repeated,
(primitive_type,) if primitive_type is not None else None,
(entity_type,) if entity_type else None)
class SchemaAggregationResult(db.Model):
"""Persistent aggregated type information for a kind.
An instance can be retrieved via the load method or created
using the create method. An instance aggregates all type information
for all seen embedded_entities via the merge method and persisted when needed
using the model put method.
"""
entity_type_info = json_util.JsonProperty(
EntityTypeInfo, default=EntityTypeInfo(), indexed=False)
is_partial = db.BooleanProperty(default=False)
def merge(self, other):
"""Merge a SchemaAggregationResult or an EntityTypeInfo with this instance.
Args:
other: Required SchemaAggregationResult or EntityTypeInfo to merge.
Returns:
True if anything was changed. False otherwise.
"""
if self.is_partial:
return False
if isinstance(other, SchemaAggregationResult):
other = other.entity_type_info
return self.entity_type_info.merge(other)
@classmethod
def _get_parent_key(cls, backup_id, kind_name):
return datastore_types.Key.from_path('Kind', kind_name, parent=backup_id)
@classmethod
def create(cls, backup_id, kind_name, shard_id):
"""Create SchemaAggregationResult instance.
Args:
backup_id: Required BackupInformation Key.
kind_name: Required kind name as string.
shard_id: Required shard id as string.
Returns:
A new SchemaAggregationResult instance.
"""
parent = cls._get_parent_key(backup_id, kind_name)
return SchemaAggregationResult(
key_name=shard_id, parent=parent,
entity_type_info=EntityTypeInfo(kind=kind_name))
@classmethod
def load(cls, backup_id, kind_name, shard_id=None):
"""Retrieve SchemaAggregationResult from the Datastore.
Args:
backup_id: Required BackupInformation Key.
kind_name: Required kind name as string.
shard_id: Optional shard id as string.
Returns:
SchemaAggregationResult iterator or an entity if shard_id not None.
"""
parent = cls._get_parent_key(backup_id, kind_name)
if shard_id:
key = datastore_types.Key.from_path(cls.kind(), shard_id, parent=parent)
return SchemaAggregationResult.get(key)
else:
return db.Query(cls).ancestor(parent).run()
@classmethod
def kind(cls):
return utils.BACKUP_INFORMATION_KIND_TYPE_INFO
class SchemaAggregationPool(object):
"""An MR pool to aggregation type information per kind."""
def __init__(self, backup_id, kind, shard_id):
"""Construct SchemaAggregationPool instance.
Args:
backup_id: Required BackupInformation Key.
kind: Required kind name as string.
shard_id: Required shard id as string.
"""
self.__backup_id = backup_id
self.__kind = kind
self.__shard_id = shard_id
self.__aggregation = SchemaAggregationResult.load(backup_id, kind, shard_id)
if not self.__aggregation:
self.__aggregation = SchemaAggregationResult.create(backup_id, kind,
shard_id)
self.__needs_save = True
else:
self.__needs_save = False
def merge(self, entity_type_info):
"""Merge EntityTypeInfo into aggregated type information."""
if self.__aggregation.merge(entity_type_info):
self.__needs_save = True
def flush(self):
"""Save aggregated type information to the datastore if changed."""
if self.__needs_save:
def update_aggregation_tx():
aggregation = SchemaAggregationResult.load(
self.__backup_id, self.__kind, self.__shard_id)
if aggregation:
if aggregation.merge(self.__aggregation):
aggregation.put(force_writes=True)
self.__aggregation = aggregation
else:
self.__aggregation.put(force_writes=True)
def mark_aggregation_as_partial_tx():
aggregation = SchemaAggregationResult.load(
self.__backup_id, self.__kind, self.__shard_id)
if aggregation is None:
aggregation = SchemaAggregationResult.create(
self.__backup_id, self.__kind, self.__shard_id)
aggregation.is_partial = True
aggregation.put(force_writes=True)
self.__aggregation = aggregation
try:
db.run_in_transaction(update_aggregation_tx)
except apiproxy_errors.RequestTooLargeError:
db.run_in_transaction(mark_aggregation_as_partial_tx)
self.__needs_save = False
class AggregateSchema(op.Operation):
"""An MR Operation to aggregation type information for a kind.
This operation will register an MR pool, SchemaAggregationPool, if
one is not already registered and will invoke the pool's merge operation
per entity. The pool is responsible for keeping a persistent state of
type aggregation using the sharded db model, SchemaAggregationResult.
"""
def __init__(self, entity_proto):
self.__entity_info = EntityTypeInfo.create_from_entity_proto(entity_proto)
def __call__(self, ctx):
pool = ctx.get_pool('schema_aggregation_pool')
if not pool:
backup_id = datastore_types.Key(
context.get().mapreduce_spec.params['backup_info_pk'])
pool = SchemaAggregationPool(
backup_id, self.__entity_info.kind, ctx.shard_id)
ctx.register_pool('schema_aggregation_pool', pool)
pool.merge(self.__entity_info)
class BackupEntity(object):
"""A class which dumps the entity to the writer."""
def map(self, entity_proto):
"""Backup entity map handler.
Args:
entity_proto: An instance of entity_pb.EntityProto.
Yields:
A serialized entity_pb.EntityProto as a string
"""
yield entity_proto.SerializeToString()
yield AggregateSchema(entity_proto)
class RestoreEntity(object):
"""A class which restore the entity to datastore."""
def __init__(self):
self.initialized = False
self.kind_filter = None
self.app_id = None
def initialize(self):
"""Initialize a restore mapper instance."""
if self.initialized:
return
mapper_params = get_mapper_params_from_context()
kind_filter = mapper_params.get('kind_filter')
self.kind_filter = set(kind_filter) if kind_filter else None
original_app = mapper_params.get('original_app')
target_app = os.getenv('APPLICATION_ID')
if original_app and target_app != original_app:
self.app_id = target_app
self.initialized = True
def map(self, record):
"""Restore entity map handler.
Args:
record: A serialized entity_pb.EntityProto.
Yields:
A operation.db.Put for the mapped entity
"""
self.initialize()
pb = entity_pb.EntityProto(contents=record)
if self.app_id:
utils.FixKeys(pb, self.app_id)
if not self.kind_filter or (
utils.get_kind_from_entity_pb(pb) in self.kind_filter):
yield utils.Put(pb)
if self.app_id:
yield utils.ReserveKey(datastore_types.Key._FromPb(pb.key()))
def get_mapper_params_from_context():
"""Get mapper params from MR context. Split out for ease of testing."""
return context.get().mapreduce_spec.mapper.params
def validate_gs_bucket_name(bucket_name):
"""Validate the format of the given bucket_name.
Validation rules are based:
https://developers.google.com/storage/docs/bucketnaming#requirements
Args:
bucket_name: The bucket name to validate.
Raises:
BackupValidationError: If the bucket name is invalid.
"""
if len(bucket_name) > MAX_BUCKET_LEN:
raise BackupValidationError(
'Bucket name length should not be longer than %d' % MAX_BUCKET_LEN)
if len(bucket_name) < MIN_BUCKET_LEN:
raise BackupValidationError(
'Bucket name length should be longer than %d' % MIN_BUCKET_LEN)
if bucket_name.lower().startswith('goog'):
raise BackupValidationError(
'Bucket name should not start with a "goog" prefix')
bucket_elements = bucket_name.split('.')
for bucket_element in bucket_elements:
if len(bucket_element) > MAX_BUCKET_SEGMENT_LEN:
raise BackupValidationError(
'Segment length of bucket name should not be longer than %d' %
MAX_BUCKET_SEGMENT_LEN)
if not re.match(BUCKET_PATTERN, bucket_name):
raise BackupValidationError('Invalid bucket name "%s"' % bucket_name)
def is_accessible_bucket_name(bucket_name):
"""Returns True if the application has access to the specified bucket."""
scope = config.GoogleApiScope('devstorage.read_write')
bucket_url = config.GsBucketURL(bucket_name)
auth_token, _ = app_identity.get_access_token(scope)
result = urlfetch.fetch(bucket_url, method=urlfetch.HEAD, headers={
'Authorization': 'OAuth %s' % auth_token,
'x-goog-api-version': '2'})
return result and result.status_code == 200
def verify_bucket_writable(bucket_name):
"""Verify the application can write to the specified bucket.
Args:
bucket_name: The bucket to verify.
Raises:
BackupValidationError: If the bucket is not writable.
"""
path = '/gs/%s' % bucket_name
try:
file_names = files.gs.listdir(path,
{'prefix': TEST_WRITE_FILENAME_PREFIX,
'max_keys': MAX_KEYS_LIST_SIZE})
except (files.InvalidParameterError, files.PermissionDeniedError):
raise BackupValidationError('Bucket "%s" not accessible' % bucket_name)
except files.InvalidFileNameError:
raise BackupValidationError('Bucket "%s" does not exist' % bucket_name)
file_name = '%s/%s.tmp' % (path, TEST_WRITE_FILENAME_PREFIX)
file_name_try = 0
while True:
if file_name_try >= MAX_TEST_FILENAME_TRIES:
return
if file_name not in file_names:
break
gen = random.randint(0, 9999)
file_name = '%s/%s_%s.tmp' % (path, TEST_WRITE_FILENAME_PREFIX, gen)
file_name_try += 1
try:
test_file = files.open(files.gs.create(file_name), 'a', exclusive_lock=True)
try:
test_file.write('test')
finally:
test_file.close(finalize=True)
except files.PermissionDeniedError:
raise BackupValidationError('Bucket "%s" is not writable' % bucket_name)
try:
files.delete(file_name)
except (files.InvalidArgumentError, files.InvalidFileNameError, IOError):
logging.warn('Failed to delete test file %s', file_name)
def is_readable_gs_handle(gs_handle):
"""Return True if the application can read the specified gs_handle."""
try:
with files.open(gs_handle) as bak_file:
bak_file.read(1)
except files.PermissionDeniedError:
return False
return True
def parse_gs_handle(gs_handle):
"""Splits [/gs/]?bucket_name[/folder]*[/file]? to (bucket_name, path | '')."""
if gs_handle.startswith('/'):
filesystem = gs_handle[1:].split('/', 1)[0]
if filesystem == 'gs':
gs_handle = gs_handle[4:]
else:
raise BackupValidationError('Unsupported filesystem: %s' % filesystem)
tokens = gs_handle.split('/', 1)
return (tokens[0], '') if len(tokens) == 1 else tuple(tokens)
def validate_and_canonicalize_gs_bucket(gs_bucket_name):
bucket_name, path = parse_gs_handle(gs_bucket_name)
gs_bucket_name = ('%s/%s' % (bucket_name, path)).rstrip('/')
validate_gs_bucket_name(bucket_name)
verify_bucket_writable(bucket_name)
return gs_bucket_name
def list_bucket_files(bucket_name, prefix, max_keys=1000):
"""Returns a listing of of a bucket that matches the given prefix."""
scope = config.GoogleApiScope('devstorage.read_only')
bucket_url = config.GsBucketURL(bucket_name)
url = bucket_url + '?'
query = [('max-keys', max_keys)]
if prefix:
query.append(('prefix', prefix))
url += urllib.urlencode(query)
auth_token, _ = app_identity.get_access_token(scope)
result = urlfetch.fetch(url, method=urlfetch.GET, headers={
'Authorization': 'OAuth %s' % auth_token,
'x-goog-api-version': '2'})
if result and result.status_code == 200:
doc = xml.dom.minidom.parseString(result.content)
return [node.childNodes[0].data for node in doc.getElementsByTagName('Key')]
raise BackupValidationError('Request to Google Cloud Storage failed')
def get_gs_object(bucket_name, path):
"""Returns a listing of of a bucket that matches the given prefix."""
scope = config.GoogleApiScope('devstorage.read_only')
bucket_url = config.GsBucketURL(bucket_name)
url = bucket_url + path
auth_token, _ = app_identity.get_access_token(scope)
result = urlfetch.fetch(url, method=urlfetch.GET, headers={
'Authorization': 'OAuth %s' % auth_token,
'x-goog-api-version': '2'})
if result and result.status_code == 200:
return result.content
if result and result.status_code == 403:
raise BackupValidationError(
'Requested path %s is not accessible/access denied' % url)
if result and result.status_code == 404:
raise BackupValidationError('Requested path %s was not found' % url)
raise BackupValidationError('Error encountered accessing requested path %s' %
url)
def get_queue_names(app_id=None, max_rows=100):
"""Returns a list with all non-special queue names for app_id."""
rpc = apiproxy_stub_map.UserRPC('taskqueue')
request = taskqueue_service_pb.TaskQueueFetchQueuesRequest()
response = taskqueue_service_pb.TaskQueueFetchQueuesResponse()
if app_id:
request.set_app_id(app_id)
request.set_max_rows(max_rows)
queues = ['default']
try:
rpc.make_call('FetchQueues', request, response)
rpc.check_success()
for queue in response.queue_list():
if (queue.mode() == taskqueue_service_pb.TaskQueueMode.PUSH and
not queue.queue_name().startswith('__') and
queue.queue_name() != 'default'):
queues.append(queue.queue_name())
except Exception:
logging.exception('Failed to get queue names.')
return queues
def handlers_list(base_path):
return [
(r'%s/%s' % (base_path, BackupLinkHandler.SUFFIX),
BackupLinkHandler),
(r'%s/%s' % (base_path, ConfirmBackupHandler.SUFFIX),
ConfirmBackupHandler),
(r'%s/%s' % (base_path, DoBackupHandler.SUFFIX), DoBackupHandler),
(r'%s/%s' % (base_path, DoBackupRestoreHandler.SUFFIX),
DoBackupRestoreHandler),
(r'%s/%s' % (base_path, DoBackupDeleteHandler.SUFFIX),
DoBackupDeleteHandler),
(r'%s/%s' % (base_path, DoBackupAbortHandler.SUFFIX),
DoBackupAbortHandler),
(r'%s/%s' % (base_path, DoBackupImportHandler.SUFFIX),
DoBackupImportHandler),
]