#include "threadmanager.hpp"

// Builds a queue entry, taking ownership of the task callable.
//   what_  - callable to execute (moved into the entry)
//   where_ - bitmask tested against a thread's flags to decide whether that thread may run it
//   when_  - timestamp, compared against time::Now(Nanoseconds), at or after which it may run
inline sprawl::threading::ThreadManager::TaskInfo::TaskInfo(sprawl::threading::ThreadManager::Task&& what_, uint64_t where_, int64_t when_)
	: what(std::move(what_))
	, where(where_)
	, when(when_)
{
	//
}

// Same as above, but copies the task callable instead of moving it.
inline sprawl::threading::ThreadManager::TaskInfo::TaskInfo(const sprawl::threading::ThreadManager::Task& what_, uint64_t where_, int64_t when_)
	: what(what_)
	, where(where_)
	, when(when_)
{
	//
}

// Registers one named worker thread whose body is eventLoop_(threadFlags, secondaryFlags).
// The thread is only created here; it is started by Run()/Start() and joined and
// deleted by ShutDown().
void sprawl::threading::ThreadManager::AddThread(uint64_t threadFlags, const char* const threadName, uint64_t secondaryFlags)
{
	m_threads.PushFront(new Thread(threadName, std::bind(&ThreadManager::eventLoop_, this, threadFlags, secondaryFlags)));
}

// Registers one unnamed worker thread whose body is eventLoop_(threadFlags, secondaryFlags).
void sprawl::threading::ThreadManager::AddThread(uint64_t threadFlags, uint64_t secondaryFlags)
{
	m_threads.PushFront(new Thread(std::bind(&ThreadManager::eventLoop_, this, threadFlags, secondaryFlags)));
}

// Registers `count` worker threads that all share the same name and flags.
// A non-positive `count` registers nothing.
void sprawl::threading::ThreadManager::AddThreads(uint64_t threadFlags, int count, const char* const threadName, uint64_t secondaryFlags)
{
	for(int i = 0; i < count; ++i)
	{
		AddThread(threadFlags, threadName, secondaryFlags);
	}
}

// Registers `count` unnamed worker threads that all share the same flags.
// A non-positive `count` registers nothing.
void sprawl::threading::ThreadManager::AddThreads(uint64_t threadFlags, int count, uint64_t secondaryFlags)
{
	for(int i = 0; i < count; ++i)
	{
		AddThread(threadFlags, secondaryFlags);
	}
}

// Queues a task (moved) to run on a thread matching threadFlags once the clock
// reaches whenNanosecs. whenNanosecs is an absolute timestamp, not a delay.
void sprawl::threading::ThreadManager::AddTask(sprawl::threading::ThreadManager::Task&& task, uint64_t threadFlags, int64_t whenNanosecs)
{
	pushTask_(TaskInfo(std::move(task), threadFlags, whenNanosecs));
}

// Queues a task (copied) to run on a thread matching threadFlags once the clock
// reaches whenNanosecs. whenNanosecs is an absolute timestamp, not a delay.
void sprawl::threading::ThreadManager::AddTask(const sprawl::threading::ThreadManager::Task& task, uint64_t threadFlags, int64_t whenNanosecs)
{
	pushTask_(TaskInfo(task, threadFlags, whenNanosecs));
}

// Queues a task (moved) to run nanosecondsFromNow nanoseconds after the current time.
void sprawl::threading::ThreadManager::AddFutureTask(sprawl::threading::ThreadManager::Task&& task, uint64_t threadFlags, int64_t nanosecondsFromNow)
{
	AddTask(std::move(task), threadFlags, nanosecondsFromNow + time::Now(time::Resolution::Nanoseconds));
}

// Queues a task (copied) to run nanosecondsFromNow nanoseconds after the current time.
void sprawl::threading::ThreadManager::AddFutureTask(const sprawl::threading::ThreadManager::Task& task, uint64_t threadFlags, int64_t nanosecondsFromNow)
{
	AddTask(task, threadFlags, nanosecondsFromNow + time::Now(time::Resolution::Nanoseconds));
}

// Starts every registered worker thread, then runs the event loop on the calling
// thread with the given flags. Does not return until that event loop returns,
// which happens once m_running has been cleared (see Stop()).
void sprawl::threading::ThreadManager::Run(uint64_t thisThreadFlags, uint64_t secondaryFlags)
{
	m_running = true;
	for(auto& thread : m_threads)
	{
		thread->Start();
	}
	eventLoop_(thisThreadFlags, secondaryFlags);
}

// Starts every registered worker thread and returns immediately. The calling
// thread's flags are recorded so that it can service tasks later via Pump()
// and block for work via Wait().
void sprawl::threading::ThreadManager::Start(uint64_t thisThreadFlags, uint64_t secondaryFlags)
{
	m_running = true;
	m_callingThreadFlags = thisThreadFlags;
	m_callingThreadSecondaryFlags = secondaryFlags;
	for(auto& thread : m_threads)
	{
		thread->Start();
	}
}

// Runs at most one ready task on the calling thread, using the flags recorded by
// Start(). Never blocks waiting for work. The task is removed from the queue
// under the lock and executed after the lock has been released.
void sprawl::threading::ThreadManager::Pump()
{
	Task taskToExecute;
	{
		ScopedLock lock(m_mutex);
		// Taken under the lock so the queue is not inspected while another thread mutates it.
		collections::List<TaskInfo>::iterator secondaryTask = m_taskQueue.end();
		int64_t now = time::Now(time::Resolution::Nanoseconds);
		for(auto it = m_taskQueue.begin(); it != m_taskQueue.end(); )
		{
			// Advance before any erase so `it` never refers to a removed node.
			auto delete_it = it++;
			if(delete_it->when > now)
			{
				// pushTask_ keeps the queue ordered by `when`, so nothing further is ready.
				break;
			}
			if((delete_it->where & m_callingThreadFlags) == 0)
			{
				// Not a primary match. Remember the first ready task that matches the
				// secondary mask as a fallback. (This test previously read `== 0`, which
				// selected tasks matching NEITHER mask, unlike eventLoop_ and Wait().)
				if((delete_it->where & m_callingThreadSecondaryFlags) != 0 && secondaryTask == m_taskQueue.end())
				{
					secondaryTask = delete_it;
				}
				continue;
			}
			taskToExecute = delete_it->what;
			m_taskQueue.Erase(delete_it);
			break;
		}
		// NOTE(review): a candidate here always has when <= now, so (when - now) is never
		// positive; this condition can only hold if m_secondaryTaskWindow is negative.
		// Possibly (now - when) was intended - confirm against the member's declaration.
		if(!taskToExecute && secondaryTask != m_taskQueue.end() && (secondaryTask->when - now) > m_secondaryTaskWindow)
		{
			taskToExecute = secondaryTask->what;
			m_taskQueue.Erase(secondaryTask);
		}
	}
	if(taskToExecute)
	{
		taskToExecute();
	}
}

// Blocks the calling thread until a task matching its primary or secondary flags
// (as recorded by Start()) is ready to run, or until m_running is cleared.
// Does not execute anything itself; follow it with Pump().
void sprawl::threading::ThreadManager::Wait()
{
	SharedLock lock(m_mutex);
	while(m_running)
	{
		int64_t now = time::Now(time::Resolution::Nanoseconds);
		int64_t nextTaskTime = -1;
		for(auto it = m_taskQueue.begin(); it != m_taskQueue.end(); ++it)
		{
			if((it->where & m_callingThreadFlags) == 0 && (it->where & m_callingThreadSecondaryFlags) == 0)
			{
				// Matches neither mask: not this thread's concern.
				continue;
			}
			if(it->when <= now)
			{
				return;
			}
			// First relevant task is in the future; the queue is time-ordered, so
			// this is the earliest one worth waking up for.
			nextTaskTime = it->when;
			break;
		}
		if(nextTaskTime >= 0)
		{
			m_conditionVariable.WaitUntil(lock, nextTaskTime);
		}
		else
		{
			m_conditionVariable.Wait(lock);
		}
	}
}

// Clears m_running under the lock and wakes every waiter so the event loops and
// Wait() can observe the change. Does not join threads; see ShutDown().
void sprawl::threading::ThreadManager::Stop()
{
	{
		ScopedLock lock(m_mutex);
		m_running = false;
	}
	m_conditionVariable.NotifyAll();
}

// Joins and deletes every registered worker thread, then empties the thread list.
// This does not clear m_running itself, so Join() will presumably block unless
// Stop() has been called first - verify against Thread::Join.
void sprawl::threading::ThreadManager::ShutDown()
{
	for(auto& thread : m_threads)
	{
		thread->Join();
		delete thread;
	}
	m_threads.Clear();
}

// Inserts the task into the queue, keeping it ordered by ascending `when`, then
// wakes all waiters. A task whose `when` equals that of existing entries is
// placed after them, because the comparison is strict.
void sprawl::threading::ThreadManager::pushTask_(TaskInfo&& task)
{
	{
		ScopedLock lock(m_mutex);
		bool inserted = false;
		for(auto it = m_taskQueue.begin(); it != m_taskQueue.end(); ++it)
		{
			if(task.when < it->when)
			{
				m_taskQueue.Insert(it, std::move(task));
				inserted = true;
				break;
			}
		}
		if(!inserted)
		{
			// Later than (or tied with) everything queued, or the queue is empty.
			m_taskQueue.PushBack(std::move(task));
		}
	}
	// Notify after releasing the lock so woken threads can acquire it right away.
	m_conditionVariable.NotifyAll();
}

// Worker loop: repeatedly takes one ready task whose `where` mask matches `flags`
// (or, failing that, `secondaryFlags`), runs it outside the lock, and otherwise
// sleeps on the condition variable. Returns once m_running is cleared.
void sprawl::threading::ThreadManager::eventLoop_(uint64_t flags, uint64_t secondaryFlags)
{
	while(m_running)
	{
		Task taskToExecute;
		{
			SharedLock lock(m_mutex);
			while(m_running && !taskToExecute)
			{
				// Re-selected on every pass. The waits below are handed the lock and
				// presumably release it while blocked, so a candidate remembered from
				// an earlier pass may since have been erased by another thread;
				// reusing that iterator would touch a dead node.
				collections::List<TaskInfo>::iterator secondaryTask = m_taskQueue.end();
				int64_t now = time::Now(time::Resolution::Nanoseconds);
				int64_t nextTaskTime = -1;
				for(auto it = m_taskQueue.begin(); it != m_taskQueue.end(); )
				{
					// Advance before any erase so `it` never refers to a removed node.
					auto delete_it = it++;
					if((delete_it->where & flags) == 0)
					{
						// Not a primary match: remember the first ready secondary match.
						if((delete_it->where & secondaryFlags) != 0 && delete_it->when <= now && secondaryTask == m_taskQueue.end())
						{
							secondaryTask = delete_it;
						}
						continue;
					}
					if(delete_it->when <= now && !taskToExecute)
					{
						taskToExecute = delete_it->what;
						m_taskQueue.Erase(delete_it);
						// Keep scanning so nextTaskTime reflects the following primary task.
						continue;
					}
					nextTaskTime = delete_it->when;
					break;
				}
				if(!taskToExecute)
				{
					// NOTE(review): a candidate here always has when <= now, so (when - now)
					// is never positive; this can only hold if m_secondaryTaskWindow is
					// negative. Possibly (now - when) was intended - confirm.
					if(secondaryTask != m_taskQueue.end() && (secondaryTask->when - now) > m_secondaryTaskWindow)
					{
						taskToExecute = secondaryTask->what;
						m_taskQueue.Erase(secondaryTask);
					}
					else
					{
						if(nextTaskTime >= 0)
						{
							m_conditionVariable.WaitUntil(lock, nextTaskTime);
						}
						else
						{
							m_conditionVariable.Wait(lock);
						}
					}
				}
			}
			if(!m_running)
			{
				return;
			}
		}
		taskToExecute();
		// Release whatever the callable captured before the next iteration.
		taskToExecute = nullptr;
	}
}
# | Change | User | Description | Committed | |
---|---|---|---|---|---|
#7 | 16070 | ShadauxCat |
Fixed possible hang during ThreadManager::RunStaged() and ThreadManager::Sync() in a pre-emptive environment if the mailman thread happens to be pre-empted right after calling m_mailSyncEvent.Notify(). Probably a better fix for this will come in the future. I'm overall not fully sold on the current implementation of Sync(). #review-16071 |
||
#6 | 16052 | ShadauxCat |
- Changed default block size for concurrent queue to a more reasonable value - Changed some memory orders to memory_order_seq_cst when they don't actually need to be that to get around a bug in visual studio 2013 - debug builds assert when memory_order_acq_rel is used for a compare_exchange_strong (this is a standard library bug and is fixed in VS2015) - Added Event API - events are an alternative to condition variables that do not require a mutex and are guaranteed not to miss any signals, even if the signal comes while the thread is not listening for it. Unlike condition variables, however, they do not support broadcasting (and in fact, in general, are not safe to use with multiple threads listening for the same event simultaneously - though notifying on the same event is fine) - Rewrote ThreadManager around ConcurrentQueue and Event API so it is now lock-free. Also improved some behaviors of the staged thread manager operation so it now supports tasks that can be run on multiple stages via a bitmask. - Fixed an issue where the Coroutine copy constructor was calling the std::function constructor instead and another where initializing with a stack might try to call the wrong constructor and vice-versa - Fixed Coroutine never calling munmap() on its stack in linux and causing a memory leak - Added default arguments to time functions - Attempted to fix some issues with BinaryTree. Fixed some but not all. It's currently not suitable for use, sadly. 
- Logging Improvements: - - Added thread ID to logging - - Fixed some issues with category handlers - - Added backtraces - - Added the following additional log macros: - - - LOG_IF - - - LOG_EVERY_N - - - LOG_FIRST_N - - - LOG_IF_EVERY_N - - - LOG_IF_FIRST_N - - - LOG_ASSERT - - Added the ability to set extra info callbacks to get data such as script backtraces - - Removed the thread-related handlers and replaced them with RunHandler_Threaded and RunHandler_ThreadManager, which will enable any passed-in handler to be run in a threaded fashion - Removed StaticPoolAllocator and renamed DynamicPoolAllocator to PoolAllocator; adjusted unit tests accordingly - PoolAllocator now allocates its pool with mmap and VirtualAlloc, rather than with malloc - Fixed a bug with Vector copy assignment operator - Improved performance of StringBuilder considerably for cases where there are no modifier strings - Removed Copy-On-Write behavior of JSONToken as it was broken; copies are now performed with explicit DeepCopy() and ShallowCopy() functions - Fixed some parser bugs with JSONToken - Added iteration to JSONToken to iterate its children - Fixed crash when reading a negative number of bytes from a file - Changed StringBuilder to favor speed instead of memory by default - Added some performance unit tests for JSON token #review-16053 |
||
#5 | 14833 | ShadauxCat |
First checkin of logging module. Also fixes the following issues: -Added UpperBound() and LowerBound() to BinaryTree and created appropriate unit tests -Added Sync() to ThreadManager to force it to run all tasks to completion and not return until it has no tasks left -Fixed a bug in String::format() where a non-numeric value inside {} would be treated as an empty {}; it now simply prints whatever the value was. (i.e., "{blah}".format(foo) simply returns "{blah}") -Added Reset() to sprawl::StringBuilder -Disabled the switch-enum warning flag in gcc because it's stupid and ridiculous that a default case doesn't shut it up -Made sprawl::Mutex movable. This may turn out to be a bad idea but it enabled keeping them in a map. -Fixed a name collision between HashMap and BinaryTree; both defined sprawl::collections::detail::UnderlyingType and ::MethodType. Prefixed the ones in BinaryTree with "Tree". This isn't the best solution, but it works for now. #review-14834 |
||
#4 | 14783 | ShadauxCat |
Style corrections (placement of const) #review-14784 |
||
#3 | 14163 | ShadauxCat |
-Renamed HashMap functions to follow coding style. Only begin, end, find, and variants are left lowercase, in keeping with C++ algorithm and range-based for support. -Fixed some accounting issues with list and forwardlist; size wasn't properly being maintained. -Made a small pedantic change to ThreadManager to ensure that m_numThreadsSynced got reset to 0 before the NotifyAll() to eliminate the minuscule potential for deadlock it would cause if it happened after another thread had already woken up. #review-14164 |
||
#2 | 14161 | ShadauxCat |
-Added staged option for ThreadManager and corresponding unit test -Added operator[] and getOrInsert() in HashMap. getOrInsert() doesn't follow standard but it's consistent with the rest of the HashMap interface; I'll change them when I go back and redo that interface to fit the style. #review-14162 |
||
#1 | 12508 | ShadauxCat |
-Added threading library. Currently only functional for Linux; Windows will fail to link. (I will fix this soon.) -Fixed missing move and copy constructors in List and ForwardList -Fixed broken move constructor in HashMap -Fixed missing const get() in HashMap -Fixed broken operator-> in ListIterator -Added sprawl::noncopyable -Added sketch headers for filesystem library -Made StringLiteral hashable, added special hashes for pointers and integers in murmur3 -Fixed compiler warning in async_network -Updated memory allocators to use new threading library for mutexes -Added accessibility to sprawl::StringLiteral to be able to access its pointer and length and perform pointer comparisons #review-12504 |