LogDog: Properly emit Butler stream magic.
The LogDog Butler protocol was not properly implemented: it was missing
the magic byte sequence that must precede the exchange. Add that
sequence to the stream implementation.
BUG=chromium:628770
TEST=local
- Ran with live Butler instance, observed successful stream
negotiation.
Review-Url: https://codereview.chromium.org/2251413004
NOKEYCHECK=True
GitOrigin-RevId: 999aff1abdcaf37cfed37f7451ea0a2f54a7fdea
diff --git a/stream.py b/stream.py
index c34f8fa..3d60a6c 100644
--- a/stream.py
+++ b/stream.py
@@ -18,6 +18,13 @@
('name', 'type', 'content_type', 'tags', 'tee', 'binary_file_extension'))
+# Magic number at the beginning of a Butler stream
+#
+# See "ProtocolFrameHeaderMagic" in:
+# <luci-go>/logdog/client/butlerlib/streamproto
+BUTLER_MAGIC = 'BTLR1\x1e'
+
+
class StreamParams(_StreamParamsBase):
"""Defines the set of parameters to apply to a new stream."""
@@ -222,6 +229,7 @@
params_json = params.to_json()
fd = self._connect_raw()
+ fd.write(BUTLER_MAGIC)
varint.write_uvarint(fd, len(params_json))
fd.write(params_json)
return fd
@@ -411,6 +419,19 @@
"""A StreamClient implementation that uses a UNIX domain socket.
"""
+ class SocketFile(object):
+ """A write-only file-like object that writes to a UNIX socket."""
+
+ def __init__(self, fd):
+ self._fd = fd
+
+ def write(self, data):
+ self._fd.send(data)
+
+ def close(self):
+ self._fd.close()
+
+
def __init__(self, path):
"""Initializes a new UNIX domain socket stream client.
@@ -429,6 +450,6 @@
def _connect_raw(self):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(self._path)
- return sock
+ return self.SocketFile(sock)
_default_registry.register_protocol('unix', _UnixDomainSocketStreamClient)
diff --git a/tests/stream_test.py b/tests/stream_test.py
index c05d05f..98cbbda 100755
--- a/tests/stream_test.py
+++ b/tests/stream_test.py
@@ -84,6 +84,10 @@
def interpret(self):
data = StringIO.StringIO(self.buffer.getvalue())
+ magic = data.read(len(stream.BUTLER_MAGIC))
+ if magic != stream.BUTLER_MAGIC:
+ raise ValueError('Invalid magic value ([%s] != [%s])' % (
+ magic, stream.BUTLER_MAGIC))
length, _ = varint.read_uvarint(data)
header = data.read(length)
return json.loads(header), data.read()