proxygen
Pipeline.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <boost/variant.hpp>
20 #include <folly/ExceptionWrapper.h>
21 #include <folly/Memory.h>
22 #include <folly/futures/Future.h>
23 #include <folly/Unit.h>
24 #include <folly/io/IOBufQueue.h>
30 
31 namespace wangle {
32 
33 class PipelineBase;
34 class Acceptor;
35 
37  public:
38  virtual ~PipelineManager() = default;
39  virtual void deletePipeline(PipelineBase* pipeline) = 0;
40  virtual void refreshTimeout() {}
41 };
42 
43 class PipelineBase : public std::enable_shared_from_this<PipelineBase> {
44  public:
45  virtual ~PipelineBase() = default;
46 
48  manager_ = manager;
49  }
50 
52  return manager_;
53  }
54 
55  void deletePipeline() {
56  if (manager_) {
57  manager_->deletePipeline(this);
58  }
59  }
60 
61  void setTransport(std::shared_ptr<folly::AsyncTransport> transport) {
62  transport_ = transport;
63  }
64 
65  std::shared_ptr<folly::AsyncTransport> getTransport() {
66  return transport_;
67  }
68 
69  void setWriteFlags(folly::WriteFlags flags);
70  folly::WriteFlags getWriteFlags();
71 
72  void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize);
73  std::pair<uint64_t, uint64_t> getReadBufferSettings();
74 
75  void setTransportInfo(std::shared_ptr<TransportInfo> tInfo);
76  std::shared_ptr<TransportInfo> getTransportInfo();
77 
78  template <class H>
79  PipelineBase& addBack(std::shared_ptr<H> handler);
80 
81  template <class H>
82  PipelineBase& addBack(H&& handler);
83 
84  template <class H>
85  PipelineBase& addBack(H* handler);
86 
87  template <class H>
88  PipelineBase& addFront(std::shared_ptr<H> handler);
89 
90  template <class H>
91  PipelineBase& addFront(H&& handler);
92 
93  template <class H>
94  PipelineBase& addFront(H* handler);
95 
96  template <class H>
97  PipelineBase& remove(H* handler);
98 
99  template <class H>
100  PipelineBase& remove();
101 
102  PipelineBase& removeFront();
103 
104  PipelineBase& removeBack();
105 
106  template <class H>
107  H* getHandler(int i);
108 
109  template <class H>
110  H* getHandler();
111 
112  template <class H>
113  typename ContextType<H>::type* getContext(int i);
114 
115  template <class H>
116  typename ContextType<H>::type* getContext();
117 
118  // If one of the handlers owns the pipeline itself, use setOwner to ensure
119  // that the pipeline doesn't try to detach the handler during destruction,
120  // lest destruction ordering issues occur.
121  // See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example
122  template <class H>
123  bool setOwner(H* handler);
124 
125  virtual void finalize() = 0;
126 
127  size_t numHandlers() const;
128 
129  protected:
130  template <class Context>
131  void addContextFront(Context* ctx);
132 
133  void detachHandlers();
134 
135  std::vector<std::shared_ptr<PipelineContext>> ctxs_;
136  std::vector<PipelineContext*> inCtxs_;
137  std::vector<PipelineContext*> outCtxs_;
138 
139  private:
140  PipelineManager* manager_{nullptr};
141  std::shared_ptr<folly::AsyncTransport> transport_;
142  std::shared_ptr<TransportInfo> transportInfo_;
143 
144  template <class Context>
145  PipelineBase& addHelper(std::shared_ptr<Context>&& ctx, bool front);
146 
147  template <class H>
148  PipelineBase& removeHelper(H* handler, bool checkEqual);
149 
150  typedef std::vector<std::shared_ptr<PipelineContext>>::iterator
152 
153  ContextIterator removeAt(const ContextIterator& it);
154 
156  std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
157 
158  std::shared_ptr<PipelineContext> owner_;
159 };
160 
161 /*
162  * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
163  * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
164  *
165  * Use Unit for one of the types if your pipeline is unidirectional.
166  * If R is Unit, read(), readEOF(), and readException() will be disabled.
167  * If W is Unit, write() and close() will be disabled.
168  */
169 template <class R, class W = folly::Unit>
170 class Pipeline : public PipelineBase {
171  public:
172  using Ptr = std::shared_ptr<Pipeline>;
173 
174  static Ptr create() {
175  return std::shared_ptr<Pipeline>(new Pipeline());
176  }
177 
178  ~Pipeline() override;
179 
180  template <class T = R>
182  read(R msg);
183 
184  template <class T = R>
186  readEOF();
187 
188  template <class T = R>
190  readException(folly::exception_wrapper e);
191 
192  template <class T = R>
194  transportActive();
195 
196  template <class T = R>
198  transportInactive();
199 
200  template <class T = W>
203  write(W msg);
204 
205  template <class T = W>
208  writeException(folly::exception_wrapper e);
209 
210  template <class T = W>
213  close();
214 
215  void finalize() override;
216 
217  protected:
218  Pipeline();
219  explicit Pipeline(bool isStatic);
220 
221  private:
222  bool isStatic_{false};
223 
224  InboundLink<R>* front_{nullptr};
225  OutboundLink<W>* back_{nullptr};
226 };
227 
228 } // namespace wangle
229 
230 namespace folly {
231 
232 class AsyncSocket;
234 class AsyncUDPSocket;
235 
236 }
237 
238 namespace wangle {
239 
240 using DefaultPipeline =
242 
243 template <typename Pipeline>
245  public:
246  virtual typename Pipeline::Ptr newPipeline(
247  std::shared_ptr<folly::AsyncTransportWrapper>) = 0;
248 
249  virtual typename Pipeline::Ptr newPipeline(
250  std::shared_ptr<folly::AsyncUDPSocket> /* serverSocket */,
251  const folly::SocketAddress& /* clientAddr */) {
252  return nullptr;
253  }
254 
255  virtual ~PipelineFactory() = default;
256 };
257 
258 struct ConnInfo {
264 };
265 
266 enum class ConnEvent {
267  CONN_ADDED,
268  CONN_REMOVED,
269 };
270 
271 typedef boost::variant<folly::IOBuf*,
273  ConnInfo&,
274  ConnEvent,
275  std::tuple<folly::IOBuf*,
276  std::shared_ptr<folly::AsyncUDPSocket>,
279 
281  public:
282  virtual typename AcceptPipeline::Ptr newPipeline(Acceptor* acceptor) = 0;
283 
284  virtual ~AcceptPipelineFactory() = default;
285 };
286 
287 }
288 
std::vector< PipelineContext * > outCtxs_
Definition: Pipeline.h:137
std::shared_ptr< PipelineContext > owner_
Definition: Pipeline.h:158
flags
Definition: http_parser.h:127
void write(const T &in, folly::io::Appender &appender)
Definition: Types-inl.h:112
ConnEvent
Definition: Pipeline.h:266
boost::variant< folly::IOBuf *, folly::AsyncTransportWrapper *, ConnInfo &, ConnEvent, std::tuple< folly::IOBuf *, std::shared_ptr< folly::AsyncUDPSocket >, folly::SocketAddress > > AcceptPipelineType
Definition: Pipeline.h:277
PskType type
std::vector< std::shared_ptr< PipelineContext > > ctxs_
Definition: Pipeline.h:135
const TransportInfo & tinfo
Definition: Pipeline.h:263
PipelineManager * getPipelineManager()
Definition: Pipeline.h:51
std::shared_ptr< TransportInfo > transportInfo_
Definition: Pipeline.h:142
SecureTransportType secureType
Definition: Pipeline.h:262
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
void handler(int, siginfo_t *, void *)
std::conditional< Handler::dir==HandlerDir::BOTH, ContextImpl< Handler >, typename std::conditional< Handler::dir==HandlerDir::IN, InboundContextImpl< Handler >, OutboundContextImpl< Handler > >::type >::type type
size_t read(T &out, folly::io::Cursor &cursor)
Definition: Types-inl.h:258
virtual Pipeline::Ptr newPipeline(std::shared_ptr< folly::AsyncUDPSocket >, const folly::SocketAddress &)
Definition: Pipeline.h:249
std::shared_ptr< folly::AsyncTransport > getTransport()
Definition: Pipeline.h:65
static const char *const value
Definition: Conv.cpp:50
virtual void deletePipeline(PipelineBase *pipeline)=0
void deletePipeline()
Definition: Pipeline.h:55
virtual void refreshTimeout()
Definition: Pipeline.h:40
const std::string & nextProtoName
Definition: Pipeline.h:261
virtual ~PipelineManager()=default
std::vector< PipelineContext * > inCtxs_
Definition: Pipeline.h:136
std::vector< std::shared_ptr< PipelineContext > >::iterator ContextIterator
Definition: Pipeline.h:151
const folly::SocketAddress * clientAddr
Definition: Pipeline.h:260
const char * string
Definition: Conv.cpp:212
static Ptr create()
Definition: Pipeline.h:174
Pipeline< AcceptPipelineType > AcceptPipeline
Definition: Pipeline.h:278
AsyncFizzClient::UniquePtr transport_
folly::AsyncTransportWrapper * sock
Definition: Pipeline.h:259
std::shared_ptr< Pipeline > Ptr
Definition: Pipeline.h:172
std::shared_ptr< folly::AsyncTransport > transport_
Definition: Pipeline.h:141
void setPipelineManager(PipelineManager *manager)
Definition: Pipeline.h:47
int close(NetworkSocket s)
Definition: NetOps.cpp:90
void setTransport(std::shared_ptr< folly::AsyncTransport > transport)
Definition: Pipeline.h:61