proxygen
AsyncioExecutor.h
Go to the documentation of this file.
1 /*
2  * Copyright 2016-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <folly/ExceptionString.h>
19 #include <folly/Function.h>
23 
24 namespace folly {
25 namespace python {
26 
28  public:
29  using Func = folly::Func;
30 
31  ~AsyncioExecutor() override {
33  while (keepAliveCounter_ > 0) {
34  drive();
35  }
36  }
37 
38  void add(Func func) override {
39  queue_.putMessage(std::move(func));
40  }
41 
42  int fileno() const {
43  return consumer_.getFd();
44  }
45 
46  void drive() noexcept override {
47  consumer_.consumeUntilDrained([](Func&& func) {
48  try {
49  func();
50  } catch (...) {
51  LOG(ERROR) << "Exception thrown by NotificationQueueExecutor task."
52  << "Exception message: "
53  << folly::exceptionStr(std::current_exception());
54  }
55  });
56  }
57 
58  protected:
59  bool keepAliveAcquire() override {
60  auto keepAliveCounter =
61  keepAliveCounter_.fetch_add(1, std::memory_order_relaxed);
62  // We should never increment from 0
63  DCHECK(keepAliveCounter > 0);
64  return true;
65  }
66 
67  void keepAliveRelease() override {
68  auto keepAliveCounter = --keepAliveCounter_;
69  DCHECK(keepAliveCounter >= 0);
70  }
71 
72  private:
75  std::atomic<size_t> keepAliveCounter_{1};
76 }; // AsyncioExecutor
77 
78 } // namespace python
79 } // namespace folly
fbstring exceptionStr(const std::exception &e)
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
Function< void()> Func
Definition: Executor.h:27
folly::NotificationQueue< Func >::SimpleConsumer consumer_
std::atomic< size_t > keepAliveCounter_
void drive() noexceptoverride
void add(Func func) override
folly::NotificationQueue< Func > queue_