proxygen
BroadcastHandler-inl.h
Go to the documentation of this file.
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 namespace wangle {
19 
20 template <typename T, typename R>
22  onData(data);
23  forEachSubscriber([&](Subscriber<T, R>* s) {
24  s->onNext(data);
25  });
26 }
27 
28 template <typename T, typename R>
30  forEachSubscriber([&](Subscriber<T, R>* s) {
31  s->onCompleted();
32  });
33  subscribers_.clear();
34  closeIfIdle();
35 }
36 
37 template <typename T, typename R>
40  LOG(ERROR) << "Error while reading from upstream for broadcast: "
41  << exceptionStr(ex);
42 
43  forEachSubscriber([&](Subscriber<T, R>* s) {
44  s->onError(ex);
45  });
46  subscribers_.clear();
47  closeIfIdle();
48 }
49 
50 template <typename T, typename R>
52  auto subscriptionId = nextSubscriptionId_++;
53  subscribers_[subscriptionId] = subscriber;
54  onSubscribe(subscriber);
55  return subscriptionId;
56 }
57 
58 template <typename T, typename R>
60  auto iter = subscribers_.find(subscriptionId);
61  if (iter == subscribers_.end()) {
62  return;
63  }
64 
65  onUnsubscribe(iter->second);
66  subscribers_.erase(iter);
67  closeIfIdle();
68 }
69 
70 template <typename T, typename R>
72  if (subscribers_.empty()) {
73  // No more subscribers. Clean up.
74  // This will delete the broadcast from the pool.
75  this->close(this->getContext());
76  }
77 }
78 
// NOTE(review): the declarator line that belongs between the template header
// and the body (listing line 80) is missing from this extract, so the
// function's name and return type are not visible here. Presumably it is the
// handler's identifier accessor returning uint64_t -- TODO confirm against
// BroadcastHandler.h.
//
// Returns identifier_ when it is already non-zero. Otherwise it assigns
// identifier_ the next value of a function-local static atomic counter and
// returns that value.
79 template <typename T, typename R>
// The counter starts at 42 and is pre-incremented, so the first value handed
// out is 43.
81  static std::atomic<uint64_t> identifierCounter{42};
82  return identifier_ ? identifier_ : (identifier_ = ++identifierCounter);
83 }
84 
85 } // namespace wangle
void readEOF(Context *ctx) override
#define T(v)
Definition: http_parser.c:233
virtual void onError(folly::exception_wrapper ex)=0
HandlerAdapter< T, std::unique_ptr< folly::IOBuf > >::Context Context
fbstring exceptionStr(const std::exception &e)
void read(Context *ctx, T data) override
virtual void onNext(const T &)=0
virtual uint64_t subscribe(Subscriber< T, R > *subscriber)
virtual void unsubscribe(uint64_t subscriptionId)
void readException(Context *ctx, folly::exception_wrapper ex) override
virtual void onCompleted()=0
static set< string > s
int close(NetworkSocket s)
Definition: NetOps.cpp:90
static constexpr uint64_t data[1]
Definition: Fingerprint.cpp:43