import sys import os sys.path.insert(0, '.') sys.path.extend(os.environ.get('PYTHONPATH','').split(os.pathsep)) import imp import traceback __name__ = '__main__' mainmodule = type(sys)('__main__') sys.modules['__main__'] = mainmodule import cffi # this is a list holding object we do not want to be freed (like callback and handlers) uwsgi_gc = [] # the main ffi ffi = cffi.FFI() # the hooks we need to patch hooks = ''' void free(void *); ssize_t read(int, void *, size_t); ssize_t write(int, const void *, size_t); int close(int); void (*uwsgi_pypy_hook_execute_source)(char *); void (*uwsgi_pypy_hook_loader)(char *); void (*uwsgi_pypy_hook_file_loader)(char *); void (*uwsgi_pypy_hook_paste_loader)(char *); void (*uwsgi_pypy_hook_pythonpath)(char *); void (*uwsgi_pypy_hook_request)(struct wsgi_request *); void (*uwsgi_pypy_post_fork_hook)(void); ''' # here we load CFLAGS and uwsgi.h from the binary defines0 = ''' char *uwsgi_get_cflags(); char *uwsgi_get_dot_h(); ''' ffi.cdef(defines0) lib0 = ffi.verify(defines0) # this is ugly, we should find a better approach # basically it build a list of #define from binary CFLAGS uwsgi_cdef = [] uwsgi_defines = [] uwsgi_cflags = ffi.string(lib0.uwsgi_get_cflags()).split() for cflag in uwsgi_cflags: if cflag.startswith('-D'): line = cflag[2:] if '=' in line: (key, value) = line.split('=', 1) uwsgi_cdef.append('#define %s ...' % key) uwsgi_defines.append('#define %s %s' % (key, value.replace('\\"','"').replace('""','"'))) else: uwsgi_cdef.append('#define %s ...' % line) uwsgi_defines.append('#define %s 1' % line) uwsgi_dot_h = ffi.string(lib0.uwsgi_get_dot_h()) # uwsgi definitions cdefines = ''' %s struct iovec { void *iov_base; size_t iov_len; ...; }; struct uwsgi_header { uint8_t modifier1; ...; }; struct wsgi_request { int fd; int async_id; uint16_t var_cnt; struct iovec *hvec; int async_ready_fd; int async_last_ready_fd; int suspended; struct uwsgi_header *uh; ...; }; struct uwsgi_opt { char *key; char *value; ...; }; struct uwsgi_worker { int id; int pid; uint64_t requests; uint64_t delta_requests; uint64_t signals; int cheaped; int suspended; int sig; uint8_t signum; uint64_t running_time; uint64_t avg_response_time; uint64_t tx; ...; }; struct uwsgi_plugin { uint8_t modifier1; void (*suspend) (struct wsgi_request *); void (*resume) (struct wsgi_request *); ...; }; struct uwsgi_buffer { char *buf; size_t pos; ...; }; struct uwsgi_lock_item { ...; }; struct uwsgi_cache { struct uwsgi_lock_item *lock; ...; }; struct uwsgi_cache_item { uint64_t keysize; ...; }; struct uwsgi_server { char hostname[]; int mywid; int muleid; int master_process; struct uwsgi_opt **exported_opts; int exported_opts_cnt; struct uwsgi_worker *workers; int signal_socket; int numproc; int async; void (*schedule_to_main) (struct wsgi_request *); void (*schedule_to_req) (void); struct wsgi_request *(*current_wsgi_req) (void); struct wsgi_request *wsgi_req; struct uwsgi_plugin *p[]; ...; }; struct uwsgi_server uwsgi; struct uwsgi_plugin pypy_plugin; const char *uwsgi_pypy_version; char *uwsgi_binary_path(); void *uwsgi_malloc(size_t); int uwsgi_response_prepare_headers(struct wsgi_request *, char *, size_t); int uwsgi_response_add_header(struct wsgi_request *, char *, uint16_t, char *, uint16_t); int uwsgi_response_write_body_do(struct wsgi_request *, char *, size_t); int uwsgi_response_sendfile_do_can_close(struct wsgi_request *, int, size_t, size_t, int); char *uwsgi_request_body_read(struct wsgi_request *, ssize_t , ssize_t *); char *uwsgi_request_body_readline(struct wsgi_request *, ssize_t, ssize_t *); void uwsgi_buffer_destroy(struct uwsgi_buffer *); int uwsgi_is_again(); int uwsgi_register_rpc(char *, struct uwsgi_plugin *, uint8_t, void *); int uwsgi_register_signal(uint8_t, char *, void *, uint8_t); char *uwsgi_do_rpc(char *, char *, uint8_t, char **, uint16_t *, uint64_t *); void uwsgi_set_processname(char *); int uwsgi_signal_send(int, uint8_t); uint64_t uwsgi_worker_exceptions(int); int uwsgi_worker_is_busy(int); char *uwsgi_cache_magic_get(char *, uint16_t, uint64_t *, uint64_t *, char *); int uwsgi_cache_magic_set(char *, uint16_t, char *, uint64_t, uint64_t, uint64_t, char *); int uwsgi_cache_magic_del(char *, uint16_t, char *); int uwsgi_cache_magic_exists(char *, uint16_t, char *); int uwsgi_cache_magic_clear(char *); struct uwsgi_cache *uwsgi_cache_by_name(char *); void uwsgi_cache_rlock(struct uwsgi_cache *); void uwsgi_cache_rwunlock(struct uwsgi_cache *); char *uwsgi_cache_item_key(struct uwsgi_cache_item *); struct uwsgi_cache_item *uwsgi_cache_keys(struct uwsgi_cache *, uint64_t *, struct uwsgi_cache_item **); int uwsgi_add_file_monitor(uint8_t, char *); int uwsgi_add_timer(uint8_t, int); int uwsgi_signal_add_rb_timer(uint8_t, int, int); int uwsgi_user_lock(int); int uwsgi_user_unlock(int); int uwsgi_signal_registered(uint8_t); int uwsgi_signal_add_cron(uint8_t, int, int, int, int, int); void uwsgi_alarm_trigger(char *, char *, size_t); void async_schedule_to_req_green(void); void async_add_timeout(struct wsgi_request *, int); int async_add_fd_write(struct wsgi_request *, int, int); int async_add_fd_read(struct wsgi_request *, int, int); int uwsgi_connect(char *, int, int); int uwsgi_websocket_handshake(struct wsgi_request *, char *, uint16_t, char *, uint16_t, char *, uint16_t); int uwsgi_websocket_send(struct wsgi_request *, char *, size_t); struct uwsgi_buffer *uwsgi_websocket_recv(struct wsgi_request *); struct uwsgi_buffer *uwsgi_websocket_recv_nb(struct wsgi_request *); char *uwsgi_chunked_read(struct wsgi_request *, size_t *, int, int); void uwsgi_disconnect(struct wsgi_request *); int uwsgi_ready_fd(struct wsgi_request *); void set_user_harakiri(int); int uwsgi_metric_set(char *, char *, int64_t); int uwsgi_metric_inc(char *, char *, int64_t); int uwsgi_metric_dec(char *, char *, int64_t); int uwsgi_metric_mul(char *, char *, int64_t); int uwsgi_metric_div(char *, char *, int64_t); int64_t uwsgi_metric_get(char *, char *); %s ''' % ('\n'.join(uwsgi_cdef), hooks) cverify = ''' %s const char *uwsgi_pypy_version = UWSGI_VERSION; %s extern struct uwsgi_server uwsgi; extern struct uwsgi_plugin pypy_plugin; %s ''' % ('\n'.join(uwsgi_defines), uwsgi_dot_h, hooks) ffi.cdef(cdefines) lib = ffi.verify(cverify) libc = ffi.dlopen(None) """ this is a global object point the the WSGI callable it sucks, i will fix it in the near future... """ wsgi_application = None # fix argv if needed if len(sys.argv) == 0: sys.argv.insert(0, ffi.string(lib.uwsgi_binary_path())) """ execute source, we expose it as cffi callback to avoid deadlocks after GIL initialization """ @ffi.callback("void(char *)") def uwsgi_pypy_execute_source(s): source = ffi.string(s) exec(source) """ load a wsgi module """ @ffi.callback("void(char *)") def uwsgi_pypy_loader(module): global wsgi_application m = ffi.string(module) c = 'application' if ':' in m: m, c = m.split(':') if '.' in m: mod = __import__(m, None, None, '*') else: mod = __import__(m) wsgi_application = getattr(mod, c) """ load a mod_wsgi compliant .wsgi file """ @ffi.callback("void(char *)") def uwsgi_pypy_file_loader(filename): global wsgi_application w = ffi.string(filename) c = 'application' mod = imp.load_source('uwsgi_file_wsgi', w) wsgi_application = getattr(mod, c) """ load a .ini paste app """ @ffi.callback("void(char *)") def uwsgi_pypy_paste_loader(config): global wsgi_application c = ffi.string(config) if c.startswith('config:'): c = c[7:] if c[0] != '/': c = os.getcwd() + '/' + c try: from paste.script.util.logging_config import fileConfig fileConfig(c) except ImportError: print "PyPy WARNING: unable to load paste.script.util.logging_config" from paste.deploy import loadapp wsgi_application = loadapp('config:%s' % c) """ .post_fork_hook """ @ffi.callback("void()") def uwsgi_pypy_post_fork_hook(): import uwsgi if hasattr(uwsgi, 'post_fork_hook'): uwsgi.post_fork_hook() """ add an item to the pythonpath """ @ffi.callback("void(char *)") def uwsgi_pypy_pythonpath(item): path = ffi.string(item) sys.path.append(path) print "added %s to pythonpath" % path """ class implementing wsgi.file_wrapper """ class WSGIfilewrapper(object): def __init__(self, wsgi_req, f, chunksize=0): self.wsgi_req = wsgi_req self.f = f self.chunksize = chunksize if hasattr(f, 'close'): self.close = f.close def __iter__(self): return self def sendfile(self): if hasattr(self.f, 'fileno'): lib.uwsgi_response_sendfile_do_can_close(self.wsgi_req, self.f.fileno(), 0, 0, 0) elif hasattr(self.f, 'read'): if self.chunksize == 0: chunk = self.f.read() if len(chunk) > 0: lib.uwsgi_response_write_body_do(self.wsgi_req, ffi.new("char[]", chunk), len(chunk)) return while True: chunk = self.f.read(self.chunksize) if chunk is None or len(chunk) == 0: break lib.uwsgi_response_write_body_do(self.wsgi_req, ffi.new("char[]", chunk), len(chunk)) """ class implementing wsgi.input """ class WSGIinput(object): def __init__(self, wsgi_req): self.wsgi_req = wsgi_req def read(self, size=0): rlen = ffi.new('ssize_t*') chunk = lib.uwsgi_request_body_read(self.wsgi_req, size, rlen) if chunk != ffi.NULL: return ffi.string(chunk, rlen[0]) if rlen[0] < 0: raise IOError("error reading wsgi.input") raise IOError("error waiting for wsgi.input") def getline(self, hint=0): rlen = ffi.new('ssize_t*') chunk = lib.uwsgi_request_body_readline(self.wsgi_req, hint, rlen) if chunk != ffi.NULL: return ffi.string(chunk, rlen[0]) if rlen[0] < 0: raise IOError("error reading line from wsgi.input") raise IOError("error waiting for line on wsgi.input") def readline(self, hint=0): return self.getline(hint) def readlines(self, hint=0): lines = [] while True: chunk = self.getline(hint) if len(chunk) == 0: break lines.append(chunk) return lines def __iter__(self): return self def __next__(self): chunk = self.getline() if len(chunk) == 0: raise StopIteration return chunk """ the WSGI request handler """ @ffi.callback("void(struct wsgi_request *)") def uwsgi_pypy_wsgi_handler(wsgi_req): import uwsgi global wsgi_application def writer(data): lib.uwsgi_response_write_body_do(wsgi_req, ffi.new("char[]", data), len(data)) def start_response(status, headers, exc_info=None): if exc_info: traceback.print_exception(*exc_info) lib.uwsgi_response_prepare_headers(wsgi_req, ffi.new("char[]", status), len(status)) for hh in headers: lib.uwsgi_response_add_header(wsgi_req, ffi.new("char[]", hh[0]), len(hh[0]), ffi.new("char[]", hh[1]), len(hh[1])) return writer environ = {} iov = wsgi_req.hvec for i in range(0, wsgi_req.var_cnt, 2): environ[ffi.string(ffi.cast("char*", iov[i].iov_base), iov[i].iov_len)] = ffi.string(ffi.cast("char*", iov[i+1].iov_base), iov[i+1].iov_len) environ['wsgi.version'] = (1, 0) scheme = 'http' if 'HTTPS' in environ: if environ['HTTPS'] in ('on', 'ON', 'On', '1', 'true', 'TRUE', 'True'): scheme = 'https' environ['wsgi.url_scheme'] = environ.get('UWSGI_SCHEME', scheme) environ['wsgi.input'] = WSGIinput(wsgi_req) environ['wsgi.errors'] = sys.stderr environ['wsgi.run_once'] = False environ['wsgi.file_wrapper'] = lambda f, chunksize=0: WSGIfilewrapper(wsgi_req, f, chunksize) environ['wsgi.multithread'] = True environ['wsgi.multiprocess'] = True environ['uwsgi.core'] = wsgi_req.async_id environ['uwsgi.node'] = uwsgi.hostname response = wsgi_application(environ, start_response) if type(response) is str: writer(response) else: try: if isinstance(response, WSGIfilewrapper): response.sendfile() else: for chunk in response: if isinstance(chunk, WSGIfilewrapper): try: chunk.sendfile() finally: chunk.close() else: writer(chunk) finally: if hasattr(response, 'close'): response.close() lib.uwsgi_pypy_hook_execute_source = uwsgi_pypy_execute_source lib.uwsgi_pypy_hook_loader = uwsgi_pypy_loader lib.uwsgi_pypy_hook_file_loader = uwsgi_pypy_file_loader lib.uwsgi_pypy_hook_paste_loader = uwsgi_pypy_paste_loader lib.uwsgi_pypy_hook_pythonpath = uwsgi_pypy_pythonpath lib.uwsgi_pypy_hook_request = uwsgi_pypy_wsgi_handler lib.uwsgi_pypy_post_fork_hook = uwsgi_pypy_post_fork_hook """ Here we define the "uwsgi" virtual module """ uwsgi = imp.new_module('uwsgi') sys.modules['uwsgi'] = uwsgi uwsgi.version = ffi.string(lib.uwsgi_pypy_version) uwsgi.hostname = ffi.string(lib.uwsgi.hostname) def uwsgi_pypy_uwsgi_register_signal(signum, kind, handler): cb = ffi.callback('void(int)', handler) uwsgi_gc.append(cb) if lib.uwsgi_register_signal(signum, ffi.new("char[]", kind), cb, lib.pypy_plugin.modifier1) < 0: raise Exception("unable to register signal %d" % signum) uwsgi.register_signal = uwsgi_pypy_uwsgi_register_signal class uwsgi_pypy_RPC(object): def __init__(self, func): self.func = func def __call__(self, argc, argv, argvs, buf): pargs = [] for i in range(0, argc): pargs.append(ffi.string(argv[i], argvs[i])) response = self.func(*pargs) if len(response) > 0: buf[0] = lib.uwsgi_malloc(len(response)) dst = ffi.buffer(buf[0], len(response)) dst[:len(response)] = response return len(response) def uwsgi_pypy_uwsgi_register_rpc(name, func, argc=0): rpc_func = uwsgi_pypy_RPC(func) cb = ffi.callback("int(int, char*[], int[], char**)", rpc_func) uwsgi_gc.append(cb) if lib.uwsgi_register_rpc(ffi.new("char[]", name), ffi.addressof(lib.pypy_plugin), argc, cb) < 0: raise Exception("unable to register rpc func %s" % name) uwsgi.register_rpc = uwsgi_pypy_uwsgi_register_rpc def uwsgi_pypy_rpc(node, func, *args): argc = 0 argv = ffi.new('char*[256]') argvs = ffi.new('uint16_t[256]') rsize = ffi.new('uint64_t*') for arg in args: if argc >= 255: raise Exception('invalid number of rpc arguments') if len(arg) >= 65535: raise Exception('invalid rpc argument size (must be < 65535)') argv[argc] = ffi.new('char[]', arg) argvs[argc] = len(arg) argc += 1 if node: c_node = ffi.new("char[]", node) else: c_node = ffi.NULL response = lib.uwsgi_do_rpc(c_node, ffi.new("char[]",func), argc, argv, argvs, rsize) if response: ret = ffi.string(response, rsize[0]) lib.free(response) return ret return None uwsgi.rpc = uwsgi_pypy_rpc def uwsgi_pypy_call(func, *args): node = None if '@' in func: (func, node) = func.split('@') return uwsgi_pypy_rpc(node, func, *args) uwsgi.call = uwsgi_pypy_call uwsgi.signal = lambda x: lib.uwsgi_signal_send(lib.uwsgi.signal_socket, x) uwsgi.metric_get = lambda x: lib.uwsgi_metric_get(x, ffi.NULL) uwsgi.metric_set = lambda x, y: lib.uwsgi_metric_set(x, ffi.NULL, y) uwsgi.metric_inc = lambda x, y=1: lib.uwsgi_metric_inc(x, ffi.NULL, y) uwsgi.metric_dec = lambda x, y=1: lib.uwsgi_metric_dec(x, ffi.NULL, y) uwsgi.metric_mul = lambda x, y=1: lib.uwsgi_metric_mul(x, ffi.NULL, y) uwsgi.metric_div = lambda x, y=1: lib.uwsgi_metric_div(x, ffi.NULL, y) def uwsgi_pypy_uwsgi_cache_get(key, cache=ffi.NULL): vallen = ffi.new('uint64_t*') value = lib.uwsgi_cache_magic_get(key, len(key), vallen, ffi.NULL, cache) if value == ffi.NULL: return None ret = ffi.string(value, vallen[0]) libc.free(value) return ret uwsgi.cache_get = uwsgi_pypy_uwsgi_cache_get def uwsgi_pypy_uwsgi_cache_set(key, value, expires=0, cache=ffi.NULL): if lib.uwsgi_cache_magic_set(key, len(key), value, len(value), expires, 0, cache) < 0: raise Exception('unable to store item in the cache') uwsgi.cache_set = uwsgi_pypy_uwsgi_cache_set def uwsgi_pypy_uwsgi_cache_update(key, value, expires=0, cache=ffi.NULL): if lib.uwsgi_cache_magic_set(key, len(key), value, len(value), expires, 1 << 1, cache) < 0: raise Exception('unable to store item in the cache') uwsgi.cache_update = uwsgi_pypy_uwsgi_cache_update def uwsgi_pypy_uwsgi_cache_del(key, cache=ffi.NULL): if lib.uwsgi_cache_magic_del(key, len(key), cache) < 0: raise Exception('unable to delete item from the cache') uwsgi.cache_del = uwsgi_pypy_uwsgi_cache_del def uwsgi_pypy_uwsgi_cache_keys(cache=ffi.NULL): uc = lib.uwsgi_cache_by_name(cache) if uc == ffi.NULL: raise Exception('no local uWSGI cache available') l = [] lib.uwsgi_cache_rlock(uc) pos = ffi.new('uint64_t *') uci = ffi.new('struct uwsgi_cache_item **') while True: uci[0] = lib.uwsgi_cache_keys(uc, pos, uci) if uci[0] == ffi.NULL: break l.append(ffi.string(lib.uwsgi_cache_item_key(uci[0]), uci[0].keysize)) lib.uwsgi_cache_rwunlock(uc) return l uwsgi.cache_keys = uwsgi_pypy_uwsgi_cache_keys def uwsgi_pypy_uwsgi_add_timer(signum, secs): if lib.uwsgi_add_timer(signum, secs) < 0: raise Exception("unable to register timer") uwsgi.add_timer = uwsgi_pypy_uwsgi_add_timer def uwsgi_pypy_uwsgi_add_rb_timer(signum, secs): if lib.uwsgi_signal_add_rb_timer(signum, secs, 0) < 0: raise Exception("unable to register redblack timer") uwsgi.add_rb_timer = uwsgi_pypy_uwsgi_add_rb_timer def uwsgi_pypy_uwsgi_add_file_monitor(signum, filename): if lib.uwsgi_add_file_monitor(signum, ffi.new("char[]", filename)) < 0: raise Exception("unable to register file monitor") uwsgi.add_file_monitor = uwsgi_pypy_uwsgi_add_file_monitor def uwsgi_pypy_lock(num=0): if lib.uwsgi_user_lock(num) < 0: raise Exception("invalid lock") uwsgi.lock = uwsgi_pypy_lock def uwsgi_pypy_unlock(num=0): if lib.uwsgi_user_unlock(num) < 0: raise Exception("invalid lock") uwsgi.unlock = uwsgi_pypy_unlock def uwsgi_pypy_masterpid(): if lib.uwsgi.master_process: return lib.uwsgi.workers[0].pid return 0 uwsgi.masterpid = uwsgi_pypy_masterpid uwsgi.worker_id = lambda: lib.uwsgi.mywid uwsgi.mule_id = lambda: lib.uwsgi.muleid def uwsgi_pypy_signal_registered(signum): if lib.uwsgi_signal_registered(signum) > 0: return True return False uwsgi.signal_registered = uwsgi_pypy_signal_registered def uwsgi_pypy_alarm(alarm, msg): lib.uwsgi_alarm_trigger(ffi.new('char[]', alarm), ffi.new('char[]', msg), len(msg)) uwsgi.alarm = uwsgi_pypy_alarm uwsgi.setprocname = lambda name: lib.uwsgi_set_processname(ffi.new('char[]', name)) def uwsgi_pypy_add_cron(signum, minute, hour, day, month, week): if lib.uwsgi_signal_add_cron(signum, minute, hour, day, month, week) < 0: raise Exception("unable to register cron") uwsgi.add_cron = uwsgi_pypy_add_cron """ populate uwsgi.opt """ uwsgi.opt = {} for i in range(0, lib.uwsgi.exported_opts_cnt): uo = lib.uwsgi.exported_opts[i] k = ffi.string(uo.key) if uo.value == ffi.NULL: v = True else: v = ffi.string(uo.value) if k in uwsgi.opt: if type(uwsgi.opt[k]) is list: uwsgi.opt[k].append(v) else: uwsgi.opt[k] = [uwsgi.opt[k], v] else: uwsgi.opt[k] = v def uwsgi_pypy_current_wsgi_req(): wsgi_req = lib.uwsgi.current_wsgi_req() if wsgi_req == ffi.NULL: raise Exception("unable to get current wsgi_request, check your setup !!!") return wsgi_req """ uwsgi.suspend() """ def uwsgi_pypy_suspend(): wsgi_req = uwsgi_pypy_current_wsgi_req() if lib.uwsgi.schedule_to_main: lib.uwsgi.schedule_to_main(wsgi_req); uwsgi.suspend = uwsgi_pypy_suspend """ uwsgi.workers() """ def uwsgi_pypy_workers(): workers = [] for i in range(1, lib.uwsgi.numproc+1): worker = {} worker['id'] = lib.uwsgi.workers[i].id worker['pid'] = lib.uwsgi.workers[i].pid worker['requests'] = lib.uwsgi.workers[i].requests worker['delta_requests'] = lib.uwsgi.workers[i].delta_requests worker['signals'] = lib.uwsgi.workers[i].signals worker['exceptions'] = lib.uwsgi_worker_exceptions(i); worker['apps'] = [] if lib.uwsgi.workers[i].cheaped: worker['status'] == 'cheap' elif lib.uwsgi.workers[i].suspended and not lib.uwsgi_worker_is_busy(i): worker['status'] == 'pause' else: if lib.uwsgi.workers[i].sig: worker['status'] = 'sig%d' % lib.uwsgi.workers[i].signum elif lib.uwsgi_worker_is_busy(i): worker['status'] = 'busy' else: worker['status'] = 'idle' worker['running_time'] = lib.uwsgi.workers[i].running_time worker['avg_rt'] = lib.uwsgi.workers[i].avg_response_time worker['tx'] = lib.uwsgi.workers[i].tx workers.append(worker) return workers uwsgi.workers = uwsgi_pypy_workers """ uwsgi.async_sleep(timeout) """ def uwsgi_pypy_async_sleep(timeout): if timeout > 0: wsgi_req = uwsgi_pypy_current_wsgi_req(); lib.async_add_timeout(wsgi_req, timeout); uwsgi.async_sleep = uwsgi_pypy_async_sleep """ uwsgi.async_connect(addr) """ def uwsgi_pypy_async_connect(addr): fd = lib.uwsgi_connect(ffi.new('char[]', addr), 0, 1) if fd < 0: raise Exception("unable to connect to %s" % addr) return fd uwsgi.async_connect = uwsgi_pypy_async_connect uwsgi.connection_fd = lambda: uwsgi_pypy_current_wsgi_req().fd """ uwsgi.wait_fd_read(fd, timeout=0) """ def uwsgi_pypy_wait_fd_read(fd, timeout=0): wsgi_req = uwsgi_pypy_current_wsgi_req(); if lib.async_add_fd_read(wsgi_req, fd, timeout) < 0: raise Exception("unable to add fd %d to the event queue" % fd) uwsgi.wait_fd_read = uwsgi_pypy_wait_fd_read """ uwsgi.wait_fd_write(fd, timeout=0) """ def uwsgi_pypy_wait_fd_write(fd, timeout=0): wsgi_req = uwsgi_pypy_current_wsgi_req(); if lib.async_add_fd_write(wsgi_req, fd, timeout) < 0: raise Exception("unable to add fd %d to the event queue" % fd) uwsgi.wait_fd_write = uwsgi_pypy_wait_fd_write """ uwsgi.ready_fd() """ def uwsgi_pypy_ready_fd(): wsgi_req = uwsgi_pypy_current_wsgi_req(); return lib.uwsgi_ready_fd(wsgi_req) uwsgi.ready_fd = uwsgi_pypy_ready_fd """ uwsgi.send(fd=None,data) """ def uwsgi_pypy_send(*args): if len(args) == 0: raise ValueError("uwsgi.send() takes at least 1 argument") elif len(args) == 1: wsgi_req = uwsgi_pypy_current_wsgi_req(); fd = wsgi_req.fd data = args[0] else: fd = args[0] data = args[1] rlen = libc.write(fd, data, len(data)) if rlen <= 0: raise IOError("unable to send data") return rlen uwsgi.send = uwsgi_pypy_send """ uwsgi.recv(fd=None,len) """ def uwsgi_pypy_recv(*args): if len(args) == 0: raise ValueError("uwsgi.recv() takes at least 1 argument") elif len(args) == 1: wsgi_req = uwsgi_pypy_current_wsgi_req(); fd = wsgi_req.fd l = args[0] else: fd = args[0] l = args[1] data = ffi.new('char[%d]' % l) rlen = libc.read(fd, data, l) if rlen <= 0: raise IOError("unable to receive data") return ffi.string(data[0:rlen]) uwsgi.recv = uwsgi_pypy_recv """ uwsgi.close(fd) """ uwsgi.close = lambda fd: lib.close(fd) """ uwsgi.disconnect() """ uwsgi.disconnect = lambda: lib.uwsgi_disconnect(uwsgi_pypy_current_wsgi_req()) """ uwsgi.websocket_recv() """ def uwsgi_pypy_websocket_recv(): wsgi_req = uwsgi_pypy_current_wsgi_req(); ub = lib.uwsgi_websocket_recv(wsgi_req); if ub == ffi.NULL: raise IOError("unable to receive websocket message") ret = ffi.string(ub.buf, ub.pos) lib.uwsgi_buffer_destroy(ub) return ret uwsgi.websocket_recv = uwsgi_pypy_websocket_recv """ uwsgi.websocket_recv_nb() """ def uwsgi_pypy_websocket_recv_nb(): wsgi_req = uwsgi_pypy_current_wsgi_req(); ub = lib.uwsgi_websocket_recv_nb(wsgi_req); if ub == ffi.NULL: raise IOError("unable to receive websocket message") ret = ffi.string(ub.buf, ub.pos) lib.uwsgi_buffer_destroy(ub) return ret uwsgi.websocket_recv_nb = uwsgi_pypy_websocket_recv_nb """ uwsgi.websocket_handshake(key, origin) """ def uwsgi_pypy_websocket_handshake(key='', origin='', proto=''): wsgi_req = uwsgi_pypy_current_wsgi_req(); c_key = ffi.new('char[]', key) c_origin = ffi.new('char[]', origin) c_proto = ffi.new('char[]', proto) if lib.uwsgi_websocket_handshake(wsgi_req, c_key, len(key), c_origin, len(origin), c_proto, len(proto)) < 0: raise IOError("unable to complete websocket handshake") uwsgi.websocket_handshake = uwsgi_pypy_websocket_handshake """ uwsgi.websocket_send(msg) """ def uwsgi_pypy_websocket_send(msg): wsgi_req = uwsgi_pypy_current_wsgi_req(); if lib.uwsgi_websocket_send(wsgi_req, ffi.new('char[]', msg), len(msg)) < 0: raise IOError("unable to send websocket message") uwsgi.websocket_send = uwsgi_pypy_websocket_send """ uwsgi.chunked_read(timeout=0) """ def uwsgi_pypy_chunked_read(timeout=0): wsgi_req = uwsgi_pypy_current_wsgi_req(); rlen = ffi.new("size_t*") chunk = lib.uwsgi_chunked_read(wsgi_req, rlen, timeout, 0) if chunk == ffi.NULL: raise IOError("unable to receive chunked part") return ffi.string(chunk, rlen[0]) uwsgi.chunked_read = uwsgi_pypy_chunked_read """ uwsgi.chunked_read_nb() """ def uwsgi_pypy_chunked_read_nb(): wsgi_req = uwsgi_pypy_current_wsgi_req(); rlen = ffi.new("size_t*") chunk = lib.uwsgi_chunked_read(wsgi_req, rlen, 0, 1) if chunk == ffi.NULL: if lib.uwsgi_is_again() > 0: return None raise IOError("unable to receive chunked part") return ffi.string(chunk, rlen[0]) uwsgi.chunked_read_nb = uwsgi_pypy_chunked_read_nb """ uwsgi.set_user_harakiri(sec) """ uwsgi.set_user_harakiri = lambda x: lib.set_user_harakiri(x) print "Initialized PyPy with Python", sys.version print "PyPy Home:", sys.prefix """ Continulets support """ # this is the dictionary of continulets (one per-core) uwsgi_pypy_continulets = {} def uwsgi_pypy_continulet_wrapper(cont): lib.async_schedule_to_req_green() @ffi.callback("void()") def uwsgi_pypy_continulet_schedule(): id = lib.uwsgi.wsgi_req.async_id modifier1 = lib.uwsgi.wsgi_req.uh.modifier1; # generate a new continulet if not lib.uwsgi.wsgi_req.suspended: from _continuation import continulet uwsgi_pypy_continulets[id] = continulet(uwsgi_pypy_continulet_wrapper) lib.uwsgi.wsgi_req.suspended = 1 # this is called in the main stack if lib.uwsgi.p[modifier1].suspend: lib.uwsgi.p[modifier1].suspend(ffi.NULL) # let's switch uwsgi_pypy_continulets[id].switch() # back to the main stack if lib.uwsgi.p[modifier1].resume: lib.uwsgi.p[modifier1].resume(ffi.NULL) @ffi.callback("void(struct wsgi_request *)") def uwsgi_pypy_continulet_switch(wsgi_req): id = wsgi_req.async_id modifier1 = wsgi_req.uh.modifier1; # this is called in the current continulet if lib.uwsgi.p[modifier1].suspend: lib.uwsgi.p[modifier1].suspend(wsgi_req) uwsgi_pypy_continulets[id].switch() # back to the continulet if lib.uwsgi.p[modifier1].resume: lib.uwsgi.p[modifier1].resume(wsgi_req) # update current running continulet lib.uwsgi.wsgi_req = wsgi_req def uwsgi_pypy_setup_continulets(): if lib.uwsgi.async <= 1: raise Exception("pypy continulets require async mode !!!") lib.uwsgi.schedule_to_main = uwsgi_pypy_continulet_switch lib.uwsgi.schedule_to_req = uwsgi_pypy_continulet_schedule print "*** PyPy Continulets engine loaded ***"