gh-68552: fix defects policy (#138579)
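Route two defects that the feed parser previously appended directly to the message's defect list -- MisplacedEnvelopeHeaderDefect and the InvalidHeaderDefect reported for a missing header name -- through policy.handle_defect, so that they honor the active policy.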
---------
Co-authored-by: Martin Panter <vadmium@users.noreply.github.com>
Co-authored-by: Ivo Bellin Salarin <ivo@nilleb.com>
diff --git a/Lib/email/feedparser.py b/Lib/email/feedparser.py
index 6479b9b..ae8ef32 100644
--- a/Lib/email/feedparser.py
+++ b/Lib/email/feedparser.py
@@ -504,10 +504,9 @@ def _parse_headers(self, lines):
self._input.unreadline(line)
return
else:
- # Weirdly placed unix-from line. Note this as a defect
- # and ignore it.
+ # Weirdly placed unix-from line.
defect = errors.MisplacedEnvelopeHeaderDefect(line)
- self._cur.defects.append(defect)
+ self.policy.handle_defect(self._cur, defect)
continue
# Split the line on the colon separating field name from value.
# There will always be a colon, because if there wasn't the part of
@@ -519,7 +518,7 @@ def _parse_headers(self, lines):
# message. Track the error but keep going.
if i == 0:
defect = errors.InvalidHeaderDefect("Missing header name.")
- self._cur.defects.append(defect)
+ self.policy.handle_defect(self._cur, defect)
continue
assert i>0, "_parse_headers fed line with no : and no leading WS"
diff --git a/Lib/test/test_email/test_defect_handling.py b/Lib/test/test_email/test_defect_handling.py
index 44e76c8..acc4acc 100644
--- a/Lib/test/test_email/test_defect_handling.py
+++ b/Lib/test/test_email/test_defect_handling.py
@@ -126,12 +126,10 @@ def test_multipart_invalid_cte(self):
errors.InvalidMultipartContentTransferEncodingDefect)
def test_multipart_no_cte_no_defect(self):
- if self.raise_expected: return
msg = self._str_msg(self.multipart_msg.format(''))
self.assertEqual(len(self.get_defects(msg)), 0)
def test_multipart_valid_cte_no_defect(self):
- if self.raise_expected: return
for cte in ('7bit', '8bit', 'BINary'):
msg = self._str_msg(
self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte))
@@ -300,6 +298,47 @@ def test_missing_ending_boundary(self):
self.assertDefectsEqual(self.get_defects(msg),
[errors.CloseBoundaryNotFoundDefect])
+ def test_line_beginning_colon(self):
+ string = (
+ "Subject: Dummy subject\r\n: faulty header line\r\n\r\nbody\r\n"
+ )
+
+ with self._raise_point(errors.InvalidHeaderDefect):
+ msg = self._str_msg(string)
+ self.assertEqual(len(self.get_defects(msg)), 1)
+ self.assertDefectsEqual(
+ self.get_defects(msg), [errors.InvalidHeaderDefect]
+ )
+
+ if msg:
+ self.assertEqual(msg.items(), [("Subject", "Dummy subject")])
+ self.assertEqual(msg.get_payload(), "body\r\n")
+
+ def test_misplaced_envelope(self):
+ string = (
+ "Subject: Dummy subject\r\nFrom wtf\r\nTo: abc\r\n\r\nbody\r\n"
+ )
+ with self._raise_point(errors.MisplacedEnvelopeHeaderDefect):
+ msg = self._str_msg(string)
+ self.assertEqual(len(self.get_defects(msg)), 1)
+ self.assertDefectsEqual(
+ self.get_defects(msg), [errors.MisplacedEnvelopeHeaderDefect]
+ )
+
+ if msg:
+ headers = [("Subject", "Dummy subject"), ("To", "abc")]
+ self.assertEqual(msg.items(), headers)
+ self.assertEqual(msg.get_payload(), "body\r\n")
+
+
+
+class TestCompat32(TestDefectsBase, TestEmailBase):
+
+ policy = policy.compat32
+
+ def get_defects(self, obj):
+ return obj.defects
+
class TestDefectDetection(TestDefectsBase, TestEmailBase):
@@ -332,6 +371,9 @@ def _raise_point(self, defect):
with self.assertRaises(defect):
yield
+ def get_defects(self, obj):
+ return obj.defects
+
if __name__ == '__main__':
unittest.main()
diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py
index 4020f10..4e6c213 100644
--- a/Lib/test/test_email/test_email.py
+++ b/Lib/test/test_email/test_email.py
@@ -2263,70 +2263,6 @@ def test_parse_missing_minor_type(self):
eq(msg.get_content_maintype(), 'text')
eq(msg.get_content_subtype(), 'plain')
- # test_defect_handling
- def test_same_boundary_inner_outer(self):
- msg = self._msgobj('msg_15.txt')
- # XXX We can probably eventually do better
- inner = msg.get_payload(0)
- self.assertHasAttr(inner, 'defects')
- self.assertEqual(len(inner.defects), 1)
- self.assertIsInstance(inner.defects[0],
- errors.StartBoundaryNotFoundDefect)
-
- # test_defect_handling
- def test_multipart_no_boundary(self):
- msg = self._msgobj('msg_25.txt')
- self.assertIsInstance(msg.get_payload(), str)
- self.assertEqual(len(msg.defects), 2)
- self.assertIsInstance(msg.defects[0],
- errors.NoBoundaryInMultipartDefect)
- self.assertIsInstance(msg.defects[1],
- errors.MultipartInvariantViolationDefect)
-
- multipart_msg = textwrap.dedent("""\
- Date: Wed, 14 Nov 2007 12:56:23 GMT
- From: foo@bar.invalid
- To: foo@bar.invalid
- Subject: Content-Transfer-Encoding: base64 and multipart
- MIME-Version: 1.0
- Content-Type: multipart/mixed;
- boundary="===============3344438784458119861=="{}
-
- --===============3344438784458119861==
- Content-Type: text/plain
-
- Test message
-
- --===============3344438784458119861==
- Content-Type: application/octet-stream
- Content-Transfer-Encoding: base64
-
- YWJj
-
- --===============3344438784458119861==--
- """)
-
- # test_defect_handling
- def test_multipart_invalid_cte(self):
- msg = self._str_msg(
- self.multipart_msg.format("\nContent-Transfer-Encoding: base64"))
- self.assertEqual(len(msg.defects), 1)
- self.assertIsInstance(msg.defects[0],
- errors.InvalidMultipartContentTransferEncodingDefect)
-
- # test_defect_handling
- def test_multipart_no_cte_no_defect(self):
- msg = self._str_msg(self.multipart_msg.format(''))
- self.assertEqual(len(msg.defects), 0)
-
- # test_defect_handling
- def test_multipart_valid_cte_no_defect(self):
- for cte in ('7bit', '8bit', 'BINary'):
- msg = self._str_msg(
- self.multipart_msg.format(
- "\nContent-Transfer-Encoding: {}".format(cte)))
- self.assertEqual(len(msg.defects), 0)
-
# test_headerregistry.TestContentTypeHeader invalid_1 and invalid_2.
def test_invalid_content_type(self):
eq = self.assertEqual
@@ -2403,30 +2339,6 @@ def test_missing_start_boundary(self):
self.assertIsInstance(bad.defects[0],
errors.StartBoundaryNotFoundDefect)
- # test_defect_handling
- def test_first_line_is_continuation_header(self):
- eq = self.assertEqual
- m = ' Line 1\nSubject: test\n\nbody'
- msg = email.message_from_string(m)
- eq(msg.keys(), ['Subject'])
- eq(msg.get_payload(), 'body')
- eq(len(msg.defects), 1)
- self.assertDefectsEqual(msg.defects,
- [errors.FirstHeaderLineIsContinuationDefect])
- eq(msg.defects[0].line, ' Line 1\n')
-
- # test_defect_handling
- def test_missing_header_body_separator(self):
- # Our heuristic if we see a line that doesn't look like a header (no
- # leading whitespace but no ':') is to assume that the blank line that
- # separates the header from the body is missing, and to stop parsing
- # headers and start parsing the body.
- msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n')
- self.assertEqual(msg.keys(), ['Subject'])
- self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n')
- self.assertDefectsEqual(msg.defects,
- [errors.MissingHeaderBodySeparatorDefect])
-
def test_string_payload_with_extra_space_after_cte(self):
# https://github.com/python/cpython/issues/98188
cte = "base64 "
diff --git a/Misc/NEWS.d/next/Library/2025-12-04-09-22-31.gh-issue-68552.I_v-xB.rst b/Misc/NEWS.d/next/Library/2025-12-04-09-22-31.gh-issue-68552.I_v-xB.rst
new file mode 100644
index 0000000..bd3e53c
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2025-12-04-09-22-31.gh-issue-68552.I_v-xB.rst
@@ -0,0 +1 @@
+``MisplacedEnvelopeHeaderDefect`` and the ``InvalidHeaderDefect`` reported for a missing header name are now correctly passed to the ``handle_defect`` method of ``policy`` in :class:`~email.parser.FeedParser`.