proxygen
ShutdownSocketSet.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <chrono>
20 #include <thread>
21 
22 #include <glog/logging.h>
23 
24 #include <folly/FileUtil.h>
26 
27 namespace folly {
28 
30  : maxFd_(maxFd),
31  data_(static_cast<std::atomic<uint8_t>*>(
32  folly::checkedCalloc(size_t(maxFd), sizeof(std::atomic<uint8_t>)))),
33  nullFile_("/dev/null", O_RDWR) {}
34 
35 void ShutdownSocketSet::add(int fd) {
36  // Silently ignore any fds >= maxFd_, very unlikely
37  DCHECK_GE(fd, 0);
38  if (fd >= maxFd_) {
39  return;
40  }
41 
42  auto& sref = data_[size_t(fd)];
43  uint8_t prevState = FREE;
44  CHECK(sref.compare_exchange_strong(
45  prevState, IN_USE, std::memory_order_relaxed))
46  << "Invalid prev state for fd " << fd << ": " << int(prevState);
47 }
48 
50  DCHECK_GE(fd, 0);
51  if (fd >= maxFd_) {
52  return;
53  }
54 
55  auto& sref = data_[size_t(fd)];
56  uint8_t prevState = 0;
57 
58  prevState = sref.load(std::memory_order_relaxed);
59  do {
60  switch (prevState) {
61  case IN_SHUTDOWN:
62  std::this_thread::sleep_for(std::chrono::milliseconds(1));
63  prevState = sref.load(std::memory_order_relaxed);
64  continue;
65  case FREE:
66  LOG(FATAL) << "Invalid prev state for fd " << fd << ": "
67  << int(prevState);
68  }
69  } while (
70  !sref.compare_exchange_weak(prevState, FREE, std::memory_order_relaxed));
71 }
72 
74  DCHECK_GE(fd, 0);
75  if (fd >= maxFd_) {
76  return folly::closeNoInt(fd);
77  }
78 
79  auto& sref = data_[size_t(fd)];
80  uint8_t prevState = sref.load(std::memory_order_relaxed);
81  uint8_t newState = 0;
82 
83  do {
84  switch (prevState) {
85  case IN_USE:
86  case SHUT_DOWN:
87  newState = FREE;
88  break;
89  case IN_SHUTDOWN:
90  newState = MUST_CLOSE;
91  break;
92  default:
93  LOG(FATAL) << "Invalid prev state for fd " << fd << ": "
94  << int(prevState);
95  }
96  } while (!sref.compare_exchange_weak(
97  prevState, newState, std::memory_order_relaxed));
98 
99  return newState == FREE ? folly::closeNoInt(fd) : 0;
100 }
101 
102 void ShutdownSocketSet::shutdown(int fd, bool abortive) {
103  DCHECK_GE(fd, 0);
104  if (fd >= maxFd_) {
105  doShutdown(fd, abortive);
106  return;
107  }
108 
109  auto& sref = data_[size_t(fd)];
110  uint8_t prevState = IN_USE;
111  if (!sref.compare_exchange_strong(
112  prevState, IN_SHUTDOWN, std::memory_order_relaxed)) {
113  return;
114  }
115 
116  doShutdown(fd, abortive);
117 
118  prevState = IN_SHUTDOWN;
119  if (sref.compare_exchange_strong(
120  prevState, SHUT_DOWN, std::memory_order_relaxed)) {
121  return;
122  }
123 
124  CHECK_EQ(prevState, MUST_CLOSE)
125  << "Invalid prev state for fd " << fd << ": " << int(prevState);
126 
127  folly::closeNoInt(fd); // ignore errors, nothing to do
128 
129  CHECK(
130  sref.compare_exchange_strong(prevState, FREE, std::memory_order_relaxed))
131  << "Invalid prev state for fd " << fd << ": " << int(prevState);
132 }
133 
134 void ShutdownSocketSet::shutdownAll(bool abortive) {
135  for (int i = 0; i < maxFd_; ++i) {
136  auto& sref = data_[size_t(i)];
137  if (sref.load(std::memory_order_relaxed) == IN_USE) {
138  shutdown(i, abortive);
139  }
140  }
141 }
142 
143 void ShutdownSocketSet::doShutdown(int fd, bool abortive) {
144  // shutdown() the socket first, to awaken any threads blocked on the fd
145  // (subsequent IO will fail because it's been shutdown); close()ing the
146  // socket does not wake up blockers, see
147  // http://stackoverflow.com/a/3624545/1736339
148  folly::shutdownNoInt(fd, SHUT_RDWR);
149 
150  // If abortive shutdown is desired, we'll set the SO_LINGER option on
151  // the socket with a timeout of 0; this will cause RST to be sent on
152  // close.
153  if (abortive) {
154  struct linger l = {1, 0};
155  if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, sizeof(l)) != 0) {
156  // Probably not a socket, ignore.
157  return;
158  }
159  }
160 
161  // We can't close() the socket, as that would be dangerous; a new file
162  // could be opened and get the same file descriptor, and then code assuming
163  // the old fd would do IO in the wrong place. We'll (atomically) dup2
164  // /dev/null onto the fd instead.
166 }
167 
168 } // namespace folly
int shutdownNoInt(NetworkSocket fd, int how)
Definition: FileUtil.cpp:98
int setsockopt(NetworkSocket s, int level, int optname, const void *optval, socklen_t optlen)
Definition: NetOps.cpp:384
int closeNoInt(int fd)
Definition: FileUtil.cpp:56
void shutdown(int fd, bool abortive=false)
STL namespace.
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
void doShutdown(int fd, bool abortive)
int fd() const
Definition: File.h:85
folly::std enable_if::typetoAppendDelimStrImpl const Delimiter, const Tv, Tgtresult sizeof(Ts) >
void shutdownAll(bool abortive=false)
void * checkedCalloc(size_t n, size_t size)
Definition: Malloc.h:235
ShutdownSocketSet(int maxFd=1<< 18)
int dup2NoInt(int oldfd, int newfd)
Definition: FileUtil.cpp:72
std::unique_ptr< std::atomic< uint8_t >[], Free > data_
StringPiece data_