Skip to content

Commit

Permalink
Add reference implementation for parallel_phase feature (#1570)
Browse files Browse the repository at this point in the history
Implement parallel_phase API proposed in RFC, add new entry points to the library, move the RFC to the experimental stage. 

Signed-off-by: pavelkumbrasev <[email protected]>
Signed-off-by: Isaev, Ilya <[email protected]>
Co-authored-by: pavelkumbrasev <[email protected]>
Co-authored-by: Aleksei Fedotov <[email protected]>
Co-authored-by: Alexey Kukanov <[email protected]>
  • Loading branch information
4 people authored Jan 24, 2025
1 parent 0f287a7 commit a9fcf49
Show file tree
Hide file tree
Showing 20 changed files with 727 additions and 45 deletions.
6 changes: 5 additions & 1 deletion include/oneapi/tbb/detail/_config.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2024 Intel Corporation
Copyright (c) 2005-2025 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -534,4 +534,8 @@
#define __TBB_PREVIEW_TASK_GROUP_EXTENSIONS 1
#endif

#if TBB_PREVIEW_PARALLEL_PHASE || __TBB_BUILD
#define __TBB_PREVIEW_PARALLEL_PHASE 1
#endif

#endif // __TBB_detail__config_H
173 changes: 152 additions & 21 deletions include/oneapi/tbb/task_arena.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2025 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -95,6 +95,11 @@ TBB_EXPORT void __TBB_EXPORTED_FUNC isolate_within_arena(d1::delegate_base& d, s
TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_arena_base*);
TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_group_context&, d1::task_arena_base*);
TBB_EXPORT void __TBB_EXPORTED_FUNC submit(d1::task&, d1::task_group_context&, arena*, std::uintptr_t);

#if __TBB_PREVIEW_PARALLEL_PHASE
TBB_EXPORT void __TBB_EXPORTED_FUNC enter_parallel_phase(d1::task_arena_base*, std::uintptr_t);
TBB_EXPORT void __TBB_EXPORTED_FUNC exit_parallel_phase(d1::task_arena_base*, std::uintptr_t);
#endif
} // namespace r1

namespace d2 {
Expand Down Expand Up @@ -122,6 +127,14 @@ class task_arena_base {
normal = 2 * priority_stride,
high = 3 * priority_stride
};

#if __TBB_PREVIEW_PARALLEL_PHASE
// Policy controlling how worker threads leave the arena when they run out of work:
// 'automatic' lets the library decide (the default behavior of implicit arenas),
// 'fast' requests that workers be released promptly.
enum class leave_policy : int {
automatic = 0,
fast = 1
};
#endif

#if __TBB_ARENA_BINDING
using constraints = tbb::detail::d1::constraints;
#endif /*__TBB_ARENA_BINDING*/
Expand Down Expand Up @@ -162,13 +175,36 @@ class task_arena_base {
return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_max_threads_per_core : automatic;
}

#if __TBB_PREVIEW_PARALLEL_PHASE
//! Returns the leave policy currently encoded in my_version_and_traits.
leave_policy get_leave_policy() const {
return (my_version_and_traits & fast_leave_policy_flag) ? leave_policy::fast : leave_policy::automatic;
}

//! Maps a leave_policy value to its traits-bitmask representation
//! (fast_leave_policy_flag for 'fast', 0 for 'automatic').
int leave_policy_trait(leave_policy lp) const {
return lp == leave_policy::fast ? fast_leave_policy_flag : 0;
}

//! Encodes the given leave policy into my_version_and_traits.
// NOTE(review): OR-only update — once fast_leave_policy_flag is set it cannot be
// cleared by a later call with leave_policy::automatic; confirm this is intended
// (callers appear to set the policy only before arena initialization).
void set_leave_policy(leave_policy lp) {
my_version_and_traits |= leave_policy_trait(lp);
}
#endif

enum {
default_flags = 0
, core_type_support_flag = 1
default_flags = 0,
core_type_support_flag = 1,
fast_leave_policy_flag = 1 << 1
};

task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority)
: my_version_and_traits(default_flags | core_type_support_flag)
task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy lp
#endif
)
: my_version_and_traits(default_flags | core_type_support_flag
#if __TBB_PREVIEW_PARALLEL_PHASE
| leave_policy_trait(lp)
#endif
)
, my_initialization_state(do_once_state::uninitialized)
, my_arena(nullptr)
, my_max_concurrency(max_concurrency)
Expand All @@ -180,8 +216,16 @@ class task_arena_base {
{}

#if __TBB_ARENA_BINDING
task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority)
: my_version_and_traits(default_flags | core_type_support_flag)
task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy lp
#endif
)
: my_version_and_traits(default_flags | core_type_support_flag
#if __TBB_PREVIEW_PARALLEL_PHASE
| leave_policy_trait(lp)
#endif
)
, my_initialization_state(do_once_state::uninitialized)
, my_arena(nullptr)
, my_max_concurrency(constraints_.max_concurrency)
Expand Down Expand Up @@ -259,31 +303,58 @@ class task_arena : public task_arena_base {
* Value of 1 is default and reflects behavior of implicit arenas.
**/
task_arena(int max_concurrency_ = automatic, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal)
: task_arena_base(max_concurrency_, reserved_for_masters, a_priority)
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy lp = leave_policy::automatic
#endif
)
: task_arena_base(max_concurrency_, reserved_for_masters, a_priority
#if __TBB_PREVIEW_PARALLEL_PHASE
, lp
#endif
)
{}

#if __TBB_ARENA_BINDING
//! Creates task arena pinned to certain NUMA node
task_arena(const constraints& constraints_, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal)
: task_arena_base(constraints_, reserved_for_masters, a_priority)
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy lp = leave_policy::automatic
#endif
)
: task_arena_base(constraints_, reserved_for_masters, a_priority
#if __TBB_PREVIEW_PARALLEL_PHASE
, lp
#endif
)
{}

//! Copies settings from another task_arena
task_arena(const task_arena &s) // copy settings but not the reference or instance
task_arena(const task_arena &a) // copy settings but not the reference or instance
: task_arena_base(
constraints{}
.set_numa_id(s.my_numa_id)
.set_max_concurrency(s.my_max_concurrency)
.set_core_type(s.my_core_type)
.set_max_threads_per_core(s.my_max_threads_per_core)
, s.my_num_reserved_slots, s.my_priority)
.set_numa_id(a.my_numa_id)
.set_max_concurrency(a.my_max_concurrency)
.set_core_type(a.my_core_type)
.set_max_threads_per_core(a.my_max_threads_per_core)
, a.my_num_reserved_slots, a.my_priority
#if __TBB_PREVIEW_PARALLEL_PHASE
, a.get_leave_policy()
#endif
)

{}
#else
//! Copies settings from another task_arena
task_arena(const task_arena& a) // copy settings but not the reference or instance
: task_arena_base(a.my_max_concurrency, a.my_num_reserved_slots, a.my_priority)
: task_arena_base(a.my_max_concurrency,
a.my_num_reserved_slots,
a.my_priority,
#if __TBB_PREVIEW_PARALLEL_PHASE
a.get_leave_policy()
#endif
)
{}
#endif /*__TBB_ARENA_BINDING*/

Expand All @@ -292,7 +363,11 @@ class task_arena : public task_arena_base {

//! Creates an instance of task_arena attached to the current arena of the thread
explicit task_arena( attach )
: task_arena_base(automatic, 1, priority::normal) // use default settings if attach fails
: task_arena_base(automatic, 1, priority::normal
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy::automatic
#endif
) // use default settings if attach fails
{
if (r1::attach(*this)) {
mark_initialized();
Expand All @@ -311,21 +386,32 @@ class task_arena : public task_arena_base {

//! Overrides concurrency level and forces initialization of internal representation
void initialize(int max_concurrency_, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal)
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy lp = leave_policy::automatic
#endif
)
{
__TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
if( !is_active() ) {
my_max_concurrency = max_concurrency_;
my_num_reserved_slots = reserved_for_masters;
my_priority = a_priority;
#if __TBB_PREVIEW_PARALLEL_PHASE
set_leave_policy(lp);
#endif
r1::initialize(*this);
mark_initialized();
}
}

#if __TBB_ARENA_BINDING
void initialize(constraints constraints_, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal)
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy lp = leave_policy::automatic
#endif
)
{
__TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
if( !is_active() ) {
Expand All @@ -335,6 +421,9 @@ class task_arena : public task_arena_base {
my_max_threads_per_core = constraints_.max_threads_per_core;
my_num_reserved_slots = reserved_for_masters;
my_priority = a_priority;
#if __TBB_PREVIEW_PARALLEL_PHASE
set_leave_policy(lp);
#endif
r1::initialize(*this);
mark_initialized();
}
Expand Down Expand Up @@ -404,6 +493,32 @@ class task_arena : public task_arena_base {
return execute_impl<decltype(f())>(f);
}

#if __TBB_PREVIEW_PARALLEL_PHASE
//! Signals the beginning of a parallel phase in this arena.
// Initializes the arena first (lazy initialization), since the library entry
// point requires a bound arena representation.
void start_parallel_phase() {
initialize();
r1::enter_parallel_phase(this, /*reserved*/0);
}
//! Signals the end of a parallel phase in this arena.
// If with_fast_leave is true, requests a one-time prompt release of workers,
// overriding the arena's configured leave policy for this phase only.
void end_parallel_phase(bool with_fast_leave = false) {
__TBB_ASSERT(my_initialization_state.load(std::memory_order_relaxed) == do_once_state::initialized, nullptr);
// It is guaranteed by the standard that conversion of boolean to integral type will result in either 0 or 1
r1::exit_parallel_phase(this, static_cast<std::uintptr_t>(with_fast_leave));
}

//! RAII guard that denotes a parallel phase within an arena.
// The phase starts at construction and ends at destruction. The guard is
// non-copyable: the implicitly defaulted copy constructor would make two
// objects end the same phase, calling end_parallel_phase twice.
class scoped_parallel_phase {
    task_arena& arena;              // arena whose phase this guard controls
    bool one_time_fast_leave;       // request a one-time fast leave at phase end
public:
    //! Starts a parallel phase in the given arena.
    // with_fast_leave requests prompt release of workers when this phase ends.
    // explicit: prevents accidental implicit conversion from task_arena.
    explicit scoped_parallel_phase(task_arena& ta, bool with_fast_leave = false)
        : arena(ta), one_time_fast_leave(with_fast_leave)
    {
        arena.start_parallel_phase();
    }
    // Non-copyable and non-assignable: exactly one start/end pair per guard.
    scoped_parallel_phase(const scoped_parallel_phase&) = delete;
    scoped_parallel_phase& operator=(const scoped_parallel_phase&) = delete;
    //! Ends the parallel phase, honoring the one-time fast-leave request.
    ~scoped_parallel_phase() {
        arena.end_parallel_phase(one_time_fast_leave);
    }
};
#endif

#if __TBB_EXTRA_DEBUG
//! Returns my_num_reserved_slots
int debug_reserved_slots() const {
Expand Down Expand Up @@ -472,6 +587,17 @@ inline void enqueue(F&& f) {
enqueue_impl(std::forward<F>(f), nullptr);
}

#if __TBB_PREVIEW_PARALLEL_PHASE
//! Signals the beginning of a parallel phase in the arena of the calling thread.
// Passing nullptr directs the library to the thread's current (implicit) arena.
inline void start_parallel_phase() {
    r1::enter_parallel_phase(nullptr, /*reserved*/0);
}

//! Signals the end of a parallel phase in the arena of the calling thread.
// If with_fast_leave is true, requests a one-time prompt release of workers.
// Defaulted to false for consistency with task_arena::end_parallel_phase.
inline void end_parallel_phase(bool with_fast_leave = false) {
    // It is guaranteed by the standard that conversion of boolean to integral type will result in either 0 or 1
    r1::exit_parallel_phase(nullptr, static_cast<std::uintptr_t>(with_fast_leave));
}
#endif

using r1::submit;

} // namespace d1
Expand All @@ -491,6 +617,11 @@ using detail::d1::max_concurrency;
using detail::d1::isolate;

using detail::d1::enqueue;

#if __TBB_PREVIEW_PARALLEL_PHASE
using detail::d1::start_parallel_phase;
using detail::d1::end_parallel_phase;
#endif
} // namespace this_task_arena

} // inline namespace v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ void scoped_parallel_phase_example() {
// Computation
}
}
```

## Considerations
Expand All @@ -256,6 +255,32 @@ it might introduce performance problems if:
Heavier involvement of less performant core types might result in artificial work
imbalance in the arena.

## Technical Details

To implement the proposed feature, the following changes were made:
* Added a new entity `thread_leave_manager` to the `r1::arena`, which is responsible
for managing the state of workers' arena leaving behavior.
* Introduced two new entry points to the library.
* `r1::enter_parallel_phase(d1::task_arena_base*, std::uintptr_t)` - used to communicate
the start of parallel phase with the library.
* `r1::exit_parallel_phase(d1::task_arena_base*, std::uintptr_t)` - used to communicate
the end of parallel phase with the library.

### Thread Leave Manager

`thread_leave_manager` class implements the state machine described in proposal.
Specifically, it controls when worker threads are allowed to be retained in the arena.
`thread_leave_manager` is initialized with a state that determines the default
behavior for workers leaving the arena.

To support the `start/end_parallel_phase` API, it provides functionality to override the default
state with a "Parallel Phase" state. It also keeps track of the number of active parallel phases.

The following sequence diagram illustrates the interaction between the user and
the `thread_leave_manager` during the execution of parallel phases. It shows how the
`thread_leave_manager` manages the state transitions when using `start/end_parallel_phase`.

<img src="parallel_phase_sequence_diagram.png" width=1000>

## Open Questions in Design

Expand All @@ -272,3 +297,10 @@ Some open questions that remain:
* Do we see any value if arena potentially can transition from one to another state?
* What if different types of workloads are mixed in one application?
* What if there are concurrent calls to this API?

## Conditions to become fully supported

The following conditions must be met for the feature to move from experimental to fully supported:
* Open questions regarding API should be resolved.
* The feature should demonstrate performance improvements in scenarios mentioned.
* oneTBB specification needs to be updated to reflect the new feature.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit a9fcf49

Please sign in to comment.