| #!/usr/bin/env python |
| # |
| # Copyright 2007 Google Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| |
| |
| |
| |
| """Handler for data copy operation. |
| |
| Generic datastore admin console transfers control to ConfirmCopyHandler |
| after selection of entities. The ConfirmCopyHandler confirms with user |
| his choice, enters target application id and transfers control to |
| DoCopyHandler. DoCopyHandler starts copying mappers and displays confirmation |
| page. |
| |
| This module also contains actual mapper code for copying data over. |
| """ |
| |
| |
| import logging |
| import urllib |
| |
| from google.appengine.api import capabilities |
| from google.appengine.api import datastore |
| from google.appengine.datastore import datastore_rpc |
| from google.appengine.ext import blobstore |
| from google.appengine.ext import webapp |
| from google.appengine.ext.datastore_admin import config |
| from google.appengine.ext.datastore_admin import remote_api_put_stub |
| from google.appengine.ext.datastore_admin import utils |
| |
| |
| try: |
| |
| from google.appengine.ext.mapreduce import context |
| from google.appengine.ext.mapreduce import input_readers |
| from google.appengine.ext.mapreduce import operation |
| except ImportError: |
| |
| from google.appengine._internal.mapreduce import context |
| from google.appengine._internal.mapreduce import input_readers |
| from google.appengine._internal.mapreduce import operation |
| |
| |
| XSRF_ACTION = 'copy' |
| |
| |
| class ConfirmCopyHandler(webapp.RequestHandler): |
| """Handler to deal with requests from the admin console to copy data.""" |
| |
| SUFFIX = 'confirm_copy' |
| |
| @classmethod |
| def Render(cls, handler): |
| """Rendering method that can be called by main.py. |
| |
| Args: |
| handler: the webapp.RequestHandler invoking the method |
| """ |
| namespace = handler.request.get('namespace') |
| kinds = handler.request.get_all('kind') |
| sizes_known, size_total, remainder = utils.ParseKindsAndSizes(kinds) |
| |
| (namespace_str, kind_str) = utils.GetPrintableStrs(namespace, kinds) |
| notreadonly_warning = capabilities.CapabilitySet( |
| 'datastore_v3', capabilities=['write']).is_enabled() |
| blob_warning = bool(blobstore.BlobInfo.all().fetch(1)) |
| datastore_type = datastore._GetConnection().get_datastore_type() |
| high_replication_warning = ( |
| datastore_type == datastore_rpc.Connection.HIGH_REPLICATION_DATASTORE) |
| |
| template_params = { |
| 'form_target': DoCopyHandler.SUFFIX, |
| 'kind_list': kinds, |
| 'remainder': remainder, |
| 'sizes_known': sizes_known, |
| 'size_total': size_total, |
| 'app_id': handler.request.get('app_id'), |
| 'datastore_admin_home': utils.GenerateHomeUrl(handler.request), |
| 'kind_str': kind_str, |
| 'namespace_str': namespace_str, |
| 'xsrf_token': utils.CreateXsrfToken(XSRF_ACTION), |
| 'notreadonly_warning': notreadonly_warning, |
| 'blob_warning': blob_warning, |
| 'high_replication_warning': high_replication_warning, |
| } |
| utils.RenderToResponse(handler, 'confirm_copy.html', template_params) |
| |
| |
| |
| |
| class DoCopyHandler(webapp.RequestHandler): |
| """Handler to deal with requests from the admin console to copy data.""" |
| |
| SUFFIX = 'copy.do' |
| |
| COPY_HANDLER = ('google.appengine.ext.datastore_admin.copy_handler.' |
| 'RemoteCopyEntity.map') |
| INPUT_READER = input_readers.__name__ + '.DatastoreKeyInputReader' |
| MAPREDUCE_DETAIL = config.MAPREDUCE_PATH + '/detail?mapreduce_id=' |
| |
| def get(self): |
| """Handler for get requests to datastore_admin/copy.do. |
| |
| Status of executed jobs is displayed. |
| """ |
| jobs = self.request.get_all('job') |
| error = self.request.get('error', '') |
| xsrf_error = self.request.get('xsrf_error', '') |
| |
| template_params = { |
| 'job_list': jobs, |
| 'mapreduce_detail': self.MAPREDUCE_DETAIL, |
| 'error': error, |
| 'xsrf_error': xsrf_error, |
| 'datastore_admin_home': config.BASE_PATH, |
| } |
| utils.RenderToResponse(self, 'do_copy.html', template_params) |
| |
| def post(self): |
| """Handler for post requests to datastore_admin/copy.do. |
| |
| Jobs are executed and user is redirected to the get handler. |
| """ |
| namespace = self.request.get('namespace') |
| kinds = self.request.get_all('kind') |
| (namespace_str, kinds_str) = utils.GetPrintableStrs(namespace, kinds) |
| token = self.request.get('xsrf_token') |
| remote_url = self.request.get('remote_url') |
| extra_header = self.request.get('extra_header') |
| |
| jobs = [] |
| if not remote_url: |
| parameters = [('error', 'Unspecified remote URL.')] |
| elif not utils.ValidateXsrfToken(token, XSRF_ACTION): |
| parameters = [('xsrf_error', '1')] |
| else: |
| try: |
| |
| |
| if extra_header: |
| extra_headers = dict([extra_header.split(':', 1)]) |
| else: |
| extra_headers = None |
| target_app = remote_api_put_stub.get_remote_app_id(remote_url, |
| extra_headers) |
| op = utils.StartOperation( |
| 'Copying %s%s to %s' % (kinds_str, namespace_str, target_app)) |
| name_template = 'Copy all %(kind)s objects%(namespace)s' |
| mapper_params = { |
| 'target_app': target_app, |
| 'remote_url': remote_url, |
| 'extra_header': extra_header, |
| } |
| jobs = utils.RunMapForKinds( |
| op.key(), |
| kinds, |
| name_template, |
| self.COPY_HANDLER, |
| self.INPUT_READER, |
| None, |
| mapper_params) |
| |
| error = '' |
| |
| |
| except Exception, e: |
| logging.exception('Handling exception.') |
| error = self._HandleException(e) |
| |
| parameters = [('job', job) for job in jobs] |
| if error: |
| parameters.append(('error', error)) |
| |
| query = urllib.urlencode(parameters) |
| self.redirect('%s/%s?%s' % (config.BASE_PATH, self.SUFFIX, query)) |
| |
| def _HandleException(self, e): |
| """Make exception handling overrideable by tests. |
| |
| In normal cases, return only the error string; do not fail to render the |
| page for user. |
| """ |
| return str(e) |
| |
| |
| |
| def KindPathFromKey(key): |
| """Return kinds path as '/'-delimited string for a particular key.""" |
| path = key.to_path() |
| kinds = [] |
| is_kind = True |
| for item in path: |
| if is_kind: |
| kinds.append(item) |
| is_kind = not is_kind |
| kind_path = '/'.join(kinds) |
| return kind_path |
| |
| |
| def get_mapper_params(): |
| """Return current mapreduce mapper params. Easily stubbed out for testing.""" |
| return context.get().mapreduce_spec.mapper.params |
| |
| |
| class CopyEntity(object): |
| """A class which contains a map handler to copy entities.""" |
| |
| def map(self, key): |
| """Copy data map handler. |
| |
| Args: |
| key: Datastore entity key or entity itself to copy. |
| |
| Yields: |
| A db operation to store the entity in the target app. |
| An operation which updates max used ID if necessary. |
| A counter operation incrementing the count for the entity kind. |
| """ |
| |
| mapper_params = get_mapper_params() |
| target_app = mapper_params['target_app'] |
| |
| if isinstance(key, datastore.Entity): |
| |
| entity = key |
| key = entity.key() |
| else: |
| entity = datastore.Get(key) |
| entity_proto = entity._ToPb() |
| utils.FixKeys(entity_proto, target_app) |
| target_entity = datastore.Entity._FromPb(entity_proto) |
| |
| yield operation.db.Put(target_entity) |
| yield utils.ReserveKey(target_entity.key()) |
| yield operation.counters.Increment(KindPathFromKey(key)) |
| |
| |
| class RemoteCopyEntity(CopyEntity): |
| """A class which contains a map handler to copy entities remotely. |
| |
| The class manages the connection. |
| """ |
| |
| def __init__(self): |
| super(RemoteCopyEntity, self).__init__() |
| self.remote_api_stub_initialized = False |
| |
| def setup_stub(self): |
| """Set up the remote API stub.""" |
| if self.remote_api_stub_initialized: |
| return |
| params = get_mapper_params() |
| if 'extra_header' in params and params['extra_header']: |
| |
| extra_headers = dict([params['extra_header'].split(':', 1)]) |
| else: |
| extra_headers = {} |
| |
| remote_api_put_stub.configure_remote_put(params['remote_url'], |
| params['target_app'], |
| extra_headers) |
| |
| self.remote_api_stub_initialized = True |
| |
| def map(self, key): |
| """Copy data map handler. |
| |
| Args: |
| key: Datastore entity key to copy. |
| |
| Yields: |
| A db operation to store the entity in the target app. |
| An operation which updates max used ID if necessary. |
| A counter operation incrementing the count for the entity kind. |
| """ |
| if not self.remote_api_stub_initialized: |
| self.setup_stub() |
| |
| for op in CopyEntity.map(self, key): |
| yield op |
| |
| |
| def handlers_list(base_path): |
| return [ |
| (r'%s/%s' % (base_path, ConfirmCopyHandler.SUFFIX), ConfirmCopyHandler), |
| (r'%s/%s' % (base_path, DoCopyHandler.SUFFIX), DoCopyHandler), |
| ] |