Refactor Go Merlin #1

Closed
a268wang wants to merge 8 commits from refactor into go
21 changed files with 1138 additions and 611 deletions

18
.drone.yml Normal file
View File

@ -0,0 +1,18 @@
kind: pipeline
type: docker
name: default
steps:
- name: merlin
image: golang:1.17
commands:
# add linter
- cd merlin
- go build
- go test ./...
trigger:
branch:
- master
- go
- refactor

1
.gitignore vendored
View File

@ -1,4 +1,3 @@
/.*
!.gitignore
.git_old/
/dead.letter

View File

@ -20,6 +20,7 @@ This folder contains the code for merlin (which does the actual syncing) and art
- [ ] wget
### TODO
- [ ] ensure that the proper permissions (file mode, group, user) are used
- [ ] detect if an rsync process is stuck (\*\*)
- [ ] place each rsync process in a separate cgroup (\*\*\*)

130
merlin/arthur/arthur.go Normal file
View File

@ -0,0 +1,130 @@
package arthur
import (
"errors"
"fmt"
"io/ioutil"
"net"
"os"
"path/filepath"
"sort"
"strings"
"text/tabwriter"
"time"
"git.csclub.uwaterloo.ca/public/merlin/config"
"git.csclub.uwaterloo.ca/public/merlin/logger"
"git.csclub.uwaterloo.ca/public/merlin/sync"
)
// Reads and parses the message sent over the accepted connection
func GetCommand(conn net.Conn) (command, repoName string) {
command = ""
repoName = ""
buf, err := ioutil.ReadAll(conn)
if err != nil {
logger.ErrLog(err.Error())
return
}
args := strings.Split(string(buf), ":")
if len(args) >= 1 {
command = args[0]
}
if len(args) >= 2 {
repoName = args[1]
}
return
}
func SendAndLog(conn net.Conn, msg string) {
logger.OutLog(msg)
conn.Write([]byte(msg))
}
func SendStatus(conn net.Conn) {
status := tabwriter.NewWriter(conn, 5, 5, 5, ' ', 0)
fmt.Fprintf(status, "Repository\tLast Synced\tNext Expected Sync\tRunning\n")
keys := make([]string, 0)
for name, _ := range config.RepoMap {
keys = append(keys, name)
}
sort.Strings(keys)
// for other ways to format the time see: https://pkg.go.dev/time#pkg-constants
for _, name := range keys {
repo := config.RepoMap[name]
lastSync := repo.State.LastAttemptStartTime
nextSync := lastSync + int64(repo.Frequency)
fmt.Fprintf(status, "%s\t%s\t%s\t%t\n",
name,
time.Unix(lastSync, 0).Format(time.RFC1123),
time.Unix(nextSync, 0).Format(time.RFC1123),
repo.State.IsRunning,
)
}
status.Flush()
}
// Attempt to force the sync of the repo
func ForceSync(conn net.Conn, repoName string) (newSync bool) {
newSync = false
// TODO: send repoName and every key in RepoMap to lowercase
if repo, isInMap := config.RepoMap[repoName]; isInMap {
logger.OutLog("Attempting to force sync of " + repoName)
if sync.SyncIfPossible(repo) {
conn.Write([]byte("Forced sync for " + repoName))
newSync = true
} else {
SendAndLog(conn, "Could not force sync: "+repoName+" is already syncing.")
}
} else {
SendAndLog(conn, repoName+" is not tracked so cannot sync")
}
return
}
func StartListener(connChan chan net.Conn, stopLisChan chan struct{}) {
sockpath := config.Conf.SockPath
// must remove old cfg.SockPath otherwise get "bind: address already in use"
if filepath.Ext(sockpath) != ".sock" {
panic(fmt.Errorf("socket file must end with .sock"))
} else if _, err := os.Stat(sockpath); err == nil {
if err := os.Remove(sockpath); err != nil {
panic(err)
}
} else if !errors.Is(err, os.ErrNotExist) {
panic(err)
}
ear, err := net.Listen("unix", sockpath)
if err != nil {
panic(err)
}
logger.OutLog("Listening to unix socket at " + sockpath)
go func() {
for {
// Attempting to accept on a closed net.Listener will return a non-temporary error
conn, err := ear.Accept()
if err != nil {
if netErr, isTemp := err.(net.Error); isTemp && netErr.Temporary() {
logger.ErrLog("Accepted socket error: " + err.Error())
continue
}
logger.ErrLog("Unhandlable socket error: " + err.Error())
return
}
connChan <- conn
}
}()
// TODO: check handling of multiple SIGHUP
<-stopLisChan
ear.Close()
}

View File

@ -0,0 +1,283 @@
package arthur
import (
"io/ioutil"
"net"
"os"
"testing"
"time"
"git.csclub.uwaterloo.ca/public/merlin/config"
"git.csclub.uwaterloo.ca/public/merlin/logger"
)
func TestStatusCommand(t *testing.T) {
r, w := net.Pipe()
go func() {
// will only finish write when EOF is sent
// only way to send EOF is to close
w.Write([]byte("status"))
w.Close()
}()
command, repoName := GetCommand(r)
if command != "status" {
t.Errorf("command for status should be \"status\", got " + command)
} else if repoName != "" {
t.Errorf("status should return an empty string for the repoName, got " + repoName)
}
}
func TestSyncCommand(t *testing.T) {
r, w := net.Pipe()
go func() {
w.Write([]byte("sync:ubuntu"))
w.Close()
}()
command, repoName := GetCommand(r)
r.Close()
if command != "sync" {
t.Errorf("command for sync:ubuntu should be \"sync\", got " + command)
} else if repoName != "ubuntu" {
t.Errorf("name of repo for sync:ubuntu should be \"ubuntu\", got " + repoName)
}
}
func TestSendStatus(t *testing.T) {
saveRepoMap := config.RepoMap
defer func() {
config.RepoMap = saveRepoMap
}()
repoMap := make(map[string]*config.Repo)
repoMap["eeeee"] = &config.Repo{
Frequency: 30 * 86400,
State: config.RepoState{
IsRunning: true,
LastAttemptStartTime: 1600000000,
},
}
repoMap["alinux"] = &config.Repo{
Frequency: 7*86400 + 3,
State: config.RepoState{
IsRunning: true,
LastAttemptStartTime: 1620000000,
},
}
repoMap["lnux"] = &config.Repo{
Frequency: 86400,
State: config.RepoState{
IsRunning: false,
LastAttemptStartTime: 1640000000,
},
}
config.RepoMap = repoMap
r, w := net.Pipe()
go func() {
SendStatus(w)
w.Close()
}()
msg, err := ioutil.ReadAll(r)
r.Close()
if err != nil {
t.Errorf(err.Error())
}
expected := `Repository Last Synced Next Expected Sync Running
alinux Sun, 02 May 2021 20:00:00 EDT Sun, 09 May 2021 20:00:03 EDT true
eeeee Sun, 13 Sep 2020 08:26:40 EDT Tue, 13 Oct 2020 08:26:40 EDT true
lnux Mon, 20 Dec 2021 06:33:20 EST Tue, 21 Dec 2021 06:33:20 EST false
`
if expected != string(msg) {
t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg))
}
}
func TestForceSync(t *testing.T) {
saveRepos := config.Repos
saveRepoMap := config.RepoMap
doneChan := make(chan config.SyncResult)
defer func() {
config.Repos = saveRepos
config.RepoMap = saveRepoMap
close(doneChan)
}()
// Part 1: run a dummy sync
repo := config.Repo{
Name: "nux",
SyncType: "csc-sync-dummy",
Frequency: 7 * 86400,
MaxTime: 30,
Logger: logger.NewLogger("nux", "/tmp/merlin_force_sync_test_logs"),
StateFile: "/tmp/merlin_force_sync_test_state",
DoneChan: doneChan,
State: config.RepoState{
IsRunning: false,
LastAttemptStartTime: 0,
LastAttemptRunTime: 0,
LastAttemptExit: config.NOT_RUN_YET,
},
}
config.Repos = nil
config.Repos = append(config.Repos, &repo)
config.RepoMap = make(map[string]*config.Repo)
config.RepoMap["nux"] = &repo
r, w := net.Pipe()
go func() {
if !ForceSync(w, "nux") {
t.Errorf("Sync for nux did not start")
}
w.Close()
}()
msg, err := ioutil.ReadAll(r)
r.Close()
if err != nil {
t.Errorf(err.Error())
}
expected := "Forced sync for nux"
if expected != string(msg) {
t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg))
}
select {
case result := <-doneChan:
if result.Exit != config.SUCCESS {
t.Errorf("Sync should exit with SUCCESS, got %d", result.Exit)
}
case <-time.After(3 * time.Second):
t.Errorf("Dummy sync should be done in 1 second, waited 3 seconds")
}
// Part 2: attempt the same thing but with repo.State.IsRunning = true
r, w = net.Pipe()
go func() {
if ForceSync(w, "nux") {
t.Errorf("Sync for nux should not have started")
}
w.Close()
}()
msg, err = ioutil.ReadAll(r)
r.Close()
if err != nil {
t.Errorf(err.Error())
}
expected = "Could not force sync: nux is already syncing."
if expected != string(msg) {
t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg))
}
select {
case <-doneChan:
t.Errorf("Sync for nux should not have been started")
case <-time.After(2 * time.Second):
}
// Part 3: attempt a force sync with a repo that does not exist
r, w = net.Pipe()
go func() {
if ForceSync(w, "nixx") {
t.Errorf("Sync for nixx should not have started")
}
w.Close()
}()
msg, err = ioutil.ReadAll(r)
r.Close()
if err != nil {
t.Errorf(err.Error())
}
expected = "nixx is not tracked so cannot sync"
if expected != string(msg) {
t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg))
}
}
func TestStartListener(t *testing.T) {
saveConf := config.Conf
connChan := make(chan net.Conn)
stopLisChan := make(chan struct{})
wait := make(chan struct{})
defer func() {
config.Conf = saveConf
close(connChan)
close(stopLisChan)
}()
config.Conf = config.Config{
SockPath: "/tmp/merlin_listener_test.sock",
}
// Test 1: check that closing/sending something to stopLisChan will stop the listener
// and that a new listener can be created after stopping the old one
go func() {
StartListener(connChan, stopLisChan)
wait <- struct{}{}
}()
stopLisChan <- struct{}{}
select {
case <-wait:
case <-time.After(3 * time.Second):
t.Errorf("StartListener should stop when struct{}{} is sent to stopLisChan")
}
go func() {
StartListener(connChan, stopLisChan)
wait <- struct{}{}
}()
close(stopLisChan)
select {
case <-wait:
case <-time.After(3 * time.Second):
t.Errorf("StartListener should stop when stopLisChan is closed")
}
close(wait)
// Test 2: check that connections can be made to the unix socket
// this test does not appear to be very stable (I think there is a race condition somewhere)
stopLisChan = make(chan struct{})
go StartListener(connChan, stopLisChan)
waitForMsg := func(expected string) {
select {
case conn := <-connChan:
msg, err := ioutil.ReadAll(conn)
if err != nil {
t.Errorf(err.Error())
} else if expected != string(msg) {
t.Errorf("Message expected was " + expected + " got " + string(msg))
}
conn.Close()
case <-time.After(3 * time.Second):
t.Errorf("StartListener should stop when struct{}{} is sent to stopLisChan")
}
}
sendMsg := func(msg string) {
<-time.After(500 * time.Millisecond)
send, err := net.Dial("unix", "/tmp/merlin_listener_test.sock")
if err != nil {
panic(err)
}
_, err = send.Write([]byte(msg))
if err != nil {
t.Errorf(err.Error())
}
send.Close()
}
go func() {
waitForMsg("status")
}()
sendMsg("status")
go func() {
waitForMsg("sync:uuunix")
}()
sendMsg("sync:uuunix")
go func() {
waitForMsg("$UPz2L2yWsE^UY8iG9JX@^dBb@5yb*")
}()
sendMsg("$UPz2L2yWsE^UY8iG9JX@^dBb@5yb*")
// unsure why I can't put this in the defer
os.Remove("/tmp/merlin_listener_test.sock")
}

View File

@ -1,352 +0,0 @@
package common
import (
"fmt"
"os"
"os/exec"
"time"
ini "gopkg.in/ini.v1"
)
const (
DAILY = 86400
TWICE_DAILY = DAILY / 2
HOURLY = 3600
TWICE_HOURLY = HOURLY / 2
BI_HOURLY = HOURLY * 2
TRI_HOURLY = HOURLY * 3
TEN_MINUTELY = 600
FIVE_MINUTELY = 300
CONFIG_PATH = "merlin-config.ini"
DEFAULT_MAX_JOBS = 6
DEFAULT_MAX_TIME = DAILY / 4
DEFAULT_DOWNLOAD_DIR = "/mirror/root"
DEFAULT_PASSWORD_DIR = "/home/mirror/passwords"
DEFAULT_STATE_DIR = "/home/mirror/merlin/states"
DEFAULT_LOG_DIR = "/home/mirror/merlin/logs"
DEFAULT_RSYNC_LOG_DIR = "/home/mirror/merlin/logs-rsync"
DEFAULT_ZFSSYNC_LOG_DIR = "/home/mirror/merlin/logs-zfssync"
DEFAULT_SOCK_PATH = "/run/merlin.sock"
)
var frequencies = map[string]int{
"daily": DAILY,
"twice-daily": TWICE_DAILY,
"hourly": HOURLY,
"twice-hourly": TWICE_HOURLY,
"bi-hourly": BI_HOURLY,
"tri-hourly": TRI_HOURLY,
"ten-minutely": TEN_MINUTELY,
"five-minutely": FIVE_MINUTELY,
}
// Last job attempt statuses
const (
NOT_RUN_YET = iota
SUCCESS
FAILURE
TERMINATED // was killed by a signal
)
type Result struct {
Name string
Exit int
}
type Repo struct {
// the name of this repo
Name string `ini:"-"`
// this should be one of "csc-sync-standard", etc.
SyncType string `ini:"sync_type"`
// a human-readable frequency, e.g. "bi-hourly"
FrequencyStr string `ini:"frequency"`
// the desired interval (in seconds) between successive runs
Frequency int `ini:"-"`
// the maximum time (in seconds) that each child process of this repo
// can for before being killed
MaxTime int `ini:"max_time"`
// where to download the files for this repo (relative to the download
// dir in the config)
LocalDir string `ini:"local_dir"`
// the remote host to rsync from
RsyncHost string `ini:"rsync_host"`
// the remote directory on the rsync host
RsyncDir string `ini:"rsync_dir"`
// the rsync user (optional)
RsyncUser string `ini:"rsync_user"`
// the file storing the password for rsync (optional)
PasswordFile string `ini:"password_file"`
// the file for general logging of this repo
LoggerFile string `ini:"log_file"`
// a reference to the general logger
Logger *Logger `ini:"-"`
// the file for logging this repo's rsync
RsyncLogFile string `ini:"rsync_log_file"`
// the file for logging this repo's zfssync
ZfssyncLogFile string `ini:"zfssync_log_file"`
// the repo will write its name and status in a Result struct to DoneChan
// when it has finished a job (shared by all repos)
DoneChan chan<- Result `ini:"-"`
// the repo should stop syncing if StopChan is closed (shared by all repos)
StopChan chan struct{} `ini:"-"`
// a struct that stores the repo's status
State RepoState `ini:"-"`
// a reference to the global config
cfg *Config `ini:"-"`
}
type Config struct {
// the maximum number of jobs allowed to execute concurrently
MaxJobs int `ini:"max_jobs"`
// the IP addresses to use for rsync
IPv4Address string `ini:"ipv4_address"`
IPv6Address string `ini:"ipv6_address"`
// the default sync type
SyncType string `ini:"default_sync_type"`
// the default frequency string for the repos
FrequencyStr string `ini:"default_frequency"`
// the default MaxTime for each repo
MaxTime int `ini:"default_max_time"`
// the directory where rsync should download files
DownloadDir string `ini:"download_dir"`
// the directory where rsync passwords are stored
PasswordDir string `ini:"password_dir"`
// the directory where the state of each repo sync is saved
StateDir string `ini:"states_dir"`
// the directory where merlin will store the general logs for each repo
LoggerDir string `ini:"log_dir"`
// the directory to store the rsync logs for each repo
RsyncLogDir string `ini:"rsync_log_dir"`
// the directory to store the zfssync logs for each repo
ZfssyncLogDir string `ini:"zfssync_log_dir"`
// the Unix socket path which arthur will use to communicate with us
SockPath string `ini:"sock_path"`
// a list of all of the repos
Repos []*Repo `ini:"-"`
}
// This should only be modified by the main thread
type RepoState struct {
// these are stored in the states folder
// whether this repo is running a job or not
IsRunning bool `ini:"is_running"`
// the Unix epoch timestamp at which this repo last attempted a job
LastAttemptStartTime int64 `ini:"last_attempt_time"`
// the number of seconds this repo ran for during its last attempted job
LastAttemptRunTime int64 `ini:"last_attempt_runtime"`
// whether the last attempt was successful or not
LastAttemptExit int `ini:"last_attempt_exit"`
}
// save the current state of the repo to a file
func (repo *Repo) SaveState() {
repo.Logger.Debug("Saving state")
state_cfg := ini.Empty()
if err := ini.ReflectFrom(state_cfg, &repo.State); err != nil {
repo.Logger.Error(err.Error())
}
file, err := os.OpenFile(repo.cfg.StateDir+"/"+repo.Name, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
repo.Logger.Error(err.Error())
}
if _, err := state_cfg.WriteTo(file); err != nil {
repo.Logger.Error(err.Error())
}
repo.Logger.Debug("Saved state")
}
// start sync job for this repo if more than repo.Frequency seconds have elapsed since its last job
// and is not currently running.
// returns true iff a job is started.
func (repo *Repo) RunIfPossible() bool {
if repo.State.IsRunning {
return false
}
curTime := time.Now().Unix()
if curTime-repo.State.LastAttemptStartTime > int64(repo.Frequency) {
repo.State.IsRunning = true
repo.State.LastAttemptStartTime = curTime
repo.SaveState()
repo.Logger.Info(fmt.Sprintf("Repo %s has started syncing", repo.Name))
go repo.StartSyncJob()
return true
}
return false
}
func zfsSync(repo *Repo) {
out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput()
if err != nil {
repo.Logger.Error(err)
} else {
f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
repo.Logger.Error(err.Error())
} else {
f.Write(out)
}
}
}
// update the repo state with the last attempt time and exit now that the job is done
func (repo *Repo) SyncCompleted(exit int) {
repoState := repo.State
syncTook := time.Now().Unix() - repoState.LastAttemptStartTime
nextSync := repo.MaxTime - int(syncTook)
if nextSync < 0 {
nextSync = 0
}
repoState.IsRunning = false
repoState.LastAttemptExit = exit
repoState.LastAttemptRunTime = syncTook
var exitStr string
switch exit {
case SUCCESS:
exitStr = "completed"
case TERMINATED:
exitStr = "terminated"
default:
exitStr = "failed"
}
repo.SaveState()
repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync))
if exit == SUCCESS {
// it is possible that the zfssync from the last repo sync is still running is that fine?
go zfsSync(repo)
}
}
func panicIfErr(e error) {
if e != nil {
panic(e)
}
}
func touchFile(file string) {
fi, err := os.Stat(file)
if err != nil {
f, err := os.OpenFile(file, os.O_CREATE, 0644)
if err != nil {
panic(fmt.Errorf("unable to create file %s", file))
}
f.Close()
} else if fi.IsDir() {
panic(fmt.Errorf("%s is a directory", file))
} else if os.Geteuid() != 1001 {
// UID 1001 is the hardcoded uid for mirror
err := os.Chown(file, 1001, os.Getegid())
panicIfErr(err)
} else if fi.Mode().Perm() != 0644 {
err := os.Chmod(file, 0644)
panicIfErr(err)
}
}
func touchFiles(files ...string) {
for _, file := range files {
touchFile(file)
}
}
// GetConfig reads the config from a JSON file, initializes default values,
// and initializes the non-configurable fields of each repo.
// It returns a Config.
func GetConfig(doneChan chan Result, stopChan chan struct{}) Config {
// add global configuration in cfg
data, err := ini.Load(CONFIG_PATH)
panicIfErr(err)
cfg := Config{
MaxJobs: DEFAULT_MAX_JOBS,
MaxTime: DEFAULT_MAX_TIME,
PasswordDir: DEFAULT_PASSWORD_DIR,
DownloadDir: DEFAULT_DOWNLOAD_DIR,
StateDir: DEFAULT_STATE_DIR,
LoggerDir: DEFAULT_LOG_DIR,
RsyncLogDir: DEFAULT_RSYNC_LOG_DIR,
ZfssyncLogDir: DEFAULT_ZFSSYNC_LOG_DIR,
SockPath: DEFAULT_SOCK_PATH,
Repos: make([]*Repo, 0),
}
err = data.MapTo(&cfg)
panicIfErr(err)
for _, dir := range []string{cfg.StateDir, cfg.LoggerDir, cfg.RsyncLogDir, cfg.ZfssyncLogDir} {
err := os.MkdirAll(dir, 0755)
panicIfErr(err)
}
if cfg.IPv4Address == "" {
panic("Missing IPv4 address from config")
} else if cfg.IPv6Address == "" {
panic("Missing IPv6 address from config")
}
// add each repo configuration to cfg
for _, section := range data.Sections() {
repoName := section.Name()
if repoName == "DEFAULT" {
continue
}
repo := Repo{
Name: repoName,
SyncType: cfg.SyncType,
FrequencyStr: cfg.FrequencyStr,
MaxTime: cfg.MaxTime,
LoggerFile: cfg.LoggerDir + "/" + repoName + ".log",
RsyncLogFile: cfg.RsyncLogDir + "/" + repoName + "-rsync.log",
ZfssyncLogFile: cfg.ZfssyncLogDir + "/" + repoName + "-zfssync.log",
DoneChan: doneChan,
StopChan: stopChan,
}
err := section.MapTo(&repo)
panicIfErr(err)
touchFiles(
repo.LoggerFile,
repo.RsyncLogFile,
repo.ZfssyncLogFile,
)
repo.Logger = NewLogger(repo.Name, repo.LoggerFile)
repo.Frequency = frequencies[repo.FrequencyStr]
if repo.SyncType == "" {
panic("Missing sync type from " + repo.Name)
} else if repo.Frequency == 0 {
panic("Missing or invalid frequency for " + repo.Name)
}
repo.cfg = &cfg
repo.State = RepoState{
IsRunning: false,
LastAttemptStartTime: 0,
LastAttemptRunTime: 0,
LastAttemptExit: NOT_RUN_YET,
}
// create the state file if it does not exist, otherwise load it from existing file
repoStateFile := cfg.StateDir + "/" + repo.Name
if _, err := os.Stat(repoStateFile); err != nil {
touchFile(repoStateFile)
repo.SaveState()
} else {
err := ini.MapTo(&repo.State, repoStateFile)
panicIfErr(err)
}
cfg.Repos = append(cfg.Repos, &repo)
}
if len(cfg.Repos) == 0 {
panic("No repos found in config")
}
return cfg
}

View File

@ -1 +0,0 @@
package common

265
merlin/config/config.go Normal file
View File

@ -0,0 +1,265 @@
package config
import (
"os"
"path/filepath"
"gopkg.in/ini.v1"
"git.csclub.uwaterloo.ca/public/merlin/logger"
)
const (
DAILY = 86400
TWICE_DAILY = DAILY / 2
HOURLY = 3600
TWICE_HOURLY = HOURLY / 2
BI_HOURLY = HOURLY * 2
TRI_HOURLY = HOURLY * 3
TEN_MINUTELY = 600
FIVE_MINUTELY = 300
// could change this into a default_config
DEFAULT_MAX_JOBS = 6
DEFAULT_MAX_TIME = DAILY / 4
DEFAULT_SYNC_TYPE = "csc-sync-standard"
DEFAULT_FREQUENCY_STRING = "by-hourly"
DEFAULT_DOWNLOAD_DIR = "/mirror/root"
DEFAULT_PASSWORD_DIR = "/home/mirror/passwords"
DEFAULT_STATE_DIR = "/home/mirror/merlin/states"
DEFAULT_LOG_DIR = "/home/mirror/merlin/logs"
DEFAULT_RSYNC_LOG_DIR = "/home/mirror/merlin/logs-rsync"
DEFAULT_ZFSSYNC_LOG_DIR = "/home/mirror/merlin/logs-zfssync"
DEFAULT_SOCK_PATH = "/run/merlin.sock"
)
var frequencies = map[string]int{
"daily": DAILY,
"twice-daily": TWICE_DAILY,
"hourly": HOURLY,
"twice-hourly": TWICE_HOURLY,
"bi-hourly": BI_HOURLY,
"tri-hourly": TRI_HOURLY,
"ten-minutely": TEN_MINUTELY,
"five-minutely": FIVE_MINUTELY,
}
// Last job attempt statuses
const (
NOT_RUN_YET = iota
SUCCESS
FAILURE
TERMINATED // was killed by a signal
)
type SyncResult struct {
Name string
Exit int
}
type Config struct {
// the maximum number of jobs allowed to execute concurrently
MaxJobs int `ini:"max_jobs"`
// the IP addresses to use for rsync
IPv4Address string `ini:"ipv4_address"`
IPv6Address string `ini:"ipv6_address"`
// the default sync type
DefaultSyncType string `ini:"default_sync_type"`
// the default frequency string
DefaultFrequencyStr string `ini:"default_frequency"`
// the default MaxTime
DefaultMaxTime int `ini:"default_max_time"`
// the directory where rsync should download files
DownloadDir string `ini:"download_dir"`
// the directory where rsync passwords are stored
PasswordDir string `ini:"password_dir"`
// the directory where the state of each repo sync is saved
StateDir string `ini:"states_dir"`
// the directory where merlin will store the general logs for each repo
RepoLogDir string `ini:"repo_logs_dir"`
// the directory to store the rsync logs for each repo
RsyncLogDir string `ini:"rsync_logs_dir"`
// the directory to store the zfssync logs for each repo
ZfssyncLogDir string `ini:"zfssync_logs_dir"`
// the Unix socket path which arthur will use to communicate with us
SockPath string `ini:"sock_path"`
}
// make it more clear when full path should be used vs when just the file name is needed
type Repo struct {
// the name of this repo
Name string `ini:"-"`
// this should be one of "csc-sync-standard", etc.
SyncType string `ini:"sync_type"`
// a human-readable frequency, e.g. "bi-hourly"
FrequencyStr string `ini:"frequency"`
// the desired interval (in seconds) between successive runs
Frequency int `ini:"-"`
// the maximum time (in seconds) that each child process of this repo
// can for before being killed
MaxTime int `ini:"max_time"`
// where to download the files for this repo (relative to the download
// dir in the config)
LocalDir string `ini:"local_dir"`
// the remote host to rsync from
RsyncHost string `ini:"rsync_host"`
// the remote directory on the rsync host
RsyncDir string `ini:"rsync_dir"`
// the rsync user (optional)
RsyncUser string `ini:"rsync_user"`
// the file storing the password for rsync (optional)
PasswordFile string `ini:"password_file"`
// the file storing the repo sync state (used to override default)
StateFile string `ini:"state_file"`
// the full file path for general logging of this repo (used to override default)
RepoLogFile string `ini:"repo_log_file"`
// a reference to the general logger
Logger *logger.Logger `ini:"-"`
// the full file path for logging this repo's rsync (used to override default)
RsyncLogFile string `ini:"rsync_log_file"`
// the full file path for logging this repo's zfssync (used to override default)
ZfssyncLogFile string `ini:"zfssync_log_file"`
// the repo will write its name and status in a Result struct to DoneChan
// when it has finished a job (shared by all repos)
DoneChan chan<- SyncResult `ini:"-"`
// repos should stop syncing if StopChan is closed (shared by all repos)
StopChan chan struct{} `ini:"-"`
// a struct that stores the repo's status
State RepoState `ini:"-"`
}
// This should only be modified by the main thread
type RepoState struct {
// these are stored in the states folder
// whether this repo is running a job or not
IsRunning bool `ini:"is_running"`
// the Unix epoch timestamp at which this repo last attempted a job
LastAttemptStartTime int64 `ini:"last_attempt_time"`
// the number of seconds this repo ran for during its last attempted job
LastAttemptRunTime int64 `ini:"last_attempt_runtime"`
// whether the last attempt was successful or not
LastAttemptExit int `ini:"last_attempt_exit"`
}
var (
Conf Config
Repos []*Repo
RepoMap map[string]*Repo
)
// GetConfig reads the config from a JSON file, initializes default values,
// and initializes the non-configurable fields of each repo.
// It returns a Config.
func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struct{}) {
// set default values then load config from file
newConf := Config{
MaxJobs: DEFAULT_MAX_JOBS,
DefaultSyncType: DEFAULT_SYNC_TYPE,
DefaultFrequencyStr: DEFAULT_FREQUENCY_STRING,
DefaultMaxTime: DEFAULT_MAX_TIME,
PasswordDir: DEFAULT_PASSWORD_DIR,
DownloadDir: DEFAULT_DOWNLOAD_DIR,
StateDir: DEFAULT_STATE_DIR,
RepoLogDir: DEFAULT_LOG_DIR,
RsyncLogDir: DEFAULT_RSYNC_LOG_DIR,
ZfssyncLogDir: DEFAULT_ZFSSYNC_LOG_DIR,
SockPath: DEFAULT_SOCK_PATH,
}
iniInfo, err := ini.Load(configPath)
panicIfErr(err)
err = iniInfo.MapTo(&newConf)
panicIfErr(err)
// check config for major errors
for _, dir := range []string{newConf.StateDir, newConf.RepoLogDir, newConf.RsyncLogDir, newConf.ZfssyncLogDir} {
err := os.MkdirAll(dir, 0755)
panicIfErr(err)
}
if newConf.IPv4Address == "" {
panic("Missing IPv4 address from config")
} else if newConf.IPv6Address == "" {
panic("Missing IPv6 address from config")
}
newRepos := make([]*Repo, 0)
for _, section := range iniInfo.Sections() {
repoName := section.Name()
if repoName == "DEFAULT" {
continue
}
// set the default values for the repo then load from file
// TODO: check if local_dir and repoName are always the same value
// TODO: check to ensure that every Repo.Name is unique (may already be done by ini)
repo := Repo{
Name: repoName,
SyncType: newConf.DefaultSyncType,
FrequencyStr: newConf.DefaultFrequencyStr,
MaxTime: newConf.DefaultMaxTime,
StateFile: filepath.Join(newConf.StateDir, repoName),
RepoLogFile: filepath.Join(newConf.RepoLogDir, repoName) + ".log",
RsyncLogFile: filepath.Join(newConf.RsyncLogDir, repoName) + "-rsync.log",
ZfssyncLogFile: filepath.Join(newConf.ZfssyncLogDir, repoName) + "-zfssync.log",
DoneChan: doneChan,
StopChan: stopChan,
}
err := section.MapTo(&repo)
panicIfErr(err)
// TODO: ensure that the parent dirs to the file also exist when touching
// or just remove the ability to override
touchFiles(
repo.StateFile,
repo.RepoLogFile,
repo.RsyncLogFile,
repo.ZfssyncLogFile,
)
repo.Logger = logger.NewLogger(repo.Name, repo.RepoLogFile)
repo.Frequency = frequencies[repo.FrequencyStr]
if repo.SyncType == "" {
panic("Missing sync type from " + repo.Name)
} else if repo.Frequency == 0 {
panic("Missing or invalid frequency for " + repo.Name)
}
repo.State = RepoState{
IsRunning: false,
LastAttemptStartTime: 0,
LastAttemptRunTime: 0,
LastAttemptExit: NOT_RUN_YET,
}
err = ini.MapTo(&repo.State, repo.StateFile)
panicIfErr(err)
repo.SaveState()
newRepos = append(newRepos, &repo)
}
if len(newRepos) == 0 {
panic("No repos found in config")
}
Conf = newConf
Repos = newRepos
RepoMap = make(map[string]*Repo)
for _, repo := range Repos {
RepoMap[repo.Name] = repo
}
}
// save the current state of the repo to a file
func (repo *Repo) SaveState() {
// repo.Logger.Debug("Saving state")
state_cfg := ini.Empty()
if err := ini.ReflectFrom(state_cfg, &repo.State); err != nil {
repo.Logger.Error(err.Error())
}
file, err := os.OpenFile(repo.StateFile, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
repo.Logger.Error(err.Error())
}
if _, err := state_cfg.WriteTo(file); err != nil {
repo.Logger.Error(err.Error())
}
// repo.Logger.Debug("Saved state")
}

View File

@ -0,0 +1,122 @@
package config
import (
"errors"
"os"
"reflect"
"testing"
"github.com/davecgh/go-spew/spew"
)
func TestTouchFiles(t *testing.T) {
files := []string{
"/tmp/merlin_touch_test_1",
"/tmp/merlin_touch_test_2",
"/tmp/merlin_touch_test_3",
}
touchFiles(files[0], files[1], files[2])
for _, file := range files {
if _, err := os.Stat(file); err != nil {
t.Errorf(err.Error())
} else if err := os.Remove(file); err != nil {
t.Errorf(err.Error())
}
}
}
func TestPanicIfErr(t *testing.T) {
panicIfErr(nil)
defer func() { recover() }()
panicIfErr(errors.New("AAAAAAAAAA"))
t.Errorf("panicIfErr should have panicked")
}
func TestLoadConfig(t *testing.T) {
doneChan := make(chan SyncResult)
stopChan := make(chan struct{})
LoadConfig("config_test.ini", doneChan, stopChan)
// TODO: Fill out parts not part of the ini or state file
expectedConfig := Config{
MaxJobs: 6,
IPv4Address: "129.97.134.129",
IPv6Address: "2620:101:f000:4901:c5c::129",
DefaultSyncType: "csc-sync-standard",
DefaultFrequencyStr: "daily",
DefaultMaxTime: 1000,
DownloadDir: "/tmp/test-mirror",
PasswordDir: "/home/mirror/passwords",
StateDir: "test_files",
RepoLogDir: "test_files/logs",
RsyncLogDir: "test_files/rsync",
ZfssyncLogDir: "test_files/zfssync",
SockPath: "test_files/test.sock",
}
expectedRepo1 := Repo{
Name: "eelinux",
SyncType: "csc-sync-nonstandard",
FrequencyStr: "tri-hourly",
Frequency: 10800,
MaxTime: 2000,
LocalDir: "eelinux",
RsyncHost: "rsync.releases.eelinux.ca",
RsyncDir: "releases",
StateFile: "test_files/eeelinux",
RepoLogFile: "test_files/logs/eelinux.log",
Logger: Repos[0].Logger,
RsyncLogFile: "test_files/rsync/eelinux.log",
ZfssyncLogFile: "test_files/zfssync/eelinux.log",
DoneChan: doneChan,
StopChan: stopChan,
State: RepoState{
IsRunning: false,
LastAttemptStartTime: 1600000000,
LastAttemptRunTime: 100,
LastAttemptExit: 1,
},
}
expectedRepo2 := Repo{
Name: "yoland",
SyncType: "csc-sync-standard",
FrequencyStr: "daily",
Frequency: 86400,
MaxTime: 1000,
LocalDir: "yoland-releases",
RsyncHost: "rsync.releases.yoland.io",
RsyncDir: "releases",
StateFile: "test_files/yoland",
RepoLogFile: "test_files/logs/yoland.log",
Logger: Repos[1].Logger,
RsyncLogFile: "test_files/rsync/yoland-rsync.log",
ZfssyncLogFile: "test_files/zfssync/yoland-zfssync.log",
DoneChan: doneChan,
StopChan: stopChan,
State: RepoState{
IsRunning: false,
LastAttemptStartTime: 0,
LastAttemptRunTime: 0,
LastAttemptExit: 0,
},
}
if !reflect.DeepEqual(expectedConfig, Conf) {
t.Errorf("Config loaded does not match expected config")
spew.Dump(expectedConfig)
spew.Dump(Conf)
}
if !reflect.DeepEqual(expectedRepo1, *Repos[0]) {
t.Errorf("The eelinux repo loaded does not match the exected repo config")
spew.Dump(expectedRepo1)
spew.Dump(*Repos[0])
}
if !reflect.DeepEqual(expectedRepo2, *Repos[1]) {
t.Errorf("The yoland repo loaded does not match the exected repo config")
spew.Dump(expectedRepo2)
spew.Dump(*Repos[1])
}
os.Remove("test_files/yoland")
os.RemoveAll("test_files/logs")
os.RemoveAll("test_files/rsync")
os.RemoveAll("test_files/zfssync")
}

View File

@ -0,0 +1,39 @@
; default values are commented out
; max_jobs = 6
ipv4_address = 129.97.134.129
ipv6_address = 2620:101:f000:4901:c5c::129
; default_sync_type = csc-sync-standard
default_frequency = daily
default_max_time = 1000
download_dir = /tmp/test-mirror
; password_dir = /home/mirror/passwords
states_dir = test_files
repo_logs_dir = test_files/logs
rsync_logs_dir = test_files/rsync
zfssync_logs_dir = test_files/zfssync
sock_path = test_files/test.sock
[eelinux]
sync_type = csc-sync-nonstandard
frequency = tri-hourly
max_time = 2000
local_dir = eelinux
rsync_host = rsync.releases.eelinux.ca
rsync_dir = releases
state_file = test_files/eeelinux
repo_log_file = test_files/logs/eelinux.log
rsync_log_file = test_files/rsync/eelinux.log
zfssync_log_file = test_files/zfssync/eelinux.log
[yoland]
; sync_type = csc-sync-standard
; frequency = daily
; max_time = 1000
local_dir = yoland-releases
rsync_host = rsync.releases.yoland.io
rsync_dir = releases
; state_file = test_files/yoland
; repo_log_file = test_files/logs/yoland.log
; rsync_log_file = test_files/rsync/yoland-rsync.log
; zfssync_log_file = test_files/zfssync/yoland-zfssync.log

View File

@ -0,0 +1,5 @@
is_running = false
last_attempt_time = 1600000000
last_attempt_runtime = 100
last_attempt_exit = 1

37
merlin/config/utils.go Normal file
View File

@ -0,0 +1,37 @@
package config
import (
"fmt"
"os"
)
func panicIfErr(e error) {
if e != nil {
panic(e)
}
}
func touchFile(file string) {
fi, err := os.Stat(file)
if err != nil {
f, err := os.OpenFile(file, os.O_CREATE, 0644)
if err != nil {
panic(fmt.Errorf("unable to create file %s", file))
}
f.Close()
} else if fi.IsDir() {
panic(fmt.Errorf("%s is a directory", file))
// } else if os.Geteuid() != 1001 {
// // mirror is UID 1001
// err := os.Chown(file, 1001, os.Getegid())
// panicIfErr(err)
}
err = os.Chmod(file, 0644)
panicIfErr(err)
}
func touchFiles(files ...string) {
for _, file := range files {
touchFile(file)
}
}

View File

@ -1,6 +1,7 @@
module git.csclub.uwaterloo.ca/public/merlin
require (
github.com/davecgh/go-spew v1.1.0
github.com/stretchr/testify v1.7.0 // indirect
golang.org/x/sys v0.0.0-20210915083310-ed5796bab164
gopkg.in/ini.v1 v1.63.2

View File

@ -1,4 +1,4 @@
package common
package logger
import (
"log"
@ -30,12 +30,12 @@ var levels = map[int]string{
var outLogger = log.New(os.Stdout, "", log.LstdFlags)
var errLogger = log.New(os.Stderr, "", log.LstdFlags)
func OutLogger() *log.Logger {
return outLogger
func OutLog(v ...interface{}) {
outLogger.Println(v...)
}
func ErrLogger() *log.Logger {
return errLogger
func ErrLog(v ...interface{}) {
errLogger.Println(v...)
}
func NewLogger(name, file string) *Logger {
@ -48,15 +48,9 @@ func NewLogger(name, file string) *Logger {
}
func (logger *Logger) Log(level int, v ...interface{}) {
if level == INFO {
outLogger.Println(append([]interface{}{"[" + logger.name + "]"}, v...))
} else if level == ERROR {
errLogger.Println(append([]interface{}{"[" + logger.name + "]"}, v...))
}
f, err := os.OpenFile(logger.file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
errLogger.Println(err.Error())
ErrLog(err.Error())
}
defer f.Close()
@ -65,7 +59,7 @@ func (logger *Logger) Log(level int, v ...interface{}) {
args = append(args, v...)
logger.SetOutput(f)
logger.Println(v...)
logger.Println(args)
}
func (logger *Logger) Debug(v ...interface{}) {
@ -73,6 +67,9 @@ func (logger *Logger) Debug(v ...interface{}) {
}
func (logger *Logger) Info(v ...interface{}) {
// src := []interface{}{logger.name + ":"}
// args := append(src, v...)
OutLog(v...)
logger.Log(INFO, v...)
}
@ -81,5 +78,6 @@ func (logger *Logger) Warning(v ...interface{}) {
}
func (logger *Logger) Error(v ...interface{}) {
ErrLog(append([]interface{}{"[" + logger.name + "]"}, v...))
logger.Log(ERROR, v...)
}

View File

@ -1,153 +1,35 @@
package main
import (
"bytes"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"text/tabwriter"
"time"
"git.csclub.uwaterloo.ca/public/merlin/common"
"golang.org/x/sys/unix"
"git.csclub.uwaterloo.ca/public/merlin/arthur"
"git.csclub.uwaterloo.ca/public/merlin/config"
"git.csclub.uwaterloo.ca/public/merlin/logger"
"git.csclub.uwaterloo.ca/public/merlin/sync"
)
var (
cfg common.Config
outLogger *log.Logger
errLogger *log.Logger
repoMap map[string]*common.Repo
repoIdx int
numJobsRunning int
)
func getAndRunCommand(conn net.Conn) {
defer conn.Close()
var buf bytes.Buffer
_, err := io.Copy(&buf, conn)
if err != nil {
errLogger.Println(err.Error())
return
}
command := buf.String()
args := strings.Split(command, ":")
respondAndLogErr := func(msg string) {
outLogger.Println(msg)
conn.Write([]byte(msg))
}
if args[0] == "status" {
status := tabwriter.NewWriter(conn, 5, 5, 5, ' ', 0)
fmt.Fprintf(status, "Repository\tLast Synced\tNext Expected Sync\n")
// for time formating see https://pkg.go.dev/time#pkg-constants
for name, repo := range repoMap {
fmt.Fprintf(status, "%s\t%s\t%s\n",
name,
time.Unix(repo.State.LastAttemptStartTime, 0).Format(time.RFC1123),
time.Unix(repo.State.LastAttemptRunTime+int64(repo.Frequency), 0).Format(time.RFC1123),
)
}
status.Flush()
} else if args[0] == "sync" {
if len(args) != 2 {
respondAndLogErr("Could not parse sync command, forced sync fails.")
return
}
if repo, inMap := repoMap[args[1]]; inMap {
outLogger.Println("Attempting to force sync of " + repo.Name)
if repo.RunIfPossible() {
conn.Write([]byte("Forced sync for " + repo.Name))
numJobsRunning++
} else {
respondAndLogErr("Cannot force sync: " + repo.Name + ", already syncing.")
}
} else {
respondAndLogErr(args[1] + " is not tracked so cannot sync")
}
} else {
respondAndLogErr("Received unrecognized command: " + command)
}
}
func unixSockListener(connChan chan net.Conn, stopLisChan chan struct{}) {
// must remove cfg.SockPath otherwise get "bind: address already in use"
if filepath.Ext(cfg.SockPath) != ".sock" {
panic(fmt.Errorf("Socket file must end with .sock"))
} else if _, err := os.Stat(cfg.SockPath); err == nil {
if err := os.Remove(cfg.SockPath); err != nil {
panic(err)
}
} else if !errors.Is(err, os.ErrNotExist) {
panic(err)
}
ear, err := net.Listen("unix", cfg.SockPath)
if err != nil {
panic(err)
}
outLogger.Println("Listening to unix socket at " + cfg.SockPath)
go func() {
for {
// will exit when ear is closed
conn, err := ear.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
errLogger.Println("Accepted socket error: " + err.Error())
continue
}
errLogger.Println("Unhandlable socket error: " + err.Error())
return
}
connChan <- conn
}
}()
<-stopLisChan
ear.Close()
}
// We use a round-robin strategy. It's not the most efficient, but it's simple
// (read: easy to understand) and guarantees each repo will eventually get a chance to run.
func runAsManyAsPossible() {
repos := cfg.Repos
startIdx := repoIdx
for numJobsRunning < cfg.MaxJobs {
repo := repos[repoIdx]
if repo.RunIfPossible() {
numJobsRunning++
}
repoIdx = (repoIdx + 1) % len(repos)
if repoIdx == startIdx {
// we've come full circle
return
}