diff --git a/merlin/README.md b/merlin/README.md index 2637a70..3ababa3 100644 --- a/merlin/README.md +++ b/merlin/README.md @@ -3,14 +3,12 @@ arthur (which sends commands to merlin). ### In Progress - [ ] implement all sync types (csc-sync-debian, csc-sync-apache, etc.) -- [ ] use separate log file for each child process (currently sharing stdout/stderr with parent) (does this mean different from repo?) - - different log dir for each repo? -- [ ] handle termination signals in merlin (SIGINT, SIGTERM); close stopChan for this - -### TODO +- [ ] use separate log file for each child process (currently sharing stdout/stderr with parent) - [ ] listen on Unix socket in merlin - [ ] implement arthur.go (commands: sync and status) - [ ] implement zfssync in merlin (just invoke the existing Python script) + +### TODO - [ ] allow dynamic reloading in merlin (\*) - [ ] detect if an rsync process is stuck (\*\*) - [ ] place each rsync process in a separate cgroup (\*\*\*) @@ -29,5 +27,6 @@ stdout/stderr of the rsync process. - [x] save state (last attempted time, last attempted status) for each repo, and restore state on startup (e.g. 
use JSON/INI file for each repo) - [x] calculate difference between the scheduled time of a job and the time at which it actually ran; log this - [x] add all repos to merlin-config.ini (\*) +- [x] handle termination signals in merlin (SIGINT, SIGTERM); close stopChan for this \* there are some parts that I don't understand (trace_host, csc-sync-ceph, csc-sync-saltstack, etc) \ No newline at end of file diff --git a/merlin/common/common.go b/merlin/common/common.go index 0394126..a2bd1a6 100644 --- a/merlin/common/common.go +++ b/merlin/common/common.go @@ -83,7 +83,7 @@ type Repo struct { // when it has finished a job DoneChan chan<- Result `ini:"-"` // the repo should stop syncing if StopChan is closed - StopChan <-chan bool `ini:"-"` + StopChan chan bool `ini:"-"` // a struct that stores the repo's status State RepoState `ini:"-"` // a reference to the global config @@ -131,27 +131,6 @@ type RepoState struct { LastAttemptRunTime int64 `ini:"last_attempt_runtime"` } -// RunIfScheduled starts a sync job for this repo if more than repo.Frequency -// seconds have elapsed since its last job. -// It returns true iff a job is started. -func (repo *Repo) RunIfScheduled() bool { - // don't run if a job is already running - if repo.State.IsRunning { - return false - } - - // this should be set in the caller's thread so that the if will work - curTime := time.Now().Unix() - if curTime-repo.State.LastAttemptTime > int64(repo.Frequency) { - repo.State.IsRunning = true - repo.State.LastAttemptTime = curTime - repo.SaveState() - go repo.StartSyncJob() - return true - } - return false -} - // save the save the current state of a repo to a file func (repo *Repo) SaveState() { state_cfg := ini.Empty() @@ -167,14 +146,34 @@ func (repo *Repo) SaveState() { } } +// RunIfPossible starts a sync job for this repo if more than repo.Frequency +// seconds have elapsed since its last job. +// It returns true iff a job is started. 
+func (repo *Repo) RunIfPossible() bool { + // don't run if a job is already running + if repo.State.IsRunning { + return false + } + + // this should be set in the caller's thread so that the "if" will work + curTime := time.Now().Unix() + if curTime-repo.State.LastAttemptTime > int64(repo.Frequency) { + repo.State.IsRunning = true + repo.State.LastAttemptTime = curTime + repo.SaveState() + go repo.StartSyncJob() + return true + } + return false +} + // update the repo state with the last attempt time and exit now that the job is done -// TODO: rename and reorginize -// TODO: method before and after sync job -func (repo *Repo) JobDone(exit int) { +func (repo *Repo) SyncExit(exit int) { repoState := repo.State repoState.IsRunning = false repoState.LastAttemptExit = exit repoState.LastAttemptTime = time.Now().Unix() - repoState.LastAttemptTime + // repo.Logger.Debug(fmt.Sprintf("Process exited after %d seconds", repoState.LastAttemptTime)) repo.SaveState() } @@ -212,9 +211,11 @@ func GetConfig() Config { panic("Missing IPv6 address from config") } - // add each repo configuration to cfg - doneChan := make(chan Result) + // buffered to prevent possible race condition + doneChan := make(chan Result, cfg.MaxJobs) cfg.DoneChan = doneChan + + // add each repo configuration to cfg for _, section := range data.Sections() { if section.Name() == "DEFAULT" { continue @@ -238,27 +239,29 @@ func GetConfig() Config { } repo.Logger = NewLogger(repo.Name) repo.DoneChan = doneChan - repo.StopChan = make(chan bool, 1) + repo.StopChan = make(chan bool) repo.cfg = &cfg - // create the default repo state configuration from a file - repoStateFile := cfg.StateDir + "/" + repo.Name + // create the default repo state repo.State = RepoState{ IsRunning: false, LastAttemptExit: NOT_RUN_YET, LastAttemptTime: 0, LastAttemptRunTime: 0, } - // when repoStatusPath does not exist, create it and write the default state - // overwise overwrite repo.state + + // create the state file if it does not 
exist; otherwise load the saved state from it + repoStateFile := cfg.StateDir + "/" + repo.Name if _, err := os.Stat(repoStateFile); err != nil { repo.SaveState() } else if err := ini.MapTo(&repo.State, repoStateFile); err != nil { panic(err) } - // must be initially not running, otherwise will never run + + // the repo state must initially be not running, otherwise the repo will never run repo.State.IsRunning = false + // append a reference to the new repo in the slice of repos cfg.Repos = append(cfg.Repos, &repo) } if len(cfg.Repos) == 0 { diff --git a/merlin/common/logger.go b/merlin/common/logger.go index cbb1045..ad7e902 100644 --- a/merlin/common/logger.go +++ b/merlin/common/logger.go @@ -11,9 +11,14 @@ type Logger struct { } const ( - DEBUG = 1 << iota + // verbose + // DEBUG = 1 << iota + DEBUG = iota + // normal operation INFO + // bad WARNING + // really bad (crash) ERROR ) @@ -34,6 +39,7 @@ func NewLogger(name string) *Logger { func (logger *Logger) Log(level int, v ...interface{}) { levelStr := levels[level] + // TODO: add date + time args := []interface{}{levelStr + ":", logger.name + ":"} args = append(args, v...) logger.Println(args...) diff --git a/merlin/common/process_manager.go b/merlin/common/process_manager.go index 0236c68..b762a1b 100644 --- a/merlin/common/process_manager.go +++ b/merlin/common/process_manager.go @@ -14,15 +14,32 @@ import ( // It returns a channel through which a Cmd will be sent once it has finished, // or nil if it was unable to start a process. func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) { - // TODO change stdout and stderr to something else + // the start time and elapsed time are handled in common.go by SyncExit cmd := exec.Command(args[0], args[1:]...) 
+ + // TODO: change stdout and stderr to something else cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr - startTime := time.Now().Unix() + + // TODO: unfinished attempt (not working yet) to read the process's stdout/stderr through pipes and log them + + // stdout, err := cmd.StdoutPipe() + // if err != nil { + // repo.Logger.Warning(err) + // } + // stderr, err := cmd.StderrPipe() + // if err != nil { + // repo.Logger.Warning(err) + // } + // multi := io.MultiReader(stdout, stderr) + // in := bufio.NewScanner(multi) + // repo.Logger.Debug(in.Text()) + // repo.Logger.Warning(in.Err()) + + // startTime := time.Now().Unix() repo.Logger.Debug("Starting process") - err := cmd.Start() - if err != nil { - err = fmt.Errorf("Could not start process %s: %w", args[0], err) + if err := cmd.Start(); err != nil { + err = fmt.Errorf("could not start process %s: %w", args[0], err) repo.Logger.Error(err) return } @@ -38,7 +55,7 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) { } select { case <-time.After(30 * time.Second): - repo.Logger.Warning("Process still hasn't stopped; sending SIGKILL now") + repo.Logger.Warning("Process still hasn't stopped after 30 seconds; sending SIGKILL") cmd.Process.Signal(syscall.SIGKILL) case <-procDoneChan: repo.Logger.Debug("Process has stopped.") @@ -49,6 +66,7 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) { cmd.Wait() procDoneChan <- true }() + go func() { defer func() { cmdChan <- cmd @@ -58,13 +76,14 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) { repo.Logger.Info("Received signal to stop, killing process...") killProcess() case <-procDoneChan: + // the following could be moved to SyncExit in common.go if cmd.ProcessState.Success() { repo.Logger.Debug("Process ended successfully") } else { repo.Logger.Warning("Process ended with status code", cmd.ProcessState.ExitCode()) } - timeTook := time.Now().Unix() - startTime - repo.Logger.Debug(fmt.Sprintf("Process took %d seconds", timeTook)) + // timeTook := time.Now().Unix() - startTime + // 
repo.Logger.Debug(fmt.Sprintf("Process took %d seconds", timeTook)) case <-time.After(time.Duration(repo.MaxTime) * time.Second): repo.Logger.Warning("Process has exceeded its max time; killing now") killProcess() diff --git a/merlin/common/sync.go b/merlin/common/sync.go index 0356320..5822342 100644 --- a/merlin/common/sync.go +++ b/merlin/common/sync.go @@ -20,8 +20,6 @@ func (repo *Repo) buildRsyncHost() string { // CSCSyncStandard performs a standard rsync job. func (repo *Repo) CSCSyncStandard() { status := FAILURE - // https://medium.com/@manandharsabbir/go-lang-defer-statement-arguments-evaluated-at-defer-execution-b2c4a1687c6c - // will defer actaully wait till end to function to set the vars? defer func() { repo.DoneChan <- Result{ Name: repo.Name, @@ -51,6 +49,7 @@ func (repo *Repo) CSCSyncStandard() { ch := SpawnProcess(repo, args) if ch == nil { + // Log that something failed? return } cmd := <-ch diff --git a/merlin/merlin.go b/merlin/merlin.go index 202a90a..dec5170 100644 --- a/merlin/merlin.go +++ b/merlin/merlin.go @@ -2,6 +2,9 @@ package main import ( "fmt" + "os" + "os/signal" + "syscall" "time" "git.csclub.uwaterloo.ca/public/merlin/common" @@ -9,6 +12,9 @@ import ( ) func main() { + stopSig := make(chan os.Signal, 1) + signal.Notify(stopSig, syscall.SIGINT, syscall.SIGTERM) + logger := common.NewLogger("main") cfg := common.GetConfig() logger.Debug("Read config:") @@ -33,7 +39,7 @@ func main() { for numJobsRunning < cfg.MaxJobs { repo := repos[repoIdx] // attempt to run repo and increment when a job is started - if repo.RunIfScheduled() { + if repo.RunIfPossible() { numJobsRunning++ } repoIdx = (repoIdx + 1) % len(repos) @@ -44,15 +50,38 @@ func main() { } } + // TODO: Logging of job starts runAsManyAsPossible() +runLoop: for { select { - case result := <-doneChan: - // move this into a method in common.go - repoMap[result.Name].JobDone(result.Exit) + case <-stopSig: + // close StopChan for every repo + for i := 0; i < len(repos); i++ { + 
close(repos[i].StopChan) + } + break runLoop + case done := <-doneChan: + // a job has exited + repoMap[done.Name].SyncExit(done.Exit) numJobsRunning-- case <-time.After(1 * time.Minute): } runAsManyAsPossible() } + + // TODO: Logging of job exits + + // wait for every running job to stop running + for { + select { + case done := <-doneChan: + repoMap[done.Name].SyncExit(done.Exit) + numJobsRunning-- + case <-time.After(1 * time.Second): + if numJobsRunning == 0 { + return + } + } + } }