Changeset db03d9 for src/Fragmentation/Automation/FragmentScheduler.cpp
- Timestamp:
- May 4, 2012, 2:19:07 PM (13 years ago)
- Branches:
- Action_Thermostats, Add_AtomRandomPerturbation, Add_FitFragmentPartialChargesAction, Add_RotateAroundBondAction, Add_SelectAtomByNameAction, Added_ParseSaveFragmentResults, AddingActions_SaveParseParticleParameters, Adding_Graph_to_ChangeBondActions, Adding_MD_integration_tests, Adding_ParticleName_to_Atom, Adding_StructOpt_integration_tests, AtomFragments, Automaking_mpqc_open, AutomationFragmentation_failures, Candidate_v1.5.4, Candidate_v1.6.0, Candidate_v1.6.1, ChangeBugEmailaddress, ChangingTestPorts, ChemicalSpaceEvaluator, CombiningParticlePotentialParsing, Combining_Subpackages, Debian_Package_split, Debian_package_split_molecuildergui_only, Disabling_MemDebug, Docu_Python_wait, EmpiricalPotential_contain_HomologyGraph, EmpiricalPotential_contain_HomologyGraph_documentation, Enable_parallel_make_install, Enhance_userguide, Enhanced_StructuralOptimization, Enhanced_StructuralOptimization_continued, Example_ManyWaysToTranslateAtom, Exclude_Hydrogens_annealWithBondGraph, FitPartialCharges_GlobalError, Fix_BoundInBox_CenterInBox_MoleculeActions, Fix_ChargeSampling_PBC, Fix_ChronosMutex, Fix_FitPartialCharges, Fix_FitPotential_needs_atomicnumbers, Fix_ForceAnnealing, Fix_IndependentFragmentGrids, Fix_ParseParticles, Fix_ParseParticles_split_forward_backward_Actions, Fix_PopActions, Fix_QtFragmentList_sorted_selection, Fix_Restrictedkeyset_FragmentMolecule, Fix_StatusMsg, Fix_StepWorldTime_single_argument, Fix_Verbose_Codepatterns, Fix_fitting_potentials, Fixes, ForceAnnealing_goodresults, ForceAnnealing_oldresults, ForceAnnealing_tocheck, ForceAnnealing_with_BondGraph, ForceAnnealing_with_BondGraph_continued, ForceAnnealing_with_BondGraph_continued_betteresults, ForceAnnealing_with_BondGraph_contraction-expansion, FragmentAction_writes_AtomFragments, FragmentMolecule_checks_bonddegrees, GeometryObjects, Gui_Fixes, Gui_displays_atomic_force_velocity, ImplicitCharges, IndependentFragmentGrids, IndependentFragmentGrids_IndividualZeroInstances, IndependentFragmentGrids_IntegrationTest, IndependentFragmentGrids_Sole_NN_Calculation, JobMarket_RobustOnKillsSegFaults, JobMarket_StableWorkerPool, JobMarket_unresolvable_hostname_fix, MoreRobust_FragmentAutomation, ODR_violation_mpqc_open, PartialCharges_OrthogonalSummation, PdbParser_setsAtomName, PythonUI_with_named_parameters, QtGui_reactivate_TimeChanged_changes, Recreated_GuiChecks, Rewrite_FitPartialCharges, RotateToPrincipalAxisSystem_UndoRedo, SaturateAtoms_findBestMatching, SaturateAtoms_singleDegree, StoppableMakroAction, Subpackage_CodePatterns, Subpackage_JobMarket, Subpackage_LinearAlgebra, Subpackage_levmar, Subpackage_mpqc_open, Subpackage_vmg, Switchable_LogView, ThirdParty_MPQC_rebuilt_buildsystem, TrajectoryDependenant_MaxOrder, TremoloParser_IncreasedPrecision, TremoloParser_MultipleTimesteps, TremoloParser_setsAtomName, Ubuntu_1604_changes, stable
- Children:
- 3c4a5e
- Parents:
- 8ee5ac
- git-author:
- Frederik Heber <heber@…> (11/27/11 23:20:43)
- git-committer:
- Frederik Heber <heber@…> (05/04/12 14:19:07)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
src/Fragmentation/Automation/FragmentScheduler.cpp
r8ee5ac rdb03d9 41 41 FragmentJob FragmentScheduler::NoJob(std::string("NoJob"), JobId::NoJob); 42 42 43 FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short port) : 44 acceptor_(io_service, 45 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port) 43 /** Constructor of class FragmentScheduler. 44 * 45 * We setup both acceptors to accept connections from workers and Controller. 46 * 47 * \param io_service io_service of the asynchronous communications 48 * \param workerport port to listen for worker connections 49 * \param controllerport port to listen for controller connections. 50 */ 51 FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) : 52 worker_acceptor_(io_service, 53 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), workerport) 46 54 ), 47 result(JobId::NoJob) 48 { 49 Info info(__FUNCTION__); 50 FragmentJob s(std::string("test"), 1); 51 JobsQueue.pushJob(s); 52 53 // Start an accept operation for a new connection. 54 connection_ptr new_conn(new Connection(acceptor_.get_io_service())); 55 acceptor_.async_accept(new_conn->socket(), 56 boost::bind(&FragmentScheduler::handle_accept, this, 57 boost::asio::placeholders::error, new_conn)); 58 } 59 60 /// Handle completion of a accept operation. 61 void FragmentScheduler::handle_accept(const boost::system::error_code& e, connection_ptr conn) 55 controller_acceptor_(io_service, 56 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), controllerport) 57 ), 58 result(JobId::NoJob), 59 Exitflag(OkFlag) 60 { 61 Info info(__FUNCTION__); 62 { 63 // Start an accept operation for worker connections. 64 connection_ptr new_conn(new Connection(worker_acceptor_.get_io_service())); 65 worker_acceptor_.async_accept(new_conn->socket(), 66 boost::bind(&FragmentScheduler::handle_AcceptWorker, this, 67 boost::asio::placeholders::error, new_conn)); 68 } 69 70 { 71 // Start an accept operation for controller connection. 72 connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service())); 73 controller_acceptor_.async_accept(new_conn->socket(), 74 boost::bind(&FragmentScheduler::handle_AcceptController, this, 75 boost::asio::placeholders::error, new_conn)); 76 } 77 } 78 79 /** Handle a new worker connection. 80 * 81 * We check whether jobs are in the JobsQueue. If present, job is sent. 82 * 83 * \sa handle_SendJobtoWorker() 84 * 85 * \param e error code if something went wrong 86 * \param conn reference with the connection 87 */ 88 void FragmentScheduler::handle_AcceptWorker(const boost::system::error_code& e, connection_ptr conn) 62 89 { 63 90 Info info(__FUNCTION__); … … 73 100 LOG(1, "INFO: Sending job #" << job.getId() << "."); 74 101 conn->async_write(job, 75 boost::bind(&FragmentScheduler::handle_SendJob , this,102 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this, 76 103 boost::asio::placeholders::error, conn)); 77 104 78 105 // Start an accept operation for a new Connection only when there 79 106 // are still jobs present otherwise we quit. 80 connection_ptr new_conn(new Connection( acceptor_.get_io_service()));81 acceptor_.async_accept(new_conn->socket(),82 boost::bind(&FragmentScheduler::handle_ accept, this,107 connection_ptr new_conn(new Connection(worker_acceptor_.get_io_service())); 108 worker_acceptor_.async_accept(new_conn->socket(), 109 boost::bind(&FragmentScheduler::handle_AcceptWorker, this, 83 110 boost::asio::placeholders::error, new_conn)); 84 111 } else { 85 112 // send the static NoJob 86 113 conn->async_write(NoJob, 87 boost::bind(&FragmentScheduler::handle_SendJob , this,114 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this, 88 115 boost::asio::placeholders::error, conn)); 89 116 … … 102 129 } 103 130 104 /// Callback function when job has been sent. 105 void FragmentScheduler::handle_SendJob(const boost::system::error_code& e, connection_ptr conn) 131 /** Callback function when job has been sent. 132 * 133 * After job has been sent we start async_read() for the result. 134 * 135 * \sa handle_ReceiveResultFromWorker() 136 * 137 * \param e error code if something went wrong 138 * \param conn reference with the connection 139 */ 140 void FragmentScheduler::handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn) 106 141 { 107 142 Info info(__FUNCTION__); … … 110 145 LOG(1, "INFO: Receiving result for a job ..."); 111 146 conn->async_read(result, 112 boost::bind(&FragmentScheduler::handle_ReceiveResult , this,147 boost::bind(&FragmentScheduler::handle_ReceiveResultFromWorker, this, 113 148 boost::asio::placeholders::error, conn)); 114 149 } 115 150 116 /// Callback function when result has been received. 117 void FragmentScheduler::handle_ReceiveResult(const boost::system::error_code& e, connection_ptr conn) 118 { 119 Info info(__FUNCTION__); 120 // nothing to do 121 LOG(1, "INFO: Received result for job #" << result.getId() << " ..."); 122 // and push into queue 123 ASSERT(result.getId() != JobId::NoJob, 124 "FragmentScheduler::handle_write() - result received has NoJob id."); 125 ASSERT(result.getId() != JobId::IllegalJob, 126 "FragmentScheduler::handle_write() - result received has IllegalJob id."); 127 if ((result.getId() != JobId::NoJob) && (result.getId() != JobId::IllegalJob)) 128 JobsQueue.pushResult(result); 129 // erase result 130 result = FragmentResult(JobId::NoJob); 131 } 132 151 /** Callback function when result has been received. 152 * 153 * \param e error code if something went wrong 154 * \param conn reference with the connection 155 */ 156 void FragmentScheduler::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn) 157 { 158 Info info(__FUNCTION__); 159 // nothing to do 160 LOG(1, "INFO: Received result for job #" << result.getId() << " ..."); 161 // and push into queue 162 ASSERT(result.getId() != (JobId_t)JobId::NoJob, 163 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has NoJob id."); 164 ASSERT(result.getId() != (JobId_t)JobId::IllegalJob, 165 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has IllegalJob id."); 166 if ((result.getId() != (JobId_t)JobId::NoJob) && (result.getId() != (JobId_t)JobId::IllegalJob)) 167 JobsQueue.pushResult(result); 168 // erase result 169 result = FragmentResult(JobId::NoJob); 170 } 171 172 /** Handle a new controller connection. 173 * 174 * \sa handle_ReceiveJobs() 175 * \sa handle_CheckResultState() 176 * \sa handle_SendResults() 177 * 178 * \param e error code if something went wrong 179 * \param conn reference with the connection 180 */ 181 void FragmentScheduler::handle_AcceptController(const boost::system::error_code& e, connection_ptr conn) 182 { 183 Info info(__FUNCTION__); 184 if (!e) 185 { 186 if (jobs.empty()) { 187 // The connection::async_write() function will automatically 188 // serialize the data structure for us. 189 LOG(1, "INFO: Receiving bunch of jobs from a controller ..."); 190 conn->async_read(jobs, 191 boost::bind(&FragmentScheduler::handle_ReceiveJobs, this, 192 boost::asio::placeholders::error, conn)); 193 } 194 } 195 else 196 { 197 // An error occurred. Log it and return. Since we are not starting a new 198 // accept operation the io_service will run out of work to do and the 199 // server will exit. 200 Exitflag = ErrorFlag; 201 ELOG(0, e.message()); 202 } 203 } 204 205 /** Controller callback function when job has been sent. 206 * 207 * \param e error code if something went wrong 208 * \param conn reference with the connection 209 */ 210 void FragmentScheduler::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn) 211 { 212 Info info(__FUNCTION__); 213 // jobs are received, hence place in JobsQueue 214 if (!jobs.empty()) { 215 LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue."); 216 JobsQueue.pushJobs(jobs); 217 } 218 // launch new acceptor of queue has been filled/is full 219 if (JobsQueue.isJobPresent()) { 220 // Start an accept operation for a new Connection. 221 connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service())); 222 controller_acceptor_.async_accept(new_conn->socket(), 223 boost::bind(&FragmentScheduler::handle_AcceptController, this, 224 boost::asio::placeholders::error, new_conn)); 225 } else { 226 LOG(1, "INFO: Shutting down controller socket."); 227 } 228 229 jobs.clear(); 230 } 231
Note:
See TracChangeset
for help on using the changeset viewer.