[logdog] Merge all changes to copy of logdog client lib in recipe engine
Included changes:
https://chromium.googlesource.com/infra/luci/recipes-py/+log/aa56edf..33d1705/recipe_engine/third_party/logdog
R=iannucci
Change-Id: I5339bc855d45cc12403dbd180abbcfd8ee326a6a
Reviewed-on: https://chromium-review.googlesource.com/c/infra/luci/luci-py/+/2229077
Auto-Submit: Yiwei Zhang <yiwzhang@google.com>
Commit-Queue: Robbie Iannucci <iannucci@chromium.org>
Reviewed-by: Robbie Iannucci <iannucci@chromium.org>
NOKEYCHECK=True
GitOrigin-RevId: 8b702e20d8a26bcc6de708f3e7d9b23a5e02a73e
diff --git a/OWNERS b/OWNERS
index 5c92963..760558a 100644
--- a/OWNERS
+++ b/OWNERS
@@ -1 +1 @@
-dnj@chromium.org
+iannucci@chromium.org
diff --git a/bootstrap.py b/bootstrap.py
index ea75822..aa88cef 100644
--- a/bootstrap.py
+++ b/bootstrap.py
@@ -46,32 +46,25 @@
if env is None:
env = os.environ
- project = env.get(cls._ENV_PROJECT)
- if not project:
- raise NotBootstrappedError('Missing project [%s]' % (cls._ENV_PROJECT,))
-
- prefix = env.get(cls._ENV_PREFIX)
- if not prefix:
- raise NotBootstrappedError('Missing prefix [%s]' % (cls._ENV_PREFIX,))
- try:
- streamname.validate_stream_name(prefix)
- except ValueError as exp:
- raise NotBootstrappedError('Prefix (%s) is invalid: %s' % (prefix, exp))
-
- namespace = env.get(cls._ENV_NAMESPACE, '')
- if namespace:
+ def _check(kind, val):
+ if not val:
+ return val
try:
- streamname.validate_stream_name(namespace)
+ streamname.validate_stream_name(val)
+ return val
except ValueError as exp:
- raise NotBootstrappedError(
- 'Namespace (%s) is invalid: %s' % (prefix, exp))
+ raise NotBootstrappedError('%s (%s) is invalid: %s' % (kind, val, exp))
+
+ streamserver_uri = env.get(cls._ENV_STREAM_SERVER_PATH)
+ if not streamserver_uri:
+ raise NotBootstrappedError('No streamserver in bootstrap environment.')
return cls(
- project=project,
- prefix=prefix,
- streamserver_uri=env.get(cls._ENV_STREAM_SERVER_PATH),
- coordinator_host=env.get(cls._ENV_COORDINATOR_HOST),
- namespace=namespace)
+ project=env.get(cls._ENV_PROJECT, ''),
+ prefix=_check("Prefix", env.get(cls._ENV_PREFIX, '')),
+ streamserver_uri=streamserver_uri,
+ coordinator_host=env.get(cls._ENV_COORDINATOR_HOST, ''),
+ namespace=_check("Namespace", env.get(cls._ENV_NAMESPACE, '')))
def stream_client(self, reg=None):
"""Returns: (StreamClient) stream client for the bootstrap streamserver URI.
@@ -88,8 +81,6 @@
Raises:
ValueError: If no streamserver URI is present in the environment.
"""
- if not self.streamserver_uri:
- raise ValueError('No streamserver in bootstrap environment.')
reg = reg or stream._default_registry
return reg.create(
self.streamserver_uri,
diff --git a/stream.py b/stream.py
index 89bbe77..fa1429a 100644
--- a/stream.py
+++ b/stream.py
@@ -10,13 +10,22 @@
import socket
import sys
import threading
-import types
+import time
from . import streamname, varint
+try:
+ # We use FileObjectThread on gevented processes so that gevent can do work
+ # while we block on sending to a logdog stream.
+ from gevent.fileobject import FileObjectThread
+except ImportError:
+ FileObjectThread = lambda x: x
-_StreamParamsBase = collections.namedtuple('_StreamParamsBase',
- ('name', 'type', 'content_type', 'tags', 'tee', 'binary_file_extension'))
+if sys.platform == "win32":
+ from ctypes import GetLastError
+
+_StreamParamsBase = collections.namedtuple(
+ '_StreamParamsBase', ('name', 'type', 'content_type', 'tags'))
# Magic number at the beginning of a Butler stream
@@ -36,11 +45,6 @@
# A datagram content stream.
DATAGRAM = 'datagram'
- # Tee parameter to tee this stream through the Butler's STDOUT.
- TEE_STDOUT = 'stdout'
- # Tee parameter to tee this stream through the Butler's STDERR.
- TEE_STDERR = 'stderr'
-
@classmethod
def make(cls, **kwargs):
"""Returns (StreamParams): A new StreamParams instance with supplied values.
@@ -65,14 +69,6 @@
for k, v in self.tags.items():
streamname.validate_tag(k, v)
- if self.tee not in (None, self.TEE_STDOUT, self.TEE_STDERR):
- raise ValueError('Invalid tee type (%s)' % (self.tee,))
-
- if not isinstance(self.binary_file_extension,
- (types.NoneType, types.StringTypes)):
- raise ValueError('Invalid binary file extension type (%s)' % (
- self.binary_file_extension,))
-
def to_json(self):
"""Returns (str): The JSON representation of the StreamParams.
@@ -88,13 +84,12 @@
'type': self.type,
}
- def maybe_add(key, value):
+ def _maybe_add(key, value):
if value is not None:
obj[key] = value
- maybe_add('contentType', self.content_type)
- maybe_add('tags', self.tags)
- maybe_add('tee', self.tee)
- maybe_add('binaryFileExtension', self.binary_file_extension)
+
+ _maybe_add('contentType', self.content_type)
+ _maybe_add('tags', self.tags)
# Note that "dumps' will dump UTF-8 by default, which is what Butler wants.
return json.dumps(obj, sort_keys=True, ensure_ascii=True, indent=None)
@@ -253,6 +248,13 @@
"""
return self._coordinator_host
+ @property
+ def namespace(self):
+ """Returns (str): The namespace for all streams opened by this client.
+ Empty if not configured.
+ """
+ return self._namespace
+
def get_stream_path(self, name):
"""Returns (streamname.StreamPath): The stream path.
@@ -330,13 +332,13 @@
have any data written to it past initialization (if needed) when it has been
returned.
- The file-like object must implement `write` and `close`.
+ The file-like object must implement `write`, `fileno`, `flush`, and `close`.
Implementing classes must override this.
"""
raise NotImplementedError()
- def new_connection(self, params):
+ def new_connection(self, params, for_process):
"""Returns (file): A new configured stream.
The returned object implements (minimally) `write` and `close`.
@@ -345,6 +347,8 @@
Args:
params (StreamParams): The parameters to use with the new connection.
+ for_process (bool): If this connection will be attached to a standard
+ handle on a subprocess.
Raises:
ValueError if the stream name has already been used, or if the parameters
@@ -353,11 +357,15 @@
self._register_new_stream(params.name)
params_json = params.to_json()
- fd = self._connect_raw()
- fd.write(BUTLER_MAGIC)
- varint.write_uvarint(fd, len(params_json))
- fd.write(params_json)
- return fd
+ fobj = self._connect_raw()
+ fobj.write(BUTLER_MAGIC)
+ varint.write_uvarint(fobj, len(params_json))
+ fobj.write(params_json)
+
+ if not for_process:
+ fobj = FileObjectThread(fobj)
+
+ return fobj
@contextlib.contextmanager
def text(self, name, **kwargs):
@@ -374,16 +382,15 @@
Returns (file): A file-like object to a Butler UTF-8 text stream supporting
`write`.
"""
- fd = None
+ fobj = None
try:
- fd = self.open_text(name, **kwargs)
- yield fd
+ fobj = self.open_text(name, **kwargs)
+ yield fobj
finally:
- if fd is not None:
- fd.close()
+ if fobj is not None:
+ fobj.close()
- def open_text(self, name, content_type=None, tags=None, tee=None,
- binary_file_extension=None):
+ def open_text(self, name, content_type=None, tags=None, for_process=False):
"""Returns (file): A file-like object for a single text stream.
This creates a new butler TEXT stream with the specified parameters.
@@ -393,11 +400,8 @@
content_type (str): The optional content type of the stream. If None, a
default content type will be chosen by the Butler.
tags (dict): An optional key/value dictionary pair of LogDog stream tags.
- tee (str): Describes how stream data should be tee'd through the Butler.
- One of StreamParams' TEE arguments.
- binary_file_extension (str): A custom binary file extension. If not
- provided, a default extension may be chosen or the binary stream may
- not be emitted.
+ for_process (bool): Indicates that this stream will be directly attached
+ to a subprocess's stdout/stderr
Returns (file): A file-like object to a Butler text stream. This object can
have UTF-8 text content written to it with its `write` method, and must
@@ -407,10 +411,9 @@
name=posixpath.join(self._namespace, name),
type=StreamParams.TEXT,
content_type=content_type,
- tags=tags,
- tee=tee,
- binary_file_extension=binary_file_extension)
- return self._BasicStream(self, params, self.new_connection(params))
+ tags=tags)
+ return self._BasicStream(self, params,
+ self.new_connection(params, for_process))
@contextlib.contextmanager
def binary(self, name, **kwargs):
@@ -427,16 +430,15 @@
Returns (file): A file-like object to a Butler binary stream supporting
`write`.
"""
- fd = None
+ fobj = None
try:
- fd = self.open_binary(name, **kwargs)
- yield fd
+ fobj = self.open_binary(name, **kwargs)
+ yield fobj
finally:
- if fd is not None:
- fd.close()
+ if fobj is not None:
+ fobj.close()
- def open_binary(self, name, content_type=None, tags=None, tee=None,
- binary_file_extension=None):
+ def open_binary(self, name, content_type=None, tags=None, for_process=False):
"""Returns (file): A file-like object for a single binary stream.
This creates a new butler BINARY stream with the specified parameters.
@@ -446,11 +448,8 @@
content_type (str): The optional content type of the stream. If None, a
default content type will be chosen by the Butler.
tags (dict): An optional key/value dictionary pair of LogDog stream tags.
- tee (str): Describes how stream data should be tee'd through the Butler.
- One of StreamParams' TEE arguments.
- binary_file_extension (str): A custom binary file extension. If not
- provided, a default extension may be chosen or the binary stream may
- not be emitted.
+ for_process (bool): Indicates that this stream will be directly attached
+ to a subprocess's stdout/stderr
Returns (file): A file-like object to a Butler binary stream. This object
can have UTF-8 content written to it with its `write` method, and must
@@ -460,10 +459,9 @@
name=posixpath.join(self._namespace, name),
type=StreamParams.BINARY,
content_type=content_type,
- tags=tags,
- tee=tee,
- binary_file_extension=binary_file_extension)
- return self._BasicStream(self, params, self.new_connection(params))
+ tags=tags)
+ return self._BasicStream(self, params,
+ self.new_connection(params, for_process))
@contextlib.contextmanager
def datagram(self, name, **kwargs):
@@ -480,16 +478,15 @@
Returns (_DatagramStream): A datagram stream object. Datagrams can be
written to it using its `send` method.
"""
- fd = None
+ fobj = None
try:
- fd = self.open_datagram(name, **kwargs)
- yield fd
+ fobj = self.open_datagram(name, **kwargs)
+ yield fobj
finally:
- if fd is not None:
- fd.close()
+ if fobj is not None:
+ fobj.close()
- def open_datagram(self, name, content_type=None, tags=None, tee=None,
- binary_file_extension=None):
+ def open_datagram(self, name, content_type=None, tags=None):
"""Creates a new butler DATAGRAM stream with the specified parameters.
Args:
@@ -497,11 +494,6 @@
content_type (str): The optional content type of the stream. If None, a
default content type will be chosen by the Butler.
tags (dict): An optional key/value dictionary pair of LogDog stream tags.
- tee (str): Describes how stream data should be tee'd through the Butler.
- One of StreamParams' TEE arguments.
- binary_file_extension (str): A custom binary file extension. If not
- provided, a default extension may be chosen or the binary stream may
- not be emitted.
Returns (_DatagramStream): A datagram stream object. Datagrams can be
written to it using its `send` method. This object must be closed when
@@ -511,10 +503,9 @@
name=posixpath.join(self._namespace, name),
type=StreamParams.DATAGRAM,
content_type=content_type,
- tags=tags,
- tee=tee,
- binary_file_extension=binary_file_extension)
- return self._DatagramStream(self, params, self.new_connection(params))
+ tags=tags)
+ return self._DatagramStream(self, params,
+ self.new_connection(params, False))
class _NamedPipeStreamClient(StreamClient):
@@ -528,14 +519,25 @@
name (str): The name of the Windows named pipe to use (e.g., "\\.\name")
"""
super(_NamedPipeStreamClient, self).__init__(**kwargs)
- self._name = name
+ self._name = '\\\\.\\pipe\\' + name
@classmethod
def _create(cls, value, **kwargs):
return cls(value, **kwargs)
+ ERROR_PIPE_BUSY = 231
+
def _connect_raw(self):
- return open(self._name, 'wb')
+ # This is a similar procedure to the one in
+ # https://github.com/microsoft/go-winio/blob/master/pipe.go (tryDialPipe)
+ while True:
+ try:
+ return open(self._name, 'wb+', buffering=0)
+ except (OSError, IOError):
+ if GetLastError() != self.ERROR_PIPE_BUSY:
+ raise
+ time.sleep(0.001) # 1ms
+
_default_registry.register_protocol('net.pipe', _NamedPipeStreamClient)
@@ -547,18 +549,23 @@
class SocketFile(object):
"""A write-only file-like object that writes to a UNIX socket."""
- def __init__(self, fd):
- self._fd = fd
+ def __init__(self, sock):
+ self._sock = sock
+
+ def fileno(self):
+ return self._sock
def fileno(self):
return self._fd
def write(self, data):
- self._fd.send(data)
+ self._sock.sendall(data)
+
+ def flush(self):
+ pass
def close(self):
- self._fd.close()
-
+ self._sock.close()
def __init__(self, path, **kwargs):
"""Initializes a new UNIX domain socket stream client.
diff --git a/streamname.py b/streamname.py
index bee5388..a16a398 100644
--- a/streamname.py
+++ b/streamname.py
@@ -52,9 +52,9 @@
def normalize(v, prefix=None):
- """Given a string, "v", mutate it into a valid stream name.
+ """Given a string (str|unicode), mutate it into a valid stream name (str).
- This operates by replacing invalid stream naem characters with underscores (_)
+ This operates by replacing invalid stream name characters with underscores (_)
when encountered.
A special case is when "v" begins with an invalid character. In this case, we
@@ -89,6 +89,10 @@
# Validate the resulting string.
validate_stream_name(v)
+ # v could be of type unicode. As a valid stream name contains only ascii
+ # characters, it is safe to transcode v to ascii encoding (become str type).
+ if isinstance(v, unicode):
+ return v.encode('ascii')
return v
diff --git a/tests/bootstrap_test.py b/tests/bootstrap_test.py
index 9129a52..4c49424 100755
--- a/tests/bootstrap_test.py
+++ b/tests/bootstrap_test.py
@@ -69,16 +69,6 @@
self.assertRaises(bootstrap.NotBootstrappedError,
bootstrap.ButlerBootstrap.probe, env=self.env)
- def testProbeMissingProjectRaisesError(self):
- self.env.pop(bootstrap.ButlerBootstrap._ENV_PROJECT)
- self.assertRaises(bootstrap.NotBootstrappedError,
- bootstrap.ButlerBootstrap.probe, env=self.env)
-
- def testProbeMissingPrefixRaisesError(self):
- self.env.pop(bootstrap.ButlerBootstrap._ENV_PREFIX)
- self.assertRaises(bootstrap.NotBootstrappedError,
- bootstrap.ButlerBootstrap.probe, env=self.env)
-
def testProbeInvalidPrefixRaisesError(self):
self.env[bootstrap.ButlerBootstrap._ENV_PREFIX] = '!!! not valid !!!'
self.assertRaises(bootstrap.NotBootstrappedError,
diff --git a/tests/stream_test.py b/tests/stream_test.py
index c3d54dd..eea9575 100755
--- a/tests/stream_test.py
+++ b/tests/stream_test.py
@@ -31,22 +31,17 @@
tags={
'foo': 'bar',
'baz': 'qux',
- },
- tee=stream.StreamParams.TEE_STDOUT,
- binary_file_extension='ext')
+ })
def testParamsToJson(self):
self.assertEqual(self.params.to_json(),
- ('{"binaryFileExtension": "ext", "contentType": "content-type", '
- '"name": "name", "tags": {"baz": "qux", "foo": "bar"}, '
- '"tee": "stdout", "type": "text"}'))
+ ('{"contentType": "content-type", "name": "name", '
+ '"tags": {"baz": "qux", "foo": "bar"}, "type": "text"}'))
def testParamsToJsonWithEmpties(self):
params = self.params._replace(
content_type=None,
tags=None,
- tee=None,
- binary_file_extension=None,
)
self.assertEqual(params.to_json(), '{"name": "name", "type": "text"}')
@@ -54,10 +49,6 @@
params = self.params._replace(type=None)
self.assertRaises(ValueError, params.to_json)
- def testParamsWithInvalidTeeTypeRaisesValueError(self):
- params = self.params._replace(tee='somewhere')
- self.assertRaises(ValueError, params.to_json)
-
def testParamsWithInvalidTagRaisesValueError(self):
params = self.params._replace(tags='foo')
self.assertRaises(ValueError, params.to_json)
@@ -155,7 +146,6 @@
def testTextStreamWithParams(self):
client = self._registry.create('test:value')
with client.text('mystream', content_type='foo/bar',
- tee=stream.StreamParams.TEE_STDOUT,
tags={'foo': 'bar', 'baz': 'qux'}) as fd:
self.assertEqual(
fd.params,
@@ -163,7 +153,6 @@
name='mystream',
type=stream.StreamParams.TEXT,
content_type='foo/bar',
- tee=stream.StreamParams.TEE_STDOUT,
tags={'foo': 'bar', 'baz': 'qux'}))
fd.write('text!')
@@ -175,7 +164,6 @@
'name': 'mystream',
'type': 'text',
'contentType': 'foo/bar',
- 'tee': 'stdout',
'tags': {'foo': 'bar', 'baz': 'qux'},
})
self.assertEqual(data, 'text!')