proxygen
Pipeline-inl.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <glog/logging.h>
20 
21 namespace wangle {
22 
23 template <class R, class W>
24 Pipeline<R, W>::Pipeline() : isStatic_(false) {}
25 
26 template <class R, class W>
27 Pipeline<R, W>::Pipeline(bool isStatic) : isStatic_(isStatic) {
28  CHECK(isStatic_);
29 }
30 
31 template <class R, class W>
33  if (!isStatic_) {
35  }
36 }
37 
38 template <class H>
40  typedef typename ContextType<H>::type Context;
41  return addHelper(
42  std::make_shared<Context>(shared_from_this(), std::move(handler)),
43  false);
44 }
45 
46 template <class H>
48  return addBack(std::make_shared<H>(std::forward<H>(handler)));
49 }
50 
51 template <class H>
53  return addBack(std::shared_ptr<H>(handler, [](H*){}));
54 }
55 
56 template <class H>
58  typedef typename ContextType<H>::type Context;
59  return addHelper(
60  std::make_shared<Context>(shared_from_this(), std::move(handler)),
61  true);
62 }
63 
64 template <class H>
66  return addFront(std::make_shared<H>(std::forward<H>(handler)));
67 }
68 
69 template <class H>
71  return addFront(std::shared_ptr<H>(handler, [](H*){}));
72 }
73 
74 template <class H>
76  typedef typename ContextType<H>::type Context;
77  bool removed = false;
78 
79  auto it = ctxs_.begin();
80  while (it != ctxs_.end()) {
81  auto ctx = std::dynamic_pointer_cast<Context>(*it);
82  if (ctx && (!checkEqual || ctx->getHandler() == handler)) {
83  it = removeAt(it);
84  removed = true;
85  } else {
86  it++;
87  }
88  }
89 
90  if (!removed) {
91  throw std::invalid_argument("No such handler in pipeline");
92  }
93 
94  return *this;
95 }
96 
97 template <class H>
99  return removeHelper<H>(nullptr, false);
100 }
101 
102 template <class H>
104  return removeHelper<H>(handler, true);
105 }
106 
107 template <class H>
109  return getContext<H>(i)->getHandler();
110 }
111 
112 template <class H>
114  auto ctx = getContext<H>();
115  return ctx ? ctx->getHandler() : nullptr;
116 }
117 
118 template <class H>
120  auto ctx = dynamic_cast<typename ContextType<H>::type*>(ctxs_[i].get());
121  CHECK(ctx);
122  return ctx;
123 }
124 
125 template <class H>
127  for (auto pipelineCtx : ctxs_) {
128  auto ctx = dynamic_cast<typename ContextType<H>::type*>(pipelineCtx.get());
129  if (ctx) {
130  return ctx;
131  }
132  }
133  return nullptr;
134 }
135 
136 template <class H>
138  typedef typename ContextType<H>::type Context;
139  for (auto& ctx : ctxs_) {
140  auto ctxImpl = dynamic_cast<Context*>(ctx.get());
141  if (ctxImpl && ctxImpl->getHandler() == handler) {
142  owner_ = ctx;
143  return true;
144  }
145  }
146  return false;
147 }
148 
149 template <class Context>
150 void PipelineBase::addContextFront(Context* ctx) {
151  addHelper(std::shared_ptr<Context>(ctx, [](Context*){}), true);
152 }
153 
154 template <class Context>
156  std::shared_ptr<Context>&& ctx,
157  bool front) {
158  ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
159  if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
160  inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
161  }
162  if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
163  outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
164  }
165  return *this;
166 }
167 
168 namespace detail {
169 
170 template <class T>
171 inline void logWarningIfNotUnit(const std::string& warning) {
172  LOG(WARNING) << warning;
173 }
174 
175 template <>
176 inline void logWarningIfNotUnit<folly::Unit>(const std::string& /*warning*/) {
177  // do nothing
178 }
179 
180 } // detail
181 
182 template <class R, class W>
183 template <class T>
186  if (!front_) {
187  throw std::invalid_argument("read(): no inbound handler in Pipeline");
188  }
189  front_->read(std::forward<R>(msg));
190 }
191 
192 template <class R, class W>
193 template <class T>
196  if (!front_) {
197  throw std::invalid_argument("readEOF(): no inbound handler in Pipeline");
198  }
199  front_->readEOF();
200 }
201 
202 template <class R, class W>
203 template <class T>
206  if (front_) {
208  }
209 }
210 
211 template <class R, class W>
212 template <class T>
215  if (front_) {
217  }
218 }
219 
220 template <class R, class W>
221 template <class T>
224  if (!front_) {
225  throw std::invalid_argument(
226  "readException(): no inbound handler in Pipeline");
227  }
229 }
230 
231 template <class R, class W>
232 template <class T>
236  if (!back_) {
237  throw std::invalid_argument("write(): no outbound handler in Pipeline");
238  }
239  return back_->write(std::forward<W>(msg));
240 }
241 
242 template <class R, class W>
243 template <class T>
247  if (!back_) {
248  throw std::invalid_argument(
249  "writeException(): no outbound handler in Pipeline");
250  }
251  return back_->writeException(std::move(e));
252 }
253 
254 template <class R, class W>
255 template <class T>
259  if (!back_) {
260  throw std::invalid_argument("close(): no outbound handler in Pipeline");
261  }
262  return back_->close();
263 }
264 
265 // TODO Have read/write/etc check that pipeline has been finalized
266 template <class R, class W>
268  front_ = nullptr;
269  if (!inCtxs_.empty()) {
270  front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());
271  for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
272  inCtxs_[i]->setNextIn(inCtxs_[i+1]);
273  }
274  inCtxs_.back()->setNextIn(nullptr);
275  }
276 
277  back_ = nullptr;
278  if (!outCtxs_.empty()) {
279  back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());
280  for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
281  outCtxs_[i]->setNextOut(outCtxs_[i-1]);
282  }
283  outCtxs_.front()->setNextOut(nullptr);
284  }
285 
286  if (!front_) {
287  detail::logWarningIfNotUnit<R>(
288  "No inbound handler in Pipeline, inbound operations will throw "
289  "std::invalid_argument");
290  }
291  if (!back_) {
292  detail::logWarningIfNotUnit<W>(
293  "No outbound handler in Pipeline, outbound operations will throw "
294  "std::invalid_argument");
295  }
296 
297  for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
298  (*it)->attachPipeline();
299  }
300 }
301 
302 } // namespace wangle
InboundLink< R > * front_
Definition: Pipeline.h:224
PipelineBase & addHelper(std::shared_ptr< Context > &&ctx, bool front)
Definition: Pipeline-inl.h:155
std::vector< PipelineContext * > outCtxs_
Definition: Pipeline.h:137
std::shared_ptr< PipelineContext > owner_
Definition: Pipeline.h:158
std::enable_if<!std::is_same< T, folly::Unit >::value, folly::Future< folly::Unit > >::type close()
Definition: Pipeline-inl.h:258
std::enable_if<!std::is_same< T, folly::Unit >::value >::type transportInactive()
Definition: Pipeline-inl.h:214
PipelineBase & addFront(std::shared_ptr< H > handler)
Definition: Pipeline-inl.h:57
PskType type
void addContextFront(Context *ctx)
Definition: Pipeline-inl.h:150
std::vector< std::shared_ptr< PipelineContext > > ctxs_
Definition: Pipeline.h:135
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
ContextType< H >::type * getContext()
Definition: Pipeline-inl.h:126
std::enable_if<!std::is_same< T, folly::Unit >::value >::type readEOF()
Definition: Pipeline-inl.h:195
ContextIterator removeAt(const ContextIterator &it)
Definition: Pipeline.cpp:49
void handler(int, siginfo_t *, void *)
std::conditional< Handler::dir==HandlerDir::BOTH, ContextImpl< Handler >, typename std::conditional< Handler::dir==HandlerDir::IN, InboundContextImpl< Handler >, OutboundContextImpl< Handler > >::type >::type type
std::enable_if<!std::is_same< T, folly::Unit >::value, folly::Future< folly::Unit > >::type write(W msg)
Definition: Pipeline-inl.h:235
std::enable_if<!std::is_same< T, folly::Unit >::value >::type transportActive()
Definition: Pipeline-inl.h:205
std::enable_if<!std::is_same< T, folly::Unit >::value >::type read(R msg)
Definition: Pipeline-inl.h:185
PipelineBase & removeHelper(H *handler, bool checkEqual)
Definition: Pipeline-inl.h:75
PipelineBase & addBack(std::shared_ptr< H > handler)
Definition: Pipeline-inl.h:39
static const char *const value
Definition: Conv.cpp:50
void logWarningIfNotUnit(const std::string &warning)
Definition: Pipeline-inl.h:171
std::enable_if<!std::is_same< T, folly::Unit >::value, folly::Future< folly::Unit > >::type writeException(folly::exception_wrapper e)
Definition: Pipeline-inl.h:246
PipelineBase & remove()
Definition: Pipeline-inl.h:98
std::enable_if<!std::is_same< T, folly::Unit >::value >::type readException(folly::exception_wrapper e)
Definition: Pipeline-inl.h:223
std::vector< PipelineContext * > inCtxs_
Definition: Pipeline.h:136
const char * string
Definition: Conv.cpp:212
OutboundLink< W > * back_
Definition: Pipeline.h:225
bool setOwner(H *handler)
Definition: Pipeline-inl.h:137
void finalize() override
Definition: Pipeline-inl.h:267
~Pipeline() override
Definition: Pipeline-inl.h:32