Changeset 959c82 for src/Observer/Relay.cpp
- Timestamp:
- Oct 30, 2015, 11:43:20 AM (10 years ago)
- Children:
- 1f96ec
- Parents:
- e24dde
- git-author:
- Frederik Heber <heber@…> (07/04/15 22:33:12)
- git-committer:
- Frederik Heber <heber@…> (10/30/15 11:43:20)
- File:
-
- 1 edited
-
src/Observer/Relay.cpp (modified) (14 diffs)
Legend:
- Unmodified
- Added
- Removed
-
src/Observer/Relay.cpp
re24dde r959c82 26 26 #include "CodePatterns/Observer/Notification.hpp" 27 27 28 #include <boost/thread/locks.hpp> 29 #include <boost/thread/recursive_mutex.hpp> 28 30 29 31 /** Constructor for class Relay. … … 53 55 54 56 /** Sign on an Observer to this Observable. 55 * Puts \a *target into Observable:: callTablelist.57 * Puts \a *target into Observable::(GlobalObservableInfo::getInstance().getcallTable()) list. 56 58 * \param *target Observer 57 59 * \param priority number in [-20,20] 58 60 */ 59 void Relay::signOn(Observer *target, PriorityLevel priority) const61 void Relay::signOn(Observer *target, GlobalObservableInfo::PriorityLevel priority) const 60 62 { 61 63 #ifdef LOG_OBSERVER … … 65 67 #endif 66 68 bool res = false; 67 callees_t &callees = callTable[const_cast<Observable *>(static_cast<const Observable * const>(this))]; 68 69 callees_t::iterator iter; 69 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 70 GlobalObservableInfo::callees_t &callees = (GlobalObservableInfo::getInstance().getcallTable())[const_cast<Observable *>(static_cast<const Observable * const>(this))]; 71 72 GlobalObservableInfo::callees_t::iterator iter; 70 73 for(iter=callees.begin();iter!=callees.end();++iter){ 71 74 res |= ((*iter).second == target); … … 76 79 77 80 /** Sign off an Observer from this Observable. 78 * Removes \a *target from Observable:: callTablelist.81 * Removes \a *target from Observable::(GlobalObservableInfo::getInstance().getcallTable()) list. 79 82 * \param *target Observer 80 83 */ 81 84 void Relay::signOff(Observer *target) const 82 85 { 86 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 87 GlobalObservableInfo::calltable_t &callTable = GlobalObservableInfo::getInstance().getcallTable(); 83 88 ASSERT(callTable.count(const_cast<Observable *>(static_cast<const Observable * const>(this))), 84 89 "Relay::signOff() - called for an Observable without Observers."); … … 88 93 << observerLog().getName(const_cast<Observable *>(static_cast<const Observable * const>(this))); 89 94 #endif 90 callees_t &callees = callTable[const_cast<Observable *>(static_cast<const Observable * const>(this))];91 92 callees_t::iterator iter;93 callees_t::iterator deliter;95 GlobalObservableInfo::callees_t &callees = callTable[const_cast<Observable *>(static_cast<const Observable * const>(this))]; 96 97 GlobalObservableInfo::callees_t::iterator iter; 98 GlobalObservableInfo::callees_t::iterator deliter; 94 99 for(iter=callees.begin();iter!=callees.end();) { 95 100 if((*iter).second == target) { … … 105 110 } 106 111 107 void Relay::signOn(Observer *target, size_t channelno, PriorityLevel priority) const112 void Relay::signOn(Observer *target, size_t channelno, GlobalObservableInfo::PriorityLevel priority) const 108 113 { 109 114 Notification_ptr notification = getChannel(channelno); … … 118 123 119 124 /** Notify all Observers of changes. 120 * Puts \a *this into Relay:: busyObservables, calls Observer::update() for all in callee_t125 * Puts \a *this into Relay::(GlobalObservableInfo::getInstance().getbusyObservables()), calls Observer::update() for all in callee_t 121 126 * and removes from busy list. 122 127 */ … … 126 131 // we are busy notifying others right now 127 132 // add ourselves to the list of busy subjects to enable circle detection 128 busyObservables.insert(this); 133 { 134 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 135 (GlobalObservableInfo::getInstance().getbusyObservables()).insert(this); 136 } 129 137 // see if anyone has signed up for observation 130 138 // and call all observers 131 139 try { 132 if(callTable.count(this)) { 140 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 141 GlobalObservableInfo::calltable_t& callTable = GlobalObservableInfo::getInstance().getcallTable(); 142 const bool callTable_contains = callTable.count(this); 143 if(callTable_contains) { 133 144 // elements are stored sorted by keys in the multimap 134 145 // so iterating over it gives us a the callees sorted by 135 146 // the priorities 136 callees_t callees = callTable[this]; 137 callees_t::iterator iter; 147 // copy such that signOff() within receiving update() does not affect iterating 148 // this is because within the same thread and with the updateKilled() signOff() may be 149 // called and when executed it modifies targets 150 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 151 GlobalObservableInfo::callees_t callees = callTable[this]; 152 GlobalObservableInfo::callees_t::iterator iter; 138 153 for(iter=callees.begin();iter!=callees.end();++iter){ 139 154 #ifdef LOG_OBSERVER … … 149 164 ASSERT_NOCATCH("Exception thrown from Observer Update"); 150 165 151 // send out all notifications that need to be done 152 153 notificationSet currentNotifications = notifications[Updater]; 154 for(notificationSet::iterator it = currentNotifications.begin(); 155 it != currentNotifications.end();++it){ 156 (*it)->notifyAll(Updater); 157 } 158 159 notifications.erase(Updater); 160 161 // done with notification, we can leave the set of busy subjects 162 busyObservables.erase(this); 166 // send out all (GlobalObservableInfo::getInstance().getnotifications()) that need to be done 167 { 168 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 169 GlobalObservableInfo::notificationSet currentNotifications = 170 (GlobalObservableInfo::getInstance().getnotifications())[Updater]; 171 for(GlobalObservableInfo::notificationSet::iterator it = currentNotifications.begin(); 172 it != currentNotifications.end();++it){ 173 (*it)->notifyAll(Updater); 174 } 175 } 176 177 { 178 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 179 (GlobalObservableInfo::getInstance().getnotifications()).erase(Updater); 180 181 // done with notification, we can leave the set of busy subjects 182 (GlobalObservableInfo::getInstance().getbusyObservables()).erase(this); 183 } 163 184 } 164 185 … … 171 192 void Relay::update(Observable *publisher) { 172 193 // circle detection 173 if(busyObservables.find(this)!=busyObservables.end()) { 194 bool circle_present = false; 195 { 196 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 197 std::set<Observable*>& busyObservables = GlobalObservableInfo::getInstance().getbusyObservables(); 198 circle_present = busyObservables.find(this)!=busyObservables.end(); 199 } 200 if(circle_present) { 174 201 // somehow a circle was introduced... we were busy notifying our 175 202 // observers, but still we are called by one of our sub-Relays … … 184 211 // if we are changing ourselves at the same time our sub-observables change 185 212 // we do not need to publish all the changes at each time we are called 186 if(depth.find(this)==depth.end()) { 213 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 214 std::map<Observable*, int>& depth = GlobalObservableInfo::getInstance().getdepth(); 215 const bool depth_contains = depth.find(this)==depth.end(); 216 if(depth_contains) { 187 217 #ifdef LOG_OBSERVER 188 218 observerLog().addMessage() << "-* Update from " << observerLog().getName(publisher) … … 202 232 } 203 233 204 /** Method for receiving specialized notifications.234 /** Method for receiving specialized (GlobalObservableInfo::getInstance().getnotifications()). 205 235 * 206 236 * \param *publisher The \a *this we observe. … … 210 240 { 211 241 Updater = publisher; 212 ChannelMap::const_iterator iter = NotificationChannels.find(this); 213 if (iter != NotificationChannels.end()) { 214 const Channels *myChannels = iter->second; 242 bool contains_channels = false; 243 { 244 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 245 contains_channels = NotificationChannels.find(this) != NotificationChannels.end(); 246 } 247 if (contains_channels) { 215 248 const size_t channelno = notification->getChannelNo(); 216 Notification_ptr mynotification = myChannels->getChannel(channelno); 249 Notification *mynotification = NULL; 250 { 251 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 252 ChannelMap::const_iterator iter = NotificationChannels.find(this); 253 const Channels *myChannels = iter->second; 254 mynotification = myChannels->getChannel(channelno); 255 } 217 256 ASSERT(mynotification != NULL, 218 257 "Relay::recieveNotification() - this relay does not have a notification no "+toString(channelno)+"."); … … 228 267 * \param *publisher Sub-Relay. 229 268 */ 230 void Relay::subjectKilled(Observable *publisher){ 231 } 232 269 void Relay::subjectKilled(Observable *publisher) 270 { 271 } 272
Note:
See TracChangeset
for help on using the changeset viewer.
