Prodigy Engine supports multi-threading, which can be used for a variety of purposes. The engine runs multiple threads, such as the main thread, a generic worker thread, and a log thread. This requires asynchronous data structures that are thread-safe for both reading and writing data. The two thread-safe containers used in Prodigy are the Async Queue and the Async MPSC Ring Buffer.
Below are the implementations of these two data structures, starting with the Async Queue:
Async Queue
The specifications for the Async Queue are as follows:
- Be able to insert items and remove items from the queue
- Handle insert and remove from multiple threads
- Use locks to design a simple Async Queue
- Be templated to store objects of the desired type
The implementation is as follows:
#include <mutex>
#include <queue>

template <typename TYPE>
class AsyncQueue
{
public:
    AsyncQueue();
    ~AsyncQueue();

    void EnqueueLocked(TYPE const& element);
    bool DequeueLocked(TYPE* out);
    int GetLength() const;

private:
    std::queue<TYPE> m_queue;
    // mutable so that the const GetLength() can still lock it
    mutable std::mutex m_mutex;
};

//------------------------------------------------------------------------------------------------------------------------------
template <typename TYPE>
int AsyncQueue<TYPE>::GetLength() const
{
    std::lock_guard<std::mutex> mutexLock(m_mutex);
    return static_cast<int>(m_queue.size());
}

//------------------------------------------------------------------------------------------------------------------------------
template <typename TYPE>
AsyncQueue<TYPE>::AsyncQueue()
{
}

//------------------------------------------------------------------------------------------------------------------------------
template <typename TYPE>
AsyncQueue<TYPE>::~AsyncQueue()
{
}

//------------------------------------------------------------------------------------------------------------------------------
template <typename TYPE>
void AsyncQueue<TYPE>::EnqueueLocked(TYPE const& element)
{
    std::lock_guard<std::mutex> mutexLock(m_mutex);
    m_queue.push(element);
}

//------------------------------------------------------------------------------------------------------------------------------
template <typename TYPE>
bool AsyncQueue<TYPE>::DequeueLocked(TYPE* out)
{
    std::lock_guard<std::mutex> mutexLock(m_mutex);
    if (m_queue.empty())
    {
        // Nothing to dequeue: return immediately instead of waiting for data
        return false;
    }

    *out = m_queue.front();
    m_queue.pop();
    return true;
}
With this templated implementation, the Async Queue can store objects of whatever type it is instantiated with (the type must be copyable, since elements are copied in and out), and a shared instance can handle enqueue and dequeue requests from any thread.
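Note: The Async Queue deliberately uses locked enqueue and dequeue. Each call takes the mutex, so a calling thread is blocked until no other thread holds the lock and its enqueue or dequeue can take place. DequeueLocked does not wait for data, however: if the queue is empty it returns false immediately.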
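Because DequeueLocked only waits for the mutex and not for data, a consumer thread that wants to sleep until an element arrives has to poll. A common alternative, not part of the Prodigy code above, pairs the mutex with a std::condition_variable. The sketch below assumes that design; BlockingAsyncQueue, Enqueue, WaitDequeue and the RunProducersAndConsumer demo are hypothetical names, and the demo runs several producer threads feeding the calling thread as the single consumer.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical variant of AsyncQueue: same locking, plus a condition variable
// so a consumer can sleep until an element is available.
template <typename TYPE>
class BlockingAsyncQueue
{
public:
    void Enqueue(TYPE const& element)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(element);
        }
        // Wake one waiting consumer (if any) after releasing the lock
        m_condition.notify_one();
    }

    // Blocks until an element is available, then removes and returns it
    TYPE WaitDequeue()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        // The predicate guards against spurious wakeups
        m_condition.wait(lock, [this] { return !m_queue.empty(); });
        TYPE element = m_queue.front();
        m_queue.pop();
        return element;
    }

private:
    std::queue<TYPE> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_condition;
};

// Hypothetical demo: each producer pushes 1..itemsPerProducer, one consumer sums them.
long long RunProducersAndConsumer(int producerCount, int itemsPerProducer)
{
    BlockingAsyncQueue<int> queue;
    std::vector<std::thread> producers;
    for (int p = 0; p < producerCount; ++p)
    {
        producers.emplace_back([&queue, itemsPerProducer] {
            for (int i = 1; i <= itemsPerProducer; ++i)
            {
                queue.Enqueue(i);
            }
        });
    }

    // The consumer knows how many items to expect, so it stops after the last one
    long long sum = 0;
    int expected = producerCount * itemsPerProducer;
    for (int received = 0; received < expected; ++received)
    {
        sum += queue.WaitDequeue();
    }

    for (std::thread& producer : producers)
    {
        producer.join();
    }
    return sum;
}
```

With four producers each pushing 1 through 100, RunProducersAndConsumer(4, 100) returns 4 * 5050 = 20200 regardless of how the threads interleave. The trade-off is that WaitDequeue never returns on an empty queue, so a long-lived engine thread would also need a shutdown flag or sentinel element to wake it during teardown.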
Async MPSC Ring Buffer
For the profiler in Prodigy Engine, I needed a multiple producer, single consumer (MPSC) asynchronous ring buffer to allocate ProfilerSample objects. This ring buffer would either be given a fixed-size block of memory or be allowed to use a block allocator to obtain more memory if and when required.
Below is the implementation, which uses a single fixed-size buffer allocated with malloc:
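#include <cstddef>
#include <cstdlib>
#include <mutex>
#include <thread>

// uint and byte are engine typedefs (assumed here to be unsigned int and unsigned char).
// ASSERT_OR_DIE and ASSERT_RECOVERABLE used below are engine assertion macros.

// Header written directly in front of every allocation in the ring buffer
struct RingBufferMeta_T
{
    uint bufferObjectSize : 31;      // payload size in bytes; 0 marks a "skip to start" entry
    uint isBufferObjectUnlocked : 1; // 1 once the producer has finished writing the payload
};

//------------------------------------------------------------------------------------------------------------------------------
// This is a Multiple Producer Single Consumer Async Ring Buffer.
//------------------------------------------------------------------------------------------------------------------------------
class MPSCRingBuffer
{
public:
    MPSCRingBuffer() = default;
    ~MPSCRingBuffer() { ReleaseBuffer(); }

    bool InitializeBuffer(size_t sizeInBytes);
    void ReleaseBuffer();

    // Returns immediately with either a valid pointer to writable memory or nullptr
    void* TryLockWrite(size_t size);
    // Blocks until there is room to write onto the buffer
    void* LockWrite(size_t size);
    void UnlockWrite(void* ptr);

    // Not synchronized on its own: the result is only consistent while m_lock is held
    size_t GetWritableSpace() const;

    // Returns immediately with the next finished entry or nullptr
    void* TryLockRead(size_t* outSize);
    // Blocks until an entry is available to read
    void* LockRead(size_t* outSize);
    void UnlockRead(void* ptr);

private:
    byte* m_buffer = nullptr;
    size_t m_byteSize = 0;
    uint m_writeHead = 0;
    uint m_readHead = 0;
    std::mutex m_lock;
};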
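The header packs the payload size into 31 bits and the unlocked flag into the last bit, which is why TryLockWrite asserts on the write size. The standalone sketch below mirrors RingBufferMeta_T, assuming the engine's uint is unsigned int, to show the largest size the field can hold and how a size of 0 doubles as the skip marker; MetaHeader, kMaxPayloadSize, MakeHeader and IsSkipEntry are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>

// Mirror of RingBufferMeta_T, assuming the engine's uint is unsigned int
struct MetaHeader
{
    unsigned int bufferObjectSize : 31;
    unsigned int isBufferObjectUnlocked : 1;
};

// Largest payload the 31-bit size field can describe: 2^31 - 1 bytes
constexpr std::size_t kMaxPayloadSize = (std::size_t(1) << 31) - 1;

// Builds a header the way TryLockWrite fills one in
MetaHeader MakeHeader(std::size_t payloadSize, bool unlocked)
{
    assert(payloadSize <= kMaxPayloadSize);
    MetaHeader header{};
    header.bufferObjectSize = static_cast<unsigned int>(payloadSize);
    header.isBufferObjectUnlocked = unlocked ? 1u : 0u;
    return header;
}

// The reader treats an unlocked header with size 0 as "wrap the read head to the start"
bool IsSkipEntry(MetaHeader const& header)
{
    return header.isBufferObjectUnlocked == 1 && header.bufferObjectSize == 0;
}
```

One consequence of reusing size 0 as the skip marker is that a zero-byte write would be read back as a skip entry and wrap the read head early, so zero-sized writes should be rejected.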
bool MPSCRingBuffer::InitializeBuffer(size_t sizeInBytes)
{
    // Re-initializing frees the previous buffer (and resets both heads) first
    ReleaseBuffer();

    m_buffer = (byte*)malloc(sizeInBytes);
    if (m_buffer == nullptr)
    {
        return false;
    }

    m_byteSize = sizeInBytes;
    return true;
}

//------------------------------------------------------------------------------------------------------------------------------
void MPSCRingBuffer::ReleaseBuffer()
{
    if (m_buffer != nullptr)
    {
        free(m_buffer);
        m_buffer = nullptr;
        m_byteSize = 0U;
        m_writeHead = 0;
        m_readHead = 0;
    }
}

//------------------------------------------------------------------------------------------------------------------------------
void* MPSCRingBuffer::TryLockWrite(size_t writeSize)
{
    // bufferObjectSize is a 31-bit field, so larger payloads cannot be described by the header
    ASSERT_OR_DIE(writeSize < (size_t(1) << 31), "The size of object to write does not fit in the 31-bit size field of the MPSC ring buffer header");
    // A size of 0 is reserved for skip entries
    ASSERT_OR_DIE(writeSize > 0, "A zero-sized write would be read back as a skip entry");
    // Note: headers stay naturally aligned only if payload sizes are multiples of alignof(RingBufferMeta_T)

    size_t metaDataSize = sizeof(RingBufferMeta_T);
    // Reserve this entry's header and payload plus one extra header, so that a skip entry
    // can always be written at the write head later
    size_t totalSize = 2 * metaDataSize + writeSize;

    std::scoped_lock lock(m_lock);

    // If the space available is less than the size of meta data + write data, return nullptr
    if (GetWritableSpace() < totalSize)
    {
        return nullptr;
    }

    size_t newHead = m_writeHead + totalSize;
    if (newHead > m_byteSize)
    {
        // Buffer needs to wrap, so write a skip entry; the read head will wrap at this point
        RingBufferMeta_T* skipBufferEntry = (RingBufferMeta_T*)(m_buffer + m_writeHead);
        skipBufferEntry->bufferObjectSize = 0; // 0 means skip
        skipBufferEntry->isBufferObjectUnlocked = 1;
        m_writeHead = 0;
    }

    // After wrapping, check that the buffer still has enough space
    if (GetWritableSpace() < totalSize)
    {
        return nullptr;
    }

    RingBufferMeta_T* head = (RingBufferMeta_T*)(m_buffer + m_writeHead);
    head->bufferObjectSize = (uint)writeSize;
    head->isBufferObjectUnlocked = 0;

    // Advance past this entry's header and payload only; the extra header reserved in
    // totalSize stays free for the next entry (or a skip entry)
    m_writeHead += (uint)(metaDataSize + writeSize);
    return head + 1;
}

//------------------------------------------------------------------------------------------------------------------------------
// No matter how long it takes, this call blocks the thread until space is available for the write
//------------------------------------------------------------------------------------------------------------------------------
void* MPSCRingBuffer::LockWrite(size_t size)
{
    void* ptr = TryLockWrite(size);
    while (ptr == nullptr)
    {
        std::this_thread::yield();
        ptr = TryLockWrite(size);
    }
    return ptr;
}

//------------------------------------------------------------------------------------------------------------------------------
size_t MPSCRingBuffer::GetWritableSpace() const
{
    if (m_writeHead >= m_readHead)
    {
        // Free space is the tail after the write head plus the region before the read head
        return (m_byteSize - m_writeHead) + m_readHead;
    }

    // The write head has wrapped behind the read head, so only the gap between them is free
    return m_readHead - m_writeHead;
}

//------------------------------------------------------------------------------------------------------------------------------
void MPSCRingBuffer::UnlockWrite(void* ptr)
{
    // The header sits directly in front of the payload pointer handed out by TryLockWrite
    RingBufferMeta_T* writeHead = (RingBufferMeta_T*)ptr - 1;

    // Take the lock so this flag write does not race with the consumer reading it in TryLockRead
    std::scoped_lock lock(m_lock);
    writeHead->isBufferObjectUnlocked = 1;
}

//------------------------------------------------------------------------------------------------------------------------------
void* MPSCRingBuffer::TryLockRead(size_t* outSize)
{
    std::scoped_lock lock(m_lock);
    while (true)
    {
        // If the buffer is empty, return
        if (m_readHead == m_writeHead)
        {
            return nullptr;
        }

        // Read the header to find out how big the entry is
        RingBufferMeta_T* readMeta = (RingBufferMeta_T*)(m_buffer + m_readHead);
        if (!readMeta->isBufferObjectUnlocked)
        {
            // The producer has not called UnlockWrite on this entry yet
            return nullptr;
        }

        if (readMeta->bufferObjectSize == 0)
        {
            // Skip entry: wrap the read head to the start and keep looking
            m_readHead = 0;
            continue;
        }

        // Valid entry. With a single consumer nothing else can claim it, so the read
        // head is only advanced later, in UnlockRead
        *outSize = readMeta->bufferObjectSize;
        return readMeta + 1;
    }
}

//------------------------------------------------------------------------------------------------------------------------------
void* MPSCRingBuffer::LockRead(size_t* outSize)
{
    void* ptr = TryLockRead(outSize);
    while (ptr == nullptr)
    {
        std::this_thread::yield();
        ptr = TryLockRead(outSize);
    }
    return ptr;
}

//------------------------------------------------------------------------------------------------------------------------------
void MPSCRingBuffer::UnlockRead(void* ptr)
{
    std::scoped_lock lock(m_lock);
    RingBufferMeta_T* readHead = (RingBufferMeta_T*)ptr - 1;
    ASSERT_RECOVERABLE(((m_buffer + m_readHead) == (byte*)readHead), "The read head for MPSC Async Ring Buffer is invalid");

    // Advance past this entry's header and payload (mirrors the advance in TryLockWrite)
    m_readHead += (uint)(sizeof(RingBufferMeta_T) + readHead->bufferObjectSize);
}
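To make the head arithmetic in TryLockWrite, GetWritableSpace and UnlockRead concrete, the sketch below replays it on plain integers without touching any memory: each reservation needs 2 * header + payload bytes of free space but only advances the write head by header + payload, and a reservation that would run past the end first leaves a skip entry and wraps the write head to 0. HeadModel, Reserve and ConsumeOne are hypothetical names; the model assumes a 4-byte header (the usual sizeof(RingBufferMeta_T)) and that every entry is unlocked as soon as it is reserved.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical single-threaded model of MPSCRingBuffer's head arithmetic.
struct HeadModel
{
    static constexpr std::size_t kMeta = 4; // assumed sizeof(RingBufferMeta_T)

    std::size_t byteSize;
    std::size_t writeHead = 0;
    std::size_t readHead = 0;
    std::deque<std::size_t> entries; // payload sizes in write order (0 = skip entry)

    explicit HeadModel(std::size_t size) : byteSize(size) {}

    // Same formula as GetWritableSpace
    std::size_t WritableSpace() const
    {
        if (writeHead >= readHead)
        {
            return (byteSize - writeHead) + readHead;
        }
        return readHead - writeHead;
    }

    // Mirrors TryLockWrite: returns the payload offset, or -1 if there is no room
    long Reserve(std::size_t writeSize)
    {
        std::size_t totalSize = 2 * kMeta + writeSize;
        if (WritableSpace() < totalSize) return -1;
        if (writeHead + totalSize > byteSize)
        {
            entries.push_back(0); // skip entry at the old write head
            writeHead = 0;
        }
        if (WritableSpace() < totalSize) return -1;
        long payloadOffset = static_cast<long>(writeHead + kMeta);
        entries.push_back(writeSize);
        writeHead += kMeta + writeSize;
        return payloadOffset;
    }

    // Mirrors TryLockRead followed by UnlockRead: returns the payload size, or 0 if empty
    std::size_t ConsumeOne()
    {
        while (readHead != writeHead)
        {
            assert(!entries.empty());
            std::size_t size = entries.front();
            entries.pop_front();
            if (size == 0)
            {
                readHead = 0; // follow the skip entry back to the start
                continue;
            }
            readHead += kMeta + size;
            return size;
        }
        return 0;
    }
};
```

On a 32-byte buffer, two 8-byte writes land at payload offsets 4 and 16 and leave the write head at 24; a third fails because only 8 bytes remain and 16 are needed. Once both entries are consumed, the next write wraps, leaving a skip entry at offset 24, and lands at offset 4 again; the reader follows the skip entry back to 0 before returning it.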
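With this implementation, I was able to use the MPSC Ring Buffer in my profiler. Note, however, that as with the Async Queue, the MPSC Ring Buffer relies on blocking: TryLockWrite, TryLockRead and UnlockRead all take the same mutex, so producers and the consumer briefly block one another. LockWrite and LockRead go further and spin, yielding the thread each time, until there is room to write or an entry to read, while TryLockWrite and TryLockRead return nullptr immediately instead.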