| 1 | /*
 | 
|---|
| 2 |  * Project: MoleCuilder
 | 
|---|
| 3 |  * Description: creates and alters molecular systems
 | 
|---|
| 4 |  * Copyright (C)  2011 University of Bonn. All rights reserved.
 | 
|---|
| 5 |  * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
 | 
|---|
| 6 |  */
 | 
|---|
| 7 | 
 | 
|---|
| 8 | /*
 | 
|---|
| 9 |  * WorkerPool.cpp
 | 
|---|
| 10 |  *
 | 
|---|
| 11 |  *  Created on: 22.02.2012
 | 
|---|
| 12 |  *      Author: heber
 | 
|---|
| 13 |  */
 | 
|---|
| 14 | 
 | 
|---|
| 15 | // include config.h
 | 
|---|
| 16 | #ifdef HAVE_CONFIG_H
 | 
|---|
| 17 | #include <config.h>
 | 
|---|
| 18 | #endif
 | 
|---|
| 19 | 
 | 
|---|
| 20 | // boost asio needs specific operator new
 | 
|---|
| 21 | #include <boost/asio.hpp>
 | 
|---|
| 22 | 
 | 
|---|
| 23 | #include "CodePatterns/MemDebug.hpp"
 | 
|---|
| 24 | 
 | 
|---|
| 25 | #include "WorkerPool.hpp"
 | 
|---|
| 26 | 
 | 
|---|
| 27 | #include "CodePatterns/Assert.hpp"
 | 
|---|
| 28 | #include "CodePatterns/Info.hpp"
 | 
|---|
| 29 | #include "CodePatterns/Log.hpp"
 | 
|---|
| 30 | #include "CodePatterns/Observer/Channels.hpp"
 | 
|---|
| 31 | #include "Connection.hpp"
 | 
|---|
| 32 | 
 | 
|---|
// Priority given to a worker when WorkerPool::addWorker() first enters it
// into the idle queue.
WorkerPool::priority_t WorkerPool::default_priority = 0;
// Placeholder address returned by WorkerPool::getNextIdleWorker() when no
// idle worker is present.
WorkerAddress WorkerPool::emptyAddress("empty", "empty");
 | 
|---|
| 35 | 
 | 
|---|
| 36 | /** Constructor for class WorkerPool.
 | 
|---|
| 37 |  *
 | 
|---|
| 38 |  */
 | 
|---|
| 39 | WorkerPool::WorkerPool() :
 | 
|---|
| 40 |     Observable("WorkerPool")
 | 
|---|
| 41 | {
 | 
|---|
| 42 |   // observable stuff
 | 
|---|
| 43 |   Channels *OurChannel = new Channels;
 | 
|---|
| 44 |   NotificationChannels.insert( std::make_pair(this, OurChannel) );
 | 
|---|
| 45 |   // add instance for each notification type
 | 
|---|
| 46 |   for (size_t type = 0; type < NotificationType_MAX; ++type)
 | 
|---|
| 47 |     OurChannel->addChannel(type);
 | 
|---|
| 48 | }
 | 
|---|
| 49 | 
 | 
|---|
| 50 | /** Destructor for class WorkerPool.
 | 
|---|
| 51 |  *
 | 
|---|
| 52 |  */
 | 
|---|
| 53 | WorkerPool::~WorkerPool()
 | 
|---|
| 54 | {}
 | 
|---|
| 55 | 
 | 
|---|
| 56 | /** Helper function to check whether an address is already in the pool.
 | 
|---|
| 57 |  *
 | 
|---|
| 58 |  * @param address worker address to check
 | 
|---|
| 59 |  * @return true - address is present, false - else
 | 
|---|
| 60 |  */
 | 
|---|
| 61 | bool WorkerPool::presentInPool(const WorkerAddress &address) const
 | 
|---|
| 62 | {
 | 
|---|
| 63 |   return pool.find(address) != pool.end();
 | 
|---|
| 64 | }
 | 
|---|
| 65 | 
 | 
|---|
| 66 | /** Get address of next idle worker.
 | 
|---|
| 67 |  *
 | 
|---|
| 68 |  * Note that worker is automatically marked as busy, \sa WorkerPool::markWorkerBusy()
 | 
|---|
| 69 |  *
 | 
|---|
| 70 |  * @return address of idle worker
 | 
|---|
| 71 |  */
 | 
|---|
| 72 | WorkerAddress WorkerPool::getNextIdleWorker()
 | 
|---|
| 73 | {
 | 
|---|
| 74 |   // get first idle worker
 | 
|---|
| 75 |   ASSERT( presentIdleWorkers(),
 | 
|---|
| 76 |       "WorkerPool::getNextIdleWorker() - there is no idle worker.");
 | 
|---|
| 77 |   if (!presentIdleWorkers())
 | 
|---|
| 78 |     return emptyAddress;
 | 
|---|
| 79 |   Idle_Queue_t::iterator iter = idle_queue.begin();
 | 
|---|
| 80 |   const WorkerAddress returnaddress = iter->second;
 | 
|---|
| 81 | 
 | 
|---|
| 82 |   // enter in busy queue
 | 
|---|
| 83 |   markWorkerBusy( iter );
 | 
|---|
| 84 | 
 | 
|---|
| 85 |   // return address
 | 
|---|
| 86 |   return returnaddress;
 | 
|---|
| 87 | }
 | 
|---|
| 88 | 
 | 
|---|
| 89 | WorkerPool::Idle_Queue_t::iterator WorkerPool::getIdleWorker(const WorkerAddress &address)
 | 
|---|
| 90 | {
 | 
|---|
| 91 |   Idle_Queue_t::iterator idleiter = idle_queue.begin();
 | 
|---|
| 92 |   while (idleiter != idle_queue.end()) {
 | 
|---|
| 93 |     if (idleiter->second == address) {
 | 
|---|
| 94 |       break;
 | 
|---|
| 95 |     }
 | 
|---|
| 96 |     ++idleiter;
 | 
|---|
| 97 |   }
 | 
|---|
| 98 |   return idleiter;
 | 
|---|
| 99 | }
 | 
|---|
| 100 | 
 | 
|---|
| 101 | /** Checks whether a worker is busy or not.
 | 
|---|
| 102 |  *
 | 
|---|
| 103 |  * @param address address of worker to check
 | 
|---|
| 104 |  */
 | 
|---|
| 105 | bool WorkerPool::isWorkerBusy(const WorkerAddress &address) const
 | 
|---|
| 106 | {
 | 
|---|
| 107 |   Busy_Queue_t::const_iterator iter = busy_queue.find(address);
 | 
|---|
| 108 |   if (iter != busy_queue.end())
 | 
|---|
| 109 |     return true;
 | 
|---|
| 110 | #ifndef NDEBUG
 | 
|---|
| 111 |   else {
 | 
|---|
| 112 |     Idle_Queue_t::const_iterator iter = idle_queue.begin();
 | 
|---|
| 113 |     for(;iter != idle_queue.end(); ++iter)
 | 
|---|
| 114 |       if (iter->second == address)
 | 
|---|
| 115 |         break;
 | 
|---|
| 116 |     ASSERT( iter != idle_queue.end(),
 | 
|---|
| 117 |         "WorkerPool::isWorkerBusy() - worker "+toString(address)
 | 
|---|
| 118 |         +" is neither busy nor idle.");
 | 
|---|
| 119 | 
 | 
|---|
| 120 |   }
 | 
|---|
| 121 | #endif
 | 
|---|
| 122 |   return false;
 | 
|---|
| 123 | }
 | 
|---|
| 124 | 
 | 
|---|
| 125 | /** Adds another worker to the pool by noting down its address.
 | 
|---|
| 126 |  *
 | 
|---|
| 127 |  * @param address host and service address of the listening worker
 | 
|---|
| 128 |  * @return true - added successfully, false - not added
 | 
|---|
| 129 |  */
 | 
|---|
| 130 | bool WorkerPool::addWorker(const WorkerAddress& address)
 | 
|---|
| 131 | {
 | 
|---|
| 132 |   OBSERVE;
 | 
|---|
| 133 |   NOTIFY(WorkerAdded);
 | 
|---|
| 134 |   std::pair<Pool_t::iterator, bool> inserter =
 | 
|---|
| 135 |       pool.insert( address );
 | 
|---|
| 136 |   if (inserter.second) { // if new also add to queue
 | 
|---|
| 137 |     LOG(1, "INFO: Successfully added "+toString(address)+" to pool.");
 | 
|---|
| 138 |     idle_queue.insert( make_pair( default_priority, address ) );
 | 
|---|
| 139 |     NOTIFY(WorkerIdle);
 | 
|---|
| 140 |     return true;
 | 
|---|
| 141 |   } else {
 | 
|---|
| 142 |     LOG(1, "INFO: "+toString(address)+" is already present pool.");
 | 
|---|
| 143 |     return false;
 | 
|---|
| 144 |   }
 | 
|---|
| 145 | }
 | 
|---|
| 146 | 
 | 
|---|
| 147 | /** Removes a worker from the pool.
 | 
|---|
| 148 |  *
 | 
|---|
| 149 |  * @param address host and service address of the listening worker
 | 
|---|
| 150 |  * @return true - removed successfully, false - not removed
 | 
|---|
| 151 |  */
 | 
|---|
| 152 | bool WorkerPool::removeWorker(const WorkerAddress& address)
 | 
|---|
| 153 | {
 | 
|---|
| 154 |   Pool_t::iterator iter = pool.find( address );
 | 
|---|
| 155 |   if (iter != pool.end()) {
 | 
|---|
| 156 |     Idle_Queue_t::iterator idleiter = getIdleWorker(address);
 | 
|---|
| 157 |     if (idleiter != idle_queue.end())
 | 
|---|
| 158 |       idle_queue.erase(idleiter);
 | 
|---|
| 159 |     Busy_Queue_t::iterator busyiter = busy_queue.find(address);
 | 
|---|
| 160 |     if (busyiter != busy_queue.end())
 | 
|---|
| 161 |       busy_queue.erase(busyiter);
 | 
|---|
| 162 |     ASSERT( idleiter != idle_queue.end() || busyiter != busy_queue.end(),
 | 
|---|
| 163 |         "WorkerPool::removeWorker() - Worker "+toString(address)
 | 
|---|
| 164 |         +" is in pool but neither idle nor busy!");
 | 
|---|
| 165 |     ASSERT( !(idleiter != idle_queue.end() && busyiter != busy_queue.end()),
 | 
|---|
| 166 |         "WorkerPool::removeWorker() - Worker "+toString(address)
 | 
|---|
| 167 |         +" is in pool and both idle and busy!");
 | 
|---|
| 168 |     pool.erase(iter);
 | 
|---|
| 169 |     LOG(1, "INFO: Removed worker " << address << " from pool.");
 | 
|---|
| 170 |     return true;
 | 
|---|
| 171 |   } else {
 | 
|---|
| 172 |     ELOG(1, "Worker "+toString(address)+" is not present pool.");
 | 
|---|
| 173 |     return false;
 | 
|---|
| 174 |   }
 | 
|---|
| 175 | }
 | 
|---|
| 176 | 
 | 
|---|
| 177 | /** Sends shutdown to all current workers in the pool.
 | 
|---|
| 178 |  *
 | 
|---|
| 179 |  */
 | 
|---|
| 180 | void WorkerPool::removeAllWorkers()
 | 
|---|
| 181 | {
 | 
|---|
| 182 |   // empty pool and queue
 | 
|---|
| 183 |   idle_queue.clear();
 | 
|---|
| 184 |   busy_queue.clear();
 | 
|---|
| 185 |   pool.clear();
 | 
|---|
| 186 | }
 | 
|---|
| 187 | 
 | 
|---|
| 188 | /** Helper function to mark a worker as busy.
 | 
|---|
| 189 |  *
 | 
|---|
| 190 |  * Removes from idle_queue and places into busy_queue.
 | 
|---|
| 191 |  * Sets \a iter to Idle_Queue_t::end().
 | 
|---|
| 192 |  *
 | 
|---|
| 193 |  * @param iter iterator on idle worker
 | 
|---|
| 194 |  */
 | 
|---|
| 195 | void WorkerPool::markWorkerBusy(Idle_Queue_t::iterator &iter)
 | 
|---|
| 196 | {
 | 
|---|
| 197 |   const WorkerAddress returnaddress = iter->second;
 | 
|---|
| 198 |   if (isWorkerBusy(returnaddress))
 | 
|---|
| 199 |     return;
 | 
|---|
| 200 |   const priority_t priority = iter->first;
 | 
|---|
| 201 | 
 | 
|---|
| 202 |   // remove from idle queue
 | 
|---|
| 203 |   idle_queue.erase(iter);
 | 
|---|
| 204 | 
 | 
|---|
| 205 |   // insert into busy queue
 | 
|---|
| 206 | #ifndef NDEBUG
 | 
|---|
| 207 |   std::pair< Busy_Queue_t::iterator, bool > inserter =
 | 
|---|
| 208 | #endif
 | 
|---|
| 209 |   busy_queue.insert( make_pair(returnaddress, priority) );
 | 
|---|
| 210 |   ASSERT( inserter.second,
 | 
|---|
| 211 |       "WorkerPool::sendJobToWorker() - Worker "+toString(inserter.first->first)+" is already busy.");
 | 
|---|
| 212 | 
 | 
|---|
| 213 |   LOG(1, "INFO: Worker " << returnaddress << " is now marked busy.");
 | 
|---|
| 214 | }
 | 
|---|
| 215 | 
 | 
|---|
| 216 | /** Helper function to unmark a worker as busy.
 | 
|---|
| 217 |  *
 | 
|---|
| 218 |  * Removes worker from busy_queue and returns it to idle_queue.
 | 
|---|
| 219 |  *
 | 
|---|
| 220 |  * @param address address of worker
 | 
|---|
| 221 |  */
 | 
|---|
| 222 | void WorkerPool::unmarkWorkerBusy(const WorkerAddress &address)
 | 
|---|
| 223 | {
 | 
|---|
| 224 |   if (isWorkerBusy(address)) {
 | 
|---|
| 225 |     OBSERVE;
 | 
|---|
| 226 |     NOTIFY(WorkerIdle);
 | 
|---|
| 227 |     Busy_Queue_t::const_iterator iter = busy_queue.find(address);
 | 
|---|
| 228 |     const priority_t priority = iter->second;
 | 
|---|
| 229 |     busy_queue.erase(address);
 | 
|---|
| 230 |     idle_queue.insert( make_pair( priority, address) );
 | 
|---|
| 231 | 
 | 
|---|
| 232 |     LOG(1, "INFO: Worker " << address << " is now marked idle.");
 | 
|---|
| 233 |   }
 | 
|---|
| 234 | }
 | 
|---|
| 235 | 
 | 
|---|
| 236 | WorkerPool::WorkerList_t WorkerPool::getListOfIdleWorkers() const
 | 
|---|
| 237 | {
 | 
|---|
| 238 |   WorkerList_t WorkerList;
 | 
|---|
| 239 |   for (Idle_Queue_t::const_iterator iter = idle_queue.begin(); iter != idle_queue.end(); ++iter)
 | 
|---|
| 240 |     WorkerList.push_back( make_pair(iter->second.host, iter->second.service) );
 | 
|---|
| 241 |   return WorkerList;
 | 
|---|
| 242 | }
 | 
|---|