Compare commits

...

10 Commits

Author SHA1 Message Date
0c770b805d 添加注释 2025-05-29 18:56:49 +08:00
029c268af6 update unit test 2025-05-29 18:49:07 +08:00
efeeb51e8b update go mod 2025-05-29 18:33:03 +08:00
55be6d989e update task 2024-10-08 11:24:23 +08:00
4bf786336e fix 2024-10-08 11:14:50 +08:00
bf27b47427 update go mod 2024-10-08 11:14:03 +08:00
dc9ade07b6 Merge pull request '第一版时间轮任务调度实现' (#1) from feature/timewheel into master
Reviewed-on: #1
2023-09-04 20:13:17 +08:00
c068799b32 更新结构定义 2023-09-04 20:09:30 +08:00
fe15933479 fix 2023-08-24 11:20:46 +08:00
87f8651bed fix 2023-08-24 11:19:48 +08:00
7 changed files with 254 additions and 71 deletions

View File

@ -22,5 +22,5 @@ type ITask interface {
// Callback 任务执行成功的回调 // Callback 任务执行成功的回调
Callback(result *Result) error Callback(result *Result) error
// Execute 执行任务 // Execute 执行任务
Execute(ctx context.Context, cfg *Config) (map[string]interface{}, error) Execute(ctx context.Context, cfg *Config) *Result
} }

68
core.go
View File

@ -30,16 +30,16 @@ func NewTimeWheel(slotCount int, interval time.Duration) *TimeWheel {
interval = time.Second * 60 interval = time.Second * 60
} }
tw := &TimeWheel{ tw := &TimeWheel{
Interval: interval, interval: interval,
Slots: make([]*list.List, slotCount), slots: make([]*list.List, slotCount),
ticker: time.NewTicker(interval), ticker: time.NewTicker(interval),
slotCount: slotCount, slotCount: slotCount,
addTaskChannel: make(chan *Task, 1000), addTaskChannel: make(chan *Task, 1000),
removeTaskChannel: make(chan *Task, 1000), removeTaskChannel: make(chan *Task, 1000),
stopChannel: make(chan bool, 1000), stopChannel: make(chan bool, 1000),
TaskRecords: easymap.NewNormal(true), taskRecords: easymap.NewNormal(),
Job: nil, job: nil,
IsRunning: false, isRunning: false,
} }
tw.initSlots() tw.initSlots()
return tw return tw
@ -51,19 +51,19 @@ func NewTimeWheel(slotCount int, interval time.Duration) *TimeWheel {
// //
// Date : 11:14 2023/8/4 // Date : 11:14 2023/8/4
type TimeWheel struct { type TimeWheel struct {
Interval time.Duration // 时间轮精度 interval time.Duration // 时间轮精度
Slots []*list.List // 时间轮盘每个位置存储的任务列表 slots []*list.List // 时间轮盘每个位置存储的任务列表
ticker *time.Ticker // 定时器 ticker *time.Ticker // 定时器
currentPosition int // 时间轮盘当前位置 currentPosition int // 时间轮盘当前位置
slotCount int // 时间轮盘的齿数,Interval*SlotCount就是时间轮盘转一圈走过的时间 slotCount int // 时间轮盘的齿数,Interval*SlotCount就是时间轮盘转一圈走过的时间
addTaskChannel chan *Task addTaskChannel chan *Task
removeTaskChannel chan *Task removeTaskChannel chan *Task
stopChannel chan bool stopChannel chan bool
TaskRecords easymap.EasyMap // Map结构来存储Task对象key是Task.keyvalue是Task在双向链表中的存储对象 taskRecords easymap.EasyMap // Map结构来存储Task对象key是Task.keyvalue是Task在双向链表中的存储对象
// 需要执行的任务如果时间轮盘上的Task执行同一个Job可以直接实例化到TimeWheel结构体中。 // 需要执行的任务如果时间轮盘上的Task执行同一个Job可以直接实例化到TimeWheel结构体中。
// 此处的优先级低于Task中的Job参数 // 此处的优先级低于Task中的Job参数
Job ITask job ITask
IsRunning bool // 是否运行中 isRunning bool // 是否运行中
} }
// initSlots 初始化时间轮盘,每个轮盘上的卡槽用一个双向队列表示,便于插入和删除 // initSlots 初始化时间轮盘,每个轮盘上的卡槽用一个双向队列表示,便于插入和删除
@ -73,7 +73,7 @@ type TimeWheel struct {
// Date : 13:37 2023/8/4 // Date : 13:37 2023/8/4
func (tw *TimeWheel) initSlots() { func (tw *TimeWheel) initSlots() {
for i := 0; i < tw.slotCount; i++ { for i := 0; i < tw.slotCount; i++ {
tw.Slots[i] = list.New() tw.slots[i] = list.New()
} }
} }
@ -83,7 +83,7 @@ func (tw *TimeWheel) initSlots() {
// //
// Date : 13:38 2023/8/4 // Date : 13:38 2023/8/4
func (tw *TimeWheel) Start() { func (tw *TimeWheel) Start() {
tw.IsRunning = true tw.isRunning = true
for { for {
select { select {
case <-tw.ticker.C: case <-tw.ticker.C:
@ -98,7 +98,8 @@ func (tw *TimeWheel) Start() {
tw.RemoveTask(task) tw.RemoveTask(task)
case <-tw.stopChannel: case <-tw.stopChannel:
tw.ticker.Stop() tw.ticker.Stop()
tw.IsRunning = false tw.isRunning = false
// TODO : 重新初始化时间轮实例
return return
} }
} }
@ -112,7 +113,7 @@ func (tw *TimeWheel) Start() {
func (tw *TimeWheel) checkAndRunTask() { func (tw *TimeWheel) checkAndRunTask() {
// 获取该轮盘位置的双向链表 // 获取该轮盘位置的双向链表
currentList := tw.Slots[tw.currentPosition] currentList := tw.slots[tw.currentPosition]
if currentList != nil { if currentList != nil {
for item := currentList.Front(); item != nil; { for item := currentList.Front(); item != nil; {
@ -132,16 +133,16 @@ func (tw *TimeWheel) checkAndRunTask() {
} }
}() }()
_, _ = task.Job.Execute(nil, nil) _ = task.Job.Execute(nil, nil)
}() }()
} else if tw.Job != nil { } else if tw.job != nil {
go func() { go func() {
defer func() { defer func() {
if r := recover(); nil != r { if r := recover(); nil != r {
} }
}() }()
_, _ = tw.Job.Execute(nil, nil) _ = tw.job.Execute(nil, nil)
}() }()
} else { } else {
fmt.Println(fmt.Sprintf("The task %v don't have job to run", task.Key)) fmt.Println(fmt.Sprintf("The task %v don't have job to run", task.Key))
@ -149,7 +150,7 @@ func (tw *TimeWheel) checkAndRunTask() {
// 执行完成以后,将该任务从时间轮盘删除 // 执行完成以后,将该任务从时间轮盘删除
next := item.Next() next := item.Next()
tw.TaskRecords.Del(task.Key) tw.taskRecords.Del(task.Key)
currentList.Remove(item) currentList.Remove(item)
item = next item = next
@ -181,9 +182,12 @@ func (tw *TimeWheel) checkAndRunTask() {
// //
// Date : 13:43 2023/8/4 // Date : 13:43 2023/8/4
func (tw *TimeWheel) AddTask(task *Task, byInterval bool) { func (tw *TimeWheel) AddTask(task *Task, byInterval bool) {
if nil == task {
return
}
// 生成 run_id // 生成 run_id
task.RunID = wrapper.StringFromRandom(128, "").Md5().Value task.RunID = wrapper.StringFromRandom(128, "").Md5().Value
if nil != task && nil != task.Job { if nil != task.Job {
task.Key = task.Job.GetFlag() + "_" + task.RunID task.Key = task.Job.GetFlag() + "_" + task.RunID
} }
var pos, circle int var pos, circle int
@ -196,8 +200,8 @@ func (tw *TimeWheel) AddTask(task *Task, byInterval bool) {
task.Circle = circle task.Circle = circle
task.Position = pos task.Position = pos
element := tw.Slots[pos].PushBack(task) element := tw.slots[pos].PushBack(task)
tw.TaskRecords.Set(task.Key, element) tw.taskRecords.Set(task.Key, element)
} }
// RemoveTask 删除任务的内部函数 // RemoveTask 删除任务的内部函数
@ -207,11 +211,11 @@ func (tw *TimeWheel) AddTask(task *Task, byInterval bool) {
// Date : 13:44 2023/8/4 // Date : 13:44 2023/8/4
func (tw *TimeWheel) RemoveTask(task *Task) { func (tw *TimeWheel) RemoveTask(task *Task) {
// 从map结构中删除 // 从map结构中删除
taskInfo, _ := tw.TaskRecords.Get(task.Key) taskInfo, _ := tw.taskRecords.Get(task.Key)
tw.TaskRecords.Del(task.Key) tw.taskRecords.Del(task.Key)
// 通过TimeWheel.slots获取任务的 // 通过TimeWheel.slots获取任务的
currentList := tw.Slots[task.Position] currentList := tw.slots[task.Position]
currentList.Remove(taskInfo.(*list.Element)) currentList.Remove(taskInfo.(*list.Element))
} }
@ -222,7 +226,7 @@ func (tw *TimeWheel) RemoveTask(task *Task) {
// Date : 13:46 2023/8/4 // Date : 13:46 2023/8/4
func (tw *TimeWheel) getPosAndCircleByInterval(d time.Duration) (int, int) { func (tw *TimeWheel) getPosAndCircleByInterval(d time.Duration) (int, int) {
delaySeconds := int(d.Seconds()) delaySeconds := int(d.Seconds())
intervalSeconds := int(tw.Interval.Seconds()) intervalSeconds := int(tw.interval.Seconds())
circle := delaySeconds / intervalSeconds / tw.slotCount circle := delaySeconds / intervalSeconds / tw.slotCount
pos := (tw.currentPosition + delaySeconds/intervalSeconds) % tw.slotCount pos := (tw.currentPosition + delaySeconds/intervalSeconds) % tw.slotCount
@ -238,15 +242,15 @@ func (tw *TimeWheel) getPosAndCircleByInterval(d time.Duration) (int, int) {
// Author : go_developer@163.com<白茶清欢> // Author : go_developer@163.com<白茶清欢>
// //
// Date : 13:47 2023/8/4 // Date : 13:47 2023/8/4
func (tw *TimeWheel) getPosAndCircleByCreatedTime(createdTime time.Time, d time.Duration, key interface{}) (int, int) { func (tw *TimeWheel) getPosAndCircleByCreatedTime(createdTime time.Time, d time.Duration, key any) (int, int) {
passedTime := time.Since(createdTime) passedTime := time.Since(createdTime) // 计算从创建时间到现在经过了多少时间
passedSeconds := int(passedTime.Seconds()) passedSeconds := int(passedTime.Seconds()) // 从开始到现在经历的秒数
delaySeconds := int(d.Seconds()) delaySeconds := int(d.Seconds()) // 多久之后执行
intervalSeconds := int(tw.Interval.Seconds()) intervalSeconds := int(tw.interval.Seconds()) // 时间轮盘的间隔秒数
circle := delaySeconds / intervalSeconds / tw.slotCount circle := delaySeconds / intervalSeconds / tw.slotCount // 计算需要走多少圈
pos := (tw.currentPosition + (delaySeconds-(passedSeconds%delaySeconds))/intervalSeconds) % tw.slotCount pos := (tw.currentPosition + (delaySeconds-(passedSeconds%delaySeconds))/intervalSeconds) % tw.slotCount // 计算下次执行的位置
// 特殊case当计算的位置和当前位置重叠时因为当前位置已经走过了所以circle需要减一 // 特殊case当计算的位置和当前位置重叠时因为当前位置已经走过了所以circle需要减一
if pos == tw.currentPosition && circle != 0 { if pos == tw.currentPosition && circle != 0 {

View File

@ -23,7 +23,7 @@ var tw *TimeWheel
// //
// Date : 17:00 2023/8/11 // Date : 17:00 2023/8/11
func TestNewTimeWheel(t *testing.T) { func TestNewTimeWheel(t *testing.T) {
tw = NewTimeWheel(10, 10*time.Second) tw = NewTimeWheel(10, time.Second)
tw.AddTask(&Task{ tw.AddTask(&Task{
Key: wrapper.StringFromRandom(32, "").Value(), Key: wrapper.StringFromRandom(32, "").Value(),
Interval: 5 * time.Second, Interval: 5 * time.Second,
@ -36,8 +36,7 @@ func TestNewTimeWheel(t *testing.T) {
}, false) }, false)
go tw.Start() go tw.Start()
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
for tw.IsRunning { for tw.isRunning {
} }
} }
@ -61,7 +60,7 @@ func (t testTask) Callback(result *Result) error {
return nil return nil
} }
func (t testTask) Execute(ctx context.Context, cfg *Config) (map[string]interface{}, error) { func (t testTask) Execute(ctx context.Context, cfg *Config) *Result {
fmt.Println(wrapper.OwnTimeFromNow().ToString()) fmt.Println(wrapper.OwnTimeFromNow().ToString())
return nil, nil return nil
} }

View File

@ -18,9 +18,13 @@ type Config struct {
// ForbiddenCallback 禁用执行结果回调 // ForbiddenCallback 禁用执行结果回调
ForbiddenCallback bool ForbiddenCallback bool
// Param 任务执行参数 // Param 任务执行参数
Param map[string]interface{} Param map[string]any
// TaskFlag 任务标识 // TaskFlag 任务标识
TaskFlag string TaskFlag string
// TaskName 任务名称
TaskName string
// Async 是否异步
Async bool
} }
// Result 执行结果 // Result 执行结果
@ -29,12 +33,12 @@ type Config struct {
// //
// Date : 14:43 2022/6/23 // Date : 14:43 2022/6/23
type Result struct { type Result struct {
StartTime int64 // 开始时间, 纳秒 StartTime int64 // 开始时间, 纳秒
FinishTime int64 // 结束时间, 纳秒 FinishTime int64 // 结束时间, 纳秒
Used int64 // 耗时, 纳秒 Used int64 // 耗时, 纳秒
TaskRunID string // 任务运行ID TaskRunID string // 任务运行ID
TaskDescription string // 任务描述 TaskDescription string // 任务描述
TaskConfig *Config // 任务配置 TaskConfig *Config // 任务配置
Data map[string]interface{} // 任务结果数据 Data map[string]any // 任务结果数据
Err error // 异常信息, err == nil , 代表执行成功 Err error // 异常信息, err == nil , 代表执行成功
} }

24
go.mod
View File

@ -1,20 +1,28 @@
module git.zhangdeman.cn/zhangdeman/task module git.zhangdeman.cn/zhangdeman/task
go 1.20 go 1.23.0
require git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230811085725-1a7414bb7590 toolchain go1.24.3
require ( require (
git.zhangdeman.cn/zhangdeman/consts v0.0.0-20230811030300-6f850372c88c // indirect git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20241101082529-28a6c68e38a4
git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20230811032817-e6ad534a9a10 // indirect git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20250321102712-1cbfbe959740
git.zhangdeman.cn/zhangdeman/util v0.0.0-20230811070456-d6a489d5860b // indirect )
git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20230811071513-cfc46e8d82e1 // indirect
github.com/BurntSushi/toml v1.3.2 // indirect require (
git.zhangdeman.cn/zhangdeman/consts v0.0.0-20250425024726-cc17224cb995 // indirect
git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0 // indirect
git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20250504055908-8d68e6106ea9 // indirect
git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e // indirect
github.com/BurntSushi/toml v1.5.0 // indirect
github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 // indirect github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 // indirect
github.com/go-ini/ini v1.67.0 // indirect github.com/go-ini/ini v1.67.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mozillazg/go-pinyin v0.20.0 // indirect github.com/mozillazg/go-pinyin v0.20.0 // indirect
github.com/pkg/errors v0.9.1 // indirect github.com/sbabiv/xml2map v1.2.1 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/tidwall/gjson v1.18.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )

56
go.sum
View File

@ -1,29 +1,53 @@
git.zhangdeman.cn/zhangdeman/consts v0.0.0-20230811030300-6f850372c88c h1:Dan3iSVU6XTKt8r3/qixfPHPpfLZjkYlPmaJios7wtE= git.zhangdeman.cn/zhangdeman/consts v0.0.0-20250425024726-cc17224cb995 h1:LmPRAf0AsxRVFPibdpZR89ajlsz8hof2IvMMyTqiEq4=
git.zhangdeman.cn/zhangdeman/consts v0.0.0-20230811030300-6f850372c88c/go.mod h1:IXXaZkb7vGzGnGM5RRWrASAuwrVSNxuoe0DmeXx5g6k= git.zhangdeman.cn/zhangdeman/consts v0.0.0-20250425024726-cc17224cb995/go.mod h1:5p8CEKGBxi7qPtTXDI3HDmqKAfIm5i/aBWdrbkbdNjc=
git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230307094841-e437ba87af10 h1:+Lg4vXFEiWVKjhUJdXuoP0AgjGT49oqJ3301STnZErk= git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20241101082529-28a6c68e38a4 h1:s6d4b6yY+NaK1AzoBD1pxqsuygEHQz0Oie86c45geDw=
git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230307094841-e437ba87af10/go.mod h1:+Lc0zYF8sylRi75A7NGmObrLxugwAZa8WVpWh2eh5X0= git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20241101082529-28a6c68e38a4/go.mod h1:V4Dfg1v/JVIZGEKCm6/aehs8hK+Xow1dkL1yiQymXlQ=
git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230811085725-1a7414bb7590 h1:zN5JKHvzCH5q+X6W6fZw1hZ18FTlmZdv+k5T2j+6/GQ= git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0 h1:gUDlQMuJ4xNfP2Abl1Msmpa3fASLWYkNlqDFF/6GN0Y=
git.zhangdeman.cn/zhangdeman/easymap v0.0.0-20230811085725-1a7414bb7590/go.mod h1:l9S40lsDnTd/VAZjh1kmfYvz0B9z+7oT86pMQ/KurWo= git.zhangdeman.cn/zhangdeman/op_type v0.0.0-20240122104027-4928421213c0/go.mod h1:VHb9qmhaPDAQDcS6vUiDCamYjZ4R5lD1XtVsh55KsMI=
git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20230811032817-e6ad534a9a10 h1:orhcMAKrcOajsBJCgssnb9O8YcLsPJvWuXF511gs5dc= git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20250504055908-8d68e6106ea9 h1:/GLQaFoLb+ciHOtAS2BIyPNnf4O5ME3AC5PUaJY9kfs=
git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20230811032817-e6ad534a9a10/go.mod h1:CzX5/WwGDTnKmewarnjkK5XcSRbgszTQTdTL3OUc/s4= git.zhangdeman.cn/zhangdeman/serialize v0.0.0-20250504055908-8d68e6106ea9/go.mod h1:ABJ655C5QenQNOzf7LjCe4sSB52CXvaWLX2Zg4uwDJY=
git.zhangdeman.cn/zhangdeman/util v0.0.0-20230811070456-d6a489d5860b h1:vnmxYrNdX6f5sEVjjkM1fIR+i32kHJ4g9DJqug9KKek= git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e h1:Q973S6CcWr1ICZhFI1STFOJ+KUImCl2BaIXm6YppBqI=
git.zhangdeman.cn/zhangdeman/util v0.0.0-20230811070456-d6a489d5860b/go.mod h1:Yum5+tgP+Wf1GWUAyQz1Qh8Ab9m5+90GYkYdzqVs0lA= git.zhangdeman.cn/zhangdeman/util v0.0.0-20240618042405-6ee2c904644e/go.mod h1:VpPjBlwz8U+OxZuxzHQBv1aEEZ3pStH6bZvT21ADEbI=
git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20230811071513-cfc46e8d82e1 h1:k2iu9KgRxeroytB+N+/XapAxt1di7o2pNTISjFlYDJ8= git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20250321102712-1cbfbe959740 h1:zPUoylfJTbc0EcxW+NEzOTBmoeFZ2I/rLFBnEzxb4Wk=
git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20230811071513-cfc46e8d82e1/go.mod h1:kvjAbtGTo14gKCS0X4rxnb2sPkskHOUy2NXcx34t6Mw= git.zhangdeman.cn/zhangdeman/wrapper v0.0.0-20250321102712-1cbfbe959740/go.mod h1:1ct92dbVc49pmXusA/iGfcQUJzcYmJ+cjAhgc3sDv1I=
github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 h1:OYA+5W64v3OgClL+IrOD63t4i/RW7RqrAVl9LTZ9UqQ= github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 h1:OYA+5W64v3OgClL+IrOD63t4i/RW7RqrAVl9LTZ9UqQ=
github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg= github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A=
github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g=
github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mozillazg/go-pinyin v0.20.0 h1:BtR3DsxpApHfKReaPO1fCqF4pThRwH9uwvXzm+GnMFQ= github.com/mozillazg/go-pinyin v0.20.0 h1:BtR3DsxpApHfKReaPO1fCqF4pThRwH9uwvXzm+GnMFQ=
github.com/mozillazg/go-pinyin v0.20.0/go.mod h1:iR4EnMMRXkfpFVV5FMi4FNB6wGq9NV6uDWbUuPhP4Yc= github.com/mozillazg/go-pinyin v0.20.0/go.mod h1:iR4EnMMRXkfpFVV5FMi4FNB6wGq9NV6uDWbUuPhP4Yc=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sbabiv/xml2map v1.2.1 h1:1lT7t0hhUvXZCkdxqtq4n8/ZCnwLWGq4rDuDv5XOoFE=
github.com/sbabiv/xml2map v1.2.1/go.mod h1:2TPoAfcaM7+Sd4iriPvzyntb2mx7GY+kkQpB/GQa/eo=
github.com/smarty/assertions v1.15.0 h1:cR//PqUBUiQRakZWqBiFFQ9wb8emQGDb0HeGdqGByCY=
github.com/smarty/assertions v1.15.0/go.mod h1:yABtdzeQs6l1brC900WlRNwj6ZR55d7B+E8C6HtKdec=
github.com/smartystreets/goconvey v1.8.1 h1:qGjIddxOk4grTu9JPOU31tVfq3cNdBlNa5sSznIX1xY=
github.com/smartystreets/goconvey v1.8.1/go.mod h1:+/u4qLyY6x1jReYOp7GOM2FSt8aP9CzCZL03bI28W60=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

144
task.go Normal file
View File

@ -0,0 +1,144 @@
// Package task ...
//
// Description : task ...
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 2022-06-23 15:13
package task
import (
"context"
"fmt"
"git.zhangdeman.cn/zhangdeman/wrapper"
"runtime"
"sync"
"time"
)
var (
// Dispatch 调度实例
Dispatch *dispatch
)
func init() {
Dispatch = &dispatch{
lock: &sync.RWMutex{},
taskTable: make(map[string]ITask),
}
}
// dispatch 任务调度
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 15:14 2022/6/23
type dispatch struct {
// 锁
lock *sync.RWMutex
// 任务表
taskTable map[string]ITask
}
// Register 注册任务
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 15:16 2022/6/23
func (d *dispatch) Register(taskInstanceList ...ITask) error {
d.lock.Lock()
defer d.lock.Unlock()
for _, taskInstance := range taskInstanceList {
if nil == taskInstance {
continue
}
if _, exist := d.taskTable[taskInstance.GetFlag()]; exist {
return fmt.Errorf("%s 任务重复注册! ", taskInstance.GetFlag())
}
d.taskTable[taskInstance.GetFlag()] = taskInstance
}
return nil
}
// Remove 移除任务
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 15:16 2022/6/23
func (d *dispatch) Remove(taskNameList ...string) error {
d.lock.Lock()
defer d.lock.Unlock()
for _, taskName := range taskNameList {
delete(d.taskTable, taskName)
}
return nil
}
// Run 执行任务
//
// Author : go_developer@163.com<白茶清欢>
//
// Date : 15:18 2022/6/23
func (d *dispatch) Run(ctx context.Context, cfg *Config) *Result {
var (
taskInstance ITask
exist bool
cancelFunc context.CancelFunc
)
if nil == ctx {
if cfg.Timeout > 0 {
ctx, cancelFunc = context.WithCancel(context.Background())
} else {
ctx = context.Background()
}
} else {
ctx, cancelFunc = context.WithCancel(ctx)
}
if nil != cancelFunc {
// 带了超时时间
cancelFunc()
}
result := &Result{
StartTime: time.Now().UnixNano(),
FinishTime: 0,
Used: 0,
TaskRunID: wrapper.StringFromRandom(64, "").Md5().Value,
TaskDescription: "",
TaskConfig: cfg,
Data: nil,
Err: nil,
}
defer func() {
result.FinishTime = time.Now().UnixNano()
result.Used = result.FinishTime - result.StartTime
}()
d.lock.RLock()
if taskInstance, exist = d.taskTable[cfg.TaskName]; !exist {
result.Err = fmt.Errorf("%v 任务未注册", cfg.TaskName)
}
d.lock.RUnlock()
if cfg.Async {
// 异步运行
go func() {
if e := recover(); nil != e {
switch e.(type) {
case runtime.Error: // 运行时错误
result.Err = fmt.Errorf("出现运行时Panic : %v", e)
default: // 非运行时错误
result.Err = fmt.Errorf("出现其他场景Panic : %v", e)
}
}
result = taskInstance.Execute(ctx, cfg)
result.TaskDescription = taskInstance.Description()
_ = taskInstance.Callback(result)
}()
} else {
result = taskInstance.Execute(ctx, cfg)
result.TaskDescription = taskInstance.Description()
_ = taskInstance.Callback(result)
}
return result
}