Permalink
Cannot retrieve contributors at this time
#! /usr/local/bin/python | |
# NOTE: the above "/usr/local/bin/python" is NOT a mistake. It is | |
# intentionally NOT "/usr/bin/env python". On many systems | |
# (e.g. Solaris), /usr/local/bin is not in $PATH as passed to CGI | |
# scripts, and /usr/local/bin is the default directory where Python is | |
# installed, so /usr/bin/env would be unable to find python. Granted, | |
# binary installations by Linux vendors often install Python in | |
# /usr/bin. So let those vendors patch cgi.py to match their choice | |
# of installation. | |
"""Support module for CGI (Common Gateway Interface) scripts. | |
This module defines a number of utilities for use by CGI scripts | |
written in Python. | |
""" | |
# History | |
# ------- | |
# | |
# Michael McLay started this module. Steve Majewski changed the | |
# interface to SvFormContentDict and FormContentDict. The multipart | |
# parsing was inspired by code submitted by Andreas Paepcke. Guido van | |
# Rossum rewrote, reformatted and documented the module and is currently | |
# responsible for its maintenance. | |
# | |
__version__ = "2.6" | |
# Imports | |
# ======= | |
from io import StringIO, BytesIO, TextIOWrapper | |
from collections.abc import Mapping | |
import sys | |
import os | |
import urllib.parse | |
from email.parser import FeedParser | |
from email.message import Message | |
import html | |
import locale | |
import tempfile | |
__all__ = ["MiniFieldStorage", "FieldStorage", "parse", "parse_multipart", | |
"parse_header", "test", "print_exception", "print_environ", | |
"print_form", "print_directory", "print_arguments", | |
"print_environ_usage"] | |
# Logging support | |
# =============== | |
logfile = "" # Filename to log to, if not empty | |
logfp = None # File object to log to, if not None | |
def initlog(*allargs): | |
"""Write a log message, if there is a log file. | |
Even though this function is called initlog(), you should always | |
use log(); log is a variable that is set either to initlog | |
(initially), to dolog (once the log file has been opened), or to | |
nolog (when logging is disabled). | |
The first argument is a format string; the remaining arguments (if | |
any) are arguments to the % operator, so e.g. | |
log("%s: %s", "a", "b") | |
will write "a: b" to the log file, followed by a newline. | |
If the global logfp is not None, it should be a file object to | |
which log data is written. | |
If the global logfp is None, the global logfile may be a string | |
giving a filename to open, in append mode. This file should be | |
world writable!!! If the file can't be opened, logging is | |
silently disabled (since there is no safe place where we could | |
send an error message). | |
""" | |
global log, logfile, logfp | |
if logfile and not logfp: | |
try: | |
logfp = open(logfile, "a") | |
except OSError: | |
pass | |
if not logfp: | |
log = nolog | |
else: | |
log = dolog | |
log(*allargs) | |
def dolog(fmt, *args): | |
"""Write a log message to the log file. See initlog() for docs.""" | |
logfp.write(fmt%args + "\n") | |
def nolog(*allargs): | |
"""Dummy function, assigned to log when logging is disabled.""" | |
pass | |
def closelog(): | |
"""Close the log file.""" | |
global log, logfile, logfp | |
logfile = '' | |
if logfp: | |
logfp.close() | |
logfp = None | |
log = initlog | |
log = initlog # The current logging function | |
# Parsing functions | |
# ================= | |
# Maximum input we will accept when REQUEST_METHOD is POST | |
# 0 ==> unlimited input | |
maxlen = 0 | |
def parse(fp=None, environ=os.environ, keep_blank_values=0, | |
strict_parsing=0, separator='&'): | |
"""Parse a query in the environment or from a file (default stdin) | |
Arguments, all optional: | |
fp : file pointer; default: sys.stdin.buffer | |
environ : environment dictionary; default: os.environ | |
keep_blank_values: flag indicating whether blank values in | |
percent-encoded forms should be treated as blank strings. | |
A true value indicates that blanks should be retained as | |
blank strings. The default false value indicates that | |
blank values are to be ignored and treated as if they were | |
not included. | |
strict_parsing: flag indicating what to do with parsing errors. | |
If false (the default), errors are silently ignored. | |
If true, errors raise a ValueError exception. | |
separator: str. The symbol to use for separating the query arguments. | |
Defaults to &. | |
""" | |
if fp is None: | |
fp = sys.stdin | |
# field keys and values (except for files) are returned as strings | |
# an encoding is required to decode the bytes read from self.fp | |
if hasattr(fp,'encoding'): | |
encoding = fp.encoding | |
else: | |
encoding = 'latin-1' | |
# fp.read() must return bytes | |
if isinstance(fp, TextIOWrapper): | |
fp = fp.buffer | |
if not 'REQUEST_METHOD' in environ: | |
environ['REQUEST_METHOD'] = 'GET' # For testing stand-alone | |
if environ['REQUEST_METHOD'] == 'POST': | |
ctype, pdict = parse_header(environ['CONTENT_TYPE']) | |
if ctype == 'multipart/form-data': | |
return parse_multipart(fp, pdict, separator=separator) | |
elif ctype == 'application/x-www-form-urlencoded': | |
clength = int(environ['CONTENT_LENGTH']) | |
if maxlen and clength > maxlen: | |
raise ValueError('Maximum content length exceeded') | |
qs = fp.read(clength).decode(encoding) | |
else: | |
qs = '' # Unknown content-type | |
if 'QUERY_STRING' in environ: | |
if qs: qs = qs + '&' | |
qs = qs + environ['QUERY_STRING'] | |
elif sys.argv[1:]: | |
if qs: qs = qs + '&' | |
qs = qs + sys.argv[1] | |
environ['QUERY_STRING'] = qs # XXX Shouldn't, really | |
elif 'QUERY_STRING' in environ: | |
qs = environ['QUERY_STRING'] | |
else: | |
if sys.argv[1:]: | |
qs = sys.argv[1] | |
else: | |
qs = "" | |
environ['QUERY_STRING'] = qs # XXX Shouldn't, really | |
return urllib.parse.parse_qs(qs, keep_blank_values, strict_parsing, | |
encoding=encoding, separator=separator) | |
def parse_multipart(fp, pdict, encoding="utf-8", errors="replace", separator='&'): | |
"""Parse multipart input. | |
Arguments: | |
fp : input file | |
pdict: dictionary containing other parameters of content-type header | |
encoding, errors: request encoding and error handler, passed to | |
FieldStorage | |
Returns a dictionary just like parse_qs(): keys are the field names, each | |
value is a list of values for that field. For non-file fields, the value | |
is a list of strings. | |
""" | |
# RFC 2046, Section 5.1 : The "multipart" boundary delimiters are always | |
# represented as 7bit US-ASCII. | |
boundary = pdict['boundary'].decode('ascii') | |
ctype = "multipart/form-data; boundary={}".format(boundary) | |
headers = Message() | |
headers.set_type(ctype) | |
try: | |
headers['Content-Length'] = pdict['CONTENT-LENGTH'] | |
except KeyError: | |
pass | |
fs = FieldStorage(fp, headers=headers, encoding=encoding, errors=errors, | |
environ={'REQUEST_METHOD': 'POST'}, separator=separator) | |
return {k: fs.getlist(k) for k in fs} | |
def _parseparam(s): | |
while s[:1] == ';': | |
s = s[1:] | |
end = s.find(';') | |
while end > 0 and (s.count('"', 0, end) - s.count('\\"', 0, end)) % 2: | |
end = s.find(';', end + 1) | |
if end < 0: | |
end = len(s) | |
f = s[:end] | |
yield f.strip() | |
s = s[end:] | |
def parse_header(line): | |
"""Parse a Content-type like header. | |
Return the main content-type and a dictionary of options. | |
""" | |
parts = _parseparam(';' + line) | |
key = parts.__next__() | |
pdict = {} | |
for p in parts: | |
i = p.find('=') | |
if i >= 0: | |
name = p[:i].strip().lower() | |
value = p[i+1:].strip() | |
if len(value) >= 2 and value[0] == value[-1] == '"': | |
value = value[1:-1] | |
value = value.replace('\\\\', '\\').replace('\\"', '"') | |
pdict[name] = value | |
return key, pdict | |
# Classes for field storage | |
# ========================= | |
class MiniFieldStorage: | |
"""Like FieldStorage, for use when no file uploads are possible.""" | |
# Dummy attributes | |
filename = None | |
list = None | |
type = None | |
file = None | |
type_options = {} | |
disposition = None | |
disposition_options = {} | |
headers = {} | |
def __init__(self, name, value): | |
"""Constructor from field name and value.""" | |
self.name = name | |
self.value = value | |
# self.file = StringIO(value) | |
def __repr__(self): | |
"""Return printable representation.""" | |
return "MiniFieldStorage(%r, %r)" % (self.name, self.value) | |
class FieldStorage: | |
"""Store a sequence of fields, reading multipart/form-data. | |
This class provides naming, typing, files stored on disk, and | |
more. At the top level, it is accessible like a dictionary, whose | |
keys are the field names. (Note: None can occur as a field name.) | |
The items are either a Python list (if there's multiple values) or | |
another FieldStorage or MiniFieldStorage object. If it's a single | |
object, it has the following attributes: | |
name: the field name, if specified; otherwise None | |
filename: the filename, if specified; otherwise None; this is the | |
client side filename, *not* the file name on which it is | |
stored (that's a temporary file you don't deal with) | |
value: the value as a *string*; for file uploads, this | |
transparently reads the file every time you request the value | |
and returns *bytes* | |
file: the file(-like) object from which you can read the data *as | |
bytes* ; None if the data is stored a simple string | |
type: the content-type, or None if not specified | |
type_options: dictionary of options specified on the content-type | |
line | |
disposition: content-disposition, or None if not specified | |
disposition_options: dictionary of corresponding options | |
headers: a dictionary(-like) object (sometimes email.message.Message or a | |
subclass thereof) containing *all* headers | |
The class is subclassable, mostly for the purpose of overriding | |
the make_file() method, which is called internally to come up with | |
a file open for reading and writing. This makes it possible to | |
override the default choice of storing all files in a temporary | |
directory and unlinking them as soon as they have been opened. | |
""" | |
def __init__(self, fp=None, headers=None, outerboundary=b'', | |
environ=os.environ, keep_blank_values=0, strict_parsing=0, | |
limit=None, encoding='utf-8', errors='replace', | |
max_num_fields=None, separator='&'): | |
"""Constructor. Read multipart/* until last part. | |
Arguments, all optional: | |
fp : file pointer; default: sys.stdin.buffer | |
(not used when the request method is GET) | |
Can be : | |
1. a TextIOWrapper object | |
2. an object whose read() and readline() methods return bytes | |
headers : header dictionary-like object; default: | |
taken from environ as per CGI spec | |
outerboundary : terminating multipart boundary | |
(for internal use only) | |
environ : environment dictionary; default: os.environ | |
keep_blank_values: flag indicating whether blank values in | |
percent-encoded forms should be treated as blank strings. | |
A true value indicates that blanks should be retained as | |
blank strings. The default false value indicates that | |
blank values are to be ignored and treated as if they were | |
not included. | |
strict_parsing: flag indicating what to do with parsing errors. | |
If false (the default), errors are silently ignored. | |
If true, errors raise a ValueError exception. | |
limit : used internally to read parts of multipart/form-data forms, | |
to exit from the reading loop when reached. It is the difference | |
between the form content-length and the number of bytes already | |
read | |
encoding, errors : the encoding and error handler used to decode the | |
binary stream to strings. Must be the same as the charset defined | |
for the page sending the form (content-type : meta http-equiv or | |
header) | |
max_num_fields: int. If set, then __init__ throws a ValueError | |
if there are more than n fields read by parse_qsl(). | |
""" | |
method = 'GET' | |
self.keep_blank_values = keep_blank_values | |
self.strict_parsing = strict_parsing | |
self.max_num_fields = max_num_fields | |
self.separator = separator | |
if 'REQUEST_METHOD' in environ: | |
method = environ['REQUEST_METHOD'].upper() | |
self.qs_on_post = None | |
if method == 'GET' or method == 'HEAD': | |
if 'QUERY_STRING' in environ: | |
qs = environ['QUERY_STRING'] | |
elif sys.argv[1:]: | |
qs = sys.argv[1] | |
else: | |
qs = "" | |
qs = qs.encode(locale.getpreferredencoding(), 'surrogateescape') | |
fp = BytesIO(qs) | |
if headers is None: | |
headers = {'content-type': | |
"application/x-www-form-urlencoded"} | |
if headers is None: | |
headers = {} | |
if method == 'POST': | |
# Set default content-type for POST to what's traditional | |
headers['content-type'] = "application/x-www-form-urlencoded" | |
if 'CONTENT_TYPE' in environ: | |
headers['content-type'] = environ['CONTENT_TYPE'] | |
if 'QUERY_STRING' in environ: | |
self.qs_on_post = environ['QUERY_STRING'] | |
if 'CONTENT_LENGTH' in environ: | |
headers['content-length'] = environ['CONTENT_LENGTH'] | |
else: | |
if not (isinstance(headers, (Mapping, Message))): | |
raise TypeError("headers must be mapping or an instance of " | |
"email.message.Message") | |
self.headers = headers | |
if fp is None: | |
self.fp = sys.stdin.buffer | |
# self.fp.read() must return bytes | |
elif isinstance(fp, TextIOWrapper): | |
self.fp = fp.buffer | |
else: | |
if not (hasattr(fp, 'read') and hasattr(fp, 'readline')): | |
raise TypeError("fp must be file pointer") | |
self.fp = fp | |
self.encoding = encoding | |
self.errors = errors | |
if not isinstance(outerboundary, bytes): | |
raise TypeError('outerboundary must be bytes, not %s' | |
% type(outerboundary).__name__) | |
self.outerboundary = outerboundary | |
self.bytes_read = 0 | |
self.limit = limit | |
# Process content-disposition header | |
cdisp, pdict = "", {} | |
if 'content-disposition' in self.headers: | |
cdisp, pdict = parse_header(self.headers['content-disposition']) | |
self.disposition = cdisp | |
self.disposition_options = pdict | |
self.name = None | |
if 'name' in pdict: | |
self.name = pdict['name'] | |
self.filename = None | |
if 'filename' in pdict: | |
self.filename = pdict['filename'] | |
self._binary_file = self.filename is not None | |
# Process content-type header | |
# | |
# Honor any existing content-type header. But if there is no | |
# content-type header, use some sensible defaults. Assume | |
# outerboundary is "" at the outer level, but something non-false | |
# inside a multi-part. The default for an inner part is text/plain, | |
# but for an outer part it should be urlencoded. This should catch | |
# bogus clients which erroneously forget to include a content-type | |
# header. | |
# | |
# See below for what we do if there does exist a content-type header, | |
# but it happens to be something we don't understand. | |
if 'content-type' in self.headers: | |
ctype, pdict = parse_header(self.headers['content-type']) | |
elif self.outerboundary or method != 'POST': | |
ctype, pdict = "text/plain", {} | |
else: | |
ctype, pdict = 'application/x-www-form-urlencoded', {} | |
self.type = ctype | |
self.type_options = pdict | |
if 'boundary' in pdict: | |
self.innerboundary = pdict['boundary'].encode(self.encoding, | |
self.errors) | |
else: | |
self.innerboundary = b"" | |
clen = -1 | |
if 'content-length' in self.headers: | |
try: | |
clen = int(self.headers['content-length']) | |
except ValueError: | |
pass | |
if maxlen and clen > maxlen: | |
raise ValueError('Maximum content length exceeded') | |
self.length = clen | |
if self.limit is None and clen >= 0: | |
self.limit = clen | |
self.list = self.file = None | |
self.done = 0 | |
if ctype == 'application/x-www-form-urlencoded': | |
self.read_urlencoded() | |
elif ctype[:10] == 'multipart/': | |
self.read_multi(environ, keep_blank_values, strict_parsing) | |
else: | |
self.read_single() | |
def __del__(self): | |
try: | |
self.file.close() | |
except AttributeError: | |
pass | |
def __enter__(self): | |
return self | |
def __exit__(self, *args): | |
self.file.close() | |
def __repr__(self): | |
"""Return a printable representation.""" | |
return "FieldStorage(%r, %r, %r)" % ( | |
self.name, self.filename, self.value) | |
def __iter__(self): | |
return iter(self.keys()) | |
def __getattr__(self, name): | |
if name != 'value': | |
raise AttributeError(name) | |
if self.file: | |
self.file.seek(0) | |
value = self.file.read() | |
self.file.seek(0) | |
elif self.list is not None: | |
value = self.list | |
else: | |
value = None | |
return value | |
def __getitem__(self, key): | |
"""Dictionary style indexing.""" | |
if self.list is None: | |
raise TypeError("not indexable") | |
found = [] | |
for item in self.list: | |
if item.name == key: found.append(item) | |
if not found: | |
raise KeyError(key) | |
if len(found) == 1: | |
return found[0] | |
else: | |
return found | |
def getvalue(self, key, default=None): | |
"""Dictionary style get() method, including 'value' lookup.""" | |
if key in self: | |
value = self[key] | |
if isinstance(value, list): | |
return [x.value for x in value] | |
else: | |
return value.value | |
else: | |
return default | |
def getfirst(self, key, default=None): | |
""" Return the first value received.""" | |
if key in self: | |
value = self[key] | |
if isinstance(value, list): | |
return value[0].value | |
else: | |
return value.value | |
else: | |
return default | |
def getlist(self, key): | |
""" Return list of received values.""" | |
if key in self: | |
value = self[key] | |
if isinstance(value, list): | |
return [x.value for x in value] | |
else: | |
return [value.value] | |
else: | |
return [] | |
def keys(self): | |
"""Dictionary style keys() method.""" | |
if self.list is None: | |
raise TypeError("not indexable") | |
return list(set(item.name for item in self.list)) | |
def __contains__(self, key): | |
"""Dictionary style __contains__ method.""" | |
if self.list is None: | |
raise TypeError("not indexable") | |
return any(item.name == key for item in self.list) | |
def __len__(self): | |
"""Dictionary style len(x) support.""" | |
return len(self.keys()) | |
def __bool__(self): | |
if self.list is None: | |
raise TypeError("Cannot be converted to bool.") | |
return bool(self.list) | |
def read_urlencoded(self): | |
"""Internal: read data in query string format.""" | |
qs = self.fp.read(self.length) | |
if not isinstance(qs, bytes): | |
raise ValueError("%s should return bytes, got %s" \ | |
% (self.fp, type(qs).__name__)) | |
qs = qs.decode(self.encoding, self.errors) | |
if self.qs_on_post: | |
qs += '&' + self.qs_on_post | |
query = urllib.parse.parse_qsl( | |
qs, self.keep_blank_values, self.strict_parsing, | |
encoding=self.encoding, errors=self.errors, | |
max_num_fields=self.max_num_fields, separator=self.separator) | |
self.list = [MiniFieldStorage(key, value) for key, value in query] | |
self.skip_lines() | |
FieldStorageClass = None | |
def read_multi(self, environ, keep_blank_values, strict_parsing): | |
"""Internal: read a part that is itself multipart.""" | |
ib = self.innerboundary | |
if not valid_boundary(ib): | |
raise ValueError('Invalid boundary in multipart form: %r' % (ib,)) | |
self.list = [] | |
if self.qs_on_post: | |
query = urllib.parse.parse_qsl( | |
self.qs_on_post, self.keep_blank_values, self.strict_parsing, | |
encoding=self.encoding, errors=self.errors, | |
max_num_fields=self.max_num_fields, separator=self.separator) | |
self.list.extend(MiniFieldStorage(key, value) for key, value in query) | |
klass = self.FieldStorageClass or self.__class__ | |
first_line = self.fp.readline() # bytes | |
if not isinstance(first_line, bytes): | |
raise ValueError("%s should return bytes, got %s" \ | |
% (self.fp, type(first_line).__name__)) | |
self.bytes_read += len(first_line) | |
# Ensure that we consume the file until we've hit our inner boundary | |
while (first_line.strip() != (b"--" + self.innerboundary) and | |
first_line): | |
first_line = self.fp.readline() | |
self.bytes_read += len(first_line) | |
# Propagate max_num_fields into the sub class appropriately | |
max_num_fields = self.max_num_fields | |
if max_num_fields is not None: | |
max_num_fields -= len(self.list) | |
while True: | |
parser = FeedParser() | |
hdr_text = b"" | |
while True: | |
data = self.fp.readline() | |
hdr_text += data | |
if not data.strip(): | |
break | |
if not hdr_text: | |
break | |
# parser takes strings, not bytes | |
self.bytes_read += len(hdr_text) | |
parser.feed(hdr_text.decode(self.encoding, self.errors)) | |
headers = parser.close() | |
# Some clients add Content-Length for part headers, ignore them | |
if 'content-length' in headers: | |
del headers['content-length'] | |
limit = None if self.limit is None \ | |
else self.limit - self.bytes_read | |
part = klass(self.fp, headers, ib, environ, keep_blank_values, | |
strict_parsing, limit, | |
self.encoding, self.errors, max_num_fields, self.separator) | |
if max_num_fields is not None: | |
max_num_fields -= 1 | |
if part.list: | |
max_num_fields -= len(part.list) | |
if max_num_fields < 0: | |
raise ValueError('Max number of fields exceeded') | |
self.bytes_read += part.bytes_read | |
self.list.append(part) | |
if part.done or self.bytes_read >= self.length > 0: | |
break | |
self.skip_lines() | |
def read_single(self): | |
"""Internal: read an atomic part.""" | |
if self.length >= 0: | |
self.read_binary() | |
self.skip_lines() | |
else: | |
self.read_lines() | |
self.file.seek(0) | |
bufsize = 8*1024 # I/O buffering size for copy to file | |
def read_binary(self): | |
"""Internal: read binary data.""" | |
self.file = self.make_file() | |
todo = self.length | |
if todo >= 0: | |
while todo > 0: | |
data = self.fp.read(min(todo, self.bufsize)) # bytes | |
if not isinstance(data, bytes): | |
raise ValueError("%s should return bytes, got %s" | |
% (self.fp, type(data).__name__)) | |
self.bytes_read += len(data) | |
if not data: | |
self.done = -1 | |
break | |
self.file.write(data) | |
todo = todo - len(data) | |
def read_lines(self): | |
"""Internal: read lines until EOF or outerboundary.""" | |
if self._binary_file: | |
self.file = self.__file = BytesIO() # store data as bytes for files | |
else: | |
self.file = self.__file = StringIO() # as strings for other fields | |
if self.outerboundary: | |
self.read_lines_to_outerboundary() | |
else: | |
self.read_lines_to_eof() | |
def __write(self, line): | |
"""line is always bytes, not string""" | |
if self.__file is not None: | |
if self.__file.tell() + len(line) > 1000: | |
self.file = self.make_file() | |
data = self.__file.getvalue() | |
self.file.write(data) | |
self.__file = None | |
if self._binary_file: | |
# keep bytes | |
self.file.write(line) | |
else: | |
# decode to string | |
self.file.write(line.decode(self.encoding, self.errors)) | |
def read_lines_to_eof(self): | |
"""Internal: read lines until EOF.""" | |
while 1: | |
line = self.fp.readline(1<<16) # bytes | |
self.bytes_read += len(line) | |
if not line: | |
self.done = -1 | |
break | |
self.__write(line) | |
def read_lines_to_outerboundary(self): | |
"""Internal: read lines until outerboundary. | |
Data is read as bytes: boundaries and line ends must be converted | |
to bytes for comparisons. | |
""" | |
next_boundary = b"--" + self.outerboundary | |
last_boundary = next_boundary + b"--" | |
delim = b"" | |
last_line_lfend = True | |
_read = 0 | |
while 1: | |
if self.limit is not None and 0 <= self.limit <= _read: | |
break | |
line = self.fp.readline(1<<16) # bytes | |
self.bytes_read += len(line) | |
_read += len(line) | |
if not line: | |
self.done = -1 | |
break | |
if delim == b"\r": | |
line = delim + line | |
delim = b"" | |
if line.startswith(b"--") and last_line_lfend: | |
strippedline = line.rstrip() | |
if strippedline == next_boundary: | |
break | |
if strippedline == last_boundary: | |
self.done = 1 | |
break | |
odelim = delim | |
if line.endswith(b"\r\n"): | |
delim = b"\r\n" | |
line = line[:-2] | |
last_line_lfend = True | |
elif line.endswith(b"\n"): | |
delim = b"\n" | |
line = line[:-1] | |
last_line_lfend = True | |
elif line.endswith(b"\r"): | |
# We may interrupt \r\n sequences if they span the 2**16 | |
# byte boundary | |
delim = b"\r" | |
line = line[:-1] | |
last_line_lfend = False | |
else: | |
delim = b"" | |
last_line_lfend = False | |
self.__write(odelim + line) | |
def skip_lines(self): | |
"""Internal: skip lines until outer boundary if defined.""" | |
if not self.outerboundary or self.done: | |
return | |
next_boundary = b"--" + self.outerboundary | |
last_boundary = next_boundary + b"--" | |
last_line_lfend = True | |
while True: | |
line = self.fp.readline(1<<16) | |
self.bytes_read += len(line) | |
if not line: | |
self.done = -1 | |
break | |
if line.endswith(b"--") and last_line_lfend: | |
strippedline = line.strip() | |
if strippedline == next_boundary: | |
break | |
if strippedline == last_boundary: | |
self.done = 1 | |
break | |
last_line_lfend = line.endswith(b'\n') | |
def make_file(self): | |
"""Overridable: return a readable & writable file. | |
The file will be used as follows: | |
- data is written to it | |
- seek(0) | |
- data is read from it | |
The file is opened in binary mode for files, in text mode | |
for other fields | |
This version opens a temporary file for reading and writing, | |
and immediately deletes (unlinks) it. The trick (on Unix!) is | |
that the file can still be used, but it can't be opened by | |
another process, and it will automatically be deleted when it | |
is closed or when the current process terminates. | |
If you want a more permanent file, you derive a class which | |
overrides this method. If you want a visible temporary file | |
that is nevertheless automatically deleted when the script | |
terminates, try defining a __del__ method in a derived class | |
which unlinks the temporary files you have created. | |
""" | |
if self._binary_file: | |
return tempfile.TemporaryFile("wb+") | |
else: | |
return tempfile.TemporaryFile("w+", | |
encoding=self.encoding, newline = '\n') | |
# Test/debug code | |
# =============== | |
def test(environ=os.environ): | |
"""Robust test CGI script, usable as main program. | |
Write minimal HTTP headers and dump all information provided to | |
the script in HTML form. | |
""" | |
print("Content-type: text/html") | |
print() | |
sys.stderr = sys.stdout | |
try: | |
form = FieldStorage() # Replace with other classes to test those | |
print_directory() | |
print_arguments() | |
print_form(form) | |
print_environ(environ) | |
print_environ_usage() | |
def f(): | |
exec("testing print_exception() -- <I>italics?</I>") | |
def g(f=f): | |
f() | |
print("<H3>What follows is a test, not an actual exception:</H3>") | |
g() | |
except: | |
print_exception() | |
print("<H1>Second try with a small maxlen...</H1>") | |
global maxlen | |
maxlen = 50 | |
try: | |
form = FieldStorage() # Replace with other classes to test those | |
print_directory() | |
print_arguments() | |
print_form(form) | |
print_environ(environ) | |
except: | |
print_exception() | |
def print_exception(type=None, value=None, tb=None, limit=None): | |
if type is None: | |
type, value, tb = sys.exc_info() | |
import traceback | |
print() | |
print("<H3>Traceback (most recent call last):</H3>") | |
list = traceback.format_tb(tb, limit) + \ | |
traceback.format_exception_only(type, value) | |
print("<PRE>%s<B>%s</B></PRE>" % ( | |
html.escape("".join(list[:-1])), | |
html.escape(list[-1]), | |
)) | |
del tb | |
def print_environ(environ=os.environ): | |
"""Dump the shell environment as HTML.""" | |
keys = sorted(environ.keys()) | |
print() | |
print("<H3>Shell Environment:</H3>") | |
print("<DL>") | |
for key in keys: | |
print("<DT>", html.escape(key), "<DD>", html.escape(environ[key])) | |
print("</DL>") | |
print() | |
def print_form(form): | |
"""Dump the contents of a form as HTML.""" | |
keys = sorted(form.keys()) | |
print() | |
print("<H3>Form Contents:</H3>") | |
if not keys: | |
print("<P>No form fields.") | |
print("<DL>") | |
for key in keys: | |
print("<DT>" + html.escape(key) + ":", end=' ') | |
value = form[key] | |
print("<i>" + html.escape(repr(type(value))) + "</i>") | |
print("<DD>" + html.escape(repr(value))) | |
print("</DL>") | |
print() | |
def print_directory(): | |
"""Dump the current directory as HTML.""" | |
print() | |
print("<H3>Current Working Directory:</H3>") | |
try: | |
pwd = os.getcwd() | |
except OSError as msg: | |
print("OSError:", html.escape(str(msg))) | |
else: | |
print(html.escape(pwd)) | |
print() | |
def print_arguments(): | |
print() | |
print("<H3>Command Line Arguments:</H3>") | |
print() | |
print(sys.argv) | |
print() | |
def print_environ_usage(): | |
"""Dump a list of environment variables used by CGI as HTML.""" | |
print(""" | |
<H3>These environment variables could have been set:</H3> | |
<UL> | |
<LI>AUTH_TYPE | |
<LI>CONTENT_LENGTH | |
<LI>CONTENT_TYPE | |
<LI>DATE_GMT | |
<LI>DATE_LOCAL | |
<LI>DOCUMENT_NAME | |
<LI>DOCUMENT_ROOT | |
<LI>DOCUMENT_URI | |
<LI>GATEWAY_INTERFACE | |
<LI>LAST_MODIFIED | |
<LI>PATH | |
<LI>PATH_INFO | |
<LI>PATH_TRANSLATED | |
<LI>QUERY_STRING | |
<LI>REMOTE_ADDR | |
<LI>REMOTE_HOST | |
<LI>REMOTE_IDENT | |
<LI>REMOTE_USER | |
<LI>REQUEST_METHOD | |
<LI>SCRIPT_NAME | |
<LI>SERVER_NAME | |
<LI>SERVER_PORT | |
<LI>SERVER_PROTOCOL | |
<LI>SERVER_ROOT | |
<LI>SERVER_SOFTWARE | |
</UL> | |
In addition, HTTP headers sent by the server may be passed in the | |
environment as well. Here are some common variable names: | |
<UL> | |
<LI>HTTP_ACCEPT | |
<LI>HTTP_CONNECTION | |
<LI>HTTP_HOST | |
<LI>HTTP_PRAGMA | |
<LI>HTTP_REFERER | |
<LI>HTTP_USER_AGENT | |
</UL> | |
""") | |
# Utilities | |
# ========= | |
def valid_boundary(s): | |
import re | |
if isinstance(s, bytes): | |
_vb_pattern = b"^[ -~]{0,200}[!-~]$" | |
else: | |
_vb_pattern = "^[ -~]{0,200}[!-~]$" | |
return re.match(_vb_pattern, s) | |
# Invoke mainline | |
# =============== | |
# Call test() when this file is run as a script (not imported as a module) | |
if __name__ == '__main__': | |
test() |