proxygen
CompressionSimulator.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree. An additional grant
7  * of patent rights can be found in the PATENTS file in the same directory.
8  *
9  */
15 
19 
20 using namespace std;
21 using namespace folly;
22 using namespace proxygen;
23 
24 namespace {
25 using namespace proxygen::compress;
26 
27 // This needs to be synchronized with HPACKEncoder::kAutoFlushThreshold.
28 const size_t kMTU = 1400;
29 
30 const std::string kTestDir = getContainingDirectory(__FILE__).str();
31 
32 } // namespace
33 
34 namespace proxygen { namespace compress {
35 
36 bool CompressionSimulator::readInputFromFileAndSchedule(
37  const string& filename) {
38  unique_ptr<HTTPArchive> har;
39  try {
40  har = HTTPArchive::fromFile(kTestDir + filename);
41  } catch (const std::exception& ex) {
42  LOG(ERROR) << folly::exceptionStr(ex);
43  }
44  if (!har || har->requests.size() == 0) {
45  return false;
46  }
47  // Sort by start time (har ordered by finish time?)
48  std::sort(har->requests.begin(),
49  har->requests.end(),
50  [](const HTTPMessage& a, const HTTPMessage& b) {
51  return a.getStartTime() < b.getStartTime();
52  });
53  TimePoint last = har->requests[0].getStartTime();
54  std::chrono::milliseconds cumulativeDelay(0);
55  uint16_t index = 0;
56  for (HTTPMessage& msg : har->requests) {
57  auto delayFromPrevious = millisecondsBetween(msg.getStartTime(), last);
58  // If there was a quiescent gap in the HAR of at least some value, shrink
59  // it so the test doesn't last forever
60  if (delayFromPrevious > std::chrono::milliseconds(1000)) {
61  delayFromPrevious = std::chrono::milliseconds(1000);
62  }
63  last = msg.getStartTime();
64  cumulativeDelay += delayFromPrevious;
65  setupRequest(index++, std::move(msg), cumulativeDelay);
66  }
67  for (auto& kv : domains_) {
68  flushRequests(kv.second.get());
69  }
70  return true;
71 }
72 
74 #ifndef HAVE_REAL_QMIN
75  if (params_.type == SchemeType::QMIN) {
76  LOG(INFO) << "QMIN not available";
77  return;
78  }
79 #endif
80 
81  LOG(INFO) << "Starting run";
82  eventBase_.loop();
83  uint32_t holBlockCount = 0;
84  for (auto& scheme : domains_) {
85  holBlockCount += scheme.second->getHolBlockCount();
86  }
87  LOG(INFO) << "Complete"
88  << "\nStats:"
89  "\nSeed: "
90  << params_.seed << "\nBlocks sent: " << requests_.size()
91  << "\nAllowed OOO: " << stats_.allowedOOO
92  << "\nPackets: " << stats_.packets
93  << "\nPacket Losses: " << stats_.packetLosses
94  << "\nHOL Block Count: " << holBlockCount
95  << "\nHOL Delay (ms): " << stats_.holDelay.count()
96  << "\nMax Queue Buffer Bytes: " << stats_.maxQueueBufferBytes
97  << "\nUncompressed Bytes: " << stats_.uncompressed
98  << "\nCompressed Bytes: " << stats_.compressed
99  << "\nCompression Ratio: "
100  << int(100 - double(100 * stats_.compressed) / stats_.uncompressed);
101 }
102 
103 void CompressionSimulator::flushRequests(CompressionScheme* scheme) {
104  VLOG(5) << "schedule encode for " << scheme->packetIndices.size()
105  << " blocks at " << scheme->prev.count();
106  // Flush previous train
107  scheduleEvent(
108  [this, scheme, indices = std::move(scheme->packetIndices)]() mutable {
109  bool newPacket = true;
110  while (!indices.empty()) {
111  int16_t index = indices.front();
112  indices.pop_front();
113  auto schemeIndex = scheme->index;
114  auto encodeRes = encode(scheme, newPacket, index);
115  FrameFlags flags = encodeRes.first;
116  bool allowOOO = flags.allowOOO;
117  if (schemeIndex < minOOOThresh()) {
118  allowOOO = false;
119  auto ack = scheme->getAck(schemeIndex);
120  if (ack) {
121  scheme->recvAck(std::move(ack));
122  }
123  }
124  stats_.allowedOOO += (allowOOO) ? 1 : 0;
125  flags.allowOOO = allowOOO;
126  scheme->encodedBlocks.emplace_back(flags,
127  newPacket,
128  std::move(encodeRes.second),
129  &callbacks_[index]);
130  newPacket = false;
131  }
132  eventBase_.runInLoop(scheme, true);
133  },
134  scheme->prev);
135 }
136 
137 void CompressionSimulator::setupRequest(uint16_t index,
138  HTTPMessage&& msg,
139  std::chrono::milliseconds encodeDelay) {
140  // Normalize to relative paths
141  const auto& query = msg.getQueryString();
142  if (query.empty()) {
143  msg.setURL(msg.getPath());
144  } else {
145  msg.setURL(folly::to<string>(msg.getPath(), "?", query));
146  }
147 
148  auto scheme = getScheme(msg.getHeaders().getSingleOrEmpty(HTTP_HEADER_HOST));
149  requests_.emplace_back(msg);
150  auto decodeCompleteCB =
151  [index, this, scheme](std::chrono::milliseconds holDelay) {
152  // record processed timestamp
153  CHECK(!callbacks_[index].getResult().hasError());
154  DCHECK_EQ(requests_[index], *callbacks_[index].getResult().value());
155  stats_.holDelay += holDelay;
156  VLOG(1) << "Finished decoding request=" << index
157  << " with holDelay=" << holDelay.count()
158  << " cumulative HoL delay=" << stats_.holDelay.count();
159  sendAck(scheme, scheme->getAck(callbacks_[index].seqn));
160  };
161  callbacks_.emplace_back(index, decodeCompleteCB);
162 
163  // Assume that all packets with same encodeDelay will form a packet
164  // train. Encode them as a group, so can we can emulate packet
165  // boundaries more realistically, telling the encoder which blocks
166  // start such trains.
167  if (scheme->packetIndices.size() > 0) {
168  auto delayFromPrevious = encodeDelay - scheme->prev;
169  VLOG(1) << "request " << index << " delay " << delayFromPrevious.count();
170  if (delayFromPrevious > std::chrono::milliseconds(1)) {
171  flushRequests(scheme);
172  }
173  }
174  scheme->prev = encodeDelay;
175  scheme->packetIndices.push_back(index);
176 }
177 
178 // Once per loop, each connection flushes it's encode blocks and schedules
179 // decodes based on how many packets the block occupies
180 void CompressionScheme::runLoopCallback() noexcept {
181  simulator_->flushSchemePackets(this);
182 }
183 
184 void CompressionSimulator::flushPacket(CompressionScheme* scheme) {
185  if (scheme->packetBlocks.empty()) {
186  return;
187  }
188 
189  stats_.packets++;
190  VLOG(1) << "schedule decode for " << scheme->packetBlocks.size()
191  << " blocks at " << scheme->decodeDelay.count();
192  scheduleEvent(
193  {[this, scheme, blocks = std::move(scheme->packetBlocks)]() mutable {
194  decodePacket(scheme, blocks);
195  }},
196  scheme->decodeDelay);
197  scheme->packetBytes = 0;
198 }
199 
200 void CompressionSimulator::flushSchemePackets(CompressionScheme* scheme) {
201  CHECK(!scheme->encodedBlocks.empty());
202  VLOG(2) << "Flushing " << scheme->encodedBlocks.size() << " requests";
203  // tracks the number of bytes in the current simulated packet
204  auto encodeRes = &scheme->encodedBlocks.front();
205  bool newPacket = std::get<1>(*encodeRes);
206  size_t headerBlockBytesRemaining =
207  std::get<2>(*encodeRes)->computeChainDataLength();
208  std::chrono::milliseconds packetDelay = deliveryDelay();
209  scheme->decodeDelay = packetDelay;
210  while (true) {
211  if (newPacket) {
212  flushPacket(scheme);
213  }
214  newPacket = false;
215  // precondition packetBytes < kMTU
216  if (scheme->packetBytes + headerBlockBytesRemaining >= kMTU) {
217  // Header block filled current packet, triggering a flush
218  VLOG(2) << "Request(s) spanned multiple packets";
219  newPacket = true;
220  } else {
221  scheme->packetBytes += headerBlockBytesRemaining;
222  }
223  headerBlockBytesRemaining -=
224  std::min(headerBlockBytesRemaining, kMTU - scheme->packetBytes);
225  if (headerBlockBytesRemaining == 0) {
226  // Move from the first element of encodedBlocks to the last
227  // element of packetBlocks.
228  scheme->packetBlocks.splice(scheme->packetBlocks.end(),
229  scheme->encodedBlocks,
230  scheme->encodedBlocks.begin());
231  if (scheme->encodedBlocks.empty()) {
232  // All done
233  break;
234  }
235  // Grab the next request
236  encodeRes = &scheme->encodedBlocks.front();
237  newPacket = std::get<1>(*encodeRes);
238  headerBlockBytesRemaining =
239  std::get<2>(*encodeRes)->computeChainDataLength();
240  }
241  if (newPacket) {
242  packetDelay = deliveryDelay();
243  scheme->decodeDelay = std::max(scheme->decodeDelay, packetDelay);
244  }
245  }
246  flushPacket(scheme);
247  CHECK(scheme->encodedBlocks.empty());
248 }
249 
250 CompressionScheme* CompressionSimulator::getScheme(StringPiece domain) {
251  static string blended("\"Facebook\"");
252  if (params_.blend &&
253  (domain.endsWith("facebook.com") || domain.endsWith("fbcdn.net"))) {
254  domain = blended;
255  }
256 
257  auto it = domains_.find(domain.str());
258  CompressionScheme* scheme = nullptr;
259  if (it == domains_.end()) {
260  LOG(INFO) << "Creating scheme for domain=" << domain;
261  auto schemePtr = makeScheme();
262  scheme = schemePtr.get();
263  domains_.emplace(domain.str(), std::move(schemePtr));
264  } else {
265  scheme = it->second.get();
266  }
267  return scheme;
268 }
269 
270 unique_ptr<CompressionScheme> CompressionSimulator::makeScheme() {
271  switch (params_.type) {
272  case SchemeType::QPACK:
273  return make_unique<QPACKScheme>(this, params_.tableSize,
274  params_.maxBlocking);
275  case SchemeType::QMIN:
276  return make_unique<QMINScheme>(this, params_.tableSize);
277  case SchemeType::HPACK:
278  return make_unique<HPACKScheme>(this, params_.tableSize);
279  }
280  LOG(FATAL) << "Bad scheme";
281  return nullptr;
282 }
283 
284 std::pair<FrameFlags, unique_ptr<IOBuf>> CompressionSimulator::encode(
285  CompressionScheme* scheme, bool newPacket, uint16_t index) {
286  VLOG(1) << "Start encoding request=" << index;
287  // vector to hold cookie crumbs
288  vector<string> cookies;
289  vector<compress::Header> allHeaders = prepareMessageForCompression(
290  requests_[index], cookies);
291 
292  auto before = stats_.uncompressed;
293  auto res = scheme->encode(newPacket, std::move(allHeaders), stats_);
294  VLOG(1) << "Encoded request=" << index << " for host="
295  << requests_[index].getHeaders().getSingleOrEmpty(HTTP_HEADER_HOST)
296  << " orig size=" << (stats_.uncompressed - before)
297  << " block size=" << res.second->computeChainDataLength()
298  << " cumulative bytes=" << stats_.compressed
299  << " cumulative compression ratio="
300  << int(100 - double(100 * stats_.compressed) / stats_.uncompressed);
301  return res;
302 }
303 
306  unique_ptr<IOBuf> encodedReq,
307  SimStreamingCallback& cb) {
308  scheme->decode(flags, std::move(encodedReq), stats_, cb);
309 }
310 
311 void CompressionSimulator::decodePacket(
312  CompressionScheme* scheme,
313  std::list<CompressionScheme::BlockInfo>& blocks) {
314  VLOG(1) << "decode packet with " << blocks.size() << " blocks";
315  while (!blocks.empty()) {
316  auto encodeRes = &blocks.front();
317  // TODO(ckrasic) - to get packet coordination correct, could plumb
318  // through "start of packet" flag here. Probably not worth it,
319  // since it seems to make only a very small difference (about a
320  // 0.1% compressiondifference on my facebook har).
321  decode(scheme,
322  std::get<0>(*encodeRes),
323  std::move(std::get<2>(*encodeRes)),
324  *std::get<3>(*encodeRes));
325  blocks.pop_front();
326  }
327 }
328 
329 void CompressionSimulator::scheduleEvent(folly::Function<void()> f,
330  std::chrono::milliseconds ms) {
331  eventBase_.runAfterDelay(std::move(f), ms.count());
332 }
333 
334 void CompressionSimulator::sendAck(CompressionScheme* scheme,
335  unique_ptr<CompressionScheme::Ack> ack) {
336  if (!ack) {
337  return;
338  }
339  // An ack is a packet
340  stats_.packets++;
341  scheduleEvent([a = std::move(ack),
342  this,
343  scheme]() mutable { recvAck(scheme, std::move(a)); },
344  deliveryDelay());
345 }
346 
347 void CompressionSimulator::recvAck(CompressionScheme* scheme,
348  unique_ptr<CompressionScheme::Ack> ack) {
349  scheme->recvAck(std::move(ack));
350 }
351 
352 std::chrono::milliseconds CompressionSimulator::deliveryDelay() {
353  std::chrono::milliseconds delay = one_half_rtt();
354  while (loss()) {
355  stats_.packetLosses++;
356  scheduleEvent([] { VLOG(4) << "Packet lost!"; }, delay);
357  std::chrono::milliseconds rxmit = rxmitDelay();
358  delay += rxmit;
359  scheduleEvent(
360  [rxmit] {
361  VLOG(4) << "Packet loss detected, retransmitting with additional "
362  << rxmit.count();
363  },
364  delay - one_half_rtt());
365  }
366  if (delayed()) {
367  scheduleEvent([] { VLOG(4) << "Packet delayed in network"; }, delay);
368  delay += extraDelay();
369  }
370  return delay;
371 }
372 
373 std::chrono::milliseconds CompressionSimulator::rtt() {
374  return params_.rtt;
375 }
376 
377 std::chrono::milliseconds CompressionSimulator::one_half_rtt() {
378  return params_.rtt / 2;
379 }
380 
381 std::chrono::milliseconds CompressionSimulator::rxmitDelay() {
382  uint32_t ms = rtt().count() * Random::randDouble(1.1, 2, rng_);
383  return std::chrono::milliseconds(ms);
384 }
385 
386 bool CompressionSimulator::loss() {
387  return Random::randDouble01(rng_) < params_.lossProbability;
388 }
389 
390 bool CompressionSimulator::delayed() {
391  return Random::randDouble01(rng_) < params_.delayProbability;
392 }
393 
394 std::chrono::milliseconds CompressionSimulator::extraDelay() {
395  uint32_t ms = params_.maxDelay.count() * Random::randDouble01(rng_);
396  return std::chrono::milliseconds(ms);
397 }
398 
399 uint32_t CompressionSimulator::minOOOThresh() {
400  return params_.minOOOThresh;
401 }
402 }} // namespace proxygen::compress
std::chrono::milliseconds millisecondsBetween(std::chrono::time_point< ClockType > finish, std::chrono::time_point< ClockType > start)
Definition: Time.h:85
auto f
flags
Definition: http_parser.h:127
unique_ptr< IOBuf > encode(vector< HPACKHeader > &headers, HPACKEncoder &encoder)
std::string str() const
Definition: Range.h:591
char b
LogLevel max
Definition: LogLevel.cpp:31
virtual void recvAck(std::unique_ptr< Ack >)=0
TokenBindingMessage decode(folly::io::Cursor &cursor)
Definition: Types.cpp:132
fbstring exceptionStr(const std::exception &e)
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
size_type find(const_range_type str) const
Definition: Range.h:721
virtual std::pair< FrameFlags, std::unique_ptr< folly::IOBuf > > encode(bool newPacket, std::vector< compress::Header > allHeaders, SimStats &stats)=0
STL namespace.
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
std::vector< compress::Header > prepareMessageForCompression(const HTTPMessage &msg, std::vector< string > &cookies)
requires E e noexcept(noexcept(s.error(std::move(e))))
TimePoint getStartTime() const
Definition: HTTPMessage.h:622
LogLevel min
Definition: LogLevel.cpp:30
static void run(EventBaseManager *ebm, EventBase *eb, folly::Baton<> *stop, const StringPiece &name)
char a
static const char *const value
Definition: Conv.cpp:50
SteadyClock::time_point TimePoint
Definition: Time.h:25
std::chrono::milliseconds decodeDelay
virtual void decode(FrameFlags flags, std::unique_ptr< folly::IOBuf > encodedReq, SimStats &stats, SimStreamingCallback &cb)=0
const char * string
Definition: Conv.cpp:212
folly::StringPiece getContainingDirectory(folly::StringPiece input)
Definition: TestUtils.h:33
std::vector< HTTPMessage > requests
Definition: HTTPArchive.h:24
S fromFile(File file, size_t bufferSize=4096)
Definition: File.h:38
bool endsWith(const const_range_type &other) const
Definition: Range.h:849