//@HEADER
// ************************************************************************
//
// Kokkos v. 4.0
// Copyright (2022) National Technology & Engineering
// Solutions of Sandia, LLC (NTESS).
//
// Under the terms of Contract DE-NA0003525 with NTESS,
// the U.S. Government retains certain rights in this software.
//
// Part of Kokkos, under the Apache License v2.0 with LLVM Exceptions.
// See https://kokkos.org/LICENSE for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//@HEADER

#ifndef KOKKOS_IMPL_PUBLIC_INCLUDE
#include
static_assert(false, "Including non-public Kokkos header files is not allowed.");
#endif
#ifndef KOKKOS_TASKSCHEDULER_HPP
#define KOKKOS_TASKSCHEDULER_HPP

//----------------------------------------------------------------------------

#include
#if defined(KOKKOS_ENABLE_TASKDAG)

#include
#include
//----------------------------------------------------------------------------

// NOTE(review): in this copy of the file the #include targets and most
// template argument lists (<...>) are missing; restore them from upstream
// Kokkos before compiling.
#include
#include
#include
#include
#include
#include
#include
#include

//----------------------------------------------------------------------------
//----------------------------------------------------------------------------

namespace Kokkos {

namespace Impl {
template
class TaskExec;
}  // end namespace Impl

/// \brief Handle to a task queue used by the (old) Kokkos tasking API.
///
/// A scheduler pairs a shared-allocation tracker (m_track) with a raw
/// pointer to the queue_type object it refers to (m_queue). The copy
/// constructor and copy assignment copy both members, so copies refer to the
/// same queue. Spawning, respawning and when_all aggregation all go through
/// m_queue.
template
class BasicTaskScheduler : public Impl::TaskSchedulerBase {
 public:
  using scheduler_type = BasicTaskScheduler;
  using execution_space = ExecSpace;
  using queue_type = QueueType;
  using memory_space = typename queue_type::memory_space;
  using memory_pool = typename queue_type::memory_pool;
  using specialization = Impl::TaskQueueSpecialization;
  using member_type = typename specialization::member_type;
  using team_scheduler_type = BasicTaskScheduler;
  template
  using runnable_task_type = Impl::Task;
  template
  using future_type = Kokkos::BasicFuture;
  template
  using future_type_for_functor = future_type;

 private:
  using track_type = Kokkos::Impl::SharedAllocationTracker;
  using task_base = Impl::TaskBase;

  // Tracks the SharedAllocationRecord that holds the queue object; it is
  // assigned by the memory_pool constructor below.
  track_type m_track;
  // Queue that tasks are allocated from and scheduled on. nullptr for a
  // default-constructed scheduler.
  queue_type* m_queue;

  //----------------------------------------

  template
  friend class Impl::TaskQueue;
  template
  friend struct Impl::TaskQueueSpecialization;
  template
  friend class Impl::TaskQueueSpecializationConstrained;
  template
  friend class Impl::TaskTeamMemberAdapter;
  template
  friend class Impl::TaskExec;

  //----------------------------------------

  // Wraps an existing tracker/queue pair (used by the friend classes above
  // and by get_team_scheduler).
  KOKKOS_INLINE_FUNCTION
  BasicTaskScheduler(track_type arg_track, queue_type* arg_queue)
      : m_track(std::move(arg_track)), m_queue(std::move(arg_queue)) {}

  // Scheduler for one team: shares this scheduler's tracker but points at
  // the queue returned by m_queue->get_team_queue(team_rank).
  KOKKOS_INLINE_FUNCTION
  team_scheduler_type get_team_scheduler(int team_rank) const {
    return {m_track, &m_queue->get_team_queue(team_rank)};
  }

  //----------------------------------------

  // Predecessor extraction for _spawn_impl: "no predecessor" maps to a null
  // task pointer, a future maps to the task it refers to.
  KOKKOS_INLINE_FUNCTION
  static constexpr task_base* _get_task_ptr(std::nullptr_t) { return nullptr; }

  template
  KOKKOS_INLINE_FUNCTION static constexpr task_base* _get_task_ptr(
      future_type&& f) {
    return f.m_task;
  }

  /// Allocate a task from m_queue, construct it from arg_functor, and hand
  /// it to m_queue->schedule_runnable.
  ///
  /// \param arg_predecessor_task task to depend on (stored in m_next), or
  ///        nullptr for none.
  /// \param arg_priority priority stored (as int16_t) on the task.
  /// \param arg_function stored as the task's m_apply entry point.
  /// \param arg_destroy currently ignored (its assignment is commented out).
  /// \param arg_functor forwarded into the task constructor.
  /// \return a future referring to the new task, or an empty future
  ///         (m_task == nullptr) if m_queue->allocate returned nullptr.
  template
  KOKKOS_FUNCTION Kokkos::BasicFuture _spawn_impl(
      DepTaskType* arg_predecessor_task, TaskPriority arg_priority,
      typename task_base::function_type arg_function,
      typename task_base::destroy_type /*arg_destroy*/,
      FunctorType&& arg_functor) {
    using functor_future_type = future_type_for_functor>;
    using task_type = Impl::Task;

    //----------------------------------------
    // Give single-thread back-ends an opportunity to clear
    // queue of ready tasks before allocating a new task

    // TODO @tasking @optimization DSH re-enable this, maybe?
    // specialization::iff_single_thread_recursive_execute(scheduler);

    //----------------------------------------

    functor_future_type f;

    // Allocate task from memory pool

    const size_t alloc_size =
        m_queue->template spawn_allocation_size();

    void* task_storage = m_queue->allocate(alloc_size);

    if (task_storage) {
      // Placement new construction
      // Reference count starts at two:
      //   +1 for the matching decrement when task is complete
      //   +1 for the future
      f.m_task = new (task_storage) task_type(std::forward(arg_functor));

      f.m_task->m_apply = arg_function;
      // f.m_task->m_destroy = arg_destroy;
      f.m_task->m_queue = m_queue;
      f.m_task->m_next = arg_predecessor_task;
      f.m_task->m_ref_count = 2;
      f.m_task->m_alloc_size = alloc_size;
      f.m_task->m_task_type = TaskEnum;
      f.m_task->m_priority = (int16_t)arg_priority;

      // Fence so the field initialization above is ordered before the task
      // is handed to the queue.
      Kokkos::memory_fence();

      // The dependence (if any) is processed immediately
      // within the schedule function, as such the dependence's
      // reference count does not need to be incremented for
      // the assignment.

      m_queue->schedule_runnable(f.m_task);
      // This task may be updated or executed at any moment,
      // even during the call to 'schedule'.
    }

    return f;
  }

 public:
  // Empty scheduler: no tracked allocation and m_queue == nullptr.
  KOKKOS_INLINE_FUNCTION
  BasicTaskScheduler() : m_track(), m_queue(nullptr) {}

  // NOTE(review): rhs is left unchanged (m_track is copied, and moving a raw
  // pointer copies it), so a moved-from scheduler still refers to the queue.
  KOKKOS_INLINE_FUNCTION
  BasicTaskScheduler(BasicTaskScheduler&& rhs) noexcept
      : m_track(rhs.m_track),  // probably should be a move, but this is
                               // deprecated code anyway
        m_queue(std::move(rhs.m_queue)) {}

  KOKKOS_INLINE_FUNCTION
  BasicTaskScheduler(BasicTaskScheduler const& rhs)
      : m_track(rhs.m_track), m_queue(rhs.m_queue) {}

  // NOTE(review): like the move constructor, this leaves rhs unchanged.
  KOKKOS_INLINE_FUNCTION
  BasicTaskScheduler& operator=(BasicTaskScheduler&& rhs) noexcept {
    m_track = rhs.m_track;  // probably should be a move, but this is deprecated
                            // code anyway
    m_queue = std::move(rhs.m_queue);
    return *this;
  }

  KOKKOS_INLINE_FUNCTION
  BasicTaskScheduler& operator=(BasicTaskScheduler const& rhs) {
    m_track = rhs.m_track;
    m_queue = rhs.m_queue;
    return *this;
  }

  /// Create a new queue: allocate a SharedAllocationRecord labelled
  /// "Kokkos::TaskQueue" in memory_space, placement-construct queue_type in
  /// it from arg_memory_pool, and hand the record to m_track.
  ///
  /// NOTE(review): this constructor is noexcept but calls
  /// record_type::allocate; if that can throw, the exception becomes
  /// std::terminate - confirm this is intended.
  explicit BasicTaskScheduler(memory_pool const& arg_memory_pool) noexcept
      : m_track(), m_queue(nullptr) {
    using record_type =
        Kokkos::Impl::SharedAllocationRecord;
    record_type* record = record_type::allocate(
        memory_space(), "Kokkos::TaskQueue", sizeof(queue_type));

    m_queue = new (record->data()) queue_type(arg_memory_pool);

    // Let the record's destroy functor reach the queue (presumably so the
    // queue is destroyed with the record - confirm in SharedAllocationRecord).
    record->m_destroy.m_queue = m_queue;

    m_track.assign_allocated_record_to_uninitialized(record);
  }

  /// Convenience constructor: builds a memory_pool from the given parameters
  /// and delegates to the memory_pool constructor. The trailing "// = ..."
  /// comments record suggested values; they are NOT default arguments.
  BasicTaskScheduler(memory_space const& arg_memory_space,
                     size_t const mempool_capacity,
                     unsigned const mempool_min_block_size  // = 1u << 6
                     ,
                     unsigned const mempool_max_block_size  // = 1u << 10
                     ,
                     unsigned const mempool_superblock_size  // = 1u << 12
                     )
      : BasicTaskScheduler(memory_pool(
            arg_memory_space, mempool_capacity, mempool_min_block_size,
            mempool_max_block_size, mempool_superblock_size)) {}

  //----------------------------------------

  /// Reference to the underlying queue. Precondition: m_queue != nullptr.
  KOKKOS_INLINE_FUNCTION
  queue_type& queue() const noexcept {
    KOKKOS_EXPECTS(m_queue != nullptr);
    return *m_queue;
  }

  /// Pointer to the queue's memory pool, or a null pointer when this
  /// scheduler has no queue.
  KOKKOS_INLINE_FUNCTION
  memory_pool* memory() const noexcept {
    return m_queue ? &(m_queue->m_memory) : (memory_pool*)0;
  }

  //----------------------------------------

  /**\brief Allocation size for a spawned task */
  template
  KOKKOS_FUNCTION size_t spawn_allocation_size() const {
    return m_queue->template spawn_allocation_size();
  }

  /**\brief Allocation size for a when_all aggregate */
  KOKKOS_FUNCTION
  size_t when_all_allocation_size(int narg) const {
    return m_queue->when_all_allocation_size(narg);
  }

  //----------------------------------------

  /// Spawn through a policy that carries a scheduler: forwards to that
  /// scheduler's _spawn_impl with the policy's predecessor task (if any),
  /// its priority, and the caller-supplied apply/destroy function pointers.
  template
  KOKKOS_FUNCTION static Kokkos::BasicFuture spawn(
      Impl::TaskPolicyWithScheduler&& arg_policy,
      typename task_base::function_type arg_function,
      typename task_base::destroy_type arg_destroy,
      FunctorType&& arg_functor) {
    return std::move(arg_policy.scheduler())
        .template _spawn_impl(
            _get_task_ptr(std::move(arg_policy.predecessor())),
            arg_policy.priority(), arg_function, arg_destroy,
            std::forward(arg_functor));
  }

  /// Spawn on this scheduler through a policy that carries only a
  /// predecessor future; the apply/destroy pointers come from
  /// runnable_task_type's static members.
  template
  KOKKOS_FUNCTION future_type_for_functor> spawn(
      Impl::TaskPolicyWithPredecessor&& arg_policy,
      FunctorType&& arg_functor) {
    using task_type = runnable_task_type;
    typename task_type::function_type const ptr = task_type::apply;
    typename task_type::destroy_type const dtor = task_type::destroy;

    return _spawn_impl(
        _get_task_ptr(std::move(arg_policy).predecessor()),
        arg_policy.priority(), ptr, dtor, std::forward(arg_functor));
  }

  /// Called from inside a running task to request re-execution after
  /// arg_dependence: updates the task's priority and adds the dependence.
  /// Assumes arg_self is the functor part of a task_type object (it is
  /// static_cast to task_type*).
  template
  KOKKOS_FUNCTION static void respawn(
      FunctorType* arg_self,
      BasicFuture const& arg_dependence,
      TaskPriority const& arg_priority) {
    // Precondition: task is in Executing state

    using value_type = typename FunctorType::value_type;
    using task_type = Impl::Task;

    task_type* const task = static_cast(arg_self);

    task->m_priority = static_cast(arg_priority);

    task->add_dependence(arg_dependence.m_task);

    // Postcondition: task is in Executing-Respawn state
  }

  /// Respawn without a dependence: same as the overload above, but calls
  /// add_dependence(nullptr). The scheduler argument is only used to select
  /// this overload.
  template
  KOKKOS_FUNCTION static void respawn(FunctorType* arg_self,
                                      BasicTaskScheduler const&,
                                      TaskPriority const& arg_priority) {
    // Precondition: task is in Executing state

    using value_type = typename FunctorType::value_type;
    using task_type = Impl::Task;

    task_type* const task = static_cast(arg_self);

    task->m_priority = static_cast(arg_priority);

    task->add_dependence(nullptr);

    // Postcondition: task is in Executing-Respawn state
  }

  //----------------------------------------
  /**\brief Return a future that is complete
   *         when all input futures are complete.
   *
   *  Returns an empty future when narg == 0, when this scheduler has no
   *  queue, or when the aggregate allocation fails. Aborts if a non-empty
   *  input future belongs to a different queue.
   *
   *  NOTE(review): the input tasks' reference counts are incremented before
   *  the aggregate is allocated; if q->allocate returns nullptr they are
   *  never decremented, which looks like a reference leak - confirm.
   */
  template
  KOKKOS_FUNCTION BasicFuture when_all(
      BasicFuture const arg[], int narg) {
    future_type f;

    if (narg) {
      queue_type* q = m_queue;

      // BasicTaskScheduler const* scheduler_ptr = nullptr;

      for (int i = 0; i < narg; ++i) {
        task_base* const t = arg[i].m_task;
        if (nullptr != t) {
          // Increment reference count to track subsequent assignment.
          // This likely has to be SeqCst
          desul::atomic_inc(&(t->m_ref_count), desul::MemoryOrderSeqCst(),
                            desul::MemoryScopeDevice());
          if (q != static_cast(t->m_queue)) {
            Kokkos::abort(
                "Kokkos when_all Futures must be in the same scheduler");
          }
        }
      }

      if (q != nullptr) {  // this should probably handle the queue == 0 case,
                           // but this is deprecated code anyway

        size_t const alloc_size = q->when_all_allocation_size(narg);

        f.m_task = reinterpret_cast(q->allocate(alloc_size));
        // f.m_scheduler = *scheduler_ptr;

        if (f.m_task) {
          // Reference count starts at two:
          // +1 to match decrement when task completes
          // +1 for the future

          new (f.m_task) task_base();

          f.m_task->m_queue = q;
          f.m_task->m_ref_count = 2;
          f.m_task->m_alloc_size = static_cast(alloc_size);
          f.m_task->m_dep_count = narg;
          f.m_task->m_task_type = task_base::Aggregate;

          // Assign dependences, reference counts were already incremented

          task_base* volatile* const dep = f.m_task->aggregate_dependences();

          for (int i = 0; i < narg; ++i) {
            dep[i] = arg[i].m_task;
          }

          // Order the aggregate's initialization before it is scheduled.
          Kokkos::memory_fence();

          q->schedule_aggregate(f.m_task);
          // this when_all may be processed at any moment
        }
      }
    }
    return f;
  }

  /**\brief Return a future that is complete when all futures func(0) ..
   *         func(narg-1) are complete.
   *
   *  func must return a Kokkos future (enforced by static_assert; the
   *  decltype(func(0)) below is unevaluated, so func is not called there).
   *  Each func(i) is called once, after the aggregate has been allocated.
   *  Returns an empty future when narg == 0 or the allocation fails.
   *  Unlike the array overload, this one does not check that the input
   *  futures belong to this scheduler's queue.
   *
   *  NOTE(review): m_queue is dereferenced without a null check, and dep[i]
   *  is left unassigned when func(i) returns an empty future (the array
   *  overload stores nullptr) - confirm aggregate_dependences() storage is
   *  zero-initialized.
   */
  template
  KOKKOS_FUNCTION BasicFuture when_all(int narg,
                                                      F const func) {
    using input_type = decltype(func(0));

    static_assert(is_future::value,
                  "Functor must return a Kokkos::Future");

    future_type f;

    if (0 == narg) return f;

    size_t const alloc_size = m_queue->when_all_allocation_size(narg);

    f.m_task = reinterpret_cast(m_queue->allocate(alloc_size));

    if (f.m_task) {
      // Reference count starts at two:
      // +1 to match decrement when task completes
      // +1 for the future

      new (f.m_task) task_base();
      // f.m_scheduler = *this;

      // f.m_task->m_scheduler = &f.m_scheduler;
      f.m_task->m_queue = m_queue;
      f.m_task->m_ref_count = 2;
      f.m_task->m_alloc_size = static_cast(alloc_size);
      f.m_task->m_dep_count = narg;
      f.m_task->m_task_type = task_base::Aggregate;
      // f.m_task->m_apply = nullptr;
      // f.m_task->m_destroy = nullptr;

      // Assign dependences; each non-empty input's reference count is
      // incremented inside the loop below before it is stored.

      task_base* volatile* const dep = f.m_task->aggregate_dependences();

      for (int i = 0; i < narg; ++i) {
        const input_type arg_f = func(i);
        if (nullptr != arg_f.m_task) {
          // Not scheduled, so task scheduler is not yet set
          // if ( m_queue != static_cast< BasicTaskScheduler const * >(
          // arg_f.m_task->m_scheduler )->m_queue ) {
          //    Kokkos::abort("Kokkos when_all Futures must be in the same
          //    scheduler" );
          //}
          // Increment reference count to track subsequent assignment.
          // This increment likely has to be SeqCst
          desul::atomic_inc(&(arg_f.m_task->m_ref_count),
                            desul::MemoryOrderSeqCst(),
                            desul::MemoryScopeDevice());
          dep[i] = arg_f.m_task;
        }
      }

      // Order the aggregate's initialization before it is scheduled.
      Kokkos::memory_fence();

      m_queue->schedule_aggregate(f.m_task);
      // this when_all may be processed at any moment
    }
    return f;
  }

  //----------------------------------------
  // Queue statistics. These dereference m_queue without a null check, so
  // they must not be called on a default-constructed scheduler.

  KOKKOS_INLINE_FUNCTION
  int allocation_capacity() const noexcept {
    return m_queue->m_memory.capacity();
  }

  KOKKOS_INLINE_FUNCTION
  int allocated_task_count() const noexcept { return m_queue->m_count_alloc; }

  KOKKOS_INLINE_FUNCTION
  int allocated_task_count_max() const noexcept { return m_queue->m_max_alloc; }

  KOKKOS_INLINE_FUNCTION
  long allocated_task_count_accum() const noexcept {
    return m_queue->m_accum_alloc;
  }

  //----------------------------------------

  template
  friend void wait(Kokkos::BasicTaskScheduler const&);
};

}  // namespace Kokkos

//----------------------------------------------------------------------------
//----------------------------------------------------------------------------

namespace Kokkos {

//----------------------------------------------------------------------------
// Construct a TaskTeam execution policy.
// Overloads: (predecessor future), (scheduler), (scheduler, predecessor
// future). The last one static_asserts that the future belongs to the same
// scheduler type.

template
Impl::TaskPolicyWithPredecessor>
    KOKKOS_INLINE_FUNCTION
    TaskTeam(Kokkos::BasicFuture arg_future,
             TaskPriority arg_priority = TaskPriority::Regular) {
  return {std::move(arg_future), arg_priority};
}

template
Impl::TaskPolicyWithScheduler
    KOKKOS_INLINE_FUNCTION
    TaskTeam(Scheduler arg_scheduler,
             std::enable_if_t::value, TaskPriority>
                 arg_priority = TaskPriority::Regular) {
  return {std::move(arg_scheduler), arg_priority};
}

template
Impl::TaskPolicyWithScheduler
    KOKKOS_INLINE_FUNCTION
    TaskTeam(Scheduler arg_scheduler, PredecessorFuture arg_future,
             std::enable_if_t::value &&
                                  Kokkos::is_future::value,
                              TaskPriority>
                 arg_priority = TaskPriority::Regular) {
  static_assert(std::is_same::value,
                "Can't create a task policy from a scheduler and a future from "
                "a different scheduler");

  return {std::move(arg_scheduler), std::move(arg_future), arg_priority};
}

// Construct a TaskSingle execution policy.
// Same three overloads as TaskTeam above.

template
Impl::TaskPolicyWithPredecessor>
    KOKKOS_INLINE_FUNCTION
    TaskSingle(Kokkos::BasicFuture arg_future,
               TaskPriority arg_priority = TaskPriority::Regular) {
  return {std::move(arg_future), arg_priority};
}

template
Impl::TaskPolicyWithScheduler
    KOKKOS_INLINE_FUNCTION
    TaskSingle(Scheduler arg_scheduler,
               std::enable_if_t::value, TaskPriority>
                   arg_priority = TaskPriority::Regular) {
  return {std::move(arg_scheduler), arg_priority};
}

template
Impl::TaskPolicyWithScheduler
    KOKKOS_INLINE_FUNCTION
    TaskSingle(Scheduler arg_scheduler, PredecessorFuture arg_future,
               std::enable_if_t::value &&
                                    Kokkos::is_future::value,
                                TaskPriority>
                   arg_priority = TaskPriority::Regular) {
  static_assert(std::is_same::value,
                "Can't create a task policy from a scheduler and a future from "
                "a different scheduler");

  return {std::move(arg_scheduler), std::move(arg_future), arg_priority};
}

//----------------------------------------------------------------------------

/**\brief A host control thread spawns a task with options
 *
 *  1) Team or Single (TaskEnum must be TaskTeam or TaskSingle)
 *  2) With scheduler or dependence
 *  3) Priority taken from the policy
 *
 *  Function pointers are obtained through
 *  TaskQueueSpecialization::get_function_pointer rather than taken directly
 *  from task_type (compare task_spawn below).
 */
template
typename Scheduler::template future_type_for_functor>
host_spawn(Impl::TaskPolicyWithScheduler arg_policy,
           FunctorType&& arg_functor) {
  using scheduler_type = Scheduler;
  using task_type =
      typename scheduler_type::template runnable_task_type;

  static_assert(TaskEnum == Impl::TaskType::TaskTeam ||
                    TaskEnum == Impl::TaskType::TaskSingle,
                "Kokkos host_spawn requires TaskTeam or TaskSingle");

  // May be spawning a Cuda task, must use the specialization
  // to query on-device function pointer.
  typename task_type::function_type ptr;
  typename task_type::destroy_type dtor;
  Kokkos::Impl::TaskQueueSpecialization<
      scheduler_type>::template get_function_pointer(ptr, dtor);

  return scheduler_type::spawn(std::move(arg_policy), ptr, dtor,
                               std::forward(arg_functor));
}

/**\brief A task spawns a task with options
 *
 *  1) Team or Single (TaskEnum must be TaskTeam or TaskSingle)
 *  2) With scheduler or dependence
 *  3) Priority taken from the policy
 *
 *  Function pointers are taken directly from task_type::apply/destroy.
 */
template
typename Scheduler::template future_type_for_functor>
    KOKKOS_INLINE_FUNCTION
    task_spawn(Impl::TaskPolicyWithScheduler arg_policy,
               FunctorType&& arg_functor) {
  using scheduler_type = Scheduler;
  using task_type =
      typename scheduler_type::template runnable_task_type;

  static_assert(TaskEnum == Impl::TaskType::TaskTeam ||
                    TaskEnum == Impl::TaskType::TaskSingle,
                "Kokkos task_spawn requires TaskTeam or TaskSingle");

  typename task_type::function_type const ptr = task_type::apply;
  typename task_type::destroy_type const dtor = task_type::destroy;

  return scheduler_type::spawn(std::move(arg_policy), ptr, dtor,
                               std::forward(arg_functor));
}

/**\brief A task respawns itself with options
 *
 *  1) With scheduler or dependence
 *  2) Priority (defaults to TaskPriority::Regular)
 *
 *  Forwards to T::scheduler_type::respawn; T must be a future or scheduler.
 */
template
void KOKKOS_INLINE_FUNCTION
respawn(FunctorType* arg_self, T const& arg,
        TaskPriority const& arg_priority = TaskPriority::Regular) {
  static_assert(Kokkos::is_future::value || Kokkos::is_scheduler::value,
                "Kokkos respawn argument must be Future or TaskScheduler");

  T::scheduler_type::respawn(arg_self, arg, arg_priority);
}

//----------------------------------------------------------------------------

// template
// KOKKOS_INLINE_FUNCTION
// BasicFuture
// when_all(BasicFuture const arg[], int narg)
//{
//  return BasicFuture::scheduler_type::when_all(arg, narg);
//}

//----------------------------------------------------------------------------
// Wait for all runnable tasks to complete.
// Delegates to the scheduler's specialization::execute(scheduler).

template
inline void wait(BasicTaskScheduler const& scheduler) {
  using scheduler_type = BasicTaskScheduler;
  scheduler_type::specialization::execute(scheduler);
  // scheduler.m_queue->execute();
}

}  // namespace Kokkos

//----------------------------------------------------------------------------
//----------------------------------------------------------------------------

////////////////////////////////////////////////////////////////////////////////
// END OLD CODE
////////////////////////////////////////////////////////////////////////////////

#endif /* #if defined( KOKKOS_ENABLE_TASKDAG ) */
#endif /* #ifndef KOKKOS_TASKSCHEDULER_HPP */