namespace sprawl { namespace threading { class ThreadManager; } } #include <stdint.h> #include <functional> #include "../time/time.hpp" #include "../collections/Vector.hpp" #include "../collections/ConcurrentQueue.hpp" #include "../collections/BinaryTree.hpp" #include "../collections/HashMap.hpp" #include "thread.hpp" #include "mutex.hpp" #include "condition_variable.hpp" #include "event.hpp" #include <atomic> class sprawl::threading::ThreadManager { public: typedef std::function<void()> Task; ThreadManager(); ~ThreadManager(); void AddThread(uint64_t threadFlags, char const* const threadName); void AddThread(uint64_t threadFlags); void AddThreads(uint64_t threadFlags, int count, char const* const threadName); void AddThreads(uint64_t threadFlags, int count); void AddTask(Task&& task, uint64_t threadFlags, int64_t whenNanosecs = time::Now(time::Resolution::Nanoseconds)); void AddTask(Task const& task, uint64_t threadFlags, int64_t whenNanosecs = time::Now(time::Resolution::Nanoseconds)); void AddTaskStaged(uint64_t stage, Task&& task, uint64_t threadFlags, int64_t whenNanosecs = time::Now(time::Resolution::Nanoseconds)); void AddTaskStaged(uint64_t stage, Task const& task, uint64_t threadFlags, int64_t whenNanosecs = time::Now(time::Resolution::Nanoseconds)); void SetMaxStage(uint64_t maxStage); void AddFutureTask(Task&& task, uint64_t threadFlags, int64_t nanosecondsFromNow); void AddFutureTask(Task const& task, uint64_t threadFlags, int64_t nanosecondsFromNow); void AddFutureTaskStaged(uint64_t stage, Task&& task, uint64_t threadFlags, int64_t nanosecondsFromNow); void AddFutureTaskStaged(uint64_t stage, Task const& task, uint64_t threadFlags, int64_t nanosecondsFromNow); /** * @brief Start all threads and include the calling thread in a loop controlled by the thread manager * @param thisThreadFlags The flags that apply to the calling thread */ void Run(uint64_t thisThreadFlags); void RunStaged(uint64_t thisThreadFlags); /** * @brief Start all threads but do not block on the calling thread. * @details If thisThreadFlags is not 0, the calling thread will be added to the thread pool. * It will then be up to the calling thread to call Pump() to execute the tasks * that get queued up for it. * @param thisThreadFlags The flags that apply to the calling thread */ void Start(uint64_t thisThreadFlags); uint64_t RunNextStage(); void Pump(); void Wait(); uint64_t Sync(); void Stop(); void ShutDown(); private: struct TaskInfo { TaskInfo(); TaskInfo(Task&& what_, uint64_t where_, int64_t when_); TaskInfo(Task const& what_, uint64_t where_, int64_t when_); TaskInfo(Task&& what_, uint64_t where_, int64_t when_, int64_t stage); TaskInfo(Task const& what_, uint64_t where_, int64_t when_, int64_t stage); TaskInfo(TaskInfo&& other); TaskInfo& operator=(TaskInfo&& other); Task what; uint64_t where; int64_t when; std::atomic<bool> taken; uint64_t stage; inline int64_t When() { return when; } }; struct ThreadData { explicit ThreadData(uint64_t flags_) : flags(flags_) , mailbox() { } uint64_t flags; Event mailbox; }; struct ThreadInfo { ThreadInfo(ThreadData* data_, std::function<void()> fn) : data(data_) , thread(new Thread(fn)) { } ThreadInfo(ThreadData* data_, char const* const threadName, std::function<void()> fn) : data(data_) , thread(new Thread(threadName, fn)) { } ThreadInfo(ThreadData* data_) : data(data_) , thread(nullptr) { } ThreadInfo(ThreadInfo&& other) : data(other.data) , thread(other.thread) { other.data = nullptr; other.thread = nullptr; } ~ThreadInfo() { if(data) { delete data; } if(thread) { delete thread; } } ThreadData* data; Thread* thread; }; struct FlagGroup { collections::Vector<Event*> events; collections::ConcurrentQueue<TaskInfo*> taskQueue; }; void pump_(); void pushTask_(TaskInfo* info); void eventLoop_(ThreadData* threadData); void mailMan_(); collections::ConcurrentQueue<TaskInfo*> m_taskQueue; collections::BasicHashMap<int64_t, FlagGroup*> m_flagGroups; Event* m_mainThreadMailbox; collections::ConcurrentQueue<TaskInfo*>* m_mainThreadQueue; collections::Vector<ThreadInfo> m_threads; Thread m_mailmanThread; std::atomic<bool> m_running; uint64_t m_currentStage; uint64_t m_maxStage; enum class SyncState { None, Mailman, Threads, }; std::atomic<SyncState> m_syncState; Event m_workerSyncEvent; Event m_mailmanSyncEvent; std::atomic<size_t> m_syncCount; Event m_mailReady; };
# | Change | User | Description | Committed | |
---|---|---|---|---|---|
#4 | 16052 | ShadauxCat |
- Changed default block size for concurrent queue to a more reasonable value - Changed some memory orders to memory_order_seq_cst when they don't actually need to be that to get around a bug in visual studio 2013 - debug builds assert when memory_order_acq_rel is used for a compare_exchange_strong (this is a standard library bug and is fixed in VS2015) - Added Event API - events are an alternative to condition variables that do not require a mutex and are guaranteed not to miss any signals, even if the signal comes while the thread is not listening for it. Unlike condition variables, however, they do not support broadcasting (and in fact, in general, are not safe to use with multiple threads listening for the same event simultaneously - though notifying on the same event is fine) - Rewrote ThreadManager around ConcurrentQueue and Event API so it is now lock-free. Also improved some behaviors of the staged thread manager operation so it now supports tasks that can be run on multiple stages via a bitmask. - Fixed an issue where the Coroutine copy constructor was calling the std::function constructor instead and another where initializing with a stack might try to call the wrong constructor and vice-versa - Fixed Coroutine never calling munmap() on its stack in linux and causing a memory leak - Added default arguments to time functions - Attempted to fix some issues with BinaryTree. Fixed some but not all. It's currently not suitable for use, sadly. - Logging Improvements: - - Added thread ID to logging - - Fixed some issues with category handlers - - Added backtraces - - Added the following additional log macros: - - - LOG_IF - - - LOG_EVERY_N - - - LOG_FIRST_N - - - LOG_IF_EVERY_N - - - LOG_IF_FIRST_N - - - LOG_ASSERT - - Added the ability to set extra info callbacks to get data such as script backtraces - - Removed the thread-related handlers and replaced them with RunHandler_Threaded and RunHandler_ThreadManager, which will enable any passed-in handler to be run in a threaded fashion - Removed StaticPoolAllocator and renamed DynamicPoolAllocator to PoolAllocator; adjusted unit tests accordingly - PoolAllocator now allocates its pool with mmap and VirtualAlloc, rather than with malloc - Fixed a bug with Vector copy assignment operator - Improved performance of StringBuilder considerably for cases where there are no modifier strings - Removed Copy-On-Write behavior of JSONToken as it was broken; copies are now performed with explicit DeepCopy() and ShallowCopy() functions - Fixed some parser bugs with JSONToken - Added iteration to JSONToken to iterate its children - Fixed crash when reading a negative number of bytes from a file - Changed StringBuilder to favor speed instead of memory by default - Added some performance unit tests for JSON token #review-16053 |
||
#3 | 14833 | ShadauxCat |
First checkin of logging module. Also fixes the following issues: -Added UpperBound() and LowerBound() to BinaryTree and created appropriate unit tests -Added Sync() to ThreadManager to force it to run all tasks to completion and not return until it has no tasks left -Fixed a bug in String::format() where a non-numeric value inside {} would be treated as an empty {}; it now simply prints whatever the value was. (i.e., "{blah}".format(foo) simply returns "{blah}") -Added Reset() to sprawl::StringBuilder -Disabled the switch-enum warning flag in gcc because it's stupid and ridiculous that a default case doesn't shut it up -Made sprawl::Mutex movable. This may turn out to be a bad idea but it enabled keeping them in a map. -Fixed a name collission between HashMap and BinaryTree; both defined sprawl::collections::detail::UnderlyingType and ::MethodType. Prefixed the ones in BinaryTree with "Tree". This isn't the best solution, but it works for now. #review-14834 |
||
#2 | 14161 | ShadauxCat |
-Added staged option for ThreadManager and corresponding unit test -Added operator[] and getOrInsert() in HashMap. getOrInsert() doesn't follow standard but it's consistent with the rest of the HashMap interface; I'll change them when I go back and redo that interface to fit the style. #review-14162 |
||
#1 | 12508 | ShadauxCat |
-Added threading library. Currently only functional for Linux; Windows will fail to link. (I will fix this soon.) -Fixed missing move and copy constructors in List and ForwardList -Fixed broken move constructor in HashMap -Fixed missing const get() in HashMap -Fixed broken operator-> in ListIterator -Added sprawl::noncopyable -Added sketch headers for filesystem library -Made StringLiteral hashable, added special hashes for pointers and integers in murmur3 -Fixed compiler warning in async_network -Updated memory allocators to use new threading library for mutexes -Added accessibility to sprawl::StringLiteral to be able toa ccess its pointer and length and perform pointer comparisons #review-12504 |