第一版时间轮任务调度实现 #1

Merged
zhangdeman merged 15 commits from feature/timewheel into master 2023-09-04 20:13:17 +08:00
Showing only changes of commit a96cf04261 - Show all commits

56
core.go
View File

@ -32,12 +32,11 @@ func NewTimeWheel(slotCount int, interval time.Duration) *TimeWheel {
tw := &TimeWheel{ tw := &TimeWheel{
Interval: interval, Interval: interval,
Slots: make([]*list.List, slotCount), Slots: make([]*list.List, slotCount),
Ticker: time.NewTicker(interval), ticker: time.NewTicker(interval),
CurrentPosition: 0, slotCount: slotCount,
SlotCount: slotCount, addTaskChannel: make(chan *Task, 1000),
AddTaskChannel: make(chan *Task, 1000), removeTaskChannel: make(chan *Task, 1000),
RemoveTaskChannel: make(chan *Task, 1000), stopChannel: make(chan bool, 1000),
StopChannel: make(chan bool, 1000),
TaskRecords: easymap.NewNormal(true), TaskRecords: easymap.NewNormal(true),
Job: nil, Job: nil,
IsRunning: false, IsRunning: false,
@ -54,12 +53,12 @@ func NewTimeWheel(slotCount int, interval time.Duration) *TimeWheel {
type TimeWheel struct { type TimeWheel struct {
Interval time.Duration // 时间轮精度 Interval time.Duration // 时间轮精度
Slots []*list.List // 时间轮盘每个位置存储的任务列表 Slots []*list.List // 时间轮盘每个位置存储的任务列表
Ticker *time.Ticker // 定时器 ticker *time.Ticker // 定时器
CurrentPosition int // 时间轮盘当前位置 currentPosition int // 时间轮盘当前位置
SlotCount int // 时间轮盘的齿数,Interval*SlotCount就是时间轮盘转一圈走过的时间 slotCount int // 时间轮盘的齿数,Interval*SlotCount就是时间轮盘转一圈走过的时间
AddTaskChannel chan *Task addTaskChannel chan *Task
RemoveTaskChannel chan *Task removeTaskChannel chan *Task
StopChannel chan bool stopChannel chan bool
TaskRecords easymap.EasyMap // Map结构来存储Task对象key是Task.keyvalue是Task在双向链表中的存储对象 TaskRecords easymap.EasyMap // Map结构来存储Task对象key是Task.keyvalue是Task在双向链表中的存储对象
// 需要执行的任务如果时间轮盘上的Task执行同一个Job可以直接实例化到TimeWheel结构体中。 // 需要执行的任务如果时间轮盘上的Task执行同一个Job可以直接实例化到TimeWheel结构体中。
// 此处的优先级低于Task中的Job参数 // 此处的优先级低于Task中的Job参数
@ -73,7 +72,7 @@ type TimeWheel struct {
// //
// Date : 13:37 2023/8/4 // Date : 13:37 2023/8/4
func (tw *TimeWheel) initSlots() { func (tw *TimeWheel) initSlots() {
for i := 0; i < tw.SlotCount; i++ { for i := 0; i < tw.slotCount; i++ {
tw.Slots[i] = list.New() tw.Slots[i] = list.New()
} }
} }
@ -87,17 +86,18 @@ func (tw *TimeWheel) Start() {
tw.IsRunning = true tw.IsRunning = true
for { for {
select { select {
case <-tw.Ticker.C: case <-tw.ticker.C:
// 指定时间间隔之后, 调度一次任务
tw.checkAndRunTask() tw.checkAndRunTask()
case task := <-tw.AddTaskChannel: case task := <-tw.addTaskChannel:
// 此处利用Task.createTime来定位任务在时间轮盘的位置和执行圈数 // 此处利用Task.createTime来定位任务在时间轮盘的位置和执行圈数
// 如果直接用任务的周期来定位位置,那么在服务重启的时候,任务周器相同的点会被定位到相同的卡槽, // 如果直接用任务的周期来定位位置,那么在服务重启的时候,任务周器相同的点会被定位到相同的卡槽,
// 会造成任务过度集中 // 会造成任务过度集中
tw.AddTask(task, false) tw.AddTask(task, false)
case task := <-tw.RemoveTaskChannel: case task := <-tw.removeTaskChannel:
tw.RemoveTask(task) tw.RemoveTask(task)
case <-tw.StopChannel: case <-tw.stopChannel:
tw.Ticker.Stop() tw.ticker.Stop()
tw.IsRunning = false tw.IsRunning = false
return return
} }
@ -112,7 +112,7 @@ func (tw *TimeWheel) Start() {
func (tw *TimeWheel) checkAndRunTask() { func (tw *TimeWheel) checkAndRunTask() {
// 获取该轮盘位置的双向链表 // 获取该轮盘位置的双向链表
currentList := tw.Slots[tw.CurrentPosition] currentList := tw.Slots[tw.currentPosition]
if currentList != nil { if currentList != nil {
for item := currentList.Front(); item != nil; { for item := currentList.Front(); item != nil; {
@ -168,10 +168,10 @@ func (tw *TimeWheel) checkAndRunTask() {
} }
// 轮盘前进一步 // 轮盘前进一步
if tw.CurrentPosition == tw.SlotCount-1 { if tw.currentPosition == tw.slotCount-1 {
tw.CurrentPosition = 0 tw.currentPosition = 0
} else { } else {
tw.CurrentPosition++ tw.currentPosition++
} }
} }
@ -222,11 +222,11 @@ func (tw *TimeWheel) RemoveTask(task *Task) {
func (tw *TimeWheel) getPosAndCircleByInterval(d time.Duration) (int, int) { func (tw *TimeWheel) getPosAndCircleByInterval(d time.Duration) (int, int) {
delaySeconds := int(d.Seconds()) delaySeconds := int(d.Seconds())
intervalSeconds := int(tw.Interval.Seconds()) intervalSeconds := int(tw.Interval.Seconds())
circle := delaySeconds / intervalSeconds / tw.SlotCount circle := delaySeconds / intervalSeconds / tw.slotCount
pos := (tw.CurrentPosition + delaySeconds/intervalSeconds) % tw.SlotCount pos := (tw.currentPosition + delaySeconds/intervalSeconds) % tw.slotCount
// 特殊case当计算的位置和当前位置重叠时因为当前位置已经走过了所以circle需要减一 // 特殊case当计算的位置和当前位置重叠时因为当前位置已经走过了所以circle需要减一
if pos == tw.CurrentPosition && circle != 0 { if pos == tw.currentPosition && circle != 0 {
circle-- circle--
} }
return pos, circle return pos, circle
@ -244,11 +244,11 @@ func (tw *TimeWheel) getPosAndCircleByCreatedTime(createdTime time.Time, d time.
delaySeconds := int(d.Seconds()) delaySeconds := int(d.Seconds())
intervalSeconds := int(tw.Interval.Seconds()) intervalSeconds := int(tw.Interval.Seconds())
circle := delaySeconds / intervalSeconds / tw.SlotCount circle := delaySeconds / intervalSeconds / tw.slotCount
pos := (tw.CurrentPosition + (delaySeconds-(passedSeconds%delaySeconds))/intervalSeconds) % tw.SlotCount pos := (tw.currentPosition + (delaySeconds-(passedSeconds%delaySeconds))/intervalSeconds) % tw.slotCount
// 特殊case当计算的位置和当前位置重叠时因为当前位置已经走过了所以circle需要减一 // 特殊case当计算的位置和当前位置重叠时因为当前位置已经走过了所以circle需要减一
if pos == tw.CurrentPosition && circle != 0 { if pos == tw.currentPosition && circle != 0 {
circle-- circle--
} }