blob: 77fed2f95a6a04ed8f02bc47cd203c429c9d1e73 [file] [log] [blame]
// Copyright 2017 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
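// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and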
// limitations under the License.
package fakelogs
import (
logs_api ""
reg_api ""
services_api ""
reg_impl ""
services_impl ""
logs_impl ""
mem_storage ""
logdog_types ""
type prefixState struct {
index *uint64
secret []byte
streamsMu sync.Mutex
streams stringset.Set
// Project is the logdog project namespace that you should use with your
// fakelogs clients.
//
// It is coordinatorTest.AllAccessProject, the same project that Client sets on
// the requests it issues to the coordinator services.
const Project = coordinatorTest.AllAccessProject
// Client implements the logs.LogsClient API, and also has some 'reach-around'
// APIs to insert stream data into the backend.
// The reach-around APIs are very primitive; they don't simulate butler-side
// constraints (i.e. no prefix secrets, no multi-stream bundling, etc.), however
// from the server-side they should present a sufficient surface to write
// testable server code.
// The Open*Stream methods take a singular optional flags object. If provided,
// a copy of this Flags object will be populated with the given logdog path and
// stream type. You can use this to give your streams a content type, tags, etc.
type Client struct {
ctx context.Context
env *coordinatorTest.Environment
storage *mem_storage.Storage
logsServ logs_api.LogsServer
regServ reg_api.RegistrationServer
srvServ services_api.ServicesServer
prefixMu sync.Mutex
prefixes map[string]*prefixState
// Get implements logs.Get.
func (c *Client) Get(_ context.Context, in *logs_api.GetRequest, _ ...grpc.CallOption) (*logs_api.GetResponse, error) {
realIn := proto.Clone(in).(*logs_api.GetRequest)
realIn.Project = coordinatorTest.AllAccessProject
return c.logsServ.Get(c.ctx, realIn)
// Tail implements logs.Tail.
func (c *Client) Tail(_ context.Context, in *logs_api.TailRequest, _ ...grpc.CallOption) (*logs_api.GetResponse, error) {
realIn := proto.Clone(in).(*logs_api.TailRequest)
realIn.Project = coordinatorTest.AllAccessProject
return c.logsServ.Tail(c.ctx, realIn)
// Query implements logs.Query.
func (c *Client) Query(_ context.Context, in *logs_api.QueryRequest, _ ...grpc.CallOption) (*logs_api.QueryResponse, error) {
realIn := proto.Clone(in).(*logs_api.QueryRequest)
realIn.Project = coordinatorTest.AllAccessProject
return c.logsServ.Query(c.ctx, realIn)
// OpenTextStream returns a stream for text (line delimited) data.
// - Lines are always delimited with "\n".
// - Write calls will always have an implied "\n" at the end, if the input
// data doesn't have one. If you want to do multiple Write calls to affect
// the same line, you'll need to buffer it yourself.
func (c *Client) OpenTextStream(prefix, path logdog_types.StreamName, flags ...*streamproto.Flags) (*Stream, error) {
return, prefix, path, flags...)
// OpenDatagramStream returns a stream for datagrams.
// Each Write will produce a single complete datagram in the stream.
func (c *Client) OpenDatagramStream(prefix, path logdog_types.StreamName, flags ...*streamproto.Flags) (*Stream, error) {
return, prefix, path, flags...)
// OpenBinaryStream returns a stream for binary data.
// Each Write will append to the binary data like a normal raw file.
func (c *Client) OpenBinaryStream(prefix, path logdog_types.StreamName, flags ...*streamproto.Flags) (*Stream, error) {
return, prefix, path, flags...)
func prepFlags(streamType logpb.StreamType, name logdog_types.StreamName, flags []*streamproto.Flags) *streamproto.Flags {
if _, ok := logpb.StreamType_name[int32(streamType)]; !ok {
panic("unknown logpb.StreamType: " + streamType.String())
typ := streamproto.StreamType(streamType)
switch len(flags) {
case 0:
return &streamproto.Flags{
Name: streamproto.StreamNameFlag(name),
Type: typ,
case 1:
newFlags := *flags[0]
newFlags.Name = streamproto.StreamNameFlag(name)
newFlags.Type = typ
return &newFlags
panic(fmt.Sprintf("`flags...` must can exactly 0 or 1 value, got %d", len(flags)))
func (c *Client) getPrefixState(prefix string) (*prefixState, error) {
defer c.prefixMu.Unlock()
state, seenPrefix := c.prefixes[prefix]
if !seenPrefix {
rsp, err := c.regServ.RegisterPrefix(c.ctx, &reg_api.RegisterPrefixRequest{
Project: coordinatorTest.AllAccessProject,
Prefix: prefix,
if err != nil {
return nil, errors.Annotate(err, "registering prefix").Err()
idx := uint64(0)
state = &prefixState{index: &idx, secret: rsp.Secret, streams: stringset.New(0)}
c.prefixes[prefix] = state
return state, nil
func (c *Client) open(streamType logpb.StreamType, prefix, path logdog_types.StreamName, flags ...*streamproto.Flags) (*Stream, error) {
flg := prepFlags(streamType, path, flags)
start := clock.Now(c.ctx)
flg.Timestamp = clockflag.Time(start)
lsd := flg.Descriptor()
lsd.Prefix = string(prefix)
data, err := proto.Marshal(lsd)
if err != nil {
panic("error serializing a proto: " + err.Error())
state, err := c.getPrefixState(string(prefix))
if err != nil {
return nil, errors.Annotate(err, "loading prefix state").Err()
defer state.streamsMu.Unlock()
if state.streams.Has(string(path)) {
return nil, errors.Reason("duplicate stream: %s/+/%s", prefix, path).Err()
rsp, err := c.srvServ.RegisterStream(c.ctx, &services_api.RegisterStreamRequest{
Project: coordinatorTest.AllAccessProject,
Secret: state.secret,
ProtoVersion: logpb.Version,
Desc: data,
TerminalIndex: -1,
if err != nil {
return nil, errors.Annotate(err, "registering stream").Err()
return &Stream{
c: c,
pth: lsd.Path(),
streamType: streamType,
prefixIndex: state.index,
streamID: rsp.Id,
secret: state.secret,
start: start,
}, nil
type storageclient struct {
// Close is deliberately a no-op. Implementing it here means this
// coordinator.Storage client can be returned multiple times without worrying
// about the coordinator closing it.
func (storageclient) Close() {}
// GetSignedURLs is implemented to fill out the coordinator.Storage interface.
func (s storageclient) GetSignedURLs(context.Context, *coordinator.URLSigningRequest) (*coordinator.URLSigningResponse, error) {
return nil, errors.New("NOT IMPLEMENTED")
// NewClient generates a new fake Client which can be used as a logs.LogsClient,
// and can also have its underlying stream data manipulated by the test.
// Functions taking context.Context will ignore it (i.e. they don't expect
// anything in the context).
// This client is attached to an in-memory datastore of its own and the real
// coordinator services are attached to that in-memory datastore. That means
// that the Client API SHOULD actually behave identically to the real
// coordinator (since it's the same code). Additionally, the 'Open' methods on
// the Client do the full Prefix/Stream registration process, and so should also
// behave like the real thing.
func NewClient() *Client {
ctx, env := coordinatorTest.Install(false)
env.AuthState.IdentityGroups = []string{"admin", "all", "auth", "services"}
ts := taskqueue.GetTestable(ctx)
storage := &mem_storage.Storage{}
env.Services.ST = func(*coordinator.LogStreamState) (coordinator.SigningStorage, error) {
return storageclient{storage}, nil
return &Client{
ctx: ctx, env: env,
logsServ: logs_impl.New(),
regServ: reg_impl.New(),
srvServ: services_impl.New(services_impl.ServerSettings{
NumQueues: 1,
storage: storage,
prefixes: map[string]*prefixState{},