diff --git a/merlin/Handoff.md b/merlin/Handoff.md index f892270..9f6d36d 100644 --- a/merlin/Handoff.md +++ b/merlin/Handoff.md @@ -2,10 +2,10 @@ - [x] debian - [ ] ubuntu < - [ ] ubuntu-ports < -- [ ] linuxmint-packages < -- [ ] debian-multimedia < -- [ ] debian-backports < -- [ ] debian-security < +- [x] linuxmint-packages +- [x] debian-multimedia +- [ ] debian-backports (part 1 broken) +- [x] debian-security - [x] ubuntu-releases - [x] xubuntu-releases - [ ] puppylinux (broken) diff --git a/merlin/README.md b/merlin/README.md index b1b3461..9b3f3b9 100644 --- a/merlin/README.md +++ b/merlin/README.md @@ -13,12 +13,13 @@ Then configure `merlin-config.ini` and run using `./merlin` ### Nice Features To Add - detect if an rsync process is stuck (watch the stdout/stderr of the rsync processes) -- get the config or the rsync command that a repo will sync with using a cli tool -- improve conversion from exit status enum to string +- detect if config file is changed and automatically reload (fsnotify) - sort `arthur status` by last time synced -- last sync runtime, time until next sync -- when sync fails, then make a copy of the rsync logs -- implementation of `trace` is ad hoc and can be improved +- respond to `arthur` without blocking main thread +- split off arthur with a more featureful arthur for debugging + - last sync runtime, time until next sync + - get expected rsync command or repo config + - details for last 10 syncs (avg time, success rate, data read/written) ### Completed - [x] add bwlimit option for each rsync process diff --git a/merlin/arthur/arthur.go b/merlin/arthur/arthur.go index 95272e2..1a60e19 100644 --- a/merlin/arthur/arthur.go +++ b/merlin/arthur/arthur.go @@ -68,12 +68,7 @@ func SendStatus(conn net.Conn) { repo := config.RepoMap[name] lastSync := repo.State.LastAttemptStartTime nextSync := lastSync + int64(repo.Frequency) - lastExit := "failed" - if repo.State.LastAttemptExit == config.SUCCESS { - lastExit = "completed" - } else if repo.State.LastAttemptExit == config.TERMINATED { - lastExit = "terminated" - } + lastExit := config.StatusToString(repo.State.LastAttemptExit) fmt.Fprintf(status, "%s\t%s\t%s\t%s\t%t\n", name, diff --git a/merlin/config/config.go b/merlin/config/config.go index d597f53..cb0899d 100644 --- a/merlin/config/config.go +++ b/merlin/config/config.go @@ -50,12 +50,26 @@ var frequencies = map[string]int{ // Last job attempt statuses const ( - NOT_RUN_YET = iota + TERMINATED = iota - 1 SUCCESS FAILURE - TERMINATED // was killed by a signal + NOT_RUN_YET ) +func StatusToString(status int) string { + switch status { + case TERMINATED: + return "terminated" + case SUCCESS: + return "success" + case FAILURE: + return "failure" + case NOT_RUN_YET: + return "not run yet" + } + return "" +} + type SyncResult struct { Name string Exit int diff --git a/merlin/sync/interface.go b/merlin/sync/interface.go index 7be313e..e2e6cbb 100644 --- a/merlin/sync/interface.go +++ b/merlin/sync/interface.go @@ -20,6 +20,7 @@ func SyncIfPossible(repo *config.Repo, force bool) bool { repo.State.LastAttemptStartTime = curTime repo.SaveState() repo.Logger.Info(fmt.Sprintf("Repo %s has started syncing", repo.Name)) + go startRepoSync(repo) return true } @@ -40,16 +41,28 @@ func SyncCompleted(repo *config.Repo, exit int) { repoState.LastAttemptRunTime = syncTook repo.SaveState() - exitStr := "failed" - if exit == config.SUCCESS { - exitStr = "completed" - } else if exit == config.TERMINATED { - exitStr = "terminated" - } + exitStr := config.StatusToString(exit) repo.Logger.Info(fmt.Sprintf("Sync %s after running for %d seconds, will run again in %d seconds", exitStr, syncTook, nextSync)) - if exit == config.SUCCESS { - go zfsSync(repo) - go postSyncTraceUpdate(repo) - } + go postRepoSync(repo, exit) +} + +// begin and manage the steps of the sync and return the exit status +func startRepoSync(repo *config.Repo) { + status := config.FAILURE + done := false + + defer func() { + repo.DoneChan <- config.SyncResult{ + Name: repo.Name, + Exit: status, + } + }() + + status, done = preRepoSync(repo) + if done { + return + } + + status = runRepoSync(repo) } diff --git a/merlin/sync/post.go b/merlin/sync/post.go new file mode 100644 index 0000000..2926f9c --- /dev/null +++ b/merlin/sync/post.go @@ -0,0 +1,94 @@ +package sync + +import ( + "io" + "os" + "os/exec" + "path/filepath" + "time" + + "git.csclub.uwaterloo.ca/public/merlin/config" +) + +func postRepoSync(repo *config.Repo, exit int) { + if repo.DryRun { + repo.Logger.Debug("post sync not run because in dry run mode") + return + } + + switch exit { + case config.SUCCESS: + go zfsSync(repo) + go postSyncTraceUpdate(repo) + return + + case config.FAILURE: + go backupRsyncFailLog(repo) + return + } +} + +func zfsSync(repo *config.Repo) { + // we are not using zfs snapshots at the moment + repo.Logger.Debug("Would run a zfssync if not disabled") + return + + out, err := exec.Command("/bin/sh", "/home/mirror/bin/zfssync", repo.Name).CombinedOutput() + if err != nil { + repo.Logger.Error(err) + } else { + f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + repo.Logger.Error(err.Error()) + } else { + f.Write(out) + } + } +} + +// updates a trace file after the repo has finished syncing +func postSyncTraceUpdate(repo *config.Repo) { + switch repo.SyncType { + case "csc-sync-debian": + cscPostDebian(repo) + } +} + +// update our trace file's modification date by writing the current time +func cscPostDebian(repo *config.Repo) { + targetDir := filepath.Join(buildDownloadDir(repo), "project/trace") + target := filepath.Join(targetDir, config.Conf.Hostname) + + os.MkdirAll(targetDir, 0755) + f, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + + if err != nil { + repo.Logger.Error("Unable to open trace file: " + target) + return + } + if _, err = f.WriteString(time.Now().UTC().Format(time.RFC1123)); err != nil { + repo.Logger.Error("Unable to write to trace file: " + target) + return + } +} + +func backupRsyncFailLog(repo *config.Repo) { + src := repo.RsyncLogFile + dest := repo.RsyncLogFile + ".fail" + + fin, err := os.Open(src) + if err != nil { + repo.Logger.Error(err.Error()) + } + defer fin.Close() + + fout, err := os.OpenFile(dest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + repo.Logger.Error(err.Error()) + } + defer fout.Close() + + if _, err = io.Copy(fout, fin); err != nil { + repo.Logger.Error(err.Error()) + } +} diff --git a/merlin/sync/pre.go b/merlin/sync/pre.go new file mode 100644 index 0000000..6542f4c --- /dev/null +++ b/merlin/sync/pre.go @@ -0,0 +1,165 @@ +package sync + +import ( + "os" + "path/filepath" + + "git.csclub.uwaterloo.ca/public/merlin/config" +) + +// jobs to complete before repo sync command should be run +// when done is false: status can be ignored and sync command should continue +// when done is true: sync should exit with status +func preRepoSync(repo *config.Repo) (status int, done bool) { + status = config.SUCCESS + done = false + + // clear the rsync log file + if repo.RsyncLogFile != "" { + err := os.Truncate(repo.RsyncLogFile, 0) + if err != nil { + status = config.FAILURE + repo.Logger.Error("Error while trying to clear logfile: " + repo.RepoLogFile) + } + } + + if repo.TraceHost != "" { + if status, done = checkIfRepoSyncNeeded(repo); done { + + // run alternative jobs if a full sync will not be done + if exit := noRepoSyncNeeded(repo); exit != config.SUCCESS { + status = exit + } + return + } + } + + return +} + +// check if the repo should be synced by comparing their trace file with our own +// returns the exit status of the command used to retrieve the repo's trace file +// returns true for done if and only if no errors occur and it is confirmed that +// the trace files are the same or that a termination signal was given +// (csc-sync-debian will always sync so done in its case will always be false) +func checkIfRepoSyncNeeded(repo *config.Repo) (status int, done bool) { + status = config.FAILURE + done = false + + if repo.DryRun { + repo.Logger.Debug("dry running so assuming that trace file was changed") + return config.SUCCESS, false + } + + repo.Logger.Debug("Retrieving the repo's trace file") + + temp, err := os.CreateTemp("/tmp", "merlin-"+repo.Name+"-trace-*") + if err != nil { + repo.Logger.Error(err.Error()) + return + } + temp.Close() + defer os.Remove(temp.Name()) + + // get the trace command to copy the last update file + var cmd []string + switch repo.SyncType { + case "csc-sync-archlinux": + cmd = cscTraceArchLinux(repo, temp.Name()) + case "csc-sync-debian": + cmd = cscTraceDebian(repo, temp.Name()) + default: + repo.Logger.Error("Trace files are not implemented for sync type '" + repo.SyncType + "'") + return + } + + status = spawnProcessAndWait(repo, cmd) + if status != config.SUCCESS { + if status == config.TERMINATED { + done = true + } + return + } + + repo.Logger.Debug("Comparing the repo's trace file with our own") + + // diff returns true if files are the same (if files are the same then sync is done) + switch repo.SyncType { + case "csc-sync-archlinux": + done = cscTraceArchLinuxDiff(repo, temp.Name()) + case "csc-sync-debian": + done = cscTraceDebianDiff(repo, temp.Name()) + } + + if done { + repo.Logger.Info("trace file for " + repo.RsyncHost + " unchanged") + + // debian syncs even when trace file is unchanged + if repo.SyncType == "csc-sync-debian" { + done = false + } + return + } + + repo.Logger.Debug("trace file changes found; will attempt to sync") + return +} + +// jobs to do when a full sync will not be run +func noRepoSyncNeeded(repo *config.Repo) (status int) { + status = config.SUCCESS + + var args []string + switch repo.SyncType { + case "csc-sync-archlinux": + args = cscTraceArchLinuxUpdate(repo) + } + + status = spawnProcessAndWait(repo, args) + return +} + +func cscTraceArchLinux(repo *config.Repo, newTime string) []string { + args := []string{ + "curl", + "--interface", config.Conf.IPv4Address, + "-s", repo.TraceHost, + "-o", newTime, + } + + return args +} + +func cscTraceDebian(repo *config.Repo, newTime string) []string { + args := []string{ + "nice", "rsync", "-tv", "--quiet", + "-4", "--address=" + config.Conf.IPv4Address, + repo.RsyncHost + "::" + filepath.Join(repo.RsyncDir, "project/trace", repo.TraceHost), + newTime, + } + + return args +} + +func cscTraceArchLinuxDiff(repo *config.Repo, newTime string) bool { + return diffFileContent(repo, newTime, filepath.Join(buildDownloadDir(repo), "lastupdate")) +} + +func cscTraceDebianDiff(repo *config.Repo, newTime string) bool { + return diffFileTime(repo, newTime, filepath.Join(buildDownloadDir(repo), "project/trace", repo.TraceHost)) +} + +func cscTraceArchLinuxUpdate(repo *config.Repo) (args []string) { + rsyncDir := repo.RsyncDir + localDir := repo.LocalDir + + repo.RsyncDir = repo.RsyncDir + "/lastsync" + repo.LocalDir = repo.LocalDir + "/lastsync" + + args = cscSyncArchLinux(repo) + + repo.RsyncDir = rsyncDir + repo.LocalDir = localDir + + return args +} diff --git a/merlin/sync/sync.go b/merlin/sync/sync.go index df36195..7c97198 100644 --- a/merlin/sync/sync.go +++ b/merlin/sync/sync.go @@ -8,32 +8,69 @@ import ( "git.csclub.uwaterloo.ca/public/merlin/config" ) -func buildRsyncHost(repo *config.Repo) string { - rsyncHost := repo.RsyncHost - if repo.RsyncUser != "" { - rsyncHost = repo.RsyncUser + "@" + rsyncHost +func runRepoSync(repo *config.Repo) (status int) { + status = config.FAILURE + + cmds := getSyncCommand(repo) + if len(cmds) == 0 { + repo.Logger.Error() + return } - return "rsync://" + rsyncHost + "/" + repo.RsyncDir + + // run every step of the sync command as long as the previous step was successful + for i, args := range cmds { + repo.Logger.Debug(fmt.Sprintf("Running step %d of sync", i)) + + status = spawnProcessAndWait(repo, args) + if status != config.SUCCESS { + // spawnProcessAndWait will have already logged error + return + } + } + return } -func buildRsyncDaemonHost(repo *config.Repo) string { - rsyncHost := repo.RsyncHost - if repo.RsyncUser != "" { - rsyncHost = repo.RsyncUser + "@" + rsyncHost +// executes a particular sync job depending on repo.SyncType. +func getSyncCommand(repo *config.Repo) (cmds [][]string) { + // check that the download directory exists + if _, err := os.Stat(buildDownloadDir(repo)); os.IsNotExist(err) { + repo.Logger.Error(err.Error()) + return } - return rsyncHost + "::" + repo.RsyncDir -} -func buildRsyncSSHHost(repo *config.Repo) string { - rsyncHost := repo.RsyncHost - if repo.RsyncUser != "" { - rsyncHost = repo.RsyncUser + "@" + rsyncHost + switch repo.SyncType { + case "csc-sync-apache": + return append(cmds, cscSyncApache(repo)) + case "csc-sync-archlinux": + return append(cmds, cscSyncArchLinux(repo)) + case "csc-sync-badperms": + return append(cmds, cscSyncBadPerms(repo)) + case "csc-sync-cdimage": + return append(cmds, cscSyncCDImage(repo)) + case "csc-sync-ceph": + return append(cmds, cscSyncCephStep1(repo), cscSyncCephStep2(repo)) + case "csc-sync-chmod": + return append(cmds, cscSyncChmod(repo)) + case "csc-sync-debian": + return append(cmds, cscSyncDebianStep1(repo), cscSyncDebianStep2(repo)) + case "csc-sync-debian-cd": + return append(cmds, cscSyncDebianCD(repo)) + case "csc-sync-gentoo": + return append(cmds, cscSyncGentoo(repo)) + case "csc-sync-s3": + return append(cmds, cscSyncS3(repo)) + case "csc-sync-ssh": + return append(cmds, cscSyncSSH(repo)) + case "csc-sync-standard": + return append(cmds, cscSyncStandard(repo)) + case "csc-sync-standard-ipv6": + return append(cmds, cscSyncStandardIPV6(repo)) + case "csc-sync-wget": + return append(cmds, cscSyncWget(repo)) + default: + repo.Logger.Error("Unrecognized sync type: " + repo.SyncType) + return } - return rsyncHost + ":" + repo.RsyncDir -} - -func buildDownloadDir(repo *config.Repo) string { - return filepath.Join(config.Conf.DownloadDir, repo.LocalDir) } const ( @@ -94,6 +131,34 @@ func addConditionalFlags(repo *config.Repo, flags int) []string { return args } +func buildRsyncHost(repo *config.Repo) string { + rsyncHost := repo.RsyncHost + if repo.RsyncUser != "" { + rsyncHost = repo.RsyncUser + "@" + rsyncHost + } + return "rsync://" + rsyncHost + "/" + repo.RsyncDir +} + +func buildRsyncDaemonHost(repo *config.Repo) string { + rsyncHost := repo.RsyncHost + if repo.RsyncUser != "" { + rsyncHost = repo.RsyncUser + "@" + rsyncHost + } + return rsyncHost + "::" + repo.RsyncDir +} + +func buildRsyncSSHHost(repo *config.Repo) string { + rsyncHost := repo.RsyncHost + if repo.RsyncUser != "" { + rsyncHost = repo.RsyncUser + "@" + rsyncHost + } + return rsyncHost + ":" + repo.RsyncDir +} + +func buildDownloadDir(repo *config.Repo) string { + return filepath.Join(config.Conf.DownloadDir, repo.LocalDir) +} + func cscSyncApache(repo *config.Repo) []string { args := []string{ "nice", "rsync", "-az", @@ -149,7 +214,7 @@ func cscSyncCDImage(repo *config.Repo) []string { return args } -func cscSyncCeph(repo *config.Repo) []string { +func cscSyncCephStep1(repo *config.Repo) []string { args := []string{ "rsync", "--stats", "--progress", repo.RsyncHost + "::ceph", @@ -166,12 +231,16 @@ func cscSyncCeph(repo *config.Repo) []string { args = append(args, addConditionalFlags(repo, quiet|ipv4)...) args = append(args, buildDownloadDir(repo)) - args = append(args, []string{ + return args +} + +func cscSyncCephStep2(repo *config.Repo) []string { + args := []string{ "&&", "rsync", "--stats", "--progress", repo.RsyncHost + "::ceph", "--recursive", "--times", "--links", "--hard-links", "--delete-after", - }...) + } args = append(args, addConditionalFlags(repo, quiet|ipv4)...) args = append(args, buildDownloadDir(repo)) @@ -191,24 +260,18 @@ func cscSyncChmod(repo *config.Repo) []string { return args } -func cscSyncDebian(repo *config.Repo) []string { +func cscSyncDebianStep1(repo *config.Repo) []string { args := []string{ "nice", "rsync", "-rlHtvp", } args = append(args, addConditionalFlags(repo, baseFlags|excludeTmp|ipv4)...) args = append(args, buildRsyncDaemonHost(repo)+"/pool/", buildDownloadDir(repo)+"/pool/") - ch := spawnProcess(repo, args) - if ch == nil { - // spawnSyncProcess will have already logged error - return nil - } - cmd := <-ch - if cmd.ProcessState.ExitCode() != 0 { - return nil - } + return args +} - args = []string{ +func cscSyncDebianStep2(repo *config.Repo) []string { + args := []string{ "nice", "rsync", "-rlHtvp", "--exclude", filepath.Join("project/trace", config.Conf.Hostname), } @@ -320,46 +383,3 @@ func cscSyncWget(repo *config.Repo) []string { return args } - -// executes a particular sync job depending on repo.SyncType. -func getSyncCommand(repo *config.Repo) (args []string) { - // check that the download directory exists - if _, err := os.Stat(buildDownloadDir(repo)); os.IsNotExist(err) { - repo.Logger.Error(err.Error()) - return - } - - switch repo.SyncType { - case "csc-sync-apache": - return cscSyncApache(repo) - case "csc-sync-archlinux": - return cscSyncArchLinux(repo) - case "csc-sync-badperms": - return cscSyncBadPerms(repo) - case "csc-sync-cdimage": - return cscSyncCDImage(repo) - case "csc-sync-ceph": - return cscSyncCeph(repo) - case "csc-sync-chmod": - return cscSyncChmod(repo) - case "csc-sync-debian": - return cscSyncDebian(repo) - case "csc-sync-debian-cd": - return cscSyncDebianCD(repo) - case "csc-sync-gentoo": - return cscSyncGentoo(repo) - case "csc-sync-s3": - return cscSyncS3(repo) - case "csc-sync-ssh": - return cscSyncSSH(repo) - case "csc-sync-standard": - return cscSyncStandard(repo) - case "csc-sync-standard-ipv6": - return cscSyncStandardIPV6(repo) - case "csc-sync-wget": - return cscSyncWget(repo) - default: - repo.Logger.Error("Unrecognized sync type: " + repo.SyncType) - return - } -} diff --git a/merlin/sync/sync_test.go b/merlin/sync/sync_test.go deleted file mode 100644 index ca46741..0000000 --- a/merlin/sync/sync_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package sync - -import ( - "path/filepath" - "reflect" - "testing" - - "git.csclub.uwaterloo.ca/public/merlin/config" - "git.csclub.uwaterloo.ca/public/merlin/logger" -) - -func dummyRepoConf(name string, syncType string, frequencyStr string, localDir string, rsyncHost string, rsyncDir string) *config.Repo { - doneChan := make(chan config.SyncResult) - stopChan := make(chan struct{}) - - repoLogFile := filepath.Join("test_files", name, name+".log") - logger := logger.NewLogger(name, repoLogFile, false) - - return &config.Repo{ - Name: name, - SyncType: syncType, - FrequencyStr: frequencyStr, - Frequency: 0, - MaxTime: config.DEFAULT_MAX_TIME, - LocalDir: localDir, - RsyncHost: rsyncHost, - RsyncDir: rsyncDir, - RsyncUser: "", - PasswordFile: "", - StateFile: "", - RepoLogFile: repoLogFile, - Logger: logger, - RsyncLogFile: "/tmp/log/" + name + "-rsync.log", - ZfssyncLogFile: "", - DoneChan: doneChan, - StopChan: stopChan, - State: &config.RepoState{ - IsRunning: false, - LastAttemptStartTime: 0, - LastAttemptRunTime: 0, - LastAttemptExit: config.NOT_RUN_YET, - }, - } -} - -func TestGetSyncCommand(t *testing.T) { - - config.Conf.DownloadDir = "test_files" - config.Conf.IPv4Address = "0.0.0.0" - config.Conf.DownloadDir = "/tmp/mirror" - - testData := []struct { - repoConf *config.Repo - expected []string - }{ - { - repoConf: dummyRepoConf("ubuntu-releases", "csc-sync-standard", "bi-hourly", "ubuntu-releases", "rsync.releases.ubuntu.com", "releases"), - expected: []string{ - "nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", - "--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=0.0.0.0", - "--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=/tmp/log/ubuntu-releases-rsync.log", - "rsync://rsync.releases.ubuntu.com/releases", "/tmp/mirror/ubuntu-releases", - }, - }, - { - repoConf: dummyRepoConf("raspberrypi", "csc-sync-standard-ipv6", "bi-hourly", "raspberrypi", "apt-repo.raspberrypi.org", "archive"), - expected: []string{ - "nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", - "--delay-updates", "--safe-links", "--timeout=3600", "-6", "--address=0.0.0.0", - "--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=/tmp/log/raspberrypi-rsync.log", - "apt-repo.raspberrypi.org::archive", "/tmp/mirror/raspberrypi", - }, - }, - } - - for _, test := range testData { - - syncCommand := getSyncCommand(test.repoConf) - - // check for correct command output - if !reflect.DeepEqual(syncCommand, test.expected) { - t.Errorf("Invalid command string for %s repo\nRECIEVED:\n%+v\nEXPECTED:\n%+v\n", test.repoConf.Name, syncCommand, test.expected) - } - - // check if download dir was created - - } -} diff --git a/merlin/sync/trace.go b/merlin/sync/trace.go deleted file mode 100644 index cdafb49..0000000 --- a/merlin/sync/trace.go +++ /dev/null @@ -1,203 +0,0 @@ -package sync - -import ( - "os" - "path/filepath" - "strings" - "time" - - "git.csclub.uwaterloo.ca/public/merlin/config" -) - -func cscTraceArchLinux(repo *config.Repo, newTime string) []string { - args := []string{ - "curl", - "--interface", config.Conf.IPv4Address, - "-s", repo.TraceHost, - "-o", newTime, - } - - return args -} - -// return false iff both files exist, are non-empty, and have the same contents -func cscTraceArchLinuxDiff(repo *config.Repo, newTime string) bool { - readFile := func(file string) string { - f, err := os.ReadFile(file) - if err != nil { - repo.Logger.Debug(err.Error()) - return "" - } - return strings.TrimSpace(string(f)) - } - - file1 := readFile(newTime) - file2 := readFile(filepath.Join(buildDownloadDir(repo), "lastupdate")) - - if file1 == "" || file2 == "" { - return true - } - return file1 != file2 -} - -// update the lastsync file -func cscTraceArchLinuxUpdate(repo *config.Repo) (args []string) { - rsyncDir := repo.RsyncDir - localDir := repo.LocalDir - - repo.RsyncDir = repo.RsyncDir + "/lastsync" - repo.LocalDir = repo.LocalDir + "/lastsync" - - args = cscSyncArchLinux(repo) - - repo.RsyncDir = rsyncDir - repo.LocalDir = localDir - - return args -} - -func cscTraceDebian(repo *config.Repo, newTime string) []string { - args := []string{ - "nice", "rsync", "-tv", "-4", - "--address=" + config.Conf.IPv4Address, - "--quiet", - repo.RsyncHost + "::" + filepath.Join(repo.RsyncDir, "project/trace", repo.TraceHost), - newTime, - } - - return args -} - -// return false iff both files exist and were modified at the same time -func cscTraceDebianDiff(repo *config.Repo, newTime string) bool { - statFile := func(file string) int64 { - f, err := os.Stat(file) - if err != nil { - repo.Logger.Debug("Error while trying to stat file: " + file) - return 0 - } - - return f.ModTime().Unix() - } - - file1 := statFile(newTime) - file2 := statFile(filepath.Join(buildDownloadDir(repo), "project/trace", repo.TraceHost)) - - if file1 == 0 || file2 == 0 { - return true - } - - return file1 != file2 -} - -// update our trace file's modification date by writing the current time -func cscTraceDebianUpdate(repo *config.Repo) { - target := filepath.Join(buildDownloadDir(repo), "project/trace", config.Conf.Hostname) - - os.MkdirAll(filepath.Join(buildDownloadDir(repo), "project/trace"), 0755) - - f, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) - if err != nil { - repo.Logger.Error("Unable to open trace file: " + target) - return - } - if _, err = f.WriteString(time.Now().UTC().Format(time.RFC1123)); err != nil { - repo.Logger.Error("Unable to write to trace file: " + target) - return - } -} - -// some repos want us to check a last modification time file before performing a full sync -// returns a status to update the original sync's status along with if the sync should continue -func checkIfSyncNeeded(repo *config.Repo) (status int, continueSync bool) { - // default return values are the default status and to continue the sync - status = config.FAILURE - continueSync = true - if repo.DryRun { - return - } - runCommand := func(args []string) int { - ch := spawnProcess(repo, args) - if ch == nil { - // spawnSyncProcess will have already logged error - return 1 - } - cmd := <-ch - return cmd.ProcessState.ExitCode() - } - - // create a temp file - temp, err := os.CreateTemp("/tmp", "merlin-"+repo.Name+"-trace-*") - if err != nil { - repo.Logger.Error(err.Error()) - return - } - temp.Close() - defer os.Remove(temp.Name()) - - // get the trace command to copy a last updated file - var args []string - switch repo.SyncType { - case "csc-sync-archlinux": - args = cscTraceArchLinux(repo, temp.Name()) - case "csc-sync-debian": - args = cscTraceDebian(repo, temp.Name()) - default: - repo.Logger.Error("Trace files are not implemented for sync type '" + repo.SyncType + "'") - return - } - - exit := runCommand(args) - if exit != 0 { - // if the process was terminated then don't continue to sync - if exit == -1 { - return config.TERMINATED, false - } - return - } - - filesDiffer := true - switch repo.SyncType { - case "csc-sync-archlinux": - filesDiffer = cscTraceArchLinuxDiff(repo, temp.Name()) - case "csc-sync-debian": - filesDiffer = cscTraceDebianDiff(repo, temp.Name()) - } - // debian still syncs even if the trace file is the same - if !filesDiffer && repo.SyncType == "csc-sync-debian" { - repo.Logger.Error("trace file for " + repo.RsyncHost + " unchanged") - return - } - if filesDiffer { - // continue sync if files differ - return - } - - repo.Logger.Debug("No changes found; full sync will not be made") - - // archlinux wants to sync a "lastsync" file if lastupdate prevents sync - switch repo.SyncType { - case "csc-sync-archlinux": - args = cscTraceArchLinuxUpdate(repo) - default: - return config.SUCCESS, false - } - - exit = runCommand(args) - if exit != 0 { - // if the process was terminated then don't continue to sync - if exit == -1 { - return config.TERMINATED, false - } - return - } - - return config.SUCCESS, false -} - -// csc-sync-debian wants to update a trace file after the repo is done syncing -func postSyncTraceUpdate(repo *config.Repo) { - if repo.SyncType == "csc-sync-debian" { - cscTraceDebianUpdate(repo) - } -} diff --git a/merlin/sync/process.go b/merlin/sync/utils.go similarity index 65% rename from merlin/sync/process.go rename to merlin/sync/utils.go index 86678d1..11334c2 100644 --- a/merlin/sync/process.go +++ b/merlin/sync/utils.go @@ -16,11 +16,17 @@ import ( // It returns a channel through which a Cmd will be sent once it has finished, // or nil if it was unable to start a process. func spawnProcess(repo *config.Repo, args []string) (ch <-chan *exec.Cmd) { + if len(args) == 0 { + repo.Logger.Error("command given is of zero length") + return + } + repo.Logger.Debug(fmt.Sprintf("Running the command: %v", args)) if repo.DryRun { repo.Logger.Debug("Dry running for 50 seconds") args = []string{"sleep", "50"} } + cmd := exec.Command(args[0], args[1:]...) repo.Logger.Debug("Starting process") @@ -78,44 +84,13 @@ func spawnProcess(repo *config.Repo, args []string) (ch <-chan *exec.Cmd) { return } -func startRepoSync(repo *config.Repo) { - status := config.FAILURE - defer func() { - repo.DoneChan <- config.SyncResult{ - Name: repo.Name, - Exit: status, - } - }() - - if repo.TraceHost != "" { - repo.Logger.Debug("Checking for changes") - - continueSync := true - status, continueSync = checkIfSyncNeeded(repo) - if !continueSync || status == config.TERMINATED { - return - } - - repo.Logger.Debug("Changes found; will attempt to sync") - } - - // clear the rsync log file before starting the sync - if repo.RsyncLogFile != "" { - err := os.Truncate(repo.RsyncLogFile, 0) - if err != nil { - repo.Logger.Error(err.Error()) - } - } - - args := getSyncCommand(repo) - if len(args) == 0 { - repo.Logger.Error("zero length command given for sync") - return - } +// spawns a process and waits for it complete before parsing the exit code and returning it +func spawnProcessAndWait(repo *config.Repo, args []string) (status int) { + status = config.FAILURE ch := spawnProcess(repo, args) if ch == nil { - // spawnSyncProcess will have already logged error + // spawnProcess will have already logged error return } cmd := <-ch @@ -127,22 +102,46 @@ func startRepoSync(repo *config.Repo) { status = config.TERMINATED // default is already FAILURE } -} - -func zfsSync(repo *config.Repo) { - // we are not using zfs snapshots at the moment - repo.Logger.Debug("Would run a zfssync if not disabled") return - - out, err := exec.Command("/bin/sh", "/home/mirror/bin/zfssync", repo.Name).CombinedOutput() - if err != nil { - repo.Logger.Error(err) - } else { - f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) - if err != nil { - repo.Logger.Error(err.Error()) - } else { - f.Write(out) - } - } +} + +// returns true iff file contents are the same (for diff file contents, errors, empty return false) +func diffFileContent(repo *config.Repo, file1, file2 string) bool { + readFile := func(file string) string { + f, err := os.ReadFile(file) + if err != nil { + repo.Logger.Debug("Error while trying to read file: " + file) + return "" + } + return string(f) + } + + content1 := readFile(file1) + content2 := readFile(file2) + + if content1 == "" || content2 == "" { + return false + } + return content1 == content2 +} + +// returns true iff file times are the same (for diff file times and errors return false) +func diffFileTime(repo *config.Repo, file1, file2 string) bool { + statFile := func(file string) int64 { + f, err := os.Stat(file) + if err != nil { + repo.Logger.Debug("Error while trying to stat file: " + file) + return 0 + } + + return f.ModTime().Unix() + } + + time1 := statFile(file1) + time2 := statFile(file2) + + if time1 == 0 || time2 == 0 { + return false + } + return file1 == file2 }