Ignore:
Timestamp:
May 4, 2012, 2:19:07 PM (13 years ago)
Author:
Frederik Heber <heber@…>
Branches:
Action_Thermostats, Add_AtomRandomPerturbation, Add_FitFragmentPartialChargesAction, Add_RotateAroundBondAction, Add_SelectAtomByNameAction, Added_ParseSaveFragmentResults, AddingActions_SaveParseParticleParameters, Adding_Graph_to_ChangeBondActions, Adding_MD_integration_tests, Adding_ParticleName_to_Atom, Adding_StructOpt_integration_tests, AtomFragments, Automaking_mpqc_open, AutomationFragmentation_failures, Candidate_v1.5.4, Candidate_v1.6.0, Candidate_v1.6.1, ChangeBugEmailaddress, ChangingTestPorts, ChemicalSpaceEvaluator, CombiningParticlePotentialParsing, Combining_Subpackages, Debian_Package_split, Debian_package_split_molecuildergui_only, Disabling_MemDebug, Docu_Python_wait, EmpiricalPotential_contain_HomologyGraph, EmpiricalPotential_contain_HomologyGraph_documentation, Enable_parallel_make_install, Enhance_userguide, Enhanced_StructuralOptimization, Enhanced_StructuralOptimization_continued, Example_ManyWaysToTranslateAtom, Exclude_Hydrogens_annealWithBondGraph, FitPartialCharges_GlobalError, Fix_BoundInBox_CenterInBox_MoleculeActions, Fix_ChargeSampling_PBC, Fix_ChronosMutex, Fix_FitPartialCharges, Fix_FitPotential_needs_atomicnumbers, Fix_ForceAnnealing, Fix_IndependentFragmentGrids, Fix_ParseParticles, Fix_ParseParticles_split_forward_backward_Actions, Fix_PopActions, Fix_QtFragmentList_sorted_selection, Fix_Restrictedkeyset_FragmentMolecule, Fix_StatusMsg, Fix_StepWorldTime_single_argument, Fix_Verbose_Codepatterns, Fix_fitting_potentials, Fixes, ForceAnnealing_goodresults, ForceAnnealing_oldresults, ForceAnnealing_tocheck, ForceAnnealing_with_BondGraph, ForceAnnealing_with_BondGraph_continued, ForceAnnealing_with_BondGraph_continued_betteresults, ForceAnnealing_with_BondGraph_contraction-expansion, FragmentAction_writes_AtomFragments, FragmentMolecule_checks_bonddegrees, GeometryObjects, Gui_Fixes, Gui_displays_atomic_force_velocity, ImplicitCharges, IndependentFragmentGrids, IndependentFragmentGrids_IndividualZeroInstances, IndependentFragmentGrids_IntegrationTest, IndependentFragmentGrids_Sole_NN_Calculation, JobMarket_RobustOnKillsSegFaults, JobMarket_StableWorkerPool, JobMarket_unresolvable_hostname_fix, MoreRobust_FragmentAutomation, ODR_violation_mpqc_open, PartialCharges_OrthogonalSummation, PdbParser_setsAtomName, PythonUI_with_named_parameters, QtGui_reactivate_TimeChanged_changes, Recreated_GuiChecks, Rewrite_FitPartialCharges, RotateToPrincipalAxisSystem_UndoRedo, SaturateAtoms_findBestMatching, SaturateAtoms_singleDegree, StoppableMakroAction, Subpackage_CodePatterns, Subpackage_JobMarket, Subpackage_LinearAlgebra, Subpackage_levmar, Subpackage_mpqc_open, Subpackage_vmg, Switchable_LogView, ThirdParty_MPQC_rebuilt_buildsystem, TrajectoryDependenant_MaxOrder, TremoloParser_IncreasedPrecision, TremoloParser_MultipleTimesteps, TremoloParser_setsAtomName, Ubuntu_1604_changes, stable
Children:
e70b9d
Parents:
b9c486
git-author:
Frederik Heber <heber@…> (12/09/11 18:18:14)
git-committer:
Frederik Heber <heber@…> (05/04/12 14:19:07)
Message:

Added ResultGetter and capabilities to receive calculated results to FragmentController.

  • Added enum (and file) ControllerChoices that defines the state of FragmentScheduler.
  • depending on what is desired the Scheduler switches between these states and either receives or sends information. Requires new member variable choice because receival is of course asynchronous (see note in previous commit).
  • FragmentController has additional functions connect_get() and handle_connect_get() to receive results.
  • connect_calc() and connect_check() now just the choice whereas the actual sending and receiving is done in handle_... functions.
  • handle_FinishOperation() is the common final callback function for all three of these functions.
  • FragmentScheduler contains an internal list of delivered results.
  • FragmentScheduler only initiates worker socket when jobs are present.
  • FIX: FragmentScheduler does only send results that are done and only once.
  • TESTFIX: Removed third Worker that receives NoJob as socket is powered down before because queue has run empty and we haven't add new jobs.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • src/Fragmentation/Automation/FragmentScheduler.cpp

    rb9c486 r778abb  
    5757  ),
    5858  result(JobId::NoJob),
     59  choice(NoOperation),
    5960  Exitflag(OkFlag)
    6061{
    6162  Info info(__FUNCTION__);
    6263
    63   initiateWorkerSocket();
     64  // only initiate socket if jobs are already present
     65  if (JobsQueue.isJobPresent()) {
     66    LOG(1, "Listening for workers on port " << workerport << ".");
     67    initiateWorkerSocket();
     68  }
    6469
    6570  initiateControllerSocket();
     71  LOG(1, "Listening for controller on port " << controllerport << ".");
    6672}
    6773
     
    117123        boost::asio::placeholders::error, conn));
    118124
    119       // Start an accept operation for a new Connection only when there
    120       // are still jobs present otherwise we quit.
    121       initiateWorkerSocket();
    122125    } else {
    123126      // send the static NoJob
     
    139142    ELOG(0, e.message());
    140143  }
     144
     145  // Start an accept operation for a new Connection only when there
     146  // are still jobs present
     147  if (JobsQueue.isJobPresent())
     148    initiateWorkerSocket();
    141149}
    142150
     
    169177{
    170178  Info info(__FUNCTION__);
    171   // nothing to do
    172179  LOG(1, "INFO: Received result for job #" << result.getId() << " ...");
    173   // and push into queue
    174180  ASSERT(result.getId() != (JobId_t)JobId::NoJob,
    175181      "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has NoJob id.");
    176182  ASSERT(result.getId() != (JobId_t)JobId::IllegalJob,
    177183      "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
     184  // place id into expected
    178185  if ((result.getId() != (JobId_t)JobId::NoJob) && (result.getId() != (JobId_t)JobId::IllegalJob))
    179186    JobsQueue.pushResult(result);
    180187  // erase result
    181188  result = FragmentResult(JobId::NoJob);
     189  LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
    182190}
    183191
     
    196204  if (!e)
    197205  {
    198     if (JobsQueue.isJobPresent() || !JobsQueue.getDoneJobs()) {
    199       // The connection::async_write() function will automatically
    200       // serialize the data structure for us.
    201       LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
    202       conn->async_read(jobs,
    203         boost::bind(&FragmentScheduler::handle_ReceiveJobs, this,
    204         boost::asio::placeholders::error, conn));
    205     } else {
    206       // we just give a status report ...
    207       // first update number
    208       doneJobs = JobsQueue.getDoneJobs();
    209       // now we accept connections to check for state of calculations
    210       LOG(1, "INFO: Sending state of results to controller ...");
    211       conn->async_write(doneJobs,
    212         boost::bind(&FragmentScheduler::handle_CheckResultState, this,
    213         boost::asio::placeholders::error, conn));
    214     }
     206    conn->async_read(choice,
     207      boost::bind(&FragmentScheduler::handle_ReadChoice, this,
     208      boost::asio::placeholders::error, conn));
    215209  }
    216210  else
     
    224218}
    225219
     220/** Controller callback function to read the choice for next operation.
     221 *
     222 * \param e error code if something went wrong
     223 * \param conn reference with the connection
     224 */
     225void FragmentScheduler::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
     226{
     227  Info info(__FUNCTION__);
     228  if (!e)
     229  {
     230    // switch over the desired choice read previously
     231    switch(choice) {
     232    case NoOperation:
     233    {
     234      ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with NoOperation.");
     235      break;
     236    }
     237    case ReceiveJobs:
     238      {
     239        // The connection::async_write() function will automatically
     240        // serialize the data structure for us.
     241        LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
     242        conn->async_read(jobs,
     243          boost::bind(&FragmentScheduler::handle_ReceiveJobs, this,
     244          boost::asio::placeholders::error, conn));
     245        break;
     246      }
     247    case CheckState:
     248    {
     249      // first update number
     250      doneJobs = JobsQueue.getDoneJobs();
     251      // now we accept connections to check for state of calculations
     252      LOG(1, "INFO: Sending state that "+toString(doneJobs)+" jobs are done to controller ...");
     253      conn->async_write(doneJobs,
     254        boost::bind(&FragmentScheduler::handle_CheckResultState, this,
     255        boost::asio::placeholders::error, conn));
     256
     257      initiateControllerSocket();
     258      break;
     259    }
     260    case SendResults:
     261    {
     262      const std::vector<FragmentResult> results = JobsQueue.getAllResults();
     263      // ... or we give the results
     264      LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ...");
     265      conn->async_write(results,
     266        boost::bind(&FragmentScheduler::handle_SendResults, this,
     267        boost::asio::placeholders::error, conn));
     268
     269      initiateControllerSocket();
     270      break;
     271    }
     272    default:
     273      Exitflag = ControllerErrorFlag;
     274      ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with no valid choice.");
     275      break;
     276    }
     277    // restore NoOperation choice such that choice is not read twice
     278    choice = NoOperation;
     279
     280  }
     281  else
     282  {
     283    // An error occurred. Log it and return. Since we are not starting a new
     284    // accept operation the io_service will run out of work to do and the
     285    // server will exit.
     286    Exitflag = ControllerErrorFlag;
     287    ELOG(0, e.message());
     288  }
     289}
     290
    226291/** Controller callback function when job has been sent.
    227292 *
     293 * We check here whether the worker socket is accepting, if there
     294 * have been no jobs we re-activate it, as it is shut down after
     295 * last job.
     296 *
    228297 * \param e error code if something went wrong
    229298 * \param conn reference with the connection
     
    232301{
    233302  Info info(__FUNCTION__);
     303  bool initiateSocket = !JobsQueue.isJobPresent();
     304
    234305  // jobs are received, hence place in JobsQueue
    235306  if (!jobs.empty()) {
    236307    LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
    237308    JobsQueue.pushJobs(jobs);
    238   }
    239   // launch new acceptor of queue has been filled/is full
    240   if (JobsQueue.isJobPresent()) {
    241     // Start an accept operation for a new Connection.
     309    // initiate socket if we had no jobs before
     310    if (initiateSocket)
     311      initiateWorkerSocket();
     312    // launch new acceptor of queue has been filled/is full
    242313    initiateControllerSocket();
    243314  } else {
     
    246317
    247318  jobs.clear();
     319
    248320}
    249321
     
    259331  LOG(1, "INFO: Sent that " << doneJobs << " jobs are done.");
    260332}
     333
     334/** Controller callback function when result has been received.
     335 *
     336 * \param e error code if something went wrong
     337 * \param conn reference with the connection
     338 */
     339void FragmentScheduler::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
     340{
     341  Info info(__FUNCTION__);
     342  // do nothing
     343  LOG(1, "INFO: Results have been sent.");
     344}
     345
Note: See TracChangeset for help on using the changeset viewer.