mirror of
https://github.com/python/cpython.git
synced 2024-11-23 18:04:37 +08:00
#25446: Fix regression in smtplib's AUTH LOGIN support.
The auth method tests simply weren't adequate because of the fact that smtpd doesn't support authentication. I borrowed some of Milan's code for that from issue #21935 and added it to the smtplib tests. Also discovered that the direct test for the 'auth' method wasn't actually testing anything and fixed it. The fix makes the new authobject mechanism work the way it is documented...the problem was that wasn't checking for a 334 return code if an initial-response was provided, which works fine for auth plain and cram-md5, but not for auth login.
This commit is contained in:
parent
65b77d625e
commit
b0deeb47d8
@ -630,12 +630,12 @@ class SMTP:
|
||||
(code, resp) = self.docmd("AUTH", mechanism + " " + response)
|
||||
else:
|
||||
(code, resp) = self.docmd("AUTH", mechanism)
|
||||
# Server replies with 334 (challenge) or 535 (not supported)
|
||||
if code == 334:
|
||||
challenge = base64.decodebytes(resp)
|
||||
response = encode_base64(
|
||||
authobject(challenge).encode('ascii'), eol='')
|
||||
(code, resp) = self.docmd(response)
|
||||
# If server responds with a challenge, send the response.
|
||||
if code == 334:
|
||||
challenge = base64.decodebytes(resp)
|
||||
response = encode_base64(
|
||||
authobject(challenge).encode('ascii'), eol='')
|
||||
(code, resp) = self.docmd(response)
|
||||
if code in (235, 503):
|
||||
return (code, resp)
|
||||
raise SMTPAuthenticationError(code, resp)
|
||||
@ -657,11 +657,10 @@ class SMTP:
|
||||
def auth_login(self, challenge=None):
|
||||
""" Authobject to use with LOGIN authentication. Requires self.user and
|
||||
self.password to be set."""
|
||||
(code, resp) = self.docmd(
|
||||
encode_base64(self.user.encode('ascii'), eol=''))
|
||||
if code == 334:
|
||||
if challenge is None:
|
||||
return self.user
|
||||
else:
|
||||
return self.password
|
||||
raise SMTPAuthenticationError(code, resp)
|
||||
|
||||
def login(self, user, password, *, initial_response_ok=True):
|
||||
"""Log in on an SMTP server that requires authentication.
|
||||
|
@ -1,8 +1,10 @@
|
||||
import asyncore
|
||||
import base64
|
||||
import email.mime.text
|
||||
from email.message import EmailMessage
|
||||
from email.base64mime import body_encode as encode_base64
|
||||
import email.utils
|
||||
import hmac
|
||||
import socket
|
||||
import smtpd
|
||||
import smtplib
|
||||
@ -623,20 +625,12 @@ sim_users = {'Mr.A@somewhere.com':'John A',
|
||||
sim_auth = ('Mr.A@somewhere.com', 'somepassword')
|
||||
sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
|
||||
'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
|
||||
sim_auth_credentials = {
|
||||
'login': 'TXIuQUBzb21ld2hlcmUuY29t',
|
||||
'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=',
|
||||
'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ'
|
||||
'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'),
|
||||
}
|
||||
sim_auth_login_user = 'TXIUQUBZB21LD2HLCMUUY29T'
|
||||
sim_auth_plain = 'AE1YLKFAC29TZXDOZXJLLMNVBQBZB21LCGFZC3DVCMQ='
|
||||
|
||||
sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
|
||||
'list-2':['Ms.B@xn--fo-fka.com',],
|
||||
}
|
||||
|
||||
# Simulated SMTP channel & server
|
||||
class ResponseException(Exception): pass
|
||||
class SimSMTPChannel(smtpd.SMTPChannel):
|
||||
|
||||
quit_response = None
|
||||
@ -646,12 +640,109 @@ class SimSMTPChannel(smtpd.SMTPChannel):
|
||||
rcpt_count = 0
|
||||
rset_count = 0
|
||||
disconnect = 0
|
||||
AUTH = 99 # Add protocol state to enable auth testing.
|
||||
authenticated_user = None
|
||||
|
||||
def __init__(self, extra_features, *args, **kw):
|
||||
self._extrafeatures = ''.join(
|
||||
[ "250-{0}\r\n".format(x) for x in extra_features ])
|
||||
super(SimSMTPChannel, self).__init__(*args, **kw)
|
||||
|
||||
# AUTH related stuff. It would be nice if support for this were in smtpd.
|
||||
def found_terminator(self):
|
||||
if self.smtp_state == self.AUTH:
|
||||
line = self._emptystring.join(self.received_lines)
|
||||
print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
|
||||
self.received_lines = []
|
||||
try:
|
||||
self.auth_object(line)
|
||||
except ResponseException as e:
|
||||
self.smtp_state = self.COMMAND
|
||||
self.push('%s %s' % (e.smtp_code, e.smtp_error))
|
||||
return
|
||||
super().found_terminator()
|
||||
|
||||
|
||||
def smtp_AUTH(self, arg):
|
||||
if not self.seen_greeting:
|
||||
self.push('503 Error: send EHLO first')
|
||||
return
|
||||
if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
|
||||
self.push('500 Error: command "AUTH" not recognized')
|
||||
return
|
||||
if self.authenticated_user is not None:
|
||||
self.push(
|
||||
'503 Bad sequence of commands: already authenticated')
|
||||
return
|
||||
args = arg.split()
|
||||
if len(args) not in [1, 2]:
|
||||
self.push('501 Syntax: AUTH <mechanism> [initial-response]')
|
||||
return
|
||||
auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
|
||||
try:
|
||||
self.auth_object = getattr(self, auth_object_name)
|
||||
except AttributeError:
|
||||
self.push('504 Command parameter not implemented: unsupported '
|
||||
' authentication mechanism {!r}'.format(auth_object_name))
|
||||
return
|
||||
self.smtp_state = self.AUTH
|
||||
self.auth_object(args[1] if len(args) == 2 else None)
|
||||
|
||||
def _authenticated(self, user, valid):
|
||||
if valid:
|
||||
self.authenticated_user = user
|
||||
self.push('235 Authentication Succeeded')
|
||||
else:
|
||||
self.push('535 Authentication credentials invalid')
|
||||
self.smtp_state = self.COMMAND
|
||||
|
||||
def _decode_base64(self, string):
|
||||
return base64.decodebytes(string.encode('ascii')).decode('utf-8')
|
||||
|
||||
def _auth_plain(self, arg=None):
|
||||
if arg is None:
|
||||
self.push('334 ')
|
||||
else:
|
||||
logpass = self._decode_base64(arg)
|
||||
try:
|
||||
*_, user, password = logpass.split('\0')
|
||||
except ValueError as e:
|
||||
self.push('535 Splitting response {!r} into user and password'
|
||||
' failed: {}'.format(logpass, e))
|
||||
return
|
||||
self._authenticated(user, password == sim_auth[1])
|
||||
|
||||
def _auth_login(self, arg=None):
|
||||
if arg is None:
|
||||
# base64 encoded 'Username:'
|
||||
self.push('334 VXNlcm5hbWU6')
|
||||
elif not hasattr(self, '_auth_login_user'):
|
||||
self._auth_login_user = self._decode_base64(arg)
|
||||
# base64 encoded 'Password:'
|
||||
self.push('334 UGFzc3dvcmQ6')
|
||||
else:
|
||||
password = self._decode_base64(arg)
|
||||
self._authenticated(self._auth_login_user, password == sim_auth[1])
|
||||
del self._auth_login_user
|
||||
|
||||
def _auth_cram_md5(self, arg=None):
|
||||
if arg is None:
|
||||
self.push('334 {}'.format(sim_cram_md5_challenge))
|
||||
else:
|
||||
logpass = self._decode_base64(arg)
|
||||
try:
|
||||
user, hashed_pass = logpass.split()
|
||||
except ValueError as e:
|
||||
self.push('535 Splitting response {!r} into user and password'
|
||||
'failed: {}'.format(logpass, e))
|
||||
return False
|
||||
valid_hashed_pass = hmac.HMAC(
|
||||
sim_auth[1].encode('ascii'),
|
||||
self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
|
||||
'md5').hexdigest()
|
||||
self._authenticated(user, hashed_pass == valid_hashed_pass)
|
||||
# end AUTH related stuff.
|
||||
|
||||
def smtp_EHLO(self, arg):
|
||||
resp = ('250-testhost\r\n'
|
||||
'250-EXPN\r\n'
|
||||
@ -683,20 +774,6 @@ class SimSMTPChannel(smtpd.SMTPChannel):
|
||||
else:
|
||||
self.push('550 No access for you!')
|
||||
|
||||
def smtp_AUTH(self, arg):
|
||||
mech = arg.strip().lower()
|
||||
if mech=='cram-md5':
|
||||
self.push('334 {}'.format(sim_cram_md5_challenge))
|
||||
elif mech not in sim_auth_credentials:
|
||||
self.push('504 auth type unimplemented')
|
||||
return
|
||||
elif mech=='plain':
|
||||
self.push('334 ')
|
||||
elif mech=='login':
|
||||
self.push('334 ')
|
||||
else:
|
||||
self.push('550 No access for you!')
|
||||
|
||||
def smtp_QUIT(self, arg):
|
||||
if self.quit_response is None:
|
||||
super(SimSMTPChannel, self).smtp_QUIT(arg)
|
||||
@ -841,63 +918,49 @@ class SMTPSimTests(unittest.TestCase):
|
||||
self.assertEqual(smtp.expn(u), expected_unknown)
|
||||
smtp.quit()
|
||||
|
||||
# SimSMTPChannel doesn't fully support AUTH because it requires a
|
||||
# synchronous read to obtain the credentials...so instead smtpd
|
||||
# sees the credential sent by smtplib's login method as an unknown command,
|
||||
# which results in smtplib raising an auth error. Fortunately the error
|
||||
# message contains the encoded credential, so we can partially check that it
|
||||
# was generated correctly (partially, because the 'word' is uppercased in
|
||||
# the error message).
|
||||
|
||||
def testAUTH_PLAIN(self):
|
||||
self.serv.add_feature("AUTH PLAIN")
|
||||
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
|
||||
try: smtp.login(sim_auth[0], sim_auth[1], initial_response_ok=False)
|
||||
except smtplib.SMTPAuthenticationError as err:
|
||||
self.assertIn(sim_auth_plain, str(err))
|
||||
resp = smtp.login(sim_auth[0], sim_auth[1])
|
||||
self.assertEqual(resp, (235, b'Authentication Succeeded'))
|
||||
smtp.close()
|
||||
|
||||
def testAUTH_LOGIN(self):
|
||||
self.serv.add_feature("AUTH LOGIN")
|
||||
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
|
||||
try: smtp.login(sim_auth[0], sim_auth[1])
|
||||
except smtplib.SMTPAuthenticationError as err:
|
||||
self.assertIn(sim_auth_login_user, str(err))
|
||||
resp = smtp.login(sim_auth[0], sim_auth[1])
|
||||
self.assertEqual(resp, (235, b'Authentication Succeeded'))
|
||||
smtp.close()
|
||||
|
||||
def testAUTH_CRAM_MD5(self):
|
||||
self.serv.add_feature("AUTH CRAM-MD5")
|
||||
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
|
||||
|
||||
try: smtp.login(sim_auth[0], sim_auth[1])
|
||||
except smtplib.SMTPAuthenticationError as err:
|
||||
self.assertIn(sim_auth_credentials['cram-md5'], str(err))
|
||||
resp = smtp.login(sim_auth[0], sim_auth[1])
|
||||
self.assertEqual(resp, (235, b'Authentication Succeeded'))
|
||||
smtp.close()
|
||||
|
||||
def testAUTH_multiple(self):
|
||||
# Test that multiple authentication methods are tried.
|
||||
self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
|
||||
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
|
||||
try: smtp.login(sim_auth[0], sim_auth[1])
|
||||
except smtplib.SMTPAuthenticationError as err:
|
||||
self.assertIn(sim_auth_login_user, str(err))
|
||||
resp = smtp.login(sim_auth[0], sim_auth[1])
|
||||
self.assertEqual(resp, (235, b'Authentication Succeeded'))
|
||||
smtp.close()
|
||||
|
||||
def test_auth_function(self):
|
||||
smtp = smtplib.SMTP(HOST, self.port,
|
||||
local_hostname='localhost', timeout=15)
|
||||
self.serv.add_feature("AUTH CRAM-MD5")
|
||||
smtp.user, smtp.password = sim_auth[0], sim_auth[1]
|
||||
supported = {'CRAM-MD5': smtp.auth_cram_md5,
|
||||
'PLAIN': smtp.auth_plain,
|
||||
'LOGIN': smtp.auth_login,
|
||||
}
|
||||
for mechanism, method in supported.items():
|
||||
try: smtp.auth(mechanism, method, initial_response_ok=False)
|
||||
except smtplib.SMTPAuthenticationError as err:
|
||||
self.assertIn(sim_auth_credentials[mechanism.lower()].upper(),
|
||||
str(err))
|
||||
smtp.close()
|
||||
supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'}
|
||||
for mechanism in supported:
|
||||
self.serv.add_feature("AUTH {}".format(mechanism))
|
||||
for mechanism in supported:
|
||||
with self.subTest(mechanism=mechanism):
|
||||
smtp = smtplib.SMTP(HOST, self.port,
|
||||
local_hostname='localhost', timeout=15)
|
||||
smtp.ehlo('foo')
|
||||
smtp.user, smtp.password = sim_auth[0], sim_auth[1]
|
||||
method = 'auth_' + mechanism.lower().replace('-', '_')
|
||||
resp = smtp.auth(mechanism, getattr(smtp, method))
|
||||
self.assertEqual(resp, (235, b'Authentication Succeeded'))
|
||||
smtp.close()
|
||||
|
||||
def test_quit_resets_greeting(self):
|
||||
smtp = smtplib.SMTP(HOST, self.port,
|
||||
|
Loading…
Reference in New Issue
Block a user