Index: twisted/scripts/_twistd_unix.py =================================================================== --- twisted/scripts/_twistd_unix.py (revision 33114) +++ twisted/scripts/_twistd_unix.py (working copy) @@ -7,6 +7,7 @@ from twisted.python import log, syslog, logfile, usage from twisted.python.util import switchUID, uidFromString, gidFromString from twisted.application import app, service +from twisted.internet.interfaces import IReactorDaemonize from twisted import copyright @@ -156,7 +157,22 @@ def daemonize(): - # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16 + """ + Daemonizes the application on Unix. This is done by the usual double + forking approach. + + See: + [1] http://code.activestate.com/recipes/278731/ + [2] W. Richard Stevens, "Advanced Programming in the Unix Environment", + 1992, Addison-Wesley, ISBN 0-201-56317-7 + """ + + ## If the reactor requires hooks to be called for daemonization, call them. + ## Currently the only reactor which provides/needs that is KQueueReactor. + from twisted.internet import reactor + if IReactorDaemonize.providedBy(reactor): + reactor.beforeDaemonize() + if os.fork(): # launch child and... os._exit(0) # kill off parent os.setsid() @@ -171,8 +187,11 @@ raise os.close(null) + if IReactorDaemonize.providedBy(reactor): + reactor.afterDaemonize() + def launchWithName(name): if name and name != sys.argv[0]: exe = os.path.realpath(sys.executable) Index: twisted/internet/interfaces.py =================================================================== --- twisted/internet/interfaces.py (revision 33114) +++ twisted/internet/interfaces.py (working copy) @@ -870,6 +870,29 @@ """ +class IReactorDaemonize(Interface): + """ + A reactor which provides hooks that need to be called before and after + daemonization. + """ + + def beforeDaemonize(): + """ + Hook to be called immediately before daemonization. No reactor methods + must be called until L{afterDaemonize} is called. + + @return: C{None}. + """ + + def afterDaemonize(): + """ + Hook to be called immediately after daemonization. This must only be + called after L{beforeDaemonize} had been called previously. + + @return: C{None}. + """ + + class IReactorFDSet(Interface): """ Implement me to be able to use L{IFileDescriptor} type resources. @@ -1899,4 +1922,3 @@ @return: a client endpoint @rtype: L{IStreamClientEndpoint} """ - Index: twisted/internet/test/test_fdset.py =================================================================== --- twisted/internet/test/test_fdset.py (revision 33114) +++ twisted/internet/test/test_fdset.py (working copy) @@ -286,7 +286,7 @@ reactor = self.buildReactor() name = reactor.__class__.__name__ - if name in ('EPollReactor', 'CFReactor'): + if name in ('EPollReactor', 'KQueueReactor', 'CFReactor'): # Closing a file descriptor immediately removes it from the epoll # set without generating a notification. That means epollreactor # will not call any methods on Victim after the close, so there's Index: twisted/internet/kqreactor.py =================================================================== --- twisted/internet/kqreactor.py (revision 33114) +++ twisted/internet/kqreactor.py (working copy) @@ -4,68 +4,36 @@ """ A kqueue()/kevent() based implementation of the Twisted main loop. -To install the event loop (and you should do this before any connections, -listeners or connectors are added):: +To use this reactor, start your application specifying the kqueue reactor: - | from twisted.internet import kqreactor - | kqreactor.install() + twistd ... --reactor kqueue -This reactor only works on FreeBSD and requires PyKQueue 1.3, which is -available at: U{http://people.freebsd.org/~dwhite/PyKQueue/} +To install the event loop from code (and you should do this before any +connections, listeners or connectors are added):: + from twisted.internet import kqreactor + kqreactor.install() +This implementation depends on Python 2.6 or higher which has kqueue support +built in the select module. -You're going to need to patch PyKqueue:: +Note, that you should use Python 2.6.5 or higher, since previous implementations +of select.kqueue had - ===================================================== - --- PyKQueue-1.3/kqsyscallmodule.c Sun Jan 28 21:59:50 2001 - +++ PyKQueue-1.3/kqsyscallmodule.c.new Tue Jul 30 18:06:08 2002 - @@ -137,7 +137,7 @@ - } - - statichere PyTypeObject KQEvent_Type = { - - PyObject_HEAD_INIT(NULL) - + PyObject_HEAD_INIT(&PyType_Type) - 0, // ob_size - "KQEvent", // tp_name - sizeof(KQEventObject), // tp_basicsize - @@ -291,13 +291,14 @@ - - /* Build timespec for timeout */ - totimespec.tv_sec = timeout / 1000; - - totimespec.tv_nsec = (timeout % 1000) * 100000; - + totimespec.tv_nsec = (timeout % 1000) * 1000000; - - // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec); - - /* Make the call */ - - - + Py_BEGIN_ALLOW_THREADS - gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec); - + Py_END_ALLOW_THREADS - - /* Don't need the input event list anymore, so get rid of it */ - free (changelist); - @@ -361,7 +362,7 @@ - statichere PyTypeObject KQueue_Type = { - /* The ob_type field must be initialized in the module init function - * to be portable to Windows without using C++. */ - - PyObject_HEAD_INIT(NULL) - + PyObject_HEAD_INIT(&PyType_Type) - 0, /*ob_size*/ - "KQueue", /*tp_name*/ - sizeof(KQueueObject), /*tp_basicsize*/ + http://bugs.python.org/issue5910 +not yet fixed. """ -import errno, sys +import errno from zope.interface import implements -from kqsyscall import EVFILT_READ, EVFILT_WRITE, EV_DELETE, EV_ADD -from kqsyscall import kqueue, kevent +from select import kqueue, kevent +from select import KQ_FILTER_READ, KQ_FILTER_WRITE, \ + KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF -from twisted.internet.interfaces import IReactorFDSet +from twisted.internet.interfaces import IReactorFDSet, IReactorDaemonize from twisted.python import log, failure from twisted.internet import main, posixbase @@ -73,7 +41,8 @@ class KQueueReactor(posixbase.PosixReactorBase): """ - A reactor that uses kqueue(2)/kevent(2). + A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher + which has built in support for kqueue in the select module. @ivar _kq: A L{kqueue} which will be used to check for I/O readiness. @@ -95,12 +64,18 @@ dispatched to the corresponding L{FileDescriptor} instances in C{_selectables}. """ - implements(IReactorFDSet) + implements(IReactorFDSet, IReactorDaemonize) + def __init__(self): """ Initialize kqueue object, file descriptor tracking dictionaries, and the base class. + + See: + - http://docs.python.org/library/select.html + - www.freebsd.org/cgi/man.cgi?query=kqueue + - people.freebsd.org/~jlemon/papers/kqueue.pdf """ self._kq = kqueue() self._reads = {} @@ -109,50 +84,134 @@ posixbase.PosixReactorBase.__init__(self) - def _updateRegistration(self, *args): - self._kq.kevent([kevent(*args)], 0, 0) + def _updateRegistration(self, fd, filter, op): + """ + Private method for changing kqueue registration on a given FD + filtering for events given filter/op. This will never block and + returns nothing. + """ + self._kq.control([kevent(fd, filter, op)], 0, 0) + + def beforeDaemonize(self): + """ + Implement L{IReactorDaemonize.beforeDaemonize}. + """ + # Twisted-internal method called during daemonization (when application + # is started via twistd). This is called right before the magic double + # forking done for daemonization. We cleanly close the kqueue() and later + # recreate it. This is needed since a) kqueue() are not inherited across + # forks and b) twistd will create the reactor already before daemonization + # (and will also add at least 1 reader to the reactor, an instance of + # twisted.internet.posixbase._UnixWaker). + # + # See: twisted.scripts._twistd_unix.daemonize() + self._kq.close() + self._kq = None + + + def afterDaemonize(self): + """ + Implement L{IReactorDaemonize.afterDaemonize}. + """ + # Twisted-internal method called during daemonization. This is called right + # after daemonization and recreates the kqueue() and any readers/writers + # that were added before. Note that you MUST NOT call any reactor methods + # in between beforeDaemonize() and afterDaemonize()! + self._kq = kqueue() + for fd in self._reads: + self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) + for fd in self._writes: + self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) + + def addReader(self, reader): - """Add a FileDescriptor for notification of data available to read. """ + Implement L{IReactorFDSet.addReader}. + """ fd = reader.fileno() if fd not in self._reads: - self._selectables[fd] = reader - self._reads[fd] = 1 - self._updateRegistration(fd, EVFILT_READ, EV_ADD) + try: + self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) + except OSError: + pass + finally: + self._selectables[fd] = reader + self._reads[fd] = 1 + def addWriter(self, writer): - """Add a FileDescriptor for notification of data available to write. """ + Implement L{IReactorFDSet.addWriter}. + """ fd = writer.fileno() if fd not in self._writes: - self._selectables[fd] = writer - self._writes[fd] = 1 - self._updateRegistration(fd, EVFILT_WRITE, EV_ADD) + try: + self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) + except OSError: + pass + finally: + self._selectables[fd] = writer + self._writes[fd] = 1 + def removeReader(self, reader): - """Remove a Selectable for notification of data available to read. """ - fd = reader.fileno() + Implement L{IReactorFDSet.removeReader}. + """ + wasLost = False + try: + fd = reader.fileno() + except: + fd = -1 + if fd == -1: + for fd, fdes in self._selectables.items(): + if reader is fdes: + wasLost = True + break + else: + return if fd in self._reads: del self._reads[fd] if fd not in self._writes: del self._selectables[fd] - self._updateRegistration(fd, EVFILT_READ, EV_DELETE) + if not wasLost: + try: + self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE) + except OSError: + pass + def removeWriter(self, writer): - """Remove a Selectable for notification of data available to write. """ - fd = writer.fileno() + Implement L{IReactorFDSet.removeWriter}. + """ + wasLost = False + try: + fd = writer.fileno() + except: + fd = -1 + if fd == -1: + for fd, fdes in self._selectables.items(): + if writer is fdes: + wasLost = True + break + else: + return if fd in self._writes: del self._writes[fd] if fd not in self._reads: del self._selectables[fd] - self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) + if not wasLost: + try: + self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE) + except OSError: + pass + def removeAll(self): """ - Remove all selectables, and return a list of them. + Implement L{IReactorFDSet.removeAll}. """ return self._removeAll( [self._selectables[fd] for fd in self._reads], @@ -160,62 +219,90 @@ def getReaders(self): + """ + Implement L{IReactorFDSet.getReaders}. + """ return [self._selectables[fd] for fd in self._reads] def getWriters(self): + """ + Implement L{IReactorFDSet.getWriters}. + """ return [self._selectables[fd] for fd in self._writes] def doKEvent(self, timeout): - """Poll the kqueue for new events.""" + """ + Poll the kqueue for new events. + """ if timeout is None: - timeout = 1000 - else: - timeout = int(timeout * 1000) # convert seconds to milliseconds + timeout = 1 try: - l = self._kq.kevent([], len(self._selectables), timeout) + l = self._kq.control([], len(self._selectables), timeout) except OSError, e: if e[0] == errno.EINTR: return else: raise + _drdw = self._doWriteOrRead for event in l: - why = None - fd, filter = event.ident, event.filter + fd = event.ident try: selectable = self._selectables[fd] except KeyError: # Handles the infrequent case where one selectable's # handler disconnects another. continue - log.callWithLogger(selectable, _drdw, selectable, fd, filter) + else: + log.callWithLogger(selectable, _drdw, selectable, fd, event) - def _doWriteOrRead(self, selectable, fd, filter): - try: - if filter == EVFILT_READ: - why = selectable.doRead() - if filter == EVFILT_WRITE: - why = selectable.doWrite() - if not selectable.fileno() == fd: - why = main.CONNECTION_LOST - except: - why = sys.exc_info()[1] - log.deferr() + def _doWriteOrRead(self, selectable, fd, event): + """ + Private method called when a FD is ready for reading, writing or was + lost. Do the work and raise errors where necessary. + """ + why = None + inRead = False + (filter, flags, data, fflags) = ( + event.filter, event.flags, event.data, event.fflags) + + if flags & KQ_EV_EOF and data and fflags: + why = main.CONNECTION_LOST + else: + try: + if selectable.fileno() == -1: + inRead = False + why = posixbase._NO_FILEDESC + else: + if filter == KQ_FILTER_READ: + inRead = True + why = selectable.doRead() + if filter == KQ_FILTER_WRITE: + inRead = False + why = selectable.doWrite() + except: + # Any exception from application code gets logged and will + # cause us to disconnect the selectable. + why = failure.Failure() + log.err(why) + if why: - self.removeReader(selectable) - self.removeWriter(selectable) - selectable.connectionLost(failure.Failure(why)) + self._disconnectSelectable(selectable, why, inRead) doIteration = doKEvent def install(): - k = KQueueReactor() - main.installReactor(k) + """ + Install the kqueue() reactor. + """ + p = KQueueReactor() + from twisted.internet.main import installReactor + installReactor(p) __all__ = ["KQueueReactor", "install"] Index: 1918.feature =================================================================== --- 1918.feature (revision 0) +++ 1918.feature (revision 0) @@ -0,0 +1 @@ +The kqueue reactor has been revived. (#1918)