handling of SIGINT and SIGTERM and some other stuff

This commit is contained in:
Andrew Wang 2021-11-05 00:17:55 -04:00
parent 0ca3ee77f5
commit 6e79a6408e
6 changed files with 108 additions and 53 deletions

View File

@ -3,14 +3,12 @@ arthur (which sends commands to merlin).
### In Progress
- [ ] implement all sync types (csc-sync-debian, csc-sync-apache, etc.)
- [ ] use separate log file for each child process (currently sharing stdout/stderr with parent) (does this mean different from repo?)
- different log dir for each repo?
- [ ] handle termination signals in merlin (SIGINT, SIGTERM); close stopChan for this
### TODO
- [ ] use separate log file for each child process (currently sharing stdout/stderr with parent)
- [ ] listen on Unix socket in merlin
- [ ] implement arthur.go (commands: sync and status)
- [ ] implement zfssync in merlin (just invoke the existing Python script)
### TODO
- [ ] allow dynamic reloading in merlin (\*)
- [ ] detect if an rsync process is stuck (\*\*)
- [ ] place each rsync process in a separate cgroup (\*\*\*)
@ -29,5 +27,6 @@ stdout/stderr of the rsync process.
- [x] save state (last attempted time, last attempted status) for each repo, and restore state on startup (e.g. use JSON/INI file for each repo)
- [x] calculate difference between the scheduled time of a job and the time at which it actually ran; log this
- [x] add all repos to merlin-config.ini (\*)
- [x] handle termination signals in merlin (SIGINT, SIGTERM); close stopChan for this
\* there are some parts that I don't understand (trace_host, csc-sync-ceph, csc-sync-saltstack, etc)

View File

@ -83,7 +83,7 @@ type Repo struct {
// when it has finished a job
DoneChan chan<- Result `ini:"-"`
// the repo should stop syncing if StopChan is closed
StopChan <-chan bool `ini:"-"`
StopChan chan bool `ini:"-"`
// a struct that stores the repo's status
State RepoState `ini:"-"`
// a reference to the global config
@ -131,27 +131,6 @@ type RepoState struct {
LastAttemptRunTime int64 `ini:"last_attempt_runtime"`
}
// RunIfScheduled starts a sync job for this repo if more than repo.Frequency
// seconds have elapsed since its last job.
// It returns true iff a job is started.
func (repo *Repo) RunIfScheduled() bool {
// don't run if a job is already running
if repo.State.IsRunning {
return false
}
// this should be set in the caller's thread so that the if will work
curTime := time.Now().Unix()
if curTime-repo.State.LastAttemptTime > int64(repo.Frequency) {
repo.State.IsRunning = true
repo.State.LastAttemptTime = curTime
repo.SaveState()
go repo.StartSyncJob()
return true
}
return false
}
// save the save the current state of a repo to a file
func (repo *Repo) SaveState() {
state_cfg := ini.Empty()
@ -167,14 +146,34 @@ func (repo *Repo) SaveState() {
}
}
// RunIfScheduled starts a sync job for this repo if more than repo.Frequency
// seconds have elapsed since its last job.
// It returns true iff a job is started.
func (repo *Repo) RunIfPossible() bool {
// don't run if a job is already running
if repo.State.IsRunning {
return false
}
// this should be set in the caller's thread so that the "if" will work
curTime := time.Now().Unix()
if curTime-repo.State.LastAttemptTime > int64(repo.Frequency) {
repo.State.IsRunning = true
repo.State.LastAttemptTime = curTime
repo.SaveState()
go repo.StartSyncJob()
return true
}
return false
}
// update the repo state with the last attempt time and exit now that the job is done
// TODO: rename and reorginize
// TODO: method before and after sync job
func (repo *Repo) JobDone(exit int) {
func (repo *Repo) SyncExit(exit int) {
repoState := repo.State
repoState.IsRunning = false
repoState.LastAttemptExit = exit
repoState.LastAttemptTime = time.Now().Unix() - repoState.LastAttemptTime
// repo.Logger.Debug(fmt.Sprintf("Process exited after %d seconds", repoState.LastAttemptTime))
repo.SaveState()
}
@ -212,9 +211,11 @@ func GetConfig() Config {
panic("Missing IPv6 address from config")
}
// add each repo configuration to cfg
doneChan := make(chan Result)
// buffered to prevent possible race condition
doneChan := make(chan Result, cfg.MaxJobs)
cfg.DoneChan = doneChan
// add each repo configuration to cfg
for _, section := range data.Sections() {
if section.Name() == "DEFAULT" {
continue
@ -238,27 +239,29 @@ func GetConfig() Config {
}
repo.Logger = NewLogger(repo.Name)
repo.DoneChan = doneChan
repo.StopChan = make(chan bool, 1)
repo.StopChan = make(chan bool)
repo.cfg = &cfg
// create the default repo state configuration from a file
repoStateFile := cfg.StateDir + "/" + repo.Name
// create the default repo state
repo.State = RepoState{
IsRunning: false,
LastAttemptExit: NOT_RUN_YET,
LastAttemptTime: 0,
LastAttemptRunTime: 0,
}
// when repoStatusPath does not exist, create it and write the default state
// overwise overwrite repo.state
// create the state file if it does not exist otherwise sync the state
repoStateFile := cfg.StateDir + "/" + repo.Name
if _, err := os.Stat(repoStateFile); err != nil {
repo.SaveState()
} else if err := ini.MapTo(&repo.State, repoStateFile); err != nil {
panic(err)
}
// must be initially not running, otherwise will never run
// repo state must be initially not running, otherwise will never run
repo.State.IsRunning = false
// append a reference to the new repo in the slice of repos
cfg.Repos = append(cfg.Repos, &repo)
}
if len(cfg.Repos) == 0 {

View File

@ -11,9 +11,14 @@ type Logger struct {
}
const (
DEBUG = 1 << iota
// verbose
// DEBUG = 1 << iota
DEBUG = iota
// normal operation
INFO
// bad
WARNING
// really bad (crash)
ERROR
)
@ -34,6 +39,7 @@ func NewLogger(name string) *Logger {
func (logger *Logger) Log(level int, v ...interface{}) {
levelStr := levels[level]
// TODO: add date + time
args := []interface{}{levelStr + ":", logger.name + ":"}
args = append(args, v...)
logger.Println(args...)

View File

@ -14,15 +14,32 @@ import (
// It returns a channel through which a Cmd will be sent once it has finished,
// or nil if it was unable to start a process.
func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
// TODO change stdout and stderr to something else
// startTime and time took will be handled in common.go by SyncExit
cmd := exec.Command(args[0], args[1:]...)
// TODO: change stdout and stderr to something else
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
startTime := time.Now().Unix()
// I have no idea how to do this
// stdout, err := cmd.StdoutPipe()
// if err != nil {
// repo.Logger.Warning(err)
// }
// stderr, err := cmd.StderrPipe()
// if err != nil {
// repo.Logger.Warning(err)
// }
// multi := io.MultiReader(stdout, stderr)
// in := bufio.NewScanner(multi)
// repo.Logger.Debug(in.Text())
// repo.Logger.Warning(in.Err())
// startTime := time.Now().Unix()
repo.Logger.Debug("Starting process")
err := cmd.Start()
if err != nil {
err = fmt.Errorf("Could not start process %s: %w", args[0], err)
if err := cmd.Start(); err != nil {
err = fmt.Errorf("could not start process %s: %w", args[0], err)
repo.Logger.Error(err)
return
}
@ -38,7 +55,7 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
}
select {
case <-time.After(30 * time.Second):
repo.Logger.Warning("Process still hasn't stopped; sending SIGKILL now")
repo.Logger.Warning("Process still hasn't stopped after 30 seconds; sending SIGKILL")
cmd.Process.Signal(syscall.SIGKILL)
case <-procDoneChan:
repo.Logger.Debug("Process has stopped.")
@ -49,6 +66,7 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
cmd.Wait()
procDoneChan <- true
}()
go func() {
defer func() {
cmdChan <- cmd
@ -58,13 +76,14 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
repo.Logger.Info("Received signal to stop, killing process...")
killProcess()
case <-procDoneChan:
// the following could be moved to SyncExit in common.go
if cmd.ProcessState.Success() {
repo.Logger.Debug("Process ended successfully")
} else {
repo.Logger.Warning("Process ended with status code", cmd.ProcessState.ExitCode())
}
timeTook := time.Now().Unix() - startTime
repo.Logger.Debug(fmt.Sprintf("Process took %d seconds", timeTook))
// timeTook := time.Now().Unix() - startTime
// repo.Logger.Debug(fmt.Sprintf("Process took %d seconds", timeTook))
case <-time.After(time.Duration(repo.MaxTime) * time.Second):
repo.Logger.Warning("Process has exceeded its max time; killing now")
killProcess()

View File

@ -20,8 +20,6 @@ func (repo *Repo) buildRsyncHost() string {
// CSCSyncStandard performs a standard rsync job.
func (repo *Repo) CSCSyncStandard() {
status := FAILURE
// https://medium.com/@manandharsabbir/go-lang-defer-statement-arguments-evaluated-at-defer-execution-b2c4a1687c6c
// will defer actaully wait till end to function to set the vars?
defer func() {
repo.DoneChan <- Result{
Name: repo.Name,
@ -51,6 +49,7 @@ func (repo *Repo) CSCSyncStandard() {
ch := SpawnProcess(repo, args)
if ch == nil {
// Log that something failed?
return
}
cmd := <-ch

View File

@ -2,6 +2,9 @@ package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
"git.csclub.uwaterloo.ca/public/merlin/common"
@ -9,6 +12,9 @@ import (
)
func main() {
stopSig := make(chan os.Signal, 1)
signal.Notify(stopSig, syscall.SIGINT, syscall.SIGTERM)
logger := common.NewLogger("main")
cfg := common.GetConfig()
logger.Debug("Read config:")
@ -33,7 +39,7 @@ func main() {
for numJobsRunning < cfg.MaxJobs {
repo := repos[repoIdx]
// attempt to run repo and increment when a job is started
if repo.RunIfScheduled() {
if repo.RunIfPossible() {
numJobsRunning++
}
repoIdx = (repoIdx + 1) % len(repos)
@ -44,15 +50,38 @@ func main() {
}
}
// TODO: Logging of job starts
runAsManyAsPossible()
runLoop:
for {
select {
case result := <-doneChan:
// move this into a method in common.go
repoMap[result.Name].JobDone(result.Exit)
case <-stopSig:
// close StopChan for every repo
for i := 0; i < len(repos); i++ {
close(repos[i].StopChan)
}
break runLoop
case done := <-doneChan:
// a job has exited
repoMap[done.Name].SyncExit(done.Exit)
numJobsRunning--
case <-time.After(1 * time.Minute):
}
runAsManyAsPossible()
}
// TODO: Logging of job exits
// wait for every running job to stop running
for {
select {
case done := <-doneChan:
repoMap[done.Name].SyncExit(done.Exit)
numJobsRunning--
case <-time.After(1 * time.Second):
if numJobsRunning == 0 {
return
}
}
}
}