| // Copyright (c) 2013 ActiveState Software Inc. All rights reserved. |
| |
| package tail |
| |
| import ( |
| "bufio" |
| "fmt" |
| "github.com/ActiveState/tail/ratelimiter" |
| "github.com/ActiveState/tail/util" |
| "github.com/ActiveState/tail/watch" |
| "gopkg.in/tomb.v1" |
| "io" |
| "io/ioutil" |
| "log" |
| "os" |
| "strings" |
| "time" |
| ) |
| |
| var ( |
| ErrStop = fmt.Errorf("tail should now stop") |
| ) |
| |
| type Line struct { |
| Text string |
| Time time.Time |
| Err error // Error from tail |
| } |
| |
| // NewLine returns a Line with present time. |
| func NewLine(text string) *Line { |
| return &Line{text, time.Now(), nil} |
| } |
| |
| // SeekInfo represents arguments to `os.Seek` |
| type SeekInfo struct { |
| Offset int64 |
| Whence int // os.SEEK_* |
| } |
| |
| // Config is used to specify how a file must be tailed. |
| type Config struct { |
| // File-specifc |
| Location *SeekInfo // Seek to this location before tailing |
| ReOpen bool // Reopen recreated files (tail -F) |
| MustExist bool // Fail early if the file does not exist |
| Poll bool // Poll for file changes instead of using inotify |
| RateLimiter *ratelimiter.LeakyBucket |
| |
| // Generic IO |
| Follow bool // Continue looking for new lines (tail -f) |
| MaxLineSize int // If non-zero, split longer lines into multiple lines |
| |
| // Logger, when nil, is set to tail.DefaultLogger |
| // To disable logging: set field to tail.DiscardingLogger |
| Logger *log.Logger |
| } |
| |
| type Tail struct { |
| Filename string |
| Lines chan *Line |
| Config |
| |
| file *os.File |
| reader *bufio.Reader |
| watcher watch.FileWatcher |
| changes *watch.FileChanges |
| |
| tomb.Tomb // provides: Done, Kill, Dying |
| } |
| |
| var ( |
| // DefaultLogger is used when Config.Logger == nil |
| DefaultLogger = log.New(os.Stderr, "", log.LstdFlags) |
| // DiscardingLogger can be used to disable logging output |
| DiscardingLogger = log.New(ioutil.Discard, "", 0) |
| ) |
| |
| // TailFile begins tailing the file. Output stream is made available |
| // via the `Tail.Lines` channel. To handle errors during tailing, |
| // invoke the `Wait` or `Err` method after finishing reading from the |
| // `Lines` channel. |
| func TailFile(filename string, config Config) (*Tail, error) { |
| if config.ReOpen && !config.Follow { |
| util.Fatal("cannot set ReOpen without Follow.") |
| } |
| |
| t := &Tail{ |
| Filename: filename, |
| Lines: make(chan *Line), |
| Config: config, |
| } |
| |
| // when Logger was not specified in config, use default logger |
| if t.Logger == nil { |
| t.Logger = log.New(os.Stderr, "", log.LstdFlags) |
| } |
| |
| if t.Poll { |
| t.watcher = watch.NewPollingFileWatcher(filename) |
| } else { |
| t.watcher = watch.NewInotifyFileWatcher(filename) |
| } |
| |
| if t.MustExist { |
| var err error |
| t.file, err = OpenFile(t.Filename) |
| if err != nil { |
| return nil, err |
| } |
| } |
| |
| go t.tailFileSync() |
| |
| return t, nil |
| } |
| |
| // Return the file's current position, like stdio's ftell(). |
| // But this value is not very accurate. |
| // it may readed one line in the chan(tail.Lines), |
| // so it may lost one line. |
| func (tail *Tail) Tell() (offset int64, err error) { |
| if tail.file == nil { |
| return |
| } |
| offset, err = tail.file.Seek(0, os.SEEK_CUR) |
| if err == nil { |
| offset -= int64(tail.reader.Buffered()) |
| } |
| return |
| } |
| |
| // Stop stops the tailing activity. |
| func (tail *Tail) Stop() error { |
| tail.Kill(nil) |
| return tail.Wait() |
| } |
| |
| func (tail *Tail) close() { |
| close(tail.Lines) |
| if tail.file != nil { |
| tail.file.Close() |
| } |
| } |
| |
| func (tail *Tail) reopen() error { |
| if tail.file != nil { |
| tail.file.Close() |
| } |
| for { |
| var err error |
| tail.file, err = OpenFile(tail.Filename) |
| if err != nil { |
| if os.IsNotExist(err) { |
| tail.Logger.Printf("Waiting for %s to appear...", tail.Filename) |
| if err := tail.watcher.BlockUntilExists(&tail.Tomb); err != nil { |
| if err == tomb.ErrDying { |
| return err |
| } |
| return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err) |
| } |
| continue |
| } |
| return fmt.Errorf("Unable to open file %s: %s", tail.Filename, err) |
| } |
| break |
| } |
| return nil |
| } |
| |
| func (tail *Tail) readLine() (string, error) { |
| line, err := tail.reader.ReadString('\n') |
| if err != nil { |
| // Note ReadString "returns the data read before the error" in |
| // case of an error, including EOF, so we return it as is. The |
| // caller is expected to process it if err is EOF. |
| return line, err |
| } |
| |
| line = strings.TrimRight(line, "\n") |
| |
| return line, err |
| } |
| |
| func (tail *Tail) tailFileSync() { |
| defer tail.Done() |
| defer tail.close() |
| |
| if !tail.MustExist { |
| // deferred first open. |
| err := tail.reopen() |
| if err != nil { |
| if err != tomb.ErrDying { |
| tail.Kill(err) |
| } |
| return |
| } |
| } |
| |
| // Seek to requested location on first open of the file. |
| if tail.Location != nil { |
| _, err := tail.file.Seek(tail.Location.Offset, tail.Location.Whence) |
| tail.Logger.Printf("Seeked %s - %+v\n", tail.Filename, tail.Location) |
| if err != nil { |
| tail.Killf("Seek error on %s: %s", tail.Filename, err) |
| return |
| } |
| } |
| |
| tail.openReader() |
| |
| // Read line by line. |
| for { |
| // grab the position in case we need to back up in the event of a half-line |
| offset, err := tail.Tell() |
| if err != nil { |
| tail.Kill(err) |
| return |
| } |
| |
| line, err := tail.readLine() |
| |
| // Process `line` even if err is EOF. |
| if err == nil { |
| cooloff := !tail.sendLine(line) |
| if cooloff { |
| // Wait a second before seeking till the end of |
| // file when rate limit is reached. |
| msg := fmt.Sprintf( |
| "Too much log activity; waiting a second " + |
| "before resuming tailing") |
| tail.Lines <- &Line{msg, time.Now(), fmt.Errorf(msg)} |
| select { |
| case <-time.After(time.Second): |
| case <-tail.Dying(): |
| return |
| } |
| err = tail.seekEnd() |
| if err != nil { |
| tail.Kill(err) |
| return |
| } |
| } |
| } else if err == io.EOF { |
| if !tail.Follow { |
| if line != "" { |
| tail.sendLine(line) |
| } |
| return |
| } |
| |
| if tail.Follow && line != "" { |
| // this has the potential to never return the last line if |
| // it's not followed by a newline; seems a fair trade here |
| err := tail.seekTo(SeekInfo{Offset: offset, Whence: 0}) |
| if err != nil { |
| tail.Kill(err) |
| return |
| } |
| } |
| |
| // When EOF is reached, wait for more data to become |
| // available. Wait strategy is based on the `tail.watcher` |
| // implementation (inotify or polling). |
| err := tail.waitForChanges() |
| if err != nil { |
| if err != ErrStop { |
| tail.Kill(err) |
| } |
| return |
| } |
| } else { |
| // non-EOF error |
| tail.Killf("Error reading %s: %s", tail.Filename, err) |
| return |
| } |
| |
| select { |
| case <-tail.Dying(): |
| return |
| default: |
| } |
| } |
| } |
| |
| // waitForChanges waits until the file has been appended, deleted, |
| // moved or truncated. When moved or deleted - the file will be |
| // reopened if ReOpen is true. Truncated files are always reopened. |
| func (tail *Tail) waitForChanges() error { |
| if tail.changes == nil { |
| st, err := tail.file.Stat() |
| if err != nil { |
| return err |
| } |
| tail.changes = tail.watcher.ChangeEvents(&tail.Tomb, st) |
| } |
| |
| select { |
| case <-tail.changes.Modified: |
| return nil |
| case <-tail.changes.Deleted: |
| tail.changes = nil |
| if tail.ReOpen { |
| // XXX: we must not log from a library. |
| tail.Logger.Printf("Re-opening moved/deleted file %s ...", tail.Filename) |
| if err := tail.reopen(); err != nil { |
| return err |
| } |
| tail.Logger.Printf("Successfully reopened %s", tail.Filename) |
| tail.openReader() |
| return nil |
| } else { |
| tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename) |
| return ErrStop |
| } |
| case <-tail.changes.Truncated: |
| // Always reopen truncated files (Follow is true) |
| tail.Logger.Printf("Re-opening truncated file %s ...", tail.Filename) |
| if err := tail.reopen(); err != nil { |
| return err |
| } |
| tail.Logger.Printf("Successfully reopened truncated %s", tail.Filename) |
| tail.openReader() |
| return nil |
| case <-tail.Dying(): |
| return ErrStop |
| } |
| panic("unreachable") |
| } |
| |
| func (tail *Tail) openReader() { |
| if tail.MaxLineSize > 0 { |
| // add 2 to account for newline characters |
| tail.reader = bufio.NewReaderSize(tail.file, tail.MaxLineSize+2) |
| } else { |
| tail.reader = bufio.NewReader(tail.file) |
| } |
| } |
| |
| func (tail *Tail) seekEnd() error { |
| return tail.seekTo(SeekInfo{Offset: 0, Whence: 2}) |
| } |
| |
| func (tail *Tail) seekTo(pos SeekInfo) error { |
| _, err := tail.file.Seek(pos.Offset, pos.Whence) |
| if err != nil { |
| return fmt.Errorf("Seek error on %s: %s", tail.Filename, err) |
| } |
| // Reset the read buffer whenever the file is re-seek'ed |
| tail.reader.Reset(tail.file) |
| return nil |
| } |
| |
| // sendLine sends the line(s) to Lines channel, splitting longer lines |
| // if necessary. Return false if rate limit is reached. |
| func (tail *Tail) sendLine(line string) bool { |
| now := time.Now() |
| lines := []string{line} |
| |
| // Split longer lines |
| if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize { |
| lines = util.PartitionString(line, tail.MaxLineSize) |
| } |
| |
| for _, line := range lines { |
| tail.Lines <- &Line{line, now, nil} |
| } |
| |
| if tail.Config.RateLimiter != nil { |
| ok := tail.Config.RateLimiter.Pour(uint16(len(lines))) |
| if !ok { |
| tail.Logger.Printf("Leaky bucket full (%v); entering 1s cooloff period.\n", |
| tail.Filename) |
| return false |
| } |
| } |
| |
| return true |
| } |
| |
| // Cleanup removes inotify watches added by the tail package. This function is |
| // meant to be invoked from a process's exit handler. Linux kernel may not |
| // automatically remove inotify watches after the process exits. |
| func Cleanup() { |
| watch.Cleanup() |
| } |