blob: cb30f98d0191e4feee50ac443f511580e910267d [file] [log] [blame]
// Copyright 2019 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package streamclient
import (
"context"
"io"
"strings"
"time"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/clock/clockflag"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/logdog/api/logpb"
"go.chromium.org/luci/logdog/client/butlerlib/streamproto"
"go.chromium.org/luci/logdog/common/types"
)
// This is populated via init() functions in this package.
var protocolRegistry = map[string]dialFactory{}
// dialFactory takes an implementation-specific address (e.g. `localhost:1234`
// or `/path/to/fifo`), and returns a `dialer` function which can be invoked to
// open a new, raw, connection to the butler.
type dialFactory func(addr string) (dialer, error)
// dialer is called by Client for every new stream created, and is expected to:
// - open a connection (as appropriate) to the dialer's destination
// - "handshake" with the opened connection (if needed)
// - return an appropriate writer type around the connection.
type dialer interface {
// if forProcess is true, this must do its best to return an *os.File.
//
// If the implementation fails to do so, then it will cause "os/exec" to
// allocate an extra goroutine and copy loop when this stream is attached to
// a command's stdout/stderr.
DialStream(forProcess bool, f streamproto.Flags) (io.WriteCloser, error)
DialDgramStream(f streamproto.Flags) (DatagramStream, error)
}
// Client is a client to a local LogDog Butler.
//
// The methods here allow you to open a stream (text, binary or datagram) which
// you can then use to send data to LogDog.
type Client struct {
dial dialer
ns types.StreamName
}
// New instantiates a new Client instance. This type of instance will be parsed
// from the supplied path string, which takes the form:
//
// <protocol>:<protocol-specific-spec>
//
// # Supported protocols
//
// Below is the list of all supported protocols:
//
// unix:/path/to/socket (POSIX only)
//
// Connects to a UNIX domain socket at "/path/to/socket".
// This is the preferred protocol for Linux/Mac.
//
// net.pipe:name (Windows only)
//
// Connects to a local Windows named pipe "\\.\pipe\name". This is the preferred
// protocol for Windows.
//
// null (All platforms)
//
// Sinks all connections and writes into a null data sink. Useful for tests, or
// for running programs which use logdog but you don't care about their logdog
// outputs.
//
// fake:$id
//
// Connects to an in-memory Fake created in this package by calling NewFake.
// The string `fake:$id` can be obtained by calling the Fake's StreamServerPath
// method.
func New(path string, namespace types.StreamName) (*Client, error) {
parts := strings.SplitN(path, ":", 2)
protocol, value := parts[0], ""
if len(parts) == 2 {
value = parts[1]
}
if f, ok := protocolRegistry[protocol]; ok {
dial, err := f(value)
if err != nil {
return nil, errors.Annotate(err, "opening path %q", path).Err()
}
return &Client{dial, namespace}, nil
}
return nil, errors.Reason("no protocol registered for [%s]", parts[0]).Err()
}
func (c *Client) mkOptions(ctx context.Context, name types.StreamName, opts []Option) (ret options, err error) {
ret.desc, ret.forProcess = RenderOptions(opts...)
ret.desc.Name = streamproto.StreamNameFlag(c.ns.Concat(name))
if time.Time(ret.desc.Timestamp).IsZero() {
ret.desc.Timestamp = clockflag.Time(clock.Now(ctx))
}
if ret.desc.ContentType == "" {
ret.desc.ContentType = string(ret.desc.Type.DefaultContentType())
}
if err = ret.desc.Descriptor().Validate(false); err != nil {
err = errors.Annotate(err, "invalid stream descriptor").Err()
}
return
}
// NewStream returns a new open stream to the butler.
//
// By default this is text-based (line-oriented); pass Binary for a binary stream.
//
// Text streams look for newlines to delimit log chunks.
// Binary streams use fixed size chunks to delimit log chunks.
func (c *Client) NewStream(ctx context.Context, name types.StreamName, opts ...Option) (io.WriteCloser, error) {
fullOpts, err := c.mkOptions(ctx, name, opts)
if err != nil {
return nil, err
}
ret, err := c.dial.DialStream(fullOpts.forProcess, fullOpts.desc)
return ret, errors.Annotate(err, "attempting to connect stream %q", name).Err()
}
// NewDatagramStream returns a new datagram stream to the butler.
//
// Datagram streams allow you to send messages without having to demark the
// separation between messages.
//
// NOTE: It is an error to pass ForProcess as an Option (see documentation on
// ForProcess for more detail).
func (c *Client) NewDatagramStream(ctx context.Context, name types.StreamName, opts ...Option) (DatagramStream, error) {
newOpts := make([]Option, 0, len(opts)+1)
newOpts = append(newOpts, opts...)
newOpts = append(newOpts, func(o *options) {
o.desc.Type = streamproto.StreamType(logpb.StreamType_DATAGRAM)
})
fullOpts, err := c.mkOptions(ctx, name, newOpts)
if err != nil {
return nil, err
}
if fullOpts.forProcess {
return nil, errors.Reason("cannot specify ForProcess on a datagram stream").Err()
}
ret, err := c.dial.DialDgramStream(fullOpts.desc)
return ret, errors.Annotate(err, "attempting to connect datagram stream %q", name).Err()
}
// GetNamespace returns the LOGDOG_NAMESPACE value associated with this Client.
//
// Safe to call on a nil client; will return an empty StreamName.
func (c *Client) GetNamespace() types.StreamName {
if c == nil {
return types.StreamName("")
}
return c.ns
}