Changeset 959c82


Ignore:
Timestamp:
Oct 30, 2015, 11:43:20 AM (10 years ago)
Author:
Frederik Heber <heber@…>
Children:
1f96ec
Parents:
e24dde
git-author:
Frederik Heber <heber@…> (07/04/15 22:33:12)
git-committer:
Frederik Heber <heber@…> (10/30/15 11:43:20)
Message:

Extracted all static Observable maps (and mutex) into singleton GlobalObservableInfo.

  • this way we may safely control is destruction, i.e. it is always valid as it boils down to a primitive void pointer which does not need to be destroyed or constructed.
  • Minimized code where mutex is locked.
Location:
src
Files:
2 added
7 edited

Legend:

Unmodified
Added
Removed
  • src/CodePatterns/Cacheable.hpp

    re24dde r959c82  
    197197    if (owner != NULL) {
    198198      if (channels.empty()) {
    199         owner->signOn(this,Observable::PriorityLevel(int(-20)));
     199        owner->signOn(this,GlobalObservableInfo::PriorityLevel(int(-20)));
    200200      } else {
    201201        for (Observable::channels_t::const_iterator iter = channels.begin();
    202202            iter != channels.end(); ++iter)
    203           owner->signOn(this,*iter,Observable::PriorityLevel(int(-20)));
     203          owner->signOn(this,*iter,GlobalObservableInfo::PriorityLevel(int(-20)));
    204204      }
    205205    }
  • src/CodePatterns/ObservedValue.hpp

    re24dde r959c82  
    151151  if ((owner != NULL) && (!signedOn)) {
    152152    if (channels.empty()) {
    153       owner->signOn(this,Observable::PriorityLevel(int(-20)));
     153      owner->signOn(this,GlobalObservableInfo::PriorityLevel(int(-20)));
    154154    } else {
    155155      for (Observable::channels_t::const_iterator iter = channels.begin();
  • src/CodePatterns/Observer/Observable.hpp

    re24dde r959c82  
    1818#include <string>
    1919#include <boost/function.hpp>
    20 #include <boost/thread.hpp>
    2120
    2221#include "CodePatterns/Range.hpp"
    2322#include "CodePatterns/Observer/defs.hpp"
     23#include "CodePatterns/Observer/GlobalObservableInfo.hpp"
    2424#include "CodePatterns/Observer/Observer.hpp"
    2525
    2626class Graveyard;
     27class scoped_lock;
    2728
    2829/**
     
    4849  typedef std::vector<size_t> channels_t;
    4950
    50   Observable(std::string _name, const channels_t &_channels = channels_t());
     51  Observable(
     52      std::string _name,
     53      const channels_t &_channels = channels_t());
    5154  virtual ~Observable();
    52 
    53   /** This class is only used to distinguish from size_t in the overload.
    54    *
    55    * It encapsulates a const int (the priority level) and checks valid bounds
    56    * in constructor.
    57    *
    58    */
    59   class PriorityLevel {
    60   public:
    61     explicit PriorityLevel(const int i);
    62     ~PriorityLevel();
    63 
    64     const int level;
    65   private:
    66     static range<int> ValidRange;
    67   };
    6855
    6956private:
     
    9279   * ussually no need to order the update sequence.
    9380   */
    94   virtual void signOn(Observer *target, PriorityLevel priority = PriorityDefault) const;
     81  virtual void signOn(
     82      Observer * target,
     83      GlobalObservableInfo::PriorityLevel priority = GlobalObservableInfo::PriorityDefault) const;
    9584
    9685  /**
     
    10695      Observer *target,
    10796      size_t channelno,
    108       PriorityLevel priority = PriorityDefault) const;
     97      GlobalObservableInfo::PriorityLevel priority =
     98          GlobalObservableInfo::PriorityDefault) const;
    10999
    110100  /**
     
    143133  static void enque_notification_internal(Observable *publisher, Notification_ptr notification);
    144134
     135protected:
     136
    145137  typedef std::map<Observable*, Channels *> ChannelMap;
    146138  static ChannelMap NotificationChannels;
    147 
    148   static PriorityLevel PriorityDefault;
    149 
    150 protected:
    151   typedef std::multimap<int,Observer*> callees_t;
    152   typedef std::set<Notification*> notificationSet;
    153   static std::map<Observable*, int> depth;
    154   static std::map<Observable*,callees_t> callTable;
    155   static std::map<Observable*,notificationSet> notifications;
    156   static std::set<Observable*> busyObservables;
    157 
    158   static boost::recursive_mutex ObservablesMapLock; //!< a lock for the pointer of the instance
    159139
    160140private:
  • src/CodePatterns/Observer/Relay.hpp

    re24dde r959c82  
    3636  virtual ~Relay();
    3737
    38   virtual void signOn(Observer *target, PriorityLevel priority = Observable::PriorityDefault) const;
     38  virtual void signOn(
     39      Observer *target,
     40      GlobalObservableInfo::PriorityLevel priority = GlobalObservableInfo::PriorityDefault) const;
    3941
    4042  virtual void signOff(Observer *target) const;
    4143
    42   virtual void signOn(Observer *target, size_t channelno, PriorityLevel priority = Observable::PriorityDefault) const;
     44  virtual void signOn(
     45      Observer *target,
     46      size_t channelno,
     47      GlobalObservableInfo::PriorityLevel priority = GlobalObservableInfo::PriorityDefault) const;
    4348
    4449  virtual void signOff(Observer *target, size_t channelno) const;
  • src/Observer/Makefile.am

    re24dde r959c82  
    88OBSERVERSOURCE = \
    99        Channels.cpp \
     10        GlobalObservableInfo.cpp \
    1011        Graveyard.cpp \
    1112        Notification.cpp \
     
    2223        $(top_srcdir)/src/CodePatterns/Observer/Channels.hpp \
    2324        $(top_srcdir)/src/CodePatterns/Observer/defs.hpp \
     25        $(top_srcdir)/src/CodePatterns/Observer/GlobalObservableInfo.hpp \
    2426        $(top_srcdir)/src/CodePatterns/Observer/Graveyard.hpp \
    2527        $(top_srcdir)/src/CodePatterns/Observer/Notification.hpp \
  • src/Observer/Observable.cpp

    re24dde r959c82  
    2929#include <algorithm>
    3030
     31#include <boost/thread/locks.hpp>
     32#include <boost/thread/recursive_mutex.hpp>
     33
    3134//!> This function does nothing with the given Observable
    3235void NoOp_informer(const Observable *)
     
    3538Observable::graveyard_informer_t Observable::noop_informer(&NoOp_informer);
    3639
    37 // All infrastructure for the observer-pattern is bundled at a central place
    38 // this is more efficient if many objects can be observed (inherit from observable)
    39 // but only few are actually coupled with observers. E.g. TMV has over 500.000 Atoms,
    40 // which might become observable. Handling Observable infrastructure in each of
    41 // these would use memory for each atom. By handling Observer-infrastructure
    42 // here we only need memory for objects that actually are observed.
    43 // See [Gamma et al, 1995] p. 297
    44 
    45 std::map<Observable*, int> Observable::depth;  //!< Map of Observables to the depth of the DAG of Observers
    46 std::map<Observable*,std::multimap<int,Observer*> > Observable::callTable; //!< Table for each Observable of all its Observers
    47 std::map<Observable*,std::set<Notification*> > Observable::notifications; //!< Table for all current notifications to perform
    48 std::set<Observable*> Observable::busyObservables; //!< Set of Observables that are currently busy notifying their sign-on'ed Observers
    49 Observable::ChannelMap Observable::NotificationChannels; //!< Map of Observables to their Channels.
    50 boost::recursive_mutex Observable::ObservablesMapLock;  //!< mutex for locking the above maps
    51 
    52 // ValidRange must be initialized before PriorityLevel.
    53 range<int> Observable::PriorityLevel::ValidRange(-20, 21);
    54 Observable::PriorityLevel Observable::PriorityDefault(int(0));
    55 
    56 /** Constructor of PriorityLevel.
    57  *
    58  * \note We check whether the level is within Observable::PriorityLevel::ValidRange.
    59  *
    60  * @param i priority level encapsulated in this class.
    61  */
    62 Observable::PriorityLevel::PriorityLevel(const int i) :
    63     level(i)
    64 {
    65   ASSERT(ValidRange.isInRange(level),
    66       "Observable::PriorityLevel::PriorityLevel() - Priority level "
    67       +toString(level)+" out of range "+toString(ValidRange)+".");
    68 }
    69 
    70 Observable::PriorityLevel::~PriorityLevel()
    71 {}
     40Observable::ChannelMap Observable::NotificationChannels;
    7241
    7342/** Attaching Sub-observables to Observables.
    74  * Increases entry in Observable::depth for this \a *publisher by one.
     43 * Increases entry in Observable::(GlobalObservableInfo::getInstance().getdepth()) for this \a *publisher by one.
    7544 *
    7645 * The two functions \sa start_observer_internal() and \sa finish_observer_internal()
     
    8251void Observable::start_observer_internal(Observable *publisher)
    8352{
    84   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
     53  boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
    8554  // increase the count for this observable by one
    8655  // if no entry for this observable is found, an new one is created
    8756  // by the STL and initialized to 0 (see STL documentation)
    8857#ifdef LOG_OBSERVER
    89   observerLog().addMessage(depth[publisher]) << ">> Locking " << observerLog().getName(publisher);
    90 #endif
    91   depth[publisher]++;
     58  observerLog().addMessage((GlobalObservableInfo::getInstance().getdepth())[publisher]) << ">> Locking " << observerLog().getName(publisher);
     59#endif
     60  (GlobalObservableInfo::getInstance().getdepth())[publisher]++;
    9261}
    9362
    9463/** Detaching Sub-observables from Observables.
    95  * Decreases entry in Observable::depth for this \a *publisher by one. If zero, we
     64 * Decreases entry in Observable::(GlobalObservableInfo::getInstance().getdepth()) for this \a *publisher by one. If zero, we
    9665 * start notifying all our Observers.
    9766 *
     
    10776  // if zero is reached all observed blocks are done and we can
    10877  // start to notify our observers
    109   ObservablesMapLock.lock();
    110   --depth[publisher];
    111 #ifdef LOG_OBSERVER
    112   observerLog().addMessage(depth[publisher]) << "<< Unlocking " << observerLog().getName(publisher);
    113 #endif
    114   int depth_publisher = depth[publisher];
    115   ObservablesMapLock.unlock();
     78  int depth_publisher = 0;
     79  {
     80    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     81    --(GlobalObservableInfo::getInstance().getdepth())[publisher];
     82#ifdef LOG_OBSERVER
     83    observerLog().addMessage((GlobalObservableInfo::getInstance().getdepth())[publisher]) << "<< Unlocking " << observerLog().getName(publisher);
     84#endif
     85    depth_publisher = (GlobalObservableInfo::getInstance().getdepth())[publisher];
     86  }
    11687  if(depth_publisher){}
    11788  else{
     
    11990    // this item is done, so we don't have to keep the count with us
    12091    // save some memory by erasing it
    121     boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    122     depth.erase(publisher);
     92    {
     93      boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     94      (GlobalObservableInfo::getInstance().getdepth()).erase(publisher);
     95    }
    12396  }
    12497}
     
    12699void Observable::enque_notification_internal(Observable *publisher, Notification_ptr notification)
    127100{
    128   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    129   notifications[publisher].insert(notification);
     101  boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     102  (GlobalObservableInfo::getInstance().getnotifications())[publisher].insert(notification);
    130103}
    131104
     
    162135
    163136/** Notify all Observers of changes.
    164  * Puts \a *this into Observable::busyObservables, calls Observer::update() for all in callee_t
     137 * Puts \a *this into Observable::(GlobalObservableInfo::getInstance().getbusyObservables()), calls Observer::update() for all in callee_t
    165138 * and removes from busy list.
    166139 */
     
    172145  // we are busy notifying others right now
    173146  // add ourselves to the list of busy subjects to enable circle detection
    174   ObservablesMapLock.lock();
    175   busyObservables.insert(this);
    176   ObservablesMapLock.unlock();
     147  {
     148    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     149    (GlobalObservableInfo::getInstance().getbusyObservables()).insert(this);
     150  }
    177151  // see if anyone has signed up for observation
    178152  // and call all observers
    179153  try {
    180     ObservablesMapLock.lock();
    181     const bool anybodyThere = callTable.count(this);
    182     ObservablesMapLock.unlock();
    183     if(anybodyThere) {
     154    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     155    GlobalObservableInfo::calltable_t& callTable = GlobalObservableInfo::getInstance().getcallTable();
     156    const bool callTable_contains = callTable.find(this) != callTable.end();
     157    if (callTable_contains) {
    184158      // elements are stored sorted by keys in the multimap
    185159      // so iterating over it gives us a the callees sorted by
    186160      // the priorities
    187       ObservablesMapLock.lock();
    188       callees_t callees = callTable[this];
    189       ObservablesMapLock.unlock();
    190       callees_t::iterator iter;
     161      // copy such that signOff() within receiving update() does not affect iterating
     162      // this is because within the same thread and with the updateKilled() signOff() may be
     163      // called and when executed it modifies targets
     164      GlobalObservableInfo::callees_t callees = callTable[this];
     165      GlobalObservableInfo::callees_t::iterator iter;
    191166      for(iter=callees.begin();iter!=callees.end();++iter){
    192167#ifdef LOG_OBSERVER
     
    202177
    203178  // send out all notifications that need to be done
    204 
    205   ObservablesMapLock.lock();
    206   notificationSet currentNotifications = notifications[this];
    207   ObservablesMapLock.unlock();
    208   for(notificationSet::iterator it = currentNotifications.begin();
    209       it != currentNotifications.end();++it){
    210     (*it)->notifyAll(this);
    211   }
    212 
    213   ObservablesMapLock.lock();
    214   notifications.erase(this);
    215 
    216    // done with notification, we can leave the set of busy subjects
    217   busyObservables.erase(this);
    218   ObservablesMapLock.unlock();
     179  {
     180    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     181    GlobalObservableInfo::notificationSet currentNotifications =
     182        (GlobalObservableInfo::getInstance().getnotifications())[this];
     183    for(GlobalObservableInfo::notificationSet::iterator it = currentNotifications.begin();
     184        it != currentNotifications.end();++it){
     185      (*it)->notifyAll(this);
     186    }
     187  }
     188
     189  {
     190    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     191    (GlobalObservableInfo::getInstance().getnotifications()).erase(this);
     192
     193     // done with notification, we can leave the set of busy subjects
     194    (GlobalObservableInfo::getInstance().getbusyObservables()).erase(this);
     195  }
    219196
    220197#ifdef LOG_OBSERVER
     
    232209void Observable::update(Observable *publisher) {
    233210  // circle detection
    234   ObservablesMapLock.lock();
    235   const bool presentCircle = busyObservables.find(this)!=busyObservables.end();
    236   ObservablesMapLock.unlock();
     211  bool presentCircle = false;
     212  {
     213    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     214    presentCircle = (GlobalObservableInfo::getInstance().getbusyObservables()).find(this)!=(GlobalObservableInfo::getInstance().getbusyObservables()).end();
     215  }
    237216  if(presentCircle) {
    238217    // somehow a circle was introduced... we were busy notifying our
     
    245224  }
    246225  else {
     226    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
    247227    // see if we are in the process of changing ourselves
    248228    // if we are changing ourselves at the same time our sub-observables change
    249229    // we do not need to publish all the changes at each time we are called
    250     ObservablesMapLock.lock();
    251     const bool depth_this = depth.find(this)==depth.end();
    252     ObservablesMapLock.unlock();
    253     if(depth_this) {
     230    std::map<Observable*, int>& depth = GlobalObservableInfo::getInstance().getdepth();
     231    const bool depth_contains = depth.find(this)==depth.end();
     232    if(depth_contains) {
    254233#ifdef LOG_OBSERVER
    255234      observerLog().addMessage() << "-* Update from " << observerLog().getName(publisher)
     
    268247
    269248/** Sign on an Observer to this Observable.
    270  * Puts \a *target into Observable::callTable list.
     249 * Puts \a *target into Observable::(GlobalObservableInfo::getInstance().getcallTable()) list.
    271250 * \param *target Observer
    272251 * \param priority number in [-20,20]
    273252 */
    274 void Observable::signOn(Observer *target, PriorityLevel priority) const
     253void Observable::signOn(Observer *target, GlobalObservableInfo::PriorityLevel priority) const
    275254{
    276255#ifdef LOG_OBSERVER
     
    278257#endif
    279258  bool res = false;
    280   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    281   callees_t &callees = callTable[const_cast<Observable *>(this)];
    282 
    283   callees_t::iterator iter;
     259  boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     260  GlobalObservableInfo::callees_t &callees = (GlobalObservableInfo::getInstance().getcallTable())[const_cast<Observable *>(this)];
     261
     262  GlobalObservableInfo::callees_t::iterator iter;
    284263  for(iter=callees.begin();iter!=callees.end();++iter){
    285264    res |= ((*iter).second == target);
     
    290269
    291270/** Sign off an Observer from this Observable.
    292  * Removes \a *target from Observable::callTable list.
     271 * Removes \a *target from Observable::(GlobalObservableInfo::getInstance().getcallTable()) list.
    293272 * \param *target Observer
    294273 */
    295274void Observable::signOff(Observer *target) const
    296275{
    297   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    298   ASSERT(callTable.count(const_cast<Observable *>(this)),
    299       "SignOff called for an Observable without Observers.");
    300 #ifdef LOG_OBSERVER
    301   observerLog().addMessage() << "** Signing off " << observerLog().getName(target) << " from " << observerLog().getName(const_cast<Observable *>(this));
    302 #endif
    303   callees_t &callees = callTable[const_cast<Observable *>(this)];
    304 
    305   callees_t::iterator iter;
    306   callees_t::iterator deliter;
    307   for(iter=callees.begin();iter!=callees.end();) {
    308     if((*iter).second == target) {
    309       callees.erase(iter++);
    310     }
    311     else {
    312       ++iter;
    313     }
    314   }
    315   if(callees.empty()){
    316     callTable.erase(const_cast<Observable *>(this));
     276  {
     277    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     278    GlobalObservableInfo::calltable_t &callTable = GlobalObservableInfo::getInstance().getcallTable();
     279    ASSERT(callTable.count(const_cast<Observable *>(this)),
     280        "SignOff called for an Observable without Observers.");
     281#ifdef LOG_OBSERVER
     282    observerLog().addMessage() << "** Signing off " << observerLog().getName(target) << " from " << observerLog().getName(const_cast<Observable *>(this));
     283#endif
     284    GlobalObservableInfo::callees_t &callees = callTable[const_cast<Observable *>(this)];
     285
     286    GlobalObservableInfo::callees_t::iterator iter;
     287    GlobalObservableInfo::callees_t::iterator deliter;
     288    for(iter=callees.begin();iter!=callees.end();) {
     289      if((*iter).second == target) {
     290        callees.erase(iter++);
     291      }
     292      else {
     293        ++iter;
     294      }
     295    }
     296    if(callees.empty()){
     297      callTable.erase(const_cast<Observable *>(this));
     298    }
    317299  }
    318300  (*graveyard_informer)(this);
     
    322304    Observer *target,
    323305    size_t channelno,
    324     PriorityLevel priority) const
    325 {
    326   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
     306    GlobalObservableInfo::PriorityLevel priority) const
     307{
    327308  Notification_ptr notification = getChannel(channelno);
    328309#ifdef LOG_OBSERVER
     
    336317void Observable::signOff(Observer *target, size_t channelno) const
    337318{
    338   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    339319  Notification_ptr notification = getChannel(channelno);
    340320#ifdef LOG_OBSERVER
     
    349329bool Observable::isBlocked() const
    350330{
    351   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    352   return depth.count(const_cast<Observable *>(this)) > 0;
     331  boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     332  return (GlobalObservableInfo::getInstance().getdepth()).count(const_cast<Observable *>(this)) > 0;
    353333}
    354334
    355335Notification_ptr Observable::getChannel(size_t no) const
    356336{
    357   ObservablesMapLock.lock();
    358   const ChannelMap::const_iterator iter = NotificationChannels.find(const_cast<Observable * const>(this));
     337  boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     338  const ChannelMap::const_iterator iter = NotificationChannels.find(const_cast<Observable *>(this));
    359339  const bool status = iter != NotificationChannels.end();
    360340  Channels *OurChannel = NULL;
    361341  if (status)
    362342    OurChannel = iter->second;
    363   ObservablesMapLock.unlock();
    364343  ASSERT(status,
    365       "Observable::getChannel() - we do not have a channel in NotificationChannels.");
     344      "Observable::getChannel() - we do not have a channel "+toString(no)+" in NotificationChannels.");
    366345  ASSERT(OurChannel != NULL,
    367346      "Observable::getChannel() - observable has no channels.");
     
    371350size_t Observable::getNumberOfObservers() const
    372351{
    373   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
     352  boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
    374353  size_t ObserverCount = 0;
    375354  {
    376     std::map<Observable*,callees_t>::const_iterator callTableiter =
     355    GlobalObservableInfo::calltable_t &callTable = GlobalObservableInfo::getInstance().getcallTable();
     356    GlobalObservableInfo::calltable_t::const_iterator callees_t_iter =
    377357        callTable.find(const_cast<Observable *>(this));
    378358    // if not present, then we have zero observers
    379     if (callTableiter != callTable.end())
    380       ObserverCount += callTableiter->second.size();
     359    if (callees_t_iter != callTable.end())
     360      ObserverCount += callees_t_iter->second.size();
    381361  }
    382362  {
    383363    const ChannelMap::const_iterator iter =
    384         NotificationChannels.find(const_cast<Observable * const>(this));
     364        NotificationChannels.find(const_cast<Observable *>(this));
    385365    // if not present, then we have zero observers
    386366    if (iter != NotificationChannels.end())
     
    403383/** Constructor for class Observable.
    404384 */
    405 Observable::Observable(std::string name, const channels_t &_channels) :
     385Observable::Observable(
     386    std::string name,
     387    const channels_t &_channels) :
    406388  Observer(Observer::BaseConstructor()),
    407389  graveyard_informer(&noop_informer)
     
    413395#endif
    414396
    415   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    416397  if (!_channels.empty()) {
     398    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
    417399    Channels *OurChannel = new Channels;
    418400    NotificationChannels.insert( std::make_pair(static_cast<Observable *>(this), OurChannel) );
     
    433415      << observerLog().getName(static_cast<Observable *>(this));
    434416#endif
    435   boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    436   if(callTable.count(this)) {
     417  bool CallTable_contains = false;
     418  {
     419    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     420    CallTable_contains = (GlobalObservableInfo::getInstance().getcallTable()).count(this);
     421  }
     422  if(CallTable_contains) {
     423    // copy the list from the map
     424    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     425    // copy such that signOff() within receiving subjectKilled() does not affect iterating
     426    // this is because within the same thread and with the subjectKilled() signOff() may be
     427    // called and when executed it modifies targets
     428    GlobalObservableInfo::callees_t callees = (GlobalObservableInfo::getInstance().getcallTable())[this];
    437429    // delete all entries for this observable
    438     ObservablesMapLock.lock();
    439     callees_t callees = callTable[this];
    440     ObservablesMapLock.unlock();
    441     callees_t::iterator iter;
     430    GlobalObservableInfo::callees_t::iterator iter;
    442431    for(iter=callees.begin();iter!=callees.end();++iter)
    443432      (*iter).second->subjectKilled(this);
    444     ObservablesMapLock.lock();
    445     callTable.erase(this);
    446     ObservablesMapLock.unlock();
     433    // erase the list in the map
     434    (GlobalObservableInfo::getInstance().getcallTable()).erase(this);
    447435  }
    448436
    449437  // also kill instance in static Channels map if present
    450   ChannelMap::iterator iter = NotificationChannels.find(static_cast<Observable *>(this));
    451   if (iter != NotificationChannels.end()) {
    452     iter->second->subjectKilled(static_cast<Observable *>(this));
    453     ObservablesMapLock.lock();
    454     delete iter->second;
    455     NotificationChannels.erase(iter);
    456     ObservablesMapLock.unlock();
     438  {
     439    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     440    ChannelMap::iterator iter = NotificationChannels.find(static_cast<Observable *>(this));
     441    if (iter != NotificationChannels.end()) {
     442      iter->second->subjectKilled(static_cast<Observable *>(this));
     443      delete iter->second;
     444      NotificationChannels.erase(iter);
     445    }
    457446  }
    458447}
  • src/Observer/Relay.cpp

    re24dde r959c82  
    2626#include "CodePatterns/Observer/Notification.hpp"
    2727
     28#include <boost/thread/locks.hpp>
     29#include <boost/thread/recursive_mutex.hpp>
    2830
    2931/** Constructor for class Relay.
     
    5355
    5456/** Sign on an Observer to this Observable.
    55  * Puts \a *target into Observable::callTable list.
     57 * Puts \a *target into Observable::(GlobalObservableInfo::getInstance().getcallTable()) list.
    5658 * \param *target Observer
    5759 * \param priority number in [-20,20]
    5860 */
    59 void Relay::signOn(Observer *target, PriorityLevel priority) const
     61void Relay::signOn(Observer *target, GlobalObservableInfo::PriorityLevel priority) const
    6062{
    6163#ifdef LOG_OBSERVER
     
    6567#endif
    6668  bool res = false;
    67   callees_t &callees = callTable[const_cast<Observable *>(static_cast<const Observable * const>(this))];
    68 
    69   callees_t::iterator iter;
     69  boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     70  GlobalObservableInfo::callees_t &callees = (GlobalObservableInfo::getInstance().getcallTable())[const_cast<Observable *>(static_cast<const Observable * const>(this))];
     71
     72  GlobalObservableInfo::callees_t::iterator iter;
    7073  for(iter=callees.begin();iter!=callees.end();++iter){
    7174    res |= ((*iter).second == target);
     
    7679
    7780/** Sign off an Observer from this Observable.
    78  * Removes \a *target from Observable::callTable list.
     81 * Removes \a *target from Observable::(GlobalObservableInfo::getInstance().getcallTable()) list.
    7982 * \param *target Observer
    8083 */
    8184void Relay::signOff(Observer *target) const
    8285{
     86  boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     87  GlobalObservableInfo::calltable_t &callTable = GlobalObservableInfo::getInstance().getcallTable();
    8388  ASSERT(callTable.count(const_cast<Observable *>(static_cast<const Observable * const>(this))),
    8489      "Relay::signOff() - called for an Observable without Observers.");
     
    8893      << observerLog().getName(const_cast<Observable *>(static_cast<const Observable * const>(this)));
    8994#endif
    90   callees_t &callees = callTable[const_cast<Observable *>(static_cast<const Observable * const>(this))];
    91 
    92   callees_t::iterator iter;
    93   callees_t::iterator deliter;
     95  GlobalObservableInfo::callees_t &callees = callTable[const_cast<Observable *>(static_cast<const Observable * const>(this))];
     96
     97  GlobalObservableInfo::callees_t::iterator iter;
     98  GlobalObservableInfo::callees_t::iterator deliter;
    9499  for(iter=callees.begin();iter!=callees.end();) {
    95100    if((*iter).second == target) {
     
    105110}
    106111
    107 void Relay::signOn(Observer *target, size_t channelno, PriorityLevel priority) const
     112void Relay::signOn(Observer *target, size_t channelno, GlobalObservableInfo::PriorityLevel priority) const
    108113{
    109114  Notification_ptr notification = getChannel(channelno);
     
    118123
    119124/** Notify all Observers of changes.
    120  * Puts \a *this into Relay::busyObservables, calls Observer::update() for all in callee_t
     125 * Puts \a *this into Relay::(GlobalObservableInfo::getInstance().getbusyObservables()), calls Observer::update() for all in callee_t
    121126 * and removes from busy list.
    122127 */
     
    126131  // we are busy notifying others right now
    127132  // add ourselves to the list of busy subjects to enable circle detection
    128   busyObservables.insert(this);
     133  {
     134    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     135    (GlobalObservableInfo::getInstance().getbusyObservables()).insert(this);
     136  }
    129137  // see if anyone has signed up for observation
    130138  // and call all observers
    131139  try {
    132     if(callTable.count(this)) {
     140    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     141    GlobalObservableInfo::calltable_t& callTable = GlobalObservableInfo::getInstance().getcallTable();
     142    const bool callTable_contains = callTable.count(this);
     143    if(callTable_contains) {
    133144      // elements are stored sorted by keys in the multimap
    134145      // so iterating over it gives us a the callees sorted by
    135146      // the priorities
    136       callees_t callees = callTable[this];
    137       callees_t::iterator iter;
     147      // copy such that signOff() within receiving update() does not affect iterating
     148      // this is because within the same thread and with the updateKilled() signOff() may be
     149      // called and when executed it modifies targets
     150      boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     151      GlobalObservableInfo::callees_t callees = callTable[this];
     152      GlobalObservableInfo::callees_t::iterator iter;
    138153      for(iter=callees.begin();iter!=callees.end();++iter){
    139154#ifdef LOG_OBSERVER
     
    149164  ASSERT_NOCATCH("Exception thrown from Observer Update");
    150165
    151   // send out all notifications that need to be done
    152 
    153   notificationSet currentNotifications = notifications[Updater];
    154   for(notificationSet::iterator it = currentNotifications.begin();
    155       it != currentNotifications.end();++it){
    156     (*it)->notifyAll(Updater);
    157   }
    158 
    159   notifications.erase(Updater);
    160 
    161    // done with notification, we can leave the set of busy subjects
    162   busyObservables.erase(this);
     166  // send out all (GlobalObservableInfo::getInstance().getnotifications()) that need to be done
     167  {
     168    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     169    GlobalObservableInfo::notificationSet currentNotifications =
     170        (GlobalObservableInfo::getInstance().getnotifications())[Updater];
     171    for(GlobalObservableInfo::notificationSet::iterator it = currentNotifications.begin();
     172        it != currentNotifications.end();++it){
     173      (*it)->notifyAll(Updater);
     174    }
     175  }
     176
     177  {
     178    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     179    (GlobalObservableInfo::getInstance().getnotifications()).erase(Updater);
     180
     181     // done with notification, we can leave the set of busy subjects
     182    (GlobalObservableInfo::getInstance().getbusyObservables()).erase(this);
     183  }
    163184}
    164185
     
    171192void Relay::update(Observable *publisher) {
    172193  // circle detection
    173   if(busyObservables.find(this)!=busyObservables.end()) {
     194  bool circle_present = false;
     195  {
     196    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     197    std::set<Observable*>& busyObservables = GlobalObservableInfo::getInstance().getbusyObservables();
     198    circle_present = busyObservables.find(this)!=busyObservables.end();
     199  }
     200  if(circle_present) {
    174201    // somehow a circle was introduced... we were busy notifying our
    175202    // observers, but still we are called by one of our sub-Relays
     
    184211    // if we are changing ourselves at the same time our sub-observables change
    185212    // we do not need to publish all the changes at each time we are called
    186     if(depth.find(this)==depth.end()) {
     213    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     214    std::map<Observable*, int>& depth = GlobalObservableInfo::getInstance().getdepth();
     215    const bool depth_contains = depth.find(this)==depth.end();
     216    if(depth_contains) {
    187217#ifdef LOG_OBSERVER
    188218      observerLog().addMessage() << "-* Update from " << observerLog().getName(publisher)
     
    202232}
    203233
    204 /** Method for receiving specialized notifications.
     234/** Method for receiving specialized (GlobalObservableInfo::getInstance().getnotifications()).
    205235 *
    206236 * \param *publisher The \a *this we observe.
     
    210240{
    211241  Updater = publisher;
    212   ChannelMap::const_iterator iter = NotificationChannels.find(this);
    213   if (iter != NotificationChannels.end()) {
    214     const Channels *myChannels = iter->second;
     242  bool contains_channels = false;
     243  {
     244    boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     245    contains_channels = NotificationChannels.find(this) != NotificationChannels.end();
     246  }
     247  if (contains_channels) {
    215248    const size_t channelno = notification->getChannelNo();
    216     Notification_ptr mynotification = myChannels->getChannel(channelno);
     249    Notification *mynotification = NULL;
     250    {
     251      boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
     252      ChannelMap::const_iterator iter = NotificationChannels.find(this);
     253      const Channels *myChannels = iter->second;
     254      mynotification = myChannels->getChannel(channelno);
     255    }
    217256    ASSERT(mynotification != NULL,
    218257        "Relay::recieveNotification() - this relay does not have a notification no "+toString(channelno)+".");
     
    228267 *  \param *publisher Sub-Relay.
    229268 */
    230 void Relay::subjectKilled(Observable *publisher){
    231 }
    232 
     269void Relay::subjectKilled(Observable *publisher)
     270{
     271}
     272
Note: See TracChangeset for help on using the changeset viewer.