Page Speed Optimization Libraries  1.13.35.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
queued_worker_pool.h
Go to the documentation of this file.
1 /*
2  * Copyright 2011 Google Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http:///www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 
25 #ifndef PAGESPEED_KERNEL_THREAD_QUEUED_WORKER_POOL_H_
26 #define PAGESPEED_KERNEL_THREAD_QUEUED_WORKER_POOL_H_
27 
28 #include <cstddef>
29 #include <deque>
30 #include <set>
31 #include <vector>
32 
39 #include "pagespeed/kernel/base/thread_annotations.h"
42 
43 namespace net_instaweb {
44 
45 class QueuedWorker;
46 class Waveform;
47 
51  public:
52  static const int kNoLoadShedding = -1;
53 
54  QueuedWorkerPool(int max_workers, StringPiece thread_name_base,
55  ThreadSystem* thread_system);
57 
66  public:
71  class AddFunction : public Function {
72  public:
73  AddFunction(net_instaweb::Sequence* sequence, Function* callback)
74  : sequence_(sequence), callback_(callback) { }
75  virtual ~AddFunction();
76 
77  protected:
78  virtual void Run() {
79  sequence_->Add(callback_);
80  }
81  virtual void Cancel() {
82  sequence_->Add(MakeFunction(callback_, &Function::CallCancel));
83  }
84 
85  private:
86  net_instaweb::Sequence* sequence_;
87  Function* callback_;
88 
89  };
90 
103  void Add(Function* function) LOCKS_EXCLUDED(sequence_mutex_);
104 
105  void set_queue_size_stat(Waveform* x) { queue_size_ = x; }
106 
110  void set_max_queue_size(size_t x) { max_queue_size_ = x; }
111 
113  void CancelPendingFunctions() LOCKS_EXCLUDED(sequence_mutex_);
114 
115  private:
117  Sequence(ThreadSystem* thread_system, QueuedWorkerPool* pool);
118 
120  ~Sequence();
121 
123  void Reset();
124 
131  void WaitForShutDown() LOCKS_EXCLUDED(sequence_mutex_);
132 
136  bool InitiateShutDown() LOCKS_EXCLUDED(sequence_mutex_);
137 
140  Function* NextFunction() LOCKS_EXCLUDED(sequence_mutex_);
141 
142  bool IsBusy() EXCLUSIVE_LOCKS_REQUIRED(sequence_mutex_);
143 
145  int CancelTasksOnWorkQueue() EXCLUSIVE_LOCKS_REQUIRED(sequence_mutex_);
146 
148  void Cancel() LOCKS_EXCLUDED(sequence_mutex_);
149 
150  friend class QueuedWorkerPool;
151  std::deque<Function*> work_queue_ GUARDED_BY(sequence_mutex_);
152  scoped_ptr<ThreadSystem::CondvarCapableMutex> sequence_mutex_;
153  QueuedWorkerPool* pool_;
154  bool shutdown_ GUARDED_BY(sequence_mutex_);
155  bool active_ GUARDED_BY(sequence_mutex_);
156  scoped_ptr<ThreadSystem::Condvar> termination_condvar_
157  GUARDED_BY(sequence_mutex_);
158  Waveform* queue_size_;
159  size_t max_queue_size_;
160 
161 
162  };
163 
164  typedef std::set<Sequence*> SequenceSet;
165 
170  Sequence* NewSequence();
171 
174  void FreeSequence(Sequence* sequence);
175 
180  void ShutDown();
181 
186  void InitiateShutDown();
187 
191 
206  static bool AreBusy(const SequenceSet& sequences);
207 
217  void SetLoadSheddingThreshold(int x);
218 
222  void set_queue_size_stat(Waveform* x) { queue_size_ = x; }
223 
224  private:
225  friend class Sequence;
226  void Run(Sequence* sequence, QueuedWorker* worker);
227  void QueueSequence(Sequence* sequence);
228  Sequence* AssignWorkerToNextSequence(QueuedWorker* worker);
229  void SequenceNoLongerActive(Sequence* sequence);
230 
231  ThreadSystem* thread_system_;
233 
235  std::set<QueuedWorker*> active_workers_;
236  std::vector<QueuedWorker*> available_workers_;
237 
240  std::vector<Sequence*> all_sequences_;
241  std::deque<Sequence*> queued_sequences_;
242  std::vector<Sequence*> free_sequences_;
243 
244  GoogleString thread_name_base_;
245 
246  size_t max_workers_;
247  bool shutdown_;
248 
249  Waveform* queue_size_;
250  int load_shedding_threshold_;
251 
252 
253 };
254 
255 }
256 
257 #endif
Function * MakeFunction(C *object, void(C::*run)())
Makes a Function* that calls a 0-arg class method.
Definition: function.h:291
virtual void Cancel()
Definition: queued_worker_pool.h:81
Definition: sequence.h:33
Sequence * NewSequence()
Returns NULL if shutting down.
Definition: waveform.h:45
Definition: scoped_ptr.h:30
Definition: function.h:47
static bool AreBusy(const SequenceSet &sequences)
std::string GoogleString
PAGESPEED_KERNEL_BASE_STRING_H_.
Definition: string.h:24
See file comment.
Definition: queued_worker.h:35
Definition: queued_worker_pool.h:50
virtual void Run()
Definition: queued_worker_pool.h:78
virtual void Add(Function *function)=0
Definition: queued_worker_pool.h:65
void set_max_queue_size(size_t x)
Definition: queued_worker_pool.h:110
Definition: thread_system.h:40
void Add(Function *function) LOCKS_EXCLUDED(sequence_mutex_)
void FreeSequence(Sequence *sequence)
void CancelPendingFunctions() LOCKS_EXCLUDED(sequence_mutex_)
Calls Cancel on all pending functions in the queue.