This commit is contained in:
koitu 2021-10-23 23:14:41 -04:00
parent db2815029d
commit 2833a62378
3 changed files with 153 additions and 81 deletions

View File

@ -1,7 +1,6 @@
package common
import (
"fmt"
"os"
"time"
@ -23,6 +22,7 @@ const (
DEFAULT_MAX_JOBS = 6
DEFAULT_MAX_TIME = DAILY / 4
DEFAULT_SOCK_PATH = "/run/merlin/merlin.sock"
DEFAULT_STATE_PATH = "/home/mirror/merlin/states"
DEFAULT_LOG_FILE = "/home/mirror/merlin/logs/transfer.log"
DEFAULT_PASSWORD_DIR = "/home/mirror/passwords"
DEFAULT_DOWNLOAD_DIR = "/mirror/root"
@ -43,7 +43,7 @@ var frequencies = map[string]int{
const (
NOT_RUN_YET = iota
SUCCESS
FAILURE
FAILURE // change to TIMEOUT?
TERMINATED // was killed by a signal
)
@ -78,14 +78,10 @@ type Repo struct {
// the repo will write its name to DoneChan when it has finished a job
// (successfully or unsuccessfully)
DoneChan chan<- string `ini:"-"`
// the Unix epoch timestamp at which this repo last attempted a job
LastAttemptTime int64 `ini:"-"`
// whether the last attempt was successful or not
LastAttemptStatus int `ini:"-"`
// a reference to the global config
cfg *Config
// whether this repo is running a job or not
isRunning bool
cfg *Config `ini:"-"`
// a struct that stores the repo's status
state State `ini:"-"`
}
type Config struct {
@ -95,6 +91,8 @@ type Config struct {
MaxTime int `ini:"max_time"`
// the Unix socket path which arthur will use to communicate with us
SockPath string `ini:"sock_path"`
// the path to where the state of each repo's sync is saved
StatePath string `ini:"states_path"`
// the default LogFile for each repo
LogFile string `ini:"log_file"`
// the IP addresses to use for rsync
@ -110,68 +108,21 @@ type Config struct {
Repos []*Repo `ini:"-"`
}
type State struct {
// these are stored in the states folder
// the Unix epoch timestamp at which this repo last attempted a job
LastAttemptTime int64 `ini:"last_attempt_time"`
// the number of seconds this repo ran for during its last attempted job
LastAttemptRunTime int64 `ini:"last_attempt_runtime"`
// whether the last attempt was successful or not
LastAttemptStatus int `ini:"last_attempt_status"`
// whether this repo is running a job or not
isRunning bool `ini:"is_running"`
}
// IsRunning returns true if the repo is currently running a sync job.
func (repo *Repo) IsRunning() bool {
return repo.isRunning
}
// CSCSyncStandard performs a standard rsync job.
func (repo *Repo) CSCSyncStandard() {
startTime := time.Now().Unix()
status := FAILURE
defer func() {
repo.LastAttemptTime = startTime
repo.LastAttemptStatus = status
repo.isRunning = false
repo.DoneChan <- repo.Name
}()
localDir := repo.cfg.DownloadDir + "/" + repo.LocalDir
err := os.MkdirAll(localDir, 0775)
if err != nil {
err = fmt.Errorf("Could not create directory %s: %w", localDir, err)
repo.Logger.Error(err)
return
}
rsyncHost := repo.RsyncHost
if repo.RsyncUser != "" {
rsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
}
address := repo.cfg.IPv4Address
logFile := repo.LogFile
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
"--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + address,
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + logFile,
}
if repo.PasswordFile != "" {
filename := repo.cfg.PasswordDir + "/" + repo.PasswordFile
args = append(args, "--password-file", filename)
}
args = append(args, "rsync://"+rsyncHost+"/"+repo.RsyncDir, localDir)
ch := SpawnProcess(repo, args)
if ch == nil {
return
}
cmd := <-ch
switch cmd.ProcessState.ExitCode() {
case 0:
status = SUCCESS
case -1:
status = TERMINATED
// default is already FAILURE
}
}
// StartSyncJob executes a particular sync job depending on repo.SyncType.
func (repo *Repo) StartSyncJob() {
switch repo.SyncType {
case "csc-sync-standard":
repo.CSCSyncStandard()
default:
repo.Logger.Error("Unrecognized sync type", "'"+repo.SyncType+"'")
}
return repo.state.isRunning
}
// RunIfScheduled starts a sync job for this repo if more than repo.Frequency
@ -179,13 +130,13 @@ func (repo *Repo) StartSyncJob() {
// It returns true iff a job is started.
func (repo *Repo) RunIfScheduled() bool {
// sanity check; don't run if a job is already running
if repo.isRunning {
if repo.state.isRunning {
return false
}
if time.Now().Unix()-repo.LastAttemptTime > int64(repo.Frequency) {
if time.Now().Unix()-repo.state.LastAttemptTime > int64(repo.Frequency) {
// this should be set in the caller's thread so that the check
// above will always work
repo.isRunning = true
repo.state.isRunning = true
go repo.StartSyncJob()
return true
}
@ -204,20 +155,24 @@ func GetConfig() Config {
MaxJobs: DEFAULT_MAX_JOBS,
MaxTime: DEFAULT_MAX_TIME,
SockPath: DEFAULT_SOCK_PATH,
StatePath: DEFAULT_STATE_PATH,
LogFile: DEFAULT_LOG_FILE,
PasswordDir: DEFAULT_PASSWORD_DIR,
DownloadDir: DEFAULT_DOWNLOAD_DIR,
Repos: make([]*Repo, 0),
Repos: make([]*Repo, 0),
}
err = data.MapTo(&cfg)
if err != nil {
if err := data.MapTo(&cfg); err != nil {
panic(err)
}
if cfg.IPv4Address == "" {
if err := os.MkdirAll(cfg.StatePath, 0755); err != nil {
panic("Could not create states path at " + cfg.StatePath)
} else if cfg.IPv4Address == "" {
panic("Missing IPv4 address from config")
} else if cfg.IPv6Address == "" {
panic("Missing IPv6 address from config")
}
doneChan := make(chan string)
cfg.DoneChan = doneChan
for _, section := range data.Sections() {
@ -225,10 +180,10 @@ func GetConfig() Config {
continue
}
repo := Repo{Name: section.Name()}
err = section.MapTo(&repo)
if err != nil {
if err := section.MapTo(&repo); err != nil {
panic(err)
}
repo.Frequency = frequencies[repo.FrequencyStr]
if repo.MaxTime == 0 {
repo.MaxTime = cfg.MaxTime
@ -237,9 +192,17 @@ func GetConfig() Config {
repo.DoneChan = doneChan
repo.StopChan = make(chan bool, 1)
repo.cfg = &cfg
// TODO: save and restore LastAttempt info (from stamps directory)
repo.LastAttemptTime = 0
repo.LastAttemptStatus = NOT_RUN_YET
repo.state = State{
LastAttemptTime: 0,
LastAttemptRunTime: 0,
LastAttemptStatus: NOT_RUN_YET,
isRunning: false,
}
if err := ini.MapTo(&repo.state, cfg.StatePath+"/"+repo.Name); err != nil {
panic(err)
}
cfg.Repos = append(cfg.Repos, &repo)
}
if len(cfg.Repos) == 0 {

105
merlin/common/sync.go Normal file
View File

@ -0,0 +1,105 @@
package common
import (
"fmt"
"os"
"time"
ini "gopkg.in/ini.v1"
)
// CSCSyncStandard performs a standard rsync job.
func (repo *Repo) CSCSyncStandard() {
startTime := time.Now().Unix()
status := FAILURE
defer func() {
repo.state.LastAttemptTime = startTime
repo.state.LastAttemptRunTime = time.Now().Unix() - startTime
repo.state.LastAttemptStatus = status
repo.state.isRunning = false
cfg := ini.Empty()
if err := ini.ReflectFrom(cfg, &repo.state); err != nil {
// log error
}
file, err := os.OpenFile(repo.cfg.StatePath+"/"+repo.Name, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
// log error
}
if _, err := cfg.WriteTo(file); err != nil {
// log error
}
repo.DoneChan <- repo.Name
}()
localDir := repo.cfg.DownloadDir + "/" + repo.LocalDir
err := os.MkdirAll(localDir, 0775)
if err != nil {
err = fmt.Errorf("Could not create directory %s: %w", localDir, err)
repo.Logger.Error(err)
return
}
rsyncHost := repo.RsyncHost
if repo.RsyncUser != "" {
rsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
}
address := repo.cfg.IPv4Address
logFile := repo.LogFile
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
"--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + address,
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + logFile,
}
if repo.PasswordFile != "" {
filename := repo.cfg.PasswordDir + "/" + repo.PasswordFile
args = append(args, "--password-file", filename)
}
args = append(args, "rsync://"+rsyncHost+"/"+repo.RsyncDir, localDir)
ch := SpawnProcess(repo, args)
if ch == nil {
return
}
cmd := <-ch
switch cmd.ProcessState.ExitCode() {
case 0:
status = SUCCESS
case -1:
status = TERMINATED
// default is already FAILURE
}
}
// StartSyncJob executes a particular sync job depending on repo.SyncType.
func (repo *Repo) StartSyncJob() {
switch repo.SyncType {
/*
# scripts used by merlin.py
csc-sync-debian
csc-sync-standard
csc-sync-ssh
csc-sync-apache
csc-sync-archlinux
csc-sync-debian-cd
csc-sync-gentoo
csc-sync-wget
csc-sync-s3
csc-sync-standard-ipv6
csc-sync-ceph
zfssync
report_mirror (what is this?)
# other things in bin/
csc-sync-archlinux-old
csc-sync-cdimage
csc-sync-chmod
csc-sync-badperms
make-torrents
ubuntu-releases-sync
*/
case "csc-sync-standard":
repo.CSCSyncStandard()
default:
repo.Logger.Error("Unrecognized sync type", "'"+repo.SyncType+"'")
}
}

4
merlin/ubuntu-releases Normal file
View File

@ -0,0 +1,4 @@
; move to states folder
last_run = 0
runtime = 0
status = 0