proxygen
AsyncPipe.h
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <list>
20 #include <system_error>
21 
22 #include <folly/io/IOBufQueue.h>
26 
27 namespace folly {
28 
29 class AsyncSocketException;
30 
35  public AsyncReader,
36  public DelayedDestruction {
37  public:
38  typedef std::
39  unique_ptr<AsyncPipeReader, folly::DelayedDestruction::Destructor>
41 
42  template <typename... Args>
43  static UniquePtr newReader(Args&&... args) {
44  return UniquePtr(new AsyncPipeReader(std::forward<Args>(args)...));
45  }
46 
47  AsyncPipeReader(folly::EventBase* eventBase, int pipeFd)
48  : EventHandler(eventBase, pipeFd), fd_(pipeFd) {}
49 
54  void setReadCB(AsyncReader::ReadCallback* callback) override {
55  if (callback == readCallback_) {
56  return;
57  }
58  readCallback_ = callback;
61  } else if (!readCallback_ && isHandlerRegistered()) {
63  }
64  }
65 
70  return readCallback_;
71  }
72 
76  void setCloseCallback(std::function<void(int)> closeCb) {
77  closeCb_ = closeCb;
78  }
79 
80  private:
81  ~AsyncPipeReader() override;
82 
83  void handlerReady(uint16_t events) noexcept override;
84  void failRead(const AsyncSocketException& ex);
85  void close();
86 
87  int fd_;
89  std::function<void(int)> closeCb_;
90 };
91 
96  public AsyncWriter,
97  public DelayedDestruction {
98  public:
99  typedef std::
100  unique_ptr<AsyncPipeWriter, folly::DelayedDestruction::Destructor>
102 
103  template <typename... Args>
104  static UniquePtr newWriter(Args&&... args) {
105  return UniquePtr(new AsyncPipeWriter(std::forward<Args>(args)...));
106  }
107 
108  AsyncPipeWriter(folly::EventBase* eventBase, int pipeFd)
109  : EventHandler(eventBase, pipeFd), fd_(pipeFd) {}
110 
115  void write(
116  std::unique_ptr<folly::IOBuf> iob,
117  AsyncWriter::WriteCallback* wcb = nullptr);
118 
122  void setCloseCallback(std::function<void(int)> closeCb) {
123  closeCb_ = closeCb;
124  }
125 
129  bool closed() const {
130  return (fd_ < 0 || closeOnEmpty_);
131  }
132 
136  void closeOnEmpty();
137 
141  void closeNow();
142 
147  bool hasPendingWrites() const {
148  return !queue_.empty();
149  }
150 
151  // AsyncWriter methods
152  void write(
154  const void* buf,
155  size_t bytes,
156  WriteFlags flags = WriteFlags::NONE) override {
157  writeChain(callback, IOBuf::wrapBuffer(buf, bytes), flags);
158  }
159  void writev(
161  const iovec*,
162  size_t,
163  WriteFlags = WriteFlags::NONE) override {
164  throw std::runtime_error("writev is not supported. Please use writeChain.");
165  }
166  void writeChain(
168  std::unique_ptr<folly::IOBuf>&& buf,
169  WriteFlags flags = WriteFlags::NONE) override;
170 
171  private:
172  void handlerReady(uint16_t events) noexcept override;
173  void handleWrite();
174  void failAllWrites(const AsyncSocketException& ex);
175 
176  int fd_;
177  std::list<std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*>> queue_;
178  bool closeOnEmpty_{false};
179  std::function<void(int)> closeCb_;
180 
181  ~AsyncPipeWriter() override {
182  closeNow();
183  }
184 };
185 
186 } // namespace folly
flags
Definition: http_parser.h:127
void write(const T &in, folly::io::Appender &appender)
Definition: Types-inl.h:112
std::unique_ptr< AsyncPipeReader, folly::DelayedDestruction::Destructor > UniquePtr
Definition: AsyncPipe.h:40
static std::unique_ptr< IOBuf > wrapBuffer(const void *buf, std::size_t capacity)
Definition: IOBuf.cpp:353
static UniquePtr newReader(Args &&...args)
Definition: AsyncPipe.h:43
~AsyncPipeReader() override
Definition: AsyncPipe.cpp:28
AsyncPipeReader(folly::EventBase *eventBase, int pipeFd)
Definition: AsyncPipe.h:47
internal::ArgsMatcher< InnerMatcher > Args(const InnerMatcher &matcher)
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
~AsyncPipeWriter() override
Definition: AsyncPipe.h:181
std::list< std::pair< folly::IOBufQueue, AsyncWriter::WriteCallback * > > queue_
Definition: AsyncPipe.h:177
std::unique_ptr< AsyncPipeWriter, folly::DelayedDestruction::Destructor > UniquePtr
Definition: AsyncPipe.h:101
AsyncReader::ReadCallback * readCallback_
Definition: AsyncPipe.h:88
void writev(folly::AsyncWriter::WriteCallback *, const iovec *, size_t, WriteFlags=WriteFlags::NONE) override
Definition: AsyncPipe.h:159
void handlerReady(uint16_t events) noexceptoverride
Definition: AsyncPipe.cpp:57
void write(folly::AsyncWriter::WriteCallback *callback, const void *buf, size_t bytes, WriteFlags flags=WriteFlags::NONE) override
Definition: AsyncPipe.h:152
AsyncReader::ReadCallback * getReadCallback() const override
Definition: AsyncPipe.h:69
void setCloseCallback(std::function< void(int)> closeCb)
Definition: AsyncPipe.h:122
void setReadCB(AsyncReader::ReadCallback *callback) override
Definition: AsyncPipe.h:54
AsyncPipeWriter(folly::EventBase *eventBase, int pipeFd)
Definition: AsyncPipe.h:108
void setCloseCallback(std::function< void(int)> closeCb)
Definition: AsyncPipe.h:76
std::function< void(int)> closeCb_
Definition: AsyncPipe.h:89
void failRead(const AsyncSocketException &ex)
Definition: AsyncPipe.cpp:32
bool closed() const
Definition: AsyncPipe.h:129
static UniquePtr newWriter(Args &&...args)
Definition: AsyncPipe.h:104
bool hasPendingWrites() const
Definition: AsyncPipe.h:147
bool registerHandler(uint16_t events)
Definition: EventHandler.h:100
bool isHandlerRegistered() const
Definition: EventHandler.h:112
std::function< void(int)> closeCb_
Definition: AsyncPipe.h:179