Line data Source code
1 : // 2 : // Copyright 2024 OpenModelViewer Authors 3 : // 4 : // Licensed under the Apache License, Version 2.0 (the "License"); 5 : // you may not use this file except in compliance with the License. 6 : // You may obtain a copy of the License at 7 : // 8 : // http://www.apache.org/licenses/LICENSE-2.0 9 : // 10 : // Unless required by applicable law or agreed to in writing, software 11 : // distributed under the License is distributed on an "AS IS" BASIS, 12 : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 : // See the License for the specific language governing permissions and 14 : // limitations under the License. 15 : // 16 : 17 : #pragma once 18 : 19 : #include "openmodelviewer/core/lifecycle/irunnable.hpp" 20 : #include "openmodelviewer/core/async/itask.hpp" 21 : #include "openmodelviewer/core/async/task.hpp" 22 : #include "openmodelviewer/core/async/task_data.hpp" 23 : 24 : #include <memory> 25 : #include <atomic> 26 : #include <mutex> 27 : #include <vector> 28 : #include <thread> 29 : #include <queue> 30 : #include <type_traits> 31 : #include <functional> 32 : #include <stdexcept> 33 : #include <condition_variable> 34 : 35 : namespace openmodelviewer::core::async 36 : { 37 : /** 38 : * @brief Fixed-size thread pool for executing asynchronous tasks. 39 : * 40 : * The ThreadPool manages a set of worker threads which process submitted 41 : * tasks from a shared queue. Tasks are expected to implement the ITask interface 42 : * and are executed in FIFO order. This class is thread-safe and intended 43 : * to be used internally by systems such as resource management, loaders, or 44 : * background processing. 45 : * 46 : * @note The pool size is fixed at creation and does not resize dynamically. 47 : * @note The pool does not expose execution state or callbacks directly; use TaskData for tracking. 48 : */ 49 : class ThreadPool : public lifecycle::IRunnable 50 : { 51 : public: 52 : /** 53 : * @brief Constructs a new thread pool with the given number of worker threads. 54 : * 55 : * @param poolSize The number of worker threads to create. 56 : */ 57 : ThreadPool(size_t poolSize); 58 : 59 : /** 60 : * @brief Destroys the thread pool and stops all threads. 61 : * 62 : * Calls stop() to ensure all tasks are completed before shutdown. 63 : */ 64 : ~ThreadPool(); 65 : 66 : /** 67 : * @brief Starts the thread pool with the predefined number of worker threads. 68 : * 69 : * Creates and launches worker threads that wait for tasks to execute. 70 : * If the pool is already running, this call has no effect. 71 : * 72 : * @return true if the pool was successfully started. 73 : */ 74 : bool start() override; 75 : 76 : /** 77 : * @brief Stops the thread pool and joins all worker threads. 78 : * 79 : * Signals all threads to exit, notifies any waiting threads, and joins them. 80 : * Safe to call multiple times; redundant calls have no effect. 81 : */ 82 : void stop() override; 83 : 84 : /** 85 : * @brief Checks if the thread pool is currently running. 86 : * 87 : * @return true if the pool has been started and is still active. 88 : */ 89 : bool isRunning() const noexcept override; 90 : 91 : /** 92 : * @brief Submits a callable for asynchronous execution and returns its tracking data. 93 : * 94 : * This overload creates a Task and TaskData from a callable object, 95 : * submits it to the pool, and returns a shared pointer to the TaskData 96 : * that can be used to monitor the task state and result. 97 : * 98 : * @tparam FType The type of the callable (lambda, functor, etc.) 99 : * @param func The callable to execute. 100 : * @return A shared_ptr to TaskData<RType> where RType is the return type of FType. 101 : * @throws std::runtime_error If the pool is not running. 102 : */ 103 : template<typename FType> 104 91 : inline auto submit(FType&& func) -> std::shared_ptr<TaskData<std::invoke_result_t<FType>>> 105 : { 106 91 : if (!isRunning()) 107 : { 108 1 : throw std::runtime_error("Cannot submit task to a non-running ThreadPool."); 109 : } 110 : 111 : using RType = std::invoke_result_t<FType>; 112 : 113 90 : auto wrapped = std::function<RType()>(std::forward<FType>(func)); 114 90 : auto data = std::make_shared<TaskData<RType>>(); 115 90 : auto task_ptr = Task<RType>::make(data, std::move(wrapped)); 116 : 117 90 : this->submit(std::move(task_ptr)); 118 : 119 180 : return data; 120 90 : } 121 : 122 : /** 123 : * @brief Submits a pre-constructed Task for execution. 124 : * 125 : * This overload is intended for internal systems (such as schedulers or resource queues) 126 : * that create tasks explicitly and wish to submit them directly. 127 : * 128 : * @tparam RType The return type of the task. 129 : * @param task_ptr The unique_ptr to the task object. 130 : * @throws std::runtime_error If the pool is not running. 131 : */ 132 : template<typename RType> 133 90 : inline void submit(std::unique_ptr<Task<RType>> task_ptr) 134 : { 135 90 : if (!isRunning()) 136 : { 137 0 : throw std::runtime_error("Cannot submit task to a non-running ThreadPool."); 138 : } 139 : 140 : { 141 90 : std::lock_guard lock(m_mutex); 142 90 : m_tasks.push(std::move(task_ptr)); 143 90 : } 144 : 145 90 : m_condition.notify_one(); 146 90 : } 147 : 148 : /** 149 : * @brief Computes the optimal number of threads to use for the thread pool. 150 : * 151 : * The result is based on the number of hardware threads available, 152 : * subtracting a few reserved threads for other systems like rendering or audio. 153 : * 154 : * @param reserved Number of threads to reserve for other engine components. 155 : * @return The optimal thread count, with a minimum of 1. 156 : */ 157 : static size_t computeOptimalPoolSize(size_t reserved = 0); 158 : 159 : private: 160 : std::atomic<bool> m_running; 161 : std::condition_variable m_condition; 162 : 163 : std::atomic<size_t> m_activeTasks = 0; 164 : std::condition_variable m_idleCondition; 165 : 166 : size_t m_poolSize; 167 : mutable std::mutex m_mutex; 168 : std::vector<std::thread> m_workers; 169 : std::queue<std::unique_ptr<ITask>> m_tasks; 170 : 171 : void workLoop(); 172 : void waitIdle(); 173 : }; 174 : } // namespace openmodelviewer::core::async