[RPP] Add FlowsHandler to parse flows in traces

A flow is a logic connection between trace events. We display this
connection as arrows between trace events belonging to the same flow.

In the trace event format, flows are represented with pairing "flow
phase" events. Each flow phase event corresponds to one trace event
and indicates the role a trace event plays in a flow (start, step or
end).

To parse flows, we first handle flow phase events, by creating unique
flows with the timestamps of each phase. Then, we place trace events
in the flow where their corresponding phase event was placed (if a
corresponding flow phase event does exist).

In a follow up CL, I'll add flows exported by the flows handler to the
InitiatorsHandler.

Bug: 381033695
Change-Id: Idf00526dc96614e3c83cc4bdced076e59b10d6fd
Reviewed-on: https://chromium-review.googlesource.com/c/devtools/devtools-frontend/+/6049061
Commit-Queue: Andres Olivares <andoli@chromium.org>
Reviewed-by: Jack Franklin <jacktfranklin@chromium.org>
diff --git a/config/gni/devtools_grd_files.gni b/config/gni/devtools_grd_files.gni
index 3c9e697..33354c6 100644
--- a/config/gni/devtools_grd_files.gni
+++ b/config/gni/devtools_grd_files.gni
@@ -1053,6 +1053,7 @@
   "front_end/models/trace/handlers/AnimationHandler.js",
   "front_end/models/trace/handlers/AuctionWorkletsHandler.js",
   "front_end/models/trace/handlers/ExtensionTraceDataHandler.js",
+  "front_end/models/trace/handlers/FlowsHandler.js",
   "front_end/models/trace/handlers/FramesHandler.js",
   "front_end/models/trace/handlers/GPUHandler.js",
   "front_end/models/trace/handlers/ImagePaintingHandler.js",
diff --git a/front_end/models/trace/handlers/BUILD.gn b/front_end/models/trace/handlers/BUILD.gn
index 63e937b..4f5b98c 100644
--- a/front_end/models/trace/handlers/BUILD.gn
+++ b/front_end/models/trace/handlers/BUILD.gn
@@ -12,6 +12,7 @@
     "AnimationHandler.ts",
     "AuctionWorkletsHandler.ts",
     "ExtensionTraceDataHandler.ts",
+    "FlowsHandler.ts",
     "FramesHandler.ts",
     "GPUHandler.ts",
     "ImagePaintingHandler.ts",
@@ -67,6 +68,7 @@
     "AnimationHandler.test.ts",
     "AuctionWorkletsHandler.test.ts",
     "ExtensionTraceDataHandler.test.ts",
+    "FlowsHandler.test.ts",
     "FramesHandler.test.ts",
     "GPUHandler.test.ts",
     "ImagePaintingHandler.test.ts",
diff --git a/front_end/models/trace/handlers/FlowsHandler.test.ts b/front_end/models/trace/handlers/FlowsHandler.test.ts
new file mode 100644
index 0000000..cdb30b2
--- /dev/null
+++ b/front_end/models/trace/handlers/FlowsHandler.test.ts
@@ -0,0 +1,185 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+import {
+  makeCompleteEvent,
+  makeFlowPhaseEvent,
+} from '../../../testing/TraceHelpers.js';
+import * as Trace from '../trace.js';
+
+async function getFlowsHandlerData(events: Trace.Types.Events.Event[]):
+    Promise<ReturnType<typeof Trace.Handlers.ModelHandlers.FlowsHandler.data>> {
+  Trace.Handlers.ModelHandlers.FlowsHandler.reset();
+  for (const event of events) {
+    Trace.Handlers.ModelHandlers.FlowsHandler.handleEvent(event);
+  }
+  await Trace.Handlers.ModelHandlers.FlowsHandler.finalize();
+  return Trace.Handlers.ModelHandlers.FlowsHandler.data();
+}
+const cat = 'mewtwo';
+const pid = 0;
+const tid = 0;
+
+describe('FlowsHandler', function() {
+  it('parses a flow correctly', async () => {
+    const events = [
+      // Trace events linked to flow phase events.
+      makeCompleteEvent('A', 0, 10, cat, pid, tid),
+      makeCompleteEvent('B', 10, 10, cat, pid, tid),
+      makeCompleteEvent('C', 20, 10, cat, pid, tid),
+
+      // Flow phase events
+      makeFlowPhaseEvent('A', 0, cat, Trace.Types.Events.Phase.FLOW_START, 1, pid, tid),
+      makeFlowPhaseEvent('A', 10, cat, Trace.Types.Events.Phase.FLOW_STEP, 1, pid, tid),
+      makeFlowPhaseEvent('A', 20, cat, Trace.Types.Events.Phase.FLOW_END, 1, pid, tid),
+
+    ];
+
+    const {flows} = await getFlowsHandlerData(events);
+    assert.lengthOf(flows, 1);
+    assert.deepEqual(flows[0], events.slice(0, 3));
+  });
+  it('handles multiple flows with the same group token', async function() {
+    const events = [
+      // Flow 1 phase events
+      makeFlowPhaseEvent('A', 0, cat, Trace.Types.Events.Phase.FLOW_START, 1, pid, tid),
+      makeFlowPhaseEvent('A', 10, cat, Trace.Types.Events.Phase.FLOW_STEP, 1, pid, tid),
+      makeFlowPhaseEvent('A', 20, cat, Trace.Types.Events.Phase.FLOW_END, 1, pid, tid),
+
+      // Flow 2 phase
+      makeFlowPhaseEvent('A', 30, cat, Trace.Types.Events.Phase.FLOW_START, 2, pid, tid),
+      makeFlowPhaseEvent('A', 40, cat, Trace.Types.Events.Phase.FLOW_STEP, 2, pid, tid),
+      makeFlowPhaseEvent('A', 50, cat, Trace.Types.Events.Phase.FLOW_END, 2, pid, tid),
+
+      // Flow 1 events
+      makeCompleteEvent('A', 0, 10, cat, pid, tid),
+      makeCompleteEvent('B', 10, 10, cat, pid, tid),
+      makeCompleteEvent('C', 20, 10, cat, pid, tid),
+
+      // Flow 2 events
+      makeCompleteEvent('A', 30, 10, cat, pid, tid),
+      makeCompleteEvent('B', 40, 10, cat, pid, tid),
+      makeCompleteEvent('C', 50, 10, cat, pid, tid),
+    ];
+
+    const {flows} = await getFlowsHandlerData(events);
+    assert.lengthOf(flows, 2);
+    assert.deepEqual(flows[0], events.slice(6, 9));
+    assert.deepEqual(flows[1], events.slice(9));
+  });
+
+  it('handles events belonging to multiple flows', async function() {
+    const events = [
+      // Flow 1 phase events
+      makeFlowPhaseEvent('A', 0, cat, Trace.Types.Events.Phase.FLOW_START, 1, pid, tid),
+      makeFlowPhaseEvent('A', 10, cat, Trace.Types.Events.Phase.FLOW_END, 1, pid, tid),
+
+      // Flow 2 phase events
+      makeFlowPhaseEvent('B', 10, cat, Trace.Types.Events.Phase.FLOW_START, 2, pid, tid),
+      makeFlowPhaseEvent('B', 20, cat, Trace.Types.Events.Phase.FLOW_END, 2, pid, tid),
+
+      // Flow 1 & 2: A -> B, B -> C
+      makeCompleteEvent('A', 0, 10, cat, pid, tid),
+      makeCompleteEvent('B', 10, 10, cat, pid, tid),
+      makeCompleteEvent('C', 20, 10, cat, pid, tid),
+    ];
+
+    const {flows} = await getFlowsHandlerData(events);
+    assert.lengthOf(flows, 2);
+    // Flow 1: A -> B
+    assert.deepEqual(flows[0], events.slice(4, 6));
+    // Flow 2: B -> C
+    assert.deepEqual(flows[1], events.slice(5));
+  });
+  it('handles a flow connecting different threads', async function() {
+    const otherThread = 1;
+    const events = [
+      // Flow phase events. Flow events happen in different threads
+      makeFlowPhaseEvent('A', 0, cat, Trace.Types.Events.Phase.FLOW_START, 1, pid, tid),
+      makeFlowPhaseEvent('A', 10, cat, Trace.Types.Events.Phase.FLOW_STEP, 1, pid, otherThread),
+      makeFlowPhaseEvent('A', 20, cat, Trace.Types.Events.Phase.FLOW_END, 1, pid, otherThread),
+
+      makeCompleteEvent('A', 0, 10, cat, pid, tid),
+      makeCompleteEvent('B', 10, 10, cat, pid, otherThread),
+      makeCompleteEvent('C', 20, 10, cat, pid, otherThread),
+    ];
+
+    const {flows} = await getFlowsHandlerData(events);
+    assert.lengthOf(flows, 1);
+    assert.deepEqual(flows[0], events.slice(3, 6));
+  });
+
+  it('does not link flow phase events to trace events if the thread ids are different', async function() {
+    const otherThread = 1;
+    const events = [
+      // Flow phase events. Flow events happen in different threads
+      makeFlowPhaseEvent('A', 0, cat, Trace.Types.Events.Phase.FLOW_START, 1, pid, tid),
+      makeFlowPhaseEvent('A', 20, cat, Trace.Types.Events.Phase.FLOW_END, 1, pid, tid),
+
+      // Flow events (pid and tid combination differs from the one
+      // of the flow phase events, thus they should not be matched).
+      makeCompleteEvent('A', 0, 10, cat, pid, otherThread),
+      makeCompleteEvent('C', 20, 10, cat, pid, otherThread),
+    ];
+
+    const {flows} = await getFlowsHandlerData(events);
+    assert.lengthOf(flows, 0);
+  });
+
+  it('handles multiple flows with different group tokens', async function() {
+    const pidForFlow2 = 1;
+    const tidForFlow2 = 1;
+    const events = [
+      // Flow 1 phase events
+      makeFlowPhaseEvent('A', 0, cat, Trace.Types.Events.Phase.FLOW_START, 1, pid, tid),
+      makeFlowPhaseEvent('A', 10, cat, Trace.Types.Events.Phase.FLOW_STEP, 1, pid, tid),
+      makeFlowPhaseEvent('A', 20, cat, Trace.Types.Events.Phase.FLOW_END, 1, pid, tid),
+
+      // Flow 2 phase events. Using different pid and tid to ensure
+      // the flow group token is different from flow 1.
+      // The overlapping timestamps should not cause issues identifying
+      // the events belonging to the two different flows.
+      makeFlowPhaseEvent('A', 0, cat, Trace.Types.Events.Phase.FLOW_START, 2, pidForFlow2, tidForFlow2),
+      makeFlowPhaseEvent('A', 10, cat, Trace.Types.Events.Phase.FLOW_STEP, 2, pidForFlow2, tidForFlow2),
+      makeFlowPhaseEvent('A', 20, cat, Trace.Types.Events.Phase.FLOW_END, 2, pidForFlow2, tidForFlow2),
+
+      // Flow 1 events
+      makeCompleteEvent('A', 0, 10, cat, pid, tid),
+      makeCompleteEvent('B', 10, 10, cat, pid, tid),
+      makeCompleteEvent('C', 20, 10, cat, pid, tid),
+
+      // Flow 2 events
+      makeCompleteEvent('A', 0, 10, cat, pidForFlow2, tidForFlow2),
+      makeCompleteEvent('B', 10, 10, cat, pidForFlow2, tidForFlow2),
+      makeCompleteEvent('C', 20, 10, cat, pidForFlow2, tidForFlow2),
+    ];
+
+    const {flows} = await getFlowsHandlerData(events);
+    assert.lengthOf(flows, 2);
+    assert.deepEqual(flows[0], events.slice(6, 9));
+    assert.deepEqual(flows[1], events.slice(9));
+  });
+  it('ignores events with no corresponding flow phase event', async function() {
+    const events = [
+      // Flow 1 phase events
+      makeFlowPhaseEvent('A', 0, cat, Trace.Types.Events.Phase.FLOW_START, 1, pid, tid),
+      makeFlowPhaseEvent('A', 10, cat, Trace.Types.Events.Phase.FLOW_STEP, 1, pid, tid),
+      makeFlowPhaseEvent('A', 20, cat, Trace.Types.Events.Phase.FLOW_END, 1, pid, tid),
+
+      // Flow 1 events
+      makeCompleteEvent('A', 0, 10, cat, pid, tid),
+      makeCompleteEvent('B', 10, 10, cat, pid, tid),
+      makeCompleteEvent('C', 20, 10, cat, pid, tid),
+
+      // non-flow events.
+      makeCompleteEvent('A', 30, 10, cat, pid, tid),
+      makeCompleteEvent('B', 40, 10, cat, pid, tid),
+      makeCompleteEvent('C', 50, 10, cat, pid, tid),
+    ];
+
+    const {flows} = await getFlowsHandlerData(events);
+    assert.lengthOf(flows, 1);
+    assert.deepEqual(flows[0], events.slice(3, 6));
+  });
+});
diff --git a/front_end/models/trace/handlers/FlowsHandler.ts b/front_end/models/trace/handlers/FlowsHandler.ts
new file mode 100644
index 0000000..3817920
--- /dev/null
+++ b/front_end/models/trace/handlers/FlowsHandler.ts
@@ -0,0 +1,170 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+import * as Types from '../types/types.js';
+
+// A flow is a logic connection between trace events. We display this
+// connection as arrows between trace events belonging to the same flow.
+
+// In the trace event format, flows are represented with pairing "flow
+// phase" events. Each flow phase event corresponds to one trace event
+// and indicates the role a trace event plays in a flow (start, step or
+// end). For each flow, one `start` and one `end` phase events are
+// included, while the amount of `step` phase events can be >= 0.
+
+// A flow phase event is assigned to a trace event when their cat, tid,
+// pid and ts are equal (see @flowPhaseBindingTokenForEvent ).
+
+// It's possible for a single event to belong to multiple flows. In that
+// case, it will have multiple corresponding flow phase events (one
+// per flow).
+
+// To parse flows, we first handle flow phase events, by creating unique
+// flows with the timestamps of each phase. Then, we place trace events
+// in the flows where their corresponding phase events were placed (if
+// there are any corresponding flow phase events at all).
+const flowDataByGroupToken = new Map<string, {flowId: number, times: Types.Timing.MicroSeconds[]}>();
+const flowPhaseBindingTokenToFlowId = new Map<string, Set<number>>();
+const flowsById = new Map<number, {times: Types.Timing.MicroSeconds[], events: Types.Events.Event[]}>();
+const flowEvents: Types.Events.FlowEvent[] = [];
+const nonFlowEvents: Types.Events.Event[] = [];
+let flows: Types.Events.Event[][] = [];
+const ID_COMPONENT_SEPARATOR = '-$-';
+export function reset(): void {
+  flows = [];
+  flowEvents.length = 0;
+  nonFlowEvents.length = 0;
+  flowDataByGroupToken.clear();
+  flowPhaseBindingTokenToFlowId.clear();
+  flowsById.clear();
+}
+
+export function handleEvent(event: Types.Events.Event): void {
+  if (Types.Events.isFlowPhaseEvent(event)) {
+    flowEvents.push(event);
+    return;
+  }
+  nonFlowEvents.push(event);
+}
+
+function processNonFlowEvent(event: Types.Events.Event): void {
+  const flowPhaseBindingToken = flowPhaseBindingTokenForEvent(event);
+  const flowIds = flowPhaseBindingTokenToFlowId.get(flowPhaseBindingToken);
+  if (!flowIds) {
+    return;
+  }
+  for (const flowId of flowIds) {
+    const flow = flowsById.get(flowId);
+    if (!flow) {
+      continue;
+    }
+    const timeIndex = flow.times.indexOf(event.ts);
+    if (timeIndex < 0) {
+      continue;
+    }
+    flow.events[timeIndex] = event;
+  }
+}
+
+/**
+ * Creates unique flows by tracking flow phase events. A new created
+ * flow whenever a flow start phase event is detected.
+ * Subsequent flow phase events with the same group token are added to
+ * this flow until a flow end phase is detected.
+ */
+function processFlowEvent(flowPhaseEvent: Types.Events.FlowEvent): void {
+  const flowGroup = flowGroupTokenForFlowPhaseEvent(flowPhaseEvent);
+  switch (flowPhaseEvent.ph) {
+    case (Types.Events.Phase.FLOW_START): {
+      const flowMetadata = {flowId: flowPhaseEvent.id, times: [flowPhaseEvent.ts]};
+      flowDataByGroupToken.set(flowGroup, flowMetadata);
+      addFlowIdToFlowPhaseBinding(flowPhaseBindingTokenForEvent(flowPhaseEvent), flowMetadata.flowId);
+      return;
+    }
+    case (Types.Events.Phase.FLOW_STEP): {
+      const flow = flowDataByGroupToken.get(flowGroup);
+      if (!flow || flow.times.length < 0) {
+        // Found non-start flow event with no corresponding start flow,
+        // start event. Quietly ignore problematic event.
+        return;
+      }
+      addFlowIdToFlowPhaseBinding(flowPhaseBindingTokenForEvent(flowPhaseEvent), flow.flowId);
+      flow.times.push(flowPhaseEvent.ts);
+      return;
+    }
+    case (Types.Events.Phase.FLOW_END): {
+      const flow = flowDataByGroupToken.get(flowGroup);
+      if (!flow || flow.times.length < 0) {
+        // Found non-start flow event with no corresponding start flow,
+        // start event. Quietly ignore problematic event.
+        return;
+      }
+      addFlowIdToFlowPhaseBinding(flowPhaseBindingTokenForEvent(flowPhaseEvent), flow.flowId);
+      flow.times.push(flowPhaseEvent.ts);
+      flowsById.set(flow.flowId, {times: flow.times, events: []});
+      // We don't need this data anymore as the flow has been finished,
+      // so we can drop it.
+      flowDataByGroupToken.delete(flowGroup);
+    }
+  }
+}
+
+/**
+ * A single trace event can belong to multiple flows. This method
+ * tracks which flows (flowId) an event belongs to (given
+ * its flow phase binding token).
+ */
+function addFlowIdToFlowPhaseBinding(flowPhaseBinding: string, flowId: number): void {
+  let flowIds = flowPhaseBindingTokenToFlowId.get(flowPhaseBinding);
+  if (!flowIds) {
+    flowIds = new Set();
+  }
+  flowIds.add(flowId);
+  flowPhaseBindingTokenToFlowId.set(flowPhaseBinding, flowIds);
+}
+
+/**
+ * Returns a token to group flow phase events (start, step and end)
+ * belonging to the same flow. Flow phase events belonging to the same
+ * flow share category, thread id, process id and name.
+ *
+ * Note that other phase events of other flows can share these
+ * attributes too. For this reason, we group flow phase events in
+ * cycles. A cycle starts on a flow start phase event and finishes on a
+ * flow end phase event. For this reason, flow phase events need to be
+ * handled in timestamp order.
+ */
+function flowGroupTokenForFlowPhaseEvent(event: Types.Events.FlowEvent): string {
+  return `${event.cat}${ID_COMPONENT_SEPARATOR}${event.name}${ID_COMPONENT_SEPARATOR}${event.id}`;
+}
+
+/**
+ * A flow phase binding is a token that allows us to associate a flow
+ * phase event to its corresponding event. This association indicates
+ * what role a trace event plays in a flow.
+ * We can assign a trace event with a flow phase when its category,
+ * thread id, process id and timestamp matches those of a flow phase
+ * event.
+ */
+function flowPhaseBindingTokenForEvent(event: Types.Events.Event): string {
+  // This function is called many times (one per event) and creating a
+  // string every time can trigger GC. If this becomes a problem, a
+  // possible optimization is to use a multi-key map with the
+  // binding token components, a trade off between memory performance
+  // and readability.
+  return `${event.cat}${ID_COMPONENT_SEPARATOR}${event.tid}${ID_COMPONENT_SEPARATOR}${event.pid}${
+      ID_COMPONENT_SEPARATOR}${event.ts}`;
+}
+
+export async function finalize(): Promise<void> {
+  // Order is important: flow events need to be handled first.
+  flowEvents.forEach(processFlowEvent);
+  nonFlowEvents.forEach(processNonFlowEvent);
+  flows = [...flowsById.values()].map(f => f.events).filter(flow => flow.length > 1);
+}
+
+export function data(): {flows: Types.Events.Event[][]} {
+  return {
+    flows,
+  };
+}
diff --git a/front_end/models/trace/handlers/ModelHandlers.ts b/front_end/models/trace/handlers/ModelHandlers.ts
index 4d62e2f..eb5d19f 100644
--- a/front_end/models/trace/handlers/ModelHandlers.ts
+++ b/front_end/models/trace/handlers/ModelHandlers.ts
@@ -5,6 +5,7 @@
 export * as Animations from './AnimationHandler.js';
 export * as AuctionWorklets from './AuctionWorkletsHandler.js';
 export * as ExtensionTraceData from './ExtensionTraceDataHandler.js';
+export * as FlowsHandler from './FlowsHandler.js';
 export * as Frames from './FramesHandler.js';
 export * as GPU from './GPUHandler.js';
 export * as ImagePainting from './ImagePaintingHandler.js';
diff --git a/front_end/models/trace/types/TraceEvents.ts b/front_end/models/trace/types/TraceEvents.ts
index 4649932..acfbadb 100644
--- a/front_end/models/trace/types/TraceEvents.ts
+++ b/front_end/models/trace/types/TraceEvents.ts
@@ -2655,6 +2655,17 @@
   return false;
 }
 
+export interface FlowEvent extends Event {
+  // Contains a flow id created by perfetto for the flow this phase
+  // event belongs to.
+  id: number;
+  ph: Phase.FLOW_START|Phase.FLOW_END|Phase.FLOW_STEP;
+}
+
+export function isFlowPhaseEvent(event: Event): event is FlowEvent {
+  return event.ph === Phase.FLOW_START || event.ph === Phase.FLOW_STEP || event.ph === Phase.FLOW_END;
+}
+
 /**
  * This is an exhaustive list of events we track in the Performance
  * panel. Note not all of them are necessarliry shown in the flame
diff --git a/front_end/testing/TraceHelpers.ts b/front_end/testing/TraceHelpers.ts
index ea48157..27b34b7 100644
--- a/front_end/testing/TraceHelpers.ts
+++ b/front_end/testing/TraceHelpers.ts
@@ -276,6 +276,26 @@
   };
 }
 
+/**
+ * Builds a mock flow phase event.
+ */
+export function makeFlowPhaseEvent(
+    name: string, ts: number, cat: string = '*',
+    ph: Trace.Types.Events.Phase.FLOW_START|Trace.Types.Events.Phase.FLOW_END|Trace.Types.Events.Phase.FLOW_STEP,
+    id: number = 0, pid: number = 0, tid: number = 0): Trace.Types.Events.FlowEvent {
+  return {
+    args: {},
+    cat,
+    name,
+    id,
+    ph,
+    pid: Trace.Types.Events.ProcessID(pid),
+    tid: Trace.Types.Events.ThreadID(tid),
+    ts: Trace.Types.Timing.MicroSeconds(ts),
+    dur: Trace.Types.Timing.MicroSeconds(0),
+  };
+}
+
 export function makeCompleteEventInMilliseconds(
     name: string, tsMillis: number, durMillis: number, cat: string = '*', pid: number = 0,
     tid: number = 0): Trace.Types.Events.Complete {
@@ -662,6 +682,9 @@
       workerSessionIdEvents: [],
       workerURLById: new Map(),
     },
+    FlowsHandler: {
+      flows: [],
+    },
     ...overrides,
   };
 }