blob: 8e3919d93f65ec5e5d8509a5234b96f1d6a2f5a3 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package streamserver
import (
type listener interface {
Accept() (io.ReadCloser, error)
Addr() net.Addr
func mkListener(l net.Listener) listener {
return &accepterImpl{l}
type accepterImpl struct {
func (a *accepterImpl) Accept() (io.ReadCloser, error) {
conn, err := a.Listener.Accept()
if err != nil {
return nil, err
return conn.(io.ReadCloser), nil
// streamParams are parameters representing a negotiated stream ready to
// deliver.
type streamParams struct {
// The stream's ReadCloser connection.
rc io.ReadCloser
// Negotiated stream descriptor.
descriptor *logpb.LogStreamDescriptor
// StreamServer implements a Logdog Butler listener.
// This holds a named pipe listener (either a unix domain socket or windows
// Named Pipe, depending on the platform). Local processes on the machine may
// connect to this named pipe to stream data to Logdog.
type StreamServer struct {
log logging.Logger
// address is the string returned by the Address method.
address string
// gen is a generator function that is called to produce the stream server's
// Listener. On success, it returns the instantiated Listener, which will be
// closed when the stream server is closed.
gen func() (listener, error)
l listener
laddr string
streamParamsC chan *streamParams
closedC chan struct{}
acceptFinishedC chan struct{}
// closed is an atomically-protected integer. If its value is 1, the stream
// server has been closed.
closed int32
// discardC is a testing channel. If not nil, failed client connections will
// be written here.
discardC chan *streamClient
func (s *StreamServer) String() string {
return fmt.Sprintf("%T(%s)", s, s.laddr)
// Address returns a string that can be used by the "streamclient" package to
// return a client for this StreamServer.
// Full package is:
func (s *StreamServer) Address() string {
return s.address
// Listen performs initial connection and setup and runs a goroutine to listen
// for new connections.
func (s *StreamServer) Listen() error {
// Create a listener (OS-specific).
var err error
s.l, err = s.gen()
if err != nil {
return err
s.laddr = s.l.Addr().String()
s.streamParamsC = make(chan *streamParams)
s.closedC = make(chan struct{})
s.acceptFinishedC = make(chan struct{})
// Poll the Listener for new connections in a separate goroutine. This will
// terminate when the server is Close()d.
go s.serve()
return nil
// Next blocks, returning a new Stream when one is available. If the stream
// server has closed, this will return nil.
func (s *StreamServer) Next() (io.ReadCloser, *logpb.LogStreamDescriptor) {
if streamParams, ok := <-s.streamParamsC; ok {
return streamParams.rc, streamParams.descriptor
return nil, nil
// Close terminates the stream server, cleaning up resources.
func (s *StreamServer) Close() {
if s.l == nil {
panic("server is not currently serving")
// Mark that we've been closed.
atomic.StoreInt32(&s.closed, 1)
// Close our Listener. This will cause our 'Accept' goroutine to terminate.
// Close our streamParamsC to signal that we're closed. Any blocking Next will
// drain the channel, then return with nil.
s.l = nil
// Continuously pulls connections from the supplied Listener and returns them as
// connections to streamParamsC for consumption by Next().
func (s *StreamServer) serve() {
defer close(s.acceptFinishedC)
nextID := 0
clientWG := sync.WaitGroup{}
for {
s.log.Debugf("Beginning Accept() loop cycle.")
conn, err := s.l.Accept()
if err != nil {
if atomic.LoadInt32(&s.closed) != 0 {
s.log.Debugf("Error during Accept() encountered (closed): %s", err)
} else {
s.log.Errorf("Error during Accept(): %s", err)
// Spawn a goroutine to handle this connection. This goroutine will take
// ownership of the connection, closing it as appropriate.
client := &streamClient{
log: s.log,
closedC: s.closedC,
id: nextID,
conn: conn,
go func() {
defer clientWG.Done()
var params *streamParams
paniccatcher.Do(func() {
var err error
params, err = client.handle()
if err != nil {
s.log.Errorf("Failed to negotitate stream client: %s", err)
params = nil
}, func(p *paniccatcher.Panic) {
s.log.Errorf("Panic during client handshake:\n%s\n%s", p.Reason, p.Stack)
params = nil
// Did the negotiation fail?
if params != nil {
// Punt the client to our Next function. If we close while waiting, close
// the Client.
select {
case s.streamParamsC <- params:
case <-s.closedC:
s.log.Warningf("Closed with client in hand. Cleaning up client.")
params = nil
// (Testing) write failed connections to discardC if configured.
if params == nil && s.discardC != nil {
s.discardC <- client
// Wait for client connections to finish.
s.log.Infof("Exiting serve loop. Served %d connections", nextID)
// streamClient manages a single client connection.
type streamClient struct {
log logging.Logger
closedC chan struct{} // Signal channel to indicate that the server has closed.
id int // Client ID, used for debugging correlation.
conn io.ReadCloser // The underlying client connection.
// decoupleMu is used to ensure that decoupleConn is called at most one time.
decoupleMu sync.Mutex
// handle initializes the supplied connection. On success, it will prepare
// the connection as a Butler stream and pass its parameters through the
// supplied channel.
// On failure, the connection will be closed.
// This method returns the negotiated stream parameters, or nil if the
// negotiation failed and the client was closed.
func (c *streamClient) handle() (*streamParams, error) {
c.log.Infof("Received new connection.")
// Close the connection as a failsafe. If we have already decoupled it, this
// will end up being a no-op.
defer c.closeConn()
// Perform our handshake. We pass the connection explicitly into this method
// because it can get decoupled during operation.
p, err := c.handshake()
if err != nil {
return nil, err
// Successful handshake. Forward our properties.
return &streamParams{c.decoupleConn(), p}, nil
// handshake handles the handshaking and registration of a single connection. If
// the connection successfully handshakes, it will be registered as a stream and
// supplied to the local streamParamsC; otherwise, it will be closed.
// The client connection opens with a handshake protocol. Once complete, the
// connection itself becomes the stream.
func (c *streamClient) handshake() (*logpb.LogStreamDescriptor, error) {
c.log.Infof("Beginning handshake.")
flags := &streamproto.Flags{}
if err := flags.FromHandshake(c.conn); err != nil {
return nil, err
return flags.Descriptor(), nil
// Closes the underlying connection.
func (c *streamClient) closeConn() {
conn := c.decoupleConn()
if conn != nil {
if err := conn.Close(); err != nil {
c.log.Warningf("Error on connection close: %s", err)
// Decouples the active connection, returning it and setting the connection to
// nil.
func (c *streamClient) decoupleConn() (conn io.ReadCloser) {
defer c.decoupleMu.Unlock()
conn, c.conn = c.conn, nil
// New creates a new StreamServer.
// This has a platform-specific implementation.
// On Mac/Linux, this makes a Unix Domain Socket based server, and `path` is
// required to be the absolute path to a filesystem location which is suitable
// for a domain socket. The location will be removed prior to, and after, use.
// On Windows, this makes a Named Pipe based server. `path` must be a valid UNC
// path component, and will be used to listen on the named pipe
// "\\.\$path.$PID.$UNIQUE" where $PID is the process id of the current process
// and $UNIQUE is a monotonically increasing value within this process' memory.
// `path` may be empty and a unique name will be chosen for you.
func New(ctx context.Context, path string) (*StreamServer, error) {
return newStreamServer(ctx, path)