/* * Project: JobMarket * Description: asynchronous Server/Controller/Client-approach to parallel computing, based on boost::asio * Copyright (C) 2012 Frederik Heber. All rights reserved. * */ /* * OperationQueue.cpp * * Created on: Apr 24, 2012 * Author: heber */ // include config.h #ifdef HAVE_CONFIG_H #include #endif // boost asio needs specific operator new #include //#include "CodePatterns/MemDebug.hpp" #include #include #include #include "CodePatterns/Log.hpp" #include "CodePatterns/Observer/Observer.hpp" #include "JobMarket/Operations/AsyncOperation.hpp" #include "JobMarket/Operations/OperationQueue.hpp" static void NoOp(const WorkerAddress) {} // static instances const boost::function OperationQueue::NoOpCallback = boost::bind(&NoOp, _1); size_t OperationQueue::max_connections = 1; OperationQueue::OperationQueue_t::iterator OperationQueue::findOperation(AsyncOperation *op) { OperationQueue_t::iterator iter = std::find_if(queue.begin(), queue.end(), boost::bind(&AsyncOp_ptr::get, boost::lambda::_1) == op); return iter; } void OperationQueue::push_back(AsyncOperation *&op, const WorkerAddress &address) { if (op != NULL) { if (!IsBlockedFlag) { AsyncOp_ptr ptr(op); // this always prevents memory loss ptr->signOn(this); OperationQueue_t::iterator iter = queue.insert(queue.end(), ptr ); op = NULL; AddressMap.insert( make_pair(*iter, address) ); LaunchNextOp(); } else { ELOG(1, "Queue is currently blocked, dropping operation "+op->getName()+"."); delete op; op = NULL; } } else { ELOG(1, "Given operation pointer is NULL."); } } void OperationQueue::LaunchNextOp() { // connection available? if (getNumberOfRunningOps() < max_connections) { if (!queue.empty()) { // only start operation when address is valid OperationQueue_t::iterator queueiter = std::find_if(queue.begin(), queue.end(), boost::bind(&AddressMap_t::count, boost::ref(AddressMap), boost::lambda::_1) ); if (queueiter != queue.end()) { const AddressMap_t::iterator mapiter = AddressMap.find(*queueiter); ASSERT( mapiter != AddressMap.end(), "OperationQueue::LaunchNextOp() - cannot find connection "+toString((*queueiter)->getName())+" in AddressMap."); currentOpsAddress = mapiter->second; const AsyncOp_ptr ptr = mapiter->first; // always erase the op from the list of ones pending for launch AddressMap.erase(mapiter); // only launch when not a debug op if ((!currentOpsAddress.host.empty()) && (!currentOpsAddress.service.empty())) { LOG(2, "DEBUG: Launching next operation " << ptr->getName() << "."); (*ptr)(currentOpsAddress.host, currentOpsAddress.service); } else { LOG(3, "DEBUG: Skipping debug operation " << ptr->getName() << " with empty address."); } } else { LOG(2, "DEBUG: All remaining operations are already running."); } } else { LOG(2, "DEBUG: There are no more operations in the queue."); } } else { LOG(2, "DEBUG: Currently there are no free connections."); } } void OperationQueue::remove(AsyncOperation *op, Observer *observer) { if (op != NULL) { OperationQueue_t::iterator iter = findOperation(op); if (iter != queue.end()) { // sign off and remove op if (observer != NULL) op->signOff(observer); queue.erase(iter); } else { ELOG(1, "Could not find Operation " << op->getName() << " in operation's queue."); } } else { ELOG(1, "Given operation pointer is NULL."); } } void OperationQueue::update(Observable *publisher) { AsyncOperation *op = static_cast(publisher); if (op != NULL) { // check for error code of operation if (op->getStatus() != Operation::success) { // remove the worker from the queue failure_callback(currentOpsAddress); LOG(1, "INFO: We are notified that " << op->getName() << " has failed, removing ..."); } else { LOG(1, "INFO: We are notified that " << op->getName() << " is done, removing ..."); } // remove from queue remove(op, this); LaunchNextOp(); } } void OperationQueue::recieveNotification(Observable *publisher, Notification_ptr notification) {} void OperationQueue::subjectKilled(Observable *publisher) { AsyncOperation *op = static_cast(publisher); if (op != NULL) { ELOG(2, "DEBUG: AsyncOperation at " << publisher << " got killed before being done?"); // remove from queue remove(op, this); } }