package main
import ( "fmt" "sync" "time" )
// Task is a unit of work: a callback paired with a numeric identifier.
type Task struct {
	// F is the callback that Execute invokes.
	F func() error

	// Id identifies the task in the log lines printed by the pools.
	Id int
}
func NewTask(argc func() error, id int) *Task { t := &Task{ F: argc, Id: id, } return t }
func (t *Task) Execute() { fmt.Println("task ", t.Id, " is running ") t.F() time.Sleep(2 * time.Second) }
type Taskpool struct { Tasknum int Taskchannel chan *Task }
func NewTaskPool(tasknum int, taskmaxlen int) *Taskpool { tp := &Taskpool{ Tasknum: tasknum, Taskchannel: make(chan *Task, taskmaxlen), } return tp } func (tp *Taskpool) AllocateTask() { defer close(tp.Taskchannel) for i := 0; i < tp.Tasknum; i++ { t := NewTask(func() error { fmt.Println("task ", i, " is allocate") return nil }, i) tp.Taskchannel <- t } }
func (tp *Taskpool) ReceiveTask(jobchannel chan *Task) { defer close(jobchannel) for task := range tp.Taskchannel { fmt.Println("job channel get this task channel", task.Id) jobchannel <- task } }
type Workerpool struct { Workernum int Workchannel chan *Task }
func (wp *Workerpool) ReceiveWork(jobchannel chan *Task) { defer close(wp.Workchannel) for work := range jobchannel { fmt.Println("work channel get this work from job channel", work.Id) wp.Workchannel <- work } } func NewWorkerPool(workernum int, workmaxlen int) *Workerpool { w := &Workerpool{ Workernum: workernum, Workchannel: make(chan *Task, workmaxlen), } return w } func (w *Workerpool) CreateWork() { var wg sync.WaitGroup fmt.Println("work num is:", w.Workernum) for i := 0; i < w.Workernum; i++ { wg.Add(1) go w.GetWork(&wg, i) } wg.Wait() } func (w *Workerpool) GetWork(wg *sync.WaitGroup, id int) { for job := range w.Workchannel { job.Execute() fmt.Println("worker ", id, " works with task ", job.Id) } wg.Done() } func main() { startTime := time.Now() channelLen := 10 workerNum := 20 taskNum := 100 jobChannel := make(chan *Task, channelLen) worker := NewWorkerPool(workerNum, channelLen) task := NewTaskPool(taskNum, channelLen) go task.AllocateTask() go task.ReceiveTask(jobChannel) go worker.ReceiveWork(jobChannel) worker.CreateWork() endTime := time.Now() diff := endTime.Sub(startTime) fmt.Println("total time is :", diff.Seconds(), " seconds") }
|