graceful相关逻辑处理

This commit is contained in:
白茶清欢 2025-05-29 15:09:45 +08:00
parent c3d9bfd3bc
commit 92833db6f3

View File

@ -25,13 +25,13 @@ import (
) )
const ( const (
PRE_SIGNAL = iota PreSignal = iota
POST_SIGNAL PostSignal
STATE_INIT StateInit
STATE_RUNNING StateRunning
STATE_SHUTTING_DOWN StateShuttingDown
STATE_TERMINATE StateTerminate
) )
var ( var (
@ -111,7 +111,7 @@ func NewServer(addr string, handler http.Handler) (srv *endlessServer) {
sigChan: make(chan os.Signal), sigChan: make(chan os.Signal),
isChild: isChild, isChild: isChild,
SignalHooks: map[int]map[os.Signal][]func(){ SignalHooks: map[int]map[os.Signal][]func(){
PRE_SIGNAL: map[os.Signal][]func(){ PreSignal: map[os.Signal][]func(){
syscall.SIGHUP: []func(){}, syscall.SIGHUP: []func(){},
syscall.SIGUSR1: []func(){}, syscall.SIGUSR1: []func(){},
syscall.SIGUSR2: []func(){}, syscall.SIGUSR2: []func(){},
@ -119,7 +119,7 @@ func NewServer(addr string, handler http.Handler) (srv *endlessServer) {
syscall.SIGTERM: []func(){}, syscall.SIGTERM: []func(){},
syscall.SIGTSTP: []func(){}, syscall.SIGTSTP: []func(){},
}, },
POST_SIGNAL: map[os.Signal][]func(){ PostSignal: map[os.Signal][]func(){
syscall.SIGHUP: []func(){}, syscall.SIGHUP: []func(){},
syscall.SIGUSR1: []func(){}, syscall.SIGUSR1: []func(){},
syscall.SIGUSR2: []func(){}, syscall.SIGUSR2: []func(){},
@ -128,7 +128,7 @@ func NewServer(addr string, handler http.Handler) (srv *endlessServer) {
syscall.SIGTSTP: []func(){}, syscall.SIGTSTP: []func(){},
}, },
}, },
state: STATE_INIT, state: StateInit,
lock: &sync.RWMutex{}, lock: &sync.RWMutex{},
} }
@ -196,11 +196,11 @@ down the server.
*/ */
func (srv *endlessServer) Serve() (err error) { func (srv *endlessServer) Serve() (err error) {
defer log.Println(syscall.Getpid(), "Serve() returning...") defer log.Println(syscall.Getpid(), "Serve() returning...")
srv.setState(STATE_RUNNING) srv.setState(StateRunning)
err = srv.Server.Serve(srv.EndlessListener) err = srv.Server.Serve(srv.EndlessListener)
log.Println(syscall.Getpid(), "Waiting for connections to finish...") log.Println(syscall.Getpid(), "Waiting for connections to finish...")
srv.wg.Wait() srv.wg.Wait()
srv.setState(STATE_TERMINATE) srv.setState(StateTerminate)
return return
} }
@ -318,10 +318,7 @@ func (srv *endlessServer) getListener(laddr string) (l net.Listener, err error)
return return
} }
/* // handleSignals 监听系统传递的信号量, 并根据不同的信号量进行不同的逻辑处理
handleSignals listens for os Signals and calls any hooked in function that the
user had registered with the signal.
*/
func (srv *endlessServer) handleSignals() { func (srv *endlessServer) handleSignals() {
var sig os.Signal var sig os.Signal
@ -330,10 +327,13 @@ func (srv *endlessServer) handleSignals() {
hookableSignals..., hookableSignals...,
) )
// 获取当前进程ID
pid := syscall.Getpid() pid := syscall.Getpid()
for { for {
// 无缓冲chan, 阻塞等待信号量
sig = <-srv.sigChan sig = <-srv.sigChan
srv.signalHooks(PRE_SIGNAL, sig) // 触发信号量处理之前的逻辑
srv.signalHooks(PreSignal, sig)
switch sig { switch sig {
case syscall.SIGHUP: case syscall.SIGHUP:
log.Println(pid, "Received SIGHUP. forking.") log.Println(pid, "Received SIGHUP. forking.")
@ -357,7 +357,8 @@ func (srv *endlessServer) handleSignals() {
default: default:
log.Printf("Received %v: nothing i care about...\n", sig) log.Printf("Received %v: nothing i care about...\n", sig)
} }
srv.signalHooks(POST_SIGNAL, sig) // 触发信号量处理之后的逻辑
srv.signalHooks(PostSignal, sig)
} }
} }
@ -371,21 +372,24 @@ func (srv *endlessServer) signalHooks(ppFlag int, sig os.Signal) {
return return
} }
/* // shutdown 关闭服务(平滑)
shutdown closes the listener so that no new connections are accepted. it also //
starts a goroutine that will hammer (stop all running requests) the server // 1. 关闭监听器, 不再接受新的连接
after DefaultHammerTime. //
*/ // 2. 等待所有连接处理完毕后, 退出服务
//
// 3. 如果设置了 DefaultHammerTime, 则在该时间后强制退出服务
func (srv *endlessServer) shutdown() { func (srv *endlessServer) shutdown() {
if srv.getState() != STATE_RUNNING { if srv.getState() != StateRunning {
return return
} }
srv.setState(STATE_SHUTTING_DOWN) // 将服务状态设置为 StateShuttingDown(关闭中)
srv.setState(StateShuttingDown)
if DefaultHammerTime >= 0 { if DefaultHammerTime >= 0 {
go srv.hammerTime(DefaultHammerTime) go srv.hammerTime(DefaultHammerTime)
} }
// disable keep-alives on existing connections // 已存在的连接禁用keep-alives
srv.SetKeepAlivesEnabled(false) srv.SetKeepAlivesEnabled(false)
err := srv.EndlessListener.Close() err := srv.EndlessListener.Close()
if err != nil { if err != nil {
@ -413,13 +417,13 @@ func (srv *endlessServer) hammerTime(d time.Duration) {
log.Println("WaitGroup at 0", r) log.Println("WaitGroup at 0", r)
} }
}() }()
if srv.getState() != STATE_SHUTTING_DOWN { if srv.getState() != StateShuttingDown {
return return
} }
time.Sleep(d) time.Sleep(d)
log.Println("[STOP - Hammer Time] Forcefully shutting down parent") log.Println("[STOP - Hammer Time] Forcefully shutting down parent")
for { for {
if srv.getState() == STATE_TERMINATE { if srv.getState() == StateTerminate {
break break
} }
srv.wg.Done() srv.wg.Done()
@ -555,13 +559,10 @@ func (w endlessConn) Close() error {
return err return err
} }
/* // RegisterSignalHook 注册各个信号前置/后置处理逻辑
RegisterSignalHook registers a function to be run PRE_SIGNAL or POST_SIGNAL for // PreSignal 和 PostSignal 分别表示信号前置和后置处理逻辑
a given signal. PRE or POST in this case means before or after the signal
related code endless itself runs
*/
func (srv *endlessServer) RegisterSignalHook(prePost int, sig os.Signal, f func()) (err error) { func (srv *endlessServer) RegisterSignalHook(prePost int, sig os.Signal, f func()) (err error) {
if prePost != PRE_SIGNAL && prePost != POST_SIGNAL { if prePost != PreSignal && prePost != PostSignal {
err = fmt.Errorf("Cannot use %v for prePost arg. Must be endless.PRE_SIGNAL or endless.POST_SIGNAL.", sig) err = fmt.Errorf("Cannot use %v for prePost arg. Must be endless.PRE_SIGNAL or endless.POST_SIGNAL.", sig)
return return
} }