21 using namespace folly;
28 const size_t kMTU = 1400;
34 namespace proxygen {
namespace compress {
36 bool CompressionSimulator::readInputFromFileAndSchedule(
37 const string& filename) {
38 unique_ptr<HTTPArchive> har;
41 }
catch (
const std::exception& ex) {
44 if (!har || har->
requests.size() == 0) {
54 std::chrono::milliseconds cumulativeDelay(0);
60 if (delayFromPrevious > std::chrono::milliseconds(1000)) {
61 delayFromPrevious = std::chrono::milliseconds(1000);
64 cumulativeDelay += delayFromPrevious;
65 setupRequest(index++,
std::move(msg), cumulativeDelay);
67 for (
auto& kv : domains_) {
68 flushRequests(kv.second.get());
74 #ifndef HAVE_REAL_QMIN 75 if (params_.type == SchemeType::QMIN) {
76 LOG(
INFO) <<
"QMIN not available";
81 LOG(
INFO) <<
"Starting run";
84 for (
auto& scheme : domains_) {
85 holBlockCount += scheme.second->getHolBlockCount();
87 LOG(
INFO) <<
"Complete" 90 << params_.seed <<
"\nBlocks sent: " << requests_.size()
91 <<
"\nAllowed OOO: " << stats_.allowedOOO
92 <<
"\nPackets: " << stats_.packets
93 <<
"\nPacket Losses: " << stats_.packetLosses
94 <<
"\nHOL Block Count: " << holBlockCount
95 <<
"\nHOL Delay (ms): " << stats_.holDelay.count()
96 <<
"\nMax Queue Buffer Bytes: " << stats_.maxQueueBufferBytes
97 <<
"\nUncompressed Bytes: " << stats_.uncompressed
98 <<
"\nCompressed Bytes: " << stats_.compressed
99 <<
"\nCompression Ratio: " 100 << int(100 -
double(100 * stats_.compressed) / stats_.uncompressed);
104 VLOG(5) <<
"schedule encode for " << scheme->
packetIndices.size()
105 <<
" blocks at " << scheme->
prev.count();
109 bool newPacket = true;
110 while (!indices.empty()) {
111 int16_t index = indices.front();
113 auto schemeIndex = scheme->index;
114 auto encodeRes = encode(scheme, newPacket, index);
115 FrameFlags flags = encodeRes.first;
116 bool allowOOO = flags.allowOOO;
117 if (schemeIndex < minOOOThresh()) {
119 auto ack = scheme->getAck(schemeIndex);
121 scheme->recvAck(std::move(ack));
124 stats_.allowedOOO += (allowOOO) ? 1 : 0;
125 flags.allowOOO = allowOOO;
126 scheme->encodedBlocks.emplace_back(flags,
128 std::move(encodeRes.second),
132 eventBase_.runInLoop(scheme,
true);
137 void CompressionSimulator::setupRequest(
uint16_t index,
139 std::chrono::milliseconds encodeDelay) {
141 const auto& query = msg.getQueryString();
143 msg.setURL(msg.getPath());
145 msg.setURL(folly::to<string>(msg.getPath(),
"?", query));
148 auto scheme = getScheme(msg.getHeaders().getSingleOrEmpty(
HTTP_HEADER_HOST));
149 requests_.emplace_back(msg);
150 auto decodeCompleteCB =
151 [index,
this, scheme](std::chrono::milliseconds holDelay) {
153 CHECK(!callbacks_[index].getResult().hasError());
154 DCHECK_EQ(requests_[index], *callbacks_[index].getResult().
value());
155 stats_.holDelay += holDelay;
156 VLOG(1) <<
"Finished decoding request=" << index
157 <<
" with holDelay=" << holDelay.count()
158 <<
" cumulative HoL delay=" << stats_.holDelay.count();
159 sendAck(scheme, scheme->getAck(callbacks_[index].seqn));
161 callbacks_.emplace_back(index, decodeCompleteCB);
167 if (scheme->packetIndices.size() > 0) {
168 auto delayFromPrevious = encodeDelay - scheme->prev;
169 VLOG(1) <<
"request " << index <<
" delay " << delayFromPrevious.count();
170 if (delayFromPrevious > std::chrono::milliseconds(1)) {
171 flushRequests(scheme);
174 scheme->prev = encodeDelay;
175 scheme->packetIndices.push_back(index);
180 void CompressionScheme::runLoopCallback()
noexcept {
181 simulator_->flushSchemePackets(
this);
190 VLOG(1) <<
"schedule decode for " << scheme->
packetBlocks.size()
194 decodePacket(scheme, blocks);
202 VLOG(2) <<
"Flushing " << scheme->
encodedBlocks.size() <<
" requests";
205 bool newPacket = std::get<1>(*encodeRes);
206 size_t headerBlockBytesRemaining =
207 std::get<2>(*encodeRes)->computeChainDataLength();
208 std::chrono::milliseconds packetDelay = deliveryDelay();
216 if (scheme->
packetBytes + headerBlockBytesRemaining >= kMTU) {
218 VLOG(2) <<
"Request(s) spanned multiple packets";
223 headerBlockBytesRemaining -=
225 if (headerBlockBytesRemaining == 0) {
237 newPacket = std::get<1>(*encodeRes);
238 headerBlockBytesRemaining =
239 std::get<2>(*encodeRes)->computeChainDataLength();
242 packetDelay = deliveryDelay();
251 static string blended(
"\"Facebook\"");
257 auto it = domains_.
find(domain.
str());
259 if (it == domains_.end()) {
260 LOG(
INFO) <<
"Creating scheme for domain=" << domain;
261 auto schemePtr = makeScheme();
262 scheme = schemePtr.get();
265 scheme = it->second.get();
270 unique_ptr<CompressionScheme> CompressionSimulator::makeScheme() {
271 switch (params_.type) {
272 case SchemeType::QPACK:
273 return make_unique<QPACKScheme>(
this, params_.tableSize,
274 params_.maxBlocking);
275 case SchemeType::QMIN:
276 return make_unique<QMINScheme>(
this, params_.tableSize);
277 case SchemeType::HPACK:
278 return make_unique<HPACKScheme>(
this, params_.tableSize);
280 LOG(
FATAL) <<
"Bad scheme";
286 VLOG(1) <<
"Start encoding request=" << index;
288 vector<string> cookies;
290 requests_[index], cookies);
292 auto before = stats_.uncompressed;
294 VLOG(1) <<
"Encoded request=" << index <<
" for host=" 296 <<
" orig size=" << (stats_.uncompressed - before)
297 <<
" block size=" << res.second->computeChainDataLength()
298 <<
" cumulative bytes=" << stats_.compressed
299 <<
" cumulative compression ratio=" 300 << int(100 -
double(100 * stats_.compressed) / stats_.uncompressed);
306 unique_ptr<IOBuf> encodedReq,
311 void CompressionSimulator::decodePacket(
313 std::list<CompressionScheme::BlockInfo>& blocks) {
314 VLOG(1) <<
"decode packet with " << blocks.size() <<
" blocks";
315 while (!blocks.empty()) {
316 auto encodeRes = &blocks.front();
322 std::get<0>(*encodeRes),
324 *std::get<3>(*encodeRes));
330 std::chrono::milliseconds ms) {
331 eventBase_.runAfterDelay(
std::move(
f), ms.count());
335 unique_ptr<CompressionScheme::Ack> ack) {
343 scheme]()
mutable { recvAck(scheme,
std::move(
a)); },
348 unique_ptr<CompressionScheme::Ack> ack) {
352 std::chrono::milliseconds CompressionSimulator::deliveryDelay() {
353 std::chrono::milliseconds delay = one_half_rtt();
355 stats_.packetLosses++;
356 scheduleEvent([] { VLOG(4) <<
"Packet lost!"; }, delay);
357 std::chrono::milliseconds rxmit = rxmitDelay();
361 VLOG(4) <<
"Packet loss detected, retransmitting with additional " 364 delay - one_half_rtt());
367 scheduleEvent([] { VLOG(4) <<
"Packet delayed in network"; }, delay);
368 delay += extraDelay();
373 std::chrono::milliseconds CompressionSimulator::rtt() {
377 std::chrono::milliseconds CompressionSimulator::one_half_rtt() {
378 return params_.rtt / 2;
381 std::chrono::milliseconds CompressionSimulator::rxmitDelay() {
382 uint32_t ms = rtt().count() * Random::randDouble(1.1, 2, rng_);
383 return std::chrono::milliseconds(ms);
386 bool CompressionSimulator::loss() {
387 return Random::randDouble01(rng_) < params_.lossProbability;
390 bool CompressionSimulator::delayed() {
391 return Random::randDouble01(rng_) < params_.delayProbability;
394 std::chrono::milliseconds CompressionSimulator::extraDelay() {
395 uint32_t ms = params_.maxDelay.count() * Random::randDouble01(rng_);
396 return std::chrono::milliseconds(ms);
400 return params_.minOOOThresh;
std::chrono::milliseconds millisecondsBetween(std::chrono::time_point< ClockType > finish, std::chrono::time_point< ClockType > start)
std::chrono::milliseconds prev
unique_ptr< IOBuf > encode(vector< HPACKHeader > &headers, HPACKEncoder &encoder)
std::list< BlockInfo > packetBlocks
virtual void recvAck(std::unique_ptr< Ack >)=0
TokenBindingMessage decode(folly::io::Cursor &cursor)
fbstring exceptionStr(const std::exception &e)
constexpr detail::Map< Move > move
size_type find(const_range_type str) const
virtual std::pair< FrameFlags, std::unique_ptr< folly::IOBuf > > encode(bool newPacket, std::vector< compress::Header > allHeaders, SimStats &stats)=0
—— Concurrent Priority Queue Implementation ——
std::vector< compress::Header > prepareMessageForCompression(const HTTPMessage &msg, std::vector< string > &cookies)
requires E e noexcept(noexcept(s.error(std::move(e))))
TimePoint getStartTime() const
static void run(EventBaseManager *ebm, EventBase *eb, folly::Baton<> *stop, const StringPiece &name)
std::list< uint16_t > packetIndices
static const char *const value
SteadyClock::time_point TimePoint
std::chrono::milliseconds decodeDelay
virtual void decode(FrameFlags flags, std::unique_ptr< folly::IOBuf > encodedReq, SimStats &stats, SimStreamingCallback &cb)=0
std::list< BlockInfo > encodedBlocks
folly::StringPiece getContainingDirectory(folly::StringPiece input)
std::vector< HTTPMessage > requests
S fromFile(File file, size_t bufferSize=4096)
bool endsWith(const const_range_type &other) const