[logdog] Merge all changes to copy of logdog client lib in recipe engine

Included changes:
https://chromium.googlesource.com/infra/luci/recipes-py/+log/aa56edf..33d1705/recipe_engine/third_party/logdog

R=iannucci

Change-Id: I5339bc855d45cc12403dbd180abbcfd8ee326a6a
Reviewed-on: https://chromium-review.googlesource.com/c/infra/luci/luci-py/+/2229077
Auto-Submit: Yiwei Zhang <yiwzhang@google.com>
Commit-Queue: Robbie Iannucci <iannucci@chromium.org>
Reviewed-by: Robbie Iannucci <iannucci@chromium.org>
NOKEYCHECK=True
GitOrigin-RevId: 8b702e20d8a26bcc6de708f3e7d9b23a5e02a73e
diff --git a/OWNERS b/OWNERS
index 5c92963..760558a 100644
--- a/OWNERS
+++ b/OWNERS
@@ -1 +1 @@
-dnj@chromium.org
+iannucci@chromium.org
diff --git a/bootstrap.py b/bootstrap.py
index ea75822..aa88cef 100644
--- a/bootstrap.py
+++ b/bootstrap.py
@@ -46,32 +46,25 @@
     if env is None:
       env = os.environ
 
-    project = env.get(cls._ENV_PROJECT)
-    if not project:
-      raise NotBootstrappedError('Missing project [%s]' % (cls._ENV_PROJECT,))
-
-    prefix = env.get(cls._ENV_PREFIX)
-    if not prefix:
-      raise NotBootstrappedError('Missing prefix [%s]' % (cls._ENV_PREFIX,))
-    try:
-      streamname.validate_stream_name(prefix)
-    except ValueError as exp:
-      raise NotBootstrappedError('Prefix (%s) is invalid: %s' % (prefix, exp))
-
-    namespace = env.get(cls._ENV_NAMESPACE, '')
-    if namespace:
+    def _check(kind, val):
+      if not val:
+        return val
       try:
-        streamname.validate_stream_name(namespace)
+        streamname.validate_stream_name(val)
+        return val
       except ValueError as exp:
-        raise NotBootstrappedError(
-            'Namespace (%s) is invalid: %s' % (prefix, exp))
+        raise NotBootstrappedError('%s (%s) is invalid: %s' % (kind, val, exp))
+
+    streamserver_uri = env.get(cls._ENV_STREAM_SERVER_PATH)
+    if not streamserver_uri:
+      raise NotBootstrappedError('No streamserver in bootstrap environment.')
 
     return cls(
-        project=project,
-        prefix=prefix,
-        streamserver_uri=env.get(cls._ENV_STREAM_SERVER_PATH),
-        coordinator_host=env.get(cls._ENV_COORDINATOR_HOST),
-        namespace=namespace)
+        project=env.get(cls._ENV_PROJECT, ''),
+        prefix=_check("Prefix", env.get(cls._ENV_PREFIX, '')),
+        streamserver_uri=streamserver_uri,
+        coordinator_host=env.get(cls._ENV_COORDINATOR_HOST, ''),
+        namespace=_check("Namespace", env.get(cls._ENV_NAMESPACE, '')))
 
   def stream_client(self, reg=None):
     """Returns: (StreamClient) stream client for the bootstrap streamserver URI.
@@ -88,8 +81,6 @@
     Raises:
       ValueError: If no streamserver URI is present in the environment.
     """
-    if not self.streamserver_uri:
-      raise ValueError('No streamserver in bootstrap environment.')
     reg = reg or stream._default_registry
     return reg.create(
         self.streamserver_uri,
diff --git a/stream.py b/stream.py
index 89bbe77..fa1429a 100644
--- a/stream.py
+++ b/stream.py
@@ -10,13 +10,22 @@
 import socket
 import sys
 import threading
-import types
+import time
 
 from . import streamname, varint
 
+try:
+  # We use FileObjectThread on gevented processes so that gevent can do work
+  # while we block on sending to a logdog stream.
+  from gevent.fileobject import FileObjectThread
+except ImportError:
+  FileObjectThread = lambda x: x
 
-_StreamParamsBase = collections.namedtuple('_StreamParamsBase',
-    ('name', 'type', 'content_type', 'tags', 'tee', 'binary_file_extension'))
+if sys.platform == "win32":
+  from ctypes import GetLastError
+
+_StreamParamsBase = collections.namedtuple(
+    '_StreamParamsBase', ('name', 'type', 'content_type', 'tags'))
 
 
 # Magic number at the beginning of a Butler stream
@@ -36,11 +45,6 @@
   # A datagram content stream.
   DATAGRAM = 'datagram'
 
-  # Tee parameter to tee this stream through the Butler's STDOUT.
-  TEE_STDOUT = 'stdout'
-  # Tee parameter to tee this stream through the Butler's STDERR.
-  TEE_STDERR = 'stderr'
-
   @classmethod
   def make(cls, **kwargs):
     """Returns (StreamParams): A new StreamParams instance with supplied values.
@@ -65,14 +69,6 @@
       for k, v in self.tags.items():
         streamname.validate_tag(k, v)
 
-    if self.tee not in (None, self.TEE_STDOUT, self.TEE_STDERR):
-      raise ValueError('Invalid tee type (%s)' % (self.tee,))
-
-    if not isinstance(self.binary_file_extension,
-        (types.NoneType, types.StringTypes)):
-      raise ValueError('Invalid binary file extension type (%s)' % (
-          self.binary_file_extension,))
-
   def to_json(self):
     """Returns (str): The JSON representation of the StreamParams.
 
@@ -88,13 +84,12 @@
         'type': self.type,
     }
 
-    def maybe_add(key, value):
+    def _maybe_add(key, value):
       if value is not None:
         obj[key] = value
-    maybe_add('contentType', self.content_type)
-    maybe_add('tags', self.tags)
-    maybe_add('tee', self.tee)
-    maybe_add('binaryFileExtension', self.binary_file_extension)
+
+    _maybe_add('contentType', self.content_type)
+    _maybe_add('tags', self.tags)
 
     # Note that "dumps' will dump UTF-8 by default, which is what Butler wants.
     return json.dumps(obj, sort_keys=True, ensure_ascii=True, indent=None)
@@ -253,6 +248,13 @@
     """
     return self._coordinator_host
 
+  @property
+  def namespace(self):
+    """Returns (str): The namespace for all streams opened by this client.
+    Empty if not configured.
+    """
+    return self._namespace
+
   def get_stream_path(self, name):
     """Returns (streamname.StreamPath): The stream path.
 
@@ -330,13 +332,13 @@
     have any data written to it past initialization (if needed) when it has been
     returned.
 
-    The file-like object must implement `write` and `close`.
+    The file-like object must implement `write`, `fileno`, `flush`, and `close`.
 
     Implementing classes must override this.
     """
     raise NotImplementedError()
 
-  def new_connection(self, params):
+  def new_connection(self, params, for_process):
     """Returns (file): A new configured stream.
 
     The returned object implements (minimally) `write` and `close`.
@@ -345,6 +347,8 @@
 
     Args:
       params (StreamParams): The parameters to use with the new connection.
+      for_process (bool): If this connection will be attached to a standard
+        handle on a subprocess.
 
     Raises:
       ValueError if the stream name has already been used, or if the parameters
@@ -353,11 +357,15 @@
     self._register_new_stream(params.name)
     params_json = params.to_json()
 
-    fd = self._connect_raw()
-    fd.write(BUTLER_MAGIC)
-    varint.write_uvarint(fd, len(params_json))
-    fd.write(params_json)
-    return fd
+    fobj = self._connect_raw()
+    fobj.write(BUTLER_MAGIC)
+    varint.write_uvarint(fobj, len(params_json))
+    fobj.write(params_json)
+
+    if not for_process:
+      fobj = FileObjectThread(fobj)
+
+    return fobj
 
   @contextlib.contextmanager
   def text(self, name, **kwargs):
@@ -374,16 +382,15 @@
     Returns (file): A file-like object to a Butler UTF-8 text stream supporting
         `write`.
     """
-    fd = None
+    fobj = None
     try:
-      fd = self.open_text(name, **kwargs)
-      yield fd
+      fobj = self.open_text(name, **kwargs)
+      yield fobj
     finally:
-      if fd is not None:
-        fd.close()
+      if fobj is not None:
+        fobj.close()
 
-  def open_text(self, name, content_type=None, tags=None, tee=None,
-                binary_file_extension=None):
+  def open_text(self, name, content_type=None, tags=None, for_process=False):
     """Returns (file): A file-like object for a single text stream.
 
     This creates a new butler TEXT stream with the specified parameters.
@@ -393,11 +400,8 @@
       content_type (str): The optional content type of the stream. If None, a
           default content type will be chosen by the Butler.
       tags (dict): An optional key/value dictionary pair of LogDog stream tags.
-      tee (str): Describes how stream data should be tee'd through the Butler.
-          One of StreamParams' TEE arguments.
-      binary_file_extension (str): A custom binary file extension. If not
-          provided, a default extension may be chosen or the binary stream may
-          not be emitted.
+      for_process (bool): Indicates that this stream will be directly attached
+        to a subprocess's stdout/stderr
 
     Returns (file): A file-like object to a Butler text stream. This object can
         have UTF-8 text content written to it with its `write` method, and must
@@ -407,10 +411,9 @@
         name=posixpath.join(self._namespace, name),
         type=StreamParams.TEXT,
         content_type=content_type,
-        tags=tags,
-        tee=tee,
-        binary_file_extension=binary_file_extension)
-    return self._BasicStream(self, params, self.new_connection(params))
+        tags=tags)
+    return self._BasicStream(self, params,
+                             self.new_connection(params, for_process))
 
   @contextlib.contextmanager
   def binary(self, name, **kwargs):
@@ -427,16 +430,15 @@
     Returns (file): A file-like object to a Butler binary stream supporting
         `write`.
     """
-    fd = None
+    fobj = None
     try:
-      fd = self.open_binary(name, **kwargs)
-      yield fd
+      fobj = self.open_binary(name, **kwargs)
+      yield fobj
     finally:
-      if fd is not None:
-        fd.close()
+      if fobj is not None:
+        fobj.close()
 
-  def open_binary(self, name, content_type=None, tags=None, tee=None,
-                binary_file_extension=None):
+  def open_binary(self, name, content_type=None, tags=None, for_process=False):
     """Returns (file): A file-like object for a single binary stream.
 
     This creates a new butler BINARY stream with the specified parameters.
@@ -446,11 +448,8 @@
       content_type (str): The optional content type of the stream. If None, a
           default content type will be chosen by the Butler.
       tags (dict): An optional key/value dictionary pair of LogDog stream tags.
-      tee (str): Describes how stream data should be tee'd through the Butler.
-          One of StreamParams' TEE arguments.
-      binary_file_extension (str): A custom binary file extension. If not
-          provided, a default extension may be chosen or the binary stream may
-          not be emitted.
+      for_process (bool): Indicates that this stream will be directly attached
+        to a subprocess's stdout/stderr
 
     Returns (file): A file-like object to a Butler binary stream. This object
         can have UTF-8 content written to it with its `write` method, and must
@@ -460,10 +459,9 @@
         name=posixpath.join(self._namespace, name),
         type=StreamParams.BINARY,
         content_type=content_type,
-        tags=tags,
-        tee=tee,
-        binary_file_extension=binary_file_extension)
-    return self._BasicStream(self, params, self.new_connection(params))
+        tags=tags)
+    return self._BasicStream(self, params,
+                             self.new_connection(params, for_process))
 
   @contextlib.contextmanager
   def datagram(self, name, **kwargs):
@@ -480,16 +478,15 @@
     Returns (_DatagramStream): A datagram stream object. Datagrams can be
         written to it using its `send` method.
     """
-    fd = None
+    fobj = None
     try:
-      fd = self.open_datagram(name, **kwargs)
-      yield fd
+      fobj = self.open_datagram(name, **kwargs)
+      yield fobj
     finally:
-      if fd is not None:
-        fd.close()
+      if fobj is not None:
+        fobj.close()
 
-  def open_datagram(self, name, content_type=None, tags=None, tee=None,
-                    binary_file_extension=None):
+  def open_datagram(self, name, content_type=None, tags=None):
     """Creates a new butler DATAGRAM stream with the specified parameters.
 
     Args:
@@ -497,11 +494,6 @@
       content_type (str): The optional content type of the stream. If None, a
           default content type will be chosen by the Butler.
       tags (dict): An optional key/value dictionary pair of LogDog stream tags.
-      tee (str): Describes how stream data should be tee'd through the Butler.
-          One of StreamParams' TEE arguments.
-      binary_file_extension (str): A custom binary file extension. If not
-          provided, a default extension may be chosen or the binary stream may
-          not be emitted.
 
     Returns (_DatagramStream): A datagram stream object. Datagrams can be
         written to it using its `send` method. This object must be closed when
@@ -511,10 +503,9 @@
         name=posixpath.join(self._namespace, name),
         type=StreamParams.DATAGRAM,
         content_type=content_type,
-        tags=tags,
-        tee=tee,
-        binary_file_extension=binary_file_extension)
-    return self._DatagramStream(self, params, self.new_connection(params))
+        tags=tags)
+    return self._DatagramStream(self, params,
+                                self.new_connection(params, False))
 
 
 class _NamedPipeStreamClient(StreamClient):
@@ -528,14 +519,25 @@
-      name (str): The name of the Windows named pipe to use (e.g., "\\.\name")
+      name (str): Bare name of the Windows named pipe (path prefix is added).
     """
     super(_NamedPipeStreamClient, self).__init__(**kwargs)
-    self._name = name
+    self._name = '\\\\.\\pipe\\' + name
 
   @classmethod
   def _create(cls, value, **kwargs):
     return cls(value, **kwargs)
 
+  ERROR_PIPE_BUSY = 231
+
   def _connect_raw(self):
-    return open(self._name, 'wb')
+    # This is a similar procedure to the one in
+    #   https://github.com/microsoft/go-winio/blob/master/pipe.go (tryDialPipe)
+    while True:
+      try:
+        return open(self._name, 'wb+', buffering=0)
+      except (OSError, IOError):
+        if GetLastError() != self.ERROR_PIPE_BUSY:
+          raise
+      time.sleep(0.001)  # 1ms
+
 
 _default_registry.register_protocol('net.pipe', _NamedPipeStreamClient)
 
@@ -547,18 +549,22 @@
   class SocketFile(object):
     """A write-only file-like object that writes to a UNIX socket."""
 
-    def __init__(self, fd):
-      self._fd = fd
+    def __init__(self, sock):
+      self._sock = sock
 
     def fileno(self):
-      return self._fd
+      # Hand back the integer descriptor, not the socket object itself.
+      return self._sock.fileno()
 
     def write(self, data):
-      self._fd.send(data)
+      self._sock.sendall(data)
+
+    def flush(self):
+      # write() sends everything via sendall(); there is no buffer to flush.
+      pass
 
     def close(self):
-      self._fd.close()
-
+      self._sock.close()
 
   def __init__(self, path, **kwargs):
     """Initializes a new UNIX domain socket stream client.
diff --git a/streamname.py b/streamname.py
index bee5388..a16a398 100644
--- a/streamname.py
+++ b/streamname.py
@@ -52,9 +52,9 @@
 
 
 def normalize(v, prefix=None):
-  """Given a string, "v", mutate it into a valid stream name.
+  """Given a string (str|unicode), mutate it into a valid stream name (str).
 
-  This operates by replacing invalid stream naem characters with underscores (_)
+  This operates by replacing invalid stream name characters with underscores (_)
   when encountered.
 
   A special case is when "v" begins with an invalid character. In this case, we
@@ -89,6 +89,10 @@
 
   # Validate the resulting string.
   validate_stream_name(v)
+  # v may be of type unicode. Because a valid stream name contains only ASCII
+  # characters, it is safe to encode v as ASCII, which yields a str.
+  if isinstance(v, unicode):
+    return v.encode('ascii')
   return v
 
 
diff --git a/tests/bootstrap_test.py b/tests/bootstrap_test.py
index 9129a52..4c49424 100755
--- a/tests/bootstrap_test.py
+++ b/tests/bootstrap_test.py
@@ -69,16 +69,6 @@
     self.assertRaises(bootstrap.NotBootstrappedError,
         bootstrap.ButlerBootstrap.probe, env=self.env)
 
-  def testProbeMissingProjectRaisesError(self):
-    self.env.pop(bootstrap.ButlerBootstrap._ENV_PROJECT)
-    self.assertRaises(bootstrap.NotBootstrappedError,
-        bootstrap.ButlerBootstrap.probe, env=self.env)
-
-  def testProbeMissingPrefixRaisesError(self):
-    self.env.pop(bootstrap.ButlerBootstrap._ENV_PREFIX)
-    self.assertRaises(bootstrap.NotBootstrappedError,
-        bootstrap.ButlerBootstrap.probe, env=self.env)
-
   def testProbeInvalidPrefixRaisesError(self):
     self.env[bootstrap.ButlerBootstrap._ENV_PREFIX] = '!!! not valid !!!'
     self.assertRaises(bootstrap.NotBootstrappedError,
diff --git a/tests/stream_test.py b/tests/stream_test.py
index c3d54dd..eea9575 100755
--- a/tests/stream_test.py
+++ b/tests/stream_test.py
@@ -31,22 +31,17 @@
         tags={
             'foo': 'bar',
             'baz': 'qux',
-        },
-        tee=stream.StreamParams.TEE_STDOUT,
-        binary_file_extension='ext')
+        })
 
   def testParamsToJson(self):
     self.assertEqual(self.params.to_json(),
-        ('{"binaryFileExtension": "ext", "contentType": "content-type", '
-         '"name": "name", "tags": {"baz": "qux", "foo": "bar"}, '
-         '"tee": "stdout", "type": "text"}'))
+                     ('{"contentType": "content-type", "name": "name", '
+                      '"tags": {"baz": "qux", "foo": "bar"}, "type": "text"}'))
 
   def testParamsToJsonWithEmpties(self):
     params = self.params._replace(
         content_type=None,
         tags=None,
-        tee=None,
-        binary_file_extension=None,
     )
     self.assertEqual(params.to_json(), '{"name": "name", "type": "text"}')
 
@@ -54,10 +49,6 @@
     params = self.params._replace(type=None)
     self.assertRaises(ValueError, params.to_json)
 
-  def testParamsWithInvalidTeeTypeRaisesValueError(self):
-    params = self.params._replace(tee='somewhere')
-    self.assertRaises(ValueError, params.to_json)
-
   def testParamsWithInvalidTagRaisesValueError(self):
     params = self.params._replace(tags='foo')
     self.assertRaises(ValueError, params.to_json)
@@ -155,7 +146,6 @@
   def testTextStreamWithParams(self):
     client = self._registry.create('test:value')
     with client.text('mystream', content_type='foo/bar',
-                     tee=stream.StreamParams.TEE_STDOUT,
                      tags={'foo': 'bar', 'baz': 'qux'}) as fd:
       self.assertEqual(
           fd.params,
@@ -163,7 +153,6 @@
               name='mystream',
               type=stream.StreamParams.TEXT,
               content_type='foo/bar',
-              tee=stream.StreamParams.TEE_STDOUT,
               tags={'foo': 'bar', 'baz': 'qux'}))
       fd.write('text!')
 
@@ -175,7 +164,6 @@
         'name': 'mystream',
         'type': 'text',
         'contentType': 'foo/bar',
-         'tee': 'stdout',
          'tags': {'foo': 'bar', 'baz': 'qux'},
     })
     self.assertEqual(data, 'text!')