proxygen
Proxy.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <gflags/gflags.h>
18 
19 #include <folly/init/Init.h>
23 
24 using namespace folly;
25 using namespace wangle;
26 
27 DEFINE_int32(port, 1080, "proxy server port");
28 DEFINE_string(remote_host, "127.0.0.1", "remote host");
29 DEFINE_int32(remote_port, 23, "remote port");
30 
32  public:
33  explicit ProxyBackendHandler(DefaultPipeline* frontendPipeline) :
34  frontendPipeline_(frontendPipeline) {}
35 
36  void read(Context*, IOBufQueue& q) override {
37  frontendPipeline_->write(q.move());
38  }
39 
40  void readEOF(Context*) override {
41  LOG(INFO) << "Connection closed by remote host";
42  frontendPipeline_->close();
43  }
44 
46  LOG(ERROR) << "Remote error: " << exceptionStr(e);
47  frontendPipeline_->close();
48  }
49 
50  private:
52 };
53 
54 class ProxyBackendPipelineFactory : public PipelineFactory<DefaultPipeline> {
55  public:
56  explicit ProxyBackendPipelineFactory(DefaultPipeline* frontendPipeline) :
57  frontendPipeline_(frontendPipeline) {}
58 
60  std::shared_ptr<AsyncTransportWrapper> sock) override {
61  auto pipeline = DefaultPipeline::create();
62  pipeline->addBack(AsyncSocketHandler(sock));
63  pipeline->addBack(ProxyBackendHandler(frontendPipeline_));
64  pipeline->finalize();
65 
66  return pipeline;
67  }
68  private:
70 };
71 
73  public:
74  explicit ProxyFrontendHandler(SocketAddress remoteAddress) :
75  remoteAddress_(remoteAddress) {}
76 
77  void read(Context*, IOBufQueue& q) override {
78  buffer_.append(q);
79  if (!backendPipeline_) {
80  return;
81  }
82 
83  backendPipeline_->write(buffer_.move());
84  }
85 
86  void readEOF(Context* ctx) override {
87  LOG(INFO) << "Connection closed by local host";
88  if (!backendPipeline_) {
89  return;
90  }
91  backendPipeline_->close().thenValue([this, ctx](auto&&){
92  this->close(ctx);
93  });
94  }
95 
96  void readException(Context* ctx, exception_wrapper e) override {
97  LOG(ERROR) << "Local error: " << exceptionStr(e);
98  if (!backendPipeline_) {
99  return;
100  }
101  backendPipeline_->close().thenValue([this, ctx](auto&&){
102  this->close(ctx);
103  });
104  }
105 
106  void transportActive(Context* ctx) override {
107  if (backendPipeline_) {
108  // Already connected
109  return;
110  }
111 
112  // Pause reading from the socket until remote connection succeeds
113  auto frontendPipeline = dynamic_cast<DefaultPipeline*>(ctx->getPipeline());
114  frontendPipeline->transportInactive();
115 
116  client_.pipelineFactory(
117  std::make_shared<ProxyBackendPipelineFactory>(frontendPipeline));
118  client_.connect(remoteAddress_)
119  .thenValue([this, frontendPipeline](DefaultPipeline* pipeline){
120  backendPipeline_ = pipeline;
121  // Resume read
122  frontendPipeline->transportActive();
123  })
124  .onError([this, ctx](const std::exception& e){
125  LOG(ERROR) << "Connect error: " << exceptionStr(e);
126  this->close(ctx);
127  });
128  }
129 
130  private:
133  DefaultPipeline* backendPipeline_{nullptr};
135 };
136 
137 class ProxyFrontendPipelineFactory : public PipelineFactory<DefaultPipeline> {
138  public:
139  explicit ProxyFrontendPipelineFactory(SocketAddress remoteAddress) :
140  remoteAddress_(remoteAddress) {}
141 
143  std::shared_ptr<AsyncTransportWrapper> sock) override {
144  auto pipeline = DefaultPipeline::create();
145  pipeline->addBack(AsyncSocketHandler(sock));
146  pipeline->addBack(std::make_shared<ProxyFrontendHandler>(remoteAddress_));
147  pipeline->finalize();
148 
149  return pipeline;
150  }
151  private:
153 };
154 
155 int main(int argc, char** argv) {
156  folly::Init init(&argc, &argv);
157 
159  server.childPipeline(std::make_shared<ProxyFrontendPipelineFactory>(
160  SocketAddress(FLAGS_remote_host, FLAGS_remote_port)));
161  server.bind(FLAGS_port);
162  server.waitForStop();
163 
164  return 0;
165 }
std::enable_if<!std::is_same< T, folly::Unit >::value >::type transportInactive()
Definition: Pipeline-inl.h:214
ProxyBackendHandler(DefaultPipeline *frontendPipeline)
Definition: Proxy.cpp:33
void readException(Context *, exception_wrapper e) override
Definition: Proxy.cpp:45
void bind(folly::AsyncServerSocket::UniquePtr s)
void read(Context *, IOBufQueue &q) override
Definition: Proxy.cpp:36
fbstring exceptionStr(const std::exception &e)
int main(int argc, char **argv)
Definition: Proxy.cpp:155
std::unique_ptr< folly::IOBuf > move()
Definition: IOBufQueue.h:459
ProxyFrontendHandler(SocketAddress remoteAddress)
Definition: Proxy.cpp:74
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
DefaultPipeline * frontendPipeline_
Definition: Proxy.cpp:51
ServerBootstrap * childPipeline(std::shared_ptr< PipelineFactory< Pipeline >> factory)
void init(int *argc, char ***argv, bool removeFlags)
Definition: Init.cpp:34
char ** argv
DefaultPipeline * frontendPipeline_
Definition: Proxy.cpp:69
std::enable_if<!std::is_same< T, folly::Unit >::value >::type transportActive()
Definition: Pipeline-inl.h:205
static Options cacheChainLength()
Definition: IOBufQueue.h:83
ProxyFrontendPipelineFactory(SocketAddress remoteAddress)
Definition: Proxy.cpp:139
void readEOF(Context *ctx) override
Definition: Proxy.cpp:86
void transportActive(Context *ctx) override
Definition: Proxy.cpp:106
ClientBootstrap< DefaultPipeline > client_
Definition: Proxy.cpp:132
DefaultPipeline::Ptr newPipeline(std::shared_ptr< AsyncTransportWrapper > sock) override
Definition: Proxy.cpp:142
DEFINE_string(remote_host,"127.0.0.1","remote host")
SocketAddress remoteAddress_
Definition: Proxy.cpp:131
DefaultPipeline::Ptr newPipeline(std::shared_ptr< AsyncTransportWrapper > sock) override
Definition: Proxy.cpp:59
std::shared_ptr< Pipeline > Ptr
Definition: Pipeline.h:172
Handler< R, R, W, W >::Context Context
Definition: Handler.h:161
void read(Context *, IOBufQueue &q) override
Definition: Proxy.cpp:77
void readEOF(Context *) override
Definition: Proxy.cpp:40
std::unique_ptr< unsigned char[]> buffer_
Definition: Random.cpp:105
int close(NetworkSocket s)
Definition: NetOps.cpp:90
ProxyBackendPipelineFactory(DefaultPipeline *frontendPipeline)
Definition: Proxy.cpp:56
SocketAddress remoteAddress_
Definition: Proxy.cpp:152
void readException(Context *ctx, exception_wrapper e) override
Definition: Proxy.cpp:96
DEFINE_int32(port, 1080,"proxy server port")