source: ThirdParty/JobMarket/src/JobMarket/Operations/OperationQueue.cpp@ f67dfb

Action_Thermostats Add_AtomRandomPerturbation Add_RotateAroundBondAction Add_SelectAtomByNameAction Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_StructOpt_integration_tests AutomationFragmentation_failures Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph_documentation Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_ChronosMutex Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion GeometryObjects Gui_displays_atomic_force_velocity IndependentFragmentGrids_IntegrationTest JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix PartialCharges_OrthogonalSummation PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks RotateToPrincipalAxisSystem_UndoRedo StoppableMakroAction TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps Ubuntu_1604_changes stable
Last change on this file since f67dfb was f67dfb, checked in by Frederik Heber <heber@…>, 8 years ago

FIX: FragmentScheduler and PoolWorker now handle unresolvable addresses properly.

  • OperationQueue now checks whether the last operation has succeeded or failed and, in the error case, calls a failure callback, which is a NoOp by default.
  • if PoolWorker's Ops fail, it will automatically shut down now.
  • if FragmentScheduler's Ops fail, it will automatically remove the worker from its pool.
  • Property mode set to 100644
File size: 4.5 KB
Line 
1/*
2 * Project: JobMarket
3 * Description: asynchronous Server/Controller/Client-approach to parallel computing, based on boost::asio
4 * Copyright (C) 2012 Frederik Heber. All rights reserved.
5 *
6 */
7
8/*
9 * OperationQueue.cpp
10 *
11 * Created on: Apr 24, 2012
12 * Author: heber
13 */
14
15
16// include config.h
17#ifdef HAVE_CONFIG_H
18#include <config.h>
19#endif
20
21// boost asio needs specific operator new
22#include <boost/asio.hpp>
23
24#include "CodePatterns/MemDebug.hpp"
25
26#include <boost/bind.hpp>
27#include <boost/lambda/lambda.hpp>
28#include <string>
29
30#include "CodePatterns/Log.hpp"
31#include "CodePatterns/Observer/Observer.hpp"
32
33#include "JobMarket/Operations/AsyncOperation.hpp"
34#include "JobMarket/Operations/OperationQueue.hpp"
35
36static void NoOp(const WorkerAddress) {}
37
38// static instances
39const boost::function<void (const WorkerAddress)> OperationQueue::NoOpCallback = boost::bind(&NoOp, _1);
40
41size_t OperationQueue::max_connections = 1;
42
43OperationQueue::OperationQueue_t::iterator OperationQueue::findOperation(AsyncOperation *op)
44{
45 OperationQueue_t::iterator iter =
46 std::find_if(queue.begin(), queue.end(),
47 boost::bind(&AsyncOp_ptr::get, boost::lambda::_1) == op);
48 return iter;
49}
50
51void OperationQueue::push_back(AsyncOperation *&op, const WorkerAddress &address)
52{
53 if (op != NULL) {
54 if (!IsBlockedFlag) {
55 AsyncOp_ptr ptr(op); // this always prevents memory loss
56 ptr->signOn(this);
57 OperationQueue_t::iterator iter = queue.insert(queue.end(), ptr );
58 op = NULL;
59 AddressMap.insert( make_pair(*iter, address) );
60 LaunchNextOp();
61 } else {
62 ELOG(1, "Queue is currently blocked, dropping operation "+op->getName()+".");
63 delete op;
64 op = NULL;
65 }
66 } else {
67 ELOG(1, "Given operation pointer is NULL.");
68 }
69}
70
71void OperationQueue::LaunchNextOp()
72{
73 // connection available?
74 if (getNumberOfRunningOps() < max_connections) {
75 // only start operation when address is valid
76 OperationQueue_t::iterator queueiter =
77 std::find_if(queue.begin(), queue.end(),
78 boost::bind(&AddressMap_t::count, boost::ref(AddressMap), boost::lambda::_1) );
79 if (queueiter != queue.end()) {
80 const AddressMap_t::iterator mapiter = AddressMap.find(*queueiter);
81 ASSERT( mapiter != AddressMap.end(),
82 "OperationQueue::LaunchNextOp() - cannot find connection "+toString((*queueiter)->getName())+" in AddressMap.");
83 currentOpsAddress = mapiter->second;
84 const AsyncOp_ptr ptr = mapiter->first;
85 // always erase the op from the list of ones pending for launch
86 AddressMap.erase(mapiter);
87 // only launch when not a debug op
88 if ((!currentOpsAddress.host.empty()) && (!currentOpsAddress.service.empty())) {
89 LOG(2, "DEBUG: Launching next operation " << ptr->getName() << ".");
90 (*ptr)(currentOpsAddress.host, currentOpsAddress.service);
91 } else {
92 LOG(3, "DEBUG: Skipping debug operation " << ptr->getName() << " with empty address.");
93 }
94 } else {
95 LOG(2, "DEBUG: All remaining operations are already running.");
96 }
97 } else {
98 LOG(2, "DEBUG: Currently there are no free connections.");
99 }
100}
101
102void OperationQueue::remove(AsyncOperation *op, Observer *observer)
103{
104 if (op != NULL) {
105 OperationQueue_t::iterator iter = findOperation(op);
106 if (iter != queue.end()) {
107 // sign off and remove op
108 if (observer != NULL)
109 op->signOff(observer);
110 queue.erase(iter);
111 } else {
112 ELOG(1, "Could not find Operation " << op->getName() << " in operation's queue.");
113 }
114 } else {
115 ELOG(1, "Given operation pointer is NULL.");
116 }
117}
118
119void OperationQueue::update(Observable *publisher)
120{
121 AsyncOperation *op = static_cast<AsyncOperation *>(publisher);
122 if (op != NULL) {
123 // check for error code of operation
124 if (op->getStatus() != Operation::success) {
125 // remove the worker from the queue
126 failure_callback(currentOpsAddress);
127 LOG(1, "INFO: We are notified that " << op->getName() << " has failed, removing ...");
128 } else {
129 LOG(1, "INFO: We are notified that " << op->getName() << " is done, removing ...");
130 }
131 // remove from queue
132 remove(op, this);
133 LaunchNextOp();
134 }
135}
136
137void OperationQueue::recieveNotification(Observable *publisher, Notification_ptr notification)
138{}
139
140void OperationQueue::subjectKilled(Observable *publisher)
141{
142 AsyncOperation *op = static_cast<AsyncOperation *>(publisher);
143 if (op != NULL) {
144 ELOG(2, "DEBUG: AsyncOperation at " << publisher << " got killed before being done?");
145 // remove from queue
146 remove(op, this);
147 }
148}
149
Note: See TracBrowser for help on using the repository browser.