0001# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
0002# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
0003"""
0004Routines for testing WSGI applications.
0005
0006Most interesting is the `TestApp <class-paste.fixture.TestApp.html>`_
0007for testing WSGI applications, and the `TestFileEnvironment
0008<class-paste.fixture.TestFileEnvironment.html>`_ class for testing the
0009effects of command-line scripts.
0010"""
0011
0012import sys
0013import random
0014import urllib
0015import urlparse
0016import mimetypes
0017import time
0018import cgi
0019import os
0020import shutil
0021import webbrowser
0022import smtplib
0023import shlex
0024from Cookie import SimpleCookie
0025try:
0026 from cStringIO import StringIO
0027except ImportError:
0028 from StringIO import StringIO
0029import re
0030try:
0031 import subprocess
0032except ImportError:
0033 from paste.util import subprocess24 as subprocess
0034
0035from paste import wsgilib
0036from paste import lint
0037from paste.response import HeaderDict
0038
def tempnam_no_warning(*args):
    """
    Return a temporary file name by forwarding ``args`` unchanged to
    ``os.tempnam``.

    Despite the name, nothing in this function touches the warning
    machinery; it is a plain pass-through.  If the security warning
    raised by ``os.tempnam`` has to be silenced, that must be arranged
    elsewhere (NOTE(review): no filter is installed here -- confirm
    whether one is expected).
    """
    temp_name = os.tempnam(*args)
    return temp_name
0046
class NoDefault(object):
    """
    Empty marker class; it defines no attributes or behaviour of its
    own.
    """
0049
def sorted(l):
    """
    Return a new list holding the items of ``l`` in ascending order.

    ``l`` may be any iterable; it is copied first, so the argument
    itself is never reordered.
    """
    ordered = [item for item in l]
    ordered.sort()
    return ordered
0054
class Dummy_smtplib(object):
    """
    Stand-in for ``smtplib.SMTP`` that records the last message handed
    to ``sendmail`` instead of sending it.

    Only one instance may be live at a time: the class remembers the
    current instance in ``existing`` until ``reset()`` is called on it.
    """

    # The instance created most recently and not yet reset (or None).
    existing = None

    def __init__(self, server):
        # Imported here because the module does not import ``warnings``
        # at the top level.
        import warnings
        warnings.warn(
            'Dummy_smtplib is not maintained and is deprecated',
            DeprecationWarning, 2)
        assert not self.existing, (
            "smtplib.SMTP() called again before Dummy_smtplib.existing.reset() "
            "called.")
        self.open = True
        self.server = server
        type(self).existing = self

    def sendmail(self, from_address, to_addresses, msg):
        """
        Record the envelope and message on the instance; nothing is
        transmitted.
        """
        self.message = msg
        self.to_addresses = to_addresses
        self.from_address = from_address

    def quit(self):
        """
        Mark the fake connection as closed; closing twice is an
        assertion failure.
        """
        assert self.open, (
            "Called %s.quit() twice" % self)
        self.open = False

    def reset(self):
        """
        Forget this instance so that a new one may be created.  The
        connection must already have been closed with ``quit()``.
        """
        assert not self.open, (
            "SMTP connection not quit")
        type(self).existing = None

    def install(cls):
        """
        Replace ``smtplib.SMTP`` with this class.
        """
        smtplib.SMTP = cls

    install = classmethod(install)
0090
class AppError(Exception):
    """
    Raised by ``TestApp`` when a response has an unexpected status or
    the application logged errors.
    """
0093
class TestApp(object):
    """
    Convenience wrapper around a WSGI application for use in tests.

    Requests are issued with ``get``, ``post``, ``put`` and ``delete``;
    each builds a WSGI environ, runs the application through
    ``do_request`` and yields a response object.  Cookies set by the
    application are remembered in ``self.cookies`` and sent back on
    later requests until ``reset()`` is called.
    """

    # for py.test
    disabled = True

    def __init__(self, app, namespace=None, relative_to=None,
                 extra_environ=None, pre_request_hook=None,
                 post_request_hook=None):
        """
        Wraps a WSGI application in a more convenient interface for
        testing.

        ``app`` may be an application, or a Paste Deploy app
        URI, like ``'config:filename.ini#test'``.

        ``namespace`` is a dictionary that will be written to (if
        provided).  This can be used with doctest or some other
        system, and the variable ``res`` will be assigned every time
        you make a request (instead of returning the request).

        ``relative_to`` is a directory, and filenames used for file
        uploads are calculated relative to this.  Also ``config:``
        URIs that aren't absolute.

        ``extra_environ`` is a dictionary of values that should go
        into the environment for each request.  These can provide a
        communication channel with the application.

        ``pre_request_hook`` is a function to be called prior to
        making requests (such as ``post`` or ``get``).  This function
        must take one argument (the instance of the TestApp).

        ``post_request_hook`` is a function, similar to
        ``pre_request_hook``, to be called after requests are made.
        """
        if isinstance(app, (str, unicode)):
            # Imported lazily so Paste Deploy is only needed when an
            # application is given as a URI.
            from paste.deploy import loadapp
            # @@: Should pick up relative_to from calling module's
            # __file__
            app = loadapp(app, relative_to=relative_to)
        self.app = app
        self.namespace = namespace
        self.relative_to = relative_to
        if extra_environ is None:
            extra_environ = {}
        self.extra_environ = extra_environ
        self.pre_request_hook = pre_request_hook
        self.post_request_hook = post_request_hook
        self.reset()

    def reset(self):
        """
        Resets the state of the application; currently just clears
        saved cookies.
        """
        self.cookies = {}

    def _make_environ(self):
        """
        Return a fresh environ dictionary: a copy of
        ``self.extra_environ`` with ``paste.throw_errors`` set to true.
        """
        environ = self.extra_environ.copy()
        environ['paste.throw_errors'] = True
        return environ

    def get(self, url, params=None, headers=None, extra_environ=None,
            status=None, expect_errors=False):
        """
        Get the given url (well, actually a path like
        ``'/page.html'``).

        ``params``:
            A query string, or a dictionary that will be encoded
            into a query string.  You may also include a query
            string on the ``url``.

        ``headers``:
            A dictionary of extra headers to send.

        ``extra_environ``:
            A dictionary of environmental variables that should
            be added to the request.

        ``status``:
            The integer status code you expect (if not 200 or 3xx).
            If you expect a 404 response, for instance, you must give
            ``status=404`` or it will be an error.  A list or tuple of
            acceptable codes may be given, as may a wildcard, like
            ``'3*'`` or ``'*'``.

        ``expect_errors``:
            If this is not true, then if anything is written to
            ``wsgi.errors`` it will be an error.  If it is true, then
            non-200/3xx responses are also okay.

        Returns a `response object
        <class-paste.fixture.TestResponse.html>`_
        """
        if extra_environ is None:
            extra_environ = {}
        # Hide from py.test:
        __tracebackhide__ = True
        if params:
            if not isinstance(params, (str, unicode)):
                params = urllib.urlencode(params, doseq=True)
            # Append to an existing query string if the url has one.
            if '?' in url:
                url += '&'
            else:
                url += '?'
            url += params
        environ = self._make_environ()
        url = str(url)
        if '?' in url:
            url, environ['QUERY_STRING'] = url.split('?', 1)
        else:
            environ['QUERY_STRING'] = ''
        self._set_headers(headers, environ)
        # Per-request environ values take precedence over everything
        # computed above.
        environ.update(extra_environ)
        req = TestRequest(url, environ, expect_errors)
        return self.do_request(req, status=status)

    def _gen_request(self, method, url, params='', headers=None, extra_environ=None,
                     status=None, upload_files=None, expect_errors=False):
        """
        Do a generic request.

        ``method`` is stored as ``REQUEST_METHOD``.  ``params`` (a
        string, or a list/tuple/dict that is url-encoded) becomes the
        request body; when ``upload_files`` is given the body is
        encoded as ``multipart/form-data`` instead.  The remaining
        arguments are as for ``get()``.
        """
        if headers is None:
            headers = {}
        if extra_environ is None:
            extra_environ = {}
        environ = self._make_environ()
        # @@: Should this be all non-strings?
        if isinstance(params, (list, tuple, dict)):
            params = urllib.urlencode(params)
        if upload_files:
            # Multipart encoding needs name/value pairs, so decode the
            # url-encoded form again.
            params = cgi.parse_qsl(params, keep_blank_values=True)
            content_type, params = self.encode_multipart(
                params, upload_files)
            environ['CONTENT_TYPE'] = content_type
        if '?' in url:
            url, environ['QUERY_STRING'] = url.split('?', 1)
        else:
            environ['QUERY_STRING'] = ''
        environ['CONTENT_LENGTH'] = str(len(params))
        environ['REQUEST_METHOD'] = method
        environ['wsgi.input'] = StringIO(params)
        self._set_headers(headers, environ)
        environ.update(extra_environ)
        req = TestRequest(url, environ, expect_errors)
        return self.do_request(req, status=status)

    def post(self, url, params='', headers=None, extra_environ=None,
             status=None, upload_files=None, expect_errors=False):
        """
        Do a POST request.  Very like the ``.get()`` method.
        ``params`` are put in the body of the request.

        ``upload_files`` is for file uploads.  It should be a list of
        ``[(fieldname, filename, file_content)]``.  You can also use
        just ``[(fieldname, filename)]`` and the file content will be
        read from disk.

        Returns a `response object
        <class-paste.fixture.TestResponse.html>`_
        """
        return self._gen_request('POST', url, params=params, headers=headers,
                                 extra_environ=extra_environ, status=status,
                                 upload_files=upload_files,
                                 expect_errors=expect_errors)

    def put(self, url, params='', headers=None, extra_environ=None,
            status=None, upload_files=None, expect_errors=False):
        """
        Do a PUT request.  Very like the ``.get()`` method.
        ``params`` are put in the body of the request.

        ``upload_files`` is for file uploads.  It should be a list of
        ``[(fieldname, filename, file_content)]``.  You can also use
        just ``[(fieldname, filename)]`` and the file content will be
        read from disk.

        Returns a `response object
        <class-paste.fixture.TestResponse.html>`_
        """
        return self._gen_request('PUT', url, params=params, headers=headers,
                                 extra_environ=extra_environ, status=status,
                                 upload_files=upload_files,
                                 expect_errors=expect_errors)

    def delete(self, url, params='', headers=None, extra_environ=None,
               status=None, expect_errors=False):
        """
        Do a DELETE request.  Very like the ``.get()`` method.
        ``params`` are put in the body of the request.

        Returns a `response object
        <class-paste.fixture.TestResponse.html>`_
        """
        return self._gen_request('DELETE', url, params=params, headers=headers,
                                 extra_environ=extra_environ, status=status,
                                 upload_files=None, expect_errors=expect_errors)

    def _set_headers(self, headers, environ):
        """
        Turn any headers into environ variables

        ``Content-Type`` and ``Content-Length`` map to ``CONTENT_TYPE``
        and ``CONTENT_LENGTH``; every other header becomes
        ``HTTP_<NAME>`` with dashes turned into underscores.  A false
        ``headers`` value (``None`` or empty) leaves ``environ``
        untouched.
        """
        if not headers:
            return
        for header, value in headers.items():
            if header.lower() == 'content-type':
                var = 'CONTENT_TYPE'
            elif header.lower() == 'content-length':
                var = 'CONTENT_LENGTH'
            else:
                var = 'HTTP_%s' % header.replace('-', '_').upper()
            environ[var] = value

    def encode_multipart(self, params, files):
        """
        Encodes a set of parameters (typically a name/value list) and
        a set of files (a list of (name, filename, file_body)) into a
        typical POST body, returning the (content_type, body).

        Each file part gets a ``Content-Type`` guessed from its
        filename, falling back to ``application/octet-stream`` when
        the type cannot be guessed.
        """
        boundary = '----------a_BoUnDaRy%s$' % random.random()
        lines = []
        for key, value in params:
            lines.append('--'+boundary)
            lines.append('Content-Disposition: form-data; name="%s"' % key)
            lines.append('')
            lines.append(value)
        for file_info in files:
            key, filename, value = self._get_file_info(file_info)
            lines.append('--'+boundary)
            lines.append('Content-Disposition: form-data; name="%s"; filename="%s"'
                         % (key, filename))
            fcontent = mimetypes.guess_type(filename)[0]
            # The parentheses matter: ``%`` binds tighter than ``or``,
            # so without them an unknown type renders as "None".
            lines.append('Content-Type: %s' %
                         (fcontent or 'application/octet-stream'))
            lines.append('')
            lines.append(value)
        lines.append('--' + boundary + '--')
        lines.append('')
        body = '\r\n'.join(lines)
        content_type = 'multipart/form-data; boundary=%s' % boundary
        return content_type, body

    def _get_file_info(self, file_info):
        """
        Normalise one ``upload_files`` entry to
        ``(fieldname, filename, content)``.

        A three-item entry is returned unchanged.  For a two-item
        entry the content is read from ``filename`` (joined onto
        ``self.relative_to`` when that is set).  Any other length
        raises ``ValueError``.
        """
        if len(file_info) == 2:
            # It only has a filename
            filename = file_info[1]
            if self.relative_to:
                filename = os.path.join(self.relative_to, filename)
            f = open(filename, 'rb')
            try:
                content = f.read()
            finally:
                # Close the handle even when the read fails.
                f.close()
            return (file_info[0], filename, content)
        elif len(file_info) == 3:
            return file_info
        else:
            raise ValueError(
                "upload_files need to be a list of tuples of (fieldname, "
                "filename, filecontent) or (fieldname, filename); "
                "you gave: %r"
                % repr(file_info)[:100])

    def do_request(self, req, status):
        """
        Executes the given request (``req``), with the expected
        ``status``.  Generally ``.get()`` and ``.post()`` are used
        instead.

        The response is stored in ``self.namespace['res']`` when a
        namespace was given; otherwise it is returned.
        """
        if self.pre_request_hook:
            self.pre_request_hook(self)
        __tracebackhide__ = True
        if self.cookies:
            c = SimpleCookie()
            for name, value in self.cookies.items():
                c[name] = value
            # Build a single ``name=value; name=value`` header.  Taking
            # ``str(c)`` and stripping the first "Set-Cookie: " prefix
            # is only correct for one cookie; with several, the later
            # "Set-Cookie: " prefixes and line breaks would leak into
            # the header value.
            names = list(c.keys())
            names.sort()
            req.environ['HTTP_COOKIE'] = '; '.join(
                [c[name].OutputString() for name in names])
        req.environ['paste.testing'] = True
        req.environ['paste.testing_variables'] = {}
        app = lint.middleware(self.app)
        old_stdout = sys.stdout
        out = CaptureStdout(old_stdout)
        try:
            sys.stdout = out
            start_time = time.time()
            raise_on_wsgi_error = not req.expect_errors
            raw_res = wsgilib.raw_interactive(
                app, req.url,
                raise_on_wsgi_error=raise_on_wsgi_error,
                **req.environ)
            end_time = time.time()
        finally:
            # Always restore stdout, and echo whatever was captured to
            # stderr.
            sys.stdout = old_stdout
            sys.stderr.write(out.getvalue())
        res = self._make_response(raw_res, end_time - start_time)
        res.request = req
        # Expose values the application stored in
        # ``paste.testing_variables`` as attributes of the response.
        for name, value in req.environ['paste.testing_variables'].items():
            if hasattr(res, name):
                raise ValueError(
                    "paste.testing_variables contains the variable %r, but "
                    "the response object already has an attribute by that "
                    "name" % name)
            setattr(res, name, value)
        if self.namespace is not None:
            self.namespace['res'] = res
        if not req.expect_errors:
            self._check_status(status, res)
            self._check_errors(res)
        res.cookies_set = {}
        for header in res.all_headers('set-cookie'):
            c = SimpleCookie(header)
            for key, morsel in c.items():
                self.cookies[key] = morsel.value
                res.cookies_set[key] = morsel.value
        if self.post_request_hook:
            self.post_request_hook(self)
        if self.namespace is None:
            # It's annoying to return the response in doctests, as it'll
            # be printed, so we only return it if we couldn't assign
            # it anywhere
            return res

    def _check_status(self, status, res):
        """
        Raise ``AppError`` unless ``res.status`` is acceptable.

        ``status`` may be ``'*'`` (anything goes), a list or tuple of
        acceptable codes, ``None`` (any 2xx or 3xx), a single code, or
        a shell-style wildcard string such as ``'3*'`` that is matched
        against the status code written as text.
        """
        __tracebackhide__ = True
        if status == '*':
            return
        if isinstance(status, (list, tuple)):
            if res.status not in status:
                raise AppError(
                    "Bad response: %s (not one of %s for %s)\n%s"
                    % (res.full_status, ', '.join(map(str, status)),
                       res.request.url, res.body))
            return
        if status is None:
            if res.status >= 200 and res.status < 400:
                return
            raise AppError(
                "Bad response: %s (not 200 OK or 3xx redirect for %s)\n%s"
                % (res.full_status, res.request.url,
                   res.body))
        if status == res.status:
            return
        # Wildcards such as '3*' are documented on ``get()``.  Integer
        # statuses do not support ``in``, hence the TypeError guard.
        try:
            is_wildcard = '*' in status
        except TypeError:
            is_wildcard = False
        if is_wildcard:
            import fnmatch
            if fnmatch.fnmatchcase(str(res.status), status):
                return
        raise AppError(
            "Bad response: %s (not %s)" % (res.full_status, status))

    def _check_errors(self, res):
        """
        Raise ``AppError`` if ``res.errors`` is non-empty.
        """
        if res.errors:
            raise AppError(
                "Application had errors logged:\n%s" % res.errors)

    def _make_response(self, raw_response, total_time):
        """
        Build a ``TestResponse`` from ``raw_response``, a
        ``(status, headers, body, errors)`` tuple, and the elapsed
        ``total_time``.
        """
        # Unpacked in the body rather than in the signature; tuple
        # parameters are not valid syntax on newer Pythons.
        status, headers, body, errors = raw_response
        return TestResponse(self, status, headers, body, errors,
                            total_time)
0447
class CaptureStdout(object):
    """
    File-like object that forwards everything written to it to the
    ``actual`` stream while also keeping a copy in memory, so the
    text can be retrieved later with ``getvalue()``.
    """

    def __init__(self, actual):
        self.actual = actual
        self.captured = StringIO()

    def getvalue(self):
        """Return everything written so far."""
        recorded = self.captured.getvalue()
        return recorded

    def write(self, s):
        """Record ``s`` and then pass it on to the wrapped stream."""
        self.captured.write(s)
        self.actual.write(s)

    def writelines(self, lines):
        """Write every item of ``lines`` in turn via ``write()``."""
        for chunk in lines:
            self.write(chunk)

    def flush(self):
        """Flush the wrapped stream; the in-memory copy needs none."""
        self.actual.flush()
0467
0468class TestResponse(object):
0469
0470 # for py.test
0471 disabled = True
0472
0473 """
0474 Instances of this class are return by `TestApp
0475 <class-paste.fixture.TestApp.html>`_
0476 """
0477
0478 def __init__(self, test_app, status, headers, body, errors,
0479 total_time