
feat:change thread sheduling method in ThreadPool class #2648

Open · wants to merge 11 commits into base: unstable
Conversation


@QlQlqiqi QlQlqiqi commented May 12, 2024

The logic is based on the function WriteThread::AwaitState in RocksDB.

Before:

  1. All workers and the main thread that pushes tasks into the queue wait on the same lock, which can cause very intense contention.
  2. When a worker finishes a task, it tries to acquire the lock again for a new task via the function await. Under such intense contention the worker will very likely be put to sleep, and sleeping and waking up are expensive.

After:

  1. This is a standard producer-consumer model, so we can use a lock-free list to deal with the intense contention.
  2. When a worker wakes up, it tries to fetch tasks. If it finds none, it loops for a while waiting for new tasks. Under high throughput the wait for new tasks is very short, so this loop will NOT cause serious blocking. To reduce the blocking time, the loop has 3 levels.
    2.1. Level 1: wait in a spin loop.
    2.2. Level 2: wait in a longer bounded loop. The worker may yield the CPU when certain conditions are met, and a sampled value tracks the probability of entering the level-2 loop.
    2.3. Level 3: wait for new tasks via the function await.
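The three-level wait described above can be sketched roughly as follows. This is a simplified illustration, not the PR's actual code: the names (`newest_node_`, `LinkOne`, `WaitForTasks`) follow the PR's vocabulary, but the loop bounds are the defaults listed below, the sampling logic is omitted, and the level-3 wait is shown as a plain condition variable (a real implementation must guard against lost wakeups between the predicate check and the wait):

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

struct Node {
  Node* link_older = nullptr;  // next-older task in the intrusive list
};

std::atomic<Node*> newest_node_{nullptr};
std::mutex mu_;
std::condition_variable cv_;

// Producer side: lock-free push of one node onto the list head.
void LinkOne(Node* node) {
  Node* head = newest_node_.load(std::memory_order_relaxed);
  do {
    node->link_older = head;
  } while (!newest_node_.compare_exchange_weak(head, node,
                                               std::memory_order_release,
                                               std::memory_order_relaxed));
  cv_.notify_one();  // wake a level-3 waiter, if any
}

// Consumer side: the three-level wait. Returns the detached list head.
Node* WaitForTasks() {
  // Level 1: short spin loop (the tunable "count of the level-1 loop").
  for (uint32_t tries = 0; tries < 200; ++tries) {
    Node* got = newest_node_.exchange(nullptr, std::memory_order_acquire);
    if (got != nullptr) return got;
  }
  // Level 2: bounded yield loop, up to max_yield_usec_ microseconds.
  auto deadline =
      std::chrono::steady_clock::now() + std::chrono::microseconds(100);
  while (std::chrono::steady_clock::now() < deadline) {
    std::this_thread::yield();
    Node* got = newest_node_.exchange(nullptr, std::memory_order_acquire);
    if (got != nullptr) return got;
  }
  // Level 3: block until a producer links a node and signals.
  std::unique_lock<std::mutex> lk(mu_);
  cv_.wait(lk, [] {
    return newest_node_.load(std::memory_order_acquire) != nullptr;
  });
  return newest_node_.exchange(nullptr, std::memory_order_acquire);
}
```

Note that `exchange(nullptr)` detaches the whole list at once, which is exactly the behavior discussed in the review below.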

params

  1. the count of the level-1 loop:
    default: 200. Too large a count may cause high CPU load; too small a count may make the spinning pointless.
  2. queue_slow_size_:
    default: std::min(worker_num, 100). When the number of tasks in the queue exceeds it, the main thread calling the function Schedule calls std::this_thread::yield().
  3. max_queue_size_:
    default: max_queue_size. When the number of tasks in the queue exceeds it, the main thread calling the function Schedule calls std::this_thread::yield() until the number of tasks in the queue drops below this threshold.
  4. max_yield_usec_:
    default: 100. The maximum duration (in microseconds) of the level-2 loop.
  5. slow_yield_usec_:
    default: 3. If a single std::this_thread::yield() call takes longer than this threshold, the sampled data source may be updated.
  6. kMaxSlowYieldsWhileSpinning:
    default: 3. If condition (5) above is hit this many times, the sampled data source is updated.
  7. sampling_base:
    default: 256. It ensures the probability of entering the level-2 loop is not lower than 1/sampling_base.
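Taken together, parameters (2) and (3) describe backpressure on the producer side of Schedule. A minimal sketch of just that part, with illustrative defaults and a hypothetical `node_cnt_` atomic task counter (names and values are assumptions, not the PR's exact code):

```cpp
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <thread>

std::atomic<std::size_t> node_cnt_{0};  // tasks currently queued
const std::size_t queue_slow_size_ =
    std::min<std::size_t>(/*worker_num=*/8, 100);
const std::size_t max_queue_size_ = 1000;  // illustrative default

// Called by the producer before linking a new task node.
void ScheduleBackpressure() {
  // Hard limit: yield in a loop until the queue drains below max_queue_size_.
  while (node_cnt_.load(std::memory_order_relaxed) >= max_queue_size_) {
    std::this_thread::yield();
  }
  // Soft limit: yield once to slow the producer when the queue is busy.
  if (node_cnt_.load(std::memory_order_relaxed) >= queue_slow_size_) {
    std::this_thread::yield();
  }
  node_cnt_.fetch_add(1, std::memory_order_relaxed);
  // ...then allocate the task node and link it onto the lock-free list...
}
```

Workers would decrement `node_cnt_` as they complete tasks, so the hard-limit loop terminates as long as the pool makes progress.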

@QlQlqiqi QlQlqiqi changed the title change thread sheduling method in ThreadPool class feat:change thread sheduling method in ThreadPool class May 12, 2024
@github-actions github-actions bot added the ✏️ Feature New feature or request label May 12, 2024
@QlQlqiqi QlQlqiqi requested a review from AlexStocks May 12, 2024 06:43
// 1. loop for short time
for (uint32_t tries = 0; tries < 200; ++tries) {
  if (newest_node_.load(std::memory_order_acquire) != nullptr) {
    last = newest_node_.exchange(nullptr);
Collaborator

Here the first thread to arrive detaches the whole list and keeps it for itself, then consumes it sequentially, which may cause large latency fluctuations. I suggest distributing tasks as evenly as possible among the workers in the pool. After all, on Pika's read/write path these are Pika's own threads, which is quite different from RocksDB's threading model (in RocksDB it is application threads contending on each writer), so this part probably needs more consideration.

Author

I thought about it; there are roughly two ways to take only a limited number of tasks at a time:
1. One lock-free list per worker, with new tasks added to these lists randomly or in rotation;
2. Still use a single lock-free list, but give it a small capacity, e.g. 10, so that a worker takes at most 10 tasks at a time.
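A minimal sketch of the first approach, under stated assumptions: the class name `ShardedTaskList`, the round-robin counter, and all details below are hypothetical illustrations, not code from the linked branch. Each worker owns one lock-free head, and producers spread pushes round-robin so no single worker can grab the entire backlog:

```cpp
#include <atomic>
#include <cstddef>
#include <vector>

struct Node {
  Node* link_older = nullptr;  // next-older task within the same shard
};

// Hypothetical sketch: one lock-free list head per worker; producers
// distribute tasks round-robin across shards.
class ShardedTaskList {
 public:
  explicit ShardedTaskList(std::size_t workers)
      : heads_(workers), next_(0) {}

  // Producer: pick the next shard and CAS the node onto its head.
  void Push(Node* node) {
    std::size_t idx =
        next_.fetch_add(1, std::memory_order_relaxed) % heads_.size();
    Node* head = heads_[idx].load(std::memory_order_relaxed);
    do {
      node->link_older = head;
    } while (!heads_[idx].compare_exchange_weak(head, node,
                                                std::memory_order_release,
                                                std::memory_order_relaxed));
  }

  // Consumer: each worker detaches only its own shard's list.
  Node* Drain(std::size_t worker_idx) {
    return heads_[worker_idx].exchange(nullptr, std::memory_order_acquire);
  }

 private:
  std::vector<std::atomic<Node*>> heads_;
  std::atomic<std::size_t> next_;
};
```

An idle worker could also steal from a neighbor's shard after draining its own, which would recover some of the load balancing the single-list design gets for free.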

Author

The second approach can simply be tested directly; for the first approach, see my new branch: https://github.com/QlQlqiqi/pika/tree/change-thread-shedule-with-mutil-list

Author

In my tests the two approaches are about equally fast; with suitable parameter tuning there should be a larger gap.

@AlexStocks
Collaborator

@CodiumAI-Agent /review

@CodiumAI-Agent

PR Review 🔍

⏱️ Estimated effort to review [1-5]

4, because the PR involves significant changes to the threading and scheduling logic in a thread pool implementation, which is a complex area requiring careful review to ensure thread safety, performance, and correctness. The changes include the introduction of lock-free structures and a detailed scheduling mechanism which needs thorough testing and validation.

🧪 Relevant tests

No

⚡ Possible issues

Possible Bug: The use of std::this_thread::yield() in the Schedule method may not be sufficient to handle high contention gracefully. This could lead to performance degradation under heavy load.

Thread Safety Concern: The lock-free implementation using std::atomic and manual memory management (e.g., new and delete for Node objects) must be carefully reviewed to ensure there are no race conditions or memory leaks.

🔒 Security concerns

No

Code feedback:
relevant file: src/net/src/thread_pool.cc
suggestion:

Consider using a condition variable instead of busy-waiting with std::this_thread::yield() in the Schedule method. This change could improve CPU usage and responsiveness under high load. [important]

relevant line: std::this_thread::yield();

relevant file: src/net/src/thread_pool.cc
suggestion:

Implement proper memory management for Node objects to prevent memory leaks. Consider using smart pointers (e.g., std::unique_ptr) instead of raw pointers for automatic memory management. [important]

relevant line: auto node = new Node(func, arg);

relevant file: src/net/src/thread_pool.cc
suggestion:

To prevent potential race conditions with the lock-free list, ensure that all operations on std::atomic pointers in LinkOne and related functions are correctly ordered using memory order tags. [important]

relevant line: if (newest_node->compare_exchange_weak(nodes, node)) {

relevant file: src/net/src/thread_pool.cc
suggestion:

Add error handling for the scenario where new Node fails to allocate memory, which could lead to a null pointer dereference. [medium]

relevant line: auto node = new Node(func, arg);

@AlexStocks
Collaborator

@CodiumAI-Agent /improve

@CodiumAI-Agent

PR Code Suggestions ✨

Possible bug
Add exception handling for memory allocation to prevent leaks

Ensure proper exception handling for dynamic memory allocation when creating new Node
objects to prevent memory leaks and ensure program stability.

src/net/src/thread_pool.cc [112-113]

-auto node = new Node(func, arg);
-LinkOne(node, &newest_node_);
+Node* node = nullptr;
+try {
+  node = new Node(func, arg);
+  LinkOne(node, &newest_node_);
+} catch (const std::bad_alloc& e) {
+  // Handle memory allocation failure, e.g., log error, clean up, and throw if necessary
+  delete node;
+  throw;
+}
 
Suggestion importance[1-10]: 9

Why: Adding exception handling for dynamic memory allocation is crucial to prevent memory leaks and ensure program stability. This is a significant improvement addressing a potential bug.

Best practice
Use smart pointers for automatic memory management

Replace the manual memory management with smart pointers to avoid manual deletion and
improve code safety and readability.

src/net/src/thread_pool.cc [112-113]

-auto node = new Node(func, arg);
-LinkOne(node, &newest_node_);
+auto node = std::make_unique<Node>(func, arg);
+LinkOne(node.get(), &newest_node_);
 
Suggestion importance[1-10]: 8

Why: Using smart pointers enhances code safety and readability by automating memory management, reducing the risk of memory leaks and manual deletion errors. This is a best practice for modern C++.

Enhancement
Use a thread-safe queue to simplify task scheduling

Consider using a thread-safe queue or priority queue that encapsulates the synchronization
logic internally, rather than manually managing the synchronization and node linking in
ThreadPool::Schedule.

src/net/src/thread_pool.cc [112-113]

-auto node = new Node(func, arg);
-LinkOne(node, &newest_node_);
+task_queue.push(std::make_unique<Node>(func, arg));
 
Suggestion importance[1-10]: 8

Why: Encapsulating synchronization logic within a thread-safe queue can simplify the code and reduce the potential for synchronization errors. This enhancement improves code maintainability and readability.

Performance
Replace busy waiting with a sleep mechanism to improve CPU efficiency

Consider using a more efficient locking mechanism or lock-free data structures for the
ThreadPool::Schedule method to avoid potential performance bottlenecks due to the use of
std::this_thread::yield() in a busy loop. This can lead to thread starvation and
inefficient CPU usage.

src/net/src/thread_pool.cc [103-109]

 while (node_cnt_.load(std::memory_order_relaxed) >= max_queue_size_) {
-  std::this_thread::yield();
+  std::this_thread::sleep_for(std::chrono::milliseconds(1));
 }
 if (node_cnt_.load(std::memory_order_relaxed) >= queue_slow_size_) {
-  std::this_thread::yield();
+  std::this_thread::sleep_for(std::chrono::milliseconds(1));
 }
 
Suggestion importance[1-10]: 7

Why: The suggestion to replace busy waiting with a sleep mechanism can improve CPU efficiency and reduce thread starvation. However, it may introduce latency in task scheduling, so the impact on performance should be carefully evaluated.


4 participants