Ignore:
Timestamp:
Oct 30, 2015, 11:43:20 AM (10 years ago)
Author:
Frederik Heber <heber@…>
Children:
1f96ec
Parents:
e24dde
git-author:
Frederik Heber <heber@…> (07/04/15 22:33:12)
git-committer:
Frederik Heber <heber@…> (10/30/15 11:43:20)
Message:

Extracted all static Observable maps (and mutex) into singleton GlobalObservableInfo.

  • this way we may safely control is destruction, i.e. it is always valid as it boils down to a primitive void pointer which does not need to be destroyed or constructed.
  • Minimized code where mutex is locked.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • src/Observer/Observable.cpp

    re24dde r959c82  
    2929#include <algorithm>
    3030
     31#include <boost/thread/locks.hpp>
     32#include <boost/thread/recursive_mutex.hpp>
     33
    3134//!> This function does nothing with the given Observable
    3235void NoOp_informer(const Observable *)
     
    3538Observable::graveyard_informer_t Observable::noop_informer(&NoOp_informer);
    3639
    37 // All infrastructure for the observer-pattern is bundled at a central place
    38 // this is more efficient if many objects can be observed (inherit from observable)
    39 // but only few are actually coupled with observers. E.g. TMV has over 500.000 Atoms,
    40 // which might become observable. Handling Observable infrastructure in each of
    41 // these would use memory for each atom. By handling Observer-infrastructure
    42 // here we only need memory for objects that actually are observed.
    43 // See [Gamma et al, 1995] p. 297
    44 
    45 std::map<Observable*, int> Observable::depth;  //!< Map of Observables to the depth of the DAG of Observers
    46 std::map<Observable*,std::multimap<int,Observer*> > Observable::callTable; //!< Table for each Observable of all its Observers
    47 std::map<Observable*,std::set<Notification*> > Observable::notifications; //!< Table for all current notifications to perform
    48 std::set<Observable*> Observable::busyObservables; //!< Set of Observables that are currently busy notifying their sign-on'ed Observers
    49 Observable::ChannelMap Observable::NotificationChannels; //!< Map of Observables to their Channels.
    50 boost::recursive_mutex Observable::ObservablesMapLock;  //!< mutex for locking the above maps
    51 
    52 // ValidRange must be initialized before PriorityLevel.
    53 range<int> Observable::PriorityLevel::ValidRange(-20, 21);
    54 Observable::PriorityLevel Observable::PriorityDefault(int(0));
    55 
    56 /** Constructor of PriorityLevel.
    57  *
    58  * \note We check whether the level is within Observable::PriorityLevel::ValidRange.
    59  *
    60  * @param i priority level encapsulated in this class.
    61  */
    62 Observable::PriorityLevel::PriorityLevel(const int i) :
    63     level(i)
    64 {
    65   ASSERT(ValidRange.isInRange(level),
    66       "Observable::PriorityLevel::PriorityLevel() - Priority level "
    67       +toString(level)+" out of range "+toString(ValidRange)+".");
    68 }
    69 
    70 Observable::PriorityLevel::~PriorityLevel()
    71 {}
     40Observable::ChannelMap Observable::NotificationChannels;
    7241
    7342/** Attaching Sub-observables to Observables.
    74  * Increases entry in Observable::depth for this \a *publisher by one.
     43 * Increases entry in Observable::(GlobalObservableInfo::getInstance().getdepth()) for this \a *publisher by one.
    7544 *
    7645 * The two functions \sa start_observer_internal() and \sa finish_observer_internal()
     
    8251void Observable::start_observer_internal(Observable *publisher)
    8352{
    84   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
     53  boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
    8554  // increase the count for this observable by one
    8655  // if no entry for this observable is found, an new one is created
    8756  // by the STL and initialized to 0 (see STL documentation)
    8857#ifdef LOG_OBSERVER
    89   observerLog().addMessage(depth[publisher]) << ">> Locking " << observerLog().getName(publisher);
    90 #endif
    91   depth[publisher]++;
     58  observerLog().addMessage((GlobalObservableInfo::getInstance().getdepth())[publisher]) << ">> Locking " << observerLog().getName(publisher);
     59#endif
     60  (GlobalObservableInfo::getInstance().getdepth())[publisher]++;
    9261}
    9362
    9463/** Detaching Sub-observables from Observables.
    95  * Decreases entry in Observable::depth for this \a *publisher by one. If zero, we
     64 * Decreases entry in Observable::(GlobalObservableInfo::getInstance().getdepth()) for this \a *publisher by one. If zero, we
    9665 * start notifying all our Observers.
    9766 *
     
    10776  // if zero is reached all observed blocks are done and we can
    10877  // start to notify our observers
    109   ObservablesMapLock.lock();
    110   --depth[publisher];
    111 #ifdef LOG_OBSERVER
    112   observerLog().addMessage(depth[publisher]) << "<< Unlocking " << observerLog().getName(publisher);
    113 #endif
    114   int depth_publisher = depth[publisher];
    115   ObservablesMapLock.unlock();
     78  int depth_publisher = 0;
     79  {
     80    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     81    --(GlobalObservableInfo::getInstance().getdepth())[publisher];
     82#ifdef LOG_OBSERVER
     83    observerLog().addMessage((GlobalObservableInfo::getInstance().getdepth())[publisher]) << "<< Unlocking " << observerLog().getName(publisher);
     84#endif
     85    depth_publisher = (GlobalObservableInfo::getInstance().getdepth())[publisher];
     86  }
    11687  if(depth_publisher){}
    11788  else{
     
    11990    // this item is done, so we don't have to keep the count with us
    12091    // save some memory by erasing it
    121     boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    122     depth.erase(publisher);
     92    {
     93      boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     94      (GlobalObservableInfo::getInstance().getdepth()).erase(publisher);
     95    }
    12396  }
    12497}
     
    12699void Observable::enque_notification_internal(Observable *publisher, Notification_ptr notification)
    127100{
    128   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    129   notifications[publisher].insert(notification);
     101  boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     102  (GlobalObservableInfo::getInstance().getnotifications())[publisher].insert(notification);
    130103}
    131104
     
    162135
    163136/** Notify all Observers of changes.
    164  * Puts \a *this into Observable::busyObservables, calls Observer::update() for all in callee_t
     137 * Puts \a *this into Observable::(GlobalObservableInfo::getInstance().getbusyObservables()), calls Observer::update() for all in callee_t
    165138 * and removes from busy list.
    166139 */
     
    172145  // we are busy notifying others right now
    173146  // add ourselves to the list of busy subjects to enable circle detection
    174   ObservablesMapLock.lock();
    175   busyObservables.insert(this);
    176   ObservablesMapLock.unlock();
     147  {
     148    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     149    (GlobalObservableInfo::getInstance().getbusyObservables()).insert(this);
     150  }
    177151  // see if anyone has signed up for observation
    178152  // and call all observers
    179153  try {
    180     ObservablesMapLock.lock();
    181     const bool anybodyThere = callTable.count(this);
    182     ObservablesMapLock.unlock();
    183     if(anybodyThere) {
     154    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     155    GlobalObservableInfo::calltable_t& callTable = GlobalObservableInfo::getInstance().getcallTable();
     156    const bool callTable_contains = callTable.find(this) != callTable.end();
     157    if (callTable_contains) {
    184158      // elements are stored sorted by keys in the multimap
    185159      // so iterating over it gives us a the callees sorted by
    186160      // the priorities
    187       ObservablesMapLock.lock();
    188       callees_t callees = callTable[this];
    189       ObservablesMapLock.unlock();
    190       callees_t::iterator iter;
     161      // copy such that signOff() within receiving update() does not affect iterating
     162      // this is because within the same thread and with the updateKilled() signOff() may be
     163      // called and when executed it modifies targets
     164      GlobalObservableInfo::callees_t callees = callTable[this];
     165      GlobalObservableInfo::callees_t::iterator iter;
    191166      for(iter=callees.begin();iter!=callees.end();++iter){
    192167#ifdef LOG_OBSERVER
     
    202177
    203178  // send out all notifications that need to be done
    204 
    205   ObservablesMapLock.lock();
    206   notificationSet currentNotifications = notifications[this];
    207   ObservablesMapLock.unlock();
    208   for(notificationSet::iterator it = currentNotifications.begin();
    209       it != currentNotifications.end();++it){
    210     (*it)->notifyAll(this);
    211   }
    212 
    213   ObservablesMapLock.lock();
    214   notifications.erase(this);
    215 
    216    // done with notification, we can leave the set of busy subjects
    217   busyObservables.erase(this);
    218   ObservablesMapLock.unlock();
     179  {
     180    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     181    GlobalObservableInfo::notificationSet currentNotifications =
     182        (GlobalObservableInfo::getInstance().getnotifications())[this];
     183    for(GlobalObservableInfo::notificationSet::iterator it = currentNotifications.begin();
     184        it != currentNotifications.end();++it){
     185      (*it)->notifyAll(this);
     186    }
     187  }
     188
     189  {
     190    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     191    (GlobalObservableInfo::getInstance().getnotifications()).erase(this);
     192
     193     // done with notification, we can leave the set of busy subjects
     194    (GlobalObservableInfo::getInstance().getbusyObservables()).erase(this);
     195  }
    219196
    220197#ifdef LOG_OBSERVER
     
    232209void Observable::update(Observable *publisher) {
    233210  // circle detection
    234   ObservablesMapLock.lock();
    235   const bool presentCircle = busyObservables.find(this)!=busyObservables.end();
    236   ObservablesMapLock.unlock();
     211  bool presentCircle = false;
     212  {
     213    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     214    presentCircle = (GlobalObservableInfo::getInstance().getbusyObservables()).find(this)!=(GlobalObservableInfo::getInstance().getbusyObservables()).end();
     215  }
    237216  if(presentCircle) {
    238217    // somehow a circle was introduced... we were busy notifying our
     
    245224  }
    246225  else {
     226    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
    247227    // see if we are in the process of changing ourselves
    248228    // if we are changing ourselves at the same time our sub-observables change
    249229    // we do not need to publish all the changes at each time we are called
    250     ObservablesMapLock.lock();
    251     const bool depth_this = depth.find(this)==depth.end();
    252     ObservablesMapLock.unlock();
    253     if(depth_this) {
     230    std::map<Observable*, int>& depth = GlobalObservableInfo::getInstance().getdepth();
     231    const bool depth_contains = depth.find(this)==depth.end();
     232    if(depth_contains) {
    254233#ifdef LOG_OBSERVER
    255234      observerLog().addMessage() << "-* Update from " << observerLog().getName(publisher)
     
    268247
    269248/** Sign on an Observer to this Observable.
    270  * Puts \a *target into Observable::callTable list.
     249 * Puts \a *target into Observable::(GlobalObservableInfo::getInstance().getcallTable()) list.
    271250 * \param *target Observer
    272251 * \param priority number in [-20,20]
    273252 */
    274 void Observable::signOn(Observer *target, PriorityLevel priority) const
     253void Observable::signOn(Observer *target, GlobalObservableInfo::PriorityLevel priority) const
    275254{
    276255#ifdef LOG_OBSERVER
     
    278257#endif
    279258  bool res = false;
    280   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    281   callees_t &callees = callTable[const_cast<Observable *>(this)];
    282 
    283   callees_t::iterator iter;
     259  boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     260  GlobalObservableInfo::callees_t &callees = (GlobalObservableInfo::getInstance().getcallTable())[const_cast<Observable *>(this)];
     261
     262  GlobalObservableInfo::callees_t::iterator iter;
    284263  for(iter=callees.begin();iter!=callees.end();++iter){
    285264    res |= ((*iter).second == target);
     
    290269
    291270/** Sign off an Observer from this Observable.
    292  * Removes \a *target from Observable::callTable list.
     271 * Removes \a *target from Observable::(GlobalObservableInfo::getInstance().getcallTable()) list.
    293272 * \param *target Observer
    294273 */
    295274void Observable::signOff(Observer *target) const
    296275{
    297   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    298   ASSERT(callTable.count(const_cast<Observable *>(this)),
    299       "SignOff called for an Observable without Observers.");
    300 #ifdef LOG_OBSERVER
    301   observerLog().addMessage() << "** Signing off " << observerLog().getName(target) << " from " << observerLog().getName(const_cast<Observable *>(this));
    302 #endif
    303   callees_t &callees = callTable[const_cast<Observable *>(this)];
    304 
    305   callees_t::iterator iter;
    306   callees_t::iterator deliter;
    307   for(iter=callees.begin();iter!=callees.end();) {
    308     if((*iter).second == target) {
    309       callees.erase(iter++);
    310     }
    311     else {
    312       ++iter;
    313     }
    314   }
    315   if(callees.empty()){
    316     callTable.erase(const_cast<Observable *>(this));
     276  {
     277    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     278    GlobalObservableInfo::calltable_t &callTable = GlobalObservableInfo::getInstance().getcallTable();
     279    ASSERT(callTable.count(const_cast<Observable *>(this)),
     280        "SignOff called for an Observable without Observers.");
     281#ifdef LOG_OBSERVER
     282    observerLog().addMessage() << "** Signing off " << observerLog().getName(target) << " from " << observerLog().getName(const_cast<Observable *>(this));
     283#endif
     284    GlobalObservableInfo::callees_t &callees = callTable[const_cast<Observable *>(this)];
     285
     286    GlobalObservableInfo::callees_t::iterator iter;
     287    GlobalObservableInfo::callees_t::iterator deliter;
     288    for(iter=callees.begin();iter!=callees.end();) {
     289      if((*iter).second == target) {
     290        callees.erase(iter++);
     291      }
     292      else {
     293        ++iter;
     294      }
     295    }
     296    if(callees.empty()){
     297      callTable.erase(const_cast<Observable *>(this));
     298    }
    317299  }
    318300  (*graveyard_informer)(this);
     
    322304    Observer *target,
    323305    size_t channelno,
    324     PriorityLevel priority) const
    325 {
    326   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
     306    GlobalObservableInfo::PriorityLevel priority) const
     307{
    327308  Notification_ptr notification = getChannel(channelno);
    328309#ifdef LOG_OBSERVER
     
    336317void Observable::signOff(Observer *target, size_t channelno) const
    337318{
    338   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    339319  Notification_ptr notification = getChannel(channelno);
    340320#ifdef LOG_OBSERVER
     
    349329bool Observable::isBlocked() const
    350330{
    351   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    352   return depth.count(const_cast<Observable *>(this)) > 0;
     331  boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     332  return (GlobalObservableInfo::getInstance().getdepth()).count(const_cast<Observable *>(this)) > 0;
    353333}
    354334
    355335Notification_ptr Observable::getChannel(size_t no) const
    356336{
    357   ObservablesMapLock.lock();
    358   const ChannelMap::const_iterator iter = NotificationChannels.find(const_cast<Observable * const>(this));
     337  boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     338  const ChannelMap::const_iterator iter = NotificationChannels.find(const_cast<Observable *>(this));
    359339  const bool status = iter != NotificationChannels.end();
    360340  Channels *OurChannel = NULL;
    361341  if (status)
    362342    OurChannel = iter->second;
    363   ObservablesMapLock.unlock();
    364343  ASSERT(status,
    365       "Observable::getChannel() - we do not have a channel in NotificationChannels.");
     344      "Observable::getChannel() - we do not have a channel "+toString(no)+" in NotificationChannels.");
    366345  ASSERT(OurChannel != NULL,
    367346      "Observable::getChannel() - observable has no channels.");
     
    371350size_t Observable::getNumberOfObservers() const
    372351{
    373   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
     352  boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
    374353  size_t ObserverCount = 0;
    375354  {
    376     std::map<Observable*,callees_t>::const_iterator callTableiter =
     355    GlobalObservableInfo::calltable_t &callTable = GlobalObservableInfo::getInstance().getcallTable();
     356    GlobalObservableInfo::calltable_t::const_iterator callees_t_iter =
    377357        callTable.find(const_cast<Observable *>(this));
    378358    // if not present, then we have zero observers
    379     if (callTableiter != callTable.end())
    380       ObserverCount += callTableiter->second.size();
     359    if (callees_t_iter != callTable.end())
     360      ObserverCount += callees_t_iter->second.size();
    381361  }
    382362  {
    383363    const ChannelMap::const_iterator iter =
    384         NotificationChannels.find(const_cast<Observable * const>(this));
     364        NotificationChannels.find(const_cast<Observable *>(this));
    385365    // if not present, then we have zero observers
    386366    if (iter != NotificationChannels.end())
     
    403383/** Constructor for class Observable.
    404384 */
    405 Observable::Observable(std::string name, const channels_t &_channels) :
     385Observable::Observable(
     386    std::string name,
     387    const channels_t &_channels) :
    406388  Observer(Observer::BaseConstructor()),
    407389  graveyard_informer(&noop_informer)
     
    413395#endif
    414396
    415   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    416397  if (!_channels.empty()) {
     398    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
    417399    Channels *OurChannel = new Channels;
    418400    NotificationChannels.insert( std::make_pair(static_cast<Observable *>(this), OurChannel) );
     
    433415      << observerLog().getName(static_cast<Observable *>(this));
    434416#endif
    435   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    436   if(callTable.count(this)) {
     417  bool CallTable_contains = false;
     418  {
     419    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     420    CallTable_contains = (GlobalObservableInfo::getInstance().getcallTable()).count(this);
     421  }
     422  if(CallTable_contains) {
     423    // copy the list from the map
     424    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     425    // copy such that signOff() within receiving subjectKilled() does not affect iterating
     426    // this is because within the same thread and with the subjectKilled() signOff() may be
     427    // called and when executed it modifies targets
     428    GlobalObservableInfo::callees_t callees = (GlobalObservableInfo::getInstance().getcallTable())[this];
    437429    // delete all entries for this observable
    438     ObservablesMapLock.lock();
    439     callees_t callees = callTable[this];
    440     ObservablesMapLock.unlock();
    441     callees_t::iterator iter;
     430    GlobalObservableInfo::callees_t::iterator iter;
    442431    for(iter=callees.begin();iter!=callees.end();++iter)
    443432      (*iter).second->subjectKilled(this);
    444     ObservablesMapLock.lock();
    445     callTable.erase(this);
    446     ObservablesMapLock.unlock();
     433    // erase the list in the map
     434    (GlobalObservableInfo::getInstance().getcallTable()).erase(this);
    447435  }
    448436
    449437  // also kill instance in static Channels map if present
    450   ChannelMap::iterator iter = NotificationChannels.find(static_cast<Observable *>(this));
    451   if (iter != NotificationChannels.end()) {
    452     iter->second->subjectKilled(static_cast<Observable *>(this));
    453     ObservablesMapLock.lock();
    454     delete iter->second;
    455     NotificationChannels.erase(iter);
    456     ObservablesMapLock.unlock();
     438  {
     439    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     440    ChannelMap::iterator iter = NotificationChannels.find(static_cast<Observable *>(this));
     441    if (iter != NotificationChannels.end()) {
     442      iter->second->subjectKilled(static_cast<Observable *>(this));
     443      delete iter->second;
     444      NotificationChannels.erase(iter);
     445    }
    457446  }
    458447}
Note: See TracChangeset for help on using the changeset viewer.