| 1 | /*
 | 
|---|
| 2 |  * Project: MoleCuilder
 | 
|---|
| 3 |  * Description: creates and alters molecular systems
 | 
|---|
| 4 |  * Copyright (C)  2011 University of Bonn. All rights reserved.
 | 
|---|
| 5 |  * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
 | 
|---|
| 6 |  */
 | 
|---|
| 7 | 
 | 
|---|
| 8 | /*
 | 
|---|
| 9 |  * \file FragmentScheduler.cpp
 | 
|---|
| 10 |  *
 | 
|---|
| 11 |  * This file strongly follows the Serialization example from the boost::asio
 | 
|---|
| 12 |  * library (see server.cpp)
 | 
|---|
| 13 |  *
 | 
|---|
| 14 |  *  Created on: Oct 19, 2011
 | 
|---|
| 15 |  *      Author: heber
 | 
|---|
| 16 |  */
 | 
|---|
| 17 | 
 | 
|---|
| 18 | // include config.h
 | 
|---|
| 19 | #ifdef HAVE_CONFIG_H
 | 
|---|
| 20 | #include <config.h>
 | 
|---|
| 21 | #endif
 | 
|---|
| 22 | 
 | 
|---|
| 23 | // boost asio needs specific operator new
 | 
|---|
| 24 | #include <boost/asio.hpp>
 | 
|---|
| 25 | 
 | 
|---|
| 26 | #include "CodePatterns/MemDebug.hpp"
 | 
|---|
| 27 | 
 | 
|---|
| 28 | #include <boost/bind.hpp>
 | 
|---|
| 29 | #include <boost/lexical_cast.hpp>
 | 
|---|
| 30 | #include <iostream>
 | 
|---|
| 31 | #include <vector>
 | 
|---|
| 32 | #include "Connection.hpp" // Must come before boost/serialization headers.
 | 
|---|
| 33 | #include <boost/serialization/vector.hpp>
 | 
|---|
| 34 | #include "CodePatterns/Info.hpp"
 | 
|---|
| 35 | #include "CodePatterns/Log.hpp"
 | 
|---|
| 36 | #include "FragmentJob.hpp"
 | 
|---|
| 37 | #include "JobId.hpp"
 | 
|---|
| 38 | 
 | 
|---|
| 39 | #include "FragmentScheduler.hpp"
 | 
|---|
| 40 | 
 | 
|---|
| 41 | FragmentJob FragmentScheduler::NoJob(std::string("NoJob"), JobId::NoJob);
 | 
|---|
| 42 | 
 | 
|---|
| 43 | /** Constructor of class FragmentScheduler.
 | 
|---|
| 44 |  *
 | 
|---|
| 45 |  * We setup both acceptors to accept connections from workers and Controller.
 | 
|---|
| 46 |  *
 | 
|---|
| 47 |  * \param io_service io_service of the asynchronous communications
 | 
|---|
| 48 |  * \param workerport port to listen for worker connections
 | 
|---|
| 49 |  * \param controllerport port to listen for controller connections.
 | 
|---|
| 50 |  */
 | 
|---|
| 51 | FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) :
 | 
|---|
| 52 |   worker_acceptor_(io_service,
 | 
|---|
| 53 |       boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), workerport)
 | 
|---|
| 54 |   ),
 | 
|---|
| 55 |   controller_acceptor_(io_service,
 | 
|---|
| 56 |       boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), controllerport)
 | 
|---|
| 57 |   ),
 | 
|---|
| 58 |   result(JobId::NoJob),
 | 
|---|
| 59 |   choice(NoOperation),
 | 
|---|
| 60 |   Exitflag(OkFlag)
 | 
|---|
| 61 | {
 | 
|---|
| 62 |   Info info(__FUNCTION__);
 | 
|---|
| 63 | 
 | 
|---|
| 64 |   // only initiate socket if jobs are already present
 | 
|---|
| 65 |   if (JobsQueue.isJobPresent()) {
 | 
|---|
| 66 |     LOG(1, "Listening for workers on port " << workerport << ".");
 | 
|---|
| 67 |     initiateWorkerSocket();
 | 
|---|
| 68 |   }
 | 
|---|
| 69 | 
 | 
|---|
| 70 |   initiateControllerSocket();
 | 
|---|
| 71 |   LOG(1, "Listening for controller on port " << controllerport << ".");
 | 
|---|
| 72 | }
 | 
|---|
| 73 | 
 | 
|---|
| 74 | /** Internal function to start worker connection.
 | 
|---|
| 75 |  *
 | 
|---|
| 76 |  */
 | 
|---|
| 77 | void FragmentScheduler::initiateWorkerSocket()
 | 
|---|
| 78 | {
 | 
|---|
| 79 |   // Start an accept operation for worker connections.
 | 
|---|
| 80 |   connection_ptr new_conn(new Connection(worker_acceptor_.get_io_service()));
 | 
|---|
| 81 |   worker_acceptor_.async_accept(new_conn->socket(),
 | 
|---|
| 82 |     boost::bind(&FragmentScheduler::handle_AcceptWorker, this,
 | 
|---|
| 83 |       boost::asio::placeholders::error, new_conn));
 | 
|---|
| 84 | }
 | 
|---|
| 85 | 
 | 
|---|
| 86 | /** Internal function to start controller connection.
 | 
|---|
| 87 |  *
 | 
|---|
| 88 |  */
 | 
|---|
| 89 | void FragmentScheduler::initiateControllerSocket()
 | 
|---|
| 90 | {
 | 
|---|
| 91 |   // Start an accept operation for controller connection.
 | 
|---|
| 92 |   connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service()));
 | 
|---|
| 93 |   controller_acceptor_.async_accept(new_conn->socket(),
 | 
|---|
| 94 |     boost::bind(&FragmentScheduler::handle_AcceptController, this,
 | 
|---|
| 95 |       boost::asio::placeholders::error, new_conn));
 | 
|---|
| 96 | }
 | 
|---|
| 97 | 
 | 
|---|
| 98 | 
 | 
|---|
| 99 | /** Handle a new worker connection.
 | 
|---|
| 100 |  *
 | 
|---|
| 101 |  * We check whether jobs are in the JobsQueue. If present, job is sent.
 | 
|---|
| 102 |  *
 | 
|---|
| 103 |  * \sa handle_SendJobtoWorker()
 | 
|---|
| 104 |  *
 | 
|---|
| 105 |  * \param e error code if something went wrong
 | 
|---|
| 106 |  * \param conn reference with the connection
 | 
|---|
| 107 |  */
 | 
|---|
| 108 | void FragmentScheduler::handle_AcceptWorker(const boost::system::error_code& e, connection_ptr conn)
 | 
|---|
| 109 | {
 | 
|---|
| 110 |   Info info(__FUNCTION__);
 | 
|---|
| 111 |   if (!e)
 | 
|---|
| 112 |   {
 | 
|---|
| 113 |     // Successfully accepted a new connection.
 | 
|---|
| 114 |     // Check whether there are jobs in the queue
 | 
|---|
| 115 |     if (JobsQueue.isJobPresent()) {
 | 
|---|
| 116 |       // pop a job and send it to the client.
 | 
|---|
| 117 |       FragmentJob job(JobsQueue.popJob());
 | 
|---|
| 118 |       // The connection::async_write() function will automatically
 | 
|---|
| 119 |       // serialize the data structure for us.
 | 
|---|
| 120 |       LOG(1, "INFO: Sending job #" << job.getId() << ".");
 | 
|---|
| 121 |       conn->async_write(job,
 | 
|---|
| 122 |         boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
 | 
|---|
| 123 |         boost::asio::placeholders::error, conn));
 | 
|---|
| 124 | 
 | 
|---|
| 125 |     } else {
 | 
|---|
| 126 |       // send the static NoJob
 | 
|---|
| 127 |       conn->async_write(NoJob,
 | 
|---|
| 128 |         boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
 | 
|---|
| 129 |         boost::asio::placeholders::error, conn));
 | 
|---|
| 130 | 
 | 
|---|
| 131 |       // then there must be no read necesary
 | 
|---|
| 132 | 
 | 
|---|
| 133 |       ELOG(2, "There is currently no job present in the queue.");
 | 
|---|
| 134 |     }
 | 
|---|
| 135 |   }
 | 
|---|
| 136 |   else
 | 
|---|
| 137 |   {
 | 
|---|
| 138 |     // An error occurred. Log it and return. Since we are not starting a new
 | 
|---|
| 139 |     // accept operation the io_service will run out of work to do and the
 | 
|---|
| 140 |     // server will exit.
 | 
|---|
| 141 |     Exitflag = WorkerErrorFlag;
 | 
|---|
| 142 |     ELOG(0, e.message());
 | 
|---|
| 143 |   }
 | 
|---|
| 144 | 
 | 
|---|
| 145 |   // Start an accept operation for a new Connection only when there
 | 
|---|
| 146 |   // are still jobs present
 | 
|---|
| 147 |   if (JobsQueue.isJobPresent())
 | 
|---|
| 148 |     initiateWorkerSocket();
 | 
|---|
| 149 | }
 | 
|---|
| 150 | 
 | 
|---|
| 151 | /** Callback function when job has been sent.
 | 
|---|
| 152 |  *
 | 
|---|
| 153 |  * After job has been sent we start async_read() for the result.
 | 
|---|
| 154 |  *
 | 
|---|
| 155 |  * \sa handle_ReceiveResultFromWorker()
 | 
|---|
| 156 |  *
 | 
|---|
| 157 |  * \param e error code if something went wrong
 | 
|---|
| 158 |  * \param conn reference with the connection
 | 
|---|
| 159 |  */
 | 
|---|
| 160 | void FragmentScheduler::handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn)
 | 
|---|
| 161 | {
 | 
|---|
| 162 |     Info info(__FUNCTION__);
 | 
|---|
| 163 |     LOG(1, "INFO: Job sent.");
 | 
|---|
| 164 |     // obtain result
 | 
|---|
| 165 |     LOG(1, "INFO: Receiving result for a job ...");
 | 
|---|
| 166 |     conn->async_read(result,
 | 
|---|
| 167 |       boost::bind(&FragmentScheduler::handle_ReceiveResultFromWorker, this,
 | 
|---|
| 168 |       boost::asio::placeholders::error, conn));
 | 
|---|
| 169 | }
 | 
|---|
| 170 | 
 | 
|---|
| 171 | /** Callback function when result has been received.
 | 
|---|
| 172 |  *
 | 
|---|
| 173 |  * \param e error code if something went wrong
 | 
|---|
| 174 |  * \param conn reference with the connection
 | 
|---|
| 175 |  */
 | 
|---|
| 176 | void FragmentScheduler::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
 | 
|---|
| 177 | {
 | 
|---|
| 178 |   Info info(__FUNCTION__);
 | 
|---|
| 179 |   LOG(1, "INFO: Received result for job #" << result.getId() << " ...");
 | 
|---|
| 180 |   ASSERT(result.getId() != (JobId_t)JobId::NoJob,
 | 
|---|
| 181 |       "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has NoJob id.");
 | 
|---|
| 182 |   ASSERT(result.getId() != (JobId_t)JobId::IllegalJob,
 | 
|---|
| 183 |       "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
 | 
|---|
| 184 |   // place id into expected
 | 
|---|
| 185 |   if ((result.getId() != (JobId_t)JobId::NoJob) && (result.getId() != (JobId_t)JobId::IllegalJob))
 | 
|---|
| 186 |     JobsQueue.pushResult(result);
 | 
|---|
| 187 |   // erase result
 | 
|---|
| 188 |   result = FragmentResult(JobId::NoJob);
 | 
|---|
| 189 |   LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
 | 
|---|
| 190 | }
 | 
|---|
| 191 | 
 | 
|---|
| 192 | /** Handle a new controller connection.
 | 
|---|
| 193 |  *
 | 
|---|
| 194 |  * \sa handle_ReceiveJobs()
 | 
|---|
| 195 |  * \sa handle_CheckResultState()
 | 
|---|
| 196 |  * \sa handle_SendResults()
 | 
|---|
| 197 |  *
 | 
|---|
| 198 |  * \param e error code if something went wrong
 | 
|---|
| 199 |  * \param conn reference with the connection
 | 
|---|
| 200 |  */
 | 
|---|
| 201 | void FragmentScheduler::handle_AcceptController(const boost::system::error_code& e, connection_ptr conn)
 | 
|---|
| 202 | {
 | 
|---|
| 203 |   Info info(__FUNCTION__);
 | 
|---|
| 204 |   if (!e)
 | 
|---|
| 205 |   {
 | 
|---|
| 206 |     conn->async_read(choice,
 | 
|---|
| 207 |       boost::bind(&FragmentScheduler::handle_ReadChoice, this,
 | 
|---|
| 208 |       boost::asio::placeholders::error, conn));
 | 
|---|
| 209 |   }
 | 
|---|
| 210 |   else
 | 
|---|
| 211 |   {
 | 
|---|
| 212 |     // An error occurred. Log it and return. Since we are not starting a new
 | 
|---|
| 213 |     // accept operation the io_service will run out of work to do and the
 | 
|---|
| 214 |     // server will exit.
 | 
|---|
| 215 |     Exitflag = ControllerErrorFlag;
 | 
|---|
| 216 |     ELOG(0, e.message());
 | 
|---|
| 217 |   }
 | 
|---|
| 218 | }
 | 
|---|
| 219 | 
 | 
|---|
| 220 | /** Controller callback function to read the choice for next operation.
 | 
|---|
| 221 |  *
 | 
|---|
| 222 |  * \param e error code if something went wrong
 | 
|---|
| 223 |  * \param conn reference with the connection
 | 
|---|
| 224 |  */
 | 
|---|
| 225 | void FragmentScheduler::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
 | 
|---|
| 226 | {
 | 
|---|
| 227 |   Info info(__FUNCTION__);
 | 
|---|
| 228 |   if (!e)
 | 
|---|
| 229 |   {
 | 
|---|
| 230 |     bool LaunchNewAcceptor = true;
 | 
|---|
| 231 |     // switch over the desired choice read previously
 | 
|---|
| 232 |     switch(choice) {
 | 
|---|
| 233 |     case NoOperation:
 | 
|---|
| 234 |     {
 | 
|---|
| 235 |       ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with NoOperation.");
 | 
|---|
| 236 |       break;
 | 
|---|
| 237 |     }
 | 
|---|
| 238 |     case ReceiveJobs:
 | 
|---|
| 239 |       {
 | 
|---|
| 240 |         // The connection::async_write() function will automatically
 | 
|---|
| 241 |         // serialize the data structure for us.
 | 
|---|
| 242 |         LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
 | 
|---|
| 243 |         conn->async_read(jobs,
 | 
|---|
| 244 |           boost::bind(&FragmentScheduler::handle_ReceiveJobs, this,
 | 
|---|
| 245 |           boost::asio::placeholders::error, conn));
 | 
|---|
| 246 |         break;
 | 
|---|
| 247 |       }
 | 
|---|
| 248 |     case CheckState:
 | 
|---|
| 249 |     {
 | 
|---|
| 250 |       // first update number
 | 
|---|
| 251 |       doneJobs = JobsQueue.getDoneJobs();
 | 
|---|
| 252 |       // now we accept connections to check for state of calculations
 | 
|---|
| 253 |       LOG(1, "INFO: Sending state that "+toString(doneJobs)+" jobs are done to controller ...");
 | 
|---|
| 254 |       conn->async_write(doneJobs,
 | 
|---|
| 255 |         boost::bind(&FragmentScheduler::handle_CheckResultState, this,
 | 
|---|
| 256 |         boost::asio::placeholders::error, conn));
 | 
|---|
| 257 |       break;
 | 
|---|
| 258 |     }
 | 
|---|
| 259 |     case SendResults:
 | 
|---|
| 260 |     {
 | 
|---|
| 261 |       const std::vector<FragmentResult> results = JobsQueue.getAllResults();
 | 
|---|
| 262 |       // ... or we give the results
 | 
|---|
| 263 |       LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ...");
 | 
|---|
| 264 |       conn->async_write(results,
 | 
|---|
| 265 |         boost::bind(&FragmentScheduler::handle_SendResults, this,
 | 
|---|
| 266 |         boost::asio::placeholders::error, conn));
 | 
|---|
| 267 |       break;
 | 
|---|
| 268 |     }
 | 
|---|
| 269 |     case Shutdown:
 | 
|---|
| 270 |     {
 | 
|---|
| 271 |       LaunchNewAcceptor = false;
 | 
|---|
| 272 |       break;
 | 
|---|
| 273 |     }
 | 
|---|
| 274 |     default:
 | 
|---|
| 275 |       Exitflag = ControllerErrorFlag;
 | 
|---|
| 276 |       ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with no valid choice.");
 | 
|---|
| 277 |       break;
 | 
|---|
| 278 |     }
 | 
|---|
| 279 |     // restore NoOperation choice such that choice is not read twice
 | 
|---|
| 280 |     choice = NoOperation;
 | 
|---|
| 281 | 
 | 
|---|
| 282 |     if (LaunchNewAcceptor) {
 | 
|---|
| 283 |       LOG(1, "Launching new acceptor on socket.");
 | 
|---|
| 284 |       // Start an accept operation for a new Connection.
 | 
|---|
| 285 |       connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service()));
 | 
|---|
| 286 |       controller_acceptor_.async_accept(new_conn->socket(),
 | 
|---|
| 287 |         boost::bind(&FragmentScheduler::handle_AcceptController, this,
 | 
|---|
| 288 |         boost::asio::placeholders::error, new_conn));
 | 
|---|
| 289 |     }
 | 
|---|
| 290 |   }
 | 
|---|
| 291 |   else
 | 
|---|
| 292 |   {
 | 
|---|
| 293 |     // An error occurred. Log it and return. Since we are not starting a new
 | 
|---|
| 294 |     // accept operation the io_service will run out of work to do and the
 | 
|---|
| 295 |     // server will exit.
 | 
|---|
| 296 |     Exitflag = ControllerErrorFlag;
 | 
|---|
| 297 |     ELOG(0, e.message());
 | 
|---|
| 298 |   }
 | 
|---|
| 299 | }
 | 
|---|
| 300 | 
 | 
|---|
| 301 | /** Controller callback function when job has been sent.
 | 
|---|
| 302 |  *
 | 
|---|
| 303 |  * We check here whether the worker socket is accepting, if there
 | 
|---|
| 304 |  * have been no jobs we re-activate it, as it is shut down after
 | 
|---|
| 305 |  * last job.
 | 
|---|
| 306 |  *
 | 
|---|
| 307 |  * \param e error code if something went wrong
 | 
|---|
| 308 |  * \param conn reference with the connection
 | 
|---|
| 309 |  */
 | 
|---|
| 310 | void FragmentScheduler::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
 | 
|---|
| 311 | {
 | 
|---|
| 312 |   Info info(__FUNCTION__);
 | 
|---|
| 313 |   bool initiateSocket = !JobsQueue.isJobPresent();
 | 
|---|
| 314 | 
 | 
|---|
| 315 |   // jobs are received, hence place in JobsQueue
 | 
|---|
| 316 |   if (!jobs.empty()) {
 | 
|---|
| 317 |     LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
 | 
|---|
| 318 |     JobsQueue.pushJobs(jobs);
 | 
|---|
| 319 |     // initiate socket if we had no jobs before
 | 
|---|
| 320 |     if (initiateSocket)
 | 
|---|
| 321 |       initiateWorkerSocket();
 | 
|---|
| 322 |   }
 | 
|---|
| 323 | 
 | 
|---|
| 324 |   jobs.clear();
 | 
|---|
| 325 | 
 | 
|---|
| 326 | }
 | 
|---|
| 327 | 
 | 
|---|
| 328 | /** Controller callback function when checking on state of results.
 | 
|---|
| 329 |  *
 | 
|---|
| 330 |  * \param e error code if something went wrong
 | 
|---|
| 331 |  * \param conn reference with the connection
 | 
|---|
| 332 |  */
 | 
|---|
| 333 | void FragmentScheduler::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)
 | 
|---|
| 334 | {
 | 
|---|
| 335 |   Info info(__FUNCTION__);
 | 
|---|
| 336 |   // do nothing
 | 
|---|
| 337 |   LOG(1, "INFO: Sent that " << doneJobs << " jobs are done.");
 | 
|---|
| 338 | }
 | 
|---|
| 339 | 
 | 
|---|
| 340 | /** Controller callback function when result has been received.
 | 
|---|
| 341 |  *
 | 
|---|
| 342 |  * \param e error code if something went wrong
 | 
|---|
| 343 |  * \param conn reference with the connection
 | 
|---|
| 344 |  */
 | 
|---|
| 345 | void FragmentScheduler::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
 | 
|---|
| 346 | {
 | 
|---|
| 347 |   Info info(__FUNCTION__);
 | 
|---|
| 348 |   // do nothing
 | 
|---|
| 349 |   LOG(1, "INFO: Results have been sent.");
 | 
|---|
| 350 | }
 | 
|---|
| 351 | 
 | 
|---|