30 template <
class H,
class HandlerContext>
32 if (++handler->attachCount_ == 1) {
35 handler->ctx_ =
nullptr;
39 template <
class H,
class HandlerContext>
41 if (handler->attachCount_ >= 1) {
42 --handler->attachCount_;
44 handler->ctx_ =
nullptr;
57 virtual void read(In msg) = 0;
58 virtual void readEOF() = 0;
60 virtual void transportActive() = 0;
61 virtual void transportInactive() = 0;
74 template <
class H,
class Context>
80 return handler_.get();
84 std::weak_ptr<PipelineBase> pipeline,
86 pipelineWeak_ = pipeline;
87 pipelineRaw_ = pipeline.lock().get();
95 handler_->attachPipeline(impl_);
101 handler_->detachPipeline(impl_);
147 bool attached_{
false};
159 typedef typename H::rin
Rin;
161 typedef typename H::win
Win;
166 std::weak_ptr<PipelineBase> pipeline,
169 this->initialize(pipeline,
std::move(handler));
181 auto guard = this->pipelineWeak_.lock();
183 this->nextIn_->read(std::forward<Rout>(msg));
185 LOG(
WARNING) <<
"read reached end of pipeline";
190 auto guard = this->pipelineWeak_.lock();
192 this->nextIn_->readEOF();
194 LOG(
WARNING) <<
"readEOF reached end of pipeline";
199 auto guard = this->pipelineWeak_.lock();
201 this->nextIn_->readException(
std::move(e));
203 LOG(
WARNING) <<
"readException reached end of pipeline";
208 auto guard = this->pipelineWeak_.lock();
210 this->nextIn_->transportActive();
215 auto guard = this->pipelineWeak_.lock();
217 this->nextIn_->transportInactive();
222 auto guard = this->pipelineWeak_.lock();
223 if (this->nextOut_) {
224 return this->nextOut_->write(std::forward<Wout>(msg));
226 LOG(
WARNING) <<
"write reached end of pipeline";
233 auto guard = this->pipelineWeak_.lock();
234 if (this->nextOut_) {
235 return this->nextOut_->writeException(
std::move(e));
237 LOG(
WARNING) <<
"close reached end of pipeline";
243 auto guard = this->pipelineWeak_.lock();
244 if (this->nextOut_) {
245 return this->nextOut_->close();
247 LOG(
WARNING) <<
"close reached end of pipeline";
253 return this->pipelineRaw_;
257 return this->pipelineWeak_.lock();
261 this->pipelineRaw_->setWriteFlags(flags);
265 return this->pipelineRaw_->getWriteFlags();
271 this->pipelineRaw_->setReadBufferSettings(minAvailable, allocationSize);
275 return this->pipelineRaw_->getReadBufferSettings();
280 auto guard = this->pipelineWeak_.lock();
281 this->handler_->read(
this, std::forward<Rin>(msg));
285 auto guard = this->pipelineWeak_.lock();
286 this->handler_->readEOF(
this);
290 auto guard = this->pipelineWeak_.lock();
291 this->handler_->readException(
this,
std::move(e));
295 auto guard = this->pipelineWeak_.lock();
296 this->handler_->transportActive(
this);
300 auto guard = this->pipelineWeak_.lock();
301 this->handler_->transportInactive(
this);
306 auto guard = this->pipelineWeak_.lock();
307 return this->handler_->write(
this, std::forward<Win>(msg));
312 auto guard = this->pipelineWeak_.lock();
313 return this->handler_->writeException(
this,
std::move(e));
317 auto guard = this->pipelineWeak_.lock();
318 return this->handler_->close(
this);
328 typedef typename H::rin
Rin;
330 typedef typename H::win
Win;
335 std::weak_ptr<PipelineBase> pipeline,
338 this->initialize(pipeline,
std::move(handler));
350 auto guard = this->pipelineWeak_.lock();
352 this->nextIn_->read(std::forward<Rout>(msg));
354 LOG(
WARNING) <<
"read reached end of pipeline";
359 auto guard = this->pipelineWeak_.lock();
361 this->nextIn_->readEOF();
363 LOG(
WARNING) <<
"readEOF reached end of pipeline";
368 auto guard = this->pipelineWeak_.lock();
370 this->nextIn_->readException(
std::move(e));
372 LOG(
WARNING) <<
"readException reached end of pipeline";
377 auto guard = this->pipelineWeak_.lock();
379 this->nextIn_->transportActive();
384 auto guard = this->pipelineWeak_.lock();
386 this->nextIn_->transportInactive();
391 return this->pipelineRaw_;
395 return this->pipelineWeak_.lock();
400 auto guard = this->pipelineWeak_.lock();
401 this->handler_->read(
this, std::forward<Rin>(msg));
405 auto guard = this->pipelineWeak_.lock();
406 this->handler_->readEOF(
this);
410 auto guard = this->pipelineWeak_.lock();
411 this->handler_->readException(
this,
std::move(e));
415 auto guard = this->pipelineWeak_.lock();
416 this->handler_->transportActive(
this);
420 auto guard = this->pipelineWeak_.lock();
421 this->handler_->transportInactive(
this);
431 typedef typename H::rin
Rin;
433 typedef typename H::win
Win;
438 std::weak_ptr<PipelineBase> pipeline,
441 this->initialize(pipeline,
std::move(handler));
453 auto guard = this->pipelineWeak_.lock();
454 if (this->nextOut_) {
455 return this->nextOut_->write(std::forward<Wout>(msg));
457 LOG(
WARNING) <<
"write reached end of pipeline";
464 auto guard = this->pipelineWeak_.lock();
465 if (this->nextOut_) {
466 return this->nextOut_->writeException(
std::move(e));
468 LOG(
WARNING) <<
"close reached end of pipeline";
474 auto guard = this->pipelineWeak_.lock();
475 if (this->nextOut_) {
476 return this->nextOut_->close();
478 LOG(
WARNING) <<
"close reached end of pipeline";
484 return this->pipelineRaw_;
488 return this->pipelineWeak_.lock();
493 auto guard = this->pipelineWeak_.lock();
494 return this->handler_->write(
this, std::forward<Win>(msg));
499 auto guard = this->pipelineWeak_.lock();
500 return this->handler_->writeException(
this,
std::move(e));
504 auto guard = this->pipelineWeak_.lock();
505 return this->handler_->close(
this);
509 template <
class Handler>
511 typedef typename std::conditional<
514 typename std::conditional<
folly::Future< folly::Unit > writeException(folly::exception_wrapper e) override
std::shared_ptr< PipelineBase > getPipelineShared() override
void fireTransportActive() override
OutboundContextImpl(std::weak_ptr< PipelineBase > pipeline, std::shared_ptr< H > handler)
void transportInactive() override
PipelineBase * pipelineRaw_
void fireTransportInactive() override
void readException(folly::exception_wrapper e) override
void transportActive() override
void write(const T &in, folly::io::Appender &appender)
void attachPipeline() override
void setWriteFlags(folly::WriteFlags flags) override
std::string sformat(StringPiece fmt, Args &&...args)
folly::Future< folly::Unit > fireWrite(Wout msg) override
virtual void detachPipeline()=0
folly::Future< folly::Unit > fireClose() override
void read(Rin msg) override
virtual HandlerDir getDirection()=0
constexpr detail::Map< Move > move
folly::Future< folly::Unit > writeException(folly::exception_wrapper e) override
void fireReadException(folly::exception_wrapper e) override
void readException(folly::exception_wrapper e) override
void fireRead(Rout msg) override
void fireReadEOF() override
void setNextIn(PipelineContext *ctx) override
void transportActive() override
void fireRead(Rout msg) override
static const HandlerDir dir
ContextImpl(std::weak_ptr< PipelineBase > pipeline, std::shared_ptr< H > handler)
void handler(int, siginfo_t *, void *)
std::conditional< Handler::dir==HandlerDir::BOTH, ContextImpl< Handler >, typename std::conditional< Handler::dir==HandlerDir::IN, InboundContextImpl< Handler >, OutboundContextImpl< Handler > >::type >::type type
virtual ~PipelineContext()=default
void setNextOut(PipelineContext *ctx) override
void fireReadEOF() override
virtual void setNextIn(PipelineContext *ctx)=0
InboundContextImpl(std::weak_ptr< PipelineBase > pipeline, std::shared_ptr< H > handler)
std::pair< uint64_t, uint64_t > getReadBufferSettings() override
void fireReadException(folly::exception_wrapper e) override
virtual void setNextOut(PipelineContext *ctx)=0
size_t read(T &out, folly::io::Cursor &cursor)
folly::Future< folly::Unit > write(Win msg) override
PipelineBase * getPipeline() override
folly::Future< folly::Unit > fireWrite(Wout msg) override
GuardImpl guard(ErrorHandler &&handler)
virtual void attachPipeline()=0
folly::Future< folly::Unit > fireWriteException(folly::exception_wrapper e) override
folly::Future< folly::Unit > fireClose() override
std::shared_ptr< PipelineBase > getPipelineShared() override
HandlerDir getDirection() override
void read(Rin msg) override
folly::Future< folly::Unit > write(Win msg) override
void fireTransportActive() override
void transportInactive() override
std::shared_ptr< PipelineBase > getPipelineShared() override
PipelineBase * getPipeline() override
void detachPipeline() override
std::weak_ptr< PipelineBase > pipelineWeak_
void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize) override
folly::Future< folly::Unit > close() override
void detachContext(H *handler, HandlerContext *)
PipelineBase * getPipeline() override
void fireTransportInactive() override
folly::Future< folly::Unit > close() override
int close(NetworkSocket s)
std::shared_ptr< H > handler_
void attachContext(H *handler, HandlerContext *ctx)
void initialize(std::weak_ptr< PipelineBase > pipeline, std::shared_ptr< H > handler)
Future< typename std::decay< T >::type > makeFuture(T &&t)
fbstring demangle(const char *name)
folly::Future< folly::Unit > fireWriteException(folly::exception_wrapper e) override
folly::WriteFlags getWriteFlags() override