[logdog] Add normalize_segment function

R=iannucci

Bug: 1164585
Change-Id: Iee951970d32f5fd946f1e8328211c6a3ded04c29
Reviewed-on: https://chromium-review.googlesource.com/c/infra/luci/luci-py/+/2621986
Auto-Submit: Yiwei Zhang <yiwzhang@google.com>
Commit-Queue: Robbie Iannucci <iannucci@chromium.org>
Reviewed-by: Robbie Iannucci <iannucci@chromium.org>
NOKEYCHECK=True
GitOrigin-RevId: 90e54b5ca4760f29e78b4299b54cae62b378a995
diff --git a/streamname.py b/streamname.py
index a16a398..8e9bf33 100644
--- a/streamname.py
+++ b/streamname.py
@@ -9,9 +9,11 @@
 # third_party/
 from six.moves import urllib
 
-
+_STREAM_SEP = '/'
 _ALNUM_CHARS = string.ascii_letters + string.digits
+_VALID_SEG_CHARS = _ALNUM_CHARS + ':_-.'
 _SEGMENT_RE_BASE = r'[a-zA-Z0-9][a-zA-Z0-9:_\-.]*'
+_SEGMENT_RE = re.compile('^' + _SEGMENT_RE_BASE + '$')
 _STREAM_NAME_RE = re.compile('^(' + _SEGMENT_RE_BASE + ')(/' +
                              _SEGMENT_RE_BASE + ')*$')
 _MAX_STREAM_NAME_LENGTH = 4096
@@ -51,70 +53,68 @@
   validate_stream_name(value, maxlen=_MAX_TAG_VALUE_LENGTH)
 
 
+def normalize_segment(seg, prefix=None):
+  """Given a string (str|unicode), mutate it into a valid segment name (str).
+
+  This operates by replacing invalid segment name characters with underscores
+  (_) when encountered.
+
+  A special case is when "seg" begins with a non-alphanumeric character. In
+  this case, we will prefix it with the "prefix", if one is supplied.
+  Otherwise, raises ValueError.
+
+  See _VALID_SEG_CHARS for all valid characters for a segment.
+
+  Raises:
+    ValueError: If normalization could not be successfully performed.
+  """
+  if not seg:
+    if prefix is None:
+      raise ValueError('Cannot normalize empty segment with no prefix.')
+    seg = prefix
+  else:
+
+    def replace_if_invalid(ch, first=False):
+      ret = ch if ch in _VALID_SEG_CHARS else '_'
+      if first and ch not in _ALNUM_CHARS:
+        if prefix is None:
+          raise ValueError('Segment has invalid beginning, and no prefix was '
+                           'provided.')
+        return prefix + ret
+      return ret
+
+    seg = ''.join(replace_if_invalid(ch, i == 0) for i, ch in enumerate(seg))
+
+  if _SEGMENT_RE.match(seg) is None:
+    raise AssertionError('Normalized segment is still invalid: %r' % seg)
+
+  # seg could be of type unicode. As a valid segment name contains only ascii
+  # characters, it is safe to encode seg as ascii (yielding a str).
+  if isinstance(seg, unicode):
+    return seg.encode('ascii')
+  return seg
+
+
 def normalize(v, prefix=None):
   """Given a string (str|unicode), mutate it into a valid stream name (str).
 
   This operates by replacing invalid stream name characters with underscores (_)
   when encountered.
 
-  A special case is when "v" begins with an invalid character. In this case, we
-  will replace it with the "prefix", if one is supplied.
+  A special case is when any segment of "v" begins with a non-alphanumeric
+  character. In this case, we will prefix the segment with the "prefix", if one
+  is supplied. Otherwise, raises ValueError.
 
   See _STREAM_NAME_RE for a description of a valid stream name.
 
   Raises:
     ValueError: If normalization could not be successfully performed.
   """
-  if not v:
-    if not prefix:
-      raise ValueError('Cannot normalize empty name with no prefix.')
-    v = prefix
-  else:
-    out = []
-    for i, ch in enumerate(v):
-      # Either the first character in v, or immediately after /
-      isFirst = i == 0 or out[-1][-1] == '/'
-      if isFirst and not _is_valid_stream_char(ch, first=True):
-        # The first letter is special, and must be alphanumeric.
-        # If we have a prefix, prepend that to the resulting string.
-        if prefix is None:
-          raise ValueError('Name has invalid beginning, and no prefix was '
-                           'provided.')
-        out.append(prefix)
-
-      if not _is_valid_stream_char(ch):
-        ch = '_'
-      out.append(ch)
-    v = ''.join(out)
-
+  normalized = _STREAM_SEP.join(
+      normalize_segment(seg, prefix=prefix) for seg in v.split(_STREAM_SEP))
   # Validate the resulting string.
-  validate_stream_name(v)
-  # v could be of type unicode. As a valid stream name contains only ascii
-  # characters, it is safe to transcode v to ascii encoding (become str type).
-  if isinstance(v, unicode):
-    return v.encode('ascii')
-  return v
-
-
-def _is_valid_stream_char(ch, first=False):
-  """Returns (bool): True if a character is alphanumeric.
-
-  The first character must be alphanumeric, matching [a-zA-Z0-9].
-  Additional characters must either be alphanumeric or one of: (: _ - .).
-
-  Args:
-    ch (str): the character to evaluate.
-    first (bool): if true, apply special first-character constraints.
-  """
-  # Alphanumeric check.
-  if ch in _ALNUM_CHARS:
-    return True
-  if first:
-    # The first character must be alphanumeric.
-    return False
-
-  # Check additional middle-name characters:
-  return ch in ':_-./'
+  validate_stream_name(normalized)
+  return normalized
 
 
 class StreamPath(collections.namedtuple('_StreamPath', ('prefix', 'name'))):
diff --git a/tests/streamname_test.py b/tests/streamname_test.py
index fd41549..d2800f0 100755
--- a/tests/streamname_test.py
+++ b/tests/streamname_test.py
@@ -47,23 +47,34 @@
         raised = True
       self.assertFalse(raised, "Stream name '%s' raised ValueError" % (name,))
 
-  def testNormalize(self):
+  def testNormalizeSegment(self):
     for name, normalized in (
         ('', 'PFX'),
-        ('_invalid_start_char', 'PFX_invalid_start_char'),
-        ('valid_stream_name.1:2-3', 'valid_stream_name.1:2-3'),
-        ('some stream (with stuff)', 'some_stream__with_stuff_'),
-        ('_invalid/st!ream/name entry', 'PFX_invalid/st_ream/name_entry'),
         ('     ', 'PFX_____'),
+        ('_invalid_start_char', 'PFX_invalid_start_char'),
+        ('valid_seg.1:2-3', 'valid_seg.1:2-3'),
+        ('some seg (with stuff)', 'some_seg__with_stuff_'),
+        # treat '/' as invalid
+        ('_invalid/se!g/entry', 'PFX_invalid_se_g_entry'),
+        ('/seg/', 'PFX_seg_'),
     ):
-      self.assertEqual(streamname.normalize(name, prefix='PFX'), normalized)
+      self.assertEqual(
+          streamname.normalize_segment(name, prefix='PFX'), normalized)
 
     # Assert that an empty stream name with no prefix will raise a ValueError.
-    self.assertRaises(ValueError, streamname.normalize, '')
+    self.assertRaises(ValueError, streamname.normalize_segment, '')
 
     # Assert that a stream name with an invalid starting character and no prefix
     # will raise a ValueError.
-    self.assertRaises(ValueError, streamname.normalize, '_invalid_start_char')
+    self.assertRaises(ValueError, streamname.normalize_segment,
+                      '_invalid_start_char')
+
+  def testNormalize(self):
+    self.assertEqual(
+        streamname.normalize('valid/st:ream/na-me.',), 'valid/st:ream/na-me.')
+    self.assertEqual(
+        streamname.normalize('_invalid/st!ream/name entry', prefix='PFX'),
+        'PFX_invalid/st_ream/name_entry')
 
 
 class StreamPathTestCase(unittest.TestCase):