proxygen
ConnectionManager.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <glog/logging.h>
21 
23 using std::chrono::milliseconds;
24 
25 namespace wangle {
26 
28  milliseconds timeout, Callback* callback)
29  : callback_(callback),
30  eventBase_(eventBase),
31  drainIterator_(conns_.end()),
32  idleIterator_(conns_.end()),
33  drainHelper_(*this),
34  timeout_(timeout),
35  idleConnEarlyDropThreshold_(timeout_ / 2) {
36 
37 }
38 
39 void
41  bool timeout) {
42  CHECK_NOTNULL(connection);
43  ConnectionManager* oldMgr = connection->getConnectionManager();
44  if (oldMgr != this) {
45  if (oldMgr) {
46  // 'connection' was being previously managed in a different thread.
47  // We must remove it from that manager before adding it to this one.
48  oldMgr->removeConnection(connection);
49  }
50 
51  // put the connection into busy part first. This should not matter at all
52  // because the last callback for an idle connection must be onDeactivated(),
53  // so the connection must be moved to idle part then.
54  conns_.push_front(*connection);
55 
56  connection->setConnectionManager(this);
57  if (callback_) {
58  callback_->onConnectionAdded(connection);
59  }
60  }
61  if (timeout) {
62  scheduleTimeout(connection, timeout_);
63  }
64 
65  if (drainHelper_.getShutdownState() >=
66  ShutdownState::NOTIFY_PENDING_SHUTDOWN &&
67  notifyPendingShutdown_) {
68  connection->fireNotifyPendingShutdown();
69  }
70 
71  if (drainHelper_.getShutdownState() >= ShutdownState::CLOSE_WHEN_IDLE) {
72  // closeWhenIdle can delete the connection (it was just created, so it's
73  // probably idle). Delay the closeWhenIdle call until the end of the loop
74  // where it will be safer to terminate the conn.
75  // Hold a DestructorGuard to the end of the loop for this
76  auto cmDg = new DestructorGuard(this);
77  auto connDg = new DestructorGuard(connection);
78  eventBase_->runInLoop([connection, this, cmDg, connDg] {
79  if (connection->listHook_.is_linked()) {
80  auto it = conns_.iterator_to(*connection);
81  DCHECK(it != conns_.end());
82  connection->fireCloseWhenIdle(!notifyPendingShutdown_);
83  }
84  delete connDg;
85  delete cmDg;
86  });
87  }
88 }
89 
90 void
92  std::chrono::milliseconds timeout) {
93  if (timeout > std::chrono::milliseconds(0)) {
94  eventBase_->timer().scheduleTimeout(connection, timeout);
95  }
96 }
97 
100  std::chrono::milliseconds timeout) {
101  eventBase_->timer().scheduleTimeout(callback, timeout);
102 }
103 
104 void
106  if (connection->getConnectionManager() == this) {
107  connection->cancelTimeout();
108  connection->setConnectionManager(nullptr);
109 
110  // Un-link the connection from our list, being careful to keep the iterator
111  // that we're using for idle shedding valid
112  auto it = conns_.iterator_to(*connection);
113  if (it == drainIterator_) {
114  ++drainIterator_;
115  }
116  if (it == idleIterator_) {
117  ++idleIterator_;
118  }
119  conns_.erase(it);
120 
121  if (callback_) {
122  callback_->onConnectionRemoved(connection);
123  if (getNumConnections() == 0) {
124  callback_->onEmpty(*this);
125  }
126  }
127  }
128 }
129 
130 void
132  std::chrono::milliseconds idleGrace) {
133  VLOG(3) << this << " initiateGracefulShutdown with nconns=" << conns_.size();
134  if (drainHelper_.getShutdownState() != ShutdownState::NONE) {
135  VLOG(3) << "Ignoring redundant call to initiateGracefulShutdown";
136  return;
137  }
138  drainHelper_.startDrainAll(idleGrace);
139 }
140 
142  std::chrono::milliseconds idleGrace) {
143  if (drainHelper_.getShutdownState() != ShutdownState::NONE) {
144  VLOG(3) << "Ignoring partial drain with full drain in progress";
145  return;
146  }
147  drainHelper_.startDrainPartial(pct, idleGrace);
148 }
149 
151  double pct, std::chrono::milliseconds idleGrace) {
152  all_ = false;
153  pct_ = pct;
154  startDrain(idleGrace);
155 }
156 
158  std::chrono::milliseconds idleGrace) {
159  all_ = true;
160  pct_ = 1.0;
161  if (isScheduled()) {
162  // if we are in the middle of a partial, abort and convert to all
163  cancelTimeout();
164  }
165  startDrain(idleGrace);
166 }
167 
169  std::chrono::milliseconds idleGrace) {
170  if (idleGrace.count() > 0) {
171  shutdownState_ = ShutdownState::NOTIFY_PENDING_SHUTDOWN;
172  scheduleTimeout(idleGrace);
173  VLOG(3) << "Scheduling idle grace period of " << idleGrace.count() << "ms";
174  } else {
175  manager_.notifyPendingShutdown_ = false;
176  shutdownState_ = ShutdownState::CLOSE_WHEN_IDLE;
177  VLOG(3) << "proceeding directly to closing idle connections";
178  }
179  manager_.drainIterator_ = drainStartIterator();
180  drainConnections();
181 }
182 
183 void
185  DestructorGuard g(&manager_);
186  size_t numCleared = 0;
187  size_t numKept = 0;
188 
189  auto it = manager_.drainIterator_;
190 
191  CHECK(shutdownState_ == ShutdownState::NOTIFY_PENDING_SHUTDOWN ||
192  shutdownState_ == ShutdownState::CLOSE_WHEN_IDLE);
193  while (it != manager_.conns_.end() && (numKept + numCleared) < 64) {
194  ManagedConnection& conn = *it++;
195  if (shutdownState_ == ShutdownState::NOTIFY_PENDING_SHUTDOWN) {
197  numKept++;
198  } else { // CLOSE_WHEN_IDLE
199  // Second time around: close idle sessions. If they aren't idle yet,
200  // have them close when they are idle
201  if (conn.isBusy()) {
202  numKept++;
203  } else {
204  numCleared++;
205  }
206  conn.fireCloseWhenIdle(!manager_.notifyPendingShutdown_);
207  }
208  }
209 
210  if (shutdownState_ == ShutdownState::CLOSE_WHEN_IDLE) {
211  VLOG(2) << "Idle connections cleared: " << numCleared <<
212  ", busy conns kept: " << numKept;
213  } else {
214  VLOG(3) << this << " notified n=" << numKept;
215  }
216  manager_.drainIterator_ = it;
217  if (it != manager_.conns_.end()) {
218  manager_.eventBase_->runInLoop(this);
219  } else {
220  if (shutdownState_ == ShutdownState::NOTIFY_PENDING_SHUTDOWN) {
221  VLOG(3) << this << " finished notify_pending_shutdown";
222  shutdownState_ = ShutdownState::NOTIFY_PENDING_SHUTDOWN_COMPLETE;
223  if (!isScheduled()) {
224  // The idle grace timer already fired, start over immediately
225  shutdownState_ = ShutdownState::CLOSE_WHEN_IDLE;
226  manager_.drainIterator_ = drainStartIterator();
227  manager_.eventBase_->runInLoop(this);
228  }
229  } else {
230  shutdownState_ = ShutdownState::CLOSE_WHEN_IDLE_COMPLETE;
231  }
232  }
233 }
234 
235 void
237  VLOG(2) << this << " idleGracefulTimeoutExpired";
238  if (shutdownState_ ==
239  ShutdownState::NOTIFY_PENDING_SHUTDOWN_COMPLETE) {
240  shutdownState_ = ShutdownState::CLOSE_WHEN_IDLE;
241  manager_.drainIterator_ = drainStartIterator();
242  drainConnections();
243  } else {
244  VLOG(4) << this << " idleGracefulTimeoutExpired during "
245  "NOTIFY_PENDING_SHUTDOWN, ignoring";
246  }
247 }
248 
250  drainHelper_.setShutdownState(ShutdownState::CLOSE_WHEN_IDLE_COMPLETE);
251  drainHelper_.cancelTimeout();
252 }
253 
254 void
256  DestructorGuard g(this);
257 
258  // Signal the drain helper in case that has not happened before.
259  stopDrainingForShutdown();
260 
261  // Iterate through our connection list, and drop each connection.
262  VLOG_IF(4, conns_.empty()) << "no connections to drop";
263  VLOG_IF(2, !conns_.empty()) << "connections to drop: " << conns_.size();
264 
265  unsigned i = 0;
266  while (!conns_.empty()) {
267  ManagedConnection& conn = conns_.front();
268  conns_.pop_front();
269  conn.cancelTimeout();
270  conn.setConnectionManager(nullptr);
271  // For debugging purposes, dump information about the first few
272  // connections.
273  static const unsigned MAX_CONNS_TO_DUMP = 2;
274  if (++i <= MAX_CONNS_TO_DUMP) {
275  conn.dumpConnectionState(3);
276  }
277  conn.dropConnection();
278  }
279  drainIterator_ = conns_.end();
280  idleIterator_ = conns_.end();
281  drainHelper_.cancelLoopCallback();
282 
283  if (callback_) {
284  callback_->onEmpty(*this);
285  }
286 }
287 
288 void
290  DestructorGuard g(this);
291 
292  // Signal the drain helper in case that has not happened before.
293  stopDrainingForShutdown();
294 
295  const size_t N = conns_.size();
296  const size_t numToDrop = std::max<size_t>(0, std::min<size_t>(N, N * pct));
297  for (size_t i = 0; i < numToDrop && !conns_.empty(); i++) {
298  ManagedConnection& conn = conns_.front();
299  removeConnection(&conn);
300  conn.dropConnection();
301  }
302 }
303 
304 void
306  auto it = conns_.iterator_to(conn);
307  if (it == idleIterator_) {
308  idleIterator_++;
309  }
310  conns_.erase(it);
311  conns_.push_front(conn);
312 }
313 
314 void
316  auto it = conns_.iterator_to(conn);
317  bool moveDrainIter = false;
318  if (it == drainIterator_) {
319  drainIterator_++;
320  moveDrainIter = true;
321  }
322  conns_.erase(it);
323  conns_.push_back(conn);
324  if (idleIterator_ == conns_.end()) {
325  idleIterator_--;
326  }
327  if (moveDrainIter && drainIterator_ == conns_.end()) {
328  drainIterator_--;
329  }
330 }
331 
332 size_t
334  VLOG(4) << "attempt to drop " << num << " idle connections";
335  if (idleConnEarlyDropThreshold_ >= timeout_) {
336  return 0;
337  }
338 
339  size_t count = 0;
340  while(count < num) {
341  auto it = idleIterator_;
342  if (it == conns_.end()) {
343  return count; // no more idle session
344  }
345  auto idleTime = it->getIdleTime();
346  if (idleTime == std::chrono::milliseconds(0) ||
347  idleTime <= idleConnEarlyDropThreshold_) {
348  VLOG(4) << "conn's idletime: " << idleTime.count()
349  << ", earlyDropThreshold: " << idleConnEarlyDropThreshold_.count()
350  << ", attempt to drop " << count << "/" << num;
351  return count; // idleTime cannot be further reduced
352  }
353  ManagedConnection& conn = *it;
354  idleIterator_++;
355  conn.dropConnection();
356  count++;
357  }
358 
359  return count;
360 }
361 
362 } // wangle
size_t dropIdleConnections(size_t num)
void addConnection(ManagedConnection *connection, bool timeout=false)
ConnectionManager(folly::EventBase *eventBase, std::chrono::milliseconds timeout, Callback *callback=nullptr)
folly::SafeIntrusiveListHook listHook_
void startDrain(std::chrono::milliseconds idleGrace)
void removeConnection(ManagedConnection *connection)
void onActivated(ManagedConnection &conn) override
virtual void dropConnection()=0
void initiateGracefulShutdown(std::chrono::milliseconds idleGrace)
void startDrainAll(std::chrono::milliseconds idleGrace)
void onDeactivated(ManagedConnection &conn) override
void setConnectionManager(ConnectionManager *mgr)
void scheduleTimeout(ManagedConnection *const connection, std::chrono::milliseconds timeout)
auto end(TestAdlIterable &instance)
Definition: ForeachTest.cpp:62
void fireCloseWhenIdle(bool force_to_close=false)
ConnectionManager * getConnectionManager()
void drainConnections(double pct, std::chrono::milliseconds idleGrace)
int * count
uint64_t getNumConnections() const override
virtual bool isBusy() const =0
g_t g(f_t)
folly::Function< void()> callback_
void startDrainPartial(double pct, std::chrono::milliseconds idleGrace)
virtual void dumpConnectionState(uint8_t loglevel)=0