proxygen
ObservingHandler.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
22 
23 namespace wangle {
24 
// ObservingHandler<T, R, P>: class template that derives from both
// HandlerAdapter<folly::IOBufQueue&, T> and Subscriber<T, R>, so one object
// receives the handler callbacks (transportActive / readEOF / readException)
// and the subscriber callbacks (onNext / onError / onCompleted / routingData).
// Only declarations appear in this block; the definitions are not visible here.
//
// NOTE(review): this listing was extracted from a rendered documentation page.
// The embedded line numbers skip 34, 41, 44-45, 59-60 and 62-63, so several
// declarations are missing from this view. Consult the original header before
// changing the class.
30 template <typename T, typename R, typename P = DefaultPipeline>
31 class ObservingHandler : public HandlerAdapter<folly::IOBufQueue&, T>,
32  public Subscriber<T, R> {
33  public:
35 
// Takes the routing data by const reference and a raw pointer to the
// BroadcastPool<T, R, P> to use. How either is stored is not visible here.
36  ObservingHandler(const R& routingData, BroadcastPool<T, R, P>* broadcastPool);
37  ~ObservingHandler() override;
38 
39  // Non-copyable
40  ObservingHandler(const ObservingHandler&) = delete;
// NOTE(review): embedded line 41 is missing; the symbol index at the end of
// this listing shows a deleted copy-assignment operator — confirm.
42 
43  // Movable
// NOTE(review): the declarations that belong under "Movable" (embedded lines
// 44-45) are missing from this listing.
46 
47  // HandlerAdapter implementation
// Context is not declared in the visible lines; the symbol index at the end of
// this listing shows it as HandlerAdapter<folly::IOBufQueue&, T>::Context.
48  void transportActive(Context* ctx) override;
49  void readEOF(Context* ctx) override;
50  void readException(Context* ctx, folly::exception_wrapper ex) override;
51 
52  // Subscriber implementation
53  void onNext(const T& buf) override;
54  void onError(folly::exception_wrapper ex) override;
55  void onCompleted() override;
// Returns a mutable reference to routing data of type R.
56  R& routingData() override;
57 
58  private:
// NOTE(review): member declarations on embedded lines 59-60 and 62-63 are
// missing; the symbol index lists broadcastPool_ and broadcastHandler_.
61 
// Starts out false; the code that changes it is not part of this block.
64  bool paused_{false};
65 
66  // True iff the handler has been deleted
// Held through a shared_ptr<bool> initialised to point at `false`.
67  std::shared_ptr<bool> deleted_{new bool(false)};
68 };
69 
// NOTE(review): the declaration introduced by this template header (embedded
// line 71) is missing from the listing. The code below uses
// ObservingPipeline<T>::create() and ObservingPipeline<T> as a template
// argument, so this is presumably where ObservingPipeline<T> is declared —
// confirm against the original header.
70 template <typename T>
72 
// Pipeline factory class template deriving from
// RoutingDataPipelineFactory<ObservingPipeline<T>, R>. It builds pipelines that
// end in an ObservingHandler<T, R, P> and creates a BroadcastPool<T, R, P> on
// demand.
//
// NOTE(review): this listing was extracted from a rendered documentation page.
// Embedded lines 74, 77, 83, 86, 100, 104 and 117 are missing: the class-head
// name, the constructor name, parts of the newPipeline and broadcastPool
// signatures, one allocation line and one member. The symbol index at the end
// of this listing names the class ObservingPipelineFactory — confirm against
// the original header before editing.
73 template <typename T, typename R, typename P = DefaultPipeline>
75  : public RoutingDataPipelineFactory<ObservingPipeline<T>, R> {
76  public:
// Constructor (name line missing): copies the two shared_ptr arguments into
// serverPool_ and broadcastPipelineFactory_.
78  std::shared_ptr<ServerPool<R, P>> serverPool,
79  std::shared_ptr<BroadcastPipelineFactory<T, R>> broadcastPipelineFactory)
80  : serverPool_(serverPool),
81  broadcastPipelineFactory_(broadcastPipelineFactory) {}
82 
// newPipeline override (first signature line and one parameter line missing).
// Builds and returns a pipeline for `socket`:
//   1. create an ObservingPipeline<T>;
//   2. append an AsyncSocketHandler constructed from `socket`;
//   3. append an ObservingHandler<T, R, P> built from `routingData` and the
//      pool returned by broadcastPool();
//   4. finalize the pipeline, then attach `transportInfo`.
84  std::shared_ptr<folly::AsyncTransportWrapper> socket,
85  const R& routingData,
87  std::shared_ptr<TransportInfo> transportInfo) override {
88  auto pipeline = ObservingPipeline<T>::create();
89  pipeline->addBack(AsyncSocketHandler(socket));
// broadcastPool() is called with no argument, so clientFactory defaults to
// nullptr on this path.
90  auto handler = std::make_shared<ObservingHandler<T, R, P>>(
91  routingData, broadcastPool());
92  pipeline->addBack(handler);
93  pipeline->finalize();
94 
95  pipeline->setTransportInfo(transportInfo);
96 
97  return pipeline;
98  }
99 
// broadcastPool (first signature line missing): returns the raw pointer held by
// broadcastPool_, creating the pool first if broadcastPool_ is empty. A
// non-null clientFactory is passed to the pool constructor as a third argument;
// otherwise the two-argument form is used. Once a pool exists, later calls
// return it and ignore clientFactory.
// NOTE(review): the symbol index at the end of this listing shows
// broadcastPool_ as a folly::ThreadLocalPtr, so the pool is presumably
// per-thread — confirm.
101  std::shared_ptr<BaseClientBootstrapFactory<>> clientFactory = nullptr) {
102  if (!broadcastPool_) {
103  if (clientFactory) {
// NOTE(review): the line that opens this call (embedded line 104) is missing;
// presumably it mirrors the reset(new BroadcastPool...) in the else branch.
105  serverPool_, broadcastPipelineFactory_, clientFactory));
106  } else {
107  broadcastPool_.reset(
108  new BroadcastPool<T, R, P>(serverPool_, broadcastPipelineFactory_));
109  }
110  }
111  return broadcastPool_.get();
112  }
113 
114  protected:
// Shared handles supplied at construction and forwarded to every
// BroadcastPool this factory creates.
115  std::shared_ptr<ServerPool<R, P>> serverPool_;
116  std::shared_ptr<BroadcastPipelineFactory<T, R>> broadcastPipelineFactory_;
// NOTE(review): the broadcastPool_ member declaration (embedded line 117) is
// missing from this listing.
118 };
119 
120 } // namespace wangle
121 
void readEOF(Context *ctx) override
#define T(v)
Definition: http_parser.c:233
std::shared_ptr< bool > deleted_
HandlerAdapter< folly::IOBufQueue &, T >::Context Context
BroadcastHandler< T, R > * broadcastHandler_
void onError(folly::exception_wrapper ex) override
ObservingHandler(const R &routingData, BroadcastPool< T, R, P > *broadcastPool)
std::shared_ptr< BroadcastPipelineFactory< T, R > > broadcastPipelineFactory_
ObservingHandler & operator=(const ObservingHandler &)=delete
ObservingPipeline< T >::Ptr newPipeline(std::shared_ptr< folly::AsyncTransportWrapper > socket, const R &routingData, RoutingDataHandler< R > *, std::shared_ptr< TransportInfo > transportInfo) override
void handler(int, siginfo_t *, void *)
std::shared_ptr< ServerPool< R, P > > serverPool_
BroadcastPool< T, R, P > * broadcastPool_
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
ObservingPipelineFactory(std::shared_ptr< ServerPool< R, P >> serverPool, std::shared_ptr< BroadcastPipelineFactory< T, R >> broadcastPipelineFactory)
void onNext(const T &buf) override
static Ptr create()
Definition: Pipeline.h:174
std::shared_ptr< Pipeline > Ptr
Definition: Pipeline.h:172
void transportActive(Context *ctx) override
void readException(Context *ctx, folly::exception_wrapper ex) override
virtual BroadcastPool< T, R, P > * broadcastPool(std::shared_ptr< BaseClientBootstrapFactory<>> clientFactory=nullptr)
folly::ThreadLocalPtr< BroadcastPool< T, R, P > > broadcastPool_