00001 /* 00002 This source file is part of OgreHaptics 00003 (a library for wrapping haptics devices for use with the open-source 00004 graphics engine OGRE, http://www.ogre3d.org) 00005 00006 Copyright (c) 2006 - 2008 Jorrit de Vries 00007 Also see acknowledgements in Readme.html 00008 00009 This library is free software; you can redistribute it and/or 00010 modify it under the terms of the GNU Lesser General Public License 00011 as published by the Free Software Foundation; either version 2.1 00012 of the License, or (at your option) any later version. 00013 00014 This library is distributed in the hope that it will be useful, 00015 but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 GNU Lesser General Public License for more details. 00018 00019 You should have received a copy of the GNU Lesser General Public 00020 License along with this library; if not, write to the 00021 Free Software Foundation, Inc., 00022 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00023 */ 00024 #ifndef _OgreHapticsConcurrentQueue_H__ 00025 #define _OgreHapticsConcurrentQueue_H__ 00026 00027 #include "OgreHapticsPrerequisites.h" 00028 #include "OgreHapticsAtomic.h" 00029 #include "OgreHapticsNonCopyable.h" 00030 00031 namespace OgreHaptics { 00032 00062 template<typename T> class ConcurrentQueue : public NonCopyable 00063 { 00064 public: 00065 // Predeclaration 00066 class HazardPtr; 00067 private: 00068 // Structure which is used to contain data within the queue. 00069 struct Node 00070 { 00071 T data; 00072 Node* volatile next; 00073 00074 Node() 00075 : data(), next(0) {} 00076 Node(T val) 00077 : data(val), next(0) {} 00078 }; 00079 00080 // Head of single linked list of hazard pointers. 00081 HazardPtr* mHPHead; 00082 // Number of hazards present at most in list of hazard pointers. 00083 int mHPCount; 00084 // Head of queue. 00085 Node* volatile mHead; 00086 // Tail of queue. 00087 Node* volatile mTail; 00088 00089 typedef std::vector<Node*> NodeList; 00090 public: 00107 class HazardPtr : public NonCopyable 00108 { 00109 // Allow ConcurrentQueue full access. 00110 friend class ConcurrentQueue; 00111 public: 00113 HazardPtr() 00114 : active(1), 00115 next(0) 00116 { 00117 hazards[0] = 0; 00118 hazards[1] = 0; 00119 } 00121 virtual ~HazardPtr() {} 00122 protected: 00125 int active; 00128 HazardPtr* next; 00131 Node* hazards[2]; 00134 NodeList retiredList; 00135 }; 00136 00138 ConcurrentQueue() 00139 : mHPHead(0), 00140 mHPCount(0) 00141 { 00142 // Create dummy node ensuring both head and tail point at a node 00143 // in the queue to avoid problems when the queue is empty or 00144 // contains only a single item. 00145 mHead = mTail = new Node(); 00146 } 00147 00154 ~ConcurrentQueue() 00155 { 00156 // Cleanup list of hazard pointers 00157 while (mHPHead) 00158 { 00159 // Advance in list 00160 HazardPtr* tmp = mHPHead; 00161 mHPHead = mHPHead->next; 00162 00163 // Clean up left retired nodes 00164 NodeList::iterator it; 00165 for (it = tmp->retiredList.begin(); it != tmp->retiredList.end(); ++it) 00166 { 00167 delete *it; 00168 } 00169 tmp->retiredList.clear(); 00170 00171 delete tmp; 00172 } 00173 00174 // Cleanup head (head is actually never removed) 00175 delete mHead; 00176 } 00177 00187 HazardPtr* allocateHazardPtr(void) 00188 { 00189 // First try to use a retired hazard pointer 00190 HazardPtr* ptr = mHPHead; 00191 for (; ptr; ptr = ptr->next) 00192 { 00193 if (ptr->active != 0 || !atomicCompareAndSwap(&ptr->active, 0, 1)) 00194 { 00195 continue; 00196 } 00197 return ptr; 00198 } 00199 00200 // No hazard pointer available for reuse 00201 // Increment list count by two (maximum two hazards per thread) 00202 int count; 00203 do 00204 { 00205 count = mHPCount; 00206 } 00207 while (!atomicCompareAndSwap(&mHPCount, count, count + 2)); 00208 00209 // Allocate a new hazard pointer 00210 ptr = new HazardPtr(); 00211 // Push new hazard pointer to front 00212 HazardPtr* oldHead; 00213 do 00214 { 00215 oldHead = mHPHead; 00216 ptr->next = oldHead; 00217 } 00218 while (!atomicCompareAndSwapPtr(&mHPHead, oldHead, ptr)); 00219 00220 return ptr; 00221 } 00222 00229 void retireHazardPtr(HazardPtr* ptr) 00230 { 00231 // Clear hazards 00232 ptr->hazards[0] = 0; 00233 ptr->hazards[1] = 0; 00234 // Deactivate hazard pointer after ensuring previous operations 00235 // have been executed (compare-and-swap issues full memory barrier) 00236 atomicCompareAndSwap(&ptr->active, 1, 0); 00237 return; 00238 } 00239 00247 void push(HazardPtr* ptr, const T& value) 00248 { 00249 assert((ptr && ptr->active != 0) && "HazardPtr must be active."); 00250 00251 // Create new node 00252 Node* node = new Node(value); 00253 Node* tail; 00254 00255 while (true) 00256 { 00257 tail = mTail; 00258 // Hazard, tail may have been removed by other thread after 00259 // previous line has been executed 00260 ptr->hazards[0] = tail; 00261 // Some other thread might have removed tail and didn't find 00262 // hazardous references to it 00263 if (tail != mTail) 00264 { 00265 continue; 00266 } 00267 Node* next = tail->next; 00268 // We need to guarantee next is from actual tail in order to 00269 // prevent ABA problems 00270 if (tail != mTail) 00271 { 00272 continue; 00273 } 00274 if (next) 00275 { 00276 // Keep updating tail until it points to last node 00277 atomicCompareAndSwapPtr(&mTail, tail, next); 00278 continue; 00279 } 00280 // If tail points to last node update tail to point to new node 00281 if (atomicCompareAndSwapPtr(&tail->next, reinterpret_cast<Node*>(0), node)) 00282 { 00283 break; 00284 } 00285 } 00286 00287 // If tail points to what we thought was last node update tail to 00288 // point at new node 00289 atomicCompareAndSwapPtr(&mTail, tail, node); 00290 } 00291 00302 bool pop(HazardPtr* ptr, T& destination) 00303 { 00304 assert((ptr && ptr->active != 0) && "HazardPtr must be active."); 00305 00306 Node* head; 00307 00308 while (true) 00309 { 00310 head = mHead; 00311 ptr->hazards[0] = head; 00312 if (head != mHead) 00313 { 00314 continue; 00315 } 00316 00317 Node* tail = mTail; 00318 00319 Node* next = head->next; 00320 ptr->hazards[1] = next; 00321 if (head != mHead) 00322 { 00323 continue; 00324 } 00325 00326 // Queue is empty 00327 if (!next) 00328 { 00329 return false; 00330 } 00331 00332 if (head == tail) 00333 { 00334 // Tail is behind, move tail off of head 00335 atomicCompareAndSwapPtr(&mTail, tail, next); 00336 continue; 00337 } 00338 else 00339 { 00340 destination = next->data; 00341 // Move head pointer, effectively removing the node 00342 if (atomicCompareAndSwapPtr(&mHead, head, next)) 00343 { 00344 break; 00345 } 00346 } 00347 } 00348 00349 // Mark node for reclamation 00350 retireNode(ptr, head); 00351 return true; 00352 } 00353 private: 00355 void retireNode(HazardPtr* ptr, Node* node) 00356 { 00357 // Add node to retiredList of hazard pointer 00358 ptr->retiredList.push_back(node); 00359 HazardPtr* head = mHPHead; 00360 00361 // Check whether we need to scan for nodes eligible for reclamation 00362 //TODO comment omega 00363 static const float omega = 1.25f; 00364 if (ptr->retiredList.size() >= (omega * mHPCount)) 00365 { 00366 scan(ptr, head); 00367 helpScan(ptr); 00368 } 00369 } 00370 00375 void scan(HazardPtr* ptr, HazardPtr* head) 00376 { 00377 // Scan list of hazard pointers to collect all non-null hazards 00378 NodeList hazards; 00379 while (head) 00380 { 00381 for (int i = 0; i < 2; ++i) 00382 { 00383 Node* hazard = head->hazards[i]; 00384 if (hazard) 00385 { 00386 hazards.push_back(hazard); 00387 } 00388 } 00389 head = head->next; 00390 } 00391 00392 // Sort the list of hazards 00393 std::sort(hazards.begin(), hazards.end(), std::less<Node*>()); 00394 00395 // Check each hazard against retired nodes 00396 NodeList retiredList(ptr->retiredList); 00397 ptr->retiredList.clear(); 00398 NodeList::iterator it; 00399 for (it = retiredList.begin(); it != retiredList.end(); ++it) 00400 { 00401 if (!std::binary_search(hazards.begin(), hazards.end(), *it)) 00402 { 00403 // Node is eligible for reuse 00404 delete *it; 00405 } 00406 else 00407 { 00408 ptr->retiredList.push_back(*it); 00409 } 00410 } 00411 } 00412 00419 void helpScan(HazardPtr* ptr) 00420 { 00421 HazardPtr* head = mHPHead; 00422 for (; head; head = head->next) 00423 { 00424 if (head->active != 0 || !atomicCompareAndSwap(&head->active, 0, 1)) 00425 { 00426 // Only scan inactive hazard pointers. Inactive hazard 00427 // pointers are locked through compare-and-swap 00428 continue; 00429 } 00430 while (!head->retiredList.empty()) 00431 { 00432 // Add node to list owned by given hazard pointer 00433 ptr->retiredList.push_back(head->retiredList.back()); 00434 head->retiredList.pop_back(); 00435 HazardPtr* h = mHPHead; 00436 //TODO comment omega 00437 static const float omega = 1.25f; 00438 if (ptr->retiredList.size() > (omega * mHPCount)) 00439 { 00440 scan(ptr, h); 00441 } 00442 } 00443 // Deactivate hazard pointer 00444 head->active = 0; 00445 } 00446 } 00447 }; 00448 00449 } 00450 00451 #endif
Last modified Tue Jan 6 22:31:25 2009