diff --git a/README.md b/README.md index 06b942e..e0f98d3 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,9 @@ # task -任务调度接口 \ No newline at end of file +## 作用 + +延迟任务调度 + +## 算法原理 + +**时间轮算法** diff --git a/abstract.go b/abstract.go index af97c24..3772291 100644 --- a/abstract.go +++ b/abstract.go @@ -15,14 +15,12 @@ import "context" // // Date : 14:22 2022/6/23 type ITask interface { - // Name 任务名称标识, 全局唯一 - Name() string + // GetFlag 任务名称标识, 全局唯一 + GetFlag() string // Description 任务描述 Description() string - // GetRunID 获取任务ID - GetRunID() string // Callback 任务执行成功的回调 Callback(result *Result) error // Execute 执行任务 - Execute(ctx context.Context, cfg *Config) *Result + Execute(ctx context.Context, cfg *Config) (map[string]interface{}, error) } diff --git a/core.go b/core.go new file mode 100644 index 0000000..f2e4d13 --- /dev/null +++ b/core.go @@ -0,0 +1,275 @@ +// Package task ... +// +// Description : task ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2023-08-04 11:13 +package task + +import ( + "container/list" + "fmt" + "git.zhangdeman.cn/zhangdeman/easymap" + "git.zhangdeman.cn/zhangdeman/wrapper" + "time" +) + +// NewTimeWheel 实例化时间轮 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 16:50 2023/8/11 +func NewTimeWheel(slotCount int, interval time.Duration) *TimeWheel { + if slotCount <= 0 { + // 默认100个槽位 + slotCount = 100 + } + if interval.Seconds() == 0 { + // 默认60s扫描一次 + interval = time.Second * 60 + } + tw := &TimeWheel{ + interval: interval, + slots: make([]*list.List, slotCount), + ticker: time.NewTicker(interval), + slotCount: slotCount, + addTaskChannel: make(chan *Task, 1000), + removeTaskChannel: make(chan *Task, 1000), + stopChannel: make(chan bool, 1000), + taskRecords: easymap.NewNormal(true), + job: nil, + isRunning: false, + } + tw.initSlots() + return tw +} + +// TimeWheel 核心时间轮的数据结构 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:14 2023/8/4 +type TimeWheel struct { + interval time.Duration // 时间轮精度 + slots []*list.List // 时间轮盘每个位置存储的任务列表 + ticker *time.Ticker // 定时器 + currentPosition int // 时间轮盘当前位置 + slotCount int // 时间轮盘的齿数,Interval*SlotCount就是时间轮盘转一圈走过的时间 + addTaskChannel chan *Task + removeTaskChannel chan *Task + stopChannel chan bool + taskRecords easymap.EasyMap // Map结构来存储Task对象,key是Task.key,value是Task在双向链表中的存储对象 + // 需要执行的任务,如果时间轮盘上的Task执行同一个Job,可以直接实例化到TimeWheel结构体中。 + // 此处的优先级低于Task中的Job参数 + job ITask + isRunning bool // 是否运行中 +} + +// initSlots 初始化时间轮盘,每个轮盘上的卡槽用一个双向队列表示,便于插入和删除 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:37 2023/8/4 +func (tw *TimeWheel) initSlots() { + for i := 0; i < tw.slotCount; i++ { + tw.slots[i] = list.New() + } +} + +// Start 启动时间轮盘的内部函数 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:38 2023/8/4 +func (tw *TimeWheel) Start() { + tw.isRunning = true + for { + select { + case <-tw.ticker.C: + // 指定时间间隔之后, 调度一次任务 + tw.checkAndRunTask() + case task := <-tw.addTaskChannel: + // 此处利用Task.createTime来定位任务在时间轮盘的位置和执行圈数 + // 如果直接用任务的周期来定位位置,那么在服务重启的时候,任务周器相同的点会被定位到相同的卡槽, + // 会造成任务过度集中 + tw.AddTask(task, false) + case task := <-tw.removeTaskChannel: + tw.RemoveTask(task) + case <-tw.stopChannel: + tw.ticker.Stop() + tw.isRunning = false + // TODO : 重新初始化时间轮实例 + return + } + } +} + +// checkAndRunTask 检查该轮盘点位上的Task,看哪个需要执行 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:39 2023/8/4 +func (tw *TimeWheel) checkAndRunTask() { + + // 获取该轮盘位置的双向链表 + currentList := tw.slots[tw.currentPosition] + + if currentList != nil { + for item := currentList.Front(); item != nil; { + task := item.Value.(*Task) + // 如果圈数>0,表示还没到执行时间,更新圈数 + if task.Circle > 0 { + task.Circle-- + item = item.Next() + continue + } + + // 执行任务时,Task.job是第一优先级,然后是TimeWheel.job + if task.Job != nil { + go func() { + defer func() { + if r := recover(); nil != r { + + } + }() + _, _ = task.Job.Execute(nil, nil) + }() + } else if tw.job != nil { + go func() { + defer func() { + if r := recover(); nil != r { + + } + }() + _, _ = tw.job.Execute(nil, nil) + }() + } else { + fmt.Println(fmt.Sprintf("The task %v don't have job to run", task.Key)) + } + + // 执行完成以后,将该任务从时间轮盘删除 + next := item.Next() + tw.taskRecords.Del(task.Key) + currentList.Remove(item) + + item = next + + // 重新添加任务到时间轮盘,用Task.interval来获取下一次执行的轮盘位置 + // 如果times==0,说明已经完成执行周期,不需要再添加任务回时间轮盘 + if task.Times != 0 { + if task.Times < 0 { + tw.AddTask(task, true) + } else { + task.Times-- + tw.AddTask(task, true) + } + } + } + } + + // 轮盘前进一步 + if tw.currentPosition == tw.slotCount-1 { + tw.currentPosition = 0 + } else { + tw.currentPosition++ + } +} + +// AddTask 添加任务的内部函数, 生成Task在时间轮盘位置和圈数的方式,true表示利用Task.interval来生成,false表示利用Task.createTime生成 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:43 2023/8/4 +func (tw *TimeWheel) AddTask(task *Task, byInterval bool) { + // 生成 run_id + task.RunID = wrapper.StringFromRandom(128, "").Md5().Value + if nil != task && nil != task.Job { + task.Key = task.Job.GetFlag() + "_" + task.RunID + } + var pos, circle int + if byInterval { + pos, circle = tw.getPosAndCircleByInterval(task.Interval) + } else { + pos, circle = tw.getPosAndCircleByCreatedTime(task.CreatedTime, task.Interval, task.Key) + } + + task.Circle = circle + task.Position = pos + + element := tw.slots[pos].PushBack(task) + tw.taskRecords.Set(task.Key, element) +} + +// RemoveTask 删除任务的内部函数 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:44 2023/8/4 +func (tw *TimeWheel) RemoveTask(task *Task) { + // 从map结构中删除 + taskInfo, _ := tw.taskRecords.Get(task.Key) + tw.taskRecords.Del(task.Key) + + // 通过TimeWheel.slots获取任务的 + currentList := tw.slots[task.Position] + currentList.Remove(taskInfo.(*list.Element)) +} + +// getPosAndCircleByInterval 该函数通过任务的周期来计算下次执行的位置和圈数 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:46 2023/8/4 +func (tw *TimeWheel) getPosAndCircleByInterval(d time.Duration) (int, int) { + delaySeconds := int(d.Seconds()) + intervalSeconds := int(tw.interval.Seconds()) + circle := delaySeconds / intervalSeconds / tw.slotCount + pos := (tw.currentPosition + delaySeconds/intervalSeconds) % tw.slotCount + + // 特殊case,当计算的位置和当前位置重叠时,因为当前位置已经走过了,所以circle需要减一 + if pos == tw.currentPosition && circle != 0 { + circle-- + } + return pos, circle +} + +// getPosAndCircleByCreatedTime 该函数用任务的创建时间来计算下次执行的位置和圈数 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:47 2023/8/4 +func (tw *TimeWheel) getPosAndCircleByCreatedTime(createdTime time.Time, d time.Duration, key interface{}) (int, int) { + + passedTime := time.Since(createdTime) + passedSeconds := int(passedTime.Seconds()) + delaySeconds := int(d.Seconds()) + intervalSeconds := int(tw.interval.Seconds()) + + circle := delaySeconds / intervalSeconds / tw.slotCount + pos := (tw.currentPosition + (delaySeconds-(passedSeconds%delaySeconds))/intervalSeconds) % tw.slotCount + + // 特殊case,当计算的位置和当前位置重叠时,因为当前位置已经走过了,所以circle需要减一 + if pos == tw.currentPosition && circle != 0 { + circle-- + } + + return pos, circle +} + +// Task 任务数据结构 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:44 2023/8/4 +type Task struct { + RunID string // 每一次运行的run id + Key string // 用来标识task对象,是唯一的 + Interval time.Duration // 任务周期 + CreatedTime time.Time // 任务的创建时间 + Position int // 任务在轮盘的位置 + Circle int // 任务需要在轮盘走多少圈才能执行 + Job ITask // 任务需要执行的Job,优先级高于TimeWheel中的Job + Times int // 任务需要执行的次数,如果需要一直执行,设置成-1 + MaxExecuteTime int64 // 最大执行时长, 超时自动取消. 如不限制时长, 设置成0 +} diff --git a/core_test.go b/core_test.go new file mode 100644 index 0000000..9dbe5f0 --- /dev/null +++ b/core_test.go @@ -0,0 +1,67 @@ +// Package task ... +// +// Description : task ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2023-08-11 17:00 +package task + +import ( + "context" + "fmt" + "git.zhangdeman.cn/zhangdeman/wrapper" + "testing" + "time" +) + +var tw *TimeWheel + +// TestNewTimeWheel ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:00 2023/8/11 +func TestNewTimeWheel(t *testing.T) { + tw = NewTimeWheel(10, 10*time.Second) + tw.AddTask(&Task{ + Key: wrapper.StringFromRandom(32, "").Value(), + Interval: 5 * time.Second, + CreatedTime: time.Now(), + Position: 0, + Circle: 100, + Job: &testTask{}, + Times: 10, + MaxExecuteTime: 0, + }, false) + go tw.Start() + time.Sleep(3 * time.Second) + for tw.IsRunning { + + } +} + +type testTask struct { +} + +func (t testTask) GetFlag() string { + return "unit_test" +} + +func (t testTask) Description() string { + return "单元测试任务" +} + +func (t testTask) GetRunID() string { + return wrapper.StringFromRandom(32, "").Value() +} + +func (t testTask) Callback(result *Result) error { + fmt.Println(result) + return nil +} + +func (t testTask) Execute(ctx context.Context, cfg *Config) (map[string]interface{}, error) { + fmt.Println(wrapper.OwnTimeFromNow().ToString()) + return nil, nil +} diff --git a/define.go b/define.go index bd1667a..8130850 100644 --- a/define.go +++ b/define.go @@ -19,6 +19,8 @@ type Config struct { ForbiddenCallback bool // Param 任务执行参数 Param map[string]interface{} + // TaskFlag 任务标识 + TaskFlag string // TaskName 任务名称 TaskName string // Async 是否异步 diff --git a/error.go b/error.go new file mode 100644 index 0000000..b79e0c8 --- /dev/null +++ b/error.go @@ -0,0 +1,17 @@ +// Package task ... +// +// Description : 内置的错误信息 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2023-08-04 11:50 +package task + +import "errors" + +var ( + // ErrDuplicateTaskKey 任务key重复 + ErrDuplicateTaskKey = errors.New("duplicate task key") + // ErrTaskKeyNotFound 任务key不存在 + ErrTaskKeyNotFound = errors.New("task key doesn't existed in task list, please check your input") +) diff --git a/go.mod b/go.mod index fcf1def..4c63e67 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,20 @@ module git.zhangdeman.cn/zhangdeman/task go 1.20 + +require git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230811085725-1a7414bb7590 + +require ( + git.zhangdeman.cn/zhangdeman/consts v0.0.0-20230811030300-6f850372c88c // indirect + git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20230811032817-e6ad534a9a10 // indirect + git.zhangdeman.cn/zhangdeman/util v0.0.0-20230811070456-d6a489d5860b // indirect + git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20230811071513-cfc46e8d82e1 // indirect + github.com/BurntSushi/toml v1.3.2 // indirect + github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 // indirect + github.com/go-ini/ini v1.67.0 // indirect + github.com/mitchellh/go-homedir v1.1.0 // indirect + github.com/mozillazg/go-pinyin v0.20.0 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..b1301bf --- /dev/null +++ b/go.sum @@ -0,0 +1,29 @@ +git.zhangdeman.cn/zhangdeman/consts v0.0.0-20230811030300-6f850372c88c h1:Dan3iSVU6XTKt8r3/qixfPHPpfLZjkYlPmaJios7wtE= +git.zhangdeman.cn/zhangdeman/consts v0.0.0-20230811030300-6f850372c88c/go.mod h1:IXXaZkb7vGzGnGM5RRWrASAuwrVSNxuoe0DmeXx5g6k= +git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230307094841-e437ba87af10 h1:+Lg4vXFEiWVKjhUJdXuoP0AgjGT49oqJ3301STnZErk= +git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230307094841-e437ba87af10/go.mod h1:+Lc0zYF8sylRi75A7NGmObrLxugwAZa8WVpWh2eh5X0= +git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230811085725-1a7414bb7590 h1:zN5JKHvzCH5q+X6W6fZw1hZ18FTlmZdv+k5T2j+6/GQ= +git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230811085725-1a7414bb7590/go.mod h1:l9S40lsDnTd/VAZjh1kmfYvz0B9z+7oT86pMQ/KurWo= +git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20230811032817-e6ad534a9a10 h1:orhcMAKrcOajsBJCgssnb9O8YcLsPJvWuXF511gs5dc= +git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20230811032817-e6ad534a9a10/go.mod h1:CzX5/WwGDTnKmewarnjkK5XcSRbgszTQTdTL3OUc/s4= +git.zhangdeman.cn/zhangdeman/util v0.0.0-20230811070456-d6a489d5860b h1:vnmxYrNdX6f5sEVjjkM1fIR+i32kHJ4g9DJqug9KKek= +git.zhangdeman.cn/zhangdeman/util v0.0.0-20230811070456-d6a489d5860b/go.mod h1:Yum5+tgP+Wf1GWUAyQz1Qh8Ab9m5+90GYkYdzqVs0lA= +git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20230811071513-cfc46e8d82e1 h1:k2iu9KgRxeroytB+N+/XapAxt1di7o2pNTISjFlYDJ8= +git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20230811071513-cfc46e8d82e1/go.mod h1:kvjAbtGTo14gKCS0X4rxnb2sPkskHOUy2NXcx34t6Mw= +github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= +github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 h1:OYA+5W64v3OgClL+IrOD63t4i/RW7RqrAVl9LTZ9UqQ= +github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg= +github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= +github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= +github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mozillazg/go-pinyin v0.20.0 h1:BtR3DsxpApHfKReaPO1fCqF4pThRwH9uwvXzm+GnMFQ= +github.com/mozillazg/go-pinyin v0.20.0/go.mod h1:iR4EnMMRXkfpFVV5FMi4FNB6wGq9NV6uDWbUuPhP4Yc= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=