From b0deeb47d862793e047eeea243f1d037f4152474 Mon Sep 17 00:00:00 2001 From: R David Murray Date: Sun, 8 Nov 2015 01:03:52 -0500 Subject: [PATCH] #25446: Fix regression in smtplib's AUTH LOGIN support. The auth method tests simply weren't adequate because of the fact that smtpd doesn't support authentication. I borrowed some of Milan's code for that from issue #21935 and added it to the smtplib tests. Also discovered that the direct test for the 'auth' method wasn't actually testing anything and fixed it. The fix makes the new authobject mechanism work the way it is documented...the problem was that wasn't checking for a 334 return code if an initial-response was provided, which works fine for auth plain and cram-md5, but not for auth login. --- Lib/smtplib.py | 19 ++--- Lib/test/test_smtplib.py | 179 ++++++++++++++++++++++++++------------- Misc/NEWS | 2 + 3 files changed, 132 insertions(+), 68 deletions(-) diff --git a/Lib/smtplib.py b/Lib/smtplib.py index 7e348757f3e..47569738f0d 100755 --- a/Lib/smtplib.py +++ b/Lib/smtplib.py @@ -630,12 +630,12 @@ class SMTP: (code, resp) = self.docmd("AUTH", mechanism + " " + response) else: (code, resp) = self.docmd("AUTH", mechanism) - # Server replies with 334 (challenge) or 535 (not supported) - if code == 334: - challenge = base64.decodebytes(resp) - response = encode_base64( - authobject(challenge).encode('ascii'), eol='') - (code, resp) = self.docmd(response) + # If server responds with a challenge, send the response. + if code == 334: + challenge = base64.decodebytes(resp) + response = encode_base64( + authobject(challenge).encode('ascii'), eol='') + (code, resp) = self.docmd(response) if code in (235, 503): return (code, resp) raise SMTPAuthenticationError(code, resp) @@ -657,11 +657,10 @@ class SMTP: def auth_login(self, challenge=None): """ Authobject to use with LOGIN authentication. Requires self.user and self.password to be set.""" - (code, resp) = self.docmd( - encode_base64(self.user.encode('ascii'), eol='')) - if code == 334: + if challenge is None: + return self.user + else: return self.password - raise SMTPAuthenticationError(code, resp) def login(self, user, password, *, initial_response_ok=True): """Log in on an SMTP server that requires authentication. diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py index 8e362414288..28539f360f5 100644 --- a/Lib/test/test_smtplib.py +++ b/Lib/test/test_smtplib.py @@ -1,8 +1,10 @@ import asyncore +import base64 import email.mime.text from email.message import EmailMessage from email.base64mime import body_encode as encode_base64 import email.utils +import hmac import socket import smtpd import smtplib @@ -623,20 +625,12 @@ sim_users = {'Mr.A@somewhere.com':'John A', sim_auth = ('Mr.A@somewhere.com', 'somepassword') sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn' 'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=') -sim_auth_credentials = { - 'login': 'TXIuQUBzb21ld2hlcmUuY29t', - 'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=', - 'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ' - 'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'), - } -sim_auth_login_user = 'TXIUQUBZB21LD2HLCMUUY29T' -sim_auth_plain = 'AE1YLKFAC29TZXDOZXJLLMNVBQBZB21LCGFZC3DVCMQ=' - sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'], 'list-2':['Ms.B@xn--fo-fka.com',], } # Simulated SMTP channel & server +class ResponseException(Exception): pass class SimSMTPChannel(smtpd.SMTPChannel): quit_response = None @@ -646,12 +640,109 @@ class SimSMTPChannel(smtpd.SMTPChannel): rcpt_count = 0 rset_count = 0 disconnect = 0 + AUTH = 99 # Add protocol state to enable auth testing. + authenticated_user = None def __init__(self, extra_features, *args, **kw): self._extrafeatures = ''.join( [ "250-{0}\r\n".format(x) for x in extra_features ]) super(SimSMTPChannel, self).__init__(*args, **kw) + # AUTH related stuff. It would be nice if support for this were in smtpd. + def found_terminator(self): + if self.smtp_state == self.AUTH: + line = self._emptystring.join(self.received_lines) + print('Data:', repr(line), file=smtpd.DEBUGSTREAM) + self.received_lines = [] + try: + self.auth_object(line) + except ResponseException as e: + self.smtp_state = self.COMMAND + self.push('%s %s' % (e.smtp_code, e.smtp_error)) + return + super().found_terminator() + + + def smtp_AUTH(self, arg): + if not self.seen_greeting: + self.push('503 Error: send EHLO first') + return + if not self.extended_smtp or 'AUTH' not in self._extrafeatures: + self.push('500 Error: command "AUTH" not recognized') + return + if self.authenticated_user is not None: + self.push( + '503 Bad sequence of commands: already authenticated') + return + args = arg.split() + if len(args) not in [1, 2]: + self.push('501 Syntax: AUTH [initial-response]') + return + auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_') + try: + self.auth_object = getattr(self, auth_object_name) + except AttributeError: + self.push('504 Command parameter not implemented: unsupported ' + ' authentication mechanism {!r}'.format(auth_object_name)) + return + self.smtp_state = self.AUTH + self.auth_object(args[1] if len(args) == 2 else None) + + def _authenticated(self, user, valid): + if valid: + self.authenticated_user = user + self.push('235 Authentication Succeeded') + else: + self.push('535 Authentication credentials invalid') + self.smtp_state = self.COMMAND + + def _decode_base64(self, string): + return base64.decodebytes(string.encode('ascii')).decode('utf-8') + + def _auth_plain(self, arg=None): + if arg is None: + self.push('334 ') + else: + logpass = self._decode_base64(arg) + try: + *_, user, password = logpass.split('\0') + except ValueError as e: + self.push('535 Splitting response {!r} into user and password' + ' failed: {}'.format(logpass, e)) + return + self._authenticated(user, password == sim_auth[1]) + + def _auth_login(self, arg=None): + if arg is None: + # base64 encoded 'Username:' + self.push('334 VXNlcm5hbWU6') + elif not hasattr(self, '_auth_login_user'): + self._auth_login_user = self._decode_base64(arg) + # base64 encoded 'Password:' + self.push('334 UGFzc3dvcmQ6') + else: + password = self._decode_base64(arg) + self._authenticated(self._auth_login_user, password == sim_auth[1]) + del self._auth_login_user + + def _auth_cram_md5(self, arg=None): + if arg is None: + self.push('334 {}'.format(sim_cram_md5_challenge)) + else: + logpass = self._decode_base64(arg) + try: + user, hashed_pass = logpass.split() + except ValueError as e: + self.push('535 Splitting response {!r} into user and password' + 'failed: {}'.format(logpass, e)) + return False + valid_hashed_pass = hmac.HMAC( + sim_auth[1].encode('ascii'), + self._decode_base64(sim_cram_md5_challenge).encode('ascii'), + 'md5').hexdigest() + self._authenticated(user, hashed_pass == valid_hashed_pass) + # end AUTH related stuff. + def smtp_EHLO(self, arg): resp = ('250-testhost\r\n' '250-EXPN\r\n' @@ -683,20 +774,6 @@ class SimSMTPChannel(smtpd.SMTPChannel): else: self.push('550 No access for you!') - def smtp_AUTH(self, arg): - mech = arg.strip().lower() - if mech=='cram-md5': - self.push('334 {}'.format(sim_cram_md5_challenge)) - elif mech not in sim_auth_credentials: - self.push('504 auth type unimplemented') - return - elif mech=='plain': - self.push('334 ') - elif mech=='login': - self.push('334 ') - else: - self.push('550 No access for you!') - def smtp_QUIT(self, arg): if self.quit_response is None: super(SimSMTPChannel, self).smtp_QUIT(arg) @@ -841,63 +918,49 @@ class SMTPSimTests(unittest.TestCase): self.assertEqual(smtp.expn(u), expected_unknown) smtp.quit() - # SimSMTPChannel doesn't fully support AUTH because it requires a - # synchronous read to obtain the credentials...so instead smtpd - # sees the credential sent by smtplib's login method as an unknown command, - # which results in smtplib raising an auth error. Fortunately the error - # message contains the encoded credential, so we can partially check that it - # was generated correctly (partially, because the 'word' is uppercased in - # the error message). - def testAUTH_PLAIN(self): self.serv.add_feature("AUTH PLAIN") smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) - try: smtp.login(sim_auth[0], sim_auth[1], initial_response_ok=False) - except smtplib.SMTPAuthenticationError as err: - self.assertIn(sim_auth_plain, str(err)) + resp = smtp.login(sim_auth[0], sim_auth[1]) + self.assertEqual(resp, (235, b'Authentication Succeeded')) smtp.close() def testAUTH_LOGIN(self): self.serv.add_feature("AUTH LOGIN") smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) - try: smtp.login(sim_auth[0], sim_auth[1]) - except smtplib.SMTPAuthenticationError as err: - self.assertIn(sim_auth_login_user, str(err)) + resp = smtp.login(sim_auth[0], sim_auth[1]) + self.assertEqual(resp, (235, b'Authentication Succeeded')) smtp.close() def testAUTH_CRAM_MD5(self): self.serv.add_feature("AUTH CRAM-MD5") smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) - - try: smtp.login(sim_auth[0], sim_auth[1]) - except smtplib.SMTPAuthenticationError as err: - self.assertIn(sim_auth_credentials['cram-md5'], str(err)) + resp = smtp.login(sim_auth[0], sim_auth[1]) + self.assertEqual(resp, (235, b'Authentication Succeeded')) smtp.close() def testAUTH_multiple(self): # Test that multiple authentication methods are tried. self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5") smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) - try: smtp.login(sim_auth[0], sim_auth[1]) - except smtplib.SMTPAuthenticationError as err: - self.assertIn(sim_auth_login_user, str(err)) + resp = smtp.login(sim_auth[0], sim_auth[1]) + self.assertEqual(resp, (235, b'Authentication Succeeded')) smtp.close() def test_auth_function(self): - smtp = smtplib.SMTP(HOST, self.port, - local_hostname='localhost', timeout=15) - self.serv.add_feature("AUTH CRAM-MD5") - smtp.user, smtp.password = sim_auth[0], sim_auth[1] - supported = {'CRAM-MD5': smtp.auth_cram_md5, - 'PLAIN': smtp.auth_plain, - 'LOGIN': smtp.auth_login, - } - for mechanism, method in supported.items(): - try: smtp.auth(mechanism, method, initial_response_ok=False) - except smtplib.SMTPAuthenticationError as err: - self.assertIn(sim_auth_credentials[mechanism.lower()].upper(), - str(err)) - smtp.close() + supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'} + for mechanism in supported: + self.serv.add_feature("AUTH {}".format(mechanism)) + for mechanism in supported: + with self.subTest(mechanism=mechanism): + smtp = smtplib.SMTP(HOST, self.port, + local_hostname='localhost', timeout=15) + smtp.ehlo('foo') + smtp.user, smtp.password = sim_auth[0], sim_auth[1] + method = 'auth_' + mechanism.lower().replace('-', '_') + resp = smtp.auth(mechanism, getattr(smtp, method)) + self.assertEqual(resp, (235, b'Authentication Succeeded')) + smtp.close() def test_quit_resets_greeting(self): smtp = smtplib.SMTP(HOST, self.port, diff --git a/Misc/NEWS b/Misc/NEWS index f08b4ce8430..fef42a0ed4f 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -58,6 +58,8 @@ Core and Builtins Library ------- +- Issue #25446: Fix regression in smtplib's AUTH LOGIN support. + - Issue #18010: Fix the pydoc web server's module search function to handle exceptions from importing packages.