Line data Source code
1 : // 2 : // Copyright 2024 OpenModelViewer Authors 3 : // 4 : // Licensed under the Apache License, Version 2.0 (the "License"); 5 : // you may not use this file except in compliance with the License. 6 : // You may obtain a copy of the License at 7 : // 8 : // http://www.apache.org/licenses/LICENSE-2.0 9 : // 10 : // Unless required by applicable law or agreed to in writing, software 11 : // distributed under the License is distributed on an "AS IS" BASIS, 12 : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 : // See the License for the specific language governing permissions and 14 : // limitations under the License. 15 : // 16 : 17 : #pragma once 18 : 19 : #include "openmodelviewer/core/lifecycle/irunnable.hpp" 20 : 21 : #include "openmodelviewer/core/async/task_type.hpp" 22 : #include "openmodelviewer/core/async/itask.hpp" 23 : #include "openmodelviewer/core/async/itask_data.hpp" 24 : #include "openmodelviewer/core/async/task_result.hpp" 25 : #include "openmodelviewer/core/async/task_handle.hpp" 26 : #include "openmodelviewer/core/async/task.hpp" 27 : #include "openmodelviewer/core/async/task_data.hpp" 28 : #include "openmodelviewer/core/async/task_handle_generator.hpp" 29 : #include "openmodelviewer/core/async/thread_pool.hpp" 30 : 31 : #include <mutex> 32 : #include <vector> 33 : #include <memory> 34 : #include <functional> 35 : #include <stdexcept> 36 : #include <unordered_map> 37 : 38 : namespace openmodelviewer::core::async 39 : { 40 : /** 41 : * @brief Asynchronous task scheduler. 42 : * 43 : * The TaskScheduler manages the lifecycle of a thread pool and allows 44 : * asynchronous tasks to be submitted and executed in the background. 45 : * 46 : * Currently, all submitted tasks are routed to a single internal thread pool, 47 : * regardless of their TaskType or priority. No categorization or task sorting 48 : * is performed at this stage. This behavior is planned to evolve in the future, 49 : * where task types and priorities may be used to route tasks to specialized pools. 50 : */ 51 : class TaskScheduler : public lifecycle::IRunnable 52 : { 53 : public: 54 : /** 55 : * @brief Constructs the TaskScheduler. 56 : * @param reserved_processes Number of CPU threads to reserve for the system. 57 : * The internal thread pool will be sized based on the remaining available cores. 58 : */ 59 : TaskScheduler(size_t reserved_processes = 0); 60 : 61 : /** 62 : * @brief Destructor. 63 : * 64 : * Automatically stops the TaskScheduler processes if it is running. 65 : */ 66 : ~TaskScheduler(); 67 : 68 : /** 69 : * @brief Starts the TaskScheduler. 70 : * @return true if the pool was successfully started, false otherwise. 71 : */ 72 : bool start() override; 73 : 74 : /** 75 : * @brief Stops the TaskScheduler. 76 : * 77 : * This operation is safe to call multiple times. 78 : */ 79 : void stop() override; 80 : 81 : /** 82 : * @brief Checks whether the TaskScheduler is currently running. 83 : * @return true if the TaskScheduler is active, false otherwise. 84 : */ 85 : bool isRunning() const noexcept override; 86 : 87 : /** 88 : * @brief Returns the internal ITaskData pointer associated with a given task handle. 89 : * 90 : * This method allows external systems to query or inspect the state of a task 91 : * by accessing its abstract ITaskData interface. The caller can use this 92 : * pointer to check if the task has completed, failed, or to rethrow exceptions. 93 : * 94 : * @param handle The task handle identifying the scheduled task. 95 : * @return A shared pointer to the associated ITaskData instance, or nullptr if not found. 96 : * 97 : * @note The returned shared pointer remains valid as long as the caller holds a reference. 98 : * However, once the task is completed, internal systems stop interacting with it. 99 : * It becomes a passive data object: no further updates or lifecycle management will occur. 100 : */ 101 : std::shared_ptr<ITaskData> getTaskData(const TaskHandle& handle) const; 102 : 103 : /** 104 : * @brief Checks whether a task is currently tracked by the scheduler. 105 : * 106 : * This method verifies if the given TaskHandle refers to a task that has 107 : * been scheduled and is still present in the internal tracking map. 108 : * 109 : * Note that a task may be removed from tracking after its result has been 110 : * retrieved via retrieveAndForget(). 111 : * 112 : * @param handle The TaskHandle identifying the task. 113 : * @return true if the task is still tracked, false otherwise. 114 : */ 115 : bool isAvailable(const TaskHandle& handle) const; 116 : 117 : /** 118 : * @brief Schedule a task for asynchronous execution and associate it with a typed handle. 119 : * 120 : * The TaskHandle can later be used to perform operation on the submitted task. 121 : * 122 : * @tparam FType The type of the callable object, must be invocable with no parameters and return a value. 123 : * @param type The type category of the task, used for task classification or filtering. 124 : * @param task The callable object to execute asynchronously. 125 : * 126 : * @return A TaskHandle uniquely identifying the scheduled task. 127 : * 128 : * @throws std::runtime_error If the scheduler is not running or a task ID collision is detected. 129 : * 130 : * @note This function is thread-safe and can be called concurrently. 131 : * @note The task result will be tracked internally and accessible via the TaskHandle. 132 : */ 133 : template<typename FType> 134 87 : inline TaskHandle schedule(TaskType type, FType&& task) 135 : { 136 87 : if (!this->m_running) 137 : { 138 1 : throw std::runtime_error("Cannot schedule with a non-running TaskScheduler."); 139 : } 140 : 141 86 : TaskHandle handle = m_handleGenerator.generate(type); 142 : 143 : { 144 86 : std::lock_guard lock(m_tasksMutex); 145 : 146 86 : auto data = m_pool->submit(std::forward<FType>(task)); 147 86 : auto base = std::static_pointer_cast<ITaskData>(data); 148 : 149 86 : auto [it, inserted] = m_tasks.emplace(handle.id, base); 150 86 : if (!inserted) 151 : { 152 0 : throw std::runtime_error("TaskHandle ID collision detected in TaskScheduler"); 153 : } 154 86 : } 155 : 156 86 : return handle; 157 : } 158 : 159 : /** 160 : * @brief Schedule a task for asynchronous execution and associate it with a general-typed handle. 161 : * It defaults the task type to TaskType::General. 162 : * 163 : * The TaskHandle can later be used to perform operation on the submitted task. 164 : * 165 : * @tparam FType Type of the callable object. Must be invocable with no arguments. 166 : * @param task The callable object representing the task to execute asynchronously. 167 : * @return A TaskHandle uniquely identifying the scheduled task. 168 : * 169 : * @throws std::runtime_error If the scheduler is not running or a task ID collision is detected. 170 : * 171 : * @note This function is thread-safe and can be called concurrently. 172 : * @note The task result will be tracked internally and accessible via the TaskHandle. 173 : */ 174 : template<typename FType> 175 85 : inline TaskHandle schedule(FType&& task) 176 : { 177 85 : return this->schedule(TaskType::General, std::forward<FType>(task)); 178 : } 179 : 180 : /** 181 : * @brief Registers a callback to be invoked when the task completes, whether successfully or with an error. 182 : * 183 : * If the task is still pending, the callback is stored and will be invoked automatically 184 : * by the thread pool once the task finishes. 185 : * 186 : * If the task has already completed at the time of registration, the callback 187 : * is executed immediately on the calling thread with a TaskResult containing either 188 : * the result or the captured exception. 189 : * 190 : * The callback is never invoked more than once. If the task is not found or if the 191 : * expected return type RType does not match, the operation fails. 192 : * 193 : * @tparam RType The return type of the task. 194 : * @param handle A valid handle identifying the task. 195 : * @param callback A callable that takes a const reference to a TaskResult<RType>. 196 : * @return true if the callback was successfully registered or invoked; false otherwise. 197 : * 198 : * @throws std::runtime_error if the task is found but the stored type does not match RType. 199 : */ 200 : template<typename RType> 201 80 : inline bool setCallback(const TaskHandle& handle, std::function<void(const TaskResult<RType>&)> callback) 202 : { 203 80 : if (!callback) 204 : { 205 0 : return false; 206 : } 207 : 208 80 : std::shared_ptr<TaskData<RType>> dptr; 209 : 210 : { 211 80 : std::lock_guard lock(m_tasksMutex); 212 : 213 80 : auto it = m_tasks.find(handle); 214 80 : if (it == m_tasks.end()) 215 : { 216 0 : return false; 217 : } 218 : 219 80 : std::shared_ptr<ITaskData> sptr = it->second; 220 80 : dptr = std::static_pointer_cast<TaskData<RType>>(sptr); 221 : 222 80 : if (!dptr) 223 : { 224 0 : throw std::runtime_error("Type error while setting callback function in TaskScheduler."); 225 : } 226 : 227 80 : if (!sptr->completed()) 228 : { 229 78 : dptr->setCallback(std::move(callback)); 230 78 : return true; 231 : } 232 : 233 2 : m_tasks.erase(it); 234 158 : } 235 : 236 2 : if (dptr->completed()) 237 : { 238 2 : dptr->setCallback(std::move(callback)); 239 2 : return true; 240 : } 241 : 242 0 : return false; 243 80 : } 244 : 245 : /** 246 : * @brief Retrieves the result of a completed task and removes its tracking, if no callback is registered. 247 : * 248 : * This method is used to extract the result of a completed task manually. 249 : * If the task is completed successfully and no completion callback has been registered, 250 : * the result is returned and the task is removed from internal tracking. 251 : * 252 : * If a callback is present, the result is considered "reserved" and this method returns std::nullopt. 253 : * Similarly, if the task is still running or has failed, std::nullopt is also returned. 254 : * 255 : * @tparam RType The expected result type of the task. 256 : * @param handle The task handle used to identify the task. 257 : * @return The task result if available and no callback is set, otherwise std::nullopt. 258 : * 259 : * @note If a completion callback was registered using `setCallback()`, the result cannot be retrieved 260 : * manually using this method. This prevents duplicate result consumption. 261 : */ 262 : template<typename RType> 263 4 : inline std::optional<RType> retrieveAndForget(const TaskHandle& handle) 264 : { 265 4 : std::lock_guard lock(m_tasksMutex); 266 : 267 4 : auto it = m_tasks.find(handle); 268 4 : if (it != m_tasks.end()) 269 : { 270 4 : auto& basePtr = it->second; 271 : 272 4 : if (basePtr->completed() && !basePtr->errored()) 273 : { 274 4 : auto dataPtr = std::static_pointer_cast<TaskData<RType>>(basePtr); 275 : 276 4 : if (!dataPtr->hasCallback()) 277 : { 278 4 : auto result = dataPtr->retrieve(); 279 4 : m_tasks.erase(it); 280 : 281 4 : return result; 282 2 : } 283 4 : } 284 : } 285 : 286 0 : return std::nullopt; 287 4 : } 288 : 289 : private: 290 : std::atomic<bool> m_running; 291 : std::unique_ptr<ThreadPool> m_pool; 292 : TaskHandleGenerator m_handleGenerator; 293 : 294 : mutable std::mutex m_tasksMutex; 295 : std::unordered_map<TaskHandle, std::shared_ptr<ITaskData>> m_tasks; 296 : }; 297 : } // namespace openmodelviewer::core::async