Changeset 959c82
- Timestamp:
- Oct 30, 2015, 11:43:20 AM (10 years ago)
- Children:
- 1f96ec
- Parents:
- e24dde
- git-author:
- Frederik Heber <heber@…> (07/04/15 22:33:12)
- git-committer:
- Frederik Heber <heber@…> (10/30/15 11:43:20)
- Location:
- src
- Files:
-
- 2 added
- 7 edited
-
CodePatterns/Cacheable.hpp (modified) (1 diff)
-
CodePatterns/ObservedValue.hpp (modified) (1 diff)
-
CodePatterns/Observer/GlobalObservableInfo.hpp (added)
-
CodePatterns/Observer/Observable.hpp (modified) (5 diffs)
-
CodePatterns/Observer/Relay.hpp (modified) (1 diff)
-
Observer/GlobalObservableInfo.cpp (added)
-
Observer/Makefile.am (modified) (2 diffs)
-
Observer/Observable.cpp (modified) (21 diffs)
-
Observer/Relay.cpp (modified) (14 diffs)
Legend:
- Unmodified
- Added
- Removed
-
src/CodePatterns/Cacheable.hpp
re24dde r959c82 197 197 if (owner != NULL) { 198 198 if (channels.empty()) { 199 owner->signOn(this, Observable::PriorityLevel(int(-20)));199 owner->signOn(this,GlobalObservableInfo::PriorityLevel(int(-20))); 200 200 } else { 201 201 for (Observable::channels_t::const_iterator iter = channels.begin(); 202 202 iter != channels.end(); ++iter) 203 owner->signOn(this,*iter, Observable::PriorityLevel(int(-20)));203 owner->signOn(this,*iter,GlobalObservableInfo::PriorityLevel(int(-20))); 204 204 } 205 205 } -
src/CodePatterns/ObservedValue.hpp
re24dde r959c82 151 151 if ((owner != NULL) && (!signedOn)) { 152 152 if (channels.empty()) { 153 owner->signOn(this, Observable::PriorityLevel(int(-20)));153 owner->signOn(this,GlobalObservableInfo::PriorityLevel(int(-20))); 154 154 } else { 155 155 for (Observable::channels_t::const_iterator iter = channels.begin(); -
src/CodePatterns/Observer/Observable.hpp
re24dde r959c82 18 18 #include <string> 19 19 #include <boost/function.hpp> 20 #include <boost/thread.hpp>21 20 22 21 #include "CodePatterns/Range.hpp" 23 22 #include "CodePatterns/Observer/defs.hpp" 23 #include "CodePatterns/Observer/GlobalObservableInfo.hpp" 24 24 #include "CodePatterns/Observer/Observer.hpp" 25 25 26 26 class Graveyard; 27 class scoped_lock; 27 28 28 29 /** … … 48 49 typedef std::vector<size_t> channels_t; 49 50 50 Observable(std::string _name, const channels_t &_channels = channels_t()); 51 Observable( 52 std::string _name, 53 const channels_t &_channels = channels_t()); 51 54 virtual ~Observable(); 52 53 /** This class is only used to distinguish from size_t in the overload.54 *55 * It encapsulates a const int (the priority level) and checks valid bounds56 * in constructor.57 *58 */59 class PriorityLevel {60 public:61 explicit PriorityLevel(const int i);62 ~PriorityLevel();63 64 const int level;65 private:66 static range<int> ValidRange;67 };68 55 69 56 private: … … 92 79 * ussually no need to order the update sequence. 93 80 */ 94 virtual void signOn(Observer *target, PriorityLevel priority = PriorityDefault) const; 81 virtual void signOn( 82 Observer * target, 83 GlobalObservableInfo::PriorityLevel priority = GlobalObservableInfo::PriorityDefault) const; 95 84 96 85 /** … … 106 95 Observer *target, 107 96 size_t channelno, 108 PriorityLevel priority = PriorityDefault) const; 97 GlobalObservableInfo::PriorityLevel priority = 98 GlobalObservableInfo::PriorityDefault) const; 109 99 110 100 /** … … 143 133 static void enque_notification_internal(Observable *publisher, Notification_ptr notification); 144 134 135 protected: 136 145 137 typedef std::map<Observable*, Channels *> ChannelMap; 146 138 static ChannelMap NotificationChannels; 147 148 static PriorityLevel PriorityDefault;149 150 protected:151 typedef std::multimap<int,Observer*> callees_t;152 typedef std::set<Notification*> notificationSet;153 static std::map<Observable*, int> depth;154 static std::map<Observable*,callees_t> callTable;155 static std::map<Observable*,notificationSet> notifications;156 static std::set<Observable*> busyObservables;157 158 static boost::recursive_mutex ObservablesMapLock; //!< a lock for the pointer of the instance159 139 160 140 private: -
src/CodePatterns/Observer/Relay.hpp
re24dde r959c82 36 36 virtual ~Relay(); 37 37 38 virtual void signOn(Observer *target, PriorityLevel priority = Observable::PriorityDefault) const; 38 virtual void signOn( 39 Observer *target, 40 GlobalObservableInfo::PriorityLevel priority = GlobalObservableInfo::PriorityDefault) const; 39 41 40 42 virtual void signOff(Observer *target) const; 41 43 42 virtual void signOn(Observer *target, size_t channelno, PriorityLevel priority = Observable::PriorityDefault) const; 44 virtual void signOn( 45 Observer *target, 46 size_t channelno, 47 GlobalObservableInfo::PriorityLevel priority = GlobalObservableInfo::PriorityDefault) const; 43 48 44 49 virtual void signOff(Observer *target, size_t channelno) const; -
src/Observer/Makefile.am
re24dde r959c82 8 8 OBSERVERSOURCE = \ 9 9 Channels.cpp \ 10 GlobalObservableInfo.cpp \ 10 11 Graveyard.cpp \ 11 12 Notification.cpp \ … … 22 23 $(top_srcdir)/src/CodePatterns/Observer/Channels.hpp \ 23 24 $(top_srcdir)/src/CodePatterns/Observer/defs.hpp \ 25 $(top_srcdir)/src/CodePatterns/Observer/GlobalObservableInfo.hpp \ 24 26 $(top_srcdir)/src/CodePatterns/Observer/Graveyard.hpp \ 25 27 $(top_srcdir)/src/CodePatterns/Observer/Notification.hpp \ -
src/Observer/Observable.cpp
re24dde r959c82 29 29 #include <algorithm> 30 30 31 #include <boost/thread/locks.hpp> 32 #include <boost/thread/recursive_mutex.hpp> 33 31 34 //!> This function does nothing with the given Observable 32 35 void NoOp_informer(const Observable *) … … 35 38 Observable::graveyard_informer_t Observable::noop_informer(&NoOp_informer); 36 39 37 // All infrastructure for the observer-pattern is bundled at a central place 38 // this is more efficient if many objects can be observed (inherit from observable) 39 // but only few are actually coupled with observers. E.g. TMV has over 500.000 Atoms, 40 // which might become observable. Handling Observable infrastructure in each of 41 // these would use memory for each atom. By handling Observer-infrastructure 42 // here we only need memory for objects that actually are observed. 43 // See [Gamma et al, 1995] p. 297 44 45 std::map<Observable*, int> Observable::depth; //!< Map of Observables to the depth of the DAG of Observers 46 std::map<Observable*,std::multimap<int,Observer*> > Observable::callTable; //!< Table for each Observable of all its Observers 47 std::map<Observable*,std::set<Notification*> > Observable::notifications; //!< Table for all current notifications to perform 48 std::set<Observable*> Observable::busyObservables; //!< Set of Observables that are currently busy notifying their sign-on'ed Observers 49 Observable::ChannelMap Observable::NotificationChannels; //!< Map of Observables to their Channels. 50 boost::recursive_mutex Observable::ObservablesMapLock; //!< mutex for locking the above maps 51 52 // ValidRange must be initialized before PriorityLevel. 53 range<int> Observable::PriorityLevel::ValidRange(-20, 21); 54 Observable::PriorityLevel Observable::PriorityDefault(int(0)); 55 56 /** Constructor of PriorityLevel. 57 * 58 * \note We check whether the level is within Observable::PriorityLevel::ValidRange. 59 * 60 * @param i priority level encapsulated in this class. 61 */ 62 Observable::PriorityLevel::PriorityLevel(const int i) : 63 level(i) 64 { 65 ASSERT(ValidRange.isInRange(level), 66 "Observable::PriorityLevel::PriorityLevel() - Priority level " 67 +toString(level)+" out of range "+toString(ValidRange)+"."); 68 } 69 70 Observable::PriorityLevel::~PriorityLevel() 71 {} 40 Observable::ChannelMap Observable::NotificationChannels; 72 41 73 42 /** Attaching Sub-observables to Observables. 74 * Increases entry in Observable:: depthfor this \a *publisher by one.43 * Increases entry in Observable::(GlobalObservableInfo::getInstance().getdepth()) for this \a *publisher by one. 75 44 * 76 45 * The two functions \sa start_observer_internal() and \sa finish_observer_internal() … … 82 51 void Observable::start_observer_internal(Observable *publisher) 83 52 { 84 boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);53 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 85 54 // increase the count for this observable by one 86 55 // if no entry for this observable is found, an new one is created 87 56 // by the STL and initialized to 0 (see STL documentation) 88 57 #ifdef LOG_OBSERVER 89 observerLog().addMessage( depth[publisher]) << ">> Locking " << observerLog().getName(publisher);90 #endif 91 depth[publisher]++;58 observerLog().addMessage((GlobalObservableInfo::getInstance().getdepth())[publisher]) << ">> Locking " << observerLog().getName(publisher); 59 #endif 60 (GlobalObservableInfo::getInstance().getdepth())[publisher]++; 92 61 } 93 62 94 63 /** Detaching Sub-observables from Observables. 95 * Decreases entry in Observable:: depthfor this \a *publisher by one. If zero, we64 * Decreases entry in Observable::(GlobalObservableInfo::getInstance().getdepth()) for this \a *publisher by one. If zero, we 96 65 * start notifying all our Observers. 97 66 * … … 107 76 // if zero is reached all observed blocks are done and we can 108 77 // start to notify our observers 109 ObservablesMapLock.lock(); 110 --depth[publisher]; 111 #ifdef LOG_OBSERVER 112 observerLog().addMessage(depth[publisher]) << "<< Unlocking " << observerLog().getName(publisher); 113 #endif 114 int depth_publisher = depth[publisher]; 115 ObservablesMapLock.unlock(); 78 int depth_publisher = 0; 79 { 80 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 81 --(GlobalObservableInfo::getInstance().getdepth())[publisher]; 82 #ifdef LOG_OBSERVER 83 observerLog().addMessage((GlobalObservableInfo::getInstance().getdepth())[publisher]) << "<< Unlocking " << observerLog().getName(publisher); 84 #endif 85 depth_publisher = (GlobalObservableInfo::getInstance().getdepth())[publisher]; 86 } 116 87 if(depth_publisher){} 117 88 else{ … … 119 90 // this item is done, so we don't have to keep the count with us 120 91 // save some memory by erasing it 121 boost::recursive_mutex::scoped_lock guard(ObservablesMapLock); 122 depth.erase(publisher); 92 { 93 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 94 (GlobalObservableInfo::getInstance().getdepth()).erase(publisher); 95 } 123 96 } 124 97 } … … 126 99 void Observable::enque_notification_internal(Observable *publisher, Notification_ptr notification) 127 100 { 128 boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);129 notifications[publisher].insert(notification);101 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 102 (GlobalObservableInfo::getInstance().getnotifications())[publisher].insert(notification); 130 103 } 131 104 … … 162 135 163 136 /** Notify all Observers of changes. 164 * Puts \a *this into Observable:: busyObservables, calls Observer::update() for all in callee_t137 * Puts \a *this into Observable::(GlobalObservableInfo::getInstance().getbusyObservables()), calls Observer::update() for all in callee_t 165 138 * and removes from busy list. 166 139 */ … … 172 145 // we are busy notifying others right now 173 146 // add ourselves to the list of busy subjects to enable circle detection 174 ObservablesMapLock.lock(); 175 busyObservables.insert(this); 176 ObservablesMapLock.unlock(); 147 { 148 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 149 (GlobalObservableInfo::getInstance().getbusyObservables()).insert(this); 150 } 177 151 // see if anyone has signed up for observation 178 152 // and call all observers 179 153 try { 180 ObservablesMapLock.lock();181 const bool anybodyThere = callTable.count(this);182 ObservablesMapLock.unlock();183 if (anybodyThere) {154 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 155 GlobalObservableInfo::calltable_t& callTable = GlobalObservableInfo::getInstance().getcallTable(); 156 const bool callTable_contains = callTable.find(this) != callTable.end(); 157 if (callTable_contains) { 184 158 // elements are stored sorted by keys in the multimap 185 159 // so iterating over it gives us a the callees sorted by 186 160 // the priorities 187 ObservablesMapLock.lock(); 188 callees_t callees = callTable[this]; 189 ObservablesMapLock.unlock(); 190 callees_t::iterator iter; 161 // copy such that signOff() within receiving update() does not affect iterating 162 // this is because within the same thread and with the updateKilled() signOff() may be 163 // called and when executed it modifies targets 164 GlobalObservableInfo::callees_t callees = callTable[this]; 165 GlobalObservableInfo::callees_t::iterator iter; 191 166 for(iter=callees.begin();iter!=callees.end();++iter){ 192 167 #ifdef LOG_OBSERVER … … 202 177 203 178 // send out all notifications that need to be done 204 205 ObservablesMapLock.lock(); 206 notificationSet currentNotifications = notifications[this]; 207 ObservablesMapLock.unlock(); 208 for(notificationSet::iterator it = currentNotifications.begin(); 209 it != currentNotifications.end();++it){ 210 (*it)->notifyAll(this); 211 } 212 213 ObservablesMapLock.lock(); 214 notifications.erase(this); 215 216 // done with notification, we can leave the set of busy subjects 217 busyObservables.erase(this); 218 ObservablesMapLock.unlock(); 179 { 180 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 181 GlobalObservableInfo::notificationSet currentNotifications = 182 (GlobalObservableInfo::getInstance().getnotifications())[this]; 183 for(GlobalObservableInfo::notificationSet::iterator it = currentNotifications.begin(); 184 it != currentNotifications.end();++it){ 185 (*it)->notifyAll(this); 186 } 187 } 188 189 { 190 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 191 (GlobalObservableInfo::getInstance().getnotifications()).erase(this); 192 193 // done with notification, we can leave the set of busy subjects 194 (GlobalObservableInfo::getInstance().getbusyObservables()).erase(this); 195 } 219 196 220 197 #ifdef LOG_OBSERVER … … 232 209 void Observable::update(Observable *publisher) { 233 210 // circle detection 234 ObservablesMapLock.lock(); 235 const bool presentCircle = busyObservables.find(this)!=busyObservables.end(); 236 ObservablesMapLock.unlock(); 211 bool presentCircle = false; 212 { 213 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 214 presentCircle = (GlobalObservableInfo::getInstance().getbusyObservables()).find(this)!=(GlobalObservableInfo::getInstance().getbusyObservables()).end(); 215 } 237 216 if(presentCircle) { 238 217 // somehow a circle was introduced... we were busy notifying our … … 245 224 } 246 225 else { 226 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 247 227 // see if we are in the process of changing ourselves 248 228 // if we are changing ourselves at the same time our sub-observables change 249 229 // we do not need to publish all the changes at each time we are called 250 ObservablesMapLock.lock(); 251 const bool depth_this = depth.find(this)==depth.end(); 252 ObservablesMapLock.unlock(); 253 if(depth_this) { 230 std::map<Observable*, int>& depth = GlobalObservableInfo::getInstance().getdepth(); 231 const bool depth_contains = depth.find(this)==depth.end(); 232 if(depth_contains) { 254 233 #ifdef LOG_OBSERVER 255 234 observerLog().addMessage() << "-* Update from " << observerLog().getName(publisher) … … 268 247 269 248 /** Sign on an Observer to this Observable. 270 * Puts \a *target into Observable:: callTablelist.249 * Puts \a *target into Observable::(GlobalObservableInfo::getInstance().getcallTable()) list. 271 250 * \param *target Observer 272 251 * \param priority number in [-20,20] 273 252 */ 274 void Observable::signOn(Observer *target, PriorityLevel priority) const253 void Observable::signOn(Observer *target, GlobalObservableInfo::PriorityLevel priority) const 275 254 { 276 255 #ifdef LOG_OBSERVER … … 278 257 #endif 279 258 bool res = false; 280 boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);281 callees_t &callees = callTable[const_cast<Observable *>(this)];282 283 callees_t::iterator iter;259 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 260 GlobalObservableInfo::callees_t &callees = (GlobalObservableInfo::getInstance().getcallTable())[const_cast<Observable *>(this)]; 261 262 GlobalObservableInfo::callees_t::iterator iter; 284 263 for(iter=callees.begin();iter!=callees.end();++iter){ 285 264 res |= ((*iter).second == target); … … 290 269 291 270 /** Sign off an Observer from this Observable. 292 * Removes \a *target from Observable:: callTablelist.271 * Removes \a *target from Observable::(GlobalObservableInfo::getInstance().getcallTable()) list. 293 272 * \param *target Observer 294 273 */ 295 274 void Observable::signOff(Observer *target) const 296 275 { 297 boost::recursive_mutex::scoped_lock guard(ObservablesMapLock); 298 ASSERT(callTable.count(const_cast<Observable *>(this)), 299 "SignOff called for an Observable without Observers."); 300 #ifdef LOG_OBSERVER 301 observerLog().addMessage() << "** Signing off " << observerLog().getName(target) << " from " << observerLog().getName(const_cast<Observable *>(this)); 302 #endif 303 callees_t &callees = callTable[const_cast<Observable *>(this)]; 304 305 callees_t::iterator iter; 306 callees_t::iterator deliter; 307 for(iter=callees.begin();iter!=callees.end();) { 308 if((*iter).second == target) { 309 callees.erase(iter++); 310 } 311 else { 312 ++iter; 313 } 314 } 315 if(callees.empty()){ 316 callTable.erase(const_cast<Observable *>(this)); 276 { 277 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 278 GlobalObservableInfo::calltable_t &callTable = GlobalObservableInfo::getInstance().getcallTable(); 279 ASSERT(callTable.count(const_cast<Observable *>(this)), 280 "SignOff called for an Observable without Observers."); 281 #ifdef LOG_OBSERVER 282 observerLog().addMessage() << "** Signing off " << observerLog().getName(target) << " from " << observerLog().getName(const_cast<Observable *>(this)); 283 #endif 284 GlobalObservableInfo::callees_t &callees = callTable[const_cast<Observable *>(this)]; 285 286 GlobalObservableInfo::callees_t::iterator iter; 287 GlobalObservableInfo::callees_t::iterator deliter; 288 for(iter=callees.begin();iter!=callees.end();) { 289 if((*iter).second == target) { 290 callees.erase(iter++); 291 } 292 else { 293 ++iter; 294 } 295 } 296 if(callees.empty()){ 297 callTable.erase(const_cast<Observable *>(this)); 298 } 317 299 } 318 300 (*graveyard_informer)(this); … … 322 304 Observer *target, 323 305 size_t channelno, 324 PriorityLevel priority) const 325 { 326 boost::recursive_mutex::scoped_lock guard(ObservablesMapLock); 306 GlobalObservableInfo::PriorityLevel priority) const 307 { 327 308 Notification_ptr notification = getChannel(channelno); 328 309 #ifdef LOG_OBSERVER … … 336 317 void Observable::signOff(Observer *target, size_t channelno) const 337 318 { 338 boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);339 319 Notification_ptr notification = getChannel(channelno); 340 320 #ifdef LOG_OBSERVER … … 349 329 bool Observable::isBlocked() const 350 330 { 351 boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);352 return depth.count(const_cast<Observable *>(this)) > 0;331 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 332 return (GlobalObservableInfo::getInstance().getdepth()).count(const_cast<Observable *>(this)) > 0; 353 333 } 354 334 355 335 Notification_ptr Observable::getChannel(size_t no) const 356 336 { 357 ObservablesMapLock.lock();358 const ChannelMap::const_iterator iter = NotificationChannels.find(const_cast<Observable * const>(this));337 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 338 const ChannelMap::const_iterator iter = NotificationChannels.find(const_cast<Observable *>(this)); 359 339 const bool status = iter != NotificationChannels.end(); 360 340 Channels *OurChannel = NULL; 361 341 if (status) 362 342 OurChannel = iter->second; 363 ObservablesMapLock.unlock();364 343 ASSERT(status, 365 "Observable::getChannel() - we do not have a channel in NotificationChannels.");344 "Observable::getChannel() - we do not have a channel "+toString(no)+" in NotificationChannels."); 366 345 ASSERT(OurChannel != NULL, 367 346 "Observable::getChannel() - observable has no channels."); … … 371 350 size_t Observable::getNumberOfObservers() const 372 351 { 373 boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);352 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 374 353 size_t ObserverCount = 0; 375 354 { 376 std::map<Observable*,callees_t>::const_iterator callTableiter = 355 GlobalObservableInfo::calltable_t &callTable = GlobalObservableInfo::getInstance().getcallTable(); 356 GlobalObservableInfo::calltable_t::const_iterator callees_t_iter = 377 357 callTable.find(const_cast<Observable *>(this)); 378 358 // if not present, then we have zero observers 379 if (call Tableiter != callTable.end())380 ObserverCount += call Tableiter->second.size();359 if (callees_t_iter != callTable.end()) 360 ObserverCount += callees_t_iter->second.size(); 381 361 } 382 362 { 383 363 const ChannelMap::const_iterator iter = 384 NotificationChannels.find(const_cast<Observable * const>(this));364 NotificationChannels.find(const_cast<Observable *>(this)); 385 365 // if not present, then we have zero observers 386 366 if (iter != NotificationChannels.end()) … … 403 383 /** Constructor for class Observable. 404 384 */ 405 Observable::Observable(std::string name, const channels_t &_channels) : 385 Observable::Observable( 386 std::string name, 387 const channels_t &_channels) : 406 388 Observer(Observer::BaseConstructor()), 407 389 graveyard_informer(&noop_informer) … … 413 395 #endif 414 396 415 boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);416 397 if (!_channels.empty()) { 398 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 417 399 Channels *OurChannel = new Channels; 418 400 NotificationChannels.insert( std::make_pair(static_cast<Observable *>(this), OurChannel) ); … … 433 415 << observerLog().getName(static_cast<Observable *>(this)); 434 416 #endif 435 boost::recursive_mutex::scoped_lock guard(ObservablesMapLock); 436 if(callTable.count(this)) { 417 bool CallTable_contains = false; 418 { 419 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 420 CallTable_contains = (GlobalObservableInfo::getInstance().getcallTable()).count(this); 421 } 422 if(CallTable_contains) { 423 // copy the list from the map 424 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 425 // copy such that signOff() within receiving subjectKilled() does not affect iterating 426 // this is because within the same thread and with the subjectKilled() signOff() may be 427 // called and when executed it modifies targets 428 GlobalObservableInfo::callees_t callees = (GlobalObservableInfo::getInstance().getcallTable())[this]; 437 429 // delete all entries for this observable 438 ObservablesMapLock.lock(); 439 callees_t callees = callTable[this]; 440 ObservablesMapLock.unlock(); 441 callees_t::iterator iter; 430 GlobalObservableInfo::callees_t::iterator iter; 442 431 for(iter=callees.begin();iter!=callees.end();++iter) 443 432 (*iter).second->subjectKilled(this); 444 ObservablesMapLock.lock(); 445 callTable.erase(this); 446 ObservablesMapLock.unlock(); 433 // erase the list in the map 434 (GlobalObservableInfo::getInstance().getcallTable()).erase(this); 447 435 } 448 436 449 437 // also kill instance in static Channels map if present 450 ChannelMap::iterator iter = NotificationChannels.find(static_cast<Observable *>(this)); 451 if (iter != NotificationChannels.end()) { 452 iter->second->subjectKilled(static_cast<Observable *>(this)); 453 ObservablesMapLock.lock(); 454 delete iter->second; 455 NotificationChannels.erase(iter); 456 ObservablesMapLock.unlock(); 438 { 439 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 440 ChannelMap::iterator iter = NotificationChannels.find(static_cast<Observable *>(this)); 441 if (iter != NotificationChannels.end()) { 442 iter->second->subjectKilled(static_cast<Observable *>(this)); 443 delete iter->second; 444 NotificationChannels.erase(iter); 445 } 457 446 } 458 447 } -
src/Observer/Relay.cpp
re24dde r959c82 26 26 #include "CodePatterns/Observer/Notification.hpp" 27 27 28 #include <boost/thread/locks.hpp> 29 #include <boost/thread/recursive_mutex.hpp> 28 30 29 31 /** Constructor for class Relay. … … 53 55 54 56 /** Sign on an Observer to this Observable. 55 * Puts \a *target into Observable:: callTablelist.57 * Puts \a *target into Observable::(GlobalObservableInfo::getInstance().getcallTable()) list. 56 58 * \param *target Observer 57 59 * \param priority number in [-20,20] 58 60 */ 59 void Relay::signOn(Observer *target, PriorityLevel priority) const61 void Relay::signOn(Observer *target, GlobalObservableInfo::PriorityLevel priority) const 60 62 { 61 63 #ifdef LOG_OBSERVER … … 65 67 #endif 66 68 bool res = false; 67 callees_t &callees = callTable[const_cast<Observable *>(static_cast<const Observable * const>(this))]; 68 69 callees_t::iterator iter; 69 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 70 GlobalObservableInfo::callees_t &callees = (GlobalObservableInfo::getInstance().getcallTable())[const_cast<Observable *>(static_cast<const Observable * const>(this))]; 71 72 GlobalObservableInfo::callees_t::iterator iter; 70 73 for(iter=callees.begin();iter!=callees.end();++iter){ 71 74 res |= ((*iter).second == target); … … 76 79 77 80 /** Sign off an Observer from this Observable. 78 * Removes \a *target from Observable:: callTablelist.81 * Removes \a *target from Observable::(GlobalObservableInfo::getInstance().getcallTable()) list. 79 82 * \param *target Observer 80 83 */ 81 84 void Relay::signOff(Observer *target) const 82 85 { 86 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 87 GlobalObservableInfo::calltable_t &callTable = GlobalObservableInfo::getInstance().getcallTable(); 83 88 ASSERT(callTable.count(const_cast<Observable *>(static_cast<const Observable * const>(this))), 84 89 "Relay::signOff() - called for an Observable without Observers."); … … 88 93 << observerLog().getName(const_cast<Observable *>(static_cast<const Observable * const>(this))); 89 94 #endif 90 callees_t &callees = callTable[const_cast<Observable *>(static_cast<const Observable * const>(this))];91 92 callees_t::iterator iter;93 callees_t::iterator deliter;95 GlobalObservableInfo::callees_t &callees = callTable[const_cast<Observable *>(static_cast<const Observable * const>(this))]; 96 97 GlobalObservableInfo::callees_t::iterator iter; 98 GlobalObservableInfo::callees_t::iterator deliter; 94 99 for(iter=callees.begin();iter!=callees.end();) { 95 100 if((*iter).second == target) { … … 105 110 } 106 111 107 void Relay::signOn(Observer *target, size_t channelno, PriorityLevel priority) const112 void Relay::signOn(Observer *target, size_t channelno, GlobalObservableInfo::PriorityLevel priority) const 108 113 { 109 114 Notification_ptr notification = getChannel(channelno); … … 118 123 119 124 /** Notify all Observers of changes. 120 * Puts \a *this into Relay:: busyObservables, calls Observer::update() for all in callee_t125 * Puts \a *this into Relay::(GlobalObservableInfo::getInstance().getbusyObservables()), calls Observer::update() for all in callee_t 121 126 * and removes from busy list. 122 127 */ … … 126 131 // we are busy notifying others right now 127 132 // add ourselves to the list of busy subjects to enable circle detection 128 busyObservables.insert(this); 133 { 134 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 135 (GlobalObservableInfo::getInstance().getbusyObservables()).insert(this); 136 } 129 137 // see if anyone has signed up for observation 130 138 // and call all observers 131 139 try { 132 if(callTable.count(this)) { 140 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 141 GlobalObservableInfo::calltable_t& callTable = GlobalObservableInfo::getInstance().getcallTable(); 142 const bool callTable_contains = callTable.count(this); 143 if(callTable_contains) { 133 144 // elements are stored sorted by keys in the multimap 134 145 // so iterating over it gives us a the callees sorted by 135 146 // the priorities 136 callees_t callees = callTable[this]; 137 callees_t::iterator iter; 147 // copy such that signOff() within receiving update() does not affect iterating 148 // this is because within the same thread and with the updateKilled() signOff() may be 149 // called and when executed it modifies targets 150 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 151 GlobalObservableInfo::callees_t callees = callTable[this]; 152 GlobalObservableInfo::callees_t::iterator iter; 138 153 for(iter=callees.begin();iter!=callees.end();++iter){ 139 154 #ifdef LOG_OBSERVER … … 149 164 ASSERT_NOCATCH("Exception thrown from Observer Update"); 150 165 151 // send out all notifications that need to be done 152 153 notificationSet currentNotifications = notifications[Updater]; 154 for(notificationSet::iterator it = currentNotifications.begin(); 155 it != currentNotifications.end();++it){ 156 (*it)->notifyAll(Updater); 157 } 158 159 notifications.erase(Updater); 160 161 // done with notification, we can leave the set of busy subjects 162 busyObservables.erase(this); 166 // send out all (GlobalObservableInfo::getInstance().getnotifications()) that need to be done 167 { 168 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 169 GlobalObservableInfo::notificationSet currentNotifications = 170 (GlobalObservableInfo::getInstance().getnotifications())[Updater]; 171 for(GlobalObservableInfo::notificationSet::iterator it = currentNotifications.begin(); 172 it != currentNotifications.end();++it){ 173 (*it)->notifyAll(Updater); 174 } 175 } 176 177 { 178 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 179 (GlobalObservableInfo::getInstance().getnotifications()).erase(Updater); 180 181 // done with notification, we can leave the set of busy subjects 182 (GlobalObservableInfo::getInstance().getbusyObservables()).erase(this); 183 } 163 184 } 164 185 … … 171 192 void Relay::update(Observable *publisher) { 172 193 // circle detection 173 if(busyObservables.find(this)!=busyObservables.end()) { 194 bool circle_present = false; 195 { 196 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 197 std::set<Observable*>& busyObservables = GlobalObservableInfo::getInstance().getbusyObservables(); 198 circle_present = busyObservables.find(this)!=busyObservables.end(); 199 } 200 if(circle_present) { 174 201 // somehow a circle was introduced... we were busy notifying our 175 202 // observers, but still we are called by one of our sub-Relays … … 184 211 // if we are changing ourselves at the same time our sub-observables change 185 212 // we do not need to publish all the changes at each time we are called 186 if(depth.find(this)==depth.end()) { 213 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 214 std::map<Observable*, int>& depth = GlobalObservableInfo::getInstance().getdepth(); 215 const bool depth_contains = depth.find(this)==depth.end(); 216 if(depth_contains) { 187 217 #ifdef LOG_OBSERVER 188 218 observerLog().addMessage() << "-* Update from " << observerLog().getName(publisher) … … 202 232 } 203 233 204 /** Method for receiving specialized notifications.234 /** Method for receiving specialized (GlobalObservableInfo::getInstance().getnotifications()). 205 235 * 206 236 * \param *publisher The \a *this we observe. … … 210 240 { 211 241 Updater = publisher; 212 ChannelMap::const_iterator iter = NotificationChannels.find(this); 213 if (iter != NotificationChannels.end()) { 214 const Channels *myChannels = iter->second; 242 bool contains_channels = false; 243 { 244 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 245 contains_channels = NotificationChannels.find(this) != NotificationChannels.end(); 246 } 247 if (contains_channels) { 215 248 const size_t channelno = notification->getChannelNo(); 216 Notification_ptr mynotification = myChannels->getChannel(channelno); 249 Notification *mynotification = NULL; 250 { 251 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); 252 ChannelMap::const_iterator iter = NotificationChannels.find(this); 253 const Channels *myChannels = iter->second; 254 mynotification = myChannels->getChannel(channelno); 255 } 217 256 ASSERT(mynotification != NULL, 218 257 "Relay::recieveNotification() - this relay does not have a notification no "+toString(channelno)+"."); … … 228 267 * \param *publisher Sub-Relay. 229 268 */ 230 void Relay::subjectKilled(Observable *publisher){ 231 } 232 269 void Relay::subjectKilled(Observable *publisher) 270 { 271 } 272
Note:
See TracChangeset
for help on using the changeset viewer.
