From 8d572a0c3ff2839013167cafa93a47c0fcf00857 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Sat, 11 Dec 2021 18:28:09 -0500 Subject: [PATCH 1/8] split into packages --- merlin/arthur/arthur.go | 114 +++++++ merlin/arthur/arthur_test.go | 1 + merlin/common/tests/common_test.go | 1 - merlin/common/tests/sync_test.go | 0 merlin/{common/common.go => config/config.go} | 284 ++++++------------ merlin/config/config_test.go | 1 + merlin/config/utils.go | 38 +++ merlin/{common => logger}/logger.go | 22 +- merlin/merlin.go | 189 +++--------- merlin/merlin_test.go | 7 - merlin/{common/sync.go => sync/command.go} | 149 ++++----- merlin/sync/command_test.go | 1 + merlin/sync/interface.go | 59 ++++ .../process_manager.go => sync/sync.go} | 62 +++- 14 files changed, 476 insertions(+), 452 deletions(-) create mode 100644 merlin/arthur/arthur.go create mode 100644 merlin/arthur/arthur_test.go delete mode 100644 merlin/common/tests/common_test.go delete mode 100644 merlin/common/tests/sync_test.go rename merlin/{common/common.go => config/config.go} (63%) create mode 100644 merlin/config/config_test.go create mode 100644 merlin/config/utils.go rename merlin/{common => logger}/logger.go (77%) delete mode 100644 merlin/merlin_test.go rename merlin/{common/sync.go => sync/command.go} (56%) create mode 100644 merlin/sync/command_test.go create mode 100644 merlin/sync/interface.go rename merlin/{common/process_manager.go => sync/sync.go} (54%) diff --git a/merlin/arthur/arthur.go b/merlin/arthur/arthur.go new file mode 100644 index 0000000..ac3bf1e --- /dev/null +++ b/merlin/arthur/arthur.go @@ -0,0 +1,114 @@ +package arthur + +import ( + "bytes" + "errors" + "fmt" + "io" + "net" + "os" + "path/filepath" + "strings" + "text/tabwriter" + "time" + + "git.csclub.uwaterloo.ca/public/merlin/config" + "git.csclub.uwaterloo.ca/public/merlin/logger" + "git.csclub.uwaterloo.ca/public/merlin/sync" +) + +// get repo by name function in config + +func GetAndRunCommand(conn net.Conn) (newSync bool) { + newSync = false + defer conn.Close() + + var buf bytes.Buffer + _, err := io.Copy(&buf, conn) + if err != nil { + logger.ErrLog(err.Error()) + return + } + + command := buf.String() + args := strings.Split(command, ":") + respondAndLogErr := func(msg string) { + logger.OutLog(msg) + conn.Write([]byte(msg)) + } + + if args[0] == "status" { + status := tabwriter.NewWriter(conn, 5, 5, 5, ' ', 0) + fmt.Fprintf(status, "Repository\tLast Synced\tNext Expected Sync\n") + + // for time formating see https://pkg.go.dev/time#pkg-constants + for name, repo := range config.RepoMap { + fmt.Fprintf(status, "%s\t%s\t%s\n", + name, + time.Unix(repo.State.LastAttemptStartTime, 0).Format(time.RFC1123), + time.Unix(repo.State.LastAttemptRunTime+int64(repo.Frequency), 0).Format(time.RFC1123), + ) + } + + status.Flush() + } else if args[0] == "sync" { + if len(args) != 2 { + respondAndLogErr("Could not parse sync command, forced sync fails.") + return + } + + if repo, inMap := config.RepoMap[args[1]]; inMap { + logger.OutLog("Attempting to force sync of " + repo.Name) + if sync.SyncIfPossible(repo) { + conn.Write([]byte("Forced sync for " + repo.Name)) + newSync = true + } else { + respondAndLogErr("Cannot force sync: " + repo.Name + ", already syncing.") + } + } else { + respondAndLogErr(args[1] + " is not tracked so cannot sync") + } + } else { + respondAndLogErr("Received unrecognized command: " + command) + } + return +} + +func UnixSockListener(connChan chan net.Conn, stopLisChan chan struct{}) { + sockpath := config.Conf.SockPath + // must remove cfg.SockPath otherwise get "bind: address already in use" + if filepath.Ext(sockpath) != ".sock" { + panic(fmt.Errorf("Socket file must end with .sock")) + } else if _, err := os.Stat(sockpath); err == nil { + if err := os.Remove(sockpath); err != nil { + panic(err) + } + } else if !errors.Is(err, os.ErrNotExist) { + panic(err) + } + + ear, err := net.Listen("unix", sockpath) + if err != nil { + panic(err) + } + logger.OutLog("Listening to unix socket at " + sockpath) + + go func() { + for { + // will exit when ear is closed + conn, err := ear.Accept() + if err != nil { + if ne, ok := err.(net.Error); ok && ne.Temporary() { + logger.ErrLog("Accepted socket error: " + err.Error()) + continue + } + logger.ErrLog("Unhandlable socket error: " + err.Error()) + return + } + connChan <- conn + } + }() + + <-stopLisChan + ear.Close() +} diff --git a/merlin/arthur/arthur_test.go b/merlin/arthur/arthur_test.go new file mode 100644 index 0000000..a1aaaea --- /dev/null +++ b/merlin/arthur/arthur_test.go @@ -0,0 +1 @@ +package arthur diff --git a/merlin/common/tests/common_test.go b/merlin/common/tests/common_test.go deleted file mode 100644 index 805d0c7..0000000 --- a/merlin/common/tests/common_test.go +++ /dev/null @@ -1 +0,0 @@ -package common diff --git a/merlin/common/tests/sync_test.go b/merlin/common/tests/sync_test.go deleted file mode 100644 index e69de29..0000000 diff --git a/merlin/common/common.go b/merlin/config/config.go similarity index 63% rename from merlin/common/common.go rename to merlin/config/config.go index 511f987..48c81fe 100644 --- a/merlin/common/common.go +++ b/merlin/config/config.go @@ -1,12 +1,11 @@ -package common +package config import ( - "fmt" "os" - "os/exec" - "time" - ini "gopkg.in/ini.v1" + "gopkg.in/ini.v1" + + "git.csclub.uwaterloo.ca/public/merlin/logger" ) const ( @@ -51,53 +50,11 @@ const ( TERMINATED // was killed by a signal ) -type Result struct { +type SyncResult struct { Name string Exit int } -type Repo struct { - // the name of this repo - Name string `ini:"-"` - // this should be one of "csc-sync-standard", etc. - SyncType string `ini:"sync_type"` - // a human-readable frequency, e.g. "bi-hourly" - FrequencyStr string `ini:"frequency"` - // the desired interval (in seconds) between successive runs - Frequency int `ini:"-"` - // the maximum time (in seconds) that each child process of this repo - // can for before being killed - MaxTime int `ini:"max_time"` - // where to download the files for this repo (relative to the download - // dir in the config) - LocalDir string `ini:"local_dir"` - // the remote host to rsync from - RsyncHost string `ini:"rsync_host"` - // the remote directory on the rsync host - RsyncDir string `ini:"rsync_dir"` - // the rsync user (optional) - RsyncUser string `ini:"rsync_user"` - // the file storing the password for rsync (optional) - PasswordFile string `ini:"password_file"` - // the file for general logging of this repo - LoggerFile string `ini:"log_file"` - // a reference to the general logger - Logger *Logger `ini:"-"` - // the file for logging this repo's rsync - RsyncLogFile string `ini:"rsync_log_file"` - // the file for logging this repo's zfssync - ZfssyncLogFile string `ini:"zfssync_log_file"` - // the repo will write its name and status in a Result struct to DoneChan - // when it has finished a job (shared by all repos) - DoneChan chan<- Result `ini:"-"` - // the repo should stop syncing if StopChan is closed (shared by all repos) - StopChan chan struct{} `ini:"-"` - // a struct that stores the repo's status - State RepoState `ini:"-"` - // a reference to the global config - cfg *Config `ini:"-"` -} - type Config struct { // the maximum number of jobs allowed to execute concurrently MaxJobs int `ini:"max_jobs"` @@ -124,8 +81,46 @@ type Config struct { ZfssyncLogDir string `ini:"zfssync_log_dir"` // the Unix socket path which arthur will use to communicate with us SockPath string `ini:"sock_path"` - // a list of all of the repos - Repos []*Repo `ini:"-"` +} + +type Repo struct { + // the name of this repo + Name string `ini:"-"` + // this should be one of "csc-sync-standard", etc. + SyncType string `ini:"sync_type"` + // a human-readable frequency, e.g. "bi-hourly" + FrequencyStr string `ini:"frequency"` + // the desired interval (in seconds) between successive runs + Frequency int `ini:"-"` + // the maximum time (in seconds) that each child process of this repo + // can for before being killed + MaxTime int `ini:"max_time"` + // where to download the files for this repo (relative to the download + // dir in the config) + LocalDir string `ini:"local_dir"` + // the remote host to rsync from + RsyncHost string `ini:"rsync_host"` + // the remote directory on the rsync host + RsyncDir string `ini:"rsync_dir"` + // the rsync user (optional) + RsyncUser string `ini:"rsync_user"` + // the file storing the password for rsync (optional) + PasswordFile string `ini:"password_file"` + // the file for general logging of this repo + LoggerFile string `ini:"log_file"` + // a reference to the general logger + Logger *logger.Logger `ini:"-"` + // the file for logging this repo's rsync + RsyncLogFile string `ini:"rsync_log_file"` + // the file for logging this repo's zfssync + ZfssyncLogFile string `ini:"zfssync_log_file"` + // the repo will write its name and status in a Result struct to DoneChan + // when it has finished a job (shared by all repos) + DoneChan chan<- SyncResult `ini:"-"` + // the repo should stop syncing if StopChan is closed (shared by all repos) + StopChan chan struct{} `ini:"-"` + // a struct that stores the repo's status + State RepoState `ini:"-"` } // This should only be modified by the main thread @@ -141,129 +136,18 @@ type RepoState struct { LastAttemptExit int `ini:"last_attempt_exit"` } -// save the current state of the repo to a file -func (repo *Repo) SaveState() { - repo.Logger.Debug("Saving state") - state_cfg := ini.Empty() - if err := ini.ReflectFrom(state_cfg, &repo.State); err != nil { - repo.Logger.Error(err.Error()) - } - file, err := os.OpenFile(repo.cfg.StateDir+"/"+repo.Name, os.O_RDWR|os.O_CREATE, 0644) - if err != nil { - repo.Logger.Error(err.Error()) - } - if _, err := state_cfg.WriteTo(file); err != nil { - repo.Logger.Error(err.Error()) - } - repo.Logger.Debug("Saved state") -} - -// start sync job for this repo if more than repo.Frequency seconds have elapsed since its last job -// and is not currently running. -// returns true iff a job is started. -func (repo *Repo) RunIfPossible() bool { - if repo.State.IsRunning { - return false - } - - curTime := time.Now().Unix() - if curTime-repo.State.LastAttemptStartTime > int64(repo.Frequency) { - repo.State.IsRunning = true - repo.State.LastAttemptStartTime = curTime - repo.SaveState() - repo.Logger.Info(fmt.Sprintf("Repo %s has started syncing", repo.Name)) - go repo.StartSyncJob() - return true - } - return false -} - -func zfsSync(repo *Repo) { - out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput() - if err != nil { - repo.Logger.Error(err) - } else { - f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) - if err != nil { - repo.Logger.Error(err.Error()) - } else { - f.Write(out) - } - - } -} - -// update the repo state with the last attempt time and exit now that the job is done -func (repo *Repo) SyncCompleted(exit int) { - repoState := repo.State - syncTook := time.Now().Unix() - repoState.LastAttemptStartTime - nextSync := repo.MaxTime - int(syncTook) - if nextSync < 0 { - nextSync = 0 - } - - repoState.IsRunning = false - repoState.LastAttemptExit = exit - repoState.LastAttemptRunTime = syncTook - - var exitStr string - switch exit { - case SUCCESS: - exitStr = "completed" - case TERMINATED: - exitStr = "terminated" - default: - exitStr = "failed" - } - repo.SaveState() - repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync)) - if exit == SUCCESS { - // it is possible that the zfssync from the last repo sync is still running is that fine? - go zfsSync(repo) - } -} - -func panicIfErr(e error) { - if e != nil { - panic(e) - } -} - -func touchFile(file string) { - fi, err := os.Stat(file) - if err != nil { - f, err := os.OpenFile(file, os.O_CREATE, 0644) - if err != nil { - panic(fmt.Errorf("unable to create file %s", file)) - } - f.Close() - } else if fi.IsDir() { - panic(fmt.Errorf("%s is a directory", file)) - } else if os.Geteuid() != 1001 { - // UID 1001 is the hardcoded uid for mirror - err := os.Chown(file, 1001, os.Getegid()) - panicIfErr(err) - } else if fi.Mode().Perm() != 0644 { - err := os.Chmod(file, 0644) - panicIfErr(err) - } -} - -func touchFiles(files ...string) { - for _, file := range files { - touchFile(file) - } -} +var ( + Conf Config + Repos []*Repo + RepoMap map[string]*Repo +) // GetConfig reads the config from a JSON file, initializes default values, // and initializes the non-configurable fields of each repo. // It returns a Config. -func GetConfig(doneChan chan Result, stopChan chan struct{}) Config { - // add global configuration in cfg - data, err := ini.Load(CONFIG_PATH) - panicIfErr(err) - - cfg := Config{ +func LoadConfig(doneChan chan SyncResult, stopChan chan struct{}) { + // set default values then load config from file + newConf := Config{ MaxJobs: DEFAULT_MAX_JOBS, MaxTime: DEFAULT_MAX_TIME, PasswordDir: DEFAULT_PASSWORD_DIR, @@ -273,36 +157,39 @@ func GetConfig(doneChan chan Result, stopChan chan struct{}) Config { RsyncLogDir: DEFAULT_RSYNC_LOG_DIR, ZfssyncLogDir: DEFAULT_ZFSSYNC_LOG_DIR, SockPath: DEFAULT_SOCK_PATH, - Repos: make([]*Repo, 0), } - err = data.MapTo(&cfg) + iniInfo, err := ini.Load(CONFIG_PATH) + panicIfErr(err) + err = iniInfo.MapTo(&newConf) panicIfErr(err) - for _, dir := range []string{cfg.StateDir, cfg.LoggerDir, cfg.RsyncLogDir, cfg.ZfssyncLogDir} { + // check config for major errors + for _, dir := range []string{Conf.StateDir, Conf.LoggerDir, Conf.RsyncLogDir, Conf.ZfssyncLogDir} { err := os.MkdirAll(dir, 0755) panicIfErr(err) } - if cfg.IPv4Address == "" { + if Conf.IPv4Address == "" { panic("Missing IPv4 address from config") - } else if cfg.IPv6Address == "" { + } else if Conf.IPv6Address == "" { panic("Missing IPv6 address from config") } - // add each repo configuration to cfg - for _, section := range data.Sections() { + newRepos := make([]*Repo, 0) + for _, section := range iniInfo.Sections() { repoName := section.Name() if repoName == "DEFAULT" { continue } + // set the default values for the repo then load from file repo := Repo{ Name: repoName, - SyncType: cfg.SyncType, - FrequencyStr: cfg.FrequencyStr, - MaxTime: cfg.MaxTime, - LoggerFile: cfg.LoggerDir + "/" + repoName + ".log", - RsyncLogFile: cfg.RsyncLogDir + "/" + repoName + "-rsync.log", - ZfssyncLogFile: cfg.ZfssyncLogDir + "/" + repoName + "-zfssync.log", + SyncType: Conf.SyncType, + FrequencyStr: Conf.FrequencyStr, + MaxTime: Conf.MaxTime, + LoggerFile: Conf.LoggerDir + "/" + repoName + ".log", + RsyncLogFile: Conf.RsyncLogDir + "/" + repoName + "-rsync.log", + ZfssyncLogFile: Conf.ZfssyncLogDir + "/" + repoName + "-zfssync.log", DoneChan: doneChan, StopChan: stopChan, } @@ -314,15 +201,13 @@ func GetConfig(doneChan chan Result, stopChan chan struct{}) Config { repo.RsyncLogFile, repo.ZfssyncLogFile, ) - - repo.Logger = NewLogger(repo.Name, repo.LoggerFile) + repo.Logger = logger.NewLogger(repo.Name, repo.LoggerFile) repo.Frequency = frequencies[repo.FrequencyStr] if repo.SyncType == "" { panic("Missing sync type from " + repo.Name) } else if repo.Frequency == 0 { panic("Missing or invalid frequency for " + repo.Name) } - repo.cfg = &cfg repo.State = RepoState{ IsRunning: false, @@ -332,7 +217,7 @@ func GetConfig(doneChan chan Result, stopChan chan struct{}) Config { } // create the state file if it does not exist, otherwise load it from existing file - repoStateFile := cfg.StateDir + "/" + repo.Name + repoStateFile := Conf.StateDir + "/" + repo.Name if _, err := os.Stat(repoStateFile); err != nil { touchFile(repoStateFile) repo.SaveState() @@ -341,12 +226,33 @@ func GetConfig(doneChan chan Result, stopChan chan struct{}) Config { panicIfErr(err) } - cfg.Repos = append(cfg.Repos, &repo) + newRepos = append(newRepos, &repo) } - if len(cfg.Repos) == 0 { + if len(newRepos) == 0 { panic("No repos found in config") } - return cfg + Conf = newConf + Repos = newRepos + for _, repo := range Repos { + RepoMap[repo.Name] = repo + } +} + +// save the current state of the repo to a file +func (repo *Repo) SaveState() { + repo.Logger.Debug("Saving state") + state_cfg := ini.Empty() + if err := ini.ReflectFrom(state_cfg, &repo.State); err != nil { + repo.Logger.Error(err.Error()) + } + file, err := os.OpenFile(Conf.StateDir+"/"+repo.Name, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + repo.Logger.Error(err.Error()) + } + if _, err := state_cfg.WriteTo(file); err != nil { + repo.Logger.Error(err.Error()) + } + repo.Logger.Debug("Saved state") } diff --git a/merlin/config/config_test.go b/merlin/config/config_test.go new file mode 100644 index 0000000..d912156 --- /dev/null +++ b/merlin/config/config_test.go @@ -0,0 +1 @@ +package config diff --git a/merlin/config/utils.go b/merlin/config/utils.go new file mode 100644 index 0000000..a28ea6d --- /dev/null +++ b/merlin/config/utils.go @@ -0,0 +1,38 @@ +package config + +import ( + "fmt" + "os" +) + +func panicIfErr(e error) { + if e != nil { + panic(e) + } +} + +func touchFile(file string) { + fi, err := os.Stat(file) + if err != nil { + f, err := os.OpenFile(file, os.O_CREATE, 0644) + if err != nil { + panic(fmt.Errorf("unable to create file %s", file)) + } + f.Close() + } else if fi.IsDir() { + panic(fmt.Errorf("%s is a directory", file)) + } else if os.Geteuid() != 1001 { + // UID 1001 is the hardcoded uid for mirror + err := os.Chown(file, 1001, os.Getegid()) + panicIfErr(err) + } else if fi.Mode().Perm() != 0644 { + err := os.Chmod(file, 0644) + panicIfErr(err) + } +} + +func touchFiles(files ...string) { + for _, file := range files { + touchFile(file) + } +} diff --git a/merlin/common/logger.go b/merlin/logger/logger.go similarity index 77% rename from merlin/common/logger.go rename to merlin/logger/logger.go index 53f2a7f..642efa2 100644 --- a/merlin/common/logger.go +++ b/merlin/logger/logger.go @@ -1,4 +1,4 @@ -package common +package logger import ( "log" @@ -30,12 +30,12 @@ var levels = map[int]string{ var outLogger = log.New(os.Stdout, "", log.LstdFlags) var errLogger = log.New(os.Stderr, "", log.LstdFlags) -func OutLogger() *log.Logger { - return outLogger +func OutLog(v ...interface{}) { + outLogger.Println(v...) } -func ErrLogger() *log.Logger { - return errLogger +func ErrLog(v ...interface{}) { + errLogger.Println(v...) } func NewLogger(name, file string) *Logger { @@ -48,15 +48,9 @@ func NewLogger(name, file string) *Logger { } func (logger *Logger) Log(level int, v ...interface{}) { - if level == INFO { - outLogger.Println(append([]interface{}{"[" + logger.name + "]"}, v...)) - } else if level == ERROR { - errLogger.Println(append([]interface{}{"[" + logger.name + "]"}, v...)) - } - f, err := os.OpenFile(logger.file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) if err != nil { - errLogger.Println(err.Error()) + ErrLog(err.Error()) } defer f.Close() @@ -65,7 +59,7 @@ func (logger *Logger) Log(level int, v ...interface{}) { args = append(args, v...) logger.SetOutput(f) - logger.Println(v...) + logger.Println(args) } func (logger *Logger) Debug(v ...interface{}) { @@ -73,6 +67,7 @@ func (logger *Logger) Debug(v ...interface{}) { } func (logger *Logger) Info(v ...interface{}) { + OutLog(append([]interface{}{"[" + logger.name + "]"}, v...)) logger.Log(INFO, v...) } @@ -81,5 +76,6 @@ func (logger *Logger) Warning(v ...interface{}) { } func (logger *Logger) Error(v ...interface{}) { + ErrLog(append([]interface{}{"[" + logger.name + "]"}, v...)) logger.Log(ERROR, v...) } diff --git a/merlin/merlin.go b/merlin/merlin.go index 2bd6e73..83b390e 100644 --- a/merlin/merlin.go +++ b/merlin/merlin.go @@ -1,153 +1,32 @@ package main import ( - "bytes" - "errors" "fmt" - "io" - "log" "net" "os" "os/signal" - "path/filepath" - "strings" "syscall" - "text/tabwriter" "time" - "git.csclub.uwaterloo.ca/public/merlin/common" "golang.org/x/sys/unix" + + "git.csclub.uwaterloo.ca/public/merlin/arthur" + "git.csclub.uwaterloo.ca/public/merlin/config" + "git.csclub.uwaterloo.ca/public/merlin/logger" + "git.csclub.uwaterloo.ca/public/merlin/sync" ) -var ( - cfg common.Config - outLogger *log.Logger - errLogger *log.Logger - repoMap map[string]*common.Repo - repoIdx int - numJobsRunning int -) - -func getAndRunCommand(conn net.Conn) { - defer conn.Close() - - var buf bytes.Buffer - _, err := io.Copy(&buf, conn) - if err != nil { - errLogger.Println(err.Error()) - return - } - - command := buf.String() - args := strings.Split(command, ":") - respondAndLogErr := func(msg string) { - outLogger.Println(msg) - conn.Write([]byte(msg)) - } - - if args[0] == "status" { - status := tabwriter.NewWriter(conn, 5, 5, 5, ' ', 0) - fmt.Fprintf(status, "Repository\tLast Synced\tNext Expected Sync\n") - - // for time formating see https://pkg.go.dev/time#pkg-constants - for name, repo := range repoMap { - fmt.Fprintf(status, "%s\t%s\t%s\n", - name, - time.Unix(repo.State.LastAttemptStartTime, 0).Format(time.RFC1123), - time.Unix(repo.State.LastAttemptRunTime+int64(repo.Frequency), 0).Format(time.RFC1123), - ) - } - - status.Flush() - } else if args[0] == "sync" { - if len(args) != 2 { - respondAndLogErr("Could not parse sync command, forced sync fails.") - return - } - - if repo, inMap := repoMap[args[1]]; inMap { - outLogger.Println("Attempting to force sync of " + repo.Name) - if repo.RunIfPossible() { - conn.Write([]byte("Forced sync for " + repo.Name)) - numJobsRunning++ - } else { - respondAndLogErr("Cannot force sync: " + repo.Name + ", already syncing.") - } - } else { - respondAndLogErr(args[1] + " is not tracked so cannot sync") - } - } else { - respondAndLogErr("Received unrecognized command: " + command) - } -} - -func unixSockListener(connChan chan net.Conn, stopLisChan chan struct{}) { - // must remove cfg.SockPath otherwise get "bind: address already in use" - if filepath.Ext(cfg.SockPath) != ".sock" { - panic(fmt.Errorf("Socket file must end with .sock")) - } else if _, err := os.Stat(cfg.SockPath); err == nil { - if err := os.Remove(cfg.SockPath); err != nil { - panic(err) - } - } else if !errors.Is(err, os.ErrNotExist) { - panic(err) - } - - ear, err := net.Listen("unix", cfg.SockPath) - if err != nil { - panic(err) - } - outLogger.Println("Listening to unix socket at " + cfg.SockPath) - - go func() { - for { - // will exit when ear is closed - conn, err := ear.Accept() - if err != nil { - if ne, ok := err.(net.Error); ok && ne.Temporary() { - errLogger.Println("Accepted socket error: " + err.Error()) - continue - } - errLogger.Println("Unhandlable socket error: " + err.Error()) - return - } - connChan <- conn - } - }() - - <-stopLisChan - ear.Close() -} - -// We use a round-robin strategy. It's not the most efficient, but it's simple -// (read: easy to understand) and guarantees each repo will eventually get a chance to run. -func runAsManyAsPossible() { - repos := cfg.Repos - startIdx := repoIdx - - for numJobsRunning < cfg.MaxJobs { - repo := repos[repoIdx] - if repo.RunIfPossible() { - numJobsRunning++ - } - repoIdx = (repoIdx + 1) % len(repos) - if repoIdx == startIdx { - // we've come full circle - return - } - } -} - func main() { - outLogger = common.OutLogger() - errLogger = common.ErrLogger() - // check that merlin is run as mirror user // check that mirror user has pid of 1001 - doneChan := make(chan common.Result) + // receives a Result struct when a repo stops syncing + doneChan := make(chan config.SyncResult) + // closed when merlin is told to stop running stopChan := make(chan struct{}) + // receives a Conn when a client makes a connection to unix socket connChan := make(chan net.Conn) + // signal channel to stop listening to unix socket stopLisChan := make(chan struct{}) stopSig := make(chan os.Signal, 1) @@ -157,24 +36,37 @@ func main() { unix.Umask(002) - numJobsRunning = 0 + numJobsRunning := 0 + repoIdx := 0 loadConfig := func() { - cfg = common.GetConfig(doneChan, stopChan) - outLogger.Println("Loaded config:\n" + fmt.Sprintf("%+v\n", cfg)) - - repoMap = make(map[string]*common.Repo) - for _, repo := range cfg.Repos { - repoMap[repo.Name] = repo - } + config.LoadConfig(doneChan, stopChan) + logger.OutLog("Loaded config:\n" + fmt.Sprintf("%+v\n", config.Conf)) repoIdx = 0 - go unixSockListener(connChan, stopLisChan) + go arthur.UnixSockListener(connChan, stopLisChan) + } + // We use a round-robin strategy. It's not the most efficient, but it's simple + // (read: easy to understand) and guarantees each repo will eventually get a chance to run. + runAsManyAsPossible := func() { + startIdx := repoIdx + + for numJobsRunning < config.Conf.MaxJobs { + repo := config.Repos[repoIdx] + if sync.SyncIfPossible(repo) { + numJobsRunning++ + } + repoIdx = (repoIdx + 1) % len(config.Repos) + if repoIdx == startIdx { + // we've come full circle + return + } + } } loadConfig() - // IsRunning must be false otherwise repo will never sync - for _, repo := range repoMap { + // ensure that IsRunning is false otherwise repo will never sync + for _, repo := range config.RepoMap { repo.State.IsRunning = false } runAsManyAsPossible() @@ -192,22 +84,27 @@ runLoop: loadConfig() case done := <-doneChan: - repoMap[done.Name].SyncCompleted(done.Exit) + sync.SyncCompleted(config.RepoMap[done.Name], done.Exit) numJobsRunning-- case conn := <-connChan: - getAndRunCommand(conn) + // TODO: may want to split this into GetCommand and something else + // to make it more clear tha GetAndRunCommand returns true if + // it starts a sync + if arthur.GetAndRunCommand(conn) { + numJobsRunning-- + } case <-time.After(1 * time.Minute): } runAsManyAsPossible() } - // give time for jobs to terminate before exiting + // give time for all jobs to terminate before exiting program for { select { case done := <-doneChan: - repoMap[done.Name].SyncCompleted(done.Exit) + sync.SyncCompleted(config.RepoMap[done.Name], done.Exit) numJobsRunning-- case <-time.After(1 * time.Second): diff --git a/merlin/merlin_test.go b/merlin/merlin_test.go deleted file mode 100644 index b425ba5..0000000 --- a/merlin/merlin_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package main - -import "testing" - -func TestSock1(t *testing.T) { - -} diff --git a/merlin/common/sync.go b/merlin/sync/command.go similarity index 56% rename from merlin/common/sync.go rename to merlin/sync/command.go index 846b2a0..77da414 100644 --- a/merlin/common/sync.go +++ b/merlin/sync/command.go @@ -1,50 +1,51 @@ -package common +package sync import ( "fmt" "math/rand" - "os" "strconv" + + "git.csclub.uwaterloo.ca/public/merlin/config" ) -func (repo *Repo) buildRsyncHost() string { +func buildRsyncHost(repo *config.Repo) string { if repo.RsyncUser != "" { repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost } return "rsync://" + repo.RsyncHost + "/" + repo.RsyncDir } -func (repo *Repo) buildRsyncDaemonHost() string { +func buildRsyncDaemonHost(repo *config.Repo) string { if repo.RsyncUser != "" { repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost } return repo.RsyncHost + "::" + repo.RsyncDir } -func (repo *Repo) buildRsyncSSHHost() string { +func buildRsyncSSHHost(repo *config.Repo) string { if repo.RsyncUser != "" { repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost } return repo.RsyncHost + ":" + repo.RsyncDir } -func (repo *Repo) buildDownloadDir() string { - return repo.cfg.DownloadDir + "/" + repo.LocalDir +func buildDownloadDir(repo *config.Repo) string { + return config.Conf.DownloadDir + "/" + repo.LocalDir } -func (repo *Repo) CSCSyncApache() []string { +func cscSyncApache(repo *config.Repo) []string { args := []string{ "nice", "rsync", "-az", "--no-owner", "--no-group", "--delete", "--safe-links", - "--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address, + "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address, "--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, } - args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir()) + args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) return args } -func (repo *Repo) CSCSyncArchLinux() []string { +func cscSyncArchLinux(repo *config.Repo) []string { tempDir := "" // is this option even needed? @@ -52,83 +53,83 @@ func (repo *Repo) CSCSyncArchLinux() []string { args := []string{ "rsync", "-rtlH", "--safe-links", "--delete-after", "--timeout=600", "--contimeout=60", "-p", "--delay-updates", "--no-motd", - "--temp-dir=" + tempDir, "--log-file=" + repo.RsyncLogFile, "--address=" + repo.cfg.IPv4Address, + "--temp-dir=" + tempDir, "--log-file=" + repo.RsyncLogFile, "--address=" + config.Conf.IPv4Address, } - args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir()) + args = append(args, buildRsyncHost(repo), buildDownloadDir(repo)) return args } -func (repo *Repo) CSCSyncBadPerms() []string { +func cscSyncBadPerms(repo *config.Repo) []string { args := []string{ "nice", "rsync", "-aH", "--no-owner", "--no-group", "--chmod=o=rX", "--delete", - "--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address, + "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address, "--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, } - args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir()) + args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) return args } // TODO ceph -func (repo *Repo) CSCSyncCDImage() []string { +func cscSyncCDImage(repo *config.Repo) []string { args := []string{ "nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete", - "--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address, + "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address, "--exclude", ".*/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, } - args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir()) + args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) return args } -func (repo *Repo) CSCSyncChmod() []string { +func cscSyncChmod(repo *config.Repo) []string { args := []string{ "nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", "--delay-updates", "--safe-links", - "--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address, + "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address, "--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, "--chmod=u=rwX,go=rX", } - args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir()) + args = append(args, buildRsyncHost(repo), buildDownloadDir(repo)) return args } -func (repo *Repo) CSCSyncDebian() []string { +func cscSyncDebian(repo *config.Repo) []string { // sync /pool args := []string{"nice", "rsync", "-rlHtvp", "--exclude", ".~tmp~/", "--timeout=3600", "-4", - "--address=" + repo.cfg.IPv4Address, + "--address=" + config.Conf.IPv4Address, } // $RSYNC_HOST::$RSYNC_DIR/pool/ $TO/pool/ >> $LOGFILE 2>&1 return args } -func (repo *Repo) CSCSyncDebianCD() []string { +func cscSyncDebianCD(repo *config.Repo) []string { // this is basically the same as CSCSyncDebian, except it has an extra --exclude args := []string{"nice", "rsync", "-rlHtvp", "--delete", "--exclude", ".~tmp~/", "--timeout=3600", "-4", - "--address=" + repo.cfg.IPv4Address, + "--address=" + config.Conf.IPv4Address, // "--exclude", "Archive-Update-in-Progress-${HOSTNAME}" } // $RSYNC_HOST::$RSYNC_DIR $TO >> $LOGFILE 2>&1 return args } -func (repo *Repo) CSCSyncGentoo() []string { +func cscSyncGentoo(repo *config.Repo) []string { repo.RsyncUser = "gentoo" repo.PasswordFile = "gentoo-distfiles" - return repo.CSCSyncStandard() + return cscSyncStandard(repo) } // TODO s3 -func (repo *Repo) CSCSyncSSH() []string { +func cscSyncSSH(repo *config.Repo) []string { args := []string{ "rsync", "-aH", "--no-owner", "--no-group", "--delete", @@ -138,39 +139,39 @@ func (repo *Repo) CSCSyncSSH() []string { } // not sure if we should be assuming ssh identity file is the password file args = append(args, "-e", fmt.Sprintf("'ssh -i %s'", repo.PasswordFile)) - args = append(args, repo.buildRsyncSSHHost(), repo.buildDownloadDir()) + args = append(args, buildRsyncSSHHost(repo), buildDownloadDir(repo)) return args } -func (repo *Repo) CSCSyncStandard() []string { +func cscSyncStandard(repo *config.Repo) []string { args := []string{ "nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", - "--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address, + "--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address, "--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, } if repo.PasswordFile != "" { - filename := repo.cfg.PasswordDir + "/" + repo.PasswordFile + filename := config.Conf.PasswordDir + "/" + repo.PasswordFile args = append(args, "--password-file", filename) } - args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir()) + args = append(args, buildRsyncHost(repo), buildDownloadDir(repo)) return args } -func (repo *Repo) CSCSyncStandardIPV6() []string { +func cscSyncStandardIPV6(repo *config.Repo) []string { args := []string{ "nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", - "--delay-updates", "--safe-links", "--timeout=3600", "-6", "--address=" + repo.cfg.IPv4Address, + "--delay-updates", "--safe-links", "--timeout=3600", "-6", "--address=" + config.Conf.IPv4Address, "--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, } - args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir()) + args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) return args } // for testing, to be removed later -func (repo *Repo) CSCSyncDummy() []string { +func cscSyncDummy(repo *config.Repo) []string { sleepDur := strconv.FormatInt(rand.Int63n(10)+5, 10) args := []string{"sleep", sleepDur} @@ -179,7 +180,7 @@ func (repo *Repo) CSCSyncDummy() []string { } // executes a particular sync job depending on repo.SyncType. -func (repo *Repo) getSyncCommand() []string { +func getSyncCommand(repo *config.Repo) []string { /* # scripts used by merlin.py csc-sync-debian @@ -208,78 +209,38 @@ func (repo *Repo) getSyncCommand() []string { switch repo.SyncType { case "csc-sync-apache": - return repo.CSCSyncApache() + return cscSyncApache(repo) case "csc-sync-archlinux": - return repo.CSCSyncArchLinux() + return cscSyncArchLinux(repo) case "csc-sync-badperms": - return repo.CSCSyncBadPerms() + return cscSyncBadPerms(repo) case "csc-sync-cdimage": - return repo.CSCSyncCDImage() + return cscSyncCDImage(repo) // case "csc-sync-ceph": - // return repo.CSCSyncCeph() + // return cscSyncCeph(repo) case "csc-sync-chmod": - return repo.CSCSyncChmod() + return cscSyncChmod(repo) case "csc-sync-debian": - return repo.CSCSyncDebian() + return cscSyncDebian(repo) case "csc-sync-debian-cd": - return repo.CSCSyncDebianCD() + return cscSyncDebianCD(repo) case "csc-sync-gentoo": - return repo.CSCSyncGentoo() + return cscSyncGentoo(repo) // case "csc-sync-s3": - // return repo.CSCSyncS3() + // return cscSyncS3(repo) case "csc-sync-ssh": - return repo.CSCSyncSSH() + return cscSyncSSH(repo) case "csc-sync-standard": - return repo.CSCSyncStandard() + return cscSyncStandard(repo) case "csc-sync-standard-ipv6": - return repo.CSCSyncStandardIPV6() + return cscSyncStandardIPV6(repo) // case "csc-sync-wget": - // return repo.CSCSyncWget() + // return cscSyncWget(repo) case "csc-sync-dummy": - return repo.CSCSyncDummy() + return cscSyncDummy(repo) default: repo.Logger.Error("Unrecognized sync type", "'"+repo.SyncType+"'") } return []string{} } - -func (repo *Repo) StartSyncJob() { - status := FAILURE - defer func() { - repo.DoneChan <- Result{ - Name: repo.Name, - Exit: status, - } - }() - - localDir := repo.buildDownloadDir() - err := os.MkdirAll(localDir, 0775) - if err != nil { - err = fmt.Errorf("Could not create directory %s: %w", localDir, err) - // I'm not sure if logger can handle error so just use the string? - repo.Logger.Error(err.Error()) - return - } - - args := repo.getSyncCommand() - if repo.PasswordFile != "" { - filename := repo.cfg.PasswordDir + "/" + repo.PasswordFile - args = append(args, "--password-file", filename) - } - args = append(args, repo.buildRsyncHost(), localDir) - - ch := SpawnProcess(repo, args) - if ch == nil { - // SpawnProcess will have already logged error - return - } - cmd := <-ch - switch cmd.ProcessState.ExitCode() { - case 0: - status = SUCCESS - case -1: - status = TERMINATED - // default is already FAILURE - } -} diff --git a/merlin/sync/command_test.go b/merlin/sync/command_test.go new file mode 100644 index 0000000..1ca2a85 --- /dev/null +++ b/merlin/sync/command_test.go @@ -0,0 +1 @@ +package sync diff --git a/merlin/sync/interface.go b/merlin/sync/interface.go new file mode 100644 index 0000000..3b8e30b --- /dev/null +++ b/merlin/sync/interface.go @@ -0,0 +1,59 @@ +package sync + +import ( + "fmt" + "time" + + "git.csclub.uwaterloo.ca/public/merlin/config" +) + +// start sync job for this repo if more than repo.Frequency seconds have elapsed since its last job +// and is not currently running. +// returns true iff a job is started. +func SyncIfPossible(repo *config.Repo) bool { + // Change to SyncIfPossible + if repo.State.IsRunning { + return false + } + + curTime := time.Now().Unix() + if curTime-repo.State.LastAttemptStartTime > int64(repo.Frequency) { + repo.State.IsRunning = true + repo.State.LastAttemptStartTime = curTime + repo.SaveState() + repo.Logger.Info(fmt.Sprintf("Repo %s has started syncing", repo.Name)) + go startSync(repo) + return true + } + return false +} + +// update the repo state with the last attempt time and exit now that the job is done +func SyncCompleted(repo *config.Repo, exit int) { + repoState := repo.State + syncTook := time.Now().Unix() - repoState.LastAttemptStartTime + nextSync := repo.MaxTime - int(syncTook) + if nextSync < 0 { + nextSync = 0 + } + + repoState.IsRunning = false + repoState.LastAttemptExit = exit + repoState.LastAttemptRunTime = syncTook + + var exitStr string + switch exit { + case config.SUCCESS: + exitStr = "completed" + case config.TERMINATED: + exitStr = "terminated" + default: + exitStr = "failed" + } + repo.SaveState() + repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync)) + if exit == config.SUCCESS { + // it is possible that the zfssync from the last repo sync is still running is that fine? + go zfsSync(repo) + } +} diff --git a/merlin/common/process_manager.go b/merlin/sync/sync.go similarity index 54% rename from merlin/common/process_manager.go rename to merlin/sync/sync.go index 4455418..3b8ed0b 100644 --- a/merlin/common/process_manager.go +++ b/merlin/sync/sync.go @@ -1,10 +1,13 @@ -package common +package sync import ( "fmt" + "os" "os/exec" "syscall" "time" + + "git.csclub.uwaterloo.ca/public/merlin/config" ) // SpawnProcess spawns a child process for the given repo. The process will @@ -12,7 +15,7 @@ import ( // runs for longer than the repo's MaxTime. // It returns a channel through which a Cmd will be sent once it has finished, // or nil if it was unable to start a process. -func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) { +func spawnSyncProcess(repo *config.Repo, args []string) (ch <-chan *exec.Cmd) { // startTime and time took will be handled in common.go by SyncExit cmd := exec.Command(args[0], args[1:]...) @@ -68,3 +71,58 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) { }() return } + +func startSync(repo *config.Repo) { + status := config.FAILURE + defer func() { + repo.DoneChan <- config.SyncResult{ + Name: repo.Name, + Exit: status, + } + }() + + localDir := buildDownloadDir(repo) + err := os.MkdirAll(localDir, 0775) + if err != nil { + err = fmt.Errorf("Could not create directory %s: %w", localDir, err) + // I'm not sure if logger can handle error so just use the string? + repo.Logger.Error(err.Error()) + return + } + + args := getSyncCommand(repo) + if repo.PasswordFile != "" { + filename := config.Conf.PasswordDir + "/" + repo.PasswordFile + args = append(args, "--password-file", filename) + } + args = append(args, buildRsyncHost(repo), localDir) + + ch := spawnSyncProcess(repo, args) + if ch == nil { + // SpawnProcess will have already logged error + return + } + cmd := <-ch + switch cmd.ProcessState.ExitCode() { + case 0: + status = config.SUCCESS + case -1: + status = config.TERMINATED + // default is already FAILURE + } +} + +func zfsSync(repo *config.Repo) { + out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput() + if err != nil { + repo.Logger.Error(err) + } else { + f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + repo.Logger.Error(err.Error()) + } else { + f.Write(out) + } + + } +} -- 2.39.2 From 69fbcfb13d17c3dcf436b490641adc8d5712d78a Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Wed, 15 Dec 2021 01:18:52 -0500 Subject: [PATCH 2/8] arthur and config testing and fixes --- merlin/README.md | 1 + merlin/arthur/arthur.go | 122 +++++++------ merlin/arthur/arthur_test.go | 282 ++++++++++++++++++++++++++++++ merlin/config/config.go | 125 ++++++------- merlin/config/config_test.go | 121 +++++++++++++ merlin/config/config_test.ini | 39 +++++ merlin/config/test_files/eeelinux | 5 + merlin/config/utils.go | 13 +- merlin/go.mod | 1 + merlin/logger/logger.go | 4 +- merlin/merlin.go | 30 +++- merlin/sync/command.go | 57 +++--- merlin/sync/interface.go | 2 + merlin/sync/sync.go | 16 +- 14 files changed, 652 insertions(+), 166 deletions(-) create mode 100644 merlin/config/config_test.ini create mode 100644 merlin/config/test_files/eeelinux diff --git a/merlin/README.md b/merlin/README.md index 23b48b0..600057d 100644 --- a/merlin/README.md +++ b/merlin/README.md @@ -20,6 +20,7 @@ This folder contains the code for merlin (which does the actual syncing) and art - [ ] wget ### TODO +- [ ] ensure that the proper permissions (file mode, group, user) are used - [ ] detect if an rsync process is stuck (\*\*) - [ ] place each rsync process in a separate cgroup (\*\*\*) diff --git a/merlin/arthur/arthur.go b/merlin/arthur/arthur.go index ac3bf1e..9a2c418 100644 --- a/merlin/arthur/arthur.go +++ b/merlin/arthur/arthur.go @@ -1,13 +1,13 @@ package arthur import ( - "bytes" "errors" "fmt" - "io" + "io/ioutil" "net" "os" "path/filepath" + "sort" "strings" "text/tabwriter" "time" @@ -17,68 +17,83 @@ import ( "git.csclub.uwaterloo.ca/public/merlin/sync" ) -// get repo by name function in config +// Reads and parses the message sent over the accepted connection +func GetCommand(conn net.Conn) (command, repoName string) { + command = "" + repoName = "" -func GetAndRunCommand(conn net.Conn) (newSync bool) { - newSync = false - defer conn.Close() - - var buf bytes.Buffer - _, err := io.Copy(&buf, conn) + buf, err := ioutil.ReadAll(conn) if err != nil { logger.ErrLog(err.Error()) return } - command := buf.String() - args := strings.Split(command, ":") - respondAndLogErr := func(msg string) { - logger.OutLog(msg) - conn.Write([]byte(msg)) + args := strings.Split(string(buf), ":") + if len(args) >= 1 { + command = args[0] } - - if args[0] == "status" { - status := tabwriter.NewWriter(conn, 5, 5, 5, ' ', 0) - fmt.Fprintf(status, "Repository\tLast Synced\tNext Expected Sync\n") - - // for time formating see https://pkg.go.dev/time#pkg-constants - for name, repo := range config.RepoMap { - fmt.Fprintf(status, "%s\t%s\t%s\n", - name, - time.Unix(repo.State.LastAttemptStartTime, 0).Format(time.RFC1123), - time.Unix(repo.State.LastAttemptRunTime+int64(repo.Frequency), 0).Format(time.RFC1123), - ) - } - - status.Flush() - } else if args[0] == "sync" { - if len(args) != 2 { - respondAndLogErr("Could not parse sync command, forced sync fails.") - return - } - - if repo, inMap := config.RepoMap[args[1]]; inMap { - logger.OutLog("Attempting to force sync of " + repo.Name) - if sync.SyncIfPossible(repo) { - conn.Write([]byte("Forced sync for " + repo.Name)) - newSync = true - } else { - respondAndLogErr("Cannot force sync: " + repo.Name + ", already syncing.") - } - } else { - respondAndLogErr(args[1] + " is not tracked so cannot sync") - } - } else { - respondAndLogErr("Received unrecognized command: " + command) + if len(args) >= 2 { + repoName = args[1] } return } -func UnixSockListener(connChan chan net.Conn, stopLisChan chan struct{}) { +func SendAndLog(conn net.Conn, msg string) { + logger.OutLog(msg) + conn.Write([]byte(msg)) +} + +func SendStatus(conn net.Conn) { + status := tabwriter.NewWriter(conn, 5, 5, 5, ' ', 0) + fmt.Fprintf(status, "Repository\tLast Synced\tNext Expected Sync\tRunning\n") + + keys := make([]string, 0) + for name, _ := range config.RepoMap { + keys = append(keys, name) + } + sort.Strings(keys) + + // for other ways to format the time see: https://pkg.go.dev/time#pkg-constants + for _, name := range keys { + repo := config.RepoMap[name] + lastSync := repo.State.LastAttemptStartTime + nextSync := lastSync + int64(repo.Frequency) + + fmt.Fprintf(status, "%s\t%s\t%s\t%t\n", + name, + time.Unix(lastSync, 0).Format(time.RFC1123), + time.Unix(nextSync, 0).Format(time.RFC1123), + repo.State.IsRunning, + ) + } + + status.Flush() +} + +// Attempt to force the sync of the repo +func ForceSync(conn net.Conn, repoName string) (newSync bool) { + newSync = false + + // TODO: send repoName and every key in RepoMap to lowercase + if repo, isInMap := config.RepoMap[repoName]; isInMap { + logger.OutLog("Attempting to force sync of " + repoName) + if sync.SyncIfPossible(repo) { + conn.Write([]byte("Forced sync for " + repoName)) + newSync = true + } else { + SendAndLog(conn, "Could not force sync: "+repoName+" is already syncing.") + } + } else { + SendAndLog(conn, repoName+" is not tracked so cannot sync") + } + return +} + +func StartListener(connChan chan net.Conn, stopLisChan chan struct{}) { sockpath := config.Conf.SockPath - // must remove cfg.SockPath otherwise get "bind: address already in use" + // must remove old cfg.SockPath otherwise get "bind: address already in use" if filepath.Ext(sockpath) != ".sock" { - panic(fmt.Errorf("Socket file must end with .sock")) + panic(fmt.Errorf("socket file must end with .sock")) } else if _, err := os.Stat(sockpath); err == nil { if err := os.Remove(sockpath); err != nil { panic(err) @@ -95,10 +110,10 @@ func UnixSockListener(connChan chan net.Conn, stopLisChan chan struct{}) { go func() { for { - // will exit when ear is closed + // Attempting to accept on a closed net.Listener will return a non-temporary error conn, err := ear.Accept() if err != nil { - if ne, ok := err.(net.Error); ok && ne.Temporary() { + if netErr, isTemp := err.(net.Error); isTemp && netErr.Temporary() { logger.ErrLog("Accepted socket error: " + err.Error()) continue } @@ -109,6 +124,7 @@ func UnixSockListener(connChan chan net.Conn, stopLisChan chan struct{}) { } }() + // TODO: check handling of multiple SIGHUP <-stopLisChan ear.Close() } diff --git a/merlin/arthur/arthur_test.go b/merlin/arthur/arthur_test.go index a1aaaea..14b4860 100644 --- a/merlin/arthur/arthur_test.go +++ b/merlin/arthur/arthur_test.go @@ -1 +1,283 @@ package arthur + +import ( + "io/ioutil" + "net" + "os" + "testing" + "time" + + "git.csclub.uwaterloo.ca/public/merlin/config" + "git.csclub.uwaterloo.ca/public/merlin/logger" +) + +func TestStatusCommand(t *testing.T) { + r, w := net.Pipe() + + go func() { + // will only finish write when EOF is sent + // only way to send EOF is to close + w.Write([]byte("status")) + w.Close() + }() + command, repoName := GetCommand(r) + if command != "status" { + t.Errorf("command for status should be \"status\", got " + command) + } else if repoName != "" { + t.Errorf("status should return an empty string for the repoName, got " + repoName) + } +} + +func TestSyncCommand(t *testing.T) { + r, w := net.Pipe() + + go func() { + w.Write([]byte("sync:ubuntu")) + w.Close() + }() + command, repoName := GetCommand(r) + r.Close() + if command != "sync" { + t.Errorf("command for sync:ubuntu should be \"sync\", got " + command) + } else if repoName != "ubuntu" { + t.Errorf("name of repo for sync:ubuntu should be \"ubuntu\", got " + repoName) + } +} + +func TestSendStatus(t *testing.T) { + saveRepoMap := config.RepoMap + defer func() { + config.RepoMap = saveRepoMap + }() + + repoMap := make(map[string]*config.Repo) + repoMap["eeeee"] = &config.Repo{ + Frequency: 30 * 86400, + State: config.RepoState{ + IsRunning: true, + LastAttemptStartTime: 1600000000, + }, + } + repoMap["alinux"] = &config.Repo{ + Frequency: 7*86400 + 3, + State: config.RepoState{ + IsRunning: true, + LastAttemptStartTime: 1620000000, + }, + } + repoMap["lnux"] = &config.Repo{ + Frequency: 86400, + State: config.RepoState{ + IsRunning: false, + LastAttemptStartTime: 1640000000, + }, + } + config.RepoMap = repoMap + + r, w := net.Pipe() + go func() { + SendStatus(w) + w.Close() + }() + msg, err := ioutil.ReadAll(r) + r.Close() + if err != nil { + t.Errorf(err.Error()) + } + expected := `Repository Last Synced Next Expected Sync Running +alinux Sun, 02 May 2021 20:00:00 EDT Sun, 09 May 2021 20:00:03 EDT true +eeeee Sun, 13 Sep 2020 08:26:40 EDT Tue, 13 Oct 2020 08:26:40 EDT true +lnux Mon, 20 Dec 2021 06:33:20 EST Tue, 21 Dec 2021 06:33:20 EST false +` + if expected != string(msg) { + t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg)) + } +} + +func TestForceSync(t *testing.T) { + saveRepos := config.Repos + saveRepoMap := config.RepoMap + doneChan := make(chan config.SyncResult) + defer func() { + config.Repos = saveRepos + config.RepoMap = saveRepoMap + close(doneChan) + }() + + // Part 1: run a dummy sync + repo := config.Repo{ + Name: "nux", + SyncType: "csc-sync-dummy", + Frequency: 7 * 86400, + MaxTime: 30, + Logger: logger.NewLogger("nux", "/tmp/merlin_force_sync_test_logs"), + StateFile: "/tmp/merlin_force_sync_test_state", + DoneChan: doneChan, + State: config.RepoState{ + IsRunning: false, + LastAttemptStartTime: 0, + LastAttemptRunTime: 0, + LastAttemptExit: config.NOT_RUN_YET, + }, + } + config.Repos = nil + config.Repos = append(config.Repos, &repo) + config.RepoMap = make(map[string]*config.Repo) + config.RepoMap["nux"] = &repo + + r, w := net.Pipe() + go func() { + if !ForceSync(w, "nux") { + t.Errorf("Sync for nux did not start") + } + w.Close() + }() + msg, err := ioutil.ReadAll(r) + r.Close() + if err != nil { + t.Errorf(err.Error()) + } + expected := "Forced sync for nux" + if expected != string(msg) { + t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg)) + } + + select { + case result := <-doneChan: + if result.Exit != config.SUCCESS { + t.Errorf("Sync should exit with SUCCESS, got %d", result.Exit) + } + case <-time.After(3 * time.Second): + t.Errorf("Dummy sync should be done in 1 second, waited 3 seconds") + } + + // Part 2: attempt the same thing but with repo.State.IsRunning = true + r, w = net.Pipe() + go func() { + if ForceSync(w, "nux") { + t.Errorf("Sync for nux should not have started") + } + w.Close() + }() + msg, err = ioutil.ReadAll(r) + r.Close() + if err != nil { + t.Errorf(err.Error()) + } + expected = "Could not force sync: nux is already syncing." + if expected != string(msg) { + t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg)) + } + + select { + case <-doneChan: + t.Errorf("Sync for nux should not have been started") + case <-time.After(2 * time.Second): + } + + // Part 3: attempt a force sync with a repo that does not exist + r, w = net.Pipe() + go func() { + if ForceSync(w, "nixx") { + t.Errorf("Sync for nixx should not have started") + } + w.Close() + }() + msg, err = ioutil.ReadAll(r) + r.Close() + if err != nil { + t.Errorf(err.Error()) + } + expected = "nixx is not tracked so cannot sync" + if expected != string(msg) { + t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg)) + } +} + +func TestStartListener(t *testing.T) { + saveConf := config.Conf + connChan := make(chan net.Conn) + stopLisChan := make(chan struct{}) + wait := make(chan struct{}) + defer func() { + config.Conf = saveConf + close(connChan) + close(stopLisChan) + }() + config.Conf = config.Config{ + SockPath: "/tmp/merlin_listener_test.sock", + } + + // Test 1: check that closing/sending something to stopLisChan will stop the listener + // and that a new listener can be created after stopping the old one + go func() { + StartListener(connChan, stopLisChan) + wait <- struct{}{} + }() + stopLisChan <- struct{}{} + select { + case <-wait: + case <-time.After(3 * time.Second): + t.Errorf("StartListener should stop when struct{}{} is sent to stopLisChan") + } + + go func() { + StartListener(connChan, stopLisChan) + wait <- struct{}{} + }() + close(stopLisChan) + select { + case <-wait: + case <-time.After(3 * time.Second): + t.Errorf("StartListener should stop when stopLisChan is closed") + } + close(wait) + + // Test 2: check that connections can be made to the unix socket + // this test does not appear to be very stable (I think there is a race condition somewhere) + stopLisChan = make(chan struct{}) + go StartListener(connChan, stopLisChan) + waitForMsg := func(expected string) { + select { + case conn := <-connChan: + msg, err := ioutil.ReadAll(conn) + if err != nil { + t.Errorf(err.Error()) + } else if expected != string(msg) { + t.Errorf("Message expected was " + expected + " got " + string(msg)) + } + conn.Close() + case <-time.After(3 * time.Second): + t.Errorf("StartListener should stop when struct{}{} is sent to stopLisChan") + } + } + sendMsg := func(msg string) { + <-time.After(500 * time.Millisecond) + send, err := net.Dial("unix", "/tmp/merlin_listener_test.sock") + if err != nil { + panic(err) + } + _, err = send.Write([]byte(msg)) + if err != nil { + t.Errorf(err.Error()) + } + send.Close() + } + go func() { + waitForMsg("status") + }() + sendMsg("status") + + go func() { + waitForMsg("sync:uuunix") + }() + sendMsg("sync:uuunix") + + go func() { + waitForMsg("$UPz2L2yWsE^UY8iG9JX@^dBb@5yb*") + }() + sendMsg("$UPz2L2yWsE^UY8iG9JX@^dBb@5yb*") + + // unsure why I can't put this in the defer + os.Remove("/tmp/merlin_listener_test.sock") +} diff --git a/merlin/config/config.go b/merlin/config/config.go index 48c81fe..b6d2839 100644 --- a/merlin/config/config.go +++ b/merlin/config/config.go @@ -2,6 +2,7 @@ package config import ( "os" + "path/filepath" "gopkg.in/ini.v1" @@ -18,17 +19,18 @@ const ( TEN_MINUTELY = 600 FIVE_MINUTELY = 300 - CONFIG_PATH = "merlin-config.ini" - - DEFAULT_MAX_JOBS = 6 - DEFAULT_MAX_TIME = DAILY / 4 - DEFAULT_DOWNLOAD_DIR = "/mirror/root" - DEFAULT_PASSWORD_DIR = "/home/mirror/passwords" - DEFAULT_STATE_DIR = "/home/mirror/merlin/states" - DEFAULT_LOG_DIR = "/home/mirror/merlin/logs" - DEFAULT_RSYNC_LOG_DIR = "/home/mirror/merlin/logs-rsync" - DEFAULT_ZFSSYNC_LOG_DIR = "/home/mirror/merlin/logs-zfssync" - DEFAULT_SOCK_PATH = "/run/merlin.sock" + // could change this into a default_config + DEFAULT_MAX_JOBS = 6 + DEFAULT_MAX_TIME = DAILY / 4 + DEFAULT_SYNC_TYPE = "csc-sync-standard" + DEFAULT_FREQUENCY_STRING = "by-hourly" + DEFAULT_DOWNLOAD_DIR = "/mirror/root" + DEFAULT_PASSWORD_DIR = "/home/mirror/passwords" + DEFAULT_STATE_DIR = "/home/mirror/merlin/states" + DEFAULT_LOG_DIR = "/home/mirror/merlin/logs" + DEFAULT_RSYNC_LOG_DIR = "/home/mirror/merlin/logs-rsync" + DEFAULT_ZFSSYNC_LOG_DIR = "/home/mirror/merlin/logs-zfssync" + DEFAULT_SOCK_PATH = "/run/merlin.sock" ) var frequencies = map[string]int{ @@ -62,11 +64,11 @@ type Config struct { IPv4Address string `ini:"ipv4_address"` IPv6Address string `ini:"ipv6_address"` // the default sync type - SyncType string `ini:"default_sync_type"` - // the default frequency string for the repos - FrequencyStr string `ini:"default_frequency"` - // the default MaxTime for each repo - MaxTime int `ini:"default_max_time"` + DefaultSyncType string `ini:"default_sync_type"` + // the default frequency string + DefaultFrequencyStr string `ini:"default_frequency"` + // the default MaxTime + DefaultMaxTime int `ini:"default_max_time"` // the directory where rsync should download files DownloadDir string `ini:"download_dir"` // the directory where rsync passwords are stored @@ -74,15 +76,16 @@ type Config struct { // the directory where the state of each repo sync is saved StateDir string `ini:"states_dir"` // the directory where merlin will store the general logs for each repo - LoggerDir string `ini:"log_dir"` + RepoLogDir string `ini:"repo_logs_dir"` // the directory to store the rsync logs for each repo - RsyncLogDir string `ini:"rsync_log_dir"` + RsyncLogDir string `ini:"rsync_logs_dir"` // the directory to store the zfssync logs for each repo - ZfssyncLogDir string `ini:"zfssync_log_dir"` + ZfssyncLogDir string `ini:"zfssync_logs_dir"` // the Unix socket path which arthur will use to communicate with us SockPath string `ini:"sock_path"` } +// make it more clear when full path should be used vs when just the file name is needed type Repo struct { // the name of this repo Name string `ini:"-"` @@ -106,18 +109,20 @@ type Repo struct { RsyncUser string `ini:"rsync_user"` // the file storing the password for rsync (optional) PasswordFile string `ini:"password_file"` - // the file for general logging of this repo - LoggerFile string `ini:"log_file"` + // the file storing the repo sync state (used to override default) + StateFile string `ini:"state_file"` + // the full file path for general logging of this repo (used to override default) + RepoLogFile string `ini:"repo_log_file"` // a reference to the general logger Logger *logger.Logger `ini:"-"` - // the file for logging this repo's rsync + // the full file path for logging this repo's rsync (used to override default) RsyncLogFile string `ini:"rsync_log_file"` - // the file for logging this repo's zfssync + // the full file path for logging this repo's zfssync (used to override default) ZfssyncLogFile string `ini:"zfssync_log_file"` // the repo will write its name and status in a Result struct to DoneChan // when it has finished a job (shared by all repos) DoneChan chan<- SyncResult `ini:"-"` - // the repo should stop syncing if StopChan is closed (shared by all repos) + // repos should stop syncing if StopChan is closed (shared by all repos) StopChan chan struct{} `ini:"-"` // a struct that stores the repo's status State RepoState `ini:"-"` @@ -145,32 +150,34 @@ var ( // GetConfig reads the config from a JSON file, initializes default values, // and initializes the non-configurable fields of each repo. // It returns a Config. -func LoadConfig(doneChan chan SyncResult, stopChan chan struct{}) { +func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struct{}) { // set default values then load config from file newConf := Config{ - MaxJobs: DEFAULT_MAX_JOBS, - MaxTime: DEFAULT_MAX_TIME, - PasswordDir: DEFAULT_PASSWORD_DIR, - DownloadDir: DEFAULT_DOWNLOAD_DIR, - StateDir: DEFAULT_STATE_DIR, - LoggerDir: DEFAULT_LOG_DIR, - RsyncLogDir: DEFAULT_RSYNC_LOG_DIR, - ZfssyncLogDir: DEFAULT_ZFSSYNC_LOG_DIR, - SockPath: DEFAULT_SOCK_PATH, + MaxJobs: DEFAULT_MAX_JOBS, + DefaultSyncType: DEFAULT_SYNC_TYPE, + DefaultFrequencyStr: DEFAULT_FREQUENCY_STRING, + DefaultMaxTime: DEFAULT_MAX_TIME, + PasswordDir: DEFAULT_PASSWORD_DIR, + DownloadDir: DEFAULT_DOWNLOAD_DIR, + StateDir: DEFAULT_STATE_DIR, + RepoLogDir: DEFAULT_LOG_DIR, + RsyncLogDir: DEFAULT_RSYNC_LOG_DIR, + ZfssyncLogDir: DEFAULT_ZFSSYNC_LOG_DIR, + SockPath: DEFAULT_SOCK_PATH, } - iniInfo, err := ini.Load(CONFIG_PATH) + iniInfo, err := ini.Load(configPath) panicIfErr(err) err = iniInfo.MapTo(&newConf) panicIfErr(err) // check config for major errors - for _, dir := range []string{Conf.StateDir, Conf.LoggerDir, Conf.RsyncLogDir, Conf.ZfssyncLogDir} { + for _, dir := range []string{newConf.StateDir, newConf.RepoLogDir, newConf.RsyncLogDir, newConf.ZfssyncLogDir} { err := os.MkdirAll(dir, 0755) panicIfErr(err) } - if Conf.IPv4Address == "" { + if newConf.IPv4Address == "" { panic("Missing IPv4 address from config") - } else if Conf.IPv6Address == "" { + } else if newConf.IPv6Address == "" { panic("Missing IPv6 address from config") } @@ -182,26 +189,32 @@ func LoadConfig(doneChan chan SyncResult, stopChan chan struct{}) { } // set the default values for the repo then load from file + // TODO: check if local_dir and repoName are always the same value + // TODO: check to ensure that every Repo.Name is unique (may already be done by ini) repo := Repo{ Name: repoName, - SyncType: Conf.SyncType, - FrequencyStr: Conf.FrequencyStr, - MaxTime: Conf.MaxTime, - LoggerFile: Conf.LoggerDir + "/" + repoName + ".log", - RsyncLogFile: Conf.RsyncLogDir + "/" + repoName + "-rsync.log", - ZfssyncLogFile: Conf.ZfssyncLogDir + "/" + repoName + "-zfssync.log", + SyncType: newConf.DefaultSyncType, + FrequencyStr: newConf.DefaultFrequencyStr, + MaxTime: newConf.DefaultMaxTime, + StateFile: filepath.Join(newConf.StateDir, repoName), + RepoLogFile: filepath.Join(newConf.RepoLogDir, repoName) + ".log", + RsyncLogFile: filepath.Join(newConf.RsyncLogDir, repoName) + "-rsync.log", + ZfssyncLogFile: filepath.Join(newConf.ZfssyncLogDir, repoName) + "-zfssync.log", DoneChan: doneChan, StopChan: stopChan, } err := section.MapTo(&repo) panicIfErr(err) + // TODO: ensure that the parent dirs to the file also exist when touching + // or just remove the ability to override touchFiles( - repo.LoggerFile, + repo.StateFile, + repo.RepoLogFile, repo.RsyncLogFile, repo.ZfssyncLogFile, ) - repo.Logger = logger.NewLogger(repo.Name, repo.LoggerFile) + repo.Logger = logger.NewLogger(repo.Name, repo.RepoLogFile) repo.Frequency = frequencies[repo.FrequencyStr] if repo.SyncType == "" { panic("Missing sync type from " + repo.Name) @@ -215,16 +228,9 @@ func LoadConfig(doneChan chan SyncResult, stopChan chan struct{}) { LastAttemptRunTime: 0, LastAttemptExit: NOT_RUN_YET, } - - // create the state file if it does not exist, otherwise load it from existing file - repoStateFile := Conf.StateDir + "/" + repo.Name - if _, err := os.Stat(repoStateFile); err != nil { - touchFile(repoStateFile) - repo.SaveState() - } else { - err := ini.MapTo(&repo.State, repoStateFile) - panicIfErr(err) - } + err = ini.MapTo(&repo.State, repo.StateFile) + panicIfErr(err) + repo.SaveState() newRepos = append(newRepos, &repo) } @@ -235,6 +241,7 @@ func LoadConfig(doneChan chan SyncResult, stopChan chan struct{}) { Conf = newConf Repos = newRepos + RepoMap = make(map[string]*Repo) for _, repo := range Repos { RepoMap[repo.Name] = repo } @@ -242,17 +249,17 @@ func LoadConfig(doneChan chan SyncResult, stopChan chan struct{}) { // save the current state of the repo to a file func (repo *Repo) SaveState() { - repo.Logger.Debug("Saving state") + // repo.Logger.Debug("Saving state") state_cfg := ini.Empty() if err := ini.ReflectFrom(state_cfg, &repo.State); err != nil { repo.Logger.Error(err.Error()) } - file, err := os.OpenFile(Conf.StateDir+"/"+repo.Name, os.O_RDWR|os.O_CREATE, 0644) + file, err := os.OpenFile(repo.StateFile, os.O_RDWR|os.O_CREATE, 0644) if err != nil { repo.Logger.Error(err.Error()) } if _, err := state_cfg.WriteTo(file); err != nil { repo.Logger.Error(err.Error()) } - repo.Logger.Debug("Saved state") + // repo.Logger.Debug("Saved state") } diff --git a/merlin/config/config_test.go b/merlin/config/config_test.go index d912156..576ce3d 100644 --- a/merlin/config/config_test.go +++ b/merlin/config/config_test.go @@ -1 +1,122 @@ package config + +import ( + "errors" + "os" + "reflect" + "testing" + + "github.com/davecgh/go-spew/spew" +) + +func TestTouchFiles(t *testing.T) { + files := []string{ + "/tmp/merlin_touch_test_1", + "/tmp/merlin_touch_test_2", + "/tmp/merlin_touch_test_3", + } + touchFiles(files[0], files[1], files[2]) + for _, file := range files { + if _, err := os.Stat(file); err != nil { + t.Errorf(err.Error()) + } else if err := os.Remove(file); err != nil { + t.Errorf(err.Error()) + } + } +} + +func TestPanicIfErr(t *testing.T) { + panicIfErr(nil) + defer func() { recover() }() + panicIfErr(errors.New("AAAAAAAAAA")) + t.Errorf("panicIfErr should have panicked") +} + +func TestLoadConfig(t *testing.T) { + doneChan := make(chan SyncResult) + stopChan := make(chan struct{}) + LoadConfig("config_test.ini", doneChan, stopChan) + // TODO: Fill out parts not part of the ini or state file + expectedConfig := Config{ + MaxJobs: 6, + IPv4Address: "129.97.134.129", + IPv6Address: "2620:101:f000:4901:c5c::129", + DefaultSyncType: "csc-sync-standard", + DefaultFrequencyStr: "daily", + DefaultMaxTime: 1000, + DownloadDir: "/tmp/test-mirror", + PasswordDir: "/home/mirror/passwords", + StateDir: "test_files", + RepoLogDir: "test_files/logs", + RsyncLogDir: "test_files/rsync", + ZfssyncLogDir: "test_files/zfssync", + SockPath: "test_files/test.sock", + } + expectedRepo1 := Repo{ + Name: "eelinux", + SyncType: "csc-sync-nonstandard", + FrequencyStr: "tri-hourly", + Frequency: 10800, + MaxTime: 2000, + LocalDir: "eelinux", + RsyncHost: "rsync.releases.eelinux.ca", + RsyncDir: "releases", + StateFile: "test_files/eeelinux", + RepoLogFile: "test_files/logs/eelinux.log", + Logger: Repos[0].Logger, + RsyncLogFile: "test_files/rsync/eelinux.log", + ZfssyncLogFile: "test_files/zfssync/eelinux.log", + DoneChan: doneChan, + StopChan: stopChan, + State: RepoState{ + IsRunning: false, + LastAttemptStartTime: 1600000000, + LastAttemptRunTime: 100, + LastAttemptExit: 1, + }, + } + expectedRepo2 := Repo{ + Name: "yoland", + SyncType: "csc-sync-standard", + FrequencyStr: "daily", + Frequency: 86400, + MaxTime: 1000, + LocalDir: "yoland-releases", + RsyncHost: "rsync.releases.yoland.io", + RsyncDir: "releases", + StateFile: "test_files/yoland", + RepoLogFile: "test_files/logs/yoland.log", + Logger: Repos[1].Logger, + RsyncLogFile: "test_files/rsync/yoland-rsync.log", + ZfssyncLogFile: "test_files/zfssync/yoland-zfssync.log", + DoneChan: doneChan, + StopChan: stopChan, + State: RepoState{ + IsRunning: false, + LastAttemptStartTime: 0, + LastAttemptRunTime: 0, + LastAttemptExit: 0, + }, + } + + if !reflect.DeepEqual(expectedConfig, Conf) { + t.Errorf("Config loaded does not match expected config") + spew.Dump(expectedConfig) + spew.Dump(Conf) + } + if !reflect.DeepEqual(expectedRepo1, *Repos[0]) { + t.Errorf("The eelinux repo loaded does not match the exected repo config") + spew.Dump(expectedRepo1) + spew.Dump(*Repos[0]) + } + if !reflect.DeepEqual(expectedRepo2, *Repos[1]) { + t.Errorf("The yoland repo loaded does not match the exected repo config") + spew.Dump(expectedRepo2) + spew.Dump(*Repos[1]) + } + + os.Remove("test_files/yoland") + os.RemoveAll("test_files/logs") + os.RemoveAll("test_files/rsync") + os.RemoveAll("test_files/zfssync") +} diff --git a/merlin/config/config_test.ini b/merlin/config/config_test.ini new file mode 100644 index 0000000..e240b8a --- /dev/null +++ b/merlin/config/config_test.ini @@ -0,0 +1,39 @@ +; default values are commented out + +; max_jobs = 6 +ipv4_address = 129.97.134.129 +ipv6_address = 2620:101:f000:4901:c5c::129 +; default_sync_type = csc-sync-standard +default_frequency = daily +default_max_time = 1000 +download_dir = /tmp/test-mirror +; password_dir = /home/mirror/passwords +states_dir = test_files +repo_logs_dir = test_files/logs +rsync_logs_dir = test_files/rsync +zfssync_logs_dir = test_files/zfssync +sock_path = test_files/test.sock + +[eelinux] +sync_type = csc-sync-nonstandard +frequency = tri-hourly +max_time = 2000 +local_dir = eelinux +rsync_host = rsync.releases.eelinux.ca +rsync_dir = releases +state_file = test_files/eeelinux +repo_log_file = test_files/logs/eelinux.log +rsync_log_file = test_files/rsync/eelinux.log +zfssync_log_file = test_files/zfssync/eelinux.log + +[yoland] +; sync_type = csc-sync-standard +; frequency = daily +; max_time = 1000 +local_dir = yoland-releases +rsync_host = rsync.releases.yoland.io +rsync_dir = releases +; state_file = test_files/yoland +; repo_log_file = test_files/logs/yoland.log +; rsync_log_file = test_files/rsync/yoland-rsync.log +; zfssync_log_file = test_files/zfssync/yoland-zfssync.log \ No newline at end of file diff --git a/merlin/config/test_files/eeelinux b/merlin/config/test_files/eeelinux new file mode 100644 index 0000000..571d74e --- /dev/null +++ b/merlin/config/test_files/eeelinux @@ -0,0 +1,5 @@ +is_running = false +last_attempt_time = 1600000000 +last_attempt_runtime = 100 +last_attempt_exit = 1 + diff --git a/merlin/config/utils.go b/merlin/config/utils.go index a28ea6d..a3e1e56 100644 --- a/merlin/config/utils.go +++ b/merlin/config/utils.go @@ -21,14 +21,13 @@ func touchFile(file string) { f.Close() } else if fi.IsDir() { panic(fmt.Errorf("%s is a directory", file)) - } else if os.Geteuid() != 1001 { - // UID 1001 is the hardcoded uid for mirror - err := os.Chown(file, 1001, os.Getegid()) - panicIfErr(err) - } else if fi.Mode().Perm() != 0644 { - err := os.Chmod(file, 0644) - panicIfErr(err) + // } else if os.Geteuid() != 1001 { + // // mirror is UID 1001 + // err := os.Chown(file, 1001, os.Getegid()) + // panicIfErr(err) } + err = os.Chmod(file, 0644) + panicIfErr(err) } func touchFiles(files ...string) { diff --git a/merlin/go.mod b/merlin/go.mod index 1c3e571..256c295 100644 --- a/merlin/go.mod +++ b/merlin/go.mod @@ -1,6 +1,7 @@ module git.csclub.uwaterloo.ca/public/merlin require ( + github.com/davecgh/go-spew v1.1.0 github.com/stretchr/testify v1.7.0 // indirect golang.org/x/sys v0.0.0-20210915083310-ed5796bab164 gopkg.in/ini.v1 v1.63.2 diff --git a/merlin/logger/logger.go b/merlin/logger/logger.go index 642efa2..bd5c6a1 100644 --- a/merlin/logger/logger.go +++ b/merlin/logger/logger.go @@ -67,7 +67,9 @@ func (logger *Logger) Debug(v ...interface{}) { } func (logger *Logger) Info(v ...interface{}) { - OutLog(append([]interface{}{"[" + logger.name + "]"}, v...)) + // src := []interface{}{logger.name + ":"} + // args := append(src, v...) + OutLog(v...) logger.Log(INFO, v...) } diff --git a/merlin/merlin.go b/merlin/merlin.go index 83b390e..b8e3cff 100644 --- a/merlin/merlin.go +++ b/merlin/merlin.go @@ -16,6 +16,9 @@ import ( "git.csclub.uwaterloo.ca/public/merlin/sync" ) +// get config path from command args +var CONFIG_PATH = "merlin-config.ini" + func main() { // check that merlin is run as mirror user // check that mirror user has pid of 1001 @@ -40,11 +43,11 @@ func main() { repoIdx := 0 loadConfig := func() { - config.LoadConfig(doneChan, stopChan) + config.LoadConfig(CONFIG_PATH, doneChan, stopChan) logger.OutLog("Loaded config:\n" + fmt.Sprintf("%+v\n", config.Conf)) repoIdx = 0 - go arthur.UnixSockListener(connChan, stopLisChan) + go arthur.StartListener(connChan, stopLisChan) } // We use a round-robin strategy. It's not the most efficient, but it's simple // (read: easy to understand) and guarantees each repo will eventually get a chance to run. @@ -66,7 +69,8 @@ func main() { loadConfig() // ensure that IsRunning is false otherwise repo will never sync - for _, repo := range config.RepoMap { + // (only on startup can we assume that repos were not previously syncing) + for _, repo := range config.Repos { repo.State.IsRunning = false } runAsManyAsPossible() @@ -82,18 +86,28 @@ runLoop: case <-reloadSig: stopLisChan <- struct{}{} loadConfig() + // ensure that SyncCompleted can handle it if reloading config + // removes a repo that was already syncing case done := <-doneChan: sync.SyncCompleted(config.RepoMap[done.Name], done.Exit) numJobsRunning-- case conn := <-connChan: - // TODO: may want to split this into GetCommand and something else - // to make it more clear tha GetAndRunCommand returns true if - // it starts a sync - if arthur.GetAndRunCommand(conn) { - numJobsRunning-- + command, repoName := arthur.GetCommand(conn) + switch command { + case "status": + arthur.SendStatus(conn) + case "sync": + if arthur.ForceSync(conn, repoName) { + numJobsRunning++ + } + default: + arthur.SendAndLog(conn, "Received unrecognized command: "+command) } + // None of the arthur functions close the connection so you will need to + // close it manually for the message to be sent + conn.Close() case <-time.After(1 * time.Minute): } diff --git a/merlin/sync/command.go b/merlin/sync/command.go index 77da414..bbc1929 100644 --- a/merlin/sync/command.go +++ b/merlin/sync/command.go @@ -2,8 +2,7 @@ package sync import ( "fmt" - "math/rand" - "strconv" + "os" "git.csclub.uwaterloo.ca/public/merlin/config" ) @@ -170,17 +169,14 @@ func cscSyncStandardIPV6(repo *config.Repo) []string { return args } -// for testing, to be removed later func cscSyncDummy(repo *config.Repo) []string { - - sleepDur := strconv.FormatInt(rand.Int63n(10)+5, 10) - args := []string{"sleep", sleepDur} + args := []string{"sleep", "1"} return args } // executes a particular sync job depending on repo.SyncType. -func getSyncCommand(repo *config.Repo) []string { +func getSyncCommand(repo *config.Repo) (args []string) { /* # scripts used by merlin.py csc-sync-debian @@ -207,40 +203,55 @@ func getSyncCommand(repo *config.Repo) []string { ubuntu-releases-sync */ switch repo.SyncType { - case "csc-sync-apache": - return cscSyncApache(repo) + args = cscSyncApache(repo) case "csc-sync-archlinux": - return cscSyncArchLinux(repo) + args = cscSyncArchLinux(repo) case "csc-sync-badperms": - return cscSyncBadPerms(repo) + args = cscSyncBadPerms(repo) case "csc-sync-cdimage": - return cscSyncCDImage(repo) + args = cscSyncCDImage(repo) // case "csc-sync-ceph": - // return cscSyncCeph(repo) + // args = cscSyncCeph(repo) case "csc-sync-chmod": - return cscSyncChmod(repo) + args = cscSyncChmod(repo) case "csc-sync-debian": - return cscSyncDebian(repo) + args = cscSyncDebian(repo) case "csc-sync-debian-cd": - return cscSyncDebianCD(repo) + args = cscSyncDebianCD(repo) case "csc-sync-gentoo": - return cscSyncGentoo(repo) + args = cscSyncGentoo(repo) // case "csc-sync-s3": - // return cscSyncS3(repo) + // args = cscSyncS3(repo) case "csc-sync-ssh": - return cscSyncSSH(repo) + args = cscSyncSSH(repo) case "csc-sync-standard": - return cscSyncStandard(repo) + args = cscSyncStandard(repo) case "csc-sync-standard-ipv6": - return cscSyncStandardIPV6(repo) + args = cscSyncStandardIPV6(repo) // case "csc-sync-wget": - // return cscSyncWget(repo) + // args = cscSyncWget(repo) case "csc-sync-dummy": return cscSyncDummy(repo) default: repo.Logger.Error("Unrecognized sync type", "'"+repo.SyncType+"'") } - return []string{} + localDir := buildDownloadDir(repo) + err := os.MkdirAll(localDir, 0775) + if err != nil { + err = fmt.Errorf("could not create directory %s: %w", localDir, err) + // I'm not sure if logger can handle error so just use the string? + repo.Logger.Error(err.Error()) + return + } + + if repo.PasswordFile != "" { + filename := config.Conf.PasswordDir + "/" + repo.PasswordFile + args = append(args, "--password-file", filename) + } + + args = append(args, buildRsyncHost(repo), localDir) + + return } diff --git a/merlin/sync/interface.go b/merlin/sync/interface.go index 3b8e30b..41a444c 100644 --- a/merlin/sync/interface.go +++ b/merlin/sync/interface.go @@ -52,6 +52,8 @@ func SyncCompleted(repo *config.Repo, exit int) { } repo.SaveState() repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync)) + + // TODO: make it possible for this SyncCompleted to be run without zfsSync being run if exit == config.SUCCESS { // it is possible that the zfssync from the last repo sync is still running is that fine? go zfsSync(repo) diff --git a/merlin/sync/sync.go b/merlin/sync/sync.go index 3b8ed0b..81446eb 100644 --- a/merlin/sync/sync.go +++ b/merlin/sync/sync.go @@ -58,6 +58,7 @@ func spawnSyncProcess(repo *config.Repo, args []string) (ch <-chan *exec.Cmd) { case <-cmdDoneChan: if !cmd.ProcessState.Success() { repo.Logger.Warning("Process ended with status code", cmd.ProcessState.ExitCode()) + // repo.Logger.Info(strings.Join(args, " ")) } case <-repo.StopChan: @@ -81,22 +82,7 @@ func startSync(repo *config.Repo) { } }() - localDir := buildDownloadDir(repo) - err := os.MkdirAll(localDir, 0775) - if err != nil { - err = fmt.Errorf("Could not create directory %s: %w", localDir, err) - // I'm not sure if logger can handle error so just use the string? - repo.Logger.Error(err.Error()) - return - } - args := getSyncCommand(repo) - if repo.PasswordFile != "" { - filename := config.Conf.PasswordDir + "/" + repo.PasswordFile - args = append(args, "--password-file", filename) - } - args = append(args, buildRsyncHost(repo), localDir) - ch := spawnSyncProcess(repo, args) if ch == nil { // SpawnProcess will have already logged error -- 2.39.2 From 3f541ec31fa9330d6cc12026ed8f333c4e08b492 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Wed, 15 Dec 2021 01:29:20 -0500 Subject: [PATCH 3/8] add drone.io --- .drone.yml | 17 +++++++++++++++++ .gitignore | 1 - 2 files changed, 17 insertions(+), 1 deletion(-) create mode 100644 .drone.yml diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..755f90b --- /dev/null +++ b/.drone.yml @@ -0,0 +1,17 @@ +--- +kind: pipeline +type: docker +name: default + +steps: +- name: merlin + image: golang:1.17 + commands: + # add linter + - cd merlin + - go build + - go test + +trigger: + event: + - push diff --git a/.gitignore b/.gitignore index f9b3e14..c746cbd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,3 @@ -/.* !.gitignore .git_old/ /dead.letter -- 2.39.2 From bc9f5f1dbd251b19e17f77ccf0c936a983353da9 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Wed, 15 Dec 2021 17:26:58 -0500 Subject: [PATCH 4/8] test drone.io --- merlin/te | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 merlin/te diff --git a/merlin/te b/merlin/te new file mode 100644 index 0000000..e69de29 -- 2.39.2 From de0e175c929154a298f7df9b29baba3a9d8e55d9 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Wed, 15 Dec 2021 17:34:49 -0500 Subject: [PATCH 5/8] test drone.io --- .drone.yml | 6 ++++-- merlin/te | 0 2 files changed, 4 insertions(+), 2 deletions(-) delete mode 100644 merlin/te diff --git a/.drone.yml b/.drone.yml index 755f90b..5b24517 100644 --- a/.drone.yml +++ b/.drone.yml @@ -13,5 +13,7 @@ steps: - go test trigger: - event: - - push + branch: + - master + - go + - refactor diff --git a/merlin/te b/merlin/te deleted file mode 100644 index e69de29..0000000 -- 2.39.2 From 5049b40ad97f7ec139c91efcc787e2557f7741bd Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Wed, 15 Dec 2021 17:37:47 -0500 Subject: [PATCH 6/8] attempt to trigger drone.io --- merlin/hi | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 merlin/hi diff --git a/merlin/hi b/merlin/hi new file mode 100644 index 0000000..e69de29 -- 2.39.2 From ea292c82f1e0a7d8285569294add931f66fea04f Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Wed, 15 Dec 2021 17:55:36 -0500 Subject: [PATCH 7/8] trigger drone.io --- merlin/hi | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 merlin/hi diff --git a/merlin/hi b/merlin/hi deleted file mode 100644 index e69de29..0000000 -- 2.39.2 From 4a76506b3544609d0eb81a06d8c615f4f5af9fc3 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Wed, 15 Dec 2021 18:02:48 -0500 Subject: [PATCH 8/8] update .drone.yml to actually run the tests --- .drone.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.drone.yml b/.drone.yml index 5b24517..0d02039 100644 --- a/.drone.yml +++ b/.drone.yml @@ -1,4 +1,3 @@ ---- kind: pipeline type: docker name: default @@ -10,7 +9,7 @@ steps: # add linter - cd merlin - go build - - go test + - go test ./... trigger: branch: -- 2.39.2