mirror of
https://github.com/python/cpython.git
synced 2024-11-25 02:44:06 +08:00
7e642e82d3
Some of the characters (form feed, vertical tab) are not legal continuation characters anyway, so this was wrong as well as annoying.
306 lines
9.9 KiB
Python
306 lines
9.9 KiB
Python
"""CGI-savvy HTTP Server.
|
|
|
|
This module builds on SimpleHTTPServer by implementing GET and POST
|
|
requests to cgi-bin scripts.
|
|
|
|
If the os.fork() function is not present (e.g. on Windows),
|
|
os.popen2() is used as a fallback, with slightly altered semantics; if
|
|
that function is not present either (e.g. on Macintosh), only Python
|
|
scripts are supported, and they are executed by the current process.
|
|
|
|
In all cases, the implementation is intentionally naive -- all
|
|
requests are executed synchronously.
|
|
|
|
SECURITY WARNING: DON'T USE THIS CODE UNLESS YOU ARE INSIDE A FIREWALL
|
|
-- it may execute arbitrary Python code or external programs.
|
|
|
|
"""
|
|
|
|
|
|
# Version string of this module.
__version__ = "0.4"

# Public names exported by a star-import of this module.
__all__ = [
    "CGIHTTPRequestHandler",
]
|
|
|
|
import os
|
|
import sys
|
|
import urllib
|
|
import BaseHTTPServer
|
|
import SimpleHTTPServer
|
|
|
|
|
|
class CGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
|
|
|
|
"""Complete HTTP server with GET, HEAD and POST commands.
|
|
|
|
GET and HEAD also support running CGI scripts.
|
|
|
|
The POST command is *only* implemented for CGI scripts.
|
|
|
|
"""
|
|
|
|
# Determine platform specifics
|
|
have_fork = hasattr(os, 'fork')
|
|
have_popen2 = hasattr(os, 'popen2')
|
|
|
|
# Make rfile unbuffered -- we need to read one line and then pass
|
|
# the rest to a subprocess, so we can't use buffered input.
|
|
rbufsize = 0
|
|
|
|
def do_POST(self):
|
|
"""Serve a POST request.
|
|
|
|
This is only implemented for CGI scripts.
|
|
|
|
"""
|
|
|
|
if self.is_cgi():
|
|
self.run_cgi()
|
|
else:
|
|
self.send_error(501, "Can only POST to CGI scripts")
|
|
|
|
def send_head(self):
|
|
"""Version of send_head that support CGI scripts"""
|
|
if self.is_cgi():
|
|
return self.run_cgi()
|
|
else:
|
|
return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self)
|
|
|
|
def is_cgi(self):
|
|
"""Test whether self.path corresponds to a CGI script.
|
|
|
|
Return a tuple (dir, rest) if self.path requires running a
|
|
CGI script, None if not. Note that rest begins with a
|
|
slash if it is not empty.
|
|
|
|
The default implementation tests whether the path
|
|
begins with one of the strings in the list
|
|
self.cgi_directories (and the next character is a '/'
|
|
or the end of the string).
|
|
|
|
"""
|
|
|
|
path = self.path
|
|
|
|
for x in self.cgi_directories:
|
|
i = len(x)
|
|
if path[:i] == x and (not path[i:] or path[i] == '/'):
|
|
self.cgi_info = path[:i], path[i+1:]
|
|
return 1
|
|
return 0
|
|
|
|
cgi_directories = ['/cgi-bin', '/htbin']
|
|
|
|
def is_executable(self, path):
|
|
"""Test whether argument path is an executable file."""
|
|
return executable(path)
|
|
|
|
def is_python(self, path):
|
|
"""Test whether argument path is a Python script."""
|
|
head, tail = os.path.splitext(path)
|
|
return tail.lower() in (".py", ".pyw")
|
|
|
|
def run_cgi(self):
|
|
"""Execute a CGI script."""
|
|
dir, rest = self.cgi_info
|
|
i = rest.rfind('?')
|
|
if i >= 0:
|
|
rest, query = rest[:i], rest[i+1:]
|
|
else:
|
|
query = ''
|
|
i = rest.find('/')
|
|
if i >= 0:
|
|
script, rest = rest[:i], rest[i:]
|
|
else:
|
|
script, rest = rest, ''
|
|
scriptname = dir + '/' + script
|
|
scriptfile = self.translate_path(scriptname)
|
|
if not os.path.exists(scriptfile):
|
|
self.send_error(404, "No such CGI script (%s)" % `scriptname`)
|
|
return
|
|
if not os.path.isfile(scriptfile):
|
|
self.send_error(403, "CGI script is not a plain file (%s)" %
|
|
`scriptname`)
|
|
return
|
|
ispy = self.is_python(scriptname)
|
|
if not ispy:
|
|
if not (self.have_fork or self.have_popen2):
|
|
self.send_error(403, "CGI script is not a Python script (%s)" %
|
|
`scriptname`)
|
|
return
|
|
if not self.is_executable(scriptfile):
|
|
self.send_error(403, "CGI script is not executable (%s)" %
|
|
`scriptname`)
|
|
return
|
|
|
|
# Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html
|
|
# XXX Much of the following could be prepared ahead of time!
|
|
env = {}
|
|
env['SERVER_SOFTWARE'] = self.version_string()
|
|
env['SERVER_NAME'] = self.server.server_name
|
|
env['GATEWAY_INTERFACE'] = 'CGI/1.1'
|
|
env['SERVER_PROTOCOL'] = self.protocol_version
|
|
env['SERVER_PORT'] = str(self.server.server_port)
|
|
env['REQUEST_METHOD'] = self.command
|
|
uqrest = urllib.unquote(rest)
|
|
env['PATH_INFO'] = uqrest
|
|
env['PATH_TRANSLATED'] = self.translate_path(uqrest)
|
|
env['SCRIPT_NAME'] = scriptname
|
|
if query:
|
|
env['QUERY_STRING'] = query
|
|
host = self.address_string()
|
|
if host != self.client_address[0]:
|
|
env['REMOTE_HOST'] = host
|
|
env['REMOTE_ADDR'] = self.client_address[0]
|
|
# XXX AUTH_TYPE
|
|
# XXX REMOTE_USER
|
|
# XXX REMOTE_IDENT
|
|
if self.headers.typeheader is None:
|
|
env['CONTENT_TYPE'] = self.headers.type
|
|
else:
|
|
env['CONTENT_TYPE'] = self.headers.typeheader
|
|
length = self.headers.getheader('content-length')
|
|
if length:
|
|
env['CONTENT_LENGTH'] = length
|
|
accept = []
|
|
for line in self.headers.getallmatchingheaders('accept'):
|
|
if line[:1] in "\t\n\r ":
|
|
accept.append(line.strip())
|
|
else:
|
|
accept = accept + line[7:].split(',')
|
|
env['HTTP_ACCEPT'] = ','.join(accept)
|
|
ua = self.headers.getheader('user-agent')
|
|
if ua:
|
|
env['HTTP_USER_AGENT'] = ua
|
|
co = filter(None, self.headers.getheaders('cookie'))
|
|
if co:
|
|
env['HTTP_COOKIE'] = ', '.join(co)
|
|
# XXX Other HTTP_* headers
|
|
if not self.have_fork:
|
|
# Since we're setting the env in the parent, provide empty
|
|
# values to override previously set values
|
|
for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH',
|
|
'HTTP_USER_AGENT', 'HTTP_COOKIE'):
|
|
env.setdefault(k, "")
|
|
|
|
self.send_response(200, "Script output follows")
|
|
|
|
decoded_query = query.replace('+', ' ')
|
|
|
|
if self.have_fork:
|
|
# Unix -- fork as we should
|
|
args = [script]
|
|
if '=' not in decoded_query:
|
|
args.append(decoded_query)
|
|
nobody = nobody_uid()
|
|
self.wfile.flush() # Always flush before forking
|
|
pid = os.fork()
|
|
if pid != 0:
|
|
# Parent
|
|
pid, sts = os.waitpid(pid, 0)
|
|
if sts:
|
|
self.log_error("CGI script exit status %#x", sts)
|
|
return
|
|
# Child
|
|
try:
|
|
try:
|
|
os.setuid(nobody)
|
|
except os.error:
|
|
pass
|
|
os.dup2(self.rfile.fileno(), 0)
|
|
os.dup2(self.wfile.fileno(), 1)
|
|
os.execve(scriptfile, args, env)
|
|
except:
|
|
self.server.handle_error(self.request, self.client_address)
|
|
os._exit(127)
|
|
|
|
elif self.have_popen2:
|
|
# Windows -- use popen2 to create a subprocess
|
|
import shutil
|
|
os.environ.update(env)
|
|
cmdline = scriptfile
|
|
if self.is_python(scriptfile):
|
|
interp = sys.executable
|
|
if interp.lower().endswith("w.exe"):
|
|
# On Windows, use python.exe, not python.exe
|
|
interp = interp[:-5] = interp[-4:]
|
|
cmdline = "%s %s" % (interp, cmdline)
|
|
if '=' not in query and '"' not in query:
|
|
cmdline = '%s "%s"' % (cmdline, query)
|
|
self.log_error("command: %s", cmdline)
|
|
try:
|
|
nbytes = int(length)
|
|
except:
|
|
nbytes = 0
|
|
fi, fo = os.popen2(cmdline)
|
|
if self.command.lower() == "post" and nbytes > 0:
|
|
data = self.rfile.read(nbytes)
|
|
fi.write(data)
|
|
fi.close()
|
|
shutil.copyfileobj(fo, self.wfile)
|
|
sts = fo.close()
|
|
if sts:
|
|
self.log_error("CGI script exit status %#x", sts)
|
|
else:
|
|
self.log_error("CGI script exited OK")
|
|
|
|
else:
|
|
# Other O.S. -- execute script in this process
|
|
os.environ.update(env)
|
|
save_argv = sys.argv
|
|
save_stdin = sys.stdin
|
|
save_stdout = sys.stdout
|
|
save_stderr = sys.stderr
|
|
try:
|
|
try:
|
|
sys.argv = [scriptfile]
|
|
if '=' not in decoded_query:
|
|
sys.argv.append(decoded_query)
|
|
sys.stdout = self.wfile
|
|
sys.stdin = self.rfile
|
|
execfile(scriptfile, {"__name__": "__main__"})
|
|
finally:
|
|
sys.argv = save_argv
|
|
sys.stdin = save_stdin
|
|
sys.stdout = save_stdout
|
|
sys.stderr = save_stderr
|
|
except SystemExit, sts:
|
|
self.log_error("CGI script exit status %s", str(sts))
|
|
else:
|
|
self.log_error("CGI script exited OK")
|
|
|
|
|
|
# Cached result of nobody_uid(); filled in on the first successful lookup.
nobody = None

def nobody_uid():
    """Internal routine to get nobody's uid.

    The value is looked up once and remembered in the module-level
    variable 'nobody'.  If the pwd module cannot be imported, -1 is
    returned (and nothing is cached).  If there is no account named
    'nobody', one more than the largest uid in the password database is
    used instead.

    """
    global nobody
    if not nobody:
        try:
            import pwd
        except ImportError:
            return -1
        try:
            nobody = pwd.getpwnam('nobody')[2]
        except KeyError:
            # Index 2 of a password entry is its uid.
            nobody = 1 + max([entry[2] for entry in pwd.getpwall()])
    return nobody
|
|
|
|
|
|
def executable(path):
    """Test for executable file.

    Return a true value when any execute-permission bit is set in the
    mode reported by os.stat(path), a false value when none is set, and
    0 when the stat call itself fails.

    """
    try:
        mode = os.stat(path)[0]
    except os.error:
        return 0
    # Execute bits for owner (64), group (8) and others (1); together
    # they equal octal 111.
    any_execute_bit = 64 | 8 | 1
    return (mode & any_execute_bit) != 0
|
|
|
|
|
|
def test(HandlerClass=CGIHTTPRequestHandler,
         ServerClass=BaseHTTPServer.HTTPServer):
    """Run SimpleHTTPServer.test() with the CGI-aware handler.

    Both arguments are passed straight through to SimpleHTTPServer.test().

    """
    SimpleHTTPServer.test(HandlerClass, ServerClass)


# Start the test server when this file is run as a script.
if __name__ == "__main__":
    test()
|