Update LogDog client library to generate URLs.
Update the LogDog client library to generate LogDog stream viewer URLs.
This includes:
- Capturing the LogDog Coordinator host from the enviornment via
bootstrap.
- Implementing full stream paths and URL generation.
- Adjusting the StreamClient implementations to retain the prefix and
coordinator host values.
- Wrapping text/binary StreamClient file descriptors in a container to
expose per-stream higher-level functionality.
Tests have also been updated to assert the new functionality.
TBR=nodir@chromium.org
BUG=chromium:659291
TEST=unit
Review-Url: https://codereview.chromium.org/2453273002
NOKEYCHECK=True
GitOrigin-RevId: a32ab2fad3bda41efae1607ec66be78f48a9d0b9
diff --git a/bootstrap.py b/bootstrap.py
index 3344e5d..1351db0 100644
--- a/bootstrap.py
+++ b/bootstrap.py
@@ -14,7 +14,7 @@
_ButlerBootstrapBase = collections.namedtuple('_ButlerBootstrapBase',
- ('project', 'prefix', 'streamserver_uri'))
+ ('project', 'prefix', 'streamserver_uri', 'coordinator_host'))
class ButlerBootstrap(_ButlerBootstrapBase):
@@ -28,6 +28,7 @@
_ENV_PROJECT = 'LOGDOG_STREAM_PROJECT'
_ENV_PREFIX = 'LOGDOG_STREAM_PREFIX'
_ENV_STREAM_SERVER_PATH = 'LOGDOG_STREAM_SERVER_PATH'
+ _ENV_COORDINATOR_HOST = 'LOGDOG_COORDINATOR_HOST'
@classmethod
def probe(cls, env=None):
@@ -41,12 +42,12 @@
"""
if env is None:
env = os.environ
- project = env.get(cls._ENV_PROJECT)
- prefix = env.get(cls._ENV_PREFIX)
+ project = env.get(cls._ENV_PROJECT)
if not project:
raise NotBootstrappedError('Missing project [%s]' % (cls._ENV_PROJECT,))
+ prefix = env.get(cls._ENV_PREFIX)
if not prefix:
raise NotBootstrappedError('Missing prefix [%s]' % (cls._ENV_PREFIX,))
try:
@@ -54,19 +55,32 @@
except ValueError as e:
raise NotBootstrappedError('Prefix (%s) is invalid: %s' % (prefix, e))
- return cls(project=project, prefix=prefix,
- streamserver_uri=env.get(cls._ENV_STREAM_SERVER_PATH))
+ return cls(
+ project=project,
+ prefix=prefix,
+ streamserver_uri=env.get(cls._ENV_STREAM_SERVER_PATH),
+ coordinator_host=env.get(cls._ENV_COORDINATOR_HOST))
- def stream_client(self):
+ def stream_client(self, reg=None):
"""Returns: (StreamClient) stream client for the bootstrap streamserver URI.
If the Butler accepts external stream connections, it will export a
streamserver URI in the environment. This will create a StreamClient
instance to operate on the streamserver if one is defined.
+ Args:
+ reg (stream.StreamProtocolRegistry or None): The stream protocol registry
+ to use to create the stream. If None, the default global registry will
+ be used (recommended).
+
Raises:
ValueError: If no streamserver URI is present in the environment.
"""
if not self.streamserver_uri:
raise ValueError('No streamserver in bootstrap environment.')
- return stream.create(self.streamserver_uri)
+ reg = reg or stream._default_registry
+ return reg.create(
+ self.streamserver_uri,
+ project=self.project,
+ prefix=self.prefix,
+ coordinator_host=self.coordinator_host)
diff --git a/stream.py b/stream.py
index 3d60a6c..ea415e1 100644
--- a/stream.py
+++ b/stream.py
@@ -112,7 +112,21 @@
raise KeyError('Duplicate protocol registered.')
self._registry[protocol] = client_cls
- def create(self, uri):
+ def create(self, uri, **kwargs):
+ """Returns (StreamClient): A stream client for the specified URI.
+
+ This uses the default StreamProtocolRegistry to instantiate a StreamClient
+ for the specified URI.
+
+ Args:
+ uri (str): The streamserver URI.
+ kwargs: keyword arguments to forward to the stream. See
+ StreamClient.__init__.
+
+ Raises:
+ ValueError: if the supplied URI references an invalid or improperly
+ configured streamserver.
+ """
uri = uri.split(':', 1)
if len(uri) != 2:
raise ValueError('Invalid stream server URI [%s]' % (uri,))
@@ -121,36 +135,74 @@
client_cls = self._registry.get(protocol)
if not client_cls:
raise ValueError('Unknown stream client protocol (%s)' % (protocol,))
- return client_cls._create(value)
+ return client_cls._create(value, **kwargs)
+
# Default (global) registry.
_default_registry = StreamProtocolRegistry()
-
-
-def create(uri):
- """Returns (StreamClient): A stream client for the specified URI.
-
- This uses the default StreamProtocolRegistry to instantiate a StreamClient
- for the specified URI.
-
- Args:
- uri: The streamserver URI.
-
- Raises:
- ValueError if the supplied URI references an invalid or improperly
- configured streamserver.
- """
- return _default_registry.create(uri)
+create = _default_registry.create
class StreamClient(object):
"""Abstract base class for a streamserver client.
"""
- class _DatagramStream(object):
+ class _StreamBase(object):
+ """ABC for StreamClient streams."""
+
+ def __init__(self, stream_client, params):
+ self._stream_client = stream_client
+ self._params = params
+
+ @property
+ def params(self):
+ """Returns (StreamParams): The stream parameters."""
+ return self._params
+
+ @property
+ def path(self):
+ """Returns (streamname.StreamPath): The stream path.
+
+ Raises:
+ ValueError: if the stream path is invalid, or if the stream prefix is
+ not defined in the client.
+ """
+ return self._stream_client.get_stream_path(self._params.name)
+
+ def get_viewer_url(self):
+ """Returns (str): The viewer URL for this stream.
+
+ Raises:
+ KeyError: if information needed to construct the URL is missing.
+ ValueError: if the stream prefix or name do not form a valid stream
+ path.
+ """
+ return self._stream_client.get_viewer_url(self._params.name)
+
+
+ class _BasicStream(_StreamBase):
+ """Wraps a basic file descriptor, offering "write" and "close"."""
+
+ def __init__(self, stream_client, params, fd):
+ super(StreamClient._BasicStream, self).__init__(stream_client, params)
+ self._fd = fd
+
+ @property
+ def fd(self):
+ return self._fd
+
+ def write(self, data):
+ return self._fd.write(data)
+
+ def close(self):
+ return self._fd.close()
+
+
+ class _DatagramStream(_StreamBase):
"""Wraps a stream object to write length-prefixed datagrams."""
- def __init__(self, fd):
+ def __init__(self, stream_client, params, fd):
+ super(StreamClient._DatagramStream, self).__init__(stream_client, params)
self._fd = fd
def send(self, data):
@@ -160,10 +212,76 @@
def close(self):
return self._fd.close()
- def __init__(self):
+
+ def __init__(self, project=None, prefix=None, coordinator_host=None):
+ """Constructs a new base StreamClient instance.
+
+ Args:
+ project (str or None): If not None, the name of the log stream project.
+ prefix (str or None): If not None, the log stream session prefix.
+ coordinator_host (str or None): If not None, the name of the Coordinator
+ host that this stream client is bound to. This will be used to
+ construct viewer URLs for generated streams.
+ """
+ self._project = project
+ self._prefix = prefix
+ self._coordinator_host = coordinator_host
+
self._name_lock = threading.Lock()
self._names = set()
+ @property
+ def project(self):
+ """Returns (str or None): The stream project, or None if not configured."""
+ return self._project
+
+ @property
+ def prefix(self):
+ """Returns (str or None): The stream prefix, or None if not configured."""
+ return self._prefix
+
+ @property
+ def coordinator_host(self):
+ """Returns (str or None): The coordinator host, or None if not configured.
+ """
+ return self._coordinator_host
+
+ def get_stream_path(self, name):
+ """Returns (streamname.StreamPath): The stream path.
+
+ Args:
+ name (str): The name of the stream.
+
+ Raises:
+ KeyError: if information needed to construct the path is missing.
+ ValueError: if the stream path is invalid, or if the stream prefix is
+ not defined in the client.
+ """
+ if not self._prefix:
+ raise KeyError('Stream prefix is not configured')
+ return streamname.StreamPath.make(self._prefix, name)
+
+ def get_viewer_url(self, name):
+ """Returns (str): The LogDog viewer URL for the named stream.
+
+ Args:
+ name (str): The name of the stream. This can also be a query glob.
+
+ Raises:
+ KeyError: if information needed to construct the URL is missing.
+ ValueError: if the stream prefix or name do not form a valid stream
+ path.
+ """
+ if not self._coordinator_host:
+ raise KeyError('Coordinator host is not configured')
+ if not self._project:
+ raise KeyError('Stream project is not configured')
+
+ return streamname.get_logdog_viewer_url(
+ self._coordinator_host,
+ self._project,
+ self.get_stream_path(name))
+
def _register_new_stream(self, name):
"""Registers a new stream name.
@@ -188,8 +306,8 @@
self._names.add(name)
@classmethod
- def _create(cls, value):
- """Returns (StreamClient): A new stream client connection.
+ def _create(cls, value, **kwargs):
+ """Returns (StreamClient): A new stream client instance.
Validates the streamserver parameters and creates a new StreamClient
instance that connects to them.
@@ -285,7 +403,7 @@
tags=tags,
tee=tee,
binary_file_extension=binary_file_extension)
- return self.new_connection(params)
+ return self._BasicStream(self, params, self.new_connection(params))
@contextlib.contextmanager
def binary(self, name, **kwargs):
@@ -338,7 +456,7 @@
tags=tags,
tee=tee,
binary_file_extension=binary_file_extension)
- return self.new_connection(params)
+ return self._BasicStream(self, params, self.new_connection(params))
@contextlib.contextmanager
def datagram(self, name, **kwargs):
@@ -389,25 +507,25 @@
tags=tags,
tee=tee,
binary_file_extension=binary_file_extension)
- return self._DatagramStream(self.new_connection(params))
+ return self._DatagramStream(self, params, self.new_connection(params))
class _NamedPipeStreamClient(StreamClient):
"""A StreamClient implementation that connects to a Windows named pipe.
"""
- def __init__(self, name):
+ def __init__(self, name, **kwargs):
r"""Initializes a new Windows named pipe stream client.
Args:
name (str): The name of the Windows named pipe to use (e.g., "\\.\name")
"""
- super(_NamedPipeStreamClient, self).__init__()
+ super(_NamedPipeStreamClient, self).__init__(**kwargs)
self._name = name
@classmethod
- def _create(cls, value):
- return cls(value)
+ def _create(cls, value, **kwargs):
+ return cls(value, **kwargs)
def _connect_raw(self):
return open(self._name, 'wb')
@@ -432,20 +550,20 @@
self._fd.close()
- def __init__(self, path):
+ def __init__(self, path, **kwargs):
"""Initializes a new UNIX domain socket stream client.
Args:
path (str): The path to the named UNIX domain socket.
"""
- super(_UnixDomainSocketStreamClient, self).__init__()
+ super(_UnixDomainSocketStreamClient, self).__init__(**kwargs)
self._path = path
@classmethod
- def _create(cls, value):
+ def _create(cls, value, **kwargs):
if not os.path.exists(value):
raise ValueError('UNIX domain socket [%s] does not exist.' % (value,))
- return cls(value)
+ return cls(value, **kwargs)
def _connect_raw(self):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
diff --git a/streamname.py b/streamname.py
index 8aaffb8..5cd3b31 100644
--- a/streamname.py
+++ b/streamname.py
@@ -2,9 +2,12 @@
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
+import collections
import re
import string
import types
+import urllib
+import urlparse
_ALNUM_CHARS = string.ascii_letters + string.digits
_SEGMENT_RE_BASE = r'[a-zA-Z0-9][a-zA-Z0-9:_\-.]*'
@@ -105,3 +108,81 @@
# Check additional middle-name characters:
return ch in ':_-./'
+
+
+class StreamPath(collections.namedtuple('_StreamPath', ('prefix', 'name'))):
+ """StreamPath is a full stream path.
+
+ This consists of both a stream prefix and a stream name.
+
+ When constructed with parse or make, the stream path must be completely valid.
+ However, invalid stream paths may be constructed by manually instantiation.
+ This can be useful for wildcard query values (e.g., "prefix='foo/*/bar/**'").
+ """
+
+ @classmethod
+ def make(cls, prefix, name):
+ """Returns (StreamPath): The validated StreamPath instance.
+
+ Args:
+ prefix (str): the prefix component
+ name (str): the name component
+
+ Raises:
+ ValueError: If path is not a full, valid stream path string.
+ """
+ inst = cls(prefix=prefix, name=name)
+ inst.validate()
+ return inst
+
+ @classmethod
+ def parse(cls, path):
+ """Returns (StreamPath): The parsed StreamPath instance.
+
+ Args:
+ path (str): the full stream path to parse.
+
+ Raises:
+ ValueError: If path is not a full, valid stream path string.
+ """
+ parts = path.split('/+/', 1)
+ if len(parts) != 2:
+ raise ValueError('Not a full stream path: [%s]' % (path,))
+ return cls.make(*parts)
+
+ def validate(self):
+ """Raises: ValueError if this is not a valid stream name."""
+ try:
+ validate_stream_name(self.prefix)
+ except ValueError as e:
+ raise ValueError('Invalid prefix component [%s]: %s' % (
+ self.prefix, e.message,))
+
+ try:
+ validate_stream_name(self.name)
+ except ValueError as e:
+ raise ValueError('Invalid name component [%s]: %s' % (
+ self.name, e.message,))
+
+ def __str__(self):
+ return '%s/+/%s' % (self.prefix, self.name)
+
+
+def get_logdog_viewer_url(host, project, *stream_paths):
+ """Returns (str): The LogDog viewer URL for the named stream(s).
+
+ Args:
+ host (str): The name of the Coordiantor host.
+ project (str): The project name.
+ stream_paths: A set of StreamPath instances for the stream paths to
+ generate the URL for.
+ """
+ return urlparse.urlunparse((
+ 'https', # Scheme
+ host, # netloc
+ 'v/', # path
+ '', # params
+ '&'.join(('s=%s' % (urllib.quote('%s/%s' % (project, path), safe=''))
+ for path in stream_paths)), # query
+ '', # fragment
+ ))
diff --git a/tests/bootstrap_test.py b/tests/bootstrap_test.py
index 0d5f968..21b0c67 100755
--- a/tests/bootstrap_test.py
+++ b/tests/bootstrap_test.py
@@ -12,7 +12,7 @@
os.pardir, os.pardir, os.pardir)))
sys.path.insert(0, ROOT_DIR)
-from libs.logdog import bootstrap
+from libs.logdog import bootstrap, stream
class BootstrapTestCase(unittest.TestCase):
@@ -22,6 +22,7 @@
bootstrap.ButlerBootstrap._ENV_PROJECT: 'test-project',
bootstrap.ButlerBootstrap._ENV_PREFIX: 'foo/bar',
bootstrap.ButlerBootstrap._ENV_STREAM_SERVER_PATH: 'fake:path',
+ bootstrap.ButlerBootstrap._ENV_COORDINATOR_HOST: 'example.appspot.com',
}
def testProbeSucceeds(self):
@@ -29,7 +30,8 @@
self.assertEqual(bs, bootstrap.ButlerBootstrap(
project='test-project',
prefix='foo/bar',
- streamserver_uri='fake:path'))
+ streamserver_uri='fake:path',
+ coordinator_host='example.appspot.com'))
def testProbeNoBootstrapRaisesError(self):
self.assertRaises(bootstrap.NotBootstrappedError,
@@ -50,6 +52,26 @@
self.assertRaises(bootstrap.NotBootstrappedError,
bootstrap.ButlerBootstrap.probe, env=self.env)
+ def testCreateStreamClient(self):
+ class TestStreamClient(stream.StreamClient):
+ @classmethod
+ def _create(cls, _value, **kwargs):
+ return cls(**kwargs)
+
+ def _connect_raw(self):
+ raise NotImplementedError()
+
+ reg = stream.StreamProtocolRegistry()
+ reg.register_protocol('test', TestStreamClient)
+ bs = bootstrap.ButlerBootstrap(
+ project='test-project',
+ prefix='foo/bar',
+ streamserver_uri='test:',
+ coordinator_host='example.appspot.com')
+ sc = bs.stream_client(reg=reg)
+ self.assertEqual(sc.prefix, 'foo/bar')
+ self.assertEqual(sc.coordinator_host, 'example.appspot.com')
+
if __name__ == '__main__':
unittest.main()
diff --git a/tests/stream_test.py b/tests/stream_test.py
index 98cbbda..008379f 100755
--- a/tests/stream_test.py
+++ b/tests/stream_test.py
@@ -14,7 +14,7 @@
os.pardir, os.pardir, os.pardir)))
sys.path.insert(0, ROOT_DIR)
-from libs.logdog import stream, varint
+from libs.logdog import stream, streamname, varint
class StreamParamsTestCase(unittest.TestCase):
@@ -93,14 +93,14 @@
return json.loads(header), data.read()
class _TestStreamClient(stream.StreamClient):
- def __init__(self, value):
- super(StreamClientTestCase._TestStreamClient, self).__init__()
+ def __init__(self, value, **kwargs):
+ super(StreamClientTestCase._TestStreamClient, self).__init__(**kwargs)
self.value = value
self.last_conn = None
@classmethod
- def _create(cls, value):
- return cls(value)
+ def _create(cls, value, **kwargs):
+ return cls(value, **kwargs)
def _connect_raw(self):
conn = StreamClientTestCase._TestStreamClientConnection()
@@ -128,8 +128,17 @@
self.assertEqual(client.value, 'value')
def testTextStream(self):
- client = self._registry.create('test:value')
+ client = self._registry.create('test:value',
+ project='test',
+ prefix='foo/bar',
+ coordinator_host='example.appspot.com')
with client.text('mystream') as fd:
+ self.assertEqual(
+ fd.path,
+ streamname.StreamPath(prefix='foo/bar', name='mystream'))
+ self.assertEqual(
+ fd.get_viewer_url(),
+ 'https://example.appspot.com/v/?s=test%2Ffoo%2Fbar%2F%2B%2Fmystream')
fd.write('text\nstream\nlines')
conn = client.last_conn
@@ -144,6 +153,14 @@
with client.text('mystream', content_type='foo/bar',
tee=stream.StreamParams.TEE_STDOUT,
tags={'foo': 'bar', 'baz': 'qux'}) as fd:
+ self.assertEqual(
+ fd.params,
+ stream.StreamParams.make(
+ name='mystream',
+ type=stream.StreamParams.TEXT,
+ content_type='foo/bar',
+ tee=stream.StreamParams.TEE_STDOUT,
+ tags={'foo': 'bar', 'baz': 'qux'}))
fd.write('text!')
conn = client.last_conn
@@ -160,8 +177,17 @@
self.assertEqual(data, 'text!')
def testBinaryStream(self):
- client = self._registry.create('test:value')
+ client = self._registry.create('test:value',
+ project='test',
+ prefix='foo/bar',
+ coordinator_host='example.appspot.com')
with client.binary('mystream') as fd:
+ self.assertEqual(
+ fd.path,
+ streamname.StreamPath(prefix='foo/bar', name='mystream'))
+ self.assertEqual(
+ fd.get_viewer_url(),
+ 'https://example.appspot.com/v/?s=test%2Ffoo%2Fbar%2F%2B%2Fmystream')
fd.write('\x60\x0d\xd0\x65')
conn = client.last_conn
@@ -172,8 +198,17 @@
self.assertEqual(data, '\x60\x0d\xd0\x65')
def testDatagramStream(self):
- client = self._registry.create('test:value')
+ client = self._registry.create('test:value',
+ project='test',
+ prefix='foo/bar',
+ coordinator_host='example.appspot.com')
with client.datagram('mystream') as fd:
+ self.assertEqual(
+ fd.path,
+ streamname.StreamPath(prefix='foo/bar', name='mystream'))
+ self.assertEqual(
+ fd.get_viewer_url(),
+ 'https://example.appspot.com/v/?s=test%2Ffoo%2Fbar%2F%2B%2Fmystream')
fd.send('datagram0')
fd.send('dg1')
fd.send('')
@@ -187,6 +222,35 @@
self.assertEqual(list(self._split_datagrams(data)),
['datagram0', 'dg1', '', 'dg3'])
+ def testStreamWithoutPrefixCannotGenerateUrls(self):
+ client = self._registry.create('test:value',
+ coordinator_host='example.appspot.com')
+ with client.text('mystream') as fd:
+ self.assertRaises(KeyError, fd.get_viewer_url)
+
+ def testStreamWithoutInvalidPrefixCannotGenerateUrls(self):
+ client = self._registry.create('test:value',
+ project='test',
+ prefix='!!! invalid !!!',
+ coordinator_host='example.appspot.com')
+ with client.text('mystream') as fd:
+ self.assertRaises(ValueError, fd.get_viewer_url)
+
+ def testStreamWithoutProjectCannotGenerateUrls(self):
+ client = self._registry.create('test:value',
+ prefix='foo/bar',
+ coordinator_host='example.appspot.com')
+ with client.text('mystream') as fd:
+ self.assertRaises(KeyError, fd.get_viewer_url)
+
+ def testStreamWithoutCoordinatorHostCannotGenerateUrls(self):
+ client = self._registry.create('test:value',
+ project='test',
+ prefix='foo/bar')
+ with client.text('mystream') as fd:
+ self.assertRaises(KeyError, fd.get_viewer_url)
+
+
def testCreatingDuplicateStreamNameRaisesValueError(self):
client = self._registry.create('test:value')
with client.text('mystream') as fd:
diff --git a/tests/streamname_test.py b/tests/streamname_test.py
index c84292a..25adfb6 100755
--- a/tests/streamname_test.py
+++ b/tests/streamname_test.py
@@ -62,5 +62,59 @@
self.assertRaises(ValueError, streamname.normalize, '_invalid_start_char')
+class StreamPathTestCase(unittest.TestCase):
+
+ def testParseValidPath(self):
+ for path in (
+ 'foo/+/bar',
+ 'foo/bar/+/baz',
+ ):
+ prefix, name = path.split('/+/')
+ parsed = streamname.StreamPath.parse(path)
+ self.assertEqual(
+ parsed,
+ streamname.StreamPath(prefix=prefix, name=name))
+ self.assertEqual(str(parsed), path)
+
+ def testParseInvalidValidPathRaisesValueError(self):
+ for path in (
+ '',
+ 'foo/+',
+ 'foo/+/',
+ '+/bar',
+ '/+/bar',
+ 'foo/bar',
+ '!!!invalid!!!/+/bar',
+ 'foo/+/!!!invalid!!!',
+ ):
+ with self.assertRaises(ValueError):
+ streamname.StreamPath.parse(path)
+
+ def testLogDogViewerUrl(self):
+ for project, path, url in (
+ ('test', streamname.StreamPath(prefix='foo', name='bar/baz'),
+ 'https://example.appspot.com/v/?s=test%2Ffoo%2F%2B%2Fbar%2Fbaz'),
+
+ ('test', streamname.StreamPath(prefix='foo', name='bar/**'),
+ 'https://example.appspot.com/v/?s=test%2Ffoo%2F%2B%2Fbar%2F%2A%2A'),
+
+ ('test', streamname.StreamPath(prefix='**', name='**'),
+ 'https://example.appspot.com/v/?s=test%2F%2A%2A%2F%2B%2F%2A%2A'),
+ ):
+ self.assertEqual(
+ streamname.get_logdog_viewer_url('example.appspot.com', project,
+ path),
+ url)
+
+ # Multiple streams.
+ self.assertEqual(
+ streamname.get_logdog_viewer_url('example.appspot.com', 'test',
+ streamname.StreamPath(prefix='foo', name='bar/baz'),
+ streamname.StreamPath(prefix='qux', name='**'),
+ ),
+ ('https://example.appspot.com/v/?s=test%2Ffoo%2F%2B%2Fbar%2Fbaz&'
+ 's=test%2Fqux%2F%2B%2F%2A%2A'))
+
+
if __name__ == '__main__':
unittest.main()