proxygen
accept_steering_server.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 
19 #include <folly/init/Init.h>
24 
25 using namespace folly;
26 using namespace wangle;
27 
28 DEFINE_int32(port, 23, "test server port");
29 
36  public:
37  NaiveRoutingDataHandler(uint64_t connId, Callback* cob)
38  : RoutingDataHandler<char>(connId, cob) {}
39 
41  RoutingData& routingData) override {
42  if (bufQueue.chainLength() == 0) {
43  return false;
44  }
45 
46  auto buf = bufQueue.move();
47  buf->coalesce();
48  // Use the first byte for hashing to a worker
49  routingData.routingData = buf->data()[0];
50  routingData.bufQueue.append(std::move(buf));
51  return true;
52  }
53 };
54 
56  public:
57  std::shared_ptr<RoutingDataHandler<char>> newHandler(
58  uint64_t connId,
59  RoutingDataHandler<char>::Callback* cob) override {
60  return std::make_shared<NaiveRoutingDataHandler>(connId, cob);
61  }
62 };
63 
65  public:
66  explicit ThreadPrintingHandler(const char& routingData)
67  : routingData_(routingData) {}
68 
69  void transportActive(Context* ctx) override {
70  std::stringstream out;
71  out << "You were hashed to thread " << std::this_thread::get_id()
72  << " based on '" << routingData_ << "'" << std::endl;
73  write(ctx, IOBuf::copyBuffer(out.str()));
74  close(ctx);
75  }
76 
77  private:
79 };
80 
82  : public RoutingDataPipelineFactory<DefaultPipeline, char> {
83  public:
85  std::shared_ptr<AsyncTransportWrapper> sock,
86  const char& routingData,
88  std::shared_ptr<TransportInfo> transportInfo) override {
89  auto pipeline = DefaultPipeline::create();
90  pipeline->addBack(AsyncSocketHandler(sock));
91  pipeline->addBack(ThreadPrintingHandler(routingData));
92  pipeline->finalize();
93 
94  pipeline->setTransportInfo(transportInfo);
95 
96  LOG(INFO) << "Created new server pipeline. Local address = "
97  << *(transportInfo->localAddr)
98  << ", remote address = " << *(transportInfo->remoteAddr);
99 
100  return pipeline;
101  }
102 };
103 
104 int main(int argc, char** argv) {
105  folly::Init init(&argc, &argv);
106 
107  auto routingHandlerFactory =
108  std::make_shared<NaiveRoutingDataHandlerFactory>();
109  auto childPipelineFactory = std::make_shared<ServerPipelineFactory>();
110 
112  server.pipeline(
114  &server, routingHandlerFactory, childPipelineFactory));
115  server.bind(FLAGS_port);
116  server.waitForStop();
117 
118  return 0;
119 }
std::shared_ptr< RoutingDataHandler< char > > newHandler(uint64_t connId, RoutingDataHandler< char >::Callback *cob) override
DefaultPipeline::Ptr newPipeline(std::shared_ptr< AsyncTransportWrapper > sock, const char &routingData, RoutingDataHandler< char > *, std::shared_ptr< TransportInfo > transportInfo) override
size_t chainLength() const
Definition: IOBufQueue.h:492
void write(const T &in, folly::io::Appender &appender)
Definition: Types-inl.h:112
bool parseRoutingData(folly::IOBufQueue &bufQueue, RoutingData &routingData) override
NaiveRoutingDataHandler(uint64_t connId, Callback *cob)
void bind(folly::AsyncServerSocket::UniquePtr s)
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
DEFINE_int32(port, 23,"test server port")
std::unique_ptr< folly::IOBuf > move()
Definition: IOBufQueue.h:459
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
void init(int *argc, char ***argv, bool removeFlags)
Definition: Init.cpp:34
char ** argv
int main(int argc, char **argv)
ServerBootstrap * pipeline(std::shared_ptr< AcceptPipelineFactory > factory)
std::shared_ptr< Pipeline > Ptr
Definition: Pipeline.h:172
Handler< R, R, W, W >::Context Context
Definition: Handler.h:161
static std::unique_ptr< IOBuf > copyBuffer(const void *buf, std::size_t size, std::size_t headroom=0, std::size_t minTailroom=0)
Definition: IOBuf.h:1587
int close(NetworkSocket s)
Definition: NetOps.cpp:90
ThreadPrintingHandler(const char &routingData)
void transportActive(Context *ctx) override