当前位置:网站首页>Go concurrent programming - work pool
Go concurrent programming - work pool
2022-07-22 18:09:00 【go|Python】
List of articles
go Concurrent programming - Work pool
What is a work pool
One of the important applications of buffered channel is to implement work pool .
A work pool is a group of threads waiting for task allocation . Once the assigned task has been completed , These threads can continue to wait for the assignment of tasks , and python Process pool in , Thread pool .
We will use buffered channels to implement the work pool . The task of our work pool is to calculate the sum of each bit of the input number . for example , If input 234, The results will be 9( namely 2 + 3 + 4). The input to the work pool is a list of pseudo-random numbers .
The core functions of our work pool are as follows :
- Create a Go Xie Chengchi , Listen for an input buffer channel waiting for job allocation .
- Add the job to the input buffer channel .
- After job completion , Then write the result to an output type buffer channel .
- Read and print the result from the output type buffer channel .
Use of work pool
The first step is to define the task structure and the result structure .
// 1 Define a task structure and a result structure
type Job struct {
Id int
RandNum int
}
type Result struct {
job Job
total int
}
all Job
Structural variables will have id
and RandNum
Two fields ,RandNum
Used to calculate the sum of each number .
and Result
The structure has a job
Field , Indicates the corresponding job , One more total
Field , Represents the result of the calculation ( Sum of each number ).
The second step is to create buffer channels for receiving jobs and writing results respectively .
//2 Define two buffered channels , A storage task , A storage of calculation results
var jobsChan = make(chan Job, 10)
var resultChan = make(chan Result, 10)
Work coordination process (Worker Goroutine
) Will monitor the buffer channel jobsChan
Homework updated in . Once the work is completed , The result is written to the buffer channel resultChan
.
worker
Tasks are real work tasks , Loop to get the task from the task channel , Then calculate the sum of each bit of the integer , Finally, put the calculation results into the result channel . In order to simulate the calculation process, it took some time , We added 1 Second sleep time .
func worker(wg *sync.WaitGroup) {
// Take value from task channel and calculate , Plug into the result channel
for job := range jobsChan {
// from job Take random numbers out of the structure , Every bit adds up
var total = 0 // The sum of the
var randNum = job.RandNum // Random numbers
for randNum != 0 {
total += randNum % 10 // The sum of the + Random numbers take the remainder for each
randNum /= 10 // Random number divided by 10
}
// Simulate the delay , It is convenient for later viewing after opening multiple work pools , Whether the efficiency has been improved
time.Sleep(1 * time.Second)
// Plug the result into the result channel
resultChan <- Result{
job, total}
}
// If jobsChan It's over , Shut down the , The task can be finished
wg.Done()
}
The above function creates a worker (Worker), Read jobsChan
Channel data , Based on the current jobsChan
Calculation , And created a Result
Structural variable , Then write the result to results
Buffered channels .worker
Function receives a WaitGroup
Type of wg
As a parameter , When all jobsChan
When it's done , Called Done()
Method .
createWorkPool
The function creates a Go The working pool of the collaborative process .
func createWorkPool(num int) {
// Define a wg, Control all work pools to close after completing all tasks
var wgPool sync.WaitGroup
for i := 0; i < num; i++ {
wgPool.Add(1)
// Real execution , hold wgPool The pointer passes in
go worker(&wgPool)
}
// Wait for all work pools to complete
wgPool.Wait()
// All working pools are completed , indicate resultChan The channel is used up , You can close it
close(resultChan)
}
The parameter of the above function is the number of work processes to be created . Creating Go Before collaborative process , It calls for wg.Add(1)
Method , therefore WaitGroup
Counter increment . Next , We create a working process , And to worker
Function transfer wg
The address of . After creating the required work collaboration , Function call wg.Wait()
, Wait for all Go The execution of the cooperation process is completed . After all the coordination processes are completed , The function will close resultChan
channel . Because all the cooperation processes have been implemented , So there is no need to resultChan
The channel writes data .
Now we have a working pool , Let's continue to write a function , Assign homework to workers , Random generation job, Write to jobsChan
In the channel
func genRandNum(num int) {
for i := 0; i < num; i++ {
// The random number to be generated , Plug it into the buffer channel of the task
jobsChan <- Job{
i, rand.Intn(999)}
}
// When it's all stuffed in , You can turn off the channel
close(jobsChan)
}
above genRandNum
The function receives the number of jobs to be created as an input parameter , The maximum value generated is 998 Pseudorandom number of , And use this random number to create Job
Structural variable . This function takes for Cycle counter i
As id, Finally, write the created structure variable to jobsChan
channel . When writing all job
when , It's off jobsChan
channel .
The next step is to create a read results
Functions of channel and printout .
func printResult() {
for result := range resultChan {
fmt.Printf(" Mission id by :%d, The random number of the task is :%d, The result is :%d\n", result.job.Id, result.job.RandNum, result.total)
}
}
result
Function read results
channel , And print out job
Of id
、 Input random number 、 The sum of each number of the random number .
Now everything is ready . We continue to complete the last step , stay main()
Call all the above functions in the function .
func main() {
start := time.Now()
// Start the process , Write tasks into the task channel , Write 100 A random number
go genRandNum(100)
// Start the process , Print calculation results
go printResult()
// Create a work pool , Be careful : The work pool must be downloaded below the above two tasks
// If you put it on it , Internal wgPool.Wait(), Master Xie Cheng has been here , Neither the task channel nor the result channel will write data , Cause a deadlock
createWorkPool(10) // Create size as 10 Working pool of
end := time.Now()
fmt.Println(" The total time is :", end.Sub(start))
}
We started with main
Function saves the start time of the program start, And calculated on the last line end
and start
The difference between the , Show the total time the program runs . Because we want to change the number of processes , Look at the running time of the program .
We put Work pool
Set to 10, Next, I call genRandNum
, Generate 100 individual job, towards jobsChan
Channel addition job .
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// Work pool
// There are a number of random numbers ---》 The sum of each digit ---》10 personal /100 Do it alone
// First step : Define task structure and result structure
type Job struct {
jobId int // Mission id
randNum int // The random number of this task
}
type Result struct {
job Job // Put the task in
total int // Sum of each random number
}
// The second step : Define two channels ( There's a cushion ), Separate storage Mission result
var jobChan = make(chan Job, 10)
var resultChan = make(chan Result, 10)
// The third step : Write a task , Randomly generate a batch of numbers ---》 Put it into the task channel
// n Indicates how many
func genRandNum(n int) {
for i := 0; i < n; i++ {
// Generate random number , Random generation is less than 999 Of int Type digital
//rand.Intn(9999)
jobChan <- Job{
jobId: i, randNum: rand.Intn(9999)} // Generative Job Put the structure object into the task channel
}
// for The loop ends , explain , The task is all put in , It can be turned off Task channel
close(jobChan)
}
// Step four : Write a real task worker, function
func worker(wg *sync.WaitGroup) {
// worker It should be implemented in the collaboration process
for job := range jobChan {
// Cyclic task channel , Take out the task to execute
// Calculate the sum of each job.randNum
num := job.randNum // 67 8
total := 0
for num != 0 {
total += num % 10
num /= 10
} // Calculation total
// Analog time delay Doing this job requires 1s Time
time.Sleep(1*time.Second)
// The result is put in In the result channel
resultChan <- Result{
job: job, total: total}
}
wg.Done()
}
// Step five : Create a work pool
func createWorkingPool(maxPool int) {
var wg sync.WaitGroup
for i := 0; i < maxPool; i++ {
wg.Add(1)
go worker(&wg) // How big is the pool , How many people work , perform worker
}
wg.Wait() // Wait for the completion of all work coordination
// The work is done ---> As a result, the storage channel can be closed
close(resultChan)
}
// Step six : Print out All data in the result channel
func printResult() {
for result := range resultChan {
// Take data from the result channel and print ---》 Once the result channel is closed --》 Indicates that the task has been completed ---》for The loop ends
fmt.Printf(" Mission id by :%d, The random number of tasks is :%d, The result of random number is :%d\n", result.job.jobId, result.job.randNum, result.total)
}
}
// Step seven :main Function call
func main() {
start:=time.Now()
// 1 Generate 100 random number ---》 Put it in the task queue
go genRandNum(100)
// 2 Print the result in another collaboration
go printResult()
// 3 Create a work pool to perform tasks
createWorkingPool(5)
end:=time.Now()
fmt.Println(end.Sub(start)) // Count the running time of the program 10 Do it yourself 10.032089437s 1 Do it alone want 100s 100 personal 1s Finish more
}
The program will print a total of 100 That's ok , Corresponding 100 Item job , Then the total time consumed by one line of program will be printed finally . because Go The running sequence of the coordination process is not necessarily , Similarly, the total time will vary depending on the hardware .
If you put main
Function Work pool
Add to 100. We doubled the number of workers . Due to the increase of work coordination ( To be exact, it is twice ), Therefore, the total time spent by the program will be reduced .
Now we can understand , As the number of work collaborations increases , The total time to complete the homework will be reduced . in general , Work use is a typical example of the producer consumer model .
边栏推荐
- Bigder:35/100 development colleagues said that I tested it myself. But something went wrong after the launch.
- Hblock盘活企业级存储市场
- Bigder:36/100 报表测试中常见的业务名词
- Computer Graphics From Scratch - Chapter 4
- 【Excle】生成guid和datetime导入测试数据到数据库
- How Allegro imports pictures such as high-definition logo, QR code, anti-static logo and Chinese characters
- ORACLE语句调整
- Oracle statement adjustment
- 分布式(一)分布式系统,BASE,CAP是何方神圣?
- 并发模型值Actor和CSP
猜你喜欢
Allegro如何导入高清Logo、二维码、防静电标识等图片以及汉字
Methods of downloading literature from IEEE
路由协议是什么
[cloud native] docker deployment database persistence
Bigder:37/100 一个误操作
Computer Graphics From Scratch - Chapter 4
How to solve the gloomy life under the middle-aged crisis of it
《微信小程序-进阶篇》Lin-ui组件库的安装与引入
在线XML转CSV工具
【OpenCV入门实战】利用电脑前置摄像头进行人脸检测
随机推荐
zmq无锁队列的浅解
Conference OA project
Excel import export controller
How does boss direct hire write an excellent resume?
MySQL和MariaDB区别
RK3399平台开发系列讲解(input子系统)4.52、input子系统的实现原理
逻辑回归(公式推导+numpy实现)
Linear regression (formula derivation +numpy Implementation)
智汇华云 | 集群日志动态采集方案
Rk3399 platform development series explanation (alsa subsystem) 4.37, alsa driven framework
Bigder:37/100 一个误操作
Teach you to write makefile files from 0
JS to determine whether the linked image exists
Bigder:35/100 development colleagues said that I tested it myself. But something went wrong after the launch.
vscode 安装 tools失败
[database] addition, deletion, modification and query of MySQL table (basic)
Logistic regression (formula derivation +numpy Implementation)
Pytorch实现Word2Vec
OceanBase数据库搭建测试
计算存款利息