proxygen
ObservingHandler-inl.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 namespace wangle {
19 
20 template <typename T, typename R, typename P>
22  const R& routingData,
23  BroadcastPool<T, R, P>* broadcastPool)
24  : routingData_(routingData), broadcastPool_(CHECK_NOTNULL(broadcastPool)) {}
25 
26 template <typename T, typename R, typename P>
28  if (broadcastHandler_) {
29  auto broadcastHandler = broadcastHandler_;
30  broadcastHandler_ = nullptr;
31  broadcastHandler->unsubscribe(subscriptionId_);
32  }
33 
34  if (deleted_) {
35  *deleted_ = true;
36  }
37 }
38 
39 template <typename T, typename R, typename P>
41  if (broadcastHandler_) {
42  // Already connected
43  return;
44  }
45 
46  // Pause ingress until the remote connection is established and
47  // broadcast handler is ready
48  auto pipeline = dynamic_cast<ObservingPipeline<T>*>(ctx->getPipeline());
49  CHECK(pipeline);
50  pipeline->transportInactive();
51 
52  auto deleted = deleted_;
53  broadcastPool_->getHandler(routingData_)
54  .thenValue(
55  [this, pipeline, deleted](BroadcastHandler<T, R>* broadcastHandler) {
56  if (*deleted) {
57  return;
58  }
59 
60  broadcastHandler_ = broadcastHandler;
61  subscriptionId_ = broadcastHandler_->subscribe(this);
62  VLOG(10) << "Subscribed to a broadcast";
63 
64  // Resume ingress
65  pipeline->transportActive();
66  })
67  .onError([this, ctx, deleted](const std::exception& ex) {
68  if (*deleted) {
69  return;
70  }
71 
72  LOG(ERROR) << "Error subscribing to a broadcast: " << ex.what();
73  this->close(ctx);
74  });
75 }
76 
77 template <typename T, typename R, typename P>
79  this->close(ctx);
80 }
81 
82 template <typename T, typename R, typename P>
84  Context* ctx,
86  LOG(ERROR) << "Error on read: " << exceptionStr(ex);
87  this->close(ctx);
88 }
89 
90 template <typename T, typename R, typename P>
92  auto ctx = this->getContext();
93  auto deleted = deleted_;
94  this->write(ctx, data)
95  .onError([this, ctx, deleted](const std::exception& ex) {
96  if (*deleted) {
97  return;
98  }
99 
100  LOG(ERROR) << "Error on write: " << ex.what();
101  this->close(ctx);
102  });
103 }
104 
105 template <typename T, typename R, typename P>
107  LOG(ERROR) << "Error observing a broadcast: " << exceptionStr(ex);
108 
109  // broadcastHandler_ will clear its subscribers and delete itself
110  broadcastHandler_ = nullptr;
111  this->close(this->getContext());
112 }
113 
114 template <typename T, typename R, typename P>
116  // broadcastHandler_ will clear its subscribers and delete itself
117  broadcastHandler_ = nullptr;
118  this->close(this->getContext());
119 }
120 
121 template <typename T, typename R, typename P>
123  return routingData_;
124 }
125 
126 } // namespace wangle
void readEOF(Context *ctx) override
#define T(v)
Definition: http_parser.c:233
std::shared_ptr< bool > deleted_
HandlerAdapter< folly::IOBufQueue &, T >::Context Context
BroadcastHandler< T, R > * broadcastHandler_
fbstring exceptionStr(const std::exception &e)
void onError(folly::exception_wrapper ex) override
ObservingHandler(const R &routingData, BroadcastPool< T, R, P > *broadcastPool)
folly::Future< folly::Unit > write(Context *ctx, Tmsg) override
Definition: Handler.h:167
HandlerContext< folly::IOBufQueue &, T > * getContext()
Definition: Handler.h:34
BroadcastPool< T, R, P > * broadcastPool_
virtual folly::Future< folly::Unit > close(Context *ctx)
Definition: Handler.h:79
void onNext(const T &buf) override
void transportActive(Context *ctx) override
void readException(Context *ctx, folly::exception_wrapper ex) override
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43