proxygen
AsyncUDPServerSocket.h
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <folly/Memory.h>
20 #include <folly/io/IOBufQueue.h>
23 
24 namespace folly {
25 
39  public AsyncSocketBase {
40  public:
41  class Callback {
42  public:
47  virtual void onListenStarted() noexcept = 0;
48 
53  virtual void onListenStopped() noexcept = 0;
54 
59  virtual void onListenPaused() noexcept {}
60 
65  virtual void onListenResumed() noexcept {}
66 
70  virtual void onDataAvailable(
71  std::shared_ptr<AsyncUDPSocket> socket,
73  std::unique_ptr<folly::IOBuf> buf,
74  bool truncated) noexcept = 0;
75 
76  virtual ~Callback() = default;
77  };
78 
86  explicit AsyncUDPServerSocket(EventBase* evb, size_t sz = 1500)
87  : evb_(evb), packetSize_(sz), nextListener_(0) {}
88 
89  ~AsyncUDPServerSocket() override {
90  if (socket_) {
91  close();
92  }
93  }
94 
95  void bind(const folly::SocketAddress& addy) {
96  CHECK(!socket_);
97 
98  socket_ = std::make_shared<AsyncUDPSocket>(evb_);
99  socket_->setReusePort(reusePort_);
100  socket_->bind(addy);
101  }
102 
103  void setReusePort(bool reusePort) {
104  reusePort_ = reusePort;
105  }
106 
108  CHECK(socket_);
109  return socket_->address();
110  }
111 
112  void getAddress(SocketAddress* a) const override {
113  *a = address();
114  }
115 
119  void addListener(EventBase* evb, Callback* callback) {
120  listeners_.emplace_back(evb, callback);
121  }
122 
123  void listen() {
124  CHECK(socket_) << "Need to bind before listening";
125 
126  for (auto& listener : listeners_) {
127  auto callback = listener.second;
128 
129  listener.first->runInEventBaseThread(
130  [callback]() mutable { callback->onListenStarted(); });
131  }
132 
133  socket_->resumeRead(this);
134  }
135 
136  int getFD() const {
137  CHECK(socket_) << "Need to bind before getting FD";
138  return socket_->getFD();
139  }
140 
141  void close() {
142  CHECK(socket_) << "Need to bind before closing";
143  socket_->close();
144  socket_.reset();
145  }
146 
147  EventBase* getEventBase() const override {
148  return evb_;
149  }
150 
154  void pauseAccepting() {
155  socket_->pauseRead();
156  for (auto& listener : listeners_) {
157  auto callback = listener.second;
158 
159  listener.first->runInEventBaseThread(
160  [callback]() mutable { callback->onListenPaused(); });
161  }
162  }
163 
168  socket_->resumeRead(this);
169  for (auto& listener : listeners_) {
170  auto callback = listener.second;
171 
172  listener.first->runInEventBaseThread(
173  [callback]() mutable { callback->onListenResumed(); });
174  }
175  }
176 
177  private:
178  // AsyncUDPSocket::ReadCallback
179  void getReadBuffer(void** buf, size_t* len) noexcept override {
180  std::tie(*buf, *len) = buf_.preallocate(packetSize_, packetSize_);
181  }
182 
184  const folly::SocketAddress& clientAddress,
185  size_t len,
186  bool truncated) noexcept override {
187  buf_.postallocate(len);
188  auto data = buf_.split(len);
189 
190  if (listeners_.empty()) {
191  LOG(WARNING) << "UDP server socket dropping packet, "
192  << "no listener registered";
193  return;
194  }
195 
196  if (nextListener_ >= listeners_.size()) {
197  nextListener_ = 0;
198  }
199 
200  auto client = clientAddress;
201  auto callback = listeners_[nextListener_].second;
202  auto socket = socket_;
203 
204  // Schedule it in the listener's eventbase
205  // XXX: Speed this up
206  auto f = [socket,
207  client,
208  callback,
209  data = std::move(data),
210  truncated]() mutable {
211  callback->onDataAvailable(socket, client, std::move(data), truncated);
212  };
213 
214  listeners_[nextListener_].first->runInEventBaseThread(std::move(f));
215  ++nextListener_;
216  }
217 
218  void onReadError(const AsyncSocketException& ex) noexcept override {
219  LOG(ERROR) << ex.what();
220 
221  // Lets register to continue listening for packets
222  socket_->resumeRead(this);
223  }
224 
225  void onReadClosed() noexcept override {
226  for (auto& listener : listeners_) {
227  auto callback = listener.second;
228 
229  listener.first->runInEventBaseThread(
230  [callback]() mutable { callback->onListenStopped(); });
231  }
232  }
233 
234  EventBase* const evb_;
235  const size_t packetSize_;
236 
237  std::shared_ptr<AsyncUDPSocket> socket_;
238 
239  // List of listener to distribute packets among
240  typedef std::pair<EventBase*, Callback*> Listener;
241  std::vector<Listener> listeners_;
242 
243  // Next listener to send packet to
245 
246  // Temporary buffer for data
248 
249  bool reusePort_{false};
250 };
251 
252 } // namespace folly
void getAddress(SocketAddress *a) const override
std::unique_ptr< folly::IOBuf > split(size_t n)
Definition: IOBufQueue.h:420
auto f
void addListener(EventBase *evb, Callback *callback)
virtual void onDataAvailable(std::shared_ptr< AsyncUDPSocket > socket, const folly::SocketAddress &addr, std::unique_ptr< folly::IOBuf > buf, bool truncated) noexcept=0
std::pair< EventBase *, Callback * > Listener
std::shared_ptr< AsyncUDPSocket > socket_
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
void onDataAvailable(const folly::SocketAddress &clientAddress, size_t len, bool truncated) noexceptoverride
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
requires E e noexcept(noexcept(s.error(std::move(e))))
std::pair< void *, std::size_t > preallocate(std::size_t min, std::size_t newAllocationSize, std::size_t max=std::numeric_limits< std::size_t >::max())
Definition: IOBufQueue.h:356
folly::SocketAddress address() const
void onReadError(const AsyncSocketException &ex) noexceptoverride
void onReadClosed() noexceptoverride
constexpr auto data(C &c) -> decltype(c.data())
Definition: Access.h:71
char a
AsyncUDPServerSocket(EventBase *evb, size_t sz=1500)
virtual void onListenStopped() noexcept=0
NetworkSocket socket(int af, int type, int protocol)
Definition: NetOps.cpp:412
void setReusePort(bool reusePort)
virtual void onListenStarted() noexcept=0
void bind(const folly::SocketAddress &addy)
std::vector< Listener > listeners_
void getReadBuffer(void **buf, size_t *len) noexceptoverride
EventBase * getEventBase() const override
void postallocate(std::size_t n)
Definition: IOBufQueue.h:380
ThreadPoolListHook * addr