第一版时间轮任务调度实现 #1

Merged
zhangdeman merged 15 commits from feature/timewheel into master 2023-09-04 20:13:17 +08:00
3 changed files with 21 additions and 24 deletions
Showing only changes of commit a2ba1440d4 - Show all commits

View File

@ -15,8 +15,8 @@ import "context"
//
// Date : 14:22 2022/6/23
type ITask interface {
// Name 任务名称标识, 全局唯一
Name() string
// GetFlag 任务名称标识, 全局唯一
GetFlag() string
// Description 任务描述
Description() string
// GetRunID 获取任务ID
@ -24,5 +24,5 @@ type ITask interface {
// Callback 任务执行成功的回调
Callback(result *Result) error
// Execute 执行任务
Execute(ctx context.Context, cfg *Config) *Result
Execute(ctx context.Context, cfg *Config) (map[string]interface{}, error)
}

View File

@ -19,6 +19,8 @@ type Config struct {
ForbiddenCallback bool
// Param 任务执行参数
Param map[string]interface{}
// TaskFlag 任务标识
TaskFlag string
}
// Result 执行结果

37
task.go
View File

@ -51,10 +51,10 @@ func (d *dispatch) Register(taskInstanceList ...ITask) error {
if nil == taskInstance {
continue
}
if _, exist := d.taskTable[taskInstance.Name()]; exist {
return fmt.Errorf("%s 任务重复注册! ", taskInstance.Name())
if _, exist := d.taskTable[taskInstance.GetFlag()]; exist {
return fmt.Errorf("%s 任务重复注册! ", taskInstance.GetFlag())
}
d.taskTable[taskInstance.Name()] = taskInstance
d.taskTable[taskInstance.GetFlag()] = taskInstance
}
return nil
}
@ -109,36 +109,31 @@ func (d *dispatch) Run(ctx context.Context, cfg *Config) *Result {
TaskConfig: cfg,
Data: nil,
Err: nil,
Async: cfg.Async,
}
defer func() {
result.FinishTime = time.Now().UnixNano()
result.Used = result.FinishTime - result.StartTime
}()
d.lock.RLock()
if taskInstance, exist = d.taskTable[cfg.TaskName]; !exist {
result.Err = fmt.Errorf("%v 任务未注册", cfg.TaskName)
if taskInstance, exist = d.taskTable[cfg.TaskFlag]; !exist {
result.Err = fmt.Errorf("%v 任务未注册", cfg.TaskFlag)
return result
}
d.lock.RUnlock()
result.TaskRunID = taskInstance.GetRunID()
result.TaskDescription = taskInstance.Description()
if cfg.Async {
// 异步运行
go func() {
if e := recover(); nil != e {
switch e.(type) {
case runtime.Error: // 运行时错误
result.Err = fmt.Errorf("出现运行时Panic : %v", e)
default: // 非运行时错误
result.Err = fmt.Errorf("出现其他场景Panic : %v", e)
}
// 异步运行
go func() {
if e := recover(); nil != e {
switch e.(type) {
case runtime.Error: // 运行时错误
result.Err = fmt.Errorf("出现运行时Panic : %v", e)
default: // 非运行时错误
result.Err = fmt.Errorf("出现其他场景Panic : %v", e)
}
result.Data, result.Err = taskInstance.Execute(ctx, cfg)
_ = taskInstance.Callback(result)
}()
} else {
}
result.Data, result.Err = taskInstance.Execute(ctx, cfg)
_ = taskInstance.Callback(result)
}
}()
return result
}