proxygen
AsyncPipeTest.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
18 #include <folly/Memory.h>
21 
22 #include <fcntl.h>
23 
24 using namespace testing;
25 
26 namespace {
27 
28 class TestReadCallback : public folly::AsyncReader::ReadCallback {
29  public:
30  bool isBufferMovable() noexcept override {
31  return movable_;
32  }
33  void setMovable(bool movable) {
34  movable_ = movable;
35  }
36 
37  void readBufferAvailable(
38  std::unique_ptr<folly::IOBuf> readBuf) noexcept override {
39  readBuffer_.append(std::move(readBuf));
40  }
41 
42  void readDataAvailable(size_t len) noexcept override {
43  readBuffer_.postallocate(len);
44  }
45 
46  void getReadBuffer(void** bufReturn, size_t* lenReturn) noexcept override {
47  auto res = readBuffer_.preallocate(4000, 65000);
48  *bufReturn = res.first;
49  *lenReturn = res.second;
50  }
51 
52  void readEOF() noexcept override {}
53 
54  void readErr(const folly::AsyncSocketException&) noexcept override {
55  error_ = true;
56  }
57 
58  std::string getData() {
59  auto buf = readBuffer_.move();
60  buf->coalesce();
61  return std::string((char*)buf->data(), buf->length());
62  }
63 
64  void reset() {
65  movable_ = false;
66  error_ = false;
67  readBuffer_.clear();
68  }
69 
71  bool error_{false};
72  bool movable_{false};
73 };
74 
75 class TestWriteCallback : public folly::AsyncWriter::WriteCallback {
76  public:
77  void writeSuccess() noexcept override {
78  writes_++;
79  }
80 
81  void writeErr(size_t, const folly::AsyncSocketException&) noexcept override {
82  error_ = true;
83  }
84 
85  void reset() {
86  writes_ = 0;
87  error_ = false;
88  }
89 
90  uint32_t writes_{0};
91  bool error_{false};
92 };
93 
94 class AsyncPipeTest : public Test {
95  public:
96  void reset(bool movable) {
97  reader_.reset();
98  readCallback_.reset();
99  writer_.reset();
100  writeCallback_.reset();
101 
102  int rc = pipe(pipeFds_);
103  EXPECT_EQ(rc, 0);
104 
105  EXPECT_EQ(::fcntl(pipeFds_[0], F_SETFL, O_NONBLOCK), 0);
106  EXPECT_EQ(::fcntl(pipeFds_[1], F_SETFL, O_NONBLOCK), 0);
107  reader_ = folly::AsyncPipeReader::newReader(&eventBase_, pipeFds_[0]);
108  writer_ = folly::AsyncPipeWriter::newWriter(&eventBase_, pipeFds_[1]);
109 
110  readCallback_.setMovable(movable);
111  }
112 
113  protected:
114  folly::EventBase eventBase_;
115  int pipeFds_[2];
118  TestReadCallback readCallback_;
119  TestWriteCallback writeCallback_;
120 };
121 
122 std::unique_ptr<folly::IOBuf> getBuf(const std::string& data) {
123  auto buf = folly::IOBuf::copyBuffer(data.c_str(), data.length());
124  return buf;
125 }
126 
127 } // namespace
128 
129 TEST_F(AsyncPipeTest, simple) {
130  for (int pass = 0; pass < 2; ++pass) {
131  reset(pass % 2 != 0);
132  reader_->setReadCB(&readCallback_);
133  writer_->write(getBuf("hello"), &writeCallback_);
134  writer_->closeOnEmpty();
135  eventBase_.loop();
136  EXPECT_EQ(readCallback_.getData(), "hello");
137  EXPECT_FALSE(readCallback_.error_);
138  EXPECT_EQ(writeCallback_.writes_, 1);
139  EXPECT_FALSE(writeCallback_.error_);
140  }
141 }
142 
143 TEST_F(AsyncPipeTest, blocked_writes) {
144  for (int pass = 0; pass < 2; ++pass) {
145  reset(pass % 2 != 0);
146  uint32_t writeAttempts = 0;
147  do {
148  ++writeAttempts;
149  writer_->write(getBuf("hello"), &writeCallback_);
150  } while (writeCallback_.writes_ == writeAttempts);
151  // there is one blocked write
152  writer_->closeOnEmpty();
153 
154  reader_->setReadCB(&readCallback_);
155 
156  eventBase_.loop();
157  std::string expected;
158  for (uint32_t i = 0; i < writeAttempts; i++) {
159  expected += "hello";
160  }
161  EXPECT_EQ(readCallback_.getData(), expected);
162  EXPECT_FALSE(readCallback_.error_);
163  EXPECT_EQ(writeCallback_.writes_, writeAttempts);
164  EXPECT_FALSE(writeCallback_.error_);
165  }
166 }
167 
168 TEST_F(AsyncPipeTest, writeOnClose) {
169  for (int pass = 0; pass < 2; ++pass) {
170  reset(pass % 2 != 0);
171  reader_->setReadCB(&readCallback_);
172  writer_->write(getBuf("hello"), &writeCallback_);
173  writer_->closeOnEmpty();
174  writer_->write(getBuf("hello"), &writeCallback_);
175  eventBase_.loop();
176  EXPECT_EQ(readCallback_.getData(), "hello");
177  EXPECT_FALSE(readCallback_.error_);
178  EXPECT_EQ(writeCallback_.writes_, 1);
179  EXPECT_TRUE(writeCallback_.error_);
180  }
181 }
size_t readBuf(Buf &buf, folly::io::Cursor &cursor)
Definition: Types-inl.h:220
std::unique_ptr< AsyncPipeReader, folly::DelayedDestruction::Destructor > UniquePtr
Definition: AsyncPipe.h:40
static UniquePtr newReader(Args &&...args)
Definition: AsyncPipe.h:43
TEST_F(TestInfoTest, Names)
#define EXPECT_EQ(val1, val2)
Definition: gtest.h:1922
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
static bool simple
requires E e noexcept(noexcept(s.error(std::move(e))))
std::unique_ptr< AsyncPipeWriter, folly::DelayedDestruction::Destructor > UniquePtr
Definition: AsyncPipe.h:101
static Options cacheChainLength()
Definition: IOBufQueue.h:83
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
const char * string
Definition: Conv.cpp:212
static UniquePtr newWriter(Args &&...args)
Definition: AsyncPipe.h:104
#define EXPECT_FALSE(condition)
Definition: gtest.h:1862
static std::unique_ptr< IOBuf > copyBuffer(const void *buf, std::size_t size, std::size_t headroom=0, std::size_t minTailroom=0)
Definition: IOBuf.h:1587
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43
void pipe(CPUExecutor cpu, IOExecutor io)