proxygen
FileRegion.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #ifdef SPLICE_F_NONBLOCK
20 using namespace folly;
21 using namespace wangle;
22 
23 namespace {
24 
25 struct FileRegionReadPool {};
26 
28  []{
29  return new IOThreadPoolExecutor(
30  sysconf(_SC_NPROCESSORS_ONLN),
31  std::make_shared<NamedThreadFactory>("FileRegionReadPool"));
32  });
33 
34 }
35 
36 namespace wangle {
37 
38 FileRegion::FileWriteRequest::FileWriteRequest(AsyncSocket* socket,
39  WriteCallback* callback, int fd, loff_t offset, size_t count)
40  : WriteRequest(socket, callback),
41  readFd_(fd), offset_(offset), count_(count) {
42 }
43 
45  readBase_->runInEventBaseThread([this]{
46  delete this;
47  });
48 }
49 
50 AsyncSocket::WriteResult FileRegion::FileWriteRequest::performWrite() {
51  if (!started_) {
52  start();
53  return AsyncSocket::WriteResult(0);
54  }
55 
56  int flags = SPLICE_F_NONBLOCK | SPLICE_F_MORE;
57  ssize_t spliced = ::splice(pipe_out_, nullptr,
58  socket_->getFd(), nullptr,
59  bytesInPipe_, flags);
60  if (spliced == -1) {
61  if (errno == EAGAIN) {
62  return AsyncSocket::WriteResult(0);
63  }
64  return AsyncSocket::WriteResult(-1);
65  }
66 
67  bytesInPipe_ -= spliced;
68  bytesWritten(spliced);
69  return AsyncSocket::WriteResult(spliced);
70 }
71 
72 void FileRegion::FileWriteRequest::consume() {
73  // do nothing
74 }
75 
76 bool FileRegion::FileWriteRequest::isComplete() {
77  return totalBytesWritten_ == count_;
78 }
79 
80 void FileRegion::FileWriteRequest::messageAvailable(size_t&& count) noexcept {
81  bool shouldWrite = bytesInPipe_ == 0;
82  bytesInPipe_ += count;
83  if (shouldWrite) {
84  socket_->writeRequestReady();
85  }
86 }
87 
88 #ifdef __GLIBC__
89 # if (__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 9))
90 # define GLIBC_AT_LEAST_2_9 1
91 # endif
92 #endif
93 
95  started_ = true;
96  readBase_ = readPool.try_get()->getEventBase();
97  readBase_->runInEventBaseThread([this]{
98  auto flags = fcntl(readFd_, F_GETFL);
99  if (flags == -1) {
100  fail(__func__, AsyncSocketException(
102  "fcntl F_GETFL failed", errno));
103  return;
104  }
105 
106  flags &= O_ACCMODE;
107  if (flags == O_WRONLY) {
108  fail(__func__, AsyncSocketException(
109  AsyncSocketException::BAD_ARGS, "file not open for reading"));
110  return;
111  }
112 
113 #ifndef GLIBC_AT_LEAST_2_9
114  fail(__func__, AsyncSocketException(
116  "writeFile unsupported on glibc < 2.9"));
117  return;
118 #else
119  int pipeFds[2];
120  if (::pipe2(pipeFds, O_NONBLOCK) == -1) {
121  fail(__func__, AsyncSocketException(
123  "pipe2 failed", errno));
124  return;
125  }
126 
127 #ifdef F_SETPIPE_SZ
128  // Max size for unprivileged processes as set in /proc/sys/fs/pipe-max-size
129  // Ignore failures and just roll with it
130  // TODO maybe read max size from /proc?
131  fcntl(pipeFds[0], F_SETPIPE_SZ, 1048576);
132  fcntl(pipeFds[1], F_SETPIPE_SZ, 1048576);
133 #endif
134 
135  pipe_out_ = pipeFds[0];
136 
137  socket_->getEventBase()->runInEventBaseThreadAndWait([&]{
138  startConsuming(socket_->getEventBase(), &queue_);
139  });
140  readHandler_ = std::make_unique<FileReadHandler>(
141  this, pipeFds[1], count_);
142 #endif
143  });
144 }
145 
146 FileRegion::FileWriteRequest::~FileWriteRequest() {
147  CHECK(readBase_->isInEventBaseThread());
148  socket_->getEventBase()->runInEventBaseThreadAndWait([&]{
149  stopConsuming();
150  if (pipe_out_ > -1) {
151  ::close(pipe_out_);
152  }
153  });
154 
155 }
156 
158  const char* fn,
159  const AsyncSocketException& ex) {
160  socket_->getEventBase()->runInEventBaseThread([=]{
161  WriteRequest::fail(fn, ex);
162  });
163 }
164 
// Creates the handler for `pipe_in` on behalf of `req`.
//
// Must run on req->readBase_'s thread (enforced with CHECK). Attaches the
// handler to that event base and registers it for persistent WRITE events on
// `pipe_in`; if registration fails, the failure is reported through
// req->fail().
//
// NOTE(review): in this listing the first argument of the
// AsyncSocketException(...) call appears to be missing (the embedded line
// numbers skip one line there) — confirm against the upstream source.
165 FileRegion::FileWriteRequest::FileReadHandler::FileReadHandler(
166  FileWriteRequest* req, int pipe_in, size_t bytesToRead)
167  : req_(req), pipe_in_(pipe_in), bytesToRead_(bytesToRead) {
168  CHECK(req_->readBase_->isInEventBaseThread());
169  initHandler(req_->readBase_, pipe_in);
170  if (!registerHandler(EventFlags::WRITE | EventFlags::PERSIST)) {
171  req_->fail(__func__, AsyncSocketException(
173  "registerHandler failed"));
174  }
175 }
176 
177 FileRegion::FileWriteRequest::FileReadHandler::~FileReadHandler() {
178  CHECK(req_->readBase_->isInEventBaseThread());
179  unregisterHandler();
180  ::close(pipe_in_);
181 }
182 
// Event callback: splices more file data into the pipe.
//
// `events` must include EventHandler::WRITE (enforced with CHECK). Each call
// attempts one non-blocking splice of up to bytesToRead_ bytes from
// req_->readFd_ (at req_->offset_) into pipe_in_, then posts the number of
// bytes moved onto req_->queue_.
//
// NOTE(review): in this listing the first argument of each
// AsyncSocketException(...) call appears to be missing (the embedded line
// numbers skip one line at each call) — confirm against the upstream source.
183 void FileRegion::FileWriteRequest::FileReadHandler::handlerReady(
184  uint16_t events) noexcept {
185  CHECK(events & EventHandler::WRITE);
  // Nothing left to read: stop receiving events.
186  if (bytesToRead_ == 0) {
187  unregisterHandler();
188  return;
189  }
190 
191  int flags = SPLICE_F_NONBLOCK | SPLICE_F_MORE;
192  ssize_t spliced = ::splice(req_->readFd_, &req_->offset_,
193  pipe_in_, nullptr,
194  bytesToRead_, flags);
195  if (spliced == -1) {
  // EAGAIN: return without reporting an error. Any other errno fails the
  // request.
196  if (errno == EAGAIN) {
197  return;
198  } else {
199  req_->fail(__func__, AsyncSocketException(
201  "splice failed", errno));
202  return;
203  }
204  }
205 
206  if (spliced > 0) {
207  bytesToRead_ -= spliced;
  // Post the byte count to the request's queue; any exception thrown by
  // putMessage is converted into a request failure.
208  try {
209  req_->queue_.putMessage(static_cast<size_t>(spliced));
210  } catch (...) {
211  req_->fail(__func__, AsyncSocketException(
213  "putMessage failed"));
214  return;
215  }
216  }
217 }
218 
219 } // wangle
220 #endif
flags
Definition: http_parser.h:127
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
folly::Future< folly::Unit > close(Context *ctx) override
static void destroy()
void fail()
AsyncServerSocket::UniquePtr socket_
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
auto start
int * count
off_t offset_