From 2833a62378af7c2211a920f4525fabec564ce4e6 Mon Sep 17 00:00:00 2001 From: koitu Date: Sat, 23 Oct 2021 23:14:41 -0400 Subject: [PATCH] go --- merlin/common/common.go | 125 ++++++++++++++-------------------------- merlin/common/sync.go | 105 +++++++++++++++++++++++++++++++++ merlin/ubuntu-releases | 4 ++ 3 files changed, 153 insertions(+), 81 deletions(-) create mode 100644 merlin/common/sync.go create mode 100644 merlin/ubuntu-releases diff --git a/merlin/common/common.go b/merlin/common/common.go index 274b826..33b9169 100644 --- a/merlin/common/common.go +++ b/merlin/common/common.go @@ -1,7 +1,6 @@ package common import ( - "fmt" "os" "time" @@ -23,6 +22,7 @@ const ( DEFAULT_MAX_JOBS = 6 DEFAULT_MAX_TIME = DAILY / 4 DEFAULT_SOCK_PATH = "/run/merlin/merlin.sock" + DEFAULT_STATE_PATH = "/home/mirror/merlin/states" DEFAULT_LOG_FILE = "/home/mirror/merlin/logs/transfer.log" DEFAULT_PASSWORD_DIR = "/home/mirror/passwords" DEFAULT_DOWNLOAD_DIR = "/mirror/root" @@ -43,7 +43,7 @@ var frequencies = map[string]int{ const ( NOT_RUN_YET = iota SUCCESS - FAILURE + FAILURE // change to TIMEOUT? TERMINATED // was killed by a signal ) @@ -78,14 +78,10 @@ type Repo struct { // the repo will write its name to DoneChan when it has finished a job // (successfully or unsuccessfully) DoneChan chan<- string `ini:"-"` - // the Unix epoch timestamp at which this repo last attempted a job - LastAttemptTime int64 `ini:"-"` - // whether the last attempt was successful or not - LastAttemptStatus int `ini:"-"` // a reference to the global config - cfg *Config - // whether this repo is running a job or not - isRunning bool + cfg *Config `ini:"-"` + // a struct that stores the repo's status + state State `ini:"-"` } type Config struct { @@ -95,6 +91,8 @@ type Config struct { MaxTime int `ini:"max_time"` // the Unix socket path which arthur will use to communicate with us SockPath string `ini:"sock_path"` + // the path to where the state of each repo's sync is saved + StatePath string `ini:"states_path"` // the default LogFile for each repo LogFile string `ini:"log_file"` // the IP addresses to use for rsync @@ -110,68 +108,21 @@ type Config struct { Repos []*Repo `ini:"-"` } +type State struct { + // these are stored in the states folder + // the Unix epoch timestamp at which this repo last attempted a job + LastAttemptTime int64 `ini:"last_attempt_time"` + // the number of seconds this repo ran for during its last attempted job + LastAttemptRunTime int64 `ini:"last_attempt_runtime"` + // whether the last attempt was successful or not + LastAttemptStatus int `ini:"last_attempt_status"` + // whether this repo is running a job or not + isRunning bool `ini:"is_running"` +} + // IsRunning returns true if the repo is currently running a sync job. func (repo *Repo) IsRunning() bool { - return repo.isRunning -} - -// CSCSyncStandard performs a standard rsync job. -func (repo *Repo) CSCSyncStandard() { - startTime := time.Now().Unix() - status := FAILURE - defer func() { - repo.LastAttemptTime = startTime - repo.LastAttemptStatus = status - repo.isRunning = false - repo.DoneChan <- repo.Name - }() - - localDir := repo.cfg.DownloadDir + "/" + repo.LocalDir - err := os.MkdirAll(localDir, 0775) - if err != nil { - err = fmt.Errorf("Could not create directory %s: %w", localDir, err) - repo.Logger.Error(err) - return - } - rsyncHost := repo.RsyncHost - if repo.RsyncUser != "" { - rsyncHost = repo.RsyncUser + "@" + repo.RsyncHost - } - address := repo.cfg.IPv4Address - logFile := repo.LogFile - args := []string{ - "nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", - "--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + address, - "--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + logFile, - } - if repo.PasswordFile != "" { - filename := repo.cfg.PasswordDir + "/" + repo.PasswordFile - args = append(args, "--password-file", filename) - } - args = append(args, "rsync://"+rsyncHost+"/"+repo.RsyncDir, localDir) - - ch := SpawnProcess(repo, args) - if ch == nil { - return - } - cmd := <-ch - switch cmd.ProcessState.ExitCode() { - case 0: - status = SUCCESS - case -1: - status = TERMINATED - // default is already FAILURE - } -} - -// StartSyncJob executes a particular sync job depending on repo.SyncType. -func (repo *Repo) StartSyncJob() { - switch repo.SyncType { - case "csc-sync-standard": - repo.CSCSyncStandard() - default: - repo.Logger.Error("Unrecognized sync type", "'"+repo.SyncType+"'") - } + return repo.state.isRunning } // RunIfScheduled starts a sync job for this repo if more than repo.Frequency @@ -179,13 +130,13 @@ func (repo *Repo) StartSyncJob() { // It returns true iff a job is started. func (repo *Repo) RunIfScheduled() bool { // sanity check; don't run if a job is already running - if repo.isRunning { + if repo.state.isRunning { return false } - if time.Now().Unix()-repo.LastAttemptTime > int64(repo.Frequency) { + if time.Now().Unix()-repo.state.LastAttemptTime > int64(repo.Frequency) { // this should be set in the caller's thread so that the check // above will always work - repo.isRunning = true + repo.state.isRunning = true go repo.StartSyncJob() return true } @@ -204,20 +155,24 @@ func GetConfig() Config { MaxJobs: DEFAULT_MAX_JOBS, MaxTime: DEFAULT_MAX_TIME, SockPath: DEFAULT_SOCK_PATH, + StatePath: DEFAULT_STATE_PATH, LogFile: DEFAULT_LOG_FILE, PasswordDir: DEFAULT_PASSWORD_DIR, DownloadDir: DEFAULT_DOWNLOAD_DIR, - Repos: make([]*Repo, 0), + Repos: make([]*Repo, 0), } - err = data.MapTo(&cfg) - if err != nil { + if err := data.MapTo(&cfg); err != nil { panic(err) } - if cfg.IPv4Address == "" { + + if err := os.MkdirAll(cfg.StatePath, 0755); err != nil { + panic("Could not create states path at " + cfg.StatePath) + } else if cfg.IPv4Address == "" { panic("Missing IPv4 address from config") } else if cfg.IPv6Address == "" { panic("Missing IPv6 address from config") } + doneChan := make(chan string) cfg.DoneChan = doneChan for _, section := range data.Sections() { @@ -225,10 +180,10 @@ func GetConfig() Config { continue } repo := Repo{Name: section.Name()} - err = section.MapTo(&repo) - if err != nil { + if err := section.MapTo(&repo); err != nil { panic(err) } + repo.Frequency = frequencies[repo.FrequencyStr] if repo.MaxTime == 0 { repo.MaxTime = cfg.MaxTime @@ -237,9 +192,17 @@ func GetConfig() Config { repo.DoneChan = doneChan repo.StopChan = make(chan bool, 1) repo.cfg = &cfg - // TODO: save and restore LastAttempt info (from stamps directory) - repo.LastAttemptTime = 0 - repo.LastAttemptStatus = NOT_RUN_YET + + repo.state = State{ + LastAttemptTime: 0, + LastAttemptRunTime: 0, + LastAttemptStatus: NOT_RUN_YET, + isRunning: false, + } + if err := ini.MapTo(&repo.state, cfg.StatePath+"/"+repo.Name); err != nil { + panic(err) + } + cfg.Repos = append(cfg.Repos, &repo) } if len(cfg.Repos) == 0 { diff --git a/merlin/common/sync.go b/merlin/common/sync.go new file mode 100644 index 0000000..25ab2cd --- /dev/null +++ b/merlin/common/sync.go @@ -0,0 +1,105 @@ +package common + +import ( + "fmt" + "os" + "time" + + ini "gopkg.in/ini.v1" +) + +// CSCSyncStandard performs a standard rsync job. +func (repo *Repo) CSCSyncStandard() { + startTime := time.Now().Unix() + status := FAILURE + defer func() { + repo.state.LastAttemptTime = startTime + repo.state.LastAttemptRunTime = time.Now().Unix() - startTime + repo.state.LastAttemptStatus = status + repo.state.isRunning = false + cfg := ini.Empty() + if err := ini.ReflectFrom(cfg, &repo.state); err != nil { + // log error + } + file, err := os.OpenFile(repo.cfg.StatePath+"/"+repo.Name, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + // log error + } + if _, err := cfg.WriteTo(file); err != nil { + // log error + } + repo.DoneChan <- repo.Name + }() + + localDir := repo.cfg.DownloadDir + "/" + repo.LocalDir + err := os.MkdirAll(localDir, 0775) + if err != nil { + err = fmt.Errorf("Could not create directory %s: %w", localDir, err) + repo.Logger.Error(err) + return + } + rsyncHost := repo.RsyncHost + if repo.RsyncUser != "" { + rsyncHost = repo.RsyncUser + "@" + repo.RsyncHost + } + address := repo.cfg.IPv4Address + logFile := repo.LogFile + args := []string{ + "nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", + "--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + address, + "--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + logFile, + } + if repo.PasswordFile != "" { + filename := repo.cfg.PasswordDir + "/" + repo.PasswordFile + args = append(args, "--password-file", filename) + } + args = append(args, "rsync://"+rsyncHost+"/"+repo.RsyncDir, localDir) + + ch := SpawnProcess(repo, args) + if ch == nil { + return + } + cmd := <-ch + switch cmd.ProcessState.ExitCode() { + case 0: + status = SUCCESS + case -1: + status = TERMINATED + // default is already FAILURE + } +} + +// StartSyncJob executes a particular sync job depending on repo.SyncType. +func (repo *Repo) StartSyncJob() { + switch repo.SyncType { + /* + # scripts used by merlin.py + csc-sync-debian + csc-sync-standard + csc-sync-ssh + csc-sync-apache + csc-sync-archlinux + csc-sync-debian-cd + csc-sync-gentoo + csc-sync-wget + csc-sync-s3 + csc-sync-standard-ipv6 + csc-sync-ceph + + zfssync + report_mirror (what is this?) + + # other things in bin/ + csc-sync-archlinux-old + csc-sync-cdimage + csc-sync-chmod + csc-sync-badperms + make-torrents + ubuntu-releases-sync + */ + case "csc-sync-standard": + repo.CSCSyncStandard() + default: + repo.Logger.Error("Unrecognized sync type", "'"+repo.SyncType+"'") + } +} diff --git a/merlin/ubuntu-releases b/merlin/ubuntu-releases new file mode 100644 index 0000000..a8e57ef --- /dev/null +++ b/merlin/ubuntu-releases @@ -0,0 +1,4 @@ +; move to states folder +last_run = 0 +runtime = 0 +status = 0 \ No newline at end of file