package sync

import (
	"fmt"
	"os"
	"os/exec"
	"syscall"
	"time"

	"git.csclub.uwaterloo.ca/public/merlin/config"
)

// spawnProcess starts args as a child process on behalf of repo.
//
// The child is stopped early if repo.StopChan becomes readable or if it runs
// for longer than repo.MaxTime seconds. Stopping means sending SIGTERM and,
// if the child has not exited 30 seconds later, SIGKILL.
//
// When repo.DryRun is set, "sleep 50" is run instead of args.
//
// It returns a channel on which the Cmd is delivered exactly once, after the
// child has finished (or after the attempt to stop it has been given up on).
// It returns nil if the command is empty or the process could not be started;
// the reason has already been logged in that case.
func spawnProcess(repo *config.Repo, args []string) <-chan *exec.Cmd {
	const (
		// How long the child gets to react to SIGTERM before SIGKILL is sent.
		termGracePeriod = 30 * time.Second
		// How long to wait for the child to be reaped once it has been
		// killed (or could not be signalled) before reporting back anyway.
		reapGracePeriod = 5 * time.Second
	)

	repo.Logger.Debug(fmt.Sprintf("Running the command: %v", args))
	if repo.DryRun {
		repo.Logger.Debug("Dry running for 50 seconds")
		args = []string{"sleep", "50"}
	}
	if len(args) == 0 {
		// Guard against indexing args[0] below.
		repo.Logger.Error("zero length command given for sync")
		return nil
	}

	// Stdout and Stderr are deliberately left unset: os/exec then connects
	// them to the null device, so Wait returns as soon as the child itself
	// exits instead of blocking on a pipe that a grandchild may keep open.
	// The consequence is that the child's output is not available here.
	cmd := exec.Command(args[0], args[1:]...)

	repo.Logger.Debug("Starting process")
	if err := cmd.Start(); err != nil {
		repo.Logger.Error(fmt.Sprintf("could not start process for %s: %v", repo.Name, err))
		return nil
	}

	// Capacity 1 lets the monitoring goroutine finish even if the caller
	// never receives from the channel.
	cmdChan := make(chan *exec.Cmd, 1)
	// Closed once cmd.Wait has returned, i.e. once cmd.ProcessState is final.
	cmdDoneChan := make(chan struct{})

	go func() {
		// The outcome is read from cmd.ProcessState by the consumers, so the
		// error returned by Wait is not needed here.
		_ = cmd.Wait()
		close(cmdDoneChan)
	}()

	// killProcess stops the child and waits (for a bounded time) until it has
	// been reaped, so that cmd.ProcessState is populated before the Cmd is
	// handed to the caller.
	killProcess := func() {
		if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
			// Most likely the child exited on its own in the meantime.
			repo.Logger.Error("Could not send signal to process:", err)
		} else {
			select {
			case <-cmdDoneChan:
				repo.Logger.Debug("Process has been stopped.")
				return
			case <-time.After(termGracePeriod):
				repo.Logger.Warning("Process still hasn't stopped after 30 seconds; sending SIGKILL")
				if err := cmd.Process.Signal(syscall.SIGKILL); err != nil {
					repo.Logger.Error("Could not send signal to process:", err)
				}
			}
		}

		// Do not wait forever: a child that cannot be killed must not keep
		// the caller from ever learning that the sync is over.
		select {
		case <-cmdDoneChan:
			repo.Logger.Debug("Process has been stopped.")
		case <-time.After(reapGracePeriod):
			repo.Logger.Warning("Process has not exited after being killed; no longer waiting for it")
		}
	}

	go func() {
		select {
		case <-cmdDoneChan:
			// ProcessState stays nil if Wait itself failed; ExitCode is
			// documented to handle a nil receiver, Success is not.
			if state := cmd.ProcessState; state == nil || !state.Success() {
				repo.Logger.Warning("Process ended with status code", state.ExitCode())
			}
		case <-repo.StopChan:
			repo.Logger.Debug("Received signal to stop, killing process...")
			killProcess()
		case <-time.After(time.Duration(repo.MaxTime) * time.Second):
			repo.Logger.Warning("Process has exceeded its max time; killing now")
			killProcess()
		}
		cmdChan <- cmd
	}()

	return cmdChan
}

// startRepoSync runs one sync attempt for repo and always reports the outcome
// on repo.DoneChan as a config.SyncResult before returning.
//
// If repo.TraceUrl is set, checkIfSyncNeeded decides whether the sync is run
// at all and supplies the status reported when it is skipped. Otherwise the
// status is derived from the sync command's exit code: 0 is SUCCESS, -1 is
// TERMINATED, and anything else (including failure to start) is FAILURE.
func startRepoSync(repo *config.Repo) {
	status := config.FAILURE
	defer func() {
		// Reads status at return time, so later assignments are reported.
		repo.DoneChan <- config.SyncResult{
			Name: repo.Name,
			Exit: status,
		}
	}()

	if repo.TraceUrl != "" {
		repo.Logger.Debug("Checking for changes")
		var continueSync bool
		status, continueSync = checkIfSyncNeeded(repo)
		if !continueSync {
			return
		}
		repo.Logger.Debug("Changes found; will attempt to sync")
	}

	args := getSyncCommand(repo)
	if len(args) == 0 {
		repo.Logger.Error("zero length command given for sync")
		return
	}

	// clear the rsync log file before starting the sync
	if repo.RsyncLogFile != "" {
		if err := os.Truncate(repo.RsyncLogFile, 0); err != nil {
			// Not fatal: the sync is still attempted.
			repo.Logger.Error(err.Error())
		}
	}

	ch := spawnProcess(repo, args)
	if ch == nil {
		// spawnProcess will have already logged the error
		return
	}

	cmd := <-ch
	switch cmd.ProcessState.ExitCode() {
	case 0:
		status = config.SUCCESS
	case -1:
		// ExitCode reports -1 when the process was terminated by a signal
		// or has not exited.
		status = config.TERMINATED
	default:
		status = config.FAILURE
	}
}

// zfsSync would run the zfssync script for repo and append its combined
// output to repo.ZfssyncLogFile. It is currently disabled and only logs a
// debug message.
func zfsSync(repo *config.Repo) {
	// we are not using zfs snapshots at the moment
	const enabled = false
	if !enabled {
		repo.Logger.Debug("Would run a zfssync if not disabled")
		return
	}

	out, err := exec.Command("/bin/sh", "/home/mirror/bin/zfssync", repo.Name).CombinedOutput()
	if err != nil {
		repo.Logger.Error(err.Error())
		return
	}

	f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
	if err != nil {
		repo.Logger.Error(err.Error())
		return
	}
	defer f.Close()

	if _, err := f.Write(out); err != nil {
		repo.Logger.Error(err.Error())
	}
}