blob: 97af1bff095101f01bfb8b2a059acb68faf32635 [file] [log] [blame]
#!/usr/bin/env vpython
# Copyright 2015 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import collections
import contextlib
import datetime
import json
import os
import shutil
import tempfile
import threading
import time
import unittest
import StringIO
import test_env
import libs.logdog.stream
import libs.logdog.varint
from google.protobuf import json_format as jsonpb
from recipe_engine import recipe_api
from recipe_engine import stream_logdog
from recipe_engine import annotations_pb2 as pb
@contextlib.contextmanager
def tempdir():
  """Context manager yielding a scratch directory that is removed on exit.

  The directory is created underneath test_env.BASE_DIR and is deleted
  (recursively) when the `with` block ends, whether or not it raised.

  Yields (str): The path of the newly-created directory.
  """
  scratch = tempfile.mkdtemp(dir=test_env.BASE_DIR,
                             suffix='stream_logdog_test')
  try:
    yield scratch
  finally:
    shutil.rmtree(scratch)
def _translate_annotation_datagram(dg):
  """Decodes a serialized annotation datagram into plain Python data.

  The bytes are parsed as a pb.Step message, rendered through the protobuf
  JSON (JSONPB) encoder and loaded back with `json`, so test cases can compare
  the result against ordinary dict/list literals.

  Args:
    dg (str): The serialized annotation pb.Step datagram.

  Returns (dict): The JSONPB projection of the datagram.
  """
  step = pb.Step()
  step.ParseFromString(dg)
  as_json = jsonpb.MessageToJson(step)
  return json.loads(as_json)
class _TestStreamClient(libs.logdog.stream.StreamClient):
  """A testing StreamClient that retains all data written to it.

  Every connection returned by `_connect_raw` is a `Stream`. A stream starts
  out in `unregistered` (keyed by `id()`); it moves to `streams` (keyed by
  stream name) once a complete LogDog header has been written to it, or to
  `incomplete` if it is closed before that happens.
  """

  class Stream(object):
    """A file-like object that is explicitly aware of LogDog stream protocol."""

    def __init__(self, stream_client):
      """
      Args:
        stream_client (_TestStreamClient): The client to notify when this
            stream becomes registered or is closed incomplete.
      """
      self._client = stream_client
      self._buf = StringIO.StringIO()
      # Parsed JSON header; None until a complete header has been written.
      self._header = None
      # Snapshot of the payload, taken by close() before the buffer goes away.
      self._final_data = None
      # Offset within `_buf` at which the payload (post-header) begins.
      self._data_offset = None

    def write(self, data):
      """Appends `data` and registers the stream once its header is complete."""
      self._buf.write(data)
      self._attempt_registration()

    def close(self):
      """Closes the stream, caching its payload so `data` remains readable."""
      # If we never parsed our header, register that we are incomplete.
      if self._header is None:
        self._client._register_incomplete(self)

      # Must happen before the buffer is closed: `data` reads from the buffer.
      self._final_data = self.data
      self._buf.close()

    @contextlib.contextmanager
    def _read_from(self, offset):
      """Yields the underlying buffer positioned at the absolute `offset`.

      The buffer position is restored to end-of-stream on exit.
      """
      # Seek to the specified offset.
      self._buf.seek(offset, mode=0)
      try:
        yield self._buf
      finally:
        # Seek back to the end of the stream so future writes will append.
        self._buf.seek(0, mode=2)

    def _attempt_registration(self):
      """Tries to parse the stream header and, on success, registers the stream.

      Returns without doing anything if the stream is already registered or if
      the data written so far does not yet contain a complete header.
      """
      # Only need to register once.
      if self._header is not None:
        return

      # Can we parse a full LogDog stream header?
      #
      # This means pulling:
      # - The LogDog Butler header.
      # - The header size varint.
      # - The header JSON blob, which needs to be decoded.
      with self._read_from(0) as fd:
        # Read the Butler magic number. Only its length is checked here.
        magic_data = fd.read(len(libs.logdog.stream.BUTLER_MAGIC))
        if len(magic_data) != len(libs.logdog.stream.BUTLER_MAGIC):
          # Incomplete magic number, cannot complete registration.
          return
        count = len(magic_data)

        try:
          size, varint_count = libs.logdog.varint.read_uvarint(fd)
        except ValueError:
          # Incomplete varint, cannot complete registration.
          return
        count += varint_count

        header_data = fd.read(size)
        if len(header_data) != size:
          # Incomplete header, cannot complete registration.
          return
        count += size

        # Parse the header as JSON.
        self._header = json.loads(header_data)
        # Payload starts after (magic + size varint + header JSON).
        self._data_offset = count

      self._client._register_stream(self, self._header)

    @property
    def data(self):
      """The stream payload (everything after the header).

      Returns None if no header has been parsed.
      """
      # If we have already cached our data (on close), return it directly.
      if self._final_data is not None:
        return self._final_data

      # Load our data from our live buffer.
      if self._data_offset is None:
        # No header has been read, so there is no data.
        return None
      with self._read_from(self._data_offset) as fd:
        return fd.read()

  # Record kept for each registered stream: the Stream object (`s`) plus the
  # 'type' and 'contentType' values taken from its header.
  _StreamEntry = collections.namedtuple('_StreamEntry', (
      's', 'type', 'content_type'))

  # Maps a datagram stream content type to the function used by `get` to turn
  # each raw datagram into comparable Python data.
  _DATAGRAM_CONTENT_TRANSLATE = {
      stream_logdog.ANNOTATION_CONTENT_TYPE: _translate_annotation_datagram,
  }

  def __init__(self):
    super(_TestStreamClient, self).__init__()
    # Registered streams: stream name -> _StreamEntry.
    self.streams = {}
    # Streams that were closed before a complete header was written.
    self.incomplete = []
    # Open streams whose header has not been parsed yet: id(stream) -> stream.
    self.unregistered = {}

  @classmethod
  def _create(cls, value):
    raise NotImplementedError('Instances must be created manually.')

  def _connect_raw(self):
    """Returns a new in-memory Stream, tracked as unregistered."""
    s = self.Stream(self)
    self.unregistered[id(s)] = s
    return s

  def get(self, name):
    """Returns the data written so far to the registered stream `name`.

    Args:
      name (str): The name of a registered stream.

    Returns:
      For a text stream, the list of its lines as unicode strings. For a
      datagram stream, the most recent datagram (passed through the translator
      for the stream's content type, if there is one), or None if no datagram
      has been written yet.

    Raises:
      KeyError: if no stream called `name` has been registered.
      NotImplementedError: if the stream is a binary stream.
      ValueError: if a datagram is truncated, or the stream type is unknown.
    """
    se = self.streams[name]
    data = se.s.data

    if se.type == libs.logdog.stream.StreamParams.TEXT:
      # Return text stream data as a list of lines. We use unicode because it
      # fits in with the JSON dump from 'all_streams'.
      return [unicode(l) for l in data.splitlines()]
    elif se.type == libs.logdog.stream.StreamParams.BINARY:
      raise NotImplementedError('No support for fetching binary stream data.')
    elif se.type == libs.logdog.stream.StreamParams.DATAGRAM:
      # If this stream carries a known datagram type (e.g., protobuf), each
      # datagram is transformed into JSONPB.
      translator = self._DATAGRAM_CONTENT_TRANSLATE.get(se.content_type)

      # Walk every size-prefixed datagram, validating (and translating) each
      # one, but only keep the most recent: that is what callers compare
      # against. Starting from None keeps an empty stream from failing with an
      # unbound local.
      latest = None
      sio = StringIO.StringIO(data)
      try:
        while sio.tell() < sio.len:
          size, _ = libs.logdog.varint.read_uvarint(sio)
          dg = sio.read(size)
          if len(dg) != size:
            raise ValueError('Incomplete datagram (%d != %d)' % (len(dg), size))
          latest = translator(dg) if translator is not None else dg
      finally:
        sio.close()
      return latest
    else:
      raise ValueError('Unknown stream type [%s]' % (se.type,))

  def all_streams(self):
    """Returns (dict): `get(name)` for every registered stream, by name."""
    return {name: self.get(name) for name in self.streams}

  @property
  def stream_names(self):
    """The set of names of all registered streams."""
    return set(self.streams)

  def _remove_from_unregistered(self, s):
    """Stops tracking `s` as unregistered; KeyError if it was not tracked."""
    if id(s) not in self.unregistered:
      raise KeyError('Stream is not known to be unregistered.')
    del self.unregistered[id(s)]

  def _register_stream(self, s, header):
    """Records `s` under the name found in its parsed `header`.

    Raises:
      KeyError: if a stream with the same name is already registered, or if
          `s` is not currently tracked as unregistered.
    """
    name = header.get('name')
    if name in self.streams:
      raise KeyError('Duplicate stream [%s]' % (name,))

    self._remove_from_unregistered(s)
    self.streams[name] = self._StreamEntry(
        s=s,
        type=header['type'],
        content_type=header.get('contentType'),
    )

  def _register_incomplete(self, s):
    """Moves `s` from the unregistered set to the `incomplete` list."""
    self._remove_from_unregistered(s)
    self.incomplete.append(s)
class EnvironmentTest(unittest.TestCase):
  """Smoke test for the real stream_logdog._Environment.

  The other test cases in this file substitute their own _Environment, so
  this one only checks that the real factory can be called without raising.
  """

  def testRealEnvironment(self):
    # The returned value is deliberately ignored; not raising is the test.
    _ = stream_logdog._Environment.real()
class StreamEngineTest(unittest.TestCase):
  """Drives stream_logdog.StreamEngine against an in-memory stream client.

  Time is faked: the clock starts at 2106-06-12T01:02:03 and only moves, one
  second at a time, when `_advance_time` is called. The context-manager helpers
  below call it at fixed points, which is what makes the timestamps in the
  expected annotations deterministic.
  """

  def setUp(self):
    # Let unittest print the complete diff when a large dict comparison fails.
    self.maxDiff = 1024*1024
    self.now = datetime.datetime(2106, 6, 12, 1, 2, 3)
    self.client = _TestStreamClient()
    self.env = stream_logdog._Environment(
        now_fn=lambda: self.now,
        argv=[],
        environ={},
        cwd=None,
    )

  # ---------------------------------------------------------------------------
  # Helpers: engine / stream lifecycle.
  # ---------------------------------------------------------------------------

  @contextlib.contextmanager
  def _new_stream_engine(self, **kwargs):
    """Opens a StreamEngine, yields it, then advances time and closes it.

    `client` and `environment` default to the test's own unless the caller
    passes them explicitly.
    """
    params = {'client': self.client, 'environment': self.env}
    params.update(kwargs)

    engine = stream_logdog.StreamEngine(**params)
    engine.open()
    yield engine

    self._advance_time()
    engine.close()

  @contextlib.contextmanager
  def _step_stream(self, se, **kwargs):
    """Yields a new step stream; time advances before opening and closing it."""
    self._advance_time()
    config = recipe_api.StepClient.StepConfig(**kwargs)
    stream = se.new_step_stream(config)
    yield stream

    self._advance_time()
    stream.close()

  @contextlib.contextmanager
  def _log_stream(self, step_stream, name):
    """Yields a new log stream of `step_stream`, closing it afterwards."""
    stream = step_stream.new_log_stream(name)
    yield stream
    stream.close()

  def _advance_time(self):
    self.now += datetime.timedelta(seconds=1)

  def _use_fake_command(self):
    """Fills the fake environment with a command line, an env var and a cwd."""
    self.env.argv = ['fake_program', 'arg0', 'arg1']
    self.env.environ['foo'] = 'bar'
    self.env.cwd = 'CWD'

  # ---------------------------------------------------------------------------
  # Helpers: expected JSONPB values.
  # ---------------------------------------------------------------------------

  @staticmethod
  def _ts(second):
    """Returns the expected timestamp string for 01:02:<second> of the fake day."""
    return u'2106-06-12T01:02:%02dZ' % (second,)

  @staticmethod
  def _fake_command_json():
    """Returns the expected 'command' value matching `_use_fake_command`."""
    return {
        u'commandLine': [u'fake_program', u'arg0', u'arg1'],
        u'cwd': u'CWD',
        u'environ': {u'foo': u'bar'},
    }

  @classmethod
  def _root_json(cls, ended=None, status=u'SUCCESS', command=None,
                 substeps=None):
    """Builds the expected root ('steps') annotation.

    Args:
      ended (int or None): Second at which the root ended. When None, neither
          'ended' nor 'status' is expected (the engine is still open).
      status (unicode): Expected status; only used when `ended` is given.
      command (dict or None): Expected 'command' value, if any.
      substeps (list or None): Expected 'substep' value, if any.
    """
    root = {u'name': u'steps', u'started': cls._ts(3)}
    if ended is not None:
      root[u'status'] = status
      root[u'ended'] = cls._ts(ended)
    if command is not None:
      root[u'command'] = command
    if substeps is not None:
      root[u'substep'] = substeps
    return root

  @classmethod
  def _step_json(cls, name, started, ended, status=u'SUCCESS', extra=None):
    """Builds one expected 'substep' entry.

    Args:
      name (unicode): The step name.
      started, ended (int): Seconds at which the step started and ended.
      status (unicode): The expected step status.
      extra (dict or None): Additional expected fields of the step.
    """
    step = {
        u'name': name,
        u'status': status,
        u'started': cls._ts(started),
        u'ended': cls._ts(ended),
    }
    step.update(extra or {})
    return {u'step': step}

  # ---------------------------------------------------------------------------
  # Test cases.
  # ---------------------------------------------------------------------------

  def testEmptyStreamEngine(self):
    self._use_fake_command()

    with self._new_stream_engine():
      pass

    self.assertEqual(self.client.all_streams(), {
        u'annotations': self._root_json(
            ended=4, command=self._fake_command_json()),
    })

  def testIncrementalUpdates(self):
    self._use_fake_command()
    foo = self._step_json(u'foo', 4, 5)
    bar = self._step_json(u'bar', 6, 7)

    # Create a StreamEngine with an update interval that will trigger each time
    # _advance_time is called.
    with self._new_stream_engine(
        update_interval=datetime.timedelta(seconds=1)) as se:
      # Initial stream state (no steps).
      self.assertEqual(self.client.all_streams(), {
          u'annotations': self._root_json(command=self._fake_command_json()),
      })

      with self._step_stream(se, name='foo'):
        pass

      # Stream state (foo).
      self.assertEqual(self.client.all_streams(), {
          u'annotations': self._root_json(
              command=self._fake_command_json(), substeps=[foo]),
      })

      with self._step_stream(se, name='bar'):
        pass

      # Stream state (bar).
      self.assertEqual(self.client.all_streams(), {
          u'annotations': self._root_json(
              command=self._fake_command_json(), substeps=[foo, bar]),
      })

    # Final stream state.
    self.assertEqual(self.client.all_streams(), {
        u'annotations': self._root_json(
            ended=8,
            command=self._fake_command_json(),
            substeps=[foo, bar]),
    })

  def testDumpFinalState(self):
    self._use_fake_command()

    with tempdir() as tdir:
      dump_path = os.path.join(tdir, 'dump.bin')

      # Create a StreamEngine with an update interval that will trigger each
      # time _advance_time is called.
      with self._new_stream_engine(
          update_interval=datetime.timedelta(seconds=1),
          dump_path=dump_path):
        # Initial stream state (no steps).
        self.assertEqual(self.client.all_streams(), {
            u'annotations': self._root_json(
                command=self._fake_command_json()),
        })

      # The engine has been closed; the dump file should hold its final state.
      with open(dump_path, 'rb') as fd:
        step = _translate_annotation_datagram(fd.read())
      self.assertEqual(
          step,
          self._root_json(ended=4, command=self._fake_command_json()))

  def testBasicStream(self):
    self._use_fake_command()

    with self._new_stream_engine(name_base='test/base') as se:
      with self._step_stream(se,
                             name='first step',
                             cmd=['first', 'step'],
                             cwd='FIRST_CWD') as step:
        step.add_step_text('Sup')
        step.add_step_text('Dawg?')
        step.write_line('STDOUT for first step.')
        step.write_line('(Another line)')
        step.add_step_summary_text('Everything is great.')
        step.add_step_link('example 1', 'http://example.com/1')
        step.add_step_link('example 2', 'http://example.com/2')
        step.set_step_status('SUCCESS')

      with self._step_stream(se, name='second step') as step:
        step.set_step_status('SUCCESS')
        step.write_split('multiple\nlines\nof\ntext')

        # Create two log streams with the same name to test indexing.
        #
        # Note that "log stream" is an invalid LogDog stream name, so this
        # will also test normalization.
        with self._log_stream(step, 'log stream') as ls:
          ls.write_split('foo\nbar\nbaz\n')
        with self._log_stream(step, 'log stream') as ls:
          ls.write_split('qux\nquux\n')

      # This is a different step name, but it will normalize to the same
      # stream name component as 'second step', so this will test that we
      # disambiguate the stream names.
      with self._step_stream(se, name='second/step'):
        pass

    first_stdout = u'test/base/steps/first_step/stdout'
    second_stdout = u'test/base/steps/second_step/stdout'
    log_0 = u'test/base/steps/second_step/logs/log_stream/0'
    log_1 = u'test/base/steps/second_step/logs/log_stream/1'

    first_step = self._step_json(u'first step', 4, 5, extra={
        u'command': {
            u'commandLine': [u'first', u'step'],
            u'cwd': u'FIRST_CWD',
        },
        u'stdoutStream': {u'name': first_stdout},
        u'text': [u'Everything is great.', u'Sup', u'Dawg?'],
        u'otherLinks': [
            {u'label': u'example 1', u'url': u'http://example.com/1'},
            {u'label': u'example 2', u'url': u'http://example.com/2'},
        ],
    })
    second_step = self._step_json(u'second step', 6, 7, extra={
        u'stdoutStream': {u'name': second_stdout},
        u'otherLinks': [
            {u'label': u'log stream', u'logdogStream': {u'name': log_0}},
            {u'label': u'log stream', u'logdogStream': {u'name': log_1}},
        ],
    })
    third_step = self._step_json(u'second/step', 8, 9)

    self.assertEqual(self.client.all_streams(), {
        u'test/base/annotations': self._root_json(
            ended=10,
            command=self._fake_command_json(),
            substeps=[first_step, second_step, third_step]),
        first_stdout: [
            u'STDOUT for first step.',
            u'(Another line)',
        ],
        second_stdout: [
            u'multiple',
            u'lines',
            u'of',
            u'text',
        ],
        log_0: [
            u'foo',
            u'bar',
            u'baz',
        ],
        log_1: [
            u'qux',
            u'quux',
        ],
    })

  def testWarningBasicStream(self):
    with self._new_stream_engine() as se:
      with self._step_stream(se, name='isuck') as step:
        step.add_step_summary_text('Something went wrong.')
        step.set_step_status('WARNING')

    # A 'WARNING' step is expected to be reported with SUCCESS status, but
    # carrying failure details.
    isuck = self._step_json(u'isuck', 4, 5, extra={
        u'failureDetails': {
            u'text': u'Something went wrong.',
        },
        u'text': [u'Something went wrong.'],
    })
    self.assertEqual(self.client.all_streams(), {
        u'annotations': self._root_json(ended=6, substeps=[isuck]),
    })

  def testFailedBasicStream(self):
    with self._new_stream_engine() as se:
      with self._step_stream(se, name='isuck') as step:
        step.add_step_summary_text('Oops I failed.')
        step.set_step_status('FAILURE')

      with self._step_stream(se, name='irock'):
        pass

    # The failed step is expected to fail the root, but not its sibling.
    isuck = self._step_json(u'isuck', 4, 5, status=u'FAILURE', extra={
        u'failureDetails': {
            u'text': u'Oops I failed.',
        },
        u'text': [u'Oops I failed.'],
    })
    irock = self._step_json(u'irock', 6, 7)
    self.assertEqual(self.client.all_streams(), {
        u'annotations': self._root_json(
            ended=8, status=u'FAILURE', substeps=[isuck, irock]),
    })

  def testNestedStream(self):
    with self._new_stream_engine() as se:
      # parent
      with self._step_stream(se, name='parent') as step:
        step.write_line('I am the parent.')

      # parent."child 1"
      with self._step_stream(se,
                             name='child 1',
                             nest_level=1) as step:
        step.write_line('I am child #1.')

      # parent."child 1"."grandchild"
      with self._step_stream(se,
                             name='grandchild',
                             nest_level=2) as step:
        step.write_line("I am child #1's child.")

      # parent."child 2": back at nest level 1, so a second child of 'parent'.
      with self._step_stream(se,
                             name='child 2',
                             nest_level=1) as step:
        step.write_line('I am child #2.')

      # friend: no nest level is given; the expectation below places it at the
      # root, next to 'parent'.
      with self._step_stream(se, name='friend') as step:
        step.write_line("I am the parent's friend.")

    parent_out = u'steps/parent/stdout'
    child_1_out = u'steps/parent/steps/child_1/stdout'
    grandchild_out = u'steps/parent/steps/child_1/steps/grandchild/stdout'
    child_2_out = u'steps/parent/steps/child_2/stdout'
    friend_out = u'steps/friend/stdout'

    grandchild = self._step_json(u'grandchild', 8, 9, extra={
        u'stdoutStream': {u'name': grandchild_out},
    })
    child_1 = self._step_json(u'child 1', 6, 7, extra={
        u'stdoutStream': {u'name': child_1_out},
        u'substep': [grandchild],
    })
    child_2 = self._step_json(u'child 2', 10, 11, extra={
        u'stdoutStream': {u'name': child_2_out},
    })
    parent = self._step_json(u'parent', 4, 5, extra={
        u'stdoutStream': {u'name': parent_out},
        u'substep': [child_1, child_2],
    })
    friend = self._step_json(u'friend', 12, 13, extra={
        u'stdoutStream': {u'name': friend_out},
    })

    self.assertEqual(self.client.all_streams(), {
        u'annotations': self._root_json(ended=14, substeps=[parent, friend]),
        parent_out: [u'I am the parent.'],
        child_1_out: [u'I am child #1.'],
        grandchild_out: [u"I am child #1's child."],
        child_2_out: [u'I am child #2.'],
        friend_out: [u"I am the parent's friend."],
    })

  def testTriggersRaiseException(self):
    with self._new_stream_engine() as se:
      with self._step_stream(se, name='trigger') as step:
        with self.assertRaises(NotImplementedError):
          step.trigger('trigger spec')

  def testTriggersIgnored(self):
    # With ignore_triggers set, triggering must not raise.
    with self._new_stream_engine(ignore_triggers=True) as se:
      with self._step_stream(se, name='trigger') as step:
        step.trigger('trigger spec')

  def testNoSubannotations(self):
    config = recipe_api.StepClient.StepConfig(
        name='uses subannotations',
        allow_subannotations=True,
    )
    with self._new_stream_engine(ignore_triggers=True) as se:
      with self.assertRaises(NotImplementedError):
        se.new_step_stream(config)

  def testInvalidStepStatusRaisesValueError(self):
    with self._new_stream_engine() as se:
      with self._step_stream(se, name='trigger') as step:
        with self.assertRaises(ValueError):
          step.set_step_status('OHAI')
class AnnotationMonitorTest(unittest.TestCase):
  """Tests the stream_logdog._AnnotationMonitor directly."""

  # A small timedelta, sufficient to block but fast enough to not make the
  # test slow.
  _SMALL_TIME_DELTA = datetime.timedelta(milliseconds=5)

  class _DatagramBuffer(object):
    """Datagram sink that records what it is sent and signals each arrival."""

    def __init__(self):
      self.data_event = threading.Event()
      self.datagrams = []

    def send(self, dg):
      self.datagrams.append(dg)
      self.data_event.set()

    def __len__(self):
      return len(self.datagrams)

    @property
    def latest(self):
      """The most recently sent datagram, or None if nothing has been sent."""
      return self.datagrams[-1] if self.datagrams else None

    def wait_for_data(self):
      """Blocks until a datagram has been sent, then returns the latest one."""
      self.data_event.wait()
      self.data_event.clear()
      return self.latest

  def setUp(self):
    self.now = datetime.datetime(2106, 6, 12, 1, 2, 3)
    self.db = self._DatagramBuffer()
    self.env = stream_logdog._Environment(
        now_fn=lambda: self.now,
        argv=[],
        environ={},
        cwd=None,
    )

  def _assert_idle(self, am):
    """Asserts that `am` has neither a pending flush timer nor buffered data."""
    with am._lock:
      self.assertIsNone(am._flush_timer)
      self.assertIsNone(am._current_data)

  @contextlib.contextmanager
  def _annotation_monitor(self, flush_period=None):
    """Yields an _AnnotationMonitor that sends to `self.db`.

    On exit the monitor is flushed and joined, and is then required to be idle.
    """
    if not flush_period:
      # Use a really high flush period. This should never naturally trigger
      # during a test case.
      flush_period = datetime.timedelta(hours=1)

    monitor = stream_logdog._AnnotationMonitor(self.env, self.db, flush_period)
    try:
      yield monitor
    finally:
      monitor.flush_and_join()
      # The timer must have been shut down, with nothing left buffered.
      self._assert_idle(monitor)

  def testMonitorStartsAndJoinsWithNoData(self):
    with self._annotation_monitor():
      pass

    # No datagrams should have been sent.
    self.assertIsNone(self.db.latest)
    self.assertEqual(len(self.db.datagrams), 0)

  def testMonitorBuffersAndSendsData(self):
    with self._annotation_monitor() as am:
      # The first piece of data should have been immediately sent.
      am.signal_update('initial')
      self.assertEqual(self.db.wait_for_data(), 'initial')

      # Subsequent iterations should not send data, but should start the flush
      # timer and buffer the latest data.
      with am._lock:
        self.assertIsNone(am._flush_timer)
      for i in xrange(10):
        am.signal_update('test%d' % (i,))
        time.sleep(self._SMALL_TIME_DELTA.total_seconds())
      with am._lock:
        self.assertEqual(am._current_data, 'test9')
        self.assertIsNotNone(am._flush_timer)

      # Pretend the timer triggered. We should receive the latest buffered data.
      am._flush_timer_expired()
      self.assertEqual(self.db.wait_for_data(), 'test9')
      # No more timer or buffered data.
      self._assert_idle(am)

      # Send one last chunk of data, but don't let the timer expire. This will
      # be sent on final flush.
      am.signal_update('final')
      with am._lock:
        self.assertIsNotNone(am._flush_timer)

    # Assert that the final chunk of data was sent.
    self.assertEqual(self.db.latest, 'final')

    # Only three datagrams should have been sent.
    self.assertEqual(len(self.db.datagrams), 3)

  def testMonitorIgnoresDuplicateData(self):
    with self._annotation_monitor() as am:
      # Get initial data out of the way.
      am.signal_update('initial')
      self.assertEqual(self.db.wait_for_data(), 'initial')

      # Send the same thing. It should not be buffered.
      am.signal_update('initial')
      self._assert_idle(am)

    # Only one datagram should have been sent.
    self.assertEqual(len(self.db.datagrams), 1)

  def testStructuralUpdateSendsImmediately(self):
    with self._annotation_monitor() as am:
      # Get initial data out of the way.
      am.signal_update('initial')
      self.assertEqual(self.db.wait_for_data(), 'initial')

      # Send a structural update. It should send immediately.
      am.signal_update('test', structural=True)
      self.assertEqual(self.db.wait_for_data(), 'test')

      # Send a duplicate structural update. It should be ignored.
      am.signal_update('test', structural=True)
      self._assert_idle(am)

    # Only two datagrams should have been sent.
    self.assertEqual(len(self.db.datagrams), 2)

  def testFlushesPeriodically(self):
    with self._annotation_monitor(flush_period=self._SMALL_TIME_DELTA) as am:
      # Get initial data out of the way.
      am.signal_update('initial')
      self.assertEqual(self.db.wait_for_data(), 'initial')

      # Send a regular (non-structural) update. With such a short flush period
      # it should arrive without any explicit flush.
      am.signal_update('test')
      self.assertEqual(self.db.wait_for_data(), 'test')

    # Only two datagrams should have been sent.
    self.assertEqual(len(self.db.datagrams), 2)
class AnnotationStateTest(unittest.TestCase):
  """Tests the stream_logdog._AnnotationState directly."""

  def setUp(self):
    # The first positional argument (passed as None) is left unset on purpose.
    self.env = stream_logdog._Environment(
        None,
        argv=['command', 'arg0', 'arg1'],
        cwd='path/to/cwd',
        environ={
            'foo': 'bar',
            'FOO': 'baz',
        },
    )
    stream_name = stream_logdog._StreamName('strean/name')
    self.astate = stream_logdog._AnnotationState.create(
        stream_name,
        environment=self.env,
        properties={'foo': 'bar'},
    )

  def _create_step(self, **config):
    """Creates a step on the annotation state from StepConfig keyword args."""
    self.astate.create_step(recipe_api.StepClient.StepConfig(**config))

  def testFirstCheckReturnsData(self):
    # The first check should return data.
    self.assertIsNotNone(self.astate.check())

    # The second will not, since nothing has changed.
    self.assertIsNone(self.astate.check())

  def testCanCreateAndGetStep(self):
    root = self.astate.base

    # Root step.
    self._create_step(name='first')
    self.assertEqual(len(root.substep), 1)
    self.assertEqual(root.substep[0].step.name, 'first')
    self.assertIsNotNone(self.astate.check())

    # Child step.
    self._create_step(name='first child', nest_level=1)
    self.assertEqual(len(root.substep), 1)
    self.assertEqual(len(root.substep[0].step.substep), 1)
    self.assertEqual(root.substep[0].step.substep[0].step.name, 'first child')
    self.assertIsNotNone(self.astate.check())

    # Sibling step to 'first'.
    self._create_step(name='second')
    self.assertEqual(len(root.substep), 2)
    self.assertEqual(root.substep[1].step.name, 'second')
    self.assertIsNotNone(self.astate.check())

  def testCanUpdateProperties(self):
    self.astate.update_properties(foo='baz', qux='quux')

    # 'foo' is expected to be overwritten and 'qux' to be added after it.
    expected = [
        pb.Step.Property(name='foo', value='baz'),
        pb.Step.Property(name='qux', value='quux'),
    ]
    self.assertEqual(list(self.astate.base.property), expected)
class StreamNameTest(unittest.TestCase):
  """Tests the stream_logdog._StreamName directly."""

  def testEmptyStreamNameRaisesValueError(self):
    # Construction succeeds; rendering the empty name is what must fail.
    empty = stream_logdog._StreamName(None)
    with self.assertRaises(ValueError):
      str(empty)

  def testInvalidBaseRaisesValueError(self):
    with self.assertRaises(ValueError):
      stream_logdog._StreamName('!!! invalid !!!')

  def testAppendComponents(self):
    base = stream_logdog._StreamName('base')
    cases = (
        ((), 'base'),
        (('foo',), 'base/foo'),
        (('foo', 'bar'), 'base/foo/bar'),
        (('foo', 'bar/baz'), 'base/foo/bar_baz'),
    )
    for components, expected in cases:
      self.assertEqual(str(base.append(*components)), expected)

  def testAugment(self):
    base = stream_logdog._StreamName('base')
    cases = (
        ('', 'base'),
        ('foo', 'basefoo'),
        ('foo/bar baz', 'basefoo_bar_baz'),
    )
    for suffix, expected in cases:
      self.assertEqual(str(base.augment(suffix)), expected)

  def testAppendInvalidStreamNameNormalizes(self):
    appended = stream_logdog._StreamName('base').append('#!!! stream name !!!')
    self.assertEqual(str(appended), 'base/s______stream_name____')

  def testAugmentInvalidStreamNameNormalizes(self):
    base = stream_logdog._StreamName('base')
    augmented = base.augment(' !!! other !!! ')
    self.assertEqual(str(augmented), 'base_____other_____')
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()