#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Dispatcher to handle Google Cloud Storage stub requests."""
import httplib
import re
import urllib
import urlparse
import xml.etree.ElementTree as ET
from google.appengine.api import apiproxy_stub_map
from google.appengine.api import urlfetch_stub
from google.appengine.ext.cloudstorage import cloudstorage_stub
from google.appengine.ext.cloudstorage import common
def _urlfetch_to_gcs_stub(url, payload, method, headers, request, response,
follow_redirects=False, deadline=None,
validate_certificate=None):
"""Forwards gcs urlfetch requests to gcs_dispatcher.
See apphosting.api.urlfetch_stub.URLFetchServiceStub._RetrieveURL.
"""
headers_map = dict(
(header.key().lower(), header.value()) for header in headers)
result = dispatch(method, headers_map, url, payload)
response.set_statuscode(result.status_code)
response.set_content(result.content[:urlfetch_stub.MAX_RESPONSE_SIZE])
for k, v in result.headers.iteritems():
if k.lower() == 'content-length' and method != 'HEAD':
v = len(response.content())
header_proto = response.add_header()
header_proto.set_key(k)
header_proto.set_value(str(v))
if len(result.content) > urlfetch_stub.MAX_RESPONSE_SIZE:
response.set_contentwastruncated(True)
def _urlmatcher_for_gcs_stub(url):
"""Determines whether a url should be handled by gcs stub."""
_, host, _, _, _ = urlparse.urlsplit(url)
return host == common.LOCAL_API_HOST
URLMATCHERS_TO_FETCH_FUNCTIONS = [
(_urlmatcher_for_gcs_stub, _urlfetch_to_gcs_stub)]
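
# A minimal sketch of how the urlfetch stub is expected to consult this list
# (hedged: the actual hook lives in urlfetch_stub and may differ across SDK
# versions):
#
#   for matcher, fetch_fn in URLMATCHERS_TO_FETCH_FUNCTIONS:
#     if matcher(url):
#       fetch_fn(url, payload, method, headers, request, response)
#       break
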
class _FakeUrlFetchResult(object):
def __init__(self, status, headers, content):
self.status_code = status
self.headers = headers
self.content = content
def dispatch(method, headers, url, payload):
"""Dispatches incoming request and returns response.
In dev appserver or unittest environment, this method is called instead of
urlfetch.
Args:
method: urlfetch method.
headers: urlfetch headers.
url: urlfetch url.
payload: urlfecth payload.
Returns:
A _FakeUrlFetchResult.
Raises:
ValueError: invalid request method.
"""
method, headers, filename, param_dict = _preprocess(method, headers, url)
gcs_stub = cloudstorage_stub.CloudStorageStub(
apiproxy_stub_map.apiproxy.GetStub('blobstore').storage)
if method == 'POST':
return _handle_post(gcs_stub, filename, headers)
elif method == 'PUT':
return _handle_put(gcs_stub, filename, param_dict, headers, payload)
elif method == 'GET':
return _handle_get(gcs_stub, filename, param_dict, headers)
elif method == 'HEAD':
return _handle_head(gcs_stub, filename)
elif method == 'DELETE':
return _handle_delete(gcs_stub, filename)
raise ValueError('Unrecognized request method %r.' % method)
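
# Example usage (a sketch for a testbed/dev_appserver environment where a
# 'blobstore' stub is already registered in apiproxy_stub_map; bucket and
# object names are hypothetical):
#
#   url = 'http://%s/my_bucket/my_object' % common.LOCAL_API_HOST
#   result = dispatch('GET', {}, url, None)
#   if result.status_code == httplib.OK:
#     data = result.content
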
def _preprocess(method, headers, url):
"""Unify input.
Example:
_preprocess('POST', {'Content-Type': 'Foo'}, http://gcs.com/b/f?foo=bar)
-> 'POST', {'content-type': 'Foo'}, '/b/f', {'foo':'bar'}
Args:
method: HTTP method used by the request.
headers: HTTP request headers in a dict.
url: HTTP request url.
Returns:
method: method in all upper case.
headers: headers with keys in all lower case.
filename: a google storage filename of form /bucket/filename or
a bucket path of form /bucket
param_dict: a dict of query parameters.
"""
_, _, filename, query, _ = urlparse.urlsplit(url)
param_dict = urlparse.parse_qs(query, True)
for k in param_dict:
param_dict[k] = urllib.unquote(param_dict[k][0])
headers = dict((k.lower(), v) for k, v in headers.iteritems())
return method, headers, urllib.unquote(filename), param_dict
def _handle_post(gcs_stub, filename, headers):
"""Handle POST that starts object creation."""
content_type = _ContentType(headers)
token = gcs_stub.post_start_creation(filename, headers)
response_headers = {
'location': 'https://storage.googleapis.com/%s?%s' % (
filename,
urllib.urlencode({'upload_id': token})),
'content-type': content_type.value,
'content-length': 0
}
return _FakeUrlFetchResult(httplib.CREATED, response_headers, '')
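
# For reference, a successful POST above returns 201 (Created) with a
# resumable-session location such as (upload_id value hypothetical):
#
#   https://storage.googleapis.com/my_bucket/my_object?upload_id=<token>
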
def _handle_put(gcs_stub, filename, param_dict, headers, payload):
"""Handle PUT."""
if _iscopy(headers):
return _copy(gcs_stub, filename, headers)
token = _get_param('upload_id', param_dict)
content_range = _ContentRange(headers)
if not content_range.value:
raise ValueError('Missing header content-range.')
if not token:
if content_range.length is None:
raise ValueError('Content-Range must have a final length.')
elif not content_range.no_data and content_range.range[0] != 0:
raise ValueError('Content-Range must specify complete object.')
else:
token = gcs_stub.post_start_creation(filename, headers)
try:
gcs_stub.put_continue_creation(token,
payload,
content_range.range,
content_range.length)
  except ValueError as e:
return _FakeUrlFetchResult(e.args[1], {}, e.args[0])
if content_range.length is not None:
filestat = gcs_stub.head_object(filename)
response_headers = {
'content-length': filestat.st_size,
}
response_status = httplib.OK
else:
response_headers = {}
    # 308 ('Resume Incomplete'): the client should upload the next chunk;
    # httplib defines no constant for this status.
    response_status = 308
return _FakeUrlFetchResult(response_status, response_headers, '')
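
# A sketch of the resumable flow this handler implements (header values are
# illustrative): the client POSTs to obtain an upload_id, then PUTs chunks:
#
#   PUT ...?upload_id=<token>   Content-Range: bytes 0-255/*     -> 308
#   PUT ...?upload_id=<token>   Content-Range: bytes 256-511/512 -> 200
#
# A PUT without an upload_id must describe the complete object in one
# request, e.g. Content-Range: bytes 0-511/512.
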
def _iscopy(headers):
copysource = _XGoogCopySource(headers)
return copysource.value is not None
def _copy(gcs_stub, filename, headers):
"""Copy file.
Args:
gcs_stub: an instance of gcs stub.
filename: dst filename of format /bucket/filename
headers: a dict of request headers. Must contain _XGoogCopySource header.
Returns:
An _FakeUrlFetchResult instance.
"""
source = _XGoogCopySource(headers).value
result = _handle_head(gcs_stub, source)
if result.status_code == httplib.NOT_FOUND:
return result
gcs_stub.put_copy(source, filename)
return _FakeUrlFetchResult(httplib.OK, {}, '')
def _handle_get(gcs_stub, filename, param_dict, headers):
"""Handle GET object and GET bucket."""
if filename.rfind('/') == 0:
return _handle_get_bucket(gcs_stub, filename, param_dict)
else:
result = _handle_head(gcs_stub, filename)
if result.status_code == httplib.NOT_FOUND:
return result
start, end = _Range(headers).value
st_size = result.headers['content-length']
if end is None:
end = st_size - 1
    result.headers['content-range'] = 'bytes %d-%d/%d' % (start, end, st_size)
result.content = gcs_stub.get_object(filename, start, end)
return result
def _handle_get_bucket(gcs_stub, bucketpath, param_dict):
"""Handle get bucket request."""
prefix = _get_param('prefix', param_dict, '')
max_keys = _get_param('max-keys', param_dict, common._MAX_GET_BUCKET_RESULT)
marker = _get_param('marker', param_dict, '')
delimiter = _get_param('delimiter', param_dict, '')
stats, last_filename, is_truncated = gcs_stub.get_bucket(
bucketpath, prefix, marker, max_keys, delimiter)
builder = ET.TreeBuilder()
builder.start('ListBucketResult', {'xmlns': common.CS_XML_NS})
for stat in stats:
filename = stat.filename[len(bucketpath) + 1:]
if stat.is_dir:
builder.start('CommonPrefixes', {})
builder.start('Prefix', {})
builder.data(filename)
builder.end('Prefix')
builder.end('CommonPrefixes')
else:
builder.start('Contents', {})
builder.start('Key', {})
builder.data(filename)
builder.end('Key')
builder.start('LastModified', {})
builder.data(common.posix_to_dt_str(stat.st_ctime))
builder.end('LastModified')
builder.start('ETag', {})
builder.data(stat.etag)
builder.end('ETag')
builder.start('Size', {})
builder.data(str(stat.st_size))
builder.end('Size')
builder.end('Contents')
if last_filename:
builder.start('NextMarker', {})
builder.data(last_filename[len(bucketpath) + 1:])
builder.end('NextMarker')
builder.start('IsTruncated', {})
builder.data(str(is_truncated))
builder.end('IsTruncated')
max_keys = _get_param('max-keys', param_dict)
if max_keys is not None:
builder.start('MaxKeys', {})
builder.data(str(max_keys))
builder.end('MaxKeys')
builder.end('ListBucketResult')
root = builder.close()
body = ET.tostring(root)
response_headers = {'content-length': len(body),
'content-type': 'application/xml'}
return _FakeUrlFetchResult(httplib.OK, response_headers, body)
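
# The XML body built above has this shape (values illustrative; the namespace
# comes from common.CS_XML_NS):
#
#   <ListBucketResult xmlns="...">
#     <Contents>
#       <Key>obj</Key>
#       <LastModified>2013-01-01T00:00:00.000Z</LastModified>
#       <ETag>"<md5>"</ETag>
#       <Size>512</Size>
#     </Contents>
#     <IsTruncated>False</IsTruncated>
#   </ListBucketResult>
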
def _handle_head(gcs_stub, filename):
"""Handle HEAD request."""
filestat = gcs_stub.head_object(filename)
if not filestat:
return _FakeUrlFetchResult(httplib.NOT_FOUND, {}, '')
http_time = common.posix_time_to_http(filestat.st_ctime)
response_headers = {
'content-length': filestat.st_size,
'content-type': filestat.content_type,
'etag': filestat.etag,
'last-modified': http_time
}
if filestat.metadata:
response_headers.update(filestat.metadata)
return _FakeUrlFetchResult(httplib.OK, response_headers, '')
def _handle_delete(gcs_stub, filename):
"""Handle DELETE object."""
if gcs_stub.delete_object(filename):
return _FakeUrlFetchResult(httplib.NO_CONTENT, {}, '')
else:
return _FakeUrlFetchResult(httplib.NOT_FOUND, {}, '')
class _Header(object):
"""Wrapper class for a header.
A subclass helps to parse a specific header.
"""
HEADER = ''
DEFAULT = None
def __init__(self, headers):
"""Initialize.
Initializes self.value to the value in request header, or DEFAULT if
not defined in headers.
Args:
headers: request headers.
"""
self.value = self.DEFAULT
for k in headers:
if k.lower() == self.HEADER.lower():
self.value = headers[k]
break
class _XGoogCopySource(_Header):
"""x-goog-copy-source: /bucket/filename."""
HEADER = 'x-goog-copy-source'
class _ContentType(_Header):
"""Content-type header."""
HEADER = 'Content-Type'
DEFAULT = 'binary/octet-stream'
class _ContentRange(_Header):
"""Content-Range header.
Used by resumable upload of unknown size. Possible formats:
Content-Range: bytes 1-3/* (for uploading of unknown size)
Content-Range: bytes */5 (for finalizing with no data)
"""
HEADER = 'Content-Range'
RE_PATTERN = re.compile(r'^bytes (([0-9]+)-([0-9]+)|\*)/([0-9]+|\*)$')
def __init__(self, headers):
super(_ContentRange, self).__init__(headers)
if self.value:
result = self.RE_PATTERN.match(self.value)
if not result:
raise ValueError('Invalid content-range header %s' % self.value)
self.no_data = result.group(1) == '*'
last = result.group(4) != '*'
self.length = None
if last:
self.length = long(result.group(4))
if self.no_data and not last:
raise ValueError('Invalid content-range header %s' % self.value)
self.range = None
if not self.no_data:
self.range = (long(result.group(2)), long(result.group(3)))
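
# Examples of the parsed attributes (a sketch; _Header matches header names
# case-insensitively):
#
#   _ContentRange({'content-range': 'bytes 0-3/*'})
#     -> no_data=False, range=(0, 3), length=None
#   _ContentRange({'content-range': 'bytes */5'})
#     -> no_data=True, range=None, length=5
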
class _Range(_Header):
"""_Range header.
Used by read. Format: Range: bytes=1-3.
"""
HEADER = 'Range'
def __init__(self, headers):
super(_Range, self).__init__(headers)
if self.value:
start, end = self.value.rsplit('=', 1)[-1].split('-')
start, end = long(start), long(end)
else:
start, end = 0, None
self.value = start, end
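
# Example: _Range({'range': 'bytes=1-3'}).value == (1, 3). When the header is
# absent, value defaults to (0, None), i.e. read from the start to the end.
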
def _get_param(param, param_dict, default=None):
"""Gets a parameter value from request query parameters.
Args:
param: name of the parameter to get.
param_dict: a dict of request query parameters.
default: default value if not defined.
Returns:
Value of the parameter or default if not defined.
"""
result = param_dict.get(param, default)
if param in ['max-keys'] and result:
return long(result)
return result