#!/usr/bin/env python """web.py: makes web apps (http://webpy.org)""" __version__ = "0.131" __license__ = "Affero General Public License, Version 1" __author__ = "Aaron Swartz " from __future__ import generators # long term todo: # - new form system # - new templating system # - unit tests? # todo: # - get rid of upvars # - move documentation into docstrings # - fix that silly nonce stuff in web.update # - provide an option to use .write() # - add ip:port support # - allow people to do $self.id from inside a reparam # - add sqlite support # - make storage a subclass of dictionary # - convert datetimes, floats in WebSafe # - locks around memoize # - fix memoize to use cacheify style techniques # - merge curval query with the insert # - figure out how to handle squid, etc. for web.ctx.ip import os, os.path, sys, time, types, traceback import cgi, re, urllib, urlparse, Cookie, pprint from threading import currentThread from tokenize import tokenprog try: import datetime except ImportError: pass try: from Cheetah.Compiler import Compiler from Cheetah.Filters import Filter _hasTemplating = True except ImportError: _hasTemplating = False try: from DBUtils.PooledDB import PooledDB _hasPooling = True except ImportError: _hasPooling = False # hack for compatibility with Python 2.3: if not hasattr(traceback, 'format_exc'): from cStringIO import StringIO def format_exc(limit=None): s = StringIO() traceback.print_exc(limit, s) return s.getvalue() traceback.format_exc = format_exc ## general utils def _strips(direction, text, remove): if direction == 'l': if text.startswith(remove): return text[len(remove):] elif direction == 'r': if text.endswith(remove): return text[:-len(remove)] else: raise "WrongDirection", "Needs to be r or l." return text def rstrips(a, b): return _strips('r', a, b) def lstrips(a, b): return _strips('l', a, b) def strips(a, b): return rstrips(lstrips(a,b),b) def autoassign(): locals = sys._getframe(1).f_locals self = locals['self'] for (k, v) in locals.iteritems(): if k == 'self': continue setattr(self, k, v) class Storage: def __init__(self, initial=None): if initial: for k in initial.keys(): setattr(self, k, initial[k]) def __getattr__(self, k): if hasattr(self.__dict__, k) or ( k.startswith('__') and k.endswith('__')): # special keyword return getattr(self.__dict__, k) raise AttributeError, repr(k) def __repr__(self): return '' storage = Storage def storify(f, *requireds, **defaults): stor = Storage() for k in requireds + tuple(f.keys()): v = f[k] if isinstance(k, list): v = v[-1] if hasattr(v, 'value'): v = v.value setattr(stor, k, v) for (k,v) in defaults.iteritems(): result = v if hasattr(stor, k): result = stor[k] if v == () and not isinstance(result, tuple): result = (result,) setattr(stor, k, result) return stor class memoize: def __init__(self, func): self.func = func; self.cache = {} def __call__(self, *a, **k): key = (a, tuple(k.items())) if key not in self.cache: self.cache[key] = self.func(*a, **k) return self.cache[key] re_compile = memoize(re.compile) #@@ threadsafe? class _re_subm_proxy: def __init__(self): self.match = None def __call__(self, match): self.match = match; return '' def re_subm(pat, repl, string): """like re.sub, but returns the replacement and the match object""" r = re_compile(pat) proxy = _re_subm_proxy() r.sub(proxy.__call__, string) return r.sub(repl, string), proxy.match def group(seq, size): """Breaks 'seq' into a generator of lists with length 'size'.""" if not hasattr(seq, 'next'): seq = iter(seq) while True: yield [seq.next() for i in xrange(size)] class iterbetter: def __init__(self, iterator): self.i, self.c = iterator, 0 def __iter__(self): while 1: yield self.i.next(); self.c += 1 def __getitem__(self, i): #todo: slices if i > self.c: raise KeyError, "already passed "+str(i) try: while i < self.c: self.i.next(); self.c += 1 # now self.c == i self.c += 1; return self.i.next() except StopIteration: raise KeyError, repr(i) def dictfind(d, elt): for (k,v) in d.iteritems(): if elt is v: return k def dictincr(d, e): d.setdefault(e, 0) d[e] += 1 return d[e] def dictadd(a, b): result = {} result.update(a) result.update(b) return result sumdicts = dictadd # deprecated def listget(l, n, v=None): if len(l)-1 < n: return v return l[n] def upvars(n=2): return dictadd( sys._getframe(n).f_globals, sys._getframe(n).f_locals) class capturestdout: def __init__(self, func): self.func = func def __call__(self, *args, **kw): from cStringIO import StringIO # Not threadsafe! out = StringIO() oldstdout = sys.stdout sys.stdout = out try: self.func(*args, **kw) finally: sys.stdout = oldstdout return out.getvalue() class profile: def __init__(self, func): self.func = func def __call__(self, *args, **kw): import hotshot, hotshot.stats, tempfile, time temp = tempfile.NamedTemporaryFile() prof = hotshot.Profile(temp.name) stime = time.time() result = prof.runcall(self.func, *args) stime = time.time() - stime prof.close() stats = hotshot.stats.load(temp.name) stats.strip_dirs() stats.sort_stats('time', 'calls') x = '\n\ntook '+ str(stime) + ' seconds\n' x += capturestdout(stats.print_stats)(40) x += capturestdout(stats.print_callers)() return result, x def tryall(context): context = context.copy() # vars() would update results = {} for (k, v) in context.iteritems(): if not hasattr(v, '__call__'): continue print k+':', try: r = v() dictincr(results, r) print r except: print 'ERROR' dictincr(results, 'ERROR') print ' '+'\n '.join(traceback.format_exc().split('\n')) print '-'*40 print 'results:' for (k, v) in results.iteritems(): print ' '*2, str(k)+':', v class threadeddict: def __init__(self, d): self.__dict__['_threadeddict__d'] = d def __getattr__(self, a): return getattr(self.__d[currentThread()], a) def __getitem__(self, i): return self.__d[currentThread()][i] def __setattr__(self, a, v): return setattr(self.__d[currentThread()], a, v) def __setitem__(self, i, v): self.__d[currentThread()][i] = v def __hash__(self): return hash(self.__d[currentThread()]) ## url utils def base(base=''): #when would you use a default base? url = context.path.lstrip('/') for i in xrange(url.count('/')): base += '../' if not base: base = './' return base ## formatting try: from markdown import markdown # http://webpy.org/markdown.py except ImportError: pass r_url = re_compile('(?', text) text = markdown(text) return text ## db api def _interpolate(format): """ takes a format string and returns a list of 2-tuples of the form (boolean, string) where boolean says whether string should be evaled or not from http://lfw.org/python/Itpl.py (public domain, Ka-Ping Yee) """ def matchorfail(text, pos): match = tokenprog.match(text, pos) if match is None: raise ItplError(text, pos) return match, match.end() namechars = "abcdefghijklmnopqrstuvwxyz" \ "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_"; chunks = [] pos = 0 while 1: dollar = format.find("$", pos) if dollar < 0: break nextchar = format[dollar+1] if nextchar == "{": chunks.append((0, format[pos:dollar])) pos, level = dollar+2, 1 while level: match, pos = matchorfail(format, pos) tstart, tend = match.regs[3] token = format[tstart:tend] if token == "{": level = level+1 elif token == "}": level = level-1 chunks.append((1, format[dollar+2:pos-1])) elif nextchar in namechars: chunks.append((0, format[pos:dollar])) match, pos = matchorfail(format, dollar+1) while pos < len(format): if format[pos] == "." and \ pos+1 < len(format) and format[pos+1] in namechars: match, pos = matchorfail(format, pos+1) elif format[pos] in "([": pos, level = pos+1, 1 while level: match, pos = matchorfail(format, pos) tstart, tend = match.regs[3] token = format[tstart:tend] if token[0] in "([": level = level+1 elif token[0] in ")]": level = level-1 else: break chunks.append((1, format[dollar+1:pos])) else: chunks.append((0, format[pos:dollar+1])) pos = dollar + 1 + (nextchar == "$") if pos < len(format): chunks.append((0, format[pos:])) return chunks class UnknownParamstyle(Exception): pass def aparam(): p = ctx.db_module.paramstyle if p == 'qmark': return '?' elif p == 'numeric': return ':1' elif p in ['format', 'pyformat']: return '%s' raise UnknownParamstyle, p def reparam(s, d): vals = [] result = [] for live, chunk in _interpolate(s): if live: result.append(aparam()) vals.append(eval(chunk, d)) else: result.append(chunk) return ''.join(result), vals class UnknownDB(Exception): pass def connect(dbn, **kw): if dbn == "postgres": try: import psycopg2 as db except ImportError: try: import psycopg as db except ImportError: import pgdb as db kw['password'] = kw['pw'] del kw['pw'] kw['database'] = kw['db'] del kw['db'] elif dbn == "mysql": import MySQLdb as db kw['passwd'] = kw['pw'] del kw['pw'] db.paramstyle = 'pyformat' # it's both, like psycopg else: raise UnknownDB, dbn ctx.db_name = dbn ctx.db_module = db ctx.db_transaction = False if _hasPooling: if 'db' not in globals(): globals()['db'] = PooledDB(dbapi=db, **kw) ctx.db = globals()['db'].connection() else: ctx.db = db.connect(**kw) ctx.dbq_count = 0 if globals().get('db_printing'): def db_execute(cur, q, d=None): ctx.dbq_count += 1 try: outq = q % d except: outq = q print>>debug, str(ctx.dbq_count)+':', outq a = time.time() out = cur.execute(q, d) b = time.time() print>>debug, '(%s)' % round(b-a, 2) return out ctx.db_execute = db_execute else: ctx.db_execute = lambda cur, q, d=None: cur.execute(q, d) return ctx.db def transact(): """Start a transaction.""" # commit everything up to now, so we don't rollback it later ctx.db.commit() ctx.db_transaction = True def commit(): ctx.db.commit() ctx.db_transaction = False def rollback(): ctx.db.rollback() ctx.db_transaction = False def query(q, vars=None, processed=False): if vars is None: vars = {} d = ctx.db.cursor() if not processed: q, vars = reparam(q, vars) ctx.db_execute(d, q, vars) if d.description: names = [x[0] for x in d.description] def iterwrapper(): x = d.fetchone() while x: yield Storage(dict(zip(names, x))) x = d.fetchone() out = iterbetter(iterwrapper()) out.__len__ = lambda: d.rowcount out.list = lambda: [Storage(dict(zip(names, x))) for x in d.fetchall()] else: out = None out = d.rowcount if not ctx.db_transaction: ctx.db.commit() return out def sqllist(l): if isinstance(l, str): return l else: return ', '.join(l) def select(tables, vars=None, what='*', where=None, order=None, group=None, limit=None, offset=None): if vars is None: vars = {} values = [] qout = "SELECT "+what+" FROM "+sqllist(tables) for (sql, val) in ( ('WHERE', where), ('ORDER BY', order), ('GROUP BY', group), ('LIMIT', limit), ('OFFSET', offset)): if val: nquery, nvalue = reparam(val, vars) qout += " "+sql+" " + nquery values.extend(nvalue) return query(qout, values, processed=True) def insert(tablename, seqname=None, **values): d = ctx.db.cursor() if values: ctx.db_execute(d, "INSERT INTO %s (%s) VALUES (%s)" % ( tablename, ", ".join(values.keys()), ', '.join([aparam() for x in values]) #@@ use nonce ), values.values()) else: ctx.db_execute(d, "INSERT INTO %s DEFAULT VALUES" % tablename) if seqname == False: out = None elif ctx.db_name == "postgres": if seqname is None: seqname = tablename + "_id_seq" ctx.db_execute(d, "SELECT currval('%s')" % seqname) out = d.fetchone()[0] elif ctx.db_name == "mysql": ctx.db_execute(d, "SELECT last_insert_id()") out = d.fetchone()[0] elif ctx.db_name == "sqlite": # not really the same... ctx.db_execute(d, "SELECT last_insert_rowid()") out = d.fetchone()[0] else: out = None if not ctx.db_transaction: ctx.db.commit() return out def update(tables, where, vars=None, **values): if vars is None: vars = {} if isinstance(where, int): vars = [where] where = "id = "+aparam() else: where, vars = reparam(where, vars) d = ctx.db.cursor() ctx.db_execute(d, "UPDATE %s SET %s WHERE %s" % ( sqllist(tables), ', '.join([k+'='+aparam() for k in values.keys()]), where), values.values()+vars) if not ctx.db_transaction: ctx.db.commit() return d.rowcount def delete(table, where, using=None, vars=None): if vars is None: vars = {} d = ctx.db.cursor() where, vars = reparam(where, vars) q = 'DELETE FROM %s WHERE %s' % (table, where) if using: q += ' USING '+sqllist(using) ctx.db_execute(d, q, vars) if not ctx.db_transaction: ctx.db.commit() return d.rowcount ## request handlers def handle(mapping, fvars=None): for url, ofno in group(mapping, 2): if isinstance(ofno, tuple): ofn, fna = ofno[0], list(ofno[1:]) else: ofn, fna = ofno, [] fn, result = re_subm('^'+url+'$', ofn, context.path) if result: # it's a match if fn.split(' ', 1)[0] == "redirect": url = fn.split(' ', 1)[1] if context.method == "GET": x = context.environ.get('QUERY_STRING', '') if x: url += '?'+x return redirect(url) elif '.' in fn: x = fn.split('.') mod, cls = '.'.join(x[:-1]), x[-1] mod = __import__(mod, globals(), locals(), [""]) cls = getattr(mod, cls) else: cls = fn mod = fvars or upvars() if isinstance(mod, types.ModuleType): mod = vars(mod) try: cls = mod[cls] except KeyError: return notfound() meth = context.method if meth == "HEAD": if not hasattr(cls, meth): meth = "GET" if not hasattr(cls, meth): return nomethod(cls) tocall = getattr(cls(), meth) args = list(result.groups()) for d in re.findall(r'\\(\d+)', ofn): args.pop(int(d)-1) return tocall(*([urllib.unquote(x) for x in args]+fna)) return notfound() def autodelegate(prefix=''): def internal(self, arg): func = prefix+arg if hasattr(self, func): return getattr(self, func)() else: return notfound() return internal ## http defaults def redirect(url, status='301 Moved Permanently'): newloc = urlparse.urljoin(context.home + context.path, url) context.status = status header('Content-Type', 'text/html') header('Location', newloc) # seems to add a three-second delay for some reason: # output('moved permanently') def found(url): return redirect(url, '302 Found') def seeother(url): return redirect(url, '303 See Other') def tempredirect(url): return redirect(url, '307 Temporary Redirect') def badrequest(): context.status = '400 Bad Request' header('Content-Type', 'text/html') return output('bad request') def notfound(): context.status = '404 Not Found' header('Content-Type', 'text/html') return output('not found') def nomethod(cls): context.status = '405 Method Not Allowed' header('Content-Type', 'text/html') header("Allow", ', '.join([x for x in ['GET', 'HEAD', 'POST', 'PUT', 'DELETE'] if hasattr(cls, x)])) return output('method not allowed') def gone(): context.status = '410 Gone' header('Content-Type', 'text/html') return output("gone") def expires(delta): try: datetime except NameError: raise Exception, "this function requires at least python2.3" if isinstance(delta, (int, long)): delta = datetime.timedelta(seconds=delta) o = datetime.datetime.utcnow() + delta header('Expires', o.strftime("%a, %d %b %Y %T GMT")) def lastmodified(d): header('Last-Modified', d.strftime("%a, %d %b %Y %T GMT")) # adapted from Django # Copyright (c) 2005, the Lawrence Journal-World # Used under the modified BSD license: # http://www.xfree86.org/3.3.6/COPYRIGHT2.html#5 DJANGO_500_PAGE = """#import inspect $exception_type at $context.path

$exception_type at $context.path

$exception_value

Python $lastframe.filename in $lastframe.function, line $lastframe.lineno
Web $context.method $context.home$context.path

Traceback (innermost first)

    #for frame in $frames
  • $frame.filename in $frame.function #if $frame.context_line
    #if $frame.pre_context
      #for line in $frame.pre_context#
    1. $line
    2. #end for#
    #end if
    1. $frame.context_line ...
    #if $frame.post_context
      #for line in $frame.post_context#
    1. $line
    2. #end for#
    #end if
    #end if #if $frame.vars
    Local vars## $inspect.formatargvalues(*inspect.getargvalues(frame['tb'].tb_frame))
    #set frameitems = $frame.vars #silent frameitems.sort(lambda x,y: cmp(x[0], y[0])) #for (key, val) in frameitems #end for
    Variable Value
    $key
    $prettify(val)
    #end if
  • #end for
#if $context_.output or $context_.headers

Response so far

HEADERS

#if $context.headers

#for (k, v) in $context_.headers $k: $v
#end for

#else

No headers.

#end if

BODY

$context_.output

#end if

Request information

INPUT

#if $input_ #set myitems = $input_.items() #silent myitems.sort(lambda x,y: cmp(x[0], y[0])) #for (key, val) in myitems #end for
Variable Value
$key
$val
#else

No input data.

#end if #if $cookies_ #for (key, val) in $cookies_.items() #end for
Variable Value
$key
$val
#else

No cookie data

#end if

META

#set myitems = $context_.items() #silent myitems.sort(lambda x,y: cmp(x[0], y[0])) #for (key, val) in $myitems #if not $key.startswith('_') and $key not in ['env', 'output', 'headers', 'environ', 'status', 'db_execute'] #end if #end for
Variable Value
$key
$prettify($val)

ENVIRONMENT

#set myitems = $context_.environ.items() #silent myitems.sort(lambda x,y: cmp(x[0], y[0])) #for (key, val) in $myitems #end for
Variable Value
$key
$prettify($val)

You're seeing this error because you have web.internalerror set to web.debugerror. Change that if you want a different one.

""" def djangoerror(): def _get_lines_from_file(filename, lineno, context_lines): """ Returns context_lines before and after lineno from file. Returns (pre_context_lineno, pre_context, context_line, post_context). """ try: source = open(filename).readlines() lower_bound = max(0, lineno - context_lines) upper_bound = lineno + context_lines pre_context = [line.strip('\n') for line in source[lower_bound:lineno]] context_line = source[lineno].strip('\n') post_context = [line.strip('\n') for line in source[lineno+1:upper_bound]] return lower_bound, pre_context, context_line, post_context except (OSError, IOError): return None, [], None, [] exception_type, exception_value, tb = sys.exc_info() frames = [] while tb is not None: filename = tb.tb_frame.f_code.co_filename function = tb.tb_frame.f_code.co_name lineno = tb.tb_lineno - 1 pre_context_lineno, pre_context, context_line, post_context = _get_lines_from_file(filename, lineno, 7) frames.append({ 'tb': tb, 'filename': filename, 'function': function, 'lineno': lineno, 'vars': tb.tb_frame.f_locals.items(), 'id': id(tb), 'pre_context': pre_context, 'context_line': context_line, 'post_context': post_context, 'pre_context_lineno': pre_context_lineno, }) tb = tb.tb_next lastframe = frames[-1] frames.reverse() urljoin = urlparse.urljoin input_ = input() cookies_ = cookies() context_ = context def prettify(x): try: out = pprint.pformat(x) except Exception, e: out = '[could not display: <'+e.__class__.__name__+': '+str(e)+'>]' return out return render(DJANGO_500_PAGE, asTemplate=True, isString=True) def internalerror(): context.status = "500 Internal Server Error" context.headers = [('Content-Type', 'text/html')] context.output = "internal server error" def debugerror(): # need to do django first, so it can get the old stuff if _hasTemplating: out = str(djangoerror()) else: # Cheetah isn't installed out = """

You've set web.py to use the fancier debugerror error messages, but these messages require you install the Cheetah template system. For more information, see the web.py website.

In the meantime, here's a plain old error message:

%s

(If it says something about 'Compiler', then it's probably because you're trying to use templates and you haven't installed Cheetah. See above.)

""" % htmlquote(traceback.format_exc()) context.status = "500 Internal Server Error" context.headers = [('Content-Type', 'text/html')] context.output = out ## rendering r_include = re_compile(r'(?!\\)#include \"(.*?)\"($|#)', re.M) def __compiletemplate(template, base=None, isString=False): if isString: text = template else: text = open('templates/'+template).read() # implement #include at compile-time def do_include(match): text = open('templates/'+match.groups()[0]).read() return text while r_include.findall(text): text = r_include.sub(do_include, text) execspace = _compiletemplate.bases.copy() c = Compiler(source=text, mainClassName='GenTemplate') c.addImportedVarNames(execspace.keys()) exec str(c) in execspace if base: _compiletemplate.bases[base] = execspace['GenTemplate'] return execspace['GenTemplate'] _compiletemplate = memoize(__compiletemplate) _compiletemplate.bases = {} def htmlquote(s): s = s.replace("&", "&") # Must be done first! s = s.replace("<", "<") s = s.replace(">", ">") s = s.replace("'", "'") s = s.replace('"', """) return s urlquote = urllib.quote if _hasTemplating: class WebSafe(Filter): def filter(selv, val, **kw): if val is None: return '' return htmlquote(str(val)) def render(template, terms=None, asTemplate=False, base=None, isString=False): # terms=['var1', 'var2'] means grab those variables if isinstance(terms, list): new = {}; old = upvars() for k in terms: new[k] = old[k] terms = new # default: grab all locals elif terms is None: terms = {'context': context} terms.update(sys._getframe(1).f_locals) # terms=d means use d as the searchList if not isinstance(terms, tuple): terms = (terms,) if not isString and template.endswith('.html'): header('Content-Type','text/html; charset=utf-8') t = _compiletemplate(template, base=base, isString=isString) t = t(searchList=terms, filter=WebSafe) if asTemplate: return t else: return output(str(t)) ## input forms def input(*requireds, **defaults): if not hasattr(context, '_inputfs'): context._inputfs = cgi.FieldStorage(fp = context.environ['wsgi.input'],environ=context.environ, keep_blank_values=1) return storify(context._inputfs, *requireds, **defaults) ## cookies def setcookie(name, value, expires="", domain=None): if expires < 0: expires = -1000000000 kargs = {'expires': expires, 'path':'/'} if domain: kargs['domain'] = domain # @@ should we limit cookies to a different path? c = Cookie.SimpleCookie() c[name] = value for key, val in kargs.iteritems(): c[name][key] = val header('Set-Cookie', c.items()[0][1].OutputString()) def cookies(*requireds, **defaults): c = Cookie.SimpleCookie() c.load(context.environ.get('HTTP_COOKIE', '')) return storify(c, *requireds, **defaults) ## WSGI Sugar def header(h, v): context.headers.append((h, v)) def output(t): context.output += str(t) def write(t): t = str(t) t.replace('\r\n', '\n') head, body = t.split('\n\n', 1) lines = head.split('\n') for line in lines: if line.isspace(): continue h, v = line.split(":", 1) v = v.strip() if h.lower() == "status": context.status = v else: header(h, v) output(body) def webpyfunc(inp, fvars=None, autoreload=False): if not fvars: fvars = upvars() if not hasattr(inp, '__call__'): if autoreload: # black magic to make autoreload work: mod = __import__(fvars['__file__'].split(os.path.sep).pop().split('.')[0]) #@@probably should replace this with some inspect magic name = dictfind(fvars, inp) func = lambda: handle(getattr(mod, name), mod) else: func = lambda: handle(inp, fvars) else: func = inp return func def wsgifunc(func, *middleware): middleware = list(middleware) if reloader in middleware: relr = reloader(None) relrcheck = relr.check middleware.remove(reloader) else: relr = None relrcheck = lambda: None def wsgifunc(e, r): _load(e) relrcheck() result = func() is_generator = result and hasattr(result, 'next') if is_generator: # we need to give wsgi back the headers first, # so we need to do at iteration try: firstchunk = handler_result.next() except StopIteration: firstchunk = '' status, headers, output = ctx.status, ctx.headers, ctx.output _unload() r(status, headers) if is_generator: return itertools.chain([firstchunk], result) elif isinstance(output, str): return [output] #@@ other stringlikes? elif hasattr(output, 'next'): return output else: raise Exception, "Invalid web.context.output" for x in middleware: wsgifunc = x(wsgifunc) if relr: relr.func = wsgifunc return wsgifunc return wsgifunc def run(inp, *middleware): autoreload = reloader in middleware fvars = upvars() return runwsgi(wsgifunc(webpyfunc(inp, fvars, autoreload), *middleware)) def runwsgi(func): #@@ improve detection if os.environ.has_key('SERVER_SOFTWARE'): # cgi os.environ['FCGI_FORCE_CGI'] = 'Y' if (os.environ.has_key('PHP_FCGI_CHILDREN') #lighttpd fastcgi or os.environ.has_key('SERVER_SOFTWARE')): import flup.server.fcgi return runfcgi(func) if 'scgi' in sys.argv: import flup.server.scgi return runscgi(func) # command line: return runsimple(func, listget(sys.argv, 1, 8080)) def runsimple(func, port=8080): # Copyright (c) 2004 Colin Stewart (http://www.owlfish.com/) # Modified somewhat for simplicity # Used under the modified BSD license: # http://www.xfree86.org/3.3.6/COPYRIGHT2.html#5 import SimpleHTTPServer, SocketServer, BaseHTTPServer, urlparse import sys, socket, errno import traceback class WSGIHandler (SimpleHTTPServer.SimpleHTTPRequestHandler): def runWSGIApp(self): protocol, host, path, parameters, query, fragment = urlparse.urlparse ('http://dummyhost%s' % self.path) # we only use path, query env = {'wsgi.version': (1,0) ,'wsgi.url_scheme': 'http' ,'wsgi.input': self.rfile ,'wsgi.errors': sys.stderr ,'wsgi.multithread': 1 ,'wsgi.multiprocess': 0 ,'wsgi.run_once': 0 ,'REQUEST_METHOD': self.command ,'REQUEST_URI': self.path ,'PATH_INFO': path ,'QUERY_STRING': query ,'CONTENT_TYPE': self.headers.get ('Content-Type', '') ,'CONTENT_LENGTH': self.headers.get ('Content-Length', '') ,'REMOTE_ADDR': self.client_address[0] ,'SERVER_NAME': self.server.server_address [0] ,'SERVER_PORT': str (self.server.server_address [1]) ,'SERVER_PROTOCOL': self.request_version } for httpHeader, httpValue in self.headers.items(): env ['HTTP_%s' % httpHeader.replace ('-', '_').upper()] = httpValue # Setup the state self.wsgiSentHeaders = 0 self.wsgiHeaders = [] try: # We have there environment, now invoke the application result = self.server.app(env, self.wsgiStartResponse) try: try: for data in result: if data: self.wsgiWriteData (data) finally: if hasattr(result, 'close'): result.close() except socket.error, socketErr: # Catch common network errors and suppress them if (socketErr.args[0] in (errno.ECONNABORTED, errno.EPIPE)): return except socket.timeout, socketTimeout: return except: print >> debug, traceback.format_exc(), internalerror() if not self.wsgiSentHeaders: self.wsgiStartResponse(ctx.status, ctx.headers) self.wsgiWriteData(ctx.output) if (not self.wsgiSentHeaders): # We must write out something! self.wsgiWriteData(" ") return do_POST = runWSGIApp def do_GET(self): if self.path.startswith('/static/'): SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self) else: self.runWSGIApp() def wsgiStartResponse (self, response_status, response_headers, exc_info=None): if (self.wsgiSentHeaders): raise Exception ("Headers already sent and start_response called again!") # Should really take a copy to avoid changes in the application.... self.wsgiHeaders = (response_status, response_headers) return self.wsgiWriteData def wsgiWriteData (self, data): if (not self.wsgiSentHeaders): status, headers = self.wsgiHeaders # Need to send header prior to data statusCode = status [:status.find (' ')] statusMsg = status [status.find (' ') + 1:] self.send_response (int (statusCode), statusMsg) for header, value in headers: self.send_header (header, value) self.end_headers() self.wsgiSentHeaders = 1 # Send the data self.wfile.write (data) class WSGIServer (SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): def __init__(self, func): BaseHTTPServer.HTTPServer.__init__(self, ("0.0.0.0", int(port)), WSGIHandler) self.app = func self.serverShuttingDown = 0 print "Launching server: http://0.0.0.0:"+str(port)+"/" WSGIServer(func).serve_forever() def makeserver(WSGIServer): class MyServer(WSGIServer): def error(self, req): w = req.stdout.write internalerror() w('Status: '+context.status+'\r\n') for (h, v) in context.headers: w(h+': '+v+'\r\n') w('\r\n'+context.output) return MyServer def runfcgi(func): from flup.server.fcgi import WSGIServer return makeserver(WSGIServer)(func, multiplexed=True).run() def runscgi(func): from flup.server.scgi import WSGIServer MyServer = makeserver(WSGIServer) if len(sys.argv) > 2: # progname, scgi args = sys.argv[:] args.remove('scgi') hostport = args[1] hostport = hostport.split(':',1) if len(hostport) == 2: hostport = (hostport[0], int(hostport[1])) else: hostport = ('localhost',int(hostport[0])) else: hostport = ('localhost',4000) return MyServer(func, bindAddress=hostport).run() ## debug def debug(*args): try: out = context.environ['wsgi.errors'] except: out = sys.stderr for x in args: print >> out, pprint.pformat(x) return '' def debugwrite(x): try: out = context.environ['wsgi.errors'] except: out = sys.stderr out.write(x) debug.write = debugwrite class reloader: def __init__(self, func, tocheck=None): self.func = func self.mtimes = {} global _compiletemplate b = _compiletemplate.bases _compiletemplate = globals()['__compiletemplate'] _compiletemplate.bases = b def check(self): for mod in sys.modules.values(): try: mtime = os.stat(mod.__file__).st_mtime except (AttributeError, OSError, IOError): continue if mod.__file__.endswith('.pyc') and os.path.exists(mod.__file__[:-1]): mtime = max(os.stat(mod.__file__[:-1]).st_mtime, mtime) if mod not in self.mtimes: self.mtimes[mod] = mtime elif self.mtimes[mod] < mtime: try: reload(mod) except ImportError: pass return True def __call__(self, e, o): self.check() return self.func(e, o) def profiler(app): def profile_internal(e, o): out, result = profile(app)(e, o) return out + ['
'+result+'
'] #@@encode return profile_internal ## setting up the context class _outputter: def write(self, x): if hasattr(ctx, 'output'): return output(x) else: _oldstdout.write(x) def flush(self): return _oldstdout.flush() def close(self): return _oldstdout.close() _context = {currentThread():Storage()} ctx = context = threadeddict(_context) if not '_oldstdout' in globals(): _oldstdout = sys.stdout sys.stdout = _outputter() def _load(env): _context[currentThread()] = Storage() ctx.environ = ctx.env = env ctx.host = env.get('HTTP_HOST') ctx.home = 'http://' + env.get('HTTP_HOST', '[unknown]') + env.get('SCRIPT_NAME', '') ctx.ip = env.get('REMOTE_ADDR') ctx.method = env.get('REQUEST_METHOD') ctx.path = env.get('PATH_INFO') # http://trac.lighttpd.net/trac/ticket/406 requires: if env.get('SERVER_SOFTWARE', '').startswith('lighttpd/'): ctx.path = lstrips(env.get('REQUEST_URI').split('?')[0], env.get('SCRIPT_NAME')) ctx.fullpath = ctx.path if dict(input()): ctx.fullpath+='?'+urllib.urlencode(dict(input())) ctx.status = '200 OK' ctx.headers = [] ctx.output = '' if 'db_parameters' in globals(): connect(**db_parameters) def _unload(): # ensures db cursors and such are GCed promptly del _context[currentThread()] if __name__ == "__main__": urls = ('/web.py', 'source') class source: def GET(self): header('Content-Type', 'text/python') print open(sys.argv[0]).read() run(urls)