diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..0d02039 --- /dev/null +++ b/.drone.yml @@ -0,0 +1,18 @@ +kind: pipeline +type: docker +name: default + +steps: +- name: merlin + image: golang:1.17 + commands: + # add linter + - cd merlin + - go build + - go test ./... + +trigger: + branch: + - master + - go + - refactor diff --git a/.gitignore b/.gitignore index f9b3e14..c746cbd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,3 @@ -/.* !.gitignore .git_old/ /dead.letter diff --git a/merlin/README.md b/merlin/README.md index 23b48b0..600057d 100644 --- a/merlin/README.md +++ b/merlin/README.md @@ -20,6 +20,7 @@ This folder contains the code for merlin (which does the actual syncing) and art - [ ] wget ### TODO +- [ ] ensure that the proper permissions (file mode, group, user) are used - [ ] detect if an rsync process is stuck (\*\*) - [ ] place each rsync process in a separate cgroup (\*\*\*) diff --git a/merlin/arthur/arthur.go b/merlin/arthur/arthur.go new file mode 100644 index 0000000..9a2c418 --- /dev/null +++ b/merlin/arthur/arthur.go @@ -0,0 +1,130 @@ +package arthur + +import ( + "errors" + "fmt" + "io/ioutil" + "net" + "os" + "path/filepath" + "sort" + "strings" + "text/tabwriter" + "time" + + "git.csclub.uwaterloo.ca/public/merlin/config" + "git.csclub.uwaterloo.ca/public/merlin/logger" + "git.csclub.uwaterloo.ca/public/merlin/sync" +) + +// Reads and parses the message sent over the accepted connection +func GetCommand(conn net.Conn) (command, repoName string) { + command = "" + repoName = "" + + buf, err := ioutil.ReadAll(conn) + if err != nil { + logger.ErrLog(err.Error()) + return + } + + args := strings.Split(string(buf), ":") + if len(args) >= 1 { + command = args[0] + } + if len(args) >= 2 { + repoName = args[1] + } + return +} + +func SendAndLog(conn net.Conn, msg string) { + logger.OutLog(msg) + conn.Write([]byte(msg)) +} + +func SendStatus(conn net.Conn) { + status := tabwriter.NewWriter(conn, 5, 5, 5, ' ', 0) + fmt.Fprintf(status, "Repository\tLast Synced\tNext Expected Sync\tRunning\n") + + keys := make([]string, 0) + for name, _ := range config.RepoMap { + keys = append(keys, name) + } + sort.Strings(keys) + + // for other ways to format the time see: https://pkg.go.dev/time#pkg-constants + for _, name := range keys { + repo := config.RepoMap[name] + lastSync := repo.State.LastAttemptStartTime + nextSync := lastSync + int64(repo.Frequency) + + fmt.Fprintf(status, "%s\t%s\t%s\t%t\n", + name, + time.Unix(lastSync, 0).Format(time.RFC1123), + time.Unix(nextSync, 0).Format(time.RFC1123), + repo.State.IsRunning, + ) + } + + status.Flush() +} + +// Attempt to force the sync of the repo +func ForceSync(conn net.Conn, repoName string) (newSync bool) { + newSync = false + + // TODO: send repoName and every key in RepoMap to lowercase + if repo, isInMap := config.RepoMap[repoName]; isInMap { + logger.OutLog("Attempting to force sync of " + repoName) + if sync.SyncIfPossible(repo) { + conn.Write([]byte("Forced sync for " + repoName)) + newSync = true + } else { + SendAndLog(conn, "Could not force sync: "+repoName+" is already syncing.") + } + } else { + SendAndLog(conn, repoName+" is not tracked so cannot sync") + } + return +} + +func StartListener(connChan chan net.Conn, stopLisChan chan struct{}) { + sockpath := config.Conf.SockPath + // must remove old cfg.SockPath otherwise get "bind: address already in use" + if filepath.Ext(sockpath) != ".sock" { + panic(fmt.Errorf("socket file must end with .sock")) + } else if _, err := os.Stat(sockpath); err == nil { + if err := os.Remove(sockpath); err != nil { + panic(err) + } + } else if !errors.Is(err, os.ErrNotExist) { + panic(err) + } + + ear, err := net.Listen("unix", sockpath) + if err != nil { + panic(err) + } + logger.OutLog("Listening to unix socket at " + sockpath) + + go func() { + for { + // Attempting to accept on a closed net.Listener will return a non-temporary error + conn, err := ear.Accept() + if err != nil { + if netErr, isTemp := err.(net.Error); isTemp && netErr.Temporary() { + logger.ErrLog("Accepted socket error: " + err.Error()) + continue + } + logger.ErrLog("Unhandlable socket error: " + err.Error()) + return + } + connChan <- conn + } + }() + + // TODO: check handling of multiple SIGHUP + <-stopLisChan + ear.Close() +} diff --git a/merlin/arthur/arthur_test.go b/merlin/arthur/arthur_test.go new file mode 100644 index 0000000..14b4860 --- /dev/null +++ b/merlin/arthur/arthur_test.go @@ -0,0 +1,283 @@ +package arthur + +import ( + "io/ioutil" + "net" + "os" + "testing" + "time" + + "git.csclub.uwaterloo.ca/public/merlin/config" + "git.csclub.uwaterloo.ca/public/merlin/logger" +) + +func TestStatusCommand(t *testing.T) { + r, w := net.Pipe() + + go func() { + // will only finish write when EOF is sent + // only way to send EOF is to close + w.Write([]byte("status")) + w.Close() + }() + command, repoName := GetCommand(r) + if command != "status" { + t.Errorf("command for status should be \"status\", got " + command) + } else if repoName != "" { + t.Errorf("status should return an empty string for the repoName, got " + repoName) + } +} + +func TestSyncCommand(t *testing.T) { + r, w := net.Pipe() + + go func() { + w.Write([]byte("sync:ubuntu")) + w.Close() + }() + command, repoName := GetCommand(r) + r.Close() + if command != "sync" { + t.Errorf("command for sync:ubuntu should be \"sync\", got " + command) + } else if repoName != "ubuntu" { + t.Errorf("name of repo for sync:ubuntu should be \"ubuntu\", got " + repoName) + } +} + +func TestSendStatus(t *testing.T) { + saveRepoMap := config.RepoMap + defer func() { + config.RepoMap = saveRepoMap + }() + + repoMap := make(map[string]*config.Repo) + repoMap["eeeee"] = &config.Repo{ + Frequency: 30 * 86400, + State: config.RepoState{ + IsRunning: true, + LastAttemptStartTime: 1600000000, + }, + } + repoMap["alinux"] = &config.Repo{ + Frequency: 7*86400 + 3, + State: config.RepoState{ + IsRunning: true, + LastAttemptStartTime: 1620000000, + }, + } + repoMap["lnux"] = &config.Repo{ + Frequency: 86400, + State: config.RepoState{ + IsRunning: false, + LastAttemptStartTime: 1640000000, + }, + } + config.RepoMap = repoMap + + r, w := net.Pipe() + go func() { + SendStatus(w) + w.Close() + }() + msg, err := ioutil.ReadAll(r) + r.Close() + if err != nil { + t.Errorf(err.Error()) + } + expected := `Repository Last Synced Next Expected Sync Running +alinux Sun, 02 May 2021 20:00:00 EDT Sun, 09 May 2021 20:00:03 EDT true +eeeee Sun, 13 Sep 2020 08:26:40 EDT Tue, 13 Oct 2020 08:26:40 EDT true +lnux Mon, 20 Dec 2021 06:33:20 EST Tue, 21 Dec 2021 06:33:20 EST false +` + if expected != string(msg) { + t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg)) + } +} + +func TestForceSync(t *testing.T) { + saveRepos := config.Repos + saveRepoMap := config.RepoMap + doneChan := make(chan config.SyncResult) + defer func() { + config.Repos = saveRepos + config.RepoMap = saveRepoMap + close(doneChan) + }() + + // Part 1: run a dummy sync + repo := config.Repo{ + Name: "nux", + SyncType: "csc-sync-dummy", + Frequency: 7 * 86400, + MaxTime: 30, + Logger: logger.NewLogger("nux", "/tmp/merlin_force_sync_test_logs"), + StateFile: "/tmp/merlin_force_sync_test_state", + DoneChan: doneChan, + State: config.RepoState{ + IsRunning: false, + LastAttemptStartTime: 0, + LastAttemptRunTime: 0, + LastAttemptExit: config.NOT_RUN_YET, + }, + } + config.Repos = nil + config.Repos = append(config.Repos, &repo) + config.RepoMap = make(map[string]*config.Repo) + config.RepoMap["nux"] = &repo + + r, w := net.Pipe() + go func() { + if !ForceSync(w, "nux") { + t.Errorf("Sync for nux did not start") + } + w.Close() + }() + msg, err := ioutil.ReadAll(r) + r.Close() + if err != nil { + t.Errorf(err.Error()) + } + expected := "Forced sync for nux" + if expected != string(msg) { + t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg)) + } + + select { + case result := <-doneChan: + if result.Exit != config.SUCCESS { + t.Errorf("Sync should exit with SUCCESS, got %d", result.Exit) + } + case <-time.After(3 * time.Second): + t.Errorf("Dummy sync should be done in 1 second, waited 3 seconds") + } + + // Part 2: attempt the same thing but with repo.State.IsRunning = true + r, w = net.Pipe() + go func() { + if ForceSync(w, "nux") { + t.Errorf("Sync for nux should not have started") + } + w.Close() + }() + msg, err = ioutil.ReadAll(r) + r.Close() + if err != nil { + t.Errorf(err.Error()) + } + expected = "Could not force sync: nux is already syncing." + if expected != string(msg) { + t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg)) + } + + select { + case <-doneChan: + t.Errorf("Sync for nux should not have been started") + case <-time.After(2 * time.Second): + } + + // Part 3: attempt a force sync with a repo that does not exist + r, w = net.Pipe() + go func() { + if ForceSync(w, "nixx") { + t.Errorf("Sync for nixx should not have started") + } + w.Close() + }() + msg, err = ioutil.ReadAll(r) + r.Close() + if err != nil { + t.Errorf(err.Error()) + } + expected = "nixx is not tracked so cannot sync" + if expected != string(msg) { + t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg)) + } +} + +func TestStartListener(t *testing.T) { + saveConf := config.Conf + connChan := make(chan net.Conn) + stopLisChan := make(chan struct{}) + wait := make(chan struct{}) + defer func() { + config.Conf = saveConf + close(connChan) + close(stopLisChan) + }() + config.Conf = config.Config{ + SockPath: "/tmp/merlin_listener_test.sock", + } + + // Test 1: check that closing/sending something to stopLisChan will stop the listener + // and that a new listener can be created after stopping the old one + go func() { + StartListener(connChan, stopLisChan) + wait <- struct{}{} + }() + stopLisChan <- struct{}{} + select { + case <-wait: + case <-time.After(3 * time.Second): + t.Errorf("StartListener should stop when struct{}{} is sent to stopLisChan") + } + + go func() { + StartListener(connChan, stopLisChan) + wait <- struct{}{} + }() + close(stopLisChan) + select { + case <-wait: + case <-time.After(3 * time.Second): + t.Errorf("StartListener should stop when stopLisChan is closed") + } + close(wait) + + // Test 2: check that connections can be made to the unix socket + // this test does not appear to be very stable (I think there is a race condition somewhere) + stopLisChan = make(chan struct{}) + go StartListener(connChan, stopLisChan) + waitForMsg := func(expected string) { + select { + case conn := <-connChan: + msg, err := ioutil.ReadAll(conn) + if err != nil { + t.Errorf(err.Error()) + } else if expected != string(msg) { + t.Errorf("Message expected was " + expected + " got " + string(msg)) + } + conn.Close() + case <-time.After(3 * time.Second): + t.Errorf("StartListener should stop when struct{}{} is sent to stopLisChan") + } + } + sendMsg := func(msg string) { + <-time.After(500 * time.Millisecond) + send, err := net.Dial("unix", "/tmp/merlin_listener_test.sock") + if err != nil { + panic(err) + } + _, err = send.Write([]byte(msg)) + if err != nil { + t.Errorf(err.Error()) + } + send.Close() + } + go func() { + waitForMsg("status") + }() + sendMsg("status") + + go func() { + waitForMsg("sync:uuunix") + }() + sendMsg("sync:uuunix") + + go func() { + waitForMsg("$UPz2L2yWsE^UY8iG9JX@^dBb@5yb*") + }() + sendMsg("$UPz2L2yWsE^UY8iG9JX@^dBb@5yb*") + + // unsure why I can't put this in the defer + os.Remove("/tmp/merlin_listener_test.sock") +} diff --git a/merlin/common/common.go b/merlin/common/common.go deleted file mode 100644 index 511f987..0000000 --- a/merlin/common/common.go +++ /dev/null @@ -1,352 +0,0 @@ -package common - -import ( - "fmt" - "os" - "os/exec" - "time" - - ini "gopkg.in/ini.v1" -) - -const ( - DAILY = 86400 - TWICE_DAILY = DAILY / 2 - HOURLY = 3600 - TWICE_HOURLY = HOURLY / 2 - BI_HOURLY = HOURLY * 2 - TRI_HOURLY = HOURLY * 3 - TEN_MINUTELY = 600 - FIVE_MINUTELY = 300 - - CONFIG_PATH = "merlin-config.ini" - - DEFAULT_MAX_JOBS = 6 - DEFAULT_MAX_TIME = DAILY / 4 - DEFAULT_DOWNLOAD_DIR = "/mirror/root" - DEFAULT_PASSWORD_DIR = "/home/mirror/passwords" - DEFAULT_STATE_DIR = "/home/mirror/merlin/states" - DEFAULT_LOG_DIR = "/home/mirror/merlin/logs" - DEFAULT_RSYNC_LOG_DIR = "/home/mirror/merlin/logs-rsync" - DEFAULT_ZFSSYNC_LOG_DIR = "/home/mirror/merlin/logs-zfssync" - DEFAULT_SOCK_PATH = "/run/merlin.sock" -) - -var frequencies = map[string]int{ - "daily": DAILY, - "twice-daily": TWICE_DAILY, - "hourly": HOURLY, - "twice-hourly": TWICE_HOURLY, - "bi-hourly": BI_HOURLY, - "tri-hourly": TRI_HOURLY, - "ten-minutely": TEN_MINUTELY, - "five-minutely": FIVE_MINUTELY, -} - -// Last job attempt statuses -const ( - NOT_RUN_YET = iota - SUCCESS - FAILURE - TERMINATED // was killed by a signal -) - -type Result struct { - Name string - Exit int -} - -type Repo struct { - // the name of this repo - Name string `ini:"-"` - // this should be one of "csc-sync-standard", etc. - SyncType string `ini:"sync_type"` - // a human-readable frequency, e.g. "bi-hourly" - FrequencyStr string `ini:"frequency"` - // the desired interval (in seconds) between successive runs - Frequency int `ini:"-"` - // the maximum time (in seconds) that each child process of this repo - // can for before being killed - MaxTime int `ini:"max_time"` - // where to download the files for this repo (relative to the download - // dir in the config) - LocalDir string `ini:"local_dir"` - // the remote host to rsync from - RsyncHost string `ini:"rsync_host"` - // the remote directory on the rsync host - RsyncDir string `ini:"rsync_dir"` - // the rsync user (optional) - RsyncUser string `ini:"rsync_user"` - // the file storing the password for rsync (optional) - PasswordFile string `ini:"password_file"` - // the file for general logging of this repo - LoggerFile string `ini:"log_file"` - // a reference to the general logger - Logger *Logger `ini:"-"` - // the file for logging this repo's rsync - RsyncLogFile string `ini:"rsync_log_file"` - // the file for logging this repo's zfssync - ZfssyncLogFile string `ini:"zfssync_log_file"` - // the repo will write its name and status in a Result struct to DoneChan - // when it has finished a job (shared by all repos) - DoneChan chan<- Result `ini:"-"` - // the repo should stop syncing if StopChan is closed (shared by all repos) - StopChan chan struct{} `ini:"-"` - // a struct that stores the repo's status - State RepoState `ini:"-"` - // a reference to the global config - cfg *Config `ini:"-"` -} - -type Config struct { - // the maximum number of jobs allowed to execute concurrently - MaxJobs int `ini:"max_jobs"` - // the IP addresses to use for rsync - IPv4Address string `ini:"ipv4_address"` - IPv6Address string `ini:"ipv6_address"` - // the default sync type - SyncType string `ini:"default_sync_type"` - // the default frequency string for the repos - FrequencyStr string `ini:"default_frequency"` - // the default MaxTime for each repo - MaxTime int `ini:"default_max_time"` - // the directory where rsync should download files - DownloadDir string `ini:"download_dir"` - // the directory where rsync passwords are stored - PasswordDir string `ini:"password_dir"` - // the directory where the state of each repo sync is saved - StateDir string `ini:"states_dir"` - // the directory where merlin will store the general logs for each repo - LoggerDir string `ini:"log_dir"` - // the directory to store the rsync logs for each repo - RsyncLogDir string `ini:"rsync_log_dir"` - // the directory to store the zfssync logs for each repo - ZfssyncLogDir string `ini:"zfssync_log_dir"` - // the Unix socket path which arthur will use to communicate with us - SockPath string `ini:"sock_path"` - // a list of all of the repos - Repos []*Repo `ini:"-"` -} - -// This should only be modified by the main thread -type RepoState struct { - // these are stored in the states folder - // whether this repo is running a job or not - IsRunning bool `ini:"is_running"` - // the Unix epoch timestamp at which this repo last attempted a job - LastAttemptStartTime int64 `ini:"last_attempt_time"` - // the number of seconds this repo ran for during its last attempted job - LastAttemptRunTime int64 `ini:"last_attempt_runtime"` - // whether the last attempt was successful or not - LastAttemptExit int `ini:"last_attempt_exit"` -} - -// save the current state of the repo to a file -func (repo *Repo) SaveState() { - repo.Logger.Debug("Saving state") - state_cfg := ini.Empty() - if err := ini.ReflectFrom(state_cfg, &repo.State); err != nil { - repo.Logger.Error(err.Error()) - } - file, err := os.OpenFile(repo.cfg.StateDir+"/"+repo.Name, os.O_RDWR|os.O_CREATE, 0644) - if err != nil { - repo.Logger.Error(err.Error()) - } - if _, err := state_cfg.WriteTo(file); err != nil { - repo.Logger.Error(err.Error()) - } - repo.Logger.Debug("Saved state") -} - -// start sync job for this repo if more than repo.Frequency seconds have elapsed since its last job -// and is not currently running. -// returns true iff a job is started. -func (repo *Repo) RunIfPossible() bool { - if repo.State.IsRunning { - return false - } - - curTime := time.Now().Unix() - if curTime-repo.State.LastAttemptStartTime > int64(repo.Frequency) { - repo.State.IsRunning = true - repo.State.LastAttemptStartTime = curTime - repo.SaveState() - repo.Logger.Info(fmt.Sprintf("Repo %s has started syncing", repo.Name)) - go repo.StartSyncJob() - return true - } - return false -} - -func zfsSync(repo *Repo) { - out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput() - if err != nil { - repo.Logger.Error(err) - } else { - f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) - if err != nil { - repo.Logger.Error(err.Error()) - } else { - f.Write(out) - } - - } -} - -// update the repo state with the last attempt time and exit now that the job is done -func (repo *Repo) SyncCompleted(exit int) { - repoState := repo.State - syncTook := time.Now().Unix() - repoState.LastAttemptStartTime - nextSync := repo.MaxTime - int(syncTook) - if nextSync < 0 { - nextSync = 0 - } - - repoState.IsRunning = false - repoState.LastAttemptExit = exit - repoState.LastAttemptRunTime = syncTook - - var exitStr string - switch exit { - case SUCCESS: - exitStr = "completed" - case TERMINATED: - exitStr = "terminated" - default: - exitStr = "failed" - } - repo.SaveState() - repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync)) - if exit == SUCCESS { - // it is possible that the zfssync from the last repo sync is still running is that fine? - go zfsSync(repo) - } -} - -func panicIfErr(e error) { - if e != nil { - panic(e) - } -} - -func touchFile(file string) { - fi, err := os.Stat(file) - if err != nil { - f, err := os.OpenFile(file, os.O_CREATE, 0644) - if err != nil { - panic(fmt.Errorf("unable to create file %s", file)) - } - f.Close() - } else if fi.IsDir() { - panic(fmt.Errorf("%s is a directory", file)) - } else if os.Geteuid() != 1001 { - // UID 1001 is the hardcoded uid for mirror - err := os.Chown(file, 1001, os.Getegid()) - panicIfErr(err) - } else if fi.Mode().Perm() != 0644 { - err := os.Chmod(file, 0644) - panicIfErr(err) - } -} - -func touchFiles(files ...string) { - for _, file := range files { - touchFile(file) - } -} - -// GetConfig reads the config from a JSON file, initializes default values, -// and initializes the non-configurable fields of each repo. -// It returns a Config. -func GetConfig(doneChan chan Result, stopChan chan struct{}) Config { - // add global configuration in cfg - data, err := ini.Load(CONFIG_PATH) - panicIfErr(err) - - cfg := Config{ - MaxJobs: DEFAULT_MAX_JOBS, - MaxTime: DEFAULT_MAX_TIME, - PasswordDir: DEFAULT_PASSWORD_DIR, - DownloadDir: DEFAULT_DOWNLOAD_DIR, - StateDir: DEFAULT_STATE_DIR, - LoggerDir: DEFAULT_LOG_DIR, - RsyncLogDir: DEFAULT_RSYNC_LOG_DIR, - ZfssyncLogDir: DEFAULT_ZFSSYNC_LOG_DIR, - SockPath: DEFAULT_SOCK_PATH, - Repos: make([]*Repo, 0), - } - err = data.MapTo(&cfg) - panicIfErr(err) - - for _, dir := range []string{cfg.StateDir, cfg.LoggerDir, cfg.RsyncLogDir, cfg.ZfssyncLogDir} { - err := os.MkdirAll(dir, 0755) - panicIfErr(err) - } - if cfg.IPv4Address == "" { - panic("Missing IPv4 address from config") - } else if cfg.IPv6Address == "" { - panic("Missing IPv6 address from config") - } - - // add each repo configuration to cfg - for _, section := range data.Sections() { - repoName := section.Name() - if repoName == "DEFAULT" { - continue - } - - repo := Repo{ - Name: repoName, - SyncType: cfg.SyncType, - FrequencyStr: cfg.FrequencyStr, - MaxTime: cfg.MaxTime, - LoggerFile: cfg.LoggerDir + "/" + repoName + ".log", - RsyncLogFile: cfg.RsyncLogDir + "/" + repoName + "-rsync.log", - ZfssyncLogFile: cfg.ZfssyncLogDir + "/" + repoName + "-zfssync.log", - DoneChan: doneChan, - StopChan: stopChan, - } - err := section.MapTo(&repo) - panicIfErr(err) - - touchFiles( - repo.LoggerFile, - repo.RsyncLogFile, - repo.ZfssyncLogFile, - ) - - repo.Logger = NewLogger(repo.Name, repo.LoggerFile) - repo.Frequency = frequencies[repo.FrequencyStr] - if repo.SyncType == "" { - panic("Missing sync type from " + repo.Name) - } else if repo.Frequency == 0 { - panic("Missing or invalid frequency for " + repo.Name) - } - repo.cfg = &cfg - - repo.State = RepoState{ - IsRunning: false, - LastAttemptStartTime: 0, - LastAttemptRunTime: 0, - LastAttemptExit: NOT_RUN_YET, - } - - // create the state file if it does not exist, otherwise load it from existing file - repoStateFile := cfg.StateDir + "/" + repo.Name - if _, err := os.Stat(repoStateFile); err != nil { - touchFile(repoStateFile) - repo.SaveState() - } else { - err := ini.MapTo(&repo.State, repoStateFile) - panicIfErr(err) - } - - cfg.Repos = append(cfg.Repos, &repo) - } - - if len(cfg.Repos) == 0 { - panic("No repos found in config") - } - - return cfg -} diff --git a/merlin/common/tests/common_test.go b/merlin/common/tests/common_test.go deleted file mode 100644 index 805d0c7..0000000 --- a/merlin/common/tests/common_test.go +++ /dev/null @@ -1 +0,0 @@ -package common diff --git a/merlin/common/tests/sync_test.go b/merlin/common/tests/sync_test.go deleted file mode 100644 index e69de29..0000000 diff --git a/merlin/config/config.go b/merlin/config/config.go new file mode 100644 index 0000000..b6d2839 --- /dev/null +++ b/merlin/config/config.go @@ -0,0 +1,265 @@ +package config + +import ( + "os" + "path/filepath" + + "gopkg.in/ini.v1" + + "git.csclub.uwaterloo.ca/public/merlin/logger" +) + +const ( + DAILY = 86400 + TWICE_DAILY = DAILY / 2 + HOURLY = 3600 + TWICE_HOURLY = HOURLY / 2 + BI_HOURLY = HOURLY * 2 + TRI_HOURLY = HOURLY * 3 + TEN_MINUTELY = 600 + FIVE_MINUTELY = 300 + + // could change this into a default_config + DEFAULT_MAX_JOBS = 6 + DEFAULT_MAX_TIME = DAILY / 4 + DEFAULT_SYNC_TYPE = "csc-sync-standard" + DEFAULT_FREQUENCY_STRING = "by-hourly" + DEFAULT_DOWNLOAD_DIR = "/mirror/root" + DEFAULT_PASSWORD_DIR = "/home/mirror/passwords" + DEFAULT_STATE_DIR = "/home/mirror/merlin/states" + DEFAULT_LOG_DIR = "/home/mirror/merlin/logs" + DEFAULT_RSYNC_LOG_DIR = "/home/mirror/merlin/logs-rsync" + DEFAULT_ZFSSYNC_LOG_DIR = "/home/mirror/merlin/logs-zfssync" + DEFAULT_SOCK_PATH = "/run/merlin.sock" +) + +var frequencies = map[string]int{ + "daily": DAILY, + "twice-daily": TWICE_DAILY, + "hourly": HOURLY, + "twice-hourly": TWICE_HOURLY, + "bi-hourly": BI_HOURLY, + "tri-hourly": TRI_HOURLY, + "ten-minutely": TEN_MINUTELY, + "five-minutely": FIVE_MINUTELY, +} + +// Last job attempt statuses +const ( + NOT_RUN_YET = iota + SUCCESS + FAILURE + TERMINATED // was killed by a signal +) + +type SyncResult struct { + Name string + Exit int +} + +type Config struct { + // the maximum number of jobs allowed to execute concurrently + MaxJobs int `ini:"max_jobs"` + // the IP addresses to use for rsync + IPv4Address string `ini:"ipv4_address"` + IPv6Address string `ini:"ipv6_address"` + // the default sync type + DefaultSyncType string `ini:"default_sync_type"` + // the default frequency string + DefaultFrequencyStr string `ini:"default_frequency"` + // the default MaxTime + DefaultMaxTime int `ini:"default_max_time"` + // the directory where rsync should download files + DownloadDir string `ini:"download_dir"` + // the directory where rsync passwords are stored + PasswordDir string `ini:"password_dir"` + // the directory where the state of each repo sync is saved + StateDir string `ini:"states_dir"` + // the directory where merlin will store the general logs for each repo + RepoLogDir string `ini:"repo_logs_dir"` + // the directory to store the rsync logs for each repo + RsyncLogDir string `ini:"rsync_logs_dir"` + // the directory to store the zfssync logs for each repo + ZfssyncLogDir string `ini:"zfssync_logs_dir"` + // the Unix socket path which arthur will use to communicate with us + SockPath string `ini:"sock_path"` +} + +// make it more clear when full path should be used vs when just the file name is needed +type Repo struct { + // the name of this repo + Name string `ini:"-"` + // this should be one of "csc-sync-standard", etc. + SyncType string `ini:"sync_type"` + // a human-readable frequency, e.g. "bi-hourly" + FrequencyStr string `ini:"frequency"` + // the desired interval (in seconds) between successive runs + Frequency int `ini:"-"` + // the maximum time (in seconds) that each child process of this repo + // can for before being killed + MaxTime int `ini:"max_time"` + // where to download the files for this repo (relative to the download + // dir in the config) + LocalDir string `ini:"local_dir"` + // the remote host to rsync from + RsyncHost string `ini:"rsync_host"` + // the remote directory on the rsync host + RsyncDir string `ini:"rsync_dir"` + // the rsync user (optional) + RsyncUser string `ini:"rsync_user"` + // the file storing the password for rsync (optional) + PasswordFile string `ini:"password_file"` + // the file storing the repo sync state (used to override default) + StateFile string `ini:"state_file"` + // the full file path for general logging of this repo (used to override default) + RepoLogFile string `ini:"repo_log_file"` + // a reference to the general logger + Logger *logger.Logger `ini:"-"` + // the full file path for logging this repo's rsync (used to override default) + RsyncLogFile string `ini:"rsync_log_file"` + // the full file path for logging this repo's zfssync (used to override default) + ZfssyncLogFile string `ini:"zfssync_log_file"` + // the repo will write its name and status in a Result struct to DoneChan + // when it has finished a job (shared by all repos) + DoneChan chan<- SyncResult `ini:"-"` + // repos should stop syncing if StopChan is closed (shared by all repos) + StopChan chan struct{} `ini:"-"` + // a struct that stores the repo's status + State RepoState `ini:"-"` +} + +// This should only be modified by the main thread +type RepoState struct { + // these are stored in the states folder + // whether this repo is running a job or not + IsRunning bool `ini:"is_running"` + // the Unix epoch timestamp at which this repo last attempted a job + LastAttemptStartTime int64 `ini:"last_attempt_time"` + // the number of seconds this repo ran for during its last attempted job + LastAttemptRunTime int64 `ini:"last_attempt_runtime"` + // whether the last attempt was successful or not + LastAttemptExit int `ini:"last_attempt_exit"` +} + +var ( + Conf Config + Repos []*Repo + RepoMap map[string]*Repo +) + +// GetConfig reads the config from a JSON file, initializes default values, +// and initializes the non-configurable fields of each repo. +// It returns a Config. +func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struct{}) { + // set default values then load config from file + newConf := Config{ + MaxJobs: DEFAULT_MAX_JOBS, + DefaultSyncType: DEFAULT_SYNC_TYPE, + DefaultFrequencyStr: DEFAULT_FREQUENCY_STRING, + DefaultMaxTime: DEFAULT_MAX_TIME, + PasswordDir: DEFAULT_PASSWORD_DIR, + DownloadDir: DEFAULT_DOWNLOAD_DIR, + StateDir: DEFAULT_STATE_DIR, + RepoLogDir: DEFAULT_LOG_DIR, + RsyncLogDir: DEFAULT_RSYNC_LOG_DIR, + ZfssyncLogDir: DEFAULT_ZFSSYNC_LOG_DIR, + SockPath: DEFAULT_SOCK_PATH, + } + iniInfo, err := ini.Load(configPath) + panicIfErr(err) + err = iniInfo.MapTo(&newConf) + panicIfErr(err) + + // check config for major errors + for _, dir := range []string{newConf.StateDir, newConf.RepoLogDir, newConf.RsyncLogDir, newConf.ZfssyncLogDir} { + err := os.MkdirAll(dir, 0755) + panicIfErr(err) + } + if newConf.IPv4Address == "" { + panic("Missing IPv4 address from config") + } else if newConf.IPv6Address == "" { + panic("Missing IPv6 address from config") + } + + newRepos := make([]*Repo, 0) + for _, section := range iniInfo.Sections() { + repoName := section.Name() + if repoName == "DEFAULT" { + continue + } + + // set the default values for the repo then load from file + // TODO: check if local_dir and repoName are always the same value + // TODO: check to ensure that every Repo.Name is unique (may already be done by ini) + repo := Repo{ + Name: repoName, + SyncType: newConf.DefaultSyncType, + FrequencyStr: newConf.DefaultFrequencyStr, + MaxTime: newConf.DefaultMaxTime, + StateFile: filepath.Join(newConf.StateDir, repoName), + RepoLogFile: filepath.Join(newConf.RepoLogDir, repoName) + ".log", + RsyncLogFile: filepath.Join(newConf.RsyncLogDir, repoName) + "-rsync.log", + ZfssyncLogFile: filepath.Join(newConf.ZfssyncLogDir, repoName) + "-zfssync.log", + DoneChan: doneChan, + StopChan: stopChan, + } + err := section.MapTo(&repo) + panicIfErr(err) + + // TODO: ensure that the parent dirs to the file also exist when touching + // or just remove the ability to override + touchFiles( + repo.StateFile, + repo.RepoLogFile, + repo.RsyncLogFile, + repo.ZfssyncLogFile, + ) + repo.Logger = logger.NewLogger(repo.Name, repo.RepoLogFile) + repo.Frequency = frequencies[repo.FrequencyStr] + if repo.SyncType == "" { + panic("Missing sync type from " + repo.Name) + } else if repo.Frequency == 0 { + panic("Missing or invalid frequency for " + repo.Name) + } + + repo.State = RepoState{ + IsRunning: false, + LastAttemptStartTime: 0, + LastAttemptRunTime: 0, + LastAttemptExit: NOT_RUN_YET, + } + err = ini.MapTo(&repo.State, repo.StateFile) + panicIfErr(err) + repo.SaveState() + + newRepos = append(newRepos, &repo) + } + + if len(newRepos) == 0 { + panic("No repos found in config") + } + + Conf = newConf + Repos = newRepos + RepoMap = make(map[string]*Repo) + for _, repo := range Repos { + RepoMap[repo.Name] = repo + } +} + +// save the current state of the repo to a file +func (repo *Repo) SaveState() { + // repo.Logger.Debug("Saving state") + state_cfg := ini.Empty() + if err := ini.ReflectFrom(state_cfg, &repo.State); err != nil { + repo.Logger.Error(err.Error()) + } + file, err := os.OpenFile(repo.StateFile, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + repo.Logger.Error(err.Error()) + } + if _, err := state_cfg.WriteTo(file); err != nil { + repo.Logger.Error(err.Error()) + } + // repo.Logger.Debug("Saved state") +} diff --git a/merlin/config/config_test.go b/merlin/config/config_test.go new file mode 100644 index 0000000..576ce3d --- /dev/null +++ b/merlin/config/config_test.go @@ -0,0 +1,122 @@ +package config + +import ( + "errors" + "os" + "reflect" + "testing" + + "github.com/davecgh/go-spew/spew" +) + +func TestTouchFiles(t *testing.T) { + files := []string{ + "/tmp/merlin_touch_test_1", + "/tmp/merlin_touch_test_2", + "/tmp/merlin_touch_test_3", + } + touchFiles(files[0], files[1], files[2]) + for _, file := range files { + if _, err := os.Stat(file); err != nil { + t.Errorf(err.Error()) + } else if err := os.Remove(file); err != nil { + t.Errorf(err.Error()) + } + } +} + +func TestPanicIfErr(t *testing.T) { + panicIfErr(nil) + defer func() { recover() }() + panicIfErr(errors.New("AAAAAAAAAA")) + t.Errorf("panicIfErr should have panicked") +} + +func TestLoadConfig(t *testing.T) { + doneChan := make(chan SyncResult) + stopChan := make(chan struct{}) + LoadConfig("config_test.ini", doneChan, stopChan) + // TODO: Fill out parts not part of the ini or state file + expectedConfig := Config{ + MaxJobs: 6, + IPv4Address: "129.97.134.129", + IPv6Address: "2620:101:f000:4901:c5c::129", + DefaultSyncType: "csc-sync-standard", + DefaultFrequencyStr: "daily", + DefaultMaxTime: 1000, + DownloadDir: "/tmp/test-mirror", + PasswordDir: "/home/mirror/passwords", + StateDir: "test_files", + RepoLogDir: "test_files/logs", + RsyncLogDir: "test_files/rsync", + ZfssyncLogDir: "test_files/zfssync", + SockPath: "test_files/test.sock", + } + expectedRepo1 := Repo{ + Name: "eelinux", + SyncType: "csc-sync-nonstandard", + FrequencyStr: "tri-hourly", + Frequency: 10800, + MaxTime: 2000, + LocalDir: "eelinux", + RsyncHost: "rsync.releases.eelinux.ca", + RsyncDir: "releases", + StateFile: "test_files/eeelinux", + RepoLogFile: "test_files/logs/eelinux.log", + Logger: Repos[0].Logger, + RsyncLogFile: "test_files/rsync/eelinux.log", + ZfssyncLogFile: "test_files/zfssync/eelinux.log", + DoneChan: doneChan, + StopChan: stopChan, + State: RepoState{ + IsRunning: false, + LastAttemptStartTime: 1600000000, + LastAttemptRunTime: 100, + LastAttemptExit: 1, + }, + } + expectedRepo2 := Repo{ + Name: "yoland", + SyncType: "csc-sync-standard", + FrequencyStr: "daily", + Frequency: 86400, + MaxTime: 1000, + LocalDir: "yoland-releases", + RsyncHost: "rsync.releases.yoland.io", + RsyncDir: "releases", + StateFile: "test_files/yoland", + RepoLogFile: "test_files/logs/yoland.log", + Logger: Repos[1].Logger, + RsyncLogFile: "test_files/rsync/yoland-rsync.log", + ZfssyncLogFile: "test_files/zfssync/yoland-zfssync.log", + DoneChan: doneChan, + StopChan: stopChan, + State: RepoState{ + IsRunning: false, + LastAttemptStartTime: 0, + LastAttemptRunTime: 0, + LastAttemptExit: 0, + }, + } + + if !reflect.DeepEqual(expectedConfig, Conf) { + t.Errorf("Config loaded does not match expected config") + spew.Dump(expectedConfig) + spew.Dump(Conf) + } + if !reflect.DeepEqual(expectedRepo1, *Repos[0]) { + t.Errorf("The eelinux repo loaded does not match the exected repo config") + spew.Dump(expectedRepo1) + spew.Dump(*Repos[0]) + } + if !reflect.DeepEqual(expectedRepo2, *Repos[1]) { + t.Errorf("The yoland repo loaded does not match the exected repo config") + spew.Dump(expectedRepo2) + spew.Dump(*Repos[1]) + } + + os.Remove("test_files/yoland") + os.RemoveAll("test_files/logs") + os.RemoveAll("test_files/rsync") + os.RemoveAll("test_files/zfssync") +} diff --git a/merlin/config/config_test.ini b/merlin/config/config_test.ini new file mode 100644 index 0000000..e240b8a --- /dev/null +++ b/merlin/config/config_test.ini @@ -0,0 +1,39 @@ +; default values are commented out + +; max_jobs = 6 +ipv4_address = 129.97.134.129 +ipv6_address = 2620:101:f000:4901:c5c::129 +; default_sync_type = csc-sync-standard +default_frequency = daily +default_max_time = 1000 +download_dir = /tmp/test-mirror +; password_dir = /home/mirror/passwords +states_dir = test_files +repo_logs_dir = test_files/logs +rsync_logs_dir = test_files/rsync +zfssync_logs_dir = test_files/zfssync +sock_path = test_files/test.sock + +[eelinux] +sync_type = csc-sync-nonstandard +frequency = tri-hourly +max_time = 2000 +local_dir = eelinux +rsync_host = rsync.releases.eelinux.ca +rsync_dir = releases +state_file = test_files/eeelinux +repo_log_file = test_files/logs/eelinux.log +rsync_log_file = test_files/rsync/eelinux.log +zfssync_log_file = test_files/zfssync/eelinux.log + +[yoland] +; sync_type = csc-sync-standard +; frequency = daily +; max_time = 1000 +local_dir = yoland-releases +rsync_host = rsync.releases.yoland.io +rsync_dir = releases +; state_file = test_files/yoland +; repo_log_file = test_files/logs/yoland.log +; rsync_log_file = test_files/rsync/yoland-rsync.log +; zfssync_log_file = test_files/zfssync/yoland-zfssync.log \ No newline at end of file diff --git a/merlin/config/test_files/eeelinux b/merlin/config/test_files/eeelinux new file mode 100644 index 0000000..571d74e --- /dev/null +++ b/merlin/config/test_files/eeelinux @@ -0,0 +1,5 @@ +is_running = false +last_attempt_time = 1600000000 +last_attempt_runtime = 100 +last_attempt_exit = 1 + diff --git a/merlin/config/utils.go b/merlin/config/utils.go new file mode 100644 index 0000000..a3e1e56 --- /dev/null +++ b/merlin/config/utils.go @@ -0,0 +1,37 @@ +package config + +import ( + "fmt" + "os" +) + +func panicIfErr(e error) { + if e != nil { + panic(e) + } +} + +func touchFile(file string) { + fi, err := os.Stat(file) + if err != nil { + f, err := os.OpenFile(file, os.O_CREATE, 0644) + if err != nil { + panic(fmt.Errorf("unable to create file %s", file)) + } + f.Close() + } else if fi.IsDir() { + panic(fmt.Errorf("%s is a directory", file)) + // } else if os.Geteuid() != 1001 { + // // mirror is UID 1001 + // err := os.Chown(file, 1001, os.Getegid()) + // panicIfErr(err) + } + err = os.Chmod(file, 0644) + panicIfErr(err) +} + +func touchFiles(files ...string) { + for _, file := range files { + touchFile(file) + } +} diff --git a/merlin/go.mod b/merlin/go.mod index 1c3e571..256c295 100644 --- a/merlin/go.mod +++ b/merlin/go.mod @@ -1,6 +1,7 @@ module git.csclub.uwaterloo.ca/public/merlin require ( + github.com/davecgh/go-spew v1.1.0 github.com/stretchr/testify v1.7.0 // indirect golang.org/x/sys v0.0.0-20210915083310-ed5796bab164 gopkg.in/ini.v1 v1.63.2 diff --git a/merlin/common/logger.go b/merlin/logger/logger.go similarity index 77% rename from merlin/common/logger.go rename to merlin/logger/logger.go index 53f2a7f..bd5c6a1 100644 --- a/merlin/common/logger.go +++ b/merlin/logger/logger.go @@ -1,4 +1,4 @@ -package common +package logger import ( "log" @@ -30,12 +30,12 @@ var levels = map[int]string{ var outLogger = log.New(os.Stdout, "", log.LstdFlags) var errLogger = log.New(os.Stderr, "", log.LstdFlags) -func OutLogger() *log.Logger { - return outLogger +func OutLog(v ...interface{}) { + outLogger.Println(v...) } -func ErrLogger() *log.Logger { - return errLogger +func ErrLog(v ...interface{}) { + errLogger.Println(v...) } func NewLogger(name, file string) *Logger { @@ -48,15 +48,9 @@ func NewLogger(name, file string) *Logger { } func (logger *Logger) Log(level int, v ...interface{}) { - if level == INFO { - outLogger.Println(append([]interface{}{"[" + logger.name + "]"}, v...)) - } else if level == ERROR { - errLogger.Println(append([]interface{}{"[" + logger.name + "]"}, v...)) - } - f, err := os.OpenFile(logger.file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) if err != nil { - errLogger.Println(err.Error()) + ErrLog(err.Error()) } defer f.Close() @@ -65,7 +59,7 @@ func (logger *Logger) Log(level int, v ...interface{}) { args = append(args, v...) logger.SetOutput(f) - logger.Println(v...) + logger.Println(args) } func (logger *Logger) Debug(v ...interface{}) { @@ -73,6 +67,9 @@ func (logger *Logger) Debug(v ...interface{}) { } func (logger *Logger) Info(v ...interface{}) { + // src := []interface{}{logger.name + ":"} + // args := append(src, v...) + OutLog(v...) logger.Log(INFO, v...) } @@ -81,5 +78,6 @@ func (logger *Logger) Warning(v ...interface{}) { } func (logger *Logger) Error(v ...interface{}) { + ErrLog(append([]interface{}{"[" + logger.name + "]"}, v...)) logger.Log(ERROR, v...) } diff --git a/merlin/merlin.go b/merlin/merlin.go index 2bd6e73..b8e3cff 100644 --- a/merlin/merlin.go +++ b/merlin/merlin.go @@ -1,153 +1,35 @@ package main import ( - "bytes" - "errors" "fmt" - "io" - "log" "net" "os" "os/signal" - "path/filepath" - "strings" "syscall" - "text/tabwriter" "time" - "git.csclub.uwaterloo.ca/public/merlin/common" "golang.org/x/sys/unix" + + "git.csclub.uwaterloo.ca/public/merlin/arthur" + "git.csclub.uwaterloo.ca/public/merlin/config" + "git.csclub.uwaterloo.ca/public/merlin/logger" + "git.csclub.uwaterloo.ca/public/merlin/sync" ) -var ( - cfg common.Config - outLogger *log.Logger - errLogger *log.Logger - repoMap map[string]*common.Repo - repoIdx int - numJobsRunning int -) - -func getAndRunCommand(conn net.Conn) { - defer conn.Close() - - var buf bytes.Buffer - _, err := io.Copy(&buf, conn) - if err != nil { - errLogger.Println(err.Error()) - return - } - - command := buf.String() - args := strings.Split(command, ":") - respondAndLogErr := func(msg string) { - outLogger.Println(msg) - conn.Write([]byte(msg)) - } - - if args[0] == "status" { - status := tabwriter.NewWriter(conn, 5, 5, 5, ' ', 0) - fmt.Fprintf(status, "Repository\tLast Synced\tNext Expected Sync\n") - - // for time formating see https://pkg.go.dev/time#pkg-constants - for name, repo := range repoMap { - fmt.Fprintf(status, "%s\t%s\t%s\n", - name, - time.Unix(repo.State.LastAttemptStartTime, 0).Format(time.RFC1123), - time.Unix(repo.State.LastAttemptRunTime+int64(repo.Frequency), 0).Format(time.RFC1123), - ) - } - - status.Flush() - } else if args[0] == "sync" { - if len(args) != 2 { - respondAndLogErr("Could not parse sync command, forced sync fails.") - return - } - - if repo, inMap := repoMap[args[1]]; inMap { - outLogger.Println("Attempting to force sync of " + repo.Name) - if repo.RunIfPossible() { - conn.Write([]byte("Forced sync for " + repo.Name)) - numJobsRunning++ - } else { - respondAndLogErr("Cannot force sync: " + repo.Name + ", already syncing.") - } - } else { - respondAndLogErr(args[1] + " is not tracked so cannot sync") - } - } else { - respondAndLogErr("Received unrecognized command: " + command) - } -} - -func unixSockListener(connChan chan net.Conn, stopLisChan chan struct{}) { - // must remove cfg.SockPath otherwise get "bind: address already in use" - if filepath.Ext(cfg.SockPath) != ".sock" { - panic(fmt.Errorf("Socket file must end with .sock")) - } else if _, err := os.Stat(cfg.SockPath); err == nil { - if err := os.Remove(cfg.SockPath); err != nil { - panic(err) - } - } else if !errors.Is(err, os.ErrNotExist) { - panic(err) - } - - ear, err := net.Listen("unix", cfg.SockPath) - if err != nil { - panic(err) - } - outLogger.Println("Listening to unix socket at " + cfg.SockPath) - - go func() { - for { - // will exit when ear is closed - conn, err := ear.Accept() - if err != nil { - if ne, ok := err.(net.Error); ok && ne.Temporary() { - errLogger.Println("Accepted socket error: " + err.Error()) - continue - } - errLogger.Println("Unhandlable socket error: " + err.Error()) - return - } - connChan <- conn - } - }() - - <-stopLisChan - ear.Close() -} - -// We use a round-robin strategy. It's not the most efficient, but it's simple -// (read: easy to understand) and guarantees each repo will eventually get a chance to run. -func runAsManyAsPossible() { - repos := cfg.Repos - startIdx := repoIdx - - for numJobsRunning < cfg.MaxJobs { - repo := repos[repoIdx] - if repo.RunIfPossible() { - numJobsRunning++ - } - repoIdx = (repoIdx + 1) % len(repos) - if repoIdx == startIdx { - // we've come full circle - return - } - } -} +// get config path from command args +var CONFIG_PATH = "merlin-config.ini" func main() { - outLogger = common.OutLogger() - errLogger = common.ErrLogger() - // check that merlin is run as mirror user // check that mirror user has pid of 1001 - doneChan := make(chan common.Result) + // receives a Result struct when a repo stops syncing + doneChan := make(chan config.SyncResult) + // closed when merlin is told to stop running stopChan := make(chan struct{}) + // receives a Conn when a client makes a connection to unix socket connChan := make(chan net.Conn) + // signal channel to stop listening to unix socket stopLisChan := make(chan struct{}) stopSig := make(chan os.Signal, 1) @@ -157,24 +39,38 @@ func main() { unix.Umask(002) - numJobsRunning = 0 + numJobsRunning := 0 + repoIdx := 0 loadConfig := func() { - cfg = common.GetConfig(doneChan, stopChan) - outLogger.Println("Loaded config:\n" + fmt.Sprintf("%+v\n", cfg)) - - repoMap = make(map[string]*common.Repo) - for _, repo := range cfg.Repos { - repoMap[repo.Name] = repo - } + config.LoadConfig(CONFIG_PATH, doneChan, stopChan) + logger.OutLog("Loaded config:\n" + fmt.Sprintf("%+v\n", config.Conf)) repoIdx = 0 - go unixSockListener(connChan, stopLisChan) + go arthur.StartListener(connChan, stopLisChan) + } + // We use a round-robin strategy. It's not the most efficient, but it's simple + // (read: easy to understand) and guarantees each repo will eventually get a chance to run. + runAsManyAsPossible := func() { + startIdx := repoIdx + + for numJobsRunning < config.Conf.MaxJobs { + repo := config.Repos[repoIdx] + if sync.SyncIfPossible(repo) { + numJobsRunning++ + } + repoIdx = (repoIdx + 1) % len(config.Repos) + if repoIdx == startIdx { + // we've come full circle + return + } + } } loadConfig() - // IsRunning must be false otherwise repo will never sync - for _, repo := range repoMap { + // ensure that IsRunning is false otherwise repo will never sync + // (only on startup can we assume that repos were not previously syncing) + for _, repo := range config.Repos { repo.State.IsRunning = false } runAsManyAsPossible() @@ -190,24 +86,39 @@ runLoop: case <-reloadSig: stopLisChan <- struct{}{} loadConfig() + // ensure that SyncCompleted can handle it if reloading config + // removes a repo that was already syncing case done := <-doneChan: - repoMap[done.Name].SyncCompleted(done.Exit) + sync.SyncCompleted(config.RepoMap[done.Name], done.Exit) numJobsRunning-- case conn := <-connChan: - getAndRunCommand(conn) + command, repoName := arthur.GetCommand(conn) + switch command { + case "status": + arthur.SendStatus(conn) + case "sync": + if arthur.ForceSync(conn, repoName) { + numJobsRunning++ + } + default: + arthur.SendAndLog(conn, "Received unrecognized command: "+command) + } + // None of the arthur functions close the connection so you will need to + // close it manually for the message to be sent + conn.Close() case <-time.After(1 * time.Minute): } runAsManyAsPossible() } - // give time for jobs to terminate before exiting + // give time for all jobs to terminate before exiting program for { select { case done := <-doneChan: - repoMap[done.Name].SyncCompleted(done.Exit) + sync.SyncCompleted(config.RepoMap[done.Name], done.Exit) numJobsRunning-- case <-time.After(1 * time.Second): diff --git a/merlin/merlin_test.go b/merlin/merlin_test.go deleted file mode 100644 index b425ba5..0000000 --- a/merlin/merlin_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package main - -import "testing" - -func TestSock1(t *testing.T) { - -} diff --git a/merlin/common/sync.go b/merlin/sync/command.go similarity index 57% rename from merlin/common/sync.go rename to merlin/sync/command.go index 846b2a0..bbc1929 100644 --- a/merlin/common/sync.go +++ b/merlin/sync/command.go @@ -1,50 +1,50 @@ -package common +package sync import ( "fmt" - "math/rand" "os" - "strconv" + + "git.csclub.uwaterloo.ca/public/merlin/config" ) -func (repo *Repo) buildRsyncHost() string { +func buildRsyncHost(repo *config.Repo) string { if repo.RsyncUser != "" { repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost } return "rsync://" + repo.RsyncHost + "/" + repo.RsyncDir } -func (repo *Repo) buildRsyncDaemonHost() string { +func buildRsyncDaemonHost(repo *config.Repo) string { if repo.RsyncUser != "" { repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost } return repo.RsyncHost + "::" + repo.RsyncDir } -func (repo *Repo) buildRsyncSSHHost() string { +func buildRsyncSSHHost(repo *config.Repo) string { if repo.RsyncUser != "" { repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost } return repo.RsyncHost + ":" + repo.RsyncDir } -func (repo *Repo) buildDownloadDir() string { - return repo.cfg.DownloadDir + "/" + repo.LocalDir +func buildDownloadDir(repo *config.Repo) string { + return config.Conf.DownloadDir + "/" + repo.LocalDir } -func (repo *Repo) CSCSyncApache() []string { +func cscSyncApache(repo *config.Repo) []string { args := []string{ "nice", "rsync", "-az", "--no-owner", "--no-group", "--delete", "--safe-links", - "--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address, + "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address, "--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, } - args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir()) + args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) return args } -func (repo *Repo) CSCSyncArchLinux() []string { +func cscSyncArchLinux(repo *config.Repo) []string { tempDir := "" // is this option even needed? @@ -52,83 +52,83 @@ func (repo *Repo) CSCSyncArchLinux() []string { args := []string{ "rsync", "-rtlH", "--safe-links", "--delete-after", "--timeout=600", "--contimeout=60", "-p", "--delay-updates", "--no-motd", - "--temp-dir=" + tempDir, "--log-file=" + repo.RsyncLogFile, "--address=" + repo.cfg.IPv4Address, + "--temp-dir=" + tempDir, "--log-file=" + repo.RsyncLogFile, "--address=" + config.Conf.IPv4Address, } - args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir()) + args = append(args, buildRsyncHost(repo), buildDownloadDir(repo)) return args } -func (repo *Repo) CSCSyncBadPerms() []string { +func cscSyncBadPerms(repo *config.Repo) []string { args := []string{ "nice", "rsync", "-aH", "--no-owner", "--no-group", "--chmod=o=rX", "--delete", - "--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address, + "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address, "--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, } - args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir()) + args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) return args } // TODO ceph -func (repo *Repo) CSCSyncCDImage() []string { +func cscSyncCDImage(repo *config.Repo) []string { args := []string{ "nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete", - "--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address, + "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address, "--exclude", ".*/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, } - args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir()) + args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) return args } -func (repo *Repo) CSCSyncChmod() []string { +func cscSyncChmod(repo *config.Repo) []string { args := []string{ "nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", "--delay-updates", "--safe-links", - "--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address, + "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address, "--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, "--chmod=u=rwX,go=rX", } - args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir()) + args = append(args, buildRsyncHost(repo), buildDownloadDir(repo)) return args } -func (repo *Repo) CSCSyncDebian() []string { +func cscSyncDebian(repo *config.Repo) []string { // sync /pool args := []string{"nice", "rsync", "-rlHtvp", "--exclude", ".~tmp~/", "--timeout=3600", "-4", - "--address=" + repo.cfg.IPv4Address, + "--address=" + config.Conf.IPv4Address, } // $RSYNC_HOST::$RSYNC_DIR/pool/ $TO/pool/ >> $LOGFILE 2>&1 return args } -func (repo *Repo) CSCSyncDebianCD() []string { +func cscSyncDebianCD(repo *config.Repo) []string { // this is basically the same as CSCSyncDebian, except it has an extra --exclude args := []string{"nice", "rsync", "-rlHtvp", "--delete", "--exclude", ".~tmp~/", "--timeout=3600", "-4", - "--address=" + repo.cfg.IPv4Address, + "--address=" + config.Conf.IPv4Address, // "--exclude", "Archive-Update-in-Progress-${HOSTNAME}" } // $RSYNC_HOST::$RSYNC_DIR $TO >> $LOGFILE 2>&1 return args } -func (repo *Repo) CSCSyncGentoo() []string { +func cscSyncGentoo(repo *config.Repo) []string { repo.RsyncUser = "gentoo" repo.PasswordFile = "gentoo-distfiles" - return repo.CSCSyncStandard() + return cscSyncStandard(repo) } // TODO s3 -func (repo *Repo) CSCSyncSSH() []string { +func cscSyncSSH(repo *config.Repo) []string { args := []string{ "rsync", "-aH", "--no-owner", "--no-group", "--delete", @@ -138,48 +138,45 @@ func (repo *Repo) CSCSyncSSH() []string { } // not sure if we should be assuming ssh identity file is the password file args = append(args, "-e", fmt.Sprintf("'ssh -i %s'", repo.PasswordFile)) - args = append(args, repo.buildRsyncSSHHost(), repo.buildDownloadDir()) + args = append(args, buildRsyncSSHHost(repo), buildDownloadDir(repo)) return args } -func (repo *Repo) CSCSyncStandard() []string { +func cscSyncStandard(repo *config.Repo) []string { args := []string{ "nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", - "--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address, + "--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address, "--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, } if repo.PasswordFile != "" { - filename := repo.cfg.PasswordDir + "/" + repo.PasswordFile + filename := config.Conf.PasswordDir + "/" + repo.PasswordFile args = append(args, "--password-file", filename) } - args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir()) + args = append(args, buildRsyncHost(repo), buildDownloadDir(repo)) return args } -func (repo *Repo) CSCSyncStandardIPV6() []string { +func cscSyncStandardIPV6(repo *config.Repo) []string { args := []string{ "nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", - "--delay-updates", "--safe-links", "--timeout=3600", "-6", "--address=" + repo.cfg.IPv4Address, + "--delay-updates", "--safe-links", "--timeout=3600", "-6", "--address=" + config.Conf.IPv4Address, "--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, } - args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir()) + args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo)) return args } -// for testing, to be removed later -func (repo *Repo) CSCSyncDummy() []string { - - sleepDur := strconv.FormatInt(rand.Int63n(10)+5, 10) - args := []string{"sleep", sleepDur} +func cscSyncDummy(repo *config.Repo) []string { + args := []string{"sleep", "1"} return args } // executes a particular sync job depending on repo.SyncType. -func (repo *Repo) getSyncCommand() []string { +func getSyncCommand(repo *config.Repo) (args []string) { /* # scripts used by merlin.py csc-sync-debian @@ -206,80 +203,55 @@ func (repo *Repo) getSyncCommand() []string { ubuntu-releases-sync */ switch repo.SyncType { - case "csc-sync-apache": - return repo.CSCSyncApache() + args = cscSyncApache(repo) case "csc-sync-archlinux": - return repo.CSCSyncArchLinux() + args = cscSyncArchLinux(repo) case "csc-sync-badperms": - return repo.CSCSyncBadPerms() + args = cscSyncBadPerms(repo) case "csc-sync-cdimage": - return repo.CSCSyncCDImage() + args = cscSyncCDImage(repo) // case "csc-sync-ceph": - // return repo.CSCSyncCeph() + // args = cscSyncCeph(repo) case "csc-sync-chmod": - return repo.CSCSyncChmod() + args = cscSyncChmod(repo) case "csc-sync-debian": - return repo.CSCSyncDebian() + args = cscSyncDebian(repo) case "csc-sync-debian-cd": - return repo.CSCSyncDebianCD() + args = cscSyncDebianCD(repo) case "csc-sync-gentoo": - return repo.CSCSyncGentoo() + args = cscSyncGentoo(repo) // case "csc-sync-s3": - // return repo.CSCSyncS3() + // args = cscSyncS3(repo) case "csc-sync-ssh": - return repo.CSCSyncSSH() + args = cscSyncSSH(repo) case "csc-sync-standard": - return repo.CSCSyncStandard() + args = cscSyncStandard(repo) case "csc-sync-standard-ipv6": - return repo.CSCSyncStandardIPV6() + args = cscSyncStandardIPV6(repo) // case "csc-sync-wget": - // return repo.CSCSyncWget() + // args = cscSyncWget(repo) case "csc-sync-dummy": - return repo.CSCSyncDummy() + return cscSyncDummy(repo) default: repo.Logger.Error("Unrecognized sync type", "'"+repo.SyncType+"'") } - return []string{} -} - -func (repo *Repo) StartSyncJob() { - status := FAILURE - defer func() { - repo.DoneChan <- Result{ - Name: repo.Name, - Exit: status, - } - }() - - localDir := repo.buildDownloadDir() + localDir := buildDownloadDir(repo) err := os.MkdirAll(localDir, 0775) if err != nil { - err = fmt.Errorf("Could not create directory %s: %w", localDir, err) + err = fmt.Errorf("could not create directory %s: %w", localDir, err) // I'm not sure if logger can handle error so just use the string? repo.Logger.Error(err.Error()) return } - args := repo.getSyncCommand() if repo.PasswordFile != "" { - filename := repo.cfg.PasswordDir + "/" + repo.PasswordFile + filename := config.Conf.PasswordDir + "/" + repo.PasswordFile args = append(args, "--password-file", filename) } - args = append(args, repo.buildRsyncHost(), localDir) - ch := SpawnProcess(repo, args) - if ch == nil { - // SpawnProcess will have already logged error - return - } - cmd := <-ch - switch cmd.ProcessState.ExitCode() { - case 0: - status = SUCCESS - case -1: - status = TERMINATED - // default is already FAILURE - } + args = append(args, buildRsyncHost(repo), localDir) + + return } diff --git a/merlin/sync/command_test.go b/merlin/sync/command_test.go new file mode 100644 index 0000000..1ca2a85 --- /dev/null +++ b/merlin/sync/command_test.go @@ -0,0 +1 @@ +package sync diff --git a/merlin/sync/interface.go b/merlin/sync/interface.go new file mode 100644 index 0000000..41a444c --- /dev/null +++ b/merlin/sync/interface.go @@ -0,0 +1,61 @@ +package sync + +import ( + "fmt" + "time" + + "git.csclub.uwaterloo.ca/public/merlin/config" +) + +// start sync job for this repo if more than repo.Frequency seconds have elapsed since its last job +// and is not currently running. +// returns true iff a job is started. +func SyncIfPossible(repo *config.Repo) bool { + // Change to SyncIfPossible + if repo.State.IsRunning { + return false + } + + curTime := time.Now().Unix() + if curTime-repo.State.LastAttemptStartTime > int64(repo.Frequency) { + repo.State.IsRunning = true + repo.State.LastAttemptStartTime = curTime + repo.SaveState() + repo.Logger.Info(fmt.Sprintf("Repo %s has started syncing", repo.Name)) + go startSync(repo) + return true + } + return false +} + +// update the repo state with the last attempt time and exit now that the job is done +func SyncCompleted(repo *config.Repo, exit int) { + repoState := repo.State + syncTook := time.Now().Unix() - repoState.LastAttemptStartTime + nextSync := repo.MaxTime - int(syncTook) + if nextSync < 0 { + nextSync = 0 + } + + repoState.IsRunning = false + repoState.LastAttemptExit = exit + repoState.LastAttemptRunTime = syncTook + + var exitStr string + switch exit { + case config.SUCCESS: + exitStr = "completed" + case config.TERMINATED: + exitStr = "terminated" + default: + exitStr = "failed" + } + repo.SaveState() + repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync)) + + // TODO: make it possible for this SyncCompleted to be run without zfsSync being run + if exit == config.SUCCESS { + // it is possible that the zfssync from the last repo sync is still running is that fine? + go zfsSync(repo) + } +} diff --git a/merlin/common/process_manager.go b/merlin/sync/sync.go similarity index 63% rename from merlin/common/process_manager.go rename to merlin/sync/sync.go index 4455418..81446eb 100644 --- a/merlin/common/process_manager.go +++ b/merlin/sync/sync.go @@ -1,10 +1,13 @@ -package common +package sync import ( "fmt" + "os" "os/exec" "syscall" "time" + + "git.csclub.uwaterloo.ca/public/merlin/config" ) // SpawnProcess spawns a child process for the given repo. The process will @@ -12,7 +15,7 @@ import ( // runs for longer than the repo's MaxTime. // It returns a channel through which a Cmd will be sent once it has finished, // or nil if it was unable to start a process. -func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) { +func spawnSyncProcess(repo *config.Repo, args []string) (ch <-chan *exec.Cmd) { // startTime and time took will be handled in common.go by SyncExit cmd := exec.Command(args[0], args[1:]...) @@ -55,6 +58,7 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) { case <-cmdDoneChan: if !cmd.ProcessState.Success() { repo.Logger.Warning("Process ended with status code", cmd.ProcessState.ExitCode()) + // repo.Logger.Info(strings.Join(args, " ")) } case <-repo.StopChan: @@ -68,3 +72,43 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) { }() return } + +func startSync(repo *config.Repo) { + status := config.FAILURE + defer func() { + repo.DoneChan <- config.SyncResult{ + Name: repo.Name, + Exit: status, + } + }() + + args := getSyncCommand(repo) + ch := spawnSyncProcess(repo, args) + if ch == nil { + // SpawnProcess will have already logged error + return + } + cmd := <-ch + switch cmd.ProcessState.ExitCode() { + case 0: + status = config.SUCCESS + case -1: + status = config.TERMINATED + // default is already FAILURE + } +} + +func zfsSync(repo *config.Repo) { + out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput() + if err != nil { + repo.Logger.Error(err) + } else { + f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + repo.Logger.Error(err.Error()) + } else { + f.Write(out) + } + + } +}