GO 高效找两海量文件相同记录
做后端开发的同学肯定遇到过这种头疼的场景:手里有两个各超10亿行的文本文件 a.txt 和 b.txt,要找出里面相同的记录,
但服务器内存限制最多只能用 100 MB——直接把文件加载到内存肯定爆掉,常规方法根本行不通。
今天就记录分享一下高效解决方案,亲测能抗住海量数据压力。希望对大家有用。
一、核心思路:分片拆小问题
拒绝“全量加载”,用哈希分片拆小问题
海量数据+小内存的核心矛盾是“单文件大小远超内存”,解决思路就是“分而治之”——通过哈希分片把两个大文件拆成多个小文件,让每个小文件都能塞进 100MB 内存,再逐个处理小文件对,最后汇总结果。
核心逻辑:对两个文件中的每一条记录计算哈希值(比如 CRC32),再对哈希值取模(比如取 100),得到的结果作为“分片编号”。
关于 hash 哈希值的内容可以参考文章:Go 通过哈希判断文件是否被修改
这样一来,相同的记录一定会落到同一个编号的分片文件中(因为哈希值相同,取模后结果也相同)。
之后只需成对处理编号相同的小文件,就能找出所有相同记录。
举个例子:a.txt 的记录A哈希取模后得 5,就存到 a_5.txt;b.txt 的记录A哈希取模也得 5,存到 b_5.txt。
最后只需要对比 a_5.txt 和 b_5.txt,就能找到这条相同记录。
二、Go 实战案例
整个流程分三步:
-
分片(把a.txt、b.txt拆成多个小文件);
-
匹配(成对处理小文件,找出相同记录);
-
汇总(收集所有匹配结果)。
实现代码步骤如下:
(一)哈希分片函数实现
选择 CRC32 哈希是因为它计算速度快、碰撞率低,适合海量数据场景;
取模100是因为10亿行数据拆成100份,每份约1000万行,按每行10字节算也就100MB左右,刚好适配内存限制(可根据实际行大小调整取模数量)。
package main
import (
"bufio"
"crypto/crc32"
"fmt"
"os"
"path/filepath"
)
// 分片配置:可根据实际行大小调整分片数量
const (
splitCount = 100 // 分片总数
tempDir = "./temp_splits" // 临时分片存储目录
)
// HashSplit 对指定文件进行哈希分片
func HashSplit(filePath string) error {
// 1. 创建临时目录存储分片,不存在则创建
if err := os.MkdirAll(tempDir, 0755); err != nil {
return fmt.Errorf("创建临时目录失败:%w", err)
}
// 2. 打开源文件,用bufio按行读取(避免一次性加载)
file, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("打开文件 %s 失败:%w", filePath, err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
// 设置缓冲区大小(64KB,可根据实际调整,平衡速度和内存)
buf := make([]byte, 64*1024)
scanner.Buffer(buf, 1024*1024) // 最大单行1MB
// 3. 遍历每一行,计算哈希并写入对应分片
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue // 跳过空行,避免无效数据
}
// 计算CRC32哈希值,取模得到分片编号(0-99)
hashVal := crc32.ChecksumIEEE([]byte(line))
splitIdx := hashVal % splitCount
// 生成分片文件名,比如a.txt的第5个分片是a_5.txt
fileName := filepath.Base(filePath)
splitFileName := fmt.Sprintf("%s_%d.txt", fileName[:len(fileName)-4], splitIdx)
splitFilePath := filepath.Join(tempDir, splitFileName)
// 追加写入分片文件(用os.O_APPEND|os.O_CREATE,避免覆盖)
splitFile, err := os.OpenFile(splitFilePath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
if err != nil {
return fmt.Errorf("打开分片 %s 失败:%w", splitFilePath, err)
}
// 写入行数据,加换行符保持原格式
_, err = splitFile.WriteString(line + "\n")
splitFile.Close() // 及时关闭,避免文件句柄泄露
if err != nil {
return fmt.Errorf("写入分片 %s 失败:%w", splitFilePath, err)
}
}
// 检查读取过程是否有错误(比如文件损坏)
if err := scanner.Err(); err != nil {
return fmt.Errorf("读取源文件失败:%w", err)
}
return nil
}(二)分片文件成对匹配
分片完成后,每个编号对应一对小文件(比如 a_0.txt 和 b_0.txt)。
此时每个小文件大小约 100MB 内,可直接用哈希表(map)存储一个文件的记录,再遍历另一个文件匹配相同记录。
import (
"bufio"
"os"
"path/filepath"
)
// MatchSplitPair 匹配一对分片文件中的相同记录,结果写入输出文件
func MatchSplitPair(aSplitPath, bSplitPath, outputPath string) error {
// 1. 用map存储a分片的所有记录,key为记录内容,value为是否存在(节省空间)
recordMap := make(map[string]bool, 100000) // 预分配容量,减少扩容开销
// 读取a分片文件并写入map
aFile, err := os.Open(aSplitPath)
if err != nil {
return fmt.Errorf("打开a分片 %s 失败:%w", aSplitPath, err)
}
aScanner := bufio.NewScanner(aFile)
aScanner.Buffer(make([]byte, 64*1024), 1*1024*1024)
for aScanner.Scan() {
line := aScanner.Text()
if line == "" {
continue
}
recordMap[line] = true
}
aFile.Close()
if err := aScanner.Err(); err != nil {
return fmt.Errorf("读取a分片 %s 失败:%w", aSplitPath, err)
}
// 2. 遍历b分片文件,检查每条记录是否在map中,存在则写入结果
bFile, err := os.Open(bSplitPath)
if err != nil {
return fmt.Errorf("打开b分片 %s 失败:%w", bSplitPath, err)
}
// 输出文件采用追加模式,避免多分片结果覆盖
outputFile, err := os.OpenFile(outputPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
if err != nil {
return fmt.Errorf("打开输出文件 %s 失败:%w", outputPath, err)
}
defer outputFile.Close()
bScanner := bufio.NewScanner(bFile)
bScanner.Buffer(make([]byte, 64*1024), 1*1024*1024)
for bScanner.Scan() {
line := bScanner.Text()
if line == "" {
continue
}
// 匹配到相同记录,写入输出文件
if recordMap[line] {
_, err = outputFile.WriteString(line + "\n")
if err != nil {
return fmt.Errorf("写入结果失败:%w", err)
}
// 可选:删除map中的记录,避免重复匹配(如果文件有重复行)
delete(recordMap, line)
}
}
bFile.Close()
if err := bScanner.Err(); err != nil {
return fmt.Errorf("读取b分片 %s 失败:%w", bSplitPath, err)
}
return nil
}(三)主函数整合流程
主函数负责串联分片、匹配、清理临时文件等全流程,同时加入进度提示,方便海量数据处理时查看状态。
import (
"fmt"
"os"
"path/filepath"
"runtime"
)
func main() {
// 配置文件路径(可根据实际场景修改)
aFilePath := "./a.txt"
bFilePath := "./b.txt"
outputFilePath := "./same_records.txt"
// 临时分片目录,可后续清理
splitDir := tempDir
// 1. 第一步:对两个大文件分别分片
fmt.Println("开始对a.txt分片...")
if err := HashSplit(aFilePath); err != nil {
fmt.Printf("a.txt分片失败:%v\n", err)
return
}
fmt.Println("a.txt分片完成!开始对b.txt分片...")
if err := HashSplit(bFilePath); err != nil {
fmt.Printf("b.txt分片失败:%v\n", err)
return
}
fmt.Println("b.txt分片完成!开始匹配相同记录...")
// 2. 第二步:遍历所有分片对,逐个匹配
for i := 0; i < splitCount; i++ {
// 构造一对分片的路径
aSplitName := fmt.Sprintf("a_%d.txt", i)
aSplitPath := filepath.Join(splitDir, aSplitName)
bSplitName := fmt.Sprintf("b_%d.txt", i)
bSplitPath := filepath.Join(splitDir, bSplitName)
// 检查分片文件是否存在(可能某一方没有对应记录)
if _, err := os.Stat(aSplitPath); os.IsNotExist(err) {
fmt.Printf("a分片 %s 不存在,跳过\n", aSplitName)
continue
}
if _, err := os.Stat(bSplitPath); os.IsNotExist(err) {
fmt.Printf("b分片 %s 不存在,跳过\n", bSplitName)
continue
}
// 匹配当前分片对
fmt.Printf("正在匹配第%d/%d对分片...\n", i+1, splitCount)
if err := MatchSplitPair(aSplitPath, bSplitPath, outputFilePath); err != nil {
fmt.Printf("匹配分片对 %d 失败:%v\n", i, err)
return
}
}
// 3. 第三步:可选清理临时分片目录(海量数据分片占用空间大,建议清理)
fmt.Println("匹配完成!开始清理临时分片...")
if err := os.RemoveAll(splitDir); err != nil {
fmt.Printf("临时分片清理失败:%v\n", err)
return
}
fmt.Printf("所有操作完成!相同记录已保存到 %s\n", outputFilePath)
// 输出内存使用情况,验证是否符合100MB限制
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("峰值内存使用:%.2f MB\n", float64(m.Sys)/1024/1024)
}常见问题
Q1. 分片后部分小文件还是超过100MB?
原因:哈希分布不均,导致某些分片集中了大量记录。 解决方案:两种思路
① 增加分片总数(比如从100调到200),减小单分片最大可能大小;
② 采用“双重哈希分片”,第一次哈希取模后,若分片大小超标,对该分片再用另一种哈希算法二次分片
Q2. 哈希碰撞导致相同记录分到不同分片?
原因:不同记录的哈希值取模后可能相同,但更危险的是相同记录因哈希计算错误分到不同分片(比如换行符处理不一致)。 解决方案:
① 计算哈希时包含完整行内容(包括换行符?需统一处理,建议读取时保留换行符或统一去除);
② 采用“哈希+取模+校验”的方式,比如用CRC32+MD5双重哈希,取模用CRC32,匹配时校验MD5,降低碰撞概率。
Q3. 单条记录过大(比如1KB)导致map存储超限?
原因:100万条1KB的记录占1GB内存,远超100MB限制。 解决方案:用“布隆过滤器”替代 map,布隆过滤器能以极低的内存占用判断元素是否存在(有极小误判率)。
GO 可使用第三方库 github.com/willf/bloom,示例:
import "github.com/willf/bloom"
// 用布隆过滤器替代map,适配大记录场景
func initBloomFilter(filePath string) (*bloom.BloomFilter, error) {
// 预估元素数量100万,误判率0.001
filter := bloom.NewWithEstimates(1000000, 0.001)
file, err := os.Open(filePath)
if err != nil {
return nil, err
}
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
filter.Add([]byte(line))
}
file.Close()
return filter, scanner.Err()
}
// 匹配时用布隆过滤器判断
if filter.Test([]byte(line)) {
// 布隆过滤器有极小误判率,需二次校验(可查原文件或存哈希值)
_, err = outputFile.WriteString(line + "\n")
}Q4. 大文件读取速度太慢?
原因:单线程读取效率低,未利用多核CPU。 解决方案:
① 用 GO 的 goroutine 并发分片(注意控制并发数,避免文件句柄过多);
② 增大读取缓冲区(代码中已设置 64KB 缓冲区,可根据磁盘性能调整到 128KB);
③ 采用“内存映射文件”(syscall.Mmap),减少系统调用开销。
总结
100MB 内存处理超 10 亿行文件的核心是:“哈希分片+分而治之”,
GO 凭借高效的并发能力和底层控制能力,能很好地支撑这套方案。
实际落地时需注意三点:
① 哈希算法选择(优先选计算快的CRC32);
② 分片大小控制(预留内存冗余);
③ 异常处理(空行、文件损坏、权限问题)。
如果数据量达到 10 亿级别以上,还可结合分布式存储(比如 HDFS)和并发调度,进一步提升处理效率。
大家有其他海量数据处理的小技巧,欢迎在评论区交流!
版权声明
未经授权,禁止转载本文章。
如需转载请保留原文链接并注明出处。即视为默认获得授权。
未保留原文链接未注明出处或删除链接将视为侵权,必追究法律责任!
本文原文链接: https://fiveyoboy.com/articles/go-find-same-records-in-two-huge-files/
备用原文链接: https://blog.fiveyoboy.com/articles/go-find-same-records-in-two-huge-files/