blob: 7e02772385474d6772b65b80631fdf037d8605da [file] [log] [blame]
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import collections
import contextlib
import json
import os
import posixpath
import socket
import sys
import threading
import types
from . import streamname, varint
# Immutable record type that StreamParams derives from. Each field is one
# stream parameter.
_StreamParamsBase = collections.namedtuple(
    '_StreamParamsBase',
    'name type content_type tags tee binary_file_extension')

# Magic number written at the beginning of a Butler stream.
#
# See "ProtocolFrameHeaderMagic" in:
# <luci-go>/logdog/client/butlerlib/streamproto
BUTLER_MAGIC = 'BTLR1\x1e'
class StreamParams(_StreamParamsBase):
  """The set of parameters to apply to a new stream.

  Fields are `name`, `type`, `content_type`, `tags`, `tee` and
  `binary_file_extension`. See `validate` for the values each may hold.
  """

  # Stream type: text content.
  TEXT = 'text'
  # Stream type: binary content.
  BINARY = 'binary'
  # Stream type: datagram content.
  DATAGRAM = 'datagram'

  # Tee value: tee this stream through the Butler's STDOUT.
  TEE_STDOUT = 'stdout'
  # Tee value: tee this stream through the Butler's STDERR.
  TEE_STDERR = 'stderr'

  @classmethod
  def make(cls, **kwargs):
    """Returns (StreamParams): A new instance built from keyword arguments.

    Every field without a matching keyword is set to None. Keywords that do
    not name a field are ignored.

    Args:
      kwargs (dict): Field values, keyed by field name.
    """
    values = dict((field, kwargs.get(field)) for field in cls._fields)
    return cls(**values)

  def validate(self):
    """Raises (ValueError): if the parameters are not valid.

    The stream name and each tag are checked by the `streamname` module; the
    remaining fields are checked here.
    """
    streamname.validate_stream_name(self.name)

    known_types = (self.TEXT, self.BINARY, self.DATAGRAM)
    if self.type not in known_types:
      raise ValueError('Invalid type (%s)' % (self.type,))

    tags = self.tags
    if tags is not None:
      if not isinstance(tags, collections.Mapping):
        raise ValueError('Invalid tags type (%s)' % (tags,))
      for tag_key, tag_value in tags.iteritems():
        streamname.validate_tag(tag_key, tag_value)

    known_tees = (None, self.TEE_STDOUT, self.TEE_STDERR)
    if self.tee not in known_tees:
      raise ValueError('Invalid tee type (%s)' % (self.tee,))

    extension = self.binary_file_extension
    acceptable = (types.NoneType, types.StringTypes)
    if not isinstance(extension, acceptable):
      raise ValueError('Invalid binary file extension type (%s)' % (
          extension,))

  def to_json(self):
    """Returns (str): The JSON representation of the StreamParams.

    The parameters are validated first. `name` and `type` are always emitted;
    each remaining field is emitted only when it is not None.

    Raises:
      ValueError: if these parameters are not valid.
    """
    self.validate()

    obj = {'name': self.name, 'type': self.type}
    optional = (
        ('contentType', self.content_type),
        ('tags', self.tags),
        ('tee', self.tee),
        ('binaryFileExtension', self.binary_file_extension),
    )
    for key, value in optional:
      if value is not None:
        obj[key] = value

    # With ensure_ascii, non-ASCII characters are escaped, so the output is
    # plain ASCII (and therefore valid UTF-8, which is what Butler wants).
    return json.dumps(obj, sort_keys=True, ensure_ascii=True, indent=None)
class StreamProtocolRegistry(object):
  """Registry of streamserver URI protocols and their client classes."""

  def __init__(self):
    # Maps protocol name (str) to the StreamClient subclass that serves it.
    self._registry = {}

  def register_protocol(self, protocol, client_cls):
    """Associates a URI protocol with a StreamClient subclass.

    Args:
      protocol (str): The URI protocol, i.e. the text before the first ':'.
      client_cls (type): The StreamClient subclass handling that protocol.

    Raises:
      AssertionError: if `client_cls` is not a StreamClient subclass.
      KeyError: if `protocol` has already been registered.
    """
    assert issubclass(client_cls, StreamClient)
    if self._registry.get(protocol) is not None:
      raise KeyError('Duplicate protocol registered.')
    self._registry[protocol] = client_cls

  def create(self, uri, **kwargs):
    """Returns (StreamClient): A stream client for the specified URI.

    The URI has the form "<protocol>:<value>". The client class registered in
    this registry for <protocol> is asked to build a client from <value>.

    Args:
      uri (str): The streamserver URI.
      kwargs: keyword arguments to forward to the stream. See
          StreamClient.__init__.

    Raises:
      ValueError: if the supplied URI references an invalid or improperly
          configured streamserver.
    """
    # Only the first ':' separates protocol from value; the value itself may
    # contain further colons.
    protocol, separator, value = uri.partition(':')
    if not separator:
      # Report the URI exactly as the caller supplied it.
      raise ValueError('Invalid stream server URI [%s]' % (uri,))

    client_cls = self._registry.get(protocol)
    if not client_cls:
      raise ValueError('Unknown stream client protocol (%s)' % (protocol,))
    return client_cls._create(value, **kwargs)
# Default (global) registry. Protocols are added to it through
# `_default_registry.register_protocol`.
_default_registry = StreamProtocolRegistry()
# Module-level shortcut: `create(uri, **kwargs)` builds a StreamClient using
# the default registry.
create = _default_registry.create
class StreamClient(object):
  """Abstract base class for a streamserver client.

  Subclasses provide the transport by overriding `_create` and
  `_connect_raw`. Everything else (stream naming, the Butler handshake, and
  the text / binary / datagram helpers) is implemented here.
  """

  class _StreamBase(object):
    """ABC for StreamClient streams."""

    def __init__(self, stream_client, params):
      self._stream_client = stream_client
      self._params = params

    @property
    def params(self):
      """Returns (StreamParams): The stream parameters."""
      return self._params

    @property
    def path(self):
      """Returns (streamname.StreamPath): The stream path.

      Raises:
        KeyError: if the stream prefix is not defined in the client.
        ValueError: if the stream path is invalid.
      """
      return self._stream_client.get_stream_path(self._params.name)

    def get_viewer_url(self):
      """Returns (str): The viewer URL for this stream.

      Raises:
        KeyError: if information needed to construct the URL is missing.
        ValueError: if the stream prefix or name do not form a valid stream
            path.
      """
      return self._stream_client.get_viewer_url(self._params.name)

  class _BasicStream(_StreamBase):
    """Wraps a basic file descriptor, offering "write" and "close"."""

    def __init__(self, stream_client, params, fd):
      super(StreamClient._BasicStream, self).__init__(stream_client, params)
      self._fd = fd

    @property
    def fd(self):
      """Returns (file): The wrapped file-like object."""
      return self._fd

    def fileno(self):
      """Returns: whatever the wrapped object's `fileno` method returns."""
      return self._fd.fileno()

    def write(self, data):
      """Writes `data` to the wrapped object, returning its result."""
      return self._fd.write(data)

    def close(self):
      """Closes the wrapped object, returning its result."""
      return self._fd.close()

  class _DatagramStream(_StreamBase):
    """Wraps a stream object to write length-prefixed datagrams."""

    def __init__(self, stream_client, params, fd):
      super(StreamClient._DatagramStream, self).__init__(stream_client, params)
      self._fd = fd

    def send(self, data):
      """Writes one datagram: a uvarint length prefix followed by `data`."""
      varint.write_uvarint(self._fd, len(data))
      self._fd.write(data)

    def close(self):
      """Closes the wrapped object, returning its result."""
      return self._fd.close()

  def __init__(self, project=None, prefix=None, coordinator_host=None,
               namespace=''):
    """Constructs a new base StreamClient instance.

    Args:
      project (str or None): If not None, the name of the log stream project.
      prefix (str or None): If not None, the log stream session prefix.
      coordinator_host (str or None): If not None, the name of the Coordinator
          host that this stream client is bound to. This will be used to
          construct viewer URLs for generated streams.
      namespace (str): The prefix to apply to all streams opened by this
          client.
    """
    self._project = project
    self._prefix = prefix
    self._coordinator_host = coordinator_host
    self._namespace = namespace
    # `_names` holds every stream name handed out by this client; it is only
    # touched while `_name_lock` is held.
    self._name_lock = threading.Lock()
    self._names = set()

  @property
  def project(self):
    """Returns (str or None): The stream project, or None if not configured."""
    return self._project

  @property
  def prefix(self):
    """Returns (str or None): The stream prefix, or None if not configured."""
    return self._prefix

  @property
  def coordinator_host(self):
    """Returns (str or None): The coordinator host, or None if not configured.
    """
    return self._coordinator_host

  def get_stream_path(self, name):
    """Returns (streamname.StreamPath): The stream path.

    Args:
      name (str): The name of the stream.

    Raises:
      KeyError: if the stream prefix is not defined in the client.
      ValueError: if the stream path is invalid.
    """
    if not self._prefix:
      raise KeyError('Stream prefix is not configured')
    return streamname.StreamPath.make(self._prefix, name)

  def get_viewer_url(self, name):
    """Returns (str): The LogDog viewer URL for the named stream.

    Args:
      name (str): The name of the stream. This can also be a query glob.

    Raises:
      KeyError: if information needed to construct the URL is missing
          (coordinator host, project or stream prefix).
      ValueError: if the stream prefix or name do not form a valid stream
          path.
    """
    if not self._coordinator_host:
      raise KeyError('Coordinator host is not configured')
    if not self._project:
      raise KeyError('Stream project is not configured')
    return streamname.get_logdog_viewer_url(
        self._coordinator_host,
        self._project,
        self.get_stream_path(name))

  def _register_new_stream(self, name):
    """Registers a new stream name.

    The Butler will internally reject any duplicate stream names. However, there
    isn't really feedback when this happens except a closed stream client. This
    is a client-side check to provide a more user-friendly experience in the
    event that a user attempts to register a duplicate stream name.

    Note that this is imperfect, as something else could register stream names
    with the same Butler instance and this library has no means of tracking.
    This is a best-effort experience, not a reliable check.

    Args:
      name (str): The name of the stream.

    Raises:
      ValueError if the stream name has already been registered.
    """
    with self._name_lock:
      if name in self._names:
        raise ValueError("Duplicate stream name [%s]" % (name,))
      self._names.add(name)

  @classmethod
  def _create(cls, value, **kwargs):
    """Returns (StreamClient): A new stream client instance.

    Validates the streamserver parameters and creates a new StreamClient
    instance that connects to them.

    Implementing classes must override this.
    """
    raise NotImplementedError()

  def _connect_raw(self):
    """Returns (file): A new file-like stream.

    Creates a new raw connection to the streamserver. This connection MUST not
    have any data written to it past initialization (if needed) when it has been
    returned.

    The file-like object must implement `write` and `close`.

    Implementing classes must override this.
    """
    raise NotImplementedError()

  def new_connection(self, params):
    """Returns (file): A new configured stream.

    The returned object implements (minimally) `write` and `close`.

    Creates a new LogDog stream with the specified parameters: a raw connection
    is opened and the handshake (magic number, uvarint length, JSON parameters)
    is written to it.

    Args:
      params (StreamParams): The parameters to use with the new connection.

    Raises:
      ValueError if the stream name has already been used, or if the parameters
          are not valid.
    """
    # Serialize (and thereby validate) first, so that invalid parameters do
    # not permanently consume the stream name.
    params_json = params.to_json()
    self._register_new_stream(params.name)

    fd = self._connect_raw()
    handshake_done = False
    try:
      fd.write(BUTLER_MAGIC)
      varint.write_uvarint(fd, len(params_json))
      fd.write(params_json)
      handshake_done = True
    finally:
      # Nobody else holds a reference to `fd` yet; release it if the handshake
      # did not complete, rather than leaking the connection.
      if not handshake_done:
        fd.close()
    return fd

  def _make_params(self, name, stream_type, content_type, tags, tee,
                   binary_file_extension):
    """Returns (StreamParams): Parameters for a stream of `stream_type`.

    The stream name is `name` joined onto this client's namespace.
    """
    return StreamParams.make(
        name=posixpath.join(self._namespace, name),
        type=stream_type,
        content_type=content_type,
        tags=tags,
        tee=tee,
        binary_file_extension=binary_file_extension)

  @contextlib.contextmanager
  def text(self, name, **kwargs):
    """Context manager to create, use, and teardown a TEXT stream.

    This context manager creates a new butler TEXT stream with the specified
    parameters, yields it, and closes it on teardown.

    Args:
      name (str): the LogDog name of the stream.
      kwargs (dict): Log stream parameters. These may be any keyword arguments
          accepted by `open_text`.

    Returns (file): A file-like object to a Butler UTF-8 text stream supporting
        `write`.
    """
    stream = self.open_text(name, **kwargs)
    try:
      yield stream
    finally:
      stream.close()

  def open_text(self, name, content_type=None, tags=None, tee=None,
                binary_file_extension=None):
    """Returns (file): A file-like object for a single text stream.

    This creates a new butler TEXT stream with the specified parameters.

    Args:
      name (str): the LogDog name of the stream.
      content_type (str): The optional content type of the stream. If None, a
          default content type will be chosen by the Butler.
      tags (dict): An optional key/value dictionary pair of LogDog stream tags.
      tee (str): Describes how stream data should be tee'd through the Butler.
          One of StreamParams' TEE arguments.
      binary_file_extension (str): A custom binary file extension. If not
          provided, a default extension may be chosen or the binary stream may
          not be emitted.

    Returns (file): A file-like object to a Butler text stream. This object can
        have UTF-8 text content written to it with its `write` method, and must
        be closed when finished using its `close` method.
    """
    params = self._make_params(
        name, StreamParams.TEXT, content_type, tags, tee,
        binary_file_extension)
    return self._BasicStream(self, params, self.new_connection(params))

  @contextlib.contextmanager
  def binary(self, name, **kwargs):
    """Context manager to create, use, and teardown a BINARY stream.

    This context manager creates a new butler BINARY stream with the specified
    parameters, yields it, and closes it on teardown.

    Args:
      name (str): the LogDog name of the stream.
      kwargs (dict): Log stream parameters. These may be any keyword arguments
          accepted by `open_binary`.

    Returns (file): A file-like object to a Butler binary stream supporting
        `write`.
    """
    stream = self.open_binary(name, **kwargs)
    try:
      yield stream
    finally:
      stream.close()

  def open_binary(self, name, content_type=None, tags=None, tee=None,
                  binary_file_extension=None):
    """Returns (file): A file-like object for a single binary stream.

    This creates a new butler BINARY stream with the specified parameters.

    Args:
      name (str): the LogDog name of the stream.
      content_type (str): The optional content type of the stream. If None, a
          default content type will be chosen by the Butler.
      tags (dict): An optional key/value dictionary pair of LogDog stream tags.
      tee (str): Describes how stream data should be tee'd through the Butler.
          One of StreamParams' TEE arguments.
      binary_file_extension (str): A custom binary file extension. If not
          provided, a default extension may be chosen or the binary stream may
          not be emitted.

    Returns (file): A file-like object to a Butler binary stream. This object
        can have binary content written to it with its `write` method, and must
        be closed when finished using its `close` method.
    """
    params = self._make_params(
        name, StreamParams.BINARY, content_type, tags, tee,
        binary_file_extension)
    return self._BasicStream(self, params, self.new_connection(params))

  @contextlib.contextmanager
  def datagram(self, name, **kwargs):
    """Context manager to create, use, and teardown a DATAGRAM stream.

    This context manager creates a new butler DATAGRAM stream with the
    specified parameters, yields it, and closes it on teardown.

    Args:
      name (str): the LogDog name of the stream.
      kwargs (dict): Log stream parameters. These may be any keyword arguments
          accepted by `open_datagram`.

    Returns (_DatagramStream): A datagram stream object. Datagrams can be
        written to it using its `send` method.
    """
    stream = self.open_datagram(name, **kwargs)
    try:
      yield stream
    finally:
      stream.close()

  def open_datagram(self, name, content_type=None, tags=None, tee=None,
                    binary_file_extension=None):
    """Creates a new butler DATAGRAM stream with the specified parameters.

    Args:
      name (str): the LogDog name of the stream.
      content_type (str): The optional content type of the stream. If None, a
          default content type will be chosen by the Butler.
      tags (dict): An optional key/value dictionary pair of LogDog stream tags.
      tee (str): Describes how stream data should be tee'd through the Butler.
          One of StreamParams' TEE arguments.
      binary_file_extension (str): A custom binary file extension. If not
          provided, a default extension may be chosen or the binary stream may
          not be emitted.

    Returns (_DatagramStream): A datagram stream object. Datagrams can be
        written to it using its `send` method. This object must be closed when
        finished by using its `close` method.
    """
    params = self._make_params(
        name, StreamParams.DATAGRAM, content_type, tags, tee,
        binary_file_extension)
    return self._DatagramStream(self, params, self.new_connection(params))
class _NamedPipeStreamClient(StreamClient):
  """StreamClient implementation that writes to a Windows named pipe."""

  def __init__(self, name, **kwargs):
    r"""Initializes a new Windows named pipe stream client.

    Args:
      name (str): The name of the Windows named pipe to use (e.g., "\\.\name")
      kwargs: keyword arguments forwarded to StreamClient.__init__.
    """
    super(_NamedPipeStreamClient, self).__init__(**kwargs)
    self._name = name

  @classmethod
  def _create(cls, value, **kwargs):
    """Returns (_NamedPipeStreamClient): A client for the pipe named `value`.

    No validation is performed on `value`.
    """
    client = cls(value, **kwargs)
    return client

  def _connect_raw(self):
    """Returns (file): The named pipe, opened for binary writing."""
    pipe = open(self._name, 'wb')
    return pipe
# Serve "net.pipe:<name>" streamserver URIs with the named pipe client.
_default_registry.register_protocol('net.pipe', _NamedPipeStreamClient)
class _UnixDomainSocketStreamClient(StreamClient):
  """A StreamClient implementation that uses a UNIX domain socket."""

  class SocketFile(object):
    """A write-only file-like object that writes to a UNIX socket."""

    def __init__(self, fd):
      # Despite the name, `fd` is the connected socket object.
      self._fd = fd

    def fileno(self):
      """Returns (int): The file descriptor of the underlying socket."""
      return self._fd.fileno()

    def write(self, data):
      """Writes all of `data` to the socket."""
      # `send` may transmit only part of the buffer; `sendall` keeps going
      # until everything has been written or an error is raised.
      self._fd.sendall(data)

    def close(self):
      """Closes the underlying socket."""
      self._fd.close()

  def __init__(self, path, **kwargs):
    """Initializes a new UNIX domain socket stream client.

    Args:
      path (str): The path to the named UNIX domain socket.
      kwargs: keyword arguments forwarded to StreamClient.__init__.
    """
    super(_UnixDomainSocketStreamClient, self).__init__(**kwargs)
    self._path = path

  @classmethod
  def _create(cls, value, **kwargs):
    """Returns (_UnixDomainSocketStreamClient): A client for socket `value`.

    Raises:
      ValueError: if nothing exists at the path `value`.
    """
    if not os.path.exists(value):
      raise ValueError('UNIX domain socket [%s] does not exist.' % (value,))
    return cls(value, **kwargs)

  def _connect_raw(self):
    """Returns (SocketFile): A new connection to the UNIX domain socket."""
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    connected = False
    try:
      sock.connect(self._path)
      connected = True
    finally:
      # Do not leak the socket when the connection attempt fails.
      if not connected:
        sock.close()
    return self.SocketFile(sock)
# Serve "unix:<path>" streamserver URIs with the UNIX domain socket client.
_default_registry.register_protocol('unix', _UnixDomainSocketStreamClient)