blob: 4fec065a4fdd1d368b7bf765c7113cfac2aeb46f [file] [log] [blame]
// Copyright 2017 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fakelogs
import (
"bytes"
"sync"
"sync/atomic"
"time"
"google.golang.org/protobuf/types/known/durationpb"
"go.chromium.org/luci/common/clock"
services "go.chromium.org/luci/logdog/api/endpoints/coordinator/services/v1"
"go.chromium.org/luci/logdog/api/logpb"
logdog_types "go.chromium.org/luci/logdog/common/types"
)
// Stream represents a single logdog stream.
//
// Each invocation of Write() will append a new LogEntry to the stream
// internally. For datagram streams, this means that each Write is a single
// datagram.
//
// Once the Stream is Close()'d it will be marked as complete.
type Stream struct {
c *Client
pth logdog_types.StreamPath
streamType logpb.StreamType
streamID string
secret []byte
prefixIndex *uint64
start time.Time
mutex sync.Mutex
streamIndex int64
sequence uint64
}
func (s *Stream) Write(bs []byte) (int, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
sIdx := s.streamIndex
s.streamIndex++
entry := &logpb.LogEntry{
TimeOffset: durationpb.New(s.start.Sub(clock.Now(s.c.ctx))),
PrefixIndex: atomic.AddUint64(s.prefixIndex, 1) - 1, // 0 based
StreamIndex: uint64(sIdx),
Sequence: s.sequence,
}
switch s.streamType {
case logpb.StreamType_TEXT:
rawLines := bytes.Split(bs, []byte("\n"))
lines := make([]*logpb.Text_Line, len(rawLines))
for i, line := range rawLines {
lines[i] = &logpb.Text_Line{
Value: append([]byte(nil), line...),
Delimiter: "\n",
}
}
entry.Content = &logpb.LogEntry_Text{Text: &logpb.Text{
Lines: lines,
}}
s.sequence += uint64(len(lines))
case logpb.StreamType_BINARY:
entry.Content = &logpb.LogEntry_Binary{Binary: &logpb.Binary{
Data: bs,
}}
s.sequence += uint64(len(bs))
case logpb.StreamType_DATAGRAM:
entry.Content = &logpb.LogEntry_Datagram{Datagram: &logpb.Datagram{
Data: bs,
}}
s.sequence++
}
s.c.storage.PutEntries(s.c.ctx, Project, s.pth, entry)
return len(bs), nil
}
// Close terminates this stream.
func (s *Stream) Close() error {
_, err := s.c.srvServ.TerminateStream(s.c.ctx, &services.TerminateStreamRequest{
Project: Project,
Id: s.streamID,
Secret: s.secret,
TerminalIndex: s.streamIndex - 1,
})
return err
}