This commit is contained in:
白茶清欢 2023-08-04 13:58:56 +08:00
parent fb0a9d875c
commit 5bd0e4335f

169
core.go
View File

@ -9,6 +9,7 @@ package task
import ( import (
"container/list" "container/list"
"fmt"
"git.zhangdeman.cn/zhangdeman/easymap" "git.zhangdeman.cn/zhangdeman/easymap"
"time" "time"
) )
@ -34,6 +35,174 @@ type TimeWheel struct {
IsRunning bool // 是否运行中 IsRunning bool // 是否运行中
} }
// initSlots 初始化时间轮盘,每个轮盘上的卡槽用一个双向队列表示,便于插入和删除
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 13:37 2023/8/4
func (tw *TimeWheel) initSlots() {
for i := 0; i < tw.SlotCount; i++ {
tw.Slots[i] = list.New()
}
}
// Start 启动时间轮盘的内部函数
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 13:38 2023/8/4
func (tw *TimeWheel) Start() {
for {
select {
case <-tw.Ticker.C:
tw.checkAndRunTask()
case task := <-tw.AddTaskChannel:
// 此处利用Task.createTime来定位任务在时间轮盘的位置和执行圈数
// 如果直接用任务的周期来定位位置,那么在服务重启的时候,任务周器相同的点会被定位到相同的卡槽,
// 会造成任务过度集中
tw.AddTask(task, false)
case task := <-tw.RemoveTaskChannel:
tw.RemoveTask(task)
case <-tw.StopChannel:
tw.Ticker.Stop()
return
}
}
}
// checkAndRunTask 检查该轮盘点位上的Task看哪个需要执行
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 13:39 2023/8/4
func (tw *TimeWheel) checkAndRunTask() {
// 获取该轮盘位置的双向链表
currentList := tw.Slots[tw.CurrentPosition]
if currentList != nil {
for item := currentList.Front(); item != nil; {
task := item.Value.(*Task)
// 如果圈数>0表示还没到执行时间更新圈数
if task.Circle > 0 {
task.Circle--
item = item.Next()
continue
}
// 执行任务时Task.job是第一优先级然后是TimeWheel.job
if task.Job != nil {
go task.Job.Execute(nil, nil)
} else if tw.Job != nil {
go tw.Job.Execute(nil, nil)
} else {
fmt.Println(fmt.Sprintf("The task %v don't have job to run", task.Key))
}
// 执行完成以后,将该任务从时间轮盘删除
next := item.Next()
tw.TaskRecords.Del(task.Key)
currentList.Remove(item)
item = next
// 重新添加任务到时间轮盘用Task.interval来获取下一次执行的轮盘位置
// 如果times==0,说明已经完成执行周期,不需要再添加任务回时间轮盘
if task.Times != 0 {
if task.Times < 0 {
tw.AddTask(task, true)
} else {
task.Times--
tw.AddTask(task, true)
}
}
}
}
// 轮盘前进一步
if tw.CurrentPosition == tw.SlotCount-1 {
tw.CurrentPosition = 0
} else {
tw.CurrentPosition++
}
}
// AddTask 添加任务的内部函数, 生成Task在时间轮盘位置和圈数的方式true表示利用Task.interval来生成false表示利用Task.createTime生成
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 13:43 2023/8/4
func (tw *TimeWheel) AddTask(task *Task, byInterval bool) {
var pos, circle int
if byInterval {
pos, circle = tw.getPosAndCircleByInterval(task.Interval)
} else {
pos, circle = tw.getPosAndCircleByCreatedTime(task.CreatedTime, task.Interval, task.Key)
}
task.Circle = circle
task.Position = pos
element := tw.Slots[pos].PushBack(task)
tw.TaskRecords.Set(task.Key, element)
}
// RemoveTask 删除任务的内部函数
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 13:44 2023/8/4
func (tw *TimeWheel) RemoveTask(task *Task) {
// 从map结构中删除
taskInfo, _ := tw.TaskRecords.Get(task.Key)
tw.TaskRecords.Del(task.Key)
// 通过TimeWheel.slots获取任务的
currentList := tw.Slots[task.Position]
currentList.Remove(taskInfo.(*list.Element))
}
// getPosAndCircleByInterval 该函数通过任务的周期来计算下次执行的位置和圈数
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 13:46 2023/8/4
func (tw *TimeWheel) getPosAndCircleByInterval(d time.Duration) (int, int) {
delaySeconds := int(d.Seconds())
intervalSeconds := int(tw.Interval.Seconds())
circle := delaySeconds / intervalSeconds / tw.SlotCount
pos := (tw.CurrentPosition + delaySeconds/intervalSeconds) % tw.SlotCount
// 特殊case当计算的位置和当前位置重叠时因为当前位置已经走过了所以circle需要减一
if pos == tw.CurrentPosition && circle != 0 {
circle--
}
return pos, circle
}
// getPosAndCircleByCreatedTime 该函数用任务的创建时间来计算下次执行的位置和圈数
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 13:47 2023/8/4
func (tw *TimeWheel) getPosAndCircleByCreatedTime(createdTime time.Time, d time.Duration, key interface{}) (int, int) {
passedTime := time.Since(createdTime)
passedSeconds := int(passedTime.Seconds())
delaySeconds := int(d.Seconds())
intervalSeconds := int(tw.Interval.Seconds())
circle := delaySeconds / intervalSeconds / tw.SlotCount
pos := (tw.CurrentPosition + (delaySeconds-(passedSeconds%delaySeconds))/intervalSeconds) % tw.SlotCount
// 特殊case当计算的位置和当前位置重叠时因为当前位置已经走过了所以circle需要减一
if pos == tw.CurrentPosition && circle != 0 {
circle--
}
return pos, circle
}
// Task 任务数据结构 // Task 任务数据结构
// //
// Author : go_developer@163.com<白茶清欢> // Author : go_developer@163.com<白茶清欢>