diff --git a/.gitignore b/.gitignore
index 18e45e8..47e6a4e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -51,3 +51,4 @@ tp2/
 fbcode/
 fbcode
 buckifier/*.pyc
+cmake-build-debug
\ No newline at end of file
diff --git a/CMakeLists.txt b/CMakeLists.txt
new file mode 100644
index 0000000..42ac1e4
--- /dev/null
+++ b/CMakeLists.txt
@@ -0,0 +1,17 @@
+cmake_minimum_required(VERSION 3.20)
+project(threadPool)
+
+set(CMAKE_CXX_STANDARD 11)
+include_directories(include)
+
+add_library(thread_pool STATIC
+        src/env.cc
+        src/env_posix.cc
+        src/threadpool_imp.cc)
+add_definitions(-DOS_LINUX)
+
+add_executable(thread_pool_test example/threadpool_test.cc)
+target_link_libraries(thread_pool_test thread_pool -lpthread)
+
+add_executable(env_thread_test example/env_thread_test.cc)
+target_link_libraries(env_thread_test thread_pool -lpthread)
\ No newline at end of file
diff --git a/Makefile b/Makefile
deleted file mode 100644
index af2a904..0000000
--- a/Makefile
+++ /dev/null
@@ -1,33 +0,0 @@
-# step 1:
-# prepare the compiler
-# prepare targe file name
-# prepare dependecy libarary
-CC = g++
-CFLAGS := -Wall -O2 -fPIC -std=c++11
-LDFLAGS = -lpthread
-
-# header file's path
-INCLUDE_PATH = -I ./include
-SRC_PATH = ./src/
-
-# .o file with the same name of .ccc file
-OBJS = threadpool_imp.o env_posix.o env.o
-SRCS = $(SRC_PATH)threadpool_imp.cc $(SRC_PATH)env_posix.cc $(SRC_PATH)env.cc
-TARGET = libthreadpool.so
-STATIC_TARGET = libthreadpool.a
-
-# step 2:
-# produce the .o files
-$(OBJS):$(SRCS)
-	$(CC) $(CFLAGS) $(INCLUDE_PATH) -c $^
-
-# 3. produce the .so file
-# 'ar crv' produce static lib
-# 'cc - shared' produce shared lib
-all : $(OBJS)
-	ar crv $(STATIC_TARGET) $(OBJS)
-	$(CC) -shared $(CFLAGS) $(INCLUDE_PATH) -o $(TARGET) $(OBJS) $(LDFLAGS)
-
-# 4. clean the files except source file
-clean:
-	rm -f $(OBJS) $(LIB) $(TARGET) $(STATIC_TARGET)
diff --git a/example/Makefile b/example/Makefile
deleted file mode 100644
index 4adc91a..0000000
--- a/example/Makefile
+++ /dev/null
@@ -1,28 +0,0 @@
-# step 1:
-# prepare the compiler
-# prepare targe file name
-# prepare dependecy libarary
-CC = g++
-CFLAGS := -Wall -O2 -fPIC -std=c++11
-LDFLAGS = -lpthread
-
-# ifneq ($(USE_RTTI), 1)
-# CFLAGS += -fno-rtti
-# endif
-
-# header file's path
-INCLUDE_PATH = -I ../include
-LIBPATH = ../libthreadpool.so
-STATIC_LIBPATH = ../libthreadpool.a
-
-all : threadpool_test env_thread_test
-
-threadpool_test: threadpool_test.cc
-	$(CC) $(CFLAGS) $(INCLUDE_PATH) $(LIBPATH) $@.cc -o$@ $(LDFLAGS)
-
-env_thread_test: env_thread_test.cc
-	$(CC) $(CFLAGS) $(INCLUDE_PATH) $(LIBPATH) $@.cc -o$@ $(LDFLAGS)
-
-# 4. clean the files except source file
-clean:
-	rm -f ./threadpool_test ./env_thread_test
diff --git a/example/env_thread_test.cc b/example/env_thread_test.cc
index 72c86e3..c0462e9 100644
--- a/example/env_thread_test.cc
+++ b/example/env_thread_test.cc
@@ -1,7 +1,3 @@
-//
-// Created by zhanghuigui on 2021/3/17.
-// - #include #include #include @@ -9,86 +5,82 @@ #include "env.h" #include "threadpool.h" -#define THREAD_COUNT 10 +#define THREAD_COUNT 3 using std::cout; using std::endl; -struct Cxt{ - int thread_id; - std::atomic* last_id; +struct Cxt { + int thread_id; + std::atomic *last_id; - Cxt(std::atomic* p, int i) : - thread_id(i),last_id(p) {} + Cxt(std::atomic *p, int i) : + thread_id(i), last_id(p) {} }; static void Thread1(void *ptr) { - Cxt *cxt = reinterpret_cast(ptr); - int count = THREAD_COUNT; - while(count --) { - printf("------- *%d* running ------- \n", cxt->thread_id); - sleep(2); - } + Cxt *cxt = reinterpret_cast(ptr); + int count = THREAD_COUNT; + while (count--) { + printf("------- *%d* running ------- \n", cxt->thread_id); + sleep(2); + } } static void Thread2(void *ptr) { - Cxt *cxt = reinterpret_cast(ptr); - int count = THREAD_COUNT; - while(count --) { - printf("------- *%d* running ------- \n", cxt->thread_id); - sleep(2); - } + Cxt *cxt = reinterpret_cast(ptr); + int count = THREAD_COUNT; + while (count--) { + printf("------- *%d* running ------- \n", cxt->thread_id); + sleep(2); + } } static void finish1(void *ptr) { - Cxt *cxt = reinterpret_cast(ptr); - printf("Finish excute %d\n", cxt->thread_id); - delete cxt; + Cxt *cxt = reinterpret_cast(ptr); + printf("Finish excute %d\n", cxt->thread_id); + delete cxt; } void PrintEnvInfo(Env *env) { - if (env == nullptr) { - return; - } + if (env == nullptr) { + return; + } - int low_thread_nums; - int high_thread_nums; - uint64_t time; + int low_thread_nums; + int high_thread_nums; + uint64_t time; - time = env->NowMicros(); - low_thread_nums = env->GetThreadPoolQueueLen(Env::Priority::LOW); - high_thread_nums = env->GetThreadPoolQueueLen(Env::Priority::HIGH); + time = env->NowMicros(); + low_thread_nums = env->GetThreadPoolQueueLen(Env::Priority::LOW); + high_thread_nums = env->GetThreadPoolQueueLen(Env::Priority::HIGH); - cout << "time : " << env->TimeToString(time) << endl - << "low thread nums: " << low_thread_nums << endl - << "high thread nums: " << high_thread_nums << endl - << "thread id: " << env->GetThreadID() << endl - << endl; + cout << "time : " << env->TimeToString(time) << endl + << "low thread nums: " << low_thread_nums << endl + << "high thread nums: " << high_thread_nums << endl + << "thread id: " << env->GetThreadID() << endl + << endl; } int main(int argc, char *argv[]) { - Env *env = Env::Default(); - std::atomic last_id(0); - - env->SetBackgroundThreads(3, Env::Priority::LOW); - env->SetBackgroundThreads(7, Env::Priority::HIGH); - - for (int i = 0, j = 0;i < 10; j++,i ++) { - Cxt cxt_i(&last_id, i); - Cxt cxt_j(&last_id, j); - if (i % 2 == 0 ) { - env->Schedule(&Thread1, &cxt_i, Env::Priority::LOW, &cxt_i, &finish1); - } else { - env->Schedule(&Thread2, &cxt_j, Env::Priority::HIGH, &cxt_j, &finish1); - } - - PrintEnvInfo(env); - } - - Cxt cxt_us(&last_id, 1); - env->UnSchedule(&cxt_us, Env::Priority::LOW); - - return 0; + Env *env = Env::Default(); + std::atomic last_id(0); + + env->SetBackgroundThreads(3, Env::Priority::LOW); // 设置低优先级有3个后台线程 + env->SetBackgroundThreads(7, Env::Priority::HIGH);// 设置高优先级有7个后台线程 + + for (int i = 0, j = 0; i < 10; j++, i++) { + Cxt cxt_i(&last_id, i); + Cxt cxt_j(&last_id, j); + if (i % 2 == 0) env->Schedule(&Thread1, &cxt_i, Env::Priority::LOW, &cxt_i, &finish1); + else env->Schedule(&Thread2, &cxt_j, Env::Priority::HIGH, &cxt_j, &finish1); +// PrintEnvInfo(env); + } + + Cxt cxt_us(&last_id, 1); + env->UnSchedule(&cxt_us, Env::Priority::LOW); + delete env; + return 0; } 
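Note on the example above: the Cxt objects are created on the stack inside the loop, yet the scheduled jobs and the finish1() callback use (and delete) them after that iteration has ended, and the newly added `delete env;` frees the pointer returned by Env::Default() even though env.h below states that the default Env must never be deleted. The following is a minimal sketch of the intended pattern, with heap-allocated contexts that either the job or the cancel callback frees; it is not part of the patch, assumes only the Env interface declared in env.h, and JobCtx, RunJob, CancelJob and g_last_id are hypothetical names.

#include <atomic>
#include <cstdio>
#include <unistd.h>

#include "env.h"

// Hypothetical job context, heap-allocated so it stays valid until a
// background thread actually runs the job.
struct JobCtx {
  int job_id;
  std::atomic<int> *last_id;
};

static std::atomic<int> g_last_id(0);

// Runs on a worker thread of the chosen pool.
static void RunJob(void *arg) {
  JobCtx *ctx = static_cast<JobCtx *>(arg);
  std::printf("job %d running\n", ctx->job_id);
  ctx->last_id->store(ctx->job_id);
  delete ctx;  // the job owns its context once it starts running
}

// Invoked by UnSchedule() only for jobs still waiting in the queue.
static void CancelJob(void *arg) {
  delete static_cast<JobCtx *>(arg);
}

int main() {
  Env *env = Env::Default();  // process-wide default Env; never delete it
  env->SetBackgroundThreads(2, Env::Priority::LOW);

  for (int i = 0; i < 4; ++i) {
    JobCtx *ctx = new JobCtx{i, &g_last_id};
    // All jobs share &g_last_id as their tag so they can be cancelled together.
    env->Schedule(&RunJob, ctx, Env::Priority::LOW, &g_last_id, &CancelJob);
  }

  usleep(100 * 1000);  // give the pool a moment to drain part of the queue
  int cancelled = env->UnSchedule(&g_last_id, Env::Priority::LOW);
  std::printf("cancelled %d queued jobs, last finished job: %d\n",
              cancelled, g_last_id.load());
  return 0;
}

Sharing one tag across the batch lets a single UnSchedule() call drop everything that has not started yet, while jobs that already ran clean up their own context, so nothing is freed twice.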
diff --git a/include/env.h b/include/env.h index 468f744..2e839d2 100644 --- a/include/env.h +++ b/include/env.h @@ -4,7 +4,7 @@ #pragma once -#include +#include #include #include #include @@ -12,164 +12,190 @@ #include #include -class Env { - public: +// rocksdb维护了一个Env类,这个类在同一个进程中的若干个rocksdb实例中是能够共享的。所以Rocksdb将这个类作为线程池的入口, +// 从而让Flush/Compaction这样的线程调度过程中,多个db可以只使用同一个线程池 - Env() = default; +// Env默认实例是PosixEnv,为了保证多个db实例之间共享一个环境变量,PosixEnv仅维护一个单例 - virtual ~Env(); +class Env { +public: + + Env() = default; + + virtual ~Env(); + + static const char *Type() { return "Environment"; } + + // Return a default environment suitable for the current operating + // system. Sophisticated users may wish to provide their own Env + // implementation instead of relying on this default environment. + // + // The result of Default() belongs to rocksdb and must never be deleted. + // + // 返回适合当前操作系统的默认环境。老练的用户可能希望提供他们自己的 Env 实现,而不是依赖于这个默认环境。 + // + // Default() 的结果属于rocksdb,绝对不能删除。 + static Env *Default(); + + // Priority for scheduling job in thread pool + enum Priority { + BOTTOM, LOW, HIGH, USER, TOTAL + }; + + // Priority for requesting bytes in rate limiter scheduler + enum IOPriority { + IO_LOW = 0, IO_HIGH = 1, IO_TOTAL = 2 + }; + + // Arrange to run "(*function)(arg)" once in a background thread, in + // the thread pool specified by pri. By default, jobs go to the 'LOW' + // priority thread pool. + + // "function" may run in an unspecified thread. Multiple functions + // added to the same Env may run concurrently in different threads. + // I.e., the caller may not assume that background work items are + // serialized. + // When the UnSchedule function is called, the unschedFunction + // registered at the time of Schedule is invoked with arg as a parameter. + // + // “函数”可以在未指定的线程中运行。添加到同一个 Env 的多个函数可能会在不同的线程中并发运行。 + // 即,调用者可能不会假设后台工作项是序列化的。 + // 调用UnSchedule函数时,以arg为参数调用Schedule时注册的unschedFunction。 + virtual void Schedule(void (*function)(void *arg), void *arg, + Priority pri = LOW, void *tag = nullptr, + void (*unschedFunction)(void *arg) = nullptr) = 0; + + // Arrange to remove jobs for given arg from the queue_ if they are not + // already scheduled. Caller is expected to have exclusive lock on arg. + // + // 如果尚未安排给定 arg 的作业,则安排从 queue_ 中删除作业。调用者应该对 arg 有独占锁。 + virtual int UnSchedule(void * /*arg*/, Priority /*pri*/) { return 0; } + + // Start a new thread, invoking "function(arg)" within the new thread. + // When "function(arg)" returns, the thread will be destroyed. + virtual void StartThread(void (*function)(void *arg), void *arg) = 0; + + // Wait for all threads started by StartThread to terminate. + virtual void WaitForJoin() {} + + // Get thread pool queue length for specific thread pool. + virtual unsigned int GetThreadPoolQueueLen(Priority /*pri*/ = LOW) const { + return 0; + } + + // The number of background worker threads of a specific thread pool + // for this environment. 'LOW' is the default pool. + // default number: 1 + // + // 此环境的特定线程池的后台工作线程数。 'LOW' 是默认池。默认数量:1 + virtual void SetBackgroundThreads(int number, Priority pri = LOW) = 0; + + virtual int GetBackgroundThreads(Priority pri = LOW) = 0; + + // Enlarge number of background worker threads of a specific thread pool + // for this environment if it is smaller than specified. 'LOW' is the default + // pool. + virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) = 0; + + // Lower IO priority for threads from the specified pool. 
+ virtual void LowerThreadPoolIOPriority(Priority /*pool*/ = LOW) {} + + // Lower CPU priority for threads from the specified pool. + virtual void LowerThreadPoolCPUPriority(Priority /*pool*/ = LOW) {} + + // Converts seconds-since-Jan-01-1970 to a printable string + virtual std::string TimeToString(uint64_t time) = 0; + + virtual uint64_t NowMicros() = 0; + + virtual uint64_t NowNanos() = 0; + + virtual uint64_t NowCPUNanos() = 0; + + // Returns the ID of the current thread. + virtual uint64_t GetThreadID() const = 0; - static const char* Type() { return "Environment"; } +// This seems to clash with a macro on Windows, so #undef it here +#undef GetFreeSpace - // Return a default environment suitable for the current operating - // system. Sophisticated users may wish to provide their own Env - // implementation instead of relying on this default environment. - // - // The result of Default() belongs to rocksdb and must never be deleted. - static Env* Default(); +private: + // No copying allowed + Env(const Env &); - // Priority for scheduling job in thread pool - enum Priority { BOTTOM, LOW, HIGH, USER, TOTAL }; + void operator=(const Env &); +}; - // Priority for requesting bytes in rate limiter scheduler - enum IOPriority { IO_LOW = 0, IO_HIGH = 1, IO_TOTAL = 2 }; +// An implementation of Env that forwards all calls to another Env. +// May be useful to clients who wish to override just part of the +// functionality of another Env. +class EnvWrapper : public Env { +public: + // Initialize an EnvWrapper that delegates all calls to *t + explicit EnvWrapper(Env *t) : target_(t) {} - // Arrange to run "(*function)(arg)" once in a background thread, in - // the thread pool specified by pri. By default, jobs go to the 'LOW' - // priority thread pool. + ~EnvWrapper(); - // "function" may run in an unspecified thread. Multiple functions - // added to the same Env may run concurrently in different threads. - // I.e., the caller may not assume that background work items are - // serialized. - // When the UnSchedule function is called, the unschedFunction - // registered at the time of Schedule is invoked with arg as a parameter. - virtual void Schedule(void (*function)(void* arg), void* arg, - Priority pri = LOW, void* tag = nullptr, - void (*unschedFunction)(void* arg) = nullptr) = 0; + // Return the target to which this Env forwards all calls + Env *target() const { return target_; } - // Arrange to remove jobs for given arg from the queue_ if they are not - // already scheduled. Caller is expected to have exclusive lock on arg. - virtual int UnSchedule(void* /*arg*/, Priority /*pri*/) { return 0; } + void Schedule(void (*f)(void *arg), void *a, Priority pri, + void *tag = nullptr, void (*u)(void *arg) = nullptr) override { + return target_->Schedule(f, a, pri, tag, u); + } - // Start a new thread, invoking "function(arg)" within the new thread. - // When "function(arg)" returns, the thread will be destroyed. - virtual void StartThread(void (*function)(void* arg), void* arg) = 0; + int UnSchedule(void *tag, Priority pri) override { + return target_->UnSchedule(tag, pri); + } - // Wait for all threads started by StartThread to terminate. - virtual void WaitForJoin() {} + void StartThread(void (*f)(void *), void *a) override { + return target_->StartThread(f, a); + } - // Get thread pool queue length for specific thread pool. 
- virtual unsigned int GetThreadPoolQueueLen(Priority /*pri*/ = LOW) const { - return 0; - } + void WaitForJoin() override { return target_->WaitForJoin(); } - // The number of background worker threads of a specific thread pool - // for this environment. 'LOW' is the default pool. - // default number: 1 - virtual void SetBackgroundThreads(int number, Priority pri = LOW) = 0; - virtual int GetBackgroundThreads(Priority pri = LOW) = 0; + unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override { + return target_->GetThreadPoolQueueLen(pri); + } - // Enlarge number of background worker threads of a specific thread pool - // for this environment if it is smaller than specified. 'LOW' is the default - // pool. - virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) = 0; + void SetBackgroundThreads(int num, Priority pri) override { + return target_->SetBackgroundThreads(num, pri); + } - // Lower IO priority for threads from the specified pool. - virtual void LowerThreadPoolIOPriority(Priority /*pool*/ = LOW) {} + int GetBackgroundThreads(Priority pri) override { + return target_->GetBackgroundThreads(pri); + } - // Lower CPU priority for threads from the specified pool. - virtual void LowerThreadPoolCPUPriority(Priority /*pool*/ = LOW) {} + void IncBackgroundThreadsIfNeeded(int num, Priority pri) override { + return target_->IncBackgroundThreadsIfNeeded(num, pri); + } - // Converts seconds-since-Jan-01-1970 to a printable string - virtual std::string TimeToString(uint64_t time) = 0; + void LowerThreadPoolIOPriority(Priority pool = LOW) override { + target_->LowerThreadPoolIOPriority(pool); + } - virtual uint64_t NowMicros() = 0; + void LowerThreadPoolCPUPriority(Priority pool = LOW) override { + target_->LowerThreadPoolCPUPriority(pool); + } - virtual uint64_t NowNanos() = 0; + uint64_t NowMicros() override { + return target_->NowMicros(); + } - virtual uint64_t NowCPUNanos() = 0; + uint64_t NowNanos() override { + return target_->NowNanos(); + } - // Returns the ID of the current thread. - virtual uint64_t GetThreadID() const = 0; + uint64_t NowCPUNanos() override { + return target_->NowCPUNanos(); + } -// This seems to clash with a macro on Windows, so #undef it here -#undef GetFreeSpace + std::string TimeToString(uint64_t time) override { + return target_->TimeToString(time); + } - private: - // No copying allowed - Env(const Env&); - void operator=(const Env&); -}; + uint64_t GetThreadID() const override { return target_->GetThreadID(); } -// An implementation of Env that forwards all calls to another Env. -// May be useful to clients who wish to override just part of the -// functionality of another Env. 
-class EnvWrapper : public Env { - public: - // Initialize an EnvWrapper that delegates all calls to *t - explicit EnvWrapper(Env* t) : target_(t) {} - ~EnvWrapper(); - - // Return the target to which this Env forwards all calls - Env* target() const { return target_; } - - void Schedule(void (*f)(void* arg), void* a, Priority pri, - void* tag = nullptr, void (*u)(void* arg) = nullptr) override { - return target_->Schedule(f, a, pri, tag, u); - } - - int UnSchedule(void* tag, Priority pri) override { - return target_->UnSchedule(tag, pri); - } - - void StartThread(void (*f)(void*), void* a) override { - return target_->StartThread(f, a); - } - - void WaitForJoin() override { return target_->WaitForJoin(); } - unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override { - return target_->GetThreadPoolQueueLen(pri); - } - - void SetBackgroundThreads(int num, Priority pri) override { - return target_->SetBackgroundThreads(num, pri); - } - int GetBackgroundThreads(Priority pri) override { - return target_->GetBackgroundThreads(pri); - } - - void IncBackgroundThreadsIfNeeded(int num, Priority pri) override { - return target_->IncBackgroundThreadsIfNeeded(num, pri); - } - - void LowerThreadPoolIOPriority(Priority pool = LOW) override { - target_->LowerThreadPoolIOPriority(pool); - } - - void LowerThreadPoolCPUPriority(Priority pool = LOW) override { - target_->LowerThreadPoolCPUPriority(pool); - } - - uint64_t NowMicros() override { - return target_->NowMicros(); - } - - uint64_t NowNanos() override { - return target_->NowNanos(); - } - - uint64_t NowCPUNanos() override { - return target_->NowCPUNanos(); - } - - std::string TimeToString(uint64_t time) override { - return target_->TimeToString(time); - } - - uint64_t GetThreadID() const override { return target_->GetThreadID(); } - - private: - Env* target_; +private: + Env *target_; }; diff --git a/include/threadpool_imp.h b/include/threadpool_imp.h index 0442138..b616150 100644 --- a/include/threadpool_imp.h +++ b/include/threadpool_imp.h @@ -15,7 +15,8 @@ #include -class ThreadPoolImpl : public ThreadPool { +class ThreadPoolImpl : public ThreadPool { // 线程类,这是对线程池具体实现的封装, + // 提供了操作线程池的数据结构和接口,实际线程池的实现在threadpool_impl.cc中 public: ThreadPoolImpl(); ~ThreadPoolImpl(); @@ -89,7 +90,7 @@ class ThreadPoolImpl : public ThreadPool { static void PthreadCall(const char* label, int result); - struct Impl; + struct Impl; // 线程类中具体的数据结构,涉及线程池的具体实现 private: @@ -105,6 +106,14 @@ class ThreadPoolImpl : public ThreadPool { // and override the environment. This would require refactoring ThreadPool usage. // // We can also combine these two approaches + // + // 当前的公共虚拟接口不提供可用的功能,因此不能在内部用于外观不同的实现。 + // + // 我们提出了一个 pimpl 习惯用法,以便轻松替换线程池 impl,无需接触头文件,但提供不同的 .cc 潜在的 CMake 选项驱动。 + // + // 另一种选择是引入 Env::MakeThreadPool() 虚拟接口并覆盖环境。这将需要重构 ThreadPool 的使用。 + // + // 我们也可以结合这两种方法 std::unique_ptr impl_; }; diff --git a/src/env_posix.cc b/src/env_posix.cc index ca7b053..db44890 100644 --- a/src/env_posix.cc +++ b/src/env_posix.cc @@ -7,19 +7,26 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. 
See the AUTHORS file for names of contributors #if defined(OS_LINUX) + #include + #endif + #include #include #include #include #include #include + #if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID) + #include #include #include + #endif + #include #include #include @@ -31,6 +38,7 @@ #else #include #endif + #include #include @@ -38,210 +46,216 @@ #include "env.h" class ThreadPoolEnv : public Env { - public: - ThreadPoolEnv(); +public: + ThreadPoolEnv(); - ~ThreadPoolEnv() override { - for (const auto tid : threads_to_join_) { - pthread_join(tid, nullptr); + ~ThreadPoolEnv() override { + for (const auto tid: threads_to_join_) { + pthread_join(tid, nullptr); + } + for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { + thread_pools_[pool_id].JoinAllThreads(); + } } - for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { - thread_pools_[pool_id].JoinAllThreads(); - } - } - void Schedule(void (*function)(void* arg1), void* arg, Priority pri = LOW, - void* tag = nullptr, - void (*unschedFunction)(void* arg) = nullptr) override; + void Schedule(void (*function)(void *arg1), void *arg, Priority pri = LOW, + void *tag = nullptr, + void (*unschedFunction)(void *arg) = nullptr) override; - int UnSchedule(void* arg, Priority pri) override; + int UnSchedule(void *arg, Priority pri) override; - void StartThread(void (*function)(void* arg), void* arg) override; + void StartThread(void (*function)(void *arg), void *arg) override; - void WaitForJoin() override; + void WaitForJoin() override; - unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override; + unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override; - static uint64_t gettid(pthread_t tid) { - uint64_t thread_id = 0; - memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid))); - return thread_id; - } + static uint64_t gettid(pthread_t tid) { + uint64_t thread_id = 0; + memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid))); + return thread_id; + } - static uint64_t gettid() { - pthread_t tid = pthread_self(); - return gettid(tid); - } + static uint64_t gettid() { + pthread_t tid = pthread_self(); + return gettid(tid); + } - uint64_t GetThreadID() const override { return gettid(pthread_self()); } + uint64_t GetThreadID() const override { return gettid(pthread_self()); } - uint64_t NowMicros() override { - struct timeval tv; - gettimeofday(&tv, nullptr); - return static_cast(tv.tv_sec) * 1000000 + tv.tv_usec; - } + uint64_t NowMicros() override { + struct timeval tv; + gettimeofday(&tv, nullptr); + return static_cast(tv.tv_sec) * 1000000 + tv.tv_usec; + } - uint64_t NowNanos() override { + uint64_t NowNanos() override { #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX) - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - return static_cast(ts.tv_sec) * 1000000000 + ts.tv_nsec; + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return static_cast(ts.tv_sec) * 1000000000 + ts.tv_nsec; #elif defined(__MACH__) - clock_serv_t cclock; - mach_timespec_t ts; - host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock); - clock_get_time(cclock, &ts); - mach_port_deallocate(mach_task_self(), cclock); - return static_cast(ts.tv_sec) * 1000000000 + ts.tv_nsec; + clock_serv_t cclock; + mach_timespec_t ts; + host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock); + clock_get_time(cclock, &ts); + mach_port_deallocate(mach_task_self(), cclock); + return static_cast(ts.tv_sec) * 1000000000 + ts.tv_nsec; #else - return 
std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()).count(); + return std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); #endif - } + } - uint64_t NowCPUNanos() override { + uint64_t NowCPUNanos() override { #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX) || \ defined(__MACH__) - struct timespec ts; - clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts); - return static_cast(ts.tv_sec) * 1000000000 + ts.tv_nsec; + struct timespec ts; + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts); + return static_cast(ts.tv_sec) * 1000000000 + ts.tv_nsec; #endif - return 0; - } + return 0; + } - // Allow increasing the number of worker threads. - void SetBackgroundThreads(int num, Priority pri) override { - assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); - thread_pools_[pri].SetBackgroundThreads(num); - } + // Allow increasing the number of worker threads. + void SetBackgroundThreads(int num, Priority pri) override { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + thread_pools_[pri].SetBackgroundThreads(num); + } - int GetBackgroundThreads(Priority pri) override { - assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); - return thread_pools_[pri].GetBackgroundThreads(); - } + int GetBackgroundThreads(Priority pri) override { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + return thread_pools_[pri].GetBackgroundThreads(); + } - // Allow increasing the number of worker threads. - void IncBackgroundThreadsIfNeeded(int num, Priority pri) override { - assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); - thread_pools_[pri].IncBackgroundThreadsIfNeeded(num); - } + // Allow increasing the number of worker threads. + void IncBackgroundThreadsIfNeeded(int num, Priority pri) override { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + thread_pools_[pri].IncBackgroundThreadsIfNeeded(num); + } - void LowerThreadPoolIOPriority(Priority pool = LOW) override { - assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH); + void LowerThreadPoolIOPriority(Priority pool = LOW) override { + assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH); #ifdef OS_LINUX - thread_pools_[pool].LowerIOPriority(); + thread_pools_[pool].LowerIOPriority(); #else - (void)pool; + (void)pool; #endif - } + } - void LowerThreadPoolCPUPriority(Priority pool = LOW) override { - assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH); + void LowerThreadPoolCPUPriority(Priority pool = LOW) override { + assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH); #ifdef OS_LINUX - thread_pools_[pool].LowerCPUPriority(); + thread_pools_[pool].LowerCPUPriority(); #else - (void)pool; + (void)pool; #endif - } - - std::string TimeToString(uint64_t secondsSince1970) override { - const time_t seconds = (time_t)secondsSince1970; - struct tm t; - int maxsize = 64; - std::string dummy; - dummy.reserve(maxsize); - dummy.resize(maxsize); - char* p = &dummy[0]; - localtime_r(&seconds, &t); - snprintf(p, maxsize, - "%04d/%02d/%02d-%02d:%02d:%02d ", - t.tm_year + 1900, - t.tm_mon + 1, - t.tm_mday, - t.tm_hour, - t.tm_min, - t.tm_sec); - return dummy; - } - - private: - std::vector thread_pools_; - pthread_mutex_t mu_; - std::vector threads_to_join_; + } + + std::string TimeToString(uint64_t secondsSince1970) override { + const time_t seconds = (time_t) secondsSince1970; + struct tm t; + int maxsize = 64; + std::string dummy; + dummy.reserve(maxsize); + dummy.resize(maxsize); + char *p = &dummy[0]; + localtime_r(&seconds, 
&t); + snprintf(p, maxsize, + "%04d/%02d/%02d-%02d:%02d:%02d ", + t.tm_year + 1900, + t.tm_mon + 1, + t.tm_mday, + t.tm_hour, + t.tm_min, + t.tm_sec); + return dummy; + } + +private: + std::vector thread_pools_; + pthread_mutex_t mu_{}; + std::vector threads_to_join_; }; ThreadPoolEnv::ThreadPoolEnv() - : thread_pools_(Priority::TOTAL) { - ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); - for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { - thread_pools_[pool_id].SetThreadPriority( - static_cast(pool_id)); - // This allows later initializing the thread-local-env of each thread. - thread_pools_[pool_id].SetHostEnv(this); - } + : thread_pools_(Priority::TOTAL) // 默认创建4个线程池 +{ + ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));// 初始化互斥锁 + for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { // 遍历每一个线程池 + thread_pools_[pool_id].SetThreadPriority(static_cast(pool_id)); // 为每一个线程池设置优先级 + // 这允许稍后初始化每个线程的 thread-local-env。 + thread_pools_[pool_id].SetHostEnv(this); // 为每个线程池设置自己的运行环境 + } } -void ThreadPoolEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri, - void* tag, void (*unschedFunction)(void* arg)) { - assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); - thread_pools_[pri].Schedule(function, arg, tag, unschedFunction); +void ThreadPoolEnv::Schedule(void (*function)(void *arg1), void *arg, Priority pri, + void *tag, void (*unschedFunction)(void *arg)) { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + thread_pools_[pri].Schedule(function, arg, tag, unschedFunction); } -int ThreadPoolEnv::UnSchedule(void* arg, Priority pri) { - return thread_pools_[pri].UnSchedule(arg); +int ThreadPoolEnv::UnSchedule(void *arg, Priority pri) { + return thread_pools_[pri].UnSchedule(arg); } unsigned int ThreadPoolEnv::GetThreadPoolQueueLen(Priority pri) const { - assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); - return thread_pools_[pri].GetQueueLen(); + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + return thread_pools_[pri].GetQueueLen(); } struct StartThreadState { - void (*user_function)(void*); - void* arg; + void (*user_function)(void *); + + void *arg; }; -static void* StartThreadWrapper(void* arg) { - StartThreadState* state = reinterpret_cast(arg); - state->user_function(state->arg); - delete state; - return nullptr; +static void *StartThreadWrapper(void *arg) { + StartThreadState *state = reinterpret_cast(arg); + state->user_function(state->arg); + delete state; + return nullptr; } -void ThreadPoolEnv::StartThread(void (*function)(void* arg), void* arg) { - pthread_t t; - StartThreadState* state = new StartThreadState; - state->user_function = function; - state->arg = arg; - ThreadPoolImpl::PthreadCall( - "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state)); - ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_)); - threads_to_join_.push_back(t); - ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_)); +void ThreadPoolEnv::StartThread(void (*function)(void *arg), void *arg) { + pthread_t t; + StartThreadState *state = new StartThreadState; + state->user_function = function; + state->arg = arg; + ThreadPoolImpl::PthreadCall( + "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state)); + ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_)); + threads_to_join_.push_back(t); + ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_)); } void ThreadPoolEnv::WaitForJoin() { - for (const 
auto tid : threads_to_join_) { - pthread_join(tid, nullptr); - } - threads_to_join_.clear(); + for (const auto tid: threads_to_join_) { + pthread_join(tid, nullptr); + } + threads_to_join_.clear(); } // // Default Posix Env // -Env* Env::Default() { - // The following function call initializes the singletons of ThreadLocalPtr - // right before the static default_env. This guarantees default_env will - // always being destructed before the ThreadLocalPtr singletons get - // destructed as C++ guarantees that the destructions of static variables - // is in the reverse order of their constructions. - // - // Since static members are destructed in the reverse order - // of their construction, having this call here guarantees that - // the destructor of static ThreadPoolEnv will go first, then the - // the singletons of ThreadLocalPtr. - static ThreadPoolEnv default_env; - return &default_env; +Env *Env::Default() { + // The following function call initializes the singletons of ThreadLocalPtr + // right before the static default_env. This guarantees default_env will + // always being destructed before the ThreadLocalPtr singletons get + // destructed as C++ guarantees that the destructions of static variables + // is in the reverse order of their constructions. + // + // 以下函数调用在静态 default_env 之前初始化 ThreadLocalPtr 的单例。 + // 这保证了 default_env 总是在 ThreadLocalPtr 单例被破坏之前被破坏,因为 C++ 保证静态变量的破坏与其构造的顺序相反。 + // + // Since static members are destructed in the reverse order + // of their construction, having this call here guarantees that + // the destructor of static ThreadPoolEnv will go first, then the + // the singletons of ThreadLocalPtr. + // 由于静态成员的析构顺序与它们的构造顺序相反,因此在此处调用可确保静态 ThreadPoolEnv 的析构函数首先执行,然后是 ThreadLocalPtr 的单例。 + + static ThreadPoolEnv default_env; // 自动调用构造函数,完成线程池的构造,线程池是一个静态类 + return &default_env; } diff --git a/src/threadpool_imp.cc b/src/threadpool_imp.cc index 0d93e97..06bde72 100644 --- a/src/threadpool_imp.cc +++ b/src/threadpool_imp.cc @@ -7,20 +7,28 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. 
+// Rocksdb 实现的线程池支持的特性: +// + + #include "threadpool_imp.h" #ifndef OS_WIN -# include + +#include + #endif #ifdef OS_LINUX -# include -# include + +#include +#include + #endif -#include -#include -#include +#include +#include +#include #include #include #include @@ -30,114 +38,115 @@ #include #include -void ThreadPoolImpl::PthreadCall(const char* label, int result) { - if (result != 0) { - fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); - abort(); - } +void ThreadPoolImpl::PthreadCall(const char *label, int result) { + if (result != 0) { + fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); + abort(); + } } struct ThreadPoolImpl::Impl { - Impl(); - ~Impl(); + Impl(); - void JoinThreads(bool wait_for_jobs_to_complete); + ~Impl(); - void SetBackgroundThreadsInternal(int num, bool allow_reduce); - int GetBackgroundThreads(); + void JoinThreads(bool wait_for_jobs_to_complete); - unsigned int GetQueueLen() const { - return queue_len_.load(std::memory_order_relaxed); - } + void SetBackgroundThreadsInternal(int num, bool allow_reduce); - void LowerIOPriority(); + int GetBackgroundThreads(); - void LowerCPUPriority(); + unsigned int GetQueueLen() const { + return queue_len_.load(std::memory_order_relaxed); + } - void WakeUpAllThreads() { - bgsignal_.notify_all(); - } + void LowerIOPriority(); - void BGThread(size_t thread_id); + void LowerCPUPriority(); + + void WakeUpAllThreads() { + bgsignal_.notify_all(); + } - void StartBGThreads(); + void BGThread(size_t thread_id); - void Submit(std::function&& schedule, - std::function&& unschedule, void* tag); + void StartBGThreads(); - int UnSchedule(void* arg); + void Submit(std::function &&schedule, + std::function &&unschedule, void *tag); - void SetHostEnv(Env* env) { env_ = env; } + int UnSchedule(void *arg); - Env* GetHostEnv() const { return env_; } + void SetHostEnv(Env *env) { env_ = env; } - bool HasExcessiveThread() const { - return static_cast(bgthreads_.size()) > total_threads_limit_; - } + Env *GetHostEnv() const { return env_; } - // Return true iff the current thread is the excessive thread to terminate. - // Always terminate the running thread that is added last, even if there are - // more than one thread to terminate. - bool IsLastExcessiveThread(size_t thread_id) const { - return HasExcessiveThread() && thread_id == bgthreads_.size() - 1; - } + bool HasExcessiveThread() const { + return static_cast(bgthreads_.size()) > total_threads_limit_; + } + + // Return true iff the current thread is the excessive thread to terminate. + // Always terminate the running thread that is added last, even if there are + // more than one thread to terminate. + bool IsLastExcessiveThread(size_t thread_id) const { + return HasExcessiveThread() && thread_id == bgthreads_.size() - 1; + } - bool IsExcessiveThread(size_t thread_id) const { - return static_cast(thread_id) >= total_threads_limit_; - } + bool IsExcessiveThread(size_t thread_id) const { + return static_cast(thread_id) >= total_threads_limit_; + } - // Return the thread priority. - // This would allow its member-thread to know its priority. - Env::Priority GetThreadPriority() const { return priority_; } + // Return the thread priority. + // This would allow its member-thread to know its priority. + Env::Priority GetThreadPriority() const { return priority_; } - // Set the thread priority. - void SetThreadPriority(Env::Priority priority) { priority_ = priority; } + // Set the thread priority. 
+ void SetThreadPriority(Env::Priority priority) { priority_ = priority; } private: - static void* BGThreadWrapper(void* arg); + static void *BGThreadWrapper(void *arg); - bool low_io_priority_; - bool low_cpu_priority_; - Env::Priority priority_; - Env* env_; + bool low_io_priority_; // I/O优先级 + bool low_cpu_priority_; // CPU优先级 + Env::Priority priority_; // 线程优先级 + Env *env_; // 获取当前线程的环境变量 - int total_threads_limit_; - std::atomic_uint queue_len_; // Queue length. Used for stats reporting - bool exit_all_threads_; - bool wait_for_jobs_to_complete_; + int total_threads_limit_; // 线程池线程总数 + std::atomic_uint queue_len_; // 当前线程池中执行线程的排队长度 + bool exit_all_threads_; // 清理线程池时会调度所有未执行的线程 + bool wait_for_jobs_to_complete_; // 等待线程池的线程执行完毕 - // Entry per Schedule()/Submit() call - struct BGItem { - void* tag = nullptr; - std::function function; - std::function unschedFunction; - }; + // Entry per Schedule()/Submit() call + struct BGItem { + void *tag = nullptr; + std::function function; // 执行函数 + std::function unschedFunction; // 不执行函数 + }; - using BGQueue = std::deque; - BGQueue queue_; + using BGQueue = std::deque; + BGQueue queue_; // 保留线程池中调度的线程的相关信息 - std::mutex mu_; - std::condition_variable bgsignal_; - std::vector bgthreads_; + std::mutex mu_; + std::condition_variable bgsignal_; // 条件变量,唤醒正在睡眠的线程 + std::vector bgthreads_; // 存储线程的动态数组,保存需要调度的线程 }; -inline -ThreadPoolImpl::Impl::Impl() - : - low_io_priority_(false), - low_cpu_priority_(false), - priority_(Env::LOW), - env_(nullptr), - total_threads_limit_(0), - queue_len_(), - exit_all_threads_(false), - wait_for_jobs_to_complete_(false), - queue_(), - mu_(), - bgsignal_(), - bgthreads_() { +inline ThreadPoolImpl::Impl::Impl() + : + low_io_priority_(false), + low_cpu_priority_(false), + priority_(Env::LOW), + env_(nullptr), + total_threads_limit_(0), + queue_len_(), + exit_all_threads_(false), + wait_for_jobs_to_complete_(false), + queue_(), + mu_(), + bgsignal_(), + bgthreads_() { } inline @@ -145,240 +154,240 @@ ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); } void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) { - std::unique_lock lock(mu_); - assert(!exit_all_threads_); + std::unique_lock lock(mu_); + assert(!exit_all_threads_); - wait_for_jobs_to_complete_ = wait_for_jobs_to_complete; - exit_all_threads_ = true; - // prevent threads from being recreated right after they're joined, in case - // the user is concurrently submitting jobs. - total_threads_limit_ = 0; + wait_for_jobs_to_complete_ = wait_for_jobs_to_complete; + exit_all_threads_ = true; + // prevent threads from being recreated right after they're joined, in case + // the user is concurrently submitting jobs. 
+ total_threads_limit_ = 0; - lock.unlock(); + lock.unlock(); - bgsignal_.notify_all(); + bgsignal_.notify_all(); - for (auto& th : bgthreads_) { - th.join(); - } + for (auto &th: bgthreads_) { + th.join(); + } - bgthreads_.clear(); + bgthreads_.clear(); - exit_all_threads_ = false; - wait_for_jobs_to_complete_ = false; + exit_all_threads_ = false; + wait_for_jobs_to_complete_ = false; } inline void ThreadPoolImpl::Impl::LowerIOPriority() { - std::lock_guard lock(mu_); - low_io_priority_ = true; + std::lock_guard lock(mu_); + low_io_priority_ = true; } inline void ThreadPoolImpl::Impl::LowerCPUPriority() { - std::lock_guard lock(mu_); - low_cpu_priority_ = true; + std::lock_guard lock(mu_); + low_cpu_priority_ = true; } void ThreadPoolImpl::Impl::BGThread(size_t thread_id) { - bool low_io_priority = false; - bool low_cpu_priority = false; - - while (true) { - // Wait until there is an item that is ready to run - std::unique_lock lock(mu_); - // Stop waiting if the thread needs to do work or needs to terminate. - while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) && - (queue_.empty() || IsExcessiveThread(thread_id))) { - bgsignal_.wait(lock); - } - - if (exit_all_threads_) { // mechanism to let BG threads exit safely + bool low_io_priority = false; + bool low_cpu_priority = false; + + while (true) { + // Wait until there is an item that is ready to run + std::unique_lock lock(mu_); + // Stop waiting if the thread needs to do work or needs to terminate. + while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) && + (queue_.empty() || IsExcessiveThread(thread_id))) { + bgsignal_.wait(lock); + } - if (!wait_for_jobs_to_complete_ || - queue_.empty()) { - break; - } - } + if (exit_all_threads_) { // mechanism to let BG threads exit safely - if (IsLastExcessiveThread(thread_id)) { - // Current thread is the last generated one and is excessive. - // We always terminate excessive thread in the reverse order of - // generation time. - auto& terminating_thread = bgthreads_.back(); - terminating_thread.detach(); - bgthreads_.pop_back(); + if (!wait_for_jobs_to_complete_ || + queue_.empty()) { + break; + } + } - if (HasExcessiveThread()) { - // There is still at least more excessive thread to terminate. - WakeUpAllThreads(); - } - break; - } + if (IsLastExcessiveThread(thread_id)) { + // Current thread is the last generated one and is excessive. + // We always terminate excessive thread in the reverse order of + // generation time. + auto &terminating_thread = bgthreads_.back(); + terminating_thread.detach(); + bgthreads_.pop_back(); + + if (HasExcessiveThread()) { + // There is still at least more excessive thread to terminate. + WakeUpAllThreads(); + } + break; + } - auto func = std::move(queue_.front().function); - queue_.pop_front(); + auto func = std::move(queue_.front().function); + queue_.pop_front(); - queue_len_.store(static_cast(queue_.size()), - std::memory_order_relaxed); + queue_len_.store(static_cast(queue_.size()), + std::memory_order_relaxed); - bool decrease_io_priority = (low_io_priority != low_io_priority_); - bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_); - lock.unlock(); + bool decrease_io_priority = (low_io_priority != low_io_priority_); + bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_); + lock.unlock(); #ifdef OS_LINUX - if (decrease_cpu_priority) { - setpriority( - PRIO_PROCESS, - // Current thread. - 0, - // Lowest priority possible. 
- 19); - low_cpu_priority = true; - } + if (decrease_cpu_priority) { + setpriority( + PRIO_PROCESS, + // Current thread. + 0, + // Lowest priority possible. + 19); + low_cpu_priority = true; + } - if (decrease_io_priority) { + if (decrease_io_priority) { #define IOPRIO_CLASS_SHIFT (13) #define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data) - // Put schedule into IOPRIO_CLASS_IDLE class (lowest) - // These system calls only have an effect when used in conjunction - // with an I/O scheduler that supports I/O priorities. As at - // kernel 2.6.17 the only such scheduler is the Completely - // Fair Queuing (CFQ) I/O scheduler. - // To change scheduler: - // echo cfq > /sys/block//queue/schedule - // Tunables to consider: - // /sys/block//queue/slice_idle - // /sys/block//queue/slice_sync - syscall(SYS_ioprio_set, 1, // IOPRIO_WHO_PROCESS - 0, // current thread - IOPRIO_PRIO_VALUE(3, 0)); - low_io_priority = true; - } + // Put schedule into IOPRIO_CLASS_IDLE class (lowest) + // These system calls only have an effect when used in conjunction + // with an I/O scheduler that supports I/O priorities. As at + // kernel 2.6.17 the only such scheduler is the Completely + // Fair Queuing (CFQ) I/O scheduler. + // To change scheduler: + // echo cfq > /sys/block//queue/schedule + // Tunables to consider: + // /sys/block//queue/slice_idle + // /sys/block//queue/slice_sync + syscall(SYS_ioprio_set, 1, // IOPRIO_WHO_PROCESS + 0, // current thread + IOPRIO_PRIO_VALUE(3, 0)); + low_io_priority = true; + } #else - (void)decrease_io_priority; // avoid 'unused variable' error - (void)decrease_cpu_priority; + (void)decrease_io_priority; // avoid 'unused variable' error + (void)decrease_cpu_priority; #endif - func(); - } + func(); + } } // Helper struct for passing arguments when creating threads. struct BGThreadMetadata { - ThreadPoolImpl::Impl* thread_pool_; - size_t thread_id_; // Thread count in the thread. - BGThreadMetadata(ThreadPoolImpl::Impl* thread_pool, size_t thread_id) - : thread_pool_(thread_pool), thread_id_(thread_id) {} + ThreadPoolImpl::Impl *thread_pool_; + size_t thread_id_; // Thread count in the thread. 
+ BGThreadMetadata(ThreadPoolImpl::Impl *thread_pool, size_t thread_id) + : thread_pool_(thread_pool), thread_id_(thread_id) {} }; -void* ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) { - BGThreadMetadata* meta = reinterpret_cast(arg); - size_t thread_id = meta->thread_id_; - ThreadPoolImpl::Impl* tp = meta->thread_pool_; +void *ThreadPoolImpl::Impl::BGThreadWrapper(void *arg) { + BGThreadMetadata *meta = reinterpret_cast(arg); + size_t thread_id = meta->thread_id_; + ThreadPoolImpl::Impl *tp = meta->thread_pool_; - delete meta; - tp->BGThread(thread_id); - return nullptr; + delete meta; + tp->BGThread(thread_id); + return nullptr; } void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num, - bool allow_reduce) { - std::unique_lock lock(mu_); - if (exit_all_threads_) { - lock.unlock(); - return; - } - if (num > total_threads_limit_ || - (num < total_threads_limit_ && allow_reduce)) { - total_threads_limit_ = std::max(0, num); - WakeUpAllThreads(); - StartBGThreads(); - } + bool allow_reduce) { + std::unique_lock lock(mu_); + if (exit_all_threads_) { + lock.unlock(); + return; + } + if (num > total_threads_limit_ || + (num < total_threads_limit_ && allow_reduce)) { + total_threads_limit_ = std::max(0, num); + WakeUpAllThreads(); + StartBGThreads(); + } } int ThreadPoolImpl::Impl::GetBackgroundThreads() { - std::unique_lock lock(mu_); - return total_threads_limit_; + std::unique_lock lock(mu_); + return total_threads_limit_; } void ThreadPoolImpl::Impl::StartBGThreads() { - // Start background thread if necessary - while ((int)bgthreads_.size() < total_threads_limit_) { + // Start background thread if necessary + while ((int) bgthreads_.size() < total_threads_limit_) { - std::thread p_t(&BGThreadWrapper, - new BGThreadMetadata(this, bgthreads_.size())); + std::thread p_t(&BGThreadWrapper, + new BGThreadMetadata(this, bgthreads_.size())); - bgthreads_.push_back(std::move(p_t)); - } + bgthreads_.push_back(std::move(p_t)); + } } -void ThreadPoolImpl::Impl::Submit(std::function&& schedule, - std::function&& unschedule, void* tag) { +void ThreadPoolImpl::Impl::Submit(std::function &&schedule, + std::function &&unschedule, void *tag) { - std::lock_guard lock(mu_); + std::lock_guard lock(mu_); - if (exit_all_threads_) { - return; - } + if (exit_all_threads_) { + return; + } - StartBGThreads(); + StartBGThreads(); // 启动线程 - // Add to priority queue - queue_.push_back(BGItem()); + // Add to priority queue + queue_.push_back(BGItem()); - auto& item = queue_.back(); - item.tag = tag; - item.function = std::move(schedule); - item.unschedFunction = std::move(unschedule); + auto &item = queue_.back(); + item.tag = tag; + item.function = std::move(schedule); + item.unschedFunction = std::move(unschedule); - queue_len_.store(static_cast(queue_.size()), - std::memory_order_relaxed); + queue_len_.store(static_cast(queue_.size()), + std::memory_order_relaxed); - if (!HasExcessiveThread()) { - // Wake up at least one waiting thread. - bgsignal_.notify_one(); - } else { - // Need to wake up all threads to make sure the one woken - // up is not the one to terminate. - WakeUpAllThreads(); - } + if (!HasExcessiveThread()) { + // Wake up at least one waiting thread. + bgsignal_.notify_one(); + } else { + // Need to wake up all threads to make sure the one woken + // up is not the one to terminate. 
+ WakeUpAllThreads(); + } } -int ThreadPoolImpl::Impl::UnSchedule(void* arg) { - int count = 0; - - std::vector> candidates; - { - std::lock_guard lock(mu_); - - // Remove from priority queue - BGQueue::iterator it = queue_.begin(); - while (it != queue_.end()) { - if (arg == (*it).tag) { - if (it->unschedFunction) { - candidates.push_back(std::move(it->unschedFunction)); +int ThreadPoolImpl::Impl::UnSchedule(void *arg) { + int count = 0; + + std::vector> candidates; + { + std::lock_guard lock(mu_); + + // Remove from priority queue + BGQueue::iterator it = queue_.begin(); + while (it != queue_.end()) { + if (arg == (*it).tag) { + if (it->unschedFunction) { + candidates.push_back(std::move(it->unschedFunction)); + } + it = queue_.erase(it); + count++; + } else { + ++it; + } } - it = queue_.erase(it); - count++; - } else { - ++it; - } + queue_len_.store(static_cast(queue_.size()), + std::memory_order_relaxed); } - queue_len_.store(static_cast(queue_.size()), - std::memory_order_relaxed); - } - // Run unschedule functions outside the mutex - for (auto& f : candidates) { - f(); - } + // Run unschedule functions outside the mutex + for (auto &f: candidates) { + f(); + } - return count; + return count; } ThreadPoolImpl::ThreadPoolImpl() : - impl_(new Impl()) { + impl_(new Impl()) { } @@ -386,79 +395,79 @@ ThreadPoolImpl::~ThreadPoolImpl() { } void ThreadPoolImpl::JoinAllThreads() { - impl_->JoinThreads(false); + impl_->JoinThreads(false); } void ThreadPoolImpl::SetBackgroundThreads(int num) { - impl_->SetBackgroundThreadsInternal(num, true); + impl_->SetBackgroundThreadsInternal(num, true); } int ThreadPoolImpl::GetBackgroundThreads() { - return impl_->GetBackgroundThreads(); + return impl_->GetBackgroundThreads(); } unsigned int ThreadPoolImpl::GetQueueLen() const { - return impl_->GetQueueLen(); + return impl_->GetQueueLen(); } void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() { - impl_->JoinThreads(true); + impl_->JoinThreads(true); } void ThreadPoolImpl::LowerIOPriority() { - impl_->LowerIOPriority(); + impl_->LowerIOPriority(); } void ThreadPoolImpl::LowerCPUPriority() { - impl_->LowerCPUPriority(); + impl_->LowerCPUPriority(); } void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) { - impl_->SetBackgroundThreadsInternal(num, false); + impl_->SetBackgroundThreadsInternal(num, false); } -void ThreadPoolImpl::SubmitJob(const std::function& job) { - auto copy(job); - impl_->Submit(std::move(copy), std::function(), nullptr); +void ThreadPoolImpl::SubmitJob(const std::function &job) { + auto copy(job); + impl_->Submit(std::move(copy), std::function(), nullptr); } -void ThreadPoolImpl::SubmitJob(std::function&& job) { - impl_->Submit(std::move(job), std::function(), nullptr); +void ThreadPoolImpl::SubmitJob(std::function &&job) { + impl_->Submit(std::move(job), std::function(), nullptr); } -void ThreadPoolImpl::Schedule(void(*function)(void* arg1), void* arg, - void* tag, void(*unschedFunction)(void* arg)) { - if (unschedFunction == nullptr) { - impl_->Submit(std::bind(function, arg), std::function(), tag); - } else { - impl_->Submit(std::bind(function, arg), std::bind(unschedFunction, arg), - tag); - } +void ThreadPoolImpl::Schedule(void(*function)(void *arg1), void *arg, + void *tag, void(*unschedFunction)(void *arg)) { + if (unschedFunction == nullptr) { + impl_->Submit(std::bind(function, arg), std::function(), tag); + } else { + impl_->Submit(std::bind(function, arg), std::bind(unschedFunction, arg), + tag); + } } -int ThreadPoolImpl::UnSchedule(void* arg) { - return 
impl_->UnSchedule(arg); +int ThreadPoolImpl::UnSchedule(void *arg) { + return impl_->UnSchedule(arg); } -void ThreadPoolImpl::SetHostEnv(Env* env) { impl_->SetHostEnv(env); } +void ThreadPoolImpl::SetHostEnv(Env *env) { impl_->SetHostEnv(env); } -Env* ThreadPoolImpl::GetHostEnv() const { return impl_->GetHostEnv(); } +Env *ThreadPoolImpl::GetHostEnv() const { return impl_->GetHostEnv(); } // Return the thread priority. // This would allow its member-thread to know its priority. Env::Priority ThreadPoolImpl::GetThreadPriority() const { - return impl_->GetThreadPriority(); + return impl_->GetThreadPriority(); } // Set the thread priority. void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) { - impl_->SetThreadPriority(priority); + impl_->SetThreadPriority(priority); } -ThreadPool* NewThreadPool(int num_threads) { - ThreadPoolImpl* thread_pool = new ThreadPoolImpl(); - thread_pool->SetBackgroundThreads(num_threads); - return thread_pool; +ThreadPool *NewThreadPool(int num_threads) { + ThreadPoolImpl *thread_pool = new ThreadPoolImpl(); + thread_pool->SetBackgroundThreads(num_threads); + return thread_pool; }
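The patch leaves two ways to drive the pool: through Env::Schedule() as in example/env_thread_test.cc, or by using ThreadPoolImpl (or NewThreadPool()) directly and submitting std::function jobs. Below is a minimal sketch of the direct route; it is not part of the patch, and it assumes threadpool_imp.h declares the members used in the implementation above and that SubmitJob() accepts a std::function<void()>.

#include <atomic>
#include <cstdio>
#include <functional>

#include "threadpool_imp.h"

int main() {
  ThreadPoolImpl pool;                          // stand-alone pool, no Env needed
  pool.SetThreadPriority(Env::Priority::HIGH);  // priority label for this pool
  pool.SetBackgroundThreads(4);                 // spin up four worker threads

  std::atomic<int> done(0);
  for (int i = 0; i < 16; ++i) {
    // SubmitJob() takes a std::function<void()>, so capturing lambdas work.
    pool.SubmitJob([i, &done] {
      std::printf("job %d on a pool thread\n", i);
      done.fetch_add(1);
    });
  }

  std::printf("still queued: %u\n", pool.GetQueueLen());
  // Block until every queued job has run, then join the worker threads.
  pool.WaitForJobsAndJoinAllThreads();
  std::printf("completed %d of 16 jobs\n", done.load());
  return 0;
}

WaitForJobsAndJoinAllThreads() maps to Impl::JoinThreads(true), so it drains the queue before joining, whereas JoinAllThreads() maps to JoinThreads(false) and discards whatever is still queued.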