proxygen
SerialExecutor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <glog/logging.h>
20 
21 #include <folly/ExceptionString.h>
22 
23 namespace folly {
24 
25 SerialExecutor::SerialExecutor(KeepAlive<Executor> parent)
26  : parent_(std::move(parent)) {}
27 
29  DCHECK(!keepAliveCounter_);
30 }
31 
34  return makeKeepAlive<SerialExecutor>(new SerialExecutor(std::move(parent)));
35 }
36 
38  std::shared_ptr<Executor> parent) {
39  auto executor = new SerialExecutor(getKeepAliveToken(parent.get()));
40  return {executor, Deleter{std::move(parent)}};
41 }
42 
44  auto keepAliveCounter =
45  keepAliveCounter_.fetch_add(1, std::memory_order_relaxed);
46  DCHECK(keepAliveCounter > 0);
47  return true;
48 }
49 
51  auto keepAliveCounter =
52  keepAliveCounter_.fetch_sub(1, std::memory_order_acq_rel);
53  DCHECK(keepAliveCounter > 0);
54  if (keepAliveCounter == 1) {
55  delete this;
56  }
57 }
58 
60  queue_.enqueue(std::move(func));
61  parent_->add([keepAlive = getKeepAliveToken(this)] { keepAlive->run(); });
62 }
63 
65  queue_.enqueue(std::move(func));
66  parent_->addWithPriority(
67  [keepAlive = getKeepAliveToken(this)] { keepAlive->run(); }, priority);
68 }
69 
71  // We want scheduled_ to guard side-effects of completed tasks, so we can't
72  // use std::memory_order_relaxed here.
73  if (scheduled_.fetch_add(1, std::memory_order_acquire) > 0) {
74  return;
75  }
76 
77  do {
78  Func func;
79  queue_.dequeue(func);
80 
81  try {
82  func();
83  } catch (std::exception const& ex) {
84  LOG(ERROR) << "SerialExecutor: func threw unhandled exception "
85  << folly::exceptionStr(ex);
86  } catch (...) {
87  LOG(ERROR) << "SerialExecutor: func threw unhandled non-exception "
88  "object";
89  }
90 
91  // We want scheduled_ to guard side-effects of completed tasks, so we can't
92  // use std::memory_order_relaxed here.
93  } while (scheduled_.fetch_sub(1, std::memory_order_release) > 1);
94 }
95 
96 } // namespace folly
void keepAliveRelease() override
fbstring exceptionStr(const std::exception &e)
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
STL namespace.
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
static KeepAlive< SerialExecutor > create(KeepAlive< Executor > parent=getKeepAliveToken(getCPUExecutor().get()))
PUSHMI_INLINE_VAR constexpr __adl::get_executor_fn executor
bool keepAliveAcquire() override
static KeepAlive< ExecutorT > getKeepAliveToken(ExecutorT *executor)
Definition: Executor.h:138
SerialExecutor(SerialExecutor const &)=delete
void addWithPriority(Func func, int8_t priority) override
Executor that guarantees serial non-concurrent execution of added tasks.
void add(Func func) override
static UniquePtr createUnique(std::shared_ptr< Executor > parent=getCPUExecutor())
std::unique_ptr< SerialExecutor, Deleter > UniquePtr
folly::Function< void()> parent
Definition: AtFork.cpp:34