blob: 2269c28047612cd15152647e8c3822b565c65b7d [file] [log] [blame]
<?php
/**
* Copyright 2007 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Cloud Storage Write Client implements the stream wrapper functions required
* to write to a Google Cloud Storage object.
*
*/
namespace google\appengine\ext\cloud_storage_streams;
// TODO: Retry on transient errors.
final class CloudStorageWriteClient extends CloudStorageClient {
// GS requires all chunks of data written to be multiples of 256K except for
// the last chunk.
const WRITE_CHUNK_SIZE = 262144;
// Conservative pattern for metadata headers name - could be relaxed
const METADATA_KEY_REGEX = "/^[[:alnum:]-]+$/";
// Metadata header value must be printable US ascii
// http://tools.ietf.org/html/rfc2616#section-4.2
const METADATA_VALUE_REGEX = "/^[[:print:]]*$/";
// The array of bytes to be written to GS
private $byte_buffer;
// The resumable upload ID we are using for this upload.
private $upload_id;
// The offset in the file where the current buffer starts
private $buffer_start_offset;
// The number of bytes we've written to GS so far.
private $total_bytes_uploaded;
public function __construct($bucket, $object, $context) {
parent::__construct($bucket, $object, $context);
}
/**
* Called when the stream is being opened. Try and start a resumable upload
* here.
*
* @return true if the streamable upload started, false otherwise.
*/
public function initialize() {
$headers = parent::$upload_start_header;
$token_header = $this->getOAuthTokenHeader(parent::WRITE_SCOPE);
if ($token_header === false) {
trigger_error("Unable to acquire OAuth token.", E_USER_WARNING);
return false;
}
$headers = array_merge($headers, $token_header);
if (array_key_exists("Content-Type", $this->context_options)) {
$headers["Content-Type"] = $this->context_options["Content-Type"];
$this->content_type = $this->context_options["Content-Type"];
}
if (array_key_exists("acl", $this->context_options)) {
$acl = $this->context_options["acl"];
if (in_array($acl, parent::$valid_acl_values)) {
$headers["x-goog-acl"] = $acl;
} else {
trigger_error(sprintf("Invalid ACL value: %s", $acl), E_USER_WARNING);
return false;
}
}
if (array_key_exists("metadata", $this->context_options)) {
$metadata = $this->context_options["metadata"];
foreach ($metadata as $name => $value) {
if (!preg_match(self::METADATA_KEY_REGEX, $name)) {
trigger_error(sprintf("Invalid metadata key: %s", $name),
E_USER_WARNING);
return false;
}
if (!preg_match(self::METADATA_KEY_REGEX, $value)) {
trigger_error(sprintf("Invalid metadata value: %s", $value),
E_USER_WARNING);
return false;
}
$headers['x-goog-meta-' . $name] = $value;
$this->metadata[$name] = $value;
}
}
$http_response = $this->makeHttpRequest($this->url,
"POST",
$headers);
if ($http_response === false) {
trigger_error("Unable to connect to Google Cloud Storage Service.",
E_USER_WARNING);
return false;
}
$status_code = $http_response['status_code'];
if ($status_code == HttpResponse::FORBIDDEN) {
trigger_error("Access Denied", E_USER_WARNING);
return false;
}
if ($status_code != HttpResponse::CREATED) {
trigger_error($this->getErrorMessage($http_response['status_code'],
$http_response['body']),
E_USER_WARNING);
return false;
}
$location = $this->getHeaderValue("Location", $http_response['headers']);
$query_str = parse_url($location)["query"];
parse_str($query_str, $query_arr);
$this->upload_id = $query_arr["upload_id"];
if (!isset($this->upload_id)) {
trigger_error(sprintf("Location Header was not returned (%s).",
implode(",",
array_keys($http_response['headers']))),
E_USER_WARNING);
return false;
}
$this->buffer_start_offset = 0;
$this->total_bytes_uploaded = 0;
$this->byte_buffer = "";
return true;
}
/**
* Return the number of bytes written.
*/
public function write($data) {
$this->byte_buffer .= $data;
$current_buffer_len = strlen($this->byte_buffer);
$data_len = strlen($data);
// If this data doesn't fill the buffer then write it and return.
if ($current_buffer_len < self::WRITE_CHUNK_SIZE) {
return $data_len;
}
// Write out this data
if (!$this->writeBufferToGS()) {
// Remove the bytes we added to the buffer
$this->byte_buffer = substr($this->byte_buffer, 0, -strlen($data));
return 0;
}
// We wrote the buffered content - but only return the amount of $data
// we wrote as per the contract of write()
return $data_len;
}
/**
* Because of the write byte alignment required by GS we will not write any
* data on a flush. If there is data remaining in the buffer we'll write it
* during close.
*/
public function flush() {
return true;
}
/**
* When closing the stream we need to complete the upload.
*/
public function close() {
$this->writeBufferToGS(true);
}
public function getMetaData() {
if (array_key_exists("metadata", $this->context_options)) {
return $this->context_options["metadata"];
}
return [];
}
public function getContentType() {
if (array_key_exists("Content-Type", $this->context_options)) {
return $this->context_options["Content-Type"];
}
return null;
}
private function writeBufferToGS($complete = false) {
$headers = $this->getOAuthTokenHeader(parent::WRITE_SCOPE);
if ($headers === false) {
trigger_error("Unable to acquire OAuth token.", E_USER_ERROR);
return false;
}
$buffer_len = strlen($this->byte_buffer);
if ($complete) {
$write_size = $buffer_len;
} else {
// Incomplete writes should never be less than WRITE_CHUNK_SIZE
assert($buffer_len >= self::WRITE_CHUNK_SIZE);
// Is PHP the only language in the world where the quotient of two
// integers is a double?
$write_size =
floor($buffer_len / self::WRITE_CHUNK_SIZE) * self::WRITE_CHUNK_SIZE;
}
// Determine the final byte of the buffer we're writing for Range header.
if ($write_size !== 0) {
$write_end_byte = $this->buffer_start_offset + $write_size - 1;
$body = substr($this->byte_buffer, 0, $write_size);
} else {
$body = null;
}
if ($complete) {
$object_length = $this->buffer_start_offset + $write_size;
if ($write_size === 0) {
$headers['Content-Range'] = sprintf(parent::FINAL_CONTENT_RANGE_NO_DATA,
$object_length);
} else {
$headers['Content-Range'] = sprintf(parent::FINAL_CONTENT_RANGE_FORMAT,
$this->buffer_start_offset,
$write_end_byte,
$object_length);
}
} else {
$headers['Content-Range'] = sprintf(parent::PARTIAL_CONTENT_RANGE_FORMAT,
$this->buffer_start_offset,
$write_end_byte);
}
$url = sprintf("%s?upload_id=%s", $this->url, $this->upload_id);
$http_response = $this->makeHttpRequest($url, "PUT", $headers, $body);
$code = $http_response['status_code'];
// TODO: Retry on some status codes.
if (($complete && $code != HttpResponse::OK) ||
(!$complete && $code != HttpResponse::RESUME_INCOMPLETE)) {
trigger_error($this->getErrorMessage($http_response['status_code'],
$http_response['body']),
E_USER_WARNING);
return false;
}
// Buffer flushed, update pointers if we actually wrote something.
if ($write_size !== 0) {
$this->buffer_start_offset = $write_end_byte + 1;
$this->byte_buffer = substr($this->byte_buffer, $write_size);
}
// Invalidate any cached object with the same name. Note that there is a
// potential race condition when using optimistic caching and invalidate
// on write where the old version of an object can still be returned from
// the cache.
if ($complete && $this->context_options['enable_cache'] === true) {
if ($object_length > 0) {
$key_names = [];
for ($i = 0; $i < $object_length; $i += parent::DEFAULT_READ_SIZE) {
$range = $this->getRangeHeader($i,
$i + parent::DEFAULT_READ_SIZE - 1);
$key_names[] = sprintf(parent::MEMCACHE_KEY_FORMAT,
$this->url,
$range['Range']);
}
$memcached = new \Memcached();
$memcached->deleteMulti($key_names);
}
}
return true;
}
}