resume jobs which were terminated early in previous run
This commit is contained in:
parent
0e72c526f3
commit
0509d030b9
|
@ -100,7 +100,7 @@ func ForceSync(conn net.Conn, repoName string) (newSync bool) {
|
|||
}
|
||||
|
||||
// Create and start listening to a unix socket and accept and forward connections to the connChan channel
|
||||
func StartListener(connChan chan net.Conn, stopLisChan chan struct{}) {
|
||||
func StartListener(connChan chan net.Conn, stopLisChan <-chan struct{}, stopLisAckChan chan<- struct{}) {
|
||||
sockpath := config.Conf.SockPath
|
||||
if filepath.Ext(sockpath) != ".sock" {
|
||||
panic(fmt.Errorf("socket file must end with .sock"))
|
||||
|
@ -128,12 +128,13 @@ func StartListener(connChan chan net.Conn, stopLisChan chan struct{}) {
|
|||
// Note: cannot use select with ear.Accept() for some reason (it does block until connection is made)
|
||||
conn, err := ear.Accept()
|
||||
if err != nil {
|
||||
// attempting to accept on a closed net.Listener will return a non-temporary error
|
||||
if netErr, isTemp := err.(net.Error); isTemp && netErr.Temporary() {
|
||||
logger.ErrLog("Accepted socket error: " + err.Error())
|
||||
continue
|
||||
if errors.Is(err, net.ErrClosed) {
|
||||
// Expected behaviour
|
||||
} else {
|
||||
logger.ErrLog("Unhandlable socket error: " + err.Error())
|
||||
ear.Close()
|
||||
}
|
||||
logger.ErrLog("Unhandlable socket error: " + err.Error())
|
||||
stopLisAckChan <- struct{}{}
|
||||
return
|
||||
}
|
||||
connChan <- conn
|
||||
|
|
|
@ -120,7 +120,6 @@ func TestForceSync(t *testing.T) {
|
|||
State: &config.RepoState{
|
||||
IsRunning: false,
|
||||
LastAttemptStartTime: 0,
|
||||
LastAttemptRunTime: 0,
|
||||
LastAttemptExit: config.NOT_RUN_YET,
|
||||
},
|
||||
}
|
||||
|
|
|
@ -167,8 +167,6 @@ type RepoState struct {
|
|||
IsRunning bool `ini:"is_running"`
|
||||
// the Unix epoch timestamp at which this repo last attempted a job
|
||||
LastAttemptStartTime int64 `ini:"last_attempt_time"`
|
||||
// the number of seconds this repo ran for during its last attempted job
|
||||
LastAttemptRunTime int64 `ini:"last_attempt_runtime"`
|
||||
// whether the last attempt was successful or not
|
||||
LastAttemptExit int `ini:"last_attempt_exit"`
|
||||
}
|
||||
|
@ -291,7 +289,6 @@ func LoadConfig(configPath string, doneChan chan SyncResult, stopChan chan struc
|
|||
repo.State = &RepoState{
|
||||
IsRunning: false,
|
||||
LastAttemptStartTime: 0,
|
||||
LastAttemptRunTime: 0,
|
||||
LastAttemptExit: NOT_RUN_YET,
|
||||
}
|
||||
err = ini.MapTo(&repo.State, repo.StateFile)
|
||||
|
|
|
@ -71,7 +71,6 @@ func TestLoadConfig(t *testing.T) {
|
|||
State: &RepoState{
|
||||
IsRunning: false,
|
||||
LastAttemptStartTime: 1600000000,
|
||||
LastAttemptRunTime: 100,
|
||||
LastAttemptExit: 1,
|
||||
},
|
||||
}
|
||||
|
@ -95,7 +94,6 @@ func TestLoadConfig(t *testing.T) {
|
|||
State: &RepoState{
|
||||
IsRunning: false,
|
||||
LastAttemptStartTime: 0,
|
||||
LastAttemptRunTime: 0,
|
||||
LastAttemptExit: 0,
|
||||
},
|
||||
}
|
||||
|
|
|
@ -9,6 +9,10 @@ User=mirror
|
|||
Group=mirror
|
||||
SyslogIdentifier=merlin
|
||||
ExecReload=kill -HUP $MAINPID
|
||||
# By default (KillMode=control-group), systemd will send SIGTERM to child
|
||||
# processes as well, which is not what we want because we need merlin to
|
||||
# have control over the shutdown of the rsync processes.
|
||||
KillMode=mixed
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
|
|
|
@ -48,6 +48,8 @@ func main() {
|
|||
connChan := make(chan net.Conn)
|
||||
// closed or receives a signal to stop listening to the unix socket
|
||||
stopLisChan := make(chan struct{})
|
||||
// acknowledgement that the goroutine listening on the unix socket has cleaned up
|
||||
stopLisAckChan := make(chan struct{})
|
||||
|
||||
// gets unblocked when SIGINT or SIGTERM is sent, will begin process of stopping the program
|
||||
stopSig := make(chan os.Signal, 1)
|
||||
|
@ -61,13 +63,13 @@ func main() {
|
|||
repoIdx := 0
|
||||
numJobsRunning := 0
|
||||
|
||||
loadConfig := func() {
|
||||
loadConfigAndStartListener := func() {
|
||||
config.LoadConfig(*configPath, doneChan, stopChan)
|
||||
logger.OutLog("Loaded config:\n" + fmt.Sprintf("%+v\n", config.Conf))
|
||||
|
||||
// reset the round-robin index and recreate the socket listener
|
||||
repoIdx = 0
|
||||
go arthur.StartListener(connChan, stopLisChan)
|
||||
go arthur.StartListener(connChan, stopLisChan, stopLisAckChan)
|
||||
}
|
||||
|
||||
// We use a round-robin strategy. It's not the most efficient, but it's simple
|
||||
|
@ -87,12 +89,19 @@ func main() {
|
|||
}
|
||||
}
|
||||
|
||||
loadConfig()
|
||||
loadConfigAndStartListener()
|
||||
// ensure that IsRunning is false otherwise repo will never sync.
|
||||
// only on startup can we assume that repos were not previously syncing
|
||||
// since reloading the config does not stop repos with a sync in progress
|
||||
for _, repo := range config.Repos {
|
||||
wasRunning := repo.State.IsRunning
|
||||
repo.State.IsRunning = false
|
||||
if wasRunning {
|
||||
// If some jobs were terminated early last time, try to resume them
|
||||
if sync.SyncIfPossible(repo, true) {
|
||||
numJobsRunning++
|
||||
}
|
||||
}
|
||||
}
|
||||
runAsManyAsPossible()
|
||||
|
||||
|
@ -109,7 +118,7 @@ mainLoop:
|
|||
case <-reloadSig: // caught a SIGHUP
|
||||
// temporary stop the socket listener and load the config again
|
||||
stopLisChan <- struct{}{}
|
||||
loadConfig()
|
||||
loadConfigAndStartListener()
|
||||
|
||||
case done := <-doneChan: // a sync is done and sends its exit status
|
||||
// repo could be removed from config while sync in progress
|
||||
|
|
|
@ -38,7 +38,6 @@ func SyncCompleted(repo *config.Repo, exit int) {
|
|||
}
|
||||
repoState.IsRunning = false
|
||||
repoState.LastAttemptExit = exit
|
||||
repoState.LastAttemptRunTime = syncTook
|
||||
repo.SaveState()
|
||||
|
||||
exitStr := config.StatusToString(exit)
|
||||
|
|
Loading…
Reference in New Issue