proxygen
BroadcastHandler.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
19 #include <wangle/channel/Handler.h>
22 
23 namespace wangle {
24 
// BroadcastHandler<T, R>: a handler derived from
// HandlerAdapter<T, std::unique_ptr<folly::IOBuf>> that keeps a registry of
// Subscriber<T, R> pointers keyed by a uint64_t id (see subscribers_ below).
//
// NOTE(review): this listing skips some source lines (the embedded numbers
// jump, e.g. 31 -> 33, 95 -> 97, 98 -> 100), so members of the real class may
// be missing from view. In particular `Context` is used below but its
// declaration is not visible here.
29 template <typename T, typename R>
30 class BroadcastHandler : public HandlerAdapter<T, std::unique_ptr<folly::IOBuf>> {
31  public:
33 
   // Destructor: asserts via CHECK that no subscribers are still registered.
   // CHECK is presumably the glog-style fatal assertion -- confirm.
34  ~BroadcastHandler() override {
35  CHECK(subscribers_.empty());
36  }
37 
38  // BytesToBytesHandler implementation
   // The three functions below are declared `override` and have no bodies in
   // this listing; their definitions live elsewhere.
39  void read(Context* ctx, T data) override;
40  void readEOF(Context* ctx) override;
41  void readException(Context* ctx, folly::exception_wrapper ex) override;
42 
   // Takes a subscriber pointer and returns a uint64_t. Presumably the
   // returned value is the id later accepted by unsubscribe() -- confirm
   // against the definition, which is not shown here.
47  virtual uint64_t subscribe(Subscriber<T, R>* subscriber);
48 
   // Takes a uint64_t subscription id; definition not shown here.
53  virtual void unsubscribe(uint64_t subscriptionId);
54 
   // Definition not shown here. NOTE(review): the name suggests it closes
   // the handler when nothing is subscribed -- verify.
59  virtual void closeIfIdle();
60 
   // Overridable hook with an empty default body. When it is invoked is not
   // visible in this listing.
65  virtual void onSubscribe(Subscriber<T, R>*) {}
66 
   // Overridable hook with an empty default body. When it is invoked is not
   // visible in this listing.
71  virtual void onUnsubscribe(Subscriber<T, R>*) {}
72 
   // Overridable hook with an empty default body; receives the data by
   // non-const reference. When it is invoked is not visible in this listing.
77  virtual void onData(T& /* data */) {}
78 
84 
85  protected:
   // Calls f once for each entry of subscribers_, passing the mapped
   // Subscriber<T, R>* (it.second). The loop runs over a local copy of the
   // map, so changes made to subscribers_ while f runs do not affect this
   // iteration. Presumably this lets f subscribe/unsubscribe safely -- confirm.
86  template <typename FUNC> // FUNC: Subscriber<T, R>* -> void
87  void forEachSubscriber(FUNC f) {
88  auto subscribers = subscribers_;
89  for (const auto& it : subscribers) {
90  f(it.second);
91  }
92  }
93 
94  private:
   // Registry of subscribers: uint64_t key -> raw Subscriber<T, R> pointer.
   // Ownership of the pointed-to subscribers is not shown in this listing.
95  std::map<uint64_t, Subscriber<T, R>*> subscribers_;
97 
98  // For unit tests only.
   // NOTE(review): the declaration this comment refers to (listing line 99)
   // is missing from this extract.
100 };
101 
// Abstract class template derived from PipelineFactory<DefaultPipeline>;
// every member visible below is pure virtual (= 0).
//
// NOTE(review): the line that names this class (listing line 103) is missing
// from this extract. Presumably it is `class BroadcastPipelineFactory` as in
// upstream wangle -- confirm against the real header.
102 template <typename T, typename R>
104  : public PipelineFactory<DefaultPipeline> {
105  public:
   // Re-declares the base-class newPipeline as pure virtual. Takes a
   // shared_ptr to a folly::AsyncTransportWrapper and returns a
   // DefaultPipeline::Ptr.
106  DefaultPipeline::Ptr newPipeline(
107  std::shared_ptr<folly::AsyncTransportWrapper> socket) override = 0;
108 
   // Returns a BroadcastHandler<T, R>* for the given pipeline; declared
   // noexcept. Whether the result may be null is not visible here.
109  virtual BroadcastHandler<T, R>* getBroadcastHandler(
110  DefaultPipeline* pipeline) noexcept = 0;
111 
   // Receives a pipeline and routing data of type R (by const reference).
   // What implementations do with it is not visible in this listing.
112  virtual void setRoutingData(DefaultPipeline* pipeline,
113  const R& routingData) = 0;
114 };
115 
116 } // namespace wangle
117 
void readEOF(Context *ctx) override
#define T(v)
Definition: http_parser.c:233
HandlerAdapter< std::string, std::unique_ptr< folly::IOBuf > >::Context Context
auto f
virtual void onSubscribe(Subscriber< T, R > *)
void read(Context *ctx, T data) override
requires E e noexcept(noexcept(s.error(std::move(e))))
virtual void onUnsubscribe(Subscriber< T, R > *)
virtual uint64_t subscribe(Subscriber< T, R > *subscriber)
virtual void unsubscribe(uint64_t subscriptionId)
void readException(Context *ctx, folly::exception_wrapper ex) override
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
virtual void onData(T &)
std::shared_ptr< Pipeline > Ptr
Definition: Pipeline.h:172
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43
std::map< uint64_t, Subscriber< T, R > * > subscribers_