| 1 | /*
|
|---|
| 2 | * Project: JobMarket
|
|---|
| 3 | * Description: asynchronous Server/Controller/Client-approach to parallel computing, based on boost::asio
|
|---|
| 4 | * Copyright (C) 2012 Frederik Heber. All rights reserved.
|
|---|
| 5 | *
|
|---|
| 6 | */
|
|---|
| 7 |
|
|---|
| 8 | /*
|
|---|
| 9 | * OperationQueue.cpp
|
|---|
| 10 | *
|
|---|
| 11 | * Created on: Apr 24, 2012
|
|---|
| 12 | * Author: heber
|
|---|
| 13 | */
|
|---|
| 14 |
|
|---|
| 15 |
|
|---|
| 16 | // include config.h
|
|---|
| 17 | #ifdef HAVE_CONFIG_H
|
|---|
| 18 | #include <config.h>
|
|---|
| 19 | #endif
|
|---|
| 20 |
|
|---|
| 21 | // boost asio needs specific operator new
|
|---|
| 22 | #include <boost/asio.hpp>
|
|---|
| 23 |
|
|---|
| 24 | //#include "CodePatterns/MemDebug.hpp"
|
|---|
| 25 |
|
|---|
| 26 | #include <boost/bind.hpp>
|
|---|
| 27 | #include <boost/lambda/lambda.hpp>
|
|---|
| 28 | #include <string>
|
|---|
| 29 |
|
|---|
| 30 | #include "CodePatterns/Log.hpp"
|
|---|
| 31 | #include "CodePatterns/Observer/Observer.hpp"
|
|---|
| 32 |
|
|---|
| 33 | #include "JobMarket/Operations/AsyncOperation.hpp"
|
|---|
| 34 | #include "JobMarket/Operations/OperationQueue.hpp"
|
|---|
| 35 |
|
|---|
| 36 | static void NoOp(const WorkerAddress) {}
|
|---|
| 37 |
|
|---|
| 38 | // static instances
|
|---|
| 39 | const boost::function<void (const WorkerAddress)> OperationQueue::NoOpCallback = boost::bind(&NoOp, _1);
|
|---|
| 40 |
|
|---|
| 41 | size_t OperationQueue::max_connections = 1;
|
|---|
| 42 |
|
|---|
| 43 | OperationQueue::OperationQueue_t::iterator OperationQueue::findOperation(AsyncOperation *op)
|
|---|
| 44 | {
|
|---|
| 45 | OperationQueue_t::iterator iter =
|
|---|
| 46 | std::find_if(queue.begin(), queue.end(),
|
|---|
| 47 | boost::bind(&AsyncOp_ptr::get, boost::lambda::_1) == op);
|
|---|
| 48 | return iter;
|
|---|
| 49 | }
|
|---|
| 50 |
|
|---|
| 51 | void OperationQueue::push_back(AsyncOperation *&op, const WorkerAddress &address)
|
|---|
| 52 | {
|
|---|
| 53 | if (op != NULL) {
|
|---|
| 54 | if (!IsBlockedFlag) {
|
|---|
| 55 | AsyncOp_ptr ptr(op); // this always prevents memory loss
|
|---|
| 56 | ptr->signOn(this);
|
|---|
| 57 | OperationQueue_t::iterator iter = queue.insert(queue.end(), ptr );
|
|---|
| 58 | op = NULL;
|
|---|
| 59 | AddressMap.insert( make_pair(*iter, address) );
|
|---|
| 60 | LaunchNextOp();
|
|---|
| 61 | } else {
|
|---|
| 62 | ELOG(1, "Queue is currently blocked, dropping operation "+op->getName()+".");
|
|---|
| 63 | delete op;
|
|---|
| 64 | op = NULL;
|
|---|
| 65 | }
|
|---|
| 66 | } else {
|
|---|
| 67 | ELOG(1, "Given operation pointer is NULL.");
|
|---|
| 68 | }
|
|---|
| 69 | }
|
|---|
| 70 |
|
|---|
| 71 | void OperationQueue::LaunchNextOp()
|
|---|
| 72 | {
|
|---|
| 73 | // connection available?
|
|---|
| 74 | if (getNumberOfRunningOps() < max_connections) {
|
|---|
| 75 | if (!queue.empty()) {
|
|---|
| 76 | // only start operation when address is valid
|
|---|
| 77 | OperationQueue_t::iterator queueiter =
|
|---|
| 78 | std::find_if(queue.begin(), queue.end(),
|
|---|
| 79 | boost::bind(&AddressMap_t::count, boost::ref(AddressMap), boost::lambda::_1) );
|
|---|
| 80 | if (queueiter != queue.end()) {
|
|---|
| 81 | const AddressMap_t::iterator mapiter = AddressMap.find(*queueiter);
|
|---|
| 82 | ASSERT( mapiter != AddressMap.end(),
|
|---|
| 83 | "OperationQueue::LaunchNextOp() - cannot find connection "+toString((*queueiter)->getName())+" in AddressMap.");
|
|---|
| 84 | currentOpsAddress = mapiter->second;
|
|---|
| 85 | const AsyncOp_ptr ptr = mapiter->first;
|
|---|
| 86 | // always erase the op from the list of ones pending for launch
|
|---|
| 87 | AddressMap.erase(mapiter);
|
|---|
| 88 | // only launch when not a debug op
|
|---|
| 89 | if ((!currentOpsAddress.host.empty()) && (!currentOpsAddress.service.empty())) {
|
|---|
| 90 | LOG(2, "DEBUG: Launching next operation " << ptr->getName() << ".");
|
|---|
| 91 | (*ptr)(currentOpsAddress.host, currentOpsAddress.service);
|
|---|
| 92 | } else {
|
|---|
| 93 | LOG(3, "DEBUG: Skipping debug operation " << ptr->getName() << " with empty address.");
|
|---|
| 94 | }
|
|---|
| 95 | } else {
|
|---|
| 96 | LOG(2, "DEBUG: All remaining operations are already running.");
|
|---|
| 97 | }
|
|---|
| 98 | } else {
|
|---|
| 99 | LOG(2, "DEBUG: There are no more operations in the queue.");
|
|---|
| 100 | }
|
|---|
| 101 | } else {
|
|---|
| 102 | LOG(2, "DEBUG: Currently there are no free connections.");
|
|---|
| 103 | }
|
|---|
| 104 | }
|
|---|
| 105 |
|
|---|
| 106 | void OperationQueue::remove(AsyncOperation *op, Observer *observer)
|
|---|
| 107 | {
|
|---|
| 108 | if (op != NULL) {
|
|---|
| 109 | OperationQueue_t::iterator iter = findOperation(op);
|
|---|
| 110 | if (iter != queue.end()) {
|
|---|
| 111 | // sign off and remove op
|
|---|
| 112 | if (observer != NULL)
|
|---|
| 113 | op->signOff(observer);
|
|---|
| 114 | queue.erase(iter);
|
|---|
| 115 | } else {
|
|---|
| 116 | ELOG(1, "Could not find Operation " << op->getName() << " in operation's queue.");
|
|---|
| 117 | }
|
|---|
| 118 | } else {
|
|---|
| 119 | ELOG(1, "Given operation pointer is NULL.");
|
|---|
| 120 | }
|
|---|
| 121 | }
|
|---|
| 122 |
|
|---|
| 123 | void OperationQueue::update(Observable *publisher)
|
|---|
| 124 | {
|
|---|
| 125 | AsyncOperation *op = static_cast<AsyncOperation *>(publisher);
|
|---|
| 126 | if (op != NULL) {
|
|---|
| 127 | // check for error code of operation
|
|---|
| 128 | if (op->getStatus() != Operation::success) {
|
|---|
| 129 | // remove the worker from the queue
|
|---|
| 130 | failure_callback(currentOpsAddress);
|
|---|
| 131 | LOG(1, "INFO: We are notified that " << op->getName() << " has failed, removing ...");
|
|---|
| 132 | } else {
|
|---|
| 133 | LOG(1, "INFO: We are notified that " << op->getName() << " is done, removing ...");
|
|---|
| 134 | }
|
|---|
| 135 | // remove from queue
|
|---|
| 136 | remove(op, this);
|
|---|
| 137 | LaunchNextOp();
|
|---|
| 138 | }
|
|---|
| 139 | }
|
|---|
| 140 |
|
|---|
| 141 | void OperationQueue::recieveNotification(Observable *publisher, Notification_ptr notification)
|
|---|
| 142 | {}
|
|---|
| 143 |
|
|---|
| 144 | void OperationQueue::subjectKilled(Observable *publisher)
|
|---|
| 145 | {
|
|---|
| 146 | AsyncOperation *op = static_cast<AsyncOperation *>(publisher);
|
|---|
| 147 | if (op != NULL) {
|
|---|
| 148 | ELOG(2, "DEBUG: AsyncOperation at " << publisher << " got killed before being done?");
|
|---|
| 149 | // remove from queue
|
|---|
| 150 | remove(op, this);
|
|---|
| 151 | }
|
|---|
| 152 | }
|
|---|
| 153 |
|
|---|