write merlin.go

This commit is contained in:
Max Erenberg 2021-09-17 00:31:49 -04:00
parent a7207011ba
commit db2815029d
9 changed files with 488 additions and 1 deletions

2
merlin/.gitignore vendored
View File

@ -4,4 +4,4 @@
/rebuild_logs
/stamps
merlin.sock
/merlin

28
merlin/README.md Normal file
View File

@ -0,0 +1,28 @@
This folder contains the code for merlin (which does the actual syncing) and
arthur (which sends commands to merlin).
## Checklist
- [x] write process manager
- [ ] implement all sync types (csc-sync-debian, csc-sync-apache, etc.)
- [ ] save state (last attempted time, last attempted status) for each repo,
and restore state on startup (e.g. use JSON/INI file for each repo)
- [ ] use separate log file for each child process (currently sharing stdout/stderr with parent)
- [ ] calculate difference between the scheduled time of a job and the time at which it actually
ran; log this
- [ ] add all repos to merlin-config.ini
- [ ] listen on Unix socket in merlin
- [ ] implement arthur.go (commands: sync and status)
- [ ] implement zfssync in merlin (just invoke the existing Python script)
- [ ] handle termination signals in merlin (SIGINT, SIGTERM); close stopChan for this
- [ ] allow dynamic reloading in merlin (\*)
- [ ] detect if an rsync process is stuck (\*\*)
- [ ] place each rsync process in a separate cgroup (\*\*\*)
\* This is optional because the current version of merlin doesn't support it. If it turns
out to be complicated, then don't worry about it. If you decide to implement it, use SIGHUP
as the reload signal.
\*\* This is optional. If you decide to implement it, you may want to watch the
stdout/stderr of the rsync process.
\*\*\* I need to do more research into this - don't worry about it for now.

249
merlin/common/common.go Normal file
View File

@ -0,0 +1,249 @@
package common
import (
"fmt"
"os"
"time"
ini "gopkg.in/ini.v1"
)
const (
DAILY = 86400
TWICE_DAILY = DAILY / 2
HOURLY = 3600
BI_HOURLY = HOURLY * 2
TRI_HOURLY = HOURLY * 3
TWICE_HOURLY = HOURLY / 2
TEN_MINUTELY = 600
FIVE_MINUTELY = 300
CONFIG_PATH = "merlin-config.ini"
DEFAULT_MAX_JOBS = 6
DEFAULT_MAX_TIME = DAILY / 4
DEFAULT_SOCK_PATH = "/run/merlin/merlin.sock"
DEFAULT_LOG_FILE = "/home/mirror/merlin/logs/transfer.log"
DEFAULT_PASSWORD_DIR = "/home/mirror/passwords"
DEFAULT_DOWNLOAD_DIR = "/mirror/root"
)
var frequencies = map[string]int{
"daily": DAILY,
"twice-daily": TWICE_DAILY,
"hourly": HOURLY,
"bi-hourly": BI_HOURLY,
"tri-hourly": TRI_HOURLY,
"twice-hourly": TWICE_HOURLY,
"ten-minutely": TEN_MINUTELY,
"five-minutely": FIVE_MINUTELY,
}
// Last job attempt statuses
const (
NOT_RUN_YET = iota
SUCCESS
FAILURE
TERMINATED // was killed by a signal
)
type Repo struct {
// this should be one of "csc-sync-standard", etc.
SyncType string `ini:"sync_type"`
// a human-readable frequency, e.g. "bi-hourly"
FrequencyStr string `ini:"frequency"`
// the maximum time (in seconds) that each child process of this repo
// can for before being killed
MaxTime int `ini:"max_time"`
// where to download the files for this repo (relative to the download
// dir in the config)
LocalDir string `ini:"local_dir"`
// the remote host to rsync from
RsyncHost string `ini:"rsync_host"`
// the remote directory on the rsync host
RsyncDir string `ini:"rsync_dir"`
// the rsync user (optional)
RsyncUser string `ini:"rsync_user"`
// the file storing the password for rsync (optional)
PasswordFile string `ini:"password_file"`
// the log file for rsync (optional, defaults to Config.LogFile)
LogFile string `ini:"log_file"`
// the desired interval (in seconds) between successive runs
Frequency int `ini:"-"`
// the name of this repo
Name string `ini:"-"`
Logger *Logger `ini:"-"`
// the repo should stop syncing if StopChan is closed
StopChan <-chan bool `ini:"-"`
// the repo will write its name to DoneChan when it has finished a job
// (successfully or unsuccessfully)
DoneChan chan<- string `ini:"-"`
// the Unix epoch timestamp at which this repo last attempted a job
LastAttemptTime int64 `ini:"-"`
// whether the last attempt was successful or not
LastAttemptStatus int `ini:"-"`
// a reference to the global config
cfg *Config
// whether this repo is running a job or not
isRunning bool
}
type Config struct {
// the maximum number of jobs allowed to execute concurrently
MaxJobs int `ini:"max_jobs"`
// the default MaxTime for each repo
MaxTime int `ini:"max_time"`
// the Unix socket path which arthur will use to communicate with us
SockPath string `ini:"sock_path"`
// the default LogFile for each repo
LogFile string `ini:"log_file"`
// the IP addresses to use for rsync
IPv4Address string `ini:"ipv4_address"`
IPv6Address string `ini:"ipv6_address"`
// the directory where rsync passwords are stored
PasswordDir string `ini:"password_dir"`
// the directory where rsync should download files
DownloadDir string `ini:"download_dir"`
// the DoneChan for each repo (shared instance)
DoneChan <-chan string `ini:"-"`
// a list of all of the repos
Repos []*Repo `ini:"-"`
}
// IsRunning returns true if the repo is currently running a sync job.
func (repo *Repo) IsRunning() bool {
return repo.isRunning
}
// CSCSyncStandard performs a standard rsync job.
func (repo *Repo) CSCSyncStandard() {
startTime := time.Now().Unix()
status := FAILURE
defer func() {
repo.LastAttemptTime = startTime
repo.LastAttemptStatus = status
repo.isRunning = false
repo.DoneChan <- repo.Name
}()
localDir := repo.cfg.DownloadDir + "/" + repo.LocalDir
err := os.MkdirAll(localDir, 0775)
if err != nil {
err = fmt.Errorf("Could not create directory %s: %w", localDir, err)
repo.Logger.Error(err)
return
}
rsyncHost := repo.RsyncHost
if repo.RsyncUser != "" {
rsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
}
address := repo.cfg.IPv4Address
logFile := repo.LogFile
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
"--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + address,
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + logFile,
}
if repo.PasswordFile != "" {
filename := repo.cfg.PasswordDir + "/" + repo.PasswordFile
args = append(args, "--password-file", filename)
}
args = append(args, "rsync://"+rsyncHost+"/"+repo.RsyncDir, localDir)
ch := SpawnProcess(repo, args)
if ch == nil {
return
}
cmd := <-ch
switch cmd.ProcessState.ExitCode() {
case 0:
status = SUCCESS
case -1:
status = TERMINATED
// default is already FAILURE
}
}
// StartSyncJob executes a particular sync job depending on repo.SyncType.
func (repo *Repo) StartSyncJob() {
switch repo.SyncType {
case "csc-sync-standard":
repo.CSCSyncStandard()
default:
repo.Logger.Error("Unrecognized sync type", "'"+repo.SyncType+"'")
}
}
// RunIfScheduled starts a sync job for this repo if more than repo.Frequency
// seconds have elapsed since its last job.
// It returns true iff a job is started.
func (repo *Repo) RunIfScheduled() bool {
// sanity check; don't run if a job is already running
if repo.isRunning {
return false
}
if time.Now().Unix()-repo.LastAttemptTime > int64(repo.Frequency) {
// this should be set in the caller's thread so that the check
// above will always work
repo.isRunning = true
go repo.StartSyncJob()
return true
}
return false
}
// GetConfig reads the config from a JSON file, initializes default values,
// and initializes the non-configurable fields of each repo.
// It returns a Config.
func GetConfig() Config {
data, err := ini.Load(CONFIG_PATH)
if err != nil {
panic(err)
}
cfg := Config{
MaxJobs: DEFAULT_MAX_JOBS,
MaxTime: DEFAULT_MAX_TIME,
SockPath: DEFAULT_SOCK_PATH,
LogFile: DEFAULT_LOG_FILE,
PasswordDir: DEFAULT_PASSWORD_DIR,
DownloadDir: DEFAULT_DOWNLOAD_DIR,
Repos: make([]*Repo, 0),
}
err = data.MapTo(&cfg)
if err != nil {
panic(err)
}
if cfg.IPv4Address == "" {
panic("Missing IPv4 address from config")
} else if cfg.IPv6Address == "" {
panic("Missing IPv6 address from config")
}
doneChan := make(chan string)
cfg.DoneChan = doneChan
for _, section := range data.Sections() {
if section.Name() == "DEFAULT" {
continue
}
repo := Repo{Name: section.Name()}
err = section.MapTo(&repo)
if err != nil {
panic(err)
}
repo.Frequency = frequencies[repo.FrequencyStr]
if repo.MaxTime == 0 {
repo.MaxTime = cfg.MaxTime
}
repo.Logger = NewLogger(repo.Name)
repo.DoneChan = doneChan
repo.StopChan = make(chan bool, 1)
repo.cfg = &cfg
// TODO: save and restore LastAttempt info (from stamps directory)
repo.LastAttemptTime = 0
repo.LastAttemptStatus = NOT_RUN_YET
cfg.Repos = append(cfg.Repos, &repo)
}
if len(cfg.Repos) == 0 {
panic("No repos found in config")
}
return cfg
}

56
merlin/common/logger.go Normal file
View File

@ -0,0 +1,56 @@
package common
import (
"log"
"os"
)
type Logger struct {
*log.Logger
name string
}
const (
DEBUG = 1 << iota
INFO
WARNING
ERROR
)
var levels = map[int]string{
DEBUG: "DEBUG",
INFO: "INFO",
WARNING: "WARNING",
ERROR: "ERROR",
}
func NewLogger(name string) *Logger {
logger := Logger{
Logger: log.New(os.Stderr, "", 0),
name: name,
}
return &logger
}
func (logger *Logger) Log(level int, v ...interface{}) {
levelStr := levels[level]
args := []interface{}{levelStr + ":", logger.name + ":"}
args = append(args, v...)
logger.Println(args...)
}
func (logger *Logger) Debug(v ...interface{}) {
logger.Log(DEBUG, v...)
}
func (logger *Logger) Info(v ...interface{}) {
logger.Log(INFO, v...)
}
func (logger *Logger) Warning(v ...interface{}) {
logger.Log(WARNING, v...)
}
func (logger *Logger) Error(v ...interface{}) {
logger.Log(ERROR, v...)
}

View File

@ -0,0 +1,71 @@
package common
import (
"fmt"
"os"
"os/exec"
"syscall"
"time"
)
// SpawnProcess spawns a child process for the given repo. The process will
// be stopped early if the repo receives a stop signal, or if the process
// runs for longer than the repo's MaxTime.
// It returns a channel through which a Cmd will be sent once it has finished,
// or nil if it was unable to start a process.
func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
cmd := exec.Command(args[0], args[1:]...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
startTime := time.Now().Unix()
repo.Logger.Debug("Starting process")
err := cmd.Start()
if err != nil {
err = fmt.Errorf("Could not start process %s: %w", args[0], err)
repo.Logger.Error(err)
return
}
cmdChan := make(chan *exec.Cmd)
ch = cmdChan
procDoneChan := make(chan bool, 1)
killProcess := func() {
err := cmd.Process.Signal(syscall.SIGTERM)
if err != nil {
repo.Logger.Error("Could not send signal to process:", err)
return
}
select {
case <-time.After(30 * time.Second):
repo.Logger.Warning("Process still hasn't stopped; sending SIGKILL now")
cmd.Process.Signal(syscall.SIGKILL)
case <-procDoneChan:
repo.Logger.Debug("Process has stopped.")
}
}
go func() {
cmd.Wait()
procDoneChan <- true
}()
go func() {
defer func() {
cmdChan <- cmd
}()
select {
case <-repo.StopChan:
repo.Logger.Info("Received signal to stop, killing process...")
killProcess()
case <-procDoneChan:
if cmd.ProcessState.Success() {
repo.Logger.Debug("Process ended successfully")
} else {
repo.Logger.Warning("Process ended with status code", cmd.ProcessState.ExitCode())
}
timeTook := time.Now().Unix() - startTime
repo.Logger.Debug(fmt.Sprintf("Process took %d seconds", timeTook))
case <-time.After(time.Duration(repo.MaxTime) * time.Second):
repo.Logger.Warning("Process has exceeded its max time; killing now")
killProcess()
}
}()
return
}

9
merlin/go.mod Normal file
View File

@ -0,0 +1,9 @@
module git.csclub.uwaterloo.ca/public/merlin
require (
github.com/stretchr/testify v1.7.0 // indirect
golang.org/x/sys v0.0.0-20210915083310-ed5796bab164
gopkg.in/ini.v1 v1.63.2
)
go 1.13

15
merlin/go.sum Normal file
View File

@ -0,0 +1,15 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sys v0.0.0-20210915083310-ed5796bab164 h1:7ZDGnxgHAMw7thfC5bEos0RDAccZKxioiWBhfIe+tvw=
golang.org/x/sys v0.0.0-20210915083310-ed5796bab164/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/ini.v1 v1.63.2 h1:tGK/CyBg7SMzb60vP1M03vNZ3VDu3wGQJwn7Sxi9r3c=
gopkg.in/ini.v1 v1.63.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

11
merlin/merlin-config.ini Normal file
View File

@ -0,0 +1,11 @@
ipv4_address = 129.97.134.129
ipv6_address = 2620:101:f000:4901:c5c::129
download_dir = /tmp/test-mirror
log_file = /tmp/test-mirror/test.log
[ubuntu-releases]
sync_type = csc-sync-standard
frequency = bi-hourly
local_dir = ubuntu-releases
rsync_host = rsync.releases.ubuntu.com
rsync_dir = releases

48
merlin/merlin.go Normal file
View File

@ -0,0 +1,48 @@
package main
import (
"fmt"
"git.csclub.uwaterloo.ca/public/merlin/common"
"golang.org/x/sys/unix"
"time"
)
func main() {
logger := common.NewLogger("main")
cfg := common.GetConfig()
logger.Debug("Read config:")
logger.Debug(fmt.Sprintf("%+v\n", cfg))
unix.Umask(002)
repos := cfg.Repos
doneChan := cfg.DoneChan
repoIdx := 0
numJobsRunning := 0
runAsManyAsPossible := func() {
// We use a round-robin strategy. It's not the most efficient,
// but it's simple (read: easy to understand) and guarantees
// that each repo will eventually get a chance to run.
startIdx := repoIdx
for numJobsRunning < cfg.MaxJobs {
repo := repos[repoIdx]
if !repo.IsRunning() && repo.RunIfScheduled() {
numJobsRunning++
}
repoIdx = (repoIdx + 1) % len(repos)
if repoIdx == startIdx {
// we've come full circle
break
}
}
}
runAsManyAsPossible()
for {
select {
case <-doneChan:
numJobsRunning--
case <-time.After(1 * time.Minute):
}
runAsManyAsPossible()
}
}