当前位置:网站首页>Go 管道模式的实际例子——计算一系列文件的 md5 值
Go 管道模式的实际例子——计算一系列文件的 md5 值
2022-07-22 09:07:00 【alenliu0621】
Digesting a tree
Linux 上的 md5sum
命令,可以计算一些文件的 md5 值:
[email protected]:~/gogo/tour# md5sum *.go
d2b2c3719370ea0aa7261325926111fd bounded.go
157126313040135745466593b6f65508 interface1.go
62f7464386fceaade2c83184f05e09b7 parallel.go
eea9d6c2ed85f03e099686b7b9ae5d68 serial.go
今天我们要看的实际应用管道的例子和 md5sum
命令类似,但是是以一个目录路径作为一个命令行参数,按照文件路径名的字母顺序,打印出这个目录下所有普通文件的 md5 值。以下是例子 serial.go 程序的输出:
[email protected]:~/gogo/tour# go run serial.go .
d2b2c3719370ea0aa7261325926111fd bounded.go
157126313040135745466593b6f65508 interface1.go
62f7464386fceaade2c83184f05e09b7 parallel.go
eea9d6c2ed85f03e099686b7b9ae5d68 serial.go
我们程序的 main 函数调用了一个辅助函数 MD5All,这个函数返回一个从文件路径名到 md5 值的映射,然后将路径名排序并打印出结果:
func main() {
// 计算特定目录下所有文件的 md5 值,按照路径名排序打印出结果
m, err := MD5All(os.Args[1])
if err != nil {
fmt.Println(err)
return
}
// 获取所有的文件路径名
var paths []string
for path := range m {
paths = append(paths, path)
}
// 排序
sort.Strings(paths)
// 打印出结果
for _, path := range paths {
fmt.Printf("%x %s\n", m[path], path)
}
}
MD5All 函数是我们讨论的焦点。在 serial.go 中的实现没有使用并发,仅仅是遍历过程中,读取每个文件,计算它的 md5 值。
func MD5All(root string) (map[string][md5.Size]byte, error) {
m := make(map[string][md5.Size]byte)
// 调用 filepath.Walk 遍历目录下的所有文件,第二个参数是文件处理函数
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
// 遍历过程中出现错误,返回错误
if err != nil {
return err
}
// 忽略不是普通文件的文件
if !info.Mode().IsRegular() {
return nil
}
// 读取文件内容
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
// 计算 md5 值
m[path] = md5.Sum(data)
return nil
})
if err != nil {
return nil, err
}
return m, nil
}
Parallel digestion
在 parallel.go 程序中,我们将 MD5All 分割成两阶段的管道:第一个阶段,sumFiles,遍历目录下的文件时,在一个新的 goroutine 中计算这个文件的 md5 值,将结果发送到一个值类型为 result 的 channel :
type result struct {
path string // 文件路径名
sum [md5.Size]byte // md5 值
err error // 错误值
}
sumFiles 返回两个 channel:一个是为了收集结果,另一个是为了处理 filepath.Walk 返回的错误。文件处理函数启动一个新的 goroutine 来处理每个普通文件,然后检查 done channel。如果 done channel 关闭了,那么立即终止遍历:
func sumFiles(done <-chan struct{
}, root string) (<-chan result, <-chan error) {
// 创建两个 channel
c := make(chan result)
errc := make(chan error, 1)
// 启动一个 goroutine
go func() {
var wg sync.WaitGroup
// 遍历目录下所有的文件
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
wg.Add(1)
// 启动一个新的 goroutine 来计算这个文件的 md5 值
go func() {
// 读取文件内容
data, err := ioutil.ReadFile(path)
// 使用 select 语句,在 done channel 关闭时,立即返回
select {
// 计算文件的 md5 值将结果发送到 channel
case c <- result{
path, md5.Sum(data), err}:
case <-done:
}
wg.Done()
}()
// 如果 done channel 关闭了,则停止遍历,返回一个错误
select {
case <-done:
return errors.New("walk canceled")
default:
return nil
}
})
// 到这里 filepath.Walk 函数返回了,所有的 wg.Add 的调用完成,
// 启动一个 goroutine 等待所有发送结果的操作完成,并关闭 channel。
go func() {
wg.Wait()
close(c)
}()
// 将遍历过程中返回的错误值发往 channel
errc <- err
}()
// 返回创建的 channel
return c, errc
}
在 MD5All 中的第二个阶段从返回结果的 channel 中接收 md5 值。在发现错误时,立即返回,使用 defer
语句关闭 done channel:
func MD5All(root string) (map[string][md5.Size]byte, error) {
// 创建 done channel
done := make(chan struct{
})
// 函数返回时关闭 done channel,将会使遍历过程中的所有 goroutine 退出
defer close(done)
// 调用 sumFiles 计算所有文件的 md5 值
c, errc := sumFiles(done, root)
m := make(map[string][md5.Size]byte)
// 从返回结果的 channel 中接收 md5 值
for r := range c {
// 出现错误立即返回
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// 接收错误值,如果有错误发生返回错误
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
Bounded parallelism
在 parallel.go 程序中的 MD5All 实现为每个文件启动一个新的 goroutine。在一个包含大量大文件的文件夹中,分配的内存大小可能会超过机器可使用的。
我们可以限制并行地读取文件的数量来限制内存分配。在 bounded.go 中,我们为读取文件创建了固定数量的 goroutine。我们的管道现在有三个阶段:遍历目录文件,读取文件计算 md5 值,汇总结果。
第一个阶段,walkFiles,将一个目录下所有普通文件的路径名发往 paths channel:
func walkFiles(done <-chan struct{
}, root string) (<-chan string, <-chan error) {
// 创建 paths channel
paths := make(chan string)
errc := make(chan error, 1)
go func() {
// 在 Walk 返回时关闭 paths channel。
defer close(paths)
// 遍历目录
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
// 将文件名发往 paths channel
select {
case paths <- path:
case <-done:
return errors.New("walk canceled")
}
return nil
})
}()
// 返回创建的 channel
return paths, errc
}
第二个阶段启动了固定数量的 digester goroutine,从 paths channel 接收路径名,读取文件计算 md5 值,将结果发送到一个收集结果的 channel:
func digester(done <-chan struct{
}, paths <-chan string, c chan<- result) {
// 从 paths channel 接收每个路径名
for path := range paths {
// 读取文件
data, err := ioutil.ReadFile(path)
select {
// 计算 md5 值,将结果发往 c channel。
case c <- result{
path, md5.Sum(data), err}:
case <-done:
return
}
}
}
在 MD5All 中,等所有的 digester goroutine 退出后,关闭输出结果的 channel :
done := make(chan struct{
})
defer close(done)
// 第一个阶段
paths, errc := walkFiles(done, root)
// 创建收集结果的 channel
c := make(chan result)
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
// 第二个阶段,启动固定数量的 goroutine 计算 md5 值
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c)
wg.Done()
}()
}
// 启动一个新的 goroutine 等待所有操作完成,然后关闭 channel
go func() {
wg.Wait()
close(c)
}()
我们可以在 digester 创建并返回一个它自己的输出 channel,但是我们需要更多的 goroutine 来扇入这些结果。
最后一个阶段从 c channel 接收所有的结果,并从 errc channel 接收错误值来检查错误:
m := make(map[string][md5.Size]byte)
// 最后一个阶段,从 c channel 接收所有的结果
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// 检查是否 Walk 函数返回失败
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
边栏推荐
- 力扣解法汇总1260-二维网格迁移
- DOM简介及查询
- 堡垒机,DMZ区
- ES6 assignment deconstruction
- Codeforces Round #806 (Div. 4)(7/7)
- 力扣解法汇总1051-高度检查器
- Fabric. JS centered element
- charm zaza functional test (by quqi99)
- 【Audio】基于STM32 I2S移植WM8978 Audio Codec驱动
- MySQL密码正确但是启动报错Unable to create initial connections of pool.Access denied for user ‘root‘@‘localhost
猜你喜欢
Understanding of continue in C language (fishing_1)
【SDIO】SD2.0协议分析总结(三)-- SD卡相关命令介绍
【Audio】I2S传输PCM音频数据分析总结(二)
Fabric.js 控制元素层级
第十二讲 MySQL之高可用组件MHA
Laravel 解决【1045】Access denied for user ‘homestead‘@‘localhost‘ (usin g password: YES)
creating vlan over openstack (by quqi99)
Android互联网大厂面试经验
在ubuntu中使用pypyodbc无法连接sql server
MySQL修改密码不成功(无效)的解决办法
随机推荐
【10点公开课】:云视频会议系统私有化实践
2022-07-21: given a string STR and a positive number k, you can divide STR into multiple substrings at will, in order to find that in a certain division scheme, there are as many palindrome substrings
Try kolla-ansible (by quqi99)
别让恐婚,扼杀你幸福!
力扣解法汇总558- 四叉树交集
MySQL密碼正確但是啟動報錯Unable to create initial connections of pool.Access denied for user ‘root‘@‘localhost
3.Transbot修改显示分辨率
包装类和字符串的方法
[10:00 public class]: cloud video conference system privatization practice
Js高级-对象的理解
垃圾回收
Rocky基础练习题-shell脚本-1
Android互联网大厂面试经验
Is it really necessary to define VO, Bo, Po, do, dto?
小程序实现列表和详情页
charm zaza functional test (by quqi99)
Top 10 active noise reduction headphones and top 10 active noise reduction headphones brands
Le mot de passe MySQL est correct, mais une erreur de démarrage n'a pas été signalée pour créer des connexions initiales de pool. Accès refusé pour l'utilisateur 'root' @ 'localhost
力扣解法汇总515-在每个树行中找最大值
sshfs + autofs + sshpass (by quqi99)