Index: twisted/test/test_twistd.py =================================================================== --- twisted/test/test_twistd.py (revision 33453) +++ twisted/test/test_twistd.py (working copy) @@ -23,6 +23,7 @@ from zope.interface.verify import verifyObject from twisted.trial import unittest +from twisted.test.test_process import MockOS from twisted import plugin from twisted.application.service import IServiceMaker @@ -34,6 +35,7 @@ from twisted.python.versions import Version from twisted.python.components import Componentized from twisted.internet.defer import Deferred +from twisted.internet.interfaces import IReactorDaemonize from twisted.python.fakepwd import UserDatabase try: @@ -616,7 +618,7 @@ self.runner = UnixApplicationRunner({}) - def daemonize(self): + def daemonize(self, daemonize_reactor, daemonize_os): """ Indicate that daemonization has happened and change the PID so that the value written to the pidfile can be tested in the daemonization case. @@ -806,6 +808,78 @@ +class FakeNonDaemonizingReactor: + """ + A dummy reactor, providing C{beforeDaemonize} and C{afterDaemonize} methods, + but not announcing this, and logging whether the methods have been called. + + @ivar _beforeDaemonizeCalled: if C{beforeDaemonize} has been called or not. + @type _beforeDaemonizeCalled: C{bool} + @ivar _afterDaemonizeCalled: if C{afterDaemonize} has been called or not. + @type _afterDaemonizeCalled: C{bool} + """ + + def __init__(self): + self._beforeDaemonizeCalled = False + self._afterDaemonizeCalled = False + + def beforeDaemonize(self): + self._beforeDaemonizeCalled = True + + def afterDaemonize(self): + self._afterDaemonizeCalled = True + + + +class FakeDaemonizingReactor(FakeNonDaemonizingReactor): + """ + A dummy reactor, providing C{beforeDaemonize} and C{afterDaemonize} methods, + announcing this, and logging whether the methods have been called. + """ + + implements(IReactorDaemonize) + + + +class ReactorDaemonizationTests(unittest.TestCase): + """ + Tests for L{_twistd_unix.daemonize} and L{IReactorDaemonize}. + """ + if _twistd_unix is None: + skip = "twistd unix not available" + + + def test_daemonizationHooksCalled(self): + """ + L{_twistd_unix.daemonize} indeed calls + L{IReactorDaemonize.beforeDaemonize} and + L{IReactorDaemonize.afterDaemonize} if the reactor implements + L{IReactorDaemonize}. + """ + reactor = FakeDaemonizingReactor() + os = MockOS() + _twistd_unix.daemonize(reactor, os) + msg = "At least one reactor daemonization hook WAS NOT called" + self.assertTrue(reactor._beforeDaemonizeCalled and \ + reactor._afterDaemonizeCalled, msg = msg) + + + def test_daemonizationHooksNotCalled(self): + """ + L{_twistd_unix.daemonize} does NOT call + L{IReactorDaemonize.beforeDaemonize} or + L{IReactorDaemonize.afterDaemonize} if the reactor does NOT + implement L{IReactorDaemonize}. + """ + reactor = FakeNonDaemonizingReactor() + os = MockOS() + _twistd_unix.daemonize(reactor, os) + msg = "At least one reactor daemonization hook WAS called" + self.assertTrue(not reactor._beforeDaemonizeCalled and \ + not reactor._afterDaemonizeCalled, msg = msg) + + + class DummyReactor(object): """ A dummy reactor, only providing a C{run} method and checking that it Index: twisted/topfiles/1918.feature =================================================================== --- twisted/topfiles/1918.feature (revision 0) +++ twisted/topfiles/1918.feature (revision 0) @@ -0,0 +1 @@ +The kqueue reactor has been revived. Index: twisted/scripts/_twistd_unix.py =================================================================== --- twisted/scripts/_twistd_unix.py (revision 33453) +++ twisted/scripts/_twistd_unix.py (working copy) @@ -7,6 +7,7 @@ from twisted.python import log, syslog, logfile, usage from twisted.python.util import switchUID, uidFromString, gidFromString from twisted.application import app, service +from twisted.internet.interfaces import IReactorDaemonize from twisted import copyright @@ -155,8 +156,27 @@ -def daemonize(): - # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16 +def daemonize(reactor, os): + """ + Daemonizes the application on Unix. This is done by the usual double + forking approach. + + See: + [1] http://code.activestate.com/recipes/278731/ + [2] W. Richard Stevens, "Advanced Programming in the Unix Environment", + 1992, Addison-Wesley, ISBN 0-201-56317-7 + + @type reactor: C{reactor} + @param reactor: The reactor in use. + @type os: C{os} + @param os: The 'os' module to use for daemonization. + """ + + ## If the reactor requires hooks to be called for daemonization, call them. + ## Currently the only reactor which provides/needs that is KQueueReactor. + if IReactorDaemonize.providedBy(reactor): + reactor.beforeDaemonize() + if os.fork(): # launch child and... os._exit(0) # kill off parent os.setsid() @@ -171,8 +191,11 @@ raise os.close(null) + if IReactorDaemonize.providedBy(reactor): + reactor.afterDaemonize() + def launchWithName(name): if name and name != sys.argv[0]: exe = os.path.realpath(sys.executable) @@ -265,7 +288,8 @@ if umask is not None: os.umask(umask) if daemon: - daemonize() + from twisted.internet import reactor + daemonize(reactor, os) if pidfile: f = open(pidfile,'wb') f.write(str(os.getpid())) Index: twisted/internet/interfaces.py =================================================================== --- twisted/internet/interfaces.py (revision 33453) +++ twisted/internet/interfaces.py (working copy) @@ -872,6 +872,35 @@ """ +class IReactorDaemonize(Interface): + """ + A reactor which provides hooks that need to be called before and after + daemonization. + + Notes: + - This interface SHOULD NOT be called by applications. + - This interface should only be implemented by reactors as a workaround + (in particular, it's implemented currently only by kqueue()). + For details please see the comments on ticket #1918. + """ + + def beforeDaemonize(): + """ + Hook to be called immediately before daemonization. No reactor methods + must be called until L{afterDaemonize} is called. + + @return: C{None}. + """ + + def afterDaemonize(): + """ + Hook to be called immediately after daemonization. This must only be + called after L{beforeDaemonize} had been called previously. + + @return: C{None}. + """ + + class IReactorFDSet(Interface): """ Implement me to be able to use L{IFileDescriptor} type resources. @@ -1901,4 +1930,3 @@ @return: a client endpoint @rtype: L{IStreamClientEndpoint} """ - Index: twisted/internet/test/test_fdset.py =================================================================== --- twisted/internet/test/test_fdset.py (revision 33453) +++ twisted/internet/test/test_fdset.py (working copy) @@ -286,7 +286,7 @@ reactor = self.buildReactor() name = reactor.__class__.__name__ - if name in ('EPollReactor', 'CFReactor'): + if name in ('EPollReactor', 'KQueueReactor', 'CFReactor'): # Closing a file descriptor immediately removes it from the epoll # set without generating a notification. That means epollreactor # will not call any methods on Victim after the close, so there's Index: twisted/internet/kqreactor.py =================================================================== --- twisted/internet/kqreactor.py (revision 33453) +++ twisted/internet/kqreactor.py (working copy) @@ -4,68 +4,36 @@ """ A kqueue()/kevent() based implementation of the Twisted main loop. -To install the event loop (and you should do this before any connections, -listeners or connectors are added):: +To use this reactor, start your application specifying the kqueue reactor: - | from twisted.internet import kqreactor - | kqreactor.install() + twistd ... --reactor kqueue -This reactor only works on FreeBSD and requires PyKQueue 1.3, which is -available at: U{http://people.freebsd.org/~dwhite/PyKQueue/} +To install the event loop from code (and you should do this before any +connections, listeners or connectors are added):: + from twisted.internet import kqreactor + kqreactor.install() +This implementation depends on Python 2.6 or higher which has kqueue support +built in the select module. -You're going to need to patch PyKqueue:: +Note, that you should use Python 2.6.5 or higher, since previous implementations +of select.kqueue had - ===================================================== - --- PyKQueue-1.3/kqsyscallmodule.c Sun Jan 28 21:59:50 2001 - +++ PyKQueue-1.3/kqsyscallmodule.c.new Tue Jul 30 18:06:08 2002 - @@ -137,7 +137,7 @@ - } - - statichere PyTypeObject KQEvent_Type = { - - PyObject_HEAD_INIT(NULL) - + PyObject_HEAD_INIT(&PyType_Type) - 0, // ob_size - "KQEvent", // tp_name - sizeof(KQEventObject), // tp_basicsize - @@ -291,13 +291,14 @@ - - /* Build timespec for timeout */ - totimespec.tv_sec = timeout / 1000; - - totimespec.tv_nsec = (timeout % 1000) * 100000; - + totimespec.tv_nsec = (timeout % 1000) * 1000000; - - // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec); - - /* Make the call */ - - - + Py_BEGIN_ALLOW_THREADS - gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec); - + Py_END_ALLOW_THREADS - - /* Don't need the input event list anymore, so get rid of it */ - free (changelist); - @@ -361,7 +362,7 @@ - statichere PyTypeObject KQueue_Type = { - /* The ob_type field must be initialized in the module init function - * to be portable to Windows without using C++. */ - - PyObject_HEAD_INIT(NULL) - + PyObject_HEAD_INIT(&PyType_Type) - 0, /*ob_size*/ - "KQueue", /*tp_name*/ - sizeof(KQueueObject), /*tp_basicsize*/ + http://bugs.python.org/issue5910 +not yet fixed. """ -import errno, sys +import errno from zope.interface import implements -from kqsyscall import EVFILT_READ, EVFILT_WRITE, EV_DELETE, EV_ADD -from kqsyscall import kqueue, kevent +from select import kqueue, kevent +from select import KQ_FILTER_READ, KQ_FILTER_WRITE +from select import KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF -from twisted.internet.interfaces import IReactorFDSet +from twisted.internet.interfaces import IReactorFDSet, IReactorDaemonize from twisted.python import log, failure from twisted.internet import main, posixbase @@ -73,7 +41,8 @@ class KQueueReactor(posixbase.PosixReactorBase): """ - A reactor that uses kqueue(2)/kevent(2). + A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher + which has built in support for kqueue in the select module. @ivar _kq: A L{kqueue} which will be used to check for I/O readiness. @@ -95,12 +64,18 @@ dispatched to the corresponding L{FileDescriptor} instances in C{_selectables}. """ - implements(IReactorFDSet) + implements(IReactorFDSet, IReactorDaemonize) + def __init__(self): """ Initialize kqueue object, file descriptor tracking dictionaries, and the base class. + + See: + - http://docs.python.org/library/select.html + - www.freebsd.org/cgi/man.cgi?query=kqueue + - people.freebsd.org/~jlemon/papers/kqueue.pdf """ self._kq = kqueue() self._reads = {} @@ -109,50 +84,134 @@ posixbase.PosixReactorBase.__init__(self) - def _updateRegistration(self, *args): - self._kq.kevent([kevent(*args)], 0, 0) + def _updateRegistration(self, fd, filter, op): + """ + Private method for changing kqueue registration on a given FD + filtering for events given filter/op. This will never block and + returns nothing. + """ + self._kq.control([kevent(fd, filter, op)], 0, 0) + + def beforeDaemonize(self): + """ + Implement L{IReactorDaemonize.beforeDaemonize}. + """ + # Twisted-internal method called during daemonization (when application + # is started via twistd). This is called right before the magic double + # forking done for daemonization. We cleanly close the kqueue() and later + # recreate it. This is needed since a) kqueue() are not inherited across + # forks and b) twistd will create the reactor already before daemonization + # (and will also add at least 1 reader to the reactor, an instance of + # twisted.internet.posixbase._UnixWaker). + # + # See: twisted.scripts._twistd_unix.daemonize() + self._kq.close() + self._kq = None + + + def afterDaemonize(self): + """ + Implement L{IReactorDaemonize.afterDaemonize}. + """ + # Twisted-internal method called during daemonization. This is called right + # after daemonization and recreates the kqueue() and any readers/writers + # that were added before. Note that you MUST NOT call any reactor methods + # in between beforeDaemonize() and afterDaemonize()! + self._kq = kqueue() + for fd in self._reads: + self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) + for fd in self._writes: + self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) + + def addReader(self, reader): - """Add a FileDescriptor for notification of data available to read. """ + Implement L{IReactorFDSet.addReader}. + """ fd = reader.fileno() if fd not in self._reads: - self._selectables[fd] = reader - self._reads[fd] = 1 - self._updateRegistration(fd, EVFILT_READ, EV_ADD) + try: + self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) + except OSError: + pass + finally: + self._selectables[fd] = reader + self._reads[fd] = 1 + def addWriter(self, writer): - """Add a FileDescriptor for notification of data available to write. """ + Implement L{IReactorFDSet.addWriter}. + """ fd = writer.fileno() if fd not in self._writes: - self._selectables[fd] = writer - self._writes[fd] = 1 - self._updateRegistration(fd, EVFILT_WRITE, EV_ADD) + try: + self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) + except OSError: + pass + finally: + self._selectables[fd] = writer + self._writes[fd] = 1 + def removeReader(self, reader): - """Remove a Selectable for notification of data available to read. """ - fd = reader.fileno() + Implement L{IReactorFDSet.removeReader}. + """ + wasLost = False + try: + fd = reader.fileno() + except: + fd = -1 + if fd == -1: + for fd, fdes in self._selectables.items(): + if reader is fdes: + wasLost = True + break + else: + return if fd in self._reads: del self._reads[fd] if fd not in self._writes: del self._selectables[fd] - self._updateRegistration(fd, EVFILT_READ, EV_DELETE) + if not wasLost: + try: + self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE) + except OSError: + pass + def removeWriter(self, writer): - """Remove a Selectable for notification of data available to write. """ - fd = writer.fileno() + Implement L{IReactorFDSet.removeWriter}. + """ + wasLost = False + try: + fd = writer.fileno() + except: + fd = -1 + if fd == -1: + for fd, fdes in self._selectables.items(): + if writer is fdes: + wasLost = True + break + else: + return if fd in self._writes: del self._writes[fd] if fd not in self._reads: del self._selectables[fd] - self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) + if not wasLost: + try: + self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE) + except OSError: + pass + def removeAll(self): """ - Remove all selectables, and return a list of them. + Implement L{IReactorFDSet.removeAll}. """ return self._removeAll( [self._selectables[fd] for fd in self._reads], @@ -160,62 +219,91 @@ def getReaders(self): + """ + Implement L{IReactorFDSet.getReaders}. + """ return [self._selectables[fd] for fd in self._reads] def getWriters(self): + """ + Implement L{IReactorFDSet.getWriters}. + """ return [self._selectables[fd] for fd in self._writes] def doKEvent(self, timeout): - """Poll the kqueue for new events.""" + """ + Poll the kqueue for new events. + """ if timeout is None: - timeout = 1000 - else: - timeout = int(timeout * 1000) # convert seconds to milliseconds + timeout = 1 try: - l = self._kq.kevent([], len(self._selectables), timeout) + l = self._kq.control([], len(self._selectables), timeout) except OSError, e: if e[0] == errno.EINTR: return else: raise + _drdw = self._doWriteOrRead for event in l: - why = None - fd, filter = event.ident, event.filter + fd = event.ident try: selectable = self._selectables[fd] except KeyError: # Handles the infrequent case where one selectable's # handler disconnects another. continue - log.callWithLogger(selectable, _drdw, selectable, fd, filter) + else: + log.callWithLogger(selectable, _drdw, selectable, fd, event) - def _doWriteOrRead(self, selectable, fd, filter): - try: - if filter == EVFILT_READ: - why = selectable.doRead() - if filter == EVFILT_WRITE: - why = selectable.doWrite() - if not selectable.fileno() == fd: - why = main.CONNECTION_LOST - except: - why = sys.exc_info()[1] - log.deferr() + def _doWriteOrRead(self, selectable, fd, event): + """ + Private method called when a FD is ready for reading, writing or was + lost. Do the work and raise errors where necessary. + """ + why = None + inRead = False + (filter, flags, data, fflags) = ( + event.filter, event.flags, event.data, event.fflags) + + if flags & KQ_EV_EOF and data and fflags: + why = main.CONNECTION_LOST + else: + try: + if selectable.fileno() == -1: + inRead = False + why = posixbase._NO_FILEDESC + else: + if filter == KQ_FILTER_READ: + inRead = True + why = selectable.doRead() + if filter == KQ_FILTER_WRITE: + inRead = False + why = selectable.doWrite() + except: + # Any exception from application code gets logged and will + # cause us to disconnect the selectable. + why = failure.Failure() + log.err(why, "An exception was raised from application code" \ + " while processing a reactor selectable") + if why: - self.removeReader(selectable) - self.removeWriter(selectable) - selectable.connectionLost(failure.Failure(why)) + self._disconnectSelectable(selectable, why, inRead) doIteration = doKEvent def install(): - k = KQueueReactor() - main.installReactor(k) + """ + Install the kqueue() reactor. + """ + p = KQueueReactor() + from twisted.internet.main import installReactor + installReactor(p) __all__ = ["KQueueReactor", "install"]