OgreHapticsConcurrentQueue.h

Go to the documentation of this file.
00001 /*
00002 This source file is part of OgreHaptics
00003  (a library for wrapping haptics devices for use with the open-source
00004  graphics engine OGRE, http://www.ogre3d.org)
00005 
00006 Copyright (c) 2006 - 2008 Jorrit de Vries
00007 Also see acknowledgements in Readme.html
00008 
00009 This library is free software; you can redistribute it and/or
00010 modify it under the terms of the GNU Lesser General Public License
00011 as published by the Free Software Foundation; either version 2.1
00012 of the License, or (at your option) any later version.
00013 
00014 This library is distributed in the hope that it will be useful,
00015 but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00017 GNU Lesser General Public License for more details.
00018 
00019 You should have received a copy of the GNU Lesser General Public
00020 License along with this library; if not, write to the
00021 Free Software Foundation, Inc.,
00022 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 
00023 */
00024 #ifndef _OgreHapticsConcurrentQueue_H__
00025 #define _OgreHapticsConcurrentQueue_H__
00026 
00027 #include "OgreHapticsPrerequisites.h"
00028 #include "OgreHapticsAtomic.h"
00029 #include "OgreHapticsNonCopyable.h"
00030 
00031 namespace OgreHaptics {
00032 
00062     template<typename T> class ConcurrentQueue : public NonCopyable
00063     {
00064     public:
00065         // Predeclaration
00066         class HazardPtr;
00067     private:
00068         // Structure which is used to contain data within the queue.
00069         struct Node
00070         {
00071             T data;
00072             Node* volatile next;
00073 
00074             Node()
00075                 : data(), next(0) {}
00076             Node(T val)
00077                 : data(val), next(0) {}
00078         };
00079 
00080         // Head of single linked list of hazard pointers.
00081         HazardPtr* mHPHead;
00082         // Number of hazards present at most in list of hazard pointers.
00083         int mHPCount;
00084         // Head of queue.
00085         Node* volatile mHead;
00086         // Tail of queue.
00087         Node* volatile mTail;
00088 
00089         typedef std::vector<Node*> NodeList;
00090     public:
00107         class HazardPtr : public NonCopyable
00108         {
00109             // Allow ConcurrentQueue full access.
00110             friend class ConcurrentQueue;
00111         public:
00113             HazardPtr()
00114                 : active(1), 
00115                 next(0)
00116             {
00117                 hazards[0] = 0;
00118                 hazards[1] = 0;
00119             }
00121             virtual ~HazardPtr() {}
00122         protected:
00125             int active;
00128             HazardPtr* next;
00131             Node* hazards[2];
00134             NodeList retiredList;
00135         };
00136 
00138         ConcurrentQueue()
00139             : mHPHead(0),
00140             mHPCount(0)
00141         {
00142             // Create dummy node ensuring both head and tail point at a node
00143             // in the queue to avoid problems when the queue is empty or
00144             // contains only a single item.
00145             mHead = mTail = new Node();
00146         }
00147 
00154         ~ConcurrentQueue()
00155         {
00156             // Cleanup list of hazard pointers
00157             while (mHPHead)
00158             {
00159                 // Advance in list
00160                 HazardPtr* tmp = mHPHead;
00161                 mHPHead = mHPHead->next;
00162 
00163                 // Clean up left retired nodes
00164                 NodeList::iterator it;
00165                 for (it = tmp->retiredList.begin(); it != tmp->retiredList.end(); ++it)
00166                 {
00167                     delete *it;
00168                 }
00169                 tmp->retiredList.clear();
00170 
00171                 delete tmp;
00172             }
00173 
00174             // Cleanup head (head is actually never removed)
00175             delete mHead;
00176         }
00177 
00187         HazardPtr* allocateHazardPtr(void)
00188         {
00189             // First try to use a retired hazard pointer
00190             HazardPtr* ptr = mHPHead;
00191             for (; ptr; ptr = ptr->next)
00192             {
00193                 if (ptr->active != 0 || !atomicCompareAndSwap(&ptr->active, 0, 1))
00194                 {
00195                     continue;
00196                 }
00197                 return ptr;
00198             }
00199 
00200             // No hazard pointer available for reuse
00201             // Increment list count by two (maximum two hazards per thread)
00202             int count;
00203             do
00204             {
00205                 count = mHPCount;
00206             }
00207             while (!atomicCompareAndSwap(&mHPCount, count, count + 2));
00208 
00209             // Allocate a new hazard pointer
00210             ptr = new HazardPtr();
00211             // Push new hazard pointer to front
00212             HazardPtr* oldHead;
00213             do
00214             {
00215                 oldHead = mHPHead;
00216                 ptr->next = oldHead;
00217             }
00218             while (!atomicCompareAndSwapPtr(&mHPHead, oldHead, ptr));
00219 
00220             return ptr;
00221         }
00222 
00229         void retireHazardPtr(HazardPtr* ptr)
00230         {
00231             // Clear hazards
00232             ptr->hazards[0] = 0;
00233             ptr->hazards[1] = 0;
00234             // Deactivate hazard pointer after ensuring previous operations
00235             // have been executed (compare-and-swap issues full memory barrier)
00236             atomicCompareAndSwap(&ptr->active, 1, 0);
00237             return;
00238         }
00239 
00247         void push(HazardPtr* ptr, const T& value)
00248         {
00249             assert((ptr && ptr->active != 0) && "HazardPtr must be active.");
00250 
00251             // Create new node
00252             Node* node = new Node(value);
00253             Node* tail;
00254 
00255             while (true)
00256             {
00257                 tail = mTail;
00258                 // Hazard, tail may have been removed by other thread after
00259                 // previous line has been executed
00260                 ptr->hazards[0] = tail;
00261                 // Some other thread might have removed tail and didn't find
00262                 // hazardous references to it
00263                 if (tail != mTail)
00264                 {
00265                     continue;
00266                 }
00267                 Node* next = tail->next;
00268                 // We need to guarantee next is from actual tail in order to
00269                 // prevent ABA problems
00270                 if (tail != mTail)
00271                 {
00272                     continue;
00273                 }
00274                 if (next)
00275                 {
00276                     // Keep updating tail until it points to last node
00277                     atomicCompareAndSwapPtr(&mTail, tail, next);
00278                     continue;
00279                 }
00280                 // If tail points to last node update tail to point to new node
00281                 if (atomicCompareAndSwapPtr(&tail->next, reinterpret_cast<Node*>(0), node))
00282                 {
00283                     break;
00284                 }
00285             }
00286 
00287             // If tail points to what we thought was last node update tail to
00288             // point at new node
00289             atomicCompareAndSwapPtr(&mTail, tail, node);
00290         }
00291 
00302         bool pop(HazardPtr* ptr, T& destination)
00303         {
00304             assert((ptr && ptr->active != 0) && "HazardPtr must be active.");
00305 
00306             Node* head;
00307             
00308             while (true)
00309             {
00310                 head = mHead;
00311                 ptr->hazards[0] = head;
00312                 if (head != mHead)
00313                 {
00314                     continue;
00315                 }
00316 
00317                 Node* tail = mTail;
00318 
00319                 Node* next = head->next;
00320                 ptr->hazards[1] = next;
00321                 if (head != mHead)
00322                 {
00323                     continue;
00324                 }
00325 
00326                 // Queue is empty
00327                 if (!next)
00328                 {
00329                     return false;
00330                 }
00331 
00332                 if (head == tail)
00333                 {
00334                     // Tail is behind, move tail off of head
00335                     atomicCompareAndSwapPtr(&mTail, tail, next);
00336                     continue;
00337                 }
00338                 else
00339                 {
00340                     destination = next->data;
00341                     // Move head pointer, effectively removing the node
00342                     if (atomicCompareAndSwapPtr(&mHead, head, next))
00343                     {
00344                         break;
00345                     }
00346                 }
00347             }
00348 
00349             // Mark node for reclamation
00350             retireNode(ptr, head);
00351             return true;
00352         }
00353     private:
00355         void retireNode(HazardPtr* ptr, Node* node)
00356         {
00357             // Add node to retiredList of hazard pointer
00358             ptr->retiredList.push_back(node);
00359             HazardPtr* head = mHPHead;
00360             
00361             // Check whether we need to scan for nodes eligible for reclamation
00362             //TODO comment omega 
00363             static const float omega = 1.25f;
00364             if (ptr->retiredList.size() >= (omega * mHPCount))
00365             {
00366                 scan(ptr, head);
00367                 helpScan(ptr);
00368             }
00369         }
00370 
00375         void scan(HazardPtr* ptr, HazardPtr* head)
00376         {
00377             // Scan list of hazard pointers to collect all non-null hazards
00378             NodeList hazards;
00379             while (head)
00380             {
00381                 for (int i = 0; i < 2; ++i)
00382                 {
00383                     Node* hazard = head->hazards[i];
00384                     if (hazard)
00385                     {
00386                         hazards.push_back(hazard);
00387                     }
00388                 }
00389                 head = head->next;
00390             }
00391 
00392             // Sort the list of hazards
00393             std::sort(hazards.begin(), hazards.end(), std::less<Node*>());
00394 
00395             // Check each hazard against retired nodes
00396             NodeList retiredList(ptr->retiredList);
00397             ptr->retiredList.clear();
00398             NodeList::iterator it;
00399             for (it = retiredList.begin(); it != retiredList.end(); ++it)
00400             {
00401                 if (!std::binary_search(hazards.begin(), hazards.end(), *it))
00402                 {
00403                     // Node is eligible for reuse
00404                     delete *it;
00405                 }
00406                 else
00407                 {
00408                     ptr->retiredList.push_back(*it);
00409                 }
00410             }
00411         }
00412 
00419         void helpScan(HazardPtr* ptr)
00420         {
00421             HazardPtr* head = mHPHead;
00422             for (; head; head = head->next)
00423             {
00424                 if (head->active != 0 || !atomicCompareAndSwap(&head->active, 0, 1))
00425                 {
00426                     // Only scan inactive hazard pointers. Inactive hazard
00427                     // pointers are locked through compare-and-swap
00428                     continue;
00429                 }
00430                 while (!head->retiredList.empty())
00431                 {
00432                     // Add node to list owned by given hazard pointer
00433                     ptr->retiredList.push_back(head->retiredList.back());
00434                     head->retiredList.pop_back();
00435                     HazardPtr* h = mHPHead;
00436                     //TODO comment omega
00437                     static const float omega = 1.25f;
00438                     if (ptr->retiredList.size() > (omega * mHPCount))
00439                     {
00440                         scan(ptr, h);
00441                     }
00442                 }
00443                 // Deactivate hazard pointer
00444                 head->active = 0;
00445             }
00446         }
00447     };
00448 
00449 }
00450 
00451 #endif

Last modified Tue Jan 6 22:31:25 2009