Page Speed Optimization Libraries  1.13.35.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
rpc_handler.h
Go to the documentation of this file.
1 #ifndef PAGESPEED_CONTROLLER_RPC_HANDLER_H_
2 #define PAGESPEED_CONTROLLER_RPC_HANDLER_H_
3 
4 // Copyright 2016 Google Inc.
19 
20 #include "base/logging.h"
21 #include "base/macros.h"
25 
26 namespace net_instaweb {
27 
46 
// Base class template for handling one gRPC call that reads RequestT messages
// and writes ResponseT messages through a ::grpc::ServerAsyncReaderWriter.
// Instances are reference counted via RefCounted<>.
// NOTE(review): listing line 55 (the `class RpcHandler` declarator) is absent
// from this extract; the name is taken from the base-class argument and the
// destructor below.
54 template <typename AsyncService, typename RequestT, typename ResponseT>
56  : public RefCounted<RpcHandler<AsyncService, RequestT, ResponseT>> {
57  public:
58  virtual ~RpcHandler() { }
59 
60  protected:
    // Stream type used for both Read() of requests and Write() of responses.
61  typedef ::grpc::ServerAsyncReaderWriter<ResponseT, RequestT> ReaderWriterT;
62 
    // Stores service and cq; the handler starts out in state INIT with no
    // write outstanding.
63  RpcHandler(AsyncService* service, ::grpc::ServerCompletionQueue* cq);
64 
    // Queues resp on the stream. Returns true if the write was issued; returns
    // false if the handler is not in a writeable state (see
    // IsClientWriteable()) or a previous write has not completed yet.
73  bool Write(const ResponseT& resp);
74 
    // Moves the handler to FINISHED. Returns false if it already was FINISHED,
    // true otherwise. status is only sent to the client when the handler was
    // in a writeable state.
77  bool Finish(const ::grpc::Status& status);
78 
    // Calls InitResponder() with a callback that continues in InitDone().
83  void Start();
84 
85  private:
    // Lifecycle of a handler, as driven by the methods defined in this file:
    //   INIT                   - set by the constructor.
    //   WAITING_FOR_FIRST_READ - set once initialisation completed, before the
    //                            first Read is issued.
    //   RUNNING                - set when the first Read completes.
    //   FINISHED               - set by Finish() and by CallHandleError().
86  enum State {
87  INIT,
88  WAITING_FOR_FIRST_READ,
89  RUNNING,
90  FINISHED,
91  };
92 
    // NOTE(review): tail of a typedef whose first line (listing line 93) is
    // missing from this extract. RefPtrT is constructed as RefPtrT(this) in
    // this file; presumably a ref-counted pointer to this handler - confirm.
94  RefPtrT;
95 
    // Invoked from ReadDone() with each message read from the client.
97  virtual void HandleRequest(const RequestT& req) = 0;
98 
    // Invoked from CallHandleError(), but only when the handler was RUNNING.
104  virtual void HandleError() = 0;
105 
    // Invoked after a Write() completes and write_outstanding_ was cleared.
    // Default implementation does nothing.
107  virtual void HandleWriteDone() { }
108 
    // Invoked from Start() with this handler's service, context, responder and
    // completion queue. tag is the Function created by Start(); presumably
    // the implementation passes it to the gRPC request call - confirm.
114  virtual void InitResponder(AsyncService* service, ::grpc::ServerContext* ctx,
115  ReaderWriterT* responder,
116  ::grpc::ServerCompletionQueue* cq,
117  void* tag) = 0;
118 
    // Returns a new handler; Start() is called on the result as soon as this
    // handler's initialisation completes.
122  virtual RpcHandler* CreateHandler(AsyncService* service,
123  ::grpc::ServerCompletionQueue* cq) = 0;
124 
    // Completion callbacks bound through MakeFunction(). Each receives the
    // RefPtrT that was handed to MakeFunction() when the operation was issued.
128  void InitDone(RefPtrT r);
129  void AttemptRead(RefPtrT r);
130  void ReadDone(RefPtrT r);
131  void FinishDone(RefPtrT r);
132  void WriteDone(RefPtrT r);
133  void CallHandleError(RefPtrT r);
134 
    // True while state_ is WAITING_FOR_FIRST_READ or RUNNING.
137  bool IsClientWriteable() const;
138 
139  AsyncService* service_;
140  ::grpc::ServerCompletionQueue* cq_;
    // ctx_ is declared before responder_ because responder_ is constructed
    // from &ctx_ in the constructor's initializer list.
141  ::grpc::ServerContext ctx_;
142  ReaderWriterT responder_;
143 
    // Buffer that Read() fills; cleared after each HandleRequest() call.
146  RequestT request_;
147 
148  State state_;
    // True between a successful Write() and the matching WriteDone().
149  bool write_outstanding_;
150 
151 
152 };
153 
// Constructor definition. NOTE(review): the declarator line (listing line 155)
// is absent from this extract, as are listing lines 162-164 inside the body.
// Records the service and completion queue, binds the responder to this
// handler's own ServerContext, and starts in state INIT with no write pending.
154 template <typename AsyncService, typename RequestT, typename ResponseT>
156  AsyncService* service, ::grpc::ServerCompletionQueue* cq)
157  : service_(service),
158  cq_(cq),
    // ctx_ is declared before responder_ in the class, so it is already
    // constructed when its address is taken here.
159  responder_(&ctx_),
160  state_(INIT),
161  write_outstanding_(false) {
165 }
166 
// Body of Start(). NOTE(review): the signature (listing line 168) and listing
// lines 169-171 are absent from this extract.
// Asks the subclass to initialise the responder, passing as the tag a Function
// that runs InitDone() with a RefPtrT to this handler. CallHandleError is
// supplied as the second method; presumably it runs instead of InitDone when
// the operation fails or is cancelled - confirm against function.h.
167 template <typename AsyncService, typename RequestT, typename ResponseT>
172  InitResponder(service_, &ctx_, &responder_, cq_,
173  MakeFunction(this, &RpcHandler::InitDone,
174  &RpcHandler::CallHandleError, RefPtrT(this)));
175 }
176 
// NOTE(review): the signature (listing line 178) is absent from this extract;
// presumably this is InitDone(RefPtrT ref), the callback bound in Start().
// Listing lines 183-184 and 187-195 are also missing.
177 template <typename AsyncService, typename RequestT, typename ResponseT>
    // Create a fresh handler and start it right away; presumably so another
    // incoming call can be accepted while this one is served - confirm.
179  CreateHandler(service_, cq_)->Start();
180 
    // Unless the handler has already been finished, issue the first read.
181  if (state_ != FINISHED) {
182  state_ = WAITING_FOR_FIRST_READ;
185  AttemptRead(ref);
186  } else {
    // Nothing is executed on this branch in the visible listing.
196  }
197 }
198 
// Issues an asynchronous Read of the next client message into request_.
// ref is forwarded to the completion callback (ReadDone, with CallHandleError
// as the alternative method given to MakeFunction).
199 template <typename AsyncService, typename RequestT, typename ResponseT>
200 void RpcHandler<AsyncService, RequestT, ResponseT>::AttemptRead(RefPtrT ref) {
    // Expected states here are WAITING_FOR_FIRST_READ or RUNNING.
201  if (state_ != WAITING_FOR_FIRST_READ) {
202  DCHECK_EQ(state_, RUNNING);
203  }
204 
    // No read is queued once the handler is FINISHED.
205  if (state_ != FINISHED) {
206  responder_.Read(&request_, MakeFunction(this, &RpcHandler::ReadDone,
207  &RpcHandler::CallHandleError, ref));
208  }
209 }
210 
// Completion callback for a Read issued by AttemptRead(): hands the message to
// HandleRequest(), clears the buffer, and queues the next read.
211 template <typename AsyncService, typename RequestT, typename ResponseT>
212 void RpcHandler<AsyncService, RequestT, ResponseT>::ReadDone(RefPtrT ref) {
    // The first completed read moves the handler into RUNNING.
213  if (state_ == WAITING_FOR_FIRST_READ) {
214  state_ = RUNNING;
215  }
216 
217  HandleRequest(request_);
    // Reset the buffer before it is reused by the next Read.
218  request_.Clear();
219 
    // state_ is re-checked because it may have changed during HandleRequest()
    // (for example via Finish()); no further read is issued once FINISHED.
220  if (state_ != FINISHED) {
221  AttemptRead(ref);
222  }
223 }
224 
// Definition of Finish(). NOTE(review): the first signature line (listing line
// 226) is absent from this extract.
// Returns false without side effects if the handler is already FINISHED;
// otherwise marks it FINISHED and returns true.
225 template <typename AsyncService, typename RequestT, typename ResponseT>
227  const ::grpc::Status& status) {
228  if (state_ == FINISHED) {
229  return false;
230  }
    // status is only sent when the handler is in a writeable state; in any
    // other state (e.g. INIT) the state change below still happens and true
    // is still returned.
231  if (IsClientWriteable()) {
232  responder_.Finish(
233  status, MakeFunction(this, &RpcHandler::FinishDone,
234  &RpcHandler::CallHandleError, RefPtrT(this)));
235  }
236  state_ = FINISHED;
237  return true;
238 }
239 
// Definition of Write(). NOTE(review): the first signature line (listing line
// 241) is absent from this extract.
// Issues an asynchronous write of response and returns true, provided the
// handler is writeable and no earlier write is still pending; otherwise
// returns false and nothing is written.
240 template <typename AsyncService, typename RequestT, typename ResponseT>
242  const ResponseT& response) {
243  if (IsClientWriteable() && !write_outstanding_) {
    // Cleared again in WriteDone().
244  write_outstanding_ = true;
245  responder_.Write(response,
246  MakeFunction(this, &RpcHandler::WriteDone,
247  &RpcHandler::CallHandleError, RefPtrT(this)));
248  return true;
249  } else {
250  return false;
251  }
252 }
253 
// NOTE(review): the signature (listing line 255) is absent from this extract;
// presumably this is WriteDone(RefPtrT ref), the callback bound in Write().
// Allows the next Write() and then notifies the subclass.
254 template <typename AsyncService, typename RequestT, typename ResponseT>
256  write_outstanding_ = false;
257  HandleWriteDone();
258 }
259 
// Completion callback for responder_.Finish(). Only checks (in DCHECK-enabled
// builds) that the handler is FINISHED, which Finish() sets right after
// queuing the operation. Listing line 263 is absent from this extract.
260 template <typename AsyncService, typename RequestT, typename ResponseT>
261 void RpcHandler<AsyncService, RequestT, ResponseT>::FinishDone(RefPtrT ref) {
262  DCHECK_EQ(state_, FINISHED);
264 }
265 
// Alternative callback supplied to every MakeFunction() call in this file.
// Notifies the subclass through HandleError() only if the handler was RUNNING,
// then marks the handler FINISHED regardless of its previous state.
// Listing lines 269-271 and 276 are absent from this extract.
266 template <typename AsyncService, typename RequestT, typename ResponseT>
267 void RpcHandler<AsyncService, RequestT, ResponseT>::CallHandleError(
268  RefPtrT ref) {
272  if (state_ == RUNNING) {
273  HandleError();
274  }
275  state_ = FINISHED;
277 }
278 
// Returns true while the handler is in WAITING_FOR_FIRST_READ or RUNNING,
// the only states in which Write() and Finish() touch the responder.
279 template <typename AsyncService, typename RequestT, typename ResponseT>
280 bool RpcHandler<AsyncService, RequestT, ResponseT>::IsClientWriteable() const {
281  return state_ == WAITING_FOR_FIRST_READ || state_ == RUNNING;
282 }
283 
284 }
285 
286 #endif
Function * MakeFunction(C *object, void(C::*run)())
Makes a Function* that calls a 0-arg class method.
Definition: function.h:291
RpcHandler(AsyncService *service,::grpc::ServerCompletionQueue *cq)
Definition: rpc_handler.h:155
void Start()
Definition: rpc_handler.h:168
Definition: ref_counted_ptr.h:41
Definition: rpc_handler.h:55
bool Finish(const ::grpc::Status &status)
Definition: rpc_handler.h:226
Definition: ref_counted_ptr.h:69
bool Write(const ResponseT &resp)
Definition: rpc_handler.h:241