zfssync and finish dynamic reloading

This commit is contained in:
Andrew Wang 2021-11-14 17:22:32 -05:00
parent cf13b6abfe
commit 644178d533
6 changed files with 209 additions and 168 deletions

View File

@ -17,8 +17,7 @@ This folder contains the code for merlin (which does the actual syncing) and art
- [ ] wget
### TODO
- [ ] implement zfssync in merlin (just invoke the existing Python script)
- [ ] use separate log file for each child process (currently sharing stdout/stderr with parent)
- [ ] fix logging
- [ ] detect if an rsync process is stuck (\*\*)
- [ ] place each rsync process in a separate cgroup (\*\*\*)
@ -35,8 +34,8 @@ stdout/stderr of the rsync process.
- [x] handle termination signals in merlin (SIGINT, SIGTERM); close stopChan for this
- [x] listen on Unix socket in merlin
- [x] implement arthur.go (commands: sync and status)
- [x] allow dynamic reloading in merlin (\*\*)
- [x] allow dynamic reloading in merlin
- [x] use separate log file for each child process (currently sharing stdout/stderr with parent)
- [x] implement zfssync in merlin (just invoke the existing Python script)
\* there are some parts that I don't understand (trace_host, csc-sync-ceph, csc-sync-saltstack, etc)
\*\* may be a good idea to stop all repo syncs? (or just the ones not in the new config after the reload?)
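The dynamic-reload item checked off above is driven by SIGHUP (see the merlin.go changes below). A minimal standalone sketch of that reload loop, with simplified names rather than the exact merlin code:

package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"
)

// loadConfig stands in for common.GetConfig; in merlin a reload also
// restarts the Unix socket listener.
func loadConfig() { log.Println("config (re)loaded") }

func main() {
	reloadSig := make(chan os.Signal, 1)
	stopSig := make(chan os.Signal, 1)
	signal.Notify(reloadSig, syscall.SIGHUP)
	signal.Notify(stopSig, syscall.SIGINT, syscall.SIGTERM)
	loadConfig()
	for {
		select {
		case <-reloadSig:
			loadConfig() // re-read the config without restarting the daemon
		case <-stopSig:
			return
		}
	}
}

Running kill -HUP against the daemon's PID then triggers a reload in place.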

View File

@ -1,7 +1,9 @@
package common
import (
"fmt"
"os"
"os/exec"
"time"
ini "gopkg.in/ini.v1"
@ -80,9 +82,9 @@ type Repo struct {
// a reference to the logger
Logger *Logger `ini:"-"`
// the repo will write its name and status in a Result struct to DoneChan
// when it has finished a job
// when it has finished a job (shared by all repos)
DoneChan chan<- Result `ini:"-"`
// the repo should stop syncing if StopChan is closed
// the repo should stop syncing if StopChan is closed (shared by all repos)
StopChan chan struct{} `ini:"-"`
// a struct that stores the repo's status
State RepoState `ini:"-"`
@ -112,8 +114,6 @@ type Config struct {
StateDir string `ini:"states_dir"`
// the Unix socket path which arthur will use to communicate with us
SockPath string `ini:"sock_path"`
// the DoneChan for each repo (shared instance)
DoneChan <-chan Result `ini:"-"`
// a list of all of the repos
Repos []*Repo `ini:"-"`
}
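The DoneChan/StopChan comments above describe a fan-in/fan-out protocol: every repo sends its completion on one shared done channel, and closing one shared stop channel signals all repos at once. A minimal sketch of that pattern in isolation (Result fields inferred from their usage later in this diff):

// each repo job sends on the shared done channel and watches the shared
// stop channel; close(stop) reaches every job simultaneously
func syncJobSketch(name string, done chan<- Result, stop <-chan struct{}) {
	select {
	case <-stop:
		done <- Result{Name: name, Exit: TERMINATED}
	default:
		// ... run the actual sync here, then report:
		done <- Result{Name: name, Exit: SUCCESS}
	}
}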
@ -123,15 +123,15 @@ type RepoState struct {
// these are stored in the states folder
// whether this repo is running a job or not
IsRunning bool `ini:"is_running"`
// whether the last attempt was successful or not
LastAttemptExit int `ini:"last_attempt_exit"`
// the Unix epoch timestamp at which this repo last attempted a job
LastAttemptTime int64 `ini:"last_attempt_time"`
LastAttemptStartTime int64 `ini:"last_attempt_time"`
// the number of seconds this repo ran for during its last attempted job
LastAttemptRunTime int64 `ini:"last_attempt_runtime"`
// whether the last attempt was successful or not
LastAttemptExit int `ini:"last_attempt_exit"`
}
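Given the ini tags above, a repo's state file under states_dir would serialize to something like this (values purely illustrative):

is_running = false
last_attempt_time = 1636928552
last_attempt_runtime = 451
last_attempt_exit = 0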
// save the save the current state of a repo to a file
// save the current state of the repo to a file
func (repo *Repo) SaveState() {
state_cfg := ini.Empty()
if err := ini.ReflectFrom(state_cfg, &repo.State); err != nil {
@ -146,50 +146,68 @@ func (repo *Repo) SaveState() {
}
}
// RunIfScheduled starts a sync job for this repo if more than repo.Frequency
// seconds have elapsed since its last job.
// It returns true iff a job is started.
// start a sync job for this repo if more than repo.Frequency seconds have elapsed since its last job
// and it is not currently running.
// returns true iff a job is started.
func (repo *Repo) RunIfPossible() bool {
// don't run if a job is already running
if repo.State.IsRunning {
return false
}
// curTime must be set in the caller's goroutine so that the frequency check below sees a consistent time
curTime := time.Now().Unix()
if curTime-repo.State.LastAttemptTime > int64(repo.Frequency) {
// log that job has started
if curTime-repo.State.LastAttemptStartTime > int64(repo.Frequency) {
repo.State.IsRunning = true
repo.State.LastAttemptTime = curTime
repo.State.LastAttemptStartTime = curTime
repo.SaveState()
go repo.StartSyncJob()
repo.Logger.Info("Sync has begun")
return true
}
return false
}
// func zfsSync(repo string) {
// // /home/mirror/bin/zfssync
// cmd := []string{"zfssync", repo}
// SpawnProcess(logger, cmd)
// }
func zfsSync(repo *Repo) {
out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput()
if err != nil {
repo.Logger.Error(err)
} else {
repo.Logger.Debug(string(out))
}
}
// update the repo state with the last attempt time and exit now that the job is done
func (repo *Repo) SyncCompleted(exit int) {
repoState := &repo.State // take a pointer so the updates below persist for SaveState
syncTook := time.Now().Unix() - repoState.LastAttemptStartTime
nextSync := repo.Frequency - int(syncTook) // seconds until the repo is eligible to run again
if nextSync < 0 {
nextSync = 0
}
repoState.IsRunning = false
repoState.LastAttemptExit = exit
repoState.LastAttemptTime = time.Now().Unix() - repoState.LastAttemptTime
// repo.Logger.Debug(fmt.Sprintf("Process exited after %d seconds", repoState.LastAttemptTime))
repoState.LastAttemptRunTime = syncTook
var exitStr string
switch exit {
case SUCCESS:
exitStr = "completed"
case TERMINATED:
exitStr = "terminated"
default:
exitStr = "failed"
}
repo.SaveState()
// implement zfssync in merlin (just invoke the existing Python script)
// go zfsSync(repo.Name)
repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync))
if exit == SUCCESS {
go zfsSync(repo)
}
}
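The Result type and the exit codes used above (SUCCESS, TERMINATED, FAILURE, and the NOT_RUN_YET default seen below) are defined elsewhere in package common and are not part of this diff; a hypothetical sketch consistent with how they are used here, with assumed values:

// hypothetical; the real definitions live elsewhere in package common
type Result struct {
	Name string // repo name, used to look the repo up in merlin's repoMap
	Exit int    // one of the exit codes below
}

const (
	FAILURE = iota // assumed zero value, since StartSyncJob notes "default is already FAILURE"
	SUCCESS
	TERMINATED
	NOT_RUN_YET
)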
// GetConfig reads the config from an INI file, initializes default values,
// and initializes the non-configurable fields of each repo.
// It returns a Config.
func GetConfig() Config {
func GetConfig(doneChan chan Result, stopChan chan struct{}) Config {
// add global configuration in cfg
data, err := ini.Load(CONFIG_PATH)
if err != nil {
@ -209,7 +227,6 @@ func GetConfig() Config {
panic(err)
}
// validate global configuration
if err := os.MkdirAll(cfg.LogDir, 0755); err != nil {
panic("Could not create log path at " + cfg.LogDir)
} else if err := os.MkdirAll(cfg.StateDir, 0755); err != nil {
@ -220,25 +237,24 @@ func GetConfig() Config {
panic("Missing IPv6 address from config")
}
doneChan := make(chan Result, cfg.MaxJobs)
cfg.DoneChan = doneChan
// add each repo configuration to cfg
for _, section := range data.Sections() {
if section.Name() == "DEFAULT" {
continue
}
// create the repo configuration
repo := Repo{
Name: section.Name(),
SyncType: cfg.SyncType,
FrequencyStr: cfg.FrequencyStr,
MaxTime: cfg.MaxTime,
DoneChan: doneChan,
StopChan: stopChan,
}
if err := section.MapTo(&repo); err != nil {
panic(err)
}
repo.Frequency = frequencies[repo.FrequencyStr]
if repo.SyncType == "" {
panic("Missing sync type from " + repo.Name)
@ -246,34 +262,32 @@ func GetConfig() Config {
panic("Missing frequency from " + repo.Name)
}
repo.Logger = NewLogger(repo.Name)
repo.DoneChan = doneChan
repo.StopChan = make(chan struct{})
repo.cfg = &cfg
// create the default repo state
repo.State = RepoState{
IsRunning: false,
LastAttemptExit: NOT_RUN_YET,
LastAttemptTime: 0,
LastAttemptRunTime: 0,
IsRunning: false,
LastAttemptStartTime: 0,
LastAttemptRunTime: 0,
LastAttemptExit: NOT_RUN_YET,
}
// create the state file if it does not exist otherwise sync the state
// create the state file if it does not exist, otherwise load the state from the existing file
repoStateFile := cfg.StateDir + "/" + repo.Name
if _, err := os.Stat(repoStateFile); err != nil {
repo.SaveState()
} else if err := ini.MapTo(&repo.State, repoStateFile); err != nil {
panic(err)
} else {
err := ini.MapTo(&repo.State, repoStateFile)
if err != nil {
panic(err)
}
}
// the repo state must initially be not running, otherwise the repo will never run
repo.State.IsRunning = false
// append a reference to the new repo in the slice of repos
cfg.Repos = append(cfg.Repos, &repo)
}
if len(cfg.Repos) == 0 {
panic("No repos found in config")
}
return cfg
}
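For reference, GetConfig maps the DEFAULT section of the INI file at CONFIG_PATH onto Config and every other section onto a Repo whose Name is the section name. A hypothetical config in that shape; states_dir and sock_path are the only key names confirmed by struct tags in this diff, everything else is a placeholder:

states_dir = /var/lib/merlin/states
sock_path = /run/merlin.sock
; other global keys (log dir, max jobs, repo defaults) use tags not shown here

[debian]
; per-repo keys (sync type, frequency, max time) use tags not shown here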

View File

@ -8,6 +8,7 @@ import (
type Logger struct {
*log.Logger
name string
// path string
}
const (
@ -29,10 +30,12 @@ var levels = map[int]string{
ERROR: "ERROR",
}
// func NewLogger(name string, dir string) *Logger {
func NewLogger(name string) *Logger {
logger := Logger{
Logger: log.New(os.Stderr, "", 0),
name: name,
// path: dir + name,
}
return &logger
}

View File

@ -1,8 +1,8 @@
package common
import (
"bufio"
"fmt"
"os"
"os/exec"
"syscall"
"time"
@ -17,26 +17,12 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
// startTime and the elapsed time are handled in common.go by SyncCompleted
cmd := exec.Command(args[0], args[1:]...)
// TODO: change stdout and stderr to something else
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
stdout, _ := cmd.StdoutPipe()
stderr, _ := cmd.StderrPipe()
// scan stdout and stderr line by line so each can be forwarded to the logger
stdoutScan := bufio.NewScanner(stdout)
stderrScan := bufio.NewScanner(stderr)
// stdout, err := cmd.StdoutPipe()
// if err != nil {
// repo.Logger.Warning(err)
// }
// stderr, err := cmd.StderrPipe()
// if err != nil {
// repo.Logger.Warning(err)
// }
// multi := io.MultiReader(stdout, stderr)
// in := bufio.NewScanner(multi)
// repo.Logger.Debug(in.Text())
// repo.Logger.Warning(in.Err())
// startTime := time.Now().Unix()
repo.Logger.Debug("Starting process")
if err := cmd.Start(); err != nil {
err = fmt.Errorf("could not start process %s: %w", args[0], err)
@ -44,9 +30,27 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
return
}
// pass stdout into logger as debug
go func() {
for stdoutScan.Scan() {
hold := stdoutScan.Text()
repo.Logger.Debug(hold)
}
cmd.Wait()
}()
// pass stderr into logger as warning
go func() {
for stderrScan.Scan() {
hold := stderrScan.Text()
repo.Logger.Warning(hold)
}
cmd.Wait()
}()
cmdChan := make(chan *exec.Cmd)
ch = cmdChan
procDoneChan := make(chan bool, 1)
cmdDoneChan := make(chan struct{})
killProcess := func() {
err := cmd.Process.Signal(syscall.SIGTERM)
if err != nil {
@ -57,14 +61,14 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
case <-time.After(30 * time.Second):
repo.Logger.Warning("Process still hasn't stopped after 30 seconds; sending SIGKILL")
cmd.Process.Signal(syscall.SIGKILL)
case <-procDoneChan:
case <-cmdDoneChan:
repo.Logger.Debug("Process has stopped.")
}
}
go func() {
cmd.Wait()
procDoneChan <- true
close(cmdDoneChan)
}()
go func() {
@ -72,18 +76,15 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
cmdChan <- cmd
}()
select {
case <-cmdDoneChan:
if !cmd.ProcessState.Success() {
repo.Logger.Warning("Process ended with status code", cmd.ProcessState.ExitCode())
}
case <-repo.StopChan:
repo.Logger.Info("Received signal to stop, killing process...")
killProcess()
case <-procDoneChan:
// the following could be moved to SyncExit in common.go
if cmd.ProcessState.Success() {
repo.Logger.Debug("Process ended successfully")
} else {
repo.Logger.Warning("Process ended with status code", cmd.ProcessState.ExitCode())
}
// timeTook := time.Now().Unix() - startTime
// repo.Logger.Debug(fmt.Sprintf("Process took %d seconds", timeTook))
case <-time.After(time.Duration(repo.MaxTime) * time.Second):
repo.Logger.Warning("Process has exceeded its max time; killing now")
killProcess()

View File

@ -2,9 +2,9 @@ package common
import (
"fmt"
"math/rand"
"os"
"strconv"
"math/rand"
)
func (repo *Repo) buildRsyncHost() string {
@ -120,14 +120,6 @@ func (repo *Repo) CSCSyncDebianCD() []string {
return args
}
func (repo *Repo) CSCSyncDummy() []string {
sleepDur := strconv.FormatInt(rand.Int63n(10)+5, 10)
args := []string{"sleep", sleepDur}
return args
}
func (repo *Repo) CSCSyncGentoo() []string {
repo.RsyncUser = "gentoo"
repo.PasswordFile = "gentoo-distfiles"
@ -177,7 +169,14 @@ func (repo *Repo) CSCSyncStandardIPV6() []string {
return args
}
// TODO wget
// for testing, to be removed later
func (repo *Repo) CSCSyncDummy() []string {
sleepDur := strconv.FormatInt(rand.Int63n(10)+5, 10)
args := []string{"sleep", sleepDur}
return args
}
// executes a particular sync job depending on repo.SyncType.
func (repo *Repo) getSyncCommand() []string {
@ -208,36 +207,36 @@ func (repo *Repo) getSyncCommand() []string {
*/
switch repo.SyncType {
case "csc-sync-apache":
return repo.CSCSyncApache()
case "csc-sync-archlinux":
return repo.CSCSyncArchLinux()
case "csc-sync-badperms":
return repo.CSCSyncBadPerms()
case "csc-sync-cdimage":
return repo.CSCSyncCDImage()
// case "csc-sync-ceph":
// return repo.CSCSyncCeph()
case "csc-sync-chmod":
return repo.CSCSyncChmod()
case "csc-sync-debian":
return repo.CSCSyncDebian()
case "csc-sync-debian-cd":
return repo.CSCSyncDebianCD()
case "csc-sync-dummy":
return repo.CSCSyncDummy()
case "csc-sync-gentoo":
return repo.CSCSyncGentoo()
// case "csc-sync-s3":
// return repo.CSCSyncS3()
case "csc-sync-ssh":
return repo.CSCSyncSSH()
case "csc-sync-standard":
return repo.CSCSyncStandard()
case "csc-sync-standard-ipv6":
return repo.CSCSyncStandardIPV6()
case "csc-sync-apache":
return repo.CSCSyncApache()
case "csc-sync-archlinux":
return repo.CSCSyncArchLinux()
case "csc-sync-badperms":
return repo.CSCSyncBadPerms()
case "csc-sync-cdimage":
return repo.CSCSyncCDImage()
// case "csc-sync-ceph":
// return repo.CSCSyncCeph()
case "csc-sync-chmod":
return repo.CSCSyncChmod()
case "csc-sync-debian":
return repo.CSCSyncDebian()
case "csc-sync-debian-cd":
return repo.CSCSyncDebianCD()
case "csc-sync-gentoo":
return repo.CSCSyncGentoo()
// case "csc-sync-s3":
// return repo.CSCSyncS3()
case "csc-sync-ssh":
return repo.CSCSyncSSH()
case "csc-sync-standard":
return repo.CSCSyncStandard()
case "csc-sync-standard-ipv6":
return repo.CSCSyncStandardIPV6()
// case "csc-sync-wget":
// return repo.CSCSyncWget()
case "csc-sync-dummy":
return repo.CSCSyncDummy()
default:
repo.Logger.Error("Unrecognized sync type", "'"+repo.SyncType+"'")
}
@ -271,7 +270,7 @@ func (repo *Repo) StartSyncJob() {
ch := SpawnProcess(repo, args)
if ch == nil {
// Log that something failed?
repo.Logger.Error("Unable to start sync process")
return
}
cmd := <-ch
@ -283,4 +282,3 @@ func (repo *Repo) StartSyncJob() {
// default is already FAILURE
}
}

View File

@ -7,6 +7,7 @@ import (
"net"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"text/tabwriter"
@ -19,28 +20,28 @@ import (
var (
cfg common.Config
logger *common.Logger
repos []*common.Repo
repoMap map[string]*common.Repo
repoIdx int
numJobsRunning int
doneChan <-chan common.Result
)
func runCommand(conn net.Conn) {
func getAndRunCommand(conn net.Conn) {
defer conn.Close()
var buf bytes.Buffer
n, err := io.Copy(&buf, conn)
_, err := io.Copy(&buf, conn)
if err != nil {
// log warning
return
} else if n == 0 {
// log warning
logger.Warning(err)
return
}
command := buf.String()
args := strings.Split(command, ":")
respAndLog := func(msg string) {
logger.Info(msg)
conn.Write([]byte(msg))
}
if args[0] == "status" {
status := tabwriter.NewWriter(conn, 5, 5, 5, ' ', 0)
@ -50,7 +51,7 @@ func runCommand(conn net.Conn) {
for name, repo := range repoMap {
fmt.Fprintf(status, "%s\t%s\t%s\n",
name,
time.Unix(repo.State.LastAttemptTime, 0).Format(time.RFC1123),
time.Unix(repo.State.LastAttemptStartTime, 0).Format(time.RFC1123),
time.Unix(repo.State.LastAttemptStartTime+int64(repo.Frequency), 0).Format(time.RFC1123),
)
}
@ -58,48 +59,62 @@ func runCommand(conn net.Conn) {
status.Flush()
} else if args[0] == "sync" {
if len(args) != 2 {
// log invalid command
respAndLog("Could not parse sync command, forced sync fails.")
return
}
if repo, inMap := repoMap[args[1]]; inMap {
if repo.RunIfPossible() {
numJobsRunning++
respAndLog("Forcing sync: " + repo.Name)
} else {
respAndLog("Cannot force sync: " + repo.Name + ", already syncing.")
}
} else {
respAndLog(args[1] + " is not tracked, so it cannot be synced")
}
} else {
respAndLog("Received unrecognized command: " + command)
}
}
// TODO: add checking that SockPath is proper (ends with .sock)
// if cfg.SockPath already exists then will get a "bind: address already in use"
func unixSockListen(connChan chan net.Conn, reloadSig chan os.Signal) {
if err := os.RemoveAll(cfg.SockPath); err != nil {
func unixSockListener(connChan chan net.Conn, stopLisChan chan struct{}) {
// cfg.SockPath must be removed first, otherwise net.Listen fails with "bind: address already in use"
if filepath.Ext(cfg.SockPath) != ".sock" {
logger.Fatal("Socket file must end with .sock")
} else if pathInfo, err := os.Stat(cfg.SockPath); err == nil && pathInfo.IsDir() {
// without the err check, pathInfo would be nil (and crash) when the socket does not exist yet
logger.Fatal("Value specified for socket file is a directory")
} else if err := os.Remove(cfg.SockPath); err != nil && !os.IsNotExist(err) {
// a missing socket file is fine on first start; any other removal error is fatal
logger.Fatal(err)
}
ear, err := net.Listen("unix", cfg.SockPath)
if err != nil {
logger.Fatal(err)
}
defer ear.Close()
go func() {
for {
// will exit when ear is closed
conn, err := ear.Accept()
if err != nil {
logger.Error(err)
return
}
connChan <- conn
}
}()
select {
// add stopSig
case <-reloadSig:
}
<-stopLisChan
ear.Close()
}
// We use a round-robin strategy. It's not the most efficient, but it's simple
// (read: easy to understand) and guarantees each repo will eventually get a chance to run.
func runAsManyAsPossible() {
repos := cfg.Repos
startIdx := repoIdx
for numJobsRunning < cfg.MaxJobs {
repo := repos[repoIdx]
if repo.RunIfPossible() {
@ -114,64 +129,75 @@ func runAsManyAsPossible() {
}
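The hunk above cuts off mid-function; to illustrate the round-robin strategy the comment describes, here is a sketch that resumes at repoIdx, wraps with modulo, and stops after one full pass. These details are assumptions, not necessarily the exact merlin code:

// sketch: keep starting jobs round-robin until MaxJobs are running
// or every repo has been considered once
func runAsManyAsPossibleSketch() {
	repos := cfg.Repos
	startIdx := repoIdx
	for numJobsRunning < cfg.MaxJobs {
		if repos[repoIdx].RunIfPossible() {
			numJobsRunning++
		}
		repoIdx = (repoIdx + 1) % len(repos)
		if repoIdx == startIdx {
			break // one full pass over every repo completed
		}
	}
}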
func main() {
stopSig := make(chan os.Signal, 1)
signal.Notify(stopSig, syscall.SIGINT, syscall.SIGTERM)
reloadSig := make(chan os.Signal, 1)
signal.Notify(reloadSig, syscall.SIGHUP)
doneChan := make(chan common.Result)
stopChan := make(chan struct{})
connChan := make(chan net.Conn)
stopLisChan := make(chan struct{})
stopSig := make(chan os.Signal, 1)
reloadSig := make(chan os.Signal, 1)
signal.Notify(stopSig, syscall.SIGINT, syscall.SIGTERM)
signal.Notify(reloadSig, syscall.SIGHUP)
unix.Umask(002)
loadConfigs := func() {
cfg = common.GetConfig()
logger = common.NewLogger("main")
logger.Debug("Read config:")
logger.Debug(fmt.Sprintf("%+v\n", cfg))
repos = cfg.Repos
logger = common.NewLogger("main")
numJobsRunning = 0
loadConfig := func() {
cfg = common.GetConfig(doneChan, stopChan)
logger.Debug("Loaded config:\n" + fmt.Sprintf("%+v\n", cfg))
repoMap = make(map[string]*common.Repo)
for _, repo := range repos {
for _, repo := range cfg.Repos {
repoMap[repo.Name] = repo
}
repoIdx = 0
numJobsRunning = 0
doneChan = cfg.DoneChan
go unixSockListen(connChan, reloadSig)
go unixSockListener(connChan, stopLisChan)
}
loadConfigs()
loadConfig()
// if IsRunning is true then the repo will never run
for _, repo := range repoMap {
repo.State.IsRunning = false
}
runAsManyAsPossible()
runLoop:
for {
runAsManyAsPossible()
select {
case <-stopSig:
for i := 0; i < len(repos); i++ {
close(repos[i].StopChan)
}
close(stopChan)
close(stopLisChan)
break runLoop
case <-reloadSig:
loadConfigs()
stopLisChan <- struct{}{}
loadConfig()
case done := <-doneChan:
repoMap[done.Name].SyncCompleted(done.Exit)
numJobsRunning--
case conn := <-connChan:
runCommand(conn)
getAndRunCommand(conn)
case <-time.After(1 * time.Minute):
}
runAsManyAsPossible()
}
// TODO: Logging of job exits
// wait for every running job to finish before exiting
for {
select {
case done := <-doneChan:
repoMap[done.Name].SyncCompleted(done.Exit)
numJobsRunning--
case <-time.After(1 * time.Second):
if numJobsRunning == 0 {
return
}
}
if numJobsRunning <= 0 {
return
}
}
}