proxygen
HandlerContext-inl.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <folly/Format.h>
20 
21 namespace wangle {
22 
24  public:
25  virtual ~PipelineContext() = default;
26 
27  virtual void attachPipeline() = 0;
28  virtual void detachPipeline() = 0;
29 
30  template <class H, class HandlerContext>
32  if (++handler->attachCount_ == 1) {
33  handler->ctx_ = ctx;
34  } else {
35  handler->ctx_ = nullptr;
36  }
37  }
38 
// Counterpart of attachContext(): records that one context has released
// `handler`.
//
// handler: object exposing `attachCount_` and `ctx_`.
// The second argument (the detaching context) is unused.
template <class H, class HandlerContext>
void detachContext(H* handler, HandlerContext* /*ctx*/) {
  // Count down only while at least one attachment is recorded, so the
  // counter never goes below zero.
  auto& attachments = handler->attachCount_;
  if (attachments >= 1) {
    attachments -= 1;
  }
  // The handler's cached context is cleared on every detach, whatever the
  // remaining count is.
  handler->ctx_ = nullptr;
}
46 
47  virtual void setNextIn(PipelineContext * ctx) = 0;
48  virtual void setNextOut(PipelineContext * ctx) = 0;
49 
50  virtual HandlerDir getDirection() = 0;
51 };
52 
53 template <class In>
54 class InboundLink {
55  public:
56  virtual ~InboundLink() = default;
57  virtual void read(In msg) = 0;
58  virtual void readEOF() = 0;
59  virtual void readException(folly::exception_wrapper e) = 0;
60  virtual void transportActive() = 0;
61  virtual void transportInactive() = 0;
62 };
63 
64 template <class Out>
65 class OutboundLink {
66  public:
67  virtual ~OutboundLink() = default;
68  virtual folly::Future<folly::Unit> write(Out msg) = 0;
69  virtual folly::Future<folly::Unit> writeException(
71  virtual folly::Future<folly::Unit> close() = 0;
72 };
73 
74 template <class H, class Context>
76  public:
77  ~ContextImplBase() override = default;
78 
79  H* getHandler() {
80  return handler_.get();
81  }
82 
83  void initialize(
84  std::weak_ptr<PipelineBase> pipeline,
85  std::shared_ptr<H> handler) {
86  pipelineWeak_ = pipeline;
87  pipelineRaw_ = pipeline.lock().get();
88  handler_ = std::move(handler);
89  }
90 
91  // PipelineContext overrides
92  void attachPipeline() override {
93  if (!attached_) {
94  this->attachContext(handler_.get(), impl_);
95  handler_->attachPipeline(impl_);
96  attached_ = true;
97  }
98  }
99 
100  void detachPipeline() override {
101  handler_->detachPipeline(impl_);
102  attached_ = false;
103  this->detachContext(handler_.get(), impl_);
104  }
105 
106  void setNextIn(PipelineContext* ctx) override {
107  if (!ctx) {
108  nextIn_ = nullptr;
109  return;
110  }
111  auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx);
112  if (nextIn) {
113  nextIn_ = nextIn;
114  } else {
115  throw std::invalid_argument(folly::sformat(
116  "inbound type mismatch after {}", folly::demangle(typeid(H))));
117  }
118  }
119 
120  void setNextOut(PipelineContext* ctx) override {
121  if (!ctx) {
122  nextOut_ = nullptr;
123  return;
124  }
125  auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx);
126  if (nextOut) {
127  nextOut_ = nextOut;
128  } else {
129  throw std::invalid_argument(folly::sformat(
130  "outbound type mismatch after {}", folly::demangle(typeid(H))));
131  }
132  }
133 
135  return H::dir;
136  }
137 
138  protected:
139  Context* impl_;
140  std::weak_ptr<PipelineBase> pipelineWeak_;
142  std::shared_ptr<H> handler_;
143  InboundLink<typename H::rout>* nextIn_{nullptr};
144  OutboundLink<typename H::wout>* nextOut_{nullptr};
145 
146  private:
147  bool attached_{false};
148 };
149 
150 template <class H>
152  : public HandlerContext<typename H::rout,
153  typename H::wout>,
154  public InboundLink<typename H::rin>,
155  public OutboundLink<typename H::win>,
156  public ContextImplBase<H, HandlerContext<typename H::rout,
157  typename H::wout>> {
158  public:
159  typedef typename H::rin Rin;
160  typedef typename H::rout Rout;
161  typedef typename H::win Win;
162  typedef typename H::wout Wout;
163  static const HandlerDir dir = HandlerDir::BOTH;
164 
165  explicit ContextImpl(
166  std::weak_ptr<PipelineBase> pipeline,
167  std::shared_ptr<H> handler) {
168  this->impl_ = this;
169  this->initialize(pipeline, std::move(handler));
170  }
171 
172  // For StaticPipeline
174  this->impl_ = this;
175  }
176 
177  ~ContextImpl() override = default;
178 
179  // HandlerContext overrides
180  void fireRead(Rout msg) override {
181  auto guard = this->pipelineWeak_.lock();
182  if (this->nextIn_) {
183  this->nextIn_->read(std::forward<Rout>(msg));
184  } else {
185  LOG(WARNING) << "read reached end of pipeline";
186  }
187  }
188 
189  void fireReadEOF() override {
190  auto guard = this->pipelineWeak_.lock();
191  if (this->nextIn_) {
192  this->nextIn_->readEOF();
193  } else {
194  LOG(WARNING) << "readEOF reached end of pipeline";
195  }
196  }
197 
199  auto guard = this->pipelineWeak_.lock();
200  if (this->nextIn_) {
201  this->nextIn_->readException(std::move(e));
202  } else {
203  LOG(WARNING) << "readException reached end of pipeline";
204  }
205  }
206 
207  void fireTransportActive() override {
208  auto guard = this->pipelineWeak_.lock();
209  if (this->nextIn_) {
210  this->nextIn_->transportActive();
211  }
212  }
213 
214  void fireTransportInactive() override {
215  auto guard = this->pipelineWeak_.lock();
216  if (this->nextIn_) {
217  this->nextIn_->transportInactive();
218  }
219  }
220 
222  auto guard = this->pipelineWeak_.lock();
223  if (this->nextOut_) {
224  return this->nextOut_->write(std::forward<Wout>(msg));
225  } else {
226  LOG(WARNING) << "write reached end of pipeline";
227  return folly::makeFuture();
228  }
229  }
230 
232  folly::exception_wrapper e) override {
233  auto guard = this->pipelineWeak_.lock();
234  if (this->nextOut_) {
235  return this->nextOut_->writeException(std::move(e));
236  } else {
237  LOG(WARNING) << "close reached end of pipeline";
238  return folly::makeFuture();
239  }
240  }
241 
243  auto guard = this->pipelineWeak_.lock();
244  if (this->nextOut_) {
245  return this->nextOut_->close();
246  } else {
247  LOG(WARNING) << "close reached end of pipeline";
248  return folly::makeFuture();
249  }
250  }
251 
252  PipelineBase* getPipeline() override {
253  return this->pipelineRaw_;
254  }
255 
256  std::shared_ptr<PipelineBase> getPipelineShared() override {
257  return this->pipelineWeak_.lock();
258  }
259 
261  this->pipelineRaw_->setWriteFlags(flags);
262  }
263 
265  return this->pipelineRaw_->getWriteFlags();
266  }
267 
269  uint64_t minAvailable,
270  uint64_t allocationSize) override {
271  this->pipelineRaw_->setReadBufferSettings(minAvailable, allocationSize);
272  }
273 
274  std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
275  return this->pipelineRaw_->getReadBufferSettings();
276  }
277 
278  // InboundLink overrides
279  void read(Rin msg) override {
280  auto guard = this->pipelineWeak_.lock();
281  this->handler_->read(this, std::forward<Rin>(msg));
282  }
283 
284  void readEOF() override {
285  auto guard = this->pipelineWeak_.lock();
286  this->handler_->readEOF(this);
287  }
288 
290  auto guard = this->pipelineWeak_.lock();
291  this->handler_->readException(this, std::move(e));
292  }
293 
294  void transportActive() override {
295  auto guard = this->pipelineWeak_.lock();
296  this->handler_->transportActive(this);
297  }
298 
299  void transportInactive() override {
300  auto guard = this->pipelineWeak_.lock();
301  this->handler_->transportInactive(this);
302  }
303 
304  // OutboundLink overrides
305  folly::Future<folly::Unit> write(Win msg) override {
306  auto guard = this->pipelineWeak_.lock();
307  return this->handler_->write(this, std::forward<Win>(msg));
308  }
309 
311  folly::exception_wrapper e) override {
312  auto guard = this->pipelineWeak_.lock();
313  return this->handler_->writeException(this, std::move(e));
314  }
315 
317  auto guard = this->pipelineWeak_.lock();
318  return this->handler_->close(this);
319  }
320 };
321 
322 template <class H>
324  : public InboundHandlerContext<typename H::rout>,
325  public InboundLink<typename H::rin>,
326  public ContextImplBase<H, InboundHandlerContext<typename H::rout>> {
327  public:
328  typedef typename H::rin Rin;
329  typedef typename H::rout Rout;
330  typedef typename H::win Win;
331  typedef typename H::wout Wout;
332  static const HandlerDir dir = HandlerDir::IN;
333 
335  std::weak_ptr<PipelineBase> pipeline,
336  std::shared_ptr<H> handler) {
337  this->impl_ = this;
338  this->initialize(pipeline, std::move(handler));
339  }
340 
341  // For StaticPipeline
343  this->impl_ = this;
344  }
345 
346  ~InboundContextImpl() override = default;
347 
348  // InboundHandlerContext overrides
349  void fireRead(Rout msg) override {
350  auto guard = this->pipelineWeak_.lock();
351  if (this->nextIn_) {
352  this->nextIn_->read(std::forward<Rout>(msg));
353  } else {
354  LOG(WARNING) << "read reached end of pipeline";
355  }
356  }
357 
358  void fireReadEOF() override {
359  auto guard = this->pipelineWeak_.lock();
360  if (this->nextIn_) {
361  this->nextIn_->readEOF();
362  } else {
363  LOG(WARNING) << "readEOF reached end of pipeline";
364  }
365  }
366 
368  auto guard = this->pipelineWeak_.lock();
369  if (this->nextIn_) {
370  this->nextIn_->readException(std::move(e));
371  } else {
372  LOG(WARNING) << "readException reached end of pipeline";
373  }
374  }
375 
376  void fireTransportActive() override {
377  auto guard = this->pipelineWeak_.lock();
378  if (this->nextIn_) {
379  this->nextIn_->transportActive();
380  }
381  }
382 
383  void fireTransportInactive() override {
384  auto guard = this->pipelineWeak_.lock();
385  if (this->nextIn_) {
386  this->nextIn_->transportInactive();
387  }
388  }
389 
390  PipelineBase* getPipeline() override {
391  return this->pipelineRaw_;
392  }
393 
394  std::shared_ptr<PipelineBase> getPipelineShared() override {
395  return this->pipelineWeak_.lock();
396  }
397 
398  // InboundLink overrides
399  void read(Rin msg) override {
400  auto guard = this->pipelineWeak_.lock();
401  this->handler_->read(this, std::forward<Rin>(msg));
402  }
403 
404  void readEOF() override {
405  auto guard = this->pipelineWeak_.lock();
406  this->handler_->readEOF(this);
407  }
408 
410  auto guard = this->pipelineWeak_.lock();
411  this->handler_->readException(this, std::move(e));
412  }
413 
414  void transportActive() override {
415  auto guard = this->pipelineWeak_.lock();
416  this->handler_->transportActive(this);
417  }
418 
419  void transportInactive() override {
420  auto guard = this->pipelineWeak_.lock();
421  this->handler_->transportInactive(this);
422  }
423 };
424 
425 template <class H>
427  : public OutboundHandlerContext<typename H::wout>,
428  public OutboundLink<typename H::win>,
429  public ContextImplBase<H, OutboundHandlerContext<typename H::wout>> {
430  public:
431  typedef typename H::rin Rin;
432  typedef typename H::rout Rout;
433  typedef typename H::win Win;
434  typedef typename H::wout Wout;
435  static const HandlerDir dir = HandlerDir::OUT;
436 
438  std::weak_ptr<PipelineBase> pipeline,
439  std::shared_ptr<H> handler) {
440  this->impl_ = this;
441  this->initialize(pipeline, std::move(handler));
442  }
443 
444  // For StaticPipeline
446  this->impl_ = this;
447  }
448 
449  ~OutboundContextImpl() override = default;
450 
451  // OutboundHandlerContext overrides
453  auto guard = this->pipelineWeak_.lock();
454  if (this->nextOut_) {
455  return this->nextOut_->write(std::forward<Wout>(msg));
456  } else {
457  LOG(WARNING) << "write reached end of pipeline";
458  return folly::makeFuture();
459  }
460  }
461 
463  folly::exception_wrapper e) override {
464  auto guard = this->pipelineWeak_.lock();
465  if (this->nextOut_) {
466  return this->nextOut_->writeException(std::move(e));
467  } else {
468  LOG(WARNING) << "close reached end of pipeline";
469  return folly::makeFuture();
470  }
471  }
472 
474  auto guard = this->pipelineWeak_.lock();
475  if (this->nextOut_) {
476  return this->nextOut_->close();
477  } else {
478  LOG(WARNING) << "close reached end of pipeline";
479  return folly::makeFuture();
480  }
481  }
482 
483  PipelineBase* getPipeline() override {
484  return this->pipelineRaw_;
485  }
486 
487  std::shared_ptr<PipelineBase> getPipelineShared() override {
488  return this->pipelineWeak_.lock();
489  }
490 
491  // OutboundLink overrides
492  folly::Future<folly::Unit> write(Win msg) override {
493  auto guard = this->pipelineWeak_.lock();
494  return this->handler_->write(this, std::forward<Win>(msg));
495  }
496 
498  folly::exception_wrapper e) override {
499  auto guard = this->pipelineWeak_.lock();
500  return this->handler_->writeException(this, std::move(e));
501  }
502 
504  auto guard = this->pipelineWeak_.lock();
505  return this->handler_->close(this);
506  }
507 };
508 
509 template <class Handler>
510 struct ContextType {
511  typedef typename std::conditional<
514  typename std::conditional<
515  Handler::dir == HandlerDir::IN,
518  >::type>::type
520 };
521 
522 } // namespace wangle
folly::Future< folly::Unit > writeException(folly::exception_wrapper e) override
std::shared_ptr< PipelineBase > getPipelineShared() override
void fireTransportActive() override
OutboundContextImpl(std::weak_ptr< PipelineBase > pipeline, std::shared_ptr< H > handler)
void transportInactive() override
void fireTransportInactive() override
void readException(folly::exception_wrapper e) override
flags
Definition: http_parser.h:127
void write(const T &in, folly::io::Appender &appender)
Definition: Types-inl.h:112
void attachPipeline() override
void setWriteFlags(folly::WriteFlags flags) override
std::string sformat(StringPiece fmt, Args &&...args)
Definition: Format.h:280
folly::Future< folly::Unit > fireWrite(Wout msg) override
virtual void detachPipeline()=0
folly::Future< folly::Unit > fireClose() override
void read(Rin msg) override
virtual HandlerDir getDirection()=0
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
folly::Future< folly::Unit > writeException(folly::exception_wrapper e) override
void fireReadException(folly::exception_wrapper e) override
void readException(folly::exception_wrapper e) override
void fireRead(Rout msg) override
void setNextIn(PipelineContext *ctx) override
void transportActive() override
void fireRead(Rout msg) override
static const HandlerDir dir
Definition: Handler.h:51
ContextImpl(std::weak_ptr< PipelineBase > pipeline, std::shared_ptr< H > handler)
void handler(int, siginfo_t *, void *)
std::conditional< Handler::dir==HandlerDir::BOTH, ContextImpl< Handler >, typename std::conditional< Handler::dir==HandlerDir::IN, InboundContextImpl< Handler >, OutboundContextImpl< Handler > >::type >::type type
virtual ~PipelineContext()=default
void setNextOut(PipelineContext *ctx) override
void fireReadEOF() override
virtual void setNextIn(PipelineContext *ctx)=0
InboundContextImpl(std::weak_ptr< PipelineBase > pipeline, std::shared_ptr< H > handler)
std::pair< uint64_t, uint64_t > getReadBufferSettings() override
void fireReadException(folly::exception_wrapper e) override
virtual void setNextOut(PipelineContext *ctx)=0
size_t read(T &out, folly::io::Cursor &cursor)
Definition: Types-inl.h:258
folly::Future< folly::Unit > write(Win msg) override
PipelineBase * getPipeline() override
folly::Future< folly::Unit > fireWrite(Wout msg) override
GuardImpl guard(ErrorHandler &&handler)
Definition: Base.h:840
virtual void attachPipeline()=0
folly::Future< folly::Unit > fireWriteException(folly::exception_wrapper e) override
folly::Future< folly::Unit > fireClose() override
std::shared_ptr< PipelineBase > getPipelineShared() override
HandlerDir getDirection() override
void read(Rin msg) override
folly::Future< folly::Unit > write(Win msg) override
std::shared_ptr< PipelineBase > getPipelineShared() override
void readEOF() override
PipelineBase * getPipeline() override
void detachPipeline() override
std::weak_ptr< PipelineBase > pipelineWeak_
void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize) override
folly::Future< folly::Unit > close() override
void detachContext(H *handler, HandlerContext *)
PipelineBase * getPipeline() override
void fireTransportInactive() override
folly::Future< folly::Unit > close() override
int close(NetworkSocket s)
Definition: NetOps.cpp:90
std::shared_ptr< H > handler_
void attachContext(H *handler, HandlerContext *ctx)
void initialize(std::weak_ptr< PipelineBase > pipeline, std::shared_ptr< H > handler)
Future< typename std::decay< T >::type > makeFuture(T &&t)
Definition: Future-inl.h:1310
fbstring demangle(const char *name)
Definition: Demangle.cpp:111
folly::Future< folly::Unit > fireWriteException(folly::exception_wrapper e) override
folly::WriteFlags getWriteFlags() override