From f06600c228311827879271fbf4c3acec4708786e Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Sat, 4 Jun 2022 18:45:10 -0400 Subject: [PATCH] add trace for debian and archlinux --- merlin/config/config.go | 24 +- merlin/config/config_test.go | 4 +- merlin/config/config_test.ini | 2 +- merlin/go.sum | 2 + merlin/merlin-config-all.ini | 28 +- merlin/sync/command.go | 279 ----------- merlin/sync/interface.go | 2 +- merlin/sync/process.go | 139 ++++++ merlin/sync/sync.go | 459 +++++++++++++----- merlin/sync/{command_test.go => sync_test.go} | 0 merlin/sync/trace.go | 174 +++++++ 11 files changed, 704 insertions(+), 409 deletions(-) delete mode 100644 merlin/sync/command.go create mode 100644 merlin/sync/process.go rename merlin/sync/{command_test.go => sync_test.go} (100%) create mode 100644 merlin/sync/trace.go diff --git a/merlin/config/config.go b/merlin/config/config.go index 187cced..7c5f531 100644 --- a/merlin/config/config.go +++ b/merlin/config/config.go @@ -23,11 +23,12 @@ const ( // could change this into a default_config DEFAULT_MAX_JOBS = 6 DEFAULT_MAX_TIME = DAILY / 4 - DEFAULT_MAX_RSYNC_IO = -1 + DEFAULT_MAX_RSYNC_IO = 0 DEFAULT_SYNC_TYPE = "csc-sync-standard" DEFAULT_FREQUENCY_STRING = "by-hourly" DEFAULT_DOWNLOAD_DIR = "/mirror/root" - DEFAULT_STATE_DIR = "/home/mirror/merlin/states" + DEFAULT_TRACE_DIR = "/home/mirror/merlin/trace" + DEFAULT_STATE_DIR = "/home/mirror/merlin/state" DEFAULT_LOG_DIR = "/home/mirror/merlin/logs" DEFAULT_RSYNC_LOG_DIR = "/home/mirror/merlin/logs-rsync" DEFAULT_ZFSSYNC_LOG_DIR = "/home/mirror/merlin/logs-zfssync" @@ -75,8 +76,10 @@ type Config struct { DefaultFrequencyStr string `ini:"default_frequency"` // directory where rsync should download files DownloadDir string `ini:"download_dir"` + // directory where the trace of each repo is saved + TraceDir string `ini:"trace_dir"` // directory where the state of each repo is saved - StateDir string `ini:"states_dir"` + StateDir string `ini:"state_dir"` // directory where merlin will store the merlin logs for each repo RepoLogDir string `ini:"repo_logs_dir"` // directory to store the rsync logs for each repo @@ -100,13 +103,15 @@ type Repo struct { // can for before being killed MaxTime int `ini:"max_time"` // limit the amount of bandwidth a repo can use while syncing - // (set to -1 for unlimited) + // (set to 0 to disable the limit) (unit is KiB) MaxRsyncIO int `ini:"max_rsync_io"` // where to download the files for this repo (relative to Conf.DownloadDir) LocalDir string `ini:"local_dir"` + // the address to the trace file (how this url will be used depends on SyncType) + TraceUrl string `ini:"trace_url"` // the address to the remote host to rsync from RsyncHost string `ini:"rsync_host"` - // the remote directory on the rsync host + // the remote directory on the rsync host (optional) RsyncDir string `ini:"rsync_dir"` // the rsync user (optional) RsyncUser string `ini:"rsync_user"` @@ -161,6 +166,7 @@ func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struc DefaultSyncType: DEFAULT_SYNC_TYPE, DefaultFrequencyStr: DEFAULT_FREQUENCY_STRING, DownloadDir: DEFAULT_DOWNLOAD_DIR, + TraceDir: DEFAULT_TRACE_DIR, StateDir: DEFAULT_STATE_DIR, RepoLogDir: DEFAULT_LOG_DIR, RsyncLogDir: DEFAULT_RSYNC_LOG_DIR, @@ -182,7 +188,13 @@ func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struc } else if _, err := os.Stat(newConf.DownloadDir); os.IsNotExist(err) { panic(fmt.Errorf("the path %s does not exist", newConf.DownloadDir)) } - for _, dir := range []string{newConf.StateDir, newConf.RepoLogDir, newConf.RsyncLogDir, newConf.ZfssyncLogDir} { + for _, dir := range []string{ + newConf.TraceDir, + newConf.StateDir, + newConf.RepoLogDir, + newConf.RsyncLogDir, + newConf.ZfssyncLogDir, + } { err := os.MkdirAll(dir, 0755) panicIfErr(err) } diff --git a/merlin/config/config_test.go b/merlin/config/config_test.go index 8da5437..4259a98 100644 --- a/merlin/config/config_test.go +++ b/merlin/config/config_test.go @@ -43,7 +43,7 @@ func TestLoadConfig(t *testing.T) { DefaultSyncType: "csc-sync-standard", DefaultFrequencyStr: "daily", DefaultMaxTime: 1000, - DefaultMaxRsyncIO: -1, + DefaultMaxRsyncIO: 0, DownloadDir: "test_files/download", StateDir: "test_files", RepoLogDir: "test_files/logs", @@ -81,7 +81,7 @@ func TestLoadConfig(t *testing.T) { FrequencyStr: "daily", Frequency: 86400, MaxTime: 1000, - MaxRsyncIO: -1, + MaxRsyncIO: 0, LocalDir: "yoland-releases", RsyncHost: "rsync.releases.yoland.io", RsyncDir: "releases", diff --git a/merlin/config/config_test.ini b/merlin/config/config_test.ini index 53ee2e3..721f7dc 100644 --- a/merlin/config/config_test.ini +++ b/merlin/config/config_test.ini @@ -7,7 +7,7 @@ ipv6_address = 2620:101:f000:4901:c5c::129 default_frequency = daily default_max_time = 1000 download_dir = test_files/download -states_dir = test_files +state_dir = test_files repo_logs_dir = test_files/logs rsync_logs_dir = test_files/rsync zfssync_logs_dir = test_files/zfssync diff --git a/merlin/go.sum b/merlin/go.sum index 56e171d..3552215 100644 --- a/merlin/go.sum +++ b/merlin/go.sum @@ -1,3 +1,5 @@ +github.com/andelf/go-curl v0.0.0-20200630032108-fd49ff24ed97 h1:Nyfs+rh56aORy2tGMI9GCYEqTfePwL1v47qOzebfv/o= +github.com/andelf/go-curl v0.0.0-20200630032108-fd49ff24ed97/go.mod h1:WO1d2m1QDzkoPcgn9lgHVMi7qQR5j3jxYjIIvMTHpC0= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/merlin/merlin-config-all.ini b/merlin/merlin-config-all.ini index a9d88d4..7e43b60 100644 --- a/merlin/merlin-config-all.ini +++ b/merlin/merlin-config-all.ini @@ -3,7 +3,7 @@ max_jobs = 6 sock_path = /run/merlin/merlin.sock states_path = /home/test-mirror/merlin/states -# log_path? (just spes a dir and make files) +; log_path? (just spes a dir and make files) log_file = /tmp/test-mirror/test.log ipv4_address = 129.97.134.129 @@ -12,9 +12,9 @@ download_dir = /tmp/test-mirror ; 5 hours ; default_max_time = 18000 -# add default max_time -# add default frequency -# add default sync_type +; add default max_time +; add default frequency +; add default sync_type ; [DEFAULT] ; @@ -125,10 +125,11 @@ rsync_dir = CPAN sync_type = csc-sync-ssh frequency = twice-daily local_dir = CRAN -rsync_host = cran.r-project.org -rsync_dir = "" -; rsync_user = cran-rsync ~/.ssh/id_cran_rsa -; password_file = ~/.ssh/id_cran_rsa +rsync_host = cran.r-project.org +; address is $RSYNC_HOST:$RSYNC_DIR/ (and $RSYNC_DIR="") but rsync_dir should not be empty +rsync_dir = / +rsync_user = cran-rsync +password_file = ~/.ssh/id_cran_rsa [CTAN] sync_type = csc-sync-standard @@ -260,11 +261,15 @@ rsync_dir = applicationdata ; We are a Tier 1 arch mirror (https://bugs.archlinux.org/task/52853) ; so our IP is important. +; our rsync_host is rsync://rsync.archlinux.org/ftp_tier1 [archlinux] ; csc-sync-standard archlinux archlinux.mirror.rafal.ca archlinux sync_type = csc-sync-archlinux frequency = five-minutely local_dir = archlinux +rsync_host = rsync.archlinux.org +rsync_dir = ftp_tier1 +trace_host = http://rsync.archlinux.org/lastupdate [debian-ports] sync_type = csc-sync-standard @@ -358,13 +363,12 @@ local_dir = gutenberg rsync_host = ftp@ftp.ibiblio.org rsync_dir = gutenberg -; I donno what csc-sync-wget is doing [racket-installers] sync_type = csc-sync-wget frequency = twice-daily local_dir = racket/racket-installers -; path = https://mirror.racket-lang.org/installers/ -; cut = 1 +rsync_host = https://mirror.racket-lang.org/installers/ +; for csc-sync-wget the --cut-dirs=1 is hardcoded [plt-bundles] sync_type = csc-sync-standard @@ -462,7 +466,7 @@ rsync_dir = tdf-pub sync_type = csc-sync-s3 frequency = daily local_dir = saltstack -; endpoint = https://s3.repo.saltproject.io +rsync_host = https://s3.repo.saltproject.io ; [kali] ; csc-sync-standard kali kali.mirror.globo.tech kali diff --git a/merlin/sync/command.go b/merlin/sync/command.go deleted file mode 100644 index 2717309..0000000 --- a/merlin/sync/command.go +++ /dev/null @@ -1,279 +0,0 @@ -package sync - -import ( - "fmt" - "os" - "path/filepath" - - "git.csclub.uwaterloo.ca/public/merlin/config" -) - -func buildRsyncHost(repo *config.Repo) string { - if repo.RsyncUser != "" { - repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost - } - return "rsync://" + repo.RsyncHost + "/" + repo.RsyncDir -} - -func buildRsyncDaemonHost(repo *config.Repo) string { - if repo.RsyncUser != "" { - repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost - } - return repo.RsyncHost + "::" + repo.RsyncDir -} - -func buildRsyncSSHHost(repo *config.Repo) string { - if repo.RsyncUser != "" { - repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost - } - return repo.RsyncHost + ":" + repo.RsyncDir -} - -func buildDownloadDir(repo *config.Repo) string { - return filepath.Join(config.Conf.DownloadDir, repo.LocalDir) -} - -func cscSyncApache(repo *config.Repo) []string { - args := []string{ - "nice", "rsync", "-az", "--no-owner", "--no-group", "--delete", "--safe-links", - "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address, - "--exclude", ".~tmp~/", - "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, - } - args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) - - return args -} - -func cscSyncArchLinux(repo *config.Repo) []string { - - tempDir := "" // is this option even needed? - - // add option for verbose flag? - args := []string{ - "rsync", "-rtlH", "--safe-links", "--delete-after", "--timeout=600", - "--contimeout=60", "-p", "--delay-updates", "--no-motd", - "--temp-dir=" + tempDir, "--log-file=" + repo.RsyncLogFile, "--address=" + config.Conf.IPv4Address, - } - args = append(args, buildRsyncHost(repo), buildDownloadDir(repo)) - - return args - -} - -func cscSyncBadPerms(repo *config.Repo) []string { - args := []string{ - "nice", "rsync", "-aH", "--no-owner", "--no-group", "--chmod=o=rX", "--delete", - "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address, - "--exclude", ".~tmp~/", - "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, - } - args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) - - return args -} - -func cscSyncCDImage(repo *config.Repo) []string { - args := []string{ - "nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete", - "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address, - "--exclude", ".*/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, - } - args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) - - return args -} - -func cscSyncCeph(repo *config.Repo) []string { - - args := []string{ - "rsync", "--stats", "--progress", "--quiet", "-4", "--address=" + config.Conf.IPv4Address, - repo.RsyncHost + "::ceph", - "--recursive", "--times", "--links", "--hard-links", - "--exclude", "Packages*", - "--exclude", "Sources*", - "--exclude", "Release*", - "--exclude", "InRelease", - "--exclude", "i18n/*", - "--exclude", "ls-lR*", - "--exclude", "repodata/*", - } - args = append(args, buildDownloadDir(repo)) - - args = append(args, []string{ - "&&", "rsync", "--stats", "--progress", "--quiet", "-4", "--address=" + config.Conf.IPv4Address, - repo.RsyncHost + "::ceph", - "--recursive", "--times", "--links", "--hard-links", "--delete-after", - }...) - args = append(args, buildDownloadDir(repo)) - - return args - -} - -func cscSyncChmod(repo *config.Repo) []string { - args := []string{ - "nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", "--delay-updates", "--safe-links", - "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address, - "--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, - "--chmod=u=rwX,go=rX", - } - args = append(args, buildRsyncHost(repo), buildDownloadDir(repo)) - - return args -} - -func cscSyncDebian(repo *config.Repo) []string { - - // sync /pool - args := []string{"nice", "rsync", "-rlHtvp", - "--exclude", ".~tmp~/", "--timeout=3600", "-4", - "--address=" + config.Conf.IPv4Address, - } - args = append(args, buildRsyncDaemonHost(repo) + "/pool/", buildDownloadDir(repo) + "/pool/") - - args = append(args, []string{"&&", "nice", "rsync", "-rlHtvp", - "--delay-updates", "--delete-after", - // --exclude "Archive-Update-in-Progress-${HOSTNAME}" \ - // --exclude "${TRACE_DIR}/${HOSTNAME}" \ - "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address, - }...) - args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) - - return args -} - -func cscSyncDebianCD(repo *config.Repo) []string { - - // this is basically the same as CSCSyncDebian, except it has an extra --exclude - args := []string{"nice", "rsync", "-rlHtvp", "--delete", - "--exclude", ".~tmp~/", "--timeout=3600", "-4", - "--address=" + config.Conf.IPv4Address, - // "--exclude", "Archive-Update-in-Progress-${HOSTNAME}" - } - args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) - return args -} - -func cscSyncGentoo(repo *config.Repo) []string { - repo.RsyncUser = "gentoo" - repo.PasswordFile = "gentoo-distfiles" - return cscSyncStandard(repo) -} - -func cscSyncS3(repo *config.Repo) []string { - - // set these env vars before using - // export RCLONE_CONFIG_S3_ENDPOINT=https://s3.repo.saltproject.io RCLONE_CONFIG_S3_TYPE=s3 RCLONE_CONFIG_S3_PROVIDER=Other RCLONE_CONFIG_S3_ENV_AUTH=false - - args := []string{ - "nice", "rclone", "sync", "--fast-list", "--use-server-modtime", - "--bind", config.Conf.IPv4Address, - "s3:s3/", - } - args = append(args, buildDownloadDir(repo)) - - return args -} - -func cscSyncSSH(repo *config.Repo) []string { - - args := []string{ - "rsync", "-aH", "--no-owner", "--no-group", "--delete", - "--timeout=3600", "-4", - "--exclude", ".~tmp~/", - "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, - } - // not sure if we should be assuming ssh identity file is the password file - args = append(args, "-e", fmt.Sprintf("'ssh -i %s'", repo.PasswordFile)) - args = append(args, buildRsyncSSHHost(repo), buildDownloadDir(repo)) - - return args -} - -func cscSyncStandard(repo *config.Repo) []string { - args := []string{ - "nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", - "--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address, - "--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, - } - if repo.PasswordFile != "" { - args = append(args, "--password-file", repo.PasswordFile) - } - args = append(args, buildRsyncHost(repo), buildDownloadDir(repo)) - - return args -} - -func cscSyncStandardIPV6(repo *config.Repo) []string { - args := []string{ - "nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", - "--delay-updates", "--safe-links", "--timeout=3600", "-6", "--address=" + config.Conf.IPv4Address, - "--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, - } - args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) - - return args -} - -func cscSyncDummy(repo *config.Repo) []string { - args := []string{"sleep", "1"} - - return args -} - -// executes a particular sync job depending on repo.SyncType. -func getSyncCommand(repo *config.Repo) (args []string) { - if repo.SyncType == "csc-sync-dummy" { - return cscSyncDummy(repo) - } - - // check that the download directory exists - if _, err := os.Stat(buildDownloadDir(repo)); os.IsNotExist(err) { - repo.Logger.Error(err.Error()) - return - } - - switch repo.SyncType { - case "csc-sync-apache": - args = cscSyncApache(repo) - case "csc-sync-archlinux": - args = cscSyncArchLinux(repo) - case "csc-sync-badperms": - args = cscSyncBadPerms(repo) - case "csc-sync-cdimage": - args = cscSyncCDImage(repo) - case "csc-sync-ceph": - args = cscSyncCeph(repo) - case "csc-sync-chmod": - args = cscSyncChmod(repo) - case "csc-sync-debian": - args = cscSyncDebian(repo) - case "csc-sync-debian-cd": - args = cscSyncDebianCD(repo) - case "csc-sync-gentoo": - args = cscSyncGentoo(repo) - case "csc-sync-s3": - args = cscSyncS3(repo) - case "csc-sync-ssh": - args = cscSyncSSH(repo) - case "csc-sync-standard": - args = cscSyncStandard(repo) - case "csc-sync-standard-ipv6": - args = cscSyncStandardIPV6(repo) - // case "csc-sync-wget": - // return cscSyncWget(repo) - default: - repo.Logger.Error("Unrecognized sync type", "'"+repo.SyncType+"'") - return - } - - if repo.MaxRsyncIO >= 0 { - args = append(args, fmt.Sprintf("--bwlimit=%d", repo.MaxRsyncIO)) - } - // TODO: remove buildDownloadDir and check what cscSyncDebian and co are about - // (the ones that do not append buildDownloadDir at the end) - args = append(args, buildDownloadDir(repo)) - - return -} diff --git a/merlin/sync/interface.go b/merlin/sync/interface.go index ab2fb7e..c0ee5d9 100644 --- a/merlin/sync/interface.go +++ b/merlin/sync/interface.go @@ -22,7 +22,7 @@ func SyncIfPossible(repo *config.Repo) bool { repo.State.LastAttemptStartTime = curTime repo.SaveState() repo.Logger.Info(fmt.Sprintf("Repo %s has started syncing", repo.Name)) - go startSync(repo) + go startRepoSync(repo) return true } return false diff --git a/merlin/sync/process.go b/merlin/sync/process.go new file mode 100644 index 0000000..550e7f8 --- /dev/null +++ b/merlin/sync/process.go @@ -0,0 +1,139 @@ +package sync + +import ( + "fmt" + "os" + "os/exec" + "syscall" + "time" + + "git.csclub.uwaterloo.ca/public/merlin/config" +) + +// SpawnProcess spawns a child process for the given repo. The process will +// be stopped early if the repo receives a stop signal, or if the process +// runs for longer than the repo's MaxTime. +// It returns a channel through which a Cmd will be sent once it has finished, +// or nil if it was unable to start a process. +func spawnProcess(repo *config.Repo, args []string) (ch <-chan *exec.Cmd) { + // startTime and time took will be handled in common.go by SyncExit + cmd := exec.Command(args[0], args[1:]...) + + repo.Logger.Debug("Starting process") + if err := cmd.Start(); err != nil { + repo.Logger.Error(fmt.Errorf("could not start process for %s: %w", repo.Name, err).Error()) + return + } + + cmdChan := make(chan *exec.Cmd) + ch = cmdChan + cmdDoneChan := make(chan struct{}) + killProcess := func() { + err := cmd.Process.Signal(syscall.SIGTERM) + if err != nil { + repo.Logger.Error("Could not send signal to process:", err) + return + } + + select { + case <-time.After(30 * time.Second): + repo.Logger.Warning("Process still hasn't stopped after 30 seconds; sending SIGKILL") + cmd.Process.Signal(syscall.SIGKILL) + + case <-cmdDoneChan: + repo.Logger.Debug("Process has been stopped.") + } + } + + go func() { + cmd.Wait() + close(cmdDoneChan) + }() + + go func() { + defer func() { + cmdChan <- cmd + }() + select { + case <-cmdDoneChan: + if !cmd.ProcessState.Success() { + repo.Logger.Warning("Process ended with status code", cmd.ProcessState.ExitCode()) + } + + case <-repo.StopChan: + repo.Logger.Debug("Received signal to stop, killing process...") + killProcess() + + case <-time.After(time.Duration(repo.MaxTime) * time.Second): + repo.Logger.Warning("Process has exceeded its max time; killing now") + killProcess() + } + }() + return +} + +func startRepoSync(repo *config.Repo) { + status := config.FAILURE + defer func() { + repo.DoneChan <- config.SyncResult{ + Name: repo.Name, + Exit: status, + } + }() + + if repo.TraceUrl != "" { + repo.Logger.Debug("Checking for changes") + + continueSync := true + status, continueSync = checkIfSyncNeeded(repo) + if !continueSync { + return + } + + repo.Logger.Debug("Changes found; will attempt to sync") + } + + args := getSyncCommand(repo) + if len(args) == 0 { + // getSyncCommand will have already logged error + return + } + + // clear the rsync log file before starting the sync + if repo.RsyncLogFile != "" { + err := os.Truncate(repo.RsyncLogFile, 0) + if err != nil { + repo.Logger.Error(err.Error()) + } + } + + ch := spawnProcess(repo, args) + if ch == nil { + // spawnSyncProcess will have already logged error + return + } + cmd := <-ch + + switch cmd.ProcessState.ExitCode() { + case 0: + status = config.SUCCESS + case -1: + status = config.TERMINATED + // default is already FAILURE + } +} + +// we are not using zfs snapshots at the moment +func zfsSync(repo *config.Repo) { + // out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput() + // if err != nil { + // repo.Logger.Error(err) + // } else { + // f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + // if err != nil { + // repo.Logger.Error(err.Error()) + // } else { + // f.Write(out) + // } + // } +} diff --git a/merlin/sync/sync.go b/merlin/sync/sync.go index 1c2d4dc..e3aa3eb 100644 --- a/merlin/sync/sync.go +++ b/merlin/sync/sync.go @@ -3,124 +3,367 @@ package sync import ( "fmt" "os" - "os/exec" - "syscall" - "time" + "path/filepath" "git.csclub.uwaterloo.ca/public/merlin/config" ) -// SpawnProcess spawns a child process for the given repo. The process will -// be stopped early if the repo receives a stop signal, or if the process -// runs for longer than the repo's MaxTime. -// It returns a channel through which a Cmd will be sent once it has finished, -// or nil if it was unable to start a process. -func spawnSyncProcess(repo *config.Repo, args []string) (ch <-chan *exec.Cmd) { - // clear the rsync log file before starting the sync - if repo.RsyncLogFile != "" { - err := os.Truncate(repo.RsyncLogFile, 0) - if err != nil { - repo.Logger.Error(err.Error()) - } +func buildRsyncHost(repo *config.Repo) string { + rsyncHost := repo.RsyncHost + if repo.RsyncUser != "" { + rsyncHost = repo.RsyncUser + "@" + rsyncHost + } + return "rsync://" + rsyncHost + "/" + repo.RsyncDir +} + +func buildRsyncDaemonHost(repo *config.Repo) string { + rsyncHost := repo.RsyncHost + if repo.RsyncUser != "" { + rsyncHost = repo.RsyncUser + "@" + rsyncHost + } + return rsyncHost + "::" + repo.RsyncDir +} + +func buildRsyncSSHHost(repo *config.Repo) string { + rsyncHost := repo.RsyncHost + if repo.RsyncUser != "" { + rsyncHost = repo.RsyncUser + "@" + rsyncHost + } + return rsyncHost + ":" + repo.RsyncDir +} + +func buildDownloadDir(repo *config.Repo) string { + return filepath.Join(config.Conf.DownloadDir, repo.LocalDir) +} + +const ( + noOwnerNoGroup = 1 << iota + timeout3600 + logFile + quiet + ipv4 + ipv6 + delete + delayUpdatesDeleteAfter +) + +func addConditionalFlags(repo *config.Repo, flags int) []string { + args := []string{} + if flags&noOwnerNoGroup != 0 { + args = append(args, "--no-owner", "--no-group") + } + if flags&timeout3600 != 0 { + args = append(args, "--timeout=3600") + } + if flags&logFile != 0 { + args = append(args, "--log-file="+repo.RsyncLogFile) + } + if flags&quiet != 0 { + args = append(args, "--quiet") } - // startTime and time took will be handled in common.go by SyncExit - cmd := exec.Command(args[0], args[1:]...) + if flags&ipv4 != 0 { + args = append(args, "-4", "--address="+config.Conf.IPv4Address) + } else if flags&ipv6 != 0 { + args = append(args, "-6", "--address="+config.Conf.IPv6Address) + } - repo.Logger.Debug("Starting process") - if err := cmd.Start(); err != nil { - repo.Logger.Error(fmt.Errorf("could not start process for %s: %w", repo.Name, err).Error()) + if flags&delete != 0 { + args = append(args, "--delete") + } + if flags&delayUpdatesDeleteAfter != 0 { + args = append(args, "--delay-updates", "--delete-after") + } + + // add bwlimit (bandwidth limit) only when MaxRsyncIO is set to positive value + if repo.MaxRsyncIO > 0 { + args = append(args, fmt.Sprintf("--bwlimit=%d", repo.MaxRsyncIO)) + } + + return args +} + +func cscSyncApache(repo *config.Repo) []string { + args := []string{ + "nice", "rsync", "-az", + "--delete", + "--safe-links", + "--stats", + "--exclude", ".~tmp~/", + } + args = append(args, addConditionalFlags(repo, noOwnerNoGroup|timeout3600|logFile|quiet|ipv4)...) + args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) + + return args +} + +func cscSyncArchLinux(repo *config.Repo) []string { + tempDir := "/home/mirror/tmp" + + args := []string{ + "rsync", "-rtlHp", + "--safe-links", + "--timeout=600", + "--contimeout=60", + "--no-motd", + "--temp-dir=" + tempDir, + "--address=" + config.Conf.IPv4Address, + } + args = append(args, addConditionalFlags(repo, logFile|delayUpdatesDeleteAfter)...) + args = append(args, buildRsyncHost(repo), buildDownloadDir(repo)) + + return args +} + +func cscSyncBadPerms(repo *config.Repo) []string { + args := []string{ + "nice", "rsync", "-aH", + "--chmod=o=rX", + "--exclude", ".~tmp~/", + "--stats", + } + args = append(args, addConditionalFlags(repo, noOwnerNoGroup|timeout3600|logFile|quiet|ipv4|delete)...) + args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) + + return args +} + +func cscSyncCDImage(repo *config.Repo) []string { + args := []string{ + "nice", "rsync", "-aH", + "--exclude", "\".*/\"", + "--stats", + } + args = append(args, addConditionalFlags(repo, noOwnerNoGroup|timeout3600|logFile|quiet|ipv4|delete)...) + args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) + + return args +} + +func cscSyncCeph(repo *config.Repo) []string { + args := []string{ + "rsync", "--stats", "--progress", + repo.RsyncHost + "::ceph", + "--recursive", "--times", "--links", + "--hard-links", + "--exclude", "Packages*", + "--exclude", "Sources*", + "--exclude", "Release*", + "--exclude", "InRelease", + "--exclude", "i18n/*", + "--exclude", "ls-lR*", + "--exclude", "repodata/*", + } + args = append(args, addConditionalFlags(repo, quiet|ipv4)...) + args = append(args, buildDownloadDir(repo)) + + args = append(args, []string{ + "&&", "rsync", "--stats", "--progress", + repo.RsyncHost + "::ceph", + "--recursive", "--times", "--links", + "--hard-links", "--delete-after", + }...) + args = append(args, addConditionalFlags(repo, quiet|ipv4)...) + args = append(args, buildDownloadDir(repo)) + + return args +} + +func cscSyncChmod(repo *config.Repo) []string { + args := []string{ + "nice", "rsync", "-aH", + "--safe-links", + "--exclude", ".~tmp~/", + "--stats", + "--chmod=u=rwX,go=rX", + } + args = append(args, addConditionalFlags(repo, noOwnerNoGroup|timeout3600|logFile|quiet|ipv4|delayUpdatesDeleteAfter)...) + args = append(args, buildRsyncHost(repo), buildDownloadDir(repo)) + + return args +} + +func cscSyncDebian(repo *config.Repo) []string { + // some repos specify a trace file but does not exit script even if trace files are the same + // https://git.csclub.uwaterloo.ca/public/mirror/src/branch/go/bin/csc-sync-debian#L205-L218 + + // ubuntu repos are trying to get trace/drescher.canonical.com but this file does not exist + // http://archive.ubuntu.com/ubuntu/project/trace/ + // if are adding a trace check ensure it can handle when the file does not exist on the remote + + args := []string{ + "nice", "rsync", "-rlHtvp", + "--exclude", ".~tmp~/", + } + args = append(args, addConditionalFlags(repo, timeout3600|logFile|quiet|ipv4)...) + args = append(args, buildRsyncDaemonHost(repo)+"/pool/", buildDownloadDir(repo)+"/pool/") + + args = append(args, []string{ + "&&", "nice", "rsync", "-rlHtvp", + "--exclude", ".~tmp~/", + }...) + args = append(args, addConditionalFlags(repo, timeout3600|logFile|quiet|ipv4|delayUpdatesDeleteAfter)...) + args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) + + // TODO: run 'LANG=C date -u > "${TO}/${TRACE_DIR}/${HOSTNAME}"' after the sync is done + + return args +} + +func cscSyncDebianCD(repo *config.Repo) []string { + args := []string{ + "nice", "rsync", "-rlHtvp4", + "--exclude", ".~tmp~/", + } + args = append(args, addConditionalFlags(repo, timeout3600|logFile|quiet|ipv4|delete)...) + args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) + + return args +} + +func cscSyncGentoo(repo *config.Repo) []string { + repo.RsyncUser = "gentoo" + repo.PasswordFile = "gentoo-distfiles" + return cscSyncStandard(repo) +} + +// TODO: check for special stuff that rcloning S3 needs +func cscSyncS3(repo *config.Repo) []string { + args := []string{ + // RsyncHost is just a regular host (https://s3.repo.saltproject.io) in this case + "RCLONE_CONFIG_S3_ENDPOINT=" + repo.RsyncHost, + "RCLONE_CONFIG_S3_TYPE=s3", + "RCLONE_CONFIG_S3_PROVIDER=Other", + "RCLONE_CONFIG_S3_ENV_AUTH=false", + "nice", "rclone", "sync", + "--fast-list", + "--use-server-modtime", + "--bind", config.Conf.IPv4Address, + "s3:s3/", + } + args = append(args, buildDownloadDir(repo)) + + return args +} + +func cscSyncSSH(repo *config.Repo) []string { + args := []string{ + "nice", "rsync", "-aH", + "--exclude", ".~tmp~/", + "--stats", "-4", + } + args = append(args, addConditionalFlags(repo, noOwnerNoGroup|timeout3600|logFile|quiet|delete)...) + // PasswordFile should point to the SSH_KEYFILE + args = append(args, "-e", fmt.Sprintf("'ssh -b %s -i %s'", config.Conf.IPv4Address, repo.PasswordFile)) + args = append(args, buildRsyncSSHHost(repo), buildDownloadDir(repo)) + + return args +} + +func cscSyncStandard(repo *config.Repo) []string { + args := []string{ + "nice", "rsync", "-aH", + "--safe-links", + "--exclude", ".~tmp~/", + "--stats", + } + + if repo.PasswordFile != "" { + args = append(args, "--password-file", repo.PasswordFile) + } + + args = append(args, addConditionalFlags(repo, noOwnerNoGroup|timeout3600|logFile|quiet|ipv4|delayUpdatesDeleteAfter)...) + args = append(args, buildRsyncHost(repo), buildDownloadDir(repo)) + + return args +} + +func cscSyncStandardIPV6(repo *config.Repo) []string { + args := []string{ + "nice", "rsync", "-aH", + "--safe-links", + "--exclude", ".~tmp~/", + "--stats", + } + + if repo.PasswordFile != "" { + args = append(args, "--password-file", repo.PasswordFile) + } + + args = append(args, addConditionalFlags(repo, noOwnerNoGroup|timeout3600|logFile|quiet|ipv6|delayUpdatesDeleteAfter)...) + args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) + + return args +} + +func cscSyncWget(repo *config.Repo) []string { + args := []string{ + "nice", "wget", "-q", + "--bind-address=" + config.Conf.IPv4Address, + "--mirror", + "--no-parent", + "--no-host-directories", + "--cut-dirs=1", + "--content-disposition", + "--execute", + "robots=off", + "--recursive", + "--reject", "\"*\\?*\"", + "--directory-prefix=" + buildDownloadDir(repo), + // RsyncHost is just a regular host (https://mirror.racket-lang.org/installers/) in this case + repo.RsyncHost, + } + + return args +} + +func cscSyncDummy(repo *config.Repo) []string { + return []string{"sleep", "1"} +} + +// executes a particular sync job depending on repo.SyncType. +func getSyncCommand(repo *config.Repo) (args []string) { + if repo.SyncType == "csc-sync-dummy" { + return cscSyncDummy(repo) + } + + // check that the download directory exists + if _, err := os.Stat(buildDownloadDir(repo)); os.IsNotExist(err) { + repo.Logger.Error(err.Error()) return } - cmdChan := make(chan *exec.Cmd) - ch = cmdChan - cmdDoneChan := make(chan struct{}) - killProcess := func() { - err := cmd.Process.Signal(syscall.SIGTERM) - if err != nil { - repo.Logger.Error("Could not send signal to process:", err) - return - } - - select { - case <-time.After(30 * time.Second): - repo.Logger.Warning("Process still hasn't stopped after 30 seconds; sending SIGKILL") - cmd.Process.Signal(syscall.SIGKILL) - - case <-cmdDoneChan: - repo.Logger.Debug("Process has been stopped.") - } - } - - go func() { - cmd.Wait() - close(cmdDoneChan) - }() - - go func() { - defer func() { - cmdChan <- cmd - }() - select { - case <-cmdDoneChan: - if !cmd.ProcessState.Success() { - repo.Logger.Warning("Process ended with status code", cmd.ProcessState.ExitCode()) - } - - case <-repo.StopChan: - repo.Logger.Debug("Received signal to stop, killing process...") - killProcess() - - case <-time.After(time.Duration(repo.MaxTime) * time.Second): - repo.Logger.Warning("Process has exceeded its max time; killing now") - killProcess() - } - }() - return -} - -func startSync(repo *config.Repo) { - status := config.FAILURE - defer func() { - repo.DoneChan <- config.SyncResult{ - Name: repo.Name, - Exit: status, - } - }() - - args := getSyncCommand(repo) - if len(args) == 0 { - // getSyncCommand will have already logged error + switch repo.SyncType { + case "csc-sync-apache": + return cscSyncApache(repo) + case "csc-sync-archlinux": + return cscSyncArchLinux(repo) + case "csc-sync-badperms": + return cscSyncBadPerms(repo) + case "csc-sync-cdimage": + return cscSyncCDImage(repo) + case "csc-sync-ceph": + return cscSyncCeph(repo) + case "csc-sync-chmod": + return cscSyncChmod(repo) + case "csc-sync-debian": + return cscSyncDebian(repo) + case "csc-sync-debian-cd": + return cscSyncDebianCD(repo) + case "csc-sync-gentoo": + return cscSyncGentoo(repo) + case "csc-sync-s3": + return cscSyncS3(repo) + case "csc-sync-ssh": + return cscSyncSSH(repo) + case "csc-sync-standard": + return cscSyncStandard(repo) + case "csc-sync-standard-ipv6": + return cscSyncStandardIPV6(repo) + case "csc-sync-wget": + return cscSyncWget(repo) + default: + repo.Logger.Error("Unrecognized sync type ", "'"+repo.SyncType+"'") return } - - ch := spawnSyncProcess(repo, args) - if ch == nil { - // spawnSyncProcess will have already logged error - return - } - cmd := <-ch - switch cmd.ProcessState.ExitCode() { - case 0: - status = config.SUCCESS - case -1: - status = config.TERMINATED - // default is already FAILURE - } -} - -// we are not using zfs snapshots at the moment -func zfsSync(repo *config.Repo) { - // out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput() - // if err != nil { - // repo.Logger.Error(err) - // } else { - // f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) - // if err != nil { - // repo.Logger.Error(err.Error()) - // } else { - // f.Write(out) - // } - // } } diff --git a/merlin/sync/command_test.go b/merlin/sync/sync_test.go similarity index 100% rename from merlin/sync/command_test.go rename to merlin/sync/sync_test.go diff --git a/merlin/sync/trace.go b/merlin/sync/trace.go new file mode 100644 index 0000000..e866ebd --- /dev/null +++ b/merlin/sync/trace.go @@ -0,0 +1,174 @@ +package sync + +import ( + "os" + "path" + "strings" + + "git.csclub.uwaterloo.ca/public/merlin/config" +) + +func cscTraceArchLinux(repo *config.Repo, destPath string) []string { + args := []string{ + "curl", + "-s", repo.TraceUrl, + "-o", destPath, + } + + return args +} + +// return false iff both files exist, are non-empty, and have the same contents +func cscTraceArchLinuxDiff(repo *config.Repo, destPath string) bool { + readFile := func(file string) string { + f, err := os.ReadFile(file) + if err != nil { + repo.Logger.Debug(err.Error()) + return "" + } + return strings.TrimSpace(string(f)) + } + + file1 := readFile(destPath) + file2 := readFile(path.Join(buildDownloadDir(repo), "lastupdate")) + + if file1 == "" || file2 == "" { + return true + } + return file1 != file2 +} + +func cscTraceArchLinuxUpdate(repo *config.Repo) (args []string) { + rsyncDir := repo.RsyncDir + localDir := repo.LocalDir + + repo.RsyncDir = repo.RsyncDir + "/lastsync" + repo.LocalDir = repo.LocalDir + "/lastsync" + + args = cscSyncArchLinux(repo) + + repo.RsyncDir = rsyncDir + repo.LocalDir = localDir + + return args +} + +func cscTraceDebian(repo *config.Repo, destPath string) []string { + args := []string{ + "nice", "rsync", "-tv", "-4", + "--address=" + config.Conf.IPv4Address, + "--quiet", + // "--log-file=" + repo.RepoLogFile, + repo.RsyncHost + "::" + path.Join(repo.RsyncDir, "project/trace", repo.TraceUrl), + destPath, + } + + return args +} + +// return false iff both files exist and were modified at the same time +func cscTraceDebianDiff(repo *config.Repo, destPath string) bool { + statFile := func(file string) int64 { + f, err := os.Stat(file) + if err != nil { + repo.Logger.Debug(err.Error()) + return 0 + } + + return f.ModTime().Unix() + } + + file1 := statFile(destPath) + file2 := statFile(path.Join(buildDownloadDir(repo), "project/trace", "lastupdate")) + + if file1 == 0 || file2 == 0 { + return true + } + + return file1 != file2 +} + +func checkIfSyncNeeded(repo *config.Repo) (status int, continueSync bool) { + // default is to keep the default status and continue the sync + status = config.FAILURE + continueSync = true + + // create a temp file + temp, err := os.CreateTemp("/tmp", ".tmp.merlin."+repo.Name+"-") + if err != nil { + repo.Logger.Error(err.Error()) + return + } + temp.Close() + defer os.Remove(temp.Name()) + + // get the trace command + var args []string + switch repo.SyncType { + case "csc-sync-archlinux": + args = cscTraceArchLinux(repo, temp.Name()) + case "csc-sync-debian": + args = cscTraceDebian(repo, temp.Name()) + default: + if repo.SyncType != "csc-sync-dummy" { + repo.Logger.Error("Trace files are not implemented for sync type ", "'"+repo.SyncType+"'") + } + return + } + + ch := spawnProcess(repo, args) + if ch == nil { + // spawnSyncProcess will have already logged error + return + } + + cmd := <-ch + if cmd.ProcessState.ExitCode() != 0 { + // if the process was terminated then don't continue to sync + if cmd.ProcessState.ExitCode() == -1 { + return config.TERMINATED, false + } + return + } + + filesDiffer := true + switch repo.SyncType { + case "csc-sync-archlinux": + filesDiffer = cscTraceArchLinuxDiff(repo, temp.Name()) + case "csc-sync-debian": + filesDiffer = cscTraceDebianDiff(repo, temp.Name()) + } + + // if the files are the same then don't continue to sync + if !filesDiffer { + repo.Logger.Debug("No changes found; no sync made") + + // archlinux wants to sync a "lastsync" file even if lastupdate prevents sync + var args []string + switch repo.SyncType { + case "csc-sync-archlinux": + args = cscTraceArchLinuxUpdate(repo) + default: + return config.SUCCESS, false + } + + ch := spawnProcess(repo, args) + if ch == nil { + // spawnSyncProcess will have already logged error + return + } + + cmd := <-ch + if cmd.ProcessState.ExitCode() != 0 { + // if the process was terminated then don't continue to sync + if cmd.ProcessState.ExitCode() == -1 { + return config.TERMINATED, false + } + return + } + + return config.SUCCESS, false + } + + return +}