blob: cb5415145f61784a2fde639ea84a2a70c19ff9d9 [file] [log] [blame]
// Copyright 2019 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package rpc
import (
"context"
"io"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"go.chromium.org/tast/core/errors"
"go.chromium.org/tast/core/internal/logging"
"go.chromium.org/tast/core/internal/protocol"
)
// remoteLoggingClient is a client of the tast.core.Logging gRPC service that
// calls a logging function for every received ReadLogsResponse.
//
// It is used by gRPC clients to receive logs from gRPC services.
type remoteLoggingClient struct {
stream protocol.Logging_ReadLogsClient
doneCh chan struct{} // closed when the streaming RPC call is done
runErr error // set when doneCh is closed
mu sync.Mutex // protects other fields
lastSeq uint64 // last observed sequence ID
waiters []chan<- struct{} // channels waiting for logs
}
// newRemoteLoggingClient constructs remoteLoggingClient using conn. logger is
// called for every received ReadLogsResponse.
func newRemoteLoggingClient(ctx context.Context, conn *grpc.ClientConn) (*remoteLoggingClient, error) {
cl := protocol.NewLoggingClient(conn)
stream, err := cl.ReadLogs(ctx)
if err != nil {
return nil, err
}
// Read the initial response to check success and make sure we have been
// subscribed to logs.
if _, err := stream.Recv(); err != nil {
return nil, err
}
l := &remoteLoggingClient{
stream: stream,
doneCh: make(chan struct{}),
}
// Start a goroutine to call logger for every received ReadLogsResponse.
go l.runBackground(ctx)
return l, nil
}
// Wait waits until an entry of the specified sequence ID is received.
func (l *remoteLoggingClient) Wait(ctx context.Context, seq uint64) error {
for {
l.mu.Lock()
if l.lastSeq >= seq {
l.mu.Unlock()
return nil
}
// Wait for next entry.
waiter := make(chan struct{})
l.waiters = append(l.waiters, waiter)
l.mu.Unlock()
select {
case <-ctx.Done():
return ctx.Err()
case <-l.doneCh:
if l.runErr == nil {
return errors.New("logging stream closed")
}
return errors.Wrap(l.runErr, "logging stream closed")
case <-waiter:
}
}
}
// Close finishes the remote logging.
func (l *remoteLoggingClient) Close() error {
l.stream.CloseSend()
<-l.doneCh
if l.runErr != nil {
return errors.Wrap(l.runErr, "remote logging background routine failed")
}
return nil
}
func (l *remoteLoggingClient) runBackground(ctx context.Context) {
defer close(l.doneCh)
l.runErr = func() error {
for {
res, err := l.stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
grpcStatus, ok := status.FromError(err)
if ok && grpcStatus.Code() == codes.Unavailable {
logging.Infof(ctx, "Remote Logging interrupted with following error: %v", grpcStatus)
return nil
}
return err
}
logging.Info(ctx, res.Entry.GetMsg())
l.mu.Lock()
l.lastSeq = res.Entry.GetSeq()
for _, w := range l.waiters {
close(w)
}
l.waiters = nil
l.mu.Unlock()
}
}()
}