mirror of
https://github.com/python/cpython.git
synced 2024-12-02 22:35:26 +08:00
#1672568: email now registers defects for base64 payload format errors.
Which also means that it is now producing *something* for any base64 payload, which is what leads to the couple of older test changes in test_email. This is a slightly backward incompatible behavior change, but the new behavior is so much more useful than the old (you can now *reliably* detect errors, and any program that was detecting errors by sniffing for a base64 return from get_payload(decode=True) and then doing its own error-recovery decode will just get the error-recovery decode right away). So this seems to me to be worth the small risk inherent in this behavior change. This patch also refactors the defect tests into a separate test file, since they are no longer just parser tests.
This commit is contained in:
parent
adbdcdbd95
commit
80e0aee95b
@ -96,3 +96,10 @@ this class is *not* an exception!
|
||||
this defect, its :meth:`is_multipart` method may return false even though its
|
||||
content type claims to be :mimetype:`multipart`.
|
||||
|
||||
* :class:`InvalidBase64PaddingDefect` -- When decoding a block of base64
|
||||
encoded bytes, the padding was not correct. Enough padding is added to
|
||||
perform the decode, but the resulting decoded bytes may be invalid.
|
||||
|
||||
* :class:`InvalidBase64CharactersDefect` -- When decoding a block of base64
|
||||
encoded bytes, characters outside the base64 alphabet were encountered.
|
||||
The characters are ignored, but the resulting decoded bytes may be invalid.
|
||||
|
@ -111,10 +111,14 @@ Here are the methods of the :class:`Message` class:
|
||||
header. When ``True`` and the message is not a multipart, the payload will
|
||||
be decoded if this header's value is ``quoted-printable`` or ``base64``.
|
||||
If some other encoding is used, or :mailheader:`Content-Transfer-Encoding`
|
||||
header is missing, or if the payload has bogus base64 data, the payload is
|
||||
header is missing, the payload is
|
||||
returned as-is (undecoded). In all cases the returned value is binary
|
||||
data. If the message is a multipart and the *decode* flag is ``True``,
|
||||
then ``None`` is returned.
|
||||
then ``None`` is returned. If the payload is base64 and it was not
|
||||
perfectly formed (missing padding, characters outside the base64
|
||||
alphabet), then an appropriate defect will be added to the message's
|
||||
defect property (:class:`~email.errors.InvalidBase64PaddingDefect` or
|
||||
:class:`~email.errors.InvalidBase64CharactersDefect`, respectively).
|
||||
|
||||
When *decode* is ``False`` (the default) the body is returned as a string
|
||||
without decoding the :mailheader:`Content-Transfer-Encoding`. However,
|
||||
|
@ -17,6 +17,7 @@ from email import utils
|
||||
from email import errors
|
||||
from email._policybase import compat32
|
||||
from email import charset as _charset
|
||||
from email._encoded_words import decode_b
|
||||
Charset = _charset.Charset
|
||||
|
||||
SEMISPACE = '; '
|
||||
@ -249,11 +250,12 @@ class Message:
|
||||
if cte == 'quoted-printable':
|
||||
return utils._qdecode(bpayload)
|
||||
elif cte == 'base64':
|
||||
try:
|
||||
return base64.b64decode(bpayload)
|
||||
except binascii.Error:
|
||||
# Incorrect padding
|
||||
return bpayload
|
||||
# XXX: this is a bit of a hack; decode_b should probably be factored
|
||||
# out somewhere, but I haven't figured out where yet.
|
||||
value, defects = decode_b(b''.join(bpayload.splitlines()))
|
||||
for defect in defects:
|
||||
self.policy.handle_defect(self, defect)
|
||||
return value
|
||||
elif cte in ('x-uuencode', 'uuencode', 'uue', 'x-uue'):
|
||||
in_file = BytesIO(bpayload)
|
||||
out_file = BytesIO()
|
||||
|
304
Lib/test/test_email/test_defect_handling.py
Normal file
304
Lib/test/test_email/test_defect_handling.py
Normal file
@ -0,0 +1,304 @@
|
||||
import textwrap
|
||||
import unittest
|
||||
from email._policybase import Compat32
|
||||
from email import errors
|
||||
from test.test_email import TestEmailBase
|
||||
|
||||
|
||||
class TestMessageDefectDetectionBase:
|
||||
|
||||
dup_boundary_msg = textwrap.dedent("""\
|
||||
Subject: XX
|
||||
From: xx@xx.dk
|
||||
To: XX
|
||||
Mime-version: 1.0
|
||||
Content-type: multipart/mixed;
|
||||
boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
|
||||
|
||||
--MS_Mac_OE_3071477847_720252_MIME_Part
|
||||
Content-type: multipart/alternative;
|
||||
boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
|
||||
|
||||
--MS_Mac_OE_3071477847_720252_MIME_Part
|
||||
Content-type: text/plain; charset="ISO-8859-1"
|
||||
Content-transfer-encoding: quoted-printable
|
||||
|
||||
text
|
||||
|
||||
--MS_Mac_OE_3071477847_720252_MIME_Part
|
||||
Content-type: text/html; charset="ISO-8859-1"
|
||||
Content-transfer-encoding: quoted-printable
|
||||
|
||||
<HTML></HTML>
|
||||
|
||||
--MS_Mac_OE_3071477847_720252_MIME_Part--
|
||||
|
||||
--MS_Mac_OE_3071477847_720252_MIME_Part
|
||||
Content-type: image/gif; name="xx.gif";
|
||||
Content-disposition: attachment
|
||||
Content-transfer-encoding: base64
|
||||
|
||||
Some removed base64 encoded chars.
|
||||
|
||||
--MS_Mac_OE_3071477847_720252_MIME_Part--
|
||||
|
||||
""")
|
||||
|
||||
def test_same_boundary_inner_outer(self):
    # Parse dup_boundary_msg, whose nested multipart/alternative part
    # declares the same boundary string as the enclosing multipart/mixed,
    # and check that exactly one StartBoundaryNotFoundDefect is reported
    # for the first subpart.
    # XXX better would be to actually detect the duplicate.
    msg = self._str_msg(self.dup_boundary_msg)
    inner = msg.get_payload(0)
    self.assertTrue(hasattr(inner, 'defects'))
    defects = self.get_defects(inner)
    self.assertEqual(len(defects), 1)
    # assertIsInstance reports the offending object on failure, unlike
    # assertTrue(isinstance(...)), and matches test_multipart_invalid_cte.
    self.assertIsInstance(defects[0], errors.StartBoundaryNotFoundDefect)
|
||||
|
||||
def test_same_boundary_inner_outer_raises_on_defect(self):
    # With raise_on_defect enabled on a clone of the test policy, parsing
    # dup_boundary_msg must raise the defect instead of recording it.
    strict_policy = self.policy.clone(raise_on_defect=True)
    expected = errors.StartBoundaryNotFoundDefect
    with self.assertRaises(expected):
        self._str_msg(self.dup_boundary_msg, policy=strict_policy)
|
||||
|
||||
no_boundary_msg = textwrap.dedent("""\
|
||||
Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
|
||||
From: foobar
|
||||
Subject: broken mail
|
||||
MIME-Version: 1.0
|
||||
Content-Type: multipart/report; report-type=delivery-status;
|
||||
|
||||
--JAB03225.986577786/zinfandel.lacita.com
|
||||
|
||||
One part
|
||||
|
||||
--JAB03225.986577786/zinfandel.lacita.com
|
||||
Content-Type: message/delivery-status
|
||||
|
||||
Header: Another part
|
||||
|
||||
--JAB03225.986577786/zinfandel.lacita.com--
|
||||
""")
|
||||
|
||||
def test_multipart_no_boundary(self):
    # no_boundary_msg claims multipart/report but its Content-Type header
    # carries no boundary parameter.  The payload must come back as a
    # plain string, and two defects must be reported in this order:
    # NoBoundaryInMultipartDefect, MultipartInvariantViolationDefect.
    msg = self._str_msg(self.no_boundary_msg)
    # assertIsInstance gives a diagnostic failure message, unlike
    # assertTrue(isinstance(...)).
    self.assertIsInstance(msg.get_payload(), str)
    defects = self.get_defects(msg)
    self.assertEqual(len(defects), 2)
    self.assertIsInstance(defects[0], errors.NoBoundaryInMultipartDefect)
    self.assertIsInstance(defects[1],
                          errors.MultipartInvariantViolationDefect)
|
||||
|
||||
def test_multipart_no_boundary_raise_on_defect(self):
    # Parsing no_boundary_msg under a raise_on_defect policy must raise
    # NoBoundaryInMultipartDefect.
    strict_policy = self.policy.clone(raise_on_defect=True)
    expected = errors.NoBoundaryInMultipartDefect
    with self.assertRaises(expected):
        self._str_msg(self.no_boundary_msg, policy=strict_policy)
|
||||
|
||||
multipart_msg = textwrap.dedent("""\
|
||||
Date: Wed, 14 Nov 2007 12:56:23 GMT
|
||||
From: foo@bar.invalid
|
||||
To: foo@bar.invalid
|
||||
Subject: Content-Transfer-Encoding: base64 and multipart
|
||||
MIME-Version: 1.0
|
||||
Content-Type: multipart/mixed;
|
||||
boundary="===============3344438784458119861=="{}
|
||||
|
||||
--===============3344438784458119861==
|
||||
Content-Type: text/plain
|
||||
|
||||
Test message
|
||||
|
||||
--===============3344438784458119861==
|
||||
Content-Type: application/octet-stream
|
||||
Content-Transfer-Encoding: base64
|
||||
|
||||
YWJj
|
||||
|
||||
--===============3344438784458119861==--
|
||||
""")
|
||||
|
||||
def test_multipart_invalid_cte(self):
    # Insert a base64 Content-Transfer-Encoding header into the outer
    # multipart headers of multipart_msg and check that exactly one
    # InvalidMultipartContentTransferEncodingDefect is reported.
    extra_header = "\nContent-Transfer-Encoding: base64"
    source = self.multipart_msg.format(extra_header)
    msg = self._str_msg(source)
    self.assertEqual(len(self.get_defects(msg)), 1)
    first = self.get_defects(msg)[0]
    self.assertIsInstance(
        first, errors.InvalidMultipartContentTransferEncodingDefect)
|
||||
|
||||
def test_multipart_invalid_cte_raise_on_defect(self):
    # Same input as test_multipart_invalid_cte, but parsed under a
    # raise_on_defect policy: the defect must be raised.
    expected = errors.InvalidMultipartContentTransferEncodingDefect
    with self.assertRaises(expected):
        source = self.multipart_msg.format(
            "\nContent-Transfer-Encoding: base64")
        strict_policy = self.policy.clone(raise_on_defect=True)
        self._str_msg(source, policy=strict_policy)
|
||||
|
||||
def test_multipart_no_cte_no_defect(self):
    # With nothing substituted into the template (no
    # Content-Transfer-Encoding header on the multipart), no defects
    # may be reported.
    source = self.multipart_msg.format('')
    msg = self._str_msg(source)
    defect_count = len(self.get_defects(msg))
    self.assertEqual(defect_count, 0)
|
||||
|
||||
def test_multipart_valid_cte_no_defect(self):
    # Each of these Content-Transfer-Encoding values (including the
    # mixed-case spelling) on the multipart must produce no defects.
    # The failure message names the encoding that was being tried.
    for cte in ('7bit', '8bit', 'BINary'):
        extra_header = "\nContent-Transfer-Encoding: "+cte
        source = self.multipart_msg.format(extra_header)
        msg = self._str_msg(source)
        defect_count = len(self.get_defects(msg))
        self.assertEqual(defect_count, 0, "cte="+cte)
|
||||
|
||||
lying_multipart_msg = textwrap.dedent("""\
|
||||
From: "Allison Dunlap" <xxx@example.com>
|
||||
To: yyy@example.com
|
||||
Subject: 64423
|
||||
Date: Sun, 11 Jul 2004 16:09:27 -0300
|
||||
MIME-Version: 1.0
|
||||
Content-Type: multipart/alternative;
|
||||
|
||||
Blah blah blah
|
||||
""")
|
||||
|
||||
def test_lying_multipart(self):
    # lying_multipart_msg declares multipart/alternative but has no
    # boundary parameter and a plain-text body.  Two defects must be
    # reported in this order: NoBoundaryInMultipartDefect,
    # MultipartInvariantViolationDefect.
    msg = self._str_msg(self.lying_multipart_msg)
    self.assertTrue(hasattr(msg, 'defects'))
    defects = self.get_defects(msg)
    self.assertEqual(len(defects), 2)
    # assertIsInstance gives a diagnostic failure message, unlike
    # assertTrue(isinstance(...)).
    self.assertIsInstance(defects[0], errors.NoBoundaryInMultipartDefect)
    self.assertIsInstance(defects[1],
                          errors.MultipartInvariantViolationDefect)
|
||||
|
||||
def test_lying_multipart_raise_on_defect(self):
    # Parsing lying_multipart_msg under a raise_on_defect policy must
    # raise NoBoundaryInMultipartDefect.
    strict_policy = self.policy.clone(raise_on_defect=True)
    expected = errors.NoBoundaryInMultipartDefect
    with self.assertRaises(expected):
        self._str_msg(self.lying_multipart_msg, policy=strict_policy)
|
||||
|
||||
missing_start_boundary_msg = textwrap.dedent("""\
|
||||
Content-Type: multipart/mixed; boundary="AAA"
|
||||
From: Mail Delivery Subsystem <xxx@example.com>
|
||||
To: yyy@example.com
|
||||
|
||||
--AAA
|
||||
|
||||
Stuff
|
||||
|
||||
--AAA
|
||||
Content-Type: message/rfc822
|
||||
|
||||
From: webmaster@python.org
|
||||
To: zzz@example.com
|
||||
Content-Type: multipart/mixed; boundary="BBB"
|
||||
|
||||
--BBB--
|
||||
|
||||
--AAA--
|
||||
|
||||
""")
|
||||
|
||||
def test_missing_start_boundary(self):
    # The message structure is:
    #
    # multipart/mixed
    #    text/plain
    #    message/rfc822
    #        multipart/mixed [*]
    #
    # [*] This message is missing its start boundary
    outer = self._str_msg(self.missing_start_boundary_msg)
    # Second subpart of the outer message, then its first (embedded)
    # payload: that is the part expected to carry the defect.
    bad = outer.get_payload(1).get_payload(0)
    defects = self.get_defects(bad)
    self.assertEqual(len(defects), 1)
    # assertIsInstance gives a diagnostic failure message, unlike
    # assertTrue(isinstance(...)).
    self.assertIsInstance(defects[0], errors.StartBoundaryNotFoundDefect)
|
||||
|
||||
def test_missing_start_boundary_raise_on_defect(self):
    # Parsing missing_start_boundary_msg under a raise_on_defect policy
    # must raise StartBoundaryNotFoundDefect.
    strict_policy = self.policy.clone(raise_on_defect=True)
    expected = errors.StartBoundaryNotFoundDefect
    with self.assertRaises(expected):
        self._str_msg(self.missing_start_boundary_msg,
                      policy=strict_policy)
|
||||
|
||||
def test_first_line_is_continuation_header(self):
    # A message whose very first line starts with whitespace: the
    # remaining header and the body must still be parsed, and a single
    # FirstHeaderLineIsContinuationDefect must record the stray line.
    source = ' Line 1\nSubject: test\n\nbody'
    msg = self._str_msg(source)
    self.assertEqual(msg.keys(), ['Subject'])
    self.assertEqual(msg.get_payload(), 'body')
    self.assertEqual(len(self.get_defects(msg)), 1)
    expected_defects = [errors.FirstHeaderLineIsContinuationDefect]
    self.assertDefectsEqual(self.get_defects(msg), expected_defects)
    only_defect = self.get_defects(msg)[0]
    self.assertEqual(only_defect.line, ' Line 1\n')
|
||||
|
||||
def test_first_line_is_continuation_header_raise_on_defect(self):
    # Under a raise_on_defect policy the leading continuation line must
    # raise FirstHeaderLineIsContinuationDefect.
    strict_policy = self.policy.clone(raise_on_defect=True)
    expected = errors.FirstHeaderLineIsContinuationDefect
    with self.assertRaises(expected):
        self._str_msg(' Line 1\nSubject: test\n\nbody\n',
                      policy=strict_policy)
|
||||
|
||||
def test_missing_header_body_separator(self):
    # Our heuristic if we see a line that doesn't look like a header (no
    # leading whitespace but no ':') is to assume that the blank line that
    # separates the header from the body is missing, and to stop parsing
    # headers and start parsing the body.
    source = 'Subject: test\nnot a header\nTo: abc\n\nb\n'
    msg = self._str_msg(source)
    self.assertEqual(msg.keys(), ['Subject'])
    # Everything from the non-header line onward is the body.
    expected_body = 'not a header\nTo: abc\n\nb\n'
    self.assertEqual(msg.get_payload(), expected_body)
    expected_defects = [errors.MissingHeaderBodySeparatorDefect]
    self.assertDefectsEqual(self.get_defects(msg), expected_defects)
|
||||
|
||||
def test_missing_header_body_separator_raise_on_defect(self):
    # Under a raise_on_defect policy the non-header line must raise
    # MissingHeaderBodySeparatorDefect.
    strict_policy = self.policy.clone(raise_on_defect=True)
    expected = errors.MissingHeaderBodySeparatorDefect
    with self.assertRaises(expected):
        self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n',
                      policy=strict_policy)
|
||||
|
||||
badly_padded_base64_payload = textwrap.dedent("""\
|
||||
Subject: test
|
||||
MIME-Version: 1.0
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: base64
|
||||
|
||||
dmk
|
||||
""")
|
||||
|
||||
def test_bad_padding_in_base64_payload(self):
    # The fixture's base64 body lacks its padding; decoding must still
    # yield b'vi' and register an InvalidBase64PaddingDefect.
    msg = self._str_msg(self.badly_padded_base64_payload)
    decoded = msg.get_payload(decode=True)
    self.assertEqual(decoded, b'vi')
    expected_defects = [errors.InvalidBase64PaddingDefect]
    self.assertDefectsEqual(self.get_defects(msg), expected_defects)
|
||||
|
||||
def test_bad_padding_in_base64_payload_raise_on_defect(self):
    # The message is parsed outside the assertRaises block: the defect
    # is expected from get_payload(decode=True), not from parsing.
    strict_policy = self.policy.clone(raise_on_defect=True)
    msg = self._str_msg(self.badly_padded_base64_payload,
                        policy=strict_policy)
    expected = errors.InvalidBase64PaddingDefect
    with self.assertRaises(expected):
        msg.get_payload(decode=True)
|
||||
|
||||
invalid_chars_in_base64_payload = textwrap.dedent("""\
|
||||
Subject: test
|
||||
MIME-Version: 1.0
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: base64
|
||||
|
||||
dm\x01k===
|
||||
""")
|
||||
|
||||
def test_invalid_chars_in_base64_payload(self):
    # The fixture's base64 body contains a \x01 character; decoding must
    # still yield b'vi' and register an InvalidBase64CharactersDefect.
    msg = self._str_msg(self.invalid_chars_in_base64_payload)
    decoded = msg.get_payload(decode=True)
    self.assertEqual(decoded, b'vi')
    expected_defects = [errors.InvalidBase64CharactersDefect]
    self.assertDefectsEqual(self.get_defects(msg), expected_defects)
|
||||
|
||||
def test_invalid_chars_in_base64_payload_raise_on_defect(self):
    # The message is parsed outside the assertRaises block: the defect
    # is expected from get_payload(decode=True), not from parsing.
    strict_policy = self.policy.clone(raise_on_defect=True)
    msg = self._str_msg(self.invalid_chars_in_base64_payload,
                        policy=strict_policy)
    expected = errors.InvalidBase64CharactersDefect
    with self.assertRaises(expected):
        msg.get_payload(decode=True)
|
||||
|
||||
|
||||
class TestMessageDefectDetection(TestMessageDefectDetectionBase,
                                 TestEmailBase):
    # Runs the shared defect tests, reading defects directly off the
    # object under inspection.

    def get_defects(self, obj):
        # Hook used by the shared tests to fetch the defect list.
        found = obj.defects
        return found
|
||||
|
||||
|
||||
class TestMessageDefectDetectionCapture(TestMessageDefectDetectionBase,
                                        TestEmailBase):
    # Runs the shared defect tests with a policy whose register_defect
    # hook appends each defect to a list held on the policy.

    class CapturePolicy(Compat32):
        # Class-level placeholder; setUp supplies a fresh list per test.
        captured = None

        def register_defect(self, obj, defect):
            # Record the defect on the policy; *obj* is not used here.
            sink = self.captured
            sink.append(defect)

    def setUp(self):
        # New policy instance, with its own empty capture list.
        self.policy = self.CapturePolicy(captured=[])

    def get_defects(self, obj):
        # *obj* is ignored: all defects seen by this test's policy are
        # returned, whichever object they were registered for.
        policy = self.policy
        return policy.captured
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
@ -513,6 +513,7 @@ class TestMessageAPI(TestEmailBase):
|
||||
eq(msg.values(), ['One Hundred', 'Twenty', 'Three', 'Eleven'])
|
||||
self.assertRaises(KeyError, msg.replace_header, 'Fourth', 'Missing')
|
||||
|
||||
# test_defect_handling:test_invalid_chars_in_base64_payload
|
||||
def test_broken_base64_payload(self):
|
||||
x = 'AwDp0P7//y6LwKEAcPa/6Q=9'
|
||||
msg = Message()
|
||||
@ -520,7 +521,10 @@ class TestMessageAPI(TestEmailBase):
|
||||
msg['content-transfer-encoding'] = 'base64'
|
||||
msg.set_payload(x)
|
||||
self.assertEqual(msg.get_payload(decode=True),
|
||||
bytes(x, 'raw-unicode-escape'))
|
||||
(b'\x03\x00\xe9\xd0\xfe\xff\xff.\x8b\xc0'
|
||||
b'\xa1\x00p\xf6\xbf\xe9\x0f'))
|
||||
self.assertIsInstance(msg.defects[0],
|
||||
errors.InvalidBase64CharactersDefect)
|
||||
|
||||
def test_broken_unicode_payload(self):
|
||||
# This test improves coverage but is not a compliance test.
|
||||
@ -1815,7 +1819,7 @@ class TestNonConformant(TestEmailBase):
|
||||
eq(msg.get_content_maintype(), 'text')
|
||||
eq(msg.get_content_subtype(), 'plain')
|
||||
|
||||
# test_parser.TestMessageDefectDetectionBase
|
||||
# test_defect_handling
|
||||
def test_same_boundary_inner_outer(self):
|
||||
unless = self.assertTrue
|
||||
msg = self._msgobj('msg_15.txt')
|
||||
@ -1826,7 +1830,7 @@ class TestNonConformant(TestEmailBase):
|
||||
unless(isinstance(inner.defects[0],
|
||||
errors.StartBoundaryNotFoundDefect))
|
||||
|
||||
# test_parser.TestMessageDefectDetectionBase
|
||||
# test_defect_handling
|
||||
def test_multipart_no_boundary(self):
|
||||
unless = self.assertTrue
|
||||
msg = self._msgobj('msg_25.txt')
|
||||
@ -1860,7 +1864,7 @@ class TestNonConformant(TestEmailBase):
|
||||
--===============3344438784458119861==--
|
||||
""")
|
||||
|
||||
# test_parser.TestMessageDefectDetectionBase
|
||||
# test_defect_handling
|
||||
def test_multipart_invalid_cte(self):
|
||||
msg = self._str_msg(
|
||||
self.multipart_msg.format("\nContent-Transfer-Encoding: base64"))
|
||||
@ -1868,12 +1872,12 @@ class TestNonConformant(TestEmailBase):
|
||||
self.assertIsInstance(msg.defects[0],
|
||||
errors.InvalidMultipartContentTransferEncodingDefect)
|
||||
|
||||
# test_parser.TestMessageDefectDetectionBase
|
||||
# test_defect_handling
|
||||
def test_multipart_no_cte_no_defect(self):
|
||||
msg = self._str_msg(self.multipart_msg.format(''))
|
||||
self.assertEqual(len(msg.defects), 0)
|
||||
|
||||
# test_parser.TestMessageDefectDetectionBase
|
||||
# test_defect_handling
|
||||
def test_multipart_valid_cte_no_defect(self):
|
||||
for cte in ('7bit', '8bit', 'BINary'):
|
||||
msg = self._str_msg(
|
||||
@ -1930,7 +1934,7 @@ Subject: here's something interesting
|
||||
counter to RFC 2822, there's no separating newline here
|
||||
""")
|
||||
|
||||
# test_parser.TestMessageDefectDetectionBase
|
||||
# test_defect_handling
|
||||
def test_lying_multipart(self):
|
||||
unless = self.assertTrue
|
||||
msg = self._msgobj('msg_41.txt')
|
||||
@ -1941,7 +1945,7 @@ counter to RFC 2822, there's no separating newline here
|
||||
unless(isinstance(msg.defects[1],
|
||||
errors.MultipartInvariantViolationDefect))
|
||||
|
||||
# test_parser.TestMessageDefectDetectionBase
|
||||
# test_defect_handling
|
||||
def test_missing_start_boundary(self):
|
||||
outer = self._msgobj('msg_42.txt')
|
||||
# The message structure is:
|
||||
@ -1957,7 +1961,7 @@ counter to RFC 2822, there's no separating newline here
|
||||
self.assertTrue(isinstance(bad.defects[0],
|
||||
errors.StartBoundaryNotFoundDefect))
|
||||
|
||||
# test_parser.TestMessageDefectDetectionBase
|
||||
# test_defect_handling
|
||||
def test_first_line_is_continuation_header(self):
|
||||
eq = self.assertEqual
|
||||
m = ' Line 1\nSubject: test\n\nbody'
|
||||
@ -3271,15 +3275,19 @@ class Test8BitBytesHandling(unittest.TestCase):
|
||||
self.assertEqual(msg.get_payload(decode=True),
|
||||
'pöstál\n'.encode('utf-8'))
|
||||
|
||||
# test_defect_handling:test_invalid_chars_in_base64_payload
|
||||
def test_8bit_in_base64_body(self):
|
||||
# Sticking an 8bit byte in a base64 block makes it undecodable by
|
||||
# normal means, so the block is returned undecoded, but as bytes.
|
||||
# If we get 8bit bytes in a base64 body, we can just ignore them
|
||||
# as being outside the base64 alphabet and decode anyway. But
|
||||
# we register a defect.
|
||||
m = self.bodytest_msg.format(charset='utf-8',
|
||||
cte='base64',
|
||||
bodyline='cMO2c3RhbAá=').encode('utf-8')
|
||||
msg = email.message_from_bytes(m)
|
||||
self.assertEqual(msg.get_payload(decode=True),
|
||||
'cMO2c3RhbAá=\n'.encode('utf-8'))
|
||||
'pöstal'.encode('utf-8'))
|
||||
self.assertIsInstance(msg.defects[0],
|
||||
errors.InvalidBase64CharactersDefect)
|
||||
|
||||
def test_8bit_in_uuencode_body(self):
|
||||
# Sticking an 8bit byte in a uuencode block makes it undecodable by
|
||||
|
@ -1,9 +1,6 @@
|
||||
import io
|
||||
import email
|
||||
import textwrap
|
||||
import unittest
|
||||
from email._policybase import Compat32
|
||||
from email import errors
|
||||
from email.message import Message
|
||||
from test.test_email import TestEmailBase
|
||||
|
||||
@ -35,258 +32,5 @@ class TestCustomMessage(TestEmailBase):
|
||||
# XXX add tests for other functions that take Message arg.
|
||||
|
||||
|
||||
class TestMessageDefectDetectionBase:
|
||||
|
||||
dup_boundary_msg = textwrap.dedent("""\
|
||||
Subject: XX
|
||||
From: xx@xx.dk
|
||||
To: XX
|
||||
Mime-version: 1.0
|
||||
Content-type: multipart/mixed;
|
||||
boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
|
||||
|
||||
--MS_Mac_OE_3071477847_720252_MIME_Part
|
||||
Content-type: multipart/alternative;
|
||||
boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
|
||||
|
||||
--MS_Mac_OE_3071477847_720252_MIME_Part
|
||||
Content-type: text/plain; charset="ISO-8859-1"
|
||||
Content-transfer-encoding: quoted-printable
|
||||
|
||||
text
|
||||
|
||||
--MS_Mac_OE_3071477847_720252_MIME_Part
|
||||
Content-type: text/html; charset="ISO-8859-1"
|
||||
Content-transfer-encoding: quoted-printable
|
||||
|
||||
<HTML></HTML>
|
||||
|
||||
--MS_Mac_OE_3071477847_720252_MIME_Part--
|
||||
|
||||
--MS_Mac_OE_3071477847_720252_MIME_Part
|
||||
Content-type: image/gif; name="xx.gif";
|
||||
Content-disposition: attachment
|
||||
Content-transfer-encoding: base64
|
||||
|
||||
Some removed base64 encoded chars.
|
||||
|
||||
--MS_Mac_OE_3071477847_720252_MIME_Part--
|
||||
|
||||
""")
|
||||
|
||||
def test_same_boundary_inner_outer(self):
|
||||
# XXX better would be to actually detect the duplicate.
|
||||
msg = self._str_msg(self.dup_boundary_msg)
|
||||
inner = msg.get_payload(0)
|
||||
self.assertTrue(hasattr(inner, 'defects'))
|
||||
self.assertEqual(len(self.get_defects(inner)), 1)
|
||||
self.assertTrue(isinstance(self.get_defects(inner)[0],
|
||||
errors.StartBoundaryNotFoundDefect))
|
||||
|
||||
def test_same_boundary_inner_outer_raises_on_defect(self):
|
||||
with self.assertRaises(errors.StartBoundaryNotFoundDefect):
|
||||
self._str_msg(self.dup_boundary_msg,
|
||||
policy=self.policy.clone(raise_on_defect=True))
|
||||
|
||||
no_boundary_msg = textwrap.dedent("""\
|
||||
Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
|
||||
From: foobar
|
||||
Subject: broken mail
|
||||
MIME-Version: 1.0
|
||||
Content-Type: multipart/report; report-type=delivery-status;
|
||||
|
||||
--JAB03225.986577786/zinfandel.lacita.com
|
||||
|
||||
One part
|
||||
|
||||
--JAB03225.986577786/zinfandel.lacita.com
|
||||
Content-Type: message/delivery-status
|
||||
|
||||
Header: Another part
|
||||
|
||||
--JAB03225.986577786/zinfandel.lacita.com--
|
||||
""")
|
||||
|
||||
def test_multipart_no_boundary(self):
|
||||
msg = self._str_msg(self.no_boundary_msg)
|
||||
self.assertTrue(isinstance(msg.get_payload(), str))
|
||||
self.assertEqual(len(self.get_defects(msg)), 2)
|
||||
self.assertTrue(isinstance(self.get_defects(msg)[0],
|
||||
errors.NoBoundaryInMultipartDefect))
|
||||
self.assertTrue(isinstance(self.get_defects(msg)[1],
|
||||
errors.MultipartInvariantViolationDefect))
|
||||
|
||||
def test_multipart_no_boundary_raise_on_defect(self):
|
||||
with self.assertRaises(errors.NoBoundaryInMultipartDefect):
|
||||
self._str_msg(self.no_boundary_msg,
|
||||
policy=self.policy.clone(raise_on_defect=True))
|
||||
|
||||
multipart_msg = textwrap.dedent("""\
|
||||
Date: Wed, 14 Nov 2007 12:56:23 GMT
|
||||
From: foo@bar.invalid
|
||||
To: foo@bar.invalid
|
||||
Subject: Content-Transfer-Encoding: base64 and multipart
|
||||
MIME-Version: 1.0
|
||||
Content-Type: multipart/mixed;
|
||||
boundary="===============3344438784458119861=="{}
|
||||
|
||||
--===============3344438784458119861==
|
||||
Content-Type: text/plain
|
||||
|
||||
Test message
|
||||
|
||||
--===============3344438784458119861==
|
||||
Content-Type: application/octet-stream
|
||||
Content-Transfer-Encoding: base64
|
||||
|
||||
YWJj
|
||||
|
||||
--===============3344438784458119861==--
|
||||
""")
|
||||
|
||||
def test_multipart_invalid_cte(self):
|
||||
msg = self._str_msg(
|
||||
self.multipart_msg.format("\nContent-Transfer-Encoding: base64"))
|
||||
self.assertEqual(len(self.get_defects(msg)), 1)
|
||||
self.assertIsInstance(self.get_defects(msg)[0],
|
||||
errors.InvalidMultipartContentTransferEncodingDefect)
|
||||
|
||||
def test_multipart_invalid_cte_raise_on_defect(self):
|
||||
with self.assertRaises(
|
||||
errors.InvalidMultipartContentTransferEncodingDefect):
|
||||
self._str_msg(
|
||||
self.multipart_msg.format(
|
||||
"\nContent-Transfer-Encoding: base64"),
|
||||
policy=self.policy.clone(raise_on_defect=True))
|
||||
|
||||
def test_multipart_no_cte_no_defect(self):
|
||||
msg = self._str_msg(self.multipart_msg.format(''))
|
||||
self.assertEqual(len(self.get_defects(msg)), 0)
|
||||
|
||||
def test_multipart_valid_cte_no_defect(self):
|
||||
for cte in ('7bit', '8bit', 'BINary'):
|
||||
msg = self._str_msg(
|
||||
self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte))
|
||||
self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte)
|
||||
|
||||
lying_multipart_msg = textwrap.dedent("""\
|
||||
From: "Allison Dunlap" <xxx@example.com>
|
||||
To: yyy@example.com
|
||||
Subject: 64423
|
||||
Date: Sun, 11 Jul 2004 16:09:27 -0300
|
||||
MIME-Version: 1.0
|
||||
Content-Type: multipart/alternative;
|
||||
|
||||
Blah blah blah
|
||||
""")
|
||||
|
||||
def test_lying_multipart(self):
|
||||
msg = self._str_msg(self.lying_multipart_msg)
|
||||
self.assertTrue(hasattr(msg, 'defects'))
|
||||
self.assertEqual(len(self.get_defects(msg)), 2)
|
||||
self.assertTrue(isinstance(self.get_defects(msg)[0],
|
||||
errors.NoBoundaryInMultipartDefect))
|
||||
self.assertTrue(isinstance(self.get_defects(msg)[1],
|
||||
errors.MultipartInvariantViolationDefect))
|
||||
|
||||
def test_lying_multipart_raise_on_defect(self):
|
||||
with self.assertRaises(errors.NoBoundaryInMultipartDefect):
|
||||
self._str_msg(self.lying_multipart_msg,
|
||||
policy=self.policy.clone(raise_on_defect=True))
|
||||
|
||||
missing_start_boundary_msg = textwrap.dedent("""\
|
||||
Content-Type: multipart/mixed; boundary="AAA"
|
||||
From: Mail Delivery Subsystem <xxx@example.com>
|
||||
To: yyy@example.com
|
||||
|
||||
--AAA
|
||||
|
||||
Stuff
|
||||
|
||||
--AAA
|
||||
Content-Type: message/rfc822
|
||||
|
||||
From: webmaster@python.org
|
||||
To: zzz@example.com
|
||||
Content-Type: multipart/mixed; boundary="BBB"
|
||||
|
||||
--BBB--
|
||||
|
||||
--AAA--
|
||||
|
||||
""")
|
||||
|
||||
def test_missing_start_boundary(self):
|
||||
# The message structure is:
|
||||
#
|
||||
# multipart/mixed
|
||||
# text/plain
|
||||
# message/rfc822
|
||||
# multipart/mixed [*]
|
||||
#
|
||||
# [*] This message is missing its start boundary
|
||||
outer = self._str_msg(self.missing_start_boundary_msg)
|
||||
bad = outer.get_payload(1).get_payload(0)
|
||||
self.assertEqual(len(self.get_defects(bad)), 1)
|
||||
self.assertTrue(isinstance(self.get_defects(bad)[0],
|
||||
errors.StartBoundaryNotFoundDefect))
|
||||
|
||||
def test_missing_start_boundary_raise_on_defect(self):
|
||||
with self.assertRaises(errors.StartBoundaryNotFoundDefect):
|
||||
self._str_msg(self.missing_start_boundary_msg,
|
||||
policy=self.policy.clone(raise_on_defect=True))
|
||||
|
||||
def test_first_line_is_continuation_header(self):
|
||||
msg = self._str_msg(' Line 1\nSubject: test\n\nbody')
|
||||
self.assertEqual(msg.keys(), ['Subject'])
|
||||
self.assertEqual(msg.get_payload(), 'body')
|
||||
self.assertEqual(len(self.get_defects(msg)), 1)
|
||||
self.assertDefectsEqual(self.get_defects(msg),
|
||||
[errors.FirstHeaderLineIsContinuationDefect])
|
||||
self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n')
|
||||
|
||||
def test_first_line_is_continuation_header_raise_on_defect(self):
|
||||
with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect):
|
||||
self._str_msg(' Line 1\nSubject: test\n\nbody\n',
|
||||
policy=self.policy.clone(raise_on_defect=True))
|
||||
|
||||
def test_missing_header_body_separator(self):
|
||||
# Our heuristic if we see a line that doesn't look like a header (no
|
||||
# leading whitespace but no ':') is to assume that the blank line that
|
||||
# separates the header from the body is missing, and to stop parsing
|
||||
# headers and start parsing the body.
|
||||
msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n')
|
||||
self.assertEqual(msg.keys(), ['Subject'])
|
||||
self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n')
|
||||
self.assertDefectsEqual(self.get_defects(msg),
|
||||
[errors.MissingHeaderBodySeparatorDefect])
|
||||
|
||||
def test_missing_header_body_separator_raise_on_defect(self):
|
||||
with self.assertRaises(errors.MissingHeaderBodySeparatorDefect):
|
||||
self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n',
|
||||
policy=self.policy.clone(raise_on_defect=True))
|
||||
|
||||
|
||||
class TestMessageDefectDetection(TestMessageDefectDetectionBase, TestEmailBase):
|
||||
|
||||
def get_defects(self, obj):
|
||||
return obj.defects
|
||||
|
||||
|
||||
class TestMessageDefectDetectionCapture(TestMessageDefectDetectionBase,
|
||||
TestEmailBase):
|
||||
|
||||
class CapturePolicy(Compat32):
|
||||
captured = None
|
||||
def register_defect(self, obj, defect):
|
||||
self.captured.append(defect)
|
||||
|
||||
def setUp(self):
|
||||
self.policy = self.CapturePolicy(captured=list())
|
||||
|
||||
def get_defects(self, obj):
|
||||
return self.policy.captured
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Loading…
Reference in New Issue
Block a user