proxygen
BroadcastProxy.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <folly/init/Init.h>
29 
30 using namespace folly;
31 using namespace wangle;
32 
33 DEFINE_int32(port, 8080, "Broadcast proxy port");
34 DEFINE_int32(upstream_port, 8081, "Upstream server port");
35 
69 class ByteToStringDecoder : public ByteToMessageDecoder<std::string> {
70  public:
71  bool decode(Context*,
72  IOBufQueue& buf,
73  std::string& result,
74  size_t&) override {
75  if (buf.chainLength() > 0) {
76  result = buf.move()->moveToFbString().toStdString();
77  return true;
78  }
79  return false;
80  }
81 };
82 
89 class StringToByteEncoder : public MessageToByteEncoder<std::string> {
90  public:
91  std::unique_ptr<folly::IOBuf> encode(std::string& msg) override {
92  return IOBuf::copyBuffer(msg);
93  }
94 };
95 
101 class ClientIPRoutingDataHandler : public RoutingDataHandler<std::string> {
102  public:
103  ClientIPRoutingDataHandler(uint64_t connId, Callback* cob)
104  : RoutingDataHandler<std::string>(connId, cob) {}
105 
107  RoutingData& routingData) override {
108  auto transportInfo = getContext()->getPipeline()->getTransportInfo();
109  const auto& clientIP = transportInfo->remoteAddr->getAddressStr();
110  LOG(INFO) << "Using client IP " << clientIP
111  << " as routing data to hash to a worker thread";
112 
113  routingData.routingData = clientIP;
114  routingData.bufQueue.append(bufQueue);
115  return true;
116  }
117 };
118 
120  : public RoutingDataHandlerFactory<std::string> {
121  public:
122  std::shared_ptr<RoutingDataHandler<std::string>> newHandler(
123  uint64_t connId,
125  return std::make_shared<ClientIPRoutingDataHandler>(connId, cob);
126  }
127 };
128 
133 class SimpleServerPool : public ServerPool<std::string> {
134  public:
137  const std::string& /* routingData */) noexcept override {
138  SocketAddress address;
139  address.setFromLocalPort(FLAGS_upstream_port);
140 
141  LOG(INFO) << "Connecting to upstream server " << address
142  << " for subscribing to broadcast";
143  return client->connect(address);
144  }
145 };
146 
152  : public BroadcastPipelineFactory<std::string, std::string> {
153  public:
155  std::shared_ptr<AsyncTransportWrapper> socket) override {
156  LOG(INFO) << "Creating a new BroadcastPipeline for upstream server";
157 
158  auto pipeline = DefaultPipeline::create();
159  pipeline->addBack(AsyncSocketHandler(socket));
160  pipeline->addBack(ByteToStringDecoder());
161  pipeline->addBack(BroadcastHandler<std::string, std::string>());
162  pipeline->finalize();
163  return pipeline;
164  }
165 
167  DefaultPipeline* pipeline) noexcept override {
168  return pipeline->getHandler<BroadcastHandler<std::string, std::string>>();
169  }
170 
172  DefaultPipeline* /* pipeline */,
173  const std::string& /* routingData */) noexcept override {}
174 };
175 
177 
185  : public ObservingPipelineFactory<std::string, std::string> {
186  public:
188  std::shared_ptr<SimpleServerPool> serverPool,
189  std::shared_ptr<SimpleBroadcastPipelineFactory> broadcastPipelineFactory)
191  serverPool, broadcastPipelineFactory) {}
192 
194  std::shared_ptr<AsyncTransportWrapper> socket,
195  const std::string& routingData,
197  std::shared_ptr<TransportInfo> transportInfo) override {
198  LOG(INFO) << "Creating a new ObservingPipeline for client "
199  << *(transportInfo->remoteAddr);
200 
201  auto pipeline = SimpleObservingPipeline::create();
202  pipeline->addBack(AsyncSocketHandler(socket));
203  pipeline->addBack(StringToByteEncoder());
204  pipeline->addBack(
206  routingData, broadcastPool()));
207  pipeline->finalize();
208  return pipeline;
209  }
210 };
211 
212 int main(int argc, char** argv) {
213  folly::Init init(&argc, &argv);
214 
215  auto serverPool = std::make_shared<SimpleServerPool>();
216 
217  // A unique BroadcastPipeline for each upstream server to fan-out the
218  // upstream messages to ObservingPipelines corresponding to each client.
219  auto broadcastPipelineFactory =
220  std::make_shared<SimpleBroadcastPipelineFactory>();
221 
222  // A unique ObservingPipeline is created for each client to subscribe
223  // to the broadcast.
224  auto observingPipelineFactory =
225  std::make_shared<SimpleObservingPipelineFactory>(
226  serverPool, broadcastPipelineFactory);
227 
228  // RoutingDataHandlerFactory for creating the RoutingDataHandler that sets
229  // client IP as the routing data.
230  auto routingHandlerFactory =
231  std::make_shared<ClientIPRoutingDataHandlerFactory>();
232 
234 
235  // AcceptRoutingPipelineFactory for creating accept pipelines hash the
236  // client connection to a worker thread based on client IP.
237  auto acceptPipelineFactory = std::make_shared<
239  &server, routingHandlerFactory, observingPipelineFactory);
240 
241  server.pipeline(acceptPipelineFactory);
242  server.bind(FLAGS_port);
243  server.waitForStop();
244 
245  return 0;
246 }
size_t chainLength() const
Definition: IOBufQueue.h:492
void setFromLocalPort(uint16_t port)
void setRoutingData(DefaultPipeline *, const std::string &) noexceptoverride
STL namespace.
std::unique_ptr< folly::IOBuf > move()
Definition: IOBufQueue.h:459
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
SimpleObservingPipeline::Ptr newPipeline(std::shared_ptr< AsyncTransportWrapper > socket, const std::string &routingData, RoutingDataHandler< std::string > *, std::shared_ptr< TransportInfo > transportInfo) override
int main(int argc, char **argv)
bool parseRoutingData(folly::IOBufQueue &bufQueue, RoutingData &routingData) override
void init(int *argc, char ***argv, bool removeFlags)
Definition: Init.cpp:34
char ** argv
std::shared_ptr< RoutingDataHandler< std::string > > newHandler(uint64_t connId, RoutingDataHandler< std::string >::Callback *cob) override
DefaultPipeline::Ptr newPipeline(std::shared_ptr< AsyncTransportWrapper > socket) override
virtual folly::Future< P * > connect(const folly::SocketAddress &address, std::chrono::milliseconds timeout=std::chrono::milliseconds(0))=0
BroadcastHandler< std::string, std::string > * getBroadcastHandler(DefaultPipeline *pipeline) noexceptoverride
std::unique_ptr< folly::IOBuf > encode(std::string &msg) override
DEFINE_int32(port, 8080,"Broadcast proxy port")
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
ClientIPRoutingDataHandler(uint64_t connId, Callback *cob)
SimpleObservingPipelineFactory(std::shared_ptr< SimpleServerPool > serverPool, std::shared_ptr< SimpleBroadcastPipelineFactory > broadcastPipelineFactory)
const char * string
Definition: Conv.cpp:212
static Ptr create()
Definition: Pipeline.h:174
std::shared_ptr< Pipeline > Ptr
Definition: Pipeline.h:172
bool decode(Context *, IOBufQueue &buf, std::string &result, size_t &) override
static std::unique_ptr< IOBuf > copyBuffer(const void *buf, std::size_t size, std::size_t headroom=0, std::size_t minTailroom=0)
Definition: IOBuf.h:1587
Future< DefaultPipeline * > connect(BaseClientBootstrap< DefaultPipeline > *client, const std::string &) noexceptoverride