source: src/Fragmentation/Automation/FragmentScheduler.cpp@ d57585

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since d57585 was 17017b, checked in by Frederik Heber <heber@…>, 13 years ago

FragmentScheduler sends true or false instead of specific enumeration as acknowledge of enrollment.

  • Property mode set to 100644
File size: 21.0 KB
RevLine 
[72eaf7f]1/*
[cd4a6e]2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * \file FragmentScheduler.cpp
10 *
11 * This file strongly follows the Serialization example from the boost::asio
12 * library (see server.cpp)
[72eaf7f]13 *
[cd4a6e]14 * Created on: Oct 19, 2011
[72eaf7f]15 * Author: heber
16 */
17
[f93842]18// include config.h
19#ifdef HAVE_CONFIG_H
20#include <config.h>
21#endif
22
[c6bcd0]23// boost asio needs specific operator new
[72eaf7f]24#include <boost/asio.hpp>
[c6bcd0]25
26#include "CodePatterns/MemDebug.hpp"
27
[c4f43e]28#include <algorithm>
[72eaf7f]29#include <boost/bind.hpp>
[9a6b895]30#include <boost/lambda/lambda.hpp>
[72eaf7f]31#include <boost/lexical_cast.hpp>
32#include <iostream>
33#include <vector>
[af3aed]34#include "Connection.hpp" // Must come before boost/serialization headers.
[72eaf7f]35#include <boost/serialization/vector.hpp>
[af3aed]36#include "CodePatterns/Info.hpp"
[b0b64c]37#include "CodePatterns/Log.hpp"
[2344a3]38#include "CodePatterns/Observer/Notification.hpp"
39#include "ControllerChoices.hpp"
[9a6b895]40#include "Operations/Servers/SendJobToWorkerOperation.hpp"
[50d095]41#include "Operations/Workers/EnrollInPoolOperation.hpp"
[ff60cfa]42#include "Jobs/MPQCCommandJob.hpp"
[d920b9]43#include "Jobs/SystemCommandJob.hpp"
[ef2767]44#include "JobId.hpp"
[72eaf7f]45
[cd4a6e]46#include "FragmentScheduler.hpp"
[72eaf7f]47
[ff60cfa]48/** Helper function to enforce binding of FragmentWorker to possible derived
49 * FragmentJob classes.
50 */
51void dummyInit() {
52 SystemCommandJob("/bin/false", "something", JobId::IllegalJob);
53 MPQCCommandJob("nofile", JobId::IllegalJob);
54}
[c7deca]55
[db03d9]56/** Constructor of class FragmentScheduler.
57 *
58 * We setup both acceptors to accept connections from workers and Controller.
59 *
60 * \param io_service io_service of the asynchronous communications
61 * \param workerport port to listen for worker connections
62 * \param controllerport port to listen for controller connections.
63 */
[2344a3]64FragmentScheduler::FragmentScheduler(boost::asio::io_service& _io_service, unsigned short workerport, unsigned short controllerport) :
65 Observer("FragmentScheduler"),
66 io_service(_io_service),
67 WorkerListener(_io_service, workerport, JobsQueue, pool,
[41c1b7]68 boost::bind(&FragmentScheduler::sendJobToWorker, boost::ref(*this), _1, _2)),
[2344a3]69 ControllerListener(_io_service, controllerport, JobsQueue,
[b15c4f]70 boost::bind(&FragmentScheduler::removeAllWorkers, boost::ref(*this)),
[2344a3]71 boost::bind(&FragmentScheduler::shutdown, boost::ref(*this))),
[ba995d]72 connection(_io_service)
[ed2c5b]73{
[b0b64c]74 Info info(__FUNCTION__);
[72eaf7f]75
[2344a3]76 // sign on to idle workers and present jobs
77 pool.signOn(this, WorkerPool::WorkerIdle);
78 JobsQueue.signOn(this, FragmentQueue::JobAdded);
79
[41c1b7]80 // listen for controller
81 ControllerListener.initiateSocket();
82
[2344a3]83 // listen for workers
84 WorkerListener.initiateSocket();
85}
86
87FragmentScheduler::~FragmentScheduler()
88{
89 // sign off
90 pool.signOff(this, WorkerPool::WorkerIdle);
91 JobsQueue.signOff(this, FragmentQueue::JobAdded);
[402bde]92}
93
[db03d9]94/** Handle a new worker connection.
95 *
[41c1b7]96 * We store the given address in the pool.
[db03d9]97 *
98 * \param e error code if something went wrong
99 * \param conn reference with the connection
100 */
[8036b7]101void FragmentScheduler::WorkerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
[ed2c5b]102{
[cd4a6e]103 Info info(__FUNCTION__);
[ed2c5b]104 if (!e)
[72eaf7f]105 {
[b0b64c]106 // Successfully accepted a new connection.
[41c1b7]107 // read address
108 conn->async_read(address,
[9a3f84]109 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReadAddress, this,
[41c1b7]110 boost::asio::placeholders::error, conn));
[9a3f84]111 }
112 else
113 {
[41c1b7]114 // An error occurred. Log it and return. Since we are not starting a new
115 // accept operation the io_service will run out of work to do and the
116 // server will exit.
117 Exitflag = ErrorFlag;
118 ELOG(0, e.message());
119 }
120}
[0bdd51b]121
[9a3f84]122/** Handle having received Worker's address
[41c1b7]123 *
124 * \param e error code if something went wrong
125 * \param conn reference with the connection
126 */
[9a3f84]127void FragmentScheduler::WorkerListener_t::handle_ReadAddress(const boost::system::error_code& e, connection_ptr conn)
[41c1b7]128{
129 Info info(__FUNCTION__);
130 if (!e)
131 {
[9a3f84]132 // Successfully accepted a new connection.
133 // read address
134 conn->async_read(choice,
135 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReadChoice, this,
136 boost::asio::placeholders::error, conn));
137 }
138 else
139 {
140 // An error occurred. Log it and return. Since we are not starting a new
141 // accept operation the io_service will run out of work to do and the
142 // server will exit.
143 Exitflag = ErrorFlag;
144 ELOG(0, e.message());
145 }
146}
147
148/** Controller callback function to read the choice for next operation.
149 *
150 * \param e error code if something went wrong
151 * \param conn reference with the connection
152 */
153void FragmentScheduler::WorkerListener_t::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
154{
155 Info info(__FUNCTION__);
156 if (!e)
157 {
158 LOG(1, "INFO: Received request for operation " << choice << ".");
159 // switch over the desired choice read previously
160 switch(choice) {
161 case NoWorkerOperation:
162 {
163 ELOG(1, "WorkerListener_t::handle_ReadChoice() - called with NoOperation.");
164 break;
165 }
166 case EnrollInPool:
167 {
168 if (pool.presentInPool(address)) {
169 ELOG(1, "INFO: worker "+toString(address)+" is already contained in pool.");
[17017b]170 conn->async_write(false,
[9a3f84]171 boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this,
172 boost::asio::placeholders::error, conn));
173 } else {
174 // insert as its new worker
175 LOG(1, "INFO: Adding " << address << " to pool ...");
176 pool.addWorker(address);
[17017b]177 conn->async_write(true,
[9a3f84]178 boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this,
179 boost::asio::placeholders::error, conn));
180 break;
181 }
182 case SendResult:
183 {
184 if (pool.presentInPool(address)) {
185 // check whether its priority is busy_priority
186 if (pool.isWorkerBusy(address)) {
187 conn->async_read(result,
188 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this,
189 boost::asio::placeholders::error, conn));
190 } else {
191 ELOG(1, "Worker " << address << " trying to send result who is not marked as busy.");
192 conn->async_read(result,
193 boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this,
194 boost::asio::placeholders::error, conn));
195 }
196 } else {
197 ELOG(1, "Worker " << address << " trying to send result who is not in pool.");
198 conn->async_read(result,
199 boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this,
200 boost::asio::placeholders::error, conn));
201 }
202 break;
203 }
204 case RemoveFromPool:
205 {
206 if (pool.presentInPool(address)) {
207 // removing present worker
208 pool.removeWorker(address);
209 } else {
210 ELOG(1, "Shutting down Worker " << address << " not contained in pool.");
211 }
212 break;
213 }
214 default:
215 Exitflag = ErrorFlag;
216 ELOG(1, "WorkerListener_t::handle_ReadChoice() - called with no valid choice.");
217 break;
[41c1b7]218 }
[b0b64c]219 }
[9a3f84]220 // restore NoOperation choice such that choice is not read twice
221 choice = NoWorkerOperation;
[2344a3]222
223 initiateSocket();
[cd4a6e]224 }
225 else
226 {
227 // An error occurred. Log it and return. Since we are not starting a new
228 // accept operation the io_service will run out of work to do and the
229 // server will exit.
[8036b7]230 Exitflag = ErrorFlag;
[b0b64c]231 ELOG(0, e.message());
[cd4a6e]232 }
[ed2c5b]233}
[72eaf7f]234
[9a3f84]235
[41c1b7]236/** Callback function when new worker has enrolled.
[db03d9]237 *
238 * \param e error code if something went wrong
239 * \param conn reference with the connection
240 */
[41c1b7]241void FragmentScheduler::WorkerListener_t::handle_enrolled(const boost::system::error_code& e, connection_ptr conn)
[ed2c5b]242{
[41c1b7]243 Info info(__FUNCTION__);
[d9373b]244 if (!e) {
245 LOG(2, "DEBUG: Successfully enrolled.");
246 LOG(1, "INFO: There are " << pool.getNoTotalWorkers() << " workers in the queue, "
247 << pool.getNoIdleWorkers() << " of which are idle.");
248 } else {
[41c1b7]249 // An error occurred. Log it and return. Since we are not starting a new
250 // accept operation the io_service will run out of work to do and the
251 // server will exit.
252 Exitflag = ErrorFlag;
253 ELOG(0, e.message());
254 }
[ef2767]255}
256
[db03d9]257/** Callback function when result has been received.
258 *
259 * \param e error code if something went wrong
260 * \param conn reference with the connection
261 */
[8036b7]262void FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
[ef2767]263{
[db03d9]264 Info info(__FUNCTION__);
[35f587]265 LOG(1, "INFO: Received result for job #" << result->getId() << " ...");
[41c1b7]266
[35f587]267 // and push into queue
268 ASSERT(result->getId() != (JobId_t)JobId::NoJob,
[41c1b7]269 "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has NoJob id.");
[35f587]270 ASSERT(result->getId() != (JobId_t)JobId::IllegalJob,
[41c1b7]271 "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
[778abb]272 // place id into expected
[35f587]273 if ((result->getId() != (JobId_t)JobId::NoJob) && (result->getId() != (JobId_t)JobId::IllegalJob))
[db03d9]274 JobsQueue.pushResult(result);
[41c1b7]275
276 // mark as idle
277 pool.unmarkWorkerBusy(address);
[d9373b]278 LOG(1, "INFO: There are " << pool.getNoTotalWorkers() << " workers in the queue, "
279 << pool.getNoIdleWorkers() << " of which are idle.");
[41c1b7]280
[db03d9]281 // erase result
[35f587]282 result.reset();
[778abb]283 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
[db03d9]284}
285
[9a3f84]286/** Callback function when result has been received.
287 *
288 * \param e error code if something went wrong
289 * \param conn reference with the connection
290 */
291void FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
292{
293 Info info(__FUNCTION__);
294 // nothing to do
295 LOG(1, "INFO: Rejecting result for job #" << result->getId() << ", placing back into queue.");
296
297 JobsQueue.resubmitJob(result->getId());
298
299 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
300}
301
[41c1b7]302
[db03d9]303/** Handle a new controller connection.
304 *
305 * \sa handle_ReceiveJobs()
306 * \sa handle_CheckResultState()
307 * \sa handle_SendResults()
308 *
309 * \param e error code if something went wrong
310 * \param conn reference with the connection
311 */
[8036b7]312void FragmentScheduler::ControllerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
[db03d9]313{
314 Info info(__FUNCTION__);
315 if (!e)
316 {
[778abb]317 conn->async_read(choice,
[8036b7]318 boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReadChoice, this,
[778abb]319 boost::asio::placeholders::error, conn));
320 }
321 else
322 {
323 // An error occurred. Log it and return. Since we are not starting a new
324 // accept operation the io_service will run out of work to do and the
325 // server will exit.
[8036b7]326 Exitflag = ErrorFlag;
[778abb]327 ELOG(0, e.message());
328 }
329}
330
331/** Controller callback function to read the choice for next operation.
332 *
333 * \param e error code if something went wrong
334 * \param conn reference with the connection
335 */
[8036b7]336void FragmentScheduler::ControllerListener_t::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
[778abb]337{
338 Info info(__FUNCTION__);
339 if (!e)
340 {
[0196c6]341 bool LaunchNewAcceptor = true;
[d1dbfc]342 LOG(1, "INFO: Received request for operation " << choice << ".");
[778abb]343 // switch over the desired choice read previously
344 switch(choice) {
[38032a]345 case NoControllerOperation:
[778abb]346 {
[9a3f84]347 ELOG(1, "ControllerListener_t::handle_ReadChoice() - called with NoOperation.");
[778abb]348 break;
349 }
[d1dbfc]350 case GetNextJobId:
351 {
[c4f43e]352 LOG(1, "INFO: Receiving number of desired job ids from controller ...");
353 conn->async_read(NumberIds,
[8036b7]354 boost::bind(&FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState, this,
[d1dbfc]355 boost::asio::placeholders::error, conn));
356 break;
357 }
[425fc6]358 case SendJobs:
[d1dbfc]359 {
360 // The connection::async_write() function will automatically
361 // serialize the data structure for us.
362 LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
363 conn->async_read(jobs,
[8036b7]364 boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReceiveJobs, this,
[d1dbfc]365 boost::asio::placeholders::error, conn));
366 break;
367 }
[778abb]368 case CheckState:
369 {
[3c4a5e]370 // first update number
[6f2bc7]371 jobInfo[0] = JobsQueue.getPresentJobs();
372 jobInfo[1] = JobsQueue.getDoneJobs();
[3c4a5e]373 // now we accept connections to check for state of calculations
[6f2bc7]374 LOG(1, "INFO: Sending state that "+toString(jobInfo[0])+" jobs are present and "+toString(jobInfo[1])+" jobs are done to controller ...");
375 conn->async_write(jobInfo,
[8036b7]376 boost::bind(&FragmentScheduler::ControllerListener_t::handle_CheckResultState, this,
[3c4a5e]377 boost::asio::placeholders::error, conn));
[778abb]378 break;
379 }
[b15c4f]380 case RemoveAll:
381 {
382 removeallWorkers();
383 break;
384 }
[9d14c3]385 case ReceiveResults:
[778abb]386 {
[35f587]387 const std::vector<FragmentResult::ptr> results = JobsQueue.getAllResults();
[778abb]388 // ... or we give the results
389 LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ...");
390 conn->async_write(results,
[8036b7]391 boost::bind(&FragmentScheduler::ControllerListener_t::handle_SendResults, this,
[778abb]392 boost::asio::placeholders::error, conn));
[0196c6]393 break;
394 }
[38032a]395 case ShutdownControllerSocket:
[0196c6]396 {
[9a3f84]397 LOG(1, "INFO: Received shutdown from controller ...");
398 // only allow for shutdown when there are no more jobs in the queue
399 if (!JobsQueue.isJobPresent()) {
[668b55]400 // we shutdown? Hence, also shutdown controller
401 LaunchNewAcceptor = !shutdownAllSockets();
[9a3f84]402 } else {
403 ELOG(2, "There are still jobs waiting in the queue.");
404 }
[778abb]405 break;
[db03d9]406 }
[778abb]407 default:
[8036b7]408 Exitflag = ErrorFlag;
[9a3f84]409 ELOG(1, "ControllerListener_t::handle_ReadChoice() - called with no valid choice.");
[778abb]410 break;
411 }
[38032a]412 // restore NoControllerOperation choice such that choice is not read twice
413 choice = NoControllerOperation;
[778abb]414
[0196c6]415 if (LaunchNewAcceptor) {
416 LOG(1, "Launching new acceptor on socket.");
417 // Start an accept operation for a new Connection.
[8036b7]418 initiateSocket();
[0196c6]419 }
[db03d9]420 }
421 else
422 {
423 // An error occurred. Log it and return. Since we are not starting a new
424 // accept operation the io_service will run out of work to do and the
425 // server will exit.
[8036b7]426 Exitflag = ErrorFlag;
[db03d9]427 ELOG(0, e.message());
428 }
429}
430
431/** Controller callback function when job has been sent.
[778abb]432 *
433 * We check here whether the worker socket is accepting, if there
434 * have been no jobs we re-activate it, as it is shut down after
435 * last job.
[db03d9]436 *
437 * \param e error code if something went wrong
438 * \param conn reference with the connection
439 */
[8036b7]440void FragmentScheduler::ControllerListener_t::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
[db03d9]441{
442 Info info(__FUNCTION__);
443 // jobs are received, hence place in JobsQueue
444 if (!jobs.empty()) {
445 LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
446 JobsQueue.pushJobs(jobs);
447 }
448 jobs.clear();
[ed2c5b]449}
[cd4a6e]450
[3c4a5e]451/** Controller callback function when checking on state of results.
452 *
453 * \param e error code if something went wrong
454 * \param conn reference with the connection
455 */
[8036b7]456void FragmentScheduler::ControllerListener_t::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)
[3c4a5e]457{
458 Info info(__FUNCTION__);
459 // do nothing
[6f2bc7]460 LOG(1, "INFO: Sent that " << jobInfo << " jobs are (scheduled, done).");
[3c4a5e]461}
[778abb]462
[d1dbfc]463/** Controller callback function when checking on state of results.
464 *
465 * \param e error code if something went wrong
466 * \param conn reference with the connection
467 */
[8036b7]468void FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn)
[c4f43e]469{
470 Info info(__FUNCTION__);
471
472 std::vector<JobId_t> nextids( NumberIds, JobId::IllegalJob);
473 std::generate(nextids.begin(), nextids.end(),
474 boost::bind(&GlobalJobId::getNextId, boost::ref(globalId)));
475 LOG(1, "INFO: Sending next available job ids " << nextids << " to controller ...");
476 conn->async_write(nextids,
477 boost::bind(&FragmentScheduler::ControllerListener_t::handle_SendIds, this,
478 boost::asio::placeholders::error, conn));
479}
480
481/** Controller callback function when free job ids have been sent.
482 *
483 * \param e error code if something went wrong
484 * \param conn reference with the connection
485 */
486void FragmentScheduler::ControllerListener_t::handle_SendIds(const boost::system::error_code& e, connection_ptr conn)
[d1dbfc]487{
488 Info info(__FUNCTION__);
489 // do nothing
[c4f43e]490 LOG(1, "INFO: Ids have been sent.");
[d1dbfc]491}
492
[778abb]493/** Controller callback function when result has been received.
494 *
495 * \param e error code if something went wrong
496 * \param conn reference with the connection
497 */
[8036b7]498void FragmentScheduler::ControllerListener_t::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
[778abb]499{
500 Info info(__FUNCTION__);
501 // do nothing
502 LOG(1, "INFO: Results have been sent.");
503}
504
[41c1b7]505
506/** Helper function to send a job to worker.
[9a3f84]507 *
508 * Note that we do not set the worker as busy. We simply send it the job.
[41c1b7]509 *
510 * @param address address of worker
511 * @param job job to send
512 */
513void FragmentScheduler::sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job)
514{
[9a3f84]515 ASSERT( pool.isWorkerBusy(address),
516 "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is not marked as busy.");
[41c1b7]517 LOG(1, "INFO: Sending job " << job->getId() << " to worker " << address << ".");
[9a6b895]518
519 // create op, sign on, and hand over to queue
520 AsyncOperation *sendJobOp = new SendJobToWorkerOperation(connection,job);
521 OpQueue.push_back(sendJobOp, address);
[41c1b7]522}
523
[2344a3]524/** Helper function to shutdown a single worker.
525 *
526 * We send NoJob to indicate shutdown
527 *
528 * @param address of worker to shutdown
529 */
530void FragmentScheduler::shutdownWorker(const WorkerAddress &address)
531{
[6ea7f4]532 ASSERT( !pool.isWorkerBusy(address),
533 "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is already busy.");
534 LOG(2, "INFO: Shutting down worker " << address << "...");
[ba995d]535 AsyncOperation *shutdownWorkerOp = new ShutdownWorkerOperation(connection);
536 OpQueue.push_back(shutdownWorkerOp, address);
[2344a3]537}
538
539/** Sends shutdown to all current workers in the pool.
540 *
541 */
542void FragmentScheduler::removeAllWorkers()
543{
[6b3a37]544 // first, sign off such that no new jobs are given to workers
545 pool.signOff(this, WorkerPool::WorkerIdle);
[befcf8]546
547 LOG(2, "DEBUG: Waiting for busy workers to finish ...");
[6b3a37]548 while (pool.hasBusyWorkers())
549 ;
550
[befcf8]551 LOG(2, "INFO: Shutting down workers ...");
552 // iterate until there are no more idle workers
[270364]553 // get list of all idle workers
554 typedef std::vector<std::pair<std::string, std::string> > WorkerList_t;
555 WorkerList_t WorkerList = pool.getListOfIdleWorkers();
556
557 // give all workers shutdown signal
558 for (WorkerList_t::const_iterator iter = WorkerList.begin(); iter != WorkerList.end(); ++iter)
559 shutdownWorker(WorkerAddress(iter->first, iter->second));
[2344a3]560}
561
[267b8d]562/** Function to shutdown server properly, e.g. for use as signal handler.
563 *
564 * @param sig signal number
565 */
566void FragmentScheduler::shutdown(int sig)
567{
568 LOG(0, "STATUS: Shutting down due to signal " << sig << ".");
569
570 if (!pool.presentIdleWorkers() && !pool.hasBusyWorkers()) {
571 shutdown();
572 } else {
573 removeAllWorkers();
574 }
575}
576
[2344a3]577/** Helper function to shutdown the server properly.
578 *
579 * \todo one should idle here until all workers have returned from
[b15c4f]580 * calculating stuff (or workers need to still listen while they are
[2344a3]581 * calculating which is probably better).
582 *
[668b55]583 * \note We only shutdown when there are no workers left
584 *
585 * @return true - doing shutdown, false - precondition not met, not shutting down
[2344a3]586 */
[668b55]587bool FragmentScheduler::shutdown()
[2344a3]588{
[668b55]589 if (!pool.presentIdleWorkers() && !pool.hasBusyWorkers()) {
590 LOG(1, "INFO: Shutting all down ...");
[2344a3]591
[668b55]592 /// close the worker listener's socket
593 WorkerListener.closeSocket();
[2344a3]594
[668b55]595 /// close the controller listener's socket
596 ControllerListener.closeSocket();
[2344a3]597
[668b55]598 /// finally, stop the io_service
599 io_service.stop();
600 return true;
601 } else {
602 ELOG(2, "There are still idle or busy workers present.");
603 return false;
604 }
[2344a3]605}
606
607/** Internal helper to send the next available job to the next idle worker.
608 *
609 */
610void FragmentScheduler::sendAvailableJobToNextIdleWorker()
611{
612 const WorkerAddress address = pool.getNextIdleWorker();
613 FragmentJob::ptr job = JobsQueue.popJob();
614 sendJobToWorker(address, job);
615}
616
617void FragmentScheduler::update(Observable *publisher)
618{
[a40c85]619 ASSERT(0, "FragmentScheduler::update() - we are not signed on for global updates.");
[2344a3]620}
621
622void FragmentScheduler::recieveNotification(Observable *publisher, Notification_ptr notification)
623{
624 if ((publisher == &pool) && (notification->getChannelNo() == WorkerPool::WorkerIdle)) {
[e032b4]625 // we have an idle worker
[2344a3]626 LOG(1, "INFO: We are notified of an idle worker.");
627 // are jobs available?
628 if (JobsQueue.isJobPresent()) {
629 sendAvailableJobToNextIdleWorker();
630 }
[e032b4]631 } else if ((publisher == &JobsQueue) && (notification->getChannelNo() == FragmentQueue::JobAdded)) {
632 // we have new jobs
[2344a3]633 LOG(1, "INFO: We are notified of a new job.");
634 // check for idle workers
635 if (pool.presentIdleWorkers()) {
636 sendAvailableJobToNextIdleWorker();
637 }
[e032b4]638 } else {
639 ASSERT(0, "FragmentScheduler::recieveNotification() - we are not signed on for updates in channel "
640 +toString(notification->getChannelNo())+".");
[2344a3]641 }
642}
643
644void FragmentScheduler::subjectKilled(Observable *publisher)
645{}
Note: See TracBrowser for help on using the repository browser.