proxygen
FiberManagerInternal-inl.h
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <cassert>
19 
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
25 #ifdef __APPLE__
26 #include <folly/ThreadLocal.h>
27 #endif
28 #include <folly/Try.h>
29 #include <folly/fibers/Baton.h>
30 #include <folly/fibers/Fiber.h>
32 #include <folly/fibers/Promise.h>
33 
34 namespace folly {
35 namespace fibers {
36 
37 namespace {
38 
39 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
44  opts.stackSize *= std::exchange(opts.stackSizeMultiplier, 1);
45  return opts;
46 }
47 
48 } // namespace
49 
51  if (isLoopScheduled_) {
52  return;
53  }
54 
55  isLoopScheduled_ = true;
56  loopController_->schedule();
57 }
58 
59 inline void FiberManager::activateFiber(Fiber* fiber) {
60  DCHECK_EQ(activeFiber_, (Fiber*)nullptr);
61 
62 #ifdef FOLLY_SANITIZE_ADDRESS
63  DCHECK(!fiber->asanMainStackBase_);
64  DCHECK(!fiber->asanMainStackSize_);
65  auto stack = fiber->getStack();
66  void* asanFakeStack;
67  registerStartSwitchStackWithAsan(&asanFakeStack, stack.first, stack.second);
68  SCOPE_EXIT {
69  registerFinishSwitchStackWithAsan(asanFakeStack, nullptr, nullptr);
70  fiber->asanMainStackBase_ = nullptr;
71  fiber->asanMainStackSize_ = 0;
72  };
73 #endif
74 
75  activeFiber_ = fiber;
76  fiber->fiberImpl_.activate();
77 }
78 
79 inline void FiberManager::deactivateFiber(Fiber* fiber) {
80  DCHECK_EQ(activeFiber_, fiber);
81 
82 #ifdef FOLLY_SANITIZE_ADDRESS
83  DCHECK(fiber->asanMainStackBase_);
84  DCHECK(fiber->asanMainStackSize_);
85 
86  registerStartSwitchStackWithAsan(
87  &fiber->asanFakeStack_,
88  fiber->asanMainStackBase_,
89  fiber->asanMainStackSize_);
90  SCOPE_EXIT {
91  registerFinishSwitchStackWithAsan(
92  fiber->asanFakeStack_,
93  &fiber->asanMainStackBase_,
94  &fiber->asanMainStackSize_);
95  fiber->asanFakeStack_ = nullptr;
96  };
97 #endif
98 
99  activeFiber_ = nullptr;
100  fiber->fiberImpl_.deactivate();
101 }
102 
103 inline void FiberManager::runReadyFiber(Fiber* fiber) {
104  SCOPE_EXIT {
105  assert(currentFiber_ == nullptr);
106  assert(activeFiber_ == nullptr);
107  };
108 
109  assert(
110  fiber->state_ == Fiber::NOT_STARTED ||
111  fiber->state_ == Fiber::READY_TO_RUN);
112  currentFiber_ = fiber;
113  // Note: resetting the context is handled by the loop
115  if (observer_) {
116  observer_->starting(reinterpret_cast<uintptr_t>(fiber));
117  }
118 
119  while (fiber->state_ == Fiber::NOT_STARTED ||
120  fiber->state_ == Fiber::READY_TO_RUN) {
121  activateFiber(fiber);
122  if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
123  try {
124  immediateFunc_();
125  } catch (...) {
126  exceptionCallback_(std::current_exception(), "running immediateFunc_");
127  }
128  immediateFunc_ = nullptr;
129  fiber->state_ = Fiber::READY_TO_RUN;
130  }
131  }
132 
133  if (fiber->state_ == Fiber::AWAITING) {
134  awaitFunc_(*fiber);
135  awaitFunc_ = nullptr;
136  if (observer_) {
137  observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
138  }
139  currentFiber_ = nullptr;
141  } else if (fiber->state_ == Fiber::INVALID) {
142  assert(fibersActive_ > 0);
143  --fibersActive_;
144  // Making sure that task functor is deleted once task is complete.
145  // NOTE: we must do it on main context, as the fiber is not
146  // running at this point.
147  fiber->func_ = nullptr;
148  fiber->resultFunc_ = nullptr;
149  if (fiber->finallyFunc_) {
150  try {
151  fiber->finallyFunc_();
152  } catch (...) {
153  exceptionCallback_(std::current_exception(), "running finallyFunc_");
154  }
155  fiber->finallyFunc_ = nullptr;
156  }
157  // Make sure LocalData is not accessible from its destructor
158  if (observer_) {
159  observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
160  }
161  currentFiber_ = nullptr;
163  fiber->localData_.reset();
164  fiber->rcontext_.reset();
165 
168  fibersPool_.push_front(*fiber);
169  ++fibersPoolSize_;
170  } else {
171  delete fiber;
172  assert(fibersAllocated_ > 0);
174  }
175  } else if (fiber->state_ == Fiber::YIELDED) {
176  if (observer_) {
177  observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
178  }
179  currentFiber_ = nullptr;
181  fiber->state_ = Fiber::READY_TO_RUN;
182  yieldedFibers_.push_back(*fiber);
183  }
184 }
185 
187  return loopController_->runLoop();
188 }
189 
191 #ifndef _WIN32
194  }
195 #endif
196 
197  // Support nested FiberManagers
198  auto originalFiberManager = this;
199  std::swap(currentFiberManager_, originalFiberManager);
200 
201  // Save current context, and reset it after executing all fibers.
202  // This can avoid a lot of context swapping,
203  // if the Fibers share the same context
204  auto curCtx = RequestContext::saveContext();
205 
206  SCOPE_EXIT {
208  isLoopScheduled_ = false;
209  if (!readyFibers_.empty()) {
211  }
212  std::swap(currentFiberManager_, originalFiberManager);
213  CHECK_EQ(this, originalFiberManager);
214  };
215 
216  bool hadRemote = true;
217  while (hadRemote) {
218  while (!readyFibers_.empty()) {
219  auto& fiber = readyFibers_.front();
220  readyFibers_.pop_front();
221  runReadyFiber(&fiber);
222  }
223 
224  auto hadRemoteFiber = remoteReadyQueue_.sweepOnce(
225  [this](Fiber* fiber) { runReadyFiber(fiber); });
226 
227  if (hadRemoteFiber) {
228  ++remoteCount_;
229  }
230 
231  auto hadRemoteTask =
232  remoteTaskQueue_.sweepOnce([this](RemoteTask* taskPtr) {
233  std::unique_ptr<RemoteTask> task(taskPtr);
234  auto fiber = getFiber();
235  if (task->localData) {
236  fiber->localData_ = *task->localData;
237  }
238  fiber->rcontext_ = std::move(task->rcontext);
239 
240  fiber->setFunction(std::move(task->func));
241  if (observer_) {
242  observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
243  }
244  runReadyFiber(fiber);
245  });
246 
247  if (hadRemoteTask) {
248  ++remoteCount_;
249  }
250 
251  hadRemote = hadRemoteTask || hadRemoteFiber;
252  }
253 
254  if (observer_) {
255  for (auto& yielded : yieldedFibers_) {
256  observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
257  }
258  }
260 }
261 
263  --remoteCount_;
264  return !remoteReadyQueue_.empty() || !remoteTaskQueue_.empty();
265 }
266 
267 inline bool FiberManager::hasReadyTasks() const {
268  return !readyFibers_.empty() || !remoteReadyQueue_.empty() ||
269  !remoteTaskQueue_.empty();
270 }
271 
272 // We need this to be in a struct, not inlined in addTask, because clang crashes
273 // otherwise.
274 template <typename F>
276  class Func;
277 
278  static constexpr bool allocateInBuffer =
279  sizeof(Func) <= Fiber::kUserBufferSize;
280 
281  class Func {
282  public:
283  Func(F&& func, FiberManager& fm) : func_(std::forward<F>(func)), fm_(fm) {}
284 
285  void operator()() {
286  try {
287  func_();
288  } catch (...) {
289  fm_.exceptionCallback_(
290  std::current_exception(), "running Func functor");
291  }
292  if (allocateInBuffer) {
293  this->~Func();
294  } else {
295  delete this;
296  }
297  }
298 
299  private:
300  F func_;
302  };
303 };
304 
305 template <typename F>
306 void FiberManager::addTask(F&& func) {
307  typedef AddTaskHelper<F> Helper;
308 
309  auto fiber = getFiber();
310  initLocalData(*fiber);
311 
312  if (Helper::allocateInBuffer) {
313  auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
314  new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
315 
316  fiber->setFunction(std::ref(*funcLoc));
317  } else {
318  auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
319 
320  fiber->setFunction(std::ref(*funcLoc));
321  }
322 
323  readyFibers_.push_back(*fiber);
324  if (observer_) {
325  observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
326  }
327 
329 }
330 
331 template <typename F>
333  auto task = [&]() {
334  auto currentFm = getFiberManagerUnsafe();
335  if (currentFm && currentFm->currentFiber_ &&
336  currentFm->localType_ == localType_) {
337  return std::make_unique<RemoteTask>(
338  std::forward<F>(func), currentFm->currentFiber_->localData_);
339  }
340  return std::make_unique<RemoteTask>(std::forward<F>(func));
341  }();
342  if (remoteTaskQueue_.insertHead(task.release())) {
343  loopController_->scheduleThreadSafe();
344  }
345 }
346 
347 template <typename X>
349  static const bool value = false;
350 };
351 template <typename T>
352 struct IsRvalueRefTry<folly::Try<T>&&> {
353  static const bool value = true;
354 };
355 
356 // We need this to be in a struct, not inlined in addTaskFinally, because clang
357 // crashes otherwise.
358 template <typename F, typename G>
360  class Func;
361 
362  typedef invoke_result_t<F> Result;
363 
364  class Finally {
365  public:
366  Finally(G finally, FiberManager& fm)
367  : finally_(std::move(finally)), fm_(fm) {}
368 
369  void operator()() {
370  try {
371  finally_(std::move(result_));
372  } catch (...) {
373  fm_.exceptionCallback_(
374  std::current_exception(), "running Finally functor");
375  }
376 
377  if (allocateInBuffer) {
378  this->~Finally();
379  } else {
380  delete this;
381  }
382  }
383 
384  private:
385  friend class Func;
386 
390  };
391 
392  class Func {
393  public:
394  Func(F func, Finally& finally)
395  : func_(std::move(func)), result_(finally.result_) {}
396 
397  void operator()() {
398  folly::tryEmplaceWith(result_, std::move(func_));
399 
400  if (allocateInBuffer) {
401  this->~Func();
402  } else {
403  delete this;
404  }
405  }
406 
407  private:
408  F func_;
410  };
411 
412  static constexpr bool allocateInBuffer =
413  sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
414 };
415 
416 template <typename F, typename G>
417 void FiberManager::addTaskFinally(F&& func, G&& finally) {
418  typedef invoke_result_t<F> Result;
419 
420  static_assert(
422  "finally(arg): arg must be Try<T>&&");
423  static_assert(
424  std::is_convertible<
425  Result,
426  typename std::remove_reference<
427  typename FirstArgOf<G>::type>::type::element_type>::value,
428  "finally(Try<T>&&): T must be convertible from func()'s return type");
429 
430  auto fiber = getFiber();
431  initLocalData(*fiber);
432 
433  typedef AddTaskFinallyHelper<
434  typename std::decay<F>::type,
435  typename std::decay<G>::type>
436  Helper;
437 
438  if (Helper::allocateInBuffer) {
439  auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
440  auto finallyLoc =
441  static_cast<typename Helper::Finally*>(static_cast<void*>(funcLoc + 1));
442 
443  new (finallyLoc) typename Helper::Finally(std::forward<G>(finally), *this);
444  new (funcLoc) typename Helper::Func(std::forward<F>(func), *finallyLoc);
445 
446  fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
447  } else {
448  auto finallyLoc =
449  new typename Helper::Finally(std::forward<G>(finally), *this);
450  auto funcLoc =
451  new typename Helper::Func(std::forward<F>(func), *finallyLoc);
452 
453  fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
454  }
455 
456  readyFibers_.push_back(*fiber);
457  if (observer_) {
458  observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
459  }
460 
462 }
463 
464 template <typename F>
466  if (UNLIKELY(activeFiber_ == nullptr)) {
467  return func();
468  }
469 
470  typedef invoke_result_t<F> Result;
471 
472  folly::Try<Result> result;
473  auto f = [&func, &result]() mutable {
474  folly::tryEmplaceWith(result, std::forward<F>(func));
475  };
476 
477  immediateFunc_ = std::ref(f);
479 
480  return std::move(result).value();
481 }
482 
484  assert(currentFiberManager_ != nullptr);
485  return *currentFiberManager_;
486 }
487 
489  return currentFiberManager_;
490 }
491 
492 inline bool FiberManager::hasActiveFiber() const {
493  return activeFiber_ != nullptr;
494 }
495 
496 inline void FiberManager::yield() {
497  assert(currentFiberManager_ == this);
498  assert(activeFiber_ != nullptr);
499  assert(activeFiber_->state_ == Fiber::RUNNING);
501 }
502 
503 template <typename T>
505  if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
506  return currentFiber_->localData_.get<T>();
507  }
508  return localThread<T>();
509 }
510 
511 template <typename T>
513 #ifndef __APPLE__
514  static thread_local T t;
515  return t;
516 #else // osx doesn't support thread_local
517  static ThreadLocal<T> t;
518  return *t;
519 #endif
520 }
521 
522 inline void FiberManager::initLocalData(Fiber& fiber) {
523  auto fm = getFiberManagerUnsafe();
524  if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
525  fiber.localData_ = fm->currentFiber_->localData_;
526  }
528 }
529 
530 template <typename LocalT>
533  std::unique_ptr<LoopController> loopController__,
534  Options options)
535  : loopController_(std::move(loopController__)),
536  stackAllocator_(options.useGuardPages),
537  options_(preprocessOptions(std::move(options))),
538  exceptionCallback_([](std::exception_ptr eptr, std::string context) {
539  try {
540  std::rethrow_exception(eptr);
541  } catch (const std::exception& e) {
542  LOG(DFATAL) << "Exception " << typeid(e).name() << " with message '"
543  << e.what() << "' was thrown in "
544  << "FiberManager with context '" << context << "'";
545  } catch (...) {
546  LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
547  << "context '" << context << "'";
548  }
549  }),
550  timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
551  fibersPoolResizer_(*this),
552  localType_(typeid(LocalT)) {
553  loopController_->setFiberManager(this);
554 }
555 
556 template <typename F>
557 typename FirstArgOf<F>::type::value_type inline await(F&& func) {
558  typedef typename FirstArgOf<F>::type::value_type Result;
559  typedef typename FirstArgOf<F>::type::baton_type BatonT;
560 
561  return Promise<Result, BatonT>::await(std::forward<F>(func));
562 }
563 } // namespace fibers
564 } // namespace folly
std::pair< void *, size_t > getStack() const
Definition: Fiber.h:62
FiberImpl fiberImpl_
Definition: Fiber.h:112
auto f
static std::shared_ptr< RequestContext > setContext(std::shared_ptr< RequestContext > ctx)
Definition: Request.cpp:227
folly::Function< void(Fiber &)> awaitFunc_
static FOLLY_EXPORT T & localThread()
typename invoke_result< F, Args... >::type invoke_result_t
Definition: Invoke.h:142
PskType type
context
Definition: CMakeCache.txt:563
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
STL namespace.
invoke_result_t< F > runInMainContext(F &&func)
static FOLLY_TLS FiberManager * currentFiberManager_
#define SCOPE_EXIT
Definition: ScopeGuard.h:274
void addTaskFinally(F &&func, G &&finally)
folly::std T
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
void setFunction(F &&func)
Definition: Fiber-inl.h:24
static FiberManager & getFiberManager()
folly::Function< void()> resultFunc_
Definition: Fiber.h:128
T * tryEmplaceWith(Try< T > &t, Func &&func) noexcept
Definition: Try-inl.h:266
virtual void runnable(uintptr_t id) noexcept=0
std::unique_ptr< LoopController > loopController_
Single-threaded task execution engine.
const char * name
Definition: http_parser.c:437
static std::shared_ptr< RequestContext > saveContext()
Definition: Request.h:196
Function< void()> Func
Definition: Executor.h:27
Fiber object used by FiberManager to execute tasks.
Definition: Fiber.h:45
std::shared_ptr< RequestContext > rcontext_
Definition: Fiber.h:113
virtual void stopped(uintptr_t id) noexcept=0
void preempt(State state)
Definition: Fiber.cpp:172
static constexpr size_t kUserBufferSize
Definition: Fiber.h:123
folly::Function< void()> func_
Definition: Fiber.h:114
folly::Function< void()> finallyFunc_
Definition: Fiber.h:129
Definition: Try.h:51
virtual void starting(uintptr_t id) noexcept=0
std::shared_ptr< TimeoutController > timeoutManager_
folly::AtomicIntrusiveLinkedList< Fiber,&Fiber::nextRemoteReady_ > remoteReadyQueue_
T exchange(T &obj, U &&new_value)
Definition: Utility.h:120
const char * string
Definition: Conv.cpp:212
static FiberManager * getFiberManagerUnsafe()
uint64_t value(const typename LockFreeRingBuffer< T, Atom >::Cursor &rbcursor)
void swap(SwapTrackingAlloc< T > &, SwapTrackingAlloc< T > &)
Definition: F14TestUtil.h:414
folly::AtomicIntrusiveLinkedList< RemoteTask,&RemoteTask::nextRemoteTask > remoteTaskQueue_
static value_type await(F &&func)
Definition: Promise-inl.h:94
#define UNLIKELY(x)
Definition: Likely.h:48
FiberManager(const FiberManager &)=delete
folly::Function< void()> immediateFunc_
LocalData localData_
Definition: Fiber.h:173
std::shared_ptr< RequestContext > rcontext
std::unique_ptr< Fiber::LocalData > localData
FirstArgOf< F >::type::value_type await(F &&func)