[logdog] Add normalize_segment function
R=iannucci
Bug: 1164585
Change-Id: Iee951970d32f5fd946f1e8328211c6a3ded04c29
Reviewed-on: https://chromium-review.googlesource.com/c/infra/luci/luci-py/+/2621986
Auto-Submit: Yiwei Zhang <yiwzhang@google.com>
Commit-Queue: Robbie Iannucci <iannucci@chromium.org>
Reviewed-by: Robbie Iannucci <iannucci@chromium.org>
NOKEYCHECK=True
GitOrigin-RevId: 90e54b5ca4760f29e78b4299b54cae62b378a995
diff --git a/streamname.py b/streamname.py
index a16a398..8e9bf33 100644
--- a/streamname.py
+++ b/streamname.py
@@ -9,9 +9,11 @@
# third_party/
from six.moves import urllib
-
+_STREAM_SEP = '/'
_ALNUM_CHARS = string.ascii_letters + string.digits
+_VALID_SEG_CHARS = _ALNUM_CHARS + ':_-.'
_SEGMENT_RE_BASE = r'[a-zA-Z0-9][a-zA-Z0-9:_\-.]*'
+_SEGMENT_RE = re.compile('^' + _SEGMENT_RE_BASE + '$')
_STREAM_NAME_RE = re.compile('^(' + _SEGMENT_RE_BASE + ')(/' +
_SEGMENT_RE_BASE + ')*$')
_MAX_STREAM_NAME_LENGTH = 4096
@@ -51,70 +53,68 @@
validate_stream_name(value, maxlen=_MAX_TAG_VALUE_LENGTH)
+def normalize_segment(seg, prefix=None):
+ """Given a string (str|unicode), mutate it into a valid segment name (str).
+
+ This operates by replacing invalid segment name characters with underscores
+ (_) when encountered.
+
+ A special case is when "seg" begins with a non-alphanumeric character. In
+ this case, we will prefix it with the "prefix", if one is supplied.
+ Otherwise, raises ValueError.
+
+ See _VALID_SEG_CHARS for all valid characters for a segment.
+
+ Raises:
+ ValueError: If normalization could not be successfully performed.
+ """
+ if not seg:
+ if prefix is None:
+ raise ValueError('Cannot normalize empty segment with no prefix.')
+ seg = prefix
+ else:
+
+ def replace_if_invalid(ch, first=False):
+ ret = ch if ch in _VALID_SEG_CHARS else '_'
+ if first and ch not in _ALNUM_CHARS:
+ if prefix is None:
+ raise ValueError('Segment has invalid beginning, and no prefix was '
+ 'provided.')
+ return prefix + ret
+ return ret
+
+ seg = ''.join(replace_if_invalid(ch, i == 0) for i, ch in enumerate(seg))
+
+ if _SEGMENT_RE.match(seg) is None:
+ raise AssertionError('Normalized segment is still invalid: %r' % seg)
+
+ # seg could be of type unicode. As a valid segment contains only ascii
+ # characters, it is safe to encode seg as ascii (becoming str type).
+ if isinstance(seg, unicode):
+ return seg.encode('ascii')
+ return seg
+
+
def normalize(v, prefix=None):
"""Given a string (str|unicode), mutate it into a valid stream name (str).
This operates by replacing invalid stream name characters with underscores (_)
when encountered.
- A special case is when "v" begins with an invalid character. In this case, we
- will replace it with the "prefix", if one is supplied.
+ A special case is when any segment of "v" begins with a non-alphanumeric
+ character. In this case, we will prefix the segment with the "prefix", if one
+ is supplied. Otherwise, raises ValueError.
See _STREAM_NAME_RE for a description of a valid stream name.
Raises:
ValueError: If normalization could not be successfully performed.
"""
- if not v:
- if not prefix:
- raise ValueError('Cannot normalize empty name with no prefix.')
- v = prefix
- else:
- out = []
- for i, ch in enumerate(v):
- # Either the first character in v, or immediately after /
- isFirst = i == 0 or out[-1][-1] == '/'
- if isFirst and not _is_valid_stream_char(ch, first=True):
- # The first letter is special, and must be alphanumeric.
- # If we have a prefix, prepend that to the resulting string.
- if prefix is None:
- raise ValueError('Name has invalid beginning, and no prefix was '
- 'provided.')
- out.append(prefix)
-
- if not _is_valid_stream_char(ch):
- ch = '_'
- out.append(ch)
- v = ''.join(out)
-
+ normalized = _STREAM_SEP.join(
+ normalize_segment(seg, prefix=prefix) for seg in v.split(_STREAM_SEP))
# Validate the resulting string.
- validate_stream_name(v)
- # v could be of type unicode. As a valid stream name contains only ascii
- # characters, it is safe to transcode v to ascii encoding (become str type).
- if isinstance(v, unicode):
- return v.encode('ascii')
- return v
-
-
-def _is_valid_stream_char(ch, first=False):
- """Returns (bool): True if a character is alphanumeric.
-
- The first character must be alphanumeric, matching [a-zA-Z0-9].
- Additional characters must either be alphanumeric or one of: (: _ - .).
-
- Args:
- ch (str): the character to evaluate.
- first (bool): if true, apply special first-character constraints.
- """
- # Alphanumeric check.
- if ch in _ALNUM_CHARS:
- return True
- if first:
- # The first character must be alphanumeric.
- return False
-
- # Check additional middle-name characters:
- return ch in ':_-./'
+ validate_stream_name(normalized)
+ return normalized
class StreamPath(collections.namedtuple('_StreamPath', ('prefix', 'name'))):
diff --git a/tests/streamname_test.py b/tests/streamname_test.py
index fd41549..d2800f0 100755
--- a/tests/streamname_test.py
+++ b/tests/streamname_test.py
@@ -47,23 +47,34 @@
raised = True
self.assertFalse(raised, "Stream name '%s' raised ValueError" % (name,))
- def testNormalize(self):
+ def testNormalizeSegment(self):
for name, normalized in (
('', 'PFX'),
- ('_invalid_start_char', 'PFX_invalid_start_char'),
- ('valid_stream_name.1:2-3', 'valid_stream_name.1:2-3'),
- ('some stream (with stuff)', 'some_stream__with_stuff_'),
- ('_invalid/st!ream/name entry', 'PFX_invalid/st_ream/name_entry'),
(' ', 'PFX_____'),
+ ('_invalid_start_char', 'PFX_invalid_start_char'),
+ ('valid_seg.1:2-3', 'valid_seg.1:2-3'),
+ ('some seg (with stuff)', 'some_seg__with_stuff_'),
+ # treat '/' as invalid
+ ('_invalid/se!g/entry', 'PFX_invalid_se_g_entry'),
+ ('/seg/', 'PFX_seg_'),
):
- self.assertEqual(streamname.normalize(name, prefix='PFX'), normalized)
+ self.assertEqual(
+ streamname.normalize_segment(name, prefix='PFX'), normalized)
# Assert that an empty stream name with no prefix will raise a ValueError.
- self.assertRaises(ValueError, streamname.normalize, '')
+ self.assertRaises(ValueError, streamname.normalize_segment, '')
# Assert that a stream name with an invalid starting character and no prefix
# will raise a ValueError.
- self.assertRaises(ValueError, streamname.normalize, '_invalid_start_char')
+ self.assertRaises(ValueError, streamname.normalize_segment,
+ '_invalid_start_char')
+
+ def testNormalize(self):
+ self.assertEqual(
+ streamname.normalize('valid/st:ream/na-me.',), 'valid/st:ream/na-me.')
+ self.assertEqual(
+ streamname.normalize('_invalid/st!ream/name entry', prefix='PFX'),
+ 'PFX_invalid/st_ream/name_entry')
class StreamPathTestCase(unittest.TestCase):