第一版时间轮任务调度实现 #1

Merged
zhangdeman merged 15 commits from feature/timewheel into master 2023-09-04 20:13:17 +08:00
Showing only changes of commit c068799b32 - Show all commits

49
core.go
View File

@ -30,16 +30,16 @@ func NewTimeWheel(slotCount int, interval time.Duration) *TimeWheel {
interval = time.Second * 60 interval = time.Second * 60
} }
tw := &TimeWheel{ tw := &TimeWheel{
Interval: interval, interval: interval,
Slots: make([]*list.List, slotCount), slots: make([]*list.List, slotCount),
ticker: time.NewTicker(interval), ticker: time.NewTicker(interval),
slotCount: slotCount, slotCount: slotCount,
addTaskChannel: make(chan *Task, 1000), addTaskChannel: make(chan *Task, 1000),
removeTaskChannel: make(chan *Task, 1000), removeTaskChannel: make(chan *Task, 1000),
stopChannel: make(chan bool, 1000), stopChannel: make(chan bool, 1000),
TaskRecords: easymap.NewNormal(true), taskRecords: easymap.NewNormal(true),
Job: nil, job: nil,
IsRunning: false, isRunning: false,
} }
tw.initSlots() tw.initSlots()
return tw return tw
@ -51,19 +51,19 @@ func NewTimeWheel(slotCount int, interval time.Duration) *TimeWheel {
// //
// Date : 11:14 2023/8/4 // Date : 11:14 2023/8/4
type TimeWheel struct { type TimeWheel struct {
Interval time.Duration // 时间轮精度 interval time.Duration // 时间轮精度
Slots []*list.List // 时间轮盘每个位置存储的任务列表 slots []*list.List // 时间轮盘每个位置存储的任务列表
ticker *time.Ticker // 定时器 ticker *time.Ticker // 定时器
currentPosition int // 时间轮盘当前位置 currentPosition int // 时间轮盘当前位置
slotCount int // 时间轮盘的齿数,Interval*SlotCount就是时间轮盘转一圈走过的时间 slotCount int // 时间轮盘的齿数,Interval*SlotCount就是时间轮盘转一圈走过的时间
addTaskChannel chan *Task addTaskChannel chan *Task
removeTaskChannel chan *Task removeTaskChannel chan *Task
stopChannel chan bool stopChannel chan bool
TaskRecords easymap.EasyMap // Map结构来存储Task对象key是Task.keyvalue是Task在双向链表中的存储对象 taskRecords easymap.EasyMap // Map结构来存储Task对象key是Task.keyvalue是Task在双向链表中的存储对象
// 需要执行的任务如果时间轮盘上的Task执行同一个Job可以直接实例化到TimeWheel结构体中。 // 需要执行的任务如果时间轮盘上的Task执行同一个Job可以直接实例化到TimeWheel结构体中。
// 此处的优先级低于Task中的Job参数 // 此处的优先级低于Task中的Job参数
Job ITask job ITask
IsRunning bool // 是否运行中 isRunning bool // 是否运行中
} }
// initSlots 初始化时间轮盘,每个轮盘上的卡槽用一个双向队列表示,便于插入和删除 // initSlots 初始化时间轮盘,每个轮盘上的卡槽用一个双向队列表示,便于插入和删除
@ -73,7 +73,7 @@ type TimeWheel struct {
// Date : 13:37 2023/8/4 // Date : 13:37 2023/8/4
func (tw *TimeWheel) initSlots() { func (tw *TimeWheel) initSlots() {
for i := 0; i < tw.slotCount; i++ { for i := 0; i < tw.slotCount; i++ {
tw.Slots[i] = list.New() tw.slots[i] = list.New()
} }
} }
@ -83,7 +83,7 @@ func (tw *TimeWheel) initSlots() {
// //
// Date : 13:38 2023/8/4 // Date : 13:38 2023/8/4
func (tw *TimeWheel) Start() { func (tw *TimeWheel) Start() {
tw.IsRunning = true tw.isRunning = true
for { for {
select { select {
case <-tw.ticker.C: case <-tw.ticker.C:
@ -98,7 +98,8 @@ func (tw *TimeWheel) Start() {
tw.RemoveTask(task) tw.RemoveTask(task)
case <-tw.stopChannel: case <-tw.stopChannel:
tw.ticker.Stop() tw.ticker.Stop()
tw.IsRunning = false tw.isRunning = false
// TODO : 重新初始化时间轮实例
return return
} }
} }
@ -112,7 +113,7 @@ func (tw *TimeWheel) Start() {
func (tw *TimeWheel) checkAndRunTask() { func (tw *TimeWheel) checkAndRunTask() {
// 获取该轮盘位置的双向链表 // 获取该轮盘位置的双向链表
currentList := tw.Slots[tw.currentPosition] currentList := tw.slots[tw.currentPosition]
if currentList != nil { if currentList != nil {
for item := currentList.Front(); item != nil; { for item := currentList.Front(); item != nil; {
@ -134,14 +135,14 @@ func (tw *TimeWheel) checkAndRunTask() {
}() }()
_, _ = task.Job.Execute(nil, nil) _, _ = task.Job.Execute(nil, nil)
}() }()
} else if tw.Job != nil { } else if tw.job != nil {
go func() { go func() {
defer func() { defer func() {
if r := recover(); nil != r { if r := recover(); nil != r {
} }
}() }()
_, _ = tw.Job.Execute(nil, nil) _, _ = tw.job.Execute(nil, nil)
}() }()
} else { } else {
fmt.Println(fmt.Sprintf("The task %v don't have job to run", task.Key)) fmt.Println(fmt.Sprintf("The task %v don't have job to run", task.Key))
@ -149,7 +150,7 @@ func (tw *TimeWheel) checkAndRunTask() {
// 执行完成以后,将该任务从时间轮盘删除 // 执行完成以后,将该任务从时间轮盘删除
next := item.Next() next := item.Next()
tw.TaskRecords.Del(task.Key) tw.taskRecords.Del(task.Key)
currentList.Remove(item) currentList.Remove(item)
item = next item = next
@ -196,8 +197,8 @@ func (tw *TimeWheel) AddTask(task *Task, byInterval bool) {
task.Circle = circle task.Circle = circle
task.Position = pos task.Position = pos
element := tw.Slots[pos].PushBack(task) element := tw.slots[pos].PushBack(task)
tw.TaskRecords.Set(task.Key, element) tw.taskRecords.Set(task.Key, element)
} }
// RemoveTask 删除任务的内部函数 // RemoveTask 删除任务的内部函数
@ -207,11 +208,11 @@ func (tw *TimeWheel) AddTask(task *Task, byInterval bool) {
// Date : 13:44 2023/8/4 // Date : 13:44 2023/8/4
func (tw *TimeWheel) RemoveTask(task *Task) { func (tw *TimeWheel) RemoveTask(task *Task) {
// 从map结构中删除 // 从map结构中删除
taskInfo, _ := tw.TaskRecords.Get(task.Key) taskInfo, _ := tw.taskRecords.Get(task.Key)
tw.TaskRecords.Del(task.Key) tw.taskRecords.Del(task.Key)
// 通过TimeWheel.slots获取任务的 // 通过TimeWheel.slots获取任务的
currentList := tw.Slots[task.Position] currentList := tw.slots[task.Position]
currentList.Remove(taskInfo.(*list.Element)) currentList.Remove(taskInfo.(*list.Element))
} }
@ -222,7 +223,7 @@ func (tw *TimeWheel) RemoveTask(task *Task) {
// Date : 13:46 2023/8/4 // Date : 13:46 2023/8/4
func (tw *TimeWheel) getPosAndCircleByInterval(d time.Duration) (int, int) { func (tw *TimeWheel) getPosAndCircleByInterval(d time.Duration) (int, int) {
delaySeconds := int(d.Seconds()) delaySeconds := int(d.Seconds())
intervalSeconds := int(tw.Interval.Seconds()) intervalSeconds := int(tw.interval.Seconds())
circle := delaySeconds / intervalSeconds / tw.slotCount circle := delaySeconds / intervalSeconds / tw.slotCount
pos := (tw.currentPosition + delaySeconds/intervalSeconds) % tw.slotCount pos := (tw.currentPosition + delaySeconds/intervalSeconds) % tw.slotCount
@ -243,7 +244,7 @@ func (tw *TimeWheel) getPosAndCircleByCreatedTime(createdTime time.Time, d time.
passedTime := time.Since(createdTime) passedTime := time.Since(createdTime)
passedSeconds := int(passedTime.Seconds()) passedSeconds := int(passedTime.Seconds())
delaySeconds := int(d.Seconds()) delaySeconds := int(d.Seconds())
intervalSeconds := int(tw.Interval.Seconds()) intervalSeconds := int(tw.interval.Seconds())
circle := delaySeconds / intervalSeconds / tw.slotCount circle := delaySeconds / intervalSeconds / tw.slotCount
pos := (tw.currentPosition + (delaySeconds-(passedSeconds%delaySeconds))/intervalSeconds) % tw.slotCount pos := (tw.currentPosition + (delaySeconds-(passedSeconds%delaySeconds))/intervalSeconds) % tw.slotCount