| # Copyright 2016 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| import collections |
| import contextlib |
| import json |
| import os |
| import posixpath |
| import socket |
| import sys |
| import threading |
| import time |
| |
| from . import streamname, varint |
| |
| |
| if sys.platform == "win32": |
| from ctypes import GetLastError |
| |
| |
| _PY2 = sys.version_info[0] == 2 |
| |
| |
| _StreamParamsBase = collections.namedtuple( |
| '_StreamParamsBase', ('name', 'type', 'content_type', 'tags')) |
| |
| |
| # Magic number at the beginning of a Butler stream |
| # |
| # See "ProtocolFrameHeaderMagic" in: |
| # <luci-go>/logdog/client/butlerlib/streamproto |
| BUTLER_MAGIC = b'BTLR1\x1e' |
| |
| |
| class StreamParams(_StreamParamsBase): |
| """Defines the set of parameters to apply to a new stream.""" |
| |
| # A text content stream. |
| TEXT = 'text' |
| # A binary content stream. |
| BINARY = 'binary' |
| # A datagram content stream. |
| DATAGRAM = 'datagram' |
| |
| @classmethod |
| def make(cls, **kwargs): |
| """Returns (StreamParams): A new StreamParams instance with supplied values. |
| |
| Any parameter that isn't supplied will be set to None. |
| |
| Args: |
| kwargs (dict): Named parameters to apply. |
| """ |
| return cls(**{f: kwargs.get(f) for f in cls._fields}) |
| |
| def validate(self): |
| """Raises (ValueError): if the parameters are not valid.""" |
| streamname.validate_stream_name(self.name) |
| |
| if self.type not in (self.TEXT, self.BINARY, self.DATAGRAM): |
| raise ValueError('Invalid type (%s)' % (self.type,)) |
| |
| if self.tags is not None: |
| if not isinstance( |
| self.tags, collections.Mapping if _PY2 else collections.abc.Mapping): |
| raise ValueError('Invalid tags type (%s)' % (self.tags,)) |
| for k, v in self.tags.items(): |
| streamname.validate_tag(k, v) |
| |
| def to_json(self): |
| """Returns (str): The JSON representation of the StreamParams. |
| |
| Converts stream parameters to JSON for Butler consumption. |
| |
| Raises: |
| ValueError: if these parameters are not valid. |
| """ |
| self.validate() |
| |
| obj = { |
| 'name': self.name, |
| 'type': self.type, |
| } |
| |
| def _maybe_add(key, value): |
| if value is not None: |
| obj[key] = value |
| |
| _maybe_add('contentType', self.content_type) |
| _maybe_add('tags', self.tags) |
| |
| # Note that "dumps' will dump UTF-8 by default, which is what Butler wants. |
| return json.dumps(obj, sort_keys=True, ensure_ascii=True, indent=None) |
| |
| |
| class StreamProtocolRegistry(object): |
| """Registry of streamserver URI protocols and their client classes. |
| """ |
| |
| def __init__(self): |
| self._registry = {} |
| |
| def register_protocol(self, protocol, client_cls): |
| assert issubclass(client_cls, StreamClient) |
| if self._registry.get(protocol) is not None: |
| raise KeyError('Duplicate protocol registered.') |
| self._registry[protocol] = client_cls |
| |
| def create(self, uri, **kwargs): |
| """Returns (StreamClient): A stream client for the specified URI. |
| |
| This uses the default StreamProtocolRegistry to instantiate a StreamClient |
| for the specified URI. |
| |
| Args: |
| uri (str): The streamserver URI. |
| kwargs: keyword arguments to forward to the stream. See |
| StreamClient.__init__. |
| |
| Raises: |
| ValueError: if the supplied URI references an invalid or improperly |
| configured streamserver. |
| """ |
| uri = uri.split(':', 1) |
| if len(uri) != 2: |
| raise ValueError('Invalid stream server URI [%s]' % (uri,)) |
| protocol, value = uri |
| |
| client_cls = self._registry.get(protocol) |
| if not client_cls: |
| raise ValueError('Unknown stream client protocol (%s)' % (protocol,)) |
| return client_cls._create(value, **kwargs) |
| |
| |
| # Default (global) registry. |
| _default_registry = StreamProtocolRegistry() |
| create = _default_registry.create |
| |
| |
| class StreamClient(object): |
| """Abstract base class for a streamserver client. |
| """ |
| |
| class _StreamBase(object): |
| """ABC for StreamClient streams.""" |
| |
| def __init__(self, stream_client, params): |
| self._stream_client = stream_client |
| self._params = params |
| |
| @property |
| def params(self): |
| """Returns (StreamParams): The stream parameters.""" |
| return self._params |
| |
| @property |
| def path(self): |
| """Returns (streamname.StreamPath): The stream path. |
| |
| Raises: |
| ValueError: if the stream path is invalid, or if the stream prefix is |
| not defined in the client. |
| """ |
| return self._stream_client.get_stream_path(self._params.name) |
| |
| def get_viewer_url(self): |
| """Returns (str): The viewer URL for this stream. |
| |
| Raises: |
| KeyError: if information needed to construct the URL is missing. |
| ValueError: if the stream prefix or name do not form a valid stream |
| path. |
| """ |
| return self._stream_client.get_viewer_url(self._params.name) |
| |
| |
| class _BasicStream(_StreamBase): |
| """Wraps a basic file descriptor, offering "write" and "close".""" |
| |
| def __init__(self, stream_client, params, fd): |
| super(StreamClient._BasicStream, self).__init__(stream_client, params) |
| self._fd = fd |
| |
| @property |
| def fd(self): |
| return self._fd |
| |
| def fileno(self): |
| return self._fd.fileno() |
| |
| def write(self, data): |
| return self._fd.write(data) |
| |
| def close(self): |
| return self._fd.close() |
| |
| |
| class _TextStream(_BasicStream): |
| """Extends _BasicStream, ensuring data written is UTF-8 text.""" |
| |
| def __init__(self, stream_client, params, fd): |
| super(StreamClient._TextStream, self).__init__(stream_client, params, fd) |
| self._fd = fd |
| |
| def write(self, data): |
| if _PY2 and isinstance(data, str): |
| # byte string is unfortunately accepted in py2 because of |
| # undifferentiated usage of `str` and `unicode` but it should be |
| # discontinued in py3. User should switch to binary stream instead |
| # if there's a need to write bytes. |
| return self._fd.write(data) |
| elif _PY2 and isinstance(data, unicode): |
| return self._fd.write(data.encode('utf-8')) |
| elif not _PY2 and isinstance(data, str): |
| return self._fd.write(data.encode('utf-8')) |
| else: |
| raise ValueError( |
| 'expect str, got %r that is type %s' % (data, type(data),)) |
| |
| |
| class _DatagramStream(_StreamBase): |
| """Wraps a stream object to write length-prefixed datagrams.""" |
| |
| def __init__(self, stream_client, params, fd): |
| super(StreamClient._DatagramStream, self).__init__(stream_client, params) |
| self._fd = fd |
| |
| def send(self, data): |
| varint.write_uvarint(self._fd, len(data)) |
| self._fd.write(data) |
| |
| def close(self): |
| return self._fd.close() |
| |
| |
| def __init__(self, project=None, prefix=None, coordinator_host=None, |
| namespace=''): |
| """Constructs a new base StreamClient instance. |
| |
| Args: |
| project (str or None): If not None, the name of the log stream project. |
| prefix (str or None): If not None, the log stream session prefix. |
| coordinator_host (str or None): If not None, the name of the Coordinator |
| host that this stream client is bound to. This will be used to |
| construct viewer URLs for generated streams. |
| namespace (str): The prefix to apply to all streams opened by this client. |
| """ |
| self._project = project |
| self._prefix = prefix |
| self._coordinator_host = coordinator_host |
| self._namespace = namespace |
| |
| self._name_lock = threading.Lock() |
| self._names = set() |
| |
| @property |
| def project(self): |
| """Returns (str or None): The stream project, or None if not configured.""" |
| return self._project |
| |
| @property |
| def prefix(self): |
| """Returns (str or None): The stream prefix, or None if not configured.""" |
| return self._prefix |
| |
| @property |
| def coordinator_host(self): |
| """Returns (str or None): The coordinator host, or None if not configured. |
| """ |
| return self._coordinator_host |
| |
| @property |
| def namespace(self): |
| """Returns (str): The namespace for all streams opened by this client. |
| Empty if not configured. |
| """ |
| return self._namespace |
| |
| def get_stream_path(self, name): |
| """Returns (streamname.StreamPath): The stream path. |
| |
| Args: |
| name (str): The name of the stream. |
| |
| Raises: |
| KeyError: if information needed to construct the path is missing. |
| ValueError: if the stream path is invalid, or if the stream prefix is |
| not defined in the client. |
| """ |
| if not self._prefix: |
| raise KeyError('Stream prefix is not configured') |
| return streamname.StreamPath.make(self._prefix, name) |
| |
| def get_viewer_url(self, name): |
| """Returns (str): The LogDog viewer URL for the named stream. |
| |
| Args: |
| name (str): The name of the stream. This can also be a query glob. |
| |
| Raises: |
| KeyError: if information needed to construct the URL is missing. |
| ValueError: if the stream prefix or name do not form a valid stream |
| path. |
| """ |
| if not self._coordinator_host: |
| raise KeyError('Coordinator host is not configured') |
| if not self._project: |
| raise KeyError('Stream project is not configured') |
| |
| return streamname.get_logdog_viewer_url( |
| self._coordinator_host, |
| self._project, |
| self.get_stream_path(name)) |
| |
| def _register_new_stream(self, name): |
| """Registers a new stream name. |
| |
| The Butler will internally reject any duplicate stream names. However, there |
| isn't really feedback when this happens except a closed stream client. This |
| is a client-side check to provide a more user-friendly experience in the |
| event that a user attempts to register a duplicate stream name. |
| |
| Note that this is imperfect, as something else could register stream names |
| with the same Butler instance and this library has no means of tracking. |
| This is a best-effort experience, not a reliable check. |
| |
| Args: |
| name (str): The name of the stream. |
| |
| Raises: |
| ValueError if the stream name has already been registered. |
| """ |
| with self._name_lock: |
| if name in self._names: |
| raise ValueError("Duplicate stream name [%s]" % (name,)) |
| self._names.add(name) |
| |
| @classmethod |
| def _create(cls, value, **kwargs): |
| """Returns (StreamClient): A new stream client instance. |
| |
| Validates the streamserver parameters and creates a new StreamClient |
| instance that connects to them. |
| |
| Implementing classes must override this. |
| """ |
| raise NotImplementedError() |
| |
| def _connect_raw(self): |
| """Returns (file): A new file-like stream. |
| |
| Creates a new raw connection to the streamserver. This connection MUST not |
| have any data written to it past initialization (if needed) when it has been |
| returned. |
| |
| The file-like object must implement `write`, `fileno`, `flush`, and `close`. |
| |
| Implementing classes must override this. |
| """ |
| raise NotImplementedError() |
| |
| def new_connection(self, params): |
| """Returns (file): A new configured stream. |
| |
| The returned object implements (minimally) `write` and `close`. |
| |
| Creates a new LogDog stream with the specified parameters. |
| |
| Args: |
| params (StreamParams): The parameters to use with the new connection. |
| |
| Raises: |
| ValueError if the stream name has already been used, or if the parameters |
| are not valid. |
| """ |
| self._register_new_stream(params.name) |
| params_bytes = params.to_json().encode('utf-8') |
| |
| fobj = self._connect_raw() |
| fobj.write(BUTLER_MAGIC) |
| varint.write_uvarint(fobj, len(params_bytes)) |
| fobj.write(params_bytes) |
| return fobj |
| |
| @contextlib.contextmanager |
| def text(self, name, **kwargs): |
| """Context manager to create, use, and teardown a TEXT stream. |
| |
| This context manager creates a new butler TEXT stream with the specified |
| parameters, yields it, and closes it on teardown. |
| |
| Args: |
| name (str): the LogDog name of the stream. |
| kwargs (dict): Log stream parameters. These may be any keyword arguments |
| accepted by `open_text`. |
| |
| Returns (file): A file-like object to a Butler UTF-8 text stream supporting |
| `write`. |
| """ |
| fobj = None |
| try: |
| fobj = self.open_text(name, **kwargs) |
| yield fobj |
| finally: |
| if fobj is not None: |
| fobj.close() |
| |
| def open_text(self, name, content_type=None, tags=None): |
| """Returns (file): A file-like object for a single text stream. |
| |
| This creates a new butler TEXT stream with the specified parameters. |
| |
| Args: |
| name (str): the LogDog name of the stream. |
| content_type (str): The optional content type of the stream. If None, a |
| default content type will be chosen by the Butler. |
| tags (dict): An optional key/value dictionary pair of LogDog stream tags. |
| |
| Returns (file): A file-like object to a Butler text stream. This object can |
| have UTF-8 text content written to it with its `write` method, and must |
| be closed when finished using its `close` method. |
| """ |
| params = StreamParams.make( |
| name=posixpath.join(self._namespace, name), |
| type=StreamParams.TEXT, |
| content_type=content_type, |
| tags=tags) |
| return self._TextStream(self, params, self.new_connection(params)) |
| |
| @contextlib.contextmanager |
| def binary(self, name, **kwargs): |
| """Context manager to create, use, and teardown a BINARY stream. |
| |
| This context manager creates a new butler BINARY stream with the specified |
| parameters, yields it, and closes it on teardown. |
| |
| Args: |
| name (str): the LogDog name of the stream. |
| kwargs (dict): Log stream parameters. These may be any keyword arguments |
| accepted by `open_binary`. |
| |
| Returns (file): A file-like object to a Butler binary stream supporting |
| `write`. |
| """ |
| fobj = None |
| try: |
| fobj = self.open_binary(name, **kwargs) |
| yield fobj |
| finally: |
| if fobj is not None: |
| fobj.close() |
| |
| def open_binary(self, name, content_type=None, tags=None): |
| """Returns (file): A file-like object for a single binary stream. |
| |
| This creates a new butler BINARY stream with the specified parameters. |
| |
| Args: |
| name (str): the LogDog name of the stream. |
| content_type (str): The optional content type of the stream. If None, a |
| default content type will be chosen by the Butler. |
| tags (dict): An optional key/value dictionary pair of LogDog stream tags. |
| |
| Returns (file): A file-like object to a Butler binary stream. This object |
| can have UTF-8 content written to it with its `write` method, and must |
| be closed when finished using its `close` method. |
| """ |
| params = StreamParams.make( |
| name=posixpath.join(self._namespace, name), |
| type=StreamParams.BINARY, |
| content_type=content_type, |
| tags=tags) |
| return self._BasicStream(self, params, self.new_connection(params)) |
| |
| @contextlib.contextmanager |
| def datagram(self, name, **kwargs): |
| """Context manager to create, use, and teardown a DATAGRAM stream. |
| |
| This context manager creates a new butler DATAAGRAM stream with the |
| specified parameters, yields it, and closes it on teardown. |
| |
| Args: |
| name (str): the LogDog name of the stream. |
| kwargs (dict): Log stream parameters. These may be any keyword arguments |
| accepted by `open_datagram`. |
| |
| Returns (_DatagramStream): A datagram stream object. Datagrams can be |
| written to it using its `send` method. |
| """ |
| fobj = None |
| try: |
| fobj = self.open_datagram(name, **kwargs) |
| yield fobj |
| finally: |
| if fobj is not None: |
| fobj.close() |
| |
| def open_datagram(self, name, content_type=None, tags=None): |
| """Creates a new butler DATAGRAM stream with the specified parameters. |
| |
| Args: |
| name (str): the LogDog name of the stream. |
| content_type (str): The optional content type of the stream. If None, a |
| default content type will be chosen by the Butler. |
| tags (dict): An optional key/value dictionary pair of LogDog stream tags. |
| |
| Returns (_DatagramStream): A datagram stream object. Datagrams can be |
| written to it using its `send` method. This object must be closed when |
| finished by using its `close` method. |
| """ |
| params = StreamParams.make( |
| name=posixpath.join(self._namespace, name), |
| type=StreamParams.DATAGRAM, |
| content_type=content_type, |
| tags=tags) |
| return self._DatagramStream(self, params, self.new_connection(params)) |
| |
| |
| class _NamedPipeStreamClient(StreamClient): |
| """A StreamClient implementation that connects to a Windows named pipe. |
| """ |
| |
| def __init__(self, name, **kwargs): |
| r"""Initializes a new Windows named pipe stream client. |
| |
| Args: |
| name (str): The name of the Windows named pipe to use (e.g., "\\.\name") |
| """ |
| super(_NamedPipeStreamClient, self).__init__(**kwargs) |
| self._name = '\\\\.\\pipe\\' + name |
| |
| @classmethod |
| def _create(cls, value, **kwargs): |
| return cls(value, **kwargs) |
| |
| ERROR_PIPE_BUSY = 231 |
| |
| def _connect_raw(self): |
| # This is a similar procedure to the one in |
| # https://github.com/microsoft/go-winio/blob/master/pipe.go (tryDialPipe) |
| while True: |
| try: |
| return open(self._name, 'wb+', buffering=0) |
| except (OSError, IOError): |
| if GetLastError() != self.ERROR_PIPE_BUSY: |
| raise |
| time.sleep(0.001) # 1ms |
| |
| |
| _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient) |
| |
| |
| class _UnixDomainSocketStreamClient(StreamClient): |
| """A StreamClient implementation that uses a UNIX domain socket. |
| """ |
| |
| class SocketFile(object): |
| """A write-only file-like object that writes to a UNIX socket.""" |
| |
| def __init__(self, sock): |
| self._sock = sock |
| |
| def fileno(self): |
| return self._sock |
| |
| def write(self, data): |
| self._sock.sendall(data) |
| |
| def flush(self): |
| pass |
| |
| def close(self): |
| self._sock.close() |
| |
| def __init__(self, path, **kwargs): |
| """Initializes a new UNIX domain socket stream client. |
| |
| Args: |
| path (str): The path to the named UNIX domain socket. |
| """ |
| super(_UnixDomainSocketStreamClient, self).__init__(**kwargs) |
| self._path = path |
| |
| @classmethod |
| def _create(cls, value, **kwargs): |
| if not os.path.exists(value): |
| raise ValueError('UNIX domain socket [%s] does not exist.' % (value,)) |
| return cls(value, **kwargs) |
| |
| def _connect_raw(self): |
| sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
| sock.connect(self._path) |
| return self.SocketFile(sock) |
| |
| _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient) |