| /* |
| Copyright 2016 The LUCI Authors. All rights reserved. |
| Use of this source code is governed under the Apache License, Version 2.0 |
| that can be found in the LICENSE file. |
| */ |
| |
| ///<reference path="../logdog-stream/logdog.ts" /> |
| ///<reference path="../luci-operation/operation.ts" /> |
| ///<reference path="../luci-sleep-promise/promise.ts" /> |
| ///<reference path="../rpc/client.ts" /> |
| ///<reference path="fetcher.ts" /> |
| ///<reference path="query.ts" /> |
| ///<reference path="view.ts" /> |
| |
| namespace LogDog { |
| |
| /** Sentinel error: not authenticated. */ |
| let NOT_AUTHENTICATED = new Error('Not Authenticated'); |
| |
| /** |
| * Resolves an arbitrary error into special sentinels where appropriate. |
| * |
| * Currently supports resolving gRPC "unauthenticated" error into the |
| * NOT_AUTHENTICATED sentinel. |
| */ |
| function resolveErr(err: Error): Error { |
| let grpc = luci.GrpcError.convert(err); |
| if (grpc && grpc.code === luci.Code.UNAUTHENTICATED) { |
| return NOT_AUTHENTICATED; |
| } |
| return err; |
| } |
| |
| /** An individual log stream's status. */ |
| type LogStreamStatus = { |
| stream: LogDog.StreamPath; state: string; fetchStatus: LogDog.FetchStatus; |
| finished: boolean; |
| needsAuth: boolean; |
| }; |
| |
| type StreamStatusCallback = (v: LogStreamStatus[]) => void; |
| |
| /** Location where a log stream should be loaded from. */ |
| export enum Location { |
| /** |
| * Represents the upper half of a split view. Logs start at 0 and go through |
| * the HEAD point. |
| */ |
| HEAD, |
| /** |
| * Represents the lower half of the split view. Logs start at the TAIL point |
| * and go through the BOTTOM anchor point. |
| */ |
| TAIL, |
| /** |
| * Represents an anchor point where the split occurred, obtained through a |
| * single "Tail()" RPC call. If the terminal index is known when the split |
| * occurs, this should be the terminal index. |
| */ |
| BOTTOM, |
| } |
| |
| /** |
| * Interface of the specific Model functions used by the view. This is |
| * explicitly listed here as a reference for the functions that the |
| * model-viewer interface calls. See the Model class for method details. |
| * |
| * These methods are used to control the Model state and to nofity the Model |
| * of external events that occur. |
| */ |
| interface ModelInterface { |
| fetch(cancel: boolean): Promise<void>; |
| split(): Promise<void>; |
| |
| reset(): void; |
| setAutomatic(v: boolean): void; |
| setFetchFromTail(v: boolean): void; |
| notifyAuthenticationChanged(): void; |
| } |
| |
| /** A log loading profile type. */ |
| type ModelProfile = { |
| /** The size of the first fetch. */ |
| initialFetchSize: number; |
| /** The size of the first fetch, if loading multiple streams. */ |
| multiInitialFetchSize: number; |
| /** The size of each subsequent fetch. */ |
| fetchSize: number; |
| }; |
| |
| /** |
| * Model manages stream loading. |
| * |
| * Model exports features to the user interface by conforming to |
| * ModelInterface. All Polymer Model functions must be documented in that |
| * interface. |
| * |
| * Model pushes state update to the user interface with the ViewBinding that |
| * is provided to it on construction. |
| * |
| * Model represents a view of the loading state of the configured log streams. |
| * Log streams are individual named sets of consecutive records that are |
| * either streaming (no known terminal index, so the expectation is that new |
| * log records are being generated) or finished (known terminal index). The |
| * Model's job is to understand the state of these streams, load their data, |
| * and cause it to be output to the user interface. |
| * |
| * A Model is bound to a LogProvider, which manages a sequential series of log |
| * records. If a single log stream is being fetched, the Model will use a |
| * "LogStream" log provider. If multiple log streams are being fetched, the |
| * Model will use the "AggregateLogStream" log provider, which internally |
| * muxes log records from multiple "LogStream" providers together based on an |
| * ordering. |
| * |
| * "AggregateLogStream" is an append-only LogProvider, meaning that it ONLY |
| * supports emitting log records from its set of streams in order until no |
| * more records are available. |
| * |
| * "LogStream" (single stream) offers a more complex set of options via its |
| * "split" functionality. At the user's request, the LogStream can split, |
| * jumping to the highest-indexed log entry in that stream. This is followed |
| * up with three fetch options, which can be used interchangeably based on the |
| * user's actions: |
| * - HEAD fetches logs sequentially starting from 0 and ending at SPLIT. |
| * - TAIL fetches logs backwards, starting from SPLIT and fetching towards 0. |
| * - BOTTOM fetches logs from after SPLIT, fetching from SPLIT and ending at |
| * the log stream's terminal index. |
| * |
| * If the HEAD pointer ever reaches the TAIL pointer, all logs between 0 and |
| * SPLIT have been filled in and the fetching situation turns back into a |
| * standard sequential fetch. |
| * |
| * This scheme is designed around the capabilities of the LogDog log API. |
| * |
| * The view may optionally expose itself as "mobile", indicating to the Model |
| * that it should use mobile-friendly tuning parameters. |
| */ |
| export class Model implements ModelInterface { |
| /** Default single-stream profile. */ |
| static DEFAULT_PROFILE: ModelProfile = { |
| initialFetchSize: (1024 * 24), // 24KiB |
| multiInitialFetchSize: (1024 * 4), // 4KiB |
| fetchSize: (4 * 1024 * 1024), // 4MiB |
| }; |
| |
| /** Profile to use for a mobile device, regardless of stream count. */ |
| static MOBILE_PROFILE: ModelProfile = { |
| initialFetchSize: (1024 * 4), // 4KiB |
| multiInitialFetchSize: (1024 * 4), // 4KiB |
| fetchSize: (1024 * 256), // 256KiB |
| }; |
| |
| /** |
| * Amount of time after which we consider the build to have been loading |
| * "for a while". |
| */ |
| private static LOADING_WHILE_THRESHOLD_MS = 60000; // 1 Minute |
| |
| /** |
| * If >0, the maximum number of log lines to push at a time. We will sleep |
| * in between these entries to allow the rest of the app to be responsive |
| * during log dumping. |
| */ |
| private static logAppendInterval = 4000; |
| /** |
| * Amount of time to sleep in between log append chunks. Larger numbers |
| * will load logs slower, but will offer more opportunities for user |
| * interaction in between log dumps. |
| */ |
| private static logAppendDelay = 0; |
| |
| /** Our log provider. */ |
| private provider: LogProvider = this.nullProvider(); |
| |
| /** The LogDog Coordinator client instance. */ |
| readonly client: LogDog.Client; |
| |
| /** |
| * Promise that is resolved when authentication state changes. When this |
| * happens, a new Promise is installed, and future authentication changes |
| * will resolve the new Promise. |
| */ |
| private authChangedPromise: Promise<void>; |
| /** |
| * Retained callback (Promise resolve) to invoke when authentication state |
| * changes. |
| */ |
| private authChangedCallback: (() => void); |
| |
| /** The current fetch Promise. */ |
| private currentOperation: luci.Operation|null = null; |
| private currentFetchPromise: Promise<void>|null = null; |
| |
| /** Are we in automatic mode? */ |
| private automatic = false; |
| /** Are we tailing? */ |
| private fetchFromTail = false; |
| /** Are we in the middle of rendering logs? */ |
| private rendering = true; |
| |
| private cachedLogStreamUrl: string|undefined = undefined; |
| |
| private loadingStateValue: LoadingState = LoadingState.NONE; |
| private streamStatusValue: StreamStatusEntry[]; |
| |
| /** |
| * When rendering a Promise that will resolve when the render completes. We |
| * use this to pipeline parallel data fetching and rendering. |
| */ |
| private renderPromise: Promise<void>|null; |
| |
| constructor( |
| client: luci.Client, readonly profile: ModelProfile, |
| readonly view: ViewBinding) { |
| this.client = new LogDog.Client(client); |
| this.resetAuthChanged(); |
| } |
| |
| /** |
| * Resolves user stream path strings to actual log streams. |
| * |
| * After the returned Promise resolves, log stream data can be fetched by |
| * calling "fetch()". |
| * |
| * @return a Promise that will resolve once all log streams have been |
| * identified and the configured. |
| */ |
| async resolve(paths: string[]) { |
| this.reset(); |
| |
| // For any path that is a query, execute that query. |
| this.loadingState = LoadingState.RESOLVING; |
| let streamBlocks: LogDog.StreamPath[][]; |
| try { |
| streamBlocks = await this.resolvePathsIntoStreams(paths); |
| } catch (err) { |
| this.loadingState = LoadingState.ERROR; |
| console.error('Failed to resolve log streams:', err); |
| return; |
| } |
| |
| // Flatten them all. |
| let streams: LogDog.StreamPath[] = []; |
| for (let streamBlock of streamBlocks) { |
| streams.push.apply(streams, streamBlock); |
| } |
| |
| let initialFetchSize = (streams.length <= 1) ? |
| this.profile.initialFetchSize : |
| this.profile.multiInitialFetchSize; |
| |
| // Generate a LogStream client entry for each composite stream. |
| let logStreams = streams.map((stream) => { |
| console.log('Resolved log stream:', stream); |
| return new LogStream( |
| this.client, stream, initialFetchSize, this.profile.fetchSize); |
| }); |
| |
| // Reset any existing state. |
| this.reset(); |
| |
| // If we have exactly one stream, then use it directly. This allows |
| // it to split. |
| let provider: LogProvider; |
| switch (logStreams.length) { |
| case 0: |
| provider = this.nullProvider(); |
| break; |
| case 1: |
| provider = logStreams[0]; |
| break; |
| default: |
| provider = new AggregateLogStream(logStreams); |
| break; |
| } |
| provider.setStreamStatusCallback((st: LogStreamStatus[]) => { |
| if (this.provider === provider) { |
| this.streamStatus = this.buildStreamStatus(st); |
| } |
| }); |
| this.provider = provider; |
| this.loadingState = LoadingState.NONE; |
| } |
| |
| private resolvePathsIntoStreams(paths: string[]) { |
| return Promise.all(paths.map(async path => { |
| let stream = LogDog.StreamPath.splitProject(path); |
| if (!LogDog.isQuery(stream.path)) { |
| return [stream]; |
| } |
| |
| // This "path" is really a query. Construct and execute. |
| // |
| // If we failed due to an auth error, but our auth changed during the |
| // operation, try again automatically. |
| let query: LogDog.QueryRequest = { |
| project: stream.project, |
| path: stream.path, |
| streamType: LogDog.StreamType.TEXT, |
| }; |
| |
| while (true) { |
| try { |
| let results = await LogDog.queryAll(this.client, query, 100); |
| return results.map(qr => qr.stream); |
| } catch (err) { |
| err = resolveErr(err); |
| if (err !== NOT_AUTHENTICATED) { |
| throw err; |
| } |
| |
| await this.authChangedPromise; |
| } |
| } |
| })); |
| } |
| |
| /** |
| * Resets the state. |
| */ |
| reset() { |
| this.view.clearLogEntries(); |
| this.clearCurrentOperation(); |
| this.provider = this.nullProvider(); |
| |
| this.updateControls(); |
| } |
| |
| private nullProvider(): LogProvider { |
| return new AggregateLogStream([]); |
| } |
| |
| /** |
| * Clears the current operation, cancelling it if set. If the operation is |
| * cleared, the current fetch and rendering states will be reset. |
| * |
| * @param op if provided, only cancel the current operation if it equals |
| * the supplied "op". If "op" does not match the current operation, it |
| * will be cancelled, but the current operation will be left in-tact. |
| * If "op" is undefined, cancel the current operation regardless. |
| */ |
| private clearCurrentOperation(op?: luci.Operation) { |
| if (this.currentOperation) { |
| if (op && op !== this.currentOperation) { |
| // Conditional clear, and we are not the current operation, so do |
| // nothing. |
| op.cancel(); |
| return; |
| } |
| this.currentOperation.cancel(); |
| this.currentOperation = this.currentFetchPromise = null; |
| } |
| this.rendering = false; |
| } |
| |
| private get loadingState(): LoadingState { |
| return this.loadingStateValue; |
| } |
| private set loadingState(v: LoadingState) { |
| if (v !== this.loadingStateValue) { |
| this.loadingStateValue = v; |
| this.updateControls(); |
| } |
| } |
| |
| private get streamStatus(): StreamStatusEntry[] { |
| return this.streamStatusValue; |
| } |
| private set streamStatus(st: StreamStatusEntry[]) { |
| this.streamStatusValue = st; |
| this.updateControls(); |
| } |
| |
| private updateControls() { |
| this.view.updateControls({ |
| canSplit: this.providerCanSplit, |
| split: this.isSplit, |
| bottom: !this.fetchedEndOfStream, |
| fullyLoaded: (this.fetchedFullStream && (!this.rendering)), |
| logStreamUrl: this.logStreamUrl, |
| loadingState: this.loadingState, |
| streamStatus: this.streamStatus, |
| }); |
| } |
| |
| /** |
| * Note that the authentication state for the client has changed. This will |
| * trigger an automatic fetch retry if our previous fetch failed due to |
| * lack of authentication. |
| */ |
| notifyAuthenticationChanged() { |
| // Resolve our current "auth changed" Promise. |
| this.authChangedCallback(); |
| } |
| |
| private resetAuthChanged() { |
| // Resolve our previous function, if it's not already resolved. |
| if (this.authChangedCallback) { |
| this.authChangedCallback(); |
| } |
| |
| // Create a new Promise and install it. |
| this.authChangedPromise = new Promise<void>((resolve, _) => { |
| this.authChangedCallback = resolve; |
| }); |
| } |
| |
| /** |
| * Causes the view to immediately split, if possible, creating a region |
| * representing the end of the log stream. |
| * |
| * If the view cannot split, or if it is already split, then this is a |
| * no-op. |
| * |
| * This cancels the current fetch, if one is in progress. |
| */ |
| async split() { |
| // If we haven't already split, and our provider lets us split, then go |
| // ahead and do so. |
| if (this.providerCanSplit) { |
| return this.fetchLocation(Location.TAIL, true); |
| } |
| return this.fetch(false); |
| } |
| |
| /** |
| * Fetches the next batch of logs. |
| * |
| * By default, if a fetch is already in-progress, this new fetch is ignored. |
| * If cancel is true, then the current fetch should be cancelled and a new |
| * fetch initiated. |
| * |
| * @param cancel true if we should abandon any current fetches. |
| * @return A Promise that will resolve when the fetch operation is complete. |
| */ |
| async fetch(cancel: boolean) { |
| if (this.isSplit) { |
| if (this.fetchFromTail) { |
| // Next fetch grabs logs from the bottom (continue tailing). |
| if (!this.fetchedEndOfStream) { |
| return this.fetchLocation(Location.BOTTOM, cancel); |
| } else { |
| return this.fetchLocation(Location.TAIL, cancel); |
| } |
| } |
| |
| // We're split, but not tailing, so fetch logs from HEAD. |
| return this.fetchLocation(Location.HEAD, cancel); |
| } |
| |
| // We're not split. If we haven't reached end of stream, fetch logs from |
| // HEAD. |
| return this.fetchLocation(Location.HEAD, cancel); |
| } |
| |
| /** Fetch logs from an explicit location. */ |
| async fetchLocation(l: Location, cancel: boolean): Promise<void> { |
| if (this.currentFetchPromise && (!cancel)) { |
| return this.currentFetchPromise; |
| } |
| this.clearCurrentOperation(); |
| |
| // If our provider is finished, then do nothing. |
| if (this.fetchedFullStream) { |
| // There are no more logs. |
| return undefined; |
| } |
| |
| // If we're asked to fetch BOTTOM, but we're not split, fetch HEAD |
| // instead. |
| if (l === Location.BOTTOM && !this.isSplit) { |
| l = Location.HEAD; |
| } |
| |
| // Rotate our fetch ID. This will effectively cancel any pending fetches. |
| this.currentOperation = new luci.Operation(); |
| this.currentFetchPromise = |
| this.fetchLocationImpl(l, this.currentOperation); |
| return this.currentFetchPromise; |
| } |
| |
| private async fetchLocationImpl(l: Location, op: luci.Operation) { |
| for (let continueFetching = true; continueFetching;) { |
| this.loadingState = LoadingState.LOADING; |
| |
| let loadingWhileTimer = |
| new luci.Timer(Model.LOADING_WHILE_THRESHOLD_MS, () => { |
| if (this.loadingState === LoadingState.LOADING) { |
| this.loadingState = LoadingState.LOADING_BEEN_A_WHILE; |
| } |
| }); |
| |
| let hasLogs = false; |
| try { |
| hasLogs = await this.fetchLocationRound(l, op); |
| } catch (err) { |
| // Cancel the timer here, since we may enter other states in this |
| // "catch" block and we don't want to have the timer override them. |
| loadingWhileTimer.cancel(); |
| |
| // If we've been canceled, discard this result. |
| if (err === luci.Operation.CANCELLED) { |
| return; |
| } |
| |
| this.clearCurrentOperation(op); |
| if (err === NOT_AUTHENTICATED) { |
| this.loadingState = LoadingState.NEEDS_AUTH; |
| |
| // We failed because we were not authenticated. Mark this |
| // so we can retry if that state changes. |
| await this.authChangedPromise; |
| |
| // Our authentication state changed during the fetch! |
| // Retry automatically. |
| continueFetching = true; |
| continue; |
| } |
| |
| console.error('Failed to load log streams:', err); |
| return; |
| } finally { |
| loadingWhileTimer.cancel(); |
| } |
| |
| continueFetching = (this.automatic && hasLogs); |
| if (continueFetching) { |
| console.log('Automatic: starting next fetch.'); |
| } |
| } |
| |
| // Post-fetch cleanup. |
| this.clearCurrentOperation(op); |
| } |
| |
| private async fetchLocationRound(l: Location, op: luci.Operation) { |
| let buf = await this.provider.fetch(op, l); |
| |
| // Clear our fetching status. |
| this.rendering = true; |
| this.loadingState = LoadingState.RENDERING; |
| let hasLogs = !!(buf && buf.peek()); |
| |
| // Resolve any previous rendering Promise that we have. This |
| // makes sure our rendering and fetching don't get more than |
| // one round out of sync. |
| if (this.renderPromise) { |
| await this.renderPromise; |
| } |
| |
| // Clear our loading state (updates controls automatically). |
| this.loadingState = LoadingState.RENDERING; |
| |
| // Initiate the next render. This will happen in the |
| // background while we enqueue our next fetch. |
| let doRender = async () => { |
| await this.renderLogs(buf, l); |
| if (this.loadingState === LoadingState.RENDERING) { |
| this.loadingState = LoadingState.NONE; |
| } |
| }; |
| this.renderPromise = doRender(); |
| |
| if (this.fetchedFullStream) { |
| return false; |
| } |
| return hasLogs; |
| } |
| |
| private async renderLogs(buf: BufferedLogs, l: Location) { |
| if (!(buf && buf.peek())) { |
| return; |
| } |
| |
| let logBlock: LogDog.LogEntry[] = []; |
| |
| // Create a promise loop to push logs at intervals. |
| let lines = 0; |
| for (let nextLog = buf.next(); (nextLog); nextLog = buf.next()) { |
| // Add the next log to the append block. |
| logBlock.push(nextLog); |
| if (nextLog.text && nextLog.text.lines) { |
| lines += nextLog.text.lines.length; |
| } |
| |
| // Add logs until we reach our interval lines. |
| // If we've exceeded our burst, then interleave a sleep (yield). This |
| // will reduce user jank a bit. |
| if (Model.logAppendInterval > 0 && lines >= Model.logAppendInterval) { |
| this.appendBlock(logBlock, l); |
| |
| await luci.sleepPromise(Model.logAppendDelay); |
| lines = 0; |
| } |
| } |
| |
| // If there are any buffered logs, append that block. |
| this.appendBlock(logBlock, l); |
| } |
| |
| /** |
| * Appends the contents of the "block" array to the viewer, consuming |
| * "block" in the process. |
| * |
| * Block will be reset (but not resized) to zero elements after appending. |
| */ |
| private appendBlock(block: LogDog.LogEntry[], l: Location) { |
| if (!block.length) { |
| return; |
| } |
| |
| console.log('Rendering', block.length, 'logs...'); |
| this.view.pushLogEntries(block, l); |
| block.length = 0; |
| |
| // Update our status and controls. |
| this.updateControls(); |
| } |
| |
| /** |
| * Sets whether the next fetch will pull from the tail (end) or the top |
| * (begining) region. |
| * |
| * This is only relevant when there is a log split. |
| */ |
| setFetchFromTail(v: boolean) { |
| this.fetchFromTail = v; |
| } |
| |
| /** |
| * Sets whether automatic loading is enabled. |
| * |
| * When enabled, a new fetch will immediately be dispatched when a previous |
| * fetch finishes so long as there is still stream data to load. |
| */ |
| setAutomatic(v: boolean) { |
| this.automatic = v; |
| if (v) { |
| // Passively kick off a new fetch. |
| this.fetch(false); |
| } |
| } |
| |
| private buildStreamStatus(v: LogStreamStatus[]): StreamStatusEntry[] { |
| let maxStatus = LogDog.FetchStatus.IDLE; |
| let maxStatusCount = 0; |
| let needsAuth = false; |
| |
| // Prune any finished entries and accumulate them for status bar change. |
| v = (v || []).filter((st) => { |
| needsAuth = (needsAuth || st.needsAuth); |
| |
| if (st.fetchStatus > maxStatus) { |
| maxStatus = st.fetchStatus; |
| maxStatusCount = 1; |
| } else if (st.fetchStatus === maxStatus) { |
| maxStatusCount++; |
| } |
| |
| return (!st.finished); |
| }); |
| |
| return v.map((st): StreamStatusEntry => { |
| return { |
| name: '.../+/' + st.stream.name, |
| desc: st.state, |
| }; |
| }); |
| } |
| |
| private get providerCanSplit(): boolean { |
| let split = this.provider.split(); |
| return (!!(split && split.canSplit())); |
| } |
| |
| private get isSplit(): boolean { |
| let split = this.provider.split(); |
| return (!!(split && split.isSplit())); |
| } |
| |
| private get fetchedEndOfStream(): boolean { |
| return (this.provider.fetchedEndOfStream()); |
| } |
| |
| private get fetchedFullStream(): boolean { |
| return (this.fetchedEndOfStream && (!this.isSplit)); |
| } |
| |
| private get logStreamUrl(): string|undefined { |
| if (!this.cachedLogStreamUrl) { |
| this.cachedLogStreamUrl = this.provider.getLogStreamUrl(); |
| } |
| return this.cachedLogStreamUrl; |
| } |
| } |
| |
| /** Generic interface for a log provider. */ |
| interface LogProvider { |
| setStreamStatusCallback(cb: StreamStatusCallback): void; |
| fetch(op: luci.Operation, l: Location): Promise<BufferedLogs>; |
| getLogStreamUrl(): string|undefined; |
| |
| /** Will return null if this LogProvider doesn't support splitting. */ |
| split(): SplitLogProvider|null; |
| fetchedEndOfStream(): boolean; |
| } |
| |
| /** Additional methods for log stream splitting, if supported. */ |
| interface SplitLogProvider { |
| canSplit(): boolean; |
| isSplit(): boolean; |
| } |
| |
| /** A LogStream is a LogProvider manages a single log stream. */ |
| class LogStream implements LogProvider { |
| /** |
| * Always begin with a small fetch. We'll disable this afterward the first |
| * finishes. |
| */ |
| private initialFetch = true; |
| |
| private fetcher: LogDog.Fetcher; |
| private activeFetch: LogDog.Fetch|undefined; |
| |
| /** The log stream index of the next head() log. */ |
| private nextHeadIndex = 0; |
| /** |
| * The lowest log stream index of all of the tail logs. If this is <0, then |
| * it is uninitialized. |
| */ |
| private firstTailIndex = -1; |
| /** |
| * The next log stream index to fetch to continue pulling logs from the |
| * bottom. If this is <0, it is uninitialized. |
| */ |
| private nextBottomIndex = -1; |
| |
| private streamStatusCallback: StreamStatusCallback; |
| |
| /** The size of the tail walkback region. */ |
| private static TAIL_WALKBACK = 500; |
| |
| constructor( |
| client: LogDog.Client, readonly stream: LogDog.StreamPath, |
| readonly initialFetchSize: number, readonly fetchSize: number) { |
| this.fetcher = new LogDog.Fetcher(client, stream); |
| } |
| |
| private setActiveFetch(fetch: LogDog.Fetch): LogDog.Fetch { |
| this.activeFetch = fetch; |
| this.activeFetch.addStateChangedCallback((_: LogDog.Fetch) => { |
| this.statusChanged(); |
| }); |
| return fetch; |
| } |
| |
| get fetchStatus(): LogDog.FetchStatus { |
| if (this.activeFetch) { |
| return this.activeFetch.lastStatus; |
| } |
| return LogDog.FetchStatus.IDLE; |
| } |
| |
| get fetchError(): Error|undefined { |
| if (this.activeFetch) { |
| return this.activeFetch.lastError; |
| } |
| return undefined; |
| } |
| |
| async fetch(op: luci.Operation, l: Location) { |
| // Determine which method to use based on the insertion point and current |
| // log stream fetch state. |
| let getLogs: Promise<LogDog.LogEntry[]>; |
| switch (l) { |
| case Location.HEAD: |
| getLogs = this.getHead(op); |
| break; |
| |
| case Location.TAIL: |
| getLogs = this.getTail(op); |
| break; |
| |
| case Location.BOTTOM: |
| getLogs = this.getBottom(op); |
| break; |
| |
| default: |
| // Nothing to do. |
| throw new Error('Unknown Location: ' + l); |
| } |
| |
| try { |
| let logs = await getLogs; |
| this.initialFetch = false; |
| this.statusChanged(); |
| return new BufferedLogs(logs); |
| } catch (err) { |
| throw resolveErr(err); |
| } |
| } |
| |
| get descriptor() { |
| return this.fetcher.desc; |
| } |
| |
| getLogStreamUrl(): string|undefined { |
| let desc = this.descriptor; |
| if (desc) { |
| return (desc.tags || {})['logdog.viewer_url']; |
| } |
| return undefined; |
| } |
| |
| setStreamStatusCallback(cb: StreamStatusCallback) { |
| this.streamStatusCallback = cb; |
| } |
| |
| private statusChanged() { |
| if (this.streamStatusCallback) { |
| this.streamStatusCallback([this.getStreamStatus()]); |
| } |
| } |
| |
| getStreamStatus(): LogStreamStatus { |
| let pieces: string[] = []; |
| let tidx = this.fetcher.terminalIndex; |
| if (this.nextHeadIndex > 0) { |
| pieces.push('1..' + this.nextHeadIndex); |
| } else { |
| pieces.push('0'); |
| } |
| if (this.isSplit()) { |
| if (tidx >= 0) { |
| pieces.push('| ' + this.firstTailIndex + ' / ' + tidx); |
| tidx = -1; |
| } else { |
| pieces.push( |
| '| ' + this.firstTailIndex + '..' + this.nextBottomIndex + |
| ' ...'); |
| } |
| } else if (tidx >= 0) { |
| pieces.push('/ ' + tidx); |
| } else { |
| pieces.push('...'); |
| } |
| |
| let needsAuth = false; |
| let finished = this.finished; |
| if (finished) { |
| pieces.push('(Finished)'); |
| } else { |
| switch (this.fetchStatus) { |
| case LogDog.FetchStatus.IDLE: |
| case LogDog.FetchStatus.LOADING: |
| pieces.push('(Loading)'); |
| break; |
| |
| case LogDog.FetchStatus.STREAMING: |
| pieces.push('(Streaming)'); |
| break; |
| |
| case LogDog.FetchStatus.MISSING: |
| pieces.push('(Missing)'); |
| break; |
| |
| case LogDog.FetchStatus.ERROR: |
| let err = this.fetchError; |
| if (err) { |
| err = resolveErr(err); |
| if (err === NOT_AUTHENTICATED) { |
| pieces.push('(Auth Error)'); |
| needsAuth = true; |
| } else { |
| pieces.push('(Error)'); |
| } |
| } else { |
| pieces.push('(Error)'); |
| } |
| break; |
| default: |
| // Nothing to do. |
| break; |
| } |
| } |
| |
| return { |
| stream: this.stream, |
| state: pieces.join(' '), |
| finished: finished, |
| fetchStatus: this.fetchStatus, |
| needsAuth: needsAuth, |
| }; |
| } |
| |
| split(): SplitLogProvider { |
| return this; |
| } |
| |
| isSplit(): boolean { |
| // We're split if we have a bottom and we're not finished tailing. |
| return ( |
| this.firstTailIndex >= 0 && |
| (this.nextHeadIndex < this.firstTailIndex)); |
| } |
| |
| canSplit(): boolean { |
| return (!(this.isSplit() || this.caughtUp)); |
| } |
| |
| private get caughtUp(): boolean { |
| // We're caught up if we have both a head and bottom index, and the head |
| // is at or past the bottom. |
| return ( |
| this.nextHeadIndex >= 0 && this.nextBottomIndex >= 0 && |
| this.nextHeadIndex >= this.nextBottomIndex); |
| } |
| |
| fetchedEndOfStream(): boolean { |
| let tidx = this.fetcher.terminalIndex; |
| return ( |
| tidx >= 0 && |
| ((this.nextHeadIndex > tidx) || (this.nextBottomIndex > tidx))); |
| } |
| |
| private get finished(): boolean { |
| return ((!this.isSplit()) && this.fetchedEndOfStream()); |
| } |
| |
| private updateIndexes() { |
| if (this.firstTailIndex >= 0) { |
| if (this.nextBottomIndex < this.firstTailIndex) { |
| this.nextBottomIndex = this.firstTailIndex + 1; |
| } |
| |
| if (this.nextHeadIndex >= this.firstTailIndex && |
| this.nextBottomIndex >= 0) { |
| // Synchronize our head and bottom pointers. |
| this.nextHeadIndex = this.nextBottomIndex = |
| Math.max(this.nextHeadIndex, this.nextBottomIndex); |
| } |
| } |
| } |
| |
| private nextFetcherOptions(): LogDog.FetcherOptions { |
| let opts: LogDog.FetcherOptions = {}; |
| if (this.initialFetch && this.initialFetchSize > 0) { |
| opts.byteCount = this.initialFetchSize; |
| } else if (this.fetchSize > 0) { |
| opts.byteCount = this.fetchSize; |
| } |
| return opts; |
| } |
| |
| private async getHead(op: luci.Operation) { |
| this.updateIndexes(); |
| |
| if (this.finished) { |
| // Our HEAD region has met/surpassed our TAIL region, so there are no |
| // HEAD logs to return. Only bottom. |
| return null; |
| } |
| |
| // If we have a tail pointer, only fetch HEAD up to that point. |
| let opts = this.nextFetcherOptions(); |
| if (this.firstTailIndex >= 0) { |
| opts.logCount = (this.firstTailIndex - this.nextHeadIndex); |
| } |
| |
| let f = |
| this.setActiveFetch(this.fetcher.get(op, this.nextHeadIndex, opts)); |
| let logs = await f.p; |
| if (logs && logs.length) { |
| this.nextHeadIndex = (logs[logs.length - 1].streamIndex + 1); |
| this.updateIndexes(); |
| } |
| return logs; |
| } |
| |
| private async getTail(op: luci.Operation) { |
| // If we haven't performed a Tail before, start with one. |
| if (this.firstTailIndex < 0) { |
| let tidx = this.fetcher.terminalIndex; |
| if (tidx < 0) { |
| let f = this.setActiveFetch(this.fetcher.getLatest(op)); |
| |
| let logs = await f.p; |
| |
| // Mark our initial "tail" position. |
| if (logs && logs.length) { |
| this.firstTailIndex = logs[0].streamIndex; |
| this.updateIndexes(); |
| } |
| return logs; |
| } |
| |
| this.firstTailIndex = (tidx + 1); |
| this.updateIndexes(); |
| } |
| |
| // We're doing incremental reverse fetches. If we're finished tailing, |
| // return no logs. |
| if (!this.isSplit()) { |
| return []; |
| } |
| |
| // Determine our walkback region. |
| let startIndex = this.firstTailIndex - LogStream.TAIL_WALKBACK; |
| if (this.nextHeadIndex >= 0) { |
| if (startIndex < this.nextHeadIndex) { |
| startIndex = this.nextHeadIndex; |
| } |
| } else if (startIndex < 0) { |
| startIndex = 0; |
| } |
| let count = (this.firstTailIndex - startIndex); |
| |
| // Fetch the full walkback region. |
| let f = this.setActiveFetch(this.fetcher.getAll(op, startIndex, count)); |
| let logs = await f.p; |
| |
| this.firstTailIndex = startIndex; |
| this.updateIndexes(); |
| return logs; |
| } |
| |
| private async getBottom(op: luci.Operation) { |
| this.updateIndexes(); |
| |
| // If there are no more logs in the stream, return no logs. |
| if (this.fetchedEndOfStream()) { |
| return []; |
| } |
| |
| // If our bottom index isn't initialized, initialize it via tail. |
| if (this.nextBottomIndex < 0) { |
| return this.getTail(op); |
| } |
| |
| let opts = this.nextFetcherOptions(); |
| let f = |
| this.setActiveFetch(this.fetcher.get(op, this.nextBottomIndex, opts)); |
| let logs = await f.p; |
| if (logs && logs.length) { |
| this.nextBottomIndex = (logs[logs.length - 1].streamIndex + 1); |
| } |
| return logs; |
| } |
| } |
| |
| /** |
| * LogSorter is an interface that used by AggregateLogStream to extract sorted |
| * logs from a set of BufferedLogs. |
| * |
| * It is used to compare two log entries to determine their relative order. |
| */ |
| type LogSorter = { |
| /** Returns true if "a" comes before "b". */ |
| before: (a: LogDog.LogEntry, b: LogDog.LogEntry) => number; |
| |
| /** |
| * If implemented, returns an implicit next log in the buffer set. |
| * |
| * This is useful if the next log can be determined from the current |
| * buffered data, even if it is partial or incomplete. |
| */ |
| implicitNext?: (prev: LogDog.LogEntry, buffers: BufferedLogs[]) => |
| LogDog.LogEntry | null; |
| }; |
| |
| const prefixIndexLogSorter: LogSorter = { |
| before: (a: LogDog.LogEntry, b: LogDog.LogEntry) => { |
| return (a.prefixIndex - b.prefixIndex); |
| }, |
| |
| implicitNext: (prev: LogDog.LogEntry, buffers: BufferedLogs[]) => { |
| let nextPrefixIndex = (prev.prefixIndex + 1); |
| for (let buf of buffers) { |
| let le = buf.peek(); |
| if (le && le.prefixIndex === nextPrefixIndex) { |
| return buf.next(); |
| } |
| } |
| return null; |
| }, |
| }; |
| |
| const timestampLogSorter: LogSorter = { |
| before: (a: LogDog.LogEntry, b: LogDog.LogEntry) => { |
| if (a.timestamp) { |
| if (b.timestamp) { |
| return a.timestamp.getTime() - b.timestamp.getTime(); |
| } |
| return 1; |
| } |
| if (b.timestamp) { |
| return -1; |
| } |
| return 0; |
| }, |
| |
| // No implicit "next" with timestamp-based logs, since the next log in |
| // an empty buffer may actually be the next contiguous log. |
| implicitNext: undefined, |
| }; |
| |
| /** |
| * An aggregate log stream. It presents a single-stream view, but is really |
| * composed of several log streams interleaved based on their prefix indices |
| * (if they share a prefix) or timestamps (if they don't). |
| * |
| * At least one log entry from each stream must be buffered before any log |
| * entries can be yielded, since we don't know what ordering to apply |
| * otherwise. To make this fast, we will make the first request for each |
| * stream small so it finishes quickly and we can start rendering. Subsequent |
| * entries will be larger for efficiency. |
| * |
| * @param {LogStream} streams the composite streams. |
| */ |
| class AggregateLogStream implements LogProvider { |
| private streams: AggregateLogStream.Entry[]; |
| private active: AggregateLogStream.Entry[]; |
| private currentNextPromise: Promise<BufferedLogs[]>|null; |
| private readonly logSorter: LogSorter; |
| |
| private streamStatusCallback: StreamStatusCallback; |
| |
| constructor(streams: LogStream[]) { |
| // Input streams, ordered by input order. |
| this.streams = streams.map<AggregateLogStream.Entry>((ls, i) => { |
| ls.setStreamStatusCallback((st: LogStreamStatus[]) => { |
| if (st) { |
| this.streams[i].status = st[0]; |
| this.statusChanged(); |
| } |
| }); |
| |
| return new AggregateLogStream.Entry(ls); |
| }); |
| |
| // Subset of input streams that are still active (not finished). |
| this.active = this.streams; |
| |
| // The currently-active "next" promise. |
| this.currentNextPromise = null; |
| |
| // Determine our log comparison function. If all of our logs share a |
| // prefix, we will use the prefix index. Otherwise, we will use the |
| // timestamp. |
| let template: LogDog.StreamPath; |
| let sharedPrefix = this.streams.every((entry) => { |
| if (!template) { |
| template = entry.ls.stream; |
| return true; |
| } |
| return template.samePrefixAs(entry.ls.stream); |
| }); |
| |
| if (sharedPrefix) { |
| this.logSorter = prefixIndexLogSorter; |
| } else { |
| this.logSorter = timestampLogSorter; |
| } |
| } |
| |
| split(): SplitLogProvider|null { |
| return null; |
| } |
| fetchedEndOfStream(): boolean { |
| return (!this.active.length); |
| } |
| |
| setStreamStatusCallback(cb: StreamStatusCallback) { |
| this.streamStatusCallback = cb; |
| } |
| |
| private statusChanged() { |
| if (this.streamStatusCallback) { |
| // Iterate through our composite stream statuses and pick the one that |
| // we want to report. |
| this.streamStatusCallback(this.streams.map((entry): LogStreamStatus => { |
| return entry.status; |
| })); |
| } |
| } |
| |
| getLogStreamUrl(): string|undefined { |
| // Return the first log stream viewer URL. IF we have a shared prefix, |
| // this will always work. Otherwise, returning something is better than |
| // nothing, so if any of the base streams have a URL, we will return it. |
| for (let s of this.streams) { |
| let url = s.ls.getLogStreamUrl(); |
| if (url) { |
| return url; |
| } |
| } |
| return undefined; |
| } |
| |
| /** |
| * Implements LogProvider.next |
| */ |
| async fetch(op: luci.Operation, _: Location) { |
| // If we're already are fetching the next buffer, this is an error. |
| if (this.currentNextPromise) { |
| throw new Error('In-progress next(), cannot start another.'); |
| } |
| |
| // Filter out any finished streams from our active list. A stream is |
| // finished if it is finished streaming and we don't have a retained |
| // buffer from it. |
| // |
| // This updates our "finished" property, since it's derived from the |
| // length of our active array. |
| this.active = this.active.filter((entry) => { |
| return ( |
| (!entry.buffer) || entry.buffer.peek() || |
| (!entry.ls.fetchedEndOfStream())); |
| }); |
| |
| if (!this.active.length) { |
| // No active streams, so we're finished. Permanently set our promise to |
| // the finished state. |
| return; |
| } |
| |
| let buffers: BufferedLogs[]; |
| this.currentNextPromise = this.ensureActiveBuffers(op); |
| try { |
| buffers = await this.currentNextPromise; |
| } finally { |
| this.currentNextPromise = null; |
| } |
| return this._aggregateBuffers(buffers); |
| } |
| |
| private async ensureActiveBuffers(op: luci.Operation): |
| Promise<BufferedLogs[]|null> { |
| // Fill all buffers for all active streams. This may result in an RPC to |
| // load new buffer content for streams whose buffers are empty. |
| await Promise.all(this.active.map((entry) => entry.ensure(op))); |
| |
| // Examine the error status of each stream. |
| // |
| // The error is interesting, since we must present a common error view to |
| // our caller. If all returned errors are "NOT_AUTHENTICATED", we will |
| // return a NOT_AUTHENTICATED. Otherwise, we will return a generic |
| // "streams failed" error. |
| // |
| // The outer Promise will pull logs for any streams that don't have any. |
| // On success, the "buffer" for the entry will be populated. On failure, |
| // an error will be returned. Because Promise.all fails fast, we will |
| // catch inner errors and return them as values (null if no error). |
| let buffers = new Array<BufferedLogs>(this.active.length); |
| let errors = new Array<Error>(); |
| this.active.forEach((entry, idx) => { |
| buffers[idx] = entry.buffer; |
| if (entry.lastError) { |
| errors.push(entry.lastError); |
| } |
| }); |
| |
| // We are done, and will return a value. |
| this.currentNextPromise = null; |
| if (errors.length) { |
| throw this._aggregateErrors(errors); |
| } |
| return buffers; |
| } |
| |
| private _aggregateErrors(errors: Error[]): Error { |
| let isNotAuthenticated = false; |
| errors.every((err) => { |
| if (!err) { |
| return true; |
| } |
| if (err === NOT_AUTHENTICATED) { |
| isNotAuthenticated = true; |
| return true; |
| } |
| isNotAuthenticated = false; |
| return false; |
| }); |
| return ( |
| (isNotAuthenticated) ? (NOT_AUTHENTICATED) : |
| new Error('Stream Error')); |
| } |
| |
| private _aggregateBuffers(buffers: BufferedLogs[]): BufferedLogs { |
| switch (buffers.length) { |
| case 0: |
| // No buffers, so no logs. |
| return new BufferedLogs(null); |
| case 1: |
| // As a special case, if we only have one buffer, and we assume that |
| // its entries are sorted, then that buffer is a return value. |
| return new BufferedLogs(buffers[0].getAll()); |
| default: |
| break; |
| } |
| |
| // Preload our peek array. |
| let peek = new Array<LogDog.LogEntry>(buffers.length); |
| peek.length = 0; |
| for (let buf of buffers) { |
| let le = buf.peek(); |
| if (!le) { |
| // One of our input buffers had no log entries. |
| return new BufferedLogs(null); |
| } |
| peek.push(le); |
| } |
| |
| // Assemble our aggregate buffer array. |
| // |
| // As we add log entries, latestAdded will be updated to point to the most |
| // recently added LogEntry. |
| let entries: LogDog.LogEntry[] = []; |
| let latestAdded: LogDog.LogEntry|null = null; |
| while (true) { |
| // Choose the next stream. |
| let earliest = 0; |
| for (let i = 1; i < buffers.length; i++) { |
| if (this.logSorter.before(peek[i], peek[earliest])) { |
| earliest = i; |
| } |
| } |
| |
| // Get the next log from the earliest stream. |
| let next = buffers[earliest].next(); |
| if (next) { |
| latestAdded = next; |
| entries.push(latestAdded); |
| } |
| |
| // Repopulate that buffer's "peek" value. If the buffer has no more |
| // entries, then we're done this round. |
| next = buffers[earliest].peek(); |
| if (!next) { |
| break; |
| } |
| peek[earliest] = next; |
| } |
| |
| // One or more of our buffers is exhausted. If we have the ability to load |
| // implicit next logs, try and extract more using that. |
| if (latestAdded && this.logSorter.implicitNext) { |
| while (true) { |
| latestAdded = this.logSorter.implicitNext(latestAdded, buffers); |
| if (!latestAdded) { |
| break; |
| } |
| entries.push(latestAdded); |
| } |
| } |
| return new BufferedLogs(entries); |
| } |
| } |
| |
| /** Internal namespace for AggregateLogStream types. */ |
| namespace AggregateLogStream { |
| /** Entry is an entry for a single log stream and its buffered logs. */ |
| export class Entry { |
| buffer = new BufferedLogs(null); |
| status: LogStreamStatus; |
| lastError: Error|null; |
| |
| constructor(readonly ls: LogStream) { |
| this.status = ls.getStreamStatus(); |
| } |
| |
| get active() { |
| return ( |
| (!this.buffer) || this.buffer.peek() || |
| this.ls.fetchedEndOfStream()); |
| } |
| |
| async ensure(op: luci.Operation) { |
| this.lastError = null; |
| if (this.buffer && this.buffer.peek()) { |
| return; |
| } |
| |
| try { |
| this.buffer = await this.ls.fetch(op, Location.HEAD); |
| } catch (e) { |
| // Log stream source of error. Raise a generic "failed to |
| // buffer" error. This will become a permanent failure. |
| console.error( |
| 'Error loading buffer for', this.ls.stream.fullName(), '(', |
| this.ls, '): ', e); |
| this.lastError = e; |
| } |
| } |
| }; |
| } |
| |
| /** |
| * A buffer of ordered log entries. |
| * |
| * Assumes total ownership of the input log buffer, which can be null to |
| * indicate no logs. |
| */ |
| class BufferedLogs { |
| private index = 0; |
| |
| constructor(private logs: LogDog.LogEntry[]|null) {} |
| |
| /** |
| * Peek returns the next log in the buffer without modifying the buffer. If |
| * there are no logs in the buffer, peek will return null. |
| */ |
| peek(): LogDog.LogEntry|null { |
| return (this.logs) ? (this.logs[this.index]) : (null); |
| } |
| |
| /** |
| * Returns a copy of the remaining logs in the buffer. |
| * If there are no logs, an empty array will be returned. |
| */ |
| peekAll(): LogDog.LogEntry[] { |
| return (this.logs || []).slice(0); |
| } |
| |
| /** |
| * GetAll returns all logs in the buffer. Afterwards, the buffer will be |
| * empty. |
| */ |
| getAll(): LogDog.LogEntry[] { |
| // Pop all logs. |
| let logs = this.logs; |
| this.logs = null; |
| return (logs || []); |
| } |
| |
| /** |
| * Next fetches the next log in the buffer, removing it from the buffer. If |
| * no more logs are available, it will return null. |
| */ |
| next(): LogDog.LogEntry|null { |
| if (!(this.logs && this.logs.length)) { |
| return null; |
| } |
| |
| // Get the next log and increment our index. |
| let log = this.logs[this.index++]; |
| if (this.index >= this.logs.length) { |
| this.logs = null; |
| } |
| return log; |
| } |
| } |
| } |