blob: 75183b976634922df62d4e61fcbf790a8eb40327 [file] [log] [blame]
// Copyright 2019 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package rpc
import (
"container/list"
"errors"
"sync"
"chromiumos/tast/internal/protocol"
)
// remoteLoggingServer implements the tast.core.Logging gRPC service.
//
// It is provided by gRPC servers to let clients receive logs from gRPC services.
type remoteLoggingServer struct {
// mu protects fields.
mu sync.Mutex
// inbox is a channel to send logs from gRPC servers to. It is nil if there is
// no client. Sending to this channel will never block.
inbox chan<- *protocol.LogEntry
// lastSeq is the sequential ID attached to the last log entry.
lastSeq uint64
}
func newRemoteLoggingServer() *remoteLoggingServer {
return &remoteLoggingServer{}
}
func (s *remoteLoggingServer) ReadLogs(srv protocol.Logging_ReadLogsServer) error {
ctx := srv.Context()
var logs <-chan *protocol.LogEntry
if err := func() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.inbox != nil {
return errors.New("concurrent ReadLogs calls are disallowed")
}
dst, src := make(chan *protocol.LogEntry), make(chan *protocol.LogEntry)
logs, s.inbox = dst, src
go bufferedRelay(dst, src)
return nil
}(); err != nil {
return err
}
defer func() {
s.mu.Lock()
close(s.inbox)
s.inbox = nil
s.mu.Unlock()
// Process all remaining logs before finishing the stream to
// transfer them to the client, as well as to stop the
// bufferedRelay goroutine.
for e := range logs {
// Send by best effort.
_ = srv.Send(&protocol.ReadLogsResponse{Entry: e})
}
}()
// Send an initial response to notify successful subscription.
if err := srv.Send(&protocol.ReadLogsResponse{}); err != nil {
return err
}
// Stop when the request stream is closed or broken.
finCh := make(chan struct{})
go func() {
defer close(finCh)
for {
if _, err := srv.Recv(); err != nil {
return
}
// Discard valid ReadLogsRequest.
}
}()
for {
select {
case e := <-logs:
if err := srv.Send(&protocol.ReadLogsResponse{Entry: e}); err != nil {
return err
}
case <-finCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
}
var _ protocol.LoggingServer = (*remoteLoggingServer)(nil)
// Log sends msg to connected clients if any.
// This method can be called on any goroutine.
func (s *remoteLoggingServer) Log(msg string) {
s.mu.Lock()
defer s.mu.Unlock()
if s.inbox == nil {
return
}
s.lastSeq++
s.inbox <- &protocol.LogEntry{
Msg: msg,
Seq: s.lastSeq,
}
}
// LastSeq returns the sequence ID of the last log entry.
// It returns 0 if there is no log entry yet.
func (s *remoteLoggingServer) LastSeq() uint64 {
s.mu.Lock()
defer s.mu.Unlock()
return s.lastSeq
}
// bufferedRelay receives logs from src and sends them to dst keeping the order.
// Even if dst is blocked, it keeps receiving from src by maintaining an internal
// buffer of logs.
// Once src is closed and all buffered logs are sent to dst, it closes dst and
// returns.
func bufferedRelay(dst chan<- *protocol.LogEntry, src <-chan *protocol.LogEntry) {
var buf list.List
loop:
for {
if buf.Len() == 0 {
msg, ok := <-src
if !ok {
break loop
}
buf.PushBack(msg)
}
select {
case msg, ok := <-src:
if !ok {
break loop
}
buf.PushBack(msg)
case dst <- buf.Front().Value.(*protocol.LogEntry):
buf.Remove(buf.Front())
}
}
for buf.Len() > 0 {
e := buf.Front()
dst <- e.Value.(*protocol.LogEntry)
buf.Remove(e)
}
close(dst)
}