Refactor Go Merlin #1
|
@ -0,0 +1,18 @@
|
|||
kind: pipeline
|
||||
type: docker
|
||||
name: default
|
||||
|
||||
steps:
|
||||
- name: merlin
|
||||
image: golang:1.17
|
||||
commands:
|
||||
# add linter
|
||||
- cd merlin
|
||||
- go build
|
||||
- go test ./...
|
||||
|
||||
trigger:
|
||||
branch:
|
||||
- master
|
||||
- go
|
||||
- refactor
|
|
@ -1,4 +1,3 @@
|
|||
/.*
|
||||
!.gitignore
|
||||
.git_old/
|
||||
/dead.letter
|
||||
|
|
|
@ -20,6 +20,7 @@ This folder contains the code for merlin (which does the actual syncing) and art
|
|||
- [ ] wget
|
||||
|
||||
### TODO
|
||||
- [ ] ensure that the proper permissions (file mode, group, user) are used
|
||||
- [ ] detect if an rsync process is stuck (\*\*)
|
||||
- [ ] place each rsync process in a separate cgroup (\*\*\*)
|
||||
|
||||
|
|
|
@ -0,0 +1,130 @@
|
|||
package arthur
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"text/tabwriter"
|
||||
"time"
|
||||
|
||||
"git.csclub.uwaterloo.ca/public/merlin/config"
|
||||
"git.csclub.uwaterloo.ca/public/merlin/logger"
|
||||
"git.csclub.uwaterloo.ca/public/merlin/sync"
|
||||
)
|
||||
|
||||
// Reads and parses the message sent over the accepted connection
|
||||
func GetCommand(conn net.Conn) (command, repoName string) {
|
||||
command = ""
|
||||
repoName = ""
|
||||
|
||||
buf, err := ioutil.ReadAll(conn)
|
||||
if err != nil {
|
||||
logger.ErrLog(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
args := strings.Split(string(buf), ":")
|
||||
if len(args) >= 1 {
|
||||
command = args[0]
|
||||
}
|
||||
if len(args) >= 2 {
|
||||
repoName = args[1]
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func SendAndLog(conn net.Conn, msg string) {
|
||||
logger.OutLog(msg)
|
||||
conn.Write([]byte(msg))
|
||||
}
|
||||
|
||||
func SendStatus(conn net.Conn) {
|
||||
status := tabwriter.NewWriter(conn, 5, 5, 5, ' ', 0)
|
||||
fmt.Fprintf(status, "Repository\tLast Synced\tNext Expected Sync\tRunning\n")
|
||||
|
||||
keys := make([]string, 0)
|
||||
for name, _ := range config.RepoMap {
|
||||
keys = append(keys, name)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
|
||||
// for other ways to format the time see: https://pkg.go.dev/time#pkg-constants
|
||||
for _, name := range keys {
|
||||
repo := config.RepoMap[name]
|
||||
lastSync := repo.State.LastAttemptStartTime
|
||||
nextSync := lastSync + int64(repo.Frequency)
|
||||
|
||||
fmt.Fprintf(status, "%s\t%s\t%s\t%t\n",
|
||||
name,
|
||||
time.Unix(lastSync, 0).Format(time.RFC1123),
|
||||
time.Unix(nextSync, 0).Format(time.RFC1123),
|
||||
repo.State.IsRunning,
|
||||
)
|
||||
}
|
||||
|
||||
status.Flush()
|
||||
}
|
||||
|
||||
// Attempt to force the sync of the repo
|
||||
func ForceSync(conn net.Conn, repoName string) (newSync bool) {
|
||||
newSync = false
|
||||
|
||||
// TODO: send repoName and every key in RepoMap to lowercase
|
||||
if repo, isInMap := config.RepoMap[repoName]; isInMap {
|
||||
logger.OutLog("Attempting to force sync of " + repoName)
|
||||
if sync.SyncIfPossible(repo) {
|
||||
conn.Write([]byte("Forced sync for " + repoName))
|
||||
newSync = true
|
||||
} else {
|
||||
SendAndLog(conn, "Could not force sync: "+repoName+" is already syncing.")
|
||||
}
|
||||
} else {
|
||||
SendAndLog(conn, repoName+" is not tracked so cannot sync")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func StartListener(connChan chan net.Conn, stopLisChan chan struct{}) {
|
||||
sockpath := config.Conf.SockPath
|
||||
// must remove old cfg.SockPath otherwise get "bind: address already in use"
|
||||
if filepath.Ext(sockpath) != ".sock" {
|
||||
panic(fmt.Errorf("socket file must end with .sock"))
|
||||
} else if _, err := os.Stat(sockpath); err == nil {
|
||||
if err := os.Remove(sockpath); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
} else if !errors.Is(err, os.ErrNotExist) {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
ear, err := net.Listen("unix", sockpath)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
logger.OutLog("Listening to unix socket at " + sockpath)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
// Attempting to accept on a closed net.Listener will return a non-temporary error
|
||||
conn, err := ear.Accept()
|
||||
if err != nil {
|
||||
if netErr, isTemp := err.(net.Error); isTemp && netErr.Temporary() {
|
||||
logger.ErrLog("Accepted socket error: " + err.Error())
|
||||
continue
|
||||
}
|
||||
logger.ErrLog("Unhandlable socket error: " + err.Error())
|
||||
return
|
||||
}
|
||||
connChan <- conn
|
||||
}
|
||||
}()
|
||||
|
||||
// TODO: check handling of multiple SIGHUP
|
||||
<-stopLisChan
|
||||
ear.Close()
|
||||
}
|
|
@ -0,0 +1,283 @@
|
|||
package arthur
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.csclub.uwaterloo.ca/public/merlin/config"
|
||||
"git.csclub.uwaterloo.ca/public/merlin/logger"
|
||||
)
|
||||
|
||||
func TestStatusCommand(t *testing.T) {
|
||||
r, w := net.Pipe()
|
||||
|
||||
go func() {
|
||||
// will only finish write when EOF is sent
|
||||
// only way to send EOF is to close
|
||||
w.Write([]byte("status"))
|
||||
w.Close()
|
||||
}()
|
||||
command, repoName := GetCommand(r)
|
||||
if command != "status" {
|
||||
t.Errorf("command for status should be \"status\", got " + command)
|
||||
} else if repoName != "" {
|
||||
t.Errorf("status should return an empty string for the repoName, got " + repoName)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncCommand(t *testing.T) {
|
||||
r, w := net.Pipe()
|
||||
|
||||
go func() {
|
||||
w.Write([]byte("sync:ubuntu"))
|
||||
w.Close()
|
||||
}()
|
||||
command, repoName := GetCommand(r)
|
||||
r.Close()
|
||||
if command != "sync" {
|
||||
t.Errorf("command for sync:ubuntu should be \"sync\", got " + command)
|
||||
} else if repoName != "ubuntu" {
|
||||
t.Errorf("name of repo for sync:ubuntu should be \"ubuntu\", got " + repoName)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendStatus(t *testing.T) {
|
||||
saveRepoMap := config.RepoMap
|
||||
defer func() {
|
||||
config.RepoMap = saveRepoMap
|
||||
}()
|
||||
|
||||
repoMap := make(map[string]*config.Repo)
|
||||
repoMap["eeeee"] = &config.Repo{
|
||||
Frequency: 30 * 86400,
|
||||
State: config.RepoState{
|
||||
IsRunning: true,
|
||||
LastAttemptStartTime: 1600000000,
|
||||
},
|
||||
}
|
||||
repoMap["alinux"] = &config.Repo{
|
||||
Frequency: 7*86400 + 3,
|
||||
State: config.RepoState{
|
||||
IsRunning: true,
|
||||
LastAttemptStartTime: 1620000000,
|
||||
},
|
||||
}
|
||||
repoMap["lnux"] = &config.Repo{
|
||||
Frequency: 86400,
|
||||
State: config.RepoState{
|
||||
IsRunning: false,
|
||||
LastAttemptStartTime: 1640000000,
|
||||
},
|
||||
}
|
||||
config.RepoMap = repoMap
|
||||
|
||||
r, w := net.Pipe()
|
||||
go func() {
|
||||
SendStatus(w)
|
||||
w.Close()
|
||||
}()
|
||||
msg, err := ioutil.ReadAll(r)
|
||||
r.Close()
|
||||
if err != nil {
|
||||
t.Errorf(err.Error())
|
||||
}
|
||||
expected := `Repository Last Synced Next Expected Sync Running
|
||||
alinux Sun, 02 May 2021 20:00:00 EDT Sun, 09 May 2021 20:00:03 EDT true
|
||||
eeeee Sun, 13 Sep 2020 08:26:40 EDT Tue, 13 Oct 2020 08:26:40 EDT true
|
||||
lnux Mon, 20 Dec 2021 06:33:20 EST Tue, 21 Dec 2021 06:33:20 EST false
|
||||
`
|
||||
if expected != string(msg) {
|
||||
t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg))
|
||||
}
|
||||
}
|
||||
|
||||
func TestForceSync(t *testing.T) {
|
||||
saveRepos := config.Repos
|
||||
saveRepoMap := config.RepoMap
|
||||
doneChan := make(chan config.SyncResult)
|
||||
defer func() {
|
||||
config.Repos = saveRepos
|
||||
config.RepoMap = saveRepoMap
|
||||
close(doneChan)
|
||||
}()
|
||||
|
||||
// Part 1: run a dummy sync
|
||||
repo := config.Repo{
|
||||
Name: "nux",
|
||||
SyncType: "csc-sync-dummy",
|
||||
Frequency: 7 * 86400,
|
||||
MaxTime: 30,
|
||||
Logger: logger.NewLogger("nux", "/tmp/merlin_force_sync_test_logs"),
|
||||
StateFile: "/tmp/merlin_force_sync_test_state",
|
||||
DoneChan: doneChan,
|
||||
State: config.RepoState{
|
||||
IsRunning: false,
|
||||
LastAttemptStartTime: 0,
|
||||
LastAttemptRunTime: 0,
|
||||
LastAttemptExit: config.NOT_RUN_YET,
|
||||
},
|
||||
}
|
||||
config.Repos = nil
|
||||
config.Repos = append(config.Repos, &repo)
|
||||
config.RepoMap = make(map[string]*config.Repo)
|
||||
config.RepoMap["nux"] = &repo
|
||||
|
||||
r, w := net.Pipe()
|
||||
go func() {
|
||||
if !ForceSync(w, "nux") {
|
||||
t.Errorf("Sync for nux did not start")
|
||||
}
|
||||
w.Close()
|
||||
}()
|
||||
msg, err := ioutil.ReadAll(r)
|
||||
r.Close()
|
||||
if err != nil {
|
||||
t.Errorf(err.Error())
|
||||
}
|
||||
expected := "Forced sync for nux"
|
||||
if expected != string(msg) {
|
||||
t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg))
|
||||
}
|
||||
|
||||
select {
|
||||
case result := <-doneChan:
|
||||
if result.Exit != config.SUCCESS {
|
||||
t.Errorf("Sync should exit with SUCCESS, got %d", result.Exit)
|
||||
}
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Errorf("Dummy sync should be done in 1 second, waited 3 seconds")
|
||||
}
|
||||
|
||||
// Part 2: attempt the same thing but with repo.State.IsRunning = true
|
||||
r, w = net.Pipe()
|
||||
go func() {
|
||||
if ForceSync(w, "nux") {
|
||||
t.Errorf("Sync for nux should not have started")
|
||||
}
|
||||
w.Close()
|
||||
}()
|
||||
msg, err = ioutil.ReadAll(r)
|
||||
r.Close()
|
||||
if err != nil {
|
||||
t.Errorf(err.Error())
|
||||
}
|
||||
expected = "Could not force sync: nux is already syncing."
|
||||
if expected != string(msg) {
|
||||
t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg))
|
||||
}
|
||||
|
||||
select {
|
||||
case <-doneChan:
|
||||
t.Errorf("Sync for nux should not have been started")
|
||||
case <-time.After(2 * time.Second):
|
||||
}
|
||||
|
||||
// Part 3: attempt a force sync with a repo that does not exist
|
||||
r, w = net.Pipe()
|
||||
go func() {
|
||||
if ForceSync(w, "nixx") {
|
||||
t.Errorf("Sync for nixx should not have started")
|
||||
}
|
||||
w.Close()
|
||||
}()
|
||||
msg, err = ioutil.ReadAll(r)
|
||||
r.Close()
|
||||
if err != nil {
|
||||
t.Errorf(err.Error())
|
||||
}
|
||||
expected = "nixx is not tracked so cannot sync"
|
||||
if expected != string(msg) {
|
||||
t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg))
|
||||
}
|
||||
}
|
||||
|
||||
func TestStartListener(t *testing.T) {
|
||||
saveConf := config.Conf
|
||||
connChan := make(chan net.Conn)
|
||||
stopLisChan := make(chan struct{})
|
||||
wait := make(chan struct{})
|
||||
defer func() {
|
||||
config.Conf = saveConf
|
||||
close(connChan)
|
||||
close(stopLisChan)
|
||||
}()
|
||||
config.Conf = config.Config{
|
||||
SockPath: "/tmp/merlin_listener_test.sock",
|
||||
}
|
||||
|
||||
// Test 1: check that closing/sending something to stopLisChan will stop the listener
|
||||
// and that a new listener can be created after stopping the old one
|
||||
go func() {
|
||||
StartListener(connChan, stopLisChan)
|
||||
wait <- struct{}{}
|
||||
}()
|
||||
stopLisChan <- struct{}{}
|
||||
select {
|
||||
case <-wait:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Errorf("StartListener should stop when struct{}{} is sent to stopLisChan")
|
||||
}
|
||||
|
||||
go func() {
|
||||
StartListener(connChan, stopLisChan)
|
||||
wait <- struct{}{}
|
||||
}()
|
||||
close(stopLisChan)
|
||||
select {
|
||||
case <-wait:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Errorf("StartListener should stop when stopLisChan is closed")
|
||||
}
|
||||
close(wait)
|
||||
|
||||
// Test 2: check that connections can be made to the unix socket
|
||||
// this test does not appear to be very stable (I think there is a race condition somewhere)
|
||||
stopLisChan = make(chan struct{})
|
||||
go StartListener(connChan, stopLisChan)
|
||||
waitForMsg := func(expected string) {
|
||||
select {
|
||||
case conn := <-connChan:
|
||||
msg, err := ioutil.ReadAll(conn)
|
||||
if err != nil {
|
||||
t.Errorf(err.Error())
|
||||
} else if expected != string(msg) {
|
||||
t.Errorf("Message expected was " + expected + " got " + string(msg))
|
||||
}
|
||||
conn.Close()
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Errorf("StartListener should stop when struct{}{} is sent to stopLisChan")
|
||||
}
|
||||
}
|
||||
sendMsg := func(msg string) {
|
||||
<-time.After(500 * time.Millisecond)
|
||||
send, err := net.Dial("unix", "/tmp/merlin_listener_test.sock")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_, err = send.Write([]byte(msg))
|
||||
if err != nil {
|
||||
t.Errorf(err.Error())
|
||||
}
|
||||
send.Close()
|
||||
}
|
||||
go func() {
|
||||
waitForMsg("status")
|
||||
}()
|
||||
sendMsg("status")
|
||||
|
||||
go func() {
|
||||
waitForMsg("sync:uuunix")
|
||||
}()
|
||||
sendMsg("sync:uuunix")
|
||||
|
||||
go func() {
|
||||
waitForMsg("$UPz2L2yWsE^UY8iG9JX@^dBb@5yb*")
|
||||
}()
|
||||
sendMsg("$UPz2L2yWsE^UY8iG9JX@^dBb@5yb*")
|
||||
|
||||
// unsure why I can't put this in the defer
|
||||
os.Remove("/tmp/merlin_listener_test.sock")
|
||||
}
|
|
@ -1,352 +0,0 @@
|
|||
package common
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"time"
|
||||
|
||||
ini "gopkg.in/ini.v1"
|
||||
)
|
||||
|
||||
const (
|
||||
DAILY = 86400
|
||||
TWICE_DAILY = DAILY / 2
|
||||
HOURLY = 3600
|
||||
TWICE_HOURLY = HOURLY / 2
|
||||
BI_HOURLY = HOURLY * 2
|
||||
TRI_HOURLY = HOURLY * 3
|
||||
TEN_MINUTELY = 600
|
||||
FIVE_MINUTELY = 300
|
||||
|
||||
CONFIG_PATH = "merlin-config.ini"
|
||||
|
||||
DEFAULT_MAX_JOBS = 6
|
||||
DEFAULT_MAX_TIME = DAILY / 4
|
||||
DEFAULT_DOWNLOAD_DIR = "/mirror/root"
|
||||
DEFAULT_PASSWORD_DIR = "/home/mirror/passwords"
|
||||
DEFAULT_STATE_DIR = "/home/mirror/merlin/states"
|
||||
DEFAULT_LOG_DIR = "/home/mirror/merlin/logs"
|
||||
DEFAULT_RSYNC_LOG_DIR = "/home/mirror/merlin/logs-rsync"
|
||||
DEFAULT_ZFSSYNC_LOG_DIR = "/home/mirror/merlin/logs-zfssync"
|
||||
DEFAULT_SOCK_PATH = "/run/merlin.sock"
|
||||
)
|
||||
|
||||
var frequencies = map[string]int{
|
||||
"daily": DAILY,
|
||||
"twice-daily": TWICE_DAILY,
|
||||
"hourly": HOURLY,
|
||||
"twice-hourly": TWICE_HOURLY,
|
||||
"bi-hourly": BI_HOURLY,
|
||||
"tri-hourly": TRI_HOURLY,
|
||||
"ten-minutely": TEN_MINUTELY,
|
||||
"five-minutely": FIVE_MINUTELY,
|
||||
}
|
||||
|
||||
// Last job attempt statuses
|
||||
const (
|
||||
NOT_RUN_YET = iota
|
||||
SUCCESS
|
||||
FAILURE
|
||||
TERMINATED // was killed by a signal
|
||||
)
|
||||
|
||||
type Result struct {
|
||||
Name string
|
||||
Exit int
|
||||
}
|
||||
|
||||
type Repo struct {
|
||||
// the name of this repo
|
||||
Name string `ini:"-"`
|
||||
// this should be one of "csc-sync-standard", etc.
|
||||
SyncType string `ini:"sync_type"`
|
||||
// a human-readable frequency, e.g. "bi-hourly"
|
||||
FrequencyStr string `ini:"frequency"`
|
||||
// the desired interval (in seconds) between successive runs
|
||||
Frequency int `ini:"-"`
|
||||
// the maximum time (in seconds) that each child process of this repo
|
||||
// can for before being killed
|
||||
MaxTime int `ini:"max_time"`
|
||||
// where to download the files for this repo (relative to the download
|
||||
// dir in the config)
|
||||
LocalDir string `ini:"local_dir"`
|
||||
// the remote host to rsync from
|
||||
RsyncHost string `ini:"rsync_host"`
|
||||
// the remote directory on the rsync host
|
||||
RsyncDir string `ini:"rsync_dir"`
|
||||
// the rsync user (optional)
|
||||
RsyncUser string `ini:"rsync_user"`
|
||||
// the file storing the password for rsync (optional)
|
||||
PasswordFile string `ini:"password_file"`
|
||||
// the file for general logging of this repo
|
||||
LoggerFile string `ini:"log_file"`
|
||||
// a reference to the general logger
|
||||
Logger *Logger `ini:"-"`
|
||||
// the file for logging this repo's rsync
|
||||
RsyncLogFile string `ini:"rsync_log_file"`
|
||||
// the file for logging this repo's zfssync
|
||||
ZfssyncLogFile string `ini:"zfssync_log_file"`
|
||||
// the repo will write its name and status in a Result struct to DoneChan
|
||||
// when it has finished a job (shared by all repos)
|
||||
DoneChan chan<- Result `ini:"-"`
|
||||
// the repo should stop syncing if StopChan is closed (shared by all repos)
|
||||
StopChan chan struct{} `ini:"-"`
|
||||
// a struct that stores the repo's status
|
||||
State RepoState `ini:"-"`
|
||||
// a reference to the global config
|
||||
cfg *Config `ini:"-"`
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
// the maximum number of jobs allowed to execute concurrently
|
||||
MaxJobs int `ini:"max_jobs"`
|
||||
// the IP addresses to use for rsync
|
||||
IPv4Address string `ini:"ipv4_address"`
|
||||
IPv6Address string `ini:"ipv6_address"`
|
||||
// the default sync type
|
||||
SyncType string `ini:"default_sync_type"`
|
||||
// the default frequency string for the repos
|
||||
FrequencyStr string `ini:"default_frequency"`
|
||||
// the default MaxTime for each repo
|
||||
MaxTime int `ini:"default_max_time"`
|
||||
// the directory where rsync should download files
|
||||
DownloadDir string `ini:"download_dir"`
|
||||
// the directory where rsync passwords are stored
|
||||
PasswordDir string `ini:"password_dir"`
|
||||
// the directory where the state of each repo sync is saved
|
||||
StateDir string `ini:"states_dir"`
|
||||
// the directory where merlin will store the general logs for each repo
|
||||
LoggerDir string `ini:"log_dir"`
|
||||
// the directory to store the rsync logs for each repo
|
||||
RsyncLogDir string `ini:"rsync_log_dir"`
|
||||
// the directory to store the zfssync logs for each repo
|
||||
ZfssyncLogDir string `ini:"zfssync_log_dir"`
|
||||
// the Unix socket path which arthur will use to communicate with us
|
||||
SockPath string `ini:"sock_path"`
|
||||
// a list of all of the repos
|
||||
Repos []*Repo `ini:"-"`
|
||||
}
|
||||
|
||||
// This should only be modified by the main thread
|
||||
type RepoState struct {
|
||||
// these are stored in the states folder
|
||||
// whether this repo is running a job or not
|
||||
IsRunning bool `ini:"is_running"`
|
||||
// the Unix epoch timestamp at which this repo last attempted a job
|
||||
LastAttemptStartTime int64 `ini:"last_attempt_time"`
|
||||
// the number of seconds this repo ran for during its last attempted job
|
||||
LastAttemptRunTime int64 `ini:"last_attempt_runtime"`
|
||||
// whether the last attempt was successful or not
|
||||
LastAttemptExit int `ini:"last_attempt_exit"`
|
||||
}
|
||||
|
||||
// save the current state of the repo to a file
|
||||
func (repo *Repo) SaveState() {
|
||||
repo.Logger.Debug("Saving state")
|
||||
state_cfg := ini.Empty()
|
||||
if err := ini.ReflectFrom(state_cfg, &repo.State); err != nil {
|
||||
repo.Logger.Error(err.Error())
|
||||
}
|
||||
file, err := os.OpenFile(repo.cfg.StateDir+"/"+repo.Name, os.O_RDWR|os.O_CREATE, 0644)
|
||||
if err != nil {
|
||||
repo.Logger.Error(err.Error())
|
||||
}
|
||||
if _, err := state_cfg.WriteTo(file); err != nil {
|
||||
repo.Logger.Error(err.Error())
|
||||
}
|
||||
repo.Logger.Debug("Saved state")
|
||||
}
|
||||
|
||||
// start sync job for this repo if more than repo.Frequency seconds have elapsed since its last job
|
||||
// and is not currently running.
|
||||
// returns true iff a job is started.
|
||||
func (repo *Repo) RunIfPossible() bool {
|
||||
if repo.State.IsRunning {
|
||||
return false
|
||||
}
|
||||
|
||||
curTime := time.Now().Unix()
|
||||
if curTime-repo.State.LastAttemptStartTime > int64(repo.Frequency) {
|
||||
repo.State.IsRunning = true
|
||||
repo.State.LastAttemptStartTime = curTime
|
||||
repo.SaveState()
|
||||
repo.Logger.Info(fmt.Sprintf("Repo %s has started syncing", repo.Name))
|
||||
go repo.StartSyncJob()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func zfsSync(repo *Repo) {
|
||||
out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput()
|
||||
if err != nil {
|
||||
repo.Logger.Error(err)
|
||||
} else {
|
||||
f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
|
||||
if err != nil {
|
||||
repo.Logger.Error(err.Error())
|
||||
} else {
|
||||
f.Write(out)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// update the repo state with the last attempt time and exit now that the job is done
|
||||
func (repo *Repo) SyncCompleted(exit int) {
|
||||
repoState := repo.State
|
||||
syncTook := time.Now().Unix() - repoState.LastAttemptStartTime
|
||||
nextSync := repo.MaxTime - int(syncTook)
|
||||
if nextSync < 0 {
|
||||
nextSync = 0
|
||||
}
|
||||
|
||||
repoState.IsRunning = false
|
||||
repoState.LastAttemptExit = exit
|
||||
repoState.LastAttemptRunTime = syncTook
|
||||
|
||||
var exitStr string
|
||||
switch exit {
|
||||
case SUCCESS:
|
||||
exitStr = "completed"
|
||||
case TERMINATED:
|
||||
exitStr = "terminated"
|
||||
default:
|
||||
exitStr = "failed"
|
||||
}
|
||||
repo.SaveState()
|
||||
repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync))
|
||||
if exit == SUCCESS {
|
||||
// it is possible that the zfssync from the last repo sync is still running is that fine?
|
||||
go zfsSync(repo)
|
||||
}
|
||||
}
|
||||
|
||||
func panicIfErr(e error) {
|
||||
if e != nil {
|
||||
panic(e)
|
||||
}
|
||||
}
|
||||
|
||||
func touchFile(file string) {
|
||||
fi, err := os.Stat(file)
|
||||
if err != nil {
|
||||
f, err := os.OpenFile(file, os.O_CREATE, 0644)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("unable to create file %s", file))
|
||||
}
|
||||
f.Close()
|
||||
} else if fi.IsDir() {
|
||||
panic(fmt.Errorf("%s is a directory", file))
|
||||
} else if os.Geteuid() != 1001 {
|
||||
// UID 1001 is the hardcoded uid for mirror
|
||||
err := os.Chown(file, 1001, os.Getegid())
|
||||
panicIfErr(err)
|
||||
} else if fi.Mode().Perm() != 0644 {
|
||||
err := os.Chmod(file, 0644)
|
||||
panicIfErr(err)
|
||||
}
|
||||
}
|
||||
|
||||
func touchFiles(files ...string) {
|
||||
for _, file := range files {
|
||||
touchFile(file)
|
||||
}
|
||||
}
|
||||
|
||||
// GetConfig reads the config from a JSON file, initializes default values,
|
||||
// and initializes the non-configurable fields of each repo.
|
||||
// It returns a Config.
|
||||
func GetConfig(doneChan chan Result, stopChan chan struct{}) Config {
|
||||
// add global configuration in cfg
|
||||
data, err := ini.Load(CONFIG_PATH)
|
||||
panicIfErr(err)
|
||||
|
||||
cfg := Config{
|
||||
MaxJobs: DEFAULT_MAX_JOBS,
|
||||
MaxTime: DEFAULT_MAX_TIME,
|
||||
PasswordDir: DEFAULT_PASSWORD_DIR,
|
||||
DownloadDir: DEFAULT_DOWNLOAD_DIR,
|
||||
StateDir: DEFAULT_STATE_DIR,
|
||||
LoggerDir: DEFAULT_LOG_DIR,
|
||||
RsyncLogDir: DEFAULT_RSYNC_LOG_DIR,
|
||||
ZfssyncLogDir: DEFAULT_ZFSSYNC_LOG_DIR,
|
||||
SockPath: DEFAULT_SOCK_PATH,
|
||||
Repos: make([]*Repo, 0),
|
||||
}
|
||||
err = data.MapTo(&cfg)
|
||||
panicIfErr(err)
|
||||
|
||||
for _, dir := range []string{cfg.StateDir, cfg.LoggerDir, cfg.RsyncLogDir, cfg.ZfssyncLogDir} {
|
||||
err := os.MkdirAll(dir, 0755)
|
||||
panicIfErr(err)
|
||||
}
|
||||
if cfg.IPv4Address == "" {
|
||||
panic("Missing IPv4 address from config")
|
||||
} else if cfg.IPv6Address == "" {
|
||||
panic("Missing IPv6 address from config")
|
||||
}
|
||||
|
||||
// add each repo configuration to cfg
|
||||
for _, section := range data.Sections() {
|
||||
repoName := section.Name()
|
||||
if repoName == "DEFAULT" {
|
||||
continue
|
||||
}
|
||||
|
||||
repo := Repo{
|
||||
Name: repoName,
|
||||
SyncType: cfg.SyncType,
|
||||
FrequencyStr: cfg.FrequencyStr,
|
||||
MaxTime: cfg.MaxTime,
|
||||
LoggerFile: cfg.LoggerDir + "/" + repoName + ".log",
|
||||
RsyncLogFile: cfg.RsyncLogDir + "/" + repoName + "-rsync.log",
|
||||
ZfssyncLogFile: cfg.ZfssyncLogDir + "/" + repoName + "-zfssync.log",
|
||||
DoneChan: doneChan,
|
||||
StopChan: stopChan,
|
||||
}
|
||||
err := section.MapTo(&repo)
|
||||
panicIfErr(err)
|
||||
|
||||
touchFiles(
|
||||
repo.LoggerFile,
|
||||
repo.RsyncLogFile,
|
||||
repo.ZfssyncLogFile,
|
||||
)
|
||||
|
||||
repo.Logger = NewLogger(repo.Name, repo.LoggerFile)
|
||||
repo.Frequency = frequencies[repo.FrequencyStr]
|
||||
if repo.SyncType == "" {
|
||||
panic("Missing sync type from " + repo.Name)
|
||||
} else if repo.Frequency == 0 {
|
||||
panic("Missing or invalid frequency for " + repo.Name)
|
||||
}
|
||||
repo.cfg = &cfg
|
||||
|
||||
repo.State = RepoState{
|
||||
IsRunning: false,
|
||||
LastAttemptStartTime: 0,
|
||||
LastAttemptRunTime: 0,
|
||||
LastAttemptExit: NOT_RUN_YET,
|
||||
}
|
||||
|
||||
// create the state file if it does not exist, otherwise load it from existing file
|
||||
repoStateFile := cfg.StateDir + "/" + repo.Name
|
||||
if _, err := os.Stat(repoStateFile); err != nil {
|
||||
touchFile(repoStateFile)
|
||||
repo.SaveState()
|
||||
} else {
|
||||
err := ini.MapTo(&repo.State, repoStateFile)
|
||||
panicIfErr(err)
|
||||
}
|
||||
|
||||
cfg.Repos = append(cfg.Repos, &repo)
|
||||
}
|
||||
|
||||
if len(cfg.Repos) == 0 {
|
||||
panic("No repos found in config")
|
||||
}
|
||||
|
||||
return cfg
|
||||
}
|
|
@ -1 +0,0 @@
|
|||
package common
|
|
@ -0,0 +1,265 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"gopkg.in/ini.v1"
|
||||
|
||||
"git.csclub.uwaterloo.ca/public/merlin/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
DAILY = 86400
|
||||
TWICE_DAILY = DAILY / 2
|
||||
HOURLY = 3600
|
||||
TWICE_HOURLY = HOURLY / 2
|
||||
BI_HOURLY = HOURLY * 2
|
||||
TRI_HOURLY = HOURLY * 3
|
||||
TEN_MINUTELY = 600
|
||||
FIVE_MINUTELY = 300
|
||||
|
||||
// could change this into a default_config
|
||||
DEFAULT_MAX_JOBS = 6
|
||||
DEFAULT_MAX_TIME = DAILY / 4
|
||||
DEFAULT_SYNC_TYPE = "csc-sync-standard"
|
||||
DEFAULT_FREQUENCY_STRING = "by-hourly"
|
||||
DEFAULT_DOWNLOAD_DIR = "/mirror/root"
|
||||
DEFAULT_PASSWORD_DIR = "/home/mirror/passwords"
|
||||
DEFAULT_STATE_DIR = "/home/mirror/merlin/states"
|
||||
DEFAULT_LOG_DIR = "/home/mirror/merlin/logs"
|
||||
DEFAULT_RSYNC_LOG_DIR = "/home/mirror/merlin/logs-rsync"
|
||||
DEFAULT_ZFSSYNC_LOG_DIR = "/home/mirror/merlin/logs-zfssync"
|
||||
DEFAULT_SOCK_PATH = "/run/merlin.sock"
|
||||
)
|
||||
|
||||
var frequencies = map[string]int{
|
||||
"daily": DAILY,
|
||||
"twice-daily": TWICE_DAILY,
|
||||
"hourly": HOURLY,
|
||||
"twice-hourly": TWICE_HOURLY,
|
||||
"bi-hourly": BI_HOURLY,
|
||||
"tri-hourly": TRI_HOURLY,
|
||||
"ten-minutely": TEN_MINUTELY,
|
||||
"five-minutely": FIVE_MINUTELY,
|
||||
}
|
||||
|
||||
// Last job attempt statuses
|
||||
const (
|
||||
NOT_RUN_YET = iota
|
||||
SUCCESS
|
||||
FAILURE
|
||||
TERMINATED // was killed by a signal
|
||||
)
|
||||
|
||||
type SyncResult struct {
|
||||
Name string
|
||||
Exit int
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
// the maximum number of jobs allowed to execute concurrently
|
||||
MaxJobs int `ini:"max_jobs"`
|
||||
// the IP addresses to use for rsync
|
||||
IPv4Address string `ini:"ipv4_address"`
|
||||
IPv6Address string `ini:"ipv6_address"`
|
||||
// the default sync type
|
||||
DefaultSyncType string `ini:"default_sync_type"`
|
||||
// the default frequency string
|
||||
DefaultFrequencyStr string `ini:"default_frequency"`
|
||||
// the default MaxTime
|
||||
DefaultMaxTime int `ini:"default_max_time"`
|
||||
// the directory where rsync should download files
|
||||
DownloadDir string `ini:"download_dir"`
|
||||
// the directory where rsync passwords are stored
|
||||
PasswordDir string `ini:"password_dir"`
|
||||
// the directory where the state of each repo sync is saved
|
||||
StateDir string `ini:"states_dir"`
|
||||
// the directory where merlin will store the general logs for each repo
|
||||
RepoLogDir string `ini:"repo_logs_dir"`
|
||||
// the directory to store the rsync logs for each repo
|
||||
RsyncLogDir string `ini:"rsync_logs_dir"`
|
||||
// the directory to store the zfssync logs for each repo
|
||||
ZfssyncLogDir string `ini:"zfssync_logs_dir"`
|
||||
// the Unix socket path which arthur will use to communicate with us
|
||||
SockPath string `ini:"sock_path"`
|
||||
}
|
||||
|
||||
// make it more clear when full path should be used vs when just the file name is needed
|
||||
type Repo struct {
|
||||
// the name of this repo
|
||||
Name string `ini:"-"`
|
||||
// this should be one of "csc-sync-standard", etc.
|
||||
SyncType string `ini:"sync_type"`
|
||||
// a human-readable frequency, e.g. "bi-hourly"
|
||||
FrequencyStr string `ini:"frequency"`
|
||||
// the desired interval (in seconds) between successive runs
|
||||
Frequency int `ini:"-"`
|
||||
// the maximum time (in seconds) that each child process of this repo
|
||||
// can for before being killed
|
||||
MaxTime int `ini:"max_time"`
|
||||
// where to download the files for this repo (relative to the download
|
||||
// dir in the config)
|
||||
LocalDir string `ini:"local_dir"`
|
||||
// the remote host to rsync from
|
||||
RsyncHost string `ini:"rsync_host"`
|
||||
// the remote directory on the rsync host
|
||||
RsyncDir string `ini:"rsync_dir"`
|
||||
// the rsync user (optional)
|
||||
RsyncUser string `ini:"rsync_user"`
|
||||
// the file storing the password for rsync (optional)
|
||||
PasswordFile string `ini:"password_file"`
|
||||
// the file storing the repo sync state (used to override default)
|
||||
StateFile string `ini:"state_file"`
|
||||
// the full file path for general logging of this repo (used to override default)
|
||||
RepoLogFile string `ini:"repo_log_file"`
|
||||
// a reference to the general logger
|
||||
Logger *logger.Logger `ini:"-"`
|
||||
// the full file path for logging this repo's rsync (used to override default)
|
||||
RsyncLogFile string `ini:"rsync_log_file"`
|
||||
// the full file path for logging this repo's zfssync (used to override default)
|
||||
ZfssyncLogFile string `ini:"zfssync_log_file"`
|
||||
// the repo will write its name and status in a Result struct to DoneChan
|
||||
// when it has finished a job (shared by all repos)
|
||||
DoneChan chan<- SyncResult `ini:"-"`
|
||||
// repos should stop syncing if StopChan is closed (shared by all repos)
|
||||
StopChan chan struct{} `ini:"-"`
|
||||
// a struct that stores the repo's status
|
||||
State RepoState `ini:"-"`
|
||||
}
|
||||
|
||||
// This should only be modified by the main thread
|
||||
type RepoState struct {
|
||||
// these are stored in the states folder
|
||||
// whether this repo is running a job or not
|
||||
IsRunning bool `ini:"is_running"`
|
||||
// the Unix epoch timestamp at which this repo last attempted a job
|
||||
LastAttemptStartTime int64 `ini:"last_attempt_time"`
|
||||
// the number of seconds this repo ran for during its last attempted job
|
||||
LastAttemptRunTime int64 `ini:"last_attempt_runtime"`
|
||||
// whether the last attempt was successful or not
|
||||
LastAttemptExit int `ini:"last_attempt_exit"`
|
||||
}
|
||||
|
||||
var (
|
||||
Conf Config
|
||||
Repos []*Repo
|
||||
RepoMap map[string]*Repo
|
||||
)
|
||||
|
||||
// GetConfig reads the config from a JSON file, initializes default values,
|
||||
// and initializes the non-configurable fields of each repo.
|
||||
// It returns a Config.
|
||||
func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struct{}) {
|
||||
// set default values then load config from file
|
||||
newConf := Config{
|
||||
MaxJobs: DEFAULT_MAX_JOBS,
|
||||
DefaultSyncType: DEFAULT_SYNC_TYPE,
|
||||
DefaultFrequencyStr: DEFAULT_FREQUENCY_STRING,
|
||||
DefaultMaxTime: DEFAULT_MAX_TIME,
|
||||
PasswordDir: DEFAULT_PASSWORD_DIR,
|
||||
DownloadDir: DEFAULT_DOWNLOAD_DIR,
|
||||
StateDir: DEFAULT_STATE_DIR,
|
||||
RepoLogDir: DEFAULT_LOG_DIR,
|
||||
RsyncLogDir: DEFAULT_RSYNC_LOG_DIR,
|
||||
ZfssyncLogDir: DEFAULT_ZFSSYNC_LOG_DIR,
|
||||
SockPath: DEFAULT_SOCK_PATH,
|
||||
}
|
||||
iniInfo, err := ini.Load(configPath)
|
||||
panicIfErr(err)
|
||||
err = iniInfo.MapTo(&newConf)
|
||||
panicIfErr(err)
|
||||
|
||||
// check config for major errors
|
||||
for _, dir := range []string{newConf.StateDir, newConf.RepoLogDir, newConf.RsyncLogDir, newConf.ZfssyncLogDir} {
|
||||
err := os.MkdirAll(dir, 0755)
|
||||
panicIfErr(err)
|
||||
}
|
||||
if newConf.IPv4Address == "" {
|
||||
panic("Missing IPv4 address from config")
|
||||
} else if newConf.IPv6Address == "" {
|
||||
panic("Missing IPv6 address from config")
|
||||
}
|
||||
|
||||
newRepos := make([]*Repo, 0)
|
||||
for _, section := range iniInfo.Sections() {
|
||||
repoName := section.Name()
|
||||
if repoName == "DEFAULT" {
|
||||
continue
|
||||
}
|
||||
|
||||
// set the default values for the repo then load from file
|
||||
// TODO: check if local_dir and repoName are always the same value
|
||||
// TODO: check to ensure that every Repo.Name is unique (may already be done by ini)
|
||||
repo := Repo{
|
||||
Name: repoName,
|
||||
SyncType: newConf.DefaultSyncType,
|
||||
FrequencyStr: newConf.DefaultFrequencyStr,
|
||||
MaxTime: newConf.DefaultMaxTime,
|
||||
StateFile: filepath.Join(newConf.StateDir, repoName),
|
||||
RepoLogFile: filepath.Join(newConf.RepoLogDir, repoName) + ".log",
|
||||
RsyncLogFile: filepath.Join(newConf.RsyncLogDir, repoName) + "-rsync.log",
|
||||
ZfssyncLogFile: filepath.Join(newConf.ZfssyncLogDir, repoName) + "-zfssync.log",
|
||||
DoneChan: doneChan,
|
||||
StopChan: stopChan,
|
||||
}
|
||||
err := section.MapTo(&repo)
|
||||
panicIfErr(err)
|
||||
|
||||
// TODO: ensure that the parent dirs to the file also exist when touching
|
||||
// or just remove the ability to override
|
||||
touchFiles(
|
||||
repo.StateFile,
|
||||
repo.RepoLogFile,
|
||||
repo.RsyncLogFile,
|
||||
repo.ZfssyncLogFile,
|
||||
)
|
||||
repo.Logger = logger.NewLogger(repo.Name, repo.RepoLogFile)
|
||||
repo.Frequency = frequencies[repo.FrequencyStr]
|
||||
if repo.SyncType == "" {
|
||||
panic("Missing sync type from " + repo.Name)
|
||||
} else if repo.Frequency == 0 {
|
||||
panic("Missing or invalid frequency for " + repo.Name)
|
||||
}
|
||||
|
||||
repo.State = RepoState{
|
||||
IsRunning: false,
|
||||
LastAttemptStartTime: 0,
|
||||
LastAttemptRunTime: 0,
|
||||
LastAttemptExit: NOT_RUN_YET,
|
||||
}
|
||||
err = ini.MapTo(&repo.State, repo.StateFile)
|
||||
panicIfErr(err)
|
||||
repo.SaveState()
|
||||
|
||||
newRepos = append(newRepos, &repo)
|
||||
}
|
||||
|
||||
if len(newRepos) == 0 {
|
||||
panic("No repos found in config")
|
||||
}
|
||||
|
||||
Conf = newConf
|
||||
Repos = newRepos
|
||||
RepoMap = make(map[string]*Repo)
|
||||
for _, repo := range Repos {
|
||||
RepoMap[repo.Name] = repo
|
||||
}
|
||||
}
|
||||
|
||||
// save the current state of the repo to a file
|
||||
func (repo *Repo) SaveState() {
|
||||
// repo.Logger.Debug("Saving state")
|
||||
state_cfg := ini.Empty()
|
||||
if err := ini.ReflectFrom(state_cfg, &repo.State); err != nil {
|
||||
repo.Logger.Error(err.Error())
|
||||
}
|
||||
file, err := os.OpenFile(repo.StateFile, os.O_RDWR|os.O_CREATE, 0644)
|
||||
if err != nil {
|
||||
repo.Logger.Error(err.Error())
|
||||
}
|
||||
if _, err := state_cfg.WriteTo(file); err != nil {
|
||||
repo.Logger.Error(err.Error())
|
||||
}
|
||||
// repo.Logger.Debug("Saved state")
|
||||
}
|
|
@ -0,0 +1,122 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
)
|
||||
|
||||
func TestTouchFiles(t *testing.T) {
|
||||
files := []string{
|
||||
"/tmp/merlin_touch_test_1",
|
||||
"/tmp/merlin_touch_test_2",
|
||||
"/tmp/merlin_touch_test_3",
|
||||
}
|
||||
touchFiles(files[0], files[1], files[2])
|
||||
for _, file := range files {
|
||||
if _, err := os.Stat(file); err != nil {
|
||||
t.Errorf(err.Error())
|
||||
} else if err := os.Remove(file); err != nil {
|
||||
t.Errorf(err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPanicIfErr(t *testing.T) {
|
||||
panicIfErr(nil)
|
||||
defer func() { recover() }()
|
||||
panicIfErr(errors.New("AAAAAAAAAA"))
|
||||
t.Errorf("panicIfErr should have panicked")
|
||||
}
|
||||
|
||||
func TestLoadConfig(t *testing.T) {
|
||||
doneChan := make(chan SyncResult)
|
||||
stopChan := make(chan struct{})
|
||||
LoadConfig("config_test.ini", doneChan, stopChan)
|
||||
// TODO: Fill out parts not part of the ini or state file
|
||||
expectedConfig := Config{
|
||||
MaxJobs: 6,
|
||||
IPv4Address: "129.97.134.129",
|
||||
IPv6Address: "2620:101:f000:4901:c5c::129",
|
||||
DefaultSyncType: "csc-sync-standard",
|
||||
DefaultFrequencyStr: "daily",
|
||||
DefaultMaxTime: 1000,
|
||||
DownloadDir: "/tmp/test-mirror",
|
||||
PasswordDir: "/home/mirror/passwords",
|
||||
StateDir: "test_files",
|
||||
RepoLogDir: "test_files/logs",
|
||||
RsyncLogDir: "test_files/rsync",
|
||||
ZfssyncLogDir: "test_files/zfssync",
|
||||
SockPath: "test_files/test.sock",
|
||||
}
|
||||
expectedRepo1 := Repo{
|
||||
Name: "eelinux",
|
||||
SyncType: "csc-sync-nonstandard",
|
||||
FrequencyStr: "tri-hourly",
|
||||
Frequency: 10800,
|
||||
MaxTime: 2000,
|
||||
LocalDir: "eelinux",
|
||||
RsyncHost: "rsync.releases.eelinux.ca",
|
||||
RsyncDir: "releases",
|
||||
StateFile: "test_files/eeelinux",
|
||||
RepoLogFile: "test_files/logs/eelinux.log",
|
||||
Logger: Repos[0].Logger,
|
||||
RsyncLogFile: "test_files/rsync/eelinux.log",
|
||||
ZfssyncLogFile: "test_files/zfssync/eelinux.log",
|
||||
DoneChan: doneChan,
|
||||
StopChan: stopChan,
|
||||
State: RepoState{
|
||||
IsRunning: false,
|
||||
LastAttemptStartTime: 1600000000,
|
||||
LastAttemptRunTime: 100,
|
||||
LastAttemptExit: 1,
|
||||
},
|
||||
}
|
||||
expectedRepo2 := Repo{
|
||||
Name: "yoland",
|
||||
SyncType: "csc-sync-standard",
|
||||
FrequencyStr: "daily",
|
||||
Frequency: 86400,
|
||||
MaxTime: 1000,
|
||||
LocalDir: "yoland-releases",
|
||||
RsyncHost: "rsync.releases.yoland.io",
|
||||
RsyncDir: "releases",
|
||||
StateFile: "test_files/yoland",
|
||||
RepoLogFile: "test_files/logs/yoland.log",
|
||||
Logger: Repos[1].Logger,
|
||||
RsyncLogFile: "test_files/rsync/yoland-rsync.log",
|
||||
ZfssyncLogFile: "test_files/zfssync/yoland-zfssync.log",
|
||||
DoneChan: doneChan,
|
||||
StopChan: stopChan,
|
||||
State: RepoState{
|
||||
IsRunning: false,
|
||||
LastAttemptStartTime: 0,
|
||||
LastAttemptRunTime: 0,
|
||||
LastAttemptExit: 0,
|
||||
},
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(expectedConfig, Conf) {
|
||||
t.Errorf("Config loaded does not match expected config")
|
||||
spew.Dump(expectedConfig)
|
||||
spew.Dump(Conf)
|
||||
}
|
||||
if !reflect.DeepEqual(expectedRepo1, *Repos[0]) {
|
||||
t.Errorf("The eelinux repo loaded does not match the exected repo config")
|
||||
spew.Dump(expectedRepo1)
|
||||
spew.Dump(*Repos[0])
|
||||
}
|
||||
if !reflect.DeepEqual(expectedRepo2, *Repos[1]) {
|
||||
t.Errorf("The yoland repo loaded does not match the exected repo config")
|
||||
spew.Dump(expectedRepo2)
|
||||
spew.Dump(*Repos[1])
|
||||
}
|
||||
|
||||
os.Remove("test_files/yoland")
|
||||
os.RemoveAll("test_files/logs")
|
||||
os.RemoveAll("test_files/rsync")
|
||||
os.RemoveAll("test_files/zfssync")
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
; default values are commented out
|
||||
|
||||
; max_jobs = 6
|
||||
ipv4_address = 129.97.134.129
|
||||
ipv6_address = 2620:101:f000:4901:c5c::129
|
||||
; default_sync_type = csc-sync-standard
|
||||
default_frequency = daily
|
||||
default_max_time = 1000
|
||||
download_dir = /tmp/test-mirror
|
||||
; password_dir = /home/mirror/passwords
|
||||
states_dir = test_files
|
||||
repo_logs_dir = test_files/logs
|
||||
rsync_logs_dir = test_files/rsync
|
||||
zfssync_logs_dir = test_files/zfssync
|
||||
sock_path = test_files/test.sock
|
||||
|
||||
[eelinux]
|
||||
sync_type = csc-sync-nonstandard
|
||||
frequency = tri-hourly
|
||||
max_time = 2000
|
||||
local_dir = eelinux
|
||||
rsync_host = rsync.releases.eelinux.ca
|
||||
rsync_dir = releases
|
||||
state_file = test_files/eeelinux
|
||||
repo_log_file = test_files/logs/eelinux.log
|
||||
rsync_log_file = test_files/rsync/eelinux.log
|
||||
zfssync_log_file = test_files/zfssync/eelinux.log
|
||||
|
||||
[yoland]
|
||||
; sync_type = csc-sync-standard
|
||||
; frequency = daily
|
||||
; max_time = 1000
|
||||
local_dir = yoland-releases
|
||||
rsync_host = rsync.releases.yoland.io
|
||||
rsync_dir = releases
|
||||
; state_file = test_files/yoland
|
||||
; repo_log_file = test_files/logs/yoland.log
|
||||
; rsync_log_file = test_files/rsync/yoland-rsync.log
|
||||
; zfssync_log_file = test_files/zfssync/yoland-zfssync.log
|
|
@ -0,0 +1,5 @@
|
|||
is_running = false
|
||||
last_attempt_time = 1600000000
|
||||
last_attempt_runtime = 100
|
||||
last_attempt_exit = 1
|
||||
|
|
@ -0,0 +1,37 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
func panicIfErr(e error) {
|
||||
if e != nil {
|
||||
panic(e)
|
||||
}
|
||||
}
|
||||
|
||||
func touchFile(file string) {
|
||||
fi, err := os.Stat(file)
|
||||
if err != nil {
|
||||
f, err := os.OpenFile(file, os.O_CREATE, 0644)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("unable to create file %s", file))
|
||||
}
|
||||
f.Close()
|
||||
} else if fi.IsDir() {
|
||||
panic(fmt.Errorf("%s is a directory", file))
|
||||
// } else if os.Geteuid() != 1001 {
|
||||
// // mirror is UID 1001
|
||||
// err := os.Chown(file, 1001, os.Getegid())
|
||||
// panicIfErr(err)
|
||||
}
|
||||
err = os.Chmod(file, 0644)
|
||||
panicIfErr(err)
|
||||
}
|
||||
|
||||
func touchFiles(files ...string) {
|
||||
for _, file := range files {
|
||||
touchFile(file)
|
||||
}
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
module git.csclub.uwaterloo.ca/public/merlin
|
||||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.0
|
||||
github.com/stretchr/testify v1.7.0 // indirect
|
||||
golang.org/x/sys v0.0.0-20210915083310-ed5796bab164
|
||||
gopkg.in/ini.v1 v1.63.2
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package common
|
||||
package logger
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
@ -30,12 +30,12 @@ var levels = map[int]string{
|
|||
var outLogger = log.New(os.Stdout, "", log.LstdFlags)
|
||||
var errLogger = log.New(os.Stderr, "", log.LstdFlags)
|
||||
|
||||
func OutLogger() *log.Logger {
|
||||
return outLogger
|
||||
func OutLog(v ...interface{}) {
|
||||
outLogger.Println(v...)
|
||||
}
|
||||
|
||||
func ErrLogger() *log.Logger {
|
||||
return errLogger
|
||||
func ErrLog(v ...interface{}) {
|
||||
errLogger.Println(v...)
|
||||
}
|
||||
|
||||
func NewLogger(name, file string) *Logger {
|
||||
|
@ -48,15 +48,9 @@ func NewLogger(name, file string) *Logger {
|
|||
}
|
||||
|
||||
func (logger *Logger) Log(level int, v ...interface{}) {
|
||||
if level == INFO {
|
||||
outLogger.Println(append([]interface{}{"[" + logger.name + "]"}, v...))
|
||||
} else if level == ERROR {
|
||||
errLogger.Println(append([]interface{}{"[" + logger.name + "]"}, v...))
|
||||
}
|
||||
|
||||
f, err := os.OpenFile(logger.file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
|
||||
if err != nil {
|
||||
errLogger.Println(err.Error())
|
||||
ErrLog(err.Error())
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
|
@ -65,7 +59,7 @@ func (logger *Logger) Log(level int, v ...interface{}) {
|
|||
args = append(args, v...)
|
||||
|
||||
logger.SetOutput(f)
|
||||
logger.Println(v...)
|
||||
logger.Println(args)
|
||||
}
|
||||
|
||||
func (logger *Logger) Debug(v ...interface{}) {
|
||||
|
@ -73,6 +67,9 @@ func (logger *Logger) Debug(v ...interface{}) {
|
|||
}
|
||||
|
||||
func (logger *Logger) Info(v ...interface{}) {
|
||||
// src := []interface{}{logger.name + ":"}
|
||||
// args := append(src, v...)
|
||||
OutLog(v...)
|
||||
logger.Log(INFO, v...)
|
||||
}
|
||||
|
||||
|
@ -81,5 +78,6 @@ func (logger *Logger) Warning(v ...interface{}) {
|
|||
}
|
||||
|
||||
func (logger *Logger) Error(v ...interface{}) {
|
||||
ErrLog(append([]interface{}{"[" + logger.name + "]"}, v...))
|
||||
logger.Log(ERROR, v...)
|
||||
}
|
201
merlin/merlin.go
201
merlin/merlin.go
|
@ -1,153 +1,35 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"syscall"
|
||||
"text/tabwriter"
|
||||
"time"
|
||||
|
||||
"git.csclub.uwaterloo.ca/public/merlin/common"
|
||||
"golang.org/x/sys/unix"
|
||||
|
||||
"git.csclub.uwaterloo.ca/public/merlin/arthur"
|
||||
"git.csclub.uwaterloo.ca/public/merlin/config"
|
||||
"git.csclub.uwaterloo.ca/public/merlin/logger"
|
||||
"git.csclub.uwaterloo.ca/public/merlin/sync"
|
||||
)
|
||||
|
||||
var (
|
||||
cfg common.Config
|
||||
outLogger *log.Logger
|
||||
errLogger *log.Logger
|
||||
repoMap map[string]*common.Repo
|
||||
repoIdx int
|
||||
numJobsRunning int
|
||||
)
|
||||
|
||||
func getAndRunCommand(conn net.Conn) {
|
||||
defer conn.Close()
|
||||
|
||||
var buf bytes.Buffer
|
||||
_, err := io.Copy(&buf, conn)
|
||||
if err != nil {
|
||||
errLogger.Println(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
command := buf.String()
|
||||
args := strings.Split(command, ":")
|
||||
respondAndLogErr := func(msg string) {
|
||||
outLogger.Println(msg)
|
||||
conn.Write([]byte(msg))
|
||||
}
|
||||
|
||||
if args[0] == "status" {
|
||||
status := tabwriter.NewWriter(conn, 5, 5, 5, ' ', 0)
|
||||
fmt.Fprintf(status, "Repository\tLast Synced\tNext Expected Sync\n")
|
||||
|
||||
// for time formating see https://pkg.go.dev/time#pkg-constants
|
||||
for name, repo := range repoMap {
|
||||
fmt.Fprintf(status, "%s\t%s\t%s\n",
|
||||
name,
|
||||
time.Unix(repo.State.LastAttemptStartTime, 0).Format(time.RFC1123),
|
||||
time.Unix(repo.State.LastAttemptRunTime+int64(repo.Frequency), 0).Format(time.RFC1123),
|
||||
)
|
||||
}
|
||||
|
||||
status.Flush()
|
||||
} else if args[0] == "sync" {
|
||||
if len(args) != 2 {
|
||||
respondAndLogErr("Could not parse sync command, forced sync fails.")
|
||||
return
|
||||
}
|
||||
|
||||
if repo, inMap := repoMap[args[1]]; inMap {
|
||||
outLogger.Println("Attempting to force sync of " + repo.Name)
|
||||
if repo.RunIfPossible() {
|
||||
conn.Write([]byte("Forced sync for " + repo.Name))
|
||||
numJobsRunning++
|
||||
} else {
|
||||
respondAndLogErr("Cannot force sync: " + repo.Name + ", already syncing.")
|
||||
}
|
||||
} else {
|
||||
respondAndLogErr(args[1] + " is not tracked so cannot sync")
|
||||
}
|
||||
} else {
|
||||
respondAndLogErr("Received unrecognized command: " + command)
|
||||
}
|
||||
}
|
||||
|
||||
func unixSockListener(connChan chan net.Conn, stopLisChan chan struct{}) {
|
||||
// must remove cfg.SockPath otherwise get "bind: address already in use"
|
||||
if filepath.Ext(cfg.SockPath) != ".sock" {
|
||||
panic(fmt.Errorf("Socket file must end with .sock"))
|
||||
} else if _, err := os.Stat(cfg.SockPath); err == nil {
|
||||
if err := os.Remove(cfg.SockPath); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
} else if !errors.Is(err, os.ErrNotExist) {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
ear, err := net.Listen("unix", cfg.SockPath)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
outLogger.Println("Listening to unix socket at " + cfg.SockPath)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
// will exit when ear is closed
|
||||
conn, err := ear.Accept()
|
||||
if err != nil {
|
||||
if ne, ok := err.(net.Error); ok && ne.Temporary() {
|
||||
errLogger.Println("Accepted socket error: " + err.Error())
|
||||
continue
|
||||
}
|
||||
errLogger.Println("Unhandlable socket error: " + err.Error())
|
||||
return
|
||||
}
|
||||
connChan <- conn
|
||||
}
|
||||
}()
|
||||
|
||||
<-stopLisChan
|
||||
ear.Close()
|
||||
}
|
||||
|
||||
// We use a round-robin strategy. It's not the most efficient, but it's simple
|
||||
// (read: easy to understand) and guarantees each repo will eventually get a chance to run.
|
||||
func runAsManyAsPossible() {
|
||||
repos := cfg.Repos
|
||||
startIdx := repoIdx
|
||||
|
||||
for numJobsRunning < cfg.MaxJobs {
|
||||
repo := repos[repoIdx]
|
||||
if repo.RunIfPossible() {
|
||||
numJobsRunning++
|
||||
}
|
||||
repoIdx = (repoIdx + 1) % len(repos)
|
||||
if repoIdx == startIdx {
|
||||
// we've come full circle
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
// get config path from command args
|
||||
var CONFIG_PATH = "merlin-config.ini"
|
||||
|
||||
func main() {
|
||||
outLogger = common.OutLogger()
|
||||
errLogger = common.ErrLogger()
|
||||
|
||||
// check that merlin is run as mirror user
|
||||
// check that mirror user has pid of 1001
|
||||
|
||||
doneChan := make(chan common.Result)
|
||||
// receives a Result struct when a repo stops syncing
|
||||
doneChan := make(chan config.SyncResult)
|
||||
// closed when merlin is told to stop running
|
||||
stopChan := make(chan struct{})
|
||||
// receives a Conn when a client makes a connection to unix socket
|
||||
connChan := make(chan net.Conn)
|
||||
// signal channel to stop listening to unix socket
|
||||
stopLisChan := make(chan struct{})
|
||||
|
||||
stopSig := make(chan os.Signal, 1)
|
||||
|
@ -157,24 +39,38 @@ func main() {
|
|||
|
||||
unix.Umask(002)
|
||||
|
||||
numJobsRunning = 0
|
||||
numJobsRunning := 0
|
||||
repoIdx := 0
|
||||
|
||||
loadConfig := func() {
|
||||
cfg = common.GetConfig(doneChan, stopChan)
|
||||
outLogger.Println("Loaded config:\n" + fmt.Sprintf("%+v\n", cfg))
|
||||
|
||||
repoMap = make(map[string]*common.Repo)
|
||||
for _, repo := range cfg.Repos {
|
||||
repoMap[repo.Name] = repo
|
||||
}
|
||||
config.LoadConfig(CONFIG_PATH, doneChan, stopChan)
|
||||
logger.OutLog("Loaded config:\n" + fmt.Sprintf("%+v\n", config.Conf))
|
||||
|
||||
repoIdx = 0
|
||||
go unixSockListener(connChan, stopLisChan)
|
||||
go arthur.StartListener(connChan, stopLisChan)
|
||||
}
|
||||
// We use a round-robin strategy. It's not the most efficient, but it's simple
|
||||
// (read: easy to understand) and guarantees each repo will eventually get a chance to run.
|
||||
runAsManyAsPossible := func() {
|
||||
startIdx := repoIdx
|
||||
|
||||
for numJobsRunning < config.Conf.MaxJobs {
|
||||
repo := config.Repos[repoIdx]
|
||||
if sync.SyncIfPossible(repo) {
|
||||
numJobsRunning++
|
||||
}
|
||||
repoIdx = (repoIdx + 1) % len(config.Repos)
|
||||
if repoIdx == startIdx {
|
||||
// we've come full circle
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
loadConfig()
|
||||
// IsRunning must be false otherwise repo will never sync
|
||||
for _, repo := range repoMap {
|
||||
// ensure that IsRunning is false otherwise repo will never sync
|
||||
// (only on startup can we assume that repos were not previously syncing)
|
||||
for _, repo := range config.Repos {
|
||||
repo.State.IsRunning = false
|
||||
}
|
||||
runAsManyAsPossible()
|
||||
|
@ -190,24 +86,39 @@ runLoop:
|
|||
case <-reloadSig:
|
||||
stopLisChan <- struct{}{}
|
||||
loadConfig()
|
||||
// ensure that SyncCompleted can handle it if reloading config
|
||||
// removes a repo that was already syncing
|
||||
|
||||
case done := <-doneChan:
|
||||
repoMap[done.Name].SyncCompleted(done.Exit)
|
||||
sync.SyncCompleted(config.RepoMap[done.Name], done.Exit)
|
||||
numJobsRunning--
|
||||
|
||||
case conn := <-connChan:
|
||||
getAndRunCommand(conn)
|
||||
command, repoName := arthur.GetCommand(conn)
|
||||
switch command {
|
||||
case "status":
|
||||
arthur.SendStatus(conn)
|
||||
case "sync":
|
||||
if arthur.ForceSync(conn, repoName) {
|
||||
numJobsRunning++
|
||||
}
|
||||
default:
|
||||
arthur.SendAndLog(conn, "Received unrecognized command: "+command)
|
||||
}
|
||||
// None of the arthur functions close the connection so you will need to
|
||||
// close it manually for the message to be sent
|
||||
conn.Close()
|
||||
|
||||
case <-time.After(1 * time.Minute):
|
||||
}
|
||||
runAsManyAsPossible()
|
||||
}
|
||||
|
||||
// give time for jobs to terminate before exiting
|
||||
// give time for all jobs to terminate before exiting program
|
||||
for {
|
||||
select {
|
||||
case done := <-doneChan:
|
||||
repoMap[done.Name].SyncCompleted(done.Exit)
|
||||
sync.SyncCompleted(config.RepoMap[done.Name], done.Exit)
|
||||
numJobsRunning--
|
||||
|
||||
case <-time.After(1 * time.Second):
|
||||
|
|
|
@ -1,7 +0,0 @@
|
|||
package main
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestSock1(t *testing.T) {
|
||||
|
||||
}
|
|
@ -1,50 +1,50 @@
|
|||
package common
|
||||
package sync
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
"git.csclub.uwaterloo.ca/public/merlin/config"
|
||||
)
|
||||
|
||||
func (repo *Repo) buildRsyncHost() string {
|
||||
func buildRsyncHost(repo *config.Repo) string {
|
||||
if repo.RsyncUser != "" {
|
||||
repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
|
||||
}
|
||||
return "rsync://" + repo.RsyncHost + "/" + repo.RsyncDir
|
||||
}
|
||||
|
||||
func (repo *Repo) buildRsyncDaemonHost() string {
|
||||
func buildRsyncDaemonHost(repo *config.Repo) string {
|
||||
if repo.RsyncUser != "" {
|
||||
repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
|
||||
}
|
||||
return repo.RsyncHost + "::" + repo.RsyncDir
|
||||
}
|
||||
|
||||
func (repo *Repo) buildRsyncSSHHost() string {
|
||||
func buildRsyncSSHHost(repo *config.Repo) string {
|
||||
if repo.RsyncUser != "" {
|
||||
repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
|
||||
}
|
||||
return repo.RsyncHost + ":" + repo.RsyncDir
|
||||
}
|
||||
|
||||
func (repo *Repo) buildDownloadDir() string {
|
||||
return repo.cfg.DownloadDir + "/" + repo.LocalDir
|
||||
func buildDownloadDir(repo *config.Repo) string {
|
||||
return config.Conf.DownloadDir + "/" + repo.LocalDir
|
||||
}
|
||||
|
||||
func (repo *Repo) CSCSyncApache() []string {
|
||||
func cscSyncApache(repo *config.Repo) []string {
|
||||
args := []string{
|
||||
"nice", "rsync", "-az", "--no-owner", "--no-group", "--delete", "--safe-links",
|
||||
"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
|
||||
"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
|
||||
"--exclude", ".~tmp~/",
|
||||
"--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
|
||||
}
|
||||
args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
|
||||
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
func (repo *Repo) CSCSyncArchLinux() []string {
|
||||
func cscSyncArchLinux(repo *config.Repo) []string {
|
||||
|
||||
tempDir := "" // is this option even needed?
|
||||
|
||||
|
@ -52,83 +52,83 @@ func (repo *Repo) CSCSyncArchLinux() []string {
|
|||
args := []string{
|
||||
"rsync", "-rtlH", "--safe-links", "--delete-after", "--timeout=600",
|
||||
"--contimeout=60", "-p", "--delay-updates", "--no-motd",
|
||||
"--temp-dir=" + tempDir, "--log-file=" + repo.RsyncLogFile, "--address=" + repo.cfg.IPv4Address,
|
||||
"--temp-dir=" + tempDir, "--log-file=" + repo.RsyncLogFile, "--address=" + config.Conf.IPv4Address,
|
||||
}
|
||||
args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir())
|
||||
args = append(args, buildRsyncHost(repo), buildDownloadDir(repo))
|
||||
|
||||
return args
|
||||
|
||||
}
|
||||
|
||||
func (repo *Repo) CSCSyncBadPerms() []string {
|
||||
func cscSyncBadPerms(repo *config.Repo) []string {
|
||||
args := []string{
|
||||
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--chmod=o=rX", "--delete",
|
||||
"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
|
||||
"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
|
||||
"--exclude", ".~tmp~/",
|
||||
"--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
|
||||
}
|
||||
args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
|
||||
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
// TODO ceph
|
||||
|
||||
func (repo *Repo) CSCSyncCDImage() []string {
|
||||
func cscSyncCDImage(repo *config.Repo) []string {
|
||||
args := []string{
|
||||
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete",
|
||||
"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
|
||||
"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
|
||||
"--exclude", ".*/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
|
||||
}
|
||||
args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
|
||||
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
func (repo *Repo) CSCSyncChmod() []string {
|
||||
func cscSyncChmod(repo *config.Repo) []string {
|
||||
args := []string{
|
||||
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", "--delay-updates", "--safe-links",
|
||||
"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
|
||||
"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
|
||||
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
|
||||
"--chmod=u=rwX,go=rX",
|
||||
}
|
||||
args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir())
|
||||
args = append(args, buildRsyncHost(repo), buildDownloadDir(repo))
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
func (repo *Repo) CSCSyncDebian() []string {
|
||||
func cscSyncDebian(repo *config.Repo) []string {
|
||||
|
||||
// sync /pool
|
||||
args := []string{"nice", "rsync", "-rlHtvp",
|
||||
"--exclude", ".~tmp~/", "--timeout=3600", "-4",
|
||||
"--address=" + repo.cfg.IPv4Address,
|
||||
"--address=" + config.Conf.IPv4Address,
|
||||
}
|
||||
// $RSYNC_HOST::$RSYNC_DIR/pool/ $TO/pool/ >> $LOGFILE 2>&1
|
||||
return args
|
||||
}
|
||||
|
||||
func (repo *Repo) CSCSyncDebianCD() []string {
|
||||
func cscSyncDebianCD(repo *config.Repo) []string {
|
||||
|
||||
// this is basically the same as CSCSyncDebian, except it has an extra --exclude
|
||||
args := []string{"nice", "rsync", "-rlHtvp", "--delete",
|
||||
"--exclude", ".~tmp~/", "--timeout=3600", "-4",
|
||||
"--address=" + repo.cfg.IPv4Address,
|
||||
"--address=" + config.Conf.IPv4Address,
|
||||
// "--exclude", "Archive-Update-in-Progress-${HOSTNAME}"
|
||||
}
|
||||
// $RSYNC_HOST::$RSYNC_DIR $TO >> $LOGFILE 2>&1
|
||||
return args
|
||||
}
|
||||
|
||||
func (repo *Repo) CSCSyncGentoo() []string {
|
||||
func cscSyncGentoo(repo *config.Repo) []string {
|
||||
repo.RsyncUser = "gentoo"
|
||||
repo.PasswordFile = "gentoo-distfiles"
|
||||
return repo.CSCSyncStandard()
|
||||
return cscSyncStandard(repo)
|
||||
}
|
||||
|
||||
// TODO s3
|
||||
|
||||
func (repo *Repo) CSCSyncSSH() []string {
|
||||
func cscSyncSSH(repo *config.Repo) []string {
|
||||
|
||||
args := []string{
|
||||
"rsync", "-aH", "--no-owner", "--no-group", "--delete",
|
||||
|
@ -138,48 +138,45 @@ func (repo *Repo) CSCSyncSSH() []string {
|
|||
}
|
||||
// not sure if we should be assuming ssh identity file is the password file
|
||||
args = append(args, "-e", fmt.Sprintf("'ssh -i %s'", repo.PasswordFile))
|
||||
args = append(args, repo.buildRsyncSSHHost(), repo.buildDownloadDir())
|
||||
args = append(args, buildRsyncSSHHost(repo), buildDownloadDir(repo))
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
func (repo *Repo) CSCSyncStandard() []string {
|
||||
func cscSyncStandard(repo *config.Repo) []string {
|
||||
args := []string{
|
||||
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
|
||||
"--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
|
||||
"--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
|
||||
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
|
||||
}
|
||||
if repo.PasswordFile != "" {
|
||||
filename := repo.cfg.PasswordDir + "/" + repo.PasswordFile
|
||||
filename := config.Conf.PasswordDir + "/" + repo.PasswordFile
|
||||
args = append(args, "--password-file", filename)
|
||||
}
|
||||
args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir())
|
||||
args = append(args, buildRsyncHost(repo), buildDownloadDir(repo))
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
func (repo *Repo) CSCSyncStandardIPV6() []string {
|
||||
func cscSyncStandardIPV6(repo *config.Repo) []string {
|
||||
args := []string{
|
||||
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
|
||||
"--delay-updates", "--safe-links", "--timeout=3600", "-6", "--address=" + repo.cfg.IPv4Address,
|
||||
"--delay-updates", "--safe-links", "--timeout=3600", "-6", "--address=" + config.Conf.IPv4Address,
|
||||
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
|
||||
}
|
||||
args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
|
||||
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
// for testing, to be removed later
|
||||
func (repo *Repo) CSCSyncDummy() []string {
|
||||
|
||||
sleepDur := strconv.FormatInt(rand.Int63n(10)+5, 10)
|
||||
args := []string{"sleep", sleepDur}
|
||||
func cscSyncDummy(repo *config.Repo) []string {
|
||||
args := []string{"sleep", "1"}
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
// executes a particular sync job depending on repo.SyncType.
|
||||
func (repo *Repo) getSyncCommand() []string {
|
||||
func getSyncCommand(repo *config.Repo) (args []string) {
|
||||
/*
|
||||
# scripts used by merlin.py
|
||||
csc-sync-debian
|
||||
|
@ -206,80 +203,55 @@ func (repo *Repo) getSyncCommand() []string {
|
|||
ubuntu-releases-sync
|
||||
*/
|
||||
switch repo.SyncType {
|
||||
|
||||
case "csc-sync-apache":
|
||||
return repo.CSCSyncApache()
|
||||
args = cscSyncApache(repo)
|
||||
case "csc-sync-archlinux":
|
||||
return repo.CSCSyncArchLinux()
|
||||
args = cscSyncArchLinux(repo)
|
||||
case "csc-sync-badperms":
|
||||
return repo.CSCSyncBadPerms()
|
||||
args = cscSyncBadPerms(repo)
|
||||
case "csc-sync-cdimage":
|
||||
return repo.CSCSyncCDImage()
|
||||
args = cscSyncCDImage(repo)
|
||||
// case "csc-sync-ceph":
|
||||
// return repo.CSCSyncCeph()
|
||||
// args = cscSyncCeph(repo)
|
||||
case "csc-sync-chmod":
|
||||
return repo.CSCSyncChmod()
|
||||
args = cscSyncChmod(repo)
|
||||
case "csc-sync-debian":
|
||||
return repo.CSCSyncDebian()
|
||||
args = cscSyncDebian(repo)
|
||||
case "csc-sync-debian-cd":
|
||||
return repo.CSCSyncDebianCD()
|
||||
args = cscSyncDebianCD(repo)
|
||||
case "csc-sync-gentoo":
|
||||
return repo.CSCSyncGentoo()
|
||||
args = cscSyncGentoo(repo)
|
||||
// case "csc-sync-s3":
|
||||
// return repo.CSCSyncS3()
|
||||
// args = cscSyncS3(repo)
|
||||
case "csc-sync-ssh":
|
||||
return repo.CSCSyncSSH()
|
||||
args = cscSyncSSH(repo)
|
||||
case "csc-sync-standard":
|
||||
return repo.CSCSyncStandard()
|
||||
args = cscSyncStandard(repo)
|
||||
case "csc-sync-standard-ipv6":
|
||||
return repo.CSCSyncStandardIPV6()
|
||||
args = cscSyncStandardIPV6(repo)
|
||||
// case "csc-sync-wget":
|
||||
// return repo.CSCSyncWget()
|
||||
// args = cscSyncWget(repo)
|
||||
case "csc-sync-dummy":
|
||||
return repo.CSCSyncDummy()
|
||||
return cscSyncDummy(repo)
|
||||
default:
|
||||
repo.Logger.Error("Unrecognized sync type", "'"+repo.SyncType+"'")
|
||||
}
|
||||
|
||||
return []string{}
|
||||
}
|
||||
|
||||
func (repo *Repo) StartSyncJob() {
|
||||
status := FAILURE
|
||||
defer func() {
|
||||
repo.DoneChan <- Result{
|
||||
Name: repo.Name,
|
||||
Exit: status,
|
||||
}
|
||||
}()
|
||||
|
||||
localDir := repo.buildDownloadDir()
|
||||
localDir := buildDownloadDir(repo)
|
||||
err := os.MkdirAll(localDir, 0775)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Could not create directory %s: %w", localDir, err)
|
||||
err = fmt.Errorf("could not create directory %s: %w", localDir, err)
|
||||
// I'm not sure if logger can handle error so just use the string?
|
||||
repo.Logger.Error(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
args := repo.getSyncCommand()
|
||||
if repo.PasswordFile != "" {
|
||||
a268wang marked this conversation as resolved
|
||||
filename := repo.cfg.PasswordDir + "/" + repo.PasswordFile
|
||||
filename := config.Conf.PasswordDir + "/" + repo.PasswordFile
|
||||
args = append(args, "--password-file", filename)
|
||||
}
|
||||
args = append(args, repo.buildRsyncHost(), localDir)
|
||||
|
||||
ch := SpawnProcess(repo, args)
|
||||
if ch == nil {
|
||||
// SpawnProcess will have already logged error
|
||||
args = append(args, buildRsyncHost(repo), localDir)
|
||||
a268wang marked this conversation as resolved
d278liu
commented
don't think you want to be appending rsync host and localDir here too. each sync type already appends it. also different sync types have different ways of building the host (some needs to be the daemon style host and some needs to be a ssh host) don't think you want to be appending rsync host and localDir here too. each sync type already appends it.
also different sync types have different ways of building the host (some needs to be the daemon style host and some needs to be a ssh host)
a268wang
commented
Huh all this stuff was actually moved over from the startSync function https://git.csclub.uwaterloo.ca/public/mirror/src/branch/go/merlin/common/sync.go#L256-L270 I'm thinking of keeping a modified version of part to make sure that startSync will immediately exit if merlin cannot mkdir the buildDownloadDir Huh all this stuff was actually moved over from the startSync function
https://git.csclub.uwaterloo.ca/public/mirror/src/branch/go/merlin/common/sync.go#L256-L270
I'm thinking of keeping a modified version of part to make sure that startSync will immediately exit if merlin cannot mkdir the buildDownloadDir
d278liu
commented
yeah my bad about the original sync.go i think i forgot to remove them yeah my bad about the original sync.go i think i forgot to remove them
|
||||
|
||||
return
|
||||
}
|
||||
cmd := <-ch
|
||||
switch cmd.ProcessState.ExitCode() {
|
||||
case 0:
|
||||
status = SUCCESS
|
||||
case -1:
|
||||
status = TERMINATED
|
||||
// default is already FAILURE
|
||||
}
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
package sync
|
|
@ -0,0 +1,61 @@
|
|||
package sync
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.csclub.uwaterloo.ca/public/merlin/config"
|
||||
)
|
||||
|
||||
// start sync job for this repo if more than repo.Frequency seconds have elapsed since its last job
|
||||
// and is not currently running.
|
||||
// returns true iff a job is started.
|
||||
func SyncIfPossible(repo *config.Repo) bool {
|
||||
// Change to SyncIfPossible
|
||||
if repo.State.IsRunning {
|
||||
return false
|
||||
}
|
||||
|
||||
curTime := time.Now().Unix()
|
||||
if curTime-repo.State.LastAttemptStartTime > int64(repo.Frequency) {
|
||||
repo.State.IsRunning = true
|
||||
repo.State.LastAttemptStartTime = curTime
|
||||
repo.SaveState()
|
||||
repo.Logger.Info(fmt.Sprintf("Repo %s has started syncing", repo.Name))
|
||||
go startSync(repo)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// update the repo state with the last attempt time and exit now that the job is done
|
||||
func SyncCompleted(repo *config.Repo, exit int) {
|
||||
repoState := repo.State
|
||||
syncTook := time.Now().Unix() - repoState.LastAttemptStartTime
|
||||
nextSync := repo.MaxTime - int(syncTook)
|
||||
if nextSync < 0 {
|
||||
nextSync = 0
|
||||
}
|
||||
|
||||
repoState.IsRunning = false
|
||||
repoState.LastAttemptExit = exit
|
||||
repoState.LastAttemptRunTime = syncTook
|
||||
|
||||
var exitStr string
|
||||
switch exit {
|
||||
case config.SUCCESS:
|
||||
exitStr = "completed"
|
||||
case config.TERMINATED:
|
||||
exitStr = "terminated"
|
||||
default:
|
||||
exitStr = "failed"
|
||||
}
|
||||
repo.SaveState()
|
||||
repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync))
|
||||
|
||||
// TODO: make it possible for this SyncCompleted to be run without zfsSync being run
|
||||
if exit == config.SUCCESS {
|
||||
// it is possible that the zfssync from the last repo sync is still running is that fine?
|
||||
go zfsSync(repo)
|
||||
}
|
||||
}
|
|
@ -1,10 +1,13 @@
|
|||
package common
|
||||
package sync
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"git.csclub.uwaterloo.ca/public/merlin/config"
|
||||
)
|
||||
|
||||
// SpawnProcess spawns a child process for the given repo. The process will
|
||||
|
@ -12,7 +15,7 @@ import (
|
|||
// runs for longer than the repo's MaxTime.
|
||||
// It returns a channel through which a Cmd will be sent once it has finished,
|
||||
// or nil if it was unable to start a process.
|
||||
func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
|
||||
func spawnSyncProcess(repo *config.Repo, args []string) (ch <-chan *exec.Cmd) {
|
||||
// startTime and time took will be handled in common.go by SyncExit
|
||||
cmd := exec.Command(args[0], args[1:]...)
|
||||
|
||||
|
@ -55,6 +58,7 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
|
|||
case <-cmdDoneChan:
|
||||
if !cmd.ProcessState.Success() {
|
||||
repo.Logger.Warning("Process ended with status code", cmd.ProcessState.ExitCode())
|
||||
// repo.Logger.Info(strings.Join(args, " "))
|
||||
}
|
||||
|
||||
case <-repo.StopChan:
|
||||
|
@ -68,3 +72,43 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
|
|||
}()
|
||||
return
|
||||
}
|
||||
|
||||
func startSync(repo *config.Repo) {
|
||||
status := config.FAILURE
|
||||
defer func() {
|
||||
repo.DoneChan <- config.SyncResult{
|
||||
Name: repo.Name,
|
||||
Exit: status,
|
||||
}
|
||||
}()
|
||||
|
||||
args := getSyncCommand(repo)
|
||||
ch := spawnSyncProcess(repo, args)
|
||||
if ch == nil {
|
||||
// SpawnProcess will have already logged error
|
||||
return
|
||||
}
|
||||
cmd := <-ch
|
||||
switch cmd.ProcessState.ExitCode() {
|
||||
case 0:
|
||||
status = config.SUCCESS
|
||||
case -1:
|
||||
status = config.TERMINATED
|
||||
// default is already FAILURE
|
||||
}
|
||||
}
|
||||
|
||||
func zfsSync(repo *config.Repo) {
|
||||
out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput()
|
||||
if err != nil {
|
||||
repo.Logger.Error(err)
|
||||
} else {
|
||||
f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
|
||||
if err != nil {
|
||||
repo.Logger.Error(err.Error())
|
||||
} else {
|
||||
f.Write(out)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
same here, some sync types already append the password file - and how the password file should be added to args depends on the type of sync, so probably would remove this too
I agree with this, I'll remove it shortly