/*
 * OperationQueue.hpp
 *
 *  Created on: Apr 24, 2012
 *      Author: heber
 */
 | 
|---|
| 7 | 
 | 
|---|
| 8 | #ifndef OPERATIONQUEUE_HPP_
 | 
|---|
| 9 | #define OPERATIONQUEUE_HPP_
 | 
|---|
| 10 | 
 | 
|---|
| 11 | 
 | 
|---|
// include config.h
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <cstddef>
#include <deque>
#include <map>

#include <boost/shared_ptr.hpp>

#include "CodePatterns/Observer/Observer.hpp"
 | 
|---|
| 21 | 
 | 
|---|
// Forward declarations of types that the class declaration below refers to.
class AsyncOperation;
class Observer;
class OperationQueueTest;
class WorkerAddress;
 | 
|---|
| 26 | 
 | 
|---|
| 27 | /** This class is a container for \ref AsyncOperation's that are kept as shared_ptr
 | 
|---|
| 28 |  * and removed when the operation is done.
 | 
|---|
| 29 |  */
 | 
|---|
| 30 | class OperationQueue : public Observer
 | 
|---|
| 31 | {
 | 
|---|
| 32 |   //!> grant unit test access to private part
 | 
|---|
| 33 |   friend class OperationQueueTest;
 | 
|---|
| 34 | public:
 | 
|---|
| 35 |   /** Default constructor for class OperationQueue.
 | 
|---|
| 36 |    *
 | 
|---|
| 37 |    */
 | 
|---|
| 38 |   OperationQueue() :
 | 
|---|
| 39 |     Observer("OperationQueue"),
 | 
|---|
| 40 |     RunningOps(0),
 | 
|---|
| 41 |     IsBlockedFlag(false)
 | 
|---|
| 42 |   {}
 | 
|---|
| 43 |   /** Default destructor for class OperationQueue.
 | 
|---|
| 44 |    *
 | 
|---|
| 45 |    */
 | 
|---|
| 46 |   virtual ~OperationQueue()
 | 
|---|
| 47 |   {}
 | 
|---|
| 48 | 
 | 
|---|
| 49 |   typedef boost::shared_ptr<AsyncOperation> AsyncOp_ptr;
 | 
|---|
| 50 | 
 | 
|---|
| 51 |   /** Add an operation to the internal queue and hand over memory responsibility to it,
 | 
|---|
| 52 |    * also the operation is run.
 | 
|---|
| 53 |    *
 | 
|---|
| 54 |    * @param op operation to add, is NULL on return.
 | 
|---|
| 55 |    */
 | 
|---|
| 56 |   void push_back(AsyncOperation *&op, const WorkerAddress &address);
 | 
|---|
| 57 | 
 | 
|---|
| 58 |   /** States whether the queue is empty.
 | 
|---|
| 59 |    *
 | 
|---|
| 60 |    * @return true - queue is empty, false - operations are pending
 | 
|---|
| 61 |    */
 | 
|---|
| 62 |   bool empty() const {
 | 
|---|
| 63 |     return queue.empty();
 | 
|---|
| 64 |   }
 | 
|---|
| 65 | 
 | 
|---|
| 66 |   /** Blocks the queue for any further operation.
 | 
|---|
| 67 |    *
 | 
|---|
| 68 |    */
 | 
|---|
| 69 |   void block()
 | 
|---|
| 70 |   {
 | 
|---|
| 71 |     IsBlockedFlag = true;
 | 
|---|
| 72 |   }
 | 
|---|
| 73 | 
 | 
|---|
| 74 |   /** Unblocks the queue, operations may again be pushed.
 | 
|---|
| 75 |    *
 | 
|---|
| 76 |    */
 | 
|---|
| 77 |   void unblock()
 | 
|---|
| 78 |   {
 | 
|---|
| 79 |     IsBlockedFlag = false;
 | 
|---|
| 80 |   }
 | 
|---|
| 81 | 
 | 
|---|
| 82 |   /** Getter whether queue is currently blocked.
 | 
|---|
| 83 |    *
 | 
|---|
| 84 |    * @return OperationQueue::IsBlockedFlag
 | 
|---|
| 85 |    */
 | 
|---|
| 86 |   bool isBlocked() const
 | 
|---|
| 87 |   {
 | 
|---|
| 88 |     return IsBlockedFlag;
 | 
|---|
| 89 |   }
 | 
|---|
| 90 | 
 | 
|---|
| 91 |   void update(Observable *publisher);
 | 
|---|
| 92 |   void recieveNotification(Observable *publisher, Notification_ptr notification);
 | 
|---|
| 93 |   void subjectKilled(Observable *publisher);
 | 
|---|
| 94 | 
 | 
|---|
| 95 | private:
 | 
|---|
| 96 |   /** Removes an operation from the queue.
 | 
|---|
| 97 |    *
 | 
|---|
| 98 |    * @param op operation to remove from queue
 | 
|---|
| 99 |    * @param observer observer to sign off from operation, NULL if none to sign off
 | 
|---|
| 100 |    */
 | 
|---|
| 101 |   void remove(AsyncOperation *op, Observer *observer);
 | 
|---|
| 102 | 
 | 
|---|
| 103 |   /** Returns the number of currently running operations.
 | 
|---|
| 104 |    *
 | 
|---|
| 105 |    * @return Gives the difference between the entries in the queue and in the AddressMap.
 | 
|---|
| 106 |    */
 | 
|---|
| 107 |   size_t getNumberOfRunningOps() const
 | 
|---|
| 108 |   {
 | 
|---|
| 109 |     return queue.size() - AddressMap.size();
 | 
|---|
| 110 |   }
 | 
|---|
| 111 | 
 | 
|---|
| 112 |   /** Helper to launch the next pending operation.
 | 
|---|
| 113 |    *
 | 
|---|
| 114 |    */
 | 
|---|
| 115 |   void LaunchNextOp();
 | 
|---|
| 116 | 
 | 
|---|
| 117 |   //!> internal operation to send jobs to workers
 | 
|---|
| 118 |   typedef std::deque<AsyncOp_ptr> OperationQueue_t;
 | 
|---|
| 119 | 
 | 
|---|
| 120 |   /** Tiny Helper function to find an operation inside FragmentScheduler::OperationQueue.
 | 
|---|
| 121 |    *
 | 
|---|
| 122 |    * @param op operation to remove from queue
 | 
|---|
| 123 |    * @return iterator to element or to OperationQueue.end()
 | 
|---|
| 124 |    */
 | 
|---|
| 125 |   OperationQueue_t::iterator findOperation(AsyncOperation *op);
 | 
|---|
| 126 | 
 | 
|---|
| 127 |   //!> internal number stating how many operations are running
 | 
|---|
| 128 |   size_t RunningOps;
 | 
|---|
| 129 | 
 | 
|---|
| 130 |   //!> giving the maximum number of connections
 | 
|---|
| 131 |   static size_t max_connections;
 | 
|---|
| 132 | 
 | 
|---|
| 133 | private:
 | 
|---|
| 134 |   //!> internal queue with operations
 | 
|---|
| 135 |   OperationQueue_t queue;
 | 
|---|
| 136 | 
 | 
|---|
| 137 |   //!> typedef for the association for each operation to its address to connect to
 | 
|---|
| 138 |   typedef std::map<AsyncOp_ptr, WorkerAddress> AddressMap_t;
 | 
|---|
| 139 |   //!> Association for each operation to its address to connect to
 | 
|---|
| 140 |   AddressMap_t AddressMap;
 | 
|---|
| 141 | 
 | 
|---|
| 142 |   //!> status flag whether queue is blocked or operations may be pushed.
 | 
|---|
| 143 |   bool IsBlockedFlag;
 | 
|---|
| 144 | };
 | 
|---|
| 145 | 
 | 
|---|
| 146 | #endif /* OPERATIONQUEUE_HPP_ */
 | 
|---|