/* aio.c -- asynchronous file i/o * * Copyright (C) 1996-2006 by Ian Piumarta and other authors/contributors * listed elsewhere in this file. * All rights reserved. * * This file is part of Unix Squeak. * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), * to deal in the Software without restriction, including without limitation * the rights to use, copy, modify, merge, publish, distribute, sublicense, * and/or sell copies of the Software, and to permit persons to whom the * Software is furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER * DEALINGS IN THE SOFTWARE. */ /* Authors: Ian.Piumarta@squeakland.org, eliot.miranda@gmail.com * * Last edited: Tue Mar 29 13:06:00 PDT 2016 */ #include "pharovm/debug.h" #include "pharovm/semaphores/platformSemaphore.h" #include "sqMemoryFence.h" #include "sqaio.h" #include <sys/types.h> #include <sys/socket.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> #include <signal.h> #include <sys/types.h> #include <sys/time.h> #include <sys/select.h> #include <sys/ioctl.h> #include <fcntl.h> #define _DO_FLAG_TYPE() do { _DO(AIO_R, rd) _DO(AIO_W, wr) _DO(AIO_X, ex) } while (0) static aioHandler rdHandler[FD_SETSIZE]; static aioHandler wrHandler[FD_SETSIZE]; static aioHandler exHandler[FD_SETSIZE]; static void *clientData[FD_SETSIZE]; static int maxFd; static fd_set fdMask; /* handled by aio */ static fd_set rdMask; /* handle read */ static fd_set wrMask; /* handle write */ static fd_set exMask; /* handle exception */ static fd_set xdMask; /* external descriptor */ /* * This is important, the AIO poll should only do a long pause if there is no pending signals for semaphores. * Check ExternalSemaphores to understand this function. */ int isPendingSemaphores(); void heartbeat_poll_enter(long microSeconds); void heartbeat_poll_exit(long microSeconds); static int aio_handle_events(long microSeconds); Semaphore* interruptFIFOMutex; int pendingInterruption; int aio_in_sleep = 0; int aio_request_interrupt = 0; volatile int isPooling = 0; static void undefinedHandler(int fd, void *clientData, int flags) { logError("Undefined handler called (fd %d, flags %x)\n", fd, flags); } /* initialise asynchronous i/o */ int signal_pipe_fd[2]; void aioInit(void) { int arg; interruptFIFOMutex = platform_semaphore_new(1); FD_ZERO(&fdMask); FD_ZERO(&rdMask); FD_ZERO(&wrMask); FD_ZERO(&exMask); FD_ZERO(&xdMask); maxFd = 0; if (pipe(signal_pipe_fd) != 0) { logErrorFromErrno("pipe"); exit(-1); } if ((arg = fcntl(signal_pipe_fd[0], F_GETFL, 0)) < 0) logErrorFromErrno("fcntl(F_GETFL)"); if (fcntl(signal_pipe_fd[0], F_SETFL, arg | O_NONBLOCK | O_ASYNC ) < 0) logErrorFromErrno("fcntl(F_SETFL, O_ASYNC)"); if ((arg = fcntl(signal_pipe_fd[1], F_GETFL, 0)) < 0) logErrorFromErrno("fcntl(F_GETFL)"); if (fcntl(signal_pipe_fd[1], F_SETFL, arg | O_NONBLOCK | O_ASYNC | O_APPEND) < 0) logErrorFromErrno("fcntl(F_SETFL, O_ASYNC)"); signal(SIGIO, forceInterruptCheck); } /* disable handlers and close all handled non-exteral descriptors */ void aioFini(void) { int fd; for (fd = 0; fd < maxFd; fd++) if (FD_ISSET(fd, &fdMask) && !(FD_ISSET(fd, &xdMask))) { aioDisable(fd); close(fd); FD_CLR(fd, &fdMask); FD_CLR(fd, &rdMask); FD_CLR(fd, &wrMask); FD_CLR(fd, &exMask); } while (maxFd && !FD_ISSET(maxFd - 1, &fdMask)) --maxFd; signal(SIGPIPE, SIG_DFL); } /* * answer whether i/o becomes possible within the given number of * microSeconds */ #ifndef max # define max(a, b) (((a) > (b)) ? (a) : (b)) #endif volatile int aio_requests = 0; volatile int aio_responses = 0; /* * I Try to clear all the data available in the pipe, so it does not passes the limit of data. * Do not call me outside the mutex area of interruptFIFOMutex. */ void aio_flush_pipe(int fd){ int bytesRead; char buf[1024]; interruptFIFOMutex->wait(interruptFIFOMutex); if(pendingInterruption){ pendingInterruption = false; } do { bytesRead = read(fd, &buf, 1024); if(bytesRead == -1){ if(errno == EAGAIN || errno == EWOULDBLOCK){ interruptFIFOMutex->signal(interruptFIFOMutex); return; } logErrorFromErrno("pipe - read"); interruptFIFOMutex->signal(interruptFIFOMutex); return; } } while(bytesRead > 0); interruptFIFOMutex->signal(interruptFIFOMutex); } long aioPoll(long microSeconds){ long timeout; interruptFIFOMutex->wait(interruptFIFOMutex); if(pendingInterruption || isPendingSemaphores()){ timeout = 0; }else{ timeout = microSeconds; } if(pendingInterruption){ pendingInterruption = false; } interruptFIFOMutex->signal(interruptFIFOMutex); return aio_handle_events(timeout); } static int aio_handle_events(long microSeconds){ int fd; fd_set rd, wr, ex; unsigned long long us; int maxFdToUse; long remainingMicroSeconds; /* * Copy the Masks as they are used to know which * FD wants which event */ rd = rdMask; wr = wrMask; ex = exMask; us = ioUTCMicroseconds(); remainingMicroSeconds = microSeconds; FD_SET(signal_pipe_fd[0], &rd); maxFdToUse = maxFd > (signal_pipe_fd[0] + 1) ? maxFd : signal_pipe_fd[0] + 1; sqLowLevelMFence(); isPooling = 1; heartbeat_poll_enter(microSeconds); for (;;) { struct timeval tv; int n; unsigned long long now; tv.tv_sec = remainingMicroSeconds / 1000000; tv.tv_usec = remainingMicroSeconds % 1000000; n = select(maxFdToUse, &rd, &wr, &ex, &tv); if (n > 0) break; if (n == 0) { if (remainingMicroSeconds) addIdleUsecs(remainingMicroSeconds); sqLowLevelMFence(); isPooling = 0; heartbeat_poll_exit(microSeconds); return 0; } if (errno && (EINTR != errno)) { logError("errno %d\n", errno); logErrorFromErrno("select"); sqLowLevelMFence(); isPooling = 0; heartbeat_poll_exit(microSeconds); return 0; } now = ioUTCMicroseconds(); remainingMicroSeconds -= max(now - us, 1); if (remainingMicroSeconds <= 0){ sqLowLevelMFence(); isPooling = 0; heartbeat_poll_exit(microSeconds); return 0; } us = now; } sqLowLevelMFence(); isPooling = 0; heartbeat_poll_exit(microSeconds); aio_flush_pipe(signal_pipe_fd[0]); // We clear signal_pipe_fd because when it arrives here we do not care anymore // about it, but it may cause a crash if it is set because we do not have // a handler for it. Another solution could be to just add a handler to signal_pipe_fd // but for now it does not seems needed. FD_CLR(signal_pipe_fd[0], &rd); for (fd = 0; fd < maxFd; ++fd) { aioHandler handler; //_DO_FLAG_TYPE(); //_DO(AIO_R, rd) if (FD_ISSET(fd, &rd)) { handler = rdHandler[fd]; FD_CLR(fd, &rdMask); handler(fd, clientData[fd], AIO_R); rdHandler[fd]= undefinedHandler; } //_DO(AIO_W, wr) if (FD_ISSET(fd, &wr)) { handler = wrHandler[fd]; FD_CLR(fd, &wrMask); handler(fd, clientData[fd], AIO_W); wrHandler[fd]= undefinedHandler; } //_DO(AIO_X, ex) if (FD_ISSET(fd, &ex)) { handler = exHandler[fd]; FD_CLR(fd, &exMask); handler(fd, clientData[fd], AIO_X); exHandler[fd]= undefinedHandler; } } return 1; } /* * This function is used to interrupt a aioPoll. * Used when signalling a Pharo semaphore to re-wake the VM and execute code of the image. */ void aioInterruptPoll(){ int n; sqLowLevelMFence(); if(isPooling){ n = write(signal_pipe_fd[1], "1", 1); if(n != 1){ logErrorFromErrno("write to pipe"); } fsync(signal_pipe_fd[1]); } interruptFIFOMutex->wait(interruptFIFOMutex); pendingInterruption = true; interruptFIFOMutex->signal(interruptFIFOMutex); } void aioEnable(int fd, void *data, int flags) { if (fd < 0) { logWarn("AioEnable(%d): IGNORED - Negative Number", fd); return; } if (FD_ISSET(fd, &fdMask)) { logWarn("AioEnable: descriptor %d already enabled", fd); return; } clientData[fd] = data; rdHandler[fd] = wrHandler[fd] = exHandler[fd] = undefinedHandler; FD_SET(fd, &fdMask); FD_CLR(fd, &rdMask); FD_CLR(fd, &wrMask); FD_CLR(fd, &exMask); if (fd >= maxFd) maxFd = fd + 1; if (flags & AIO_EXT) { FD_SET(fd, &xdMask); /* we should not set NBIO ourselves on external descriptors! */ } else { /* * enable non-blocking asynchronous i/o and delivery of SIGIO * to the active process */ int arg; FD_CLR(fd, &xdMask); #if defined(O_ASYNC) if (fcntl(fd, F_SETOWN, getpid()) < 0) logErrorFromErrno("fcntl(F_SETOWN, getpid())"); if ((arg = fcntl(fd, F_GETFL, 0)) < 0) logErrorFromErrno("fcntl(F_GETFL)"); if (fcntl(fd, F_SETFL, arg | O_NONBLOCK | O_ASYNC) < 0) logErrorFromErrno("fcntl(F_SETFL, O_ASYNC)"); #elif defined(FASYNC) if (fcntl(fd, F_SETOWN, getpid()) < 0) logErrorFromErrno("fcntl(F_SETOWN, getpid())"); if ((arg = fcntl(fd, F_GETFL, 0)) < 0) logErrorFromErrno("fcntl(F_GETFL)"); if (fcntl(fd, F_SETFL, arg | O_NONBLOCK | FASYNC) < 0) logErrorFromErrno("fcntl(F_SETFL, FASYNC)"); #elif defined(FIOASYNC) arg = getpid(); if (ioctl(fd, SIOCSPGRP, &arg) < 0) logErrorFromErrno("ioctl(SIOCSPGRP, getpid())"); arg = 1; if (ioctl(fd, FIOASYNC, &arg) < 0) logErrorFromErrno("ioctl(FIOASYNC, 1)"); #endif } } /* install/change the handler for a descriptor */ void aioHandle(int fd, aioHandler handlerFn, int mask) { if (fd < 0) { logWarn("aioHandle(%d): IGNORED - Negative FD", fd); return; } #undef _DO #define _DO(FLAG, TYPE) \ if (mask & FLAG) { \ FD_SET(fd, &TYPE##Mask); \ TYPE##Handler[fd]= handlerFn; \ } _DO_FLAG_TYPE(); } /* temporarily suspend asynchronous notification for a descriptor */ void aioSuspend(int fd, int mask) { if (fd < 0) { logWarn("aioSuspend(%d): IGNORED - Negative FD\n", fd); return; } #undef _DO #define _DO(FLAG, TYPE) \ if (mask & FLAG) { \ FD_CLR(fd, &TYPE##Mask); \ TYPE##Handler[fd]= undefinedHandler; \ } _DO_FLAG_TYPE(); } /* definitively disable asynchronous notification for a descriptor */ void aioDisable(int fd) { if (fd < 0) { logWarn( "aioDisable(%d): IGNORED - Negative FD\n", fd); return; } aioSuspend(fd, AIO_RWX); FD_CLR(fd, &xdMask); FD_CLR(fd, &fdMask); rdHandler[fd] = wrHandler[fd] = exHandler[fd] = 0; clientData[fd] = 0; /* keep maxFd accurate (drops to zero if no more sockets) */ while (maxFd && !FD_ISSET(maxFd - 1, &fdMask)) --maxFd; }