resolve most TODOs and simplify configuration
continuous-integration/drone/push: Build is failing

Andrew Wang 2022-02-25 23:23:06 -05:00
parent 72c143a00b
commit 1e14603367
16 changed files with 167 additions and 169 deletions

View File

@ -5,18 +5,12 @@ This folder contains the code for merlin (which does the actual syncing) and art
Check out the [mirror env](https://git.csclub.uwaterloo.ca/public/mirror-env) for a testing environment Check out the [mirror env](https://git.csclub.uwaterloo.ca/public/mirror-env) for a testing environment
### TODO ### Nice Features To Add
- [ ] ensure that the proper permissions (file mode, group, user) are being used - detect if an rsync process is stuck (watch the stdout/stderr of the rsync processes)
- [ ] document configs - set user and group in config
- [ ] detect if an rsync process is stuck (optional) (\*\*)
- [ ] add bwlimit option for each rsync process (\*\*\*)
\*\* watch the stdout/stderr of the rsync processes
\*\*\* previously wanted to place each rsync process in a separate cgroup
### Completed ### Completed
- [x] add bwlimit option for each rsync process
- [x] write process manager - [x] write process manager
- [x] save state (last attempted time, last attempted status) for each repo, and restore state on startup (e.g. use JSON/INI file for each repo) - [x] save state (last attempted time, last attempted status) for each repo, and restore state on startup (e.g. use JSON/INI file for each repo)
- [x] calculate difference between the scheduled time of a job and the time at which it actually ran; log this - [x] calculate difference between the scheduled time of a job and the time at which it actually ran; log this
@ -27,5 +21,3 @@ Check out the the [mirror env](https://git.csclub.uwaterloo.ca/public/mirror-env
- [x] allow dynamic reloading in merlin - [x] allow dynamic reloading in merlin
- [x] use separate log file for each child process (currently sharing stdout/stderr with parent) - [x] use separate log file for each child process (currently sharing stdout/stderr with parent)
- [x] implement zfssync in merlin (just invoke the existing Python script) - [x] implement zfssync in merlin (just invoke the existing Python script)
\* there are some parts that I don't understand (trace_host, csc-sync-ceph, csc-sync-saltstack, etc)

View File

@ -38,11 +38,13 @@ func GetCommand(conn net.Conn) (command, repoName string) {
return return
} }
// Print a message to stdout and also send it to the given connection
func SendAndLog(conn net.Conn, msg string) { func SendAndLog(conn net.Conn, msg string) {
logger.OutLog(msg) logger.OutLog(msg)
conn.Write([]byte(msg)) conn.Write([]byte(msg))
} }
// Send the status of the repos to the given connection
func SendStatus(conn net.Conn) { func SendStatus(conn net.Conn) {
// Force arthur to send back time information in America/Toronto time // Force arthur to send back time information in America/Toronto time
location, err := time.LoadLocation("America/Toronto") location, err := time.LoadLocation("America/Toronto")
@ -80,7 +82,6 @@ func SendStatus(conn net.Conn) {
func ForceSync(conn net.Conn, repoName string) (newSync bool) { func ForceSync(conn net.Conn, repoName string) (newSync bool) {
newSync = false newSync = false
// TODO: send repoName and every key in RepoMap to lowercase
if repo, isInMap := config.RepoMap[repoName]; isInMap { if repo, isInMap := config.RepoMap[repoName]; isInMap {
logger.OutLog("Attempting to force sync of " + repoName) logger.OutLog("Attempting to force sync of " + repoName)
if sync.SyncIfPossible(repo) { if sync.SyncIfPossible(repo) {
@ -95,6 +96,7 @@ func ForceSync(conn net.Conn, repoName string) (newSync bool) {
return return
} }
// Create the unix socket and send each accepted connection to the connChan channel
func StartListener(connChan chan net.Conn, stopLisChan chan struct{}) { func StartListener(connChan chan net.Conn, stopLisChan chan struct{}) {
sockpath := config.Conf.SockPath sockpath := config.Conf.SockPath
// must remove old cfg.SockPath otherwise get "bind: address already in use" // must remove old cfg.SockPath otherwise get "bind: address already in use"
@ -130,7 +132,6 @@ func StartListener(connChan chan net.Conn, stopLisChan chan struct{}) {
} }
}() }()
// TODO: check handling of multiple SIGHUP
<-stopLisChan <-stopLisChan
ear.Close() ear.Close()
} }
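For context, arthur talks to merlin over the unix socket configured as sock_path. Below is a minimal client sketch (not part of this commit) that asks for the repo status; the socket path and the literal "status" command are assumptions here, since the actual wire format is whatever GetCommand parses, and merlin closes the connection after replying.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

func main() {
	// hypothetical sock_path; use whatever sock_path is set to in the merlin config
	conn, err := net.Dial("unix", "/run/merlin/arthur.sock")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// assumed command string; GetCommand defines the real format
	if _, err := conn.Write([]byte("status")); err != nil {
		panic(err)
	}

	// merlin closes the connection after replying, so ReadAll returns the full status
	reply, err := io.ReadAll(conn)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(reply))
}
```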

View File

@ -11,6 +11,7 @@ import (
"git.csclub.uwaterloo.ca/public/merlin/logger" "git.csclub.uwaterloo.ca/public/merlin/logger"
) )
// Test that GetCommand is able to accept a connection and parse a request for the status
func TestStatusCommand(t *testing.T) { func TestStatusCommand(t *testing.T) {
r, w := net.Pipe() r, w := net.Pipe()
@ -28,6 +29,7 @@ func TestStatusCommand(t *testing.T) {
} }
} }
// Test that GetCommand is able to accept a connection and parse a request for a forced sync
func TestSyncCommand(t *testing.T) { func TestSyncCommand(t *testing.T) {
r, w := net.Pipe() r, w := net.Pipe()
@ -44,6 +46,7 @@ func TestSyncCommand(t *testing.T) {
} }
} }
// Test that SendStatus returns the correct status for some repo state
func TestSendStatus(t *testing.T) { func TestSendStatus(t *testing.T) {
saveRepoMap := config.RepoMap saveRepoMap := config.RepoMap
defer func() { defer func() {
@ -94,6 +97,7 @@ lnux Mon, 20 Dec 2021 06:33:20 EST Tue, 21 Dec 2021 06:33:20 EST
} }
} }
// Test that ForceSync behaves properly
func TestForceSync(t *testing.T) { func TestForceSync(t *testing.T) {
saveRepos := config.Repos saveRepos := config.Repos
saveRepoMap := config.RepoMap saveRepoMap := config.RepoMap
@ -194,6 +198,7 @@ func TestForceSync(t *testing.T) {
} }
} }
// Test that StartListener creates the unix socket and stops properly when told to
func TestStartListener(t *testing.T) { func TestStartListener(t *testing.T) {
saveConf := config.Conf saveConf := config.Conf
connChan := make(chan net.Conn) connChan := make(chan net.Conn)
@ -278,6 +283,5 @@ func TestStartListener(t *testing.T) {
}() }()
sendMsg("$UPz2L2yWsE^UY8iG9JX@^dBb@5yb*") sendMsg("$UPz2L2yWsE^UY8iG9JX@^dBb@5yb*")
// unsure why I can't put this in the defer
os.Remove("/tmp/merlin_listener_test.sock") os.Remove("/tmp/merlin_listener_test.sock")
} }

View File

@ -1,6 +1,7 @@
package config package config
import ( import (
"fmt"
"os" "os"
"path/filepath" "path/filepath"
@ -22,10 +23,10 @@ const (
// could change this into a default_config // could change this into a default_config
DEFAULT_MAX_JOBS = 6 DEFAULT_MAX_JOBS = 6
DEFAULT_MAX_TIME = DAILY / 4 DEFAULT_MAX_TIME = DAILY / 4
DEFAULT_MAX_RSYNC_IO = -1
DEFAULT_SYNC_TYPE = "csc-sync-standard" DEFAULT_SYNC_TYPE = "csc-sync-standard"
DEFAULT_FREQUENCY_STRING = "by-hourly" DEFAULT_FREQUENCY_STRING = "by-hourly"
DEFAULT_DOWNLOAD_DIR = "/mirror/root" DEFAULT_DOWNLOAD_DIR = "/mirror/root"
DEFAULT_PASSWORD_DIR = "/home/mirror/passwords"
DEFAULT_STATE_DIR = "/home/mirror/merlin/states" DEFAULT_STATE_DIR = "/home/mirror/merlin/states"
DEFAULT_LOG_DIR = "/home/mirror/merlin/logs" DEFAULT_LOG_DIR = "/home/mirror/merlin/logs"
DEFAULT_RSYNC_LOG_DIR = "/home/mirror/merlin/logs-rsync" DEFAULT_RSYNC_LOG_DIR = "/home/mirror/merlin/logs-rsync"
@ -58,34 +59,34 @@ type SyncResult struct {
} }
type Config struct { type Config struct {
// the maximum number of jobs allowed to execute concurrently
MaxJobs int `ini:"max_jobs"`
// the IP addresses to use for rsync // the IP addresses to use for rsync
IPv4Address string `ini:"ipv4_address"` IPv4Address string `ini:"ipv4_address"`
IPv6Address string `ini:"ipv6_address"` IPv6Address string `ini:"ipv6_address"`
// the maximum number of jobs allowed to execute concurrently
MaxJobs int `ini:"max_jobs"`
// the default maximum time before killing an rsync process
DefaultMaxTime int `ini:"default_max_time"`
// the default value for the maximum bandwidth a repo can use while syncing
// (set to -1 for unlimited)
DefaultMaxRsyncIO int `ini:"default_max_rsync_io"`
// the default sync type // the default sync type
DefaultSyncType string `ini:"default_sync_type"` DefaultSyncType string `ini:"default_sync_type"`
// the default frequency string // the default sync frequency string
DefaultFrequencyStr string `ini:"default_frequency"` DefaultFrequencyStr string `ini:"default_frequency"`
// the default MaxTime // directory where rsync should download files
DefaultMaxTime int `ini:"default_max_time"`
// the directory where rsync should download files
DownloadDir string `ini:"download_dir"` DownloadDir string `ini:"download_dir"`
// the directory where rsync passwords are stored // directory where the state of each repo is saved
PasswordDir string `ini:"password_dir"`
// the directory where the state of each repo sync is saved
StateDir string `ini:"states_dir"` StateDir string `ini:"states_dir"`
// the directory where merlin will store the general logs for each repo // directory where merlin will store the merlin logs for each repo
RepoLogDir string `ini:"repo_logs_dir"` RepoLogDir string `ini:"repo_logs_dir"`
// the directory to store the rsync logs for each repo // directory to store the rsync logs for each repo
RsyncLogDir string `ini:"rsync_logs_dir"` RsyncLogDir string `ini:"rsync_logs_dir"`
// the directory to store the zfssync logs for each repo // directory to store the zfssync logs for each repo
ZfssyncLogDir string `ini:"zfssync_logs_dir"` ZfssyncLogDir string `ini:"zfssync_logs_dir"`
// the Unix socket path which arthur will use to communicate with us // path to the unix socket for arthur to use for communication
SockPath string `ini:"sock_path"` SockPath string `ini:"sock_path"`
} }
// make it clearer when a full path should be used vs when just the file name is needed
type Repo struct { type Repo struct {
// the name of this repo // the name of this repo
Name string `ini:"-"` Name string `ini:"-"`
@ -98,27 +99,29 @@ type Repo struct {
// the maximum time (in seconds) that each child process of this repo // the maximum time (in seconds) that each child process of this repo
// can run for before being killed // can run for before being killed
MaxTime int `ini:"max_time"` MaxTime int `ini:"max_time"`
// where to download the files for this repo (relative to the download // limit the amount of bandwidth a repo can use while syncing
// dir in the config) // (set to -1 for unlimited)
MaxRsyncIO int `ini:"max_rsync_io"`
// where to download the files for this repo (relative to Conf.DownloadDir)
LocalDir string `ini:"local_dir"` LocalDir string `ini:"local_dir"`
// the remote host to rsync from // the address of the remote host to rsync from
RsyncHost string `ini:"rsync_host"` RsyncHost string `ini:"rsync_host"`
// the remote directory on the rsync host // the remote directory on the rsync host
RsyncDir string `ini:"rsync_dir"` RsyncDir string `ini:"rsync_dir"`
// the rsync user (optional) // the rsync user (optional)
RsyncUser string `ini:"rsync_user"` RsyncUser string `ini:"rsync_user"`
// the file storing the password for rsync (optional) // full path to the file storing the rsync password (optional)
PasswordFile string `ini:"password_file"` PasswordFile string `ini:"password_file"`
// the file storing the repo sync state (used to override default) // full path to the file storing the repo sync state
StateFile string `ini:"state_file"` StateFile string `ini:"-"`
// the full file path for general logging of this repo (used to override default) // full path to the file storing this repo's general logs
RepoLogFile string `ini:"repo_log_file"` RepoLogFile string `ini:"-"`
// a reference to the general logger // a pointer to the general logger
Logger *logger.Logger `ini:"-"` Logger *logger.Logger `ini:"-"`
// the full file path for logging this repo's rsync (used to override default) // full path to the file storing this repo's rsync logs
RsyncLogFile string `ini:"rsync_log_file"` RsyncLogFile string `ini:"-"`
// the full file path for logging this repo's zfssync (used to override default) // full path to the file storing this repo's zfssync logs
ZfssyncLogFile string `ini:"zfssync_log_file"` ZfssyncLogFile string `ini:"-"`
// the repo will write its name and status in a Result struct to DoneChan // the repo will write its name and status in a Result struct to DoneChan
// when it has finished a job (shared by all repos) // when it has finished a job (shared by all repos)
DoneChan chan<- SyncResult `ini:"-"` DoneChan chan<- SyncResult `ini:"-"`
@ -147,17 +150,16 @@ var (
RepoMap map[string]*Repo RepoMap map[string]*Repo
) )
// GetConfig reads the config from a JSON file, initializes default values, // LoadConfig initializes the default config values then overrides them with
// and initializes the non-configurable fields of each repo. // the values it reads from the INI config file
// It returns a Config.
func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struct{}) { func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struct{}) {
// set default values then load config from file // set default values then load config from file
newConf := Config{ newConf := Config{
MaxJobs: DEFAULT_MAX_JOBS, MaxJobs: DEFAULT_MAX_JOBS,
DefaultMaxTime: DEFAULT_MAX_TIME,
DefaultMaxRsyncIO: DEFAULT_MAX_RSYNC_IO,
DefaultSyncType: DEFAULT_SYNC_TYPE, DefaultSyncType: DEFAULT_SYNC_TYPE,
DefaultFrequencyStr: DEFAULT_FREQUENCY_STRING, DefaultFrequencyStr: DEFAULT_FREQUENCY_STRING,
DefaultMaxTime: DEFAULT_MAX_TIME,
PasswordDir: DEFAULT_PASSWORD_DIR,
DownloadDir: DEFAULT_DOWNLOAD_DIR, DownloadDir: DEFAULT_DOWNLOAD_DIR,
StateDir: DEFAULT_STATE_DIR, StateDir: DEFAULT_STATE_DIR,
RepoLogDir: DEFAULT_LOG_DIR, RepoLogDir: DEFAULT_LOG_DIR,
@ -170,18 +172,23 @@ func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struc
err = iniInfo.MapTo(&newConf) err = iniInfo.MapTo(&newConf)
panicIfErr(err) panicIfErr(err)
// check config for major errors // check newConf for possible configuration errors
for _, dir := range []string{newConf.StateDir, newConf.RepoLogDir, newConf.RsyncLogDir, newConf.ZfssyncLogDir} {
err := os.MkdirAll(dir, 0755)
panicIfErr(err)
}
if newConf.IPv4Address == "" { if newConf.IPv4Address == "" {
panic("Missing IPv4 address from config") panic("Missing IPv4 address from config")
} else if newConf.IPv6Address == "" { } else if newConf.IPv6Address == "" {
panic("Missing IPv6 address from config") panic("Missing IPv6 address from config")
} else if _, check := frequencies[newConf.DefaultFrequencyStr]; !check {
panic(fmt.Errorf("%s is not a valid frequency", newConf.DefaultFrequencyStr))
} else if _, err := os.Stat(newConf.DownloadDir); os.IsNotExist(err) {
panic(fmt.Errorf("the path %s does not exist", newConf.DownloadDir))
}
for _, dir := range []string{newConf.StateDir, newConf.RepoLogDir, newConf.RsyncLogDir, newConf.ZfssyncLogDir} {
err := os.MkdirAll(dir, 0755)
panicIfErr(err)
} }
newRepos := make([]*Repo, 0) newRepos := make([]*Repo, 0)
check := false
for _, section := range iniInfo.Sections() { for _, section := range iniInfo.Sections() {
repoName := section.Name() repoName := section.Name()
if repoName == "DEFAULT" { if repoName == "DEFAULT" {
@ -189,13 +196,13 @@ func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struc
} }
// set the default values for the repo then load from file // set the default values for the repo then load from file
// TODO: check if local_dir and repoName are always the same value
// TODO: check to ensure that every Repo.Name is unique (may already be done by ini)
repo := Repo{ repo := Repo{
Name: repoName, Name: repoName,
SyncType: newConf.DefaultSyncType, SyncType: newConf.DefaultSyncType,
FrequencyStr: newConf.DefaultFrequencyStr, FrequencyStr: newConf.DefaultFrequencyStr,
MaxTime: newConf.DefaultMaxTime, MaxTime: newConf.DefaultMaxTime,
MaxRsyncIO: newConf.DefaultMaxRsyncIO,
LocalDir: repoName,
StateFile: filepath.Join(newConf.StateDir, repoName), StateFile: filepath.Join(newConf.StateDir, repoName),
RepoLogFile: filepath.Join(newConf.RepoLogDir, repoName) + ".log", RepoLogFile: filepath.Join(newConf.RepoLogDir, repoName) + ".log",
RsyncLogFile: filepath.Join(newConf.RsyncLogDir, repoName) + "-rsync.log", RsyncLogFile: filepath.Join(newConf.RsyncLogDir, repoName) + "-rsync.log",
@ -206,21 +213,33 @@ func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struc
err := section.MapTo(&repo) err := section.MapTo(&repo)
panicIfErr(err) panicIfErr(err)
// TODO: ensure that the parent dirs to the file also exist when touching // checks for validity of repo configuration
// or just remove the ability to override repo.Frequency, check = frequencies[repo.FrequencyStr]
if !check {
panic("Missing or invalid frequency for " + repo.Name)
} else if repo.SyncType == "" {
panic("Missing sync type from " + repo.Name)
} else if repo.LocalDir == "" {
panic("Missing local download location for " + repo.Name)
} else if repo.RsyncHost == "" {
panic("Missing rsync host for " + repo.Name)
}
localDirFull := filepath.Join(newConf.DownloadDir, repo.LocalDir)
if _, err := os.Stat(localDirFull); os.IsNotExist(err) {
panic("the path " + localDirFull + " does not exist")
} else if repo.PasswordFile != "" {
if _, err := os.Stat(repo.PasswordFile); os.IsNotExist(err) {
panic("the file " + repo.PasswordFile + " does not exist")
}
}
touchFiles( touchFiles(
repo.StateFile, repo.StateFile,
repo.RepoLogFile, repo.RepoLogFile,
repo.RsyncLogFile, repo.RsyncLogFile,
repo.ZfssyncLogFile, repo.ZfssyncLogFile,
) )
repo.Logger = logger.NewLogger(repo.Name, repo.RepoLogFile) repo.Logger = logger.NewLogger(repo.Name, repo.RepoLogFile)
repo.Frequency = frequencies[repo.FrequencyStr]
if repo.SyncType == "" {
panic("Missing sync type from " + repo.Name)
} else if repo.Frequency == 0 {
panic("Missing or invalid frequency for " + repo.Name)
}
repo.State = RepoState{ repo.State = RepoState{
IsRunning: false, IsRunning: false,
@ -236,7 +255,7 @@ func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struc
} }
if len(newRepos) == 0 { if len(newRepos) == 0 {
panic("No repos found in config") panic("no repos found in config")
} }
Conf = newConf Conf = newConf
@ -247,9 +266,8 @@ func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struc
} }
} }
// save the current state of the repo to a file // Save the current state of the repo to the state file
func (repo *Repo) SaveState() { func (repo *Repo) SaveState() {
// repo.Logger.Debug("Saving state")
state_cfg := ini.Empty() state_cfg := ini.Empty()
if err := ini.ReflectFrom(state_cfg, &repo.State); err != nil { if err := ini.ReflectFrom(state_cfg, &repo.State); err != nil {
repo.Logger.Error(err.Error()) repo.Logger.Error(err.Error())
@ -261,5 +279,4 @@ func (repo *Repo) SaveState() {
if _, err := state_cfg.WriteTo(file); err != nil { if _, err := state_cfg.WriteTo(file); err != nil {
repo.Logger.Error(err.Error()) repo.Logger.Error(err.Error())
} }
// repo.Logger.Debug("Saved state")
} }
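Putting the simplified scheme together: a merlin config is a single INI file with the global keys at the top and one section per repo, password_file is now a full path, and LoadConfig panics if the IP addresses are missing, the default frequency is unknown, or download_dir (or a repo's local_dir under it) does not exist. A minimal sketch assembled from the ini tags above and the test config later in this commit; the repo section, hostnames, and the zfssync/sock paths are illustrative:

```ini
; global settings; keys left out fall back to the defaults in config.go
max_jobs = 6
ipv4_address = 129.97.134.129
ipv6_address = 2620:101:f000:4901:c5c::129
default_sync_type = csc-sync-standard
default_frequency = daily
; -1 means no bandwidth limit
default_max_rsync_io = -1
default_max_time = 1000
; download_dir and each repo's local_dir under it must already exist
download_dir = /mirror/root
states_dir = /home/mirror/merlin/states
repo_logs_dir = /home/mirror/merlin/logs
rsync_logs_dir = /home/mirror/merlin/logs-rsync
; the two paths below are illustrative (their defaults are not shown in this hunk)
zfssync_logs_dir = /home/mirror/merlin/logs-zfssync
sock_path = /run/merlin/arthur.sock

; one section per repo; sync_type, frequency, max_time and max_rsync_io fall back
; to the default_* keys above, and local_dir defaults to the section name
[debian]
frequency = tri-hourly
max_rsync_io = 100
local_dir = debian
rsync_host = rsync.example.org
rsync_dir = debian
; password_file is optional and is now a full path rather than a name under password_dir
; password_file = /home/mirror/passwords/debian
```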

View File

@ -36,7 +36,6 @@ func TestLoadConfig(t *testing.T) {
doneChan := make(chan SyncResult) doneChan := make(chan SyncResult)
stopChan := make(chan struct{}) stopChan := make(chan struct{})
LoadConfig("config_test.ini", doneChan, stopChan) LoadConfig("config_test.ini", doneChan, stopChan)
// TODO: Fill out parts not part of the ini or state file
expectedConfig := Config{ expectedConfig := Config{
MaxJobs: 6, MaxJobs: 6,
IPv4Address: "129.97.134.129", IPv4Address: "129.97.134.129",
@ -44,8 +43,8 @@ func TestLoadConfig(t *testing.T) {
DefaultSyncType: "csc-sync-standard", DefaultSyncType: "csc-sync-standard",
DefaultFrequencyStr: "daily", DefaultFrequencyStr: "daily",
DefaultMaxTime: 1000, DefaultMaxTime: 1000,
DownloadDir: "/tmp/test-mirror", DefaultMaxRsyncIO: -1,
PasswordDir: "/home/mirror/passwords", DownloadDir: "test_files/download",
StateDir: "test_files", StateDir: "test_files",
RepoLogDir: "test_files/logs", RepoLogDir: "test_files/logs",
RsyncLogDir: "test_files/rsync", RsyncLogDir: "test_files/rsync",
@ -58,14 +57,15 @@ func TestLoadConfig(t *testing.T) {
FrequencyStr: "tri-hourly", FrequencyStr: "tri-hourly",
Frequency: 10800, Frequency: 10800,
MaxTime: 2000, MaxTime: 2000,
MaxRsyncIO: 100,
LocalDir: "eelinux", LocalDir: "eelinux",
RsyncHost: "rsync.releases.eelinux.ca", RsyncHost: "rsync.releases.eelinux.ca",
RsyncDir: "releases", RsyncDir: "releases",
StateFile: "test_files/eeelinux", StateFile: "test_files/eelinux",
RepoLogFile: "test_files/logs/eelinux.log", RepoLogFile: "test_files/logs/eelinux.log",
Logger: Repos[0].Logger, Logger: Repos[0].Logger,
RsyncLogFile: "test_files/rsync/eelinux.log", RsyncLogFile: "test_files/rsync/eelinux-rsync.log",
ZfssyncLogFile: "test_files/zfssync/eelinux.log", ZfssyncLogFile: "test_files/zfssync/eelinux-zfssync.log",
DoneChan: doneChan, DoneChan: doneChan,
StopChan: stopChan, StopChan: stopChan,
State: RepoState{ State: RepoState{
@ -81,6 +81,7 @@ func TestLoadConfig(t *testing.T) {
FrequencyStr: "daily", FrequencyStr: "daily",
Frequency: 86400, Frequency: 86400,
MaxTime: 1000, MaxTime: 1000,
MaxRsyncIO: -1,
LocalDir: "yoland-releases", LocalDir: "yoland-releases",
RsyncHost: "rsync.releases.yoland.io", RsyncHost: "rsync.releases.yoland.io",
RsyncDir: "releases", RsyncDir: "releases",

View File

@ -6,8 +6,7 @@ ipv6_address = 2620:101:f000:4901:c5c::129
; default_sync_type = csc-sync-standard ; default_sync_type = csc-sync-standard
default_frequency = daily default_frequency = daily
default_max_time = 1000 default_max_time = 1000
download_dir = /tmp/test-mirror download_dir = test_files/download
; password_dir = /home/mirror/passwords
states_dir = test_files states_dir = test_files
repo_logs_dir = test_files/logs repo_logs_dir = test_files/logs
rsync_logs_dir = test_files/rsync rsync_logs_dir = test_files/rsync
@ -18,13 +17,10 @@ sock_path = test_files/test.sock
sync_type = csc-sync-nonstandard sync_type = csc-sync-nonstandard
frequency = tri-hourly frequency = tri-hourly
max_time = 2000 max_time = 2000
max_rsync_io = 100
local_dir = eelinux local_dir = eelinux
rsync_host = rsync.releases.eelinux.ca rsync_host = rsync.releases.eelinux.ca
rsync_dir = releases rsync_dir = releases
state_file = test_files/eeelinux
repo_log_file = test_files/logs/eelinux.log
rsync_log_file = test_files/rsync/eelinux.log
zfssync_log_file = test_files/zfssync/eelinux.log
[yoland] [yoland]
; sync_type = csc-sync-standard ; sync_type = csc-sync-standard
@ -32,8 +28,4 @@ zfssync_log_file = test_files/zfssync/eelinux.log
; max_time = 1000 ; max_time = 1000
local_dir = yoland-releases local_dir = yoland-releases
rsync_host = rsync.releases.yoland.io rsync_host = rsync.releases.yoland.io
rsync_dir = releases rsync_dir = releases
; state_file = test_files/yoland
; repo_log_file = test_files/logs/yoland.log
; rsync_log_file = test_files/rsync/yoland-rsync.log
; zfssync_log_file = test_files/zfssync/yoland-zfssync.log

View File

@ -21,10 +21,6 @@ func touchFile(file string) {
f.Close() f.Close()
} else if fi.IsDir() { } else if fi.IsDir() {
panic(fmt.Errorf("%s is a directory", file)) panic(fmt.Errorf("%s is a directory", file))
// } else if os.Geteuid() != 1001 {
// // mirror is UID 1001
// err := os.Chown(file, 1001, os.Getegid())
// panicIfErr(err)
} }
err = os.Chmod(file, 0644) err = os.Chmod(file, 0644)
panicIfErr(err) panicIfErr(err)

View File

@ -30,14 +30,17 @@ var levels = map[int]string{
var outLogger = log.New(os.Stdout, "", log.LstdFlags) var outLogger = log.New(os.Stdout, "", log.LstdFlags)
var errLogger = log.New(os.Stderr, "", log.LstdFlags) var errLogger = log.New(os.Stderr, "", log.LstdFlags)
// log to stdout
func OutLog(v ...interface{}) { func OutLog(v ...interface{}) {
outLogger.Println(v...) outLogger.Println(v...)
} }
// log to stderr
func ErrLog(v ...interface{}) { func ErrLog(v ...interface{}) {
errLogger.Println(v...) errLogger.Println(v...)
} }
// initialize a logger
func NewLogger(name, file string) *Logger { func NewLogger(name, file string) *Logger {
logger := Logger{ logger := Logger{
Logger: log.New(os.Stderr, "", log.LstdFlags), Logger: log.New(os.Stderr, "", log.LstdFlags),
@ -47,7 +50,8 @@ func NewLogger(name, file string) *Logger {
return &logger return &logger
} }
func (logger *Logger) Log(level int, v ...interface{}) { // write a message at the given level to the log file
func (logger *Logger) log(level int, v ...interface{}) {
f, err := os.OpenFile(logger.file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) f, err := os.OpenFile(logger.file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil { if err != nil {
ErrLog(err.Error()) ErrLog(err.Error())
@ -62,22 +66,24 @@ func (logger *Logger) Log(level int, v ...interface{}) {
logger.Println(args) logger.Println(args)
} }
// write debug information to the logfile
func (logger *Logger) Debug(v ...interface{}) { func (logger *Logger) Debug(v ...interface{}) {
logger.Log(DEBUG, v...) logger.log(DEBUG, v...)
} }
// write information to the logfile and to stdout
func (logger *Logger) Info(v ...interface{}) { func (logger *Logger) Info(v ...interface{}) {
// src := []interface{}{logger.name + ":"} OutLog(append([]interface{}{"[" + logger.name + "]"}, v...))
// args := append(src, v...) logger.log(INFO, v...)
OutLog(v...)
logger.Log(INFO, v...)
} }
// write warnings to the logfile
func (logger *Logger) Warning(v ...interface{}) { func (logger *Logger) Warning(v ...interface{}) {
logger.Log(WARNING, v...) logger.log(WARNING, v...)
} }
// write errors to the logfile and to stderr
func (logger *Logger) Error(v ...interface{}) { func (logger *Logger) Error(v ...interface{}) {
ErrLog(append([]interface{}{"[" + logger.name + "]"}, v...)) ErrLog(append([]interface{}{"[" + logger.name + "]"}, v...))
logger.Log(ERROR, v...) logger.log(ERROR, v...)
} }
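The exported surface is unchanged apart from Log becoming the unexported log: callers create one logger per repo and use the level-specific methods. A small usage sketch; the repo name and log path are made up:

```go
package main

import "git.csclub.uwaterloo.ca/public/merlin/logger"

func main() {
	// hypothetical repo name and log file
	l := logger.NewLogger("eelinux", "/tmp/eelinux.log")

	l.Debug("starting sync")             // log file only
	l.Info("sync finished")              // log file + stdout
	l.Warning("rsync exited non-zero")   // log file only
	l.Error("could not open state file") // log file + stderr
}
```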

View File

@ -16,6 +16,8 @@ download_dir = /tmp/test-mirror
# add default frequency # add default frequency
# add default sync_type # add default sync_type
; [DEFAULT]
;
[debian] [debian]
@ -198,6 +200,7 @@ local_dir = gnome
rsync_host = master.gnome.org rsync_host = master.gnome.org
rsync_dir = gnomeftp rsync_dir = gnomeftp
; password_file = gnome ; password_file = gnome
; password_file = /home/mirror/passwords/gnome
[damnsmalllinux] [damnsmalllinux]
sync_type = csc-sync-standard sync_type = csc-sync-standard

View File

@ -34,8 +34,10 @@ func main() {
configPath := flag.String("config", DEFAULT_CONFIG_PATH, "alternate config file") configPath := flag.String("config", DEFAULT_CONFIG_PATH, "alternate config file")
flag.Parse() flag.Parse()
// check that merlin is run as mirror user // check the user the program is running under (should be mirror)
// check that mirror user has pid of 1001 if os.Getuid() == 0 {
panic("Merlin should not be run as root")
}
// receives a Result struct when a repo stops syncing // receives a Result struct when a repo stops syncing
doneChan := make(chan config.SyncResult) doneChan := make(chan config.SyncResult)
@ -63,6 +65,7 @@ func main() {
repoIdx = 0 repoIdx = 0
go arthur.StartListener(connChan, stopLisChan) go arthur.StartListener(connChan, stopLisChan)
} }
// We use a round-robin strategy. It's not the most efficient, but it's simple // We use a round-robin strategy. It's not the most efficient, but it's simple
// (read: easy to understand) and guarantees each repo will eventually get a chance to run. // (read: easy to understand) and guarantees each repo will eventually get a chance to run.
runAsManyAsPossible := func() { runAsManyAsPossible := func() {
@ -100,11 +103,13 @@ runLoop:
case <-reloadSig: case <-reloadSig:
stopLisChan <- struct{}{} stopLisChan <- struct{}{}
loadConfig() loadConfig()
// ensure that SyncCompleted can handle it if reloading config
// removes a repo that was already syncing
case done := <-doneChan: case done := <-doneChan:
sync.SyncCompleted(config.RepoMap[done.Name], done.Exit) // handle the case where a repo was removed from the config while it was still syncing
repo, check := config.RepoMap[done.Name]
if check {
sync.SyncCompleted(repo, done.Exit)
}
numJobsRunning-- numJobsRunning--
case conn := <-connChan: case conn := <-connChan:
@ -119,8 +124,7 @@ runLoop:
default: default:
arthur.SendAndLog(conn, "Received unrecognized command: "+command) arthur.SendAndLog(conn, "Received unrecognized command: "+command)
} }
// None of the arthur functions close the connection so you will need to // close the received connection so that the message is sent
// close it manually for the message to be sent
conn.Close() conn.Close()
case <-time.After(1 * time.Minute): case <-time.After(1 * time.Minute):
@ -132,7 +136,10 @@ runLoop:
for { for {
select { select {
case done := <-doneChan: case done := <-doneChan:
sync.SyncCompleted(config.RepoMap[done.Name], done.Exit) repo, check := config.RepoMap[done.Name]
if check {
sync.SyncCompleted(repo, done.Exit)
}
numJobsRunning-- numJobsRunning--
case <-time.After(1 * time.Second): case <-time.After(1 * time.Second):
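The scheduling comment above is worth spelling out: merlin walks the repo list round-robin, starting syncs until max_jobs are running, and resumes from the same index on the next pass so every repo eventually gets a turn. A toy sketch of that idea follows; it is not the real runAsManyAsPossible, and the repo names and readiness check are stand-ins:

```go
package main

import "fmt"

func main() {
	// hypothetical repo names; merlin iterates over config.Repos instead
	repos := []string{"debian", "gnome", "eelinux", "yoland"}
	const maxJobs = 2

	numJobsRunning, repoIdx := 0, 0
	dueForSync := func(name string) bool { return true } // stand-in for the real check

	// scan each repo at most once per pass, continuing from where the last pass stopped
	for scanned := 0; scanned < len(repos) && numJobsRunning < maxJobs; scanned++ {
		repo := repos[repoIdx]
		repoIdx = (repoIdx + 1) % len(repos)
		if dueForSync(repo) {
			fmt.Println("starting sync of", repo)
			numJobsRunning++
		}
	}
}
```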

View File

@ -3,6 +3,7 @@ package sync
import ( import (
"fmt" "fmt"
"os" "os"
"path/filepath"
"git.csclub.uwaterloo.ca/public/merlin/config" "git.csclub.uwaterloo.ca/public/merlin/config"
) )
@ -28,9 +29,8 @@ func buildRsyncSSHHost(repo *config.Repo) string {
return repo.RsyncHost + ":" + repo.RsyncDir return repo.RsyncHost + ":" + repo.RsyncDir
} }
// TODO: remove this (repo.LocalDir should be the full path)
func buildDownloadDir(repo *config.Repo) string { func buildDownloadDir(repo *config.Repo) string {
return config.Conf.DownloadDir + "/" + repo.LocalDir return filepath.Join(config.Conf.DownloadDir, repo.LocalDir)
} }
func cscSyncApache(repo *config.Repo) []string { func cscSyncApache(repo *config.Repo) []string {
@ -188,8 +188,7 @@ func cscSyncStandard(repo *config.Repo) []string {
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile, "--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
} }
if repo.PasswordFile != "" { if repo.PasswordFile != "" {
filename := config.Conf.PasswordDir + "/" + repo.PasswordFile args = append(args, "--password-file", repo.PasswordFile)
args = append(args, "--password-file", filename)
} }
args = append(args, buildRsyncHost(repo), buildDownloadDir(repo)) args = append(args, buildRsyncHost(repo), buildDownloadDir(repo))
@ -215,72 +214,56 @@ func cscSyncDummy(repo *config.Repo) []string {
// executes a particular sync job depending on repo.SyncType. // executes a particular sync job depending on repo.SyncType.
func getSyncCommand(repo *config.Repo) (args []string) { func getSyncCommand(repo *config.Repo) (args []string) {
/*
# scripts used by merlin.py
csc-sync-debian
csc-sync-standard
csc-sync-ssh
csc-sync-apache
csc-sync-archlinux
csc-sync-debian-cd
csc-sync-gentoo
csc-sync-wget
csc-sync-s3
csc-sync-standard-ipv6
csc-sync-ceph
zfssync
report_mirror (what is this?)
# other things in bin/
csc-sync-archlinux-old
csc-sync-cdimage
csc-sync-chmod
csc-sync-badperms
make-torrents
ubuntu-releases-sync
*/
if repo.SyncType == "csc-sync-dummy" { if repo.SyncType == "csc-sync-dummy" {
return cscSyncDummy(repo) return cscSyncDummy(repo)
} }
localDir := buildDownloadDir(repo) // check that the download directory exists
if err := os.MkdirAll(localDir, 0775); err != nil { if _, err := os.Stat(buildDownloadDir(repo)); os.IsNotExist(err) {
repo.Logger.Error(err.Error()) repo.Logger.Error(err.Error())
return return
} }
switch repo.SyncType { switch repo.SyncType {
case "csc-sync-apache": case "csc-sync-apache":
return cscSyncApache(repo) args = cscSyncApache(repo)
case "csc-sync-archlinux": case "csc-sync-archlinux":
return cscSyncArchLinux(repo) args = cscSyncArchLinux(repo)
case "csc-sync-badperms": case "csc-sync-badperms":
return cscSyncBadPerms(repo) args = cscSyncBadPerms(repo)
case "csc-sync-cdimage": case "csc-sync-cdimage":
return cscSyncCDImage(repo) args = cscSyncCDImage(repo)
case "csc-sync-ceph": case "csc-sync-ceph":
return cscSyncCeph(repo) args = cscSyncCeph(repo)
case "csc-sync-chmod": case "csc-sync-chmod":
return cscSyncChmod(repo) args = cscSyncChmod(repo)
case "csc-sync-debian": case "csc-sync-debian":
return cscSyncDebian(repo) args = cscSyncDebian(repo)
case "csc-sync-debian-cd": case "csc-sync-debian-cd":
return cscSyncDebianCD(repo) args = cscSyncDebianCD(repo)
case "csc-sync-gentoo": case "csc-sync-gentoo":
return cscSyncGentoo(repo) args = cscSyncGentoo(repo)
case "csc-sync-s3": case "csc-sync-s3":
return cscSyncS3(repo) args = cscSyncS3(repo)
case "csc-sync-ssh": case "csc-sync-ssh":
return cscSyncSSH(repo) args = cscSyncSSH(repo)
case "csc-sync-standard": case "csc-sync-standard":
return cscSyncStandard(repo) args = cscSyncStandard(repo)
case "csc-sync-standard-ipv6": case "csc-sync-standard-ipv6":
return cscSyncStandardIPV6(repo) args = cscSyncStandardIPV6(repo)
// case "csc-sync-wget": // case "csc-sync-wget":
// return cscSyncWget(repo) // return cscSyncWget(repo)
default: default:
repo.Logger.Error("Unrecognized sync type", "'"+repo.SyncType+"'") repo.Logger.Error("Unrecognized sync type", "'"+repo.SyncType+"'")
return
} }
if repo.MaxRsyncIO >= 0 {
args = append(args, fmt.Sprintf("--bwlimit=%d", repo.MaxRsyncIO))
}
// TODO: remove buildDownloadDir and check what cscSyncDebian and co are about
// (the ones that do not append buildDownloadDir at the end)
args = append(args, buildDownloadDir(repo))
return return
} }
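With the switch now collecting args instead of returning early, the per-repo bandwidth cap is applied in one place at the end. A standalone sketch of that rule; the base rsync flags here are illustrative, not the exact ones merlin builds:

```go
package main

import "fmt"

// appendBwlimit mirrors the new logic at the end of getSyncCommand: a
// non-negative max_rsync_io becomes an rsync --bwlimit flag, while the
// default of -1 leaves rsync unthrottled.
func appendBwlimit(args []string, maxRsyncIO int) []string {
	if maxRsyncIO >= 0 {
		return append(args, fmt.Sprintf("--bwlimit=%d", maxRsyncIO))
	}
	return args
}

func main() {
	base := []string{"rsync", "-aH", "--delete"}
	fmt.Println(appendBwlimit(base, 100)) // [rsync -aH --delete --bwlimit=100]
	fmt.Println(appendBwlimit(base, -1))  // [rsync -aH --delete]
}
```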

View File

@ -53,11 +53,7 @@ func SyncCompleted(repo *config.Repo, exit int) {
repo.SaveState() repo.SaveState()
repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync)) repo.Logger.Info(fmt.Sprintf("Sync "+exitStr+" after running for %d seconds, will run again in %d seconds", syncTook, nextSync))
// We are no longer creating snapshots so we don't need the zfssync script anymore if exit == config.SUCCESS {
go zfsSync(repo)
// // TODO: make it possible for this SyncCompleted to be run without zfsSync being run }
// if exit == config.SUCCESS {
// // it is possible that the zfssync from the last repo sync is still running is that fine?
// go zfsSync(repo)
// }
} }

View File

@ -66,7 +66,6 @@ func spawnSyncProcess(repo *config.Repo, args []string) (ch <-chan *exec.Cmd) {
case <-cmdDoneChan: case <-cmdDoneChan:
if !cmd.ProcessState.Success() { if !cmd.ProcessState.Success() {
repo.Logger.Warning("Process ended with status code", cmd.ProcessState.ExitCode()) repo.Logger.Warning("Process ended with status code", cmd.ProcessState.ExitCode())
// repo.Logger.Info(strings.Join(args, " "))
} }
case <-repo.StopChan: case <-repo.StopChan:
@ -111,16 +110,17 @@ func startSync(repo *config.Repo) {
} }
} }
// func zfsSync(repo *config.Repo) { // we are not using zfs snapshots at the moment
// out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput() func zfsSync(repo *config.Repo) {
// if err != nil { // out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput()
// repo.Logger.Error(err) // if err != nil {
// } else { // repo.Logger.Error(err)
// f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) // } else {
// if err != nil { // f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
// repo.Logger.Error(err.Error()) // if err != nil {
// } else { // repo.Logger.Error(err.Error())
// f.Write(out) // } else {
// } // f.Write(out)
// } // }
// } // }
}