source: src/Fragmentation/Automation/FragmentWorker.cpp@ ef2767

Last change on this file since ef2767 was ef2767, checked in by Frederik Heber <heber@…>, 13 years ago

Server and Worker now also correctly exchange the result.

  • Server uses FragmentQueue which also stores the obtained result.
  • Worker calls the Work() function of the received FragmentJob and returns the FragmentResult calculated by it.
  • Regression test for Server/Worker now checks whether job #1 is listed twice; this basically checks whether the result has been exchanged.

Several concepts had to be understood to get this working; they are briefly recorded here:

  • Asynchronous communication only works on objects that outlive the scope in which the operation was started; this is why FragmentScheduler contains a FragmentResult and FragmentWorker a FragmentJob instance. Each receives its object and needs write access to it in the scope of the async. comm., not in the caller's scope. This is probably because the initial argument is only used to set up a buffer of the correct length, while the received instance is created/deserialized only once the communication has completed, which may well be after the caller's scope has been left.
  • This is different for operations that only read from the object, i.e. sending it: there the object is probably serialized immediately and placed into an internal buffer, so that later access goes only to this buffer and not to the original instance, which therefore no longer needs to exist. That is why the above Scheduler and Worker do not have the "other" instance as class members as well.
  • Chaining asynchronous communications, e.g. a write after a read has been performed, can only be done through the callback functions passed as parameters to async_write/async_read. Each callback is called when its operation has finished, and the next operation can then be launched from within it. This way the chain executes successfully.
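The three points above can be illustrated with a small standard-library sketch. It does not use boost::asio: ToyLoop, ToyConnection, ToyWorker and run_demo are hypothetical names invented for this illustration, and the "connection" merely mimics the behaviour the notes assume (the read target is filled in on completion, the written value is copied into an internal buffer immediately).

```cpp
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an io_service: handlers are queued and only run
// later, after the function that started the operation has returned.
class ToyLoop {
public:
  void post(std::function<void()> handler) { pending_.push(std::move(handler)); }
  void run() {
    while (!pending_.empty()) {
      std::function<void()> handler = std::move(pending_.front());
      pending_.pop();
      handler();  // may post further handlers (chaining)
    }
  }
private:
  std::queue<std::function<void()>> pending_;
};

// Hypothetical connection; not the Connection class from this repository.
class ToyConnection {
public:
  ToyConnection(ToyLoop& loop, std::string incoming)
    : loop_(loop), incoming_(std::move(incoming)) {}

  // Receiving: 'target' is written only when the operation completes, so it
  // must outlive the scope that called async_read().
  void async_read(std::string& target, std::function<void()> on_done) {
    loop_.post([this, &target, on_done]() { target = incoming_; on_done(); });
  }

  // Sending: 'value' is copied ("serialized") into an internal buffer right
  // away, so the caller's object may be destroyed before completion.
  void async_write(const std::string& value, std::function<void()> on_done) {
    outbound_ = value;
    loop_.post([this, on_done]() { sent_.push_back(outbound_); on_done(); });
  }

  const std::vector<std::string>& sent() const { return sent_; }
private:
  ToyLoop& loop_;
  std::string incoming_;
  std::string outbound_;
  std::vector<std::string> sent_;
};

class ToyWorker {
public:
  explicit ToyWorker(ToyConnection& connection) : connection_(connection) {}
  void start() { connection_.async_read(job_, [this]() { handle_read(); }); }
private:
  // The write is launched from inside the read's completion callback.
  void handle_read() {
    std::string result = "result of " + job_;  // local; copied by async_write
    connection_.async_write(result, [this]() { handle_write(); });
  }
  void handle_write() { job_.clear(); }

  ToyConnection& connection_;
  std::string job_;  // member: must stay alive until the read completes
};

// Runs one read -> write chain and returns what was "sent".
std::vector<std::string> run_demo() {
  ToyLoop loop;
  ToyConnection connection(loop, "job #1");
  ToyWorker worker(connection);
  worker.start();  // returns before anything has been received
  loop.run();      // completion handlers run here
  return connection.sent();
}
```

In this sketch the received job has to be a member of ToyWorker because start() returns long before the value arrives, whereas the result can be a local variable in handle_read() because async_write() copies it before returning.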
/*
 * Project: MoleCuilder
 * Description: creates and alters molecular systems
 * Copyright (C) 2011 University of Bonn. All rights reserved.
 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
 */

/*
 * \file FragmentWorker.cpp
 *
 * This file strongly follows the Serialization example from the boost::asio
 * library (see client.cpp).
 *
 * Created on: Nov 18, 2011
 * Author: heber
 */

// include config.h
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

// boost asio needs specific operator new
#include <boost/asio.hpp>

#include "CodePatterns/MemDebug.hpp"

#include <boost/bind.hpp>
#include <iostream>
#include <vector>
#include "Connection.hpp" // Must come before boost/serialization headers.
#include <boost/serialization/vector.hpp>
#include "CodePatterns/Info.hpp"
#include "CodePatterns/Log.hpp"
#include "FragmentJob.hpp"
#include "FragmentResult.hpp"
#include "FragmentWorker.hpp"

FragmentResult FragmentWorker::EmptyResult(JobId::NoJob, std::string("EmptyResult"));

/// Constructor starts the asynchronous connect operation.
FragmentWorker::FragmentWorker(
    boost::asio::io_service& io_service,
    const std::string& host,
    const std::string& service) :
  connection_(io_service),
  result(JobId::IllegalJob),
  Exitflag(OkFlag)
{
  Info info(__FUNCTION__);
  // Resolve the host name into an IP address.
  boost::asio::ip::tcp::resolver resolver(io_service);
  boost::asio::ip::tcp::resolver::query query(host, service);
  boost::asio::ip::tcp::resolver::iterator endpoint_iterator =
    resolver.resolve(query);
  boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;

  // Start an asynchronous connect operation.
  std::cout << "Connecting to endpoint " << endpoint << std::endl;
  connection_.socket().async_connect(endpoint,
    boost::bind(&FragmentWorker::handle_connect, this,
      boost::asio::placeholders::error, ++endpoint_iterator));
}

/// Handle completion of a connect operation.
void FragmentWorker::handle_connect(const boost::system::error_code& e,
    boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
{
  Info info(__FUNCTION__);
  if (!e)
  {
    // Successfully established connection. Start operation to read the list
    // of jobs. The connection::async_read() function will automatically
    // decode the data that is read from the underlying socket.
    LOG(1, "INFO: Receiving a job ...");
    connection_.async_read(job,
      boost::bind(&FragmentWorker::handle_read, this,
        boost::asio::placeholders::error));
  } else if (endpoint_iterator != boost::asio::ip::tcp::resolver::iterator()) {
    // Try the next endpoint.
    connection_.socket().close();
    boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;
    connection_.socket().async_connect(endpoint,
      boost::bind(&FragmentWorker::handle_connect, this,
        boost::asio::placeholders::error, ++endpoint_iterator));
  } else {
    // An error occurred. Log it and return. Since we are not starting a new
    // operation the io_service will run out of work to do and the client will
    // exit.
    Exitflag = ErrorFlag;
    ELOG(1, e.message());
  }
}

/// Handle completion of a read operation.
void FragmentWorker::handle_read(const boost::system::error_code& e)
{
  Info info(__FUNCTION__);
  if (!e)
  {
    LOG(1, "INFO: Received job #" << job.getId() << ".");
    if (job.getId() != JobId::NoJob) {
      // Print out the data that was received.
      std::cout << "Job output: " << job.outputfile << "\n";
      std::cout << "Job id: " << job.getId() << "\n";

      // do something .. right now: wait
      LOG(1, "INFO: Calculating job #" << job.getId() << " ...");
      FragmentResult result(job.Work());

      // write the result to the server
      LOG(1, "INFO: Sending result for job #" << job.getId() << " ...");
      connection_.async_write(result,
        boost::bind(&FragmentWorker::handle_write, this,
          boost::asio::placeholders::error));

    } else {
      std::cout << "The server has no job for me." << std::endl;
      // send out empty result
      LOG(1, "INFO: Sending empty result ...");
      connection_.async_write(EmptyResult,
        boost::bind(&FragmentWorker::handle_write, this,
          boost::asio::placeholders::error));
      Exitflag = NoJobFlag;
    }
  }
  else
  {
    // An error occurred.
    Exitflag = ErrorFlag;
    ELOG(1, e.message());
  }

  // Since we are not starting a new operation the io_service will run out of
  // work to do and the client will exit.
}

/// Handle completion of a write operation.
void FragmentWorker::handle_write(const boost::system::error_code& e)
{
  Info info(__FUNCTION__);
  // Nothing to do.
  LOG(1, "INFO: Job #" << job.getId() << " calculated and sent.");
  // erase job
  job = FragmentJob();
}