This commit is contained in:
Andrew Wang 2021-11-21 00:39:15 -05:00
parent 4a464f768e
commit 7f98384fcd
5 changed files with 57 additions and 38 deletions

View File

@ -124,10 +124,6 @@ type Config struct {
ZfssyncLogDir string `ini:"zfssync_log_dir"` ZfssyncLogDir string `ini:"zfssync_log_dir"`
// the Unix socket path which arthur will use to communicate with us // the Unix socket path which arthur will use to communicate with us
SockPath string `ini:"sock_path"` SockPath string `ini:"sock_path"`
// logger that writes to stdout
StdoutLog *Logger `ini:"-"`
// logger that writes to stderr
StderrLog *Logger `ini:"-"`
// a list of all of the repos // a list of all of the repos
Repos []*Repo `ini:"-"` Repos []*Repo `ini:"-"`
} }
@ -173,9 +169,7 @@ func (repo *Repo) RunIfPossible() bool {
repo.State.IsRunning = true repo.State.IsRunning = true
repo.State.LastAttemptStartTime = curTime repo.State.LastAttemptStartTime = curTime
repo.SaveState() repo.SaveState()
msg := fmt.Sprintf("Repo %s has started syncing", repo.Name) repo.Logger.Info(fmt.Sprintf("Repo %s has started syncing", repo.Name))
repo.Logger.Info(msg)
repo.cfg.StdoutLog.Info(msg)
go repo.StartSyncJob() go repo.StartSyncJob()
return true return true
} }
@ -220,10 +214,9 @@ func (repo *Repo) SyncCompleted(exit int) {
exitStr = "failed" exitStr = "failed"
} }
repo.SaveState() repo.SaveState()
msg := fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync) repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync))
repo.Logger.Info(msg)
repo.cfg.StdoutLog.Info(msg)
if exit == SUCCESS { if exit == SUCCESS {
// it is possible that the zfssync from the last repo sync is still running is that fine?
go zfsSync(repo) go zfsSync(repo)
} }
} }
@ -245,6 +238,7 @@ func touchFile(file string) {
} else if fi.IsDir() { } else if fi.IsDir() {
panic(fmt.Errorf("%s is a directory", file)) panic(fmt.Errorf("%s is a directory", file))
} else if os.Geteuid() != 1001 { } else if os.Geteuid() != 1001 {
// UID 1001 is the hardcoded uid for mirror
err := os.Chown(file, 1001, os.Getegid()) err := os.Chown(file, 1001, os.Getegid())
panicIfErr(err) panicIfErr(err)
} else if fi.Mode().Perm() != 0644 { } else if fi.Mode().Perm() != 0644 {
@ -277,8 +271,6 @@ func GetConfig(doneChan chan Result, stopChan chan struct{}) Config {
RsyncLogDir: DEFAULT_RSYNC_LOG_DIR, RsyncLogDir: DEFAULT_RSYNC_LOG_DIR,
ZfssyncLogDir: DEFAULT_ZFSSYNC_LOG_DIR, ZfssyncLogDir: DEFAULT_ZFSSYNC_LOG_DIR,
SockPath: DEFAULT_SOCK_PATH, SockPath: DEFAULT_SOCK_PATH,
StdoutLog: NewLogger("STDOUT"),
StderrLog: NewLogger("STDERR"),
Repos: make([]*Repo, 0), Repos: make([]*Repo, 0),
} }
err = data.MapTo(&cfg) err = data.MapTo(&cfg)
@ -321,7 +313,7 @@ func GetConfig(doneChan chan Result, stopChan chan struct{}) Config {
repo.ZfssyncLogFile, repo.ZfssyncLogFile,
) )
repo.Logger = NewLogger(repo.LoggerFile) repo.Logger = NewLogger(repo.Name, repo.LoggerFile)
repo.Frequency = frequencies[repo.FrequencyStr] repo.Frequency = frequencies[repo.FrequencyStr]
if repo.SyncType == "" { if repo.SyncType == "" {
panic("Missing sync type from " + repo.Name) panic("Missing sync type from " + repo.Name)

View File

@ -1,56 +1,75 @@
package common package common
import ( import (
"fmt"
"log" "log"
"os" "os"
) )
type Logger struct { type Logger struct {
*log.Logger *log.Logger
name string
file string file string
} }
// DEBUG/WARNING will only be written to file
// INFO/ERROR will be written to file and printed to stdout/stderr
const ( const (
INFO = iota DEBUG = iota
INFO
WARNING WARNING
ERROR ERROR
) )
var levels = map[int]string{ var levels = map[int]string{
DEBUG: "[DEBUG]",
INFO: "[INFO]", INFO: "[INFO]",
WARNING: "[WARNING]", WARNING: "[WARNING]",
ERROR: "[ERROR]", ERROR: "[ERROR]",
} }
func NewLogger(file string) *Logger { var outLogger = log.New(os.Stdout, "", log.LstdFlags)
var errLogger = log.New(os.Stderr, "", log.LstdFlags)
func OutLogger() *log.Logger {
return outLogger
}
func ErrLogger() *log.Logger {
return errLogger
}
func NewLogger(name, file string) *Logger {
logger := Logger{ logger := Logger{
Logger: log.New(os.Stderr, "", log.LstdFlags), Logger: log.New(os.Stderr, "", log.LstdFlags),
name: name,
file: file, file: file,
} }
return &logger return &logger
} }
func (logger *Logger) Log(level int, v ...interface{}) { func (logger *Logger) Log(level int, v ...interface{}) {
var f *os.File if level == INFO {
if logger.file == "STDOUT" { outLogger.Println(append([]interface{}{"[" + logger.name + "]"}, v...))
f = os.Stdout } else if level == ERROR {
} else if logger.file == "STDERR" { errLogger.Println(append([]interface{}{"[" + logger.name + "]"}, v...))
f = os.Stderr
} else {
f, err := os.OpenFile(logger.file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
fmt.Println(err.Error())
}
defer f.Close()
} }
f, err := os.OpenFile(logger.file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
errLogger.Println(err.Error())
}
defer f.Close()
levelStr := levels[level] levelStr := levels[level]
args := []interface{}{levelStr + ":"} args := []interface{}{levelStr + ":"}
args = append(args, v...) args = append(args, v...)
logger.SetOutput(f) logger.SetOutput(f)
logger.Println(args...) logger.Println(v...)
}
func (logger *Logger) Debug(v ...interface{}) {
logger.Log(DEBUG, v...)
} }
func (logger *Logger) Info(v ...interface{}) { func (logger *Logger) Info(v ...interface{}) {

View File

@ -16,9 +16,9 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
// startTime and time took will be handled in common.go by SyncExit // startTime and time took will be handled in common.go by SyncExit
cmd := exec.Command(args[0], args[1:]...) cmd := exec.Command(args[0], args[1:]...)
repo.Logger.Info("Starting process") repo.Logger.Debug("Starting process")
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
repo.Logger.Error(fmt.Errorf("could not start process for %s: %w", repo.Name, err)) repo.Logger.Error(fmt.Errorf("could not start process for %s: %w", repo.Name, err).Error())
return return
} }
@ -31,12 +31,14 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
repo.Logger.Error("Could not send signal to process:", err) repo.Logger.Error("Could not send signal to process:", err)
return return
} }
select { select {
case <-time.After(30 * time.Second): case <-time.After(30 * time.Second):
repo.Logger.Warning("Process still hasn't stopped after 30 seconds; sending SIGKILL") repo.Logger.Warning("Process still hasn't stopped after 30 seconds; sending SIGKILL")
cmd.Process.Signal(syscall.SIGKILL) cmd.Process.Signal(syscall.SIGKILL)
case <-cmdDoneChan: case <-cmdDoneChan:
repo.Logger.Info("Process has stopped.") repo.Logger.Debug("Process has been stopped.")
} }
} }
@ -56,7 +58,7 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
} }
case <-repo.StopChan: case <-repo.StopChan:
repo.Logger.Info("Received signal to stop, killing process...") repo.Logger.Debug("Received signal to stop, killing process...")
killProcess() killProcess()
case <-time.After(time.Duration(repo.MaxTime) * time.Second): case <-time.After(time.Duration(repo.MaxTime) * time.Second):

View File

@ -271,7 +271,7 @@ func (repo *Repo) StartSyncJob() {
ch := SpawnProcess(repo, args) ch := SpawnProcess(repo, args)
if ch == nil { if ch == nil {
repo.Logger.Error("Unable to start sync process") // SpawnProcess will have already logged error
return return
} }
cmd := <-ch cmd := <-ch

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
"log"
"net" "net"
"os" "os"
"os/signal" "os/signal"
@ -19,6 +20,8 @@ import (
var ( var (
cfg common.Config cfg common.Config
outLogger *log.Logger
errLogger *log.Logger
repoMap map[string]*common.Repo repoMap map[string]*common.Repo
repoIdx int repoIdx int
numJobsRunning int numJobsRunning int
@ -30,14 +33,14 @@ func getAndRunCommand(conn net.Conn) {
var buf bytes.Buffer var buf bytes.Buffer
_, err := io.Copy(&buf, conn) _, err := io.Copy(&buf, conn)
if err != nil { if err != nil {
cfg.StderrLog.Error(err) errLogger.Println(err.Error())
return return
} }
command := buf.String() command := buf.String()
args := strings.Split(command, ":") args := strings.Split(command, ":")
respondAndLogErr := func(msg string) { respondAndLogErr := func(msg string) {
cfg.StderrLog.Warning(msg) outLogger.Println(msg)
conn.Write([]byte(msg)) conn.Write([]byte(msg))
} }
@ -62,7 +65,7 @@ func getAndRunCommand(conn net.Conn) {
} }
if repo, inMap := repoMap[args[1]]; inMap { if repo, inMap := repoMap[args[1]]; inMap {
cfg.StdoutLog.Info("Attempting to force sync of " + repo.Name) outLogger.Println("Attempting to force sync of " + repo.Name)
if repo.RunIfPossible() { if repo.RunIfPossible() {
conn.Write([]byte("Forced sync for " + repo.Name)) conn.Write([]byte("Forced sync for " + repo.Name))
numJobsRunning++ numJobsRunning++
@ -97,7 +100,7 @@ func unixSockListener(connChan chan net.Conn, stopLisChan chan struct{}) {
// will exit when ear is closed // will exit when ear is closed
conn, err := ear.Accept() conn, err := ear.Accept()
if err != nil { if err != nil {
cfg.StderrLog.Error(err.Error()) errLogger.Println(err.Error())
return return
} }
connChan <- conn connChan <- conn
@ -128,6 +131,9 @@ func runAsManyAsPossible() {
} }
func main() { func main() {
outLogger = common.OutLogger()
errLogger = common.ErrLogger()
// check that merlin is run as mirror user // check that merlin is run as mirror user
// check that mirror user has pid of 1001 // check that mirror user has pid of 1001
@ -147,7 +153,7 @@ func main() {
loadConfig := func() { loadConfig := func() {
cfg = common.GetConfig(doneChan, stopChan) cfg = common.GetConfig(doneChan, stopChan)
cfg.StdoutLog.Info("Loaded config:\n" + fmt.Sprintf("%+v\n", cfg)) outLogger.Println("Loaded config:\n" + fmt.Sprintf("%+v\n", cfg))
repoMap = make(map[string]*common.Repo) repoMap = make(map[string]*common.Repo)
for _, repo := range cfg.Repos { for _, repo := range cfg.Repos {