blob: ef4ecde12fbb91f2085ec024c5dffb5afd9e5721 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""An apiproxy stub that calls a remote handler via HTTP.
This is a special version of the remote_api_stub which sends all traffic
to the local backends *except* for datastore_v3 Put and datastore_v4
AllocateIds calls where the key contains a remote app_id.
Calls to datastore_v3 Put and datastore_v4 AllocateIds for which the entity
keys contain a remote app_id are sent to the remote app.
It re-implements parts of the remote_api_stub so as to replace dependencies on
the (SDK only) appengine_rpc with urlfetch.
"""
import logging
import pickle
import random
import yaml
from google.appengine.api import apiproxy_rpc
from google.appengine.api import apiproxy_stub_map
from google.appengine.api import urlfetch
from google.appengine.ext.remote_api import remote_api_pb
from google.appengine.runtime import apiproxy_errors
class Error(Exception):
  """Root of the exception hierarchy defined by this module."""
class ConfigurationError(Error):
  """Raised when the remote stub configuration is missing or invalid."""
class FetchFailed(Exception):
  """Raised when a urlfetch request to the remote handler fails.

  Note that this derives directly from Exception, not from this module's
  Error base class.
  """
class UnknownJavaServerError(Error):
  """Raised when a Java remote_api handler reports an exception."""
class RemoteTransactionsUnimplemented(Error):
  """Raised when a remotely routed Put request carries a transaction."""
class RemoteApiDatastoreStub(object):
  """A specialised stub for writing to a remote App Engine datastore.

  A call whose name matches a `_Dynamic_<call>` method defined on the stub is
  handled by that method; every other call is delegated unchanged to the
  wrapped `normal_stub`. Subclasses set `_SERVICE_NAME` and provide the
  `_Dynamic_*` handlers that decide between the local and the remote app.
  """

  # Name of the API service this stub serves; overridden by subclasses.
  _SERVICE_NAME = None

  # Deadline, in seconds, handed to urlfetch for each remote_api request.
  _FETCH_DEADLINE_SECS = 10

  def __init__(self, remote_url, target_app_id, extra_headers, normal_stub):
    """Constructor.

    Args:
      remote_url: The URL of the remote_api handler.
      target_app_id: The app_id to intercept calls for.
      extra_headers: Headers to send (for authentication). Stored as a copy
        with the default headers filled in by InsertDefaultExtraHeaders.
      normal_stub: The standard stub to delegate most calls to.
    """
    self.remote_url = remote_url
    self.target_app_id = target_app_id
    self.extra_headers = InsertDefaultExtraHeaders(extra_headers)
    self.normal_stub = normal_stub

  def CreateRPC(self):
    """Creates RPC object instance.

    Returns:
      An apiproxy_rpc.RPC instance bound to this stub.
    """
    return apiproxy_rpc.RPC(stub=self)

  @classmethod
  def ServiceName(cls):
    """Return the name of the datastore service supported by this stub."""
    return cls._SERVICE_NAME

  def MakeSyncCall(self, service, call, request, response):
    """Handle all calls to this stub; delegate as appropriate.

    Args:
      service: Name of the service being called; must equal ServiceName().
      call: Name of the method being called on the service.
      request: The request message; must report itself as initialized.
      response: The response message, filled in by the chosen handler.
    """
    assert service == self.ServiceName(), '%s does not support service %s' % (
        type(self), service)
    explanation = []
    assert request.IsInitialized(explanation), explanation
    # A `_Dynamic_<call>` method, when present, takes over the call; anything
    # else goes straight to the wrapped stub.
    handler = getattr(self, '_Dynamic_' + call, None)
    if handler:
      handler(request, response)
    else:
      self.normal_stub.MakeSyncCall(service, call, request, response)
    assert response.IsInitialized(explanation), explanation

  def _MakeRemoteSyncCall(self, service, call, request, response):
    """Send an RPC to a remote_api endpoint.

    The request is wrapped in a remote_api_pb.Request and POSTed to
    `self.remote_url`; the reply is decoded as a remote_api_pb.Response.

    Args:
      service: Name of the service to call on the remote app.
      call: Name of the method to call on that service.
      request: The request message; its encoded form is the payload.
      response: The response message, parsed from the remote reply.

    Raises:
      FetchFailed: The urlfetch call raised or returned a non-200 status.
      apiproxy_errors.ApplicationError: The remote handler reported an
        application error.
      UnknownJavaServerError: The remote handler reported a Java exception.
      Exception: Whatever exception object the remote handler pickled into
        its reply is re-raised locally.
    """
    request_pb = remote_api_pb.Request()
    request_pb.set_service_name(service)
    request_pb.set_method(call)
    request_pb.set_request(request.Encode())
    response_pb = remote_api_pb.Response()
    encoded_request = request_pb.Encode()
    try:
      urlfetch_response = urlfetch.fetch(self.remote_url, encoded_request,
                                         urlfetch.POST, self.extra_headers,
                                         follow_redirects=False,
                                         deadline=self._FETCH_DEADLINE_SECS)
    except Exception as e:
      # Any transport failure is logged and surfaced as FetchFailed.
      logging.exception('Fetch failed to %s', self.remote_url)
      raise FetchFailed(e)
    if urlfetch_response.status_code != 200:
      logging.error('Fetch failed to %s; Status %s; body %s',
                    self.remote_url,
                    urlfetch_response.status_code,
                    urlfetch_response.content)
      raise FetchFailed(urlfetch_response.status_code)
    response_pb.ParseFromString(urlfetch_response.content)
    if response_pb.has_application_error():
      error_pb = response_pb.application_error()
      raise apiproxy_errors.ApplicationError(error_pb.code(),
                                             error_pb.detail())
    elif response_pb.has_exception():
      # NOTE(security): unpickling executes code chosen by whoever produced
      # the payload, so remote_url must only ever point at a trusted app.
      raise pickle.loads(response_pb.exception())
    elif response_pb.has_java_exception():
      raise UnknownJavaServerError('An unknown error has occured in the '
                                   'Java remote_api handler for this call.')
    else:
      response.ParseFromString(response_pb.response())
class RemoteApiDatastoreV3Stub(RemoteApiDatastoreStub):
  """Datastore v3 stub that diverts matching Put calls to a remote app.

  Put is the only call intercepted by this class. A Put whose first entity
  is keyed to the target app is sent to the remote app; any other Put is
  handed to the normal stub.
  """

  _SERVICE_NAME = 'datastore_v3'

  def _Dynamic_Put(self, request, response):
    """Route a Put either to the remote app or to the normal stub.

    Only the first entity of the request is inspected when deciding where
    the whole request goes.

    Args:
      request: A datastore_pb.PutRequest
      response: A datastore_pb.PutResponse

    Raises:
      RemoteTransactionsUnimplemented: The request targets the remote app
        and carries a transaction.
    """
    send_remotely = False
    if request.entity_list():
      first_entity = request.entity(0)
      send_remotely = (first_entity.has_key() and
                       first_entity.key().app() == self.target_app_id)

    if not send_remotely:
      self.normal_stub.MakeSyncCall(self.ServiceName(), 'Put', request,
                                    response)
      return

    # Transactions cannot be carried over to the remote app.
    if request.has_transaction():
      raise RemoteTransactionsUnimplemented()
    self._MakeRemoteSyncCall(self.ServiceName(), 'Put', request, response)
class RemoteApiDatastoreV4Stub(RemoteApiDatastoreStub):
  """Datastore v4 stub that diverts matching AllocateIds calls to a remote app.

  AllocateIds is the only call intercepted by this class. The dataset id of
  the first key in the request decides whether the call goes to the remote
  app or to the normal datastore_v4 stub.
  """

  _SERVICE_NAME = 'datastore_v4'

  def _Dynamic_AllocateIds(self, request, response):
    """Route a v4 AllocateIds to the remote app or to the normal stub.

    The first `reserve` key is consulted if there is one, otherwise the
    first `allocate` key. A request with neither yields an app id of None.

    Args:
      request: A datastore_v4_pb.AllocateIdsRequest
      response: A datastore_v4_pb.AllocateIdsResponse
    """
    first_key = None
    if request.reserve_size() > 0:
      first_key = request.reserve(0)
    elif request.allocate_size() > 0:
      first_key = request.allocate(0)

    if first_key is None:
      app_id = None
    else:
      app_id = first_key.partition_id().dataset_id()

    # Same arguments either way; only the dispatcher differs.
    if app_id == self.target_app_id:
      dispatch = self._MakeRemoteSyncCall
    else:
      dispatch = self.normal_stub.MakeSyncCall
    dispatch(self.ServiceName(), 'AllocateIds', request, response)
def get_remote_app_id(remote_url, extra_headers=None):
  """Get the app_id from the remote_api endpoint.

  This also has the side effect of verifying that it is a remote_api endpoint:
  a random token is sent as the `rtok` query parameter and the reply must
  echo it back alongside the app_id.

  Args:
    remote_url: The url to the remote_api handler.
    extra_headers: Headers to send (for authentication). The dict passed in
      is not modified.

  Returns:
    app_id: The app_id of the target app.

  Raises:
    FetchFailed: Urlfetch call failed.
    ConfigurationError: URLfetch succeeded but results were invalid.
  """
  # The token only checks that the handler echoes the request back; it is
  # not used as a secret.
  rtok = str(random.random())[2:]
  url = remote_url + '?rtok=' + rtok

  # Work on a copy so the caller's headers dict is left untouched.
  headers = dict(extra_headers) if extra_headers else {}
  headers.setdefault('X-appcfg-api-version', '1')

  try:
    urlfetch_response = urlfetch.fetch(url, None, urlfetch.GET,
                                       headers, follow_redirects=False,
                                       deadline=10)
  except Exception as e:
    logging.exception('Fetch failed to %s', remote_url)
    raise FetchFailed('Fetch to %s failed: %r' % (remote_url, e))
  if urlfetch_response.status_code != 200:
    logging.error('Fetch failed to %s; Status %s; body %s',
                  remote_url,
                  urlfetch_response.status_code,
                  urlfetch_response.content)
    raise FetchFailed('Fetch to %s failed with status %s' %
                      (remote_url, urlfetch_response.status_code))

  response = urlfetch_response.content
  if not response.startswith('{'):
    logging.info('Response unparasable: %s', response)
    raise ConfigurationError(
        'Invalid response received from server: %s' % response)

  # The body comes from a remote server, so it is parsed with safe_load,
  # which builds only plain YAML types and cannot instantiate arbitrary
  # Python objects the way yaml.load can.
  try:
    app_info = yaml.safe_load(response)
  except yaml.YAMLError:
    logging.info('Response unparsable: %s', response)
    raise ConfigurationError('Error parsing app_id lookup response')

  if not app_info or 'rtok' not in app_info or 'app_id' not in app_info:
    logging.info('Response unparsable: %s', response)
    raise ConfigurationError('Error parsing app_id lookup response')
  if str(app_info['rtok']) != rtok:
    logging.info('Response invalid token (expected %s): %s', rtok, response)
    raise ConfigurationError('Token validation failed during app_id lookup. '
                             '(sent %s, got %s)' % (repr(rtok),
                                                    repr(app_info['rtok'])))
  return app_info['app_id']
def InsertDefaultExtraHeaders(extra_headers):
  """Return a copy of extra_headers with the default headers filled in.

  Normalising headers this way lets a proposed RemoteApiDatastoreStub config
  be compared with an existing one.

  Args:
    extra_headers: The dict of headers to transform; may be None or empty.

  Returns:
    A new dict holding the input headers plus 'X-appcfg-api-version' set to
    '1' when the input did not already supply that header.
  """
  if extra_headers:
    merged = extra_headers.copy()
  else:
    merged = {}
  merged.setdefault('X-appcfg-api-version', '1')
  return merged
def StubConfigEqualsRequestedConfig(stub, remote_url, target_app_id,
                                    extra_headers):
  """Return true if the stub and requested stub config match.

  Args:
    stub: a RemoteApiDatastore stub.
    remote_url: requested remote_api url of target app.
    target_app_id: requested app_id of target (remote) app.
    extra_headers: requested headers for auth, possibly not yet including
      defaults applied at stub instantiation time.

  Returns:
    True if the requested config matches the stub, else False.
  """
  if not stub.remote_url == remote_url:
    return False
  if not stub.target_app_id == target_app_id:
    return False
  # The stub stores its headers with defaults applied, so give the requested
  # headers the same treatment before comparing.
  return stub.extra_headers == InsertDefaultExtraHeaders(extra_headers)
def configure_remote_put(remote_url, target_app_id, extra_headers=None):
  """Install the stubs that intercept v3 Put and v4 AllocateIds.

  For each of the two services, the stub currently returned by the apiproxy
  is wrapped in the matching remote stub. If the current stub already is such
  a remote stub, it is left alone when its configuration matches the request,
  and replaced (keeping its wrapped normal stub) when it does not.

  Args:
    remote_url: The url to the remote_api handler.
    target_app_id: The app_id of the target app.
    extra_headers: Headers to send (for authentication).

  Raises:
    ConfigurationError: if there is a error configuring the stubs.
  """
  if not (target_app_id and remote_url):
    raise ConfigurationError('app_id and remote_url required')

  for stub_class in (RemoteApiDatastoreV3Stub, RemoteApiDatastoreV4Stub):
    service_name = stub_class.ServiceName()
    current_stub = apiproxy_stub_map.apiproxy.GetStub(service_name)

    if not isinstance(current_stub, stub_class):
      # First-time setup: wrap whatever stub is currently serving the service.
      wrapper = stub_class(remote_url, target_app_id, extra_headers,
                           current_stub)
      apiproxy_stub_map.apiproxy.RegisterStub(service_name, wrapper)
      continue

    logging.info('Datastore Admin %s RemoteApi stub is already configured.',
                 service_name)
    if StubConfigEqualsRequestedConfig(
        current_stub, remote_url, target_app_id, extra_headers):
      continue

    logging.warning('Requested Datastore Admin %s RemoteApi stub '
                    'configuration differs from existing configuration, '
                    'attempting reconfiguration.', service_name)
    # Reuse the inner normal stub so remote stubs are never nested.
    replacement = stub_class(remote_url, target_app_id, extra_headers,
                             current_stub.normal_stub)
    apiproxy_stub_map.apiproxy.ReplaceStub(service_name, replacement)