Refactor Go Merlin #2
|
@ -0,0 +1,18 @@
|
||||||
|
kind: pipeline
|
||||||
|
type: docker
|
||||||
|
name: default
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: merlin
|
||||||
|
image: golang:1.17
|
||||||
|
commands:
|
||||||
|
# add linter
|
||||||
|
- cd merlin
|
||||||
|
- go build
|
||||||
|
- go test ./...
|
||||||
|
|
||||||
|
trigger:
|
||||||
|
branch:
|
||||||
|
- master
|
||||||
|
- go
|
||||||
|
- refactor
|
|
@ -1,4 +1,3 @@
|
||||||
/.*
|
|
||||||
!.gitignore
|
!.gitignore
|
||||||
.git_old/
|
.git_old/
|
||||||
/dead.letter
|
/dead.letter
|
||||||
|
|
|
@ -20,6 +20,7 @@ This folder contains the code for merlin (which does the actual syncing) and art
|
||||||
- [ ] wget
|
- [ ] wget
|
||||||
|
|
||||||
### TODO
|
### TODO
|
||||||
|
- [ ] ensure that the proper permissions (file mode, group, user) are used
|
||||||
- [ ] detect if an rsync process is stuck (\*\*)
|
- [ ] detect if an rsync process is stuck (\*\*)
|
||||||
- [ ] place each rsync process in a separate cgroup (\*\*\*)
|
- [ ] place each rsync process in a separate cgroup (\*\*\*)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,130 @@
|
||||||
|
package arthur
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
"text/tabwriter"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.csclub.uwaterloo.ca/public/merlin/config"
|
||||||
|
"git.csclub.uwaterloo.ca/public/merlin/logger"
|
||||||
|
"git.csclub.uwaterloo.ca/public/merlin/sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Reads and parses the message sent over the accepted connection
|
||||||
|
func GetCommand(conn net.Conn) (command, repoName string) {
|
||||||
|
command = ""
|
||||||
|
repoName = ""
|
||||||
|
|
||||||
|
buf, err := ioutil.ReadAll(conn)
|
||||||
|
if err != nil {
|
||||||
|
logger.ErrLog(err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
args := strings.Split(string(buf), ":")
|
||||||
|
if len(args) >= 1 {
|
||||||
|
command = args[0]
|
||||||
|
}
|
||||||
|
if len(args) >= 2 {
|
||||||
|
repoName = args[1]
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func SendAndLog(conn net.Conn, msg string) {
|
||||||
|
logger.OutLog(msg)
|
||||||
|
conn.Write([]byte(msg))
|
||||||
|
}
|
||||||
|
|
||||||
|
func SendStatus(conn net.Conn) {
|
||||||
|
status := tabwriter.NewWriter(conn, 5, 5, 5, ' ', 0)
|
||||||
|
fmt.Fprintf(status, "Repository\tLast Synced\tNext Expected Sync\tRunning\n")
|
||||||
|
|
||||||
|
keys := make([]string, 0)
|
||||||
|
for name, _ := range config.RepoMap {
|
||||||
|
keys = append(keys, name)
|
||||||
|
}
|
||||||
|
sort.Strings(keys)
|
||||||
|
|
||||||
|
// for other ways to format the time see: https://pkg.go.dev/time#pkg-constants
|
||||||
|
for _, name := range keys {
|
||||||
|
repo := config.RepoMap[name]
|
||||||
|
lastSync := repo.State.LastAttemptStartTime
|
||||||
|
nextSync := lastSync + int64(repo.Frequency)
|
||||||
|
|
||||||
|
fmt.Fprintf(status, "%s\t%s\t%s\t%t\n",
|
||||||
|
name,
|
||||||
|
time.Unix(lastSync, 0).Format(time.RFC1123),
|
||||||
|
time.Unix(nextSync, 0).Format(time.RFC1123),
|
||||||
|
repo.State.IsRunning,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
status.Flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempt to force the sync of the repo
|
||||||
|
func ForceSync(conn net.Conn, repoName string) (newSync bool) {
|
||||||
|
newSync = false
|
||||||
|
|
||||||
|
// TODO: send repoName and every key in RepoMap to lowercase
|
||||||
|
if repo, isInMap := config.RepoMap[repoName]; isInMap {
|
||||||
|
logger.OutLog("Attempting to force sync of " + repoName)
|
||||||
|
if sync.SyncIfPossible(repo) {
|
||||||
|
conn.Write([]byte("Forced sync for " + repoName))
|
||||||
|
newSync = true
|
||||||
|
} else {
|
||||||
|
SendAndLog(conn, "Could not force sync: "+repoName+" is already syncing.")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
SendAndLog(conn, repoName+" is not tracked so cannot sync")
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func StartListener(connChan chan net.Conn, stopLisChan chan struct{}) {
|
||||||
|
sockpath := config.Conf.SockPath
|
||||||
|
// must remove old cfg.SockPath otherwise get "bind: address already in use"
|
||||||
|
if filepath.Ext(sockpath) != ".sock" {
|
||||||
|
panic(fmt.Errorf("socket file must end with .sock"))
|
||||||
|
} else if _, err := os.Stat(sockpath); err == nil {
|
||||||
|
if err := os.Remove(sockpath); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
} else if !errors.Is(err, os.ErrNotExist) {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ear, err := net.Listen("unix", sockpath)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
logger.OutLog("Listening to unix socket at " + sockpath)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
// Attempting to accept on a closed net.Listener will return a non-temporary error
|
||||||
|
conn, err := ear.Accept()
|
||||||
|
if err != nil {
|
||||||
|
if netErr, isTemp := err.(net.Error); isTemp && netErr.Temporary() {
|
||||||
|
logger.ErrLog("Accepted socket error: " + err.Error())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
logger.ErrLog("Unhandlable socket error: " + err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
connChan <- conn
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// TODO: check handling of multiple SIGHUP
|
||||||
|
<-stopLisChan
|
||||||
|
ear.Close()
|
||||||
|
}
|
|
@ -0,0 +1,283 @@
|
||||||
|
package arthur
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.csclub.uwaterloo.ca/public/merlin/config"
|
||||||
|
"git.csclub.uwaterloo.ca/public/merlin/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStatusCommand(t *testing.T) {
|
||||||
|
r, w := net.Pipe()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
// will only finish write when EOF is sent
|
||||||
|
// only way to send EOF is to close
|
||||||
|
w.Write([]byte("status"))
|
||||||
|
w.Close()
|
||||||
|
}()
|
||||||
|
command, repoName := GetCommand(r)
|
||||||
|
if command != "status" {
|
||||||
|
t.Errorf("command for status should be \"status\", got " + command)
|
||||||
|
} else if repoName != "" {
|
||||||
|
t.Errorf("status should return an empty string for the repoName, got " + repoName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSyncCommand(t *testing.T) {
|
||||||
|
r, w := net.Pipe()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
w.Write([]byte("sync:ubuntu"))
|
||||||
|
w.Close()
|
||||||
|
}()
|
||||||
|
command, repoName := GetCommand(r)
|
||||||
|
r.Close()
|
||||||
|
if command != "sync" {
|
||||||
|
t.Errorf("command for sync:ubuntu should be \"sync\", got " + command)
|
||||||
|
} else if repoName != "ubuntu" {
|
||||||
|
t.Errorf("name of repo for sync:ubuntu should be \"ubuntu\", got " + repoName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSendStatus(t *testing.T) {
|
||||||
|
saveRepoMap := config.RepoMap
|
||||||
|
defer func() {
|
||||||
|
config.RepoMap = saveRepoMap
|
||||||
|
}()
|
||||||
|
|
||||||
|
repoMap := make(map[string]*config.Repo)
|
||||||
|
repoMap["eeeee"] = &config.Repo{
|
||||||
|
Frequency: 30 * 86400,
|
||||||
|
State: config.RepoState{
|
||||||
|
IsRunning: true,
|
||||||
|
LastAttemptStartTime: 1600000000,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
repoMap["alinux"] = &config.Repo{
|
||||||
|
Frequency: 7*86400 + 3,
|
||||||
|
State: config.RepoState{
|
||||||
|
IsRunning: true,
|
||||||
|
LastAttemptStartTime: 1620000000,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
repoMap["lnux"] = &config.Repo{
|
||||||
|
Frequency: 86400,
|
||||||
|
State: config.RepoState{
|
||||||
|
IsRunning: false,
|
||||||
|
LastAttemptStartTime: 1640000000,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
config.RepoMap = repoMap
|
||||||
|
|
||||||
|
r, w := net.Pipe()
|
||||||
|
go func() {
|
||||||
|
SendStatus(w)
|
||||||
|
w.Close()
|
||||||
|
}()
|
||||||
|
msg, err := ioutil.ReadAll(r)
|
||||||
|
r.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf(err.Error())
|
||||||
|
}
|
||||||
|
expected := `Repository Last Synced Next Expected Sync Running
|
||||||
|
alinux Sun, 02 May 2021 20:00:00 EDT Sun, 09 May 2021 20:00:03 EDT true
|
||||||
|
eeeee Sun, 13 Sep 2020 08:26:40 EDT Tue, 13 Oct 2020 08:26:40 EDT true
|
||||||
|
lnux Mon, 20 Dec 2021 06:33:20 EST Tue, 21 Dec 2021 06:33:20 EST false
|
||||||
|
`
|
||||||
|
if expected != string(msg) {
|
||||||
|
t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestForceSync(t *testing.T) {
|
||||||
|
saveRepos := config.Repos
|
||||||
|
saveRepoMap := config.RepoMap
|
||||||
|
doneChan := make(chan config.SyncResult)
|
||||||
|
defer func() {
|
||||||
|
config.Repos = saveRepos
|
||||||
|
config.RepoMap = saveRepoMap
|
||||||
|
close(doneChan)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Part 1: run a dummy sync
|
||||||
|
repo := config.Repo{
|
||||||
|
Name: "nux",
|
||||||
|
SyncType: "csc-sync-dummy",
|
||||||
|
Frequency: 7 * 86400,
|
||||||
|
MaxTime: 30,
|
||||||
|
Logger: logger.NewLogger("nux", "/tmp/merlin_force_sync_test_logs"),
|
||||||
|
StateFile: "/tmp/merlin_force_sync_test_state",
|
||||||
|
DoneChan: doneChan,
|
||||||
|
State: config.RepoState{
|
||||||
|
IsRunning: false,
|
||||||
|
LastAttemptStartTime: 0,
|
||||||
|
LastAttemptRunTime: 0,
|
||||||
|
LastAttemptExit: config.NOT_RUN_YET,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
config.Repos = nil
|
||||||
|
config.Repos = append(config.Repos, &repo)
|
||||||
|
config.RepoMap = make(map[string]*config.Repo)
|
||||||
|
config.RepoMap["nux"] = &repo
|
||||||
|
|
||||||
|
r, w := net.Pipe()
|
||||||
|
go func() {
|
||||||
|
if !ForceSync(w, "nux") {
|
||||||
|
t.Errorf("Sync for nux did not start")
|
||||||
|
}
|
||||||
|
w.Close()
|
||||||
|
}()
|
||||||
|
msg, err := ioutil.ReadAll(r)
|
||||||
|
r.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf(err.Error())
|
||||||
|
}
|
||||||
|
expected := "Forced sync for nux"
|
||||||
|
if expected != string(msg) {
|
||||||
|
t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg))
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case result := <-doneChan:
|
||||||
|
if result.Exit != config.SUCCESS {
|
||||||
|
t.Errorf("Sync should exit with SUCCESS, got %d", result.Exit)
|
||||||
|
}
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Errorf("Dummy sync should be done in 1 second, waited 3 seconds")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Part 2: attempt the same thing but with repo.State.IsRunning = true
|
||||||
|
r, w = net.Pipe()
|
||||||
|
go func() {
|
||||||
|
if ForceSync(w, "nux") {
|
||||||
|
t.Errorf("Sync for nux should not have started")
|
||||||
|
}
|
||||||
|
w.Close()
|
||||||
|
}()
|
||||||
|
msg, err = ioutil.ReadAll(r)
|
||||||
|
r.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf(err.Error())
|
||||||
|
}
|
||||||
|
expected = "Could not force sync: nux is already syncing."
|
||||||
|
if expected != string(msg) {
|
||||||
|
t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg))
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-doneChan:
|
||||||
|
t.Errorf("Sync for nux should not have been started")
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
}
|
||||||
|
|
||||||
|
// Part 3: attempt a force sync with a repo that does not exist
|
||||||
|
r, w = net.Pipe()
|
||||||
|
go func() {
|
||||||
|
if ForceSync(w, "nixx") {
|
||||||
|
t.Errorf("Sync for nixx should not have started")
|
||||||
|
}
|
||||||
|
w.Close()
|
||||||
|
}()
|
||||||
|
msg, err = ioutil.ReadAll(r)
|
||||||
|
r.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf(err.Error())
|
||||||
|
}
|
||||||
|
expected = "nixx is not tracked so cannot sync"
|
||||||
|
if expected != string(msg) {
|
||||||
|
t.Errorf("Expected:\n" + expected + "\nGot:\n" + string(msg))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStartListener(t *testing.T) {
|
||||||
|
saveConf := config.Conf
|
||||||
|
connChan := make(chan net.Conn)
|
||||||
|
stopLisChan := make(chan struct{})
|
||||||
|
wait := make(chan struct{})
|
||||||
|
defer func() {
|
||||||
|
config.Conf = saveConf
|
||||||
|
close(connChan)
|
||||||
|
close(stopLisChan)
|
||||||
|
}()
|
||||||
|
config.Conf = config.Config{
|
||||||
|
SockPath: "/tmp/merlin_listener_test.sock",
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test 1: check that closing/sending something to stopLisChan will stop the listener
|
||||||
|
// and that a new listener can be created after stopping the old one
|
||||||
|
go func() {
|
||||||
|
StartListener(connChan, stopLisChan)
|
||||||
|
wait <- struct{}{}
|
||||||
|
}()
|
||||||
|
stopLisChan <- struct{}{}
|
||||||
|
select {
|
||||||
|
case <-wait:
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Errorf("StartListener should stop when struct{}{} is sent to stopLisChan")
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
StartListener(connChan, stopLisChan)
|
||||||
|
wait <- struct{}{}
|
||||||
|
}()
|
||||||
|
close(stopLisChan)
|
||||||
|
select {
|
||||||
|
case <-wait:
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Errorf("StartListener should stop when stopLisChan is closed")
|
||||||
|
}
|
||||||
|
close(wait)
|
||||||
|
|
||||||
|
// Test 2: check that connections can be made to the unix socket
|
||||||
|
// this test does not appear to be very stable (I think there is a race condition somewhere)
|
||||||
|
stopLisChan = make(chan struct{})
|
||||||
|
go StartListener(connChan, stopLisChan)
|
||||||
|
waitForMsg := func(expected string) {
|
||||||
|
select {
|
||||||
|
case conn := <-connChan:
|
||||||
|
msg, err := ioutil.ReadAll(conn)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf(err.Error())
|
||||||
|
} else if expected != string(msg) {
|
||||||
|
t.Errorf("Message expected was " + expected + " got " + string(msg))
|
||||||
|
}
|
||||||
|
conn.Close()
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Errorf("StartListener should stop when struct{}{} is sent to stopLisChan")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sendMsg := func(msg string) {
|
||||||
|
<-time.After(500 * time.Millisecond)
|
||||||
|
send, err := net.Dial("unix", "/tmp/merlin_listener_test.sock")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
_, err = send.Write([]byte(msg))
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf(err.Error())
|
||||||
|
}
|
||||||
|
send.Close()
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
waitForMsg("status")
|
||||||
|
}()
|
||||||
|
sendMsg("status")
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
waitForMsg("sync:uuunix")
|
||||||
|
}()
|
||||||
|
sendMsg("sync:uuunix")
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
waitForMsg("$UPz2L2yWsE^UY8iG9JX@^dBb@5yb*")
|
||||||
|
}()
|
||||||
|
sendMsg("$UPz2L2yWsE^UY8iG9JX@^dBb@5yb*")
|
||||||
|
|
||||||
|
// unsure why I can't put this in the defer
|
||||||
|
os.Remove("/tmp/merlin_listener_test.sock")
|
||||||
|
}
|
|
@ -1,352 +0,0 @@
|
||||||
package common
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"os/exec"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
ini "gopkg.in/ini.v1"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
DAILY = 86400
|
|
||||||
TWICE_DAILY = DAILY / 2
|
|
||||||
HOURLY = 3600
|
|
||||||
TWICE_HOURLY = HOURLY / 2
|
|
||||||
BI_HOURLY = HOURLY * 2
|
|
||||||
TRI_HOURLY = HOURLY * 3
|
|
||||||
TEN_MINUTELY = 600
|
|
||||||
FIVE_MINUTELY = 300
|
|
||||||
|
|
||||||
CONFIG_PATH = "merlin-config.ini"
|
|
||||||
|
|
||||||
DEFAULT_MAX_JOBS = 6
|
|
||||||
DEFAULT_MAX_TIME = DAILY / 4
|
|
||||||
DEFAULT_DOWNLOAD_DIR = "/mirror/root"
|
|
||||||
DEFAULT_PASSWORD_DIR = "/home/mirror/passwords"
|
|
||||||
DEFAULT_STATE_DIR = "/home/mirror/merlin/states"
|
|
||||||
DEFAULT_LOG_DIR = "/home/mirror/merlin/logs"
|
|
||||||
DEFAULT_RSYNC_LOG_DIR = "/home/mirror/merlin/logs-rsync"
|
|
||||||
DEFAULT_ZFSSYNC_LOG_DIR = "/home/mirror/merlin/logs-zfssync"
|
|
||||||
DEFAULT_SOCK_PATH = "/run/merlin.sock"
|
|
||||||
)
|
|
||||||
|
|
||||||
var frequencies = map[string]int{
|
|
||||||
"daily": DAILY,
|
|
||||||
"twice-daily": TWICE_DAILY,
|
|
||||||
"hourly": HOURLY,
|
|
||||||
"twice-hourly": TWICE_HOURLY,
|
|
||||||
"bi-hourly": BI_HOURLY,
|
|
||||||
"tri-hourly": TRI_HOURLY,
|
|
||||||
"ten-minutely": TEN_MINUTELY,
|
|
||||||
"five-minutely": FIVE_MINUTELY,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Last job attempt statuses
|
|
||||||
const (
|
|
||||||
NOT_RUN_YET = iota
|
|
||||||
SUCCESS
|
|
||||||
FAILURE
|
|
||||||
TERMINATED // was killed by a signal
|
|
||||||
)
|
|
||||||
|
|
||||||
type Result struct {
|
|
||||||
Name string
|
|
||||||
Exit int
|
|
||||||
}
|
|
||||||
|
|
||||||
type Repo struct {
|
|
||||||
// the name of this repo
|
|
||||||
Name string `ini:"-"`
|
|
||||||
// this should be one of "csc-sync-standard", etc.
|
|
||||||
SyncType string `ini:"sync_type"`
|
|
||||||
// a human-readable frequency, e.g. "bi-hourly"
|
|
||||||
FrequencyStr string `ini:"frequency"`
|
|
||||||
// the desired interval (in seconds) between successive runs
|
|
||||||
Frequency int `ini:"-"`
|
|
||||||
// the maximum time (in seconds) that each child process of this repo
|
|
||||||
// can for before being killed
|
|
||||||
MaxTime int `ini:"max_time"`
|
|
||||||
// where to download the files for this repo (relative to the download
|
|
||||||
// dir in the config)
|
|
||||||
LocalDir string `ini:"local_dir"`
|
|
||||||
// the remote host to rsync from
|
|
||||||
RsyncHost string `ini:"rsync_host"`
|
|
||||||
// the remote directory on the rsync host
|
|
||||||
RsyncDir string `ini:"rsync_dir"`
|
|
||||||
// the rsync user (optional)
|
|
||||||
RsyncUser string `ini:"rsync_user"`
|
|
||||||
// the file storing the password for rsync (optional)
|
|
||||||
PasswordFile string `ini:"password_file"`
|
|
||||||
// the file for general logging of this repo
|
|
||||||
LoggerFile string `ini:"log_file"`
|
|
||||||
// a reference to the general logger
|
|
||||||
Logger *Logger `ini:"-"`
|
|
||||||
// the file for logging this repo's rsync
|
|
||||||
RsyncLogFile string `ini:"rsync_log_file"`
|
|
||||||
// the file for logging this repo's zfssync
|
|
||||||
ZfssyncLogFile string `ini:"zfssync_log_file"`
|
|
||||||
// the repo will write its name and status in a Result struct to DoneChan
|
|
||||||
// when it has finished a job (shared by all repos)
|
|
||||||
DoneChan chan<- Result `ini:"-"`
|
|
||||||
// the repo should stop syncing if StopChan is closed (shared by all repos)
|
|
||||||
StopChan chan struct{} `ini:"-"`
|
|
||||||
// a struct that stores the repo's status
|
|
||||||
State RepoState `ini:"-"`
|
|
||||||
// a reference to the global config
|
|
||||||
cfg *Config `ini:"-"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type Config struct {
|
|
||||||
// the maximum number of jobs allowed to execute concurrently
|
|
||||||
MaxJobs int `ini:"max_jobs"`
|
|
||||||
// the IP addresses to use for rsync
|
|
||||||
IPv4Address string `ini:"ipv4_address"`
|
|
||||||
IPv6Address string `ini:"ipv6_address"`
|
|
||||||
// the default sync type
|
|
||||||
SyncType string `ini:"default_sync_type"`
|
|
||||||
// the default frequency string for the repos
|
|
||||||
FrequencyStr string `ini:"default_frequency"`
|
|
||||||
// the default MaxTime for each repo
|
|
||||||
MaxTime int `ini:"default_max_time"`
|
|
||||||
// the directory where rsync should download files
|
|
||||||
DownloadDir string `ini:"download_dir"`
|
|
||||||
// the directory where rsync passwords are stored
|
|
||||||
PasswordDir string `ini:"password_dir"`
|
|
||||||
// the directory where the state of each repo sync is saved
|
|
||||||
StateDir string `ini:"states_dir"`
|
|
||||||
// the directory where merlin will store the general logs for each repo
|
|
||||||
LoggerDir string `ini:"log_dir"`
|
|
||||||
// the directory to store the rsync logs for each repo
|
|
||||||
RsyncLogDir string `ini:"rsync_log_dir"`
|
|
||||||
// the directory to store the zfssync logs for each repo
|
|
||||||
ZfssyncLogDir string `ini:"zfssync_log_dir"`
|
|
||||||
// the Unix socket path which arthur will use to communicate with us
|
|
||||||
SockPath string `ini:"sock_path"`
|
|
||||||
// a list of all of the repos
|
|
||||||
Repos []*Repo `ini:"-"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// This should only be modified by the main thread
|
|
||||||
type RepoState struct {
|
|
||||||
// these are stored in the states folder
|
|
||||||
// whether this repo is running a job or not
|
|
||||||
IsRunning bool `ini:"is_running"`
|
|
||||||
// the Unix epoch timestamp at which this repo last attempted a job
|
|
||||||
LastAttemptStartTime int64 `ini:"last_attempt_time"`
|
|
||||||
// the number of seconds this repo ran for during its last attempted job
|
|
||||||
LastAttemptRunTime int64 `ini:"last_attempt_runtime"`
|
|
||||||
// whether the last attempt was successful or not
|
|
||||||
LastAttemptExit int `ini:"last_attempt_exit"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// save the current state of the repo to a file
|
|
||||||
func (repo *Repo) SaveState() {
|
|
||||||
repo.Logger.Debug("Saving state")
|
|
||||||
state_cfg := ini.Empty()
|
|
||||||
if err := ini.ReflectFrom(state_cfg, &repo.State); err != nil {
|
|
||||||
repo.Logger.Error(err.Error())
|
|
||||||
}
|
|
||||||
file, err := os.OpenFile(repo.cfg.StateDir+"/"+repo.Name, os.O_RDWR|os.O_CREATE, 0644)
|
|
||||||
if err != nil {
|
|
||||||
repo.Logger.Error(err.Error())
|
|
||||||
}
|
|
||||||
if _, err := state_cfg.WriteTo(file); err != nil {
|
|
||||||
repo.Logger.Error(err.Error())
|
|
||||||
}
|
|
||||||
repo.Logger.Debug("Saved state")
|
|
||||||
}
|
|
||||||
|
|
||||||
// start sync job for this repo if more than repo.Frequency seconds have elapsed since its last job
|
|
||||||
// and is not currently running.
|
|
||||||
// returns true iff a job is started.
|
|
||||||
func (repo *Repo) RunIfPossible() bool {
|
|
||||||
if repo.State.IsRunning {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
curTime := time.Now().Unix()
|
|
||||||
if curTime-repo.State.LastAttemptStartTime > int64(repo.Frequency) {
|
|
||||||
repo.State.IsRunning = true
|
|
||||||
repo.State.LastAttemptStartTime = curTime
|
|
||||||
repo.SaveState()
|
|
||||||
repo.Logger.Info(fmt.Sprintf("Repo %s has started syncing", repo.Name))
|
|
||||||
go repo.StartSyncJob()
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func zfsSync(repo *Repo) {
|
|
||||||
out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput()
|
|
||||||
if err != nil {
|
|
||||||
repo.Logger.Error(err)
|
|
||||||
} else {
|
|
||||||
f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
|
|
||||||
if err != nil {
|
|
||||||
repo.Logger.Error(err.Error())
|
|
||||||
} else {
|
|
||||||
f.Write(out)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// update the repo state with the last attempt time and exit now that the job is done
|
|
||||||
func (repo *Repo) SyncCompleted(exit int) {
|
|
||||||
repoState := repo.State
|
|
||||||
syncTook := time.Now().Unix() - repoState.LastAttemptStartTime
|
|
||||||
nextSync := repo.MaxTime - int(syncTook)
|
|
||||||
if nextSync < 0 {
|
|
||||||
nextSync = 0
|
|
||||||
}
|
|
||||||
|
|
||||||
repoState.IsRunning = false
|
|
||||||
repoState.LastAttemptExit = exit
|
|
||||||
repoState.LastAttemptRunTime = syncTook
|
|
||||||
|
|
||||||
var exitStr string
|
|
||||||
switch exit {
|
|
||||||
case SUCCESS:
|
|
||||||
exitStr = "completed"
|
|
||||||
case TERMINATED:
|
|
||||||
exitStr = "terminated"
|
|
||||||
default:
|
|
||||||
exitStr = "failed"
|
|
||||||
}
|
|
||||||
repo.SaveState()
|
|
||||||
repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync))
|
|
||||||
if exit == SUCCESS {
|
|
||||||
// it is possible that the zfssync from the last repo sync is still running is that fine?
|
|
||||||
go zfsSync(repo)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func panicIfErr(e error) {
|
|
||||||
if e != nil {
|
|
||||||
panic(e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func touchFile(file string) {
|
|
||||||
fi, err := os.Stat(file)
|
|
||||||
if err != nil {
|
|
||||||
f, err := os.OpenFile(file, os.O_CREATE, 0644)
|
|
||||||
if err != nil {
|
|
||||||
panic(fmt.Errorf("unable to create file %s", file))
|
|
||||||
}
|
|
||||||
f.Close()
|
|
||||||
} else if fi.IsDir() {
|
|
||||||
panic(fmt.Errorf("%s is a directory", file))
|
|
||||||
} else if os.Geteuid() != 1001 {
|
|
||||||
// UID 1001 is the hardcoded uid for mirror
|
|
||||||
err := os.Chown(file, 1001, os.Getegid())
|
|
||||||
panicIfErr(err)
|
|
||||||
} else if fi.Mode().Perm() != 0644 {
|
|
||||||
err := os.Chmod(file, 0644)
|
|
||||||
panicIfErr(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func touchFiles(files ...string) {
|
|
||||||
for _, file := range files {
|
|
||||||
touchFile(file)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetConfig reads the config from a JSON file, initializes default values,
|
|
||||||
// and initializes the non-configurable fields of each repo.
|
|
||||||
// It returns a Config.
|
|
||||||
func GetConfig(doneChan chan Result, stopChan chan struct{}) Config {
|
|
||||||
// add global configuration in cfg
|
|
||||||
data, err := ini.Load(CONFIG_PATH)
|
|
||||||
panicIfErr(err)
|
|
||||||
|
|
||||||
cfg := Config{
|
|
||||||
MaxJobs: DEFAULT_MAX_JOBS,
|
|
||||||
MaxTime: DEFAULT_MAX_TIME,
|
|
||||||
PasswordDir: DEFAULT_PASSWORD_DIR,
|
|
||||||
DownloadDir: DEFAULT_DOWNLOAD_DIR,
|
|
||||||
StateDir: DEFAULT_STATE_DIR,
|
|
||||||
LoggerDir: DEFAULT_LOG_DIR,
|
|
||||||
RsyncLogDir: DEFAULT_RSYNC_LOG_DIR,
|
|
||||||
ZfssyncLogDir: DEFAULT_ZFSSYNC_LOG_DIR,
|
|
||||||
SockPath: DEFAULT_SOCK_PATH,
|
|
||||||
Repos: make([]*Repo, 0),
|
|
||||||
}
|
|
||||||
err = data.MapTo(&cfg)
|
|
||||||
panicIfErr(err)
|
|
||||||
|
|
||||||
for _, dir := range []string{cfg.StateDir, cfg.LoggerDir, cfg.RsyncLogDir, cfg.ZfssyncLogDir} {
|
|
||||||
err := os.MkdirAll(dir, 0755)
|
|
||||||
panicIfErr(err)
|
|
||||||
}
|
|
||||||
if cfg.IPv4Address == "" {
|
|
||||||
panic("Missing IPv4 address from config")
|
|
||||||
} else if cfg.IPv6Address == "" {
|
|
||||||
panic("Missing IPv6 address from config")
|
|
||||||
}
|
|
||||||
|
|
||||||
// add each repo configuration to cfg
|
|
||||||
for _, section := range data.Sections() {
|
|
||||||
repoName := section.Name()
|
|
||||||
if repoName == "DEFAULT" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
repo := Repo{
|
|
||||||
Name: repoName,
|
|
||||||
SyncType: cfg.SyncType,
|
|
||||||
FrequencyStr: cfg.FrequencyStr,
|
|
||||||
MaxTime: cfg.MaxTime,
|
|
||||||
LoggerFile: cfg.LoggerDir + "/" + repoName + ".log",
|
|
||||||
RsyncLogFile: cfg.RsyncLogDir + "/" + repoName + "-rsync.log",
|
|
||||||
ZfssyncLogFile: cfg.ZfssyncLogDir + "/" + repoName + "-zfssync.log",
|
|
||||||
DoneChan: doneChan,
|
|
||||||
StopChan: stopChan,
|
|
||||||
}
|
|
||||||
err := section.MapTo(&repo)
|
|
||||||
panicIfErr(err)
|
|
||||||
|
|
||||||
touchFiles(
|
|
||||||
repo.LoggerFile,
|
|
||||||
repo.RsyncLogFile,
|
|
||||||
repo.ZfssyncLogFile,
|
|
||||||
)
|
|
||||||
|
|
||||||
repo.Logger = NewLogger(repo.Name, repo.LoggerFile)
|
|
||||||
repo.Frequency = frequencies[repo.FrequencyStr]
|
|
||||||
if repo.SyncType == "" {
|
|
||||||
panic("Missing sync type from " + repo.Name)
|
|
||||||
} else if repo.Frequency == 0 {
|
|
||||||
panic("Missing or invalid frequency for " + repo.Name)
|
|
||||||
}
|
|
||||||
repo.cfg = &cfg
|
|
||||||
|
|
||||||
repo.State = RepoState{
|
|
||||||
IsRunning: false,
|
|
||||||
LastAttemptStartTime: 0,
|
|
||||||
LastAttemptRunTime: 0,
|
|
||||||
LastAttemptExit: NOT_RUN_YET,
|
|
||||||
}
|
|
||||||
|
|
||||||
// create the state file if it does not exist, otherwise load it from existing file
|
|
||||||
repoStateFile := cfg.StateDir + "/" + repo.Name
|
|
||||||
if _, err := os.Stat(repoStateFile); err != nil {
|
|
||||||
touchFile(repoStateFile)
|
|
||||||
repo.SaveState()
|
|
||||||
} else {
|
|
||||||
err := ini.MapTo(&repo.State, repoStateFile)
|
|
||||||
panicIfErr(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg.Repos = append(cfg.Repos, &repo)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(cfg.Repos) == 0 {
|
|
||||||
panic("No repos found in config")
|
|
||||||
}
|
|
||||||
|
|
||||||
return cfg
|
|
||||||
}
|
|
|
@ -1 +0,0 @@
|
||||||
package common
|
|
|
@ -0,0 +1,265 @@
|
||||||
|
package config
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
|
"gopkg.in/ini.v1"
|
||||||
|
|
||||||
|
"git.csclub.uwaterloo.ca/public/merlin/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
DAILY = 86400
|
||||||
|
TWICE_DAILY = DAILY / 2
|
||||||
|
HOURLY = 3600
|
||||||
|
TWICE_HOURLY = HOURLY / 2
|
||||||
|
BI_HOURLY = HOURLY * 2
|
||||||
|
TRI_HOURLY = HOURLY * 3
|
||||||
|
TEN_MINUTELY = 600
|
||||||
|
FIVE_MINUTELY = 300
|
||||||
|
|
||||||
|
// could change this into a default_config
|
||||||
|
DEFAULT_MAX_JOBS = 6
|
||||||
|
DEFAULT_MAX_TIME = DAILY / 4
|
||||||
|
DEFAULT_SYNC_TYPE = "csc-sync-standard"
|
||||||
|
DEFAULT_FREQUENCY_STRING = "by-hourly"
|
||||||
|
DEFAULT_DOWNLOAD_DIR = "/mirror/root"
|
||||||
|
DEFAULT_PASSWORD_DIR = "/home/mirror/passwords"
|
||||||
|
DEFAULT_STATE_DIR = "/home/mirror/merlin/states"
|
||||||
|
DEFAULT_LOG_DIR = "/home/mirror/merlin/logs"
|
||||||
|
DEFAULT_RSYNC_LOG_DIR = "/home/mirror/merlin/logs-rsync"
|
||||||
|
DEFAULT_ZFSSYNC_LOG_DIR = "/home/mirror/merlin/logs-zfssync"
|
||||||
|
DEFAULT_SOCK_PATH = "/run/merlin.sock"
|
||||||
|
)
|
||||||
|
|
||||||
|
var frequencies = map[string]int{
|
||||||
|
"daily": DAILY,
|
||||||
|
"twice-daily": TWICE_DAILY,
|
||||||
|
"hourly": HOURLY,
|
||||||
|
"twice-hourly": TWICE_HOURLY,
|
||||||
|
"bi-hourly": BI_HOURLY,
|
||||||
|
"tri-hourly": TRI_HOURLY,
|
||||||
|
"ten-minutely": TEN_MINUTELY,
|
||||||
|
"five-minutely": FIVE_MINUTELY,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Last job attempt statuses
|
||||||
|
const (
|
||||||
|
NOT_RUN_YET = iota
|
||||||
|
SUCCESS
|
||||||
|
FAILURE
|
||||||
|
TERMINATED // was killed by a signal
|
||||||
|
)
|
||||||
|
|
||||||
|
type SyncResult struct {
|
||||||
|
Name string
|
||||||
|
Exit int
|
||||||
|
}
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
// the maximum number of jobs allowed to execute concurrently
|
||||||
|
MaxJobs int `ini:"max_jobs"`
|
||||||
|
// the IP addresses to use for rsync
|
||||||
|
IPv4Address string `ini:"ipv4_address"`
|
||||||
|
IPv6Address string `ini:"ipv6_address"`
|
||||||
|
// the default sync type
|
||||||
|
DefaultSyncType string `ini:"default_sync_type"`
|
||||||
|
// the default frequency string
|
||||||
|
DefaultFrequencyStr string `ini:"default_frequency"`
|
||||||
|
// the default MaxTime
|
||||||
|
DefaultMaxTime int `ini:"default_max_time"`
|
||||||
|
// the directory where rsync should download files
|
||||||
|
DownloadDir string `ini:"download_dir"`
|
||||||
|
// the directory where rsync passwords are stored
|
||||||
|
PasswordDir string `ini:"password_dir"`
|
||||||
|
// the directory where the state of each repo sync is saved
|
||||||
|
StateDir string `ini:"states_dir"`
|
||||||
|
// the directory where merlin will store the general logs for each repo
|
||||||
|
RepoLogDir string `ini:"repo_logs_dir"`
|
||||||
|
// the directory to store the rsync logs for each repo
|
||||||
|
RsyncLogDir string `ini:"rsync_logs_dir"`
|
||||||
|
// the directory to store the zfssync logs for each repo
|
||||||
|
ZfssyncLogDir string `ini:"zfssync_logs_dir"`
|
||||||
|
// the Unix socket path which arthur will use to communicate with us
|
||||||
|
SockPath string `ini:"sock_path"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// make it more clear when full path should be used vs when just the file name is needed
|
||||||
|
type Repo struct {
|
||||||
|
// the name of this repo
|
||||||
|
Name string `ini:"-"`
|
||||||
|
// this should be one of "csc-sync-standard", etc.
|
||||||
|
SyncType string `ini:"sync_type"`
|
||||||
|
// a human-readable frequency, e.g. "bi-hourly"
|
||||||
|
FrequencyStr string `ini:"frequency"`
|
||||||
|
// the desired interval (in seconds) between successive runs
|
||||||
|
Frequency int `ini:"-"`
|
||||||
|
// the maximum time (in seconds) that each child process of this repo
|
||||||
|
// can for before being killed
|
||||||
|
MaxTime int `ini:"max_time"`
|
||||||
|
// where to download the files for this repo (relative to the download
|
||||||
|
// dir in the config)
|
||||||
|
LocalDir string `ini:"local_dir"`
|
||||||
|
// the remote host to rsync from
|
||||||
|
RsyncHost string `ini:"rsync_host"`
|
||||||
|
// the remote directory on the rsync host
|
||||||
|
RsyncDir string `ini:"rsync_dir"`
|
||||||
|
// the rsync user (optional)
|
||||||
|
RsyncUser string `ini:"rsync_user"`
|
||||||
|
// the file storing the password for rsync (optional)
|
||||||
|
PasswordFile string `ini:"password_file"`
|
||||||
|
// the file storing the repo sync state (used to override default)
|
||||||
|
StateFile string `ini:"state_file"`
|
||||||
|
// the full file path for general logging of this repo (used to override default)
|
||||||
|
RepoLogFile string `ini:"repo_log_file"`
|
||||||
|
// a reference to the general logger
|
||||||
|
Logger *logger.Logger `ini:"-"`
|
||||||
|
// the full file path for logging this repo's rsync (used to override default)
|
||||||
|
RsyncLogFile string `ini:"rsync_log_file"`
|
||||||
|
// the full file path for logging this repo's zfssync (used to override default)
|
||||||
|
ZfssyncLogFile string `ini:"zfssync_log_file"`
|
||||||
|
// the repo will write its name and status in a Result struct to DoneChan
|
||||||
|
// when it has finished a job (shared by all repos)
|
||||||
|
DoneChan chan<- SyncResult `ini:"-"`
|
||||||
|
// repos should stop syncing if StopChan is closed (shared by all repos)
|
||||||
|
StopChan chan struct{} `ini:"-"`
|
||||||
|
// a struct that stores the repo's status
|
||||||
|
State RepoState `ini:"-"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// This should only be modified by the main thread
|
||||||
|
type RepoState struct {
|
||||||
|
// these are stored in the states folder
|
||||||
|
// whether this repo is running a job or not
|
||||||
|
IsRunning bool `ini:"is_running"`
|
||||||
|
// the Unix epoch timestamp at which this repo last attempted a job
|
||||||
|
LastAttemptStartTime int64 `ini:"last_attempt_time"`
|
||||||
|
// the number of seconds this repo ran for during its last attempted job
|
||||||
|
LastAttemptRunTime int64 `ini:"last_attempt_runtime"`
|
||||||
|
// whether the last attempt was successful or not
|
||||||
|
LastAttemptExit int `ini:"last_attempt_exit"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
Conf Config
|
||||||
|
Repos []*Repo
|
||||||
|
RepoMap map[string]*Repo
|
||||||
|
)
|
||||||
|
|
||||||
|
// GetConfig reads the config from a JSON file, initializes default values,
|
||||||
|
// and initializes the non-configurable fields of each repo.
|
||||||
|
// It returns a Config.
|
||||||
|
func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struct{}) {
|
||||||
|
// set default values then load config from file
|
||||||
|
newConf := Config{
|
||||||
|
MaxJobs: DEFAULT_MAX_JOBS,
|
||||||
|
DefaultSyncType: DEFAULT_SYNC_TYPE,
|
||||||
|
DefaultFrequencyStr: DEFAULT_FREQUENCY_STRING,
|
||||||
|
DefaultMaxTime: DEFAULT_MAX_TIME,
|
||||||
|
PasswordDir: DEFAULT_PASSWORD_DIR,
|
||||||
|
DownloadDir: DEFAULT_DOWNLOAD_DIR,
|
||||||
|
StateDir: DEFAULT_STATE_DIR,
|
||||||
|
RepoLogDir: DEFAULT_LOG_DIR,
|
||||||
|
RsyncLogDir: DEFAULT_RSYNC_LOG_DIR,
|
||||||
|
ZfssyncLogDir: DEFAULT_ZFSSYNC_LOG_DIR,
|
||||||
|
SockPath: DEFAULT_SOCK_PATH,
|
||||||
|
}
|
||||||
|
iniInfo, err := ini.Load(configPath)
|
||||||
|
panicIfErr(err)
|
||||||
|
err = iniInfo.MapTo(&newConf)
|
||||||
|
panicIfErr(err)
|
||||||
|
|
||||||
|
// check config for major errors
|
||||||
|
for _, dir := range []string{newConf.StateDir, newConf.RepoLogDir, newConf.RsyncLogDir, newConf.ZfssyncLogDir} {
|
||||||
|
err := os.MkdirAll(dir, 0755)
|
||||||
|
panicIfErr(err)
|
||||||
|
}
|
||||||
|
if newConf.IPv4Address == "" {
|
||||||
|
panic("Missing IPv4 address from config")
|
||||||
|
} else if newConf.IPv6Address == "" {
|
||||||
|
panic("Missing IPv6 address from config")
|
||||||
|
}
|
||||||
|
|
||||||
|
newRepos := make([]*Repo, 0)
|
||||||
|
for _, section := range iniInfo.Sections() {
|
||||||
|
repoName := section.Name()
|
||||||
|
if repoName == "DEFAULT" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// set the default values for the repo then load from file
|
||||||
|
// TODO: check if local_dir and repoName are always the same value
|
||||||
|
// TODO: check to ensure that every Repo.Name is unique (may already be done by ini)
|
||||||
|
repo := Repo{
|
||||||
|
Name: repoName,
|
||||||
|
SyncType: newConf.DefaultSyncType,
|
||||||
|
FrequencyStr: newConf.DefaultFrequencyStr,
|
||||||
|
MaxTime: newConf.DefaultMaxTime,
|
||||||
|
StateFile: filepath.Join(newConf.StateDir, repoName),
|
||||||
|
RepoLogFile: filepath.Join(newConf.RepoLogDir, repoName) + ".log",
|
||||||
|
RsyncLogFile: filepath.Join(newConf.RsyncLogDir, repoName) + "-rsync.log",
|
||||||
|
ZfssyncLogFile: filepath.Join(newConf.ZfssyncLogDir, repoName) + "-zfssync.log",
|
||||||
|
DoneChan: doneChan,
|
||||||
|
StopChan: stopChan,
|
||||||
|
}
|
||||||
|
err := section.MapTo(&repo)
|
||||||
|
panicIfErr(err)
|
||||||
|
|
||||||
|
// TODO: ensure that the parent dirs to the file also exist when touching
|
||||||
|
// or just remove the ability to override
|
||||||
|
touchFiles(
|
||||||
|
repo.StateFile,
|
||||||
|
repo.RepoLogFile,
|
||||||
|
repo.RsyncLogFile,
|
||||||
|
repo.ZfssyncLogFile,
|
||||||
|
)
|
||||||
|
repo.Logger = logger.NewLogger(repo.Name, repo.RepoLogFile)
|
||||||
|
repo.Frequency = frequencies[repo.FrequencyStr]
|
||||||
|
if repo.SyncType == "" {
|
||||||
|
panic("Missing sync type from " + repo.Name)
|
||||||
|
} else if repo.Frequency == 0 {
|
||||||
|
panic("Missing or invalid frequency for " + repo.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
repo.State = RepoState{
|
||||||
|
IsRunning: false,
|
||||||
|
LastAttemptStartTime: 0,
|
||||||
|
LastAttemptRunTime: 0,
|
||||||
|
LastAttemptExit: NOT_RUN_YET,
|
||||||
|
}
|
||||||
|
err = ini.MapTo(&repo.State, repo.StateFile)
|
||||||
|
panicIfErr(err)
|
||||||
|
repo.SaveState()
|
||||||
|
|
||||||
|
newRepos = append(newRepos, &repo)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(newRepos) == 0 {
|
||||||
|
panic("No repos found in config")
|
||||||
|
}
|
||||||
|
|
||||||
|
Conf = newConf
|
||||||
|
Repos = newRepos
|
||||||
|
RepoMap = make(map[string]*Repo)
|
||||||
|
for _, repo := range Repos {
|
||||||
|
RepoMap[repo.Name] = repo
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// save the current state of the repo to a file
|
||||||
|
func (repo *Repo) SaveState() {
|
||||||
|
// repo.Logger.Debug("Saving state")
|
||||||
|
state_cfg := ini.Empty()
|
||||||
|
if err := ini.ReflectFrom(state_cfg, &repo.State); err != nil {
|
||||||
|
repo.Logger.Error(err.Error())
|
||||||
|
}
|
||||||
|
file, err := os.OpenFile(repo.StateFile, os.O_RDWR|os.O_CREATE, 0644)
|
||||||
|
if err != nil {
|
||||||
|
repo.Logger.Error(err.Error())
|
||||||
|
}
|
||||||
|
if _, err := state_cfg.WriteTo(file); err != nil {
|
||||||
|
repo.Logger.Error(err.Error())
|
||||||
|
}
|
||||||
|
// repo.Logger.Debug("Saved state")
|
||||||
|
}
|
|
@ -0,0 +1,122 @@
|
||||||
|
package config
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"os"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/davecgh/go-spew/spew"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestTouchFiles(t *testing.T) {
|
||||||
|
files := []string{
|
||||||
|
"/tmp/merlin_touch_test_1",
|
||||||
|
"/tmp/merlin_touch_test_2",
|
||||||
|
"/tmp/merlin_touch_test_3",
|
||||||
|
}
|
||||||
|
touchFiles(files[0], files[1], files[2])
|
||||||
|
for _, file := range files {
|
||||||
|
if _, err := os.Stat(file); err != nil {
|
||||||
|
t.Errorf(err.Error())
|
||||||
|
} else if err := os.Remove(file); err != nil {
|
||||||
|
t.Errorf(err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPanicIfErr(t *testing.T) {
|
||||||
|
panicIfErr(nil)
|
||||||
|
defer func() { recover() }()
|
||||||
|
panicIfErr(errors.New("AAAAAAAAAA"))
|
||||||
|
t.Errorf("panicIfErr should have panicked")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLoadConfig(t *testing.T) {
|
||||||
|
doneChan := make(chan SyncResult)
|
||||||
|
stopChan := make(chan struct{})
|
||||||
|
LoadConfig("config_test.ini", doneChan, stopChan)
|
||||||
|
// TODO: Fill out parts not part of the ini or state file
|
||||||
|
expectedConfig := Config{
|
||||||
|
MaxJobs: 6,
|
||||||
|
IPv4Address: "129.97.134.129",
|
||||||
|
IPv6Address: "2620:101:f000:4901:c5c::129",
|
||||||
|
DefaultSyncType: "csc-sync-standard",
|
||||||
|
DefaultFrequencyStr: "daily",
|
||||||
|
DefaultMaxTime: 1000,
|
||||||
|
DownloadDir: "/tmp/test-mirror",
|
||||||
|
PasswordDir: "/home/mirror/passwords",
|
||||||
|
StateDir: "test_files",
|
||||||
|
RepoLogDir: "test_files/logs",
|
||||||
|
RsyncLogDir: "test_files/rsync",
|
||||||
|
ZfssyncLogDir: "test_files/zfssync",
|
||||||
|
SockPath: "test_files/test.sock",
|
||||||
|
}
|
||||||
|
expectedRepo1 := Repo{
|
||||||
|
Name: "eelinux",
|
||||||
|
SyncType: "csc-sync-nonstandard",
|
||||||
|
FrequencyStr: "tri-hourly",
|
||||||
|
Frequency: 10800,
|
||||||
|
MaxTime: 2000,
|
||||||
|
LocalDir: "eelinux",
|
||||||
|
RsyncHost: "rsync.releases.eelinux.ca",
|
||||||
|
RsyncDir: "releases",
|
||||||
|
StateFile: "test_files/eeelinux",
|
||||||
|
RepoLogFile: "test_files/logs/eelinux.log",
|
||||||
|
Logger: Repos[0].Logger,
|
||||||
|
RsyncLogFile: "test_files/rsync/eelinux.log",
|
||||||
|
ZfssyncLogFile: "test_files/zfssync/eelinux.log",
|
||||||
|
DoneChan: doneChan,
|
||||||
|
StopChan: stopChan,
|
||||||
|
State: RepoState{
|
||||||
|
IsRunning: false,
|
||||||
|
LastAttemptStartTime: 1600000000,
|
||||||
|
LastAttemptRunTime: 100,
|
||||||
|
LastAttemptExit: 1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
expectedRepo2 := Repo{
|
||||||
|
Name: "yoland",
|
||||||
|
SyncType: "csc-sync-standard",
|
||||||
|
FrequencyStr: "daily",
|
||||||
|
Frequency: 86400,
|
||||||
|
MaxTime: 1000,
|
||||||
|
LocalDir: "yoland-releases",
|
||||||
|
RsyncHost: "rsync.releases.yoland.io",
|
||||||
|
RsyncDir: "releases",
|
||||||
|
StateFile: "test_files/yoland",
|
||||||
|
RepoLogFile: "test_files/logs/yoland.log",
|
||||||
|
Logger: Repos[1].Logger,
|
||||||
|
RsyncLogFile: "test_files/rsync/yoland-rsync.log",
|
||||||
|
ZfssyncLogFile: "test_files/zfssync/yoland-zfssync.log",
|
||||||
|
DoneChan: doneChan,
|
||||||
|
StopChan: stopChan,
|
||||||
|
State: RepoState{
|
||||||
|
IsRunning: false,
|
||||||
|
LastAttemptStartTime: 0,
|
||||||
|
LastAttemptRunTime: 0,
|
||||||
|
LastAttemptExit: 0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(expectedConfig, Conf) {
|
||||||
|
t.Errorf("Config loaded does not match expected config")
|
||||||
|
spew.Dump(expectedConfig)
|
||||||
|
spew.Dump(Conf)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(expectedRepo1, *Repos[0]) {
|
||||||
|
t.Errorf("The eelinux repo loaded does not match the exected repo config")
|
||||||
|
spew.Dump(expectedRepo1)
|
||||||
|
spew.Dump(*Repos[0])
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(expectedRepo2, *Repos[1]) {
|
||||||
|
t.Errorf("The yoland repo loaded does not match the exected repo config")
|
||||||
|
spew.Dump(expectedRepo2)
|
||||||
|
spew.Dump(*Repos[1])
|
||||||
|
}
|
||||||
|
|
||||||
|
os.Remove("test_files/yoland")
|
||||||
|
os.RemoveAll("test_files/logs")
|
||||||
|
os.RemoveAll("test_files/rsync")
|
||||||
|
os.RemoveAll("test_files/zfssync")
|
||||||
|
}
|
|
@ -0,0 +1,39 @@
|
||||||
|
; default values are commented out
|
||||||
|
|
||||||
|
; max_jobs = 6
|
||||||
|
ipv4_address = 129.97.134.129
|
||||||
|
ipv6_address = 2620:101:f000:4901:c5c::129
|
||||||
|
; default_sync_type = csc-sync-standard
|
||||||
|
default_frequency = daily
|
||||||
|
default_max_time = 1000
|
||||||
|
download_dir = /tmp/test-mirror
|
||||||
|
; password_dir = /home/mirror/passwords
|
||||||
|
states_dir = test_files
|
||||||
|
repo_logs_dir = test_files/logs
|
||||||
|
rsync_logs_dir = test_files/rsync
|
||||||
|
zfssync_logs_dir = test_files/zfssync
|
||||||
|
sock_path = test_files/test.sock
|
||||||
|
|
||||||
|
[eelinux]
|
||||||
|
sync_type = csc-sync-nonstandard
|
||||||
|
frequency = tri-hourly
|
||||||
|
max_time = 2000
|
||||||
|
local_dir = eelinux
|
||||||
|
rsync_host = rsync.releases.eelinux.ca
|
||||||
|
rsync_dir = releases
|
||||||
|
state_file = test_files/eeelinux
|
||||||
|
repo_log_file = test_files/logs/eelinux.log
|
||||||
|
rsync_log_file = test_files/rsync/eelinux.log
|
||||||
|
zfssync_log_file = test_files/zfssync/eelinux.log
|
||||||
|
|
||||||
|
[yoland]
|
||||||
|
; sync_type = csc-sync-standard
|
||||||
|
; frequency = daily
|
||||||
|
; max_time = 1000
|
||||||
|
local_dir = yoland-releases
|
||||||
|
rsync_host = rsync.releases.yoland.io
|
||||||
|
rsync_dir = releases
|
||||||
|
; state_file = test_files/yoland
|
||||||
|
; repo_log_file = test_files/logs/yoland.log
|
||||||
|
; rsync_log_file = test_files/rsync/yoland-rsync.log
|
||||||
|
; zfssync_log_file = test_files/zfssync/yoland-zfssync.log
|
|
@ -0,0 +1,5 @@
|
||||||
|
is_running = false
|
||||||
|
last_attempt_time = 1600000000
|
||||||
|
last_attempt_runtime = 100
|
||||||
|
last_attempt_exit = 1
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
package config
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
func panicIfErr(e error) {
|
||||||
|
if e != nil {
|
||||||
|
panic(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func touchFile(file string) {
|
||||||
|
fi, err := os.Stat(file)
|
||||||
|
if err != nil {
|
||||||
|
f, err := os.OpenFile(file, os.O_CREATE, 0644)
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Errorf("unable to create file %s", file))
|
||||||
|
}
|
||||||
|
f.Close()
|
||||||
|
} else if fi.IsDir() {
|
||||||
|
panic(fmt.Errorf("%s is a directory", file))
|
||||||
|
// } else if os.Geteuid() != 1001 {
|
||||||
|
// // mirror is UID 1001
|
||||||
|
// err := os.Chown(file, 1001, os.Getegid())
|
||||||
|
// panicIfErr(err)
|
||||||
|
}
|
||||||
|
err = os.Chmod(file, 0644)
|
||||||
|
panicIfErr(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func touchFiles(files ...string) {
|
||||||
|
for _, file := range files {
|
||||||
|
touchFile(file)
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
module git.csclub.uwaterloo.ca/public/merlin
|
module git.csclub.uwaterloo.ca/public/merlin
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/davecgh/go-spew v1.1.0
|
||||||
github.com/stretchr/testify v1.7.0 // indirect
|
github.com/stretchr/testify v1.7.0 // indirect
|
||||||
golang.org/x/sys v0.0.0-20210915083310-ed5796bab164
|
golang.org/x/sys v0.0.0-20210915083310-ed5796bab164
|
||||||
gopkg.in/ini.v1 v1.63.2
|
gopkg.in/ini.v1 v1.63.2
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package common
|
package logger
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
|
@ -30,12 +30,12 @@ var levels = map[int]string{
|
||||||
var outLogger = log.New(os.Stdout, "", log.LstdFlags)
|
var outLogger = log.New(os.Stdout, "", log.LstdFlags)
|
||||||
var errLogger = log.New(os.Stderr, "", log.LstdFlags)
|
var errLogger = log.New(os.Stderr, "", log.LstdFlags)
|
||||||
|
|
||||||
func OutLogger() *log.Logger {
|
func OutLog(v ...interface{}) {
|
||||||
return outLogger
|
outLogger.Println(v...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func ErrLogger() *log.Logger {
|
func ErrLog(v ...interface{}) {
|
||||||
return errLogger
|
errLogger.Println(v...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLogger(name, file string) *Logger {
|
func NewLogger(name, file string) *Logger {
|
||||||
|
@ -48,15 +48,9 @@ func NewLogger(name, file string) *Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (logger *Logger) Log(level int, v ...interface{}) {
|
func (logger *Logger) Log(level int, v ...interface{}) {
|
||||||
if level == INFO {
|
|
||||||
outLogger.Println(append([]interface{}{"[" + logger.name + "]"}, v...))
|
|
||||||
} else if level == ERROR {
|
|
||||||
errLogger.Println(append([]interface{}{"[" + logger.name + "]"}, v...))
|
|
||||||
}
|
|
||||||
|
|
||||||
f, err := os.OpenFile(logger.file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
|
f, err := os.OpenFile(logger.file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errLogger.Println(err.Error())
|
ErrLog(err.Error())
|
||||||
}
|
}
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
|
||||||
|
@ -65,7 +59,7 @@ func (logger *Logger) Log(level int, v ...interface{}) {
|
||||||
args = append(args, v...)
|
args = append(args, v...)
|
||||||
|
|
||||||
logger.SetOutput(f)
|
logger.SetOutput(f)
|
||||||
logger.Println(v...)
|
logger.Println(args)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (logger *Logger) Debug(v ...interface{}) {
|
func (logger *Logger) Debug(v ...interface{}) {
|
||||||
|
@ -73,6 +67,9 @@ func (logger *Logger) Debug(v ...interface{}) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (logger *Logger) Info(v ...interface{}) {
|
func (logger *Logger) Info(v ...interface{}) {
|
||||||
|
// src := []interface{}{logger.name + ":"}
|
||||||
|
// args := append(src, v...)
|
||||||
|
OutLog(v...)
|
||||||
logger.Log(INFO, v...)
|
logger.Log(INFO, v...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,5 +78,6 @@ func (logger *Logger) Warning(v ...interface{}) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (logger *Logger) Error(v ...interface{}) {
|
func (logger *Logger) Error(v ...interface{}) {
|
||||||
|
ErrLog(append([]interface{}{"[" + logger.name + "]"}, v...))
|
||||||
logger.Log(ERROR, v...)
|
logger.Log(ERROR, v...)
|
||||||
}
|
}
|
201
merlin/merlin.go
201
merlin/merlin.go
|
@ -1,153 +1,35 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"log"
|
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"path/filepath"
|
|
||||||
"strings"
|
|
||||||
"syscall"
|
"syscall"
|
||||||
"text/tabwriter"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.csclub.uwaterloo.ca/public/merlin/common"
|
|
||||||
"golang.org/x/sys/unix"
|
"golang.org/x/sys/unix"
|
||||||
|
|
||||||
|
"git.csclub.uwaterloo.ca/public/merlin/arthur"
|
||||||
|
"git.csclub.uwaterloo.ca/public/merlin/config"
|
||||||
|
"git.csclub.uwaterloo.ca/public/merlin/logger"
|
||||||
|
"git.csclub.uwaterloo.ca/public/merlin/sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
// get config path from command args
|
||||||
cfg common.Config
|
var CONFIG_PATH = "merlin-config.ini"
|
||||||
outLogger *log.Logger
|
|
||||||
errLogger *log.Logger
|
|
||||||
repoMap map[string]*common.Repo
|
|
||||||
repoIdx int
|
|
||||||
numJobsRunning int
|
|
||||||
)
|
|
||||||
|
|
||||||
func getAndRunCommand(conn net.Conn) {
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
var buf bytes.Buffer
|
|
||||||
_, err := io.Copy(&buf, conn)
|
|
||||||
if err != nil {
|
|
||||||
errLogger.Println(err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
command := buf.String()
|
|
||||||
args := strings.Split(command, ":")
|
|
||||||
respondAndLogErr := func(msg string) {
|
|
||||||
outLogger.Println(msg)
|
|
||||||
conn.Write([]byte(msg))
|
|
||||||
}
|
|
||||||
|
|
||||||
if args[0] == "status" {
|
|
||||||
status := tabwriter.NewWriter(conn, 5, 5, 5, ' ', 0)
|
|
||||||
fmt.Fprintf(status, "Repository\tLast Synced\tNext Expected Sync\n")
|
|
||||||
|
|
||||||
// for time formating see https://pkg.go.dev/time#pkg-constants
|
|
||||||
for name, repo := range repoMap {
|
|
||||||
fmt.Fprintf(status, "%s\t%s\t%s\n",
|
|
||||||
name,
|
|
||||||
time.Unix(repo.State.LastAttemptStartTime, 0).Format(time.RFC1123),
|
|
||||||
time.Unix(repo.State.LastAttemptRunTime+int64(repo.Frequency), 0).Format(time.RFC1123),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
status.Flush()
|
|
||||||
} else if args[0] == "sync" {
|
|
||||||
if len(args) != 2 {
|
|
||||||
respondAndLogErr("Could not parse sync command, forced sync fails.")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if repo, inMap := repoMap[args[1]]; inMap {
|
|
||||||
outLogger.Println("Attempting to force sync of " + repo.Name)
|
|
||||||
if repo.RunIfPossible() {
|
|
||||||
conn.Write([]byte("Forced sync for " + repo.Name))
|
|
||||||
numJobsRunning++
|
|
||||||
} else {
|
|
||||||
respondAndLogErr("Cannot force sync: " + repo.Name + ", already syncing.")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
respondAndLogErr(args[1] + " is not tracked so cannot sync")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
respondAndLogErr("Received unrecognized command: " + command)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func unixSockListener(connChan chan net.Conn, stopLisChan chan struct{}) {
|
|
||||||
// must remove cfg.SockPath otherwise get "bind: address already in use"
|
|
||||||
if filepath.Ext(cfg.SockPath) != ".sock" {
|
|
||||||
panic(fmt.Errorf("Socket file must end with .sock"))
|
|
||||||
} else if _, err := os.Stat(cfg.SockPath); err == nil {
|
|
||||||
if err := os.Remove(cfg.SockPath); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
} else if !errors.Is(err, os.ErrNotExist) {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ear, err := net.Listen("unix", cfg.SockPath)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
outLogger.Println("Listening to unix socket at " + cfg.SockPath)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
// will exit when ear is closed
|
|
||||||
conn, err := ear.Accept()
|
|
||||||
if err != nil {
|
|
||||||
if ne, ok := err.(net.Error); ok && ne.Temporary() {
|
|
||||||
errLogger.Println("Accepted socket error: " + err.Error())
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
errLogger.Println("Unhandlable socket error: " + err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
connChan <- conn
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
<-stopLisChan
|
|
||||||
ear.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// We use a round-robin strategy. It's not the most efficient, but it's simple
|
|
||||||
// (read: easy to understand) and guarantees each repo will eventually get a chance to run.
|
|
||||||
func runAsManyAsPossible() {
|
|
||||||
repos := cfg.Repos
|
|
||||||
startIdx := repoIdx
|
|
||||||
|
|
||||||
for numJobsRunning < cfg.MaxJobs {
|
|
||||||
repo := repos[repoIdx]
|
|
||||||
if repo.RunIfPossible() {
|
|
||||||
numJobsRunning++
|
|
||||||
}
|
|
||||||
repoIdx = (repoIdx + 1) % len(repos)
|
|
||||||
if repoIdx == startIdx {
|
|
||||||
// we've come full circle
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
outLogger = common.OutLogger()
|
|
||||||
errLogger = common.ErrLogger()
|
|
||||||
|
|
||||||
// check that merlin is run as mirror user
|
// check that merlin is run as mirror user
|
||||||
// check that mirror user has pid of 1001
|
// check that mirror user has pid of 1001
|
||||||
|
|
||||||
doneChan := make(chan common.Result)
|
// receives a Result struct when a repo stops syncing
|
||||||
|
doneChan := make(chan config.SyncResult)
|
||||||
|
// closed when merlin is told to stop running
|
||||||
stopChan := make(chan struct{})
|
stopChan := make(chan struct{})
|
||||||
|
// receives a Conn when a client makes a connection to unix socket
|
||||||
connChan := make(chan net.Conn)
|
connChan := make(chan net.Conn)
|
||||||
|
// signal channel to stop listening to unix socket
|
||||||
stopLisChan := make(chan struct{})
|
stopLisChan := make(chan struct{})
|
||||||
|
|
||||||
stopSig := make(chan os.Signal, 1)
|
stopSig := make(chan os.Signal, 1)
|
||||||
|
@ -157,24 +39,38 @@ func main() {
|
||||||
|
|
||||||
unix.Umask(002)
|
unix.Umask(002)
|
||||||
|
|
||||||
numJobsRunning = 0
|
numJobsRunning := 0
|
||||||
|
repoIdx := 0
|
||||||
|
|
||||||
loadConfig := func() {
|
loadConfig := func() {
|
||||||
cfg = common.GetConfig(doneChan, stopChan)
|
config.LoadConfig(CONFIG_PATH, doneChan, stopChan)
|
||||||
outLogger.Println("Loaded config:\n" + fmt.Sprintf("%+v\n", cfg))
|
logger.OutLog("Loaded config:\n" + fmt.Sprintf("%+v\n", config.Conf))
|
||||||
|
|
||||||
repoMap = make(map[string]*common.Repo)
|
|
||||||
for _, repo := range cfg.Repos {
|
|
||||||
repoMap[repo.Name] = repo
|
|
||||||
}
|
|
||||||
|
|
||||||
repoIdx = 0
|
repoIdx = 0
|
||||||
go unixSockListener(connChan, stopLisChan)
|
go arthur.StartListener(connChan, stopLisChan)
|
||||||
|
}
|
||||||
|
// We use a round-robin strategy. It's not the most efficient, but it's simple
|
||||||
|
// (read: easy to understand) and guarantees each repo will eventually get a chance to run.
|
||||||
|
runAsManyAsPossible := func() {
|
||||||
|
startIdx := repoIdx
|
||||||
|
|
||||||
|
for numJobsRunning < config.Conf.MaxJobs {
|
||||||
|
repo := config.Repos[repoIdx]
|
||||||
|
if sync.SyncIfPossible(repo) {
|
||||||
|
numJobsRunning++
|
||||||
|
}
|
||||||
|
repoIdx = (repoIdx + 1) % len(config.Repos)
|
||||||
|
if repoIdx == startIdx {
|
||||||
|
// we've come full circle
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
loadConfig()
|
loadConfig()
|
||||||
// IsRunning must be false otherwise repo will never sync
|
// ensure that IsRunning is false otherwise repo will never sync
|
||||||
for _, repo := range repoMap {
|
// (only on startup can we assume that repos were not previously syncing)
|
||||||
|
for _, repo := range config.Repos {
|
||||||
repo.State.IsRunning = false
|
repo.State.IsRunning = false
|
||||||
}
|
}
|
||||||
runAsManyAsPossible()
|
runAsManyAsPossible()
|
||||||
|
@ -190,24 +86,39 @@ runLoop:
|
||||||
case <-reloadSig:
|
case <-reloadSig:
|
||||||
stopLisChan <- struct{}{}
|
stopLisChan <- struct{}{}
|
||||||
loadConfig()
|
loadConfig()
|
||||||
|
// ensure that SyncCompleted can handle it if reloading config
|
||||||
|
// removes a repo that was already syncing
|
||||||
|
|
||||||
case done := <-doneChan:
|
case done := <-doneChan:
|
||||||
repoMap[done.Name].SyncCompleted(done.Exit)
|
sync.SyncCompleted(config.RepoMap[done.Name], done.Exit)
|
||||||
numJobsRunning--
|
numJobsRunning--
|
||||||
|
|
||||||
case conn := <-connChan:
|
case conn := <-connChan:
|
||||||
getAndRunCommand(conn)
|
command, repoName := arthur.GetCommand(conn)
|
||||||
|
switch command {
|
||||||
|
case "status":
|
||||||
|
arthur.SendStatus(conn)
|
||||||
|
case "sync":
|
||||||
|
if arthur.ForceSync(conn, repoName) {
|
||||||
|
numJobsRunning++
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
arthur.SendAndLog(conn, "Received unrecognized command: "+command)
|
||||||
|
}
|
||||||
|
// None of the arthur functions close the connection so you will need to
|
||||||
|
// close it manually for the message to be sent
|
||||||
|
conn.Close()
|
||||||
|
|
||||||
case <-time.After(1 * time.Minute):
|
case <-time.After(1 * time.Minute):
|
||||||
}
|
}
|
||||||
runAsManyAsPossible()
|
runAsManyAsPossible()
|
||||||
}
|
}
|
||||||
|
|
||||||
// give time for jobs to terminate before exiting
|
// give time for all jobs to terminate before exiting program
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case done := <-doneChan:
|
case done := <-doneChan:
|
||||||
repoMap[done.Name].SyncCompleted(done.Exit)
|
sync.SyncCompleted(config.RepoMap[done.Name], done.Exit)
|
||||||
numJobsRunning--
|
numJobsRunning--
|
||||||
|
|
||||||
case <-time.After(1 * time.Second):
|
case <-time.After(1 * time.Second):
|
||||||
|
|
|
@ -1,7 +0,0 @@
|
||||||
package main
|
|
||||||
|
|
||||||
import "testing"
|
|
||||||
|
|
||||||
func TestSock1(t *testing.T) {
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,50 +1,51 @@
|
||||||
package common
|
package sync
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
|
||||||
|
"git.csclub.uwaterloo.ca/public/merlin/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (repo *Repo) buildRsyncHost() string {
|
func buildRsyncHost(repo *config.Repo) string {
|
||||||
if repo.RsyncUser != "" {
|
if repo.RsyncUser != "" {
|
||||||
repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
|
repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
|
||||||
}
|
}
|
||||||
return "rsync://" + repo.RsyncHost + "/" + repo.RsyncDir
|
return "rsync://" + repo.RsyncHost + "/" + repo.RsyncDir
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repo) buildRsyncDaemonHost() string {
|
func buildRsyncDaemonHost(repo *config.Repo) string {
|
||||||
if repo.RsyncUser != "" {
|
if repo.RsyncUser != "" {
|
||||||
repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
|
repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
|
||||||
}
|
}
|
||||||
return repo.RsyncHost + "::" + repo.RsyncDir
|
return repo.RsyncHost + "::" + repo.RsyncDir
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repo) buildRsyncSSHHost() string {
|
func buildRsyncSSHHost(repo *config.Repo) string {
|
||||||
if repo.RsyncUser != "" {
|
if repo.RsyncUser != "" {
|
||||||
repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
|
repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
|
||||||
}
|
}
|
||||||
return repo.RsyncHost + ":" + repo.RsyncDir
|
return repo.RsyncHost + ":" + repo.RsyncDir
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repo) buildDownloadDir() string {
|
// TODO: remove this (repo.LocalDir should be the full path)
|
||||||
return repo.cfg.DownloadDir + "/" + repo.LocalDir
|
func buildDownloadDir(repo *config.Repo) string {
|
||||||
|
return config.Conf.DownloadDir + "/" + repo.LocalDir
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repo) CSCSyncApache() []string {
|
func cscSyncApache(repo *config.Repo) []string {
|
||||||
args := []string{
|
args := []string{
|
||||||
"nice", "rsync", "-az", "--no-owner", "--no-group", "--delete", "--safe-links",
|
"nice", "rsync", "-az", "--no-owner", "--no-group", "--delete", "--safe-links",
|
||||||
"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
|
"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
|
||||||
"--exclude", ".~tmp~/",
|
"--exclude", ".~tmp~/",
|
||||||
"--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
|
"--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
|
||||||
}
|
}
|
||||||
args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
|
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
|
||||||
|
|
||||||
return args
|
return args
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repo) CSCSyncArchLinux() []string {
|
func cscSyncArchLinux(repo *config.Repo) []string {
|
||||||
|
|
||||||
tempDir := "" // is this option even needed?
|
tempDir := "" // is this option even needed?
|
||||||
|
|
||||||
|
@ -52,83 +53,83 @@ func (repo *Repo) CSCSyncArchLinux() []string {
|
||||||
args := []string{
|
args := []string{
|
||||||
"rsync", "-rtlH", "--safe-links", "--delete-after", "--timeout=600",
|
"rsync", "-rtlH", "--safe-links", "--delete-after", "--timeout=600",
|
||||||
"--contimeout=60", "-p", "--delay-updates", "--no-motd",
|
"--contimeout=60", "-p", "--delay-updates", "--no-motd",
|
||||||
"--temp-dir=" + tempDir, "--log-file=" + repo.RsyncLogFile, "--address=" + repo.cfg.IPv4Address,
|
"--temp-dir=" + tempDir, "--log-file=" + repo.RsyncLogFile, "--address=" + config.Conf.IPv4Address,
|
||||||
}
|
}
|
||||||
args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir())
|
args = append(args, buildRsyncHost(repo), buildDownloadDir(repo))
|
||||||
|
|
||||||
return args
|
return args
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repo) CSCSyncBadPerms() []string {
|
func cscSyncBadPerms(repo *config.Repo) []string {
|
||||||
args := []string{
|
args := []string{
|
||||||
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--chmod=o=rX", "--delete",
|
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--chmod=o=rX", "--delete",
|
||||||
"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
|
"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
|
||||||
"--exclude", ".~tmp~/",
|
"--exclude", ".~tmp~/",
|
||||||
"--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
|
"--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
|
||||||
}
|
}
|
||||||
args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
|
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
|
||||||
|
|
||||||
return args
|
return args
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO ceph
|
// TODO ceph
|
||||||
|
|
||||||
func (repo *Repo) CSCSyncCDImage() []string {
|
func cscSyncCDImage(repo *config.Repo) []string {
|
||||||
args := []string{
|
args := []string{
|
||||||
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete",
|
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete",
|
||||||
"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
|
"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
|
||||||
"--exclude", ".*/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
|
"--exclude", ".*/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
|
||||||
}
|
}
|
||||||
args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
|
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
|
||||||
|
|
||||||
return args
|
return args
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repo) CSCSyncChmod() []string {
|
func cscSyncChmod(repo *config.Repo) []string {
|
||||||
args := []string{
|
args := []string{
|
||||||
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", "--delay-updates", "--safe-links",
|
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", "--delay-updates", "--safe-links",
|
||||||
"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
|
"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
|
||||||
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
|
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
|
||||||
"--chmod=u=rwX,go=rX",
|
"--chmod=u=rwX,go=rX",
|
||||||
}
|
}
|
||||||
args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir())
|
args = append(args, buildRsyncHost(repo), buildDownloadDir(repo))
|
||||||
|
|
||||||
return args
|
return args
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repo) CSCSyncDebian() []string {
|
func cscSyncDebian(repo *config.Repo) []string {
|
||||||
|
|
||||||
// sync /pool
|
// sync /pool
|
||||||
args := []string{"nice", "rsync", "-rlHtvp",
|
args := []string{"nice", "rsync", "-rlHtvp",
|
||||||
"--exclude", ".~tmp~/", "--timeout=3600", "-4",
|
"--exclude", ".~tmp~/", "--timeout=3600", "-4",
|
||||||
"--address=" + repo.cfg.IPv4Address,
|
"--address=" + config.Conf.IPv4Address,
|
||||||
}
|
}
|
||||||
// $RSYNC_HOST::$RSYNC_DIR/pool/ $TO/pool/ >> $LOGFILE 2>&1
|
// $RSYNC_HOST::$RSYNC_DIR/pool/ $TO/pool/ >> $LOGFILE 2>&1
|
||||||
return args
|
return args
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repo) CSCSyncDebianCD() []string {
|
func cscSyncDebianCD(repo *config.Repo) []string {
|
||||||
|
|
||||||
// this is basically the same as CSCSyncDebian, except it has an extra --exclude
|
// this is basically the same as CSCSyncDebian, except it has an extra --exclude
|
||||||
args := []string{"nice", "rsync", "-rlHtvp", "--delete",
|
args := []string{"nice", "rsync", "-rlHtvp", "--delete",
|
||||||
"--exclude", ".~tmp~/", "--timeout=3600", "-4",
|
"--exclude", ".~tmp~/", "--timeout=3600", "-4",
|
||||||
"--address=" + repo.cfg.IPv4Address,
|
"--address=" + config.Conf.IPv4Address,
|
||||||
// "--exclude", "Archive-Update-in-Progress-${HOSTNAME}"
|
// "--exclude", "Archive-Update-in-Progress-${HOSTNAME}"
|
||||||
}
|
}
|
||||||
// $RSYNC_HOST::$RSYNC_DIR $TO >> $LOGFILE 2>&1
|
// $RSYNC_HOST::$RSYNC_DIR $TO >> $LOGFILE 2>&1
|
||||||
return args
|
return args
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repo) CSCSyncGentoo() []string {
|
func cscSyncGentoo(repo *config.Repo) []string {
|
||||||
repo.RsyncUser = "gentoo"
|
repo.RsyncUser = "gentoo"
|
||||||
repo.PasswordFile = "gentoo-distfiles"
|
repo.PasswordFile = "gentoo-distfiles"
|
||||||
return repo.CSCSyncStandard()
|
return cscSyncStandard(repo)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO s3
|
// TODO s3
|
||||||
|
|
||||||
func (repo *Repo) CSCSyncSSH() []string {
|
func cscSyncSSH(repo *config.Repo) []string {
|
||||||
|
|
||||||
args := []string{
|
args := []string{
|
||||||
"rsync", "-aH", "--no-owner", "--no-group", "--delete",
|
"rsync", "-aH", "--no-owner", "--no-group", "--delete",
|
||||||
|
@ -138,48 +139,45 @@ func (repo *Repo) CSCSyncSSH() []string {
|
||||||
}
|
}
|
||||||
// not sure if we should be assuming ssh identity file is the password file
|
// not sure if we should be assuming ssh identity file is the password file
|
||||||
args = append(args, "-e", fmt.Sprintf("'ssh -i %s'", repo.PasswordFile))
|
args = append(args, "-e", fmt.Sprintf("'ssh -i %s'", repo.PasswordFile))
|
||||||
args = append(args, repo.buildRsyncSSHHost(), repo.buildDownloadDir())
|
args = append(args, buildRsyncSSHHost(repo), buildDownloadDir(repo))
|
||||||
|
|
||||||
return args
|
return args
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repo) CSCSyncStandard() []string {
|
func cscSyncStandard(repo *config.Repo) []string {
|
||||||
args := []string{
|
args := []string{
|
||||||
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
|
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
|
||||||
"--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
|
"--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
|
||||||
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
|
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
|
||||||
}
|
}
|
||||||
if repo.PasswordFile != "" {
|
if repo.PasswordFile != "" {
|
||||||
filename := repo.cfg.PasswordDir + "/" + repo.PasswordFile
|
filename := config.Conf.PasswordDir + "/" + repo.PasswordFile
|
||||||
args = append(args, "--password-file", filename)
|
args = append(args, "--password-file", filename)
|
||||||
}
|
}
|
||||||
args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir())
|
args = append(args, buildRsyncHost(repo), buildDownloadDir(repo))
|
||||||
|
|
||||||
return args
|
return args
|
||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repo) CSCSyncStandardIPV6() []string {
|
func cscSyncStandardIPV6(repo *config.Repo) []string {
|
||||||
args := []string{
|
args := []string{
|
||||||
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
|
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
|
||||||
"--delay-updates", "--safe-links", "--timeout=3600", "-6", "--address=" + repo.cfg.IPv4Address,
|
"--delay-updates", "--safe-links", "--timeout=3600", "-6", "--address=" + config.Conf.IPv4Address,
|
||||||
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
|
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
|
||||||
}
|
}
|
||||||
args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
|
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
|
||||||
|
|
||||||
return args
|
return args
|
||||||
}
|
}
|
||||||
|
|
||||||
// for testing, to be removed later
|
func cscSyncDummy(repo *config.Repo) []string {
|
||||||
func (repo *Repo) CSCSyncDummy() []string {
|
args := []string{"sleep", "1"}
|
||||||
|
|
||||||
sleepDur := strconv.FormatInt(rand.Int63n(10)+5, 10)
|
|
||||||
args := []string{"sleep", sleepDur}
|
|
||||||
|
|
||||||
return args
|
return args
|
||||||
}
|
}
|
||||||
|
|
||||||
// executes a particular sync job depending on repo.SyncType.
|
// executes a particular sync job depending on repo.SyncType.
|
||||||
func (repo *Repo) getSyncCommand() []string {
|
func getSyncCommand(repo *config.Repo) (args []string) {
|
||||||
/*
|
/*
|
||||||
# scripts used by merlin.py
|
# scripts used by merlin.py
|
||||||
csc-sync-debian
|
csc-sync-debian
|
||||||
|
@ -205,81 +203,47 @@ func (repo *Repo) getSyncCommand() []string {
|
||||||
make-torrents
|
make-torrents
|
||||||
ubuntu-releases-sync
|
ubuntu-releases-sync
|
||||||
*/
|
*/
|
||||||
switch repo.SyncType {
|
if repo.SyncType == "csc-sync-dummy" {
|
||||||
|
return cscSyncDummy(repo)
|
||||||
case "csc-sync-apache":
|
|
||||||
return repo.CSCSyncApache()
|
|
||||||
case "csc-sync-archlinux":
|
|
||||||
return repo.CSCSyncArchLinux()
|
|
||||||
case "csc-sync-badperms":
|
|
||||||
return repo.CSCSyncBadPerms()
|
|
||||||
case "csc-sync-cdimage":
|
|
||||||
return repo.CSCSyncCDImage()
|
|
||||||
// case "csc-sync-ceph":
|
|
||||||
// return repo.CSCSyncCeph()
|
|
||||||
case "csc-sync-chmod":
|
|
||||||
return repo.CSCSyncChmod()
|
|
||||||
case "csc-sync-debian":
|
|
||||||
return repo.CSCSyncDebian()
|
|
||||||
case "csc-sync-debian-cd":
|
|
||||||
return repo.CSCSyncDebianCD()
|
|
||||||
case "csc-sync-gentoo":
|
|
||||||
return repo.CSCSyncGentoo()
|
|
||||||
// case "csc-sync-s3":
|
|
||||||
// return repo.CSCSyncS3()
|
|
||||||
case "csc-sync-ssh":
|
|
||||||
return repo.CSCSyncSSH()
|
|
||||||
case "csc-sync-standard":
|
|
||||||
return repo.CSCSyncStandard()
|
|
||||||
case "csc-sync-standard-ipv6":
|
|
||||||
return repo.CSCSyncStandardIPV6()
|
|
||||||
// case "csc-sync-wget":
|
|
||||||
// return repo.CSCSyncWget()
|
|
||||||
case "csc-sync-dummy":
|
|
||||||
return repo.CSCSyncDummy()
|
|
||||||
default:
|
|
||||||
repo.Logger.Error("Unrecognized sync type", "'"+repo.SyncType+"'")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return []string{}
|
localDir := buildDownloadDir(repo)
|
||||||
}
|
if err := os.MkdirAll(localDir, 0775); err != nil {
|
||||||
|
|
||||||
func (repo *Repo) StartSyncJob() {
|
|
||||||
status := FAILURE
|
|
||||||
defer func() {
|
|
||||||
repo.DoneChan <- Result{
|
|
||||||
Name: repo.Name,
|
|
||||||
Exit: status,
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
localDir := repo.buildDownloadDir()
|
|
||||||
err := os.MkdirAll(localDir, 0775)
|
|
||||||
if err != nil {
|
|
||||||
err = fmt.Errorf("Could not create directory %s: %w", localDir, err)
|
|
||||||
// I'm not sure if logger can handle error so just use the string?
|
|
||||||
repo.Logger.Error(err.Error())
|
repo.Logger.Error(err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
args := repo.getSyncCommand()
|
switch repo.SyncType {
|
||||||
if repo.PasswordFile != "" {
|
case "csc-sync-apache":
|
||||||
filename := repo.cfg.PasswordDir + "/" + repo.PasswordFile
|
return cscSyncApache(repo)
|
||||||
args = append(args, "--password-file", filename)
|
case "csc-sync-archlinux":
|
||||||
}
|
return cscSyncArchLinux(repo)
|
||||||
args = append(args, repo.buildRsyncHost(), localDir)
|
case "csc-sync-badperms":
|
||||||
|
return cscSyncBadPerms(repo)
|
||||||
ch := SpawnProcess(repo, args)
|
case "csc-sync-cdimage":
|
||||||
if ch == nil {
|
return cscSyncCDImage(repo)
|
||||||
// SpawnProcess will have already logged error
|
// case "csc-sync-ceph":
|
||||||
return
|
// return cscSyncCeph(repo)
|
||||||
}
|
case "csc-sync-chmod":
|
||||||
cmd := <-ch
|
return cscSyncChmod(repo)
|
||||||
switch cmd.ProcessState.ExitCode() {
|
case "csc-sync-debian":
|
||||||
case 0:
|
return cscSyncDebian(repo)
|
||||||
status = SUCCESS
|
case "csc-sync-debian-cd":
|
||||||
case -1:
|
return cscSyncDebianCD(repo)
|
||||||
status = TERMINATED
|
case "csc-sync-gentoo":
|
||||||
// default is already FAILURE
|
return cscSyncGentoo(repo)
|
||||||
|
// case "csc-sync-s3":
|
||||||
|
// return cscSyncS3(repo)
|
||||||
|
case "csc-sync-ssh":
|
||||||
|
return cscSyncSSH(repo)
|
||||||
|
case "csc-sync-standard":
|
||||||
|
return cscSyncStandard(repo)
|
||||||
|
case "csc-sync-standard-ipv6":
|
||||||
|
return cscSyncStandardIPV6(repo)
|
||||||
|
// case "csc-sync-wget":
|
||||||
|
// return cscSyncWget(repo)
|
||||||
|
default:
|
||||||
|
repo.Logger.Error("Unrecognized sync type", "'"+repo.SyncType+"'")
|
||||||
}
|
}
|
||||||
|
return
|
||||||
}
|
}
|
|
@ -0,0 +1 @@
|
||||||
|
package sync
|
|
@ -0,0 +1,61 @@
|
||||||
|
package sync
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.csclub.uwaterloo.ca/public/merlin/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
// start sync job for this repo if more than repo.Frequency seconds have elapsed since its last job
|
||||||
|
// and is not currently running.
|
||||||
|
// returns true iff a job is started.
|
||||||
|
func SyncIfPossible(repo *config.Repo) bool {
|
||||||
|
// Change to SyncIfPossible
|
||||||
|
if repo.State.IsRunning {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
curTime := time.Now().Unix()
|
||||||
|
if curTime-repo.State.LastAttemptStartTime > int64(repo.Frequency) {
|
||||||
|
repo.State.IsRunning = true
|
||||||
|
repo.State.LastAttemptStartTime = curTime
|
||||||
|
repo.SaveState()
|
||||||
|
repo.Logger.Info(fmt.Sprintf("Repo %s has started syncing", repo.Name))
|
||||||
|
go startSync(repo)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// update the repo state with the last attempt time and exit now that the job is done
|
||||||
|
func SyncCompleted(repo *config.Repo, exit int) {
|
||||||
|
repoState := repo.State
|
||||||
|
syncTook := time.Now().Unix() - repoState.LastAttemptStartTime
|
||||||
|
nextSync := repo.MaxTime - int(syncTook)
|
||||||
|
if nextSync < 0 {
|
||||||
|
nextSync = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
repoState.IsRunning = false
|
||||||
|
repoState.LastAttemptExit = exit
|
||||||
|
repoState.LastAttemptRunTime = syncTook
|
||||||
|
|
||||||
|
var exitStr string
|
||||||
|
switch exit {
|
||||||
|
case config.SUCCESS:
|
||||||
|
exitStr = "completed"
|
||||||
|
case config.TERMINATED:
|
||||||
|
exitStr = "terminated"
|
||||||
|
default:
|
||||||
|
exitStr = "failed"
|
||||||
|
}
|
||||||
|
repo.SaveState()
|
||||||
|
repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync))
|
||||||
|
|
||||||
|
// TODO: make it possible for this SyncCompleted to be run without zfsSync being run
|
||||||
|
if exit == config.SUCCESS {
|
||||||
|
// it is possible that the zfssync from the last repo sync is still running is that fine?
|
||||||
|
go zfsSync(repo)
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,10 +1,13 @@
|
||||||
package common
|
package sync
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.csclub.uwaterloo.ca/public/merlin/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SpawnProcess spawns a child process for the given repo. The process will
|
// SpawnProcess spawns a child process for the given repo. The process will
|
||||||
|
@ -12,7 +15,7 @@ import (
|
||||||
// runs for longer than the repo's MaxTime.
|
// runs for longer than the repo's MaxTime.
|
||||||
// It returns a channel through which a Cmd will be sent once it has finished,
|
// It returns a channel through which a Cmd will be sent once it has finished,
|
||||||
// or nil if it was unable to start a process.
|
// or nil if it was unable to start a process.
|
||||||
func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
|
func spawnSyncProcess(repo *config.Repo, args []string) (ch <-chan *exec.Cmd) {
|
||||||
// startTime and time took will be handled in common.go by SyncExit
|
// startTime and time took will be handled in common.go by SyncExit
|
||||||
cmd := exec.Command(args[0], args[1:]...)
|
cmd := exec.Command(args[0], args[1:]...)
|
||||||
|
|
||||||
|
@ -55,6 +58,7 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
|
||||||
case <-cmdDoneChan:
|
case <-cmdDoneChan:
|
||||||
if !cmd.ProcessState.Success() {
|
if !cmd.ProcessState.Success() {
|
||||||
repo.Logger.Warning("Process ended with status code", cmd.ProcessState.ExitCode())
|
repo.Logger.Warning("Process ended with status code", cmd.ProcessState.ExitCode())
|
||||||
|
// repo.Logger.Info(strings.Join(args, " "))
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-repo.StopChan:
|
case <-repo.StopChan:
|
||||||
|
@ -68,3 +72,48 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
|
||||||
}()
|
}()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func startSync(repo *config.Repo) {
|
||||||
|
status := config.FAILURE
|
||||||
|
defer func() {
|
||||||
|
repo.DoneChan <- config.SyncResult{
|
||||||
|
Name: repo.Name,
|
||||||
|
Exit: status,
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
args := getSyncCommand(repo)
|
||||||
|
if len(args) == 0 {
|
||||||
|
// getSyncCommand will have already logged error
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := spawnSyncProcess(repo, args)
|
||||||
|
if ch == nil {
|
||||||
|
// spawnSyncProcess will have already logged error
|
||||||
|
return
|
||||||
|
}
|
||||||
|
cmd := <-ch
|
||||||
|
switch cmd.ProcessState.ExitCode() {
|
||||||
|
case 0:
|
||||||
|
status = config.SUCCESS
|
||||||
|
case -1:
|
||||||
|
status = config.TERMINATED
|
||||||
|
// default is already FAILURE
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func zfsSync(repo *config.Repo) {
|
||||||
|
out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput()
|
||||||
|
if err != nil {
|
||||||
|
repo.Logger.Error(err)
|
||||||
|
} else {
|
||||||
|
f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
|
||||||
|
if err != nil {
|
||||||
|
repo.Logger.Error(err.Error())
|
||||||
|
} else {
|
||||||
|
f.Write(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue