source: src/Fragmentation/Automation/FragmentController.cpp@ e70b9d

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation 
JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since e70b9d was 778abb, checked in by Frederik Heber <heber@…>, 13 years ago

Added ResultGetter and capabilities to receive calculated results to FragmentController.

  • Added enum (and file) ControllerChoices that defines the state of FragmentScheduler.
  • depending on what is desired the Scheduler switches between these states and either receives or sends information. Requires new member variable choice because receival is of course asynchronous (see note in previous commit).
  • FragmentController has additional functions connect_get() and handle_connect_get() to receive results.
  • connect_calc() and connect_check() now just send the choice, whereas the actual sending and receiving is done in the handle_... functions.
  • handle_FinishOperation() is the common final callback function for all three of these functions.
  • FragmentScheduler contains an internal list of delivered results.
  • FragmentScheduler only initiates worker socket when jobs are present.
  • FIX: FragmentScheduler does only send results that are done and only once.
  • TESTFIX: Removed third Worker that receives NoJob, as the socket is powered down beforehand because the queue has run empty and we haven't added new jobs.
  • Property mode set to 100644
File size: 12.0 KB
Line 
1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * FragmentController.cpp
10 *
11 * Created on: Nov 27, 2011
12 * Author: heber
13 */
14
15// include config.h
16#ifdef HAVE_CONFIG_H
17#include <config.h>
18#endif
19
20// boost asio needs specific operator new
21#include <boost/asio.hpp>
22
23#include "CodePatterns/MemDebug.hpp"
24
25#include <boost/bind.hpp>
26#include <boost/foreach.hpp>
27#include <iostream>
28#include <vector>
29#include "Connection.hpp" // Must come before boost/serialization headers.
30#include <boost/serialization/vector.hpp>
31#include "CodePatterns/Info.hpp"
32#include "CodePatterns/Log.hpp"
33#include "FragmentJob.hpp"
34#include "FragmentResult.hpp"
35#include "ControllerChoices.hpp"
36
37#include "FragmentController.hpp"
38
39/** Constructor of class FragmentController.
40 *
41 * \param io_service io_service for the asynchronous operations
42 * \param _host hostname of server that accepts jobs
43 * \param _service of server
44 */
45FragmentController::FragmentController(
46 boost::asio::io_service& io_service,
47 const std::string& _host,
48 const std::string& _service) :
49 connection_(io_service),
50 host(_host),
51 service(_service),
52 Exitflag(OkFlag)
53{
54 Info info(__FUNCTION__);
55}
56
57/** Destructor of class FragmentController.
58 *
59 */
60FragmentController::~FragmentController()
61{}
62
63/** Handle completion of a connect operation.
64 *
65 * \param e error code if something went wrong
66 * \param endpoint_iterator endpoint of the connection
67 */
68void FragmentController::handle_connect_calc(const boost::system::error_code& e,
69 boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
70{
71 Info info(__FUNCTION__);
72 if (!e)
73 {
74 // Successfully established connection. Give choice.
75 enum ControllerChoices choice = ReceiveJobs;
76 connection_.async_write(choice,
77 boost::bind(&FragmentController::handle_SendJobs, this,
78 boost::asio::placeholders::error));
79 } else if (endpoint_iterator != boost::asio::ip::tcp::resolver::iterator()) {
80 // Try the next endpoint.
81 connection_.socket().close();
82 boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;
83 connection_.socket().async_connect(endpoint,
84 boost::bind(&FragmentController::handle_connect_calc, this,
85 boost::asio::placeholders::error, ++endpoint_iterator));
86 } else {
87 // An error occurred. Log it and return. Since we are not starting a new
88 // operation the io_service will run out of work to do and the client will
89 // exit.
90 Exitflag = ErrorFlag;
91 ELOG(1, e.message());
92 }
93}
94
95/** Handle completion of a connect operation.
96 *
97 * \param e error code if something went wrong
98 * \param endpoint_iterator endpoint of the connection
99 */
100void FragmentController::handle_connect_check(const boost::system::error_code& e,
101 boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
102{
103 Info info(__FUNCTION__);
104 if (!e)
105 {
106 // Successfully established connection. Give choice.
107 enum ControllerChoices choice = CheckState;
108 connection_.async_write(choice,
109 boost::bind(&FragmentController::handle_ReceiveDoneJobs, this,
110 boost::asio::placeholders::error));
111 } else if (endpoint_iterator != boost::asio::ip::tcp::resolver::iterator()) {
112 // Try the next endpoint.
113 connection_.socket().close();
114 boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;
115 connection_.socket().async_connect(endpoint,
116 boost::bind(&FragmentController::handle_connect_check, this,
117 boost::asio::placeholders::error, ++endpoint_iterator));
118 } else {
119 // An error occurred. Log it and return. Since we are not starting a new
120 // operation the io_service will run out of work to do and the client will
121 // exit.
122 Exitflag = ErrorFlag;
123 ELOG(1, e.message());
124 }
125}
126
127/** Handle completion of a connect operation.
128 *
129 * \param e error code if something went wrong
130 * \param endpoint_iterator endpoint of the connection
131 */
132void FragmentController::handle_connect_get(const boost::system::error_code& e,
133 boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
134{
135 Info info(__FUNCTION__);
136 if (!e)
137 {
138 // Successfully established connection. Give choice.
139 enum ControllerChoices choice = SendResults;
140 connection_.async_write(choice,
141 boost::bind(&FragmentController::handle_ReceivingResults, this,
142 boost::asio::placeholders::error));
143 } else if (endpoint_iterator != boost::asio::ip::tcp::resolver::iterator()) {
144 // Try the next endpoint.
145 connection_.socket().close();
146 boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;
147 connection_.socket().async_connect(endpoint,
148 boost::bind(&FragmentController::handle_connect_check, this,
149 boost::asio::placeholders::error, ++endpoint_iterator));
150 } else {
151 // An error occurred. Log it and return. Since we are not starting a new
152 // operation the io_service will run out of work to do and the client will
153 // exit.
154 Exitflag = ErrorFlag;
155 ELOG(1, e.message());
156 }
157}
158
159/** Callback function when operation has been completed.
160 *
161 * \param e error code if something went wrong
162 */
163void FragmentController::handle_FinishOperation(const boost::system::error_code& e)
164{
165 Info info(__FUNCTION__);
166 if (!e)
167 {
168 LOG(1, "INFO: Operation completed.");
169 }
170 else
171 {
172 // An error occurred.
173 Exitflag = ErrorFlag;
174 ELOG(1, e.message());
175 }
176
177 // Since we are not starting a new operation the io_service will run out of
178 // work to do and the client will exit.
179}
180
181/** Callback function when jobs have been sent.
182 *
183 * \param e error code if something went wrong
184 */
185void FragmentController::handle_SendJobs(const boost::system::error_code& e)
186{
187 Info info(__FUNCTION__);
188 if (!e)
189 {
190 // Successfully established connection. Start operation to read the vector
191 // of jobs. The connection::async_write() function will automatically
192 // encode the data that is written to the underlying socket.
193 LOG(1, "INFO: Sending "+toString(jobs.size())+" jobs ...");
194 connection_.async_write(jobs,
195 boost::bind(&FragmentController::handle_FinishOperation, this,
196 boost::asio::placeholders::error));
197 }
198 else
199 {
200 // An error occurred.
201 Exitflag = ErrorFlag;
202 ELOG(1, e.message());
203 }
204
205 // Since we are not starting a new operation the io_service will run out of
206 // work to do and the client will exit.
207}
208
209/** Callback function when results have been received.
210 *
211 * \param e error code if something went wrong
212 */
213void FragmentController::handle_ReceivingResults(const boost::system::error_code& e)
214{
215 Info info(__FUNCTION__);
216 if (!e)
217 {
218 // The connection::async_read() function will automatically
219 // decode the data that is written to the underlying socket.
220 connection_.async_read(results,
221 boost::bind(&FragmentController::handle_ReceivedResults, this,
222 boost::asio::placeholders::error));
223 }
224 else
225 {
226 // An error occurred.
227 Exitflag = ErrorFlag;
228 ELOG(1, e.message());
229 }
230
231 // Since we are not starting a new operation the io_service will run out of
232 // work to do and the client will exit.
233}
234
235/** Callback function when doneJobs have been received.
236 *
237 * \param e error code if something went wrong
238 */
239void FragmentController::handle_ReceivedResults(const boost::system::error_code& e)
240{
241 Info info(__FUNCTION__);
242
243 LOG(1, "INFO: Received "+toString(results.size())+" results ...");
244
245 handle_FinishOperation(e);
246}
247
248/** Callback function when doneJobs have been received.
249 *
250 * \param e error code if something went wrong
251 */
252void FragmentController::handle_ReceiveDoneJobs(const boost::system::error_code& e)
253{
254 Info info(__FUNCTION__);
255 if (!e)
256 {
257 // The connection::async_read() function will automatically
258 // decode the data that is written to the underlying socket.
259 LOG(1, "INFO: Checking number of done jobs ...");
260 connection_.async_read(doneJobs,
261 boost::bind(&FragmentController::handle_FinishOperation, this,
262 boost::asio::placeholders::error));
263 }
264 else
265 {
266 // An error occurred.
267 Exitflag = ErrorFlag;
268 ELOG(1, e.message());
269 }
270}
271
272/** Internal function to resolve all possible connection endpoints.
273 *
274 * \return endpoint iterator of connection
275 */
276boost::asio::ip::tcp::resolver::iterator FragmentController::getEndpointIterator()
277{
278 // Resolve the host name into an IP address.
279 boost::asio::ip::tcp::resolver resolver(connection_.socket().get_io_service());
280 boost::asio::ip::tcp::resolver::query query(host, service);
281 boost::asio::ip::tcp::resolver::iterator endpoint_iterator =
282 resolver.resolve(query);
283
284 return endpoint_iterator;
285}
286
287/** Internal function to connect to the endpoint of the server asynchronuously.
288 *
289 * We require internal connetion_ and host and service to be set up for this.
290 */
291void FragmentController::connect_calc()
292{
293 Info info(__FUNCTION__);
294 // Resolve the host name into an IP address.
295 boost::asio::ip::tcp::resolver::iterator endpoint_iterator = getEndpointIterator();
296 boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;
297
298 // Start an asynchronous connect operation.
299 std::cout << "Connecting to endpoint " << endpoint << " to calc " << std::endl;
300 connection_.socket().async_connect(endpoint,
301 boost::bind(&FragmentController::handle_connect_calc, this,
302 boost::asio::placeholders::error, ++endpoint_iterator));
303}
304
305/** Internal function to connect to the endpoint of the server asynchronuously.
306 *
307 * We require internal connetion_ and host and service to be set up for this.
308 */
309void FragmentController::connect_check()
310{
311 Info info(__FUNCTION__);
312 // Resolve the host name into an IP address.
313 boost::asio::ip::tcp::resolver::iterator endpoint_iterator = getEndpointIterator();
314 boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;
315
316 // Start an asynchronous connect operation.
317 std::cout << "Connecting to endpoint " << endpoint << " to check " << std::endl;
318 connection_.socket().async_connect(endpoint,
319 boost::bind(&FragmentController::handle_connect_check, this,
320 boost::asio::placeholders::error, ++endpoint_iterator));
321}
322
323/** Internal function to connect to the endpoint of the server asynchronuously.
324 *
325 * We require internal connetion_ and host and service to be set up for this.
326 */
327void FragmentController::connect_get()
328{
329 Info info(__FUNCTION__);
330 // Resolve the host name into an IP address.
331 boost::asio::ip::tcp::resolver::iterator endpoint_iterator = getEndpointIterator();
332 boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;
333
334 // Start an asynchronous connect operation.
335 std::cout << "Connecting to endpoint " << endpoint << " to get results " << std::endl;
336 connection_.socket().async_connect(endpoint,
337 boost::bind(&FragmentController::handle_connect_get, this,
338 boost::asio::placeholders::error, ++endpoint_iterator));
339}
340
341/** Internal function to disconnect connection_ correctly.
342 *
343 */
344void FragmentController::disconnect()
345{
346 //connection_.socket().close();
347}
348
349/** Place number of jobs into this controller.
350 *
351 * \param _jobs jobs to add
352 */
353void FragmentController::addJobs(const std::vector<FragmentJob> &_jobs)
354{
355 jobs.reserve(jobs.size()+_jobs.size());
356 BOOST_FOREACH(FragmentJob job, _jobs) {
357 jobs.push_back(job);
358 }
359}
360
361/** Prepares the calculation of the results for the current jobs.
362 */
363void FragmentController::calculateResults()
364{
365 Info info(__FUNCTION__);
366 // connect
367 connect_calc();
368 //disconnect
369 disconnect();
370}
371
372/** Prepares the calculation of the results for the current jobs.
373 */
374void FragmentController::checkResults()
375{
376 Info info(__FUNCTION__);
377 // connect
378 connect_check();
379 //disconnect
380 disconnect();
381}
382
383/** Getter for results.
384 *
385 * \sa calculateResults()
386 * \return vector of results for the added jobs (\sa addJobs()).
387 */
388std::vector<FragmentResult> FragmentController::getResults()
389{
390 Info info(__FUNCTION__);
391 return results;
392}
393
394/** Function to initiate receival of results.
395 *
396 */
397void FragmentController::obtainResults()
398{
399 // connect
400 connect_get();
401 //disconnect
402 disconnect();
403}
404
405/** Getter for doneJobs.
406 *
407 * \sa checkResults()
408 * \param doneJobs
409 */
410size_t FragmentController::getDoneJobs() const
411{
412 return doneJobs;
413}
Note: See TracBrowser for help on using the repository browser.