From 5971cd569fc6196ca31272b91f55de6969d8143c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Fri, 4 Aug 2023 10:41:30 +0800 Subject: [PATCH 01/14] update README --- README.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 06b942e..e0f98d3 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,9 @@ # task -任务调度接口 \ No newline at end of file +## 作用 + +延迟任务调度 + +## 算法原理 + +**时间轮算法** From a2ba1440d4e5946099fe6d54b4a534f2b210dc29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Fri, 4 Aug 2023 10:45:08 +0800 Subject: [PATCH 02/14] update code --- abstract.go | 6 +++--- define.go | 2 ++ task.go | 37 ++++++++++++++++--------------------- 3 files changed, 21 insertions(+), 24 deletions(-) diff --git a/abstract.go b/abstract.go index af97c24..4191eba 100644 --- a/abstract.go +++ b/abstract.go @@ -15,8 +15,8 @@ import "context" // // Date : 14:22 2022/6/23 type ITask interface { - // Name 任务名称标识, 全局唯一 - Name() string + // GetFlag 任务名称标识, 全局唯一 + GetFlag() string // Description 任务描述 Description() string // GetRunID 获取任务ID @@ -24,5 +24,5 @@ type ITask interface { // Callback 任务执行成功的回调 Callback(result *Result) error // Execute 执行任务 - Execute(ctx context.Context, cfg *Config) *Result + Execute(ctx context.Context, cfg *Config) (map[string]interface{}, error) } diff --git a/define.go b/define.go index 6e7b078..c140f2b 100644 --- a/define.go +++ b/define.go @@ -19,6 +19,8 @@ type Config struct { ForbiddenCallback bool // Param 任务执行参数 Param map[string]interface{} + // TaskFlag 任务标识 + TaskFlag string } // Result 执行结果 diff --git a/task.go b/task.go index b49e528..04d1c32 100644 --- a/task.go +++ b/task.go @@ -51,10 +51,10 @@ func (d *dispatch) Register(taskInstanceList ...ITask) error { if nil == taskInstance { continue } - if _, exist := d.taskTable[taskInstance.Name()]; exist { - return fmt.Errorf("%s 任务重复注册! ", taskInstance.Name()) + if _, exist := d.taskTable[taskInstance.GetFlag()]; exist { + return fmt.Errorf("%s 任务重复注册! ", taskInstance.GetFlag()) } - d.taskTable[taskInstance.Name()] = taskInstance + d.taskTable[taskInstance.GetFlag()] = taskInstance } return nil } @@ -109,36 +109,31 @@ func (d *dispatch) Run(ctx context.Context, cfg *Config) *Result { TaskConfig: cfg, Data: nil, Err: nil, - Async: cfg.Async, } defer func() { result.FinishTime = time.Now().UnixNano() result.Used = result.FinishTime - result.StartTime }() d.lock.RLock() - if taskInstance, exist = d.taskTable[cfg.TaskName]; !exist { - result.Err = fmt.Errorf("%v 任务未注册", cfg.TaskName) + if taskInstance, exist = d.taskTable[cfg.TaskFlag]; !exist { + result.Err = fmt.Errorf("%v 任务未注册", cfg.TaskFlag) + return result } d.lock.RUnlock() result.TaskRunID = taskInstance.GetRunID() result.TaskDescription = taskInstance.Description() - if cfg.Async { - // 异步运行 - go func() { - if e := recover(); nil != e { - switch e.(type) { - case runtime.Error: // 运行时错误 - result.Err = fmt.Errorf("出现运行时Panic : %v", e) - default: // 非运行时错误 - result.Err = fmt.Errorf("出现其他场景Panic : %v", e) - } + // 异步运行 + go func() { + if e := recover(); nil != e { + switch e.(type) { + case runtime.Error: // 运行时错误 + result.Err = fmt.Errorf("出现运行时Panic : %v", e) + default: // 非运行时错误 + result.Err = fmt.Errorf("出现其他场景Panic : %v", e) } - result.Data, result.Err = taskInstance.Execute(ctx, cfg) - _ = taskInstance.Callback(result) - }() - } else { + } result.Data, result.Err = taskInstance.Execute(ctx, cfg) _ = taskInstance.Callback(result) - } + }() return result } From 162550fd350decb52b06203ff801a5a7766b26af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Fri, 4 Aug 2023 11:49:44 +0800 Subject: [PATCH 03/14] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E7=BB=93=E6=9E=84=E5=AE=9A=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core.go | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 20 ++++++++++++++++++++ go.sum | 37 +++++++++++++++++++++++++++++++++++++ 3 files changed, 108 insertions(+) create mode 100644 core.go create mode 100644 go.sum diff --git a/core.go b/core.go new file mode 100644 index 0000000..b20acf6 --- /dev/null +++ b/core.go @@ -0,0 +1,51 @@ +// Package task ... +// +// Description : task ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2023-08-04 11:13 +package task + +import ( + "container/list" + "git.zhangdeman.cn/zhangdeman/easymap" + "time" +) + +// TimeWheel 核心时间轮的数据结构 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:14 2023/8/4 +type TimeWheel struct { + Interval time.Duration // 时间轮精度 + Slots []*list.List // 时间轮盘每个位置存储的任务列表 + Ticker *time.Ticker // 定时器 + CurrentPosition int // 时间轮盘当前位置 + SlotCount int // 时间轮盘的齿数,Interval*SlotCount就是时间轮盘转一圈走过的时间 + AddTaskChannel chan *Task + RemoveTaskChannel chan *Task + StopChannel chan bool + TaskRecords easymap.EasyMap // Map结构来存储Task对象,key是Task.key,value是Task在双向链表中的存储对象 + // 需要执行的任务,如果时间轮盘上的Task执行同一个Job,可以直接实例化到TimeWheel结构体中。 + // 此处的优先级低于Task中的Job参数 + Job ITask + IsRunning bool // 是否运行中 +} + +// Task 任务数据结构 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 11:44 2023/8/4 +type Task struct { + Key string // 用来标识task对象,是唯一的 + Interval time.Duration // 任务周期 + CreatedTime time.Time // 任务的创建时间 + Position int // 任务在轮盘的位置 + Circle int // 任务需要在轮盘走多少圈才能执行 + Job ITask // 任务需要执行的Job,优先级高于TimeWheel中的Job + Times int // 任务需要执行的次数,如果需要一直执行,设置成-1 + MaxExecuteTime int64 // 最大执行时长, 超时自动取消. 如不限制时长, 设置成0 +} diff --git a/go.mod b/go.mod index fcf1def..93fd1ce 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,23 @@ module git.zhangdeman.cn/zhangdeman/task go 1.20 + +require ( + git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230307094841-e437ba87af10 // indirect + git.zhangdeman.cn/zhangdeman/util v0.0.0-20230801092344-773ac512f305 // indirect + github.com/BurntSushi/toml v1.3.2 // indirect + github.com/Jeffail/gabs v1.4.0 // indirect + github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 // indirect + github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect + github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 // indirect + github.com/go-ini/ini v1.67.0 // indirect + github.com/mitchellh/go-homedir v1.1.0 // indirect + github.com/mozillazg/go-pinyin v0.20.0 // indirect + github.com/mssola/user_agent v0.6.0 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/tidwall/gjson v1.15.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..38cb114 --- /dev/null +++ b/go.sum @@ -0,0 +1,37 @@ +git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230307094841-e437ba87af10 h1:+Lg4vXFEiWVKjhUJdXuoP0AgjGT49oqJ3301STnZErk= +git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230307094841-e437ba87af10/go.mod h1:+Lc0zYF8sylRi75A7NGmObrLxugwAZa8WVpWh2eh5X0= +git.zhangdeman.cn/zhangdeman/util v0.0.0-20230801092344-773ac512f305 h1:6Bs/cQP+eKABHB/01uHQI15PwKbo7n8HNx7nIFUGBp0= +git.zhangdeman.cn/zhangdeman/util v0.0.0-20230801092344-773ac512f305/go.mod h1:trYFOShINaQBvinQrH4A0G2kfL22Y2lygEcAiGDt/sc= +github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= +github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/Jeffail/gabs v1.4.0 h1://5fYRRTq1edjfIrQGvdkcd22pkYUrHZ5YC/H2GJVAo= +github.com/Jeffail/gabs v1.4.0/go.mod h1:6xMvQMK4k33lb7GUUpaAPh6nKMmemQeg5d4gn7/bOXc= +github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 h1:OYA+5W64v3OgClL+IrOD63t4i/RW7RqrAVl9LTZ9UqQ= +github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg= +github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= +github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU= +github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1/go.mod h1:+hnT3ywWDTAFrW5aE+u2Sa/wT555ZqwoCS+pk3p6ry4= +github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= +github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= +github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mozillazg/go-pinyin v0.20.0 h1:BtR3DsxpApHfKReaPO1fCqF4pThRwH9uwvXzm+GnMFQ= +github.com/mozillazg/go-pinyin v0.20.0/go.mod h1:iR4EnMMRXkfpFVV5FMi4FNB6wGq9NV6uDWbUuPhP4Yc= +github.com/mssola/user_agent v0.6.0 h1:uwPR4rtWlCHRFyyP9u2KOV0u8iQXmS7Z7feTrstQwk4= +github.com/mssola/user_agent v0.6.0/go.mod h1:TTPno8LPY3wAIEKRpAtkdMT0f8SE24pLRGPahjCH4uw= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/tidwall/gjson v1.15.0 h1:5n/pM+v3r5ujuNl4YLZLsQ+UE5jlkLVm7jMzT5Mpolw= +github.com/tidwall/gjson v1.15.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From fb0a9d875c0c48a5da0be988dc98822cc82cadf5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Fri, 4 Aug 2023 12:07:57 +0800 Subject: [PATCH 04/14] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=B8=A4=E4=B8=AA?= =?UTF-8?q?=E5=86=85=E7=BD=AEerror=E5=AE=9A=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- error.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 error.go diff --git a/error.go b/error.go new file mode 100644 index 0000000..b79e0c8 --- /dev/null +++ b/error.go @@ -0,0 +1,17 @@ +// Package task ... +// +// Description : 内置的错误信息 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2023-08-04 11:50 +package task + +import "errors" + +var ( + // ErrDuplicateTaskKey 任务key重复 + ErrDuplicateTaskKey = errors.New("duplicate task key") + // ErrTaskKeyNotFound 任务key不存在 + ErrTaskKeyNotFound = errors.New("task key doesn't existed in task list, please check your input") +) From 5bd0e4335f151eeaf9bbf7a56df3a08deca015c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Fri, 4 Aug 2023 13:58:56 +0800 Subject: [PATCH 05/14] save --- core.go | 169 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 169 insertions(+) diff --git a/core.go b/core.go index b20acf6..143fb9b 100644 --- a/core.go +++ b/core.go @@ -9,6 +9,7 @@ package task import ( "container/list" + "fmt" "git.zhangdeman.cn/zhangdeman/easymap" "time" ) @@ -34,6 +35,174 @@ type TimeWheel struct { IsRunning bool // 是否运行中 } +// initSlots 初始化时间轮盘,每个轮盘上的卡槽用一个双向队列表示,便于插入和删除 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:37 2023/8/4 +func (tw *TimeWheel) initSlots() { + for i := 0; i < tw.SlotCount; i++ { + tw.Slots[i] = list.New() + } +} + +// Start 启动时间轮盘的内部函数 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:38 2023/8/4 +func (tw *TimeWheel) Start() { + for { + select { + case <-tw.Ticker.C: + tw.checkAndRunTask() + case task := <-tw.AddTaskChannel: + // 此处利用Task.createTime来定位任务在时间轮盘的位置和执行圈数 + // 如果直接用任务的周期来定位位置,那么在服务重启的时候,任务周器相同的点会被定位到相同的卡槽, + // 会造成任务过度集中 + tw.AddTask(task, false) + case task := <-tw.RemoveTaskChannel: + tw.RemoveTask(task) + case <-tw.StopChannel: + tw.Ticker.Stop() + return + } + } +} + +// checkAndRunTask 检查该轮盘点位上的Task,看哪个需要执行 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:39 2023/8/4 +func (tw *TimeWheel) checkAndRunTask() { + + // 获取该轮盘位置的双向链表 + currentList := tw.Slots[tw.CurrentPosition] + + if currentList != nil { + for item := currentList.Front(); item != nil; { + task := item.Value.(*Task) + // 如果圈数>0,表示还没到执行时间,更新圈数 + if task.Circle > 0 { + task.Circle-- + item = item.Next() + continue + } + + // 执行任务时,Task.job是第一优先级,然后是TimeWheel.job + if task.Job != nil { + go task.Job.Execute(nil, nil) + } else if tw.Job != nil { + go tw.Job.Execute(nil, nil) + } else { + fmt.Println(fmt.Sprintf("The task %v don't have job to run", task.Key)) + } + + // 执行完成以后,将该任务从时间轮盘删除 + next := item.Next() + tw.TaskRecords.Del(task.Key) + currentList.Remove(item) + + item = next + + // 重新添加任务到时间轮盘,用Task.interval来获取下一次执行的轮盘位置 + // 如果times==0,说明已经完成执行周期,不需要再添加任务回时间轮盘 + if task.Times != 0 { + if task.Times < 0 { + tw.AddTask(task, true) + } else { + task.Times-- + tw.AddTask(task, true) + } + } + } + } + + // 轮盘前进一步 + if tw.CurrentPosition == tw.SlotCount-1 { + tw.CurrentPosition = 0 + } else { + tw.CurrentPosition++ + } +} + +// AddTask 添加任务的内部函数, 生成Task在时间轮盘位置和圈数的方式,true表示利用Task.interval来生成,false表示利用Task.createTime生成 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:43 2023/8/4 +func (tw *TimeWheel) AddTask(task *Task, byInterval bool) { + var pos, circle int + if byInterval { + pos, circle = tw.getPosAndCircleByInterval(task.Interval) + } else { + pos, circle = tw.getPosAndCircleByCreatedTime(task.CreatedTime, task.Interval, task.Key) + } + + task.Circle = circle + task.Position = pos + + element := tw.Slots[pos].PushBack(task) + tw.TaskRecords.Set(task.Key, element) +} + +// RemoveTask 删除任务的内部函数 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:44 2023/8/4 +func (tw *TimeWheel) RemoveTask(task *Task) { + // 从map结构中删除 + taskInfo, _ := tw.TaskRecords.Get(task.Key) + tw.TaskRecords.Del(task.Key) + + // 通过TimeWheel.slots获取任务的 + currentList := tw.Slots[task.Position] + currentList.Remove(taskInfo.(*list.Element)) +} + +// getPosAndCircleByInterval 该函数通过任务的周期来计算下次执行的位置和圈数 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:46 2023/8/4 +func (tw *TimeWheel) getPosAndCircleByInterval(d time.Duration) (int, int) { + delaySeconds := int(d.Seconds()) + intervalSeconds := int(tw.Interval.Seconds()) + circle := delaySeconds / intervalSeconds / tw.SlotCount + pos := (tw.CurrentPosition + delaySeconds/intervalSeconds) % tw.SlotCount + + // 特殊case,当计算的位置和当前位置重叠时,因为当前位置已经走过了,所以circle需要减一 + if pos == tw.CurrentPosition && circle != 0 { + circle-- + } + return pos, circle +} + +// getPosAndCircleByCreatedTime 该函数用任务的创建时间来计算下次执行的位置和圈数 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 13:47 2023/8/4 +func (tw *TimeWheel) getPosAndCircleByCreatedTime(createdTime time.Time, d time.Duration, key interface{}) (int, int) { + + passedTime := time.Since(createdTime) + passedSeconds := int(passedTime.Seconds()) + delaySeconds := int(d.Seconds()) + intervalSeconds := int(tw.Interval.Seconds()) + + circle := delaySeconds / intervalSeconds / tw.SlotCount + pos := (tw.CurrentPosition + (delaySeconds-(passedSeconds%delaySeconds))/intervalSeconds) % tw.SlotCount + + // 特殊case,当计算的位置和当前位置重叠时,因为当前位置已经走过了,所以circle需要减一 + if pos == tw.CurrentPosition && circle != 0 { + circle-- + } + + return pos, circle +} + // Task 任务数据结构 // // Author : go_developer@163.com<白茶清欢> From 18cd67a9ada11d07b917d7c8214351b9eb7d88ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Fri, 11 Aug 2023 16:47:21 +0800 Subject: [PATCH 06/14] update --- core.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/core.go b/core.go index 143fb9b..049bdfd 100644 --- a/core.go +++ b/core.go @@ -92,9 +92,23 @@ func (tw *TimeWheel) checkAndRunTask() { // 执行任务时,Task.job是第一优先级,然后是TimeWheel.job if task.Job != nil { - go task.Job.Execute(nil, nil) + go func() { + defer func() { + if r := recover(); nil != r { + + } + }() + _, _ = task.Job.Execute(nil, nil) + }() } else if tw.Job != nil { - go tw.Job.Execute(nil, nil) + go func() { + defer func() { + if r := recover(); nil != r { + + } + }() + _, _ = tw.Job.Execute(nil, nil) + }() } else { fmt.Println(fmt.Sprintf("The task %v don't have job to run", task.Key)) } From 8c6112ddf8e79ee7d6963e8c5badd251f20eccec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Fri, 11 Aug 2023 16:53:19 +0800 Subject: [PATCH 07/14] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E8=BD=AE=E5=AE=9E=E4=BE=8B=E5=8C=96=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core.go | 23 +++++++++++++++++++++++ go.mod | 16 +++------------- go.sum | 29 ++--------------------------- 3 files changed, 28 insertions(+), 40 deletions(-) diff --git a/core.go b/core.go index 049bdfd..a1f19c8 100644 --- a/core.go +++ b/core.go @@ -14,6 +14,29 @@ import ( "time" ) +// NewTimeWheel 实例化时间轮 +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 16:50 2023/8/11 +func NewTimeWheel(slotCount int, interval time.Duration) *TimeWheel { + tw := &TimeWheel{ + Interval: 0, + Slots: make([]*list.List, 0), + Ticker: time.NewTicker(interval), + CurrentPosition: 0, + SlotCount: slotCount, + AddTaskChannel: make(chan *Task, 1000), + RemoveTaskChannel: make(chan *Task, 1000), + StopChannel: make(chan bool, 1000), + TaskRecords: easymap.NewNormal(true), + Job: nil, + IsRunning: false, + } + tw.initSlots() + return tw +} + // TimeWheel 核心时间轮的数据结构 // // Author : go_developer@163.com<白茶清欢> diff --git a/go.mod b/go.mod index 93fd1ce..31375d7 100644 --- a/go.mod +++ b/go.mod @@ -2,22 +2,12 @@ module git.zhangdeman.cn/zhangdeman/task go 1.20 +require git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230307094841-e437ba87af10 + require ( - git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230307094841-e437ba87af10 // indirect - git.zhangdeman.cn/zhangdeman/util v0.0.0-20230801092344-773ac512f305 // indirect - github.com/BurntSushi/toml v1.3.2 // indirect - github.com/Jeffail/gabs v1.4.0 // indirect - github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 // indirect - github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect - github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 // indirect - github.com/go-ini/ini v1.67.0 // indirect + git.zhangdeman.cn/zhangdeman/util v0.0.0-20230811070456-d6a489d5860b // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mozillazg/go-pinyin v0.20.0 // indirect - github.com/mssola/user_agent v0.6.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect - github.com/tidwall/gjson v1.15.0 // indirect - github.com/tidwall/match v1.1.1 // indirect - github.com/tidwall/pretty v1.2.1 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 38cb114..d29a822 100644 --- a/go.sum +++ b/go.sum @@ -1,37 +1,12 @@ git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230307094841-e437ba87af10 h1:+Lg4vXFEiWVKjhUJdXuoP0AgjGT49oqJ3301STnZErk= git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230307094841-e437ba87af10/go.mod h1:+Lc0zYF8sylRi75A7NGmObrLxugwAZa8WVpWh2eh5X0= -git.zhangdeman.cn/zhangdeman/util v0.0.0-20230801092344-773ac512f305 h1:6Bs/cQP+eKABHB/01uHQI15PwKbo7n8HNx7nIFUGBp0= -git.zhangdeman.cn/zhangdeman/util v0.0.0-20230801092344-773ac512f305/go.mod h1:trYFOShINaQBvinQrH4A0G2kfL22Y2lygEcAiGDt/sc= -github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= -github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= -github.com/Jeffail/gabs v1.4.0 h1://5fYRRTq1edjfIrQGvdkcd22pkYUrHZ5YC/H2GJVAo= -github.com/Jeffail/gabs v1.4.0/go.mod h1:6xMvQMK4k33lb7GUUpaAPh6nKMmemQeg5d4gn7/bOXc= -github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 h1:OYA+5W64v3OgClL+IrOD63t4i/RW7RqrAVl9LTZ9UqQ= -github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg= -github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= -github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= -github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU= -github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1/go.mod h1:+hnT3ywWDTAFrW5aE+u2Sa/wT555ZqwoCS+pk3p6ry4= -github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= -github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= +git.zhangdeman.cn/zhangdeman/util v0.0.0-20230811070456-d6a489d5860b h1:vnmxYrNdX6f5sEVjjkM1fIR+i32kHJ4g9DJqug9KKek= +git.zhangdeman.cn/zhangdeman/util v0.0.0-20230811070456-d6a489d5860b/go.mod h1:Yum5+tgP+Wf1GWUAyQz1Qh8Ab9m5+90GYkYdzqVs0lA= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mozillazg/go-pinyin v0.20.0 h1:BtR3DsxpApHfKReaPO1fCqF4pThRwH9uwvXzm+GnMFQ= github.com/mozillazg/go-pinyin v0.20.0/go.mod h1:iR4EnMMRXkfpFVV5FMi4FNB6wGq9NV6uDWbUuPhP4Yc= -github.com/mssola/user_agent v0.6.0 h1:uwPR4rtWlCHRFyyP9u2KOV0u8iQXmS7Z7feTrstQwk4= -github.com/mssola/user_agent v0.6.0/go.mod h1:TTPno8LPY3wAIEKRpAtkdMT0f8SE24pLRGPahjCH4uw= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= -github.com/tidwall/gjson v1.15.0 h1:5n/pM+v3r5ujuNl4YLZLsQ+UE5jlkLVm7jMzT5Mpolw= -github.com/tidwall/gjson v1.15.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= -github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= -github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= -github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= -github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= -github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From e84950da597fd3971a5adf5cf4c406f120335eec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Fri, 11 Aug 2023 16:58:48 +0800 Subject: [PATCH 08/14] update go mod --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 31375d7..c8e05fe 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module git.zhangdeman.cn/zhangdeman/task go 1.20 -require git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230307094841-e437ba87af10 +require git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230811085725-1a7414bb7590 require ( git.zhangdeman.cn/zhangdeman/util v0.0.0-20230811070456-d6a489d5860b // indirect diff --git a/go.sum b/go.sum index d29a822..0c2854e 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230307094841-e437ba87af10 h1:+Lg4vXFEiWVKjhUJdXuoP0AgjGT49oqJ3301STnZErk= git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230307094841-e437ba87af10/go.mod h1:+Lc0zYF8sylRi75A7NGmObrLxugwAZa8WVpWh2eh5X0= +git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230811085725-1a7414bb7590 h1:zN5JKHvzCH5q+X6W6fZw1hZ18FTlmZdv+k5T2j+6/GQ= +git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230811085725-1a7414bb7590/go.mod h1:l9S40lsDnTd/VAZjh1kmfYvz0B9z+7oT86pMQ/KurWo= git.zhangdeman.cn/zhangdeman/util v0.0.0-20230811070456-d6a489d5860b h1:vnmxYrNdX6f5sEVjjkM1fIR+i32kHJ4g9DJqug9KKek= git.zhangdeman.cn/zhangdeman/util v0.0.0-20230811070456-d6a489d5860b/go.mod h1:Yum5+tgP+Wf1GWUAyQz1Qh8Ab9m5+90GYkYdzqVs0lA= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= From bb088f1d29856a56c42f03513414bed1960ac7cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Fri, 11 Aug 2023 17:24:16 +0800 Subject: [PATCH 09/14] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0,=20=E5=A2=9E=E5=8A=A0=E5=8D=95=E5=85=83?= =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core.go | 6 ++-- core_test.go | 77 ++++++++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 7 +++++ go.sum | 15 ++++++++++ 4 files changed, 103 insertions(+), 2 deletions(-) create mode 100644 core_test.go diff --git a/core.go b/core.go index a1f19c8..a973c91 100644 --- a/core.go +++ b/core.go @@ -21,8 +21,8 @@ import ( // Date : 16:50 2023/8/11 func NewTimeWheel(slotCount int, interval time.Duration) *TimeWheel { tw := &TimeWheel{ - Interval: 0, - Slots: make([]*list.List, 0), + Interval: interval, + Slots: make([]*list.List, slotCount), Ticker: time.NewTicker(interval), CurrentPosition: 0, SlotCount: slotCount, @@ -75,6 +75,7 @@ func (tw *TimeWheel) initSlots() { // // Date : 13:38 2023/8/4 func (tw *TimeWheel) Start() { + tw.IsRunning = true for { select { case <-tw.Ticker.C: @@ -88,6 +89,7 @@ func (tw *TimeWheel) Start() { tw.RemoveTask(task) case <-tw.StopChannel: tw.Ticker.Stop() + tw.IsRunning = false return } } diff --git a/core_test.go b/core_test.go new file mode 100644 index 0000000..a48bcf2 --- /dev/null +++ b/core_test.go @@ -0,0 +1,77 @@ +// Package task ... +// +// Description : task ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 2023-08-11 17:00 +package task + +import ( + "context" + "fmt" + "git.zhangdeman.cn/zhangdeman/wrapper" + "testing" + "time" +) + +var tw *TimeWheel + +// TestNewTimeWheel ... +// +// Author : go_developer@163.com<白茶清欢> +// +// Date : 17:00 2023/8/11 +func TestNewTimeWheel(t *testing.T) { + tw = NewTimeWheel(10, 10*time.Second) + tw.AddTask(&Task{ + Key: wrapper.StringFromRandom(32, "").Value(), + Interval: 5 * time.Second, + CreatedTime: time.Now(), + Position: 0, + Circle: 100, + Job: &testTask{}, + Times: 10, + MaxExecuteTime: 0, + }, false) + go tw.Start() + time.Sleep(3 * time.Second) + for tw.IsRunning { + + } +} + +type testTask struct { +} + +func (t testTask) GetFlag() string { + return "unit_test" +} + +func (t testTask) Description() string { + return "单元测试任务" +} + +func (t testTask) GetRunID() string { + return wrapper.StringFromRandom(32, "").Value() +} + +func (t testTask) Callback(result *Result) error { + fmt.Println(result) + return nil +} + +func (t testTask) Execute(ctx context.Context, cfg *Config) (map[string]interface{}, error) { + fmt.Println(wrapper.OwnTimeFromNow().ToString()) + tw.AddTask(&Task{ + Key: wrapper.StringFromRandom(32, "").Value(), + Interval: 5 * time.Second, + CreatedTime: time.Now(), + Position: 0, + Circle: 100, + Job: &testTask{}, + Times: 10, + MaxExecuteTime: 100, + }, false) + return nil, nil +} diff --git a/go.mod b/go.mod index c8e05fe..4c63e67 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,16 @@ go 1.20 require git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230811085725-1a7414bb7590 require ( + git.zhangdeman.cn/zhangdeman/consts v0.0.0-20230811030300-6f850372c88c // indirect + git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20230811032817-e6ad534a9a10 // indirect git.zhangdeman.cn/zhangdeman/util v0.0.0-20230811070456-d6a489d5860b // indirect + git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20230811071513-cfc46e8d82e1 // indirect + github.com/BurntSushi/toml v1.3.2 // indirect + github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 // indirect + github.com/go-ini/ini v1.67.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mozillazg/go-pinyin v0.20.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 0c2854e..b1301bf 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,21 @@ +git.zhangdeman.cn/zhangdeman/consts v0.0.0-20230811030300-6f850372c88c h1:Dan3iSVU6XTKt8r3/qixfPHPpfLZjkYlPmaJios7wtE= +git.zhangdeman.cn/zhangdeman/consts v0.0.0-20230811030300-6f850372c88c/go.mod h1:IXXaZkb7vGzGnGM5RRWrASAuwrVSNxuoe0DmeXx5g6k= git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230307094841-e437ba87af10 h1:+Lg4vXFEiWVKjhUJdXuoP0AgjGT49oqJ3301STnZErk= git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230307094841-e437ba87af10/go.mod h1:+Lc0zYF8sylRi75A7NGmObrLxugwAZa8WVpWh2eh5X0= git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230811085725-1a7414bb7590 h1:zN5JKHvzCH5q+X6W6fZw1hZ18FTlmZdv+k5T2j+6/GQ= git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230811085725-1a7414bb7590/go.mod h1:l9S40lsDnTd/VAZjh1kmfYvz0B9z+7oT86pMQ/KurWo= +git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20230811032817-e6ad534a9a10 h1:orhcMAKrcOajsBJCgssnb9O8YcLsPJvWuXF511gs5dc= +git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20230811032817-e6ad534a9a10/go.mod h1:CzX5/WwGDTnKmewarnjkK5XcSRbgszTQTdTL3OUc/s4= git.zhangdeman.cn/zhangdeman/util v0.0.0-20230811070456-d6a489d5860b h1:vnmxYrNdX6f5sEVjjkM1fIR+i32kHJ4g9DJqug9KKek= git.zhangdeman.cn/zhangdeman/util v0.0.0-20230811070456-d6a489d5860b/go.mod h1:Yum5+tgP+Wf1GWUAyQz1Qh8Ab9m5+90GYkYdzqVs0lA= +git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20230811071513-cfc46e8d82e1 h1:k2iu9KgRxeroytB+N+/XapAxt1di7o2pNTISjFlYDJ8= +git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20230811071513-cfc46e8d82e1/go.mod h1:kvjAbtGTo14gKCS0X4rxnb2sPkskHOUy2NXcx34t6Mw= +github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= +github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 h1:OYA+5W64v3OgClL+IrOD63t4i/RW7RqrAVl9LTZ9UqQ= +github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg= +github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= +github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mozillazg/go-pinyin v0.20.0 h1:BtR3DsxpApHfKReaPO1fCqF4pThRwH9uwvXzm+GnMFQ= @@ -12,3 +24,6 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From d6b415a83955dc04b75d2932a9bf11d20fa76873 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Fri, 11 Aug 2023 18:14:05 +0800 Subject: [PATCH 10/14] =?UTF-8?q?=E6=AF=8F=E6=AC=A1=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E7=94=9F=E6=88=90=E4=B8=80=E4=B8=AA=E4=BB=BB=E5=8A=A1Key?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- abstract.go | 2 - core.go | 5 ++ core_test.go | 10 ---- task.go | 139 --------------------------------------------------- 4 files changed, 5 insertions(+), 151 deletions(-) delete mode 100644 task.go diff --git a/abstract.go b/abstract.go index 4191eba..3772291 100644 --- a/abstract.go +++ b/abstract.go @@ -19,8 +19,6 @@ type ITask interface { GetFlag() string // Description 任务描述 Description() string - // GetRunID 获取任务ID - GetRunID() string // Callback 任务执行成功的回调 Callback(result *Result) error // Execute 执行任务 diff --git a/core.go b/core.go index a973c91..a94998a 100644 --- a/core.go +++ b/core.go @@ -11,6 +11,7 @@ import ( "container/list" "fmt" "git.zhangdeman.cn/zhangdeman/easymap" + "git.zhangdeman.cn/zhangdeman/wrapper" "time" ) @@ -172,6 +173,10 @@ func (tw *TimeWheel) checkAndRunTask() { // // Date : 13:43 2023/8/4 func (tw *TimeWheel) AddTask(task *Task, byInterval bool) { + if nil != task { + // 生成 key + task.Key = wrapper.StringFromRandom(128, "").Md5().Value + } var pos, circle int if byInterval { pos, circle = tw.getPosAndCircleByInterval(task.Interval) diff --git a/core_test.go b/core_test.go index a48bcf2..9dbe5f0 100644 --- a/core_test.go +++ b/core_test.go @@ -63,15 +63,5 @@ func (t testTask) Callback(result *Result) error { func (t testTask) Execute(ctx context.Context, cfg *Config) (map[string]interface{}, error) { fmt.Println(wrapper.OwnTimeFromNow().ToString()) - tw.AddTask(&Task{ - Key: wrapper.StringFromRandom(32, "").Value(), - Interval: 5 * time.Second, - CreatedTime: time.Now(), - Position: 0, - Circle: 100, - Job: &testTask{}, - Times: 10, - MaxExecuteTime: 100, - }, false) return nil, nil } diff --git a/task.go b/task.go deleted file mode 100644 index 04d1c32..0000000 --- a/task.go +++ /dev/null @@ -1,139 +0,0 @@ -// Package task ... -// -// Description : task ... -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 2022-06-23 15:13 -package task - -import ( - "context" - "fmt" - "runtime" - "sync" - "time" -) - -var ( - // Dispatch 调度实例 - Dispatch *dispatch -) - -func init() { - Dispatch = &dispatch{ - lock: &sync.RWMutex{}, - taskTable: make(map[string]ITask), - } -} - -// dispatch 任务调度 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 15:14 2022/6/23 -type dispatch struct { - // 锁 - lock *sync.RWMutex - // 任务表 - taskTable map[string]ITask -} - -// Register 注册任务 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 15:16 2022/6/23 -func (d *dispatch) Register(taskInstanceList ...ITask) error { - d.lock.Lock() - defer d.lock.Unlock() - for _, taskInstance := range taskInstanceList { - if nil == taskInstance { - continue - } - if _, exist := d.taskTable[taskInstance.GetFlag()]; exist { - return fmt.Errorf("%s 任务重复注册! ", taskInstance.GetFlag()) - } - d.taskTable[taskInstance.GetFlag()] = taskInstance - } - return nil -} - -// Remove 移除任务 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 15:16 2022/6/23 -func (d *dispatch) Remove(taskNameList ...string) error { - d.lock.Lock() - defer d.lock.Unlock() - for _, taskName := range taskNameList { - delete(d.taskTable, taskName) - } - return nil -} - -// Run 执行任务 -// -// Author : go_developer@163.com<白茶清欢> -// -// Date : 15:18 2022/6/23 -func (d *dispatch) Run(ctx context.Context, cfg *Config) *Result { - var ( - taskInstance ITask - exist bool - cancelFunc context.CancelFunc - ) - - if nil == ctx { - if cfg.Timeout > 0 { - ctx, cancelFunc = context.WithCancel(context.Background()) - } else { - ctx = context.Background() - } - } else { - ctx, cancelFunc = context.WithCancel(ctx) - } - - if nil != cancelFunc { - // 带了超时时间 - cancelFunc() - } - - result := &Result{ - StartTime: time.Now().UnixNano(), - FinishTime: 0, - Used: 0, - TaskRunID: "", - TaskDescription: "", - TaskConfig: cfg, - Data: nil, - Err: nil, - } - defer func() { - result.FinishTime = time.Now().UnixNano() - result.Used = result.FinishTime - result.StartTime - }() - d.lock.RLock() - if taskInstance, exist = d.taskTable[cfg.TaskFlag]; !exist { - result.Err = fmt.Errorf("%v 任务未注册", cfg.TaskFlag) - return result - } - d.lock.RUnlock() - result.TaskRunID = taskInstance.GetRunID() - result.TaskDescription = taskInstance.Description() - // 异步运行 - go func() { - if e := recover(); nil != e { - switch e.(type) { - case runtime.Error: // 运行时错误 - result.Err = fmt.Errorf("出现运行时Panic : %v", e) - default: // 非运行时错误 - result.Err = fmt.Errorf("出现其他场景Panic : %v", e) - } - } - result.Data, result.Err = taskInstance.Execute(ctx, cfg) - _ = taskInstance.Callback(result) - }() - return result -} From 4d9cb375ec45eb03542b06ad85feac2dd6bba376 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Fri, 11 Aug 2023 18:18:00 +0800 Subject: [PATCH 11/14] =?UTF-8?q?slotCount=E5=92=8Cinterval=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E9=BB=98=E8=AE=A4=E5=80=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/core.go b/core.go index a94998a..415525d 100644 --- a/core.go +++ b/core.go @@ -21,6 +21,14 @@ import ( // // Date : 16:50 2023/8/11 func NewTimeWheel(slotCount int, interval time.Duration) *TimeWheel { + if slotCount <= 0 { + // 默认100个槽位 + slotCount = 100 + } + if interval.Seconds() == 0 { + // 默认60s扫描一次 + interval = time.Second * 60 + } tw := &TimeWheel{ Interval: interval, Slots: make([]*list.List, slotCount), From a96cf04261d72e77483f9eb8e03203fbfeaa0399 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Fri, 11 Aug 2023 18:23:52 +0800 Subject: [PATCH 12/14] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=8F=98=E9=87=8F?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core.go | 56 ++++++++++++++++++++++++++++---------------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/core.go b/core.go index 415525d..97d1dc8 100644 --- a/core.go +++ b/core.go @@ -32,12 +32,11 @@ func NewTimeWheel(slotCount int, interval time.Duration) *TimeWheel { tw := &TimeWheel{ Interval: interval, Slots: make([]*list.List, slotCount), - Ticker: time.NewTicker(interval), - CurrentPosition: 0, - SlotCount: slotCount, - AddTaskChannel: make(chan *Task, 1000), - RemoveTaskChannel: make(chan *Task, 1000), - StopChannel: make(chan bool, 1000), + ticker: time.NewTicker(interval), + slotCount: slotCount, + addTaskChannel: make(chan *Task, 1000), + removeTaskChannel: make(chan *Task, 1000), + stopChannel: make(chan bool, 1000), TaskRecords: easymap.NewNormal(true), Job: nil, IsRunning: false, @@ -54,12 +53,12 @@ func NewTimeWheel(slotCount int, interval time.Duration) *TimeWheel { type TimeWheel struct { Interval time.Duration // 时间轮精度 Slots []*list.List // 时间轮盘每个位置存储的任务列表 - Ticker *time.Ticker // 定时器 - CurrentPosition int // 时间轮盘当前位置 - SlotCount int // 时间轮盘的齿数,Interval*SlotCount就是时间轮盘转一圈走过的时间 - AddTaskChannel chan *Task - RemoveTaskChannel chan *Task - StopChannel chan bool + ticker *time.Ticker // 定时器 + currentPosition int // 时间轮盘当前位置 + slotCount int // 时间轮盘的齿数,Interval*SlotCount就是时间轮盘转一圈走过的时间 + addTaskChannel chan *Task + removeTaskChannel chan *Task + stopChannel chan bool TaskRecords easymap.EasyMap // Map结构来存储Task对象,key是Task.key,value是Task在双向链表中的存储对象 // 需要执行的任务,如果时间轮盘上的Task执行同一个Job,可以直接实例化到TimeWheel结构体中。 // 此处的优先级低于Task中的Job参数 @@ -73,7 +72,7 @@ type TimeWheel struct { // // Date : 13:37 2023/8/4 func (tw *TimeWheel) initSlots() { - for i := 0; i < tw.SlotCount; i++ { + for i := 0; i < tw.slotCount; i++ { tw.Slots[i] = list.New() } } @@ -87,17 +86,18 @@ func (tw *TimeWheel) Start() { tw.IsRunning = true for { select { - case <-tw.Ticker.C: + case <-tw.ticker.C: + // 指定时间间隔之后, 调度一次任务 tw.checkAndRunTask() - case task := <-tw.AddTaskChannel: + case task := <-tw.addTaskChannel: // 此处利用Task.createTime来定位任务在时间轮盘的位置和执行圈数 // 如果直接用任务的周期来定位位置,那么在服务重启的时候,任务周器相同的点会被定位到相同的卡槽, // 会造成任务过度集中 tw.AddTask(task, false) - case task := <-tw.RemoveTaskChannel: + case task := <-tw.removeTaskChannel: tw.RemoveTask(task) - case <-tw.StopChannel: - tw.Ticker.Stop() + case <-tw.stopChannel: + tw.ticker.Stop() tw.IsRunning = false return } @@ -112,7 +112,7 @@ func (tw *TimeWheel) Start() { func (tw *TimeWheel) checkAndRunTask() { // 获取该轮盘位置的双向链表 - currentList := tw.Slots[tw.CurrentPosition] + currentList := tw.Slots[tw.currentPosition] if currentList != nil { for item := currentList.Front(); item != nil; { @@ -168,10 +168,10 @@ func (tw *TimeWheel) checkAndRunTask() { } // 轮盘前进一步 - if tw.CurrentPosition == tw.SlotCount-1 { - tw.CurrentPosition = 0 + if tw.currentPosition == tw.slotCount-1 { + tw.currentPosition = 0 } else { - tw.CurrentPosition++ + tw.currentPosition++ } } @@ -222,11 +222,11 @@ func (tw *TimeWheel) RemoveTask(task *Task) { func (tw *TimeWheel) getPosAndCircleByInterval(d time.Duration) (int, int) { delaySeconds := int(d.Seconds()) intervalSeconds := int(tw.Interval.Seconds()) - circle := delaySeconds / intervalSeconds / tw.SlotCount - pos := (tw.CurrentPosition + delaySeconds/intervalSeconds) % tw.SlotCount + circle := delaySeconds / intervalSeconds / tw.slotCount + pos := (tw.currentPosition + delaySeconds/intervalSeconds) % tw.slotCount // 特殊case,当计算的位置和当前位置重叠时,因为当前位置已经走过了,所以circle需要减一 - if pos == tw.CurrentPosition && circle != 0 { + if pos == tw.currentPosition && circle != 0 { circle-- } return pos, circle @@ -244,11 +244,11 @@ func (tw *TimeWheel) getPosAndCircleByCreatedTime(createdTime time.Time, d time. delaySeconds := int(d.Seconds()) intervalSeconds := int(tw.Interval.Seconds()) - circle := delaySeconds / intervalSeconds / tw.SlotCount - pos := (tw.CurrentPosition + (delaySeconds-(passedSeconds%delaySeconds))/intervalSeconds) % tw.SlotCount + circle := delaySeconds / intervalSeconds / tw.slotCount + pos := (tw.currentPosition + (delaySeconds-(passedSeconds%delaySeconds))/intervalSeconds) % tw.slotCount // 特殊case,当计算的位置和当前位置重叠时,因为当前位置已经走过了,所以circle需要减一 - if pos == tw.CurrentPosition && circle != 0 { + if pos == tw.currentPosition && circle != 0 { circle-- } From e332cc44c96624f31770af5e467c139f496c7224 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Fri, 11 Aug 2023 18:29:00 +0800 Subject: [PATCH 13/14] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E6=97=B6,=20=E5=A2=9E=E5=8A=A0=E5=88=86=E9=85=8Drun=20id?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core.go b/core.go index 97d1dc8..9c33464 100644 --- a/core.go +++ b/core.go @@ -181,9 +181,10 @@ func (tw *TimeWheel) checkAndRunTask() { // // Date : 13:43 2023/8/4 func (tw *TimeWheel) AddTask(task *Task, byInterval bool) { - if nil != task { - // 生成 key - task.Key = wrapper.StringFromRandom(128, "").Md5().Value + // 生成 run_id + task.RunID = wrapper.StringFromRandom(128, "").Md5().Value + if nil != task && nil != task.Job { + task.Key = task.Job.GetFlag() + "_" + task.RunID } var pos, circle int if byInterval { @@ -261,6 +262,7 @@ func (tw *TimeWheel) getPosAndCircleByCreatedTime(createdTime time.Time, d time. // // Date : 11:44 2023/8/4 type Task struct { + RunID string // 每一次运行的run id Key string // 用来标识task对象,是唯一的 Interval time.Duration // 任务周期 CreatedTime time.Time // 任务的创建时间 From c068799b3273b5a6a4edd448b165b6cad40bbf87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E8=8C=B6=E6=B8=85=E6=AC=A2?= Date: Mon, 4 Sep 2023 20:09:30 +0800 Subject: [PATCH 14/14] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E7=BB=93=E6=9E=84?= =?UTF-8?q?=E5=AE=9A=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core.go | 49 +++++++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/core.go b/core.go index 9c33464..f2e4d13 100644 --- a/core.go +++ b/core.go @@ -30,16 +30,16 @@ func NewTimeWheel(slotCount int, interval time.Duration) *TimeWheel { interval = time.Second * 60 } tw := &TimeWheel{ - Interval: interval, - Slots: make([]*list.List, slotCount), + interval: interval, + slots: make([]*list.List, slotCount), ticker: time.NewTicker(interval), slotCount: slotCount, addTaskChannel: make(chan *Task, 1000), removeTaskChannel: make(chan *Task, 1000), stopChannel: make(chan bool, 1000), - TaskRecords: easymap.NewNormal(true), - Job: nil, - IsRunning: false, + taskRecords: easymap.NewNormal(true), + job: nil, + isRunning: false, } tw.initSlots() return tw @@ -51,19 +51,19 @@ func NewTimeWheel(slotCount int, interval time.Duration) *TimeWheel { // // Date : 11:14 2023/8/4 type TimeWheel struct { - Interval time.Duration // 时间轮精度 - Slots []*list.List // 时间轮盘每个位置存储的任务列表 + interval time.Duration // 时间轮精度 + slots []*list.List // 时间轮盘每个位置存储的任务列表 ticker *time.Ticker // 定时器 currentPosition int // 时间轮盘当前位置 slotCount int // 时间轮盘的齿数,Interval*SlotCount就是时间轮盘转一圈走过的时间 addTaskChannel chan *Task removeTaskChannel chan *Task stopChannel chan bool - TaskRecords easymap.EasyMap // Map结构来存储Task对象,key是Task.key,value是Task在双向链表中的存储对象 + taskRecords easymap.EasyMap // Map结构来存储Task对象,key是Task.key,value是Task在双向链表中的存储对象 // 需要执行的任务,如果时间轮盘上的Task执行同一个Job,可以直接实例化到TimeWheel结构体中。 // 此处的优先级低于Task中的Job参数 - Job ITask - IsRunning bool // 是否运行中 + job ITask + isRunning bool // 是否运行中 } // initSlots 初始化时间轮盘,每个轮盘上的卡槽用一个双向队列表示,便于插入和删除 @@ -73,7 +73,7 @@ type TimeWheel struct { // Date : 13:37 2023/8/4 func (tw *TimeWheel) initSlots() { for i := 0; i < tw.slotCount; i++ { - tw.Slots[i] = list.New() + tw.slots[i] = list.New() } } @@ -83,7 +83,7 @@ func (tw *TimeWheel) initSlots() { // // Date : 13:38 2023/8/4 func (tw *TimeWheel) Start() { - tw.IsRunning = true + tw.isRunning = true for { select { case <-tw.ticker.C: @@ -98,7 +98,8 @@ func (tw *TimeWheel) Start() { tw.RemoveTask(task) case <-tw.stopChannel: tw.ticker.Stop() - tw.IsRunning = false + tw.isRunning = false + // TODO : 重新初始化时间轮实例 return } } @@ -112,7 +113,7 @@ func (tw *TimeWheel) Start() { func (tw *TimeWheel) checkAndRunTask() { // 获取该轮盘位置的双向链表 - currentList := tw.Slots[tw.currentPosition] + currentList := tw.slots[tw.currentPosition] if currentList != nil { for item := currentList.Front(); item != nil; { @@ -134,14 +135,14 @@ func (tw *TimeWheel) checkAndRunTask() { }() _, _ = task.Job.Execute(nil, nil) }() - } else if tw.Job != nil { + } else if tw.job != nil { go func() { defer func() { if r := recover(); nil != r { } }() - _, _ = tw.Job.Execute(nil, nil) + _, _ = tw.job.Execute(nil, nil) }() } else { fmt.Println(fmt.Sprintf("The task %v don't have job to run", task.Key)) @@ -149,7 +150,7 @@ func (tw *TimeWheel) checkAndRunTask() { // 执行完成以后,将该任务从时间轮盘删除 next := item.Next() - tw.TaskRecords.Del(task.Key) + tw.taskRecords.Del(task.Key) currentList.Remove(item) item = next @@ -196,8 +197,8 @@ func (tw *TimeWheel) AddTask(task *Task, byInterval bool) { task.Circle = circle task.Position = pos - element := tw.Slots[pos].PushBack(task) - tw.TaskRecords.Set(task.Key, element) + element := tw.slots[pos].PushBack(task) + tw.taskRecords.Set(task.Key, element) } // RemoveTask 删除任务的内部函数 @@ -207,11 +208,11 @@ func (tw *TimeWheel) AddTask(task *Task, byInterval bool) { // Date : 13:44 2023/8/4 func (tw *TimeWheel) RemoveTask(task *Task) { // 从map结构中删除 - taskInfo, _ := tw.TaskRecords.Get(task.Key) - tw.TaskRecords.Del(task.Key) + taskInfo, _ := tw.taskRecords.Get(task.Key) + tw.taskRecords.Del(task.Key) // 通过TimeWheel.slots获取任务的 - currentList := tw.Slots[task.Position] + currentList := tw.slots[task.Position] currentList.Remove(taskInfo.(*list.Element)) } @@ -222,7 +223,7 @@ func (tw *TimeWheel) RemoveTask(task *Task) { // Date : 13:46 2023/8/4 func (tw *TimeWheel) getPosAndCircleByInterval(d time.Duration) (int, int) { delaySeconds := int(d.Seconds()) - intervalSeconds := int(tw.Interval.Seconds()) + intervalSeconds := int(tw.interval.Seconds()) circle := delaySeconds / intervalSeconds / tw.slotCount pos := (tw.currentPosition + delaySeconds/intervalSeconds) % tw.slotCount @@ -243,7 +244,7 @@ func (tw *TimeWheel) getPosAndCircleByCreatedTime(createdTime time.Time, d time. passedTime := time.Since(createdTime) passedSeconds := int(passedTime.Seconds()) delaySeconds := int(d.Seconds()) - intervalSeconds := int(tw.Interval.Seconds()) + intervalSeconds := int(tw.interval.Seconds()) circle := delaySeconds / intervalSeconds / tw.slotCount pos := (tw.currentPosition + (delaySeconds-(passedSeconds%delaySeconds))/intervalSeconds) % tw.slotCount