diff --git a/core.go b/core.go index b20acf6..143fb9b 100644 --- a/core.go +++ b/core.go @@ -9,6 +9,7 @@ package task import ( "container/list" + "fmt" "git.zhangdeman.cn/zhangdeman/easymap" "time" ) @@ -34,6 +35,174 @@ type TimeWheel struct { IsRunning bool // 是否运行中 } +// initSlots 初始化时间轮盘,每个轮盘上的卡槽用一个双向队列表示,便于插入和删除 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:37 2023/8/4 +func (tw *TimeWheel) initSlots() { + for i := 0; i < tw.SlotCount; i++ { + tw.Slots[i] = list.New() + } +} + +// Start 启动时间轮盘的内部函数 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:38 2023/8/4 +func (tw *TimeWheel) Start() { + for { + select { + case <-tw.Ticker.C: + tw.checkAndRunTask() + case task := <-tw.AddTaskChannel: + // 此处利用Task.createTime来定位任务在时间轮盘的位置和执行圈数 + // 如果直接用任务的周期来定位位置,那么在服务重启的时候,任务周器相同的点会被定位到相同的卡槽, + // 会造成任务过度集中 + tw.AddTask(task, false) + case task := <-tw.RemoveTaskChannel: + tw.RemoveTask(task) + case <-tw.StopChannel: + tw.Ticker.Stop() + return + } + } +} + +// checkAndRunTask 检查该轮盘点位上的Task,看哪个需要执行 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:39 2023/8/4 +func (tw *TimeWheel) checkAndRunTask() { + + // 获取该轮盘位置的双向链表 + currentList := tw.Slots[tw.CurrentPosition] + + if currentList != nil { + for item := currentList.Front(); item != nil; { + task := item.Value.(*Task) + // 如果圈数>0,表示还没到执行时间,更新圈数 + if task.Circle > 0 { + task.Circle-- + item = item.Next() + continue + } + + // 执行任务时,Task.job是第一优先级,然后是TimeWheel.job + if task.Job != nil { + go task.Job.Execute(nil, nil) + } else if tw.Job != nil { + go tw.Job.Execute(nil, nil) + } else { + fmt.Println(fmt.Sprintf("The task %v don't have job to run", task.Key)) + } + + // 执行完成以后,将该任务从时间轮盘删除 + next := item.Next() + tw.TaskRecords.Del(task.Key) + currentList.Remove(item) + + item = next + + // 重新添加任务到时间轮盘,用Task.interval来获取下一次执行的轮盘位置 + // 如果times==0,说明已经完成执行周期,不需要再添加任务回时间轮盘 + if task.Times != 0 { + if task.Times < 0 { + tw.AddTask(task, true) + } else { + task.Times-- + tw.AddTask(task, true) + } + } + } + } + + // 轮盘前进一步 + if tw.CurrentPosition == tw.SlotCount-1 { + tw.CurrentPosition = 0 + } else { + tw.CurrentPosition++ + } +} + +// AddTask 添加任务的内部函数, 生成Task在时间轮盘位置和圈数的方式,true表示利用Task.interval来生成,false表示利用Task.createTime生成 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:43 2023/8/4 +func (tw *TimeWheel) AddTask(task *Task, byInterval bool) { + var pos, circle int + if byInterval { + pos, circle = tw.getPosAndCircleByInterval(task.Interval) + } else { + pos, circle = tw.getPosAndCircleByCreatedTime(task.CreatedTime, task.Interval, task.Key) + } + + task.Circle = circle + task.Position = pos + + element := tw.Slots[pos].PushBack(task) + tw.TaskRecords.Set(task.Key, element) +} + +// RemoveTask 删除任务的内部函数 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:44 2023/8/4 +func (tw *TimeWheel) RemoveTask(task *Task) { + // 从map结构中删除 + taskInfo, _ := tw.TaskRecords.Get(task.Key) + tw.TaskRecords.Del(task.Key) + + // 通过TimeWheel.slots获取任务的 + currentList := tw.Slots[task.Position] + currentList.Remove(taskInfo.(*list.Element)) +} + +// getPosAndCircleByInterval 该函数通过任务的周期来计算下次执行的位置和圈数 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:46 2023/8/4 +func (tw *TimeWheel) getPosAndCircleByInterval(d time.Duration) (int, int) { + delaySeconds := int(d.Seconds()) + intervalSeconds := int(tw.Interval.Seconds()) + circle := delaySeconds / intervalSeconds / tw.SlotCount + pos := (tw.CurrentPosition + delaySeconds/intervalSeconds) % tw.SlotCount + + // 特殊case,当计算的位置和当前位置重叠时,因为当前位置已经走过了,所以circle需要减一 + if pos == tw.CurrentPosition && circle != 0 { + circle-- + } + return pos, circle +} + +// getPosAndCircleByCreatedTime 该函数用任务的创建时间来计算下次执行的位置和圈数 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:47 2023/8/4 +func (tw *TimeWheel) getPosAndCircleByCreatedTime(createdTime time.Time, d time.Duration, key interface{}) (int, int) { + + passedTime := time.Since(createdTime) + passedSeconds := int(passedTime.Seconds()) + delaySeconds := int(d.Seconds()) + intervalSeconds := int(tw.Interval.Seconds()) + + circle := delaySeconds / intervalSeconds / tw.SlotCount + pos := (tw.CurrentPosition + (delaySeconds-(passedSeconds%delaySeconds))/intervalSeconds) % tw.SlotCount + + // 特殊case,当计算的位置和当前位置重叠时,因为当前位置已经走过了,所以circle需要减一 + if pos == tw.CurrentPosition && circle != 0 { + circle-- + } + + return pos, circle +} + // Task 任务数据结构 // // Author : go_developer@163.com<白茶清欢>