proxygen
Main Page
Related Pages
Namespaces
Classes
Files
Examples
File List
File Members
OutputBufferingHandler.h
Go to the documentation of this file.
1
/*
2
* Copyright 2017-present Facebook, Inc.
3
*
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
* you may not use this file except in compliance with the License.
6
* You may obtain a copy of the License at
7
*
8
* http://www.apache.org/licenses/LICENSE-2.0
9
*
10
* Unless required by applicable law or agreed to in writing, software
11
* distributed under the License is distributed on an "AS IS" BASIS,
12
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
* See the License for the specific language governing permissions and
14
* limitations under the License.
15
*/
16
17
#pragma once
18
19
#include <
folly/futures/SharedPromise.h
>
20
#include <
folly/io/IOBuf.h
>
21
#include <
folly/io/IOBufQueue.h
>
22
#include <
folly/io/async/EventBase.h
>
23
#include <
folly/io/async/EventBaseManager.h
>
24
#include <
wangle/channel/Handler.h
>
25
26
namespace
wangle
{
27
28
/*
29
* OutputBufferingHandler buffers writes in order to minimize syscalls. The
30
* transport will be written to once per event loop instead of on every write.
31
*
32
* This handler may only be used in a single Pipeline.
33
*/
34
class
OutputBufferingHandler
:
public
OutboundBytesToBytesHandler
,
35
protected
folly::EventBase::LoopCallback
{
36
public
:
37
folly::Future<folly::Unit>
write
(
38
Context
* ctx,
39
std::unique_ptr<folly::IOBuf> buf)
override
{
40
CHECK(buf);
41
if
(!
queueSends_
) {
42
return
ctx->
fireWrite
(
std::move
(buf));
43
}
else
{
44
// Delay sends to optimize for fewer syscalls
45
if
(!
sends_
) {
46
DCHECK(!
isLoopCallbackScheduled
());
47
// Buffer all the sends, and call writev once per event loop.
48
sends_
=
std::move
(buf);
49
ctx->
getTransport
()->getEventBase()->runInLoop(
this
);
50
}
else
{
51
DCHECK(
isLoopCallbackScheduled
());
52
sends_
->prependChain(
std::move
(buf));
53
}
54
return
sharedPromise_
.
getFuture
();
55
}
56
}
57
58
void
runLoopCallback
()
noexcept
override
{
59
folly::SharedPromise<folly::Unit>
sharedPromise;
60
std::swap
(sharedPromise,
sharedPromise_
);
61
getContext
()
62
->
fireWrite
(
std::move
(
sends_
))
63
.thenTry([sharedPromise =
std::move
(sharedPromise)](
64
folly::Try<folly::Unit>
t
)
mutable
{
65
sharedPromise.
setTry
(
std::move
(t));
66
});
67
}
68
69
void
cleanUp
() {
70
if
(
isLoopCallbackScheduled
()) {
71
cancelLoopCallback
();
72
}
73
74
sends_
.reset();
75
sharedPromise_
=
folly::SharedPromise<folly::Unit>
();
76
}
77
78
folly::Future<folly::Unit>
close
(
Context
* ctx)
override
{
79
if
(
isLoopCallbackScheduled
()) {
80
cancelLoopCallback
();
81
}
82
83
// If there are sends queued, cancel them
84
sharedPromise_
.
setException
(
85
folly::make_exception_wrapper<std::runtime_error>(
86
"close() called while sends still pending"
));
87
sends_
.reset();
88
sharedPromise_
=
folly::SharedPromise<folly::Unit>
();
89
return
ctx->
fireClose
();
90
}
91
92
folly::SharedPromise<folly::Unit>
sharedPromise_
;
93
std::unique_ptr<folly::IOBuf>
sends_
{
nullptr
};
94
bool
queueSends_
{
true
};
95
};
96
97
}
// namespace wangle
folly::SharedPromise::getFuture
Future< T > getFuture()
Definition:
SharedPromise-inl.h:72
wangle::OutboundHandlerContext::fireWrite
virtual folly::Future< folly::Unit > fireWrite(Out msg)=0
folly::EventBase::LoopCallback
Definition:
EventBase.h:148
IOBuf.h
IOBufQueue.h
folly::gen::move
constexpr detail::Map< Move > move
Definition:
Base-inl.h:2567
wangle::OutputBufferingHandler::sends_
std::unique_ptr< folly::IOBuf > sends_
Definition:
OutputBufferingHandler.h:93
folly::pushmi::__adl::noexcept
requires E e noexcept(noexcept(s.error(std::move(e))))
Definition:
extension_points.h:40
folly::SharedPromise::setTry
void setTry(Try< T > &&t)
Definition:
SharedPromise-inl.h:114
wangle::OutputBufferingHandler::cleanUp
void cleanUp()
Definition:
OutputBufferingHandler.h:69
wangle
Definition:
Acceptor.cpp:49
wangle::HandlerBase< OutboundHandlerContext< Wout > >::getContext
OutboundHandlerContext< Wout > * getContext()
Definition:
Handler.h:34
wangle::OutputBufferingHandler
Definition:
OutputBufferingHandler.h:34
folly::SharedPromise::setException
void setException(exception_wrapper ew)
Definition:
SharedPromise-inl.h:84
wangle::OutboundHandlerContext< Wout >
folly::EventBase::LoopCallback::isLoopCallbackScheduled
bool isLoopCallbackScheduled() const
Definition:
EventBase.h:160
wangle::OutputBufferingHandler::runLoopCallback
void runLoopCallback() noexcept override
Definition:
OutputBufferingHandler.h:58
folly::EventBase::LoopCallback::cancelLoopCallback
void cancelLoopCallback()
Definition:
EventBase.h:155
wangle::OutputBufferingHandler::sharedPromise_
folly::SharedPromise< folly::Unit > sharedPromise_
Definition:
OutputBufferingHandler.h:92
folly::SharedPromise< folly::Unit >
EventBaseManager.h
wangle::OutboundHandler
Definition:
Handler.h:137
wangle::OutputBufferingHandler::close
folly::Future< folly::Unit > close(Context *ctx) override
Definition:
OutputBufferingHandler.h:78
folly::Future
Definition:
FiberManagerInternal.h:46
folly::Try< folly::Unit >
wangle::OutputBufferingHandler::queueSends_
bool queueSends_
Definition:
OutputBufferingHandler.h:94
Handler.h
wangle::OutboundHandlerContext::fireClose
virtual folly::Future< folly::Unit > fireClose()=0
SharedPromise.h
wangle::OutputBufferingHandler::write
folly::Future< folly::Unit > write(Context *ctx, std::unique_ptr< folly::IOBuf > buf) override
Definition:
OutputBufferingHandler.h:37
folly::pushmi::detail::t
requires Tuple && t
Definition:
extension_operators.h:48
folly::f14::swap
void swap(SwapTrackingAlloc< T > &, SwapTrackingAlloc< T > &)
Definition:
F14TestUtil.h:414
EventBase.h
wangle::OutboundHandlerContext::getTransport
std::shared_ptr< folly::AsyncTransport > getTransport()
Definition:
HandlerContext.h:102
proxygen
wangle
wangle
channel
OutputBufferingHandler.h
Generated by
1.8.11