refactor sync
This commit is contained in:
parent
26a0266cd7
commit
8859e647c0
|
@ -2,10 +2,10 @@
|
|||
- [x] debian
|
||||
- [ ] ubuntu <
|
||||
- [ ] ubuntu-ports <
|
||||
- [ ] linuxmint-packages <
|
||||
- [ ] debian-multimedia <
|
||||
- [ ] debian-backports <
|
||||
- [ ] debian-security <
|
||||
- [x] linuxmint-packages
|
||||
- [x] debian-multimedia
|
||||
- [ ] debian-backports (part 1 broken)
|
||||
- [x] debian-security
|
||||
- [x] ubuntu-releases
|
||||
- [x] xubuntu-releases
|
||||
- [ ] puppylinux (broken)
|
||||
|
|
|
@ -13,12 +13,13 @@ Then configure `merlin-config.ini` and run using `./merlin`
|
|||
|
||||
### Nice Features To Add
|
||||
- detect if an rsync process is stuck (watch the stdout/stderr of the rsync processes)
|
||||
- get the config or the rsync command that a repo will sync with using a cli tool
|
||||
- improve conversion from exit status enum to string
|
||||
- detect if config file is changed and automatically reload (fsnotify)
|
||||
- sort `arthur status` by last time synced
|
||||
- last sync runtime, time until next sync
|
||||
- when sync fails, then make a copy of the rsync logs
|
||||
- implementation of `trace` is ad hoc and can be improved
|
||||
- respond to `arthur` without blocking main thread
|
||||
- split off arthur with a more featureful arthur for debugging
|
||||
- last sync runtime, time until next sync
|
||||
- get expected rsync command or repo config
|
||||
- details for last 10 syncs (avg time, success rate, data read/written)
|
||||
|
||||
### Completed
|
||||
- [x] add bwlimit option for each rsync process
|
||||
|
|
|
@ -68,12 +68,7 @@ func SendStatus(conn net.Conn) {
|
|||
repo := config.RepoMap[name]
|
||||
lastSync := repo.State.LastAttemptStartTime
|
||||
nextSync := lastSync + int64(repo.Frequency)
|
||||
lastExit := "failed"
|
||||
if repo.State.LastAttemptExit == config.SUCCESS {
|
||||
lastExit = "completed"
|
||||
} else if repo.State.LastAttemptExit == config.TERMINATED {
|
||||
lastExit = "terminated"
|
||||
}
|
||||
lastExit := config.StatusToString(repo.State.LastAttemptExit)
|
||||
|
||||
fmt.Fprintf(status, "%s\t%s\t%s\t%s\t%t\n",
|
||||
name,
|
||||
|
|
|
@ -50,12 +50,26 @@ var frequencies = map[string]int{
|
|||
|
||||
// Last job attempt statuses
|
||||
const (
|
||||
NOT_RUN_YET = iota
|
||||
TERMINATED = iota - 1
|
||||
SUCCESS
|
||||
FAILURE
|
||||
TERMINATED // was killed by a signal
|
||||
NOT_RUN_YET
|
||||
)
|
||||
|
||||
func StatusToString(status int) string {
|
||||
switch status {
|
||||
case TERMINATED:
|
||||
return "terminated"
|
||||
case SUCCESS:
|
||||
return "success"
|
||||
case FAILURE:
|
||||
return "failure"
|
||||
case NOT_RUN_YET:
|
||||
return "not run yet"
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type SyncResult struct {
|
||||
Name string
|
||||
Exit int
|
||||
|
|
|
@ -20,6 +20,7 @@ func SyncIfPossible(repo *config.Repo, force bool) bool {
|
|||
repo.State.LastAttemptStartTime = curTime
|
||||
repo.SaveState()
|
||||
repo.Logger.Info(fmt.Sprintf("Repo %s has started syncing", repo.Name))
|
||||
|
||||
go startRepoSync(repo)
|
||||
return true
|
||||
}
|
||||
|
@ -40,16 +41,28 @@ func SyncCompleted(repo *config.Repo, exit int) {
|
|||
repoState.LastAttemptRunTime = syncTook
|
||||
repo.SaveState()
|
||||
|
||||
exitStr := "failed"
|
||||
if exit == config.SUCCESS {
|
||||
exitStr = "completed"
|
||||
} else if exit == config.TERMINATED {
|
||||
exitStr = "terminated"
|
||||
}
|
||||
exitStr := config.StatusToString(exit)
|
||||
repo.Logger.Info(fmt.Sprintf("Sync %s after running for %d seconds, will run again in %d seconds", exitStr, syncTook, nextSync))
|
||||
|
||||
if exit == config.SUCCESS {
|
||||
go zfsSync(repo)
|
||||
go postSyncTraceUpdate(repo)
|
||||
}
|
||||
go postRepoSync(repo, exit)
|
||||
}
|
||||
|
||||
// begin and manage the steps of the sync and return the exit status
|
||||
func startRepoSync(repo *config.Repo) {
|
||||
status := config.FAILURE
|
||||
done := false
|
||||
|
||||
defer func() {
|
||||
repo.DoneChan <- config.SyncResult{
|
||||
Name: repo.Name,
|
||||
Exit: status,
|
||||
}
|
||||
}()
|
||||
|
||||
status, done = preRepoSync(repo)
|
||||
if done {
|
||||
return
|
||||
}
|
||||
|
||||
status = runRepoSync(repo)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
package sync
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.csclub.uwaterloo.ca/public/merlin/config"
|
||||
)
|
||||
|
||||
func postRepoSync(repo *config.Repo, exit int) {
|
||||
if repo.DryRun {
|
||||
repo.Logger.Debug("post sync not run because in dry run mode")
|
||||
return
|
||||
}
|
||||
|
||||
switch exit {
|
||||
case config.SUCCESS:
|
||||
go zfsSync(repo)
|
||||
go postSyncTraceUpdate(repo)
|
||||
return
|
||||
|
||||
case config.FAILURE:
|
||||
go backupRsyncFailLog(repo)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func zfsSync(repo *config.Repo) {
|
||||
// we are not using zfs snapshots at the moment
|
||||
repo.Logger.Debug("Would run a zfssync if not disabled")
|
||||
return
|
||||
|
||||
out, err := exec.Command("/bin/sh", "/home/mirror/bin/zfssync", repo.Name).CombinedOutput()
|
||||
if err != nil {
|
||||
repo.Logger.Error(err)
|
||||
} else {
|
||||
f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
|
||||
if err != nil {
|
||||
repo.Logger.Error(err.Error())
|
||||
} else {
|
||||
f.Write(out)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// updates a trace file after the repo has finished syncing
|
||||
func postSyncTraceUpdate(repo *config.Repo) {
|
||||
switch repo.SyncType {
|
||||
case "csc-sync-debian":
|
||||
cscPostDebian(repo)
|
||||
}
|
||||
}
|
||||
|
||||
// update our trace file's modification date by writing the current time
|
||||
func cscPostDebian(repo *config.Repo) {
|
||||
targetDir := filepath.Join(buildDownloadDir(repo), "project/trace")
|
||||
target := filepath.Join(targetDir, config.Conf.Hostname)
|
||||
|
||||
os.MkdirAll(targetDir, 0755)
|
||||
f, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
|
||||
|
||||
if err != nil {
|
||||
repo.Logger.Error("Unable to open trace file: " + target)
|
||||
return
|
||||
}
|
||||
if _, err = f.WriteString(time.Now().UTC().Format(time.RFC1123)); err != nil {
|
||||
repo.Logger.Error("Unable to write to trace file: " + target)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func backupRsyncFailLog(repo *config.Repo) {
|
||||
src := repo.RsyncLogFile
|
||||
dest := repo.RsyncLogFile + ".fail"
|
||||
|
||||
fin, err := os.Open(src)
|
||||
if err != nil {
|
||||
repo.Logger.Error(err.Error())
|
||||
}
|
||||
defer fin.Close()
|
||||
|
||||
fout, err := os.OpenFile(dest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
|
||||
if err != nil {
|
||||
repo.Logger.Error(err.Error())
|
||||
}
|
||||
defer fout.Close()
|
||||
|
||||
if _, err = io.Copy(fout, fin); err != nil {
|
||||
repo.Logger.Error(err.Error())
|
||||
}
|
||||
}
|
|
@ -0,0 +1,165 @@
|
|||
package sync
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"git.csclub.uwaterloo.ca/public/merlin/config"
|
||||
)
|
||||
|
||||
// jobs to complete before repo sync command should be run
|
||||
// when done is false: status can be ignored and sync command should continue
|
||||
// when done is true: sync should exit with status
|
||||
func preRepoSync(repo *config.Repo) (status int, done bool) {
|
||||
status = config.SUCCESS
|
||||
done = false
|
||||
|
||||
// clear the rsync log file
|
||||
if repo.RsyncLogFile != "" {
|
||||
err := os.Truncate(repo.RsyncLogFile, 0)
|
||||
if err != nil {
|
||||
status = config.FAILURE
|
||||
repo.Logger.Error("Error while trying to clear logfile: " + repo.RepoLogFile)
|
||||
}
|
||||
}
|
||||
|
||||
if repo.TraceHost != "" {
|
||||
if status, done = checkIfRepoSyncNeeded(repo); done {
|
||||
|
||||
// run alternative jobs if a full sync will not be done
|
||||
if exit := noRepoSyncNeeded(repo); exit != config.SUCCESS {
|
||||
status = exit
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// check if the repo should be synced by comparing their trace file with our own
|
||||
// returns the exit status of the command used to retrieve the repo's trace file
|
||||
// returns true for done if and only if no errors occur and it is confirmed that
|
||||
// the trace files are the same or that a termination signal was given
|
||||
// (csc-sync-debian will always sync so done in its case will always be false)
|
||||
func checkIfRepoSyncNeeded(repo *config.Repo) (status int, done bool) {
|
||||
status = config.FAILURE
|
||||
done = false
|
||||
|
||||
if repo.DryRun {
|
||||
repo.Logger.Debug("dry running so assuming that trace file was changed")
|
||||
return config.SUCCESS, false
|
||||
}
|
||||
|
||||
repo.Logger.Debug("Retrieving the repo's trace file")
|
||||
|
||||
temp, err := os.CreateTemp("/tmp", "merlin-"+repo.Name+"-trace-*")
|
||||
if err != nil {
|
||||
repo.Logger.Error(err.Error())
|
||||
return
|
||||
}
|
||||
temp.Close()
|
||||
defer os.Remove(temp.Name())
|
||||
|
||||
// get the trace command to copy the last update file
|
||||
var cmd []string
|
||||
switch repo.SyncType {
|
||||
case "csc-sync-archlinux":
|
||||
cmd = cscTraceArchLinux(repo, temp.Name())
|
||||
case "csc-sync-debian":
|
||||
cmd = cscTraceDebian(repo, temp.Name())
|
||||
default:
|
||||
repo.Logger.Error("Trace files are not implemented for sync type '" + repo.SyncType + "'")
|
||||
return
|
||||
}
|
||||
|
||||
status = spawnProcessAndWait(repo, cmd)
|
||||
if status != config.SUCCESS {
|
||||
if status == config.TERMINATED {
|
||||
done = true
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
repo.Logger.Debug("Comparing the repo's trace file with our own")
|
||||
|
||||
// diff returns true if files are the same (if files are the same then sync is done)
|
||||
switch repo.SyncType {
|
||||
case "csc-sync-archlinux":
|
||||
done = cscTraceArchLinuxDiff(repo, temp.Name())
|
||||
case "csc-sync-debian":
|
||||
done = cscTraceDebianDiff(repo, temp.Name())
|
||||
}
|
||||
|
||||
if done {
|
||||
repo.Logger.Info("trace file for " + repo.RsyncHost + " unchanged")
|
||||
|
||||
// debian syncs even when trace file is unchanged
|
||||
if repo.SyncType == "csc-sync-debian" {
|
||||
done = false
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
repo.Logger.Debug("trace file changes found; will attempt to sync")
|
||||
return
|
||||
}
|
||||
|
||||
// jobs to do when a full sync will not be run
|
||||
func noRepoSyncNeeded(repo *config.Repo) (status int) {
|
||||
status = config.SUCCESS
|
||||
|
||||
var args []string
|
||||
switch repo.SyncType {
|
||||
case "csc-sync-archlinux":
|
||||
args = cscTraceArchLinuxUpdate(repo)
|
||||
}
|
||||
|
||||
status = spawnProcessAndWait(repo, args)
|
||||
return
|
||||
}
|
||||
|
||||
func cscTraceArchLinux(repo *config.Repo, newTime string) []string {
|
||||
args := []string{
|
||||
"curl",
|
||||
"--interface", config.Conf.IPv4Address,
|
||||
"-s", repo.TraceHost,
|
||||
"-o", newTime,
|
||||
}
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
func cscTraceDebian(repo *config.Repo, newTime string) []string {
|
||||
args := []string{
|
||||
"nice", "rsync", "-tv", "--quiet",
|
||||
"-4", "--address=" + config.Conf.IPv4Address,
|
||||
repo.RsyncHost + "::" + filepath.Join(repo.RsyncDir, "project/trace", repo.TraceHost),
|
||||
newTime,
|
||||
}
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
func cscTraceArchLinuxDiff(repo *config.Repo, newTime string) bool {
|
||||
return diffFileContent(repo, newTime, filepath.Join(buildDownloadDir(repo), "lastupdate"))
|
||||
}
|
||||
|
||||
func cscTraceDebianDiff(repo *config.Repo, newTime string) bool {
|
||||
return diffFileTime(repo, newTime, filepath.Join(buildDownloadDir(repo), "project/trace", repo.TraceHost))
|
||||
}
|
||||
|
||||
func cscTraceArchLinuxUpdate(repo *config.Repo) (args []string) {
|
||||
rsyncDir := repo.RsyncDir
|
||||
localDir := repo.LocalDir
|
||||
|
||||
repo.RsyncDir = repo.RsyncDir + "/lastsync"
|
||||
repo.LocalDir = repo.LocalDir + "/lastsync"
|
||||
|
||||
args = cscSyncArchLinux(repo)
|
||||
|
||||
repo.RsyncDir = rsyncDir
|
||||
repo.LocalDir = localDir
|
||||
|
||||
return args
|
||||
}
|
|
@ -8,32 +8,69 @@ import (
|
|||
"git.csclub.uwaterloo.ca/public/merlin/config"
|
||||
)
|
||||
|
||||
func buildRsyncHost(repo *config.Repo) string {
|
||||
rsyncHost := repo.RsyncHost
|
||||
if repo.RsyncUser != "" {
|
||||
rsyncHost = repo.RsyncUser + "@" + rsyncHost
|
||||
func runRepoSync(repo *config.Repo) (status int) {
|
||||
status = config.FAILURE
|
||||
|
||||
cmds := getSyncCommand(repo)
|
||||
if len(cmds) == 0 {
|
||||
repo.Logger.Error()
|
||||
return
|
||||
}
|
||||
return "rsync://" + rsyncHost + "/" + repo.RsyncDir
|
||||
|
||||
// run every step of the sync command as long as the previous step was successful
|
||||
for i, args := range cmds {
|
||||
repo.Logger.Debug(fmt.Sprintf("Running step %d of sync", i))
|
||||
|
||||
status = spawnProcessAndWait(repo, args)
|
||||
if status != config.SUCCESS {
|
||||
// spawnProcessAndWait will have already logged error
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func buildRsyncDaemonHost(repo *config.Repo) string {
|
||||
rsyncHost := repo.RsyncHost
|
||||
if repo.RsyncUser != "" {
|
||||
rsyncHost = repo.RsyncUser + "@" + rsyncHost
|
||||
// executes a particular sync job depending on repo.SyncType.
|
||||
func getSyncCommand(repo *config.Repo) (cmds [][]string) {
|
||||
// check that the download directory exists
|
||||
if _, err := os.Stat(buildDownloadDir(repo)); os.IsNotExist(err) {
|
||||
repo.Logger.Error(err.Error())
|
||||
return
|
||||
}
|
||||
return rsyncHost + "::" + repo.RsyncDir
|
||||
}
|
||||
|
||||
func buildRsyncSSHHost(repo *config.Repo) string {
|
||||
rsyncHost := repo.RsyncHost
|
||||
if repo.RsyncUser != "" {
|
||||
rsyncHost = repo.RsyncUser + "@" + rsyncHost
|
||||
switch repo.SyncType {
|
||||
case "csc-sync-apache":
|
||||
return append(cmds, cscSyncApache(repo))
|
||||
case "csc-sync-archlinux":
|
||||
return append(cmds, cscSyncArchLinux(repo))
|
||||
case "csc-sync-badperms":
|
||||
return append(cmds, cscSyncBadPerms(repo))
|
||||
case "csc-sync-cdimage":
|
||||
return append(cmds, cscSyncCDImage(repo))
|
||||
case "csc-sync-ceph":
|
||||
return append(cmds, cscSyncCephStep1(repo), cscSyncCephStep2(repo))
|
||||
case "csc-sync-chmod":
|
||||
return append(cmds, cscSyncChmod(repo))
|
||||
case "csc-sync-debian":
|
||||
return append(cmds, cscSyncDebianStep1(repo), cscSyncDebianStep2(repo))
|
||||
case "csc-sync-debian-cd":
|
||||
return append(cmds, cscSyncDebianCD(repo))
|
||||
case "csc-sync-gentoo":
|
||||
return append(cmds, cscSyncGentoo(repo))
|
||||
case "csc-sync-s3":
|
||||
return append(cmds, cscSyncS3(repo))
|
||||
case "csc-sync-ssh":
|
||||
return append(cmds, cscSyncSSH(repo))
|
||||
case "csc-sync-standard":
|
||||
return append(cmds, cscSyncStandard(repo))
|
||||
case "csc-sync-standard-ipv6":
|
||||
return append(cmds, cscSyncStandardIPV6(repo))
|
||||
case "csc-sync-wget":
|
||||
return append(cmds, cscSyncWget(repo))
|
||||
default:
|
||||
repo.Logger.Error("Unrecognized sync type: " + repo.SyncType)
|
||||
return
|
||||
}
|
||||
return rsyncHost + ":" + repo.RsyncDir
|
||||
}
|
||||
|
||||
func buildDownloadDir(repo *config.Repo) string {
|
||||
return filepath.Join(config.Conf.DownloadDir, repo.LocalDir)
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -94,6 +131,34 @@ func addConditionalFlags(repo *config.Repo, flags int) []string {
|
|||
return args
|
||||
}
|
||||
|
||||
func buildRsyncHost(repo *config.Repo) string {
|
||||
rsyncHost := repo.RsyncHost
|
||||
if repo.RsyncUser != "" {
|
||||
rsyncHost = repo.RsyncUser + "@" + rsyncHost
|
||||
}
|
||||
return "rsync://" + rsyncHost + "/" + repo.RsyncDir
|
||||
}
|
||||
|
||||
func buildRsyncDaemonHost(repo *config.Repo) string {
|
||||
rsyncHost := repo.RsyncHost
|
||||
if repo.RsyncUser != "" {
|
||||
rsyncHost = repo.RsyncUser + "@" + rsyncHost
|
||||
}
|
||||
return rsyncHost + "::" + repo.RsyncDir
|
||||
}
|
||||
|
||||
func buildRsyncSSHHost(repo *config.Repo) string {
|
||||
rsyncHost := repo.RsyncHost
|
||||
if repo.RsyncUser != "" {
|
||||
rsyncHost = repo.RsyncUser + "@" + rsyncHost
|
||||
}
|
||||
return rsyncHost + ":" + repo.RsyncDir
|
||||
}
|
||||
|
||||
func buildDownloadDir(repo *config.Repo) string {
|
||||
return filepath.Join(config.Conf.DownloadDir, repo.LocalDir)
|
||||
}
|
||||
|
||||
func cscSyncApache(repo *config.Repo) []string {
|
||||
args := []string{
|
||||
"nice", "rsync", "-az",
|
||||
|
@ -149,7 +214,7 @@ func cscSyncCDImage(repo *config.Repo) []string {
|
|||
return args
|
||||
}
|
||||
|
||||
func cscSyncCeph(repo *config.Repo) []string {
|
||||
func cscSyncCephStep1(repo *config.Repo) []string {
|
||||
args := []string{
|
||||
"rsync", "--stats", "--progress",
|
||||
repo.RsyncHost + "::ceph",
|
||||
|
@ -166,12 +231,16 @@ func cscSyncCeph(repo *config.Repo) []string {
|
|||
args = append(args, addConditionalFlags(repo, quiet|ipv4)...)
|
||||
args = append(args, buildDownloadDir(repo))
|
||||
|
||||
args = append(args, []string{
|
||||
return args
|
||||
}
|
||||
|
||||
func cscSyncCephStep2(repo *config.Repo) []string {
|
||||
args := []string{
|
||||
"&&", "rsync", "--stats", "--progress",
|
||||
repo.RsyncHost + "::ceph",
|
||||
"--recursive", "--times", "--links",
|
||||
"--hard-links", "--delete-after",
|
||||
}...)
|
||||
}
|
||||
args = append(args, addConditionalFlags(repo, quiet|ipv4)...)
|
||||
args = append(args, buildDownloadDir(repo))
|
||||
|
||||
|
@ -191,24 +260,18 @@ func cscSyncChmod(repo *config.Repo) []string {
|
|||
return args
|
||||
}
|
||||
|
||||
func cscSyncDebian(repo *config.Repo) []string {
|
||||
func cscSyncDebianStep1(repo *config.Repo) []string {
|
||||
args := []string{
|
||||
"nice", "rsync", "-rlHtvp",
|
||||
}
|
||||
args = append(args, addConditionalFlags(repo, baseFlags|excludeTmp|ipv4)...)
|
||||
args = append(args, buildRsyncDaemonHost(repo)+"/pool/", buildDownloadDir(repo)+"/pool/")
|
||||
|
||||
ch := spawnProcess(repo, args)
|
||||
if ch == nil {
|
||||
// spawnSyncProcess will have already logged error
|
||||
return nil
|
||||
}
|
||||
cmd := <-ch
|
||||
if cmd.ProcessState.ExitCode() != 0 {
|
||||
return nil
|
||||
}
|
||||
return args
|
||||
}
|
||||
|
||||
args = []string{
|
||||
func cscSyncDebianStep2(repo *config.Repo) []string {
|
||||
args := []string{
|
||||
"nice", "rsync", "-rlHtvp",
|
||||
"--exclude", filepath.Join("project/trace", config.Conf.Hostname),
|
||||
}
|
||||
|
@ -320,46 +383,3 @@ func cscSyncWget(repo *config.Repo) []string {
|
|||
|
||||
return args
|
||||
}
|
||||
|
||||
// executes a particular sync job depending on repo.SyncType.
|
||||
func getSyncCommand(repo *config.Repo) (args []string) {
|
||||
// check that the download directory exists
|
||||
if _, err := os.Stat(buildDownloadDir(repo)); os.IsNotExist(err) {
|
||||
repo.Logger.Error(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
switch repo.SyncType {
|
||||
case "csc-sync-apache":
|
||||
return cscSyncApache(repo)
|
||||
case "csc-sync-archlinux":
|
||||
return cscSyncArchLinux(repo)
|
||||
case "csc-sync-badperms":
|
||||
return cscSyncBadPerms(repo)
|
||||
case "csc-sync-cdimage":
|
||||
return cscSyncCDImage(repo)
|
||||
case "csc-sync-ceph":
|
||||
return cscSyncCeph(repo)
|
||||
case "csc-sync-chmod":
|
||||
return cscSyncChmod(repo)
|
||||
case "csc-sync-debian":
|
||||
return cscSyncDebian(repo)
|
||||
case "csc-sync-debian-cd":
|
||||
return cscSyncDebianCD(repo)
|
||||
case "csc-sync-gentoo":
|
||||
return cscSyncGentoo(repo)
|
||||
case "csc-sync-s3":
|
||||
return cscSyncS3(repo)
|
||||
case "csc-sync-ssh":
|
||||
return cscSyncSSH(repo)
|
||||
case "csc-sync-standard":
|
||||
return cscSyncStandard(repo)
|
||||
case "csc-sync-standard-ipv6":
|
||||
return cscSyncStandardIPV6(repo)
|
||||
case "csc-sync-wget":
|
||||
return cscSyncWget(repo)
|
||||
default:
|
||||
repo.Logger.Error("Unrecognized sync type: " + repo.SyncType)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,88 +0,0 @@
|
|||
package sync
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"git.csclub.uwaterloo.ca/public/merlin/config"
|
||||
"git.csclub.uwaterloo.ca/public/merlin/logger"
|
||||
)
|
||||
|
||||
func dummyRepoConf(name string, syncType string, frequencyStr string, localDir string, rsyncHost string, rsyncDir string) *config.Repo {
|
||||
doneChan := make(chan config.SyncResult)
|
||||
stopChan := make(chan struct{})
|
||||
|
||||
repoLogFile := filepath.Join("test_files", name, name+".log")
|
||||
logger := logger.NewLogger(name, repoLogFile, false)
|
||||
|
||||
return &config.Repo{
|
||||
Name: name,
|
||||
SyncType: syncType,
|
||||
FrequencyStr: frequencyStr,
|
||||
Frequency: 0,
|
||||
MaxTime: config.DEFAULT_MAX_TIME,
|
||||
LocalDir: localDir,
|
||||
RsyncHost: rsyncHost,
|
||||
RsyncDir: rsyncDir,
|
||||
RsyncUser: "",
|
||||
PasswordFile: "",
|
||||
StateFile: "",
|
||||
RepoLogFile: repoLogFile,
|
||||
Logger: logger,
|
||||
RsyncLogFile: "/tmp/log/" + name + "-rsync.log",
|
||||
ZfssyncLogFile: "",
|
||||
DoneChan: doneChan,
|
||||
StopChan: stopChan,
|
||||
State: &config.RepoState{
|
||||
IsRunning: false,
|
||||
LastAttemptStartTime: 0,
|
||||
LastAttemptRunTime: 0,
|
||||
LastAttemptExit: config.NOT_RUN_YET,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetSyncCommand(t *testing.T) {
|
||||
|
||||
config.Conf.DownloadDir = "test_files"
|
||||
config.Conf.IPv4Address = "0.0.0.0"
|
||||
config.Conf.DownloadDir = "/tmp/mirror"
|
||||
|
||||
testData := []struct {
|
||||
repoConf *config.Repo
|
||||
expected []string
|
||||
}{
|
||||
{
|
||||
repoConf: dummyRepoConf("ubuntu-releases", "csc-sync-standard", "bi-hourly", "ubuntu-releases", "rsync.releases.ubuntu.com", "releases"),
|
||||
expected: []string{
|
||||
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
|
||||
"--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=0.0.0.0",
|
||||
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=/tmp/log/ubuntu-releases-rsync.log",
|
||||
"rsync://rsync.releases.ubuntu.com/releases", "/tmp/mirror/ubuntu-releases",
|
||||
},
|
||||
},
|
||||
{
|
||||
repoConf: dummyRepoConf("raspberrypi", "csc-sync-standard-ipv6", "bi-hourly", "raspberrypi", "apt-repo.raspberrypi.org", "archive"),
|
||||
expected: []string{
|
||||
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
|
||||
"--delay-updates", "--safe-links", "--timeout=3600", "-6", "--address=0.0.0.0",
|
||||
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=/tmp/log/raspberrypi-rsync.log",
|
||||
"apt-repo.raspberrypi.org::archive", "/tmp/mirror/raspberrypi",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range testData {
|
||||
|
||||
syncCommand := getSyncCommand(test.repoConf)
|
||||
|
||||
// check for correct command output
|
||||
if !reflect.DeepEqual(syncCommand, test.expected) {
|
||||
t.Errorf("Invalid command string for %s repo\nRECIEVED:\n%+v\nEXPECTED:\n%+v\n", test.repoConf.Name, syncCommand, test.expected)
|
||||
}
|
||||
|
||||
// check if download dir was created
|
||||
|
||||
}
|
||||
}
|
|
@ -1,203 +0,0 @@
|
|||
package sync
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.csclub.uwaterloo.ca/public/merlin/config"
|
||||
)
|
||||
|
||||
func cscTraceArchLinux(repo *config.Repo, newTime string) []string {
|
||||
args := []string{
|
||||
"curl",
|
||||
"--interface", config.Conf.IPv4Address,
|
||||
"-s", repo.TraceHost,
|
||||
"-o", newTime,
|
||||
}
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
// return false iff both files exist, are non-empty, and have the same contents
|
||||
func cscTraceArchLinuxDiff(repo *config.Repo, newTime string) bool {
|
||||
readFile := func(file string) string {
|
||||
f, err := os.ReadFile(file)
|
||||
if err != nil {
|
||||
repo.Logger.Debug(err.Error())
|
||||
return ""
|
||||
}
|
||||
return strings.TrimSpace(string(f))
|
||||
}
|
||||
|
||||
file1 := readFile(newTime)
|
||||
file2 := readFile(filepath.Join(buildDownloadDir(repo), "lastupdate"))
|
||||
|
||||
if file1 == "" || file2 == "" {
|
||||
return true
|
||||
}
|
||||
return file1 != file2
|
||||
}
|
||||
|
||||
// update the lastsync file
|
||||
func cscTraceArchLinuxUpdate(repo *config.Repo) (args []string) {
|
||||
rsyncDir := repo.RsyncDir
|
||||
localDir := repo.LocalDir
|
||||
|
||||
repo.RsyncDir = repo.RsyncDir + "/lastsync"
|
||||
repo.LocalDir = repo.LocalDir + "/lastsync"
|
||||
|
||||
args = cscSyncArchLinux(repo)
|
||||
|
||||
repo.RsyncDir = rsyncDir
|
||||
repo.LocalDir = localDir
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
func cscTraceDebian(repo *config.Repo, newTime string) []string {
|
||||
args := []string{
|
||||
"nice", "rsync", "-tv", "-4",
|
||||
"--address=" + config.Conf.IPv4Address,
|
||||
"--quiet",
|
||||
repo.RsyncHost + "::" + filepath.Join(repo.RsyncDir, "project/trace", repo.TraceHost),
|
||||
newTime,
|
||||
}
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
// return false iff both files exist and were modified at the same time
|
||||
func cscTraceDebianDiff(repo *config.Repo, newTime string) bool {
|
||||
statFile := func(file string) int64 {
|
||||
f, err := os.Stat(file)
|
||||
if err != nil {
|
||||
repo.Logger.Debug("Error while trying to stat file: " + file)
|
||||
return 0
|
||||
}
|
||||
|
||||
return f.ModTime().Unix()
|
||||
}
|
||||
|
||||
file1 := statFile(newTime)
|
||||
file2 := statFile(filepath.Join(buildDownloadDir(repo), "project/trace", repo.TraceHost))
|
||||
|
||||
if file1 == 0 || file2 == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
return file1 != file2
|
||||
}
|
||||
|
||||
// update our trace file's modification date by writing the current time
|
||||
func cscTraceDebianUpdate(repo *config.Repo) {
|
||||
target := filepath.Join(buildDownloadDir(repo), "project/trace", config.Conf.Hostname)
|
||||
|
||||
os.MkdirAll(filepath.Join(buildDownloadDir(repo), "project/trace"), 0755)
|
||||
|
||||
f, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
|
||||
if err != nil {
|
||||
repo.Logger.Error("Unable to open trace file: " + target)
|
||||
return
|
||||
}
|
||||
if _, err = f.WriteString(time.Now().UTC().Format(time.RFC1123)); err != nil {
|
||||
repo.Logger.Error("Unable to write to trace file: " + target)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// some repos want us to check a last modification time file before performing a full sync
|
||||
// returns a status to update the original sync's status along with if the sync should continue
|
||||
func checkIfSyncNeeded(repo *config.Repo) (status int, continueSync bool) {
|
||||
// default return values are the default status and to continue the sync
|
||||
status = config.FAILURE
|
||||
continueSync = true
|
||||
if repo.DryRun {
|
||||
return
|
||||
}
|
||||
runCommand := func(args []string) int {
|
||||
ch := spawnProcess(repo, args)
|
||||
if ch == nil {
|
||||
// spawnSyncProcess will have already logged error
|
||||
return 1
|
||||
}
|
||||
cmd := <-ch
|
||||
return cmd.ProcessState.ExitCode()
|
||||
}
|
||||
|
||||
// create a temp file
|
||||
temp, err := os.CreateTemp("/tmp", "merlin-"+repo.Name+"-trace-*")
|
||||
if err != nil {
|
||||
repo.Logger.Error(err.Error())
|
||||
return
|
||||
}
|
||||
temp.Close()
|
||||
defer os.Remove(temp.Name())
|
||||
|
||||
// get the trace command to copy a last updated file
|
||||
var args []string
|
||||
switch repo.SyncType {
|
||||
case "csc-sync-archlinux":
|
||||
args = cscTraceArchLinux(repo, temp.Name())
|
||||
case "csc-sync-debian":
|
||||
args = cscTraceDebian(repo, temp.Name())
|
||||
default:
|
||||
repo.Logger.Error("Trace files are not implemented for sync type '" + repo.SyncType + "'")
|
||||
return
|
||||
}
|
||||
|
||||
exit := runCommand(args)
|
||||
if exit != 0 {
|
||||
// if the process was terminated then don't continue to sync
|
||||
if exit == -1 {
|
||||
return config.TERMINATED, false
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
filesDiffer := true
|
||||
switch repo.SyncType {
|
||||
case "csc-sync-archlinux":
|
||||
filesDiffer = cscTraceArchLinuxDiff(repo, temp.Name())
|
||||
case "csc-sync-debian":
|
||||
filesDiffer = cscTraceDebianDiff(repo, temp.Name())
|
||||
}
|
||||
// debian still syncs even if the trace file is the same
|
||||
if !filesDiffer && repo.SyncType == "csc-sync-debian" {
|
||||
repo.Logger.Error("trace file for " + repo.RsyncHost + " unchanged")
|
||||
return
|
||||
}
|
||||
if filesDiffer {
|
||||
// continue sync if files differ
|
||||
return
|
||||
}
|
||||
|
||||
repo.Logger.Debug("No changes found; full sync will not be made")
|
||||
|
||||
// archlinux wants to sync a "lastsync" file if lastupdate prevents sync
|
||||
switch repo.SyncType {
|
||||
case "csc-sync-archlinux":
|
||||
args = cscTraceArchLinuxUpdate(repo)
|
||||
default:
|
||||
return config.SUCCESS, false
|
||||
}
|
||||
|
||||
exit = runCommand(args)
|
||||
if exit != 0 {
|
||||
// if the process was terminated then don't continue to sync
|
||||
if exit == -1 {
|
||||
return config.TERMINATED, false
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
return config.SUCCESS, false
|
||||
}
|
||||
|
||||
// csc-sync-debian wants to update a trace file after the repo is done syncing
|
||||
func postSyncTraceUpdate(repo *config.Repo) {
|
||||
if repo.SyncType == "csc-sync-debian" {
|
||||
cscTraceDebianUpdate(repo)
|
||||
}
|
||||
}
|
|
@ -16,11 +16,17 @@ import (
|
|||
// It returns a channel through which a Cmd will be sent once it has finished,
|
||||
// or nil if it was unable to start a process.
|
||||
func spawnProcess(repo *config.Repo, args []string) (ch <-chan *exec.Cmd) {
|
||||
if len(args) == 0 {
|
||||
repo.Logger.Error("command given is of zero length")
|
||||
return
|
||||
}
|
||||
|
||||
repo.Logger.Debug(fmt.Sprintf("Running the command: %v", args))
|
||||
if repo.DryRun {
|
||||
repo.Logger.Debug("Dry running for 50 seconds")
|
||||
args = []string{"sleep", "50"}
|
||||
}
|
||||
|
||||
cmd := exec.Command(args[0], args[1:]...)
|
||||
repo.Logger.Debug("Starting process")
|
||||
|
||||
|
@ -78,44 +84,13 @@ func spawnProcess(repo *config.Repo, args []string) (ch <-chan *exec.Cmd) {
|
|||
return
|
||||
}
|
||||
|
||||
func startRepoSync(repo *config.Repo) {
|
||||
status := config.FAILURE
|
||||
defer func() {
|
||||
repo.DoneChan <- config.SyncResult{
|
||||
Name: repo.Name,
|
||||
Exit: status,
|
||||
}
|
||||
}()
|
||||
|
||||
if repo.TraceHost != "" {
|
||||
repo.Logger.Debug("Checking for changes")
|
||||
|
||||
continueSync := true
|
||||
status, continueSync = checkIfSyncNeeded(repo)
|
||||
if !continueSync || status == config.TERMINATED {
|
||||
return
|
||||
}
|
||||
|
||||
repo.Logger.Debug("Changes found; will attempt to sync")
|
||||
}
|
||||
|
||||
// clear the rsync log file before starting the sync
|
||||
if repo.RsyncLogFile != "" {
|
||||
err := os.Truncate(repo.RsyncLogFile, 0)
|
||||
if err != nil {
|
||||
repo.Logger.Error(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
args := getSyncCommand(repo)
|
||||
if len(args) == 0 {
|
||||
repo.Logger.Error("zero length command given for sync")
|
||||
return
|
||||
}
|
||||
// spawns a process and waits for it complete before parsing the exit code and returning it
|
||||
func spawnProcessAndWait(repo *config.Repo, args []string) (status int) {
|
||||
status = config.FAILURE
|
||||
|
||||
ch := spawnProcess(repo, args)
|
||||
if ch == nil {
|
||||
// spawnSyncProcess will have already logged error
|
||||
// spawnProcess will have already logged error
|
||||
return
|
||||
}
|
||||
cmd := <-ch
|
||||
|
@ -127,22 +102,46 @@ func startRepoSync(repo *config.Repo) {
|
|||
status = config.TERMINATED
|
||||
// default is already FAILURE
|
||||
}
|
||||
}
|
||||
|
||||
func zfsSync(repo *config.Repo) {
|
||||
// we are not using zfs snapshots at the moment
|
||||
repo.Logger.Debug("Would run a zfssync if not disabled")
|
||||
return
|
||||
|
||||
out, err := exec.Command("/bin/sh", "/home/mirror/bin/zfssync", repo.Name).CombinedOutput()
|
||||
if err != nil {
|
||||
repo.Logger.Error(err)
|
||||
} else {
|
||||
f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
|
||||
if err != nil {
|
||||
repo.Logger.Error(err.Error())
|
||||
} else {
|
||||
f.Write(out)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// returns true iff file contents are the same (for diff file contents, errors, empty return false)
|
||||
func diffFileContent(repo *config.Repo, file1, file2 string) bool {
|
||||
readFile := func(file string) string {
|
||||
f, err := os.ReadFile(file)
|
||||
if err != nil {
|
||||
repo.Logger.Debug("Error while trying to read file: " + file)
|
||||
return ""
|
||||
}
|
||||
return string(f)
|
||||
}
|
||||
|
||||
content1 := readFile(file1)
|
||||
content2 := readFile(file2)
|
||||
|
||||
if content1 == "" || content2 == "" {
|
||||
return false
|
||||
}
|
||||
return content1 == content2
|
||||
}
|
||||
|
||||
// returns true iff file times are the same (for diff file times and errors return false)
|
||||
func diffFileTime(repo *config.Repo, file1, file2 string) bool {
|
||||
statFile := func(file string) int64 {
|
||||
f, err := os.Stat(file)
|
||||
if err != nil {
|
||||
repo.Logger.Debug("Error while trying to stat file: " + file)
|
||||
return 0
|
||||
}
|
||||
|
||||
return f.ModTime().Unix()
|
||||
}
|
||||
|
||||
time1 := statFile(file1)
|
||||
time2 := statFile(file2)
|
||||
|
||||
if time1 == 0 || time2 == 0 {
|
||||
return false
|
||||
}
|
||||
return file1 == file2
|
||||
}
|
Loading…
Reference in New Issue