proxygen
AsyncSocketHandler.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <wangle/channel/Handler.h>
23 #include <folly/io/IOBuf.h>
24 #include <folly/io/IOBufQueue.h>
25 
26 namespace wangle {
27 
28 // This handler may only be used in a single Pipeline
32  public:
// Constructor tail: moves the given shared_ptr to the transport into socket_.
// NOTE(review): the embedded numbering jumps from 32 to 34, so the line that
// opens this declaration is not visible in this listing.
34  std::shared_ptr<folly::AsyncTransportWrapper> socket)
35  : socket_(std::move(socket)) {}
36 
38 
// Destructor. When a socket is still held and it reports an EventBase, the
// shared_ptr is moved out of socket_ into a lambda whose only job is to reset
// it; the lambda is passed to
// EventBase::runImmediatelyOrRunInEventBaseThreadAndWait().
// If the socket reports no EventBase, nothing is done explicitly here and
// socket_ is released by ordinary member destruction.
// NOTE(review): embedded line 40 is absent from this listing, so a statement
// that ran before the socket_ check may be missing here.
39  ~AsyncSocketHandler() override {
41 
42  if (socket_) {
43  auto evb = socket_->getEventBase();
44  if (evb) {
// Presumably this drops our reference on the EventBase's own thread and
// blocks until that has happened — confirm against the EventBase docs.
45  evb->runImmediatelyOrRunInEventBaseThreadAndWait(
46  [s = std::move(socket_)]() mutable {
47  s.reset();
48  });
49  }
50  }
51  }
52 
// Installs this handler as the socket's read callback when the socket reports
// good(); otherwise passes nullptr, which clears the read callback.
// NOTE(review): the signature (embedded line 53) is missing from this listing;
// presumably this is the body of attachReadCallback() — confirm.
54  socket_->setReadCB(socket_->good() ? this : nullptr);
55  }
56 
// Body of the read-callback detach helper. The signature (embedded line 57) is
// not visible in this listing; the comment inside transportInactive() refers to
// this helper as detachReadCallback.
// Step 1: clear the socket's read callback, but only if it currently points at
//         this handler.
// Step 2: if a context is available and firedInactive_ is not yet set, set it
//         and fire transportInactive through the context, so the event is not
//         fired again until firedInactive_ is cleared.
58  if (socket_ && socket_->getReadCallback() == this) {
59  socket_->setReadCB(nullptr);
60  }
61  auto ctx = getContext();
62  if (ctx && !firedInactive_) {
63  firedInactive_ = true;
64  ctx->fireTransportInactive();
65  }
66  }
67 
68  void attachEventBase(folly::EventBase* eventBase) {
69  if (eventBase && !socket_->getEventBase()) {
70  socket_->attachEventBase(eventBase);
71  }
72  }
73 
// Detaches the socket from its EventBase, if it currently reports one.
// socket_ is dereferenced unconditionally, so it must be non-null.
// NOTE(review): embedded line 75 is absent from this listing, so a statement
// that ran before the getEventBase() check may be missing here.
74  void detachEventBase() {
76  if (socket_->getEventBase()) {
77  socket_->detachEventBase();
78  }
79  }
80 
// Pipeline notification that the transport became active.
// Publishes socket_ as the pipeline's transport, clears firedInactive_ so a
// later inactive event can be fired again, then forwards the event with
// fireTransportActive().
// NOTE(review): embedded line 83 is absent from this listing, so a statement
// between setTransport() and the firedInactive_ reset may be missing here.
81  void transportActive(Context* ctx) override {
82  ctx->getPipeline()->setTransport(socket_);
84  firedInactive_ = false;
85  ctx->fireTransportActive();
86  }
87 
// Pipeline notification that the transport became inactive.
// The visible code clears the pipeline's transport by setting it to nullptr.
// NOTE(review): embedded line 91 is absent from this listing; given the
// comment below it was presumably a call to detachReadCallback() — confirm.
88  void transportInactive(Context* ctx) override {
89  // detachReadCallback invokes fireTransportInactive() if the transport
90  // is currently active.
92  ctx->getPipeline()->setTransport(nullptr);
93  }
94 
// Called when this handler is detached from its pipeline; the context argument
// is unnamed and therefore unused.
// NOTE(review): the only body line (embedded line 96) is absent from this
// listing, so what this override does cannot be determined from here.
95  void detachPipeline(Context*) override {
97  }
98 
100  Context* ctx,
101  std::unique_ptr<folly::IOBuf> buf) override {
102  refreshTimeout();
103  if (UNLIKELY(!buf)) {
104  return folly::makeFuture();
105  }
106 
107  if (!socket_->good()) {
108  VLOG(5) << "socket is closed in write()";
109  return folly::makeFuture<folly::Unit>(folly::AsyncSocketException(
110  folly::AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN,
111  "socket is closed in write()"));
112  }
113 
114  auto cb = new WriteCallback();
115  auto future = cb->promise_.getFuture();
116  socket_->writeChain(cb, std::move(buf), ctx->getWriteFlags());
117  return future;
118  }
119 
121  folly::exception_wrapper) override {
122  return shutdown(ctx, true);
123  }
124 
// Body of close(Context* ctx). The signature (embedded line 125) is not visible
// in this listing.
// Tests the context's write flags with isSet(); when the tested flag is set,
// only the write side of the socket is shut down and an already-fulfilled
// future is returned. Otherwise the call is delegated to shutdown() with
// closeWithReset = false.
// NOTE(review): the second argument of isSet() (embedded line 127) is absent
// from this listing, so which flag selects the write-only path is not visible.
126  bool shutdownWriteOnly = isSet(ctx->getWriteFlags(),
128  if (shutdownWriteOnly) {
129  socket_->shutdownWrite();
130  return folly::makeFuture();
131  } else {
132  return shutdown(ctx, false);
133  }
134  }
135 
136  // Must override to avoid warnings about hidden overloaded virtual due to
137  // AsyncSocket::ReadCallback::readEOF()
138  void readEOF(Context* ctx) override {
139  ctx->fireReadEOF();
140  }
141 
142  void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
143  const auto readBufferSettings = getContext()->getReadBufferSettings();
144  const auto ret = bufQueue_.preallocate(
145  readBufferSettings.first,
146  readBufferSettings.second);
147  *bufReturn = ret.first;
148  *lenReturn = ret.second;
149  }
150 
// ReadCallback hook: `len` bytes were written into the region handed out by
// getReadBuffer(). Refreshes the pipeline timeout and commits those bytes to
// bufQueue_ with postallocate().
// NOTE(review): embedded line 154 is absent from this listing; presumably it
// forwarded the queued data into the pipeline (e.g. via fireRead) — confirm.
151  void readDataAvailable(size_t len) noexcept override {
152  refreshTimeout();
153  bufQueue_.postallocate(len);
155  }
156 
157  void readEOF() noexcept override {
158  getContext()->fireReadEOF();
159  }
160 
// Tail of the ReadCallback error hook. The visible code wraps the socket
// exception `ex` in a folly::exception_wrapper and passes it as the last
// argument of a call.
// NOTE(review): the signature (embedded line 161) and the start of the call
// (embedded line 163) are absent from this listing; presumably this is
// readErr(const folly::AsyncSocketException& ex) forwarding the wrapped
// exception via fireReadException — confirm.
162  noexcept override {
164  folly::make_exception_wrapper<folly::AsyncSocketException>(ex));
165  }
166 
167  private:
168  void refreshTimeout() {
169  auto manager = getContext()->getPipeline()->getPipelineManager();
170  if (manager) {
171  manager->refreshTimeout();
172  }
173  }
174 
// Closes the socket (if one is held) and deletes the pipeline.
//
// @param ctx             Handler context; used to reach the pipeline.
// @param closeWithReset  true  -> socket_->closeWithReset()
//                        false -> socket_->closeNow()
// @return                An already-fulfilled future.
//
// pipelineDeleted_ is set before deletePipeline() is called, so the pipeline
// is deleted by this function at most once.
// NOTE(review): embedded line 177 is absent from this listing, so a statement
// that ran inside the socket_ check before the close call may be missing here.
175  folly::Future<folly::Unit> shutdown(Context* ctx, bool closeWithReset) {
176  if (socket_) {
178  if (closeWithReset) {
179  socket_->closeWithReset();
180  } else {
181  socket_->closeNow();
182  }
183  }
184  if (!pipelineDeleted_) {
185  pipelineDeleted_ = true;
186  ctx->getPipeline()->deletePipeline();
187  }
188  return folly::makeFuture();
189  }
190 
// Members of the nested write-callback class. Its opening line (embedded line
// 191) is not visible in this listing; write() instantiates it as
// WriteCallback.
// Both completion hooks end with `delete this`, so an instance must be
// heap-allocated and must not be used after either hook has run.

// Success hook: fulfils promise_ and then destroys the callback object.
192  void writeSuccess() noexcept override {
193  promise_.setValue();
194  delete this;
195  }
196 
// Failure hook: the byte count is ignored and the callback object destroys
// itself.
// NOTE(review): embedded line 200 is absent from this listing; presumably it
// failed promise_ with `ex` before the delete — confirm.
197  void writeErr(size_t /* bytesWritten */,
198  const folly::AsyncSocketException& ex)
199  noexcept override {
201  delete this;
202  }
203 
204  private:
// The enclosing handler is a friend so that it can reach the private promise_
// (write() reads cb->promise_).
205  friend class AsyncSocketHandler;
// NOTE(review): embedded line 206 is absent from this listing; presumably it
// declared promise_ — confirm.
207  };
208 
// NOTE(review): embedded line 209 is absent from this listing; presumably it
// declared bufQueue_, which the read hooks use — confirm.
// The underlying transport; moved out of this member by the destructor when
// the transport reports an EventBase.
210  std::shared_ptr<folly::AsyncTransportWrapper> socket_{nullptr};
// Set once transportInactive has been fired through the context; cleared again
// in transportActive().
211  bool firedInactive_{false};
// Set by shutdown() just before it deletes the pipeline, so the deletion is
// requested only once.
212  bool pipelineDeleted_{false};
213 };
214 
215 } // namespace wangle
folly::Promise< folly::Unit > promise_
void readEOF(Context *ctx) override
folly::Future< folly::Unit > close(Context *ctx) override
AsyncSocketHandler(std::shared_ptr< folly::AsyncTransportWrapper > socket)
virtual void fireReadEOF()=0
void transportInactive(Context *ctx) override
PipelineManager * getPipelineManager()
Definition: Pipeline.h:51
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
STL namespace.
void setException(exception_wrapper ew)
Definition: Promise-inl.h:111
requires E e noexcept(noexcept(s.error(std::move(e))))
folly::Future< folly::Unit > shutdown(Context *ctx, bool closeWithReset)
std::pair< void *, std::size_t > preallocate(std::size_t min, std::size_t newAllocationSize, std::size_t max=std::numeric_limits< std::size_t >::max())
Definition: IOBufQueue.h:356
HandlerContext< R, W > * getContext()
Definition: Handler.h:34
bool isSet(WriteFlags a, WriteFlags b)
void detachPipeline(Context *) override
void readErr(const folly::AsyncSocketException &ex) noexcept override
void getReadBuffer(void **bufReturn, size_t *lenReturn) override
static Options cacheChainLength()
Definition: IOBufQueue.h:83
void transportActive(Context *ctx) override
std::shared_ptr< folly::AsyncTransportWrapper > socket_
void writeErr(size_t, const folly::AsyncSocketException &ex) noexcept override
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
virtual void fireRead(In msg)=0
void readEOF() noexcept override
folly::Future< folly::Unit > write(Context *ctx, std::unique_ptr< folly::IOBuf > buf) override
virtual void refreshTimeout()
Definition: Pipeline.h:40
std::enable_if< std::is_same< Unit, B >::value, void >::type setValue()
Definition: Promise.h:326
void attachEventBase(folly::EventBase *eventBase)
static set< string > s
virtual void fireReadException(folly::exception_wrapper e)=0
Handler< R, R, W, W >::Context Context
Definition: Handler.h:161
virtual std::pair< uint64_t, uint64_t > getReadBufferSettings()=0
#define UNLIKELY(x)
Definition: Likely.h:48
void postallocate(std::size_t n)
Definition: IOBufQueue.h:380
void readDataAvailable(size_t len) noexcept override
Future< typename std::decay< T >::type > makeFuture(T &&t)
Definition: Future-inl.h:1310
virtual PipelineBase * getPipeline()=0
folly::Future< folly::Unit > writeException(Context *ctx, folly::exception_wrapper) override