// Package task ... // // Description : task ... // // Author : go_developer@163.com<白茶清欢> // // Date : 2023-08-04 11:13 package task import ( "container/list" "fmt" "git.zhangdeman.cn/zhangdeman/easymap" "git.zhangdeman.cn/zhangdeman/wrapper" "time" ) // NewTimeWheel 实例化时间轮 // // Author : go_developer@163.com<白茶清欢> // // Date : 16:50 2023/8/11 func NewTimeWheel(slotCount int, interval time.Duration) *TimeWheel { if slotCount <= 0 { // 默认100个槽位 slotCount = 100 } if interval.Seconds() == 0 { // 默认60s扫描一次 interval = time.Second * 60 } tw := &TimeWheel{ interval: interval, slots: make([]*list.List, slotCount), ticker: time.NewTicker(interval), slotCount: slotCount, addTaskChannel: make(chan *Task, 1000), removeTaskChannel: make(chan *Task, 1000), stopChannel: make(chan bool, 1000), taskRecords: easymap.NewNormal(), job: nil, isRunning: false, } tw.initSlots() return tw } // TimeWheel 核心时间轮的数据结构 // // Author : go_developer@163.com<白茶清欢> // // Date : 11:14 2023/8/4 type TimeWheel struct { interval time.Duration // 时间轮精度 slots []*list.List // 时间轮盘每个位置存储的任务列表 ticker *time.Ticker // 定时器 currentPosition int // 时间轮盘当前位置 slotCount int // 时间轮盘的齿数,Interval*SlotCount就是时间轮盘转一圈走过的时间 addTaskChannel chan *Task removeTaskChannel chan *Task stopChannel chan bool taskRecords easymap.EasyMap // Map结构来存储Task对象,key是Task.key,value是Task在双向链表中的存储对象 // 需要执行的任务,如果时间轮盘上的Task执行同一个Job,可以直接实例化到TimeWheel结构体中。 // 此处的优先级低于Task中的Job参数 job ITask isRunning bool // 是否运行中 } // initSlots 初始化时间轮盘,每个轮盘上的卡槽用一个双向队列表示,便于插入和删除 // // Author : go_developer@163.com<白茶清欢> // // Date : 13:37 2023/8/4 func (tw *TimeWheel) initSlots() { for i := 0; i < tw.slotCount; i++ { tw.slots[i] = list.New() } } // Start 启动时间轮盘的内部函数 // // Author : go_developer@163.com<白茶清欢> // // Date : 13:38 2023/8/4 func (tw *TimeWheel) Start() { tw.isRunning = true for { select { case <-tw.ticker.C: // 指定时间间隔之后, 调度一次任务 tw.checkAndRunTask() case task := <-tw.addTaskChannel: // 此处利用Task.createTime来定位任务在时间轮盘的位置和执行圈数 // 如果直接用任务的周期来定位位置,那么在服务重启的时候,任务周器相同的点会被定位到相同的卡槽, // 会造成任务过度集中 tw.AddTask(task, false) case task := <-tw.removeTaskChannel: tw.RemoveTask(task) case <-tw.stopChannel: tw.ticker.Stop() tw.isRunning = false // TODO : 重新初始化时间轮实例 return } } } // checkAndRunTask 检查该轮盘点位上的Task,看哪个需要执行 // // Author : go_developer@163.com<白茶清欢> // // Date : 13:39 2023/8/4 func (tw *TimeWheel) checkAndRunTask() { // 获取该轮盘位置的双向链表 currentList := tw.slots[tw.currentPosition] if currentList != nil { for item := currentList.Front(); item != nil; { task := item.Value.(*Task) // 如果圈数>0,表示还没到执行时间,更新圈数 if task.Circle > 0 { task.Circle-- item = item.Next() continue } // 执行任务时,Task.job是第一优先级,然后是TimeWheel.job if task.Job != nil { go func() { defer func() { if r := recover(); nil != r { } }() _ = task.Job.Execute(nil, nil) }() } else if tw.job != nil { go func() { defer func() { if r := recover(); nil != r { } }() _ = tw.job.Execute(nil, nil) }() } else { fmt.Println(fmt.Sprintf("The task %v don't have job to run", task.Key)) } // 执行完成以后,将该任务从时间轮盘删除 next := item.Next() tw.taskRecords.Del(task.Key) currentList.Remove(item) item = next // 重新添加任务到时间轮盘,用Task.interval来获取下一次执行的轮盘位置 // 如果times==0,说明已经完成执行周期,不需要再添加任务回时间轮盘 if task.Times != 0 { if task.Times < 0 { tw.AddTask(task, true) } else { task.Times-- tw.AddTask(task, true) } } } } // 轮盘前进一步 if tw.currentPosition == tw.slotCount-1 { tw.currentPosition = 0 } else { tw.currentPosition++ } } // AddTask 添加任务的内部函数, 生成Task在时间轮盘位置和圈数的方式,true表示利用Task.interval来生成,false表示利用Task.createTime生成 // // Author : go_developer@163.com<白茶清欢> // // Date : 13:43 2023/8/4 func (tw *TimeWheel) AddTask(task *Task, byInterval bool) { if nil == task { return } // 生成 run_id task.RunID = wrapper.StringFromRandom(128, "").Md5().Value if nil != task.Job { task.Key = task.Job.GetFlag() + "_" + task.RunID } var pos, circle int if byInterval { pos, circle = tw.getPosAndCircleByInterval(task.Interval) } else { pos, circle = tw.getPosAndCircleByCreatedTime(task.CreatedTime, task.Interval, task.Key) } task.Circle = circle task.Position = pos element := tw.slots[pos].PushBack(task) tw.taskRecords.Set(task.Key, element) } // RemoveTask 删除任务的内部函数 // // Author : go_developer@163.com<白茶清欢> // // Date : 13:44 2023/8/4 func (tw *TimeWheel) RemoveTask(task *Task) { // 从map结构中删除 taskInfo, _ := tw.taskRecords.Get(task.Key) tw.taskRecords.Del(task.Key) // 通过TimeWheel.slots获取任务的 currentList := tw.slots[task.Position] currentList.Remove(taskInfo.(*list.Element)) } // getPosAndCircleByInterval 该函数通过任务的周期来计算下次执行的位置和圈数 // // Author : go_developer@163.com<白茶清欢> // // Date : 13:46 2023/8/4 func (tw *TimeWheel) getPosAndCircleByInterval(d time.Duration) (int, int) { delaySeconds := int(d.Seconds()) intervalSeconds := int(tw.interval.Seconds()) circle := delaySeconds / intervalSeconds / tw.slotCount pos := (tw.currentPosition + delaySeconds/intervalSeconds) % tw.slotCount // 特殊case,当计算的位置和当前位置重叠时,因为当前位置已经走过了,所以circle需要减一 if pos == tw.currentPosition && circle != 0 { circle-- } return pos, circle } // getPosAndCircleByCreatedTime 该函数用任务的创建时间来计算下次执行的位置和圈数 // // Author : go_developer@163.com<白茶清欢> // // Date : 13:47 2023/8/4 func (tw *TimeWheel) getPosAndCircleByCreatedTime(createdTime time.Time, d time.Duration, key any) (int, int) { passedTime := time.Since(createdTime) passedSeconds := int(passedTime.Seconds()) delaySeconds := int(d.Seconds()) intervalSeconds := int(tw.interval.Seconds()) circle := delaySeconds / intervalSeconds / tw.slotCount pos := (tw.currentPosition + (delaySeconds-(passedSeconds%delaySeconds))/intervalSeconds) % tw.slotCount // 特殊case,当计算的位置和当前位置重叠时,因为当前位置已经走过了,所以circle需要减一 if pos == tw.currentPosition && circle != 0 { circle-- } return pos, circle } // Task 任务数据结构 // // Author : go_developer@163.com<白茶清欢> // // Date : 11:44 2023/8/4 type Task struct { RunID string // 每一次运行的run id Key string // 用来标识task对象,是唯一的 Interval time.Duration // 任务周期 CreatedTime time.Time // 任务的创建时间 Position int // 任务在轮盘的位置 Circle int // 任务需要在轮盘走多少圈才能执行 Job ITask // 任务需要执行的Job,优先级高于TimeWheel中的Job Times int // 任务需要执行的次数,如果需要一直执行,设置成-1 MaxExecuteTime int64 // 最大执行时长, 超时自动取消. 如不限制时长, 设置成0 }