// Copyright 2019 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package main
import (
cycler_pb ""
var newline = fmt.Sprintf("\n")
// Runlog is a sink for jsonl logs produced by cycler. jsonl is simply
// individual json objects without newlines each placed on a single line
// It writes in approximate chunks, and can run over size. There are some
// interesting ways use of this logger can affect performance depending on
// how many logs are outstanding, response times from GS\Disk etc. Generally
// things happen in coroutines unless you've fallen way behind and then it
// will revert to a blocking mode. This may become an issue, but we don't
// want to either prematurely optimize or allow unbounded unshipped logs.
// Logs will pre appended with a timestamp and the uniq id of the cycler run.
// It currently allows both gs:// URLs as well as local file:// urls.
type Runlog struct {
// LogSink is the channel where producers can push their logs.
LogSink chan []byte
// Config is the RunLogConfiguration
Config *cycler_pb.RunLogConfiguration
// Stop channel signals the logging routines to stop.
Stop chan bool
// logBuffer is an internal array of bytes to buffer incoming messages to.
logBuffer [][]byte
// logBufferSize is the size of the log buffer.
logBufferSize int64
// client is the google storage client.
client *storage.Client
// destURL is the gs:// or file:// url desitnation for the logs.
dstURL *url.URL
// wg is the waitgroup for routines of the logger.
wg *sync.WaitGroup
// logShippers counts the number of routines currently engaged in shipping logs.
logShippers *semaphore.Weighted
// Init sets up the runlog, with json config bytes or nil if defaults should be used.
func (rl *Runlog) Init(config *cycler_pb.RunLogConfiguration, client *storage.Client, wg *sync.WaitGroup) {
ctx := context.Background()
// Initialize channels, waitgroups and semaphores.
rl.Config = config
rl.Stop = make(chan bool, 1)
rl.logBuffer = make([][]byte, 0)
rl.LogSink = make(chan []byte, rl.Config.ChannelSize)
rl.client = client
rl.wg = wg
rl.logShippers = semaphore.NewWeighted(rl.Config.MaxUnpersistedLogs)
// Parse / Validate the destination URL.
dstURL, err := url.Parse(rl.Config.DestinationUrl)
if err != nil {
glog.Errorf("DestinationURL failed to parse runlog config: %v\n", err)
rl.dstURL = dstURL
// Access test location writable by clobbering onto a special file.
tstMsg := "Written to validate logging location writablility, ignore please."
// Path has a leading / and we omit it.
tstPath := path.Join(rl.dstURL.Path[1:], "ignore-access-test")
glog.V(0).Infof("Test log location: %v", tstPath)
switch scheme := rl.dstURL.Scheme; scheme {
case "gs":
bkt := rl.client.Bucket(dstURL.Host)
_, err = bkt.Attrs(ctx)
if err != nil {
glog.Errorf("error writing logs, bucket couldn't be retrieved: %v", err)
obj := bkt.Object(tstPath)
f := obj.NewWriter(ctx)
_, err := f.Write([]byte(tstMsg))
if err != nil {
glog.Errorf("error writing logs, write failed: %v", err)
case "file":
err = os.MkdirAll(path.Dir(tstPath), os.ModePerm)
if err != nil {
glog.Errorf("log directory apparently uncreatable: %v\n", err)
f, err := os.Create(tstPath)
if err != nil {
glog.Errorf("log directory apparently unwritable: %v\n", err)
_, err = f.WriteString(tstMsg)
if err != nil {
glog.Errorf("log directory apparently unwritable: %v\n", err)
// Best effort delete.
glog.Errorf("unexpected log destination uri (accepts file:// and gs://")
go rl.loggingCoordinator()
func (rl *Runlog) loggingCoordinator() {
defer func() {
if r := recover(); r != nil {
glog.Errorf("will not recover from panic in loggingCoordinator as we will _not_ run unlogged: %v", r)
// Flag your exit for waiters.
for {
select {
// New log incoming.
case log := <-rl.LogSink:
incomingSize := int64(len(log))
if rl.logBufferSize+incomingSize >= rl.Config.ChunkSizeBytes {
rl.logBufferSize += incomingSize
rl.logBuffer = append(rl.logBuffer, log)
// Stop signal given.
case <-rl.Stop:
// Put all outstanding logs on rl.LogSink (expand to fit).
for i := 0; i < len(rl.LogSink); i++ {
incomingLog := <-rl.LogSink
rl.logBufferSize += int64(len(incomingLog))
rl.logBuffer = append(rl.logBuffer, incomingLog)
// Flush.
// Collect on your log shippers by trying to gobble entire semaphore capacity.
var sleepTime time.Duration = 2 * time.Second
var n int64
for n = 0; n < rl.Config.PersistRetries; n++ {
if rl.logShippers.TryAcquire(rl.Config.MaxUnpersistedLogs) {
glog.Infof("Logs are still being persisted, waiting %v...", sleepTime)
sleepTime <<= 1
glog.Error("Returned with oustanding unpersisted logs!")
func (rl *Runlog) flush() error {
ctx := context.Background()
// No logs, no flush.
if rl.logBufferSize == 0 {
return nil
// Combine messages into a single byte[] called data and 1 byte per newline.
dataSize := rl.logBufferSize + int64(len(rl.logBuffer))
data := make([]byte, dataSize)
nWritten := 0
for i := 0; i < len(rl.logBuffer); i++ {
llen := len(rl.logBuffer[i])
copy(data[nWritten:], rl.logBuffer[i])
nWritten += llen
copy(data[nWritten:], newline)
// data is transfered to the goroutine, do not access after this point.
rl.logShippers.Acquire(ctx, 1)
go rl.compressAndPersistLog(&data)
// Clear rl.logBuffer & reset logBufferSize.
rl.logBuffer = make([][]byte, 0)
rl.logBufferSize = 0
return nil
// Compress some data and persist it. Called as go routine. Retries persist.
// This will be executed as a coroutine and should only access rl for configuration.
func (rl Runlog) compressAndPersistLog(data *[]byte) {
defer func() {
compressedBytes, err := compressBytes(data)
if err != nil {
glog.Errorf("log upload failed on compression, not likely to succeed on retry, aborting.")
// TODO(engeg@) reevaluate if we want to end the process here. We certainly
// want our logs to be persisted...
var sleepTime time.Duration = 2 * time.Second
var n int64
for n = 0; n < rl.Config.PersistRetries; n++ {
if err := rl.persistLog(context.Background(), compressedBytes); err != nil {
glog.V(0).Infof("retrying failed upload %v: %v\nsleeping for %v...", n, err, sleepTime)
sleepTime <<= 1
} else {
// We're persisted :)
// persistLog handles writing/uploading the log buffer whos ownership was given to it.
// This will be executed as a coroutine and should only access rl for configuration.
func (rl Runlog) persistLog(ctx context.Context, compressedBytes *bytes.Buffer) error {
logName := rl.createLogName()
switch scheme := rl.dstURL.Scheme; scheme {
case "gs":
// Path has a leading / and we omit it.
gspath := path.Join(rl.dstURL.Path[1:], logName)
obj := rl.client.Bucket(rl.dstURL.Host).Object(gspath)
f := obj.NewWriter(ctx)
n, err := f.Write(compressedBytes.Bytes())
if n != compressedBytes.Len() || err != nil {
glog.Errorf("error writing logs, write failed: %v", err)
return err
glog.V(2).Infof("log uploaded to: gs://%v/%v", rl.dstURL.Host, gspath)
case "file":
// Attempt to create log dir up to file.
fullPath := path.Join(rl.dstURL.Path, logName)
err := os.MkdirAll(path.Dir(fullPath), os.ModePerm)
if err != nil {
glog.Errorf("log directory apparently uncreatable: %v\n", err)
return err
f, err := os.Create(fullPath)
if err != nil {
glog.Errorf("error writing logs, couldn't open file: %v", err)
return err
n, err := f.Write(compressedBytes.Bytes())
if n != compressedBytes.Len() || err != nil {
glog.Errorf("error writing logs, write failed: %v", err)
return err
glog.V(1).Infof("log written to: %v", fullPath)
return nil
// Generate a timestamped unique name for each log. Time is log written time.
func (rl Runlog) createLogName() string {
timePart := time.Now().UTC().Format(time.RFC3339Nano)
return fmt.Sprintf("%v/%v.jsonl.gz", cyclerInvocationID, timePart)