Changeset 778abb for src/Fragmentation/Automation/FragmentScheduler.cpp
- Timestamp:
- May 4, 2012, 2:19:07 PM (13 years ago)
- Branches:
- Action_Thermostats, Add_AtomRandomPerturbation, Add_FitFragmentPartialChargesAction, Add_RotateAroundBondAction, Add_SelectAtomByNameAction, Added_ParseSaveFragmentResults, AddingActions_SaveParseParticleParameters, Adding_Graph_to_ChangeBondActions, Adding_MD_integration_tests, Adding_ParticleName_to_Atom, Adding_StructOpt_integration_tests, AtomFragments, Automaking_mpqc_open, AutomationFragmentation_failures, Candidate_v1.5.4, Candidate_v1.6.0, Candidate_v1.6.1, ChangeBugEmailaddress, ChangingTestPorts, ChemicalSpaceEvaluator, CombiningParticlePotentialParsing, Combining_Subpackages, Debian_Package_split, Debian_package_split_molecuildergui_only, Disabling_MemDebug, Docu_Python_wait, EmpiricalPotential_contain_HomologyGraph, EmpiricalPotential_contain_HomologyGraph_documentation, Enable_parallel_make_install, Enhance_userguide, Enhanced_StructuralOptimization, Enhanced_StructuralOptimization_continued, Example_ManyWaysToTranslateAtom, Exclude_Hydrogens_annealWithBondGraph, FitPartialCharges_GlobalError, Fix_BoundInBox_CenterInBox_MoleculeActions, Fix_ChargeSampling_PBC, Fix_ChronosMutex, Fix_FitPartialCharges, Fix_FitPotential_needs_atomicnumbers, Fix_ForceAnnealing, Fix_IndependentFragmentGrids, Fix_ParseParticles, Fix_ParseParticles_split_forward_backward_Actions, Fix_PopActions, Fix_QtFragmentList_sorted_selection, Fix_Restrictedkeyset_FragmentMolecule, Fix_StatusMsg, Fix_StepWorldTime_single_argument, Fix_Verbose_Codepatterns, Fix_fitting_potentials, Fixes, ForceAnnealing_goodresults, ForceAnnealing_oldresults, ForceAnnealing_tocheck, ForceAnnealing_with_BondGraph, ForceAnnealing_with_BondGraph_continued, ForceAnnealing_with_BondGraph_continued_betteresults, ForceAnnealing_with_BondGraph_contraction-expansion, FragmentAction_writes_AtomFragments, FragmentMolecule_checks_bonddegrees, GeometryObjects, Gui_Fixes, Gui_displays_atomic_force_velocity, ImplicitCharges, IndependentFragmentGrids, IndependentFragmentGrids_IndividualZeroInstances, IndependentFragmentGrids_IntegrationTest, IndependentFragmentGrids_Sole_NN_Calculation, JobMarket_RobustOnKillsSegFaults, JobMarket_StableWorkerPool, JobMarket_unresolvable_hostname_fix, MoreRobust_FragmentAutomation, ODR_violation_mpqc_open, PartialCharges_OrthogonalSummation, PdbParser_setsAtomName, PythonUI_with_named_parameters, QtGui_reactivate_TimeChanged_changes, Recreated_GuiChecks, Rewrite_FitPartialCharges, RotateToPrincipalAxisSystem_UndoRedo, SaturateAtoms_findBestMatching, SaturateAtoms_singleDegree, StoppableMakroAction, Subpackage_CodePatterns, Subpackage_JobMarket, Subpackage_LinearAlgebra, Subpackage_levmar, Subpackage_mpqc_open, Subpackage_vmg, Switchable_LogView, ThirdParty_MPQC_rebuilt_buildsystem, TrajectoryDependenant_MaxOrder, TremoloParser_IncreasedPrecision, TremoloParser_MultipleTimesteps, TremoloParser_setsAtomName, Ubuntu_1604_changes, stable
- Children:
- e70b9d
- Parents:
- b9c486
- git-author:
- Frederik Heber <heber@…> (12/09/11 18:18:14)
- git-committer:
- Frederik Heber <heber@…> (05/04/12 14:19:07)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
src/Fragmentation/Automation/FragmentScheduler.cpp
rb9c486 r778abb 57 57 ), 58 58 result(JobId::NoJob), 59 choice(NoOperation), 59 60 Exitflag(OkFlag) 60 61 { 61 62 Info info(__FUNCTION__); 62 63 63 initiateWorkerSocket(); 64 // only initiate socket if jobs are already present 65 if (JobsQueue.isJobPresent()) { 66 LOG(1, "Listening for workers on port " << workerport << "."); 67 initiateWorkerSocket(); 68 } 64 69 65 70 initiateControllerSocket(); 71 LOG(1, "Listening for controller on port " << controllerport << "."); 66 72 } 67 73 … … 117 123 boost::asio::placeholders::error, conn)); 118 124 119 // Start an accept operation for a new Connection only when there120 // are still jobs present otherwise we quit.121 initiateWorkerSocket();122 125 } else { 123 126 // send the static NoJob … … 139 142 ELOG(0, e.message()); 140 143 } 144 145 // Start an accept operation for a new Connection only when there 146 // are still jobs present 147 if (JobsQueue.isJobPresent()) 148 initiateWorkerSocket(); 141 149 } 142 150 … … 169 177 { 170 178 Info info(__FUNCTION__); 171 // nothing to do172 179 LOG(1, "INFO: Received result for job #" << result.getId() << " ..."); 173 // and push into queue174 180 ASSERT(result.getId() != (JobId_t)JobId::NoJob, 175 181 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has NoJob id."); 176 182 ASSERT(result.getId() != (JobId_t)JobId::IllegalJob, 177 183 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has IllegalJob id."); 184 // place id into expected 178 185 if ((result.getId() != (JobId_t)JobId::NoJob) && (result.getId() != (JobId_t)JobId::IllegalJob)) 179 186 JobsQueue.pushResult(result); 180 187 // erase result 181 188 result = FragmentResult(JobId::NoJob); 189 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results."); 182 190 } 183 191 … … 196 204 if (!e) 197 205 { 198 if (JobsQueue.isJobPresent() || !JobsQueue.getDoneJobs()) { 199 // The connection::async_write() function will automatically 200 // serialize the data structure for us. 201 LOG(1, "INFO: Receiving bunch of jobs from a controller ..."); 202 conn->async_read(jobs, 203 boost::bind(&FragmentScheduler::handle_ReceiveJobs, this, 204 boost::asio::placeholders::error, conn)); 205 } else { 206 // we just give a status report ... 207 // first update number 208 doneJobs = JobsQueue.getDoneJobs(); 209 // now we accept connections to check for state of calculations 210 LOG(1, "INFO: Sending state of results to controller ..."); 211 conn->async_write(doneJobs, 212 boost::bind(&FragmentScheduler::handle_CheckResultState, this, 213 boost::asio::placeholders::error, conn)); 214 } 206 conn->async_read(choice, 207 boost::bind(&FragmentScheduler::handle_ReadChoice, this, 208 boost::asio::placeholders::error, conn)); 215 209 } 216 210 else … … 224 218 } 225 219 220 /** Controller callback function to read the choice for next operation. 221 * 222 * \param e error code if something went wrong 223 * \param conn reference with the connection 224 */ 225 void FragmentScheduler::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn) 226 { 227 Info info(__FUNCTION__); 228 if (!e) 229 { 230 // switch over the desired choice read previously 231 switch(choice) { 232 case NoOperation: 233 { 234 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with NoOperation."); 235 break; 236 } 237 case ReceiveJobs: 238 { 239 // The connection::async_write() function will automatically 240 // serialize the data structure for us. 241 LOG(1, "INFO: Receiving bunch of jobs from a controller ..."); 242 conn->async_read(jobs, 243 boost::bind(&FragmentScheduler::handle_ReceiveJobs, this, 244 boost::asio::placeholders::error, conn)); 245 break; 246 } 247 case CheckState: 248 { 249 // first update number 250 doneJobs = JobsQueue.getDoneJobs(); 251 // now we accept connections to check for state of calculations 252 LOG(1, "INFO: Sending state that "+toString(doneJobs)+" jobs are done to controller ..."); 253 conn->async_write(doneJobs, 254 boost::bind(&FragmentScheduler::handle_CheckResultState, this, 255 boost::asio::placeholders::error, conn)); 256 257 initiateControllerSocket(); 258 break; 259 } 260 case SendResults: 261 { 262 const std::vector<FragmentResult> results = JobsQueue.getAllResults(); 263 // ... or we give the results 264 LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ..."); 265 conn->async_write(results, 266 boost::bind(&FragmentScheduler::handle_SendResults, this, 267 boost::asio::placeholders::error, conn)); 268 269 initiateControllerSocket(); 270 break; 271 } 272 default: 273 Exitflag = ControllerErrorFlag; 274 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with no valid choice."); 275 break; 276 } 277 // restore NoOperation choice such that choice is not read twice 278 choice = NoOperation; 279 280 } 281 else 282 { 283 // An error occurred. Log it and return. Since we are not starting a new 284 // accept operation the io_service will run out of work to do and the 285 // server will exit. 286 Exitflag = ControllerErrorFlag; 287 ELOG(0, e.message()); 288 } 289 } 290 226 291 /** Controller callback function when job has been sent. 227 292 * 293 * We check here whether the worker socket is accepting, if there 294 * have been no jobs we re-activate it, as it is shut down after 295 * last job. 296 * 228 297 * \param e error code if something went wrong 229 298 * \param conn reference with the connection … … 232 301 { 233 302 Info info(__FUNCTION__); 303 bool initiateSocket = !JobsQueue.isJobPresent(); 304 234 305 // jobs are received, hence place in JobsQueue 235 306 if (!jobs.empty()) { 236 307 LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue."); 237 308 JobsQueue.pushJobs(jobs); 238 }239 // launch new acceptor of queue has been filled/is full240 if (JobsQueue.isJobPresent()) {241 // Start an accept operation for a new Connection.309 // initiate socket if we had no jobs before 310 if (initiateSocket) 311 initiateWorkerSocket(); 312 // launch new acceptor of queue has been filled/is full 242 313 initiateControllerSocket(); 243 314 } else { … … 246 317 247 318 jobs.clear(); 319 248 320 } 249 321 … … 259 331 LOG(1, "INFO: Sent that " << doneJobs << " jobs are done."); 260 332 } 333 334 /** Controller callback function when result has been received. 335 * 336 * \param e error code if something went wrong 337 * \param conn reference with the connection 338 */ 339 void FragmentScheduler::handle_SendResults(const boost::system::error_code& e, connection_ptr conn) 340 { 341 Info info(__FUNCTION__); 342 // do nothing 343 LOG(1, "INFO: Results have been sent."); 344 } 345
Note:
See TracChangeset
for help on using the changeset viewer.