blob: 51b4c842b798d9d148c59a60747ae851ee218958 [file] [log] [blame]
# Copyright 2012 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""File Interface for Google Cloud Storage."""
from __future__ import with_statement
__all__ = ['delete',
'listbucket',
'open',
'stat',
]
import logging
import StringIO
import urllib
import xml.etree.cElementTree as ET
from . import api_utils
from . import common
from . import errors
from . import storage_api
def open(filename,
mode='r',
content_type=None,
options=None,
read_buffer_size=storage_api.ReadBuffer.DEFAULT_BUFFER_SIZE,
retry_params=None,
_account_id=None):
"""Opens a Google Cloud Storage file and returns it as a File-like object.
Args:
filename: A Google Cloud Storage filename of form '/bucket/filename'.
mode: 'r' for reading mode. 'w' for writing mode.
In reading mode, the file must exist. In writing mode, a file will
be created or be overrode.
content_type: The MIME type of the file. str. Only valid in writing mode.
options: A str->basestring dict to specify additional headers to pass to
GCS e.g. {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}.
Supported options are x-goog-acl, x-goog-meta-, cache-control,
content-disposition, and content-encoding.
Only valid in writing mode.
See https://developers.google.com/storage/docs/reference-headers
for details.
read_buffer_size: The buffer size for read. Read keeps a buffer
and prefetches another one. To minimize blocking for large files,
always read by buffer size. To minimize number of RPC requests for
small files, set a large buffer size. Max is 30MB.
retry_params: An instance of api_utils.RetryParams for subsequent calls
to GCS from this file handle. If None, the default one is used.
_account_id: Internal-use only.
Returns:
A reading or writing buffer that supports File-like interface. Buffer
must be closed after operations are done.
Raises:
errors.AuthorizationError: if authorization failed.
errors.NotFoundError: if an object that's expected to exist doesn't.
ValueError: invalid open mode or if content_type or options are specified
in reading mode.
"""
common.validate_file_path(filename)
api = storage_api._get_storage_api(retry_params=retry_params,
account_id=_account_id)
filename = api_utils._quote_filename(filename)
if mode == 'w':
common.validate_options(options)
return storage_api.StreamingBuffer(api, filename, content_type, options)
elif mode == 'r':
if content_type or options:
raise ValueError('Options and content_type can only be specified '
'for writing mode.')
return storage_api.ReadBuffer(api,
filename,
buffer_size=read_buffer_size)
else:
raise ValueError('Invalid mode %s.' % mode)
def delete(filename, retry_params=None, _account_id=None):
"""Delete a Google Cloud Storage file.
Args:
filename: A Google Cloud Storage filename of form '/bucket/filename'.
retry_params: An api_utils.RetryParams for this call to GCS. If None,
the default one is used.
_account_id: Internal-use only.
Raises:
errors.NotFoundError: if the file doesn't exist prior to deletion.
"""
api = storage_api._get_storage_api(retry_params=retry_params,
account_id=_account_id)
common.validate_file_path(filename)
filename = api_utils._quote_filename(filename)
status, resp_headers, _ = api.delete_object(filename)
errors.check_status(status, [204], filename, resp_headers=resp_headers)
def stat(filename, retry_params=None, _account_id=None):
"""Get GCSFileStat of a Google Cloud storage file.
Args:
filename: A Google Cloud Storage filename of form '/bucket/filename'.
retry_params: An api_utils.RetryParams for this call to GCS. If None,
the default one is used.
_account_id: Internal-use only.
Returns:
a GCSFileStat object containing info about this file.
Raises:
errors.AuthorizationError: if authorization failed.
errors.NotFoundError: if an object that's expected to exist doesn't.
"""
common.validate_file_path(filename)
api = storage_api._get_storage_api(retry_params=retry_params,
account_id=_account_id)
status, headers, _ = api.head_object(api_utils._quote_filename(filename))
errors.check_status(status, [200], filename, resp_headers=headers)
file_stat = common.GCSFileStat(
filename=filename,
st_size=headers.get('content-length'),
st_ctime=common.http_time_to_posix(headers.get('last-modified')),
etag=headers.get('etag'),
content_type=headers.get('content-type'),
metadata=common.get_metadata(headers))
return file_stat
def _copy2(src, dst, metadata=None, retry_params=None):
"""Copy the file content from src to dst.
Internal use only!
Args:
src: /bucket/filename
dst: /bucket/filename
metadata: a dict of metadata for this copy. If None, old metadata is copied.
For example, {'x-goog-meta-foo': 'bar'}.
retry_params: An api_utils.RetryParams for this call to GCS. If None,
the default one is used.
Raises:
errors.AuthorizationError: if authorization failed.
errors.NotFoundError: if an object that's expected to exist doesn't.
"""
common.validate_file_path(src)
common.validate_file_path(dst)
if metadata is None:
metadata = {}
copy_meta = 'COPY'
else:
copy_meta = 'REPLACE'
metadata.update({'x-goog-copy-source': src,
'x-goog-metadata-directive': copy_meta})
api = storage_api._get_storage_api(retry_params=retry_params)
status, resp_headers, _ = api.put_object(
api_utils._quote_filename(dst), headers=metadata)
errors.check_status(status, [200], src, metadata, resp_headers)
def listbucket(path_prefix, marker=None, prefix=None, max_keys=None,
delimiter=None, retry_params=None, _account_id=None):
"""Returns a GCSFileStat iterator over a bucket.
Optional arguments can limit the result to a subset of files under bucket.
This function has two modes:
1. List bucket mode: Lists all files in the bucket without any concept of
hierarchy. GCS doesn't have real directory hierarchies.
2. Directory emulation mode: If you specify the 'delimiter' argument,
it is used as a path separator to emulate a hierarchy of directories.
In this mode, the "path_prefix" argument should end in the delimiter
specified (thus designates a logical directory). The logical directory's
contents, both files and subdirectories, are listed. The names of
subdirectories returned will end with the delimiter. So listbucket
can be called with the subdirectory name to list the subdirectory's
contents.
Args:
path_prefix: A Google Cloud Storage path of format "/bucket" or
"/bucket/prefix". Only objects whose fullpath starts with the
path_prefix will be returned.
marker: Another path prefix. Only objects whose fullpath starts
lexicographically after marker will be returned (exclusive).
prefix: Deprecated. Use path_prefix.
max_keys: The limit on the number of objects to return. int.
For best performance, specify max_keys only if you know how many objects
you want. Otherwise, this method requests large batches and handles
pagination for you.
delimiter: Use to turn on directory mode. str of one or multiple chars
that your bucket uses as its directory separator.
retry_params: An api_utils.RetryParams for this call to GCS. If None,
the default one is used.
_account_id: Internal-use only.
Examples:
For files "/bucket/a",
"/bucket/bar/1"
"/bucket/foo",
"/bucket/foo/1", "/bucket/foo/2/1", "/bucket/foo/3/1",
Regular mode:
listbucket("/bucket/f", marker="/bucket/foo/1")
will match "/bucket/foo/2/1", "/bucket/foo/3/1".
Directory mode:
listbucket("/bucket/", delimiter="/")
will match "/bucket/a, "/bucket/bar/" "/bucket/foo", "/bucket/foo/".
listbucket("/bucket/foo/", delimiter="/")
will match "/bucket/foo/1", "/bucket/foo/2/", "/bucket/foo/3/"
Returns:
Regular mode:
A GCSFileStat iterator over matched files ordered by filename.
The iterator returns GCSFileStat objects. filename, etag, st_size,
st_ctime, and is_dir are set.
Directory emulation mode:
A GCSFileStat iterator over matched files and directories ordered by
name. The iterator returns GCSFileStat objects. For directories,
only the filename and is_dir fields are set.
The last name yielded can be used as next call's marker.
"""
if prefix:
common.validate_bucket_path(path_prefix)
bucket = path_prefix
else:
bucket, prefix = common._process_path_prefix(path_prefix)
if marker and marker.startswith(bucket):
marker = marker[len(bucket) + 1:]
api = storage_api._get_storage_api(retry_params=retry_params,
account_id=_account_id)
options = {}
if marker:
options['marker'] = marker
if max_keys:
options['max-keys'] = max_keys
if prefix:
options['prefix'] = prefix
if delimiter:
options['delimiter'] = delimiter
return _Bucket(api, bucket, options)
class _Bucket(object):
"""A wrapper for a GCS bucket as the return value of listbucket."""
def __init__(self, api, path, options):
"""Initialize.
Args:
api: storage_api instance.
path: bucket path of form '/bucket'.
options: a dict of listbucket options. Please see listbucket doc.
"""
self._init(api, path, options)
def _init(self, api, path, options):
self._api = api
self._path = path
self._options = options.copy()
self._get_bucket_fut = self._api.get_bucket_async(
self._path + '?' + urllib.urlencode(self._options))
self._last_yield = None
self._new_max_keys = self._options.get('max-keys')
def __getstate__(self):
options = self._options
if self._last_yield:
options['marker'] = self._last_yield.filename[len(self._path) + 1:]
if self._new_max_keys is not None:
options['max-keys'] = self._new_max_keys
return {'api': self._api,
'path': self._path,
'options': options}
def __setstate__(self, state):
self._init(state['api'], state['path'], state['options'])
def __iter__(self):
"""Iter over the bucket.
Yields:
GCSFileStat: a GCSFileStat for an object in the bucket.
They are ordered by GCSFileStat.filename.
"""
total = 0
max_keys = self._options.get('max-keys')
while self._get_bucket_fut:
status, resp_headers, content = self._get_bucket_fut.get_result()
errors.check_status(status, [200], self._path, resp_headers=resp_headers,
extras=self._options)
if self._should_get_another_batch(content):
self._get_bucket_fut = self._api.get_bucket_async(
self._path + '?' + urllib.urlencode(self._options))
else:
self._get_bucket_fut = None
root = ET.fromstring(content)
dirs = self._next_dir_gen(root)
files = self._next_file_gen(root)
next_file = files.next()
next_dir = dirs.next()
while ((max_keys is None or total < max_keys) and
not (next_file is None and next_dir is None)):
total += 1
if next_file is None:
self._last_yield = next_dir
next_dir = dirs.next()
elif next_dir is None:
self._last_yield = next_file
next_file = files.next()
elif next_dir < next_file:
self._last_yield = next_dir
next_dir = dirs.next()
elif next_file < next_dir:
self._last_yield = next_file
next_file = files.next()
else:
logging.error(
'Should never reach. next file is %r. next dir is %r.',
next_file, next_dir)
if self._new_max_keys:
self._new_max_keys -= 1
yield self._last_yield
def _next_file_gen(self, root):
"""Generator for next file element in the document.
Args:
root: root element of the XML tree.
Yields:
GCSFileStat for the next file.
"""
for e in root.getiterator(common._T_CONTENTS):
st_ctime, size, etag, key = None, None, None, None
for child in e.getiterator('*'):
if child.tag == common._T_LAST_MODIFIED:
st_ctime = common.dt_str_to_posix(child.text)
elif child.tag == common._T_ETAG:
etag = child.text
elif child.tag == common._T_SIZE:
size = child.text
elif child.tag == common._T_KEY:
key = child.text
yield common.GCSFileStat(self._path + '/' + key,
size, etag, st_ctime)
e.clear()
yield None
def _next_dir_gen(self, root):
"""Generator for next directory element in the document.
Args:
root: root element in the XML tree.
Yields:
GCSFileStat for the next directory.
"""
for e in root.getiterator(common._T_COMMON_PREFIXES):
yield common.GCSFileStat(
self._path + '/' + e.find(common._T_PREFIX).text,
st_size=None, etag=None, st_ctime=None, is_dir=True)
e.clear()
yield None
def _should_get_another_batch(self, content):
"""Whether to issue another GET bucket call.
Args:
content: response XML.
Returns:
True if should, also update self._options for the next request.
False otherwise.
"""
if ('max-keys' in self._options and
self._options['max-keys'] <= common._MAX_GET_BUCKET_RESULT):
return False
elements = self._find_elements(
content, set([common._T_IS_TRUNCATED,
common._T_NEXT_MARKER]))
if elements.get(common._T_IS_TRUNCATED, 'false').lower() != 'true':
return False
next_marker = elements.get(common._T_NEXT_MARKER)
if next_marker is None:
self._options.pop('marker', None)
return False
self._options['marker'] = next_marker
return True
def _find_elements(self, result, elements):
"""Find interesting elements from XML.
This function tries to only look for specified elements
without parsing the entire XML. The specified elements is better
located near the beginning.
Args:
result: response XML.
elements: a set of interesting element tags.
Returns:
A dict from element tag to element value.
"""
element_mapping = {}
result = StringIO.StringIO(result)
for _, e in ET.iterparse(result, events=('end',)):
if not elements:
break
if e.tag in elements:
element_mapping[e.tag] = e.text
elements.remove(e.tag)
return element_mapping