proxygen
BroadcastHandlerTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
18 
19 using namespace wangle;
20 using namespace folly;
21 using namespace testing;
22 
23 class BroadcastHandlerTest : public Test {
24  public:
26  : public BroadcastHandler<std::string, std::string> {
27  public:
28  MOCK_METHOD1(mockClose,
30 
32  return mockClose(ctx).move();
33  }
34  };
35 
36  void SetUp() override {
37  prevHandler = new StrictMock<MockBytesToBytesHandler>();
38  EXPECT_CALL(*prevHandler, read(_, _))
39  .WillRepeatedly(Invoke([&](MockBytesToBytesHandler::Context* ctx,
40  IOBufQueue& q) { ctx->fireRead(q); }));
41 
44 
45  pipeline = DefaultPipeline::create();
46  pipeline->addBack(
47  std::shared_ptr<StrictMock<MockBytesToBytesHandler>>(prevHandler));
48  pipeline->addBack(
50  decoder));
51  pipeline->addBack(
52  std::shared_ptr<StrictMock<MockBroadcastHandler>>(handler));
53  pipeline->finalize();
54  }
55 
56  void TearDown() override {
57  Mock::VerifyAndClear(&subscriber0);
58  Mock::VerifyAndClear(&subscriber1);
59 
60  pipeline.reset();
61  }
62 
63  protected:
65 
66  StrictMock<MockBytesToBytesHandler>* prevHandler{nullptr};
69 
72 };
73 
74 TEST_F(BroadcastHandlerTest, SubscribeUnsubscribe) {
75  // Test by adding a couple of subscribers and unsubscribing them
76  EXPECT_CALL(*decoder, decode(_, _, _, _))
77  .WillRepeatedly(
79  IOBufQueue& q,
81  size_t&) {
82  auto buf = q.move();
83  if (buf) {
84  buf->coalesce();
85  data = buf->moveToFbString().toStdString();
86  return true;
87  }
88  return false;
89  }));
90 
92 
93  // Add a subscriber
94  EXPECT_EQ(handler->subscribe(&subscriber0), 0);
95 
96  EXPECT_CALL(subscriber0, onNext("data1")).Times(1);
97  EXPECT_CALL(subscriber0, onNext("data2")).Times(1);
98 
99  // Push some data
100  IOBufQueue q;
101  q.append(IOBuf::copyBuffer("data1"));
102  pipeline->read(q);
103  q.clear();
104  q.append(IOBuf::copyBuffer("data2"));
105  pipeline->read(q);
106  q.clear();
107 
108  // Add another subscriber
109  EXPECT_EQ(handler->subscribe(&subscriber1), 1);
110 
111  EXPECT_CALL(subscriber0, onNext("data3")).Times(1);
112  EXPECT_CALL(subscriber1, onNext("data3")).Times(1);
113 
114  // Push more data
115  q.append(IOBuf::copyBuffer("data3"));
116  pipeline->read(q);
117  q.clear();
118 
119  // Unsubscribe one of the subscribers
120  handler->unsubscribe(0);
121 
122  EXPECT_CALL(subscriber1, onNext(Eq("data4"))).Times(1);
123 
124  // Push more data
125  q.append(IOBuf::copyBuffer("data4"));
126  pipeline->read(q);
127  q.clear();
128 
129  EXPECT_CALL(*handler, mockClose(_))
130  .WillOnce(InvokeWithoutArgs([this] {
131  pipeline.reset();
132  return makeMoveWrapper(makeFuture());
133  }));
134 
135  // Unsubscribe the other subscriber. The handler should be deleted now.
136  handler->unsubscribe(1);
137 }
138 
139 TEST_F(BroadcastHandlerTest, BufferedRead) {
140  // Test with decoder that buffers data based on some local logic
141  // before pushing to subscribers
143  EXPECT_CALL(*decoder, decode(_, _, _, _))
144  .WillRepeatedly(
146  IOBufQueue& q,
147  std::string& data,
148  size_t&) {
149  bufQueue.append(q);
150  if (bufQueue.chainLength() < 5) {
151  return false;
152  }
153  auto buf = bufQueue.move();
154  buf->coalesce();
155  data = buf->moveToFbString().toStdString();
156  return true;
157  }));
158 
160 
161  // Add a subscriber
162  EXPECT_EQ(handler->subscribe(&subscriber0), 0);
163 
164  EXPECT_CALL(subscriber0, onNext("data1")).Times(1);
165 
166  // Push some fragmented data
167  IOBufQueue q;
168  q.append(IOBuf::copyBuffer("da"));
169  pipeline->read(q);
170  q.clear();
171  q.append(IOBuf::copyBuffer("ta1"));
172  pipeline->read(q);
173  q.clear();
174 
175  // Push more fragmented data. onNext shouldn't be called yet.
176  q.append(IOBuf::copyBuffer("dat"));
177  pipeline->read(q);
178  q.clear();
179  q.append(IOBuf::copyBuffer("a"));
180  pipeline->read(q);
181  q.clear();
182 
183  // Add another subscriber
184  EXPECT_EQ(handler->subscribe(&subscriber1), 1);
185 
186  EXPECT_CALL(subscriber0, onNext("data3data4")).Times(1);
187  EXPECT_CALL(subscriber1, onNext("data3data4")).Times(1);
188 
189  // Push rest of the fragmented data. The entire data should be pushed
190  // to both subscribers.
191  q.append(IOBuf::copyBuffer("3data4"));
192  pipeline->read(q);
193  q.clear();
194 
195  EXPECT_CALL(subscriber0, onNext("data2")).Times(1);
196  EXPECT_CALL(subscriber1, onNext("data2")).Times(1);
197 
198  // Push some unfragmented data
199  q.append(IOBuf::copyBuffer("data2"));
200  pipeline->read(q);
201  q.clear();
202 
203  EXPECT_CALL(*handler, mockClose(_))
204  .WillOnce(InvokeWithoutArgs([this] {
205  pipeline.reset();
206  return makeMoveWrapper(makeFuture());
207  }));
208 
209  // Unsubscribe all subscribers. The handler should be deleted now.
210  handler->unsubscribe(0);
211  handler->unsubscribe(1);
212 }
213 
215  // Test with EOF on the handler
216  EXPECT_CALL(*decoder, decode(_, _, _, _))
217  .WillRepeatedly(
219  IOBufQueue& q,
220  std::string& data,
221  size_t&) {
222  auto buf = q.move();
223  if (buf) {
224  buf->coalesce();
225  data = buf->moveToFbString().toStdString();
226  return true;
227  }
228  return false;
229  }));
230 
232 
233  // Add a subscriber
234  EXPECT_EQ(handler->subscribe(&subscriber0), 0);
235 
236  EXPECT_CALL(subscriber0, onNext("data1")).Times(1);
237 
238  // Push some data
239  IOBufQueue q;
240  q.append(IOBuf::copyBuffer("data1"));
241  pipeline->read(q);
242  q.clear();
243 
244  // Add another subscriber
245  EXPECT_EQ(handler->subscribe(&subscriber1), 1);
246 
247  EXPECT_CALL(subscriber0, onNext("data2")).Times(1);
248  EXPECT_CALL(subscriber1, onNext("data2")).Times(1);
249 
250  // Push more data
251  q.append(IOBuf::copyBuffer("data2"));
252  pipeline->read(q);
253  q.clear();
254 
255  // Unsubscribe one of the subscribers
256  handler->unsubscribe(0);
257 
258  EXPECT_CALL(subscriber1, onCompleted()).Times(1);
259 
260  EXPECT_CALL(*handler, mockClose(_))
261  .WillOnce(InvokeWithoutArgs([this] {
262  pipeline.reset();
263  return makeMoveWrapper(makeFuture());
264  }));
265 
266  // The handler should be deleted now
267  handler->readEOF(nullptr);
268 }
269 
271  // Test with EOF on the handler
272  EXPECT_CALL(*decoder, decode(_, _, _, _))
273  .WillRepeatedly(
275  IOBufQueue& q,
276  std::string& data,
277  size_t&) {
278  auto buf = q.move();
279  if (buf) {
280  buf->coalesce();
281  data = buf->moveToFbString().toStdString();
282  return true;
283  }
284  return false;
285  }));
286 
288 
289  // Add a subscriber
290  EXPECT_EQ(handler->subscribe(&subscriber0), 0);
291 
292  EXPECT_CALL(subscriber0, onNext("data1")).Times(1);
293 
294  // Push some data
295  IOBufQueue q;
296  q.append(IOBuf::copyBuffer("data1"));
297  pipeline->read(q);
298  q.clear();
299 
300  // Add another subscriber
301  EXPECT_EQ(handler->subscribe(&subscriber1), 1);
302 
303  EXPECT_CALL(subscriber0, onNext("data2")).Times(1);
304  EXPECT_CALL(subscriber1, onNext("data2")).Times(1);
305 
306  // Push more data
307  q.append(IOBuf::copyBuffer("data2"));
308  pipeline->read(q);
309  q.clear();
310 
311  EXPECT_CALL(subscriber0, onError(_)).Times(1);
312  EXPECT_CALL(subscriber1, onError(_)).Times(1);
313 
314  EXPECT_CALL(*handler, mockClose(_))
315  .WillOnce(InvokeWithoutArgs([this] {
316  pipeline.reset();
317  return makeMoveWrapper(makeFuture());
318  }));
319 
320  // The handler should be deleted now
321  handler->readException(nullptr, make_exception_wrapper<std::exception>());
322 }
HandlerAdapter< std::string, std::unique_ptr< folly::IOBuf > >::Context Context
void append(std::unique_ptr< folly::IOBuf > &&buf, bool pack=false)
Definition: IOBufQueue.cpp:143
internal::EqMatcher< T > Eq(T x)
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
TokenBindingMessage decode(folly::io::Cursor &cursor)
Definition: Types.cpp:132
std::unique_ptr< folly::IOBuf > move()
Definition: IOBufQueue.h:459
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
PolymorphicAction< internal::InvokeWithoutArgsAction< FunctionImpl > > InvokeWithoutArgs(FunctionImpl function_impl)
void handler(int, siginfo_t *, void *)
static Options cacheChainLength()
Definition: IOBufQueue.h:83
PolymorphicAction< internal::InvokeAction< FunctionImpl > > Invoke(FunctionImpl function_impl)
size_t read(T &out, folly::io::Cursor &cursor)
Definition: Types-inl.h:258
ByteToMessageDecoder< T >::Context Context
Definition: Mocks.h:44
void dummy()
MoveWrapper< T0 > makeMoveWrapper(T &&t)
Definition: MoveWrapper.h:82
StrictMock< MockSubscriber< std::string, std::string > > subscriber0
StrictMock< MockSubscriber< std::string, std::string > > subscriber1
DefaultPipeline::Ptr pipeline
#define MOCK_METHOD1(m,...)
const char * string
Definition: Conv.cpp:212
static Ptr create()
Definition: Pipeline.h:174
std::shared_ptr< Pipeline > Ptr
Definition: Pipeline.h:172
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
Handler< R, R, W, W >::Context Context
Definition: Handler.h:161
static std::unique_ptr< IOBuf > copyBuffer(const void *buf, std::size_t size, std::size_t headroom=0, std::size_t minTailroom=0)
Definition: IOBuf.h:1587
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43
Future< typename std::decay< T >::type > makeFuture(T &&t)
Definition: Future-inl.h:1310
folly::Future< folly::Unit > close(Context *ctx) override
TEST_F(AcceptorTest, TestCanAcceptWithNoConnectionCounter)