From 1e14603367dd7120b2c0249f00b73cb708c420cc Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Fri, 25 Feb 2022 23:23:06 -0500 Subject: [PATCH] resolve most TODOs and simplify configuration --- merlin/README.md | 16 +-- merlin/arthur/arthur.go | 5 +- merlin/arthur/arthur_test.go | 6 +- merlin/config/config.go | 121 ++++++++++-------- merlin/config/config_test.go | 13 +- merlin/config/config_test.ini | 14 +- .../test_files/download/eelinux/.gitkeep | 0 .../download/yoland-releases/.gitkeep | 0 .../config/test_files/{eeelinux => eelinux} | 0 merlin/config/utils.go | 4 - merlin/logger/logger.go | 22 ++-- merlin/merlin-config-all.ini | 3 + merlin/merlin.go | 23 ++-- merlin/sync/command.go | 71 ++++------ merlin/sync/interface.go | 10 +- merlin/sync/sync.go | 28 ++-- 16 files changed, 167 insertions(+), 169 deletions(-) create mode 100644 merlin/config/test_files/download/eelinux/.gitkeep create mode 100644 merlin/config/test_files/download/yoland-releases/.gitkeep rename merlin/config/test_files/{eeelinux => eelinux} (100%) diff --git a/merlin/README.md b/merlin/README.md index ab30992..9d205d0 100644 --- a/merlin/README.md +++ b/merlin/README.md @@ -5,18 +5,12 @@ This folder contains the code for merlin (which does the actual syncing) and art Check out the the [mirror env](https://git.csclub.uwaterloo.ca/public/mirror-env) for a testing environment -### TODO -- [ ] ensure that the proper permissions (file mode, group, user) are being used -- [ ] document configs -- [ ] detect if an rsync process is stuck (optional) (\*\*) -- [ ] add bwlimit option for each rsync process (\*\*\*) - - -\*\* watch the stdout/stderr of the rsync processes - -\*\*\* previously wanted to place each rsync process in a separate cgroup +### Nice Features To Add +- detect if an rsync process is stuck (watch the stdout/stderr of the rsync processes) +- set user and group in config ### Completed +- [x] add bwlimit option for each rsync process - [x] write process manager - [x] save state (last attempted time, last attempted status) for each repo, and restore state on startup (e.g. use JSON/INI file for each repo) - [x] calculate difference between the scheduled time of a job and the time at which it actually ran; log this @@ -27,5 +21,3 @@ Check out the the [mirror env](https://git.csclub.uwaterloo.ca/public/mirror-env - [x] allow dynamic reloading in merlin - [x] use separate log file for each child process (currently sharing stdout/stderr with parent) - [x] implement zfssync in merlin (just invoke the existing Python script) - -\* there are some parts that I don't understand (trace_host, csc-sync-ceph, csc-sync-saltstack, etc) diff --git a/merlin/arthur/arthur.go b/merlin/arthur/arthur.go index ff54a64..45f4543 100644 --- a/merlin/arthur/arthur.go +++ b/merlin/arthur/arthur.go @@ -38,11 +38,13 @@ func GetCommand(conn net.Conn) (command, repoName string) { return } +// Print a message to stdout also send the message to the connection passed in func SendAndLog(conn net.Conn, msg string) { logger.OutLog(msg) conn.Write([]byte(msg)) } +// Send the status of the repos to the connection created passed in func SendStatus(conn net.Conn) { // Force arthur to send back time information in America/Toronto time location, err := time.LoadLocation("America/Toronto") @@ -80,7 +82,6 @@ func SendStatus(conn net.Conn) { func ForceSync(conn net.Conn, repoName string) (newSync bool) { newSync = false - // TODO: send repoName and every key in RepoMap to lowercase if repo, isInMap := config.RepoMap[repoName]; isInMap { logger.OutLog("Attempting to force sync of " + repoName) if sync.SyncIfPossible(repo) { @@ -95,6 +96,7 @@ func ForceSync(conn net.Conn, repoName string) (newSync bool) { return } +// Create the unix socket and send a new connection to the connChan channel for every connection made func StartListener(connChan chan net.Conn, stopLisChan chan struct{}) { sockpath := config.Conf.SockPath // must remove old cfg.SockPath otherwise get "bind: address already in use" @@ -130,7 +132,6 @@ func StartListener(connChan chan net.Conn, stopLisChan chan struct{}) { } }() - // TODO: check handling of multiple SIGHUP <-stopLisChan ear.Close() } diff --git a/merlin/arthur/arthur_test.go b/merlin/arthur/arthur_test.go index 14b4860..78cb515 100644 --- a/merlin/arthur/arthur_test.go +++ b/merlin/arthur/arthur_test.go @@ -11,6 +11,7 @@ import ( "git.csclub.uwaterloo.ca/public/merlin/logger" ) +// Test that GetCommand is able to accept a connection and parse a request for the status func TestStatusCommand(t *testing.T) { r, w := net.Pipe() @@ -28,6 +29,7 @@ func TestStatusCommand(t *testing.T) { } } +// Test that GetCommand is able to accept a connection and parse a request for the a forced sync func TestSyncCommand(t *testing.T) { r, w := net.Pipe() @@ -44,6 +46,7 @@ func TestSyncCommand(t *testing.T) { } } +// Test that SendStatus returns the correct status for some repo state func TestSendStatus(t *testing.T) { saveRepoMap := config.RepoMap defer func() { @@ -94,6 +97,7 @@ lnux Mon, 20 Dec 2021 06:33:20 EST Tue, 21 Dec 2021 06:33:20 EST } } +// Test that ForceSync behaves properly func TestForceSync(t *testing.T) { saveRepos := config.Repos saveRepoMap := config.RepoMap @@ -194,6 +198,7 @@ func TestForceSync(t *testing.T) { } } +// Test that StartListener stops properly when told to and that it properly creates a unix socket func TestStartListener(t *testing.T) { saveConf := config.Conf connChan := make(chan net.Conn) @@ -278,6 +283,5 @@ func TestStartListener(t *testing.T) { }() sendMsg("$UPz2L2yWsE^UY8iG9JX@^dBb@5yb*") - // unsure why I can't put this in the defer os.Remove("/tmp/merlin_listener_test.sock") } diff --git a/merlin/config/config.go b/merlin/config/config.go index b6d2839..187cced 100644 --- a/merlin/config/config.go +++ b/merlin/config/config.go @@ -1,6 +1,7 @@ package config import ( + "fmt" "os" "path/filepath" @@ -22,10 +23,10 @@ const ( // could change this into a default_config DEFAULT_MAX_JOBS = 6 DEFAULT_MAX_TIME = DAILY / 4 + DEFAULT_MAX_RSYNC_IO = -1 DEFAULT_SYNC_TYPE = "csc-sync-standard" DEFAULT_FREQUENCY_STRING = "by-hourly" DEFAULT_DOWNLOAD_DIR = "/mirror/root" - DEFAULT_PASSWORD_DIR = "/home/mirror/passwords" DEFAULT_STATE_DIR = "/home/mirror/merlin/states" DEFAULT_LOG_DIR = "/home/mirror/merlin/logs" DEFAULT_RSYNC_LOG_DIR = "/home/mirror/merlin/logs-rsync" @@ -58,34 +59,34 @@ type SyncResult struct { } type Config struct { - // the maximum number of jobs allowed to execute concurrently - MaxJobs int `ini:"max_jobs"` // the IP addresses to use for rsync IPv4Address string `ini:"ipv4_address"` IPv6Address string `ini:"ipv6_address"` + // the maximum number of jobs allowed to execute concurrently + MaxJobs int `ini:"max_jobs"` + // the default maximum time before killing rsync proccess + DefaultMaxTime int `ini:"default_max_time"` + // the default value for the maximum bandwidth a repo can use while syncing + // (set to -1 for unlimited) + DefaultMaxRsyncIO int `ini:"default_max_rsync_io"` // the default sync type DefaultSyncType string `ini:"default_sync_type"` - // the default frequency string + // the default sync frequency string DefaultFrequencyStr string `ini:"default_frequency"` - // the default MaxTime - DefaultMaxTime int `ini:"default_max_time"` - // the directory where rsync should download files + // directory where rsync should download files DownloadDir string `ini:"download_dir"` - // the directory where rsync passwords are stored - PasswordDir string `ini:"password_dir"` - // the directory where the state of each repo sync is saved + // directory where the state of each repo is saved StateDir string `ini:"states_dir"` - // the directory where merlin will store the general logs for each repo + // directory where merlin will store the merlin logs for each repo RepoLogDir string `ini:"repo_logs_dir"` - // the directory to store the rsync logs for each repo + // directory to store the rsync logs for each repo RsyncLogDir string `ini:"rsync_logs_dir"` - // the directory to store the zfssync logs for each repo + // directory to store the zfssync logs for each repo ZfssyncLogDir string `ini:"zfssync_logs_dir"` - // the Unix socket path which arthur will use to communicate with us + // path to the unix socket for arthur to use for communication SockPath string `ini:"sock_path"` } -// make it more clear when full path should be used vs when just the file name is needed type Repo struct { // the name of this repo Name string `ini:"-"` @@ -98,27 +99,29 @@ type Repo struct { // the maximum time (in seconds) that each child process of this repo // can for before being killed MaxTime int `ini:"max_time"` - // where to download the files for this repo (relative to the download - // dir in the config) + // limit the amount of bandwidth a repo can use while syncing + // (set to -1 for unlimited) + MaxRsyncIO int `ini:"max_rsync_io"` + // where to download the files for this repo (relative to Conf.DownloadDir) LocalDir string `ini:"local_dir"` - // the remote host to rsync from + // the address to the remote host to rsync from RsyncHost string `ini:"rsync_host"` // the remote directory on the rsync host RsyncDir string `ini:"rsync_dir"` // the rsync user (optional) RsyncUser string `ini:"rsync_user"` - // the file storing the password for rsync (optional) + // full path to file storing the password for rsync (optional) PasswordFile string `ini:"password_file"` - // the file storing the repo sync state (used to override default) - StateFile string `ini:"state_file"` - // the full file path for general logging of this repo (used to override default) - RepoLogFile string `ini:"repo_log_file"` - // a reference to the general logger + // full path to file storing the repo sync state + StateFile string `ini:"-"` + // full path for file storing general logging of this repo + RepoLogFile string `ini:"-"` + // a pointer to the general logger Logger *logger.Logger `ini:"-"` - // the full file path for logging this repo's rsync (used to override default) - RsyncLogFile string `ini:"rsync_log_file"` - // the full file path for logging this repo's zfssync (used to override default) - ZfssyncLogFile string `ini:"zfssync_log_file"` + // full file path for file logging this repo's rsync + RsyncLogFile string `ini:"-"` + // full file path for file logging this repo's zfssync + ZfssyncLogFile string `ini:"-"` // the repo will write its name and status in a Result struct to DoneChan // when it has finished a job (shared by all repos) DoneChan chan<- SyncResult `ini:"-"` @@ -147,17 +150,16 @@ var ( RepoMap map[string]*Repo ) -// GetConfig reads the config from a JSON file, initializes default values, -// and initializes the non-configurable fields of each repo. -// It returns a Config. +// LoadConfig initializes the default config values then overrides them with +// the values it reads from the INI config file func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struct{}) { // set default values then load config from file newConf := Config{ MaxJobs: DEFAULT_MAX_JOBS, + DefaultMaxTime: DEFAULT_MAX_TIME, + DefaultMaxRsyncIO: DEFAULT_MAX_RSYNC_IO, DefaultSyncType: DEFAULT_SYNC_TYPE, DefaultFrequencyStr: DEFAULT_FREQUENCY_STRING, - DefaultMaxTime: DEFAULT_MAX_TIME, - PasswordDir: DEFAULT_PASSWORD_DIR, DownloadDir: DEFAULT_DOWNLOAD_DIR, StateDir: DEFAULT_STATE_DIR, RepoLogDir: DEFAULT_LOG_DIR, @@ -170,18 +172,23 @@ func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struc err = iniInfo.MapTo(&newConf) panicIfErr(err) - // check config for major errors - for _, dir := range []string{newConf.StateDir, newConf.RepoLogDir, newConf.RsyncLogDir, newConf.ZfssyncLogDir} { - err := os.MkdirAll(dir, 0755) - panicIfErr(err) - } + // check newConf for possible configuration errors if newConf.IPv4Address == "" { panic("Missing IPv4 address from config") } else if newConf.IPv6Address == "" { panic("Missing IPv6 address from config") + } else if _, check := frequencies[newConf.DefaultFrequencyStr]; !check { + panic(fmt.Errorf("%s is not a valid frequency", newConf.DefaultFrequencyStr)) + } else if _, err := os.Stat(newConf.DownloadDir); os.IsNotExist(err) { + panic(fmt.Errorf("the path %s does not exist", newConf.DownloadDir)) + } + for _, dir := range []string{newConf.StateDir, newConf.RepoLogDir, newConf.RsyncLogDir, newConf.ZfssyncLogDir} { + err := os.MkdirAll(dir, 0755) + panicIfErr(err) } newRepos := make([]*Repo, 0) + check := false for _, section := range iniInfo.Sections() { repoName := section.Name() if repoName == "DEFAULT" { @@ -189,13 +196,13 @@ func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struc } // set the default values for the repo then load from file - // TODO: check if local_dir and repoName are always the same value - // TODO: check to ensure that every Repo.Name is unique (may already be done by ini) repo := Repo{ Name: repoName, SyncType: newConf.DefaultSyncType, FrequencyStr: newConf.DefaultFrequencyStr, MaxTime: newConf.DefaultMaxTime, + MaxRsyncIO: newConf.DefaultMaxRsyncIO, + LocalDir: repoName, StateFile: filepath.Join(newConf.StateDir, repoName), RepoLogFile: filepath.Join(newConf.RepoLogDir, repoName) + ".log", RsyncLogFile: filepath.Join(newConf.RsyncLogDir, repoName) + "-rsync.log", @@ -206,21 +213,33 @@ func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struc err := section.MapTo(&repo) panicIfErr(err) - // TODO: ensure that the parent dirs to the file also exist when touching - // or just remove the ability to override + // checks for validity of repo configuration + repo.Frequency, check = frequencies[repo.FrequencyStr] + if !check { + panic("Missing or invalid frequency for " + repo.Name) + } else if repo.SyncType == "" { + panic("Missing sync type from " + repo.Name) + } else if repo.LocalDir == "" { + panic("Missing local download location for " + repo.Name) + } else if repo.RsyncHost == "" { + panic("Missing rsync host for " + repo.Name) + } + localDirFull := filepath.Join(newConf.DownloadDir, repo.LocalDir) + if _, err := os.Stat(localDirFull); os.IsNotExist(err) { + panic("the path " + localDirFull + " does not exist") + } else if repo.PasswordFile != "" { + if _, err := os.Stat(repo.PasswordFile); os.IsNotExist(err) { + panic("the file " + repo.PasswordFile + " does not exist") + } + } touchFiles( repo.StateFile, repo.RepoLogFile, repo.RsyncLogFile, repo.ZfssyncLogFile, ) + repo.Logger = logger.NewLogger(repo.Name, repo.RepoLogFile) - repo.Frequency = frequencies[repo.FrequencyStr] - if repo.SyncType == "" { - panic("Missing sync type from " + repo.Name) - } else if repo.Frequency == 0 { - panic("Missing or invalid frequency for " + repo.Name) - } repo.State = RepoState{ IsRunning: false, @@ -236,7 +255,7 @@ func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struc } if len(newRepos) == 0 { - panic("No repos found in config") + panic("no repos found in config") } Conf = newConf @@ -247,9 +266,8 @@ func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struc } } -// save the current state of the repo to a file +// Save the current state of the repo to the state file func (repo *Repo) SaveState() { - // repo.Logger.Debug("Saving state") state_cfg := ini.Empty() if err := ini.ReflectFrom(state_cfg, &repo.State); err != nil { repo.Logger.Error(err.Error()) @@ -261,5 +279,4 @@ func (repo *Repo) SaveState() { if _, err := state_cfg.WriteTo(file); err != nil { repo.Logger.Error(err.Error()) } - // repo.Logger.Debug("Saved state") } diff --git a/merlin/config/config_test.go b/merlin/config/config_test.go index 576ce3d..8da5437 100644 --- a/merlin/config/config_test.go +++ b/merlin/config/config_test.go @@ -36,7 +36,6 @@ func TestLoadConfig(t *testing.T) { doneChan := make(chan SyncResult) stopChan := make(chan struct{}) LoadConfig("config_test.ini", doneChan, stopChan) - // TODO: Fill out parts not part of the ini or state file expectedConfig := Config{ MaxJobs: 6, IPv4Address: "129.97.134.129", @@ -44,8 +43,8 @@ func TestLoadConfig(t *testing.T) { DefaultSyncType: "csc-sync-standard", DefaultFrequencyStr: "daily", DefaultMaxTime: 1000, - DownloadDir: "/tmp/test-mirror", - PasswordDir: "/home/mirror/passwords", + DefaultMaxRsyncIO: -1, + DownloadDir: "test_files/download", StateDir: "test_files", RepoLogDir: "test_files/logs", RsyncLogDir: "test_files/rsync", @@ -58,14 +57,15 @@ func TestLoadConfig(t *testing.T) { FrequencyStr: "tri-hourly", Frequency: 10800, MaxTime: 2000, + MaxRsyncIO: 100, LocalDir: "eelinux", RsyncHost: "rsync.releases.eelinux.ca", RsyncDir: "releases", - StateFile: "test_files/eeelinux", + StateFile: "test_files/eelinux", RepoLogFile: "test_files/logs/eelinux.log", Logger: Repos[0].Logger, - RsyncLogFile: "test_files/rsync/eelinux.log", - ZfssyncLogFile: "test_files/zfssync/eelinux.log", + RsyncLogFile: "test_files/rsync/eelinux-rsync.log", + ZfssyncLogFile: "test_files/zfssync/eelinux-zfssync.log", DoneChan: doneChan, StopChan: stopChan, State: RepoState{ @@ -81,6 +81,7 @@ func TestLoadConfig(t *testing.T) { FrequencyStr: "daily", Frequency: 86400, MaxTime: 1000, + MaxRsyncIO: -1, LocalDir: "yoland-releases", RsyncHost: "rsync.releases.yoland.io", RsyncDir: "releases", diff --git a/merlin/config/config_test.ini b/merlin/config/config_test.ini index e240b8a..53ee2e3 100644 --- a/merlin/config/config_test.ini +++ b/merlin/config/config_test.ini @@ -6,8 +6,7 @@ ipv6_address = 2620:101:f000:4901:c5c::129 ; default_sync_type = csc-sync-standard default_frequency = daily default_max_time = 1000 -download_dir = /tmp/test-mirror -; password_dir = /home/mirror/passwords +download_dir = test_files/download states_dir = test_files repo_logs_dir = test_files/logs rsync_logs_dir = test_files/rsync @@ -18,13 +17,10 @@ sock_path = test_files/test.sock sync_type = csc-sync-nonstandard frequency = tri-hourly max_time = 2000 +max_rsync_io = 100 local_dir = eelinux rsync_host = rsync.releases.eelinux.ca rsync_dir = releases -state_file = test_files/eeelinux -repo_log_file = test_files/logs/eelinux.log -rsync_log_file = test_files/rsync/eelinux.log -zfssync_log_file = test_files/zfssync/eelinux.log [yoland] ; sync_type = csc-sync-standard @@ -32,8 +28,4 @@ zfssync_log_file = test_files/zfssync/eelinux.log ; max_time = 1000 local_dir = yoland-releases rsync_host = rsync.releases.yoland.io -rsync_dir = releases -; state_file = test_files/yoland -; repo_log_file = test_files/logs/yoland.log -; rsync_log_file = test_files/rsync/yoland-rsync.log -; zfssync_log_file = test_files/zfssync/yoland-zfssync.log \ No newline at end of file +rsync_dir = releases \ No newline at end of file diff --git a/merlin/config/test_files/download/eelinux/.gitkeep b/merlin/config/test_files/download/eelinux/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/merlin/config/test_files/download/yoland-releases/.gitkeep b/merlin/config/test_files/download/yoland-releases/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/merlin/config/test_files/eeelinux b/merlin/config/test_files/eelinux similarity index 100% rename from merlin/config/test_files/eeelinux rename to merlin/config/test_files/eelinux diff --git a/merlin/config/utils.go b/merlin/config/utils.go index a3e1e56..be66e7e 100644 --- a/merlin/config/utils.go +++ b/merlin/config/utils.go @@ -21,10 +21,6 @@ func touchFile(file string) { f.Close() } else if fi.IsDir() { panic(fmt.Errorf("%s is a directory", file)) - // } else if os.Geteuid() != 1001 { - // // mirror is UID 1001 - // err := os.Chown(file, 1001, os.Getegid()) - // panicIfErr(err) } err = os.Chmod(file, 0644) panicIfErr(err) diff --git a/merlin/logger/logger.go b/merlin/logger/logger.go index bd5c6a1..a6ab73b 100644 --- a/merlin/logger/logger.go +++ b/merlin/logger/logger.go @@ -30,14 +30,17 @@ var levels = map[int]string{ var outLogger = log.New(os.Stdout, "", log.LstdFlags) var errLogger = log.New(os.Stderr, "", log.LstdFlags) +// log to stdout func OutLog(v ...interface{}) { outLogger.Println(v...) } +// log to stderr func ErrLog(v ...interface{}) { errLogger.Println(v...) } +// initialize a logger func NewLogger(name, file string) *Logger { logger := Logger{ Logger: log.New(os.Stderr, "", log.LstdFlags), @@ -47,7 +50,8 @@ func NewLogger(name, file string) *Logger { return &logger } -func (logger *Logger) Log(level int, v ...interface{}) { +// write something to the logger +func (logger *Logger) log(level int, v ...interface{}) { f, err := os.OpenFile(logger.file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) if err != nil { ErrLog(err.Error()) @@ -62,22 +66,24 @@ func (logger *Logger) Log(level int, v ...interface{}) { logger.Println(args) } +// write debug information to the logfile func (logger *Logger) Debug(v ...interface{}) { - logger.Log(DEBUG, v...) + logger.log(DEBUG, v...) } +// write information to the logfile and to stdout func (logger *Logger) Info(v ...interface{}) { - // src := []interface{}{logger.name + ":"} - // args := append(src, v...) - OutLog(v...) - logger.Log(INFO, v...) + OutLog(append([]interface{}{"[" + logger.name + "]"}, v...)) + logger.log(INFO, v...) } +// write warnings to the logfile func (logger *Logger) Warning(v ...interface{}) { - logger.Log(WARNING, v...) + logger.log(WARNING, v...) } +// write errors to the logfile and to stderr func (logger *Logger) Error(v ...interface{}) { ErrLog(append([]interface{}{"[" + logger.name + "]"}, v...)) - logger.Log(ERROR, v...) + logger.log(ERROR, v...) } diff --git a/merlin/merlin-config-all.ini b/merlin/merlin-config-all.ini index 1e441bc..a9d88d4 100644 --- a/merlin/merlin-config-all.ini +++ b/merlin/merlin-config-all.ini @@ -16,6 +16,8 @@ download_dir = /tmp/test-mirror # add default frequency # add default sync_type +; [DEFAULT] +; [debian] @@ -198,6 +200,7 @@ local_dir = gnome rsync_host = master.gnome.org rsync_dir = gnomeftp ; password_file = gnome +; password_file = /home/mirror/passwords/gnome [damnsmalllinux] sync_type = csc-sync-standard diff --git a/merlin/merlin.go b/merlin/merlin.go index e02247b..44dc0c5 100644 --- a/merlin/merlin.go +++ b/merlin/merlin.go @@ -34,8 +34,10 @@ func main() { configPath := flag.String("config", DEFAULT_CONFIG_PATH, "alternate config file") flag.Parse() - // check that merlin is run as mirror user - // check that mirror user has pid of 1001 + // check the user that program is running under (should be mirror) + if os.Getuid() == 0 { + panic("Merlin should not be run as root") + } // receives a Result struct when a repo stops syncing doneChan := make(chan config.SyncResult) @@ -63,6 +65,7 @@ func main() { repoIdx = 0 go arthur.StartListener(connChan, stopLisChan) } + // We use a round-robin strategy. It's not the most efficient, but it's simple // (read: easy to understand) and guarantees each repo will eventually get a chance to run. runAsManyAsPossible := func() { @@ -100,11 +103,13 @@ runLoop: case <-reloadSig: stopLisChan <- struct{}{} loadConfig() - // ensure that SyncCompleted can handle it if reloading config - // removes a repo that was already syncing case done := <-doneChan: - sync.SyncCompleted(config.RepoMap[done.Name], done.Exit) + // checking for when a repo is removed while it is still syncing + repo, check := config.RepoMap[done.Name] + if check { + sync.SyncCompleted(repo, done.Exit) + } numJobsRunning-- case conn := <-connChan: @@ -119,8 +124,7 @@ runLoop: default: arthur.SendAndLog(conn, "Received unrecognized command: "+command) } - // None of the arthur functions close the connection so you will need to - // close it manually for the message to be sent + // close the received connection so that the message is sent conn.Close() case <-time.After(1 * time.Minute): @@ -132,7 +136,10 @@ runLoop: for { select { case done := <-doneChan: - sync.SyncCompleted(config.RepoMap[done.Name], done.Exit) + repo, check := config.RepoMap[done.Name] + if check { + sync.SyncCompleted(repo, done.Exit) + } numJobsRunning-- case <-time.After(1 * time.Second): diff --git a/merlin/sync/command.go b/merlin/sync/command.go index d3a107f..d0a1915 100644 --- a/merlin/sync/command.go +++ b/merlin/sync/command.go @@ -3,6 +3,7 @@ package sync import ( "fmt" "os" + "path/filepath" "git.csclub.uwaterloo.ca/public/merlin/config" ) @@ -28,9 +29,8 @@ func buildRsyncSSHHost(repo *config.Repo) string { return repo.RsyncHost + ":" + repo.RsyncDir } -// TODO: remove this (repo.LocalDir should be the full path) func buildDownloadDir(repo *config.Repo) string { - return config.Conf.DownloadDir + "/" + repo.LocalDir + return filepath.Join(config.Conf.DownloadDir, repo.LocalDir) } func cscSyncApache(repo *config.Repo) []string { @@ -188,8 +188,7 @@ func cscSyncStandard(repo *config.Repo) []string { "--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, } if repo.PasswordFile != "" { - filename := config.Conf.PasswordDir + "/" + repo.PasswordFile - args = append(args, "--password-file", filename) + args = append(args, "--password-file", repo.PasswordFile) } args = append(args, buildRsyncHost(repo), buildDownloadDir(repo)) @@ -215,72 +214,56 @@ func cscSyncDummy(repo *config.Repo) []string { // executes a particular sync job depending on repo.SyncType. func getSyncCommand(repo *config.Repo) (args []string) { - /* - # scripts used by merlin.py - csc-sync-debian - csc-sync-standard - csc-sync-ssh - csc-sync-apache - csc-sync-archlinux - csc-sync-debian-cd - csc-sync-gentoo - csc-sync-wget - csc-sync-s3 - csc-sync-standard-ipv6 - csc-sync-ceph - - zfssync - report_mirror (what is this?) - - # other things in bin/ - csc-sync-archlinux-old - csc-sync-cdimage - csc-sync-chmod - csc-sync-badperms - make-torrents - ubuntu-releases-sync - */ if repo.SyncType == "csc-sync-dummy" { return cscSyncDummy(repo) } - localDir := buildDownloadDir(repo) - if err := os.MkdirAll(localDir, 0775); err != nil { + // check that the download directory exists + if _, err := os.Stat(buildDownloadDir(repo)); os.IsNotExist(err) { repo.Logger.Error(err.Error()) return } switch repo.SyncType { case "csc-sync-apache": - return cscSyncApache(repo) + args = cscSyncApache(repo) case "csc-sync-archlinux": - return cscSyncArchLinux(repo) + args = cscSyncArchLinux(repo) case "csc-sync-badperms": - return cscSyncBadPerms(repo) + args = cscSyncBadPerms(repo) case "csc-sync-cdimage": - return cscSyncCDImage(repo) + args = cscSyncCDImage(repo) case "csc-sync-ceph": - return cscSyncCeph(repo) + args = cscSyncCeph(repo) case "csc-sync-chmod": - return cscSyncChmod(repo) + args = cscSyncChmod(repo) case "csc-sync-debian": - return cscSyncDebian(repo) + args = cscSyncDebian(repo) case "csc-sync-debian-cd": - return cscSyncDebianCD(repo) + args = cscSyncDebianCD(repo) case "csc-sync-gentoo": - return cscSyncGentoo(repo) + args = cscSyncGentoo(repo) case "csc-sync-s3": - return cscSyncS3(repo) + args = cscSyncS3(repo) case "csc-sync-ssh": - return cscSyncSSH(repo) + args = cscSyncSSH(repo) case "csc-sync-standard": - return cscSyncStandard(repo) + args = cscSyncStandard(repo) case "csc-sync-standard-ipv6": - return cscSyncStandardIPV6(repo) + args = cscSyncStandardIPV6(repo) // case "csc-sync-wget": // return cscSyncWget(repo) default: repo.Logger.Error("Unrecognized sync type", "'"+repo.SyncType+"'") + return } + + if repo.MaxRsyncIO >= 0 { + args = append(args, fmt.Sprintf("--bwlimit=%d", repo.MaxRsyncIO)) + } + // TODO: remove buildDownloadDir and check what cscSyncDebian and co are about + // (the ones that do not append buildDownloadDir at the end) + args = append(args, buildDownloadDir(repo)) + return } diff --git a/merlin/sync/interface.go b/merlin/sync/interface.go index c36e9e9..ab2fb7e 100644 --- a/merlin/sync/interface.go +++ b/merlin/sync/interface.go @@ -53,11 +53,7 @@ func SyncCompleted(repo *config.Repo, exit int) { repo.SaveState() repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync)) - // We are no longer creating snapshots so we don't need the zfssync script anymore - - // // TODO: make it possible for this SyncCompleted to be run without zfsSync being run - // if exit == config.SUCCESS { - // // it is possible that the zfssync from the last repo sync is still running is that fine? - // go zfsSync(repo) - // } + if exit == config.SUCCESS { + go zfsSync(repo) + } } diff --git a/merlin/sync/sync.go b/merlin/sync/sync.go index c342f0f..1c2d4dc 100644 --- a/merlin/sync/sync.go +++ b/merlin/sync/sync.go @@ -66,7 +66,6 @@ func spawnSyncProcess(repo *config.Repo, args []string) (ch <-chan *exec.Cmd) { case <-cmdDoneChan: if !cmd.ProcessState.Success() { repo.Logger.Warning("Process ended with status code", cmd.ProcessState.ExitCode()) - // repo.Logger.Info(strings.Join(args, " ")) } case <-repo.StopChan: @@ -111,16 +110,17 @@ func startSync(repo *config.Repo) { } } -// func zfsSync(repo *config.Repo) { -// out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput() -// if err != nil { -// repo.Logger.Error(err) -// } else { -// f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) -// if err != nil { -// repo.Logger.Error(err.Error()) -// } else { -// f.Write(out) -// } -// } -// } +// we are not using zfs snapshots at the moment +func zfsSync(repo *config.Repo) { + // out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput() + // if err != nil { + // repo.Logger.Error(err) + // } else { + // f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + // if err != nil { + // repo.Logger.Error(err.Error()) + // } else { + // f.Write(out) + // } + // } +}