A practical example of the Go pipeline pattern -- calculating the MD5 values of a series of files
2022-07-22 19:10:00 【alenliu0621】
Digesting a tree
On Linux, the md5sum command computes the MD5 values of a set of files:
$ md5sum *.go
d2b2c3719370ea0aa7261325926111fd bounded.go
157126313040135745466593b6f65508 interface1.go
62f7464386fceaade2c83184f05e09b7 parallel.go
eea9d6c2ed85f03e099686b7b9ae5d68 serial.go
Today we will look at a practical application of pipelines. The example program is similar to the md5sum command, but it takes a directory path as its command-line argument and prints the MD5 value of every regular file in that directory, sorted by file pathname. Here is sample output from the serial.go program:
$ go run serial.go .
d2b2c3719370ea0aa7261325926111fd bounded.go
157126313040135745466593b6f65508 interface1.go
62f7464386fceaade2c83184f05e09b7 parallel.go
eea9d6c2ed85f03e099686b7b9ae5d68 serial.go
The main function of our program calls a helper function, MD5All, which returns a map from file pathname to MD5 value; main then sorts the pathnames and prints the results:
func main() {
    // Compute the MD5 value of every file under the given directory,
    // then sort the results by pathname.
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    // Collect all the file pathnames.
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    // Sort the pathnames.
    sort.Strings(paths)
    // Print the results.
    for _, path := range paths {
        fmt.Printf("%x %s\n", m[path], path)
    }
}
The MD5All function is the focus of our discussion. The implementation in serial.go uses no concurrency: it simply reads each file and computes its MD5 value while walking the tree.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    // filepath.Walk visits every file and directory under root; the second
    // argument is the function called for each one.
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        // If an error occurred while walking, return it.
        if err != nil {
            return err
        }
        // Ignore anything that is not a regular file.
        if !info.Mode().IsRegular() {
            return nil
        }
        // Read the file contents.
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        // Compute the MD5 value.
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}
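The listings above omit the package clause and import block for serial.go. A minimal sketch of what they would look like, assuming only the standard-library packages used in the code above:

package main

import (
    "crypto/md5"
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
    "sort"
)

The parallel.go and bounded.go versions shown below would additionally need the errors and sync packages.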
Parallel digestion
In the parallel.go program, we split MD5All into a two-stage pipeline. The first stage, sumFiles, walks the tree and, for each file, computes the MD5 value in a new goroutine and sends the result on a channel whose value type is result:
type result struct {
    path string         // file pathname
    sum  [md5.Size]byte // MD5 value
    err  error          // error, if any
}
sumFiles returns two channels: one that carries the results and one that carries the error returned by filepath.Walk. The walk function starts a new goroutine to process each regular file, then checks the done channel. If done has been closed, the walk stops immediately:
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // Create the two channels.
    c := make(chan result)
    errc := make(chan error, 1)
    // Start a goroutine that performs the walk.
    go func() {
        var wg sync.WaitGroup
        // Walk every file under the directory.
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            // Start a new goroutine to compute the MD5 value.
            go func() {
                // Read the file contents.
                data, err := ioutil.ReadFile(path)
                // Use a select so the goroutine returns as soon as the
                // done channel is closed.
                select {
                // Compute the MD5 value and send the result on c.
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // If the done channel has been closed, abort the walk and
            // return an error.
            select {
            case <-done:
                return errors.New("walk canceled")
            case <-done:
            default:
                return nil
            }
        })
        // At this point filepath.Walk has returned, so all calls to wg.Add
        // are done. Start a goroutine that waits for all the sends to
        // finish and then closes c.
        go func() {
            wg.Wait()
            close(c)
        }()
        // Send the error returned by the walk on errc.
        errc <- err
    }()
    // Return the channels we created.
    return c, errc
}
The second stage, in MD5All, receives the MD5 values from the result channel. MD5All returns early when it encounters an error, closing the done channel via a defer statement:
func MD5All(root string) (map[string][md5.Size]byte, error) {
    // Create the done channel.
    done := make(chan struct{})
    // Closing done when MD5All returns makes all the goroutines started
    // during the walk exit.
    defer close(done)
    // Call sumFiles to compute the MD5 values.
    c, errc := sumFiles(done, root)
    m := make(map[string][md5.Size]byte)
    // Receive the MD5 values from the result channel.
    for r := range c {
        // If an error occurred, return immediately.
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // Receive the error value from the walk; if non-nil, return it.
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}
Bounded parallelism
The MD5All implementation in parallel.go starts a new goroutine for each file. In a directory containing many large files, this may allocate more memory than the machine has available.
We can limit these allocations by bounding the number of files read in parallel. In bounded.go, we do this by creating a fixed number of goroutines for reading files. Our pipeline now has three stages: walk the tree, read and digest the files, and collect the digests.
The first stage, walkFiles, emits the pathnames of the regular files in the tree on a paths channel:
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    // Create the paths channel.
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Close the paths channel after Walk returns.
        defer close(paths)
        // Walk the directory tree.
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            // Send the pathname on the paths channel.
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    // Return the channels we created.
    return paths, errc
}
The middle stage starts a fixed number of digester goroutines, each of which receives pathnames from the paths channel, reads and digests the files, and sends the results on channel c:
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    // Receive each pathname from the paths channel.
    for path := range paths {
        // Read the file contents.
        data, err := ioutil.ReadFile(path)
        select {
        // Compute the MD5 value and send the result on c.
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}
In MD5All, the output channel is closed only after all the digester goroutines have finished:
func MD5All(root string) (map[string][md5.Size]byte, error) {
    // Create the done channel and close it when MD5All returns.
    done := make(chan struct{})
    defer close(done)
    // First stage: walk the tree.
    paths, errc := walkFiles(done, root)
    // Create the result channel.
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    // Second stage: start a fixed number of goroutines to compute the
    // MD5 values.
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    // Start a goroutine that waits for all the digesters to finish and
    // then closes c.
    go func() {
        wg.Wait()
        close(c)
    }()
We could instead have each digester create and return its own output channel, but then we would need additional goroutines to fan in the results (a sketch of such a fan-in helper appears after the function below).
The final stage receives all the results from c, then checks errc for an error from the walk:
    m := make(map[string][md5.Size]byte)
    // Final stage: receive all the results from c.
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // Check whether the walk failed.
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}
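The article does not show what that fan-in helper would look like. Below is a minimal sketch, reusing the result type and done-channel convention from above; the merge name and signature are assumptions for illustration, not part of the article's code:

// merge fans in several result channels into a single channel. It is a
// sketch of the alternative design mentioned above, in which each digester
// would create and return its own output channel.
func merge(done <-chan struct{}, cs ...<-chan result) <-chan result {
    var wg sync.WaitGroup
    out := make(chan result)
    // Start one forwarding goroutine per input channel.
    output := func(c <-chan result) {
        defer wg.Done()
        for r := range c {
            select {
            case out <- r:
            case <-done:
                return
            }
        }
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }
    // Close out once every forwarding goroutine has finished.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

The extra forwarding goroutines are exactly the cost the article avoids by having every digester send on one shared channel.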