blob: a3e37c622716a555e91ca41b2eaa681cbe7e1ddd [file]
#!/usr/bin/env vpython3
# Copyright 2025 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
from __future__ import annotations
import test_env
from PB.turboci.graph.orchestrator.v1.write_nodes_request import WriteNodesRequest
from PB.turboci.graph.orchestrator.v1.check import Check
from PB.turboci.graph.orchestrator.v1.check_state import CheckState
from PB.turboci.graph.orchestrator.v1.dependencies import Dependencies
from PB.turboci.graph.orchestrator.v1.edge import RESOLUTION_SATISFIED, RESOLUTION_UNKNOWN, Edge
from PB.turboci.graph.orchestrator.v1.revision import Revision
from PB.turboci.graph.orchestrator.v1.stage import Stage
from recipe_engine.internal.turboci import edge
from recipe_engine import turboci
from recipe_engine.turboci import dep_group
class TestExtractDependencies(test_env.RecipeEngineUnitTest):
  """Tests for `edge.extract_dependencies`.

  Each case builds a dependency group with `dep_group` and checks either the
  resulting `Dependencies` message or that the input is rejected with
  `turboci.InvalidArgumentException`.
  """

  def test_ok(self):
    """Two sub-groups sharing check 'b' yield one edge list and two groups."""
    deps = edge.extract_dependencies(
        dep_group(
            dep_group('a', 'b'),
            dep_group('b', 'c'),
            threshold=1,
        ))

    # 'b' appears in both sub-groups but only once in `edges`; both groups
    # refer to it by index 1.
    self.assertEqual(
        deps,
        Dependencies(
            edges=[
                Edge(check=Edge.Check(identifier=turboci.check_id('a'))),
                Edge(check=Edge.Check(identifier=turboci.check_id('b'))),
                Edge(check=Edge.Check(identifier=turboci.check_id('c'))),
            ],
            predicate=Dependencies.Group(
                groups=[
                    Dependencies.Group(edges=[0, 1]),
                    Dependencies.Group(edges=[1, 2]),
                ],
                threshold=1,
            ),
        ))

  def test_check_cannot_depend_on_stage(self):
    """A group naming a stage is rejected."""
    with self.assertRaises(turboci.InvalidArgumentException):
      edge.extract_dependencies(dep_group(stages=['foo']))

  def test_bad_thresholds(self):
    """Out-of-range thresholds are rejected by extract_dependencies."""
    for bad_threshold in (2, -1):
      with self.subTest(threshold=bad_threshold):
        # dep_group has its own check, so the invalid value is assigned after
        # construction. Setup stays outside assertRaises so that only the
        # call under test can satisfy the expectation.
        dg = dep_group('foo')
        dg.threshold = bad_threshold
        with self.assertRaises(turboci.InvalidArgumentException):
          edge.extract_dependencies(dg)

  def test_normalizes_threshold(self):
    """A threshold of 1 on a single-edge group comes back as 0."""
    dg = dep_group('foo')
    dg.threshold = 1

    deps = edge.extract_dependencies(dg)

    self.assertEqual(deps.predicate.threshold, 0)

  def test_absorb(self):
    """Singly-nested groups collapse into one flat group of edges."""
    deps = edge.extract_dependencies(
        dep_group(
            dep_group(dep_group('a', 'b')),
            threshold=1,
        ))

    self.assertEqual(
        deps,
        Dependencies(
            edges=[
                Edge(check=Edge.Check(identifier=turboci.check_id('a'))),
                Edge(check=Edge.Check(identifier=turboci.check_id('b'))),
            ],
            predicate=Dependencies.Group(edges=[0, 1]),
        ))

  def test_empty(self):
    """An empty top-level group yields an empty predicate and no edges."""
    deps = edge.extract_dependencies(dep_group())

    self.assertEqual(deps, Dependencies(predicate=Dependencies.Group()))

  def test_empty_subgroup(self):
    """Nested groups that contain no edges are rejected."""
    with self.assertRaises(turboci.InvalidArgumentException):
      edge.extract_dependencies(dep_group(dep_group(dep_group())))
class TestDependencyIndex(test_env.RecipeEngineUnitTest):
  """Tests for `edge.DependencyIndex`, backed by an in-memory dict of nodes."""

  def setUp(self):
    """Creates a fresh index, node store and fixed revision for each test."""
    self.di = edge.DependencyIndex()
    # The condition the tests below expect the index to record for a plain
    # dependency on a check.
    self.ccri = edge.CheckCondition(CheckState.CHECK_STATE_FINAL, "true")
    # Nodes keyed by their string identifier (see `turboci.from_id`).
    self.nodes: dict[str, Check | Stage] = {}
    self.now = Revision()
    self.now.ts.seconds = 12345
    super().setUp()

  def get_check(self, name: str) -> Check:
    """Returns the Check called `name`, creating it in PLANNING if missing.

    Raises:
      AssertionError: if a node with that identifier exists but is not a
        Check.
    """
    ident = turboci.check_id(name)
    ident_str = turboci.from_id(ident)

    existing = self.nodes.get(ident_str)
    # Compare against None explicitly: the stored values are protobuf
    # messages, and "missing" is the only case that should create a new one.
    if existing is not None:
      if not isinstance(existing, Check):
        raise AssertionError(
            f'{ident_str!r} is not a Check: {type(existing).__name__}')
      return existing

    created = Check(identifier=ident, state=CheckState.CHECK_STATE_PLANNING)
    self.nodes[ident_str] = created
    return created

  def must_get_existing_check(self, ident_str: str) -> Check:
    """Returns the stored Check for `ident_str`.

    Raises:
      KeyError: if no node is stored under `ident_str`.
      AssertionError: if the stored node is not a Check.
    """
    ret = self.nodes[ident_str]
    if isinstance(ret, Check):
      return ret
    raise AssertionError(f'{ident_str!r} is not a Check: {type(ret).__name__}')

  def set_dependencies(
      self,
      name: str,
      deps: WriteNodesRequest.DependencyGroup,
      *,
      do_index: bool = True,
  ) -> None:
    """Writes dependencies for check `name`.

    If `do_index` is True, calls ensure_conditions and index_predicate for the
    check `name`. Uses node.state != PLANNING for mark_immutable. Then calls
    index_node_write(node).
    """
    node = self.get_check(name)
    node.dependencies.CopyFrom(edge.extract_dependencies(deps))
    if not do_index:
      return

    # NOTE(review): index_node_write is treated as part of the `do_index`
    # path, following the docstring's sequencing; every caller in this file
    # uses the default, so confirm if do_index=False is ever needed.
    self.di.ensure_conditions(node.dependencies.edges, self.nodes.get)
    self.di.index_predicate(
        turboci.from_id(node.identifier),
        node.dependencies,
        self.now,
        mark_immutable=node.state != CheckState.CHECK_STATE_PLANNING)
    self.di.index_node_write(node)

  def _deps_satisfied_by(self, dep: Check) -> Dependencies:
    """Builds the expected Dependencies for one edge on `dep`, resolved as
    satisfied at `self.now`."""
    return Dependencies(
        edges=[Edge(check=Edge.Check(identifier=dep.identifier))],
        predicate=Dependencies.Group(edges=[0]),
        resolution_events={
            0:
                Dependencies.ResolutionEvent(
                    version=self.now, resolution=RESOLUTION_SATISFIED),
        },
        resolution=RESOLUTION_SATISFIED,
    )

  def test_add_remove(self):
    """Rewriting A's deps without C drops A from C's dependents only."""
    a_id = turboci.from_id(self.get_check('A').identifier)
    b_id = turboci.from_id(self.get_check('B').identifier)
    c_id = turboci.from_id(self.get_check('C').identifier)

    self.set_dependencies('A', dep_group('B', 'C'))
    self.assertEqual(
        self.di._data, {
            a_id:
                edge.DependencyIndex._Entry(edges_flat={b_id, c_id}),
            b_id:
                edge.DependencyIndex._Entry(
                    conditions={self.ccri: RESOLUTION_UNKNOWN},
                    dependents={a_id},
                ),
            c_id:
                edge.DependencyIndex._Entry(
                    conditions={self.ccri: RESOLUTION_UNKNOWN},
                    dependents={a_id},
                ),
        })

    self.set_dependencies('A', dep_group('B'))
    # C keeps its condition entry but no longer lists A as a dependent.
    self.assertEqual(
        self.di._data, {
            a_id:
                edge.DependencyIndex._Entry(edges_flat={b_id}),
            b_id:
                edge.DependencyIndex._Entry(
                    conditions={self.ccri: RESOLUTION_UNKNOWN},
                    dependents={a_id},
                ),
            c_id:
                edge.DependencyIndex._Entry(
                    conditions={self.ccri: RESOLUTION_UNKNOWN}),
        })

  def test_evaluate_conditions(self):
    """B's condition resolves once B is FINAL and emits a single event."""
    b = self.get_check('B')
    b_id = turboci.from_id(b.identifier)
    expected_event = edge.DependencyIndex._ResolutionEvent(
        b_id, self.ccri, RESOLUTION_SATISFIED)

    self.set_dependencies('A', dep_group('B'))
    self.assertEqual(self.di._data[b_id].conditions,
                     {self.ccri: RESOLUTION_UNKNOWN})

    # Writing B while it is still PLANNING resolves nothing.
    self.di.index_node_write(b)
    self.assertEqual(self.di._data[b_id].conditions,
                     {self.ccri: RESOLUTION_UNKNOWN})
    self.assertFalse(self.di.has_events())

    b.state = CheckState.CHECK_STATE_FINAL
    self.di.index_node_write(b)  # resolves default conditions
    self.assertEqual(self.di._data[b_id].conditions,
                     {self.ccri: RESOLUTION_SATISFIED})
    self.assertEqual(self.di._resolution_events, {expected_event})
    self.assertTrue(self.di.has_events())

    self.di.index_node_write(b)  # no-op
    self.assertEqual(self.di._data[b_id].conditions,
                     {self.ccri: RESOLUTION_SATISFIED})
    self.assertEqual(self.di._resolution_events, {expected_event})

  def test_process_queue(self):
    """Processing the queue resolves A; a later dependent on B resolves on
    write."""
    a = self.get_check('A')
    a_id = turboci.from_id(a.identifier)
    b = self.get_check('B')

    a.state = CheckState.CHECK_STATE_PLANNED
    self.set_dependencies('A', dep_group('B'))

    b.state = CheckState.CHECK_STATE_FINAL
    self.di.index_node_write(b)

    write = self.di.process_queue(self.must_get_existing_check, self.now)
    self.assertEqual(
        write, {
            a_id:
                Dependencies(
                    edges=a.dependencies.edges,
                    predicate=a.dependencies.predicate,
                    resolution_events={
                        0:
                            Dependencies.ResolutionEvent(
                                version=self.now,
                                resolution=RESOLUTION_SATISFIED),
                    },
                    resolution=RESOLUTION_SATISFIED,
                )
        })
    self.assertFalse(self.di.has_events())

    # try adding a new check depending on 'B'
    c = self.get_check('C')
    c.state = CheckState.CHECK_STATE_PLANNED
    self.set_dependencies('C', dep_group('B'))
    self.assertEqual(c.dependencies, self._deps_satisfied_by(b))

  def test_two_nodes(self):
    """Finalizing B queues an event that process_queue turns into a write
    of A."""
    a = self.get_check('A')
    a_id = turboci.from_id(a.identifier)
    b = self.get_check('B')
    b_id = turboci.from_id(b.identifier)

    a.state = CheckState.CHECK_STATE_PLANNED
    self.set_dependencies('A', dep_group('B'))
    self.assertEqual(
        self.di._data, {
            a_id:
                edge.DependencyIndex._Entry(
                    edges_flat={b_id},
                    state=edge._DepsState.RECEIVING_EVENTS,
                ),
            b_id:
                edge.DependencyIndex._Entry(
                    conditions={self.ccri: RESOLUTION_UNKNOWN},
                    dependents={a_id},
                ),
        })

    # We have new conditions for B, but nothing is resolved yet, so don't need
    # to propagate anything.
    self.assertFalse(self.di.has_events())

    # now resolve b
    b.state = CheckState.CHECK_STATE_FINAL
    self.set_dependencies('B', dep_group())

    # we have an event waiting to process
    self.assertEqual(
        self.di._data, {
            a_id:
                edge.DependencyIndex._Entry(
                    edges_flat={b_id},
                    state=edge._DepsState.RECEIVING_EVENTS,
                ),
            b_id:
                edge.DependencyIndex._Entry(
                    conditions={self.ccri: RESOLUTION_SATISFIED},
                    dependents={a_id},
                    state=edge._DepsState.RECEIVING_EVENTS,
                ),
        })
    self.assertEqual(
        self.di._resolution_events, {
            edge.DependencyIndex._ResolutionEvent(
                node_ident_str=b_id,
                condition=self.ccri,
                resolution=RESOLUTION_SATISFIED,
            )
        })

    # When we process the queue, we'll see that `a` needs to be written
    to_write = self.di.process_queue(self.must_get_existing_check, self.now)
    self.assertEqual(to_write, {a_id: self._deps_satisfied_by(b)})

  def test_one_nodes_no_deps(self):
    """A check with no dependencies propagates nothing and ends satisfied."""
    a = self.get_check('A')
    a_id = turboci.from_id(a.identifier)

    self.set_dependencies('A', dep_group())
    self.assertEqual(self.di._data, {
        a_id: edge.DependencyIndex._Entry(),
    })

    # we can process the queue, but this won't do any propagation.
    self.assertEqual(
        self.di.process_queue(self.must_get_existing_check, self.now), {})
    self.assertFalse(self.di.has_events())

    # now resolve a
    a.state = CheckState.CHECK_STATE_FINAL
    self.set_dependencies('A', dep_group())
    self.assertEqual(a.dependencies.resolution, RESOLUTION_SATISFIED)
# Hand off to test_env.main() when this file is executed as a script rather
# than imported.
if __name__ == '__main__':
  test_env.main()