blob: 7e04da4a662c06af6153a6a91e39d171083c956a [file] [log] [blame]
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package cloudtail
import (
var (
droppedCounter = metric.NewCounter("cloudtail/pipe_drops",
"Log entries read from a pipe and dropped because the sender couldn't keep up",
// PipeReader reads lines from io.Reader, parses and pushes them to the buffer.
type PipeReader struct {
// ClientID identifies the log stream for monitoring.
ClientID ClientID
// Source is a reader to read logs from.
Source io.Reader
// PushBuffer knows how to forward log entries to the client.
PushBuffer PushBuffer
// Parser converts text lines into log entries, default is StdParser().
Parser LogParser
// LineBufferSize defines how many log lines to accumulate (if the flush is
// blocked) before starting to drop them.
// Default is 0, which means to never drop lines (stop reading from the
// source instead).
LineBufferSize int
// OnEOF is called immediately when EOF (or reading error) is encountered.
// Note that this happens before 'Run' returns, because 'Run' waits for data
// to be pushed to the PushBuffer.
OnEOF func()
// OnLineDropped is called whenever a line gets dropped due to full buffer.
OnLineDropped func()
// Run reads from the reader until EOF or until the context is closed.
// Returns error only if reading from io.Reader fails. On EOF or on context
// cancellation returns nil. Always returns same error as was sent to OnEOF.
// Waits for all read data to be pushed to PushBuffer.
func (r *PipeReader) Run(ctx context.Context) error {
source := make(chan string, r.LineBufferSize)
result := make(chan error, 1)
go func() {
scanner := bufio.NewScanner(r.Source)
droppedTotal := 0
droppedReport := 0
nextDropReport := clock.Now(ctx).Add(time.Second)
defer func() {
if r.OnEOF != nil {
err := scanner.Err()
if err == nil && droppedTotal != 0 {
err = fmt.Errorf("%d lines in total were dropped due to insufficient line buffer size", droppedTotal)
result <- err
logDropped := func(force bool) {
if force || clock.Now(ctx).After(nextDropReport) {
if droppedReport != 0 {
logging.Warningf(ctx, "%d lines were dropped due to insufficient line buffer size", droppedReport)
droppedReport = 0
nextDropReport = clock.Now(ctx).Add(time.Second)
for scanner.Scan() {
if r.LineBufferSize == 0 {
select {
case <-ctx.Done():
case source <- scanner.Text(): // Blocking.
} else {
select {
case <-ctx.Done():
case source <- scanner.Text():
// The buffer is full - drop this log line rather than blocking the pipe.
droppedCounter.Add(ctx, 1, r.ClientID.LogID, r.ClientID.ResourceType, r.ClientID.ResourceID)
if r.OnLineDropped != nil {
drainChannel(ctx, source, r.Parser, r.PushBuffer)
return <-result