[logdog_client] Add $LOGDOG_NAMESPACE to allow transparent logdog client
nesting.
R=martiniss@chromium.org, nodir@chromium.org, tandrii@chromium.org
Bug: 909848
Change-Id: Ia3d8de73e0c39aa97852e6abe006cf296a12b93d
Reviewed-on: https://chromium-review.googlesource.com/c/infra/luci/luci-py/+/1497446
Auto-Submit: Robbie Iannucci <iannucci@chromium.org>
Reviewed-by: Andrii Shyshkalov <tandrii@chromium.org>
Commit-Queue: Robbie Iannucci <iannucci@chromium.org>
NOKEYCHECK=True
GitOrigin-RevId: c4fb24fd9aac900b9f44d1829e9dc9dbd52cf2b0
diff --git a/bootstrap.py b/bootstrap.py
index 1351db0..1207823 100644
--- a/bootstrap.py
+++ b/bootstrap.py
@@ -14,7 +14,8 @@
_ButlerBootstrapBase = collections.namedtuple('_ButlerBootstrapBase',
- ('project', 'prefix', 'streamserver_uri', 'coordinator_host'))
+ ('project', 'prefix', 'streamserver_uri', 'coordinator_host',
+ 'namespace'))
class ButlerBootstrap(_ButlerBootstrapBase):
@@ -25,10 +26,12 @@
environment and identifies those parameters.
"""
+ # TODO(iannucci): move all of these to LUCI_CONTEXT
_ENV_PROJECT = 'LOGDOG_STREAM_PROJECT'
_ENV_PREFIX = 'LOGDOG_STREAM_PREFIX'
_ENV_STREAM_SERVER_PATH = 'LOGDOG_STREAM_SERVER_PATH'
_ENV_COORDINATOR_HOST = 'LOGDOG_COORDINATOR_HOST'
+ _ENV_NAMESPACE = 'LOGDOG_NAMESPACE'
@classmethod
def probe(cls, env=None):
@@ -52,14 +55,23 @@
raise NotBootstrappedError('Missing prefix [%s]' % (cls._ENV_PREFIX,))
try:
streamname.validate_stream_name(prefix)
- except ValueError as e:
- raise NotBootstrappedError('Prefix (%s) is invalid: %s' % (prefix, e))
+ except ValueError as exp:
+ raise NotBootstrappedError('Prefix (%s) is invalid: %s' % (prefix, exp))
+
+ namespace = env.get(cls._ENV_NAMESPACE, '')
+ if namespace:
+ try:
+ streamname.validate_stream_name(namespace)
+ except ValueError as exp:
+ raise NotBootstrappedError(
+ 'Namespace (%s) is invalid: %s' % (prefix, exp))
return cls(
project=project,
prefix=prefix,
streamserver_uri=env.get(cls._ENV_STREAM_SERVER_PATH),
- coordinator_host=env.get(cls._ENV_COORDINATOR_HOST))
+ coordinator_host=env.get(cls._ENV_COORDINATOR_HOST),
+ namespace=namespace)
def stream_client(self, reg=None):
"""Returns: (StreamClient) stream client for the bootstrap streamserver URI.
@@ -83,4 +95,5 @@
self.streamserver_uri,
project=self.project,
prefix=self.prefix,
- coordinator_host=self.coordinator_host)
+ coordinator_host=self.coordinator_host,
+ namespace=self.namespace)
diff --git a/stream.py b/stream.py
index ea415e1..76f3c33 100644
--- a/stream.py
+++ b/stream.py
@@ -6,6 +6,7 @@
import contextlib
import json
import os
+import posixpath
import socket
import sys
import threading
@@ -213,7 +214,8 @@
return self._fd.close()
- def __init__(self, project=None, prefix=None, coordinator_host=None):
+ def __init__(self, project=None, prefix=None, coordinator_host=None,
+ namespace=''):
"""Constructs a new base StreamClient instance.
Args:
@@ -222,10 +224,12 @@
coordinator_host (str or None): If not None, the name of the Coordinator
host that this stream client is bound to. This will be used to
construct viewer URLs for generated streams.
+ namespace (str): The prefix to apply to all streams opened by this client.
"""
self._project = project
self._prefix = prefix
self._coordinator_host = coordinator_host
+ self._namespace = namespace
self._name_lock = threading.Lock()
self._names = set()
@@ -397,7 +401,7 @@
be closed when finished using its `close` method.
"""
params = StreamParams.make(
- name=name,
+ name=posixpath.join(self._namespace, name),
type=StreamParams.TEXT,
content_type=content_type,
tags=tags,
@@ -450,7 +454,7 @@
be closed when finished using its `close` method.
"""
params = StreamParams.make(
- name=name,
+ name=posixpath.join(self._namespace, name),
type=StreamParams.BINARY,
content_type=content_type,
tags=tags,
@@ -501,7 +505,7 @@
finished by using its `close` method.
"""
params = StreamParams.make(
- name=name,
+ name=posixpath.join(self._namespace, name),
type=StreamParams.DATAGRAM,
content_type=content_type,
tags=tags,
diff --git a/tests/bootstrap_test.py b/tests/bootstrap_test.py
index 21b0c67..a55e160 100755
--- a/tests/bootstrap_test.py
+++ b/tests/bootstrap_test.py
@@ -15,6 +15,8 @@
from libs.logdog import bootstrap, stream
+# pylint: disable=protected-access
+
class BootstrapTestCase(unittest.TestCase):
def setUp(self):
@@ -23,20 +25,46 @@
bootstrap.ButlerBootstrap._ENV_PREFIX: 'foo/bar',
bootstrap.ButlerBootstrap._ENV_STREAM_SERVER_PATH: 'fake:path',
bootstrap.ButlerBootstrap._ENV_COORDINATOR_HOST: 'example.appspot.com',
+ bootstrap.ButlerBootstrap._ENV_NAMESPACE: 'something',
}
+ @classmethod
+ def setUpClass(cls):
+ class TestStreamClient(stream.StreamClient):
+ @classmethod
+ def _create(cls, _value, **kwargs):
+ return cls(**kwargs)
+
+ def _connect_raw(self):
+ class fakeFile(object):
+ def write(self, data):
+ pass
+ return fakeFile()
+ cls.reg = stream.StreamProtocolRegistry()
+ cls.reg.register_protocol('test', TestStreamClient)
+
def testProbeSucceeds(self):
bs = bootstrap.ButlerBootstrap.probe(self.env)
self.assertEqual(bs, bootstrap.ButlerBootstrap(
project='test-project',
prefix='foo/bar',
streamserver_uri='fake:path',
- coordinator_host='example.appspot.com'))
+ coordinator_host='example.appspot.com',
+ namespace='something'))
def testProbeNoBootstrapRaisesError(self):
self.assertRaises(bootstrap.NotBootstrappedError,
bootstrap.ButlerBootstrap.probe, env={})
+ def testNoNamespaceOK(self):
+ del self.env[bootstrap.ButlerBootstrap._ENV_NAMESPACE]
+ bootstrap.ButlerBootstrap.probe(self.env)
+
+ def testProbeBadNamespaceRaisesError(self):
+ self.env[bootstrap.ButlerBootstrap._ENV_NAMESPACE] = '!!! invalid'
+ self.assertRaises(bootstrap.NotBootstrappedError,
+ bootstrap.ButlerBootstrap.probe, env=self.env)
+
def testProbeMissingProjectRaisesError(self):
self.env.pop(bootstrap.ButlerBootstrap._ENV_PROJECT)
self.assertRaises(bootstrap.NotBootstrappedError,
@@ -53,24 +81,27 @@
bootstrap.ButlerBootstrap.probe, env=self.env)
def testCreateStreamClient(self):
- class TestStreamClient(stream.StreamClient):
- @classmethod
- def _create(cls, _value, **kwargs):
- return cls(**kwargs)
-
- def _connect_raw(self):
- raise NotImplementedError()
-
- reg = stream.StreamProtocolRegistry()
- reg.register_protocol('test', TestStreamClient)
bs = bootstrap.ButlerBootstrap(
project='test-project',
prefix='foo/bar',
streamserver_uri='test:',
- coordinator_host='example.appspot.com')
- sc = bs.stream_client(reg=reg)
+ coordinator_host='example.appspot.com',
+ namespace='something/deep')
+ sc = bs.stream_client(reg=self.reg)
self.assertEqual(sc.prefix, 'foo/bar')
self.assertEqual(sc.coordinator_host, 'example.appspot.com')
+ self.assertEqual(sc.open_text('foobar').params.name,
+ 'something/deep/foobar')
+
+ def testCreateStreamClientNoNamespace(self):
+ bs = bootstrap.ButlerBootstrap(
+ project='test-project',
+ prefix='foo/bar',
+ streamserver_uri='test:',
+ coordinator_host='example.appspot.com',
+ namespace='')
+ sc = bs.stream_client(reg=self.reg)
+ self.assertEqual(sc.open_text('foobar').params.name, 'foobar')
if __name__ == '__main__':