第9章:线程池
# 第9章:线程池
在前面几章中,我们学习了如何创建和管理线程。尽管这种方法可行,但在某些情况下可能有些过头了。有时,我们需要在不同的线程上执行一些有边界的操作,但每次都创建新线程会带来额外开销。线程并非毫无成本:内核需要有管理线程信息的结构,线程拥有用户模式和内核模式的栈,而且线程的创建本身也需要耗费时间。如果线程的预期生命周期相对较短,那么这些额外开销就会变得很可观。
本章所讨论的线程池与托管进程中可用的 .NET 或 .NET Core 线程池没有关联。公共语言运行时(CLR)/CoreCLR有自己的线程池实现。
本章内容:
- 为何使用线程池?
- 线程池工作回调
- 线程池等待回调
- 线程池定时器回调
- 线程池I/O回调
- 线程池实例操作
- 回调环境
- 私有线程池
- 清理组
# 为何使用线程池?
Windows提供了线程池,这是一种机制,允许将操作发送给线程池中的某个线程去执行。与手动创建和管理线程相比,使用线程池具有以下优势:
- 客户端代码无需显式创建或终止线程,这些操作由线程池管理器负责处理。
- 已完成操作的工作线程不会被销毁,而是返回线程池以处理另一个请求。
- 线程池中的线程数量可以根据工作项的负载动态增加或减少。
Windows 2000是首个支持线程池的Windows版本,它为每个进程提供一个线程池。从Windows Vista开始,线程池API得到了显著增强,其中包括添加了私有线程池,这意味着一个进程中可以存在多个线程池。
我们仅介绍较新的API,因为除非你的目标是Windows Vista之前的版本,否则没有充分的理由使用旧API。 |
---|
线程池服务被一些Windows函数和第三方库在内部使用,所以即便你没有显式使用任何与线程池相关的API,也可能会发现存在线程池线程。如果你运行像记事本这样的简单应用程序,它原本并不需要多个线程,但当你在进程资源管理器(Process Explorer)中查看该进程时,可能会发现它有几个线程,其中一些线程的起始函数是ntdll!TppWorkerThread
(图9-1),这是线程池线程的起始函数。如果你让记事本进程运行一段时间,在其处于空闲状态一段时间后,你可能会发现线程池线程消失了(图9-2)。
“Tpp”是“ThreadPool Private”的缩写,即与线程池相关的一个私有(未导出)函数。负责管理线程池的内核对象名为
TpWorkerFactory
。
图9-1:记事本中的线程池线程
图9-2:记事本中没有线程池线程
你可以通过运行我的对象资源管理器工具(Object Explorer tool)来了解系统中的线程池数量,该工具可从https://github.com/zodiacon/AllTools或https://github.com/zodiacon/ObjectExplorer/Releases下载。打开工具后,会显示对象类型。按名称排序并查找TpWorkerFactory
(图9-3),留意系统中此类对象的数量。
图9-3:对象资源管理器中显示的TpWorkerFactory对象数量
右键单击TpWorkerFactory
对象并选择“所有对象”,你可以查看更多详细信息,从而了解进程中存在多少个线程池(图9-4)。
图 9-4:系统中的 TpWorkerFactory 对象
# 线程池工作回调
向线程池提交工作项最简单的API是TrySubmitThreadPoolCallback
:
typedef VOID (NTAPI *PTP_SIMPLE_CALLBACK)(
_Inout_ PTP_CALLBACK_INSTANCE Instance,
_Inout_opt_ PVOID Context);
BOOL TrySubmitThreadpoolCallback(
_In_ PTP_SIMPLE_CALLBACK pfns,
_Inout_opt_ PVOID pv,
_In_opt_ PTP_CALLBACK_ENVIRON pcbe);
2
3
4
5
6
7
8
TrySubmitThreadPoolCallback
会设置作为第一个参数提供的回调,由线程池进行调用。第二个参数允许指定一个上下文值,该值会原样传递给回调函数。最后一个与回调环境相关的可选参数可以设置为NULL
。我们将在本章后面探讨回调环境。
回调函数本身会被传入两个参数,其中第二个参数是传递给TrySubmitThreadPoolCallback
的上下文。第一个参数的类型为PTP_CALLBACK_INSTANCE
,它是一个不透明的指针,代表这个回调实例。我们也会在本章后面讨论这个参数。
该函数在成功时返回TRUE
,在大多数情况下都应该是成功的,除非Windows处于极端的内存压力之下。一旦提交,回调会尽快由线程池中的一个线程执行。提交的请求没有内置的取消方式,也没有直接的方法知晓回调何时完成执行。当然,我们可以添加自己的机制,比如在回调中发出事件对象的信号,并在另一个线程上等待它。
# 简单工作应用程序
简单工作(Simple Work)示例应用程序在执行时如图9-5所示,它允许使用TrySubmitThreadPoolCallback
将工作项提交到线程池(thread pool),同时观察每个回调在哪个线程下执行。与此同时,该应用程序会显示进程中的线程数量。
图9-5:简单工作应用程序
应用程序启动时,线程数量应该较少,通常为1或4。点击“提交工作项”按钮可提交单个工作项。如果线程数量大于1,那么线程数量很可能保持不变,因为至少有一个线程池线程已经处于活动状态,可以处理该请求(图9-6)。
多次点击同一个按钮会启动更多工作项,随着线程池“感知”到更高的负载,线程数量应该会增加。
图9-6:提交到线程池的单个工作项
现在多次点击“提交10个工作项”按钮,观察线程数量大幅增加(图9-7)。
图9-7:提交到线程池的多个工作项
如果不提交很多工作项,只需等待一段时间,线程数量就会开始减少。如果时间足够长,线程数量会减少到1,只剩下主用户界面(UI,User Interface)线程处于活动状态(图9-8)。这个应用程序展示了线程池的动态特性。
图9-8:闲置一段时间后的简单工作应用程序
“提交”按钮只是调用TrySubmitThreadPoolCallback
:
LRESULT CMainDlg::OnSubmitWorkItem(WORD, WORD wID, HWND, BOOL&) {
if (!::TrySubmitThreadpoolCallback(OnCallback, this, nullptr))
AtlMessageBox(*this, L"Failed to submit work item callback",
IDR_MAINFRAME, MB_ICONERROR);
return 0;
}
LRESULT CMainDlg::OnSubmit10WorkItems(WORD, WORD, HWND, BOOL&) {
for (int i = 0; i < 10; i++) {
if (!::TrySubmitThreadpoolCallback(OnCallback, this, nullptr)) {
AtlMessageBox(*this, L"Failed to submit work item callback",
IDR_MAINFRAME, MB_ICONERROR);
break;
}
}
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
上下文参数设置为this
,这样静态回调函数(OnCallback
)就可以访问对话框对象。该回调函数通过短暂休眠来模拟工作,并通过PostMessage
向对话框发送消息,指示它在哪个线程上被调用:
void CMainDlg::OnCallback(PTP_CALLBACK_INSTANCE instance, PVOID context) {
auto dlg = (CMainDlg*)context;
// post message indicating start
dlg->PostMessage(WM_APP + 1, ::GetCurrentThreadId());
// simulate work...
::Sleep(10 * (::GetTickCount() & 0xff));
// post message indicating end
dlg->PostMessage(WM_APP + 2, ::GetCurrentThreadId());
}
2
3
4
5
6
7
8
9
10
11
这些消息通过正常的Windows模板库(WTL,Windows Template Library )消息映射映射到由用户界面(主)线程调用的函数。这是因为任何发送到窗口的消息总是被放入窗口创建线程的消息队列中,并且只有该线程被允许从队列中检索消息并处理它们。下面是自定义WM_APP + 1
消息的处理程序:
LRESULT CMainDlg::OnCallbackStart(UINT, WPARAM wParam, LPARAM, BOOL&) {
CString text;
text.Format(L"Started on thread %d", wParam);
m_List.AddString(text);
return 0;
}
2
3
4
5
6
7
第二条消息除了文本本身之外,其他都相同。
获取当前进程中的线程数量出奇地棘手,因为没有文档记录(实际上也没有未文档记录)的应用程序编程接口(API,Application Programming Interface )可以直接获取该值。这里使用第3章讨论的工具帮助应用程序编程接口(Tool help API)来定位当前进程,进程中的线程数量作为PROCESSENTRY32
结构的一部分提供。这段代码是WM_TIMER
消息处理程序的一部分,每2秒调用一次以更新当前线程计数:
auto hSnapshot = ::CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0);
if (hSnapshot == INVALID_HANDLE_VALUE)
return 0;
PROCESSENTRY32 pe;
pe.dwSize = sizeof(pe);
::Process32First(hSnapshot, &pe);
// skip idle process
auto pid = ::GetCurrentProcessId();
ULONG threads = 0;
while (::Process32Next(hSnapshot, &pe)) {
if (pe.th32ProcessID == pid) {
threads = pe.cntThreads;
break;
}
}
::CloseHandle(hSnapshot);
CString text;
text.Format(L"Threads: %u\n", threads);
SetDlgItemText(IDC_THREADS, text);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 控制工作项
使用TrySubmitThreadPoolCallback
相当简单,但有时你需要更多的控制。例如,你可能想知道回调何时完成,或者如果满足某些条件,你可能想要取消工作项。对于这些情况,你可以显式地创建一个线程池工作项。实现这一点的应用程序编程接口是CreateThreadPoolWork
:
PTP_WORK CreateThreadpoolWork(
_In_ PTP_WORK_CALLBACK pfnwk,
_Inout_opt_ PVOID pv,
_In_opt_ PTP_CALLBACK_ENVIRON pcbe);
2
3
4
该函数看起来与TrySubmitThreadPoolCallback
类似,但有两个不同之处。第一个是返回值,它是一个不透明的PTP_WORK
指针,表示工作项,如果失败则为NULL
。第二个不同之处是回调函数的原型,如下所示:
typedef VOID (CALLBACK *PTP_WORK_CALLBACK)(
_Inout_ PTP_CALLBACK_INSTANCE Instance,
_Inout_opt_ PVOID Context,
_Inout_ PTP_WORK Work);
2
3
4
回调函数有第三个参数,它是最初从CreateThreadPoolWork
返回的工作项对象。一旦创建了工作项对象,就可以通过调用SubmitThreadPoolWork
提交(可能多次提交):
VOID SubmitThreadpoolWork(
_Inout_ PTP_WORK Work
);
2
3
注意,该函数返回void
,这意味着它不会失败。这是因为如果CreateThreadPoolWork
成功,SubmitThreadPoolWork
就不可能失败。另一方面,每次调用TrySubmitThreadPoolCallback
都有可能失败。
允许对同一个工作对象多次调用SubmitThreadPoolWork
。潜在的缺点是,所有这些提交都使用相同的回调函数和相同的上下文,因为这些只能在工作对象创建时提供。一旦提交,可以使用WaitForThreadPoolWorkCallbacks
对提交的回调进行一些控制:
void WaitForThreadpoolWorkCallbacks(
_Inout_ PTP_WORK pwk,
_In_ BOOL fCancelPendingCallbacks);
2
3
第一个参数是从CreateThreadPoolWork
返回的工作对象。如果fCancelPendingCallbacks
为FALSE
,调用线程将进入等待状态,直到通过该工作项提交的所有回调都完成。如果还没有提交任何回调,该函数会立即返回。
如果fCancelPendingCallbacks
为TRUE
,该函数将取消任何已提交但尚未开始执行的回调。该函数永远不会取消正在进行的回调——这没有意义,因为这样做的唯一方法是强制终止线程池线程,这是个坏主意。调用线程会等待所有当前正在执行的回调完成,然后才结束等待。
最后,线程池工作对象最终必须使用CloseThreadPoolWork
释放:
void CloseThreadpoolWork(_Inout_ PTP_WORK pwk);
修改简单工作应用程序,使用CreateThreadPoolWork
以及本节中描述的相关函数。(解决方案是名为SimpleWork2
的项目,与本章的其他示例放在一起。)
Windows实现库(WIL,Windows Implementation Library )为线程池工作对象提供了句柄:wil::unique_threadpool_work
、unique_threadpool_work_nocancel
和unique_threadpool_work_nowait
。“nowait
”变体在工作对象超出作用域时只是关闭该对象。前两个变体调用WaitForThreadPoolWorkCallbacks
等待所有挂起的回调完成,前者的取消参数设置为TRUE
,后者设置为FALSE
。
# MD5计算器应用程序
第7章中的MD5计算应用程序为每次需要的新计算创建一个新线程。这效率不高,这里可以使用线程池作为替代方案。需要替换的代码在CView::OnStartCalc
中,目前在该函数中为每次需要的计算创建一个线程:
// spawn a thread to do the actual calculation
auto data = new CalcThreadData;
data->View = this;
data->Index = (int)index;
auto hThread = ::CreateThread(nullptr, 0, [](auto param) {
auto data = (CalcThreadData*)param;
auto view = data->View;
auto index = data->Index;
delete data;
return view->DoCalc(index);
}, data, 0, nullptr);
if (!hThread) {
AtlMessageBox(nullptr, L"Failed to create worker thread!",
IDR_MAINFRAME, MB_ICONERROR);
return 0;
}
::CloseHandle(hThread);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
替换这段代码的最简单方法是使用TrySubmitThreadPoolCallback
,如下所示:
auto data = new CalcThreadData;
data->View = this;
data->Index = (int)index;
if (!::TrySubmitThreadpoolCallback([](auto instance, auto param) {
auto data = (CalcThreadData*)param;
auto view = data->View;
auto index = data->Index;
delete data;
view->DoCalc(index);
}, data, nullptr)) {
AtlMessageBox(nullptr, L"Failed to submit thread pool work!",
IDR_MAINFRAME, MB_ICONERROR);
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
虽然CreateThread
和TrySubmitThreadPoolCallback
都有可能失败,但TrySubmitThreadPoolCallback
失败的可能性较小,因为它比创建一个新线程需要的资源更少。
另一种选择是使用CreateThreadPoolWork
创建一个完整的工作对象。然而,在这种情况下,它的好处较少,因为我们每次提交工作项时都需要不同的上下文,所以它的主要优点是能够等待并可能取消挂起的操作。不过,让我们借助WIL的包装器来实现这一点:
auto data = new CalcThreadData;
data->View = this;
data->Index = (int)index;
wil::unique_threadpool_work_nowait work(::CreateThreadpoolWork(
[](auto instance, auto param, auto work) {
auto data = (CalcThreadData*)param;
auto view = data->View;
auto index = data->Index;
delete data;
view->DoCalc(index);
}, data, nullptr));
if (!work) {
AtlMessageBox(nullptr, L"Failed to submit thread pool work!",
IDR_MAINFRAME, MB_ICONERROR);
return 0;
}
::SubmitThreadpoolWork(work.get());
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
我们选择unique_threadpool_work_nowait
变体,因为当工作对象超出作用域(在函数结束时发生)时,我们不想等待挂起的操作完成。在这个例子中,使用手动创建的工作项并没有实际好处,但在其他情况下,这种方法可能会有帮助。
# 线程池等待回调(Thread Pool Wait Callbacks)
在第8章“使用事件(Working with Events)”一节中,我们看了一个示例,其中几个进程需要等待一个公共事件(“关闭”),为此,每个进程都创建一个线程来执行等待操作。正如该节所述,这种做法效率低下——线程应该执行实际工作,而不是等待。更好的方法是让线程池等待该事件。乍一看,这似乎没什么区别:线程池线程不也只是在等待吗?如果是这样,它与应用程序创建的线程有什么不同呢?
区别在于,同一个线程池线程可以等待应用程序以及可能其他Windows API和库提交的多个对象。每个这样的线程都可以使用熟悉的WaitForMultipleObjects
函数同时等待多达64个对象。
如果需要等待超过64个对象,可以从线程池中启动另一个线程来完成此任务。
要创建一个线程池等待对象,可调用CreateThreadpoolWait
函数:
typedef VOID (NTAPI *PTP_WAIT_CALLBACK)(
_Inout_ PTP_CALLBACK_INSTANCE Instance,
_Inout_opt_ PVOID Context,
_Inout_ PTP_WAIT Wait,
_In_ TP_WAIT_RESULT WaitResult); // 只是一个DWORD类型
PTP_WAIT CreateThreadpoolWait(
_In_ PTP_WAIT_CALLBACK pfnwa,
_Inout_opt_ PVOID pv,
_In_opt_ PTP_CALLBACK_ENVIRON pcbe);
2
3
4
5
6
7
8
9
10
此时,这种模式应该很明显了,因为该函数和回调与CreateThreadpoolWork
非常相似。CreateThreadpoolWait
的参数基本相同,只是回调函数略有不同。它提供了一个额外的参数,用于指定回调被调用的原因;也就是说,当对一个对象的等待操作结束时会调用回调,因此WaitResult
指定了等待结束的原因。可能的值包括WAIT_OBJECT_0
,表示对象已发出信号;以及WAIT_TIMEOUT
,表示在对象未发出信号的情况下超时时间已到。
CreateThreadpoolWait
返回一个不透明的PTP_WAIT
指针,代表等待对象。现在,可以使用SetThreadpoolWait
提交实际的等待请求:
VOID SetThreadpoolWait(
_Inout_ PTP_WAIT pwa,
_In_opt_ HANDLE h,
_In_opt_ PFILETIME pftTimeout);
2
3
4
除了等待对象外,该函数还接受要等待的句柄,超时参数指定等待的时长。这个值的格式与第8章中关于可等待定时器(waitable timers)讨论的格式相同:负数表示以100纳秒为单位的相对时间,正数表示从1601年1月1日协调世界时(UTC)午夜开始以100纳秒为单位的绝对时间。有关如何指定此值的完整讨论,请参阅第8章。NULL
指针表示无限期等待 。
与线程池工作对象不同,对同一个等待对象(PTP_WAIT
)调用SetThreadpoolWait
会取消当前的等待(如果尚未执行),并用(可能是)新的句柄和超时时间替换等待操作。将句柄设置为NULL
会停止排队新的回调。
对于我们的关闭事件示例,可以通过以下代码实现等待:
void ConfigureWait() {
HANDLE hShutdown = ::CreateEvent(nullptr, TRUE, FALSE, L"ShutdownEvent");
auto wait = ::CreateThreadpoolWait(OnWaitSatisfied, nullptr, nullptr);
::SetThreadpoolWait(wait, hShutdown, nullptr);
// 继续正常运行...
}
void OnWaitSatisfied(PTP_CALLBACK_INSTANCE instance, PVOID context,
PTP_WAIT wait, TP_WAIT_RESULT) {
// 由于等待请求指定了无限时间,我们到这里意味着事件已发出信号
DoShutdown(); // 启动关闭操作
}
2
3
4
5
6
7
8
9
10
11
12
13
还有一个扩展的设置函数:
BOOL SetThreadpoolWaitEx(
_Inout_ PTP_WAIT pwa,
_In_opt_ HANDLE h,
_In_opt_ PFILETIME pftTimeout,
_Reserved_ PVOID Reserved);
2
3
4
5
该函数与SetThreadpoolWait
基本相同,只是返回值不同。它返回TRUE
表示等待操作处于活动状态且已被替换;如果返回FALSE
,则表示之前注册的句柄的回调已执行或即将执行。
与线程池工作项类似,可以使用WaitForThreadpoolWaitCallbacks
对等待操作进行一些控制:
VOID WaitForThreadpoolWaitCallbacks(
_Inout_ PTP_WAIT pwa,
_In_ BOOL fCancelPendingCallbacks);
2
3
WaitForThreadpoolWaitCallbacks
的语义和行为与WaitForThreadpoolWorkCallbacks
基本相同。
最后,需要使用CloseThreadpoolWait
释放等待项:
VOID CloseThreadpoolWait(_Inout_ PTP_WAIT pwa);
# 线程池定时器回调(Thread Pool Timer Callbacks)
在第8章中,我们介绍了可等待定时器内核对象(awaitable timer kernel object),它可以在到期时启动操作,还可以选择周期性地执行。然而,启动操作并不是很方便。这需要一些等待操作(现在我们知道可以通过线程池来完成),或者在调用SetWaitableTimer
的线程上作为异步过程调用(APC)运行回调。线程池提供了另一项服务,它可以在指定的时间段过去后(也可以选择周期性地)直接从线程池中调用回调。
相关函数的语义与我们已经看到的非常相似。以下是创建线程池定时器对象的函数:
typedef VOID (CALLBACK *PTP_TIMER_CALLBACK)(
_Inout_ PTP_CALLBACK_INSTANCE Instance,
_Inout_opt_ PVOID Context,
_Inout_ PTP_TIMER Timer);
PTP_TIMER CreateThreadpoolTimer(
_In_ PTP_TIMER_CALLBACK pfnti,
_Inout_opt_ PVOID pv,
_In_opt_ PTP_CALLBACK_ENVIRON pcbe);
2
3
4
5
6
7
8
9
此时,函数参数应该很容易理解了。CreateThreadpoolTimer
返回一个不透明的指针,代表定时器对象。要启动一个实际的定时器,可调用SetThreadpoolTimer
函数:
VOID SetThreadpoolTimer(
_Inout_ PTP_TIMER pti,
_In_opt_ PFILETIME pftDueTime,
_In_ DWORD msPeriod,
_In_opt_ DWORD msWindowLength);
2
3
4
5
pftDueTime
参数指定到期时间,其格式与SetWaitableTimer
和SetThreadpoolWait
中使用的格式相同,尽管它的类型是FILETIME
。如果此参数为NULL
,会使定时器对象停止排队新的到期请求(但已经排队的请求将在到期时被调用)。msPeriod
是请求的周期,以毫秒为单位。如果指定为零,则定时器为一次性定时器。最后一个参数的用法与SetWaitableTimerEx
的最后一个参数类似——可接受的容差(以毫秒为单位),以便进行定时器合并以节省电量。
对同一个定时器对象再次调用该函数会取消回调,并用新信息替换定时器。
与等待对象的情况类似,也有一个扩展函数:
BOOL SetThreadpoolWaitEx(
_Inout_ PTP_WAIT pwa,
_In_opt_ HANDLE h,
_In_opt_ PFILETIME pftTimeout,
_Reserved_ PVOID Reserved);
2
3
4
5
与非扩展函数相比,该函数的唯一区别是返回值。如果之前的定时器有效且现在已被替换,函数返回TRUE
;否则返回FALSE
。
要确定定时器对象上是否设置了定时器,可以调用IsThreadpoolTimerSet
函数:
BOOL WINAPI IsThreadpoolTimerSet(_Inout_ PTP_TIMER pti);
正如你可能预期的那样,可以使用WaitForThreadpoolTimerCallbacks
等待并可能取消定时器:
VOID WaitForThreadpoolTimerCallbacks(
_Inout_ PTP_TIMER pti,
_In_ BOOL fCancelPendingCallbacks);
2
3
最后,应该关闭定时器池对象:
VOID CloseThreadpoolTimer(_Inout_ PTP_TIMER pti);
# 简单定时器示例(The Simple Timer Sample)
第8章中的简单定时器示例可以重写为使用线程池定时器对象,而不是使用可等待定时器。以下是完整代码:
void CALLBACK OnTimer(PTP_CALLBACK_INSTANCE inst, PVOID context, PTP_TIMER timer) {
printf("TID: %u Ticks: %u\n", ::GetCurrentThreadId(), ::GetTickCount());
}
int main() {
auto timer = ::CreateThreadpoolTimer(OnTimer, nullptr, nullptr);
if (!timer) {
printf("Failed to create a thread pool timer (%u)", ::GetLastError());
return 1;
}
static_assert(sizeof(LONG64) == sizeof(FILETIME), "something weird!");
LONG64 interval;
interval = -10000 * 1000LL;
::SetThreadpoolTimer(timer, (FILETIME*)&interval, 1000, 0);
printf("Main thread ID: %u\n", ::GetCurrentThreadId());
::Sleep(10000);
::WaitForThreadpoolTimerCallbacks(timer, TRUE);
::CloseThreadpoolTimer(timer);
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static_assert
关键字进行了合理性检查,以确保64位整数可以与FILETIME
结构同等对待。定时器设置为间隔一秒,周期为一秒。对Sleep
的调用只是使主线程等待,同时定时器回调会按指定进行调用。
# 线程池I/O回调(Thread Pool I/O Callbacks)
线程池处理的I/O回调用于为异步I/O操作提供服务。这将在第11章“文件和设备I/O(File and Device I/O)”中讨论。
# 线程池实例操作(Thread Pool Instance Operations)
我们还有两个参数尚未描述——回调环境和提供给回调的实例参数。在本节中,我们将介绍实例参数,在下一节中,我们将研究回调环境。
不透明的实例参数(类型为PTP_CALLBACK_INSTANCE
)与几个可以在回调中调用的函数一起使用。我们从CallbackMayRunLong
函数开始:
BOOL CallbackMayRunLong(_Inout_ PTP_CALLBACK_INSTANCE pci);
调用此函数会向线程池提示该回调可能会长时间运行,因此线程池应将此线程视为不属于线程池线程限制的一部分,并生成一个新线程来处理下一个请求,因为当前这个线程不太可能很快结束。如果线程池能够为下一个请求生成新线程,该函数返回TRUE
;如果此时无法生成新线程,则返回FALSE
。长时间运行标志仍然会应用于该实例。
接下来的一组函数请求线程池在回调真正结束且线程返回线程池之前执行特定操作:
VOID SetEventWhenCallbackReturns(
_Inout_ PTP_CALLBACK_INSTANCE pci,
_In_ HANDLE evt);
VOID ReleaseSemaphoreWhenCallbackReturns(
_Inout_ PTP_CALLBACK_INSTANCE pci,
_In_ HANDLE sem,
_In_ DWORD crel);
VOID ReleaseMutexWhenCallbackReturns(
_Inout_ PTP_CALLBACK_INSTANCE pci,
_In_ HANDLE mut);
VOID LeaveCriticalSectionWhenCallbackReturns(
_Inout_ PTP_CALLBACK_INSTANCE pci,
_Inout_ PCRITICAL_SECTION pcs);
VOID FreeLibraryWhenCallbackReturns(
_Inout_ PTP_CALLBACK_INSTANCE pci,
_In_ HMODULE mod);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
前四个函数相当直观,在回调返回之前分别调用以下函数:SetEvent
、ReleaseSemaphore
(计数为crel
参数的值)、ReleaseMutex
、LeaveCriticalSection
。这可能看起来没什么大不了的——客户端提供的回调函数难道不能直接调用相应的函数吗?可以,但这需要通过原始函数中的上下文参数传递相关对象/句柄,这可能不太方便,甚至会有问题,因为在创建相关线程池时,上下文只能设置一次。
列表中的最后一个函数FreeLibraryWhenCallbackReturns
,会在回调结束时调用FreeLibrary
卸载动态链接库(DLL)。它还有一个额外的好处,即能够卸载回调本身所在的DLL。如果回调自己调用FreeLibrary
,会导致致命错误,因为该函数会卸载自身代码,一旦FreeLibrary
返回,就会导致内存访问冲突。让线程池调用FreeLibrary
可以解决这个问题,因为调用者不属于该DLL。
最后一个对实例参数进行操作的函数是DisassociateCurrentThreadFromCallback
:
void DisassociateCurrentThreadFromCallback(_Inout_ PTP_CALLBACK_INSTANCE pci);
调用此函数会告诉线程池,该回调已完成其重要工作,因此其他可能正在等待WaitForThreadpoolWorkCallbacks
等函数以满足其等待条件的线程可以继续执行,即使该回调在技术上仍在执行。如果该回调是清理组(下一节将介绍)的一部分,此函数不会更改该关联。
# 回调环境
每个线程池对象创建函数都有一个类型为PTP_CALLBACK_ENVIRON
的最后参数,称为回调环境。其目的是在使用线程池函数时实现进一步的自定义。该结构体本身在<winnt.h>
头文件中定义,但你应将该结构体视为不透明的,只能通过本节所述的函数来操作它。
若要将回调环境初始化为干净状态,请调用InitializeThreadpoolEnvironment
:
VOID InitializeThreadpoolEnvironment(_Out_ PTP_CALLBACK_ENVIRON pcbe);
回调环境API函数通过调用ntdll.dll
中的其他函数内联实现。例如,InitializeThreadpoolEnvironment
调用TpInitializeCallbackEnviron
,后者也在同一头文件中实现。只有将回调环境传递给线程池函数后,其实际效果才会显现出来。
回到InitializeThreadpoolEnvironment
——目前,该函数会将除Version
字段以外的所有成员清零,Version
字段在Windows 7及更高版本中设置为3,在Vista中设置为1。最后,应使用DestroyThreadpoolEnvironment
销毁该环境:
VOID DestroyThreadpoolEnvironment(_Inout_ PTP_CALLBACK_ENVIRON);
目前,这个函数不执行任何操作,但在未来版本的Windows中可能会有所变化,所以,一旦不再需要该环境对象,调用这个函数是一个好习惯。
回调环境初始化后,可以调用一组函数来定制环境结构体的各个成员。表9-1总结了这些函数及其含义。
函数 | 描述(对与回调环境关联的回调有效) |
---|---|
SetThreadpoolCallbackPool | 设置回调要使用的线程池对象 |
SetThreadpoolCallbackPriority | 设置回调的优先级 |
SetThreadpoolCallbackRunsLong | 设置一个提示,表明回调是长时间运行的 |
SetThreadpoolCallbackLibrary | 表明回调是动态链接库(DLL)的一部分,用于同步动态链接库处理的某些部分 |
SetThreadpoolCallbackCleanupGroup | 将回调与清理组关联(本章后面会介绍) |
回调环境、线程池和各种线程池项之间的关系如图9-9所示。
图9-9:各种线程池实体之间的关系
有些函数需要更详细的讨论。SetThreadpoolCallbackPool
用于设置一个不同于进程默认线程池的线程池。下一节将展示如何创建线程池。
SetThreadpoolCallbackPriority
用于为回调设置相对于在同一线程池上运行的其他回调的优先级:
VOID SetThreadpoolCallbackPriority(
_Inout_ PTP_CALLBACK_ENVIRON pcbe,
_In_ TP_CALLBACK_PRIORITY Priority
);
2
3
4
Priority
参数可以是TP_CALLBACK_PRIORITY
枚举中的值之一:
typedef enum _TP_CALLBACK_PRIORITY {
TP_CALLBACK_PRIORITY_HIGH,
TP_CALLBACK_PRIORITY_NORMAL,
TP_CALLBACK_PRIORITY_LOW
} TP_CALLBACK_PRIORITY;
2
3
4
5
高优先级的回调保证会在低优先级的回调之前启动。这在同一线程池中提供了一定程度的灵活性。
此函数在Windows 7和Server 2008 R2中添加。
调用SetThreadpoolCallbackLibrary
可让线程池知道该回调是动态链接库(DLL)的一部分,因此只要存在针对该环境的回调,线程池就应使该动态链接库在进程中保持加载状态。如果其他线程试图获取加载程序锁(加载程序锁在“动态链接库”一章中讨论),该函数还有助于防止死锁。
# 专用线程池
默认情况下,进程有一个单一的线程池,该线程池无法被销毁。当回调没有自定义回调环境时,使用的就是这个线程池。通过回调环境,可以通过调用SetThreadpoolCallbackPool
将回调定向到不同的线程池:
VOID SetThreadpoolCallbackPool(
_Inout_ PTP_CALLBACK_ENVIRON pcbe,
_In_ PTP_POOL ptpp
);
2
3
4
线程池本身由不透明的PTP_POOL
指针表示。使用CreateThreadpool
函数可以创建一个专用线程池:
PTP_POOL CreateThreadpool(_Reserved_ PVOID reserved);
如前所述,唯一的参数是保留参数,应设置为NULL
。如果函数成功,它将返回一个不透明指针,供后续调用使用。在资源极度匮乏的情况下,函数可能会失败并返回NULL
。
有了线程池后,可以使用一些函数在一定程度上对其进行定制。最重要的函数与线程池中的最小和最大线程数有关:
VOID SetThreadpoolThreadMaximum(
_Inout_ PTP_POOL ptpp,
_In_ DWORD cthrdMost
);
BOOL SetThreadpoolThreadMinimum(
_Inout_ PTP_POOL ptpp,
_In_ DWORD cthrdMic
);
2
3
4
5
6
7
8
这些函数的功能很直观。默认的最大线程数是512,最小线程数是0。不过,这些数字不应依赖,所以最好调用上述函数来设置合适的值——毕竟这是创建专用线程池的主要原因之一。如果最小线程数大于零,那么会预先创建这么多线程,随时准备处理回调。
奇怪的是,没有文档记录的用于获取当前最小和最大线程数的反向函数。
这可以通过调用原生API函数
NtQueryInformationWorkerFactory
来实现(有关具体实现方法,请参阅我的对象资源管理器工具的源代码)。
专用线程池支持的另一项定制是线程池中线程使用的栈大小:
typedef struct _TP_POOL_STACK_INFORMATION {
SIZE_T StackReserve;
SIZE_T StackCommit;
} TP_POOL_STACK_INFORMATION, *PTP_POOL_STACK_INFORMATION;
BOOL SetThreadpoolStackInformation(
_Inout_ PTP_POOL ptpp,
_In_ PTP_POOL_STACK_INFORMATION ptpsi
);
2
3
4
5
6
7
8
9
默认大小来自第5章所述的PE(Portable Executable,可移植可执行文件)头。也就是说,默认情况下是提交4KB内存,保留1MB。这可能太多或太少,因此调用SetThreadpoolStackInformation
可以更好地利用线程池线程的内存。
奇怪的是,栈大小有一个查询函数:
BOOL QueryThreadpoolStackInformation(
_In_ PTP_POOL ptpp,
_Out_ PTP_POOL_STACK_INFORMATION ptpsi
);
2
3
4
还有更多方法可以定制线程池对象,但目前Windows API并未公开这些方法,所以这里不再赘述。
最后,需要通过调用CloseThreadpool
正确销毁线程池:
VOID CloseThreadpool(_Inout_ PTP_POOL ptpp);
更新“简单工作”应用程序,使其使用专用线程池,并更改最大和最小线程数以及栈大小。(解决方案在SimpleWork3
项目中)。
# 清理组
在一个频繁使用线程池的应用程序中,可能很难确定何时关闭各种线程池、工作项、等待项等。清理组会跟踪与其关联的所有回调,这样就可以一次性关闭它们,而应用程序无需手动跟踪所有回调。请注意,这仅适用于专用线程池,因为默认线程池无法被销毁。
清理组与上一节讨论的回调环境相关联。第一步是使用CreateThreadpoolCleanupGroup
创建一个新的清理组:
PTP_CLEANUP_GROUP CreateThreadpoolCleanupGroup();
不出所料,该函数返回一个不透明指针,表示清理组。为了使其生效,必须使用SetThreadpoolCallbackCleanupGroup
将清理组与回调环境相关联:
VOID SetThreadpoolCallbackCleanupGroup(
_Inout_ PTP_CALLBACK_ENVIRON pcbe,
_In_ PTP_CLEANUP_GROUP ptpcg,
_In_opt_ PTP_CLEANUP_GROUP_CANCEL_CALLBACK pfng
);
2
3
4
5
该函数接受一个已初始化的回调环境、要与该环境关联的清理组以及一个可选回调,其形式如下:
typedef VOID (CALLBACK *PTP_CLEANUP_GROUP_CANCEL_CALLBACK)(
_Inout_opt_ PVOID ObjectContext,
_Inout_opt_ PVOID CleanupContext
);
2
3
4
如果清理组被取消(稍后讨论),将调用这个可选回调。每次调用CreateThreadpoolWork
、CreateThreadpoolWait
等函数时,这些函数都会被关联的清理组跟踪。当你想要清理所有内容时,调用CloseThreadpoolCleanupGroupMembers
:
VOID CloseThreadpoolCleanupGroupMembers(
_Inout_ PTP_CLEANUP_GROUP ptpcg,
_In_ BOOL fCancelPendingCallbacks,
_Inout_opt_ PVOID pvCleanupContext
);
2
3
4
5
该函数会等待所有未完成的回调完成。这样你就无需为为该线程池创建的项目调用各种关闭函数。如果fCancelPendingCallbacks
为TRUE
,所有尚未开始的回调都会被取消,并且会为每个被取消的项目调用提供给SetThreadpoolCallbackCleanupGroup
的回调(如果不为NULL
)。回调函数的调用参数包括在CreateThreadPool*
(工作、等待、定时器、I/O)中设置的原始上下文参数,以及SetThreadpoolCallbackCleanupGroup
最后一个参数中提供的清理上下文。
等待和可选的取消操作完成后,可以使用CloseThreadpool
和CloseThreadpoolCleanupGroup
正常关闭线程池和清理组:
VOID CloseThreadpoolCleanupGroup(_Inout_ PTP_CLEANUP_GROUP ptpcg);
# 练习
- 使用线程池实现第5章中的曼德布洛特(Mandelbrot)练习。
- 使用线程池实现第8章中的练习。
# 总结
线程池是在高度多线程的进程中提高性能和可扩展性的强大机制。在下一章中,我们将汇总一些与线程相关的高级(以及一些不太高级)功能,这些功能在前面的章节中没有详细介绍。