mirror of
https://github.com/python/cpython.git
synced 2024-11-27 03:45:08 +08:00
Issue #19735: Implement private function ssl._create_stdlib_context() to
create SSLContext objects in Python's stdlib module. It provides a single configuration point and makes use of SSLContext.load_default_certs().
This commit is contained in:
parent
32eddc1bbc
commit
67986f9431
@ -571,10 +571,8 @@ class _SelectorSslTransport(_SelectorTransport):
|
||||
# context; in that case the sslcontext passed is None.
|
||||
# The default is the same as used by urllib with
|
||||
# cadefault=True.
|
||||
sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
||||
sslcontext.options |= ssl.OP_NO_SSLv2
|
||||
sslcontext.set_default_verify_paths()
|
||||
sslcontext.verify_mode = ssl.CERT_REQUIRED
|
||||
sslcontext = ssl._create_stdlib_context(
|
||||
cert_reqs=ssl.CERT_REQUIRED)
|
||||
|
||||
wrap_kwargs = {
|
||||
'server_side': server_side,
|
||||
|
@ -727,6 +727,10 @@ else:
|
||||
"exclusive")
|
||||
self.keyfile = keyfile
|
||||
self.certfile = certfile
|
||||
if context is None:
|
||||
context = ssl._create_stdlib_context(self.ssl_version,
|
||||
certfile=certfile,
|
||||
keyfile=keyfile)
|
||||
self.context = context
|
||||
self._prot_p = False
|
||||
FTP.__init__(self, host, user, passwd, acct, timeout, source_address)
|
||||
@ -744,12 +748,7 @@ else:
|
||||
resp = self.voidcmd('AUTH TLS')
|
||||
else:
|
||||
resp = self.voidcmd('AUTH SSL')
|
||||
if self.context is not None:
|
||||
self.sock = self.context.wrap_socket(self.sock)
|
||||
else:
|
||||
self.sock = ssl.wrap_socket(self.sock, self.keyfile,
|
||||
self.certfile,
|
||||
ssl_version=self.ssl_version)
|
||||
self.sock = self.context.wrap_socket(self.sock)
|
||||
self.file = self.sock.makefile(mode='r', encoding=self.encoding)
|
||||
return resp
|
||||
|
||||
@ -788,11 +787,7 @@ else:
|
||||
def ntransfercmd(self, cmd, rest=None):
|
||||
conn, size = FTP.ntransfercmd(self, cmd, rest)
|
||||
if self._prot_p:
|
||||
if self.context is not None:
|
||||
conn = self.context.wrap_socket(conn)
|
||||
else:
|
||||
conn = ssl.wrap_socket(conn, self.keyfile, self.certfile,
|
||||
ssl_version=self.ssl_version)
|
||||
conn = self.context.wrap_socket(conn)
|
||||
return conn, size
|
||||
|
||||
def abort(self):
|
||||
|
@ -1179,9 +1179,7 @@ else:
|
||||
self.key_file = key_file
|
||||
self.cert_file = cert_file
|
||||
if context is None:
|
||||
# Some reasonable defaults
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
||||
context.options |= ssl.OP_NO_SSLv2
|
||||
context = ssl._create_stdlib_context()
|
||||
will_verify = context.verify_mode != ssl.CERT_NONE
|
||||
if check_hostname is None:
|
||||
check_hostname = will_verify
|
||||
|
@ -742,9 +742,7 @@ class IMAP4:
|
||||
raise self.abort('TLS not supported by server')
|
||||
# Generate a default SSL context if none was passed.
|
||||
if ssl_context is None:
|
||||
ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
||||
# SSLv2 considered harmful.
|
||||
ssl_context.options |= ssl.OP_NO_SSLv2
|
||||
ssl_context = ssl._create_stdlib_context()
|
||||
typ, dat = self._simple_command(name)
|
||||
if typ == 'OK':
|
||||
self.sock = ssl_context.wrap_socket(self.sock)
|
||||
@ -1210,15 +1208,15 @@ if HAVE_SSL:
|
||||
|
||||
self.keyfile = keyfile
|
||||
self.certfile = certfile
|
||||
if ssl_context is None:
|
||||
ssl_context = ssl._create_stdlib_context(certfile=certfile,
|
||||
keyfile=keyfile)
|
||||
self.ssl_context = ssl_context
|
||||
IMAP4.__init__(self, host, port)
|
||||
|
||||
def _create_socket(self):
|
||||
sock = IMAP4._create_socket(self)
|
||||
if self.ssl_context:
|
||||
return self.ssl_context.wrap_socket(sock)
|
||||
else:
|
||||
return ssl.wrap_socket(sock, self.keyfile, self.certfile)
|
||||
return self.ssl_context.wrap_socket(sock)
|
||||
|
||||
def open(self, host='', port=IMAP4_SSL_PORT):
|
||||
"""Setup connection to remote server on "host:port".
|
||||
|
@ -288,9 +288,7 @@ if _have_ssl:
|
||||
"""
|
||||
# Generate a default SSL context if none was passed.
|
||||
if context is None:
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
||||
# SSLv2 considered harmful.
|
||||
context.options |= ssl.OP_NO_SSLv2
|
||||
context = ssl._create_stdlib_context()
|
||||
return context.wrap_socket(sock)
|
||||
|
||||
|
||||
|
@ -385,8 +385,7 @@ class POP3:
|
||||
if not 'STLS' in caps:
|
||||
raise error_proto('-ERR STLS not supported by server')
|
||||
if context is None:
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
||||
context.options |= ssl.OP_NO_SSLv2
|
||||
context = ssl._create_stdlib_context()
|
||||
resp = self._shortcmd('STLS')
|
||||
self.sock = context.wrap_socket(self.sock)
|
||||
self.file = self.sock.makefile('rb')
|
||||
@ -421,15 +420,15 @@ if HAVE_SSL:
|
||||
"exclusive")
|
||||
self.keyfile = keyfile
|
||||
self.certfile = certfile
|
||||
if context is None:
|
||||
context = ssl._create_stdlib_context(certfile=certfile,
|
||||
keyfile=keyfile)
|
||||
self.context = context
|
||||
POP3.__init__(self, host, port, timeout)
|
||||
|
||||
def _create_socket(self, timeout):
|
||||
sock = POP3._create_socket(self, timeout)
|
||||
if self.context is not None:
|
||||
sock = self.context.wrap_socket(sock)
|
||||
else:
|
||||
sock = ssl.wrap_socket(sock, self.keyfile, self.certfile)
|
||||
sock = self.context.wrap_socket(sock)
|
||||
return sock
|
||||
|
||||
def stls(self, keyfile=None, certfile=None, context=None):
|
||||
|
@ -664,10 +664,10 @@ class SMTP:
|
||||
if context is not None and certfile is not None:
|
||||
raise ValueError("context and certfile arguments are mutually "
|
||||
"exclusive")
|
||||
if context is not None:
|
||||
self.sock = context.wrap_socket(self.sock)
|
||||
else:
|
||||
self.sock = ssl.wrap_socket(self.sock, keyfile, certfile)
|
||||
if context is None:
|
||||
context = ssl._create_stdlib_context(certfile=certfile,
|
||||
keyfile=keyfile)
|
||||
self.sock = context.wrap_socket(self.sock)
|
||||
self.file = None
|
||||
# RFC 3207:
|
||||
# The client MUST discard any knowledge obtained from
|
||||
@ -880,6 +880,9 @@ if _have_ssl:
|
||||
"exclusive")
|
||||
self.keyfile = keyfile
|
||||
self.certfile = certfile
|
||||
if context is None:
|
||||
context = ssl._create_stdlib_context(certfile=certfile,
|
||||
keyfile=keyfile)
|
||||
self.context = context
|
||||
SMTP.__init__(self, host, port, local_hostname, timeout,
|
||||
source_address)
|
||||
@ -889,10 +892,7 @@ if _have_ssl:
|
||||
print('connect:', (host, port), file=stderr)
|
||||
new_socket = socket.create_connection((host, port), timeout,
|
||||
self.source_address)
|
||||
if self.context is not None:
|
||||
new_socket = self.context.wrap_socket(new_socket)
|
||||
else:
|
||||
new_socket = ssl.wrap_socket(new_socket, self.keyfile, self.certfile)
|
||||
new_socket = self.context.wrap_socket(new_socket)
|
||||
return new_socket
|
||||
|
||||
__all__.append("SMTP_SSL")
|
||||
|
50
Lib/ssl.py
50
Lib/ssl.py
@ -398,6 +398,43 @@ def create_default_context(purpose=Purpose.SERVER_AUTH, *, cafile=None,
|
||||
return context
|
||||
|
||||
|
||||
def _create_stdlib_context(protocol=PROTOCOL_SSLv23, *, cert_reqs=None,
|
||||
purpose=Purpose.SERVER_AUTH,
|
||||
certfile=None, keyfile=None,
|
||||
cafile=None, capath=None, cadata=None):
|
||||
"""Create a SSLContext object for Python stdlib modules
|
||||
|
||||
All Python stdlib modules shall use this function to create SSLContext
|
||||
objects in order to keep common settings in one place. The configuration
|
||||
is less restrict than create_default_context()'s to increase backward
|
||||
compatibility.
|
||||
"""
|
||||
if not isinstance(purpose, _ASN1Object):
|
||||
raise TypeError(purpose)
|
||||
|
||||
context = SSLContext(protocol)
|
||||
# SSLv2 considered harmful.
|
||||
context.options |= OP_NO_SSLv2
|
||||
|
||||
if cert_reqs is not None:
|
||||
context.verify_mode = cert_reqs
|
||||
|
||||
if keyfile and not certfile:
|
||||
raise ValueError("certfile must be specified")
|
||||
if certfile or keyfile:
|
||||
context.load_cert_chain(certfile, keyfile)
|
||||
|
||||
# load CA root certs
|
||||
if cafile or capath or cadata:
|
||||
context.load_verify_locations(cafile, capath, cadata)
|
||||
elif context.verify_mode != CERT_NONE:
|
||||
# no explicit cafile, capath or cadata but the verify mode is
|
||||
# CERT_OPTIONAL or CERT_REQUIRED. Let's try to load default system
|
||||
# root CA certificates for the given purpose. This may fail silently.
|
||||
context.load_default_certs(purpose)
|
||||
|
||||
return context
|
||||
|
||||
class SSLSocket(socket):
|
||||
"""This class implements a subtype of socket.socket that wraps
|
||||
the underlying OS socket in an SSL context when necessary, and
|
||||
@ -829,15 +866,16 @@ def get_server_certificate(addr, ssl_version=PROTOCOL_SSLv3, ca_certs=None):
|
||||
If 'ssl_version' is specified, use it in the connection attempt."""
|
||||
|
||||
host, port = addr
|
||||
if (ca_certs is not None):
|
||||
if ca_certs is not None:
|
||||
cert_reqs = CERT_REQUIRED
|
||||
else:
|
||||
cert_reqs = CERT_NONE
|
||||
s = create_connection(addr)
|
||||
s = wrap_socket(s, ssl_version=ssl_version,
|
||||
cert_reqs=cert_reqs, ca_certs=ca_certs)
|
||||
dercert = s.getpeercert(True)
|
||||
s.close()
|
||||
context = _create_stdlib_context(ssl_version,
|
||||
cert_reqs=cert_reqs,
|
||||
cafile=ca_certs)
|
||||
with create_connection(addr) as sock:
|
||||
with context.wrap_socket(sock) as sslsock:
|
||||
dercert = sslsock.getpeercert(True)
|
||||
return DER_cert_to_PEM_cert(dercert)
|
||||
|
||||
def get_protocol_name(protocol_code):
|
||||
|
@ -1018,6 +1018,27 @@ class ContextTests(unittest.TestCase):
|
||||
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
|
||||
self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
|
||||
|
||||
def test__create_stdlib_context(self):
|
||||
ctx = ssl._create_stdlib_context()
|
||||
self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
|
||||
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
|
||||
self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
|
||||
|
||||
ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
|
||||
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
|
||||
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
|
||||
self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
|
||||
|
||||
ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
|
||||
cert_reqs=ssl.CERT_REQUIRED)
|
||||
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
|
||||
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
|
||||
self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
|
||||
|
||||
ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
|
||||
self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
|
||||
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
|
||||
self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
|
||||
|
||||
|
||||
class SSLErrorTests(unittest.TestCase):
|
||||
|
@ -141,13 +141,9 @@ def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
|
||||
if cafile or capath or cadefault:
|
||||
if not _have_ssl:
|
||||
raise ValueError('SSL support not available')
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
||||
context.options |= ssl.OP_NO_SSLv2
|
||||
context.verify_mode = ssl.CERT_REQUIRED
|
||||
if cafile or capath:
|
||||
context.load_verify_locations(cafile, capath)
|
||||
else:
|
||||
context.set_default_verify_paths()
|
||||
context = ssl._create_stdlib_context(cert_reqs=ssl.CERT_REQUIRED,
|
||||
cafile=cafile,
|
||||
capath=capath)
|
||||
https_handler = HTTPSHandler(context=context, check_hostname=True)
|
||||
opener = build_opener(https_handler)
|
||||
elif _opener is None:
|
||||
|
@ -68,6 +68,10 @@ Core and Builtins
|
||||
Library
|
||||
-------
|
||||
|
||||
- Issue #19735: Implement private function ssl._create_stdlib_context() to
|
||||
create SSLContext objects in Python's stdlib module. It provides a single
|
||||
configuration point and makes use of SSLContext.load_default_certs().
|
||||
|
||||
- Issue #16203: Add re.fullmatch() function and regex.fullmatch() method,
|
||||
which anchor the pattern at both ends of the string to match.
|
||||
Original patch by Matthew Barnett.
|
||||
|
Loading…
Reference in New Issue
Block a user