Changeset 1b5188 for src


Ignore:
Timestamp:
Jul 30, 2015, 8:47:36 PM (10 years ago)
Author:
Frederik Heber <heber@…>
Children:
454bc54
Parents:
163eec
git-author:
Frederik Heber <heber@…> (06/18/15 00:02:38)
git-committer:
Frederik Heber <heber@…> (07/30/15 20:47:36)
Message:

Observables are now protected by mutexes, i.e. allow for concurrent use.

Location:
src
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • src/CodePatterns/Observer/Observable.hpp

    r163eec r1b5188  
    1818#include <string>
    1919#include <boost/function.hpp>
     20#include <boost/thread.hpp>
    2021
    2122#include "CodePatterns/Range.hpp"
     
    135136  static std::set<Observable*> busyObservables;
    136137
     138  static boost::recursive_mutex ObservablesMapLock; //!< a lock for the pointer of the instance
     139
    137140private:
    138141  friend class Zombie;
  • src/Observer/Observable.cpp

    r163eec r1b5188  
    4646std::set<Observable*> Observable::busyObservables; //!< Set of Observables that are currently busy notifying their sign-on'ed Observers
    4747Observable::ChannelMap Observable::NotificationChannels; //!< Map of Observables to their Channels.
     48boost::recursive_mutex Observable::ObservablesMapLock;  //!< mutex for locking the above maps
    4849
    4950// ValidRange must be initialized before PriorityLevel.
     
    7778 * \param *publisher reference of sub-observable
    7879 */
    79 void Observable::start_observer_internal(Observable *publisher){
     80void Observable::start_observer_internal(Observable *publisher)
     81{
     82  boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    8083  // increase the count for this observable by one
    8184  // if no entry for this observable is found, an new one is created
     
    97100 * \param *publisher reference of sub-observable
    98101 */
    99 void Observable::finish_observer_internal(Observable *publisher){
     102void Observable::finish_observer_internal(Observable *publisher)
     103{
    100104  // decrease the count for this observable
    101105  // if zero is reached all observed blocks are done and we can
    102106  // start to notify our observers
     107  ObservablesMapLock.lock();
    103108  --depth[publisher];
     109  ObservablesMapLock.unlock();
    104110#ifdef LOG_OBSERVER
    105111  observerLog().addMessage(depth[publisher]) << "<< Unlocking " << observerLog().getName(publisher);
    106112#endif
    107   if(depth[publisher]){}
     113  ObservablesMapLock.lock();
     114  int depth_publisher = depth[publisher];
     115  ObservablesMapLock.unlock();
     116  if(depth_publisher){}
    108117  else{
    109118    publisher->notifyAll();
    110119    // this item is done, so we don't have to keep the count with us
    111120    // save some memory by erasing it
     121    boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    112122    depth.erase(publisher);
    113123  }
    114124}
    115125
    116 void Observable::enque_notification_internal(Observable *publisher, Notification_ptr notification){
     126void Observable::enque_notification_internal(Observable *publisher, Notification_ptr notification)
     127{
     128  boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    117129  notifications[publisher].insert(notification);
    118130}
     
    160172  // we are busy notifying others right now
    161173  // add ourselves to the list of busy subjects to enable circle detection
     174  ObservablesMapLock.lock();
    162175  busyObservables.insert(this);
     176  ObservablesMapLock.unlock();
    163177  // see if anyone has signed up for observation
    164178  // and call all observers
    165179  try {
    166     if(callTable.count(this)) {
     180    ObservablesMapLock.lock();
     181    const bool anybodyThere = callTable.count(this);
     182    ObservablesMapLock.unlock();
     183    if(anybodyThere) {
    167184      // elements are stored sorted by keys in the multimap
    168185      // so iterating over it gives us a the callees sorted by
    169186      // the priorities
     187      ObservablesMapLock.lock();
    170188      callees_t callees = callTable[this];
     189      ObservablesMapLock.unlock();
    171190      callees_t::iterator iter;
    172191      for(iter=callees.begin();iter!=callees.end();++iter){
     
    190209  }
    191210
     211  ObservablesMapLock.lock();
    192212  notifications.erase(this);
    193213
    194214   // done with notification, we can leave the set of busy subjects
    195215  busyObservables.erase(this);
     216  ObservablesMapLock.unlock();
    196217
    197218#ifdef LOG_OBSERVER
     
    209230void Observable::update(Observable *publisher) {
    210231  // circle detection
    211   if(busyObservables.find(this)!=busyObservables.end()) {
     232  ObservablesMapLock.lock();
     233  const bool presentCircle = busyObservables.find(this)!=busyObservables.end();
     234  ObservablesMapLock.unlock();
     235  if(presentCircle) {
    212236    // somehow a circle was introduced... we were busy notifying our
    213237    // observers, but still we are called by one of our sub-Observables
     
    222246    // if we are changing ourselves at the same time our sub-observables change
    223247    // we do not need to publish all the changes at each time we are called
    224     if(depth.find(this)==depth.end()) {
     248    ObservablesMapLock.lock();
     249    const bool depth_this = depth.find(this)==depth.end();
     250    ObservablesMapLock.unlock();
     251    if(depth_this) {
    225252#ifdef LOG_OBSERVER
    226253      observerLog().addMessage() << "-* Update from " << observerLog().getName(publisher)
     
    249276#endif
    250277  bool res = false;
     278  boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    251279  callees_t &callees = callTable[const_cast<Observable *>(this)];
    252280
     
    265293void Observable::signOff(Observer *target) const
    266294{
     295  boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    267296  ASSERT(callTable.count(const_cast<Observable *>(this)),
    268297      "SignOff called for an Observable without Observers.");
     
    290319void Observable::signOn(Observer *target, size_t channelno) const
    291320{
     321  boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    292322  Notification_ptr notification = getChannel(channelno);
    293323#ifdef LOG_OBSERVER
     
    301331void Observable::signOff(Observer *target, size_t channelno) const
    302332{
     333  boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    303334  Notification_ptr notification = getChannel(channelno);
    304335#ifdef LOG_OBSERVER
     
    313344bool Observable::isBlocked() const
    314345{
     346  boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    315347  return depth.count(const_cast<Observable *>(this)) > 0;
    316348}
     
    318350Notification_ptr Observable::getChannel(size_t no) const
    319351{
     352  ObservablesMapLock.lock();
    320353  const ChannelMap::const_iterator iter = NotificationChannels.find(const_cast<Observable * const>(this));
    321   ASSERT(iter != NotificationChannels.end(),
     354  const bool status = iter != NotificationChannels.end();
     355  Channels *OurChannel = NULL;
     356  if (status)
     357    OurChannel = iter->second;
     358  ObservablesMapLock.unlock();
     359  ASSERT(status,
    322360      "Observable::getChannel() - we do not have a channel in NotificationChannels.");
    323   const Channels *OurChannel = iter->second;
    324361  ASSERT(OurChannel != NULL,
    325362      "Observable::getChannel() - observable has no channels.");
     
    329366size_t Observable::getNumberOfObservers() const
    330367{
     368  boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    331369  size_t ObserverCount = 0;
    332370  {
     
    354392 *  \param *publisher Sub-Observable.
    355393 */
    356 void Observable::subjectKilled(Observable *publisher){
     394void Observable::subjectKilled(Observable *publisher)
     395{
    357396}
    358397
     
    379418      << observerLog().getName(static_cast<Observable *>(this));
    380419#endif
     420  boost::recursive_mutex::scoped_lock guard(ObservablesMapLock);
    381421  if(callTable.count(this)) {
    382422    // delete all entries for this observable
     423    ObservablesMapLock.lock();
    383424    callees_t callees = callTable[this];
     425    ObservablesMapLock.unlock();
    384426    callees_t::iterator iter;
    385427    for(iter=callees.begin();iter!=callees.end();++iter)
    386428      (*iter).second->subjectKilled(this);
     429    ObservablesMapLock.lock();
    387430    callTable.erase(this);
     431    ObservablesMapLock.unlock();
    388432  }
    389433
     
    392436  if (iter != NotificationChannels.end()) {
    393437    iter->second->subjectKilled(static_cast<Observable *>(this));
     438    ObservablesMapLock.lock();
    394439    delete iter->second;
    395440    NotificationChannels.erase(iter);
    396   }
    397 }
     441    ObservablesMapLock.unlock();
     442  }
     443}
Note: See TracChangeset for help on using the changeset viewer.