refactor sync

This commit is contained in:
Andrew Wang 2022-07-28 02:09:36 +00:00
parent b3d4abd885
commit 8675e13d42
11 changed files with 457 additions and 447 deletions

View File

@ -2,10 +2,10 @@
- [x] debian
- [ ] ubuntu <
- [ ] ubuntu-ports <
- [ ] linuxmint-packages <
- [ ] debian-multimedia <
- [ ] debian-backports <
- [ ] debian-security <
- [x] linuxmint-packages
- [x] debian-multimedia
- [ ] debian-backports (part 1 broken)
- [x] debian-security
- [x] ubuntu-releases
- [x] xubuntu-releases
- [ ] puppylinux (broken)

View File

@ -13,12 +13,13 @@ Then configure `merlin-config.ini` and run using `./merlin`
### Nice Features To Add
- detect if an rsync process is stuck (watch the stdout/stderr of the rsync processes)
- get the config or the rsync command that a repo will sync with using a cli tool
- improve conversion from exit status enum to string
- detect if config file is changed and automatically reload (fsnotify)
- sort `arthur status` by last time synced
- respond to `arthur` without blocking main thread
- split off arthur with a more featureful arthur for debugging
- last sync runtime, time until next sync
- when sync fails, then make a copy of the rsync logs
- implementation of `trace` is ad hoc and can be improved
- get expected rsync command or repo config
- details for last 10 syncs (avg time, success rate, data read/written)
### Completed
- [x] add bwlimit option for each rsync process

View File

@ -68,12 +68,7 @@ func SendStatus(conn net.Conn) {
repo := config.RepoMap[name]
lastSync := repo.State.LastAttemptStartTime
nextSync := lastSync + int64(repo.Frequency)
lastExit := "failed"
if repo.State.LastAttemptExit == config.SUCCESS {
lastExit = "completed"
} else if repo.State.LastAttemptExit == config.TERMINATED {
lastExit = "terminated"
}
lastExit := config.StatusToString(repo.State.LastAttemptExit)
fmt.Fprintf(status, "%s\t%s\t%s\t%s\t%t\n",
name,

View File

@ -50,12 +50,26 @@ var frequencies = map[string]int{
// Last job attempt statuses
const (
NOT_RUN_YET = iota
TERMINATED = iota - 1
SUCCESS
FAILURE
TERMINATED // was killed by a signal
NOT_RUN_YET
)
func StatusToString(status int) string {
switch status {
case TERMINATED:
return "terminated"
case SUCCESS:
return "success"
case FAILURE:
return "failure"
case NOT_RUN_YET:
return "not run yet"
}
return ""
}
type SyncResult struct {
Name string
Exit int

View File

@ -20,6 +20,7 @@ func SyncIfPossible(repo *config.Repo, force bool) bool {
repo.State.LastAttemptStartTime = curTime
repo.SaveState()
repo.Logger.Info(fmt.Sprintf("Repo %s has started syncing", repo.Name))
go startRepoSync(repo)
return true
}
@ -40,16 +41,28 @@ func SyncCompleted(repo *config.Repo, exit int) {
repoState.LastAttemptRunTime = syncTook
repo.SaveState()
exitStr := "failed"
if exit == config.SUCCESS {
exitStr = "completed"
} else if exit == config.TERMINATED {
exitStr = "terminated"
}
exitStr := config.StatusToString(exit)
repo.Logger.Info(fmt.Sprintf("Sync %s after running for %d seconds, will run again in %d seconds", exitStr, syncTook, nextSync))
if exit == config.SUCCESS {
go zfsSync(repo)
go postSyncTraceUpdate(repo)
go postRepoSync(repo, exit)
}
// begin and manage the steps of the sync and return the exit status
func startRepoSync(repo *config.Repo) {
status := config.FAILURE
done := false
defer func() {
repo.DoneChan <- config.SyncResult{
Name: repo.Name,
Exit: status,
}
}()
status, done = preRepoSync(repo)
if done {
return
}
status = runRepoSync(repo)
}

94
merlin/sync/post.go Normal file
View File

@ -0,0 +1,94 @@
package sync
import (
"io"
"os"
"os/exec"
"path/filepath"
"time"
"git.csclub.uwaterloo.ca/public/merlin/config"
)
func postRepoSync(repo *config.Repo, exit int) {
if repo.DryRun {
repo.Logger.Debug("post sync not run because in dry run mode")
return
}
switch exit {
case config.SUCCESS:
go zfsSync(repo)
go postSyncTraceUpdate(repo)
return
case config.FAILURE:
go backupRsyncFailLog(repo)
return
}
}
func zfsSync(repo *config.Repo) {
// we are not using zfs snapshots at the moment
repo.Logger.Debug("Would run a zfssync if not disabled")
return
out, err := exec.Command("/bin/sh", "/home/mirror/bin/zfssync", repo.Name).CombinedOutput()
if err != nil {
repo.Logger.Error(err)
} else {
f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
repo.Logger.Error(err.Error())
} else {
f.Write(out)
}
}
}
// updates a trace file after the repo has finished syncing
func postSyncTraceUpdate(repo *config.Repo) {
switch repo.SyncType {
case "csc-sync-debian":
cscPostDebian(repo)
}
}
// update our trace file's modification date by writing the current time
func cscPostDebian(repo *config.Repo) {
targetDir := filepath.Join(buildDownloadDir(repo), "project/trace")
target := filepath.Join(targetDir, config.Conf.Hostname)
os.MkdirAll(targetDir, 0755)
f, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
repo.Logger.Error("Unable to open trace file: " + target)
return
}
if _, err = f.WriteString(time.Now().UTC().Format(time.RFC1123)); err != nil {
repo.Logger.Error("Unable to write to trace file: " + target)
return
}
}
func backupRsyncFailLog(repo *config.Repo) {
src := repo.RsyncLogFile
dest := repo.RsyncLogFile + ".fail"
fin, err := os.Open(src)
if err != nil {
repo.Logger.Error(err.Error())
}
defer fin.Close()
fout, err := os.OpenFile(dest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
repo.Logger.Error(err.Error())
}
defer fout.Close()
if _, err = io.Copy(fout, fin); err != nil {
repo.Logger.Error(err.Error())
}
}

165
merlin/sync/pre.go Normal file
View File

@ -0,0 +1,165 @@
package sync
import (
"os"
"path/filepath"
"git.csclub.uwaterloo.ca/public/merlin/config"
)
// jobs to complete before repo sync command should be run
// when done is false: status can be ignored and sync command should continue
// when done is true: sync should exit with status
func preRepoSync(repo *config.Repo) (status int, done bool) {
status = config.SUCCESS
done = false
// clear the rsync log file
if repo.RsyncLogFile != "" {
err := os.Truncate(repo.RsyncLogFile, 0)
if err != nil {
status = config.FAILURE
repo.Logger.Error("Error while trying to clear logfile: " + repo.RepoLogFile)
}
}
if repo.TraceHost != "" {
if status, done = checkIfRepoSyncNeeded(repo); done {
// run alternative jobs if a full sync will not be done
if exit := noRepoSyncNeeded(repo); exit != config.SUCCESS {
status = exit
}
return
}
}
return
}
// check if the repo should be synced by comparing their trace file with our own
// returns the exit status of the command used to retrieve the repo's trace file
// returns true for done if and only if no errors occur and it is confirmed that
// the trace files are the same or that a termination signal was given
// (csc-sync-debian will always sync so done in its case will always be false)
func checkIfRepoSyncNeeded(repo *config.Repo) (status int, done bool) {
status = config.FAILURE
done = false
if repo.DryRun {
repo.Logger.Debug("dry running so assuming that trace file was changed")
return config.SUCCESS, false
}
repo.Logger.Debug("Retrieving the repo's trace file")
temp, err := os.CreateTemp("/tmp", "merlin-"+repo.Name+"-trace-*")
if err != nil {
repo.Logger.Error(err.Error())
return
}
temp.Close()
defer os.Remove(temp.Name())
// get the trace command to copy the last update file
var cmd []string
switch repo.SyncType {
case "csc-sync-archlinux":
cmd = cscTraceArchLinux(repo, temp.Name())
case "csc-sync-debian":
cmd = cscTraceDebian(repo, temp.Name())
default:
repo.Logger.Error("Trace files are not implemented for sync type '" + repo.SyncType + "'")
return
}
status = spawnProcessAndWait(repo, cmd)
if status != config.SUCCESS {
if status == config.TERMINATED {
done = true
}
return
}
repo.Logger.Debug("Comparing the repo's trace file with our own")
// diff returns true if files are the same (if files are the same then sync is done)
switch repo.SyncType {
case "csc-sync-archlinux":
done = cscTraceArchLinuxDiff(repo, temp.Name())
case "csc-sync-debian":
done = cscTraceDebianDiff(repo, temp.Name())
}
if done {
repo.Logger.Info("trace file for " + repo.RsyncHost + " unchanged")
// debian syncs even when trace file is unchanged
if repo.SyncType == "csc-sync-debian" {
done = false
}
return
}
repo.Logger.Debug("trace file changes found; will attempt to sync")
return
}
// jobs to do when a full sync will not be run
func noRepoSyncNeeded(repo *config.Repo) (status int) {
status = config.SUCCESS
var args []string
switch repo.SyncType {
case "csc-sync-archlinux":
args = cscTraceArchLinuxUpdate(repo)
}
status = spawnProcessAndWait(repo, args)
return
}
func cscTraceArchLinux(repo *config.Repo, newTime string) []string {
args := []string{
"curl",
"--interface", config.Conf.IPv4Address,
"-s", repo.TraceHost,
"-o", newTime,
}
return args
}
func cscTraceDebian(repo *config.Repo, newTime string) []string {
args := []string{
"nice", "rsync", "-tv", "--quiet",
"-4", "--address=" + config.Conf.IPv4Address,
repo.RsyncHost + "::" + filepath.Join(repo.RsyncDir, "project/trace", repo.TraceHost),
newTime,
}
return args
}
func cscTraceArchLinuxDiff(repo *config.Repo, newTime string) bool {
return diffFileContent(repo, newTime, filepath.Join(buildDownloadDir(repo), "lastupdate"))
}
func cscTraceDebianDiff(repo *config.Repo, newTime string) bool {
return diffFileTime(repo, newTime, filepath.Join(buildDownloadDir(repo), "project/trace", repo.TraceHost))
}
func cscTraceArchLinuxUpdate(repo *config.Repo) (args []string) {
rsyncDir := repo.RsyncDir
localDir := repo.LocalDir
repo.RsyncDir = repo.RsyncDir + "/lastsync"
repo.LocalDir = repo.LocalDir + "/lastsync"
args = cscSyncArchLinux(repo)
repo.RsyncDir = rsyncDir
repo.LocalDir = localDir
return args
}

View File

@ -8,32 +8,69 @@ import (
"git.csclub.uwaterloo.ca/public/merlin/config"
)
func buildRsyncHost(repo *config.Repo) string {
rsyncHost := repo.RsyncHost
if repo.RsyncUser != "" {
rsyncHost = repo.RsyncUser + "@" + rsyncHost
}
return "rsync://" + rsyncHost + "/" + repo.RsyncDir
func runRepoSync(repo *config.Repo) (status int) {
status = config.FAILURE
cmds := getSyncCommand(repo)
if len(cmds) == 0 {
repo.Logger.Error()
return
}
func buildRsyncDaemonHost(repo *config.Repo) string {
rsyncHost := repo.RsyncHost
if repo.RsyncUser != "" {
rsyncHost = repo.RsyncUser + "@" + rsyncHost
// run every step of the sync command as long as the previous step was successful
for i, args := range cmds {
repo.Logger.Debug(fmt.Sprintf("Running step %d of sync", i))
status = spawnProcessAndWait(repo, args)
if status != config.SUCCESS {
// spawnProcessAndWait will have already logged error
return
}
return rsyncHost + "::" + repo.RsyncDir
}
return
}
func buildRsyncSSHHost(repo *config.Repo) string {
rsyncHost := repo.RsyncHost
if repo.RsyncUser != "" {
rsyncHost = repo.RsyncUser + "@" + rsyncHost
}
return rsyncHost + ":" + repo.RsyncDir
// executes a particular sync job depending on repo.SyncType.
func getSyncCommand(repo *config.Repo) (cmds [][]string) {
// check that the download directory exists
if _, err := os.Stat(buildDownloadDir(repo)); os.IsNotExist(err) {
repo.Logger.Error(err.Error())
return
}
func buildDownloadDir(repo *config.Repo) string {
return filepath.Join(config.Conf.DownloadDir, repo.LocalDir)
switch repo.SyncType {
case "csc-sync-apache":
return append(cmds, cscSyncApache(repo))
case "csc-sync-archlinux":
return append(cmds, cscSyncArchLinux(repo))
case "csc-sync-badperms":
return append(cmds, cscSyncBadPerms(repo))
case "csc-sync-cdimage":
return append(cmds, cscSyncCDImage(repo))
case "csc-sync-ceph":
return append(cmds, cscSyncCephStep1(repo), cscSyncCephStep2(repo))
case "csc-sync-chmod":
return append(cmds, cscSyncChmod(repo))
case "csc-sync-debian":
return append(cmds, cscSyncDebianStep1(repo), cscSyncDebianStep2(repo))
case "csc-sync-debian-cd":
return append(cmds, cscSyncDebianCD(repo))
case "csc-sync-gentoo":
return append(cmds, cscSyncGentoo(repo))
case "csc-sync-s3":
return append(cmds, cscSyncS3(repo))
case "csc-sync-ssh":
return append(cmds, cscSyncSSH(repo))
case "csc-sync-standard":
return append(cmds, cscSyncStandard(repo))
case "csc-sync-standard-ipv6":
return append(cmds, cscSyncStandardIPV6(repo))
case "csc-sync-wget":
return append(cmds, cscSyncWget(repo))
default:
repo.Logger.Error("Unrecognized sync type: " + repo.SyncType)
return
}
}
const (
@ -94,6 +131,34 @@ func addConditionalFlags(repo *config.Repo, flags int) []string {
return args
}
func buildRsyncHost(repo *config.Repo) string {
rsyncHost := repo.RsyncHost
if repo.RsyncUser != "" {
rsyncHost = repo.RsyncUser + "@" + rsyncHost
}
return "rsync://" + rsyncHost + "/" + repo.RsyncDir
}
func buildRsyncDaemonHost(repo *config.Repo) string {
rsyncHost := repo.RsyncHost
if repo.RsyncUser != "" {
rsyncHost = repo.RsyncUser + "@" + rsyncHost
}
return rsyncHost + "::" + repo.RsyncDir
}
func buildRsyncSSHHost(repo *config.Repo) string {
rsyncHost := repo.RsyncHost
if repo.RsyncUser != "" {
rsyncHost = repo.RsyncUser + "@" + rsyncHost
}
return rsyncHost + ":" + repo.RsyncDir
}
func buildDownloadDir(repo *config.Repo) string {
return filepath.Join(config.Conf.DownloadDir, repo.LocalDir)
}
func cscSyncApache(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-az",
@ -149,7 +214,7 @@ func cscSyncCDImage(repo *config.Repo) []string {
return args
}
func cscSyncCeph(repo *config.Repo) []string {
func cscSyncCephStep1(repo *config.Repo) []string {
args := []string{
"rsync", "--stats", "--progress",
repo.RsyncHost + "::ceph",
@ -166,12 +231,16 @@ func cscSyncCeph(repo *config.Repo) []string {
args = append(args, addConditionalFlags(repo, quiet|ipv4)...)
args = append(args, buildDownloadDir(repo))
args = append(args, []string{
return args
}
func cscSyncCephStep2(repo *config.Repo) []string {
args := []string{
"&&", "rsync", "--stats", "--progress",
repo.RsyncHost + "::ceph",
"--recursive", "--times", "--links",
"--hard-links", "--delete-after",
}...)
}
args = append(args, addConditionalFlags(repo, quiet|ipv4)...)
args = append(args, buildDownloadDir(repo))
@ -191,24 +260,18 @@ func cscSyncChmod(repo *config.Repo) []string {
return args
}
func cscSyncDebian(repo *config.Repo) []string {
func cscSyncDebianStep1(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-rlHtvp",
}
args = append(args, addConditionalFlags(repo, baseFlags|excludeTmp|ipv4)...)
args = append(args, buildRsyncDaemonHost(repo)+"/pool/", buildDownloadDir(repo)+"/pool/")
ch := spawnProcess(repo, args)
if ch == nil {
// spawnSyncProcess will have already logged error
return nil
}
cmd := <-ch
if cmd.ProcessState.ExitCode() != 0 {
return nil
return args
}
args = []string{
func cscSyncDebianStep2(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-rlHtvp",
"--exclude", filepath.Join("project/trace", config.Conf.Hostname),
}
@ -320,46 +383,3 @@ func cscSyncWget(repo *config.Repo) []string {
return args
}
// executes a particular sync job depending on repo.SyncType.
func getSyncCommand(repo *config.Repo) (args []string) {
// check that the download directory exists
if _, err := os.Stat(buildDownloadDir(repo)); os.IsNotExist(err) {
repo.Logger.Error(err.Error())
return
}
switch repo.SyncType {
case "csc-sync-apache":
return cscSyncApache(repo)
case "csc-sync-archlinux":
return cscSyncArchLinux(repo)
case "csc-sync-badperms":
return cscSyncBadPerms(repo)
case "csc-sync-cdimage":
return cscSyncCDImage(repo)
case "csc-sync-ceph":
return cscSyncCeph(repo)
case "csc-sync-chmod":
return cscSyncChmod(repo)
case "csc-sync-debian":
return cscSyncDebian(repo)
case "csc-sync-debian-cd":
return cscSyncDebianCD(repo)
case "csc-sync-gentoo":
return cscSyncGentoo(repo)
case "csc-sync-s3":
return cscSyncS3(repo)
case "csc-sync-ssh":
return cscSyncSSH(repo)
case "csc-sync-standard":
return cscSyncStandard(repo)
case "csc-sync-standard-ipv6":
return cscSyncStandardIPV6(repo)
case "csc-sync-wget":
return cscSyncWget(repo)
default:
repo.Logger.Error("Unrecognized sync type: " + repo.SyncType)
return
}
}

View File

@ -1,88 +0,0 @@
package sync
import (
"path/filepath"
"reflect"
"testing"
"git.csclub.uwaterloo.ca/public/merlin/config"
"git.csclub.uwaterloo.ca/public/merlin/logger"
)
func dummyRepoConf(name string, syncType string, frequencyStr string, localDir string, rsyncHost string, rsyncDir string) *config.Repo {
doneChan := make(chan config.SyncResult)
stopChan := make(chan struct{})
repoLogFile := filepath.Join("test_files", name, name+".log")
logger := logger.NewLogger(name, repoLogFile, false)
return &config.Repo{
Name: name,
SyncType: syncType,
FrequencyStr: frequencyStr,
Frequency: 0,
MaxTime: config.DEFAULT_MAX_TIME,
LocalDir: localDir,
RsyncHost: rsyncHost,
RsyncDir: rsyncDir,
RsyncUser: "",
PasswordFile: "",
StateFile: "",
RepoLogFile: repoLogFile,
Logger: logger,
RsyncLogFile: "/tmp/log/" + name + "-rsync.log",
ZfssyncLogFile: "",
DoneChan: doneChan,
StopChan: stopChan,
State: &config.RepoState{
IsRunning: false,
LastAttemptStartTime: 0,
LastAttemptRunTime: 0,
LastAttemptExit: config.NOT_RUN_YET,
},
}
}
func TestGetSyncCommand(t *testing.T) {
config.Conf.DownloadDir = "test_files"
config.Conf.IPv4Address = "0.0.0.0"
config.Conf.DownloadDir = "/tmp/mirror"
testData := []struct {
repoConf *config.Repo
expected []string
}{
{
repoConf: dummyRepoConf("ubuntu-releases", "csc-sync-standard", "bi-hourly", "ubuntu-releases", "rsync.releases.ubuntu.com", "releases"),
expected: []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
"--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=0.0.0.0",
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=/tmp/log/ubuntu-releases-rsync.log",
"rsync://rsync.releases.ubuntu.com/releases", "/tmp/mirror/ubuntu-releases",
},
},
{
repoConf: dummyRepoConf("raspberrypi", "csc-sync-standard-ipv6", "bi-hourly", "raspberrypi", "apt-repo.raspberrypi.org", "archive"),
expected: []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
"--delay-updates", "--safe-links", "--timeout=3600", "-6", "--address=0.0.0.0",
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=/tmp/log/raspberrypi-rsync.log",
"apt-repo.raspberrypi.org::archive", "/tmp/mirror/raspberrypi",
},
},
}
for _, test := range testData {
syncCommand := getSyncCommand(test.repoConf)
// check for correct command output
if !reflect.DeepEqual(syncCommand, test.expected) {
t.Errorf("Invalid command string for %s repo\nRECIEVED:\n%+v\nEXPECTED:\n%+v\n", test.repoConf.Name, syncCommand, test.expected)
}
// check if download dir was created
}
}

View File

@ -1,203 +0,0 @@
package sync
import (
"os"
"path/filepath"
"strings"
"time"
"git.csclub.uwaterloo.ca/public/merlin/config"
)
func cscTraceArchLinux(repo *config.Repo, newTime string) []string {
args := []string{
"curl",
"--interface", config.Conf.IPv4Address,
"-s", repo.TraceHost,
"-o", newTime,
}
return args
}
// return false iff both files exist, are non-empty, and have the same contents
func cscTraceArchLinuxDiff(repo *config.Repo, newTime string) bool {
readFile := func(file string) string {
f, err := os.ReadFile(file)
if err != nil {
repo.Logger.Debug(err.Error())
return ""
}
return strings.TrimSpace(string(f))
}
file1 := readFile(newTime)
file2 := readFile(filepath.Join(buildDownloadDir(repo), "lastupdate"))
if file1 == "" || file2 == "" {
return true
}
return file1 != file2
}
// update the lastsync file
func cscTraceArchLinuxUpdate(repo *config.Repo) (args []string) {
rsyncDir := repo.RsyncDir
localDir := repo.LocalDir
repo.RsyncDir = repo.RsyncDir + "/lastsync"
repo.LocalDir = repo.LocalDir + "/lastsync"
args = cscSyncArchLinux(repo)
repo.RsyncDir = rsyncDir
repo.LocalDir = localDir
return args
}
func cscTraceDebian(repo *config.Repo, newTime string) []string {
args := []string{
"nice", "rsync", "-tv", "-4",
"--address=" + config.Conf.IPv4Address,
"--quiet",
repo.RsyncHost + "::" + filepath.Join(repo.RsyncDir, "project/trace", repo.TraceHost),
newTime,
}
return args
}
// return false iff both files exist and were modified at the same time
func cscTraceDebianDiff(repo *config.Repo, newTime string) bool {
statFile := func(file string) int64 {
f, err := os.Stat(file)
if err != nil {
repo.Logger.Debug("Error while trying to stat file: " + file)
return 0
}
return f.ModTime().Unix()
}
file1 := statFile(newTime)
file2 := statFile(filepath.Join(buildDownloadDir(repo), "project/trace", repo.TraceHost))
if file1 == 0 || file2 == 0 {
return true
}
return file1 != file2
}
// update our trace file's modification date by writing the current time
func cscTraceDebianUpdate(repo *config.Repo) {
target := filepath.Join(buildDownloadDir(repo), "project/trace", config.Conf.Hostname)
os.MkdirAll(filepath.Join(buildDownloadDir(repo), "project/trace"), 0755)
f, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
repo.Logger.Error("Unable to open trace file: " + target)
return
}
if _, err = f.WriteString(time.Now().UTC().Format(time.RFC1123)); err != nil {
repo.Logger.Error("Unable to write to trace file: " + target)
return
}
}
// some repos want us to check a last modification time file before performing a full sync
// returns a status to update the original sync's status along with if the sync should continue
func checkIfSyncNeeded(repo *config.Repo) (status int, continueSync bool) {
// default return values are the default status and to continue the sync
status = config.FAILURE
continueSync = true
if repo.DryRun {
return
}
runCommand := func(args []string) int {
ch := spawnProcess(repo, args)
if ch == nil {
// spawnSyncProcess will have already logged error
return 1
}
cmd := <-ch
return cmd.ProcessState.ExitCode()
}
// create a temp file
temp, err := os.CreateTemp("/tmp", "merlin-"+repo.Name+"-trace-*")
if err != nil {
repo.Logger.Error(err.Error())
return
}
temp.Close()
defer os.Remove(temp.Name())
// get the trace command to copy a last updated file
var args []string
switch repo.SyncType {
case "csc-sync-archlinux":
args = cscTraceArchLinux(repo, temp.Name())
case "csc-sync-debian":
args = cscTraceDebian(repo, temp.Name())
default:
repo.Logger.Error("Trace files are not implemented for sync type '" + repo.SyncType + "'")
return
}
exit := runCommand(args)
if exit != 0 {
// if the process was terminated then don't continue to sync
if exit == -1 {
return config.TERMINATED, false
}
return
}
filesDiffer := true
switch repo.SyncType {
case "csc-sync-archlinux":
filesDiffer = cscTraceArchLinuxDiff(repo, temp.Name())
case "csc-sync-debian":
filesDiffer = cscTraceDebianDiff(repo, temp.Name())
}
// debian still syncs even if the trace file is the same
if !filesDiffer && repo.SyncType == "csc-sync-debian" {
repo.Logger.Error("trace file for " + repo.RsyncHost + " unchanged")
return
}
if filesDiffer {
// continue sync if files differ
return
}
repo.Logger.Debug("No changes found; full sync will not be made")
// archlinux wants to sync a "lastsync" file if lastupdate prevents sync
switch repo.SyncType {
case "csc-sync-archlinux":
args = cscTraceArchLinuxUpdate(repo)
default:
return config.SUCCESS, false
}
exit = runCommand(args)
if exit != 0 {
// if the process was terminated then don't continue to sync
if exit == -1 {
return config.TERMINATED, false
}
return
}
return config.SUCCESS, false
}
// csc-sync-debian wants to update a trace file after the repo is done syncing
func postSyncTraceUpdate(repo *config.Repo) {
if repo.SyncType == "csc-sync-debian" {
cscTraceDebianUpdate(repo)
}
}

View File

@ -16,11 +16,17 @@ import (
// It returns a channel through which a Cmd will be sent once it has finished,
// or nil if it was unable to start a process.
func spawnProcess(repo *config.Repo, args []string) (ch <-chan *exec.Cmd) {
if len(args) == 0 {
repo.Logger.Error("command given is of zero length")
return
}
repo.Logger.Debug(fmt.Sprintf("Running the command: %v", args))
if repo.DryRun {
repo.Logger.Debug("Dry running for 50 seconds")
args = []string{"sleep", "50"}
}
cmd := exec.Command(args[0], args[1:]...)
repo.Logger.Debug("Starting process")
@ -78,44 +84,13 @@ func spawnProcess(repo *config.Repo, args []string) (ch <-chan *exec.Cmd) {
return
}
func startRepoSync(repo *config.Repo) {
status := config.FAILURE
defer func() {
repo.DoneChan <- config.SyncResult{
Name: repo.Name,
Exit: status,
}
}()
if repo.TraceHost != "" {
repo.Logger.Debug("Checking for changes")
continueSync := true
status, continueSync = checkIfSyncNeeded(repo)
if !continueSync || status == config.TERMINATED {
return
}
repo.Logger.Debug("Changes found; will attempt to sync")
}
// clear the rsync log file before starting the sync
if repo.RsyncLogFile != "" {
err := os.Truncate(repo.RsyncLogFile, 0)
if err != nil {
repo.Logger.Error(err.Error())
}
}
args := getSyncCommand(repo)
if len(args) == 0 {
repo.Logger.Error("zero length command given for sync")
return
}
// spawns a process and waits for it complete before parsing the exit code and returning it
func spawnProcessAndWait(repo *config.Repo, args []string) (status int) {
status = config.FAILURE
ch := spawnProcess(repo, args)
if ch == nil {
// spawnSyncProcess will have already logged error
// spawnProcess will have already logged error
return
}
cmd := <-ch
@ -127,22 +102,46 @@ func startRepoSync(repo *config.Repo) {
status = config.TERMINATED
// default is already FAILURE
}
}
func zfsSync(repo *config.Repo) {
// we are not using zfs snapshots at the moment
repo.Logger.Debug("Would run a zfssync if not disabled")
return
}
out, err := exec.Command("/bin/sh", "/home/mirror/bin/zfssync", repo.Name).CombinedOutput()
// returns true iff file contents are the same (for diff file contents, errors, empty return false)
func diffFileContent(repo *config.Repo, file1, file2 string) bool {
readFile := func(file string) string {
f, err := os.ReadFile(file)
if err != nil {
repo.Logger.Error(err)
} else {
f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
repo.Logger.Debug("Error while trying to read file: " + file)
return ""
}
return string(f)
}
content1 := readFile(file1)
content2 := readFile(file2)
if content1 == "" || content2 == "" {
return false
}
return content1 == content2
}
// returns true iff file times are the same (for diff file times and errors return false)
func diffFileTime(repo *config.Repo, file1, file2 string) bool {
statFile := func(file string) int64 {
f, err := os.Stat(file)
if err != nil {
repo.Logger.Error(err.Error())
} else {
f.Write(out)
repo.Logger.Debug("Error while trying to stat file: " + file)
return 0
}
return f.ModTime().Unix()
}
time1 := statFile(file1)
time2 := statFile(file2)
if time1 == 0 || time2 == 0 {
return false
}
return file1 == file2
}