Compare commits

...

2 Commits

7 changed files with 25 additions and 18 deletions

View File

@ -100,7 +100,7 @@ func ForceSync(conn net.Conn, repoName string) (newSync bool) {
}
// Create and start listening to a unix socket and accept and forward connections to the connChan channel
func StartListener(connChan chan net.Conn, stopLisChan chan struct{}) {
func StartListener(connChan chan net.Conn, stopLisChan <-chan struct{}, stopLisAckChan chan<- struct{}) {
sockpath := config.Conf.SockPath
if filepath.Ext(sockpath) != ".sock" {
panic(fmt.Errorf("socket file must end with .sock"))
@ -128,12 +128,13 @@ func StartListener(connChan chan net.Conn, stopLisChan chan struct{}) {
// Note: cannot use select with ear.Accept() for some reason (it does block until connection is made)
conn, err := ear.Accept()
if err != nil {
// attempting to accept on a closed net.Listener will return a non-temporary error
if netErr, isTemp := err.(net.Error); isTemp && netErr.Temporary() {
logger.ErrLog("Accepted socket error: " + err.Error())
continue
if errors.Is(err, net.ErrClosed) {
// Expected behaviour
} else {
logger.ErrLog("Unhandlable socket error: " + err.Error())
ear.Close()
}
logger.ErrLog("Unhandlable socket error: " + err.Error())
stopLisAckChan <- struct{}{}
return
}
connChan <- conn

View File

@ -120,7 +120,6 @@ func TestForceSync(t *testing.T) {
State: &config.RepoState{
IsRunning: false,
LastAttemptStartTime: 0,
LastAttemptRunTime: 0,
LastAttemptExit: config.NOT_RUN_YET,
},
}

View File

@ -167,8 +167,6 @@ type RepoState struct {
IsRunning bool `ini:"is_running"`
// the Unix epoch timestamp at which this repo last attempted a job
LastAttemptStartTime int64 `ini:"last_attempt_time"`
// the number of seconds this repo ran for during its last attempted job
LastAttemptRunTime int64 `ini:"last_attempt_runtime"`
// whether the last attempt was successful or not
LastAttemptExit int `ini:"last_attempt_exit"`
}
@ -291,7 +289,6 @@ func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struc
repo.State = &RepoState{
IsRunning: false,
LastAttemptStartTime: 0,
LastAttemptRunTime: 0,
LastAttemptExit: NOT_RUN_YET,
}
err = ini.MapTo(&repo.State, repo.StateFile)

View File

@ -71,7 +71,6 @@ func TestLoadConfig(t *testing.T) {
State: &RepoState{
IsRunning: false,
LastAttemptStartTime: 1600000000,
LastAttemptRunTime: 100,
LastAttemptExit: 1,
},
}
@ -95,7 +94,6 @@ func TestLoadConfig(t *testing.T) {
State: &RepoState{
IsRunning: false,
LastAttemptStartTime: 0,
LastAttemptRunTime: 0,
LastAttemptExit: 0,
},
}

View File

@ -9,6 +9,10 @@ User=mirror
Group=mirror
SyslogIdentifier=merlin
ExecReload=kill -HUP $MAINPID
# By default (KillMode=control-group), systemd will send SIGTERM to child
# processes as well, which is not what we want because we need merlin to
# have control over the shutdown of the rsync processes.
KillMode=mixed
[Install]
WantedBy=multi-user.target

View File

@ -48,6 +48,8 @@ func main() {
connChan := make(chan net.Conn)
// closed or receives a signal to stop listening to the unix socket
stopLisChan := make(chan struct{})
// acknowledgement that the goroutine listening on the unix socket has cleaned up
stopLisAckChan := make(chan struct{})
// gets unblocked when SIGINT or SIGTERM is sent, will begin process of stopping the program
stopSig := make(chan os.Signal, 1)
@ -61,13 +63,13 @@ func main() {
repoIdx := 0
numJobsRunning := 0
loadConfig := func() {
loadConfigAndStartListener := func() {
config.LoadConfig(*configPath, doneChan, stopChan)
logger.OutLog("Loaded config:\n" + fmt.Sprintf("%+v\n", config.Conf))
// reset the round-robin index and recreate the socket listener
repoIdx = 0
go arthur.StartListener(connChan, stopLisChan)
go arthur.StartListener(connChan, stopLisChan, stopLisAckChan)
}
// We use a round-robin strategy. It's not the most efficient, but it's simple
@ -87,12 +89,19 @@ func main() {
}
}
loadConfig()
loadConfigAndStartListener()
// ensure that IsRunning is false otherwise repo will never sync.
// only on startup can we assume that repos were not previously syncing
// since reloading the config does not stop repos with a sync in progress
for _, repo := range config.Repos {
wasRunning := repo.State.IsRunning
repo.State.IsRunning = false
if wasRunning {
// If some jobs were terminated early last time, try to resume them
if sync.SyncIfPossible(repo, true) {
numJobsRunning++
}
}
}
runAsManyAsPossible()
@ -108,8 +117,8 @@ mainLoop:
case <-reloadSig: // caught a SIGHUP
// temporary stop the socket listener and load the config again
close(stopLisChan)
loadConfig()
stopLisChan <- struct{}{}
loadConfigAndStartListener()
case done := <-doneChan: // a sync is done and sends its exit status
// repo could be removed from config while sync in progress

View File

@ -38,7 +38,6 @@ func SyncCompleted(repo *config.Repo, exit int) {
}
repoState.IsRunning = false
repoState.LastAttemptExit = exit
repoState.LastAttemptRunTime = syncTook
repo.SaveState()
exitStr := config.StatusToString(exit)