blob: d526ef9fd70c246672342d217269a9bcfb08ca3f [file] [log] [blame]
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import collections
import contextlib
import json
import os
import posixpath
import socket
import sys
import threading
import time
from . import streamname, varint
if sys.platform == "win32":
from ctypes import GetLastError
_PY2 = sys.version_info[0] == 2
_StreamParamsBase = collections.namedtuple(
'_StreamParamsBase', ('name', 'type', 'content_type', 'tags'))
# Magic number at the beginning of a Butler stream
#
# See "ProtocolFrameHeaderMagic" in:
# <luci-go>/logdog/client/butlerlib/streamproto
BUTLER_MAGIC = b'BTLR1\x1e'
class StreamParams(_StreamParamsBase):
"""Defines the set of parameters to apply to a new stream."""
# A text content stream.
TEXT = 'text'
# A binary content stream.
BINARY = 'binary'
# A datagram content stream.
DATAGRAM = 'datagram'
@classmethod
def make(cls, **kwargs):
"""Returns (StreamParams): A new StreamParams instance with supplied values.
Any parameter that isn't supplied will be set to None.
Args:
kwargs (dict): Named parameters to apply.
"""
return cls(**{f: kwargs.get(f) for f in cls._fields})
def validate(self):
"""Raises (ValueError): if the parameters are not valid."""
streamname.validate_stream_name(self.name)
if self.type not in (self.TEXT, self.BINARY, self.DATAGRAM):
raise ValueError('Invalid type (%s)' % (self.type,))
if self.tags is not None:
if not isinstance(
self.tags, collections.Mapping if _PY2 else collections.abc.Mapping):
raise ValueError('Invalid tags type (%s)' % (self.tags,))
for k, v in self.tags.items():
streamname.validate_tag(k, v)
def to_json(self):
"""Returns (str): The JSON representation of the StreamParams.
Converts stream parameters to JSON for Butler consumption.
Raises:
ValueError: if these parameters are not valid.
"""
self.validate()
obj = {
'name': self.name,
'type': self.type,
}
def _maybe_add(key, value):
if value is not None:
obj[key] = value
_maybe_add('contentType', self.content_type)
_maybe_add('tags', self.tags)
# Note that "dumps' will dump UTF-8 by default, which is what Butler wants.
return json.dumps(obj, sort_keys=True, ensure_ascii=True, indent=None)
class StreamProtocolRegistry(object):
"""Registry of streamserver URI protocols and their client classes.
"""
def __init__(self):
self._registry = {}
def register_protocol(self, protocol, client_cls):
assert issubclass(client_cls, StreamClient)
if self._registry.get(protocol) is not None:
raise KeyError('Duplicate protocol registered.')
self._registry[protocol] = client_cls
def create(self, uri, **kwargs):
"""Returns (StreamClient): A stream client for the specified URI.
This uses the default StreamProtocolRegistry to instantiate a StreamClient
for the specified URI.
Args:
uri (str): The streamserver URI.
kwargs: keyword arguments to forward to the stream. See
StreamClient.__init__.
Raises:
ValueError: if the supplied URI references an invalid or improperly
configured streamserver.
"""
uri = uri.split(':', 1)
if len(uri) != 2:
raise ValueError('Invalid stream server URI [%s]' % (uri,))
protocol, value = uri
client_cls = self._registry.get(protocol)
if not client_cls:
raise ValueError('Unknown stream client protocol (%s)' % (protocol,))
return client_cls._create(value, **kwargs)
# Default (global) registry.
_default_registry = StreamProtocolRegistry()
create = _default_registry.create
class StreamClient(object):
"""Abstract base class for a streamserver client.
"""
class _StreamBase(object):
"""ABC for StreamClient streams."""
def __init__(self, stream_client, params):
self._stream_client = stream_client
self._params = params
@property
def params(self):
"""Returns (StreamParams): The stream parameters."""
return self._params
@property
def path(self):
"""Returns (streamname.StreamPath): The stream path.
Raises:
ValueError: if the stream path is invalid, or if the stream prefix is
not defined in the client.
"""
return self._stream_client.get_stream_path(self._params.name)
def get_viewer_url(self):
"""Returns (str): The viewer URL for this stream.
Raises:
KeyError: if information needed to construct the URL is missing.
ValueError: if the stream prefix or name do not form a valid stream
path.
"""
return self._stream_client.get_viewer_url(self._params.name)
class _BasicStream(_StreamBase):
"""Wraps a basic file descriptor, offering "write" and "close"."""
def __init__(self, stream_client, params, fd):
super(StreamClient._BasicStream, self).__init__(stream_client, params)
self._fd = fd
@property
def fd(self):
return self._fd
def fileno(self):
return self._fd.fileno()
def write(self, data):
return self._fd.write(data)
def close(self):
return self._fd.close()
class _TextStream(_BasicStream):
"""Extends _BasicStream, ensuring data written is UTF-8 text."""
def __init__(self, stream_client, params, fd):
super(StreamClient._TextStream, self).__init__(stream_client, params, fd)
self._fd = fd
def write(self, data):
if _PY2 and isinstance(data, str):
# byte string is unfortunately accepted in py2 because of
# undifferentiated usage of `str` and `unicode` but it should be
# discontinued in py3. User should switch to binary stream instead
# if there's a need to write bytes.
return self._fd.write(data)
elif _PY2 and isinstance(data, unicode):
return self._fd.write(data.encode('utf-8'))
elif not _PY2 and isinstance(data, str):
return self._fd.write(data.encode('utf-8'))
else:
raise ValueError(
'expect str, got %r that is type %s' % (data, type(data),))
class _DatagramStream(_StreamBase):
"""Wraps a stream object to write length-prefixed datagrams."""
def __init__(self, stream_client, params, fd):
super(StreamClient._DatagramStream, self).__init__(stream_client, params)
self._fd = fd
def send(self, data):
varint.write_uvarint(self._fd, len(data))
self._fd.write(data)
def close(self):
return self._fd.close()
def __init__(self, project=None, prefix=None, coordinator_host=None,
namespace=''):
"""Constructs a new base StreamClient instance.
Args:
project (str or None): If not None, the name of the log stream project.
prefix (str or None): If not None, the log stream session prefix.
coordinator_host (str or None): If not None, the name of the Coordinator
host that this stream client is bound to. This will be used to
construct viewer URLs for generated streams.
namespace (str): The prefix to apply to all streams opened by this client.
"""
self._project = project
self._prefix = prefix
self._coordinator_host = coordinator_host
self._namespace = namespace
self._name_lock = threading.Lock()
self._names = set()
@property
def project(self):
"""Returns (str or None): The stream project, or None if not configured."""
return self._project
@property
def prefix(self):
"""Returns (str or None): The stream prefix, or None if not configured."""
return self._prefix
@property
def coordinator_host(self):
"""Returns (str or None): The coordinator host, or None if not configured.
"""
return self._coordinator_host
@property
def namespace(self):
"""Returns (str): The namespace for all streams opened by this client.
Empty if not configured.
"""
return self._namespace
def get_stream_path(self, name):
"""Returns (streamname.StreamPath): The stream path.
Args:
name (str): The name of the stream.
Raises:
KeyError: if information needed to construct the path is missing.
ValueError: if the stream path is invalid, or if the stream prefix is
not defined in the client.
"""
if not self._prefix:
raise KeyError('Stream prefix is not configured')
return streamname.StreamPath.make(self._prefix, name)
def get_viewer_url(self, name):
"""Returns (str): The LogDog viewer URL for the named stream.
Args:
name (str): The name of the stream. This can also be a query glob.
Raises:
KeyError: if information needed to construct the URL is missing.
ValueError: if the stream prefix or name do not form a valid stream
path.
"""
if not self._coordinator_host:
raise KeyError('Coordinator host is not configured')
if not self._project:
raise KeyError('Stream project is not configured')
return streamname.get_logdog_viewer_url(
self._coordinator_host,
self._project,
self.get_stream_path(name))
def _register_new_stream(self, name):
"""Registers a new stream name.
The Butler will internally reject any duplicate stream names. However, there
isn't really feedback when this happens except a closed stream client. This
is a client-side check to provide a more user-friendly experience in the
event that a user attempts to register a duplicate stream name.
Note that this is imperfect, as something else could register stream names
with the same Butler instance and this library has no means of tracking.
This is a best-effort experience, not a reliable check.
Args:
name (str): The name of the stream.
Raises:
ValueError if the stream name has already been registered.
"""
with self._name_lock:
if name in self._names:
raise ValueError("Duplicate stream name [%s]" % (name,))
self._names.add(name)
@classmethod
def _create(cls, value, **kwargs):
"""Returns (StreamClient): A new stream client instance.
Validates the streamserver parameters and creates a new StreamClient
instance that connects to them.
Implementing classes must override this.
"""
raise NotImplementedError()
def _connect_raw(self):
"""Returns (file): A new file-like stream.
Creates a new raw connection to the streamserver. This connection MUST not
have any data written to it past initialization (if needed) when it has been
returned.
The file-like object must implement `write`, `fileno`, `flush`, and `close`.
Implementing classes must override this.
"""
raise NotImplementedError()
def new_connection(self, params):
"""Returns (file): A new configured stream.
The returned object implements (minimally) `write` and `close`.
Creates a new LogDog stream with the specified parameters.
Args:
params (StreamParams): The parameters to use with the new connection.
Raises:
ValueError if the stream name has already been used, or if the parameters
are not valid.
"""
self._register_new_stream(params.name)
params_bytes = params.to_json().encode('utf-8')
fobj = self._connect_raw()
fobj.write(BUTLER_MAGIC)
varint.write_uvarint(fobj, len(params_bytes))
fobj.write(params_bytes)
return fobj
@contextlib.contextmanager
def text(self, name, **kwargs):
"""Context manager to create, use, and teardown a TEXT stream.
This context manager creates a new butler TEXT stream with the specified
parameters, yields it, and closes it on teardown.
Args:
name (str): the LogDog name of the stream.
kwargs (dict): Log stream parameters. These may be any keyword arguments
accepted by `open_text`.
Returns (file): A file-like object to a Butler UTF-8 text stream supporting
`write`.
"""
fobj = None
try:
fobj = self.open_text(name, **kwargs)
yield fobj
finally:
if fobj is not None:
fobj.close()
def open_text(self, name, content_type=None, tags=None):
"""Returns (file): A file-like object for a single text stream.
This creates a new butler TEXT stream with the specified parameters.
Args:
name (str): the LogDog name of the stream.
content_type (str): The optional content type of the stream. If None, a
default content type will be chosen by the Butler.
tags (dict): An optional key/value dictionary pair of LogDog stream tags.
Returns (file): A file-like object to a Butler text stream. This object can
have UTF-8 text content written to it with its `write` method, and must
be closed when finished using its `close` method.
"""
params = StreamParams.make(
name=posixpath.join(self._namespace, name),
type=StreamParams.TEXT,
content_type=content_type,
tags=tags)
return self._TextStream(self, params, self.new_connection(params))
@contextlib.contextmanager
def binary(self, name, **kwargs):
"""Context manager to create, use, and teardown a BINARY stream.
This context manager creates a new butler BINARY stream with the specified
parameters, yields it, and closes it on teardown.
Args:
name (str): the LogDog name of the stream.
kwargs (dict): Log stream parameters. These may be any keyword arguments
accepted by `open_binary`.
Returns (file): A file-like object to a Butler binary stream supporting
`write`.
"""
fobj = None
try:
fobj = self.open_binary(name, **kwargs)
yield fobj
finally:
if fobj is not None:
fobj.close()
def open_binary(self, name, content_type=None, tags=None):
"""Returns (file): A file-like object for a single binary stream.
This creates a new butler BINARY stream with the specified parameters.
Args:
name (str): the LogDog name of the stream.
content_type (str): The optional content type of the stream. If None, a
default content type will be chosen by the Butler.
tags (dict): An optional key/value dictionary pair of LogDog stream tags.
Returns (file): A file-like object to a Butler binary stream. This object
can have UTF-8 content written to it with its `write` method, and must
be closed when finished using its `close` method.
"""
params = StreamParams.make(
name=posixpath.join(self._namespace, name),
type=StreamParams.BINARY,
content_type=content_type,
tags=tags)
return self._BasicStream(self, params, self.new_connection(params))
@contextlib.contextmanager
def datagram(self, name, **kwargs):
"""Context manager to create, use, and teardown a DATAGRAM stream.
This context manager creates a new butler DATAAGRAM stream with the
specified parameters, yields it, and closes it on teardown.
Args:
name (str): the LogDog name of the stream.
kwargs (dict): Log stream parameters. These may be any keyword arguments
accepted by `open_datagram`.
Returns (_DatagramStream): A datagram stream object. Datagrams can be
written to it using its `send` method.
"""
fobj = None
try:
fobj = self.open_datagram(name, **kwargs)
yield fobj
finally:
if fobj is not None:
fobj.close()
def open_datagram(self, name, content_type=None, tags=None):
"""Creates a new butler DATAGRAM stream with the specified parameters.
Args:
name (str): the LogDog name of the stream.
content_type (str): The optional content type of the stream. If None, a
default content type will be chosen by the Butler.
tags (dict): An optional key/value dictionary pair of LogDog stream tags.
Returns (_DatagramStream): A datagram stream object. Datagrams can be
written to it using its `send` method. This object must be closed when
finished by using its `close` method.
"""
params = StreamParams.make(
name=posixpath.join(self._namespace, name),
type=StreamParams.DATAGRAM,
content_type=content_type,
tags=tags)
return self._DatagramStream(self, params, self.new_connection(params))
class _NamedPipeStreamClient(StreamClient):
"""A StreamClient implementation that connects to a Windows named pipe.
"""
def __init__(self, name, **kwargs):
r"""Initializes a new Windows named pipe stream client.
Args:
name (str): The name of the Windows named pipe to use (e.g., "\\.\name")
"""
super(_NamedPipeStreamClient, self).__init__(**kwargs)
self._name = '\\\\.\\pipe\\' + name
@classmethod
def _create(cls, value, **kwargs):
return cls(value, **kwargs)
ERROR_PIPE_BUSY = 231
def _connect_raw(self):
# This is a similar procedure to the one in
# https://github.com/microsoft/go-winio/blob/master/pipe.go (tryDialPipe)
while True:
try:
return open(self._name, 'wb+', buffering=0)
except (OSError, IOError):
if GetLastError() != self.ERROR_PIPE_BUSY:
raise
time.sleep(0.001) # 1ms
_default_registry.register_protocol('net.pipe', _NamedPipeStreamClient)
class _UnixDomainSocketStreamClient(StreamClient):
"""A StreamClient implementation that uses a UNIX domain socket.
"""
class SocketFile(object):
"""A write-only file-like object that writes to a UNIX socket."""
def __init__(self, sock):
self._sock = sock
def fileno(self):
return self._sock
def write(self, data):
self._sock.sendall(data)
def flush(self):
pass
def close(self):
self._sock.close()
def __init__(self, path, **kwargs):
"""Initializes a new UNIX domain socket stream client.
Args:
path (str): The path to the named UNIX domain socket.
"""
super(_UnixDomainSocketStreamClient, self).__init__(**kwargs)
self._path = path
@classmethod
def _create(cls, value, **kwargs):
if not os.path.exists(value):
raise ValueError('UNIX domain socket [%s] does not exist.' % (value,))
return cls(value, **kwargs)
def _connect_raw(self):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(self._path)
return self.SocketFile(sock)
_default_registry.register_protocol('unix', _UnixDomainSocketStreamClient)