cpython/Lib/packaging/tests/pypi_server.py
Éric Araujo 348c572dcf Fix typo
2011-06-19 18:53:31 +02:00

449 lines
16 KiB
Python

"""Mock PyPI Server implementation, to use in tests.
This module also provides a simple test case to extend if you need to use
the PyPIServer all along your test case. Be sure to read the documentation
before any use.
XXX TODO:
The mock server can handle simple HTTP request (to simulate a simple index) or
XMLRPC requests, over HTTP. Both does not have the same intergface to deal
with, and I think it's a pain.
A good idea could be to re-think a bit the way dstributions are handled in the
mock server. As it should return malformed HTML pages, we need to keep the
static behavior.
I think of something like that:
>>> server = PyPIMockServer()
>>> server.startHTTP()
>>> server.startXMLRPC()
Then, the server must have only one port to rely on, eg.
>>> server.fulladdress()
"http://ip:port/"
It could be simple to have one HTTP server, relaying the requests to the two
implementations (static HTTP and XMLRPC over HTTP).
"""
import os
import queue
import select
import threading
import socketserver
from functools import wraps
from http.server import HTTPServer, SimpleHTTPRequestHandler
from xmlrpc.server import SimpleXMLRPCServer
from packaging.tests import unittest
PYPI_DEFAULT_STATIC_PATH = os.path.join(
os.path.dirname(os.path.abspath(__file__)), 'pypiserver')
def use_xmlrpc_server(*server_args, **server_kwargs):
server_kwargs['serve_xmlrpc'] = True
return use_pypi_server(*server_args, **server_kwargs)
def use_http_server(*server_args, **server_kwargs):
server_kwargs['serve_xmlrpc'] = False
return use_pypi_server(*server_args, **server_kwargs)
def use_pypi_server(*server_args, **server_kwargs):
"""Decorator to make use of the PyPIServer for test methods,
just when needed, and not for the entire duration of the testcase.
"""
def wrapper(func):
@wraps(func)
def wrapped(*args, **kwargs):
server = PyPIServer(*server_args, **server_kwargs)
server.start()
try:
func(server=server, *args, **kwargs)
finally:
server.stop()
return wrapped
return wrapper
class PyPIServerTestCase(unittest.TestCase):
def setUp(self):
super(PyPIServerTestCase, self).setUp()
self.pypi = PyPIServer()
self.pypi.start()
self.addCleanup(self.pypi.stop)
class PyPIServer(threading.Thread):
"""PyPI Mocked server.
Provides a mocked version of the PyPI API's, to ease tests.
Support serving static content and serving previously given text.
"""
def __init__(self, test_static_path=None,
static_filesystem_paths=None,
static_uri_paths=["simple", "packages"], serve_xmlrpc=False):
"""Initialize the server.
Default behavior is to start the HTTP server. You can either start the
xmlrpc server by setting xmlrpc to True. Caution: Only one server will
be started.
static_uri_paths and static_base_path are parameters used to provides
respectively the http_paths to serve statically, and where to find the
matching files on the filesystem.
"""
# we want to launch the server in a new dedicated thread, to not freeze
# tests.
threading.Thread.__init__(self)
self._run = True
self._serve_xmlrpc = serve_xmlrpc
if static_filesystem_paths is None:
static_filesystem_paths = ["default"]
#TODO allow to serve XMLRPC and HTTP static files at the same time.
if not self._serve_xmlrpc:
self.server = HTTPServer(('127.0.0.1', 0), PyPIRequestHandler)
self.server.RequestHandlerClass.pypi_server = self
self.request_queue = queue.Queue()
self._requests = []
self.default_response_status = 404
self.default_response_headers = [('Content-type', 'text/plain')]
self.default_response_data = "The page does not exists"
# initialize static paths / filesystems
self.static_uri_paths = static_uri_paths
# append the static paths defined locally
if test_static_path is not None:
static_filesystem_paths.append(test_static_path)
self.static_filesystem_paths = [
PYPI_DEFAULT_STATIC_PATH + "/" + path
for path in static_filesystem_paths]
else:
# XMLRPC server
self.server = PyPIXMLRPCServer(('127.0.0.1', 0))
self.xmlrpc = XMLRPCMockIndex()
# register the xmlrpc methods
self.server.register_introspection_functions()
self.server.register_instance(self.xmlrpc)
self.address = ('127.0.0.1', self.server.server_port)
# to not have unwanted outputs.
self.server.RequestHandlerClass.log_request = lambda *_: None
def run(self):
# loop because we can't stop it otherwise, for python < 2.6
while self._run:
r, w, e = select.select([self.server], [], [], 0.5)
if r:
self.server.handle_request()
def stop(self):
"""self shutdown is not supported for python < 2.6"""
self._run = False
if self.is_alive():
self.join()
self.server.server_close()
def get_next_response(self):
return (self.default_response_status,
self.default_response_headers,
self.default_response_data)
@property
def requests(self):
"""Use this property to get all requests that have been made
to the server
"""
while True:
try:
self._requests.append(self.request_queue.get_nowait())
except queue.Empty:
break
return self._requests
@property
def full_address(self):
return "http://%s:%s" % self.address
class PyPIRequestHandler(SimpleHTTPRequestHandler):
# we need to access the pypi server while serving the content
pypi_server = None
def serve_request(self):
"""Serve the content.
Also record the requests to be accessed later. If trying to access an
url matching a static uri, serve static content, otherwise serve
what is provided by the `get_next_response` method.
If nothing is defined there, return a 404 header.
"""
# record the request. Read the input only on PUT or POST requests
if self.command in ("PUT", "POST"):
if 'content-length' in self.headers:
request_data = self.rfile.read(
int(self.headers['content-length']))
else:
request_data = self.rfile.read()
elif self.command in ("GET", "DELETE"):
request_data = ''
self.pypi_server.request_queue.put((self, request_data))
# serve the content from local disc if we request an URL beginning
# by a pattern defined in `static_paths`
url_parts = self.path.split("/")
if (len(url_parts) > 1 and
url_parts[1] in self.pypi_server.static_uri_paths):
data = None
# always take the last first.
fs_paths = []
fs_paths.extend(self.pypi_server.static_filesystem_paths)
fs_paths.reverse()
relative_path = self.path
for fs_path in fs_paths:
try:
if self.path.endswith("/"):
relative_path += "index.html"
if relative_path.endswith('.tar.gz'):
with open(fs_path + relative_path, 'br') as file:
data = file.read()
headers = [('Content-type', 'application/x-gtar')]
else:
with open(fs_path + relative_path) as file:
data = file.read().encode()
headers = [('Content-type', 'text/html')]
headers.append(('Content-Length', len(data)))
self.make_response(data, headers=headers)
except IOError:
pass
if data is None:
self.make_response("Not found", 404)
# otherwise serve the content from get_next_response
else:
# send back a response
status, headers, data = self.pypi_server.get_next_response()
self.make_response(data, status, headers)
do_POST = do_GET = do_DELETE = do_PUT = serve_request
def make_response(self, data, status=200,
headers=[('Content-type', 'text/html')]):
"""Send the response to the HTTP client"""
if not isinstance(status, int):
try:
status = int(status)
except ValueError:
# we probably got something like YYY Codename.
# Just get the first 3 digits
status = int(status[:3])
self.send_response(status)
for header, value in headers:
self.send_header(header, value)
self.end_headers()
if type(data) is str:
data = data.encode()
self.wfile.write(data)
class PyPIXMLRPCServer(SimpleXMLRPCServer):
def server_bind(self):
"""Override server_bind to store the server name."""
socketserver.TCPServer.server_bind(self)
host, port = self.socket.getsockname()[:2]
self.server_port = port
class MockDist:
"""Fake distribution, used in the Mock PyPI Server"""
def __init__(self, name, version="1.0", hidden=False, url="http://url/",
type="sdist", filename="", size=10000,
digest="123456", downloads=7, has_sig=False,
python_version="source", comment="comment",
author="John Doe", author_email="john@doe.name",
maintainer="Main Tayner", maintainer_email="maintainer_mail",
project_url="http://project_url/", homepage="http://homepage/",
keywords="", platform="UNKNOWN", classifiers=[], licence="",
description="Description", summary="Summary", stable_version="",
ordering="", documentation_id="", code_kwalitee_id="",
installability_id="", obsoletes=[], obsoletes_dist=[],
provides=[], provides_dist=[], requires=[], requires_dist=[],
requires_external=[], requires_python=""):
# basic fields
self.name = name
self.version = version
self.hidden = hidden
# URL infos
self.url = url
self.digest = digest
self.downloads = downloads
self.has_sig = has_sig
self.python_version = python_version
self.comment = comment
self.type = type
# metadata
self.author = author
self.author_email = author_email
self.maintainer = maintainer
self.maintainer_email = maintainer_email
self.project_url = project_url
self.homepage = homepage
self.keywords = keywords
self.platform = platform
self.classifiers = classifiers
self.licence = licence
self.description = description
self.summary = summary
self.stable_version = stable_version
self.ordering = ordering
self.cheesecake_documentation_id = documentation_id
self.cheesecake_code_kwalitee_id = code_kwalitee_id
self.cheesecake_installability_id = installability_id
self.obsoletes = obsoletes
self.obsoletes_dist = obsoletes_dist
self.provides = provides
self.provides_dist = provides_dist
self.requires = requires
self.requires_dist = requires_dist
self.requires_external = requires_external
self.requires_python = requires_python
def url_infos(self):
return {
'url': self.url,
'packagetype': self.type,
'filename': 'filename.tar.gz',
'size': '6000',
'md5_digest': self.digest,
'downloads': self.downloads,
'has_sig': self.has_sig,
'python_version': self.python_version,
'comment_text': self.comment,
}
def metadata(self):
return {
'maintainer': self.maintainer,
'project_url': [self.project_url],
'maintainer_email': self.maintainer_email,
'cheesecake_code_kwalitee_id': self.cheesecake_code_kwalitee_id,
'keywords': self.keywords,
'obsoletes_dist': self.obsoletes_dist,
'requires_external': self.requires_external,
'author': self.author,
'author_email': self.author_email,
'download_url': self.url,
'platform': self.platform,
'version': self.version,
'obsoletes': self.obsoletes,
'provides': self.provides,
'cheesecake_documentation_id': self.cheesecake_documentation_id,
'_pypi_hidden': self.hidden,
'description': self.description,
'_pypi_ordering': 19,
'requires_dist': self.requires_dist,
'requires_python': self.requires_python,
'classifiers': [],
'name': self.name,
'licence': self.licence,
'summary': self.summary,
'home_page': self.homepage,
'stable_version': self.stable_version,
'provides_dist': self.provides_dist or "%s (%s)" % (self.name,
self.version),
'requires': self.requires,
'cheesecake_installability_id': self.cheesecake_installability_id,
}
def search_result(self):
return {
'_pypi_ordering': 0,
'version': self.version,
'name': self.name,
'summary': self.summary,
}
class XMLRPCMockIndex:
"""Mock XMLRPC server"""
def __init__(self, dists=[]):
self._dists = dists
self._search_result = []
def add_distributions(self, dists):
for dist in dists:
self._dists.append(MockDist(**dist))
def set_distributions(self, dists):
self._dists = []
self.add_distributions(dists)
def set_search_result(self, result):
"""set a predefined search result"""
self._search_result = result
def _get_search_results(self):
results = []
for name in self._search_result:
found_dist = [d for d in self._dists if d.name == name]
if found_dist:
results.append(found_dist[0])
else:
dist = MockDist(name)
results.append(dist)
self._dists.append(dist)
return [r.search_result() for r in results]
def list_packages(self):
return [d.name for d in self._dists]
def package_releases(self, package_name, show_hidden=False):
if show_hidden:
# return all
return [d.version for d in self._dists if d.name == package_name]
else:
# return only un-hidden
return [d.version for d in self._dists if d.name == package_name
and not d.hidden]
def release_urls(self, package_name, version):
return [d.url_infos() for d in self._dists
if d.name == package_name and d.version == version]
def release_data(self, package_name, version):
release = [d for d in self._dists
if d.name == package_name and d.version == version]
if release:
return release[0].metadata()
else:
return {}
def search(self, spec, operator="and"):
return self._get_search_results()