task/core.go

265 lines
7.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package task ...
//
// Description : task ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2023-08-04 11:13
package task
import (
"container/list"
"fmt"
"git.zhangdeman.cn/zhangdeman/easymap"
"git.zhangdeman.cn/zhangdeman/wrapper"
"time"
)
// NewTimeWheel 实例化时间轮
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 16:50 2023/8/11
func NewTimeWheel(slotCount int, interval time.Duration) *TimeWheel {
tw := &TimeWheel{
Interval: interval,
Slots: make([]*list.List, slotCount),
Ticker: time.NewTicker(interval),
CurrentPosition: 0,
SlotCount: slotCount,
AddTaskChannel: make(chan *Task, 1000),
RemoveTaskChannel: make(chan *Task, 1000),
StopChannel: make(chan bool, 1000),
TaskRecords: easymap.NewNormal(true),
Job: nil,
IsRunning: false,
}
tw.initSlots()
return tw
}
// TimeWheel 核心时间轮的数据结构
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 11:14 2023/8/4
type TimeWheel struct {
Interval time.Duration // 时间轮精度
Slots []*list.List // 时间轮盘每个位置存储的任务列表
Ticker *time.Ticker // 定时器
CurrentPosition int // 时间轮盘当前位置
SlotCount int // 时间轮盘的齿数,Interval*SlotCount就是时间轮盘转一圈走过的时间
AddTaskChannel chan *Task
RemoveTaskChannel chan *Task
StopChannel chan bool
TaskRecords easymap.EasyMap // Map结构来存储Task对象key是Task.keyvalue是Task在双向链表中的存储对象
// 需要执行的任务如果时间轮盘上的Task执行同一个Job可以直接实例化到TimeWheel结构体中。
// 此处的优先级低于Task中的Job参数
Job ITask
IsRunning bool // 是否运行中
}
// initSlots 初始化时间轮盘,每个轮盘上的卡槽用一个双向队列表示,便于插入和删除
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 13:37 2023/8/4
func (tw *TimeWheel) initSlots() {
for i := 0; i < tw.SlotCount; i++ {
tw.Slots[i] = list.New()
}
}
// Start 启动时间轮盘的内部函数
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 13:38 2023/8/4
func (tw *TimeWheel) Start() {
tw.IsRunning = true
for {
select {
case <-tw.Ticker.C:
tw.checkAndRunTask()
case task := <-tw.AddTaskChannel:
// 此处利用Task.createTime来定位任务在时间轮盘的位置和执行圈数
// 如果直接用任务的周期来定位位置,那么在服务重启的时候,任务周器相同的点会被定位到相同的卡槽,
// 会造成任务过度集中
tw.AddTask(task, false)
case task := <-tw.RemoveTaskChannel:
tw.RemoveTask(task)
case <-tw.StopChannel:
tw.Ticker.Stop()
tw.IsRunning = false
return
}
}
}
// checkAndRunTask 检查该轮盘点位上的Task看哪个需要执行
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 13:39 2023/8/4
func (tw *TimeWheel) checkAndRunTask() {
// 获取该轮盘位置的双向链表
currentList := tw.Slots[tw.CurrentPosition]
if currentList != nil {
for item := currentList.Front(); item != nil; {
task := item.Value.(*Task)
// 如果圈数>0表示还没到执行时间更新圈数
if task.Circle > 0 {
task.Circle--
item = item.Next()
continue
}
// 执行任务时Task.job是第一优先级然后是TimeWheel.job
if task.Job != nil {
go func() {
defer func() {
if r := recover(); nil != r {
}
}()
_, _ = task.Job.Execute(nil, nil)
}()
} else if tw.Job != nil {
go func() {
defer func() {
if r := recover(); nil != r {
}
}()
_, _ = tw.Job.Execute(nil, nil)
}()
} else {
fmt.Println(fmt.Sprintf("The task %v don't have job to run", task.Key))
}
// 执行完成以后,将该任务从时间轮盘删除
next := item.Next()
tw.TaskRecords.Del(task.Key)
currentList.Remove(item)
item = next
// 重新添加任务到时间轮盘用Task.interval来获取下一次执行的轮盘位置
// 如果times==0,说明已经完成执行周期,不需要再添加任务回时间轮盘
if task.Times != 0 {
if task.Times < 0 {
tw.AddTask(task, true)
} else {
task.Times--
tw.AddTask(task, true)
}
}
}
}
// 轮盘前进一步
if tw.CurrentPosition == tw.SlotCount-1 {
tw.CurrentPosition = 0
} else {
tw.CurrentPosition++
}
}
// AddTask 添加任务的内部函数, 生成Task在时间轮盘位置和圈数的方式true表示利用Task.interval来生成false表示利用Task.createTime生成
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 13:43 2023/8/4
func (tw *TimeWheel) AddTask(task *Task, byInterval bool) {
if nil != task {
// 生成 key
task.Key = wrapper.StringFromRandom(128, "").Md5().Value
}
var pos, circle int
if byInterval {
pos, circle = tw.getPosAndCircleByInterval(task.Interval)
} else {
pos, circle = tw.getPosAndCircleByCreatedTime(task.CreatedTime, task.Interval, task.Key)
}
task.Circle = circle
task.Position = pos
element := tw.Slots[pos].PushBack(task)
tw.TaskRecords.Set(task.Key, element)
}
// RemoveTask 删除任务的内部函数
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 13:44 2023/8/4
func (tw *TimeWheel) RemoveTask(task *Task) {
// 从map结构中删除
taskInfo, _ := tw.TaskRecords.Get(task.Key)
tw.TaskRecords.Del(task.Key)
// 通过TimeWheel.slots获取任务的
currentList := tw.Slots[task.Position]
currentList.Remove(taskInfo.(*list.Element))
}
// getPosAndCircleByInterval 该函数通过任务的周期来计算下次执行的位置和圈数
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 13:46 2023/8/4
func (tw *TimeWheel) getPosAndCircleByInterval(d time.Duration) (int, int) {
delaySeconds := int(d.Seconds())
intervalSeconds := int(tw.Interval.Seconds())
circle := delaySeconds / intervalSeconds / tw.SlotCount
pos := (tw.CurrentPosition + delaySeconds/intervalSeconds) % tw.SlotCount
// 特殊case当计算的位置和当前位置重叠时因为当前位置已经走过了所以circle需要减一
if pos == tw.CurrentPosition && circle != 0 {
circle--
}
return pos, circle
}
// getPosAndCircleByCreatedTime 该函数用任务的创建时间来计算下次执行的位置和圈数
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 13:47 2023/8/4
func (tw *TimeWheel) getPosAndCircleByCreatedTime(createdTime time.Time, d time.Duration, key interface{}) (int, int) {
passedTime := time.Since(createdTime)
passedSeconds := int(passedTime.Seconds())
delaySeconds := int(d.Seconds())
intervalSeconds := int(tw.Interval.Seconds())
circle := delaySeconds / intervalSeconds / tw.SlotCount
pos := (tw.CurrentPosition + (delaySeconds-(passedSeconds%delaySeconds))/intervalSeconds) % tw.SlotCount
// 特殊case当计算的位置和当前位置重叠时因为当前位置已经走过了所以circle需要减一
if pos == tw.CurrentPosition && circle != 0 {
circle--
}
return pos, circle
}
// Task 任务数据结构
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 11:44 2023/8/4
type Task struct {
Key string // 用来标识task对象是唯一的
Interval time.Duration // 任务周期
CreatedTime time.Time // 任务的创建时间
Position int // 任务在轮盘的位置
Circle int // 任务需要在轮盘走多少圈才能执行
Job ITask // 任务需要执行的Job优先级高于TimeWheel中的Job
Times int // 任务需要执行的次数,如果需要一直执行,设置成-1
MaxExecuteTime int64 // 最大执行时长, 超时自动取消. 如不限制时长, 设置成0
}