概览
借助上一章的并发原语,我们将构建一个小而实用的工具:并行词频统计器。它读取文件,按空白分割为连续片段,启动工作线程进行分词与计数,然后将线程本地映射合并为最终的频率表。参见29、Thread.zig与atomic.zig。
为何选择该项目?它锻炼常见系统模式——工作拆分、避免虚假共享、字符串键的所有权与内存序纪律——同时避免陷入样板代码。最终产出是一个健壮骨架,可适配日志处理、类 grep 索引或轻量分析。
学习目标
- 在尊重词元边界的前提下对输入进行分片。
- 安全使用
std.Thread.spawn并在单线程构建中回退为内联执行。 - 维护每线程的
std.StringHashMap并在合并时避免悬挂指针。 - 通过排序键值对向量呈现确定性的“Top N”。
项目布局与构建
该示例保持为一个本地构建的小型包。0.15.2 的构建 API 构造显式模块并传给addExecutable——注意使用root_module字段而非旧的root_source_file。
const std = @import("std");
// / Build script for the parallel wordcount project.
// / 并行词频统计项目的构建脚本。
// / Configures and compiles the executable with standard build options.
// / 使用标准构建选项配置和编译可执行文件。
pub fn build(b: *std.Build) void {
// Parse target triple from command line (--target flag)
// 从命令行解析目标三元组(--target 标志)
const target = b.standardTargetOptions(.{});
// Parse optimization level from command line (-Doptimize flag)
// 从命令行解析优化级别(-Doptimize 标志)
const optimize = b.standardOptimizeOption(.{});
// Create a module representing our application's entry point.
// 创建表示应用程序入口点的模块。
// In Zig 0.15.2, modules are explicitly created before being passed to executables.
// 在 Zig 0.15.2 中,模块在传递给可执行文件之前被显式创建。
const root = b.createModule(.{
.root_source_file = b.path("src/main.zig"),
.target = target,
.optimize = optimize,
});
// Define the executable artifact, linking it to the root module.
// 定义可执行文件产物,将其链接到根模块。
const exe = b.addExecutable(.{
.name = "parallel-wc",
.root_module = root,
});
// Register the executable to be installed in zig-out/bin
// 注册可执行文件以安装到zig-out/bin
b.installArtifact(exe);
// Create a run command that executes the compiled binary
// 创建执行已编译二进制文件的运行命令
const run_cmd = b.addRunArtifact(exe);
// Forward any arguments passed after '--' to the executable
// 将在 '--' 之后传递的任何参数转发给可执行文件
if (b.args) |args| run_cmd.addArgs(args);
// Define a 'run' step that users can invoke with 'zig build run'
// 定义用户可以通过 'zig build run' 调用的 'run' 步骤
const run_step = b.step("run", "Run parallel wordcount");
run_step.dependOn(&run_cmd.step);
}
See Build.zig.
实现
程序将整个文件读入内存(受合理上限限制),在空白处分片,然后启动 N 个工作线程(N 为 CPU 数量,除非为单线程模式)。每个工作线程执行分词与 ASCII 小写化、去除标点,并插入到其各自的、由 arena 支撑的映射中以避免对每个词元的单次释放。合并时,我们将键复制到最终映射的分配器中,以确保在反初始化 arena 时不会使键失效。
const std = @import("std");
const builtin = @import("builtin");
// / 映射类型别名:单词 → 频率计数
const Map = std.StringHashMap(u64);
// / 通过将ASCII字母转换为小写来规范化原始token,并
// / 从两端剥离非字母数字字符。
// / 返回指向所提供缓冲区的切片;调用者拥有该缓冲区。
fn normalizeWord(allocator: std.mem.Allocator, raw: []const u8) ![]const u8 {
// 分配一个足够大的缓冲区以容纳整个输入
var buf = try allocator.alloc(u8, raw.len);
var n: usize = 0;
// 将大写ASCII转换为小写 (A-Z → a-z)
for (raw) |c| {
var ch = c;
if (ch >= 'A' and ch <= 'Z') ch = ch + 32;
buf[n] = ch;
n += 1;
}
// 剥离前导非字母数字字符
var start: usize = 0;
while (start < n and (buf[start] < '0' or (buf[start] > '9' and buf[start] < 'a') or buf[start] > 'z')) : (start += 1) {}
// 剥离尾随非字母数字字符
var end: usize = n;
while (end > start and (buf[end - 1] < '0' or (buf[end - 1] > '9' and buf[end - 1] < 'a') or buf[end - 1] > 'z')) : (end -= 1) {}
// 如果剥离后没有剩余,则返回空切片
if (end <= start) return buf[0..0];
return buf[start..end];
}
// / 在空白处对文本进行token化,并使用提供的映射填充
/// 规范化的单词频率。键是从提供的分配器中
// / 分配的规范化副本。
fn tokenizeAndCount(allocator: std.mem.Allocator, text: []const u8, map: *Map) !void {
// 在任意空白字符处分割
var it = std.mem.tokenizeAny(u8, text, " \t\r\n");
while (it.next()) |raw| {
const word = try normalizeWord(allocator, raw);
if (word.len == 0) continue; // skip empty tokens
// 插入或更新单词计数
const gop = try map.getOrPut(word);
if (!gop.found_existing) {
gop.value_ptr.* = 1;
} else {
gop.value_ptr.* += 1;
}
}
}
// / 传递给每个工作线程的参数
const WorkerArgs = struct {
slice: []const u8, // 要处理的文本段
counts: *Map, // 线程本地频率映射
arena: *std.heap.ArenaAllocator, // 用于临时分配的 arena
};
// / 每个线程执行的工作函数;token化并计数单词
// / 在其分配的文本段中,不共享状态。
fn countWorker(args: WorkerArgs) void {
// 每个工作线程只写入自己的映射实例;合并稍后发生
tokenizeAndCount(args.arena.allocator(), args.slice, args.counts) catch |err| {
std.debug.print("worker error: {s}\n", .{@errorName(err)});
};
}
// / 将整个文件读取到新分配的缓冲区中,上限为 64 MiB。
fn readAllAlloc(path: []const u8, allocator: std.mem.Allocator) ![]u8 {
var file = try std.fs.cwd().openFile(path, .{});
defer file.close();
return try file.readToEndAlloc(allocator, 64 * 1024 * 1024);
}
/// 将文本分区为大致相等的片段,确保分片边界
// / 落在空白处以避免分割单词。返回切片的拥有切片。
fn shard(text: []const u8, shards: usize, allocator: std.mem.Allocator) ![]const []const u8 {
// 如果只请求一个分片或文本为空,则返回单个片段
if (shards <= 1 or text.len == 0) {
var single = try allocator.alloc([]const u8, 1);
single[0] = text;
return single;
}
const approx = text.len / shards; // 每个分片的近似字节数
var parts = std.array_list.Managed([]const u8).init(allocator);
defer parts.deinit();
var i: usize = 0;
while (i < text.len) {
var end = @min(text.len, i + approx);
// 将分片边界向前推进到下一个空白字符
while (end < text.len and text[end] != ' ' and text[end] != '\n' and text[end] != '\t' and text[end] != '\r') : (end += 1) {}
// 如果没有找到空白,则回退到近似边界
if (end == i) end = @min(text.len, i + approx);
try parts.append(text[i..end]);
i = end;
}
return try parts.toOwnedSlice();
}
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
// 设置缓冲标准输出以实现高效打印
var stdout_buf: [1024]u8 = undefined;
var stdout_state = std.fs.File.stdout().writer(&stdout_buf);
const out = &stdout_state.interface;
// 解析命令行参数
var args_it = try std.process.argsWithAllocator(allocator);
defer args_it.deinit();
_ = args_it.next(); // 跳过程序名
const path = args_it.next() orelse {
try out.print("usage: parallel-wc <file>\n", .{});
try out.flush();
return;
};
// 将整个文件读入内存
const text = try readAllAlloc(path, allocator);
defer allocator.free(text);
// 确定分片计数:除非是单线程构建,否则使用 CPU 计数
const cpu = std.Thread.getCpuCount() catch 1;
const shard_count = if (builtin.single_threaded) 1 else if (cpu < 1) 1 else cpu;
// 在空白边界处将文本分区为分片
const parts = try shard(text, shard_count, allocator);
defer allocator.free(parts);
// 分配每个分片的 arena 和 map
var arenas = try allocator.alloc(std.heap.ArenaAllocator, parts.len);
defer allocator.free(arenas);
var maps = try allocator.alloc(Map, parts.len);
defer allocator.free(maps);
// 如果是多线程,则分配线程句柄
var threads = if (builtin.single_threaded) &[_]std.Thread{} else try allocator.alloc(std.Thread, parts.len);
defer if (!builtin.single_threaded) allocator.free(threads);
// 启动工作线程(如果是单线程,则内联执行)
for (parts, 0..) |seg, i| {
arenas[i] = std.heap.ArenaAllocator.init(allocator);
maps[i] = Map.init(allocator);
try maps[i].ensureTotalCapacity(1024); // 预设大小以减少重新哈希
if (builtin.single_threaded) {
// 内联执行 worker
countWorker(.{ .slice = seg, .counts = &maps[i], .arena = &arenas[i] });
} else {
// 为此分片启动一个线程
threads[i] = try std.Thread.spawn(.{}, countWorker, .{WorkerArgs{ .slice = seg, .counts = &maps[i], .arena = &arenas[i] }});
}
}
// 等待所有线程完成
if (!builtin.single_threaded) {
for (threads) |t| t.join();
}
// 将每个线程的 map 合并到一个全局 map 中
var total = Map.init(allocator);
defer total.deinit();
try total.ensureTotalCapacity(4096); // 预设合并数据的大小
for (maps, 0..) |*m, i| {
var it = m.iterator();
while (it.next()) |e| {
const key_bytes = e.key_ptr.*;
// 将键复制到 total 的分配器中以获取所有权,
// 因为 arena 很快就会被释放
const dup = try allocator.dupe(u8, key_bytes);
const gop = try total.getOrPut(dup);
if (!gop.found_existing) {
gop.value_ptr.* = e.value_ptr.*;
} else {
// 键已存在;释放重复项并累加计数
allocator.free(dup);
gop.value_ptr.* += e.value_ptr.*;
}
}
// 释放每个线程的 arena 和 map
arenas[i].deinit();
m.deinit();
}
// 构建可排序的 (单词, 计数) 条目列表
const Entry = struct { k: []const u8, v: u64 };
var entries = std.array_list.Managed(Entry).init(allocator);
defer entries.deinit();
var it = total.iterator();
while (it.next()) |e| {
try entries.append(.{ .k = e.key_ptr.*, .v = e.value_ptr.* });
}
// Sort by count descending, then alphabetically
std.sort.pdq(Entry, entries.items, {}, struct {
fn lessThan(_: void, a: Entry, b: Entry) bool {
if (a.v == b.v) return std.mem.lessThan(u8, a.k, b.k);
return a.v > b.v; // 按计数降序
}
}.lessThan);
// Print top 10 most frequent words
const to_show = @min(entries.items.len, 10);
try out.print("top {d} words in {d} shards:\n", .{ to_show, parts.len });
for (entries.items[0..to_show]) |e| {
try out.print("{s} {d}\n", .{ e.k, e.v });
}
// 现在我们已经处理完 map,释放重复的键
var free_it = total.iterator();
while (free_it.next()) |e| allocator.free(e.key_ptr.*);
try out.flush();
}
See hash_map.zig and tokenize.zig.
$ zig build --build-file chapters-data/code/30__project-parallel-wordcount/build.zig run -- chapters-data/code/30__project-parallel-wordcount/data/lines.txttop 10 words in 8 shards:
and 2
i 2
little 2
me 2
the 2
a 1
about 1
ago—never 1
call 1
how 1StringHashMap以引用方式存储字符串切片;不会复制字节。合并指向短生命周期 arena 的映射时,请将键字节复制到目标分配器,完成后再释放。示例在退出前迭代并释放各键。
注意与警示
- 分片会将片段末尾推进到下一个空白,以避免在词中部拆分词元。这意味着片段可能不均匀;对 I/O 受限工具而言无妨。
- 示例以粗略的 ASCII 小写化与去除标点来把焦点保持在线程上。若需要 Unicode 分段,请集成
std.unicode与更忠实的规范化。 - 在单线程构建(
-Dsingle-threaded=true)中,我们内联执行工作线程并完全跳过 spawn,遵循第 29 章的模式。
练习
- 添加
-n <N>以打印 Top N 单词,并使用std.process.argsWithAllocator解析参数。 - 将合并阶段切换为并行归约:每个 CPU 进行成对合并,直至仅剩一个映射;衡量可扩展性。
- 以按
文件大小/分片数定额的 bump 分配器替换 arena,并权衡碎片化与峰值占用。
总结
该项目将磁盘字节到排序频率表的路径浓缩为实用且快速的过程,同时尊重 Zig 的所有权与线程模型。它整合了分片、每线程映射与安全合并——一个可用于更大流水线的最小模板。