proxygen
RpcClient.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <gflags/gflags.h>
18 
19 #include <folly/init/Init.h>
20 #include <wangle/service/Service.h>
28 
30 
31 using namespace folly;
32 using namespace wangle;
33 
34 using thrift::test::Bonk;
35 using thrift::test::Xtruct;
36 
38 
// Command-line flags (gflags) naming the server to connect to. main() reads
// them as FLAGS_host and FLAGS_port when it builds the SocketAddress.
39 DEFINE_int32(port, 8080, "test server port");
40 DEFINE_string(host, "::1", "test server address");
41 
42 class RpcPipelineFactory : public PipelineFactory<SerializePipeline> {
43  public:
45  std::shared_ptr<AsyncTransportWrapper> sock) override {
46  auto pipeline = SerializePipeline::create();
47  pipeline->addBack(AsyncSocketHandler(sock));
48  // ensure we can write from any thread
49  pipeline->addBack(EventBaseHandler());
50  pipeline->addBack(LengthFieldBasedFrameDecoder());
51  pipeline->addBack(LengthFieldPrepender());
52  pipeline->addBack(ClientSerializeHandler());
53  pipeline->finalize();
54 
55  return pipeline;
56  }
57 };
58 
59 // Client multiplex dispatcher. Uses Bonk.type as request ID
61  : public ClientDispatcherBase<SerializePipeline, Bonk, Xtruct> {
62  public:
63  void read(Context*, Xtruct in) override {
64  auto search = requests_.find(in.i32_thing);
65  CHECK(search != requests_.end());
66  auto p = std::move(search->second);
67  requests_.erase(in.i32_thing);
68  p.setValue(in);
69  }
70 
71  Future<Xtruct> operator()(Bonk arg) override {
72  auto& p = requests_[arg.type];
73  auto f = p.getFuture();
74  p.setInterruptHandler([arg, this](const folly::exception_wrapper&) {
75  this->requests_.erase(arg.type);
76  });
77  this->pipeline_->write(arg);
78 
79  return f;
80  }
81 
82  // Print some nice messages for close
83 
84  Future<Unit> close() override {
85  printf("Channel closed\n");
87  }
88 
89  Future<Unit> close(Context* ctx) override {
90  printf("Channel closed\n");
91  return ClientDispatcherBase::close(ctx);
92  }
93 
94  private:
95  std::unordered_map<int32_t, Promise<Xtruct>> requests_;
96 };
97 
98 int main(int argc, char** argv) {
99  folly::Init init(&argc, &argv);
100 
108  client.group(std::make_shared<folly::IOThreadPoolExecutor>(1));
109  client.pipelineFactory(std::make_shared<RpcPipelineFactory>());
110  auto pipeline = client.connect(SocketAddress(FLAGS_host, FLAGS_port)).get();
111  // A serial dispatcher would assert if we tried to send more than one
112  // request at a time
113  // SerialClientDispatcher<SerializePipeline, Bonk> service;
114  // Or we could use a pipelined dispatcher, but responses would always come
115  // back in order
116  // PipelinedClientDispatcher<SerializePipeline, Bonk> service;
117  auto dispatcher = std::make_shared<BonkMultiplexClientDispatcher>();
118  dispatcher->setPipeline(pipeline);
119 
120  // Set an idle timeout of 5s using a filter.
121  ExpiringFilter<Bonk, Xtruct> service(dispatcher, std::chrono::seconds(5));
122 
123  try {
124  while (true) {
125  std::cout << "Input string and int" << std::endl;
126 
127  Bonk request;
128  std::cin >> request.message;
129  std::cin >> request.type;
130  service(request).thenValue([request](Xtruct response) {
131  CHECK(request.type == response.i32_thing);
132  std::cout << response.string_thing << std::endl;
133  });
134  }
135  } catch (const std::exception& e) {
136  std::cout << exceptionStr(e) << std::endl;
137  }
138 
139  return 0;
140 }
Future< Xtruct > operator()(Bonk arg) override
Definition: RpcClient.cpp:71
Future< Unit > close() override
Definition: RpcClient.cpp:84
auto f
fbstring exceptionStr(const std::exception &e)
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
folly::Future< Pipeline * > connect(const folly::SocketAddress &address, std::chrono::milliseconds timeout=std::chrono::milliseconds(0)) override
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
DEFINE_string(host,"::1","test server address")
void init(int *argc, char ***argv, bool removeFlags)
Definition: Init.cpp:34
char ** argv
BaseClientBootstrap< Pipeline > * pipelineFactory(std::shared_ptr< PipelineFactory< Pipeline >> factory) noexcept
int main(int argc, char **argv)
Definition: RpcClient.cpp:98
Future< Unit > close(Context *ctx) override
Definition: RpcClient.cpp:89
DEFINE_int32(port, 8080,"test server port")
static Ptr create()
Definition: Pipeline.h:174
std::unordered_map< int32_t, Promise< Xtruct > > requests_
Definition: RpcClient.cpp:95
std::shared_ptr< Pipeline > Ptr
Definition: Pipeline.h:172
SerializePipeline::Ptr newPipeline(std::shared_ptr< AsyncTransportWrapper > sock) override
Definition: RpcClient.cpp:44
ClientBootstrap * group(std::shared_ptr< folly::IOThreadPoolExecutor > group)
int close(NetworkSocket s)
Definition: NetOps.cpp:90
void read(Context *, Xtruct in) override
Definition: RpcClient.cpp:63