proxygen
BroadcastPoolTest.cpp File Reference

Go to the source code of this file.

Classes

class  BroadcastPoolTest
 
class  BroadcastPoolTest::ServerPipelineFactory
 

Functions

 TEST_F (BroadcastPoolTest, BasicConnect)
 
 TEST_F (BroadcastPoolTest, OutstandingConnect)
 
 TEST_F (BroadcastPoolTest, ConnectError)
 
 TEST_F (BroadcastPoolTest, ConnectErrorServerPool)
 
 TEST_F (BroadcastPoolTest, RoutingDataException)
 
 TEST_F (BroadcastPoolTest, RoutingDataPipelineDeletion)
 
 TEST_F (BroadcastPoolTest, HandlerEOFPoolDeletion)
 
 TEST_F (BroadcastPoolTest, SubscriberDeletionBeforeConnect)
 
 TEST_F (BroadcastPoolTest, ThreadLocalPool)
 

Function Documentation

TEST_F ( BroadcastPoolTest  ,
BasicConnect   
)

Definition at line 86 of file BroadcastPoolTest.cpp.

References testing::_, dummy(), EXPECT_CALL, EXPECT_EQ, EXPECT_FALSE, EXPECT_NE, EXPECT_TRUE, folly::EventBaseManager::get(), wangle::BroadcastHandler< T, R >::getArbitraryIdentifier(), wangle::HandlerBase< Context >::getContext(), folly::EventBaseManager::getEventBase(), h, wangle::BroadcastHandler< T, R >::readEOF(), string, wangle::BroadcastHandler< T, R >::subscribe(), uint64_t, and folly::detail::distributed_mutex::wait().

86  {
87  // Test simple calls to getHandler()
88  std::string routingData1 = "url1";
89  std::string routingData2 = "url2";
90  BroadcastHandler<int, std::string>* handler1 = nullptr;
91  BroadcastHandler<int, std::string>* handler2 = nullptr;
92  uint64_t handler1Id = 0;
93  uint64_t handler2Id = 0;
94  auto base = EventBaseManager::get()->getEventBase();
95 
97 
98  // No broadcast available for routingData1. Test that a new connection
99  // is established and handler created.
100  EXPECT_FALSE(pool->isBroadcasting(routingData1));
101  pool->getHandler(routingData1)
102  .thenValue([&](BroadcastHandler<int, std::string>* h) {
103  handler1 = h;
104  handler1Id = handler1->getArbitraryIdentifier();
105  handler1->subscribe(&subscriber);
106  });
107  EXPECT_TRUE(handler1 == nullptr);
108  EXPECT_CALL(*pipelineFactory, setRoutingData(_, "url1")).Times(1);
109  base->loopOnce(); // Do async connect
110  EXPECT_TRUE(handler1 != nullptr);
111  EXPECT_NE(0, handler1Id);
112  EXPECT_TRUE(pool->isBroadcasting(routingData1));
113 
114  // Broadcast available for routingData1. Test that the same handler
115  // is returned.
116  pool->getHandler(routingData1)
117  .thenValue([&](BroadcastHandler<int, std::string>* h) {
118  EXPECT_EQ(handler1Id, h->getArbitraryIdentifier());
119  EXPECT_TRUE(h == handler1);
120  })
121  .wait();
122  EXPECT_TRUE(pool->isBroadcasting(routingData1));
123 
124  // Close the handler. This will delete the pipeline and the broadcast.
125  handler1->readEOF(handler1->getContext());
126  EXPECT_FALSE(pool->isBroadcasting(routingData1));
127 
128  // routingData1 doesn't have an available broadcast now. Test that a
129  // new connection is established again and handler created.
130  handler1 = nullptr;
131  pool->getHandler(routingData1)
132  .thenValue([&](BroadcastHandler<int, std::string>* h) {
133  handler1 = h;
134  handler1Id = handler1->getArbitraryIdentifier();
135  handler1->subscribe(&subscriber);
136  });
137  EXPECT_TRUE(handler1 == nullptr);
138  EXPECT_CALL(*pipelineFactory, setRoutingData(_, "url1")).Times(1);
139  base->loopOnce(); // Do async connect
140  EXPECT_TRUE(handler1 != nullptr);
141  EXPECT_TRUE(pool->isBroadcasting(routingData1));
142 
143  // Cleanup
144  handler1->readEOF(handler1->getContext());
145 
146  // Test that a new connection is established for routingData2 with
147  // a new handler created
148  EXPECT_FALSE(pool->isBroadcasting(routingData2));
149  pool->getHandler(routingData2)
150  .thenValue([&](BroadcastHandler<int, std::string>* h) {
151  handler2 = h;
152  handler2Id = handler2->getArbitraryIdentifier();
153  handler2->subscribe(&subscriber);
154  });
155  EXPECT_TRUE(handler2 == nullptr);
156  EXPECT_CALL(*pipelineFactory, setRoutingData(_, "url2")).Times(1);
157  base->loopOnce(); // Do async connect
158  EXPECT_TRUE(handler2 != nullptr);
159  EXPECT_TRUE(handler2Id != handler1Id);
160  EXPECT_TRUE(pool->isBroadcasting(routingData2));
161 
162  // Cleanup
163  handler2->readEOF(handler2->getContext());
164 }
void readEOF(Context *ctx) override
*than *hazptr_holder h
Definition: Hazptr.h:116
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
Context * getContext()
Definition: Handler.h:34
virtual uint64_t subscribe(Subscriber< T, R > *subscriber)
void dummy()
bool wait(Waiter *waiter, bool shouldSleep, Waiter *&next)
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
const char * string
Definition: Conv.cpp:212
#define EXPECT_NE(val1, val2)
Definition: gtest.h:1926
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
PUSHMI_INLINE_VAR constexpr detail::get_fn< T > get
Definition: submit.h:391
TEST_F ( BroadcastPoolTest  ,
OutstandingConnect   
)

Definition at line 166 of file BroadcastPoolTest.cpp.

References testing::_, dummy(), EXPECT_CALL, EXPECT_FALSE, EXPECT_TRUE, folly::EventBaseManager::get(), wangle::HandlerBase< Context >::getContext(), folly::EventBaseManager::getEventBase(), h, wangle::BroadcastHandler< T, R >::readEOF(), string, wangle::BroadcastHandler< T, R >::subscribe(), and folly::detail::distributed_mutex::wait().

166  {
167  // Test with multiple getHandler() calls for the same routing data
168  // when a connect request is in flight
169  std::string routingData = "url1";
170  BroadcastHandler<int, std::string>* handler1 = nullptr;
171  BroadcastHandler<int, std::string>* handler2 = nullptr;
172  auto base = EventBaseManager::get()->getEventBase();
173 
175 
176  // No broadcast available for routingData. Kick off a connect request.
177  EXPECT_FALSE(pool->isBroadcasting(routingData));
178  pool->getHandler(routingData)
179  .thenValue([&](BroadcastHandler<int, std::string>* h) {
180  handler1 = h;
181  handler1->subscribe(&subscriber);
182  });
183  EXPECT_TRUE(handler1 == nullptr);
184  EXPECT_TRUE(pool->isBroadcasting(routingData));
185 
186  // Invoke getHandler() for the same routing data when a connect request
187  // is outstanding
188  pool->getHandler(routingData)
189  .thenValue([&](BroadcastHandler<int, std::string>* h) {
190  handler2 = h;
191  handler2->subscribe(&subscriber);
192  });
193  EXPECT_TRUE(handler1 == nullptr);
194  EXPECT_TRUE(handler2 == nullptr);
195  EXPECT_TRUE(pool->isBroadcasting(routingData));
196 
197  EXPECT_CALL(*pipelineFactory, setRoutingData(_, "url1")).Times(1);
198 
199  base->loopOnce(); // Do async connect
200 
201  // Verify that both promises are fulfilled
202  EXPECT_TRUE(handler1 != nullptr);
203  EXPECT_TRUE(handler2 != nullptr);
204  EXPECT_TRUE(handler1 == handler2);
205  EXPECT_TRUE(pool->isBroadcasting(routingData));
206 
207  // Invoke getHandler() again to test if the same handler is returned
208  // from the existing connection
209  pool->getHandler(routingData)
210  .thenValue([&](BroadcastHandler<int, std::string>* h) {
211  EXPECT_TRUE(h == handler1);
212  })
213  .wait();
214  EXPECT_TRUE(pool->isBroadcasting(routingData));
215 
216  // Cleanup
217  handler1->readEOF(handler1->getContext());
218 }
void readEOF(Context *ctx) override
*than *hazptr_holder h
Definition: Hazptr.h:116
Context * getContext()
Definition: Handler.h:34
virtual uint64_t subscribe(Subscriber< T, R > *subscriber)
void dummy()
bool wait(Waiter *waiter, bool shouldSleep, Waiter *&next)
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
const char * string
Definition: Conv.cpp:212
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
PUSHMI_INLINE_VAR constexpr detail::get_fn< T > get
Definition: submit.h:391
TEST_F ( BroadcastPoolTest  ,
ConnectError   
)

Definition at line 220 of file BroadcastPoolTest.cpp.

References testing::_, dummy(), EXPECT_CALL, EXPECT_FALSE, EXPECT_TRUE, folly::EventBaseManager::get(), wangle::HandlerBase< Context >::getContext(), folly::EventBaseManager::getEventBase(), h, wangle::BroadcastHandler< T, R >::readEOF(), string, and wangle::BroadcastHandler< T, R >::subscribe().

220  {
221  // Test when an exception occurs during connect request
222  std::string routingData = "url1";
223  BroadcastHandler<int, std::string>* handler1 = nullptr;
224  BroadcastHandler<int, std::string>* handler2 = nullptr;
225  bool handler1Error = false;
226  bool handler2Error = false;
227  auto base = EventBaseManager::get()->getEventBase();
228 
230 
231  // Stop the server to inject connect failure
232  stopServer();
233 
234  // No broadcast available for routingData. Kick off a connect request.
235  pool->getHandler(routingData)
236  .thenValue([&](BroadcastHandler<int, std::string>* h) { handler1 = h; })
237  .onError([&](const std::exception&) {
238  handler1Error = true;
239  EXPECT_FALSE(pool->isBroadcasting(routingData));
240  });
241  EXPECT_TRUE(handler1 == nullptr);
242  EXPECT_FALSE(handler1Error);
243  EXPECT_TRUE(pool->isBroadcasting(routingData));
244 
245  // Invoke getHandler() again while the connect request is in flight
246  pool->getHandler(routingData)
247  .thenValue([&](BroadcastHandler<int, std::string>* h) { handler2 = h; })
248  .onError([&](const std::exception&) {
249  handler2Error = true;
250  EXPECT_FALSE(pool->isBroadcasting(routingData));
251  });
252  EXPECT_TRUE(handler2 == nullptr);
253  EXPECT_FALSE(handler2Error);
254  EXPECT_TRUE(pool->isBroadcasting(routingData));
255 
256  base->loopOnce(); // Do async connect
257 
258  // Verify that the exception is set on both promises
259  EXPECT_TRUE(handler1 == nullptr);
260  EXPECT_TRUE(handler2 == nullptr);
261  EXPECT_TRUE(handler1Error);
262  EXPECT_TRUE(handler2Error);
263 
264  // The broadcast should have been deleted now
265  EXPECT_FALSE(pool->isBroadcasting(routingData));
266 
267  // Start the server now. Connect requests should succeed.
268  startServer();
269  pool->getHandler(routingData)
270  .thenValue([&](BroadcastHandler<int, std::string>* h) {
271  handler1 = h;
272  handler1->subscribe(&subscriber);
273  });
274  EXPECT_TRUE(handler1 == nullptr);
275  EXPECT_CALL(*pipelineFactory, setRoutingData(_, "url1")).Times(1);
276  base->loopOnce(); // Do async connect
277  EXPECT_TRUE(handler1 != nullptr);
278  EXPECT_TRUE(pool->isBroadcasting(routingData));
279 
280  // Cleanup
281  handler1->readEOF(handler1->getContext());
282 }
void readEOF(Context *ctx) override
*than *hazptr_holder h
Definition: Hazptr.h:116
Context * getContext()
Definition: Handler.h:34
virtual uint64_t subscribe(Subscriber< T, R > *subscriber)
void dummy()
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
const char * string
Definition: Conv.cpp:212
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
PUSHMI_INLINE_VAR constexpr detail::get_fn< T > get
Definition: submit.h:391
TEST_F ( BroadcastPoolTest  ,
ConnectErrorServerPool   
)

Definition at line 284 of file BroadcastPoolTest.cpp.

References dummy(), EXPECT_FALSE, EXPECT_TRUE, folly::EventBaseManager::get(), folly::EventBaseManager::getEventBase(), h, and string.

284  {
285  // Test when an error occurs in ServerPool when trying to kick off
286  // a connect request
287  std::string routingData = "url1";
288  BroadcastHandler<int, std::string>* handler1 = nullptr;
289  BroadcastHandler<int, std::string>* handler2 = nullptr;
290  bool handler1Error = false;
291  bool handler2Error = false;
292  auto base = EventBaseManager::get()->getEventBase();
293 
295 
296  // Inject a ServerPool error
297  serverPool->failConnect();
298  pool->getHandler(routingData)
299  .thenValue([&](BroadcastHandler<int, std::string>* h) { handler1 = h; })
300  .onError([&](const std::exception&) {
301  handler1Error = true;
302  EXPECT_FALSE(pool->isBroadcasting(routingData));
303  });
304  EXPECT_TRUE(handler1 == nullptr);
305  EXPECT_TRUE(handler1Error);
306  EXPECT_FALSE(pool->isBroadcasting(routingData));
307 }
*than *hazptr_holder h
Definition: Hazptr.h:116
void dummy()
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
const char * string
Definition: Conv.cpp:212
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
PUSHMI_INLINE_VAR constexpr detail::get_fn< T > get
Definition: submit.h:391
TEST_F ( BroadcastPoolTest  ,
RoutingDataException   
)

Definition at line 309 of file BroadcastPoolTest.cpp.

References testing::_, dummy(), EXPECT_CALL, EXPECT_FALSE, EXPECT_TRUE, folly::EventBaseManager::get(), folly::EventBaseManager::getEventBase(), h, handler(), and string.

309  {
310  // Test when an exception occurs while setting routing data on
311  // the pipeline after the socket connect succeeds.
312  std::string routingData = "url";
314  bool handlerError = false;
315  auto base = EventBaseManager::get()->getEventBase();
316 
318 
319  EXPECT_FALSE(pool->isBroadcasting(routingData));
320  pool->getHandler(routingData)
321  .thenValue([&](BroadcastHandler<int, std::string>* h) { handler = h; })
322  .onError([&](const std::exception&) {
323  handlerError = true;
324  EXPECT_FALSE(pool->isBroadcasting(routingData));
325  });
326  EXPECT_TRUE(handler == nullptr);
327  EXPECT_CALL(*pipelineFactory, setRoutingData(_, "url"))
328  .WillOnce(Throw(std::exception()));
329  base->loopOnce(); // Do async connect
330  EXPECT_TRUE(handler == nullptr);
331  EXPECT_TRUE(handlerError);
332  EXPECT_FALSE(pool->isBroadcasting(routingData));
333 }
*than *hazptr_holder h
Definition: Hazptr.h:116
void handler(int, siginfo_t *, void *)
void dummy()
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
const char * string
Definition: Conv.cpp:212
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
PUSHMI_INLINE_VAR constexpr detail::get_fn< T > get
Definition: submit.h:391
TEST_F ( BroadcastPoolTest  ,
RoutingDataPipelineDeletion   
)

Definition at line 335 of file BroadcastPoolTest.cpp.

References testing::_, dummy(), EXPECT_CALL, EXPECT_FALSE, EXPECT_TRUE, folly::EventBaseManager::get(), folly::EventBaseManager::getEventBase(), h, handler(), testing::Invoke(), wangle::Pipeline< R, W >::readException(), and string.

335  {
336  // Test when the broadcast pipeline gets deleted inline while setting
337  // routing data after the socket connection succeeds.
338  std::string routingData = "url";
340  bool handlerError = false;
341  auto base = EventBaseManager::get()->getEventBase();
342 
344 
345  EXPECT_FALSE(pool->isBroadcasting(routingData));
346  pool->getHandler(routingData)
347  .thenValue([&](BroadcastHandler<int, std::string>* h) { handler = h; })
348  .onError([&](const std::exception&) {
349  handlerError = true;
350  EXPECT_FALSE(pool->isBroadcasting(routingData));
351  });
352  EXPECT_TRUE(handler == nullptr);
353  EXPECT_CALL(*pipelineFactory, setRoutingData(_, "url"))
354  .WillOnce(Invoke(
355  [&](DefaultPipeline* pipeline, const std::string&) {
356  pipeline->readException(std::runtime_error("upstream error"));
357  }));
358  base->loopOnce(); // Do async connect
359  EXPECT_TRUE(handler == nullptr);
360  EXPECT_TRUE(handlerError);
361  EXPECT_FALSE(pool->isBroadcasting(routingData));
362 }
*than *hazptr_holder h
Definition: Hazptr.h:116
void handler(int, siginfo_t *, void *)
PolymorphicAction< internal::InvokeAction< FunctionImpl > > Invoke(FunctionImpl function_impl)
void dummy()
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
std::enable_if<!std::is_same< T, folly::Unit >::value >::type readException(folly::exception_wrapper e)
Definition: Pipeline-inl.h:223
const char * string
Definition: Conv.cpp:212
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
PUSHMI_INLINE_VAR constexpr detail::get_fn< T > get
Definition: submit.h:391
TEST_F ( BroadcastPoolTest  ,
HandlerEOFPoolDeletion   
)

Definition at line 364 of file BroadcastPoolTest.cpp.

References testing::_, dummy(), EXPECT_CALL, EXPECT_FALSE, EXPECT_TRUE, folly::EventBaseManager::get(), wangle::HandlerBase< Context >::getContext(), folly::EventBaseManager::getEventBase(), wangle::HandlerContext< In, Out >::getPipeline(), h, handler(), wangle::Pipeline< R, W >::readEOF(), string, and wangle::BroadcastHandler< T, R >::subscribe().

364  {
365  // Test against use-after-free on BroadcastManager when the pool
366  // is deleted before the handler
367  std::string routingData = "url1";
369  DefaultPipeline* pipeline = nullptr;
370  auto base = EventBaseManager::get()->getEventBase();
371 
373 
374  // Dispatch a connect request and create a handler
375  pool->getHandler(routingData)
376  .thenValue([&](BroadcastHandler<int, std::string>* h) {
377  handler = h;
378  handler->subscribe(&subscriber);
379  pipeline = dynamic_cast<DefaultPipeline*>(
380  handler->getContext()->getPipeline());
381  });
382  EXPECT_CALL(*pipelineFactory, setRoutingData(_, "url1")).Times(1);
383  base->loopOnce(); // Do async connect
384  EXPECT_TRUE(pool->isBroadcasting(routingData));
385  EXPECT_TRUE(handler != nullptr);
386  EXPECT_TRUE(pipeline != nullptr);
387 
388  EXPECT_CALL(subscriber, onCompleted()).Times(1);
389 
390  // This will also delete the pipeline and the handler
391  pipeline->readEOF();
392  EXPECT_FALSE(pool->isBroadcasting(routingData));
393 }
*than *hazptr_holder h
Definition: Hazptr.h:116
std::enable_if<!std::is_same< T, folly::Unit >::value >::type readEOF()
Definition: Pipeline-inl.h:195
Context * getContext()
Definition: Handler.h:34
void handler(int, siginfo_t *, void *)
virtual uint64_t subscribe(Subscriber< T, R > *subscriber)
void dummy()
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
const char * string
Definition: Conv.cpp:212
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
PUSHMI_INLINE_VAR constexpr detail::get_fn< T > get
Definition: submit.h:391
virtual PipelineBase * getPipeline()=0
TEST_F ( BroadcastPoolTest  ,
SubscriberDeletionBeforeConnect   
)

Definition at line 395 of file BroadcastPoolTest.cpp.

References testing::_, dummy(), EXPECT_CALL, EXPECT_FALSE, EXPECT_TRUE, folly::EventBaseManager::get(), wangle::HandlerBase< Context >::getContext(), folly::EventBaseManager::getEventBase(), h, handler(), wangle::BroadcastHandler< T, R >::readEOF(), string, and wangle::BroadcastHandler< T, R >::subscribe().

395  {
396  // Test when the caller goes away before connect request returns
397  // resulting in a new BroadcastHandler without any subscribers
398  std::string routingData = "url1";
400  bool handler1Connected = false;
401  bool handler2Connected = false;
402  auto base = EventBaseManager::get()->getEventBase();
403 
405 
406  // No broadcast available for routingData. Kick off a connect request.
407  EXPECT_FALSE(pool->isBroadcasting(routingData));
408  pool->getHandler(routingData)
409  .thenValue([&](BroadcastHandler<int, std::string>*) {
410  handler1Connected = true;
411  // Do not subscribe to the handler. This will simulate
412  // the caller going away before we get here.
413  });
414  EXPECT_FALSE(handler1Connected);
415  EXPECT_TRUE(pool->isBroadcasting(routingData));
416 
417  // Invoke getHandler() for the same routing data when a connect request
418  // is outstanding
419  pool->getHandler(routingData)
420  .thenValue([&](BroadcastHandler<int, std::string>*) {
421  handler2Connected = true;
422  // Do not subscribe to the handler.
423  });
424  EXPECT_FALSE(handler2Connected);
425  EXPECT_TRUE(pool->isBroadcasting(routingData));
426 
427  EXPECT_CALL(*pipelineFactory, setRoutingData(_, "url1")).Times(1);
428 
429  base->loopOnce(); // Do async connect
430 
431  // Verify that both promises are fulfilled, but the broadcast is
432  // deleted from the pool because no subscriber was added.
433  EXPECT_TRUE(handler1Connected);
434  EXPECT_TRUE(handler2Connected);
435  EXPECT_FALSE(pool->isBroadcasting(routingData));
436 
437  // Test the same scenario but with one subscriber going away
438  // sooner, but another subscriber being added to the handler.
439  handler1Connected = false;
440  handler2Connected = false;
441  pool->getHandler(routingData)
442  .thenValue([&](BroadcastHandler<int, std::string>*) {
443  handler1Connected = true;
444  // Do not subscribe to the handler. This will simulate
445  // the caller going away before we get here.
446  });
447  EXPECT_FALSE(handler1Connected);
448  EXPECT_TRUE(pool->isBroadcasting(routingData));
449 
450  pool->getHandler(routingData)
451  .thenValue([&](BroadcastHandler<int, std::string>* h) {
452  handler2Connected = true;
453  // Subscribe to the handler. The handler should stick around now.
454  handler = h;
455  handler->subscribe(&subscriber);
456  });
457  EXPECT_FALSE(handler2Connected);
458  EXPECT_TRUE(pool->isBroadcasting(routingData));
459 
460  EXPECT_CALL(*pipelineFactory, setRoutingData(_, "url1")).Times(1);
461 
462  base->loopOnce(); // Do async connect
463 
464  // Verify that both promises are fulfilled, and the broadcast
465  // remains in the pool because a subscriber was added.
466  EXPECT_TRUE(handler1Connected);
467  EXPECT_TRUE(handler2Connected);
468  EXPECT_TRUE(pool->isBroadcasting(routingData));
469 
470  // Cleanup
471  handler->readEOF(handler->getContext());
472 }
void readEOF(Context *ctx) override
*than *hazptr_holder h
Definition: Hazptr.h:116
Context * getContext()
Definition: Handler.h:34
void handler(int, siginfo_t *, void *)
virtual uint64_t subscribe(Subscriber< T, R > *subscriber)
void dummy()
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
const char * string
Definition: Conv.cpp:212
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
PUSHMI_INLINE_VAR constexpr detail::get_fn< T > get
Definition: submit.h:391
TEST_F ( BroadcastPoolTest  ,
ThreadLocalPool   
)

Definition at line 474 of file BroadcastPoolTest.cpp.

References testing::_, wangle::ObservingPipelineFactory< T, R, P >::broadcastPool(), dummy(), EXPECT_CALL, EXPECT_FALSE, EXPECT_NE, EXPECT_TRUE, folly::EventBaseManager::get(), wangle::BroadcastHandler< T, R >::getArbitraryIdentifier(), folly::EventBaseManager::getEventBase(), testing::Invoke(), wangle::BroadcastPool< T, R, P >::isBroadcasting(), folly::join(), folly::EventBase::loopOnce(), wangle::MockObservingPipelineFactory::newPipeline(), string, and uint64_t.

474  {
475  // Test that thread-local broadcast pool works correctly
476  MockObservingPipelineFactory factory1(serverPool, pipelineFactory);
477  MockObservingPipelineFactory factory2(serverPool, pipelineFactory);
478  BroadcastHandler<int, std::string>* broadcastHandler = nullptr;
479  uint64_t broadcastHandlerId = 0;
480  const std::string kUrl = "url";
481 
483 
484  // There should be no broadcast available for this routing data
485  EXPECT_FALSE(factory1.broadcastPool()->isBroadcasting(kUrl));
486  EXPECT_FALSE(factory2.broadcastPool()->isBroadcasting(kUrl));
487 
488  // Test creating a new broadcast
489  EXPECT_CALL(*pipelineFactory, setRoutingData(_, kUrl))
490  .WillOnce(Invoke([&](DefaultPipeline* pipeline, const std::string&) {
491  broadcastHandler = pipelineFactory->getBroadcastHandler(pipeline);
492  broadcastHandlerId = broadcastHandler->getArbitraryIdentifier();
493  }));
494  auto pipeline1 = factory1.newPipeline(nullptr, kUrl, nullptr, nullptr);
495  pipeline1->transportActive();
496  EventBaseManager::get()->getEventBase()->loopOnce();
497  EXPECT_TRUE(factory1.broadcastPool()->isBroadcasting(kUrl));
498  EXPECT_FALSE(factory2.broadcastPool()->isBroadcasting(kUrl));
499 
500  // Test broadcast with the same routing data in the same thread. No
501  // new broadcast handler should be created.
502  EXPECT_CALL(*pipelineFactory, setRoutingData(_, _)).Times(0);
503  auto pipeline2 = factory1.newPipeline(nullptr, kUrl, nullptr, nullptr);
504  pipeline2->transportActive();
505  EXPECT_TRUE(factory1.broadcastPool()->isBroadcasting(kUrl));
506  EXPECT_FALSE(factory2.broadcastPool()->isBroadcasting(kUrl));
507 
508  // Test creating a broadcast with the same routing data but in a
509  // different thread. Should return a different broadcast handler.
510  std::thread([&] {
511  // There should be no broadcast available for this routing data since we
512  // are on a different thread.
513  EXPECT_FALSE(factory1.broadcastPool()->isBroadcasting(kUrl));
514  EXPECT_FALSE(factory2.broadcastPool()->isBroadcasting(kUrl));
515 
516  EXPECT_CALL(*pipelineFactory, setRoutingData(_, kUrl))
517  .WillOnce(Invoke([&](DefaultPipeline* pipeline, const std::string&) {
518  EXPECT_NE(
519  pipelineFactory->getBroadcastHandler(pipeline)
520  ->getArbitraryIdentifier(),
521  broadcastHandlerId);
522  }));
523  auto pipeline3 = factory1.newPipeline(nullptr, kUrl, nullptr, nullptr);
524  pipeline3->transportActive();
525  EventBaseManager::get()->getEventBase()->loopOnce();
526  EXPECT_TRUE(factory1.broadcastPool()->isBroadcasting(kUrl));
527  EXPECT_FALSE(factory2.broadcastPool()->isBroadcasting(kUrl));
528 
529  // Cleanup
530  pipeline3->readEOF();
531  }).join();
532 
533  // Test creating a broadcast with the same routing data but using a
534  // different ObservingPipelineFactory. Should return a different broadcast
535  // handler since a different thread-local BroadcastPool is used.
536  EXPECT_CALL(*pipelineFactory, setRoutingData(_, kUrl))
537  .WillOnce(Invoke([&](DefaultPipeline* pipeline, const std::string&) {
538  EXPECT_NE(
539  pipelineFactory->getBroadcastHandler(pipeline)
540  ->getArbitraryIdentifier(),
541  broadcastHandlerId);
542  }));
543  auto pipeline4 = factory2.newPipeline(nullptr, kUrl, nullptr, nullptr);
544  pipeline4->transportActive();
545  EventBaseManager::get()->getEventBase()->loopOnce();
546  EXPECT_TRUE(factory2.broadcastPool()->isBroadcasting(kUrl));
547 
548  // Cleanup
549  pipeline1->readEOF();
550  pipeline2->readEOF();
551  pipeline4->readEOF();
552 }
PolymorphicAction< internal::InvokeAction< FunctionImpl > > Invoke(FunctionImpl function_impl)
void dummy()
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
const char * string
Definition: Conv.cpp:212
#define join
#define EXPECT_NE(val1, val2)
Definition: gtest.h:1926
#define EXPECT_CALL(obj, call)
const internal::AnythingMatcher _
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
PUSHMI_INLINE_VAR constexpr detail::get_fn< T > get
Definition: submit.h:391