add trace for debian and archlinux

This commit is contained in:
Andrew Wang 2022-06-04 18:45:10 -04:00
parent 1a6a3f01f4
commit f06600c228
11 changed files with 704 additions and 409 deletions

View File

@ -23,11 +23,12 @@ const (
// could change this into a default_config
DEFAULT_MAX_JOBS = 6
DEFAULT_MAX_TIME = DAILY / 4
DEFAULT_MAX_RSYNC_IO = -1
DEFAULT_MAX_RSYNC_IO = 0
DEFAULT_SYNC_TYPE = "csc-sync-standard"
DEFAULT_FREQUENCY_STRING = "by-hourly"
DEFAULT_DOWNLOAD_DIR = "/mirror/root"
DEFAULT_STATE_DIR = "/home/mirror/merlin/states"
DEFAULT_TRACE_DIR = "/home/mirror/merlin/trace"
DEFAULT_STATE_DIR = "/home/mirror/merlin/state"
DEFAULT_LOG_DIR = "/home/mirror/merlin/logs"
DEFAULT_RSYNC_LOG_DIR = "/home/mirror/merlin/logs-rsync"
DEFAULT_ZFSSYNC_LOG_DIR = "/home/mirror/merlin/logs-zfssync"
@ -75,8 +76,10 @@ type Config struct {
DefaultFrequencyStr string `ini:"default_frequency"`
// directory where rsync should download files
DownloadDir string `ini:"download_dir"`
// directory where the trace of each repo is saved
TraceDir string `ini:"trace_dir"`
// directory where the state of each repo is saved
StateDir string `ini:"states_dir"`
StateDir string `ini:"state_dir"`
// directory where merlin will store the merlin logs for each repo
RepoLogDir string `ini:"repo_logs_dir"`
// directory to store the rsync logs for each repo
@ -100,13 +103,15 @@ type Repo struct {
// can for before being killed
MaxTime int `ini:"max_time"`
// limit the amount of bandwidth a repo can use while syncing
// (set to -1 for unlimited)
// (set to 0 to disable the limit) (unit is KiB)
MaxRsyncIO int `ini:"max_rsync_io"`
// where to download the files for this repo (relative to Conf.DownloadDir)
LocalDir string `ini:"local_dir"`
// the address to the trace file (how this url will be used depends on SyncType)
TraceUrl string `ini:"trace_url"`
// the address to the remote host to rsync from
RsyncHost string `ini:"rsync_host"`
// the remote directory on the rsync host
// the remote directory on the rsync host (optional)
RsyncDir string `ini:"rsync_dir"`
// the rsync user (optional)
RsyncUser string `ini:"rsync_user"`
@ -161,6 +166,7 @@ func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struc
DefaultSyncType: DEFAULT_SYNC_TYPE,
DefaultFrequencyStr: DEFAULT_FREQUENCY_STRING,
DownloadDir: DEFAULT_DOWNLOAD_DIR,
TraceDir: DEFAULT_TRACE_DIR,
StateDir: DEFAULT_STATE_DIR,
RepoLogDir: DEFAULT_LOG_DIR,
RsyncLogDir: DEFAULT_RSYNC_LOG_DIR,
@ -182,7 +188,13 @@ func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struc
} else if _, err := os.Stat(newConf.DownloadDir); os.IsNotExist(err) {
panic(fmt.Errorf("the path %s does not exist", newConf.DownloadDir))
}
for _, dir := range []string{newConf.StateDir, newConf.RepoLogDir, newConf.RsyncLogDir, newConf.ZfssyncLogDir} {
for _, dir := range []string{
newConf.TraceDir,
newConf.StateDir,
newConf.RepoLogDir,
newConf.RsyncLogDir,
newConf.ZfssyncLogDir,
} {
err := os.MkdirAll(dir, 0755)
panicIfErr(err)
}

View File

@ -43,7 +43,7 @@ func TestLoadConfig(t *testing.T) {
DefaultSyncType: "csc-sync-standard",
DefaultFrequencyStr: "daily",
DefaultMaxTime: 1000,
DefaultMaxRsyncIO: -1,
DefaultMaxRsyncIO: 0,
DownloadDir: "test_files/download",
StateDir: "test_files",
RepoLogDir: "test_files/logs",
@ -81,7 +81,7 @@ func TestLoadConfig(t *testing.T) {
FrequencyStr: "daily",
Frequency: 86400,
MaxTime: 1000,
MaxRsyncIO: -1,
MaxRsyncIO: 0,
LocalDir: "yoland-releases",
RsyncHost: "rsync.releases.yoland.io",
RsyncDir: "releases",

View File

@ -7,7 +7,7 @@ ipv6_address = 2620:101:f000:4901:c5c::129
default_frequency = daily
default_max_time = 1000
download_dir = test_files/download
states_dir = test_files
state_dir = test_files
repo_logs_dir = test_files/logs
rsync_logs_dir = test_files/rsync
zfssync_logs_dir = test_files/zfssync

View File

@ -1,3 +1,5 @@
github.com/andelf/go-curl v0.0.0-20200630032108-fd49ff24ed97 h1:Nyfs+rh56aORy2tGMI9GCYEqTfePwL1v47qOzebfv/o=
github.com/andelf/go-curl v0.0.0-20200630032108-fd49ff24ed97/go.mod h1:WO1d2m1QDzkoPcgn9lgHVMi7qQR5j3jxYjIIvMTHpC0=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=

View File

@ -3,7 +3,7 @@ max_jobs = 6
sock_path = /run/merlin/merlin.sock
states_path = /home/test-mirror/merlin/states
# log_path? (just spes a dir and make files)
; log_path? (just spes a dir and make files)
log_file = /tmp/test-mirror/test.log
ipv4_address = 129.97.134.129
@ -12,9 +12,9 @@ download_dir = /tmp/test-mirror
; 5 hours
; default_max_time = 18000
# add default max_time
# add default frequency
# add default sync_type
; add default max_time
; add default frequency
; add default sync_type
; [DEFAULT]
;
@ -125,10 +125,11 @@ rsync_dir = CPAN
sync_type = csc-sync-ssh
frequency = twice-daily
local_dir = CRAN
rsync_host = cran.r-project.org
rsync_dir = ""
; rsync_user = cran-rsync ~/.ssh/id_cran_rsa
; password_file = ~/.ssh/id_cran_rsa
rsync_host = cran.r-project.org
; address is $RSYNC_HOST:$RSYNC_DIR/ (and $RSYNC_DIR="") but rsync_dir should not be empty
rsync_dir = /
rsync_user = cran-rsync
password_file = ~/.ssh/id_cran_rsa
[CTAN]
sync_type = csc-sync-standard
@ -260,11 +261,15 @@ rsync_dir = applicationdata
; We are a Tier 1 arch mirror (https://bugs.archlinux.org/task/52853)
; so our IP is important.
; our rsync_host is rsync://rsync.archlinux.org/ftp_tier1
[archlinux]
; csc-sync-standard archlinux archlinux.mirror.rafal.ca archlinux
sync_type = csc-sync-archlinux
frequency = five-minutely
local_dir = archlinux
rsync_host = rsync.archlinux.org
rsync_dir = ftp_tier1
trace_host = http://rsync.archlinux.org/lastupdate
[debian-ports]
sync_type = csc-sync-standard
@ -358,13 +363,12 @@ local_dir = gutenberg
rsync_host = ftp@ftp.ibiblio.org
rsync_dir = gutenberg
; I donno what csc-sync-wget is doing
[racket-installers]
sync_type = csc-sync-wget
frequency = twice-daily
local_dir = racket/racket-installers
; path = https://mirror.racket-lang.org/installers/
; cut = 1
rsync_host = https://mirror.racket-lang.org/installers/
; for csc-sync-wget the --cut-dirs=1 is hardcoded
[plt-bundles]
sync_type = csc-sync-standard
@ -462,7 +466,7 @@ rsync_dir = tdf-pub
sync_type = csc-sync-s3
frequency = daily
local_dir = saltstack
; endpoint = https://s3.repo.saltproject.io
rsync_host = https://s3.repo.saltproject.io
; [kali]
; csc-sync-standard kali kali.mirror.globo.tech kali

View File

@ -1,279 +0,0 @@
package sync
import (
"fmt"
"os"
"path/filepath"
"git.csclub.uwaterloo.ca/public/merlin/config"
)
func buildRsyncHost(repo *config.Repo) string {
if repo.RsyncUser != "" {
repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
}
return "rsync://" + repo.RsyncHost + "/" + repo.RsyncDir
}
func buildRsyncDaemonHost(repo *config.Repo) string {
if repo.RsyncUser != "" {
repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
}
return repo.RsyncHost + "::" + repo.RsyncDir
}
func buildRsyncSSHHost(repo *config.Repo) string {
if repo.RsyncUser != "" {
repo.RsyncHost = repo.RsyncUser + "@" + repo.RsyncHost
}
return repo.RsyncHost + ":" + repo.RsyncDir
}
func buildDownloadDir(repo *config.Repo) string {
return filepath.Join(config.Conf.DownloadDir, repo.LocalDir)
}
func cscSyncApache(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-az", "--no-owner", "--no-group", "--delete", "--safe-links",
"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
"--exclude", ".~tmp~/",
"--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
return args
}
func cscSyncArchLinux(repo *config.Repo) []string {
tempDir := "" // is this option even needed?
// add option for verbose flag?
args := []string{
"rsync", "-rtlH", "--safe-links", "--delete-after", "--timeout=600",
"--contimeout=60", "-p", "--delay-updates", "--no-motd",
"--temp-dir=" + tempDir, "--log-file=" + repo.RsyncLogFile, "--address=" + config.Conf.IPv4Address,
}
args = append(args, buildRsyncHost(repo), buildDownloadDir(repo))
return args
}
func cscSyncBadPerms(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--chmod=o=rX", "--delete",
"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
"--exclude", ".~tmp~/",
"--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
return args
}
func cscSyncCDImage(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete",
"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
"--exclude", ".*/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
return args
}
func cscSyncCeph(repo *config.Repo) []string {
args := []string{
"rsync", "--stats", "--progress", "--quiet", "-4", "--address=" + config.Conf.IPv4Address,
repo.RsyncHost + "::ceph",
"--recursive", "--times", "--links", "--hard-links",
"--exclude", "Packages*",
"--exclude", "Sources*",
"--exclude", "Release*",
"--exclude", "InRelease",
"--exclude", "i18n/*",
"--exclude", "ls-lR*",
"--exclude", "repodata/*",
}
args = append(args, buildDownloadDir(repo))
args = append(args, []string{
"&&", "rsync", "--stats", "--progress", "--quiet", "-4", "--address=" + config.Conf.IPv4Address,
repo.RsyncHost + "::ceph",
"--recursive", "--times", "--links", "--hard-links", "--delete-after",
}...)
args = append(args, buildDownloadDir(repo))
return args
}
func cscSyncChmod(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after", "--delay-updates", "--safe-links",
"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
"--chmod=u=rwX,go=rX",
}
args = append(args, buildRsyncHost(repo), buildDownloadDir(repo))
return args
}
func cscSyncDebian(repo *config.Repo) []string {
// sync /pool
args := []string{"nice", "rsync", "-rlHtvp",
"--exclude", ".~tmp~/", "--timeout=3600", "-4",
"--address=" + config.Conf.IPv4Address,
}
args = append(args, buildRsyncDaemonHost(repo) + "/pool/", buildDownloadDir(repo) + "/pool/")
args = append(args, []string{"&&", "nice", "rsync", "-rlHtvp",
"--delay-updates", "--delete-after",
// --exclude "Archive-Update-in-Progress-${HOSTNAME}" \
// --exclude "${TRACE_DIR}/${HOSTNAME}" \
"--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
}...)
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
return args
}
func cscSyncDebianCD(repo *config.Repo) []string {
// this is basically the same as CSCSyncDebian, except it has an extra --exclude
args := []string{"nice", "rsync", "-rlHtvp", "--delete",
"--exclude", ".~tmp~/", "--timeout=3600", "-4",
"--address=" + config.Conf.IPv4Address,
// "--exclude", "Archive-Update-in-Progress-${HOSTNAME}"
}
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
return args
}
func cscSyncGentoo(repo *config.Repo) []string {
repo.RsyncUser = "gentoo"
repo.PasswordFile = "gentoo-distfiles"
return cscSyncStandard(repo)
}
func cscSyncS3(repo *config.Repo) []string {
// set these env vars before using
// export RCLONE_CONFIG_S3_ENDPOINT=https://s3.repo.saltproject.io RCLONE_CONFIG_S3_TYPE=s3 RCLONE_CONFIG_S3_PROVIDER=Other RCLONE_CONFIG_S3_ENV_AUTH=false
args := []string{
"nice", "rclone", "sync", "--fast-list", "--use-server-modtime",
"--bind", config.Conf.IPv4Address,
"s3:s3/",
}
args = append(args, buildDownloadDir(repo))
return args
}
func cscSyncSSH(repo *config.Repo) []string {
args := []string{
"rsync", "-aH", "--no-owner", "--no-group", "--delete",
"--timeout=3600", "-4",
"--exclude", ".~tmp~/",
"--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
// not sure if we should be assuming ssh identity file is the password file
args = append(args, "-e", fmt.Sprintf("'ssh -i %s'", repo.PasswordFile))
args = append(args, buildRsyncSSHHost(repo), buildDownloadDir(repo))
return args
}
func cscSyncStandard(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
"--delay-updates", "--safe-links", "--timeout=3600", "-4", "--address=" + config.Conf.IPv4Address,
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
if repo.PasswordFile != "" {
args = append(args, "--password-file", repo.PasswordFile)
}
args = append(args, buildRsyncHost(repo), buildDownloadDir(repo))
return args
}
func cscSyncStandardIPV6(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH", "--no-owner", "--no-group", "--delete-after",
"--delay-updates", "--safe-links", "--timeout=3600", "-6", "--address=" + config.Conf.IPv4Address,
"--exclude", ".~tmp~/", "--quiet", "--stats", "--log-file=" + repo.RsyncLogFile,
}
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
return args
}
func cscSyncDummy(repo *config.Repo) []string {
args := []string{"sleep", "1"}
return args
}
// executes a particular sync job depending on repo.SyncType.
func getSyncCommand(repo *config.Repo) (args []string) {
if repo.SyncType == "csc-sync-dummy" {
return cscSyncDummy(repo)
}
// check that the download directory exists
if _, err := os.Stat(buildDownloadDir(repo)); os.IsNotExist(err) {
repo.Logger.Error(err.Error())
return
}
switch repo.SyncType {
case "csc-sync-apache":
args = cscSyncApache(repo)
case "csc-sync-archlinux":
args = cscSyncArchLinux(repo)
case "csc-sync-badperms":
args = cscSyncBadPerms(repo)
case "csc-sync-cdimage":
args = cscSyncCDImage(repo)
case "csc-sync-ceph":
args = cscSyncCeph(repo)
case "csc-sync-chmod":
args = cscSyncChmod(repo)
case "csc-sync-debian":
args = cscSyncDebian(repo)
case "csc-sync-debian-cd":
args = cscSyncDebianCD(repo)
case "csc-sync-gentoo":
args = cscSyncGentoo(repo)
case "csc-sync-s3":
args = cscSyncS3(repo)
case "csc-sync-ssh":
args = cscSyncSSH(repo)
case "csc-sync-standard":
args = cscSyncStandard(repo)
case "csc-sync-standard-ipv6":
args = cscSyncStandardIPV6(repo)
// case "csc-sync-wget":
// return cscSyncWget(repo)
default:
repo.Logger.Error("Unrecognized sync type", "'"+repo.SyncType+"'")
return
}
if repo.MaxRsyncIO >= 0 {
args = append(args, fmt.Sprintf("--bwlimit=%d", repo.MaxRsyncIO))
}
// TODO: remove buildDownloadDir and check what cscSyncDebian and co are about
// (the ones that do not append buildDownloadDir at the end)
args = append(args, buildDownloadDir(repo))
return
}

View File

@ -22,7 +22,7 @@ func SyncIfPossible(repo *config.Repo) bool {
repo.State.LastAttemptStartTime = curTime
repo.SaveState()
repo.Logger.Info(fmt.Sprintf("Repo %s has started syncing", repo.Name))
go startSync(repo)
go startRepoSync(repo)
return true
}
return false

139
merlin/sync/process.go Normal file
View File

@ -0,0 +1,139 @@
package sync
import (
"fmt"
"os"
"os/exec"
"syscall"
"time"
"git.csclub.uwaterloo.ca/public/merlin/config"
)
// SpawnProcess spawns a child process for the given repo. The process will
// be stopped early if the repo receives a stop signal, or if the process
// runs for longer than the repo's MaxTime.
// It returns a channel through which a Cmd will be sent once it has finished,
// or nil if it was unable to start a process.
func spawnProcess(repo *config.Repo, args []string) (ch <-chan *exec.Cmd) {
// startTime and time took will be handled in common.go by SyncExit
cmd := exec.Command(args[0], args[1:]...)
repo.Logger.Debug("Starting process")
if err := cmd.Start(); err != nil {
repo.Logger.Error(fmt.Errorf("could not start process for %s: %w", repo.Name, err).Error())
return
}
cmdChan := make(chan *exec.Cmd)
ch = cmdChan
cmdDoneChan := make(chan struct{})
killProcess := func() {
err := cmd.Process.Signal(syscall.SIGTERM)
if err != nil {
repo.Logger.Error("Could not send signal to process:", err)
return
}
select {
case <-time.After(30 * time.Second):
repo.Logger.Warning("Process still hasn't stopped after 30 seconds; sending SIGKILL")
cmd.Process.Signal(syscall.SIGKILL)
case <-cmdDoneChan:
repo.Logger.Debug("Process has been stopped.")
}
}
go func() {
cmd.Wait()
close(cmdDoneChan)
}()
go func() {
defer func() {
cmdChan <- cmd
}()
select {
case <-cmdDoneChan:
if !cmd.ProcessState.Success() {
repo.Logger.Warning("Process ended with status code", cmd.ProcessState.ExitCode())
}
case <-repo.StopChan:
repo.Logger.Debug("Received signal to stop, killing process...")
killProcess()
case <-time.After(time.Duration(repo.MaxTime) * time.Second):
repo.Logger.Warning("Process has exceeded its max time; killing now")
killProcess()
}
}()
return
}
func startRepoSync(repo *config.Repo) {
status := config.FAILURE
defer func() {
repo.DoneChan <- config.SyncResult{
Name: repo.Name,
Exit: status,
}
}()
if repo.TraceUrl != "" {
repo.Logger.Debug("Checking for changes")
continueSync := true
status, continueSync = checkIfSyncNeeded(repo)
if !continueSync {
return
}
repo.Logger.Debug("Changes found; will attempt to sync")
}
args := getSyncCommand(repo)
if len(args) == 0 {
// getSyncCommand will have already logged error
return
}
// clear the rsync log file before starting the sync
if repo.RsyncLogFile != "" {
err := os.Truncate(repo.RsyncLogFile, 0)
if err != nil {
repo.Logger.Error(err.Error())
}
}
ch := spawnProcess(repo, args)
if ch == nil {
// spawnSyncProcess will have already logged error
return
}
cmd := <-ch
switch cmd.ProcessState.ExitCode() {
case 0:
status = config.SUCCESS
case -1:
status = config.TERMINATED
// default is already FAILURE
}
}
// we are not using zfs snapshots at the moment
func zfsSync(repo *config.Repo) {
// out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput()
// if err != nil {
// repo.Logger.Error(err)
// } else {
// f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
// if err != nil {
// repo.Logger.Error(err.Error())
// } else {
// f.Write(out)
// }
// }
}

View File

@ -3,124 +3,367 @@ package sync
import (
"fmt"
"os"
"os/exec"
"syscall"
"time"
"path/filepath"
"git.csclub.uwaterloo.ca/public/merlin/config"
)
// SpawnProcess spawns a child process for the given repo. The process will
// be stopped early if the repo receives a stop signal, or if the process
// runs for longer than the repo's MaxTime.
// It returns a channel through which a Cmd will be sent once it has finished,
// or nil if it was unable to start a process.
func spawnSyncProcess(repo *config.Repo, args []string) (ch <-chan *exec.Cmd) {
// clear the rsync log file before starting the sync
if repo.RsyncLogFile != "" {
err := os.Truncate(repo.RsyncLogFile, 0)
if err != nil {
repo.Logger.Error(err.Error())
}
func buildRsyncHost(repo *config.Repo) string {
rsyncHost := repo.RsyncHost
if repo.RsyncUser != "" {
rsyncHost = repo.RsyncUser + "@" + rsyncHost
}
return "rsync://" + rsyncHost + "/" + repo.RsyncDir
}
func buildRsyncDaemonHost(repo *config.Repo) string {
rsyncHost := repo.RsyncHost
if repo.RsyncUser != "" {
rsyncHost = repo.RsyncUser + "@" + rsyncHost
}
return rsyncHost + "::" + repo.RsyncDir
}
func buildRsyncSSHHost(repo *config.Repo) string {
rsyncHost := repo.RsyncHost
if repo.RsyncUser != "" {
rsyncHost = repo.RsyncUser + "@" + rsyncHost
}
return rsyncHost + ":" + repo.RsyncDir
}
func buildDownloadDir(repo *config.Repo) string {
return filepath.Join(config.Conf.DownloadDir, repo.LocalDir)
}
const (
noOwnerNoGroup = 1 << iota
timeout3600
logFile
quiet
ipv4
ipv6
delete
delayUpdatesDeleteAfter
)
func addConditionalFlags(repo *config.Repo, flags int) []string {
args := []string{}
if flags&noOwnerNoGroup != 0 {
args = append(args, "--no-owner", "--no-group")
}
if flags&timeout3600 != 0 {
args = append(args, "--timeout=3600")
}
if flags&logFile != 0 {
args = append(args, "--log-file="+repo.RsyncLogFile)
}
if flags&quiet != 0 {
args = append(args, "--quiet")
}
// startTime and time took will be handled in common.go by SyncExit
cmd := exec.Command(args[0], args[1:]...)
if flags&ipv4 != 0 {
args = append(args, "-4", "--address="+config.Conf.IPv4Address)
} else if flags&ipv6 != 0 {
args = append(args, "-6", "--address="+config.Conf.IPv6Address)
}
repo.Logger.Debug("Starting process")
if err := cmd.Start(); err != nil {
repo.Logger.Error(fmt.Errorf("could not start process for %s: %w", repo.Name, err).Error())
if flags&delete != 0 {
args = append(args, "--delete")
}
if flags&delayUpdatesDeleteAfter != 0 {
args = append(args, "--delay-updates", "--delete-after")
}
// add bwlimit (bandwidth limit) only when MaxRsyncIO is set to positive value
if repo.MaxRsyncIO > 0 {
args = append(args, fmt.Sprintf("--bwlimit=%d", repo.MaxRsyncIO))
}
return args
}
func cscSyncApache(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-az",
"--delete",
"--safe-links",
"--stats",
"--exclude", ".~tmp~/",
}
args = append(args, addConditionalFlags(repo, noOwnerNoGroup|timeout3600|logFile|quiet|ipv4)...)
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
return args
}
func cscSyncArchLinux(repo *config.Repo) []string {
tempDir := "/home/mirror/tmp"
args := []string{
"rsync", "-rtlHp",
"--safe-links",
"--timeout=600",
"--contimeout=60",
"--no-motd",
"--temp-dir=" + tempDir,
"--address=" + config.Conf.IPv4Address,
}
args = append(args, addConditionalFlags(repo, logFile|delayUpdatesDeleteAfter)...)
args = append(args, buildRsyncHost(repo), buildDownloadDir(repo))
return args
}
func cscSyncBadPerms(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH",
"--chmod=o=rX",
"--exclude", ".~tmp~/",
"--stats",
}
args = append(args, addConditionalFlags(repo, noOwnerNoGroup|timeout3600|logFile|quiet|ipv4|delete)...)
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
return args
}
func cscSyncCDImage(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH",
"--exclude", "\".*/\"",
"--stats",
}
args = append(args, addConditionalFlags(repo, noOwnerNoGroup|timeout3600|logFile|quiet|ipv4|delete)...)
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
return args
}
func cscSyncCeph(repo *config.Repo) []string {
args := []string{
"rsync", "--stats", "--progress",
repo.RsyncHost + "::ceph",
"--recursive", "--times", "--links",
"--hard-links",
"--exclude", "Packages*",
"--exclude", "Sources*",
"--exclude", "Release*",
"--exclude", "InRelease",
"--exclude", "i18n/*",
"--exclude", "ls-lR*",
"--exclude", "repodata/*",
}
args = append(args, addConditionalFlags(repo, quiet|ipv4)...)
args = append(args, buildDownloadDir(repo))
args = append(args, []string{
"&&", "rsync", "--stats", "--progress",
repo.RsyncHost + "::ceph",
"--recursive", "--times", "--links",
"--hard-links", "--delete-after",
}...)
args = append(args, addConditionalFlags(repo, quiet|ipv4)...)
args = append(args, buildDownloadDir(repo))
return args
}
func cscSyncChmod(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH",
"--safe-links",
"--exclude", ".~tmp~/",
"--stats",
"--chmod=u=rwX,go=rX",
}
args = append(args, addConditionalFlags(repo, noOwnerNoGroup|timeout3600|logFile|quiet|ipv4|delayUpdatesDeleteAfter)...)
args = append(args, buildRsyncHost(repo), buildDownloadDir(repo))
return args
}
func cscSyncDebian(repo *config.Repo) []string {
// some repos specify a trace file but does not exit script even if trace files are the same
// https://git.csclub.uwaterloo.ca/public/mirror/src/branch/go/bin/csc-sync-debian#L205-L218
// ubuntu repos are trying to get trace/drescher.canonical.com but this file does not exist
// http://archive.ubuntu.com/ubuntu/project/trace/
// if are adding a trace check ensure it can handle when the file does not exist on the remote
args := []string{
"nice", "rsync", "-rlHtvp",
"--exclude", ".~tmp~/",
}
args = append(args, addConditionalFlags(repo, timeout3600|logFile|quiet|ipv4)...)
args = append(args, buildRsyncDaemonHost(repo)+"/pool/", buildDownloadDir(repo)+"/pool/")
args = append(args, []string{
"&&", "nice", "rsync", "-rlHtvp",
"--exclude", ".~tmp~/",
}...)
args = append(args, addConditionalFlags(repo, timeout3600|logFile|quiet|ipv4|delayUpdatesDeleteAfter)...)
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
// TODO: run 'LANG=C date -u > "${TO}/${TRACE_DIR}/${HOSTNAME}"' after the sync is done
return args
}
func cscSyncDebianCD(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-rlHtvp4",
"--exclude", ".~tmp~/",
}
args = append(args, addConditionalFlags(repo, timeout3600|logFile|quiet|ipv4|delete)...)
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
return args
}
func cscSyncGentoo(repo *config.Repo) []string {
repo.RsyncUser = "gentoo"
repo.PasswordFile = "gentoo-distfiles"
return cscSyncStandard(repo)
}
// TODO: check for special stuff that rcloning S3 needs
func cscSyncS3(repo *config.Repo) []string {
args := []string{
// RsyncHost is just a regular host (https://s3.repo.saltproject.io) in this case
"RCLONE_CONFIG_S3_ENDPOINT=" + repo.RsyncHost,
"RCLONE_CONFIG_S3_TYPE=s3",
"RCLONE_CONFIG_S3_PROVIDER=Other",
"RCLONE_CONFIG_S3_ENV_AUTH=false",
"nice", "rclone", "sync",
"--fast-list",
"--use-server-modtime",
"--bind", config.Conf.IPv4Address,
"s3:s3/",
}
args = append(args, buildDownloadDir(repo))
return args
}
func cscSyncSSH(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH",
"--exclude", ".~tmp~/",
"--stats", "-4",
}
args = append(args, addConditionalFlags(repo, noOwnerNoGroup|timeout3600|logFile|quiet|delete)...)
// PasswordFile should point to the SSH_KEYFILE
args = append(args, "-e", fmt.Sprintf("'ssh -b %s -i %s'", config.Conf.IPv4Address, repo.PasswordFile))
args = append(args, buildRsyncSSHHost(repo), buildDownloadDir(repo))
return args
}
func cscSyncStandard(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH",
"--safe-links",
"--exclude", ".~tmp~/",
"--stats",
}
if repo.PasswordFile != "" {
args = append(args, "--password-file", repo.PasswordFile)
}
args = append(args, addConditionalFlags(repo, noOwnerNoGroup|timeout3600|logFile|quiet|ipv4|delayUpdatesDeleteAfter)...)
args = append(args, buildRsyncHost(repo), buildDownloadDir(repo))
return args
}
func cscSyncStandardIPV6(repo *config.Repo) []string {
args := []string{
"nice", "rsync", "-aH",
"--safe-links",
"--exclude", ".~tmp~/",
"--stats",
}
if repo.PasswordFile != "" {
args = append(args, "--password-file", repo.PasswordFile)
}
args = append(args, addConditionalFlags(repo, noOwnerNoGroup|timeout3600|logFile|quiet|ipv6|delayUpdatesDeleteAfter)...)
args = append(args, buildRsyncDaemonHost(repo), buildDownloadDir(repo))
return args
}
func cscSyncWget(repo *config.Repo) []string {
args := []string{
"nice", "wget", "-q",
"--bind-address=" + config.Conf.IPv4Address,
"--mirror",
"--no-parent",
"--no-host-directories",
"--cut-dirs=1",
"--content-disposition",
"--execute",
"robots=off",
"--recursive",
"--reject", "\"*\\?*\"",
"--directory-prefix=" + buildDownloadDir(repo),
// RsyncHost is just a regular host (https://mirror.racket-lang.org/installers/) in this case
repo.RsyncHost,
}
return args
}
func cscSyncDummy(repo *config.Repo) []string {
return []string{"sleep", "1"}
}
// executes a particular sync job depending on repo.SyncType.
func getSyncCommand(repo *config.Repo) (args []string) {
if repo.SyncType == "csc-sync-dummy" {
return cscSyncDummy(repo)
}
// check that the download directory exists
if _, err := os.Stat(buildDownloadDir(repo)); os.IsNotExist(err) {
repo.Logger.Error(err.Error())
return
}
cmdChan := make(chan *exec.Cmd)
ch = cmdChan
cmdDoneChan := make(chan struct{})
killProcess := func() {
err := cmd.Process.Signal(syscall.SIGTERM)
if err != nil {
repo.Logger.Error("Could not send signal to process:", err)
return
}
select {
case <-time.After(30 * time.Second):
repo.Logger.Warning("Process still hasn't stopped after 30 seconds; sending SIGKILL")
cmd.Process.Signal(syscall.SIGKILL)
case <-cmdDoneChan:
repo.Logger.Debug("Process has been stopped.")
}
}
go func() {
cmd.Wait()
close(cmdDoneChan)
}()
go func() {
defer func() {
cmdChan <- cmd
}()
select {
case <-cmdDoneChan:
if !cmd.ProcessState.Success() {
repo.Logger.Warning("Process ended with status code", cmd.ProcessState.ExitCode())
}
case <-repo.StopChan:
repo.Logger.Debug("Received signal to stop, killing process...")
killProcess()
case <-time.After(time.Duration(repo.MaxTime) * time.Second):
repo.Logger.Warning("Process has exceeded its max time; killing now")
killProcess()
}
}()
return
}
func startSync(repo *config.Repo) {
status := config.FAILURE
defer func() {
repo.DoneChan <- config.SyncResult{
Name: repo.Name,
Exit: status,
}
}()
args := getSyncCommand(repo)
if len(args) == 0 {
// getSyncCommand will have already logged error
switch repo.SyncType {
case "csc-sync-apache":
return cscSyncApache(repo)
case "csc-sync-archlinux":
return cscSyncArchLinux(repo)
case "csc-sync-badperms":
return cscSyncBadPerms(repo)
case "csc-sync-cdimage":
return cscSyncCDImage(repo)
case "csc-sync-ceph":
return cscSyncCeph(repo)
case "csc-sync-chmod":
return cscSyncChmod(repo)
case "csc-sync-debian":
return cscSyncDebian(repo)
case "csc-sync-debian-cd":
return cscSyncDebianCD(repo)
case "csc-sync-gentoo":
return cscSyncGentoo(repo)
case "csc-sync-s3":
return cscSyncS3(repo)
case "csc-sync-ssh":
return cscSyncSSH(repo)
case "csc-sync-standard":
return cscSyncStandard(repo)
case "csc-sync-standard-ipv6":
return cscSyncStandardIPV6(repo)
case "csc-sync-wget":
return cscSyncWget(repo)
default:
repo.Logger.Error("Unrecognized sync type ", "'"+repo.SyncType+"'")
return
}
ch := spawnSyncProcess(repo, args)
if ch == nil {
// spawnSyncProcess will have already logged error
return
}
cmd := <-ch
switch cmd.ProcessState.ExitCode() {
case 0:
status = config.SUCCESS
case -1:
status = config.TERMINATED
// default is already FAILURE
}
}
// we are not using zfs snapshots at the moment
func zfsSync(repo *config.Repo) {
// out, err := exec.Command("/home/mirror/bin/zfssync", repo.Name).CombinedOutput()
// if err != nil {
// repo.Logger.Error(err)
// } else {
// f, err := os.OpenFile(repo.ZfssyncLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
// if err != nil {
// repo.Logger.Error(err.Error())
// } else {
// f.Write(out)
// }
// }
}

174
merlin/sync/trace.go Normal file
View File

@ -0,0 +1,174 @@
package sync
import (
"os"
"path"
"strings"
"git.csclub.uwaterloo.ca/public/merlin/config"
)
func cscTraceArchLinux(repo *config.Repo, destPath string) []string {
args := []string{
"curl",
"-s", repo.TraceUrl,
"-o", destPath,
}
return args
}
// return false iff both files exist, are non-empty, and have the same contents
func cscTraceArchLinuxDiff(repo *config.Repo, destPath string) bool {
readFile := func(file string) string {
f, err := os.ReadFile(file)
if err != nil {
repo.Logger.Debug(err.Error())
return ""
}
return strings.TrimSpace(string(f))
}
file1 := readFile(destPath)
file2 := readFile(path.Join(buildDownloadDir(repo), "lastupdate"))
if file1 == "" || file2 == "" {
return true
}
return file1 != file2
}
func cscTraceArchLinuxUpdate(repo *config.Repo) (args []string) {
rsyncDir := repo.RsyncDir
localDir := repo.LocalDir
repo.RsyncDir = repo.RsyncDir + "/lastsync"
repo.LocalDir = repo.LocalDir + "/lastsync"
args = cscSyncArchLinux(repo)
repo.RsyncDir = rsyncDir
repo.LocalDir = localDir
return args
}
func cscTraceDebian(repo *config.Repo, destPath string) []string {
args := []string{
"nice", "rsync", "-tv", "-4",
"--address=" + config.Conf.IPv4Address,
"--quiet",
// "--log-file=" + repo.RepoLogFile,
repo.RsyncHost + "::" + path.Join(repo.RsyncDir, "project/trace", repo.TraceUrl),
destPath,
}
return args
}
// return false iff both files exist and were modified at the same time
func cscTraceDebianDiff(repo *config.Repo, destPath string) bool {
statFile := func(file string) int64 {
f, err := os.Stat(file)
if err != nil {
repo.Logger.Debug(err.Error())
return 0
}
return f.ModTime().Unix()
}
file1 := statFile(destPath)
file2 := statFile(path.Join(buildDownloadDir(repo), "project/trace", "lastupdate"))
if file1 == 0 || file2 == 0 {
return true
}
return file1 != file2
}
func checkIfSyncNeeded(repo *config.Repo) (status int, continueSync bool) {
// default is to keep the default status and continue the sync
status = config.FAILURE
continueSync = true
// create a temp file
temp, err := os.CreateTemp("/tmp", ".tmp.merlin."+repo.Name+"-")
if err != nil {
repo.Logger.Error(err.Error())
return
}
temp.Close()
defer os.Remove(temp.Name())
// get the trace command
var args []string
switch repo.SyncType {
case "csc-sync-archlinux":
args = cscTraceArchLinux(repo, temp.Name())
case "csc-sync-debian":
args = cscTraceDebian(repo, temp.Name())
default:
if repo.SyncType != "csc-sync-dummy" {
repo.Logger.Error("Trace files are not implemented for sync type ", "'"+repo.SyncType+"'")
}
return
}
ch := spawnProcess(repo, args)
if ch == nil {
// spawnSyncProcess will have already logged error
return
}
cmd := <-ch
if cmd.ProcessState.ExitCode() != 0 {
// if the process was terminated then don't continue to sync
if cmd.ProcessState.ExitCode() == -1 {
return config.TERMINATED, false
}
return
}
filesDiffer := true
switch repo.SyncType {
case "csc-sync-archlinux":
filesDiffer = cscTraceArchLinuxDiff(repo, temp.Name())
case "csc-sync-debian":
filesDiffer = cscTraceDebianDiff(repo, temp.Name())
}
// if the files are the same then don't continue to sync
if !filesDiffer {
repo.Logger.Debug("No changes found; no sync made")
// archlinux wants to sync a "lastsync" file even if lastupdate prevents sync
var args []string
switch repo.SyncType {
case "csc-sync-archlinux":
args = cscTraceArchLinuxUpdate(repo)
default:
return config.SUCCESS, false
}
ch := spawnProcess(repo, args)
if ch == nil {
// spawnSyncProcess will have already logged error
return
}
cmd := <-ch
if cmd.ProcessState.ExitCode() != 0 {
// if the process was terminated then don't continue to sync
if cmd.ProcessState.ExitCode() == -1 {
return config.TERMINATED, false
}
return
}
return config.SUCCESS, false
}
return
}