Refactor Go Merlin #1

Closed
a268wang wants to merge 8 commits from refactor into go
21 changed files with 1138 additions and 611 deletions

18
.drone.yml Normal file
View File

@ -0,0 +1,18 @@
kind: pipeline
type: docker
name: default
steps:
- name: merlin
image: golang:1.17
commands:
# add linter
- cd merlin
- go build
- go test ./...
trigger:
branch:
- master
- go
- refactor

1
.gitignore vendored
View File

@ -1,4 +1,3 @@
/.*
!.gitignore
.git_old/
/dead.letter

View File

@ -20,6 +20,7 @@ This folder contains the code for merlin (which does the actual syncing) and art
- [ ] wget
### TODO
- [ ] ensure that the proper permissions (file mode, group, user) are used
- [ ] detect if an rsync process is stuck (\*\*)
- [ ] place each rsync process in a separate cgroup (\*\*\*)

130
merlin/arthur/arthur.go Normal file
View File

@ -0,0 +1,130 @@
package arthur
import (
"errors"
"fmt"
"io/ioutil"
"net"
"os"
"path/filepath"
"sort"
"strings"
"text/tabwriter"
"time"
"git.csclub.uwaterloo.ca/public/merlin/config"
"git.csclub.uwaterloo.ca/public/merlin/logger"
"git.csclub.uwaterloo.ca/public/merlin/sync"
)
// Reads and parses the message sent over the accepted connection
func GetCommand(conn net.Conn) (command, repoName string) {
command = ""
repoName = ""
buf, err := ioutil.ReadAll(conn)
if err != nil {
logger.ErrLog(err.Error())
return
}
args := strings.Split(string(buf), ":")
if len(args) >= 1 {
command = args[0]
}
if len(args) >= 2 {
repoName = args[1]
}
return
}
func SendAndLog(conn net.Conn, msg string) {
logger.OutLog(msg)
conn.Write([]byte(msg))
}
func SendStatus(conn net.Conn) {
status := tabwriter.NewWriter(conn, 5, 5, 5, ' ', 0)
fmt.Fprintf(status, "Repository\tLast Synced\tNext Expected Sync\tRunning\n")
keys := make([]string, 0)
for name := range config.RepoMap {
keys = append(keys, name)
}
sort.Strings(keys)
// for other ways to format the time see: https://pkg.go.dev/time#pkg-constants
for _, name := range keys {
repo := config.RepoMap[name]
lastSync := repo.State.LastAttemptStartTime
nextSync := lastSync + int64(repo.Frequency)
fmt.Fprintf(status, "%s\t%s\t%s\t%t\n",
name,
time.Unix(lastSync, 0).Format(time.RFC1123),
time.Unix(nextSync, 0).Format(time.RFC1123),
repo.State.IsRunning,
)
}
status.Flush()
}
// Attempt to force the sync of the repo
func ForceSync(conn net.Conn, repoName string) (newSync bool) {
newSync = false
// TODO: convert repoName and every key in RepoMap to lowercase
if repo, isInMap := config.RepoMap[repoName]; isInMap {
logger.OutLog("Attempting to force sync of " + repoName)
if sync.SyncIfPossible(repo) {
conn.Write([]byte("Forced sync for " + repoName))
newSync = true
} else {
SendAndLog(conn, "Could not force sync: "+repoName+" is already syncing.")
}
} else {
SendAndLog(conn, repoName+" is not tracked so cannot sync")
}
return
}
func StartListener(connChan chan net.Conn, stopLisChan chan struct{}) {
sockpath := config.Conf.SockPath
// must remove the old socket file, otherwise Listen fails with "bind: address already in use"
if filepath.Ext(sockpath) != ".sock" {
panic(fmt.Errorf("socket file must end with .sock"))
} else if _, err := os.Stat(sockpath); err == nil {
if err := os.Remove(sockpath); err != nil {
panic(err)
}
} else if !errors.Is(err, os.ErrNotExist) {
panic(err)
}
ear, err := net.Listen("unix", sockpath)
if err != nil {
panic(err)
}
logger.OutLog("Listening to unix socket at " + sockpath)
go func() {
for {
// Attempting to accept on a closed net.Listener will return a non-temporary error
conn, err := ear.Accept()
if err != nil {
if netErr, isTemp := err.(net.Error); isTemp && netErr.Temporary() {
logger.ErrLog("Accepted socket error: " + err.Error())
continue
}
logger.ErrLog("Unhandlable socket error: " + err.Error())
return
}
connChan <- conn
}
}()
// TODO: check handling of multiple SIGHUP
<-stopLisChan
ear.Close()
}
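Since the arthur client itself is not part of this diff, here is a minimal, hedged sketch of how a client of the listener above could talk to merlin over the unix socket; the socket path and the "sync:ubuntu" payload are assumptions, and the half-close via CloseWrite is needed because GetCommand reads the connection until EOF.

package main
import (
	"fmt"
	"io/ioutil"
	"net"
)
func main() {
	// the socket path is an assumption (DEFAULT_SOCK_PATH in the config package)
	conn, err := net.Dial("unix", "/run/merlin.sock")
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	// messages have the form "<command>[:<repoName>]", e.g. "status" or "sync:ubuntu"
	if _, err := conn.Write([]byte("sync:ubuntu")); err != nil {
		panic(err)
	}
	// GetCommand reads until EOF, so half-close the write side before waiting for the reply
	if uc, ok := conn.(*net.UnixConn); ok {
		uc.CloseWrite()
	}
	// merlin closes the connection after replying, so ReadAll returns once the reply is complete
	reply, err := ioutil.ReadAll(conn)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(reply))
}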

View File

@ -0,0 +1,283 @@
package arthur
import (
"io/ioutil"
"net"
"os"
"testing"
"time"
"git.csclub.uwaterloo.ca/public/merlin/config"
"git.csclub.uwaterloo.ca/public/merlin/logger"
)
func TestStatusCommand(t *testing.T) {
r, w := net.Pipe()
go func() {
// GetCommand's ReadAll will only finish once EOF is received
// closing the write end is the only way to send EOF
w.Write([]byte("status"))
w.Close()
}()
command, repoName := GetCommand(r)
if command != "status" {
t.Errorf("command for status should be \"status\", got " + command)
} else if repoName != "" {
t.Errorf("status should return an empty string for the repoName, got " + repoName)
}
}
func TestSyncCommand(t *testing.T) {
r, w := net.Pipe()
go func() {
w.Write([]byte("sync:ubuntu"))
w.Close()
}()
command, repoName := GetCommand(r)
r.Close()
if command != "sync" {
t.Errorf("command for sync:ubuntu should be \"sync\", got " + command)
} else if repoName != "ubuntu" {
t.Errorf("name of repo for sync:ubuntu should be \"ubuntu\", got " + repoName)
}
}
func TestSendStatus(t *testing.T) {
saveRepoMap := config.RepoMap
defer func() {
config.RepoMap = saveRepoMap
}()
repoMap := make(map[string]*config.Repo)
repoMap["eeeee"] = &config.Repo{
Frequency: 30 * 86400,
State: config.RepoState{
IsRunning: true,
LastAttemptStartTime: 1600000000,
},
}
repoMap["alinux"] = &config.Repo{
Frequency: 7*86400 + 3,
State: config.RepoState{
IsRunning: true,
LastAttemptStartTime: 1620000000,
},
}
repoMap["lnux"] = &config.Repo{
Frequency: 86400,
State: config.RepoState{
IsRunning: false,
LastAttemptStartTime: 1640000000,
},
}
config.RepoMap = repoMap
r, w := net.Pipe()
go func() {
SendStatus(w)
w.Close()
}()
msg, err := ioutil.ReadAll(r)
r.Close()
if err != nil {
t.Errorf(err.Error())
}
expected := `Repository Last Synced Next Expected Sync Running
alinux Sun, 02 May 2021 20:00:00 EDT Sun, 09 May 2021 20:00:03 EDT true
eeeee Sun, 13 Sep 2020 08:26:40 EDT Tue, 13 Oct 2020 08:26:40 EDT true
lnux Mon, 20 Dec 2021 06:33:20 EST Tue, 21 Dec 2021 06:33:20 EST false
`
if expected != string(msg) {
t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg))
}
}
func TestForceSync(t *testing.T) {
saveRepos := config.Repos
saveRepoMap := config.RepoMap
doneChan := make(chan config.SyncResult)
defer func() {
config.Repos = saveRepos
config.RepoMap = saveRepoMap
close(doneChan)
}()
// Part 1: run a dummy sync
repo := config.Repo{
Name: "nux",
SyncType: "csc-sync-dummy",
Frequency: 7 * 86400,
MaxTime: 30,
Logger: logger.NewLogger("nux", "/tmp/merlin_force_sync_test_logs"),
StateFile: "/tmp/merlin_force_sync_test_state",
DoneChan: doneChan,
State: config.RepoState{
IsRunning: false,
LastAttemptStartTime: 0,
LastAttemptRunTime: 0,
LastAttemptExit: config.NOT_RUN_YET,
},
}
config.Repos = nil
config.Repos = append(config.Repos, &repo)
config.RepoMap = make(map[string]*config.Repo)
config.RepoMap["nux"] = &repo
r, w := net.Pipe()
go func() {
if !ForceSync(w, "nux") {
t.Errorf("Sync for nux did not start")
}
w.Close()
}()
msg, err := ioutil.ReadAll(r)
r.Close()
if err != nil {
t.Errorf(err.Error())
}
expected := "Forced sync for nux"
if expected != string(msg) {
t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg))
}
select {
case result := <-doneChan:
if result.Exit != config.SUCCESS {
t.Errorf("Sync should exit with SUCCESS, got %d", result.Exit)
}
case <-time.After(3 * time.Second):
t.Errorf("Dummy sync should be done in 1 second, waited 3 seconds")
}
// Part 2: attempt the same thing but with repo.State.IsRunning = true
r, w = net.Pipe()
go func() {
if ForceSync(w, "nux") {
t.Errorf("Sync for nux should not have started")
}
w.Close()
}()
msg, err = ioutil.ReadAll(r)
r.Close()
if err != nil {
t.Errorf(err.Error())
}
expected = "Could not force sync: nux is already syncing."
if expected != string(msg) {
t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg))
}
select {
case <-doneChan:
t.Errorf("Sync for nux should not have been started")
case <-time.After(2 * time.Second):
}
// Part 3: attempt a force sync with a repo that does not exist
r, w = net.Pipe()
go func() {
if ForceSync(w, "nixx") {
t.Errorf("Sync for nixx should not have started")
}
w.Close()
}()
msg, err = ioutil.ReadAll(r)
r.Close()
if err != nil {
t.Errorf(err.Error())
}
expected = "nixx is not tracked so cannot sync"
if expected != string(msg) {
t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg))
}
}
func TestStartListener(t *testing.T) {
saveConf := config.Conf
connChan := make(chan net.Conn)
stopLisChan := make(chan struct{})
wait := make(chan struct{})
defer func() {
config.Conf = saveConf
close(connChan)
close(stopLisChan)
}()
config.Conf = config.Config{
SockPath: "/tmp/merlin_listener_test.sock",
}
// Test 1: check that closing/sending something to stopLisChan will stop the listener
// and that a new listener can be created after stopping the old one
go func() {
StartListener(connChan, stopLisChan)
wait <- struct{}{}
}()
stopLisChan <- struct{}{}
select {
case <-wait:
case <-time.After(3 * time.Second):
t.Errorf("StartListener should stop when struct{}{} is sent to stopLisChan")
}
go func() {
StartListener(connChan, stopLisChan)
wait <- struct{}{}
}()
close(stopLisChan)
select {
case <-wait:
case <-time.After(3 * time.Second):
t.Errorf("StartListener should stop when stopLisChan is closed")
}
close(wait)
// Test 2: check that connections can be made to the unix socket
// this test does not appear to be very stable (I think there is a race condition somewhere)
stopLisChan = make(chan struct{})
go StartListener(connChan, stopLisChan)
waitForMsg := func(expected string) {
select {
case conn := <-connChan:
msg, err := ioutil.ReadAll(conn)
if err != nil {
t.Errorf(err.Error())
} else if expected != string(msg) {
t.Errorf("Message expected was " + expected + " got " + string(msg))
}
conn.Close()
case <-time.After(3 * time.Second):
t.Errorf("StartListener should stop when struct{}{} is sent to stopLisChan")
}
}
sendMsg := func(msg string) {
<-time.After(500 * time.Millisecond)
send, err := net.Dial("unix", "/tmp/merlin_listener_test.sock")
if err != nil {
panic(err)
}
_, err = send.Write([]byte(msg))
if err != nil {
t.Errorf(err.Error())
}
send.Close()
}
go func() {
waitForMsg("status")
}()
sendMsg("status")
go func() {
waitForMsg("sync:uuunix")
}()
sendMsg("sync:uuunix")
go func() {
waitForMsg("$UPz2L2yWsE^UY8iG9JX@^dBb@5yb*")
}()
sendMsg("$UPz2L2yWsE^UY8iG9JX@^dBb@5yb*")
// unsure why I can't put this in the defer
os.Remove("/tmp/merlin_listener_test.sock")
}

View File

@ -1,352 +0,0 @@
package common
import (
"fmt"
"os"
"os/exec"
"time"
ini "gopkg.in/ini.v1"
)
const (
DAILY = 86400
TWICE_DAILY = DAILY / 2
HOURLY = 3600
TWICE_HOURLY = HOURLY / 2
BI_HOURLY = HOURLY * 2
TRI_HOURLY = HOURLY * 3
TEN_MINUTELY = 600
FIVE_MINUTELY = 300
CONFIG_PATH = "merlin-config.ini"
DEFAULT_MAX_JOBS = 6
DEFAULT_MAX_TIME = DAILY / 4
DEFAULT_DOWNLOAD_DIR = "/mirror/root"
DEFAULT_PASSWORD_DIR = "/home/mirror/passwords"
DEFAULT_STATE_DIR = "/home/mirror/merlin/states"
DEFAULT_LOG_DIR = "/home/mirror/merlin/logs"
DEFAULT_RSYNC_LOG_DIR = "/home/mirror/merlin/logs-rsync"
DEFAULT_ZFSSYNC_LOG_DIR = "/home/mirror/merlin/logs-zfssync"
DEFAULT_SOCK_PATH = "/run/merlin.sock"
)
var frequencies = map[string]int{
"daily": DAILY,
"twice-daily": TWICE_DAILY,
"hourly": HOURLY,
"twice-hourly": TWICE_HOURLY,
"bi-hourly": BI_HOURLY,
"tri-hourly": TRI_HOURLY,
"ten-minutely": TEN_MINUTELY,
"five-minutely": FIVE_MINUTELY,
}
// Last job attempt statuses
const (
NOT_RUN_YET = iota
SUCCESS
FAILURE
TERMINATED // was killed by a signal
)
type Result struct {
Name string
Exit int
}
type Repo struct {
// the name of this repo
Name string `ini:"-"`
// this should be one of "csc-sync-standard", etc.
SyncType string `ini:"sync_type"`
// a human-readable frequency, e.g. "bi-hourly"
FrequencyStr string `ini:"frequency"`
// the desired interval (in seconds) between successive runs
Frequency int `ini:"-"`
// the maximum time (in seconds) that each child process of this repo
// can for before being killed
MaxTime int `ini:"max_time"`
// where to download the files for this repo (relative to the download
// dir in the config)
LocalDir string `ini:"local_dir"`
// the remote host to rsync from
RsyncHost string `ini:"rsync_host"`
// the remote directory on the rsync host
RsyncDir string `ini:"rsync_dir"`
// the rsync user (optional)
RsyncUser string `ini:"rsync_user"`
// the file storing the password for rsync (optional)
PasswordFile string `ini:"password_file"`
// the file for general logging of this repo
LoggerFile string `ini:"log_file"`
// a reference to the general logger
Logger *Logger `ini:"-"`
// the file for logging this repo's rsync
RsyncLogFile string `ini:"rsync_log_file"`
// the file for logging this repo's zfssync
ZfssyncLogFile string `ini:"zfssync_log_file"`
// the repo will write its name and status in a Result struct to DoneChan
// when it has finished a job (shared by all repos)
DoneChan chan<- Result `ini:"-"`
// the repo should stop syncing if StopChan is closed (shared by all repos)
StopChan chan struct{} `ini:"-"`
// a struct that stores the repo's status
State RepoState `ini:"-"`
// a reference to the global config
cfg *Config `ini:"-"`
}
type Config struct {
// the maximum number of jobs allowed to execute concurrently
MaxJobs int `ini:"max_jobs"`
// the IP addresses to use for rsync
IPv4Address string `ini:"ipv4_address"`
IPv6Address string `ini:"ipv6_address"`
// the default sync type
SyncType string `ini:"default_sync_type"`
// the default frequency string for the repos
FrequencyStr string `ini:"default_frequency"`
// the default MaxTime for each repo
MaxTime int `ini:"default_max_time"`
// the directory where rsync should download files
DownloadDir string `ini:"download_dir"`
// the directory where rsync passwords are stored
PasswordDir string `ini:"password_dir"`
// the directory where the state of each repo sync is saved
StateDir string `ini:"states_dir"`
// the directory where merlin will store the general logs for each repo
LoggerDir string `ini:"log_dir"`
// the directory to store the rsync logs for each repo
RsyncLogDir string `ini:"rsync_log_dir"`
// the directory to store the zfssync logs for each repo
ZfssyncLogDir string `ini:"zfssync_log_dir"`
// the Unix socket path which arthur will use to communicate with us
SockPath string `ini:"sock_path"`
// a list of all of the repos
Repos []*Repo `ini:"-"`
}
// This should only be modified by the main thread
type RepoState struct {
// these are stored in the states folder
// whether this repo is running a job or not
IsRunning bool `ini:"is_running"`
// the Unix epoch timestamp at which this repo last attempted a job
LastAttemptStartTime int64 `ini:"last_attempt_time"`
// the number of seconds this repo ran for during its last attempted job
LastAttemptRunTime int64 `ini:"last_attempt_runtime"`
// whether the last attempt was successful or not
LastAttemptExit int `ini:"last_attempt_exit"`
}
// save the current state of the repo to a file
func (repo *Repo) SaveState() {
repo.Logger.Debug("Saving state")
state_cfg := ini.Empty()
if err := ini.ReflectFrom(state_cfg, &repo.State); err != nil {
repo.Logger.Error(err.Error())
}
file, err := os.OpenFile(repo.cfg.StateDir+"/"+repo.Name, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
repo.Logger.Error(err.Error())
}
if _, err := state_cfg.WriteTo(file); err != nil {
repo.Logger.Error(err.Error())
}
repo.Logger.Debug("Saved state")
}
// start sync job for this repo if more than repo.Frequency seconds have elapsed since its last job
// and is not currently running.
// returns true iff a job is started.
func (repo *Repo) RunIfPossible() bool {
if repo.State.IsRunning {
return false
}
curTime := time.Now().Unix()
if curTime-repo.State.LastAttemptStartTime > int64(repo.Frequency) {
repo.State.IsRunning = true
repo.State.LastAttemptStartTime = curTime
repo.SaveState()
repo.Logger.Info(fmt.Sprintf("Repo %s has started syncing", repo.Name))
go repo.StartSyncJob()
return true
}
return false
}
func zfsSync(repo *Repo) {
out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput()
if err != nil {
repo.Logger.Error(err)
} else {
f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
repo.Logger.Error(err.Error())
} else {
f.Write(out)
}
}
}
// update the repo state with the last attempt time and exit now that the job is done
func (repo *Repo) SyncCompleted(exit int) {
repoState := repo.State
syncTook := time.Now().Unix() - repoState.LastAttemptStartTime
nextSync := repo.MaxTime - int(syncTook)
if nextSync < 0 {
nextSync = 0
}
repoState.IsRunning = false
repoState.LastAttemptExit = exit
repoState.LastAttemptRunTime = syncTook
var exitStr string
switch exit {
case SUCCESS:
exitStr = "completed"
case TERMINATED:
exitStr = "terminated"
default:
exitStr = "failed"
}
repo.SaveState()
repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync))
if exit == SUCCESS {
// it is possible that the zfssync from the last repo sync is still running is that fine?
go zfsSync(repo)
}
}
func panicIfErr(e error) {
if e != nil {
panic(e)
}
}
func touchFile(file string) {
fi, err := os.Stat(file)
if err != nil {
f, err := os.OpenFile(file, os.O_CREATE, 0644)
if err != nil {
panic(fmt.Errorf("unable to create file %s", file))
}
f.Close()
} else if fi.IsDir() {
panic(fmt.Errorf("%s is a directory", file))
} else if os.Geteuid() != 1001 {
// UID 1001 is the hardcoded uid for mirror
err := os.Chown(file, 1001, os.Getegid())
panicIfErr(err)
} else if fi.Mode().Perm() != 0644 {
err := os.Chmod(file, 0644)
panicIfErr(err)
}
}
func touchFiles(files ...string) {
for _, file := range files {
touchFile(file)
}
}
// GetConfig reads the config from a JSON file, initializes default values,
// and initializes the non-configurable fields of each repo.
// It returns a Config.
func GetConfig(doneChan chan Result, stopChan chan struct{}) Config {
// add global configuration in cfg
data, err := ini.Load(CONFIG_PATH)
panicIfErr(err)
cfg := Config{
MaxJobs: DEFAULT_MAX_JOBS,
MaxTime: DEFAULT_MAX_TIME,
PasswordDir: DEFAULT_PASSWORD_DIR,
DownloadDir: DEFAULT_DOWNLOAD_DIR,
StateDir: DEFAULT_STATE_DIR,
LoggerDir: DEFAULT_LOG_DIR,
RsyncLogDir: DEFAULT_RSYNC_LOG_DIR,
ZfssyncLogDir: DEFAULT_ZFSSYNC_LOG_DIR,
SockPath: DEFAULT_SOCK_PATH,
Repos: make([]*Repo, 0),
}
err = data.MapTo(&cfg)
panicIfErr(err)
for _, dir := range []string{cfg.StateDir, cfg.LoggerDir, cfg.RsyncLogDir, cfg.ZfssyncLogDir} {
err := os.MkdirAll(dir, 0755)
panicIfErr(err)
}
if cfg.IPv4Address == "" {
panic("Missing IPv4 address from config")
} else if cfg.IPv6Address == "" {
panic("Missing IPv6 address from config")
}
// add each repo configuration to cfg
for _, section := range data.Sections() {
repoName := section.Name()
if repoName == "DEFAULT" {
continue
}
repo := Repo{
Name: repoName,
SyncType: cfg.SyncType,
FrequencyStr: cfg.FrequencyStr,
MaxTime: cfg.MaxTime,
LoggerFile: cfg.LoggerDir + "/" + repoName + ".log",
RsyncLogFile: cfg.RsyncLogDir + "/" + repoName + "-rsync.log",
ZfssyncLogFile: cfg.ZfssyncLogDir + "/" + repoName + "-zfssync.log",
DoneChan: doneChan,
StopChan: stopChan,
}
err := section.MapTo(&repo)
panicIfErr(err)
touchFiles(
repo.LoggerFile,
repo.RsyncLogFile,
repo.ZfssyncLogFile,
)
repo.Logger = NewLogger(repo.Name, repo.LoggerFile)
repo.Frequency = frequencies[repo.FrequencyStr]
if repo.SyncType == "" {
panic("Missing sync type from " + repo.Name)
} else if repo.Frequency == 0 {
panic("Missing or invalid frequency for " + repo.Name)
}
repo.cfg = &cfg
repo.State = RepoState{
IsRunning: false,
LastAttemptStartTime: 0,
LastAttemptRunTime: 0,
LastAttemptExit: NOT_RUN_YET,
}
// create the state file if it does not exist, otherwise load it from existing file
repoStateFile := cfg.StateDir + "/" + repo.Name
if _, err := os.Stat(repoStateFile); err != nil {
touchFile(repoStateFile)
repo.SaveState()
} else {
err := ini.MapTo(&repo.State, repoStateFile)
panicIfErr(err)
}
cfg.Repos = append(cfg.Repos, &repo)
}
if len(cfg.Repos) == 0 {
panic("No repos found in config")
}
return cfg
}

View File

@ -1 +0,0 @@
package common

265
merlin/config/config.go Normal file
View File

@ -0,0 +1,265 @@
package config
import (
"os"
"path/filepath"
"gopkg.in/ini.v1"
"git.csclub.uwaterloo.ca/public/merlin/logger"
)
const (
DAILY = 86400
TWICE_DAILY = DAILY / 2
HOURLY = 3600
TWICE_HOURLY = HOURLY / 2
BI_HOURLY = HOURLY * 2
TRI_HOURLY = HOURLY * 3
TEN_MINUTELY = 600
FIVE_MINUTELY = 300
// could change this into a default_config
DEFAULT_MAX_JOBS = 6
DEFAULT_MAX_TIME = DAILY / 4
DEFAULT_SYNC_TYPE = "csc-sync-standard"
DEFAULT_FREQUENCY_STRING = "bi-hourly"
DEFAULT_DOWNLOAD_DIR = "/mirror/root"
DEFAULT_PASSWORD_DIR = "/home/mirror/passwords"
DEFAULT_STATE_DIR = "/home/mirror/merlin/states"
DEFAULT_LOG_DIR = "/home/mirror/merlin/logs"
DEFAULT_RSYNC_LOG_DIR = "/home/mirror/merlin/logs-rsync"
DEFAULT_ZFSSYNC_LOG_DIR = "/home/mirror/merlin/logs-zfssync"
DEFAULT_SOCK_PATH = "/run/merlin.sock"
)
var frequencies = map[string]int{
"daily": DAILY,
"twice-daily": TWICE_DAILY,
"hourly": HOURLY,
"twice-hourly": TWICE_HOURLY,
"bi-hourly": BI_HOURLY,
"tri-hourly": TRI_HOURLY,
"ten-minutely": TEN_MINUTELY,
"five-minutely": FIVE_MINUTELY,
}
// Last job attempt statuses
const (
NOT_RUN_YET = iota
SUCCESS
FAILURE
TERMINATED // was killed by a signal
)
type SyncResult struct {
Name string
Exit int
}
type Config struct {
// the maximum number of jobs allowed to execute concurrently
MaxJobs int `ini:"max_jobs"`
// the IP addresses to use for rsync
IPv4Address string `ini:"ipv4_address"`
IPv6Address string `ini:"ipv6_address"`
// the default sync type
DefaultSyncType string `ini:"default_sync_type"`
// the default frequency string
DefaultFrequencyStr string `ini:"default_frequency"`
// the default MaxTime
DefaultMaxTime int `ini:"default_max_time"`
// the directory where rsync should download files
DownloadDir string `ini:"download_dir"`
// the directory where rsync passwords are stored
PasswordDir string `ini:"password_dir"`
// the directory where the state of each repo sync is saved
StateDir string `ini:"states_dir"`
// the directory where merlin will store the general logs for each repo
RepoLogDir string `ini:"repo_logs_dir"`
// the directory to store the rsync logs for each repo
RsyncLogDir string `ini:"rsync_logs_dir"`
// the directory to store the zfssync logs for each repo
ZfssyncLogDir string `ini:"zfssync_logs_dir"`
// the Unix socket path which arthur will use to communicate with us
SockPath string `ini:"sock_path"`
}
// TODO: make it clearer when a full path should be used vs. when just the file name is needed
type Repo struct {
// the name of this repo
Name string `ini:"-"`
// this should be one of "csc-sync-standard", etc.
SyncType string `ini:"sync_type"`
// a human-readable frequency, e.g. "bi-hourly"
FrequencyStr string `ini:"frequency"`
// the desired interval (in seconds) between successive runs
Frequency int `ini:"-"`
// the maximum time (in seconds) that each child process of this repo
// can run for before being killed
MaxTime int `ini:"max_time"`
// where to download the files for this repo (relative to the download
// dir in the config)
LocalDir string `ini:"local_dir"`
// the remote host to rsync from
RsyncHost string `ini:"rsync_host"`
// the remote directory on the rsync host
RsyncDir string `ini:"rsync_dir"`
// the rsync user (optional)
RsyncUser string `ini:"rsync_user"`
// the file storing the password for rsync (optional)
PasswordFile string `ini:"password_file"`
// the file storing the repo sync state (used to override default)
StateFile string `ini:"state_file"`
// the full file path for general logging of this repo (used to override default)
RepoLogFile string `ini:"repo_log_file"`
// a reference to the general logger
Logger *logger.Logger `ini:"-"`
// the full file path for logging this repo's rsync (used to override default)
RsyncLogFile string `ini:"rsync_log_file"`
// the full file path for logging this repo's zfssync (used to override default)
ZfssyncLogFile string `ini:"zfssync_log_file"`
// the repo will write its name and status in a Result struct to DoneChan
// when it has finished a job (shared by all repos)
DoneChan chan<- SyncResult `ini:"-"`
// repos should stop syncing if StopChan is closed (shared by all repos)
StopChan chan struct{} `ini:"-"`
// a struct that stores the repo's status
State RepoState `ini:"-"`
}
// This should only be modified by the main thread
type RepoState struct {
// these are stored in the states folder
// whether this repo is running a job or not
IsRunning bool `ini:"is_running"`
// the Unix epoch timestamp at which this repo last attempted a job
LastAttemptStartTime int64 `ini:"last_attempt_time"`
// the number of seconds this repo ran for during its last attempted job
LastAttemptRunTime int64 `ini:"last_attempt_runtime"`
// whether the last attempt was successful or not
LastAttemptExit int `ini:"last_attempt_exit"`
}
var (
Conf Config
Repos []*Repo
RepoMap map[string]*Repo
)
// LoadConfig reads the config from an INI file, initializes default values,
// and initializes the non-configurable fields of each repo.
// It populates the package-level Conf, Repos and RepoMap.
func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struct{}) {
// set default values then load config from file
newConf := Config{
MaxJobs: DEFAULT_MAX_JOBS,
DefaultSyncType: DEFAULT_SYNC_TYPE,
DefaultFrequencyStr: DEFAULT_FREQUENCY_STRING,
DefaultMaxTime: DEFAULT_MAX_TIME,
PasswordDir: DEFAULT_PASSWORD_DIR,
DownloadDir: DEFAULT_DOWNLOAD_DIR,
StateDir: DEFAULT_STATE_DIR,
RepoLogDir: DEFAULT_LOG_DIR,
RsyncLogDir: DEFAULT_RSYNC_LOG_DIR,
ZfssyncLogDir: DEFAULT_ZFSSYNC_LOG_DIR,
SockPath: DEFAULT_SOCK_PATH,
}
iniInfo, err := ini.Load(configPath)
panicIfErr(err)
err = iniInfo.MapTo(&newConf)
panicIfErr(err)
// check config for major errors
for _, dir := range []string{newConf.StateDir, newConf.RepoLogDir, newConf.RsyncLogDir, newConf.ZfssyncLogDir} {
err := os.MkdirAll(dir, 0755)
panicIfErr(err)
}
if newConf.IPv4Address == "" {
panic("Missing IPv4 address from config")
} else if newConf.IPv6Address == "" {
panic("Missing IPv6 address from config")
}
newRepos := make([]*Repo, 0)
for _, section := range iniInfo.Sections() {
repoName := section.Name()
if repoName == "DEFAULT" {
continue
}
// set the default values for the repo then load from file
// TODO: check if local_dir and repoName are always the same value
// TODO: check to ensure that every Repo.Name is unique (may already be done by ini)
repo := Repo{
Name: repoName,
SyncType: newConf.DefaultSyncType,
FrequencyStr: newConf.DefaultFrequencyStr,
MaxTime: newConf.DefaultMaxTime,
StateFile: filepath.Join(newConf.StateDir, repoName),
RepoLogFile: filepath.Join(newConf.RepoLogDir, repoName) + ".log",
RsyncLogFile: filepath.Join(newConf.RsyncLogDir, repoName) + "-rsync.log",
ZfssyncLogFile: filepath.Join(newConf.ZfssyncLogDir, repoName) + "-zfssync.log",
DoneChan: doneChan,
StopChan: stopChan,
}
err := section.MapTo(&repo)
panicIfErr(err)
// TODO: ensure that the parent dirs to the file also exist when touching
// or just remove the ability to override
touchFiles(
repo.StateFile,
repo.RepoLogFile,
repo.RsyncLogFile,
repo.ZfssyncLogFile,
)
repo.Logger = logger.NewLogger(repo.Name, repo.RepoLogFile)
repo.Frequency = frequencies[repo.FrequencyStr]
if repo.SyncType == "" {
panic("Missing sync type from " + repo.Name)
} else if repo.Frequency == 0 {
panic("Missing or invalid frequency for " + repo.Name)
}
repo.State = RepoState{
IsRunning: false,
LastAttemptStartTime: 0,
LastAttemptRunTime: 0,
LastAttemptExit: NOT_RUN_YET,
}
err = ini.MapTo(&repo.State, repo.StateFile)
panicIfErr(err)
repo.SaveState()
newRepos = append(newRepos, &repo)
}
if len(newRepos) == 0 {
panic("No repos found in config")
}
Conf = newConf
Repos = newRepos
RepoMap = make(map[string]*Repo)
for _, repo := range Repos {
RepoMap[repo.Name] = repo
}
}
// save the current state of the repo to a file
func (repo *Repo) SaveState() {
// repo.Logger.Debug("Saving state")
state_cfg := ini.Empty()
if err := ini.ReflectFrom(state_cfg, &repo.State); err != nil {
repo.Logger.Error(err.Error())
}
file, err := os.OpenFile(repo.StateFile, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
repo.Logger.Error(err.Error())
}
if _, err := state_cfg.WriteTo(file); err != nil {
repo.Logger.Error(err.Error())
}
// repo.Logger.Debug("Saved state")
}

View File

@ -0,0 +1,122 @@
package config
import (
"errors"
"os"
"reflect"
"testing"
"github.com/davecgh/go-spew/spew"
)
func TestTouchFiles(t *testing.T) {
files := []string{
"/tmp/merlin_touch_test_1",
"/tmp/merlin_touch_test_2",
"/tmp/merlin_touch_test_3",
}
touchFiles(files[0], files[1], files[2])
for _, file := range files {
if _, err := os.Stat(file); err != nil {
t.Errorf(err.Error())
} else if err := os.Remove(file); err != nil {
t.Errorf(err.Error())
}
}
}
func TestPanicIfErr(t *testing.T) {
panicIfErr(nil)
defer func() { recover() }()
panicIfErr(errors.New("AAAAAAAAAA"))
t.Errorf("panicIfErr should have panicked")
}
func TestLoadConfig(t *testing.T) {
doneChan := make(chan SyncResult)
stopChan := make(chan struct{})
LoadConfig("config_test.ini", doneChan, stopChan)
// TODO: Fill out parts not part of the ini or state file
expectedConfig := Config{
MaxJobs: 6,
IPv4Address: "129.97.134.129",
IPv6Address: "2620:101:f000:4901:c5c::129",
DefaultSyncType: "csc-sync-standard",
DefaultFrequencyStr: "daily",
DefaultMaxTime: 1000,
DownloadDir: "/tmp/test-mirror",
PasswordDir: "/home/mirror/passwords",
StateDir: "test_files",
RepoLogDir: "test_files/logs",
RsyncLogDir: "test_files/rsync",
ZfssyncLogDir: "test_files/zfssync",
SockPath: "test_files/test.sock",
}
expectedRepo1 := Repo{
Name: "eelinux",
SyncType: "csc-sync-nonstandard",
FrequencyStr: "tri-hourly",
Frequency: 10800,
MaxTime: 2000,
LocalDir: "eelinux",
RsyncHost: "rsync.releases.eelinux.ca",
RsyncDir: "releases",
StateFile: "test_files/eeelinux",
RepoLogFile: "test_files/logs/eelinux.log",
Logger: Repos[0].Logger,
RsyncLogFile: "test_files/rsync/eelinux.log",
ZfssyncLogFile: "test_files/zfssync/eelinux.log",
DoneChan: doneChan,
StopChan: stopChan,
State: RepoState{
IsRunning: false,
LastAttemptStartTime: 1600000000,
LastAttemptRunTime: 100,
LastAttemptExit: 1,
},
}
expectedRepo2 := Repo{
Name: "yoland",
SyncType: "csc-sync-standard",
FrequencyStr: "daily",
Frequency: 86400,
MaxTime: 1000,
LocalDir: "yoland-releases",
RsyncHost: "rsync.releases.yoland.io",
RsyncDir: "releases",
StateFile: "test_files/yoland",
RepoLogFile: "test_files/logs/yoland.log",
Logger: Repos[1].Logger,
RsyncLogFile: "test_files/rsync/yoland-rsync.log",
ZfssyncLogFile: "test_files/zfssync/yoland-zfssync.log",
DoneChan: doneChan,
StopChan: stopChan,
State: RepoState{
IsRunning: false,
LastAttemptStartTime: 0,
LastAttemptRunTime: 0,
LastAttemptExit: 0,
},
}
if !reflect.DeepEqual(expectedConfig, Conf) {
t.Errorf("Config loaded does not match expected config")
spew.Dump(expectedConfig)
spew.Dump(Conf)
}
if !reflect.DeepEqual(expectedRepo1, *Repos[0]) {
t.Errorf("The eelinux repo loaded does not match the exected repo config")
spew.Dump(expectedRepo1)
spew.Dump(*Repos[0])
}
if !reflect.DeepEqual(expectedRepo2, *Repos[1]) {
t.Errorf("The yoland repo loaded does not match the exected repo config")
spew.Dump(expectedRepo2)
spew.Dump(*Repos[1])
}
os.Remove("test_files/yoland")
os.RemoveAll("test_files/logs")
os.RemoveAll("test_files/rsync")
os.RemoveAll("test_files/zfssync")
}

View File

@ -0,0 +1,39 @@
; default values are commented out
; max_jobs = 6
ipv4_address = 129.97.134.129
ipv6_address = 2620:101:f000:4901:c5c::129
; default_sync_type = csc-sync-standard
default_frequency = daily
default_max_time = 1000
download_dir = /tmp/test-mirror
; password_dir = /home/mirror/passwords
states_dir = test_files
repo_logs_dir = test_files/logs
rsync_logs_dir = test_files/rsync
zfssync_logs_dir = test_files/zfssync
sock_path = test_files/test.sock
[eelinux]
sync_type = csc-sync-nonstandard
frequency = tri-hourly
max_time = 2000
local_dir = eelinux
rsync_host = rsync.releases.eelinux.ca
rsync_dir = releases
state_file = test_files/eeelinux
repo_log_file = test_files/logs/eelinux.log
rsync_log_file = test_files/rsync/eelinux.log
zfssync_log_file = test_files/zfssync/eelinux.log
[yoland]
; sync_type = csc-sync-standard
; frequency = daily
; max_time = 1000
local_dir = yoland-releases
rsync_host = rsync.releases.yoland.io
rsync_dir = releases
; state_file = test_files/yoland
; repo_log_file = test_files/logs/yoland.log
; rsync_log_file = test_files/rsync/yoland-rsync.log
; zfssync_log_file = test_files/zfssync/yoland-zfssync.log

View File

@ -0,0 +1,5 @@
is_running = false
last_attempt_time = 1600000000
last_attempt_runtime = 100
last_attempt_exit = 1

37
merlin/config/utils.go Normal file
View File

@ -0,0 +1,37 @@
package config
import (
"fmt"
"os"
)
func panicIfErr(e error) {
if e != nil {
panic(e)
}
}
func touchFile(file string) {
fi, err := os.Stat(file)
if err != nil {
f, err := os.OpenFile(file, os.O_CREATE, 0644)
if err != nil {
panic(fmt.Errorf("unable to create file %s", file))
}
f.Close()
} else if fi.IsDir() {
panic(fmt.Errorf("%s is a directory", file))
// } else if os.Geteuid() != 1001 {
// // mirror is UID 1001
// err := os.Chown(file, 1001, os.Getegid())
// panicIfErr(err)
}
err = os.Chmod(file, 0644)
panicIfErr(err)
}
func touchFiles(files ...string) {
for _, file := range files {
touchFile(file)
}
}

View File

@ -1,6 +1,7 @@
module git.csclub.uwaterloo.ca/public/merlin
require (
github.com/davecgh/go-spew v1.1.0
github.com/stretchr/testify v1.7.0 // indirect
golang.org/x/sys v0.0.0-20210915083310-ed5796bab164
gopkg.in/ini.v1 v1.63.2

View File

@ -1,4 +1,4 @@
package common
package logger
import (
"log"
@ -30,12 +30,12 @@ var levels = map[int]string{
var outLogger = log.New(os.Stdout, "", log.LstdFlags)
var errLogger = log.New(os.Stderr, "", log.LstdFlags)
func OutLogger() *log.Logger {
return outLogger
func OutLog(v ...interface{}) {
outLogger.Println(v...)
}
func ErrLogger() *log.Logger {
return errLogger
func ErrLog(v ...interface{}) {
errLogger.Println(v...)
}
func NewLogger(name, file string) *Logger {
@ -48,15 +48,9 @@ func NewLogger(name, file string) *Logger {
}
func (logger *Logger) Log(level int, v ...interface{}) {
if level == INFO {
outLogger.Println(append([]interface{}{"[" + logger.name + "]"}, v...))
} else if level == ERROR {
errLogger.Println(append([]interface{}{"[" + logger.name + "]"}, v...))
}
f, err := os.OpenFile(logger.file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
errLogger.Println(err.Error())
ErrLog(err.Error())
}
defer f.Close()
@ -65,7 +59,7 @@ func (logger *Logger) Log(level int, v ...interface{}) {
args = append(args, v...)
logger.SetOutput(f)
logger.Println(v...)
logger.Println(args...)
}
func (logger *Logger) Debug(v ...interface{}) {
@ -73,6 +67,9 @@ func (logger *Logger) Debug(v ...interface{}) {
}
func (logger *Logger) Info(v ...interface{}) {
// src := []interface{}{logger.name + ":"}
// args := append(src, v...)
OutLog(v...)
logger.Log(INFO, v...)
}
@ -81,5 +78,6 @@ func (logger *Logger) Warning(v ...interface{}) {
}
func (logger *Logger) Error(v ...interface{}) {
ErrLog(append([]interface{}{"[" + logger.name + "]"}, v...))
logger.Log(ERROR, v...)
}
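For reference, a minimal usage sketch of the reworked logger API above; the repo name and log file path are invented for illustration.

package main
import "git.csclub.uwaterloo.ca/public/merlin/logger"
func main() {
	// package-level helpers log to stdout/stderr with timestamps
	logger.OutLog("merlin starting up")
	logger.ErrLog("a global error message")
	// a per-repo logger also appends its messages to the given log file
	l := logger.NewLogger("ubuntu", "/tmp/ubuntu.log")
	l.Info("sync started")
	l.Warning("process ended with status code", 23)
	l.Error("rsync failed")
}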

View File

@ -1,153 +1,35 @@
package main
import (
"bytes"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"text/tabwriter"
"time"
"git.csclub.uwaterloo.ca/public/merlin/common"
"golang.org/x/sys/unix"
"git.csclub.uwaterloo.ca/public/merlin/arthur"
"git.csclub.uwaterloo.ca/public/merlin/config"
"git.csclub.uwaterloo.ca/public/merlin/logger"
"git.csclub.uwaterloo.ca/public/merlin/sync"
)
var (
cfg common.Config
outLogger *log.Logger
errLogger *log.Logger
repoMap map[string]*common.Repo
repoIdx int
numJobsRunning int
)
func getAndRunCommand(conn net.Conn) {
defer conn.Close()
var buf bytes.Buffer
_, err := io.Copy(&buf, conn)
if err != nil {
errLogger.Println(err.Error())
return
}
command := buf.String()
args := strings.Split(command, ":")
respondAndLogErr := func(msg string) {
outLogger.Println(msg)
conn.Write([]byte(msg))
}
if args[0] == "status" {
status := tabwriter.NewWriter(conn, 5, 5, 5, ' ', 0)
fmt.Fprintf(status, "Repository\tLast Synced\tNext Expected Sync\n")
// for time formating see https://pkg.go.dev/time#pkg-constants
for name, repo := range repoMap {
fmt.Fprintf(status, "%s\t%s\t%s\n",
name,
time.Unix(repo.State.LastAttemptStartTime, 0).Format(time.RFC1123),
time.Unix(repo.State.LastAttemptRunTime+int64(repo.Frequency), 0).Format(time.RFC1123),
)
}
status.Flush()
} else if args[0] == "sync" {
if len(args) != 2 {
respondAndLogErr("Could not parse sync command, forced sync fails.")
return
}
if repo, inMap := repoMap[args[1]]; inMap {
outLogger.Println("Attempting to force sync of " + repo.Name)
if repo.RunIfPossible() {
conn.Write([]byte("Forced sync for " + repo.Name))
numJobsRunning++
} else {
respondAndLogErr("Cannot force sync: " + repo.Name + ", already syncing.")
}
} else {
respondAndLogErr(args[1] + " is not tracked so cannot sync")
}
} else {
respondAndLogErr("Received unrecognized command: " + command)
}
}
func unixSockListener(connChan chan net.Conn, stopLisChan chan struct{}) {
// must remove cfg.SockPath otherwise get "bind: address already in use"
if filepath.Ext(cfg.SockPath) != ".sock" {
panic(fmt.Errorf("Socket file must end with .sock"))
} else if _, err := os.Stat(cfg.SockPath); err == nil {
if err := os.Remove(cfg.SockPath); err != nil {
panic(err)
}
} else if !errors.Is(err, os.ErrNotExist) {
panic(err)
}
ear, err := net.Listen("unix", cfg.SockPath)
if err != nil {
panic(err)
}
outLogger.Println("Listening to unix socket at " + cfg.SockPath)
go func() {
for {
// will exit when ear is closed
conn, err := ear.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
errLogger.Println("Accepted socket error: " + err.Error())
continue
}
errLogger.Println("Unhandlable socket error: " + err.Error())
return
}
connChan <- conn
}
}()
<-stopLisChan
ear.Close()
}
// We use a round-robin strategy. It's not the most efficient, but it's simple
// (read: easy to understand) and guarantees each repo will eventually get a chance to run.
func runAsManyAsPossible() {
repos := cfg.Repos
startIdx := repoIdx
for numJobsRunning < cfg.MaxJobs {
repo := repos[repoIdx]
if repo.RunIfPossible() {
numJobsRunning++
}
repoIdx = (repoIdx + 1) % len(repos)
if repoIdx == startIdx {
// we've come full circle
return
}
}
}
// TODO: get the config path from command-line args instead of hardcoding it
var CONFIG_PATH = "merlin-config.ini"
func main() {
outLogger = common.OutLogger()
errLogger = common.ErrLogger()
// check that merlin is run as mirror user
// check that the mirror user has uid 1001
doneChan := make(chan common.Result)
// receives a Result struct when a repo stops syncing
doneChan := make(chan config.SyncResult)
// closed when merlin is told to stop running
stopChan := make(chan struct{})
// receives a Conn when a client makes a connection to unix socket
connChan := make(chan net.Conn)
// signal channel to stop listening to unix socket
stopLisChan := make(chan struct{})
stopSig := make(chan os.Signal, 1)
@ -157,24 +39,38 @@ func main() {
unix.Umask(002)
numJobsRunning = 0
numJobsRunning := 0
repoIdx := 0
loadConfig := func() {
cfg = common.GetConfig(doneChan, stopChan)
outLogger.Println("Loaded config:\n" + fmt.Sprintf("%+v\n", cfg))
repoMap = make(map[string]*common.Repo)
for _, repo := range cfg.Repos {
repoMap[repo.Name] = repo
}
config.LoadConfig(CONFIG_PATH, doneChan, stopChan)
logger.OutLog("Loaded config:\n" + fmt.Sprintf("%+v\n", config.Conf))
repoIdx = 0
go unixSockListener(connChan, stopLisChan)
go arthur.StartListener(connChan, stopLisChan)
}
// We use a round-robin strategy. It's not the most efficient, but it's simple
// (read: easy to understand) and guarantees each repo will eventually get a chance to run.
runAsManyAsPossible := func() {
startIdx := repoIdx
for numJobsRunning < config.Conf.MaxJobs {
repo := config.Repos[repoIdx]
if sync.SyncIfPossible(repo) {
numJobsRunning++
}
repoIdx = (repoIdx + 1) % len(config.Repos)
if repoIdx == startIdx {
// we've come full circle
return
}
}
}
loadConfig()
// IsRunning must be false otherwise repo will never sync
for _, repo := range repoMap {
// ensure that IsRunning is false otherwise repo will never sync
// (only on startup can we assume that repos were not previously syncing)
for _, repo := range config.Repos {
repo.State.IsRunning = false
}
runAsManyAsPossible()
@ -190,24 +86,39 @@ runLoop:
case <-reloadSig:
stopLisChan <- struct{}{}
loadConfig()
// ensure that SyncCompleted can handle it if reloading config
// removes a repo that was already syncing
case done := <-doneChan:
repoMap[done.Name].SyncCompleted(done.Exit)
sync.SyncCompleted(config.RepoMap[done.Name], done.Exit)
numJobsRunning--
case conn := <-connChan:
getAndRunCommand(conn)
command, repoName := arthur.GetCommand(conn)
switch command {
case "status":
arthur.SendStatus(conn)
case "sync":
if arthur.ForceSync(conn, repoName) {
numJobsRunning++
}
default:
arthur.SendAndLog(conn, "Received unrecognized command: "+command)
}
// None of the arthur functions close the connection so you will need to
// close it manually for the message to be sent
conn.Close()
case <-time.After(1 * time.Minute):
}
runAsManyAsPossible()
}
// give time for jobs to terminate before exiting
// give time for all jobs to terminate before exiting the program
for {
select {
case done := <-doneChan:
repoMap[done.Name].SyncCompleted(done.Exit)
sync.SyncCompleted(config.RepoMap[done.Name], done.Exit)
numJobsRunning--
case <-time.After(1 * time.Second):

View File

@ -1,7 +0,0 @@
package main
import "testing"
func TestSock1(t *testing.T) {
}

View File

@ -1,50 +1,50 @@
package common
package sync
import (
"fmt"
"math/rand"
"os"
"strconv"
"git.csclub.uwaterloo.ca/public/merlin/config"
)
func (repo *Repo) buildRsyncHost() string {
func buildRsyncHost(repo *config.Repo) string {
if repo.RsyncUser != "" {
repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
}
return "rsync://" + repo.RsyncHost + "/" + repo.RsyncDir
}
func (repo *Repo) buildRsyncDaemonHost() string {
func buildRsyncDaemonHost(repo *config.Repo) string {
if repo.RsyncUser != "" {
repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
}
return repo.RsyncHost + "::" + repo.RsyncDir
}
func (repo *Repo) buildRsyncSSHHost() string {
func buildRsyncSSHHost(repo *config.Repo) string {
if repo.RsyncUser != "" {
repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
}
return repo.RsyncHost + ":" + repo.RsyncDir
}
func (repo *Repo) buildDownloadDir() string {
return repo.cfg.DownloadDir + "/" + repo.LocalDir
func buildDownloadDir(repo *config.Repo) string {
return config.Conf.DownloadDir + "/" + repo.LocalDir
}
func (repo *Repo) CSCSyncApache() []string {
func cscSyncApache(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-az", "--no-owner", "--no-group", "--delete", "--safe-links",
"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
"--exclude", ".~tmp~/",
"--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
return args
}
func (repo *Repo) CSCSyncArchLinux() []string {
func cscSyncArchLinux(repo *config.Repo) []string {
tempDir := "" // is this option even needed?
@ -52,83 +52,83 @@ func (repo *Repo) CSCSyncArchLinux() []string {
args := []string{
"rsync", "-rtlH", "--safe-links", "--delete-after", "--timeout=600",
"--contimeout=60", "-p", "--delay-updates", "--no-motd",
"--temp-dir=" + tempDir, "--log-file=" + repo.RsyncLogFile, "--address=" + repo.cfg.IPv4Address,
"--temp-dir=" + tempDir, "--log-file=" + repo.RsyncLogFile, "--address=" + config.Conf.IPv4Address,
}
args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir())
args = append(args, buildRsyncHost(repo), buildDownloadDir(repo))
return args
}
func (repo *Repo) CSCSyncBadPerms() []string {
func cscSyncBadPerms(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--chmod=o=rX", "--delete",
"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
"--exclude", ".~tmp~/",
"--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
return args
}
// TODO ceph
func (repo *Repo) CSCSyncCDImage() []string {
func cscSyncCDImage(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete",
"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
"--exclude", ".*/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
return args
}
func (repo *Repo) CSCSyncChmod() []string {
func cscSyncChmod(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", "--delay-updates", "--safe-links",
"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
"--chmod=u=rwX,go=rX",
}
args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir())
args = append(args, buildRsyncHost(repo), buildDownloadDir(repo))
return args
}
func (repo *Repo) CSCSyncDebian() []string {
func cscSyncDebian(repo *config.Repo) []string {
// sync /pool
args := []string{"nice", "rsync", "-rlHtvp",
"--exclude", ".~tmp~/", "--timeout=3600", "-4",
"--address=" + repo.cfg.IPv4Address,
"--address=" + config.Conf.IPv4Address,
}
// $RSYNC_HOST::$RSYNC_DIR/pool/ $TO/pool/ >> $LOGFILE 2>&1
return args
}
func (repo *Repo) CSCSyncDebianCD() []string {
func cscSyncDebianCD(repo *config.Repo) []string {
// this is basically the same as CSCSyncDebian, except it has an extra --exclude
args := []string{"nice", "rsync", "-rlHtvp", "--delete",
"--exclude", ".~tmp~/", "--timeout=3600", "-4",
"--address=" + repo.cfg.IPv4Address,
"--address=" + config.Conf.IPv4Address,
// "--exclude", "Archive-Update-in-Progress-${HOSTNAME}"
}
// $RSYNC_HOST::$RSYNC_DIR $TO >> $LOGFILE 2>&1
return args
}
func (repo *Repo) CSCSyncGentoo() []string {
func cscSyncGentoo(repo *config.Repo) []string {
repo.RsyncUser = "gentoo"
repo.PasswordFile = "gentoo-distfiles"
return repo.CSCSyncStandard()
return cscSyncStandard(repo)
}
// TODO s3
func (repo *Repo) CSCSyncSSH() []string {
func cscSyncSSH(repo *config.Repo) []string {
args := []string{
"rsync", "-aH", "--no-owner", "--no-group", "--delete",
@ -138,48 +138,45 @@ func (repo *Repo) CSCSyncSSH() []string {
}
// not sure if we should be assuming ssh identity file is the password file
args = append(args, "-e", fmt.Sprintf("'ssh -i %s'", repo.PasswordFile))
args = append(args, repo.buildRsyncSSHHost(), repo.buildDownloadDir())
args = append(args, buildRsyncSSHHost(repo), buildDownloadDir(repo))
return args
}
func (repo *Repo) CSCSyncStandard() []string {
func cscSyncStandard(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
"--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
"--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
if repo.PasswordFile != "" {
filename := repo.cfg.PasswordDir + "/" + repo.PasswordFile
filename := config.Conf.PasswordDir + "/" + repo.PasswordFile
args = append(args, "--password-file", filename)
}
args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir())
args = append(args, buildRsyncHost(repo), buildDownloadDir(repo))
return args
}
func (repo *Repo) CSCSyncStandardIPV6() []string {
func cscSyncStandardIPV6(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
"--delay-updates", "--safe-links", "--timeout=3600", "-6", "--address=" + repo.cfg.IPv4Address,
"--delay-updates", "--safe-links", "--timeout=3600", "-6", "--address=" + config.Conf.IPv4Address,
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
return args
}
// for testing, to be removed later
func (repo *Repo) CSCSyncDummy() []string {
sleepDur := strconv.FormatInt(rand.Int63n(10)+5, 10)
args := []string{"sleep", sleepDur}
func cscSyncDummy(repo *config.Repo) []string {
args := []string{"sleep", "1"}
return args
}
// executes a particular sync job depending on repo.SyncType.
func (repo *Repo) getSyncCommand() []string {
func getSyncCommand(repo *config.Repo) (args []string) {
/*
# scripts used by merlin.py
csc-sync-debian
@ -206,80 +203,55 @@ func (repo *Repo) getSyncCommand() []string {
ubuntu-releases-sync
*/
switch repo.SyncType {
case "csc-sync-apache":
return repo.CSCSyncApache()
args = cscSyncApache(repo)
case "csc-sync-archlinux":
return repo.CSCSyncArchLinux()
args = cscSyncArchLinux(repo)
case "csc-sync-badperms":
return repo.CSCSyncBadPerms()
args = cscSyncBadPerms(repo)
case "csc-sync-cdimage":
return repo.CSCSyncCDImage()
args = cscSyncCDImage(repo)
// case "csc-sync-ceph":
// return repo.CSCSyncCeph()
// args = cscSyncCeph(repo)
case "csc-sync-chmod":
return repo.CSCSyncChmod()
args = cscSyncChmod(repo)
case "csc-sync-debian":
return repo.CSCSyncDebian()
args = cscSyncDebian(repo)
case "csc-sync-debian-cd":
return repo.CSCSyncDebianCD()
args = cscSyncDebianCD(repo)
case "csc-sync-gentoo":
return repo.CSCSyncGentoo()
args = cscSyncGentoo(repo)
// case "csc-sync-s3":
// return repo.CSCSyncS3()
// args = cscSyncS3(repo)
case "csc-sync-ssh":
return repo.CSCSyncSSH()
args = cscSyncSSH(repo)
case "csc-sync-standard":
return repo.CSCSyncStandard()
args = cscSyncStandard(repo)
case "csc-sync-standard-ipv6":
return repo.CSCSyncStandardIPV6()
args = cscSyncStandardIPV6(repo)
// case "csc-sync-wget":
// return repo.CSCSyncWget()
// args = cscSyncWget(repo)
case "csc-sync-dummy":
return repo.CSCSyncDummy()
return cscSyncDummy(repo)
default:
repo.Logger.Error("Unrecognized sync type", "'"+repo.SyncType+"'")
}
return []string{}
}
func (repo *Repo) StartSyncJob() {
status := FAILURE
defer func() {
repo.DoneChan <- Result{
Name: repo.Name,
Exit: status,
}
}()
localDir := repo.buildDownloadDir()
localDir := buildDownloadDir(repo)
err := os.MkdirAll(localDir, 0775)
if err != nil {
err = fmt.Errorf("Could not create directory %s: %w", localDir, err)
err = fmt.Errorf("could not create directory %s: %w", localDir, err)
// unsure whether the logger can handle an error value, so pass the string instead
repo.Logger.Error(err.Error())
return
}
args := repo.getSyncCommand()
if repo.PasswordFile != "" {
a268wang marked this conversation as resolved
Review

same here, some sync types already append the password file - and how the password file should be added to args depends on the type of sync, so probably would remove this too

Review

I agree with this, I'll remove it shortly
filename := repo.cfg.PasswordDir + "/" + repo.PasswordFile
filename := config.Conf.PasswordDir + "/" + repo.PasswordFile
args = append(args, "--password-file", filename)
}
args = append(args, repo.buildRsyncHost(), localDir)
ch := SpawnProcess(repo, args)
if ch == nil {
// SpawnProcess will have already logged error
return
}
cmd := <-ch
switch cmd.ProcessState.ExitCode() {
case 0:
status = SUCCESS
case -1:
status = TERMINATED
// default is already FAILURE
}
args = append(args, buildRsyncHost(repo), localDir)
a268wang marked this conversation as resolved
Review

don't think you want to be appending rsync host and localDir here too. each sync type already appends it.

also different sync types have different ways of building the host (some needs to be the daemon style host and some needs to be a ssh host)

Review

Huh all this stuff was actually moved over from the startSync function

https://git.csclub.uwaterloo.ca/public/mirror/src/branch/go/merlin/common/sync.go#L256-L270

I'm thinking of keeping a modified version of part to make sure that startSync will immediately exit if merlin cannot mkdir the buildDownloadDir

Review

yeah my bad about the original sync.go i think i forgot to remove them
return
}

View File

@ -0,0 +1 @@
package sync

61
merlin/sync/interface.go Normal file
View File

@ -0,0 +1,61 @@
package sync
import (
"fmt"
"time"
"git.csclub.uwaterloo.ca/public/merlin/config"
)
// start sync job for this repo if more than repo.Frequency seconds have elapsed since its last job
// and it is not currently running.
// returns true iff a job is started.
func SyncIfPossible(repo *config.Repo) bool {
// Change to SyncIfPossible
if repo.State.IsRunning {
return false
}
curTime := time.Now().Unix()
if curTime-repo.State.LastAttemptStartTime > int64(repo.Frequency) {
repo.State.IsRunning = true
repo.State.LastAttemptStartTime = curTime
repo.SaveState()
repo.Logger.Info(fmt.Sprintf("Repo %s has started syncing", repo.Name))
go startSync(repo)
return true
}
return false
}
// update the repo state with the last attempt time and exit now that the job is done
func SyncCompleted(repo *config.Repo, exit int) {
repoState := &repo.State // take a pointer so the updates below are persisted by SaveState
syncTook := time.Now().Unix() - repoState.LastAttemptStartTime
nextSync := repo.MaxTime - int(syncTook)
if nextSync < 0 {
nextSync = 0
}
repoState.IsRunning = false
repoState.LastAttemptExit = exit
repoState.LastAttemptRunTime = syncTook
var exitStr string
switch exit {
case config.SUCCESS:
exitStr = "completed"
case config.TERMINATED:
exitStr = "terminated"
default:
exitStr = "failed"
}
repo.SaveState()
repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync))
// TODO: make it possible for this SyncCompleted to be run without zfsSync being run
if exit == config.SUCCESS {
// it is possible that the zfssync from the last repo sync is still running; is that fine?
go zfsSync(repo)
}
}

View File

@ -1,10 +1,13 @@
package common
package sync
import (
"fmt"
"os"
"os/exec"
"syscall"
"time"
"git.csclub.uwaterloo.ca/public/merlin/config"
)
// SpawnProcess spawns a child process for the given repo. The process will
@ -12,7 +15,7 @@ import (
// runs for longer than the repo's MaxTime.
// It returns a channel through which a Cmd will be sent once it has finished,
// or nil if it was unable to start a process.
func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
func spawnSyncProcess(repo *config.Repo, args []string) (ch <-chan *exec.Cmd) {
// the start time and the time taken are handled by SyncCompleted in interface.go
cmd := exec.Command(args[0], args[1:]...)
@ -55,6 +58,7 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
case <-cmdDoneChan:
if !cmd.ProcessState.Success() {
repo.Logger.Warning("Process ended with status code", cmd.ProcessState.ExitCode())
// repo.Logger.Info(strings.Join(args, " "))
}
case <-repo.StopChan:
@ -68,3 +72,43 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
}()
return
}
func startSync(repo *config.Repo) {
status := config.FAILURE
defer func() {
repo.DoneChan <- config.SyncResult{
Name: repo.Name,
Exit: status,
}
}()
args := getSyncCommand(repo)
ch := spawnSyncProcess(repo, args)
if ch == nil {
// spawnSyncProcess will have already logged the error
return
}
cmd := <-ch
switch cmd.ProcessState.ExitCode() {
case 0:
status = config.SUCCESS
case -1:
status = config.TERMINATED
// default is already FAILURE
}
}
func zfsSync(repo *config.Repo) {
out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput()
if err != nil {
repo.Logger.Error(err)
} else {
f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
repo.Logger.Error(err.Error())
} else {
f.Write(out)
}
}
}