Index: twisted/internet/test/test_fdset.py =================================================================== --- twisted/internet/test/test_fdset.py (revision 33091) +++ twisted/internet/test/test_fdset.py (working copy) @@ -286,7 +286,7 @@ reactor = self.buildReactor() name = reactor.__class__.__name__ - if name in ('EPollReactor', 'CFReactor'): + if name in ('EPollReactor', 'KQueueReactor', 'CFReactor'): # Closing a file descriptor immediately removes it from the epoll # set without generating a notification. That means epollreactor # will not call any methods on Victim after the close, so there's Index: twisted/internet/kqreactor.py =================================================================== --- twisted/internet/kqreactor.py (revision 33091) +++ twisted/internet/kqreactor.py (working copy) @@ -10,60 +10,16 @@ | from twisted.internet import kqreactor | kqreactor.install() -This reactor only works on FreeBSD and requires PyKQueue 1.3, which is -available at: U{http://people.freebsd.org/~dwhite/PyKQueue/} - - - -You're going to need to patch PyKqueue:: - - ===================================================== - --- PyKQueue-1.3/kqsyscallmodule.c Sun Jan 28 21:59:50 2001 - +++ PyKQueue-1.3/kqsyscallmodule.c.new Tue Jul 30 18:06:08 2002 - @@ -137,7 +137,7 @@ - } - - statichere PyTypeObject KQEvent_Type = { - - PyObject_HEAD_INIT(NULL) - + PyObject_HEAD_INIT(&PyType_Type) - 0, // ob_size - "KQEvent", // tp_name - sizeof(KQEventObject), // tp_basicsize - @@ -291,13 +291,14 @@ - - /* Build timespec for timeout */ - totimespec.tv_sec = timeout / 1000; - - totimespec.tv_nsec = (timeout % 1000) * 100000; - + totimespec.tv_nsec = (timeout % 1000) * 1000000; - - // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec); - - /* Make the call */ - - - + Py_BEGIN_ALLOW_THREADS - gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec); - + Py_END_ALLOW_THREADS - - /* Don't need the input event list anymore, so get rid of it */ - free (changelist); - @@ -361,7 +362,7 @@ - statichere PyTypeObject KQueue_Type = { - /* The ob_type field must be initialized in the module init function - * to be portable to Windows without using C++. */ - - PyObject_HEAD_INIT(NULL) - + PyObject_HEAD_INIT(&PyType_Type) - 0, /*ob_size*/ - "KQueue", /*tp_name*/ - sizeof(KQueueObject), /*tp_basicsize*/ - +This implementation depends on Python 2.6 or higher which has kqueue support +built in the select module. """ import errno, sys from zope.interface import implements -from kqsyscall import EVFILT_READ, EVFILT_WRITE, EV_DELETE, EV_ADD -from kqsyscall import kqueue, kevent +from select import kqueue, kevent +from select import KQ_FILTER_READ, KQ_FILTER_WRITE, KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF from twisted.internet.interfaces import IReactorFDSet @@ -73,7 +29,8 @@ class KQueueReactor(posixbase.PosixReactorBase): """ - A reactor that uses kqueue(2)/kevent(2). + A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher + which has built in support for kqueue in the select module. @ivar _kq: A L{kqueue} which will be used to check for I/O readiness. @@ -97,6 +54,7 @@ """ implements(IReactorFDSet) + def __init__(self): """ Initialize kqueue object, file descriptor tracking dictionaries, and the @@ -109,47 +67,97 @@ posixbase.PosixReactorBase.__init__(self) - def _updateRegistration(self, *args): - self._kq.kevent([kevent(*args)], 0, 0) + def _updateRegistration(self, fd, filter, op): + """ + Private method for changing kqueue registration. + """ + self._kq.control([kevent(fd, filter, op)], 0, 0) + def addReader(self, reader): - """Add a FileDescriptor for notification of data available to read. """ + Add a FileDescriptor for notification of data available to read. + """ fd = reader.fileno() if fd not in self._reads: - self._selectables[fd] = reader - self._reads[fd] = 1 - self._updateRegistration(fd, EVFILT_READ, EV_ADD) + try: + self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) + except OSError, e: + pass + finally: + self._selectables[fd] = reader + self._reads[fd] = 1 + def addWriter(self, writer): - """Add a FileDescriptor for notification of data available to write. """ + Add a FileDescriptor for notification of data available to write. + """ fd = writer.fileno() if fd not in self._writes: - self._selectables[fd] = writer - self._writes[fd] = 1 - self._updateRegistration(fd, EVFILT_WRITE, EV_ADD) + try: + self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) + except OSError, e: + pass + finally: + self._selectables[fd] = writer + self._writes[fd] = 1 + def removeReader(self, reader): - """Remove a Selectable for notification of data available to read. """ - fd = reader.fileno() + Remove a Selectable for notification of data available to read. + """ + wasLost = False + try: + fd = reader.fileno() + except: + fd = -1 + if fd == -1: + for fd, fdes in self._selectables.items(): + if reader is fdes: + wasLost = True + break + else: + return if fd in self._reads: del self._reads[fd] if fd not in self._writes: del self._selectables[fd] - self._updateRegistration(fd, EVFILT_READ, EV_DELETE) + if not wasLost: + try: + self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE) + except OSError, e: + pass + def removeWriter(self, writer): - """Remove a Selectable for notification of data available to write. """ - fd = writer.fileno() + Remove a Selectable for notification of data available to write. + """ + wasLost = False + try: + fd = writer.fileno() + except: + fd = -1 + if fd == -1: + for fd, fdes in self._selectables.items(): + if writer is fdes: + wasLost = True + break + else: + return if fd in self._writes: del self._writes[fd] if fd not in self._reads: del self._selectables[fd] - self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) + if not wasLost: + try: + self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE) + except OSError, e: + pass + def removeAll(self): """ Remove all selectables, and return a list of them. @@ -168,54 +176,71 @@ def doKEvent(self, timeout): - """Poll the kqueue for new events.""" + """ + Poll the kqueue for new events. + """ if timeout is None: - timeout = 1000 - else: - timeout = int(timeout * 1000) # convert seconds to milliseconds + timeout = 1 try: - l = self._kq.kevent([], len(self._selectables), timeout) + l = self._kq.control([], len(self._selectables), timeout) except OSError, e: if e[0] == errno.EINTR: return else: raise + _drdw = self._doWriteOrRead for event in l: - why = None - fd, filter = event.ident, event.filter + fd = event.ident try: selectable = self._selectables[fd] except KeyError: # Handles the infrequent case where one selectable's # handler disconnects another. continue - log.callWithLogger(selectable, _drdw, selectable, fd, filter) + else: + log.callWithLogger(selectable, _drdw, selectable, fd, event) - def _doWriteOrRead(self, selectable, fd, filter): - try: - if filter == EVFILT_READ: - why = selectable.doRead() - if filter == EVFILT_WRITE: - why = selectable.doWrite() - if not selectable.fileno() == fd: - why = main.CONNECTION_LOST - except: - why = sys.exc_info()[1] - log.deferr() + def _doWriteOrRead(self, selectable, fd, event): + why = None + inRead = False + filter, flags, data, fflags = event.filter, event.flags, event.data, event.fflags + + if flags & KQ_EV_EOF and data and fflags: + why = main.CONNECTION_LOST + else: + try: + if selectable.fileno() == -1: + inRead = False + why = posixbase._NO_FILEDESC + else: + if filter == KQ_FILTER_READ: + inRead = True + why = selectable.doRead() + if filter == KQ_FILTER_WRITE: + inRead = False + why = selectable.doWrite() + except: + # Any exception from application code gets logged and will + # cause us to disconnect the selectable. + why = sys.exc_info()[1] + log.err() + if why: - self.removeReader(selectable) - self.removeWriter(selectable) - selectable.connectionLost(failure.Failure(why)) + self._disconnectSelectable(selectable, why, inRead) doIteration = doKEvent def install(): - k = KQueueReactor() - main.installReactor(k) + """ + Install the kqueue() reactor. + """ + p = KQueueReactor() + from twisted.internet.main import installReactor + installReactor(p) __all__ = ["KQueueReactor", "install"]