#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
LogService API.
This module allows apps to flush logs, provide status messages, and
programmatically access their log files.
"""
import cStringIO
import os
import sys
import threading
import time
from google.appengine.api import api_base_pb
from google.appengine.api import apiproxy_stub_map
from google.appengine.api.logservice import log_service_pb
from google.appengine.api.logservice import logsutil
from google.appengine.runtime import apiproxy_errors
AUTOFLUSH_ENABLED = True
AUTOFLUSH_EVERY_SECONDS = 10
AUTOFLUSH_EVERY_BYTES = 1024
AUTOFLUSH_EVERY_LINES = 20
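# A buffer is autoflushed as soon as any one of the thresholds above is
# reached (see LogsBuffer._autoflush below); setting a threshold to 0
# disables that particular check.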
MAX_ITEMS_PER_FETCH = 20
LOG_LEVEL_DEBUG = 0
LOG_LEVEL_INFO = 1
LOG_LEVEL_WARNING = 2
LOG_LEVEL_ERROR = 3
LOG_LEVEL_CRITICAL = 4
class Error(Exception):
"""Base error class for this module."""
class InvalidArgumentError(Error):
"""Function argument has invalid value."""
class LogsBuffer(object):
"""Threadsafe buffer for storing and periodically flushing app logs."""
def __init__(self, stream=None, stderr=False):
"""Initializes the buffer, which wraps the given stream or sys.stderr.
The state of the LogsBuffer is protected by a separate lock. The lock is
acquired before any variables are mutated or accessed, and released
afterward. A recursive lock is used so that a single thread can acquire the
    lock multiple times, and release it only when a matching number of
    'release()' calls have been performed.
Args:
stream: A file-like object to store logs. Defaults to a cStringIO object.
      stderr: If True, use sys.stderr as the underlying stream.
"""
self._stderr = stderr
if self._stderr:
assert stream is None
else:
self._stream = stream or cStringIO.StringIO()
self._lock = threading.RLock()
self._reset()
def _lock_and_call(self, method, *args):
"""Calls 'method' while holding the buffer lock."""
self._lock.acquire()
try:
return method(*args)
finally:
self._lock.release()
def stream(self):
"""Returns the underlying file-like object used to buffer logs."""
if self._stderr:
return sys.stderr
else:
return self._stream
def lines(self):
"""Returns the number of log lines currently buffered."""
return self._lock_and_call(lambda: self._lines)
def bytes(self):
"""Returns the size of the log buffer, in bytes."""
return self._lock_and_call(lambda: self._bytes)
def age(self):
"""Returns the number of seconds since the log buffer was flushed."""
return self._lock_and_call(lambda: time.time() - self._flush_time)
def flush_time(self):
"""Returns last time that the log buffer was flushed."""
return self._lock_and_call(lambda: self._flush_time)
def contents(self):
"""Returns the contents of the logs buffer."""
return self._lock_and_call(self._contents)
def _contents(self):
"""Internal version of contents() with no locking."""
try:
return self.stream().getvalue()
except AttributeError:
return ''
def reset(self):
"""Resets the buffer state, without clearing the underlying stream."""
self._lock_and_call(self._reset)
def _reset(self):
"""Internal version of reset() with no locking."""
contents = self._contents()
self._bytes = len(contents)
self._lines = len(contents.split('\n')) - 1
self._flush_time = time.time()
self._request = logsutil.RequestID()
def clear(self):
"""Clears the contents of the logs buffer, and resets autoflush state."""
self._lock_and_call(self._clear)
def _clear(self):
"""Internal version of clear() with no locking."""
if self._bytes > 0:
self.stream().truncate(0)
self._reset()
def close(self):
"""Closes the underlying stream, flushing the current contents."""
self._lock_and_call(self._close)
def _close(self):
"""Internal version of close() with no locking."""
self._flush()
self.stream().close()
def parse_logs(self):
"""Parse the contents of the buffer and return an array of log lines."""
return logsutil.ParseLogs(self.contents())
def write(self, line):
"""Writes a line to the logs buffer."""
return self._lock_and_call(self._write, line)
def _write(self, line):
"""Writes a line to the logs buffer."""
if self._request != logsutil.RequestID():
self._reset()
self.stream().write(line)
self.stream().flush()
self._lines += 1
self._bytes += len(line)
self._autoflush()
def flush(self):
"""Flushes the contents of the logs buffer.
This method holds the buffer lock until the API call has finished to ensure
that flush calls are performed in the correct order, so that log messages
written during the flush call aren't dropped or accidentally wiped, and so
that the other buffer state variables (flush time, lines, bytes) are updated
synchronously with the flush.
"""
self._lock_and_call(self._flush)
def _flush(self):
"""Internal version of flush() with no locking."""
logs = self.parse_logs()
self._clear()
if len(logs) == 0:
return
request = log_service_pb.FlushRequest()
group = log_service_pb.UserAppLogGroup()
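    # Each parsed entry is a (timestamp_usec, level, message) tuple produced
    # by logsutil.ParseLogs via parse_logs() above.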
for entry in logs:
line = group.add_log_line()
line.set_timestamp_usec(entry[0])
line.set_level(entry[1])
line.set_message(entry[2])
request.set_logs(group.Encode())
response = api_base_pb.VoidProto()
apiproxy_stub_map.MakeSyncCall('logservice', 'Flush', request, response)
def autoflush(self):
"""Flushes the buffer if certain conditions have been met."""
self._lock_and_call(self._autoflush)
def _autoflush(self):
"""Internal version of autoflush() with no locking."""
if not self.autoflush_enabled():
return
if ((AUTOFLUSH_EVERY_SECONDS and self.age() >= AUTOFLUSH_EVERY_SECONDS) or
(AUTOFLUSH_EVERY_LINES and self.lines() >= AUTOFLUSH_EVERY_LINES) or
(AUTOFLUSH_EVERY_BYTES and self.bytes() >= AUTOFLUSH_EVERY_BYTES)):
self._flush()
def autoflush_enabled(self):
"""Indicates if the buffer will periodically flush logs during a request."""
return AUTOFLUSH_ENABLED and 'BACKEND_ID' in os.environ
_global_buffer = LogsBuffer(stderr=True)
def logs_buffer():
"""Returns the LogsBuffer used by the current request."""
return _global_buffer
def write(message):
"""Adds 'message' to the logs buffer, and checks for autoflush.
Args:
message: A message (string) to be written to application logs.
"""
logs_buffer().write(message)
def clear():
"""Clear the logs buffer and reset the autoflush state."""
logs_buffer().clear()
def autoflush():
"""If AUTOFLUSH conditions have been met, performs a Flush API call."""
logs_buffer().autoflush()
def flush():
"""Flushes log lines that are currently buffered."""
logs_buffer().flush()
def flush_time():
"""Returns last time that the logs buffer was flushed."""
return logs_buffer().flush_time()
def log_buffer_age():
"""Returns the number of seconds since the logs buffer was flushed."""
return logs_buffer().age()
def log_buffer_contents():
"""Returns the contents of the logs buffer."""
return logs_buffer().contents()
def log_buffer_bytes():
"""Returns the size of the logs buffer, in bytes."""
return logs_buffer().bytes()
def log_buffer_lines():
"""Returns the number of log lines currently buffered."""
return logs_buffer().lines()
class _LogQueryResult(object):
"""A container that holds logs and a cursor to fetch additional logs.
  A _LogQueryResult object is the standard item returned by a call to fetch().
  It is iterable - each value yielded is a log that the user has queried for,
  and internally it holds a cursor that it uses to fetch more results once the
  current, locally held set is exhausted.
"""
def __init__(self, response):
"""Constructor.
Args:
response: A LogReadResponse object acquired from a call to fetch().
"""
self._logs = response.log_
self._cursor = response.offset_
self._current_log = 0
self._num_logs = len(self._logs)
def __iter__(self):
"""Provides an iterator that yields log records one at a time.
    This iterator yields items held locally first, and once these items have
    been exhausted, it fetches more items via _advance() and yields them. The
number of items it holds is min(MAX_ITEMS_PER_FETCH, batch_size) - the
latter value can be provided by the user on an initial call to fetch().
"""
while True:
for log_item in self._logs:
yield log_item
if self._cursor:
self._advance()
else:
break
def _advance(self):
"""Acquires additional logs via cursor.
This method is used by the iterator when it has exhausted its current set of
logs to acquire more logs and update its internal structures accordingly.
"""
request = log_service_pb.LogReadRequest()
response = log_service_pb.LogReadResponse()
request.set_app_id(os.environ['APPLICATION_ID'])
if self._cursor:
request.offset_ = self._cursor
apiproxy_stub_map.MakeSyncCall('logservice', 'Read', request, response)
self._logs = response.log_
self._cursor = response.offset_
def fetch(start_time_usec=None,
end_time_usec=None,
batch_size=MAX_ITEMS_PER_FETCH,
min_log_level=None,
include_incomplete=False,
include_app_logs=True,
version_ids=None):
"""Fetches an application's request and/or application-level logs.
Args:
start_time_usec: A long corresponding to the earliest time (in microseconds
since epoch) that results should be fetched for.
end_time_usec: A long corresponding to the latest time (in microseconds
since epoch) that results should be fetched for.
batch_size: The maximum number of log records that this request should
return. A log record corresponds to a web request made to the
application. Therefore, it may include a single request log and multiple
application level logs (e.g., WARN and INFO messages).
    min_log_level: The minimum app log level that this request should
      return. This means that querying for a certain log level always returns
that log level and all log levels above it. In ascending order, the log
levels available are: logs.DEBUG, logs.INFO, logs.WARNING, logs.ERROR,
and logs.CRITICAL.
include_incomplete: Whether or not to include requests that have started but
not yet finished, as a boolean.
include_app_logs: Whether or not to include application level logs in the
results, as a boolean.
    version_ids: A list of version ids whose logs should be queried.
Defaults to the application's current version id only.
Returns:
An iterable object containing the logs that the user has queried for.
Raises:
InvalidArgumentError: Raised if any of the input parameters are not of the
correct type.
"""
request = log_service_pb.LogReadRequest()
response = log_service_pb.LogReadResponse()
request.set_app_id(os.environ['APPLICATION_ID'])
if start_time_usec:
if not isinstance(start_time_usec, long):
raise InvalidArgumentError('start_time_usec must be a long')
request.set_start_time(start_time_usec)
if end_time_usec:
if not isinstance(end_time_usec, long):
raise InvalidArgumentError('end_time_usec must be a long')
request.set_end_time(end_time_usec)
if not isinstance(batch_size, int):
raise InvalidArgumentError('batch_size must be an integer')
if batch_size < 1:
raise InvalidArgumentError('batch_size must be greater than zero')
if batch_size > MAX_ITEMS_PER_FETCH:
raise InvalidArgumentError('batch_size specified was too large')
request.set_count(batch_size)
  if min_log_level is not None:  # 'is not None' so LOG_LEVEL_DEBUG (0) is honored
if not isinstance(min_log_level, int):
raise InvalidArgumentError('min_log_level must be an int')
    if min_log_level not in range(LOG_LEVEL_CRITICAL + 1):
      raise InvalidArgumentError('min_log_level must be between 0 and 4 '
                                 'inclusive')
request.set_minimum_log_level(min_log_level)
if not isinstance(include_incomplete, bool):
raise InvalidArgumentError('include_incomplete must be boolean')
request.set_include_incomplete(include_incomplete)
if not isinstance(include_app_logs, bool):
raise InvalidArgumentError('include_app_logs must be boolean')
request.set_include_app_logs(include_app_logs)
if version_ids is None:
version_ids = [os.environ['CURRENT_VERSION_ID']]
if not isinstance(version_ids, list):
raise InvalidArgumentError('version_ids must be a list')
request.version_id_ = version_ids
apiproxy_stub_map.MakeSyncCall('logservice', 'Read', request, response)
return _LogQueryResult(response)
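# A hedged example of calling fetch() directly (the timestamps below are
# illustrative; note that start/end times must be longs and batch_size is
# capped at MAX_ITEMS_PER_FETCH):
#
#   import time
#   now_usec = long(time.time() * 1e6)
#   hour_ago_usec = now_usec - long(3600 * 1e6)
#   for record in fetch(start_time_usec=hour_ago_usec,
#                       end_time_usec=now_usec,
#                       min_log_level=LOG_LEVEL_WARNING):
#     pass  # process each matching log record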