LogDog: Properly emit Butler stream magic.

The LogDog Butler protocol was not properly implemented: it was missing
the magic byte sequence that must precede the exchange. Add that
sequence to the stream implementation.

BUG=chromium:628770
TEST=local
  - Ran with live Butler instance, observed successful stream
    negotiation.

Review-Url: https://codereview.chromium.org/2251413004
NOKEYCHECK=True
GitOrigin-RevId: 999aff1abdcaf37cfed37f7451ea0a2f54a7fdea
diff --git a/stream.py b/stream.py
index c34f8fa..3d60a6c 100644
--- a/stream.py
+++ b/stream.py
@@ -18,6 +18,13 @@
     ('name', 'type', 'content_type', 'tags', 'tee', 'binary_file_extension'))
 
 
+# Magic number at the beginning of a Butler stream
+#
+# See "ProtocolFrameHeaderMagic" in:
+# <luci-go>/logdog/client/butlerlib/streamproto
+BUTLER_MAGIC = 'BTLR1\x1e'
+
+
 class StreamParams(_StreamParamsBase):
   """Defines the set of parameters to apply to a new stream."""
 
@@ -222,6 +229,7 @@
     params_json = params.to_json()
 
     fd = self._connect_raw()
+    fd.write(BUTLER_MAGIC)
     varint.write_uvarint(fd, len(params_json))
     fd.write(params_json)
     return fd
@@ -411,6 +419,19 @@
   """A StreamClient implementation that uses a UNIX domain socket.
   """
 
+  class SocketFile(object):
+    """A write-only file-like object that writes to a UNIX socket."""
+
+    def __init__(self, fd):
+      self._fd = fd
+
+    def write(self, data):
+      self._fd.send(data)
+
+    def close(self):
+      self._fd.close()
+
+
   def __init__(self, path):
     """Initializes a new UNIX domain socket stream client.
 
@@ -429,6 +450,6 @@
   def _connect_raw(self):
     sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
     sock.connect(self._path)
-    return sock
+    return self.SocketFile(sock)
 
 _default_registry.register_protocol('unix', _UnixDomainSocketStreamClient)
diff --git a/tests/stream_test.py b/tests/stream_test.py
index c05d05f..98cbbda 100755
--- a/tests/stream_test.py
+++ b/tests/stream_test.py
@@ -84,6 +84,10 @@
 
     def interpret(self):
       data = StringIO.StringIO(self.buffer.getvalue())
+      magic = data.read(len(stream.BUTLER_MAGIC))
+      if magic != stream.BUTLER_MAGIC:
+        raise ValueError('Invalid magic value ([%s] != [%s])' % (
+            magic, stream.BUTLER_MAGIC))
       length, _ = varint.read_uvarint(data)
       header = data.read(length)
       return json.loads(header), data.read()