19 #ifdef SPLICE_F_NONBLOCK 20 using namespace folly;
// Empty tag type: it declares no members and carries no state of its own.
// NOTE(review): presumably the tag that distinguishes the singleton read pool
// whose threads are named "FileRegionReadPool" (readPool.try_get() is called
// further down in this file); the singleton declaration itself is not visible
// in this chunk -- confirm before relying on this.
25 struct FileRegionReadPool {};
30 sysconf(_SC_NPROCESSORS_ONLN),
31 std::make_shared<NamedThreadFactory>(
"FileRegionReadPool"));
40 : WriteRequest(socket, callback),
41 readFd_(fd),
offset_(offset), count_(count) {
45 readBase_->runInEventBaseThread([
this]{
56 int flags = SPLICE_F_NONBLOCK | SPLICE_F_MORE;
57 ssize_t spliced = ::splice(pipe_out_,
nullptr,
61 if (errno == EAGAIN) {
67 bytesInPipe_ -= spliced;
68 bytesWritten(spliced);
72 void FileRegion::FileWriteRequest::consume() {
76 bool FileRegion::FileWriteRequest::isComplete() {
77 return totalBytesWritten_ == count_;
80 void FileRegion::FileWriteRequest::messageAvailable(
size_t&&
count)
noexcept {
81 bool shouldWrite = bytesInPipe_ == 0;
82 bytesInPipe_ +=
count;
89 # if (__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 9)) 90 # define GLIBC_AT_LEAST_2_9 1 96 readBase_ = readPool.try_get()->getEventBase();
97 readBase_->runInEventBaseThread([
this]{
98 auto flags = fcntl(readFd_, F_GETFL);
102 "fcntl F_GETFL failed", errno));
107 if (flags == O_WRONLY) {
113 #ifndef GLIBC_AT_LEAST_2_9 116 "writeFile unsupported on glibc < 2.9"));
120 if (::pipe2(pipeFds, O_NONBLOCK) == -1) {
123 "pipe2 failed", errno));
131 fcntl(pipeFds[0], F_SETPIPE_SZ, 1048576);
132 fcntl(pipeFds[1], F_SETPIPE_SZ, 1048576);
135 pipe_out_ = pipeFds[0];
137 socket_->getEventBase()->runInEventBaseThreadAndWait([&]{
138 startConsuming(
socket_->getEventBase(), &queue_);
140 readHandler_ = std::make_unique<FileReadHandler>(
141 this, pipeFds[1], count_);
146 FileRegion::FileWriteRequest::~FileWriteRequest() {
147 CHECK(readBase_->isInEventBaseThread());
148 socket_->getEventBase()->runInEventBaseThreadAndWait([&]{
150 if (pipe_out_ > -1) {
160 socket_->getEventBase()->runInEventBaseThread([=]{
165 FileRegion::FileWriteRequest::FileReadHandler::FileReadHandler(
166 FileWriteRequest* req,
int pipe_in,
size_t bytesToRead)
167 : req_(req), pipe_in_(pipe_in), bytesToRead_(bytesToRead) {
168 CHECK(req_->readBase_->isInEventBaseThread());
169 initHandler(req_->readBase_, pipe_in);
170 if (!registerHandler(EventFlags::WRITE | EventFlags::PERSIST)) {
173 "registerHandler failed"));
177 FileRegion::FileWriteRequest::FileReadHandler::~FileReadHandler() {
178 CHECK(req_->readBase_->isInEventBaseThread());
183 void FileRegion::FileWriteRequest::FileReadHandler::handlerReady(
186 if (bytesToRead_ == 0) {
191 int flags = SPLICE_F_NONBLOCK | SPLICE_F_MORE;
192 ssize_t spliced = ::splice(req_->readFd_, &req_->offset_,
194 bytesToRead_, flags);
196 if (errno == EAGAIN) {
201 "splice failed", errno));
207 bytesToRead_ -= spliced;
209 req_->queue_.putMessage(static_cast<size_t>(spliced));
213 "putMessage failed"));
—— Concurrent Priority Queue Implementation ——
requires E e noexcept(noexcept(s.error(std::move(e))))
folly::Future< folly::Unit > close(Context *ctx) override
AsyncServerSocket::UniquePtr socket_
NetworkSocket socket(int af, int type, int protocol)