partially fix logging

This commit is contained in:
Andrew Wang 2021-11-20 01:04:45 -05:00
parent 644178d533
commit 4a464f768e
6 changed files with 180 additions and 137 deletions

View File

@ -17,7 +17,7 @@ This folder contains the code for merlin (which does the actual syncing) and art
- [ ] wget
### TODO
- [ ] fix logging
- [ ] logging
- [ ] detect if an rsync process is stuck (\*\*)
- [ ] place each rsync process in a separate cgroup (\*\*\*)

View File

@ -21,13 +21,15 @@ const (
CONFIG_PATH = "merlin-config.ini"
DEFAULT_MAX_JOBS = 6
DEFAULT_MAX_TIME = DAILY / 4
DEFAULT_DOWNLOAD_DIR = "/mirror/root"
DEFAULT_PASSWORD_DIR = "/home/mirror/passwords"
DEFAULT_LOG_DIR = "/home/mirror/merlin/logs"
DEFAULT_STATE_DIR = "/home/mirror/merlin/states"
DEFAULT_SOCK_PATH = "/run/merlin/merlin.sock"
DEFAULT_MAX_JOBS = 6
DEFAULT_MAX_TIME = DAILY / 4
DEFAULT_DOWNLOAD_DIR = "/mirror/root"
DEFAULT_PASSWORD_DIR = "/home/mirror/passwords"
DEFAULT_STATE_DIR = "/home/mirror/merlin/states"
DEFAULT_LOG_DIR = "/home/mirror/merlin/logs"
DEFAULT_RSYNC_LOG_DIR = "/home/mirror/merlin/logs-rsync"
DEFAULT_ZFSSYNC_LOG_DIR = "/home/mirror/merlin/logs-zfssync"
DEFAULT_SOCK_PATH = "/run/merlin/merlin.sock"
)
var frequencies = map[string]int{
@ -77,10 +79,14 @@ type Repo struct {
RsyncUser string `ini:"rsync_user"`
// the file storing the password for rsync (optional)
PasswordFile string `ini:"password_file"`
// the log file for rsync (optional, defaults to Config.LogFile)
LogFile string `ini:"log_file"`
// a reference to the logger
// the file for general logging of this repo
LoggerFile string `ini:"log_file"`
// a reference to the general logger
Logger *Logger `ini:"-"`
// the file for logging this repo's rsync
RsyncLogFile string `ini:"rsync_log_file"`
// the file for logging this repo's zfssync
ZfssyncLogFile string `ini:"zfssync_log_file"`
// the repo will write its name and status in a Result struct to DoneChan
// when it has finished a job (shared by all repos)
DoneChan chan<- Result `ini:"-"`
@ -108,12 +114,20 @@ type Config struct {
DownloadDir string `ini:"download_dir"`
// the directory where rsync passwords are stored
PasswordDir string `ini:"password_dir"`
// the path where merlin will store the logs for each repo synced
LogDir string `ini:"log_dir"`
// the path to where the state of each repo's sync is saved
// the directory where the state of each repo sync is saved
StateDir string `ini:"states_dir"`
// the directory where merlin will store the general logs for each repo
LoggerDir string `ini:"log_dir"`
// the directory to store the rsync logs for each repo
RsyncLogDir string `ini:"rsync_log_dir"`
// the directory to store the zfssync logs for each repo
ZfssyncLogDir string `ini:"zfssync_log_dir"`
// the Unix socket path which arthur will use to communicate with us
SockPath string `ini:"sock_path"`
// logger that writes to stdout
StdoutLog *Logger `ini:"-"`
// logger that writes to stderr
StderrLog *Logger `ini:"-"`
// a list of all of the repos
Repos []*Repo `ini:"-"`
}
@ -135,14 +149,14 @@ type RepoState struct {
// SaveState serializes repo.State to INI and writes it to the repo's state
// file, located at <cfg.StateDir>/<repo.Name>.
//
// Nothing is returned: every failure is reported through repo.Logger and the
// save is abandoned at that point, so a failed reflect or open never results
// in a write through an unusable handle.
func (repo *Repo) SaveState() {
	stateCfg := ini.Empty()
	if err := ini.ReflectFrom(stateCfg, &repo.State); err != nil {
		repo.Logger.Error(err.Error())
		return
	}

	// O_TRUNC matters: the file is rewritten in full on every save, and
	// without truncation a shorter serialization would leave bytes from the
	// previous, longer one at the end of the file.
	file, err := os.OpenFile(repo.cfg.StateDir+"/"+repo.Name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		repo.Logger.Error(err.Error())
		return
	}

	if _, err := stateCfg.WriteTo(file); err != nil {
		repo.Logger.Error(err.Error())
	}
	// Close explicitly (instead of defer) so a failed close is logged too.
	if err := file.Close(); err != nil {
		repo.Logger.Error(err.Error())
	}
}
@ -159,8 +173,10 @@ func (repo *Repo) RunIfPossible() bool {
repo.State.IsRunning = true
repo.State.LastAttemptStartTime = curTime
repo.SaveState()
msg := fmt.Sprintf("Repo %s has started syncing", repo.Name)
repo.Logger.Info(msg)
repo.cfg.StdoutLog.Info(msg)
go repo.StartSyncJob()
repo.Logger.Info("Sync has begun")
return true
}
return false
@ -171,7 +187,13 @@ func zfsSync(repo *Repo) {
if err != nil {
repo.Logger.Error(err)
} else {
repo.Logger.Debug(string(out))
f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
repo.Logger.Error(err.Error())
} else {
f.Write(out)
}
}
}
@ -198,40 +220,75 @@ func (repo *Repo) SyncCompleted(exit int) {
exitStr = "failed"
}
repo.SaveState()
repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync))
msg := fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync)
repo.Logger.Info(msg)
repo.cfg.StdoutLog.Info(msg)
if exit == SUCCESS {
go zfsSync(repo)
}
}
// panicIfErr aborts via panic(e) when e is a non-nil error and is a no-op
// otherwise. The panic value is the error itself, unwrapped.
func panicIfErr(e error) {
	if e == nil {
		return
	}
	panic(e)
}
// touchFile makes sure a regular file exists at the path file.
//
//   - If the path cannot be stat'ed, the file is created (mode 0644 before
//     umask) and nothing else is done.
//   - If the path is a directory, touchFile panics.
//   - Otherwise ownership and permissions are each checked and corrected
//     independently, so fixing one never skips the other.
//
// Every failure panics with an error value; nothing is returned.
func touchFile(file string) {
	// NOTE(review): 1001 is presumably the uid of the mirror user — confirm
	// against the deployment before changing it.
	const ownerUID = 1001
	const wantPerm os.FileMode = 0644

	fi, err := os.Stat(file)
	if err != nil {
		f, err := os.OpenFile(file, os.O_CREATE, 0644)
		if err != nil {
			// Keep the OS error in the chain so the cause (missing parent
			// directory, permissions, ...) is visible in the panic.
			panic(fmt.Errorf("unable to create file %s: %w", file, err))
		}
		if err := f.Close(); err != nil {
			panic(err)
		}
		return
	}

	if fi.IsDir() {
		panic(fmt.Errorf("%s is a directory", file))
	}

	// NOTE(review): this tests the effective uid of the process, not the
	// current owner of the file — verify that is the intended condition.
	if os.Geteuid() != ownerUID {
		if err := os.Chown(file, ownerUID, os.Getegid()); err != nil {
			panic(err)
		}
	}
	if fi.Mode().Perm() != wantPerm {
		if err := os.Chmod(file, wantPerm); err != nil {
			panic(err)
		}
	}
}
// touchFiles applies touchFile to each of the given paths, in argument order.
// It stops at the first path for which touchFile panics.
func touchFiles(files ...string) {
	for i := 0; i < len(files); i++ {
		touchFile(files[i])
	}
}
// GetConfig reads the config from an INI file, initializes default values,
// and initializes the non-configurable fields of each repo.
// It returns a Config.
func GetConfig(doneChan chan Result, stopChan chan struct{}) Config {
// add global configuration in cfg
data, err := ini.Load(CONFIG_PATH)
if err != nil {
panic(err)
}
cfg := Config{
MaxJobs: DEFAULT_MAX_JOBS,
MaxTime: DEFAULT_MAX_TIME,
PasswordDir: DEFAULT_PASSWORD_DIR,
DownloadDir: DEFAULT_DOWNLOAD_DIR,
LogDir: DEFAULT_LOG_DIR,
StateDir: DEFAULT_STATE_DIR,
SockPath: DEFAULT_SOCK_PATH,
Repos: make([]*Repo, 0),
}
if err := data.MapTo(&cfg); err != nil {
panic(err)
}
panicIfErr(err)
if err := os.MkdirAll(cfg.LogDir, 0755); err != nil {
panic("Could not create log path at " + cfg.LogDir)
} else if err := os.MkdirAll(cfg.StateDir, 0755); err != nil {
panic("Could not create states path at " + cfg.StateDir)
} else if cfg.IPv4Address == "" {
cfg := Config{
MaxJobs: DEFAULT_MAX_JOBS,
MaxTime: DEFAULT_MAX_TIME,
PasswordDir: DEFAULT_PASSWORD_DIR,
DownloadDir: DEFAULT_DOWNLOAD_DIR,
StateDir: DEFAULT_STATE_DIR,
LoggerDir: DEFAULT_LOG_DIR,
RsyncLogDir: DEFAULT_RSYNC_LOG_DIR,
ZfssyncLogDir: DEFAULT_ZFSSYNC_LOG_DIR,
SockPath: DEFAULT_SOCK_PATH,
StdoutLog: NewLogger("STDOUT"),
StderrLog: NewLogger("STDERR"),
Repos: make([]*Repo, 0),
}
err = data.MapTo(&cfg)
panicIfErr(err)
for _, dir := range []string{cfg.StateDir, cfg.LoggerDir, cfg.RsyncLogDir, cfg.ZfssyncLogDir} {
err := os.MkdirAll(dir, 0755)
panicIfErr(err)
}
if cfg.IPv4Address == "" {
panic("Missing IPv4 address from config")
} else if cfg.IPv6Address == "" {
panic("Missing IPv6 address from config")
@ -239,29 +296,38 @@ func GetConfig(doneChan chan Result, stopChan chan struct{}) Config {
// add each repo configuration to cfg
for _, section := range data.Sections() {
if section.Name() == "DEFAULT" {
repoName := section.Name()
if repoName == "DEFAULT" {
continue
}
repo := Repo{
Name: section.Name(),
SyncType: cfg.SyncType,
FrequencyStr: cfg.FrequencyStr,
MaxTime: cfg.MaxTime,
DoneChan: doneChan,
StopChan: stopChan,
}
if err := section.MapTo(&repo); err != nil {
panic(err)
Name: repoName,
SyncType: cfg.SyncType,
FrequencyStr: cfg.FrequencyStr,
MaxTime: cfg.MaxTime,
LoggerFile: cfg.LoggerDir + "/" + repoName + ".log",
RsyncLogFile: cfg.RsyncLogDir + "/" + repoName + "-rsync.log",
ZfssyncLogFile: cfg.ZfssyncLogDir + "/" + repoName + "-zfssync.log",
DoneChan: doneChan,
StopChan: stopChan,
}
err := section.MapTo(&repo)
panicIfErr(err)
touchFiles(
repo.LoggerFile,
repo.RsyncLogFile,
repo.ZfssyncLogFile,
)
repo.Logger = NewLogger(repo.LoggerFile)
repo.Frequency = frequencies[repo.FrequencyStr]
if repo.SyncType == "" {
panic("Missing sync type from " + repo.Name)
} else if repo.Frequency == 0 {
panic("Missing frequency from " + repo.Name)
panic("Missing or invalid frequency for " + repo.Name)
}
repo.Logger = NewLogger(repo.Name)
repo.cfg = &cfg
repo.State = RepoState{
@ -274,12 +340,11 @@ func GetConfig(doneChan chan Result, stopChan chan struct{}) Config {
// create the state file if it does not exist, otherwise load it from existing file
repoStateFile := cfg.StateDir + "/" + repo.Name
if _, err := os.Stat(repoStateFile); err != nil {
touchFile(repoStateFile)
repo.SaveState()
} else {
err := ini.MapTo(&repo.State, repoStateFile)
if err != nil {
panic(err)
}
panicIfErr(err)
}
cfg.Repos = append(cfg.Repos, &repo)

View File

@ -1,54 +1,56 @@
package common
import (
"fmt"
"log"
"os"
)
type Logger struct {
*log.Logger
name string
// path string
file string
}
const (
// verbose
// DEBUG = 1 << iota
DEBUG = iota
// normal operation
INFO
// bad
INFO = iota
WARNING
// really bad (crash)
ERROR
)
var levels = map[int]string{
DEBUG: "DEBUG",
INFO: "INFO",
WARNING: "WARNING",
ERROR: "ERROR",
INFO: "[INFO]",
WARNING: "[WARNING]",
ERROR: "[ERROR]",
}
// func NewLogger(name string, dir string) *Logger {
func NewLogger(name string) *Logger {
func NewLogger(file string) *Logger {
logger := Logger{
Logger: log.New(os.Stderr, "", 0),
name: name,
// path: dir + name,
Logger: log.New(os.Stderr, "", log.LstdFlags),
file: file,
}
return &logger
}
func (logger *Logger) Log(level int, v ...interface{}) {
levelStr := levels[level]
args := []interface{}{levelStr + ":", logger.name + ":"}
args = append(args, v...)
logger.Println(args...)
}
var f *os.File
if logger.file == "STDOUT" {
f = os.Stdout
} else if logger.file == "STDERR" {
f = os.Stderr
} else {
f, err := os.OpenFile(logger.file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
fmt.Println(err.Error())
}
defer f.Close()
}
func (logger *Logger) Debug(v ...interface{}) {
logger.Log(DEBUG, v...)
levelStr := levels[level]
args := []interface{}{levelStr + ":"}
args = append(args, v...)
logger.SetOutput(f)
logger.Println(args...)
}
func (logger *Logger) Info(v ...interface{}) {

View File

@ -1,7 +1,6 @@
package common
import (
"bufio"
"fmt"
"os/exec"
"syscall"
@ -17,37 +16,12 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
// startTime and time took will be handled in common.go by SyncExit
cmd := exec.Command(args[0], args[1:]...)
stdout, _ := cmd.StdoutPipe()
stderr, _ := cmd.StderrPipe()
stdoutScan := bufio.NewScanner(stdout)
stderrScan := bufio.NewScanner(stderr)
repo.Logger.Debug("Starting process")
repo.Logger.Info("Starting process")
if err := cmd.Start(); err != nil {
err = fmt.Errorf("could not start process %s: %w", args[0], err)
repo.Logger.Error(err)
repo.Logger.Error(fmt.Errorf("could not start process for %s: %w", repo.Name, err))
return
}
// pass stdout into logger as debug
go func() {
for stdoutScan.Scan() {
hold := stdoutScan.Text()
repo.Logger.Debug(hold)
}
cmd.Wait()
}()
// pass stderr into logger as warning
go func() {
for stderrScan.Scan() {
hold := stderrScan.Text()
repo.Logger.Warning(hold)
}
cmd.Wait()
}()
cmdChan := make(chan *exec.Cmd)
ch = cmdChan
cmdDoneChan := make(chan struct{})
@ -62,7 +36,7 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
repo.Logger.Warning("Process still hasn't stopped after 30 seconds; sending SIGKILL")
cmd.Process.Signal(syscall.SIGKILL)
case <-cmdDoneChan:
repo.Logger.Debug("Process has stopped.")
repo.Logger.Info("Process has stopped.")
}
}

View File

@ -37,7 +37,7 @@ func (repo *Repo) CSCSyncApache() []string {
"nice", "rsync", "-az", "--no-owner", "--no-group", "--delete", "--safe-links",
"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
"--exclude", ".~tmp~/",
"--quiet", "--stats", "--log-file=" + repo.LogFile,
"--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
@ -52,7 +52,7 @@ func (repo *Repo) CSCSyncArchLinux() []string {
args := []string{
"rsync", "-rtlH", "--safe-links", "--delete-after", "--timeout=600",
"--contimeout=60", "-p", "--delay-updates", "--no-motd",
"--temp-dir=" + tempDir, "--log-file=" + repo.LogFile, "--address=" + repo.cfg.IPv4Address,
"--temp-dir=" + tempDir, "--log-file=" + repo.RsyncLogFile, "--address=" + repo.cfg.IPv4Address,
}
args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir())
@ -65,7 +65,7 @@ func (repo *Repo) CSCSyncBadPerms() []string {
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--chmod=o=rX", "--delete",
"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
"--exclude", ".~tmp~/",
"--quiet", "--stats", "--log-file=" + repo.LogFile,
"--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
@ -78,7 +78,7 @@ func (repo *Repo) CSCSyncCDImage() []string {
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete",
"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
"--exclude", ".*/", "--quiet", "--stats", "--log-file=" + repo.LogFile,
"--exclude", ".*/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
@ -89,7 +89,7 @@ func (repo *Repo) CSCSyncChmod() []string {
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", "--delay-updates", "--safe-links",
"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.LogFile,
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
"--chmod=u=rwX,go=rX",
}
args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir())
@ -134,7 +134,7 @@ func (repo *Repo) CSCSyncSSH() []string {
"rsync", "-aH", "--no-owner", "--no-group", "--delete",
"--timeout=3600", "-4",
"--exclude", ".~tmp~/",
"--quiet", "--stats", "--log-file=" + repo.LogFile,
"--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
// not sure if we should be assuming ssh identity file is the password file
args = append(args, "-e", fmt.Sprintf("'ssh -i %s'", repo.PasswordFile))
@ -147,7 +147,7 @@ func (repo *Repo) CSCSyncStandard() []string {
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
"--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.LogFile,
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
if repo.PasswordFile != "" {
filename := repo.cfg.PasswordDir + "/" + repo.PasswordFile
@ -162,7 +162,7 @@ func (repo *Repo) CSCSyncStandardIPV6() []string {
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
"--delay-updates", "--safe-links", "--timeout=3600", "-6", "--address=" + repo.cfg.IPv4Address,
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.LogFile,
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
@ -257,7 +257,8 @@ func (repo *Repo) StartSyncJob() {
err := os.MkdirAll(localDir, 0775)
if err != nil {
err = fmt.Errorf("Could not create directory %s: %w", localDir, err)
repo.Logger.Error(err)
// I'm not sure if logger can handle error so just use the string?
repo.Logger.Error(err.Error())
return
}

View File

@ -19,7 +19,6 @@ import (
var (
cfg common.Config
logger *common.Logger
repoMap map[string]*common.Repo
repoIdx int
numJobsRunning int
@ -30,16 +29,15 @@ func getAndRunCommand(conn net.Conn) {
var buf bytes.Buffer
_, err := io.Copy(&buf, conn)
if err != nil {
logger.Warning(err)
cfg.StderrLog.Error(err)
return
}
command := buf.String()
args := strings.Split(command, ":")
respAndLog := func(msg string) {
logger.Info(msg)
respondAndLogErr := func(msg string) {
cfg.StderrLog.Warning(msg)
conn.Write([]byte(msg))
}
@ -59,38 +57,39 @@ func getAndRunCommand(conn net.Conn) {
status.Flush()
} else if args[0] == "sync" {
if len(args) != 2 {
respAndLog("Could not parse sync command, forced sync fails.")
respondAndLogErr("Could not parse sync command, forced sync fails.")
return
}
if repo, inMap := repoMap[args[1]]; inMap {
cfg.StdoutLog.Info("Attempting to force sync of " + repo.Name)
if repo.RunIfPossible() {
conn.Write([]byte("Forced sync for " + repo.Name))
numJobsRunning++
respAndLog("Forcing sync: " + repo.Name)
} else {
respAndLog("Cannot force sync: " + repo.Name + ", already syncing.")
respondAndLogErr("Cannot force sync: " + repo.Name + ", already syncing.")
}
} else {
respAndLog(args[1] + " is not tracked so cannot sync")
respondAndLogErr(args[1] + " is not tracked so cannot sync")
}
} else {
respAndLog("Received unrecognized command: " + command)
respondAndLogErr("Received unrecognized command: " + command)
}
}
func unixSockListener(connChan chan net.Conn, stopLisChan chan struct{}) {
// must remove cfg.SockPath otherwise get "bind: address already in use"
if filepath.Ext(cfg.SockPath) != ".sock" {
logger.Fatal("Socket file must end with .sock")
panic(fmt.Errorf("Socket file must end with .sock"))
} else if pathInfo, _ := os.Stat(cfg.SockPath); pathInfo.IsDir() {
logger.Fatal("Value specified for socket file is a directory")
panic(fmt.Errorf("Value specified for socket file is a directory"))
} else if err := os.Remove(cfg.SockPath); err != nil {
logger.Fatal(err)
panic(err)
}
ear, err := net.Listen("unix", cfg.SockPath)
if err != nil {
logger.Fatal(err)
panic(err)
}
go func() {
@ -98,7 +97,7 @@ func unixSockListener(connChan chan net.Conn, stopLisChan chan struct{}) {
// will exit when ear is closed
conn, err := ear.Accept()
if err != nil {
logger.Error(err)
cfg.StderrLog.Error(err.Error())
return
}
connChan <- conn
@ -129,6 +128,9 @@ func runAsManyAsPossible() {
}
func main() {
// check that merlin is run as mirror user
// check that mirror user has uid of 1001
doneChan := make(chan common.Result)
stopChan := make(chan struct{})
connChan := make(chan net.Conn)
@ -141,12 +143,11 @@ func main() {
unix.Umask(002)
logger = common.NewLogger("main")
numJobsRunning = 0
loadConfig := func() {
cfg = common.GetConfig(doneChan, stopChan)
logger.Debug("Loaded config:\n" + fmt.Sprintf("%+v\n", cfg))
cfg.StdoutLog.Info("Loaded config:\n" + fmt.Sprintf("%+v\n", cfg))
repoMap = make(map[string]*common.Repo)
for _, repo := range cfg.Repos {
@ -158,7 +159,7 @@ func main() {
}
loadConfig()
// if IsRunning is true then repo will never be run
// IsRunning must be false otherwise repo will never sync
for _, repo := range repoMap {
repo.State.IsRunning = false
}