split into packages

Andrew Wang 2021-12-11 18:28:09 -05:00
parent 18aae3afa6
commit 8d572a0c3f
14 changed files with 476 additions and 452 deletions

merlin/arthur/arthur.go (new file)

@@ -0,0 +1,114 @@
package arthur
import (
"bytes"
"errors"
"fmt"
"io"
"net"
"os"
"path/filepath"
"strings"
"text/tabwriter"
"time"
"git.csclub.uwaterloo.ca/public/merlin/config"
"git.csclub.uwaterloo.ca/public/merlin/logger"
"git.csclub.uwaterloo.ca/public/merlin/sync"
)
// get repo by name function in config
func GetAndRunCommand(conn net.Conn) (newSync bool) {
newSync = false
defer conn.Close()
var buf bytes.Buffer
_, err := io.Copy(&buf, conn)
if err != nil {
logger.ErrLog(err.Error())
return
}
command := buf.String()
args := strings.Split(command, ":")
respondAndLogErr := func(msg string) {
logger.OutLog(msg)
conn.Write([]byte(msg))
}
if args[0] == "status" {
status := tabwriter.NewWriter(conn, 5, 5, 5, ' ', 0)
fmt.Fprintf(status, "Repository\tLast Synced\tNext Expected Sync\n")
// for time formatting see https://pkg.go.dev/time#pkg-constants
for name, repo := range config.RepoMap {
fmt.Fprintf(status, "%s\t%s\t%s\n",
name,
time.Unix(repo.State.LastAttemptStartTime, 0).Format(time.RFC1123),
time.Unix(repo.State.LastAttemptRunTime+int64(repo.Frequency), 0).Format(time.RFC1123),
)
}
status.Flush()
} else if args[0] == "sync" {
if len(args) != 2 {
respondAndLogErr("Could not parse sync command, forced sync fails.")
return
}
if repo, inMap := config.RepoMap[args[1]]; inMap {
logger.OutLog("Attempting to force sync of " + repo.Name)
if sync.SyncIfPossible(repo) {
conn.Write([]byte("Forced sync for " + repo.Name))
newSync = true
} else {
respondAndLogErr("Cannot force sync: " + repo.Name + ", already syncing.")
}
} else {
respondAndLogErr(args[1] + " is not tracked so cannot sync")
}
} else {
respondAndLogErr("Received unrecognized command: " + command)
}
return
}
func UnixSockListener(connChan chan net.Conn, stopLisChan chan struct{}) {
sockpath := config.Conf.SockPath
// must remove cfg.SockPath otherwise get "bind: address already in use"
if filepath.Ext(sockpath) != ".sock" {
panic(fmt.Errorf("Socket file must end with .sock"))
} else if _, err := os.Stat(sockpath); err == nil {
if err := os.Remove(sockpath); err != nil {
panic(err)
}
} else if !errors.Is(err, os.ErrNotExist) {
panic(err)
}
ear, err := net.Listen("unix", sockpath)
if err != nil {
panic(err)
}
logger.OutLog("Listening to unix socket at " + sockpath)
go func() {
for {
// will exit when ear is closed
conn, err := ear.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
logger.ErrLog("Accepted socket error: " + err.Error())
continue
}
logger.ErrLog("Unhandlable socket error: " + err.Error())
return
}
connChan <- conn
}
}()
<-stopLisChan
ear.Close()
}
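
For orientation, here is a minimal client sketch (not part of this commit) showing how the protocol above can be driven: the listener hands each connection to GetAndRunCommand, which reads the whole request until EOF and understands "status" and "sync:<repo>". The socket path below is a made-up placeholder; the real one comes from the sock_path config value.

package main

import (
	"fmt"
	"io"
	"net"
	"os"
)

func main() {
	// hypothetical path; use the sock_path value from the merlin config
	conn, err := net.Dial("unix", "/run/merlin.sock")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer conn.Close()

	// "status" prints the repo table; "sync:<repo>" forces one repo to sync
	if _, err := conn.Write([]byte("status")); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	// close the write half so the server's io.Copy sees EOF
	if uc, ok := conn.(*net.UnixConn); ok {
		uc.CloseWrite()
	}

	reply, _ := io.ReadAll(conn)
	fmt.Print(string(reply))
}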


@@ -0,0 +1 @@
package arthur


@@ -1 +0,0 @@
package common


@@ -1,12 +1,11 @@
-package common
+package config
import (
-"fmt"
"os"
-"os/exec"
-"time"
-ini "gopkg.in/ini.v1"
+"gopkg.in/ini.v1"
+"git.csclub.uwaterloo.ca/public/merlin/logger"
)
const (
@@ -51,53 +50,11 @@ const (
TERMINATED // was killed by a signal
)
-type Result struct {
+type SyncResult struct {
Name string
Exit int
}
-type Repo struct {
-// the name of this repo
-Name string `ini:"-"`
-// this should be one of "csc-sync-standard", etc.
-SyncType string `ini:"sync_type"`
-// a human-readable frequency, e.g. "bi-hourly"
-FrequencyStr string `ini:"frequency"`
-// the desired interval (in seconds) between successive runs
-Frequency int `ini:"-"`
-// the maximum time (in seconds) that each child process of this repo
-// can for before being killed
-MaxTime int `ini:"max_time"`
-// where to download the files for this repo (relative to the download
-// dir in the config)
-LocalDir string `ini:"local_dir"`
-// the remote host to rsync from
-RsyncHost string `ini:"rsync_host"`
-// the remote directory on the rsync host
-RsyncDir string `ini:"rsync_dir"`
-// the rsync user (optional)
-RsyncUser string `ini:"rsync_user"`
-// the file storing the password for rsync (optional)
-PasswordFile string `ini:"password_file"`
-// the file for general logging of this repo
-LoggerFile string `ini:"log_file"`
-// a reference to the general logger
-Logger *Logger `ini:"-"`
-// the file for logging this repo's rsync
-RsyncLogFile string `ini:"rsync_log_file"`
-// the file for logging this repo's zfssync
-ZfssyncLogFile string `ini:"zfssync_log_file"`
-// the repo will write its name and status in a Result struct to DoneChan
-// when it has finished a job (shared by all repos)
-DoneChan chan<- Result `ini:"-"`
-// the repo should stop syncing if StopChan is closed (shared by all repos)
-StopChan chan struct{} `ini:"-"`
-// a struct that stores the repo's status
-State RepoState `ini:"-"`
-// a reference to the global config
-cfg *Config `ini:"-"`
-}
type Config struct {
// the maximum number of jobs allowed to execute concurrently
MaxJobs int `ini:"max_jobs"`
@@ -124,8 +81,46 @@ type Config struct {
ZfssyncLogDir string `ini:"zfssync_log_dir"`
// the Unix socket path which arthur will use to communicate with us
SockPath string `ini:"sock_path"`
-// a list of all of the repos
-Repos []*Repo `ini:"-"`
+}
+type Repo struct {
+// the name of this repo
+Name string `ini:"-"`
+// this should be one of "csc-sync-standard", etc.
+SyncType string `ini:"sync_type"`
+// a human-readable frequency, e.g. "bi-hourly"
+FrequencyStr string `ini:"frequency"`
+// the desired interval (in seconds) between successive runs
+Frequency int `ini:"-"`
+// the maximum time (in seconds) that each child process of this repo
+// can for before being killed
+MaxTime int `ini:"max_time"`
+// where to download the files for this repo (relative to the download
+// dir in the config)
+LocalDir string `ini:"local_dir"`
+// the remote host to rsync from
+RsyncHost string `ini:"rsync_host"`
+// the remote directory on the rsync host
+RsyncDir string `ini:"rsync_dir"`
+// the rsync user (optional)
+RsyncUser string `ini:"rsync_user"`
+// the file storing the password for rsync (optional)
+PasswordFile string `ini:"password_file"`
+// the file for general logging of this repo
+LoggerFile string `ini:"log_file"`
+// a reference to the general logger
+Logger *logger.Logger `ini:"-"`
+// the file for logging this repo's rsync
+RsyncLogFile string `ini:"rsync_log_file"`
+// the file for logging this repo's zfssync
+ZfssyncLogFile string `ini:"zfssync_log_file"`
+// the repo will write its name and status in a Result struct to DoneChan
+// when it has finished a job (shared by all repos)
+DoneChan chan<- SyncResult `ini:"-"`
+// the repo should stop syncing if StopChan is closed (shared by all repos)
+StopChan chan struct{} `ini:"-"`
+// a struct that stores the repo's status
+State RepoState `ini:"-"`
}
// This should only be modified by the main thread
@@ -141,129 +136,18 @@ type RepoState struct {
LastAttemptExit int `ini:"last_attempt_exit"`
}
-// save the current state of the repo to a file
-func (repo *Repo) SaveState() {
-repo.Logger.Debug("Saving state")
-state_cfg := ini.Empty()
-if err := ini.ReflectFrom(state_cfg, &repo.State); err != nil {
-repo.Logger.Error(err.Error())
-}
-file, err := os.OpenFile(repo.cfg.StateDir+"/"+repo.Name, os.O_RDWR|os.O_CREATE, 0644)
-if err != nil {
-repo.Logger.Error(err.Error())
-}
-if _, err := state_cfg.WriteTo(file); err != nil {
-repo.Logger.Error(err.Error())
-}
-repo.Logger.Debug("Saved state")
-}
-// start sync job for this repo if more than repo.Frequency seconds have elapsed since its last job
-// and is not currently running.
-// returns true iff a job is started.
-func (repo *Repo) RunIfPossible() bool {
-if repo.State.IsRunning {
-return false
-}
-curTime := time.Now().Unix()
-if curTime-repo.State.LastAttemptStartTime > int64(repo.Frequency) {
-repo.State.IsRunning = true
-repo.State.LastAttemptStartTime = curTime
-repo.SaveState()
-repo.Logger.Info(fmt.Sprintf("Repo %s has started syncing", repo.Name))
-go repo.StartSyncJob()
-return true
-}
-return false
-}
-func zfsSync(repo *Repo) {
-out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput()
-if err != nil {
-repo.Logger.Error(err)
-} else {
-f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
-if err != nil {
-repo.Logger.Error(err.Error())
-} else {
-f.Write(out)
-}
-}
-}
-// update the repo state with the last attempt time and exit now that the job is done
-func (repo *Repo) SyncCompleted(exit int) {
-repoState := repo.State
-syncTook := time.Now().Unix() - repoState.LastAttemptStartTime
-nextSync := repo.MaxTime - int(syncTook)
-if nextSync < 0 {
-nextSync = 0
-}
-repoState.IsRunning = false
-repoState.LastAttemptExit = exit
-repoState.LastAttemptRunTime = syncTook
-var exitStr string
-switch exit {
-case SUCCESS:
-exitStr = "completed"
-case TERMINATED:
-exitStr = "terminated"
-default:
-exitStr = "failed"
-}
-repo.SaveState()
-repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync))
-if exit == SUCCESS {
-// it is possible that the zfssync from the last repo sync is still running is that fine?
-go zfsSync(repo)
-}
-}
-func panicIfErr(e error) {
-if e != nil {
-panic(e)
-}
-}
-func touchFile(file string) {
-fi, err := os.Stat(file)
-if err != nil {
-f, err := os.OpenFile(file, os.O_CREATE, 0644)
-if err != nil {
-panic(fmt.Errorf("unable to create file %s", file))
-}
-f.Close()
-} else if fi.IsDir() {
-panic(fmt.Errorf("%s is a directory", file))
-} else if os.Geteuid() != 1001 {
-// UID 1001 is the hardcoded uid for mirror
-err := os.Chown(file, 1001, os.Getegid())
-panicIfErr(err)
-} else if fi.Mode().Perm() != 0644 {
-err := os.Chmod(file, 0644)
-panicIfErr(err)
-}
-}
-func touchFiles(files ...string) {
-for _, file := range files {
-touchFile(file)
-}
-}
+var (
+Conf Config
+Repos []*Repo
+RepoMap map[string]*Repo
+)
// GetConfig reads the config from a JSON file, initializes default values,
// and initializes the non-configurable fields of each repo.
// It returns a Config.
-func GetConfig(doneChan chan Result, stopChan chan struct{}) Config {
-// add global configuration in cfg
-data, err := ini.Load(CONFIG_PATH)
-panicIfErr(err)
-cfg := Config{
+func LoadConfig(doneChan chan SyncResult, stopChan chan struct{}) {
+// set default values then load config from file
+newConf := Config{
MaxJobs: DEFAULT_MAX_JOBS,
MaxTime: DEFAULT_MAX_TIME,
PasswordDir: DEFAULT_PASSWORD_DIR,
@@ -273,36 +157,39 @@ func GetConfig(doneChan chan Result, stopChan chan struct{}) Config {
RsyncLogDir: DEFAULT_RSYNC_LOG_DIR,
ZfssyncLogDir: DEFAULT_ZFSSYNC_LOG_DIR,
SockPath: DEFAULT_SOCK_PATH,
-Repos: make([]*Repo, 0),
}
-err = data.MapTo(&cfg)
+iniInfo, err := ini.Load(CONFIG_PATH)
+panicIfErr(err)
+err = iniInfo.MapTo(&newConf)
panicIfErr(err)
-for _, dir := range []string{cfg.StateDir, cfg.LoggerDir, cfg.RsyncLogDir, cfg.ZfssyncLogDir} {
+// check config for major errors
+for _, dir := range []string{Conf.StateDir, Conf.LoggerDir, Conf.RsyncLogDir, Conf.ZfssyncLogDir} {
err := os.MkdirAll(dir, 0755)
panicIfErr(err)
}
-if cfg.IPv4Address == "" {
+if Conf.IPv4Address == "" {
panic("Missing IPv4 address from config")
-} else if cfg.IPv6Address == "" {
+} else if Conf.IPv6Address == "" {
panic("Missing IPv6 address from config")
}
-// add each repo configuration to cfg
-for _, section := range data.Sections() {
+newRepos := make([]*Repo, 0)
+for _, section := range iniInfo.Sections() {
repoName := section.Name()
if repoName == "DEFAULT" {
continue
}
+// set the default values for the repo then load from file
repo := Repo{
Name: repoName,
-SyncType: cfg.SyncType,
-FrequencyStr: cfg.FrequencyStr,
-MaxTime: cfg.MaxTime,
-LoggerFile: cfg.LoggerDir + "/" + repoName + ".log",
-RsyncLogFile: cfg.RsyncLogDir + "/" + repoName + "-rsync.log",
-ZfssyncLogFile: cfg.ZfssyncLogDir + "/" + repoName + "-zfssync.log",
+SyncType: Conf.SyncType,
+FrequencyStr: Conf.FrequencyStr,
+MaxTime: Conf.MaxTime,
+LoggerFile: Conf.LoggerDir + "/" + repoName + ".log",
+RsyncLogFile: Conf.RsyncLogDir + "/" + repoName + "-rsync.log",
+ZfssyncLogFile: Conf.ZfssyncLogDir + "/" + repoName + "-zfssync.log",
DoneChan: doneChan,
StopChan: stopChan,
}
@@ -314,15 +201,13 @@ func GetConfig(doneChan chan Result, stopChan chan struct{}) Config {
repo.RsyncLogFile,
repo.ZfssyncLogFile,
)
-repo.Logger = NewLogger(repo.Name, repo.LoggerFile)
+repo.Logger = logger.NewLogger(repo.Name, repo.LoggerFile)
repo.Frequency = frequencies[repo.FrequencyStr]
if repo.SyncType == "" {
panic("Missing sync type from " + repo.Name)
} else if repo.Frequency == 0 {
panic("Missing or invalid frequency for " + repo.Name)
}
-repo.cfg = &cfg
repo.State = RepoState{
IsRunning: false,
@@ -332,7 +217,7 @@ func GetConfig(doneChan chan Result, stopChan chan struct{}) Config {
}
// create the state file if it does not exist, otherwise load it from existing file
-repoStateFile := cfg.StateDir + "/" + repo.Name
+repoStateFile := Conf.StateDir + "/" + repo.Name
if _, err := os.Stat(repoStateFile); err != nil {
touchFile(repoStateFile)
repo.SaveState()
@@ -341,12 +226,33 @@ func GetConfig(doneChan chan Result, stopChan chan struct{}) Config {
panicIfErr(err)
}
-cfg.Repos = append(cfg.Repos, &repo)
+newRepos = append(newRepos, &repo)
}
-if len(cfg.Repos) == 0 {
+if len(newRepos) == 0 {
panic("No repos found in config")
}
-return cfg
+Conf = newConf
+Repos = newRepos
+for _, repo := range Repos {
+RepoMap[repo.Name] = repo
+}
+}
+// save the current state of the repo to a file
+func (repo *Repo) SaveState() {
+repo.Logger.Debug("Saving state")
+state_cfg := ini.Empty()
+if err := ini.ReflectFrom(state_cfg, &repo.State); err != nil {
+repo.Logger.Error(err.Error())
+}
+file, err := os.OpenFile(Conf.StateDir+"/"+repo.Name, os.O_RDWR|os.O_CREATE, 0644)
+if err != nil {
+repo.Logger.Error(err.Error())
+}
+if _, err := state_cfg.WriteTo(file); err != nil {
+repo.Logger.Error(err.Error())
+}
+repo.Logger.Debug("Saved state")
}
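
LoadConfig maps the DEFAULT section of the INI file onto Config and every other section onto a Repo, so a config file might look roughly like the sketch below. This is a hypothetical example, not taken from the repository: only keys whose ini tags appear in this diff are shown, all values (paths, host names, the repo name) are invented, and frequency must be one of the strings in the frequencies map, which is not part of this commit.

; hypothetical merlin config sketch -- key names follow the ini tags above, values are made up
max_jobs = 4
sock_path = /run/merlin.sock

[debian]
sync_type = csc-sync-debian
frequency = bi-hourly
max_time = 3600
local_dir = debian
rsync_host = rsync.example.org
rsync_dir = debian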


@@ -0,0 +1 @@
package config

merlin/config/utils.go (new file)

@@ -0,0 +1,38 @@
package config
import (
"fmt"
"os"
)
func panicIfErr(e error) {
if e != nil {
panic(e)
}
}
func touchFile(file string) {
fi, err := os.Stat(file)
if err != nil {
f, err := os.OpenFile(file, os.O_CREATE, 0644)
if err != nil {
panic(fmt.Errorf("unable to create file %s", file))
}
f.Close()
} else if fi.IsDir() {
panic(fmt.Errorf("%s is a directory", file))
} else if os.Geteuid() != 1001 {
// UID 1001 is the hardcoded uid for mirror
err := os.Chown(file, 1001, os.Getegid())
panicIfErr(err)
} else if fi.Mode().Perm() != 0644 {
err := os.Chmod(file, 0644)
panicIfErr(err)
}
}
func touchFiles(files ...string) {
for _, file := range files {
touchFile(file)
}
}


@@ -1,4 +1,4 @@
-package common
+package logger
import (
"log"
@@ -30,12 +30,12 @@ var levels = map[int]string{
var outLogger = log.New(os.Stdout, "", log.LstdFlags)
var errLogger = log.New(os.Stderr, "", log.LstdFlags)
-func OutLogger() *log.Logger {
-return outLogger
+func OutLog(v ...interface{}) {
+outLogger.Println(v...)
}
-func ErrLogger() *log.Logger {
-return errLogger
+func ErrLog(v ...interface{}) {
+errLogger.Println(v...)
}
func NewLogger(name, file string) *Logger {
@@ -48,15 +48,9 @@ func NewLogger(name, file string) *Logger {
}
func (logger *Logger) Log(level int, v ...interface{}) {
-if level == INFO {
-outLogger.Println(append([]interface{}{"[" + logger.name + "]"}, v...))
-} else if level == ERROR {
-errLogger.Println(append([]interface{}{"[" + logger.name + "]"}, v...))
-}
f, err := os.OpenFile(logger.file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
-errLogger.Println(err.Error())
+ErrLog(err.Error())
}
defer f.Close()
@@ -65,7 +59,7 @@ func (logger *Logger) Log(level int, v ...interface{}) {
args = append(args, v...)
logger.SetOutput(f)
-logger.Println(v...)
+logger.Println(args)
}
func (logger *Logger) Debug(v ...interface{}) {
@@ -73,6 +67,7 @@ func (logger *Logger) Debug(v ...interface{}) {
}
func (logger *Logger) Info(v ...interface{}) {
+OutLog(append([]interface{}{"[" + logger.name + "]"}, v...))
logger.Log(INFO, v...)
}
@@ -81,5 +76,6 @@ func (logger *Logger) Warning(v ...interface{}) {
}
func (logger *Logger) Error(v ...interface{}) {
+ErrLog(append([]interface{}{"[" + logger.name + "]"}, v...))
logger.Log(ERROR, v...)
}
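
A short usage sketch for the reworked logger package (not from the commit): OutLog and ErrLog replace the old OutLogger()/ErrLogger() accessors and print straight to stdout/stderr, while Info and Error on a per-repo Logger now echo through them with the repo name attached and still append to the repo's own log file. The repo name and log path below are placeholders.

package main

import "git.csclub.uwaterloo.ca/public/merlin/logger"

func main() {
	logger.OutLog("plain message to stdout, no repo prefix")

	// hypothetical repo name and log file path
	l := logger.NewLogger("debian", "/tmp/debian.log")
	l.Info("sync started") // echoed to stdout via OutLog and appended to the file
	l.Error("sync failed") // echoed to stderr via ErrLog and appended to the file
}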


@@ -1,153 +1,32 @@
package main
import (
-"bytes"
-"errors"
"fmt"
-"io"
-"log"
"net"
"os"
"os/signal"
-"path/filepath"
-"strings"
"syscall"
-"text/tabwriter"
"time"
-"git.csclub.uwaterloo.ca/public/merlin/common"
"golang.org/x/sys/unix"
+"git.csclub.uwaterloo.ca/public/merlin/arthur"
+"git.csclub.uwaterloo.ca/public/merlin/config"
+"git.csclub.uwaterloo.ca/public/merlin/logger"
+"git.csclub.uwaterloo.ca/public/merlin/sync"
)
-var (
-cfg common.Config
-outLogger *log.Logger
-errLogger *log.Logger
-repoMap map[string]*common.Repo
-repoIdx int
-numJobsRunning int
-)
-func getAndRunCommand(conn net.Conn) {
-defer conn.Close()
-var buf bytes.Buffer
-_, err := io.Copy(&buf, conn)
-if err != nil {
-errLogger.Println(err.Error())
-return
-}
-command := buf.String()
-args := strings.Split(command, ":")
-respondAndLogErr := func(msg string) {
-outLogger.Println(msg)
-conn.Write([]byte(msg))
-}
-if args[0] == "status" {
-status := tabwriter.NewWriter(conn, 5, 5, 5, ' ', 0)
-fmt.Fprintf(status, "Repository\tLast Synced\tNext Expected Sync\n")
-// for time formating see https://pkg.go.dev/time#pkg-constants
-for name, repo := range repoMap {
-fmt.Fprintf(status, "%s\t%s\t%s\n",
-name,
-time.Unix(repo.State.LastAttemptStartTime, 0).Format(time.RFC1123),
-time.Unix(repo.State.LastAttemptRunTime+int64(repo.Frequency), 0).Format(time.RFC1123),
-)
-}
-status.Flush()
-} else if args[0] == "sync" {
-if len(args) != 2 {
-respondAndLogErr("Could not parse sync command, forced sync fails.")
-return
-}
-if repo, inMap := repoMap[args[1]]; inMap {
-outLogger.Println("Attempting to force sync of " + repo.Name)
-if repo.RunIfPossible() {
-conn.Write([]byte("Forced sync for " + repo.Name))
-numJobsRunning++
-} else {
-respondAndLogErr("Cannot force sync: " + repo.Name + ", already syncing.")
-}
-} else {
-respondAndLogErr(args[1] + " is not tracked so cannot sync")
-}
-} else {
-respondAndLogErr("Received unrecognized command: " + command)
-}
-}
-func unixSockListener(connChan chan net.Conn, stopLisChan chan struct{}) {
-// must remove cfg.SockPath otherwise get "bind: address already in use"
-if filepath.Ext(cfg.SockPath) != ".sock" {
-panic(fmt.Errorf("Socket file must end with .sock"))
-} else if _, err := os.Stat(cfg.SockPath); err == nil {
-if err := os.Remove(cfg.SockPath); err != nil {
-panic(err)
-}
-} else if !errors.Is(err, os.ErrNotExist) {
-panic(err)
-}
-ear, err := net.Listen("unix", cfg.SockPath)
-if err != nil {
-panic(err)
-}
-outLogger.Println("Listening to unix socket at " + cfg.SockPath)
-go func() {
-for {
-// will exit when ear is closed
-conn, err := ear.Accept()
-if err != nil {
-if ne, ok := err.(net.Error); ok && ne.Temporary() {
-errLogger.Println("Accepted socket error: " + err.Error())
-continue
-}
-errLogger.Println("Unhandlable socket error: " + err.Error())
-return
-}
-connChan <- conn
-}
-}()
-<-stopLisChan
-ear.Close()
-}
-// We use a round-robin strategy. It's not the most efficient, but it's simple
-// (read: easy to understand) and guarantees each repo will eventually get a chance to run.
-func runAsManyAsPossible() {
-repos := cfg.Repos
-startIdx := repoIdx
-for numJobsRunning < cfg.MaxJobs {
-repo := repos[repoIdx]
-if repo.RunIfPossible() {
-numJobsRunning++
-}
-repoIdx = (repoIdx + 1) % len(repos)
-if repoIdx == startIdx {
-// we've come full circle
-return
-}
-}
-}
func main() {
-outLogger = common.OutLogger()
-errLogger = common.ErrLogger()
// check that merlin is run as mirror user
// check that mirror user has pid of 1001
-doneChan := make(chan common.Result)
+// receives a Result struct when a repo stops syncing
+doneChan := make(chan config.SyncResult)
+// closed when merlin is told to stop running
stopChan := make(chan struct{})
+// receives a Conn when a client makes a connection to unix socket
connChan := make(chan net.Conn)
+// signal channel to stop listening to unix socket
stopLisChan := make(chan struct{})
stopSig := make(chan os.Signal, 1)
@@ -157,24 +36,37 @@ func main() {
unix.Umask(002)
-numJobsRunning = 0
+numJobsRunning := 0
+repoIdx := 0
loadConfig := func() {
-cfg = common.GetConfig(doneChan, stopChan)
-outLogger.Println("Loaded config:\n" + fmt.Sprintf("%+v\n", cfg))
-repoMap = make(map[string]*common.Repo)
-for _, repo := range cfg.Repos {
-repoMap[repo.Name] = repo
-}
+config.LoadConfig(doneChan, stopChan)
+logger.OutLog("Loaded config:\n" + fmt.Sprintf("%+v\n", config.Conf))
repoIdx = 0
-go unixSockListener(connChan, stopLisChan)
+go arthur.UnixSockListener(connChan, stopLisChan)
}
+// We use a round-robin strategy. It's not the most efficient, but it's simple
+// (read: easy to understand) and guarantees each repo will eventually get a chance to run.
+runAsManyAsPossible := func() {
+startIdx := repoIdx
+for numJobsRunning < config.Conf.MaxJobs {
+repo := config.Repos[repoIdx]
+if sync.SyncIfPossible(repo) {
+numJobsRunning++
+}
+repoIdx = (repoIdx + 1) % len(config.Repos)
+if repoIdx == startIdx {
+// we've come full circle
+return
+}
+}
+}
loadConfig()
-// IsRunning must be false otherwise repo will never sync
-for _, repo := range repoMap {
+// ensure that IsRunning is false otherwise repo will never sync
+for _, repo := range config.RepoMap {
repo.State.IsRunning = false
}
runAsManyAsPossible()
@@ -192,22 +84,27 @@ runLoop:
loadConfig()
case done := <-doneChan:
-repoMap[done.Name].SyncCompleted(done.Exit)
+sync.SyncCompleted(config.RepoMap[done.Name], done.Exit)
numJobsRunning--
case conn := <-connChan:
-getAndRunCommand(conn)
+// TODO: may want to split this into GetCommand and something else
+// to make it more clear tha GetAndRunCommand returns true if
+// it starts a sync
+if arthur.GetAndRunCommand(conn) {
+numJobsRunning--
+}
case <-time.After(1 * time.Minute):
}
runAsManyAsPossible()
}
-// give time for jobs to terminate before exiting
+// give time for all jobs to terminate before exiting program
for {
select {
case done := <-doneChan:
-repoMap[done.Name].SyncCompleted(done.Exit)
+sync.SyncCompleted(config.RepoMap[done.Name], done.Exit)
numJobsRunning--
case <-time.After(1 * time.Second):


@@ -1,7 +0,0 @@
package main
import "testing"
func TestSock1(t *testing.T) {
}


@@ -1,50 +1,51 @@
-package common
+package sync
import (
"fmt"
"math/rand"
-"os"
"strconv"
+"git.csclub.uwaterloo.ca/public/merlin/config"
)
-func (repo *Repo) buildRsyncHost() string {
+func buildRsyncHost(repo *config.Repo) string {
if repo.RsyncUser != "" {
repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
}
return "rsync://" + repo.RsyncHost + "/" + repo.RsyncDir
}
-func (repo *Repo) buildRsyncDaemonHost() string {
+func buildRsyncDaemonHost(repo *config.Repo) string {
if repo.RsyncUser != "" {
repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
}
return repo.RsyncHost + "::" + repo.RsyncDir
}
-func (repo *Repo) buildRsyncSSHHost() string {
+func buildRsyncSSHHost(repo *config.Repo) string {
if repo.RsyncUser != "" {
repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
}
return repo.RsyncHost + ":" + repo.RsyncDir
}
-func (repo *Repo) buildDownloadDir() string {
-return repo.cfg.DownloadDir + "/" + repo.LocalDir
+func buildDownloadDir(repo *config.Repo) string {
+return config.Conf.DownloadDir + "/" + repo.LocalDir
}
-func (repo *Repo) CSCSyncApache() []string {
+func cscSyncApache(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-az", "--no-owner", "--no-group", "--delete", "--safe-links",
-"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
+"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
"--exclude", ".~tmp~/",
"--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
-args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
+args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
return args
}
-func (repo *Repo) CSCSyncArchLinux() []string {
+func cscSyncArchLinux(repo *config.Repo) []string {
tempDir := "" // is this option even needed?
@@ -52,83 +53,83 @@ func (repo *Repo) CSCSyncArchLinux() []string {
args := []string{
"rsync", "-rtlH", "--safe-links", "--delete-after", "--timeout=600",
"--contimeout=60", "-p", "--delay-updates", "--no-motd",
-"--temp-dir=" + tempDir, "--log-file=" + repo.RsyncLogFile, "--address=" + repo.cfg.IPv4Address,
+"--temp-dir=" + tempDir, "--log-file=" + repo.RsyncLogFile, "--address=" + config.Conf.IPv4Address,
}
-args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir())
+args = append(args, buildRsyncHost(repo), buildDownloadDir(repo))
return args
}
-func (repo *Repo) CSCSyncBadPerms() []string {
+func cscSyncBadPerms(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--chmod=o=rX", "--delete",
-"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
+"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
"--exclude", ".~tmp~/",
"--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
-args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
+args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
return args
}
// TODO ceph
-func (repo *Repo) CSCSyncCDImage() []string {
+func cscSyncCDImage(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete",
-"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
+"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
"--exclude", ".*/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
-args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
+args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
return args
}
-func (repo *Repo) CSCSyncChmod() []string {
+func cscSyncChmod(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", "--delay-updates", "--safe-links",
-"--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
+"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
"--chmod=u=rwX,go=rX",
}
-args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir())
+args = append(args, buildRsyncHost(repo), buildDownloadDir(repo))
return args
}
-func (repo *Repo) CSCSyncDebian() []string {
+func cscSyncDebian(repo *config.Repo) []string {
// sync /pool
args := []string{"nice", "rsync", "-rlHtvp",
"--exclude", ".~tmp~/", "--timeout=3600", "-4",
-"--address=" + repo.cfg.IPv4Address,
+"--address=" + config.Conf.IPv4Address,
}
// $RSYNC_HOST::$RSYNC_DIR/pool/ $TO/pool/ >> $LOGFILE 2>&1
return args
}
-func (repo *Repo) CSCSyncDebianCD() []string {
+func cscSyncDebianCD(repo *config.Repo) []string {
// this is basically the same as CSCSyncDebian, except it has an extra --exclude
args := []string{"nice", "rsync", "-rlHtvp", "--delete",
"--exclude", ".~tmp~/", "--timeout=3600", "-4",
-"--address=" + repo.cfg.IPv4Address,
+"--address=" + config.Conf.IPv4Address,
// "--exclude", "Archive-Update-in-Progress-${HOSTNAME}"
}
// $RSYNC_HOST::$RSYNC_DIR $TO >> $LOGFILE 2>&1
return args
}
-func (repo *Repo) CSCSyncGentoo() []string {
+func cscSyncGentoo(repo *config.Repo) []string {
repo.RsyncUser = "gentoo"
repo.PasswordFile = "gentoo-distfiles"
-return repo.CSCSyncStandard()
+return cscSyncStandard(repo)
}
// TODO s3
-func (repo *Repo) CSCSyncSSH() []string {
+func cscSyncSSH(repo *config.Repo) []string {
args := []string{
"rsync", "-aH", "--no-owner", "--no-group", "--delete",
@@ -138,39 +139,39 @@ func (repo *Repo) CSCSyncSSH() []string {
}
// not sure if we should be assuming ssh identity file is the password file
args = append(args, "-e", fmt.Sprintf("'ssh -i %s'", repo.PasswordFile))
-args = append(args, repo.buildRsyncSSHHost(), repo.buildDownloadDir())
+args = append(args, buildRsyncSSHHost(repo), buildDownloadDir(repo))
return args
}
-func (repo *Repo) CSCSyncStandard() []string {
+func cscSyncStandard(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
-"--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + repo.cfg.IPv4Address,
+"--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
if repo.PasswordFile != "" {
-filename := repo.cfg.PasswordDir + "/" + repo.PasswordFile
+filename := config.Conf.PasswordDir + "/" + repo.PasswordFile
args = append(args, "--password-file", filename)
}
-args = append(args, repo.buildRsyncHost(), repo.buildDownloadDir())
+args = append(args, buildRsyncHost(repo), buildDownloadDir(repo))
return args
}
-func (repo *Repo) CSCSyncStandardIPV6() []string {
+func cscSyncStandardIPV6(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
-"--delay-updates", "--safe-links", "--timeout=3600", "-6", "--address=" + repo.cfg.IPv4Address,
+"--delay-updates", "--safe-links", "--timeout=3600", "-6", "--address=" + config.Conf.IPv4Address,
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
-args = append(args, repo.buildRsyncDaemonHost(), repo.buildDownloadDir())
+args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
return args
}
// for testing, to be removed later
-func (repo *Repo) CSCSyncDummy() []string {
+func cscSyncDummy(repo *config.Repo) []string {
sleepDur := strconv.FormatInt(rand.Int63n(10)+5, 10)
args := []string{"sleep", sleepDur}
@@ -179,7 +180,7 @@ func (repo *Repo) CSCSyncDummy() []string {
}
// executes a particular sync job depending on repo.SyncType.
-func (repo *Repo) getSyncCommand() []string {
+func getSyncCommand(repo *config.Repo) []string {
/*
# scripts used by merlin.py
csc-sync-debian
@@ -208,78 +209,38 @@ func (repo *Repo) getSyncCommand() []string {
switch repo.SyncType {
case "csc-sync-apache":
-return repo.CSCSyncApache()
+return cscSyncApache(repo)
case "csc-sync-archlinux":
-return repo.CSCSyncArchLinux()
+return cscSyncArchLinux(repo)
case "csc-sync-badperms":
-return repo.CSCSyncBadPerms()
+return cscSyncBadPerms(repo)
case "csc-sync-cdimage":
-return repo.CSCSyncCDImage()
+return cscSyncCDImage(repo)
// case "csc-sync-ceph":
-// return repo.CSCSyncCeph()
+// return cscSyncCeph(repo)
case "csc-sync-chmod":
-return repo.CSCSyncChmod()
+return cscSyncChmod(repo)
case "csc-sync-debian":
-return repo.CSCSyncDebian()
+return cscSyncDebian(repo)
case "csc-sync-debian-cd":
-return repo.CSCSyncDebianCD()
+return cscSyncDebianCD(repo)
case "csc-sync-gentoo":
-return repo.CSCSyncGentoo()
+return cscSyncGentoo(repo)
// case "csc-sync-s3":
-// return repo.CSCSyncS3()
+// return cscSyncS3(repo)
case "csc-sync-ssh":
-return repo.CSCSyncSSH()
+return cscSyncSSH(repo)
case "csc-sync-standard":
-return repo.CSCSyncStandard()
+return cscSyncStandard(repo)
case "csc-sync-standard-ipv6":
-return repo.CSCSyncStandardIPV6()
+return cscSyncStandardIPV6(repo)
// case "csc-sync-wget":
-// return repo.CSCSyncWget()
+// return cscSyncWget(repo)
case "csc-sync-dummy":
-return repo.CSCSyncDummy()
+return cscSyncDummy(repo)
default:
repo.Logger.Error("Unrecognized sync type", "'"+repo.SyncType+"'")
}
return []string{}
}
-func (repo *Repo) StartSyncJob() {
-status := FAILURE
-defer func() {
-repo.DoneChan <- Result{
-Name: repo.Name,
-Exit: status,
-}
-}()
-localDir := repo.buildDownloadDir()
-err := os.MkdirAll(localDir, 0775)
-if err != nil {
-err = fmt.Errorf("Could not create directory %s: %w", localDir, err)
-// I'm not sure if logger can handle error so just use the string?
-repo.Logger.Error(err.Error())
-return
-}
-args := repo.getSyncCommand()
-if repo.PasswordFile != "" {
-filename := repo.cfg.PasswordDir + "/" + repo.PasswordFile
-args = append(args, "--password-file", filename)
-}
-args = append(args, repo.buildRsyncHost(), localDir)
-ch := SpawnProcess(repo, args)
-if ch == nil {
-// SpawnProcess will have already logged error
-return
-}
-cmd := <-ch
-switch cmd.ProcessState.ExitCode() {
-case 0:
-status = SUCCESS
-case -1:
-status = TERMINATED
-// default is already FAILURE
-}
-}
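
Since every CSCSync* method became a plain function taking *config.Repo, adding another sync type after this refactor only needs a new helper plus one case in getSyncCommand's switch. A hypothetical sketch follows; none of these names exist in the repository.

package sync

import "git.csclub.uwaterloo.ca/public/merlin/config"

// cscSyncExample is a made-up sync type used only to illustrate the new shape:
// build the rsync argv from the global config plus per-repo fields, then append
// the source host and local download directory.
func cscSyncExample(repo *config.Repo) []string {
	args := []string{
		"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete",
		"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
		"--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
	}
	return append(args, buildRsyncHost(repo), buildDownloadDir(repo))
}

// and in getSyncCommand's switch:
//	case "csc-sync-example":
//		return cscSyncExample(repo)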


@@ -0,0 +1 @@
package sync

merlin/sync/interface.go (new file)

@@ -0,0 +1,59 @@
package sync
import (
"fmt"
"time"
"git.csclub.uwaterloo.ca/public/merlin/config"
)
// start sync job for this repo if more than repo.Frequency seconds have elapsed since its last job
// and is not currently running.
// returns true iff a job is started.
func SyncIfPossible(repo *config.Repo) bool {
// Change to SyncIfPossible
if repo.State.IsRunning {
return false
}
curTime := time.Now().Unix()
if curTime-repo.State.LastAttemptStartTime > int64(repo.Frequency) {
repo.State.IsRunning = true
repo.State.LastAttemptStartTime = curTime
repo.SaveState()
repo.Logger.Info(fmt.Sprintf("Repo %s has started syncing", repo.Name))
go startSync(repo)
return true
}
return false
}
// update the repo state with the last attempt time and exit now that the job is done
func SyncCompleted(repo *config.Repo, exit int) {
repoState := repo.State
syncTook := time.Now().Unix() - repoState.LastAttemptStartTime
nextSync := repo.MaxTime - int(syncTook)
if nextSync < 0 {
nextSync = 0
}
repoState.IsRunning = false
repoState.LastAttemptExit = exit
repoState.LastAttemptRunTime = syncTook
var exitStr string
switch exit {
case config.SUCCESS:
exitStr = "completed"
case config.TERMINATED:
exitStr = "terminated"
default:
exitStr = "failed"
}
repo.SaveState()
repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync))
if exit == config.SUCCESS {
// it is possible that the zfssync from the last repo sync is still running is that fine?
go zfsSync(repo)
}
}


@@ -1,10 +1,13 @@
-package common
+package sync
import (
"fmt"
+"os"
"os/exec"
"syscall"
"time"
+"git.csclub.uwaterloo.ca/public/merlin/config"
)
// SpawnProcess spawns a child process for the given repo. The process will
@@ -12,7 +15,7 @@ import (
// runs for longer than the repo's MaxTime.
// It returns a channel through which a Cmd will be sent once it has finished,
// or nil if it was unable to start a process.
-func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
+func spawnSyncProcess(repo *config.Repo, args []string) (ch <-chan *exec.Cmd) {
// startTime and time took will be handled in common.go by SyncExit
cmd := exec.Command(args[0], args[1:]...)
@@ -68,3 +71,58 @@ func SpawnProcess(repo *Repo, args []string) (ch <-chan *exec.Cmd) {
}()
return
}
+func startSync(repo *config.Repo) {
+status := config.FAILURE
+defer func() {
+repo.DoneChan <- config.SyncResult{
+Name: repo.Name,
+Exit: status,
+}
+}()
+localDir := buildDownloadDir(repo)
+err := os.MkdirAll(localDir, 0775)
+if err != nil {
+err = fmt.Errorf("Could not create directory %s: %w", localDir, err)
+// I'm not sure if logger can handle error so just use the string?
+repo.Logger.Error(err.Error())
+return
+}
+args := getSyncCommand(repo)
+if repo.PasswordFile != "" {
+filename := config.Conf.PasswordDir + "/" + repo.PasswordFile
+args = append(args, "--password-file", filename)
+}
+args = append(args, buildRsyncHost(repo), localDir)
+ch := spawnSyncProcess(repo, args)
+if ch == nil {
+// SpawnProcess will have already logged error
+return
+}
+cmd := <-ch
+switch cmd.ProcessState.ExitCode() {
+case 0:
+status = config.SUCCESS
+case -1:
+status = config.TERMINATED
+// default is already FAILURE
+}
+}
+func zfsSync(repo *config.Repo) {
+out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput()
+if err != nil {
+repo.Logger.Error(err)
+} else {
+f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
+if err != nil {
+repo.Logger.Error(err.Error())
+} else {
+f.Write(out)
+}
+}
+}