Changeset 950cd4
- Timestamp:
- Aug 11, 2025, 5:44:42 PM (3 months ago)
- Branches:
- Candidate_v1.7.0, stable
- Children:
- 74d798
- Parents:
- e5f9e7
- git-author:
- Frederik Heber <frederik.heber@…> (07/04/25 20:57:33)
- git-committer:
- Frederik Heber <frederik.heber@…> (08/11/25 17:44:42)
- Location:
- ThirdParty/JobMarket/src/JobMarket
- Files:
- 
      - 5 edited
 
 - 
          
  Pool/PoolWorker.cpp (modified) (4 diffs)
- 
          
  Pool/PoolWorker.hpp (modified) (3 diffs)
- 
          
  WorkerOptions.cpp (modified) (1 diff)
- 
          
  WorkerOptions.hpp (modified) (2 diffs)
- 
          
  poolworker_main.cpp (modified) (3 diffs)
 
Legend:
- Unmodified
- Added
- Removed
- 
      ThirdParty/JobMarket/src/JobMarket/Pool/PoolWorker.cppre5f9e7 r950cd4 56 56 const std::string& _service, 57 57 const std::string& listenhost, 58 const std::string& listenservice) : 58 const std::string& listenservice, 59 const std::string& healthport) : 59 60 io_service(_io_service), 60 61 PoolListener(_io_service, boost::lexical_cast<unsigned short>(listenservice), *this), 62 HealthListener(_io_service, boost::lexical_cast<unsigned short>(healthport), PoolListener), 61 63 MyAddress(listenhost, listenservice), 62 64 ServerAddress(_host, _service), … … 70 72 if (PoolListener.getExitflag() == Listener::OkFlag) { 71 73 // always enroll and make listenining initiation depend on its success 72 const boost::function<void ()> initiateme = 73 boost::bind(&Pool Listener_t::initiateSocket, boost::ref(PoolListener));74 const boost::function<void ()> initiateme = 75 boost::bind(&PoolWorker::initiateSockets, this); 74 76 const boost::function<void ()> shutdown = 75 77 boost::bind(&PoolWorker::finish, this); // no need to remove from pool … … 82 84 } 83 85 } 86 87 void PoolWorker::initiateSockets() 88 { 89 PoolListener.initiateSocket(); 90 HealthListener.initiateSocket(); 91 } 92 93 /// Handle completion of a accept server operation. 94 void PoolWorker::HealthProbeListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn) 95 { 96 DEBUG_FUNCTION_ENTRYEXIT 97 if (!e) 98 { 99 if (PoolListener.getExitflag() == Listener::OkFlag) { 100 LOG(2, "DEBUG: Ping from health check"); 101 // and listen for following connections 102 initiateSocket(); 103 } else { 104 LOG(2, "INFO: Exitflag is " << PoolListener.getExitflag() << ", unhealthy ..."); 105 closeSocket(); 106 } 107 } 108 else 109 { 110 // An error occurred. Log it and return. Since we are not starting a new 111 // accept operation the io_service will run out of work to do and the 112 // server will exit. 113 setExitflag( ErrorFlag ); 114 ELOG(0, e.message()); 115 } 116 } 117 84 118 85 119 /// Handle completion of a accept server operation. … … 324 358 { 325 359 // somehow stop listener 360 HealthListener.closeSocket(); 361 362 // somehow stop listener 326 363 PoolListener.closeSocket(); 327 364 
- 
      ThirdParty/JobMarket/src/JobMarket/Pool/PoolWorker.hppre5f9e7 r950cd4 45 45 const std::string& service, 46 46 const std::string& listenhost, 47 const std::string& listenservice); 47 const std::string& listenservice, 48 const std::string& healthport); 48 49 49 50 /** Returns the flag of the handled operation. … … 122 123 }; 123 124 125 /** 126 * Handles TCP liveness probes. 127 */ 128 class HealthProbeListener_t : public Listener 129 { 130 public: 131 HealthProbeListener_t( 132 boost::asio::io_service& io_service, 133 unsigned short port, 134 const PoolListener_t& _PoolListener) : 135 Listener(io_service, port), 136 PoolListener(_PoolListener) 137 {} 138 139 virtual ~HealthProbeListener_t() {} 140 141 protected: 142 /// Handle completion of a accept controller operation. 143 void handle_Accept(const boost::system::error_code& e, connection_ptr conn); 144 145 private: 146 const PoolListener_t& PoolListener; 147 }; 148 149 private: 150 void initiateSockets(); 151 124 152 private: 125 153 //!> reference to io_service which we use for connections … … 128 156 //!> The listener for the WorkerPool 129 157 PoolListener_t PoolListener; 158 159 //!> The listener for the WorkerPool 160 HealthProbeListener_t HealthListener; 130 161 131 162 //!> address of this worker 
- 
      ThirdParty/JobMarket/src/JobMarket/WorkerOptions.cppre5f9e7 r950cd4 85 85 return 0; 86 86 } 87 88 int WorkerOptions::parseHealthProbePort(boost::program_options::variables_map &vm) { 89 if (vm.count("healthport")) { 90 try { 91 healthprobeport = vm["healthport"].as< std::string >(); 92 } catch (boost::bad_lexical_cast) { 93 ELOG(1, "Could not read " << vm["healthport"].as< std::string >() << " as digits."); 94 return 255; 95 } 96 LOG(1, "INFO: Using port " << healthprobeport << " to listen fdor health probe connects."); 97 } else { 98 healthprobeport = ""; 99 } 100 return 0; 101 } 
- 
      ThirdParty/JobMarket/src/JobMarket/WorkerOptions.hppre5f9e7 r950cd4 29 29 int parseLocalhost(boost::program_options::variables_map &vm); 30 30 int parseListenPort(boost::program_options::variables_map &vm); 31 int parseHealthProbePort(boost::program_options::variables_map &vm); 31 32 32 33 std::string server; … … 34 35 std::string hostname; 35 36 std::string listenport; 37 std::string healthprobeport; 36 38 }; 37 39 
- 
      ThirdParty/JobMarket/src/JobMarket/poolworker_main.cppre5f9e7 r950cd4 58 58 ("server", boost::program_options::value< std::string>(), "connect to server at this address (host:port)") 59 59 ("listen", boost::program_options::value< std::string >(), "listen on this port") 60 ("healthport", boost::program_options::value< std::string >(), "listen on this port for health probes") 60 61 ("hostname", boost::program_options::value< std::string>(), "name of host on which this codes runs and which server can resolve") 61 62 ; … … 76 77 status = WorkerOpts.parseLocalhost(vm); 77 78 if (status) return status; 79 status = WorkerOpts.parseHealthProbePort(vm); 80 if (status) return status; 78 81 status = WorkerOpts.parseSignals(vm); 79 82 if (status) return status; … … 83 86 { 84 87 boost::asio::io_service io_service; 85 PoolWorker client(io_service, WorkerOpts.server, WorkerOpts.serverport, WorkerOpts.hostname, WorkerOpts.listenport );88 PoolWorker client(io_service, WorkerOpts.server, WorkerOpts.serverport, WorkerOpts.hostname, WorkerOpts.listenport, "8092"); 86 89 87 90 // catch ctrl-c and shutdown worker properly 
  Note:
 See   TracChangeset
 for help on using the changeset viewer.
  
