30 using namespace folly;
34 DEFINE_int32(upstream_port, 8081,
"Upstream server port");
76 result = buf.
move()->moveToFbString().toStdString();
107 RoutingData& routingData)
override {
108 auto transportInfo = getContext()->getPipeline()->getTransportInfo();
109 const auto& clientIP = transportInfo->remoteAddr->getAddressStr();
110 LOG(
INFO) <<
"Using client IP " << clientIP
111 <<
" as routing data to hash to a worker thread";
113 routingData.routingData = clientIP;
114 routingData.bufQueue.append(bufQueue);
125 return std::make_shared<ClientIPRoutingDataHandler>(connId, cob);
141 LOG(
INFO) <<
"Connecting to upstream server " << address
142 <<
" for subscribing to broadcast";
143 return client->
connect(address);
155 std::shared_ptr<AsyncTransportWrapper>
socket)
override {
156 LOG(
INFO) <<
"Creating a new BroadcastPipeline for upstream server";
158 auto pipeline = DefaultPipeline::create();
162 pipeline->finalize();
188 std::shared_ptr<SimpleServerPool> serverPool,
189 std::shared_ptr<SimpleBroadcastPipelineFactory> broadcastPipelineFactory)
191 serverPool, broadcastPipelineFactory) {}
194 std::shared_ptr<AsyncTransportWrapper>
socket,
197 std::shared_ptr<TransportInfo> transportInfo)
override {
198 LOG(
INFO) <<
"Creating a new ObservingPipeline for client "
199 << *(transportInfo->remoteAddr);
206 routingData, broadcastPool()));
207 pipeline->finalize();
215 auto serverPool = std::make_shared<SimpleServerPool>();
219 auto broadcastPipelineFactory =
220 std::make_shared<SimpleBroadcastPipelineFactory>();
224 auto observingPipelineFactory =
225 std::make_shared<SimpleObservingPipelineFactory>(
226 serverPool, broadcastPipelineFactory);
230 auto routingHandlerFactory =
231 std::make_shared<ClientIPRoutingDataHandlerFactory>();
237 auto acceptPipelineFactory = std::make_shared<
239 &server, routingHandlerFactory, observingPipelineFactory);
241 server.pipeline(acceptPipelineFactory);
242 server.bind(FLAGS_port);
243 server.waitForStop();
size_t chainLength() const
void setFromLocalPort(uint16_t port)
void setRoutingData(DefaultPipeline *, const std::string &) noexcept override
std::unique_ptr< folly::IOBuf > move()
—— Concurrent Priority Queue Implementation ——
requires E e noexcept(noexcept(s.error(std::move(e))))
SimpleObservingPipeline::Ptr newPipeline(std::shared_ptr< AsyncTransportWrapper > socket, const std::string &routingData, RoutingDataHandler< std::string > *, std::shared_ptr< TransportInfo > transportInfo) override
int main(int argc, char **argv)
bool parseRoutingData(folly::IOBufQueue &bufQueue, RoutingData &routingData) override
void init(int *argc, char ***argv, bool removeFlags)
std::shared_ptr< RoutingDataHandler< std::string > > newHandler(uint64_t connId, RoutingDataHandler< std::string >::Callback *cob) override
DefaultPipeline::Ptr newPipeline(std::shared_ptr< AsyncTransportWrapper > socket) override
virtual folly::Future< P * > connect(const folly::SocketAddress &address, std::chrono::milliseconds timeout=std::chrono::milliseconds(0))=0
BroadcastHandler< std::string, std::string > * getBroadcastHandler(DefaultPipeline *pipeline) noexcept override
std::unique_ptr< folly::IOBuf > encode(std::string &msg) override
DEFINE_int32(port, 8080,"Broadcast proxy port")
NetworkSocket socket(int af, int type, int protocol)
ClientIPRoutingDataHandler(uint64_t connId, Callback *cob)
SimpleObservingPipelineFactory(std::shared_ptr< SimpleServerPool > serverPool, std::shared_ptr< SimpleBroadcastPipelineFactory > broadcastPipelineFactory)
std::shared_ptr< Pipeline > Ptr
bool decode(Context *, IOBufQueue &buf, std::string &result, size_t &) override
static std::unique_ptr< IOBuf > copyBuffer(const void *buf, std::size_t size, std::size_t headroom=0, std::size_t minTailroom=0)
Future< DefaultPipeline * > connect(BaseClientBootstrap< DefaultPipeline > *client, const std::string &) noexcept override