# blob: b585328901159a956437c5b0a1658b26956094a4 [file] [log] [blame]
# Copyright 2019 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import unittest
import apache_beam as beam
from apache_beam.testing import test_pipeline
from apache_beam.testing import util
from dataflow import cq_attempts as sanitize_cq_attempts
from dataflow import new_cq_attempts
def _beam_equals(expected, actual):
if expected != actual:
raise util.BeamAssertException(
'Expected {} to equal {}'.format(expected, actual))
class IntegrationTest(unittest.TestCase):
  """Exercises new_cq_attempts.process_input on small in-memory inputs.

  Each test builds a PCollection of CQ events and a PCollection of
  BuildBucket entries on a fresh TestPipeline, passes both to
  process_input, and checks the output with util.assert_that.
  """

  def setUp(self):
    # Every test gets its own pipeline to attach its transforms to.
    self.pipeline = test_pipeline.TestPipeline()

  def construct_cq_events_pcol(self, failure_reason=None):
    """Returns a PCollection holding the events of one CQ attempt.

    All events share the same attempt start, CQ name, issue and patchset.
    Their timestamps are 1000, 2000, ... in list order.

    Args:
      failure_reason: value stored under 'failure_reason' on the
        ACTION_PATCH_FAILED events; every other event stores None.
    """
    attempt_cls = sanitize_cq_attempts.CombineEventsToAttempt
    action_sequence = [
        attempt_cls.ACTION_PATCH_START,
        attempt_cls.ACTION_VERIFIER_CUSTOM_TRYBOTS,
        attempt_cls.ACTION_PATCH_FAILED,
        attempt_cls.ACTION_PATCH_FAILED,
        attempt_cls.ACTION_PATCH_STOP,
    ]

    cq_events = []
    for position, action in enumerate(action_sequence, 1):
      cq_event = {
          'attempt_start_usec': 1000000,
          'cq_name': 'test_cq',
          'issue': '1',
          'patchset': '1',
          'action': action,
          'timestamp_millis': 1000 * position,
          'failure_reason': None,
      }
      if action == attempt_cls.ACTION_PATCH_FAILED:
        cq_event['failure_reason'] = failure_reason
      if action == attempt_cls.ACTION_VERIFIER_CUSTOM_TRYBOTS:
        # Only the trybot event lists BuildBucket build ids.
        cq_event['contributing_buildbucket_ids'] = [11, 12]
      cq_events.append(cq_event)

    return self.pipeline | beam.Create(cq_events)

  def construct_bb_entries_pcol(self, builder_name, status):
    """Returns a PCollection with one BuildBucket entry whose id is 11.

    Args:
      builder_name: value stored under the entry's 'builder' key.
      status: value stored under the entry's 'status' key.
    """
    entry = {'id': 11, 'builder': builder_name, 'status': status}
    create_entries = 'Construct BuildBucket entries' >> beam.Create([entry])
    return self.pipeline | create_entries

  # One CQ attempt, with a matching BuildBucket entry that passes.
  def test_basic_pass(self):
    events = self.construct_cq_events_pcol()
    builds = self.construct_bb_entries_pcol('random_builder', 'SUCCESS')
    attempts = new_cq_attempts.process_input(events, builds)

    # Expect exactly one CQ attempt, with no fail type.
    def check_attempts(found):
      _beam_equals(len(found), 1)
      _beam_equals(found[0]['fail_type'], None)

    util.assert_that(attempts, check_attempts)
    self.pipeline.run()

  # A failed CQ attempt whose BuildBucket entry, on 'random_builder', failed.
  def test_basic_failure_random_builder(self):
    reason = {'fail_type': 'FAILED_JOBS'}
    events = self.construct_cq_events_pcol(reason)
    builds = self.construct_bb_entries_pcol('random_builder', 'FAILURE')
    attempts = new_cq_attempts.process_input(events, builds)

    # Expect exactly one CQ attempt, with fail type FAILED_JOBS.
    def check_attempts(found):
      _beam_equals(len(found), 1)
      _beam_equals(found[0]['fail_type'], 'FAILED_JOBS')

    util.assert_that(attempts, check_attempts)
    self.pipeline.run()

  # A failed CQ attempt whose BuildBucket entry, on 'chromium_presubmit',
  # failed.
  def test_basic_failure_chromium_presubmit(self):
    reason = {'fail_type': 'FAILED_JOBS'}
    events = self.construct_cq_events_pcol(reason)
    builds = self.construct_bb_entries_pcol('chromium_presubmit', 'FAILURE')
    attempts = new_cq_attempts.process_input(events, builds)

    # Expect exactly one CQ attempt, with fail type FAILED_PRESUBMIT_BOT.
    def check_attempts(found):
      _beam_equals(len(found), 1)
      _beam_equals(found[0]['fail_type'], 'FAILED_PRESUBMIT_BOT')

    util.assert_that(attempts, check_attempts)
    self.pipeline.run()

  # A failed CQ attempt with two BuildBucket entries: a 'chromium_presubmit'
  # FAILURE and a 'win7-rel' INFRA_FAILURE.
  def test_two_failures(self):
    reason = {'fail_type': 'FAILED_JOBS'}
    events = self.construct_cq_events_pcol(reason)
    build_rows = [
        {'id': 11, 'builder': 'chromium_presubmit', 'status': 'FAILURE'},
        {'id': 12, 'builder': 'win7-rel', 'status': 'INFRA_FAILURE'},
    ]
    create_entries = 'Construct BuildBucket entries' >> beam.Create(build_rows)
    builds = self.pipeline | create_entries
    attempts = new_cq_attempts.process_input(events, builds)

    # Expect exactly one CQ attempt, with fail type FAILED_JOBS.
    def check_attempts(found):
      _beam_equals(len(found), 1)
      _beam_equals(found[0]['fail_type'], 'FAILED_JOBS')

    util.assert_that(attempts, check_attempts)
    self.pipeline.run()

  # One CQ attempt and an empty BuildBucket input.
  def test_cq_attempt_no_bb_entries(self):
    events = self.construct_cq_events_pcol()
    create_nothing = 'Construct 0 BuildBucket entries' >> beam.Create([])
    builds = self.pipeline | create_nothing
    attempts = new_cq_attempts.process_input(events, builds)

    # Expect exactly one CQ attempt, with no fail type and issue '1'.
    def check_attempts(found):
      _beam_equals(len(found), 1)
      only_attempt = found[0]
      _beam_equals(only_attempt['fail_type'], None)
      _beam_equals(only_attempt['issue'], '1')

    util.assert_that(attempts, check_attempts)
    self.pipeline.run()
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()