概览
上一章的文件系统管线为并行产出与消费数据的应用奠定了舞台。现在我们关注 Zig 如何启动操作系统线程、在多核间协调工作,并通过原子操作保持共享状态一致(参见28与Thread.zig)。
Zig 0.15.2 的线程原语将轻量的 spawn API 与显式的内存序结合起来,因此由你决定存储何时可见以及竞争何时应阻塞。现在理解这些工具,将让后续的并行词频统计项目不再神秘(参见atomic.zig与30)。
学习目标
- Spawn and join worker threads responsibly, selecting stack sizes and allocators only when necessary.
- Choose memory orderings for atomic loads, stores, and compare-and-swap loops when protecting shared state.
- Detect single-threaded builds at compile time and fall back to synchronous execution paths.
用编排工作
Zig 通过std.Thread建模内核线程,提供查询 CPU 数量、配置栈大小与确定性 join 句柄的助手。不同于异步 I/O,这些是真正的内核线程——每次 spawn 都消耗操作系统资源,因此批处理工作单元至关重要。
线程池模式
在深入手动线程 spawn 之前,理解 Zig 编译器自身用于并行工作的线程池模式很有价值。下图展示了std.Thread.Pool如何在工作线程间分配任务:
线程池维护固定数量的工作线程,从队列拉取任务,避免反复 spawn 与 join 的开销。Zig 编译器广泛采用此模式:std.Thread.Pool将 AST 生成、语义分析与代码生成任务派发给工作线程。每个工作线程拥有线程本地状态(Zcu.PerThread)以最小化同步——仅在合并到InternPool.shards等共享数据结构时,最终结果需要互斥保护。该架构体现了并发设计的关键原则:工作单元应相互独立;共享状态应分片或用互斥保护;线程本地缓存可减少竞争。若负载涉及大量小任务,优先采用std.Thread.Pool而非手动 spawn;若需要少量长时间运行且职责明确的工作线程,手动spawn/join更为适宜。
用 spawn/join 分块数据
下例将一个整数数组划分给动态数量的工作线程,使用原子 fetch-add 在无锁情况下累计偶数总和。其会适配宿主 CPU 数量,但绝不会 spawn 超过待处理元素数量的线程。
// 此示例演示在Zig中使用线程和原子操作进行并行计算。
// 它通过在多个线程之间分配工作来计算数组中偶数的和。
const std = @import("std");
// 传递给每个工作线程用于并行处理的参数
const WorkerArgs = struct {
slice: []const u64, // 该工作线程应处理的数字子集
sum: *std.atomic.Value(u64), // 用于线程安全累加的共享原子计数器
};
// 工作函数,从其分配的切片中累加偶数
// 每个线程在其自己的数据分区上独立运行此函数
fn accumulate(args: WorkerArgs) void {
// 使用局部变量以最小化原子操作(性能优化)
var local_total: u64 = 0;
for (args.slice) |value| {
if (value % 2 == 0) {
local_total += value;
}
}
// 使用顺序一致性排序原子性地将局部结果添加到共享总和
// 这确保所有线程都能看到共享状态的一致视图
_ = args.sum.fetchAdd(local_total, .seq_cst);
}
pub fn main() !void {
// 设置带自动泄漏检测的内存分配器
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
// 分配64个数字的数组用于演示
var numbers = try allocator.alloc(u64, 64);
defer allocator.free(numbers);
// 用模式:index * 7 + 3 的值初始化数组
for (numbers, 0..) |*slot, index| {
slot.* = @as(u64, @intCast(index * 7 + 3));
}
// 初始化所有线程将安全更新的共享原子计数器
var shared_sum = std.atomic.Value(u64).init(0);
// 根据可用CPU核心数确定最优工作线程数
const cpu_count = std.Thread.getCpuCount() catch 1;
const desired = if (cpu_count == 0) 1 else cpu_count;
// 不要创建比要处理的数字更多的线程
const worker_limit = @min(numbers.len, desired);
// 分配并行工作线程的线程句柄
var threads = try allocator.alloc(std.Thread, worker_limit);
defer allocator.free(threads);
// 计算块大小,向上舍入以确保覆盖所有元素
const chunk = (numbers.len + worker_limit - 1) / worker_limit;
// 生成工作线程,将数组分配为大致相等的块
var start: usize = 0;
var spawned: usize = 0;
while (start < numbers.len and spawned < worker_limit) : (spawned += 1) {
const remaining = numbers.len - start;
// 让最后一个线程处理所有剩余元素以处理不均匀的除法
const take = if (worker_limit - spawned == 1) remaining else @min(chunk, remaining);
const end = start + take;
// 使用其分配的切片和共享累加器生成线程
threads[spawned] = try std.Thread.spawn(.{}, accumulate, .{WorkerArgs{
.slice = numbers[start..end],
.sum = &shared_sum,
}});
start = end;
}
// 跟踪实际生成了多少线程(可能少于worker_limit)
const used_threads = spawned;
// 等待所有工作线程完成其工作
for (threads[0..used_threads]) |thread| {
thread.join();
}
// 从原子共享总和读取最终累加结果
const even_sum = shared_sum.load(.seq_cst);
// 执行顺序计算以验证并行计算的正确性
var sequential: u64 = 0;
for (numbers) |value| {
if (value % 2 == 0) {
sequential += value;
}
}
// 设置缓冲stdout写入器以实现高效输出
var stdout_buffer: [256]u8 = undefined;
var stdout_state = std.fs.File.stdout().writer(&stdout_buffer);
const out = &stdout_state.interface;
// 显示结果:线程数和并行与顺序总和
try out.print("spawned {d} worker(s)\n", .{used_threads});
try out.print("even sum (threads): {d}\n", .{even_sum});
try out.print("even sum (sequential check): {d}\n", .{sequential});
try out.flush();
}
$ zig run 01_parallel_even_sum.zigspawned 8 worker(s)
even sum (threads): 7264
even sum (sequential check): 7264std.atomic.Value wraps plain integers and routes every access through @atomicLoad, @atomicStore, or @atomicRmw, shielding you from accidentally mixing atomic and non-atomic access to the same memory location.
spawn 配置与调度提示
std.Thread.SpawnConfig允许在默认不合用时覆盖栈大小或提供自定义分配器(例如深度递归或预分配 arena)。捕获Thread.getCpuCount()的错误以提供安全回退;当等待其它线程推进时,别忘了使用Thread.yield()或Thread.sleep()进行协作式调度。
原子状态机
Zig 直接暴露 LLVM 的原子内建:你选择.acquire、.release或.seq_cst等序,编译器发射相应的栅栏。设计小型状态机(如一次性初始化器)且需被多线程一致观察时,这种清晰性极具价值。
用原子内建实现一次保护
该程序围绕@cmpxchgStrong构建一个无锁的“仅调用一次”助手。线程仅在其它线程执行初始化期间自旋,随后通过 acquire 加载读取已发布的值。
// 此示例演示使用原子操作的线程安全一次性初始化。
// 多个线程尝试初始化共享资源,但只有一个成功
// 精确执行一次昂贵的初始化。
const std = @import("std");
// 使用原子操作表示初始化状态
const State = enum(u8) { idle, busy, ready };
// 跟踪初始化生命周期的全局状态
var once_state: State = .idle;
// 将被一次性初始化的共享配置值
var config_value: i32 = 0;
// 验证初始化只发生一次的计数器
var init_calls: u32 = 0;
// 模拟只应运行一次的昂贵初始化操作。
// 使用原子操作安全地增加调用计数器并设置配置值。
fn expensiveInit() void {
// 通过睡眠模拟昂贵的工作
std.Thread.sleep(2 * std.time.ns_per_ms);
// 原子性地增加初始化调用计数器
_ = @atomicRmw(u32, &init_calls, .Add, 1, .seq_cst);
// 使用发布语义原子性地存储初始化值
@atomicStore(i32, &config_value, 9157, .release);
}
// 确保expensiveInit()在多个线程中只被调用一次。
// 使用带有比较和交换的状态机来协调线程访问。
fn callOnce() void {
while (true) {
// 使用获取语义检查当前状态以查看初始化结果
switch (@atomicLoad(State, &once_state, .acquire)) {
// 初始化完成,立即返回
.ready => return,
// 另一个线程正在初始化,让出并重试
.busy => {
std.Thread.yield() catch {};
continue;
},
// 尚未初始化,尝试声明初始化责任
.idle => {
// 尝试从空闲原子性地转换到忙碌
// 如果成功(返回null),此线程获胜并将初始化
// 如果失败(返回实际值),另一个线程获胜,因此重试
if (@cmpxchgStrong(State, &once_state, .idle, .busy, .acq_rel, .acquire)) |_| {
continue;
}
// 此线程成功声明了初始化
break;
},
}
}
// 执行一次性初始化
expensiveInit();
// 使用发布语义将初始化标记为完成
@atomicStore(State, &once_state, .ready, .release);
}
// 传递给每个工作线程的参数
const WorkerArgs = struct {
results: []i32,
index: usize,
};
// 工作线程函数,调用一次性初始化并读取结果。
fn worker(args: WorkerArgs) void {
// 确保初始化发生(如果另一个线程正在初始化则阻塞直到完成)
callOnce();
// 使用获取语义读取初始化值
const value = @atomicLoad(i32, &config_value, .acquire);
// 将观察到的值存储在线程的结果槽中
args.results[args.index] = value;
}
pub fn main() !void {
// 重置全局状态用于演示
once_state = .idle;
config_value = 0;
init_calls = 0;
// 设置内存分配
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
const worker_count: usize = 4;
// 分配数组以收集每个线程的结果
const results = try allocator.alloc(i32, worker_count);
defer allocator.free(results);
// 初始化所有结果槽为-1以检测是否有任何线程失败
for (results) |*slot| slot.* = -1;
// 分配数组以保存线程句柄
const threads = try allocator.alloc(std.Thread, worker_count);
defer allocator.free(threads);
// 生成所有工作线程
for (threads, 0..) |*thread, index| {
thread.* = try std.Thread.spawn(.{}, worker, .{WorkerArgs{
.results = results,
.index = index,
}});
}
// 等待所有线程完成
for (threads) |thread| {
thread.join();
}
// 所有线程完成后读取最终值
const final_value = @atomicLoad(i32, &config_value, .acquire);
const called = @atomicLoad(u32, &init_calls, .seq_cst);
// 设置缓冲输出
var stdout_buffer: [256]u8 = undefined;
var stdout_state = std.fs.File.stdout().writer(&stdout_buffer);
const out = &stdout_state.interface;
// 打印每个线程观察到的值(应该都是9157)
for (results, 0..) |value, index| {
try out.print("thread {d} observed {d}\n", .{ index, value });
}
// 验证初始化只被调用一次
try out.print("init calls: {d}\n", .{called});
// 显示最终配置值
try out.print("config value: {d}\n", .{final_value});
try out.flush();
}
$ zig run 02_atomic_once.zigthread 0 observed 9157
thread 1 observed 9157
thread 2 observed 9157
thread 3 observed 9157
init calls: 1
config value: 9157@cmpxchgStrong returns null on success, so looping while it yields a value is a concise way to retry the CAS without allocating a mutex. Pair the final @atomicStore with .release to publish the results before any waiter performs its .acquire load.
单线程构建与回退
传递-Dsingle-threaded=true会强制编译器拒绝任何 spawn 操作系统线程的尝试。可能在两种配置下运行的代码应在编译期基于builtin.single_threaded分支,并替换为内联执行路径。参见builtin.zig。
理解单线程标志
single_threaded标志是编译器特性配置的一部分,会影响代码生成与优化:
当single_threaded为真时,编译器假定内存无并发访问,从而启用若干优化:原子操作可降低为普通加载与存储(消除栅栏指令)、线程局部存储变为普通全局、同步原语可被完全省略。该标志通过-Dsingle-threaded=true在构建时设置,并经由Compilation.Config流入代码生成。重要的是,这不仅是 API 限制——它从根本上改变了生成代码。以单线程模式编译的原子指令比多线程构建中的原子保障更弱,因此必须确保两种模式下的代码路径保持一致,以避免在切换该标志时出现微妙缺陷。
在编译期门控线程使用
如下保护器会重置一个原子状态机,然后根据构建模式选择 spawn 一个工作线程或内联执行任务。由于该分支发生在编译期,单线程配置下不会实例化Thread.spawn,从而完全避免编译错误。
const std = @import("std");
const builtin = @import("builtin");
// 表示任务执行可能状态的枚举
// 使用显式 u8 后备以确保跨平台一致的大小
const TaskState = enum(u8) { idle, threaded_done, inline_done };
// 全局原子状态跟踪任务是内联运行还是在单独线程中运行
// 原子操作确保线程安全访问,即使单线程构建不会生成线程
var task_state = std.atomic.Value(TaskState).init(.idle);
// 模拟在单独线程中运行的任务
// 包括一个小延迟来演示异步执行
fn threadedTask() void {
std.Thread.sleep(1 * std.time.ns_per_ms);
// 发布排序确保获取此值的所有线程都能看到之前的写入
task_state.store(.threaded_done, .release);
}
// 模拟在主线程内联运行的任务
// 用作编译时禁用线程时的后备方案
fn inlineTask() void {
// 发布排序保持与线程路径的一致性
task_state.store(.inline_done, .release);
}
pub fn main() !void {
// 为高效输出设置缓冲标准输出写入器
var stdout_buffer: [256]u8 = undefined;
var stdout_state = std.fs.File.stdout().writer(&stdout_buffer);
const out = &stdout_state.interface;
// 使用顺序一致性重置状态为空闲
// seq_cst 为初始化提供最强的排序保证
task_state.store(.idle, .seq_cst);
// 检查编译时标志以确定执行策略
// 当使用 -fsingle-threaded 编译时,builtin.single_threaded 为 true
if (builtin.single_threaded) {
try out.print("single-threaded build; running task inline\n", .{});
// 直接执行任务而不生成线程
inlineTask();
} else {
try out.print("multi-threaded build; spawning worker\n", .{});
// 生成单独线程以并发执行任务
var worker = try std.Thread.spawn(.{}, threadedTask, .{});
// 阻塞直到工作线程完成
worker.join();
}
// 获取排序确保我们观察到发布存储之前的所有写入
const final_state = task_state.load(.acquire);
// 将枚举状态转换为人类可读的字符串用于输出
const label = switch (final_state) {
.idle => "idle",
.threaded_done => "threaded_done",
.inline_done => "inline_done",
};
// 显示最终执行状态并刷新缓冲区以确保输出可见
try out.print("task state: {s}\n", .{label});
try out.flush();
}
$ zig run 03_single_thread_guard.zigmulti-threaded build; spawning worker
task state: threaded_doneWhen you build with -Dsingle-threaded=true, the inline branch is the only one compiled, so keep the logic symmetrical and make sure any shared state is still set via the same atomic helpers to avoid diverging semantics.
注意与警示
- 线程必须恰好 join 或 detach 一次;泄漏句柄会导致资源耗尽。
Thread.join会消费该句柄,因此将其存放在稍后可迭代的切片中。 - 原子操作作用于原始内存——切勿在同一地址混用原子与非原子访问,即便你“知道”不会发生竞争。用
std.atomic.Value包裹共享标量以使意图清晰。 - 比较交换循环可能自旋;当等待可能持续多个时钟周期时,考虑使用
Thread.yield()或Thread.ResetEvent等事件原语。
用 ThreadSanitizer 调试并发代码
Zig 通过 ThreadSanitizer 提供内置竞争检测,它是查找数据竞争、死锁及其它并发缺陷的强大工具:
| Sanitizer | Config Field | Purpose | Requirements |
|---|---|---|---|
| Thread Sanitizer | any_sanitize_thread | Data race detection | LLVM backend |
| UBSan | any_sanitize_c | C undefined behavior | LLVM backend, C code |
| Fuzzing | any_fuzz | Fuzzing instrumentation | libfuzzer integration |
构建程序时使用-Dsanitize-thread启用 ThreadSanitizer。TSan 会对所有内存访问与同步操作插桩,跟踪先发生关系以检测竞争。一旦发现竞争,TSan 会打印详细报告,展示冲突访问及其栈追踪。插桩会带来显著运行时开销(慢 2–5 倍、内存 5–10 倍),因此仅在开发与测试阶段使用,不要用于生产。TSan 对验证原子代码尤为有用:即使逻辑看似正确,它也能捕获细微的排序问题或缺失的同步。对于本章示例,尝试使用-Dsanitize-thread运行以验证无竞争——并行求和与 atomic once 模式应当能干净通过,证明同步正确。
练习
- 扩展并行求和以接收谓词回调,从而将“偶数”替换为任意你喜欢的分类;衡量
.acquire与.monotonic加载对竞争的影响。 - 重构
callOnce演示以分层错误:使初始化器返回!void并将失败存入一个原子槽,以便调用者一致地重新抛出同一错误。 - 在一次性保护代码周围引入
std.Thread.WaitGroup,使其无需手动存储句柄即可等待任意数量的工作线程。
注意事项、替代方案与边界情况
- 在无 pthreads 或 Win32 线程的平台上 Zig 会报编译错误;针对不支持
--threading的 WASI 目标时,应规划回退到事件循环或异步。 - 原子操作适用于普通整数与枚举;对于复合状态,考虑使用互斥锁或设计原子数组以避免撕裂更新。
- 单线程构建仍可使用原子,但这些指令会编译为普通加载/存储。保持代码路径一致,避免无意依赖多线程构建中更强的排序保证。
平台特定的线程约束
并非所有平台都支持线程,且部分平台对线程局部存储(TLS)有特殊要求:
某些目标默认单线程模式,因为它们缺乏操作系统线程支持:WebAssembly(未开启--threading标志)与 Haiku OS 即属此类。在这些平台上,除非在构建配置中明确启用线程支持,否则尝试 spawn 线程会导致编译错误。相关问题是线程局部存储(TLS):OpenBSD 与老版本 Android 不提供原生 TLS,因此 Zig 使用仿真 TLS——这是一种更慢但可移植的软件实现。编写跨平台并发代码时,请检查target.defaultSingleThreaded()与target.useEmulatedTls()以了解平台约束。对于 WASM,可通过启用atomics与bulk-memory特性加上--import-memory --shared-memory链接标志来开启线程,但并非所有 WASM 运行时都支持此特性。请将代码设计为优雅降级:使用builtin.single_threaded提供同步回退,并避免假定 TLS 在所有平台上都是零成本。
总结
std.Thread提供轻量的 spawn/join 语义,但调度与清理仍由你负责。@atomicLoad、@atomicStore与@cmpxchgStrong等原子内建在你将序与不变量匹配时,使小型无锁状态机可行。- 使用
builtin.single_threaded可使共享组件在单线程构建与多核部署间保持工作,而无需分叉代码库。