Index: twisted/conch/test/test_recvline.py =================================================================== --- twisted/conch/test/test_recvline.py (revision 17515) +++ twisted/conch/test/test_recvline.py (working copy) @@ -11,7 +11,7 @@ from twisted.conch import recvline from twisted.python import log, reflect, components -from twisted.internet import defer, error, task +from twisted.internet import defer, error, task, reactor from twisted.trial import unittest from twisted.cred import portal from twisted.test.proto_helpers import StringTransport @@ -664,3 +664,7 @@ class HistoricRecvlineLoopbackStdio(_StdioMixin, unittest.TestCase, HistoricRecvlineLoopbackMixin): if stdio is None: skip = "Terminal requirements missing, can't run historic recvline tests over stdio" + +if reactor.__class__.__name__ == 'KQueueReactor': + _StdioMixin.skip = "KQueue doesn't support this yet" + Index: twisted/test/test_process.py =================================================================== --- twisted/test/test_process.py (revision 17515) +++ twisted/test/test_process.py (working copy) @@ -29,6 +29,14 @@ from twisted.python import util, runtime from twisted.python import procutils +kqreactor = False +kqosx = False +if reactor.__class__.__name__ == 'KQueueReactor': + kqueueskipmessage = "KQueue doesn't support this yet" + kqreactor = True + if runtime.platform.isMacOSX(): + kqosx = True + class TrivialProcessProtocol(protocol.ProcessProtocol): def __init__(self, d): self.deferred = d @@ -371,6 +379,8 @@ reactor.callLater(1, self.close, 0) reactor.callLater(2, self.close, 1) return self._onClose() + if kqosx: + testClosePty.skip = kqueueskipmessage + " on Mac OSX" def testKillPty(self): if self.verbose: print "starting processes" @@ -378,6 +388,8 @@ reactor.callLater(1, self.kill, 0) reactor.callLater(2, self.kill, 1) return self._onClose() + if kqreactor: + testKillPty.skip = kqueueskipmessage class FDChecker(protocol.ProcessProtocol): state = 0 @@ -554,6 +566,8 @@ self.assertEquals(p.reason.value.signal, None) d.addCallback(check) return d + if kqreactor: + testNormalTermination.skip = kqueueskipmessage def testAbnormalTermination(self): if os.path.exists('/bin/false'): cmd = '/bin/false' @@ -571,6 +585,8 @@ self.assertEquals(p.reason.value.signal, None) d.addCallback(check) return d + if kqreactor: + testAbnormalTermination.skip = kqueueskipmessage def _testSignal(self, sig): exe = sys.executable @@ -665,7 +681,6 @@ "Error message from process_tty follows:\n\n%s\n\n" % p.outF.getvalue()) return d.addCallback(processEnded) - def testBadArgs(self): pyExe = sys.executable pyArgs = [pyExe, "-u", "-c", "print 'hello'"] @@ -870,3 +885,6 @@ if not interfaces.IReactorProcess(reactor, None): ProcessTestCase.skip = skipMessage ClosingPipes.skip = skipMessage + +if kqreactor: + PosixProcessTestCasePTY.skip = kqueueskipmessage Index: twisted/internet/kqreactor.py =================================================================== --- twisted/internet/kqreactor.py (revision 17515) +++ twisted/internet/kqreactor.py (working copy) @@ -10,181 +10,186 @@ | from twisted.internet import kqreactor | kqreactor.install() -This reactor only works on FreeBSD and requires PyKQueue 1.3, which is -available at: U{http://people.freebsd.org/~dwhite/PyKQueue/} +This reactor only works on FreeBSD and requires PyKQueue 2, which is +available at: U{http://python-hpio.net/trac/wiki/PyKQueue} API Stability: stable Maintainer: U{Itamar Shtull-Trauring} - - - -You're going to need to patch PyKqueue:: - - ===================================================== - --- PyKQueue-1.3/kqsyscallmodule.c Sun Jan 28 21:59:50 2001 - +++ PyKQueue-1.3/kqsyscallmodule.c.new Tue Jul 30 18:06:08 2002 - @@ -137,7 +137,7 @@ - } - - statichere PyTypeObject KQEvent_Type = { - - PyObject_HEAD_INIT(NULL) - + PyObject_HEAD_INIT(&PyType_Type) - 0, // ob_size - "KQEvent", // tp_name - sizeof(KQEventObject), // tp_basicsize - @@ -291,13 +291,14 @@ - - /* Build timespec for timeout */ - totimespec.tv_sec = timeout / 1000; - - totimespec.tv_nsec = (timeout % 1000) * 100000; - + totimespec.tv_nsec = (timeout % 1000) * 1000000; - - // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec); - - /* Make the call */ - - - + Py_BEGIN_ALLOW_THREADS - gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec); - + Py_END_ALLOW_THREADS - - /* Don't need the input event list anymore, so get rid of it */ - free (changelist); - @@ -361,7 +362,7 @@ - statichere PyTypeObject KQueue_Type = { - /* The ob_type field must be initialized in the module init function - * to be portable to Windows without using C++. */ - - PyObject_HEAD_INIT(NULL) - + PyObject_HEAD_INIT(&PyType_Type) - 0, /*ob_size*/ - "KQueue", /*tp_name*/ - sizeof(KQueueObject), /*tp_basicsize*/ - """ # System imports import errno, sys # PyKQueue imports -from kqsyscall import * +from kqueue import * +from zope.interface import implements + # Twisted imports from twisted.python import log, failure +from twisted.internet.interfaces import IReactorFDSet + # Sibling imports import main import posixbase - +try: + set() +except NameError: + from set import Set as set + # globals -reads = {} -writes = {} +reads = set() +writes = set() selectables = {} +reverse = {} kq = kqueue() class KQueueReactor(posixbase.PosixReactorBase): """A reactor that uses kqueue(2)/kevent(2).""" + implements(IReactorFDSet) - def _updateRegistration(self, *args): - kq.kevent([kevent(*args)], 0, 0) + def _updateRegistration(self, fd, filter, flags, OSError=OSError, kq=kq): + try: + kevent(kq, [Event(fd, filter, flags)], 0, 0) + except OSError, e: + if e[0] == errno.EBADF: + return + else: + raise def addReader(self, reader): """Add a FileDescriptor for notification of data available to read. """ fd = reader.fileno() - if not reads.has_key(fd): + reads.add(fd) + if not selectables.has_key(fd): selectables[fd] = reader - reads[fd] = 1 - self._updateRegistration(fd, EVFILT_READ, EV_ADD) + reverse[reader] = fd + self._updateRegistration(fd, EVFILT_READ, EV_ADD|EV_ENABLE) + self._updateRegistration(fd, EVFILT_WRITE, EV_ADD|EV_DISABLE) + else: + self._updateRegistration(fd, EVFILT_READ, EV_ENABLE) - def addWriter(self, writer, writes=writes, selectables=selectables): + def addWriter(self, writer): """Add a FileDescriptor for notification of data available to write. """ fd = writer.fileno() - if not writes.has_key(fd): + writes.add(fd) + if not selectables.has_key(fd): selectables[fd] = writer - writes[fd] = 1 - self._updateRegistration(fd, EVFILT_WRITE, EV_ADD) + reverse[writer] = fd + self._updateRegistration(fd, EVFILT_WRITE, EV_ADD|EV_ENABLE) + self._updateRegistration(fd, EVFILT_READ, EV_ADD|EV_DISABLE) + else: + self._updateRegistration(fd, EVFILT_WRITE, EV_ENABLE) def removeReader(self, reader): """Remove a Selectable for notification of data available to read. """ fd = reader.fileno() - if reads.has_key(fd): - del reads[fd] - if not writes.has_key(fd): del selectables[fd] - self._updateRegistration(fd, EVFILT_READ, EV_DELETE) + if fd == -1: + try: + fd = reverse[reader] + except KeyError: + return + if fd in reads: + reads.discard(fd) + if fd not in writes: + del selectables[fd] + del reverse[reader] + self._updateRegistration(fd, EVFILT_READ, EV_DISABLE) + else: + self._updateRegistration(fd, EVFILT_READ, EV_DISABLE) - def removeWriter(self, writer, writes=writes): + def removeWriter(self, writer): """Remove a Selectable for notification of data available to write. """ fd = writer.fileno() - if writes.has_key(fd): - del writes[fd] - if not reads.has_key(fd): del selectables[fd] - self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) + if fd == -1: + try: + fd = reverse[writer] + except KeyError: + return + if fd in writes: + writes.discard(fd) + if fd not in reads: + del selectables[fd] + del reverse[writer] + self._updateRegistration(fd, EVFILT_WRITE, EV_DISABLE) + else: + self._updateRegistration(fd, EVFILT_WRITE, EV_DISABLE) def removeAll(self): """Remove all selectables, and return a list of them.""" if self.waker is not None: self.removeReader(self.waker) result = selectables.values() - for fd in reads.keys(): + for fd in reads: self._updateRegistration(fd, EVFILT_READ, EV_DELETE) - for fd in writes.keys(): + for fd in writes: self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE) reads.clear() writes.clear() selectables.clear() + reverse.clear() if self.waker is not None: self.addReader(self.waker) return result def doKEvent(self, timeout, - reads=reads, - writes=writes, selectables=selectables, kq=kq, log=log, - OSError=OSError, - EVFILT_READ=EVFILT_READ, - EVFILT_WRITE=EVFILT_WRITE): + OSError=OSError): """Poll the kqueue for new events.""" if timeout is None: - timeout = 1000 + timeout = 1000000 else: - timeout = int(timeout * 1000) # convert seconds to milliseconds + timeout = int(timeout * 1000000) # convert seconds to nanoseconds try: - l = kq.kevent([], len(selectables), timeout) + l = kevent(kq, [], len(selectables), timeout) except OSError, e: if e[0] == errno.EINTR: return else: raise _drdw = self._doWriteOrRead + for event in l: - why = None - fd, filter = event.ident, event.filter - selectable = selectables[fd] - log.callWithLogger(selectable, _drdw, selectable, fd, filter) + fd = event.ident + try: + selectable = selectables[fd] + except KeyError: + continue + log.callWithLogger(selectable, _drdw, selectable, fd, event) - def _doWriteOrRead(self, selectable, fd, filter): - try: - if filter == EVFILT_READ: - why = selectable.doRead() - if filter == EVFILT_WRITE: - why = selectable.doWrite() - if not selectable.fileno() == fd: - why = main.CONNECTION_LOST - except: - why = sys.exc_info()[1] - log.deferr() + def _doWriteOrRead(self, selectable, fd, event, CONNECTION_LOST=main.CONNECTION_LOST, EV_EOF=EV_EOF, EVFILT_READ=EVFILT_READ, EVFILT_WRITE=EVFILT_WRITE): + why = None + inRead = False + filter, flags, data, fflags = event.filter, event.flags, event.data, event.fflags + if flags & EV_EOF and data and fflags: + why = CONNECTION_LOST + else: + try: + if filter == EVFILT_READ: + inRead = True + why = selectable.doRead() + if filter == EVFILT_WRITE: + inRead = False + why = selectable.doWrite() + if not selectable.fileno() == fd: + why = CONNECTION_LOST + inRead = False + except: + why = sys.exc_info()[1] + log.deferr() if why: - self.removeReader(selectable) - self.removeWriter(selectable) - selectable.connectionLost(failure.Failure(why)) + self._disconnectSelectable(selectable, why, inRead) doIteration = doKEvent @@ -195,3 +200,4 @@ __all__ = ["KQueueReactor", "install"] +