[logdog_client] Add $LOGDOG_NAMESPACE to allow transparent logdog client
nesting.

R=martiniss@chromium.org, nodir@chromium.org, tandrii@chromium.org

Bug: 909848
Change-Id: Ia3d8de73e0c39aa97852e6abe006cf296a12b93d
Reviewed-on: https://chromium-review.googlesource.com/c/infra/luci/luci-py/+/1497446
Auto-Submit: Robbie Iannucci <iannucci@chromium.org>
Reviewed-by: Andrii Shyshkalov <tandrii@chromium.org>
Commit-Queue: Robbie Iannucci <iannucci@chromium.org>
NOKEYCHECK=True
GitOrigin-RevId: c4fb24fd9aac900b9f44d1829e9dc9dbd52cf2b0
diff --git a/bootstrap.py b/bootstrap.py
index 1351db0..1207823 100644
--- a/bootstrap.py
+++ b/bootstrap.py
@@ -14,7 +14,8 @@
 
 
 _ButlerBootstrapBase = collections.namedtuple('_ButlerBootstrapBase',
-    ('project', 'prefix', 'streamserver_uri', 'coordinator_host'))
+    ('project', 'prefix', 'streamserver_uri', 'coordinator_host',
+     'namespace'))
 
 
 class ButlerBootstrap(_ButlerBootstrapBase):
@@ -25,10 +26,12 @@
   environment and identifies those parameters.
   """
 
+  # TODO(iannucci): move all of these to LUCI_CONTEXT
   _ENV_PROJECT = 'LOGDOG_STREAM_PROJECT'
   _ENV_PREFIX = 'LOGDOG_STREAM_PREFIX'
   _ENV_STREAM_SERVER_PATH = 'LOGDOG_STREAM_SERVER_PATH'
   _ENV_COORDINATOR_HOST = 'LOGDOG_COORDINATOR_HOST'
+  _ENV_NAMESPACE = 'LOGDOG_NAMESPACE'
 
   @classmethod
   def probe(cls, env=None):
@@ -52,14 +55,23 @@
       raise NotBootstrappedError('Missing prefix [%s]' % (cls._ENV_PREFIX,))
     try:
       streamname.validate_stream_name(prefix)
-    except ValueError as e:
-      raise NotBootstrappedError('Prefix (%s) is invalid: %s' % (prefix, e))
+    except ValueError as exp:
+      raise NotBootstrappedError('Prefix (%s) is invalid: %s' % (prefix, exp))
+
+    namespace = env.get(cls._ENV_NAMESPACE, '')
+    if namespace:
+      try:
+        streamname.validate_stream_name(namespace)
+      except ValueError as exp:
+        raise NotBootstrappedError(
+            'Namespace (%s) is invalid: %s' % (prefix, exp))
 
     return cls(
         project=project,
         prefix=prefix,
         streamserver_uri=env.get(cls._ENV_STREAM_SERVER_PATH),
-        coordinator_host=env.get(cls._ENV_COORDINATOR_HOST))
+        coordinator_host=env.get(cls._ENV_COORDINATOR_HOST),
+        namespace=namespace)
 
   def stream_client(self, reg=None):
     """Returns: (StreamClient) stream client for the bootstrap streamserver URI.
@@ -83,4 +95,5 @@
         self.streamserver_uri,
         project=self.project,
         prefix=self.prefix,
-        coordinator_host=self.coordinator_host)
+        coordinator_host=self.coordinator_host,
+        namespace=self.namespace)
diff --git a/stream.py b/stream.py
index ea415e1..76f3c33 100644
--- a/stream.py
+++ b/stream.py
@@ -6,6 +6,7 @@
 import contextlib
 import json
 import os
+import posixpath
 import socket
 import sys
 import threading
@@ -213,7 +214,8 @@
       return self._fd.close()
 
 
-  def __init__(self, project=None, prefix=None, coordinator_host=None):
+  def __init__(self, project=None, prefix=None, coordinator_host=None,
+               namespace=''):
     """Constructs a new base StreamClient instance.
 
     Args:
@@ -222,10 +224,12 @@
       coordinator_host (str or None): If not None, the name of the Coordinator
           host that this stream client is bound to. This will be used to
           construct viewer URLs for generated streams.
+      namespace (str): The prefix to apply to all streams opened by this client.
     """
     self._project = project
     self._prefix = prefix
     self._coordinator_host = coordinator_host
+    self._namespace = namespace
 
     self._name_lock = threading.Lock()
     self._names = set()
@@ -397,7 +401,7 @@
         be closed when finished using its `close` method.
     """
     params = StreamParams.make(
-        name=name,
+        name=posixpath.join(self._namespace, name),
         type=StreamParams.TEXT,
         content_type=content_type,
         tags=tags,
@@ -450,7 +454,7 @@
         be closed when finished using its `close` method.
     """
     params = StreamParams.make(
-        name=name,
+        name=posixpath.join(self._namespace, name),
         type=StreamParams.BINARY,
         content_type=content_type,
         tags=tags,
@@ -501,7 +505,7 @@
         finished by using its `close` method.
     """
     params = StreamParams.make(
-        name=name,
+        name=posixpath.join(self._namespace, name),
         type=StreamParams.DATAGRAM,
         content_type=content_type,
         tags=tags,
diff --git a/tests/bootstrap_test.py b/tests/bootstrap_test.py
index 21b0c67..a55e160 100755
--- a/tests/bootstrap_test.py
+++ b/tests/bootstrap_test.py
@@ -15,6 +15,8 @@
 from libs.logdog import bootstrap, stream
 
 
+# pylint: disable=protected-access
+
 class BootstrapTestCase(unittest.TestCase):
 
   def setUp(self):
@@ -23,20 +25,46 @@
         bootstrap.ButlerBootstrap._ENV_PREFIX: 'foo/bar',
         bootstrap.ButlerBootstrap._ENV_STREAM_SERVER_PATH: 'fake:path',
         bootstrap.ButlerBootstrap._ENV_COORDINATOR_HOST: 'example.appspot.com',
+        bootstrap.ButlerBootstrap._ENV_NAMESPACE: 'something',
     }
 
+  @classmethod
+  def setUpClass(cls):
+    class TestStreamClient(stream.StreamClient):
+      @classmethod
+      def _create(cls, _value, **kwargs):
+        return cls(**kwargs)
+
+      def _connect_raw(self):
+        class fakeFile(object):
+          def write(self, data):
+            pass
+        return fakeFile()
+    cls.reg = stream.StreamProtocolRegistry()
+    cls.reg.register_protocol('test', TestStreamClient)
+
   def testProbeSucceeds(self):
     bs = bootstrap.ButlerBootstrap.probe(self.env)
     self.assertEqual(bs, bootstrap.ButlerBootstrap(
       project='test-project',
       prefix='foo/bar',
       streamserver_uri='fake:path',
-      coordinator_host='example.appspot.com'))
+      coordinator_host='example.appspot.com',
+      namespace='something'))
 
   def testProbeNoBootstrapRaisesError(self):
     self.assertRaises(bootstrap.NotBootstrappedError,
         bootstrap.ButlerBootstrap.probe, env={})
 
+  def testNoNamespaceOK(self):
+    del self.env[bootstrap.ButlerBootstrap._ENV_NAMESPACE]
+    bootstrap.ButlerBootstrap.probe(self.env)
+
+  def testProbeBadNamespaceRaisesError(self):
+    self.env[bootstrap.ButlerBootstrap._ENV_NAMESPACE] = '!!! invalid'
+    self.assertRaises(bootstrap.NotBootstrappedError,
+        bootstrap.ButlerBootstrap.probe, env=self.env)
+
   def testProbeMissingProjectRaisesError(self):
     self.env.pop(bootstrap.ButlerBootstrap._ENV_PROJECT)
     self.assertRaises(bootstrap.NotBootstrappedError,
@@ -53,24 +81,27 @@
         bootstrap.ButlerBootstrap.probe, env=self.env)
 
   def testCreateStreamClient(self):
-    class TestStreamClient(stream.StreamClient):
-      @classmethod
-      def _create(cls, _value, **kwargs):
-        return cls(**kwargs)
-
-      def _connect_raw(self):
-        raise NotImplementedError()
-
-    reg = stream.StreamProtocolRegistry()
-    reg.register_protocol('test', TestStreamClient)
     bs = bootstrap.ButlerBootstrap(
       project='test-project',
       prefix='foo/bar',
       streamserver_uri='test:',
-      coordinator_host='example.appspot.com')
-    sc = bs.stream_client(reg=reg)
+      coordinator_host='example.appspot.com',
+      namespace='something/deep')
+    sc = bs.stream_client(reg=self.reg)
     self.assertEqual(sc.prefix, 'foo/bar')
     self.assertEqual(sc.coordinator_host, 'example.appspot.com')
+    self.assertEqual(sc.open_text('foobar').params.name,
+                     'something/deep/foobar')
+
+  def testCreateStreamClientNoNamespace(self):
+    bs = bootstrap.ButlerBootstrap(
+      project='test-project',
+      prefix='foo/bar',
+      streamserver_uri='test:',
+      coordinator_host='example.appspot.com',
+      namespace='')
+    sc = bs.stream_client(reg=self.reg)
+    self.assertEqual(sc.open_text('foobar').params.name, 'foobar')
 
 
 if __name__ == '__main__':