mirror/merlin/arthur/arthur.go

145 lines
3.9 KiB
Go

package arthur
import (
"errors"
"fmt"
"io/ioutil"
"net"
"os"
"path/filepath"
"sort"
"strings"
"text/tabwriter"
"time"
"git.csclub.uwaterloo.ca/public/merlin/config"
"git.csclub.uwaterloo.ca/public/merlin/logger"
"git.csclub.uwaterloo.ca/public/merlin/sync"
)
// Reads and parses the message sent over the accepted connection
func GetCommand(conn net.Conn) (command, repoName string) {
command = ""
repoName = ""
buf, err := ioutil.ReadAll(conn)
if err != nil {
logger.ErrLog(err.Error())
return
}
args := strings.Split(string(buf), ":")
if len(args) >= 1 {
command = args[0]
}
if len(args) >= 2 {
repoName = args[1]
}
return
}
// Receives an message and writes it to both stdout and the accepted connection
func SendAndLog(conn net.Conn, msg string) {
logger.OutLog(msg)
conn.Write([]byte(msg))
}
// Writes the status of the repos to the accepted connection
func SendStatus(conn net.Conn) {
// Force arthur to send back time information in America/Toronto time
location, err := time.LoadLocation("America/Toronto")
if err != nil {
logger.ErrLog(err)
}
status := tabwriter.NewWriter(conn, 5, 5, 5, ' ', 0)
fmt.Fprintf(status, "Repository\tLast Synced\tNext Expected Sync\tRunning\n")
// get the names of all of the repos in the config
var keys []string
for name, _ := range config.RepoMap {
keys = append(keys, name)
}
sort.Strings(keys)
// print out the state of each repo in the config (last and next sync time + if it is currently running)
// for other ways to format the time see: https://pkg.go.dev/time#pkg-constants
for _, name := range keys {
repo := config.RepoMap[name]
lastSync := repo.State.LastAttemptStartTime
nextSync := lastSync + int64(repo.Frequency)
fmt.Fprintf(status, "%s\t%s\t%s\t%t\n",
name,
time.Unix(lastSync, 0).In(location).Format(time.RFC1123),
time.Unix(nextSync, 0).In(location).Format(time.RFC1123),
repo.State.IsRunning,
)
}
status.Flush()
}
// Attempt to force the sync of the repo. Returns true iff a sync was started.
func ForceSync(conn net.Conn, repoName string) (newSync bool) {
newSync = false
if repo, isInMap := config.RepoMap[repoName]; isInMap {
logger.OutLog("Attempting to force sync of " + repoName)
if sync.SyncIfPossible(repo, true) {
conn.Write([]byte("Forced sync for " + repoName))
newSync = true
} else {
SendAndLog(conn, "Could not force sync: "+repoName+" is already syncing.")
}
} else {
SendAndLog(conn, repoName+" is not tracked so cannot sync")
}
return
}
// Create and start listening to a unix socket and accept and forward connections to the connChan channel
func StartListener(connChan chan net.Conn, stopLisChan chan struct{}) {
sockpath := config.Conf.SockPath
if filepath.Ext(sockpath) != ".sock" {
panic(fmt.Errorf("socket file must end with .sock"))
}
// Attempt to remove sockpath if exists, continue if does not exist.
// If old sockpath is not removed then will get "bind: address already in use".
if _, err := os.Stat(sockpath); err == nil { // sockpath exists
if err := os.Remove(sockpath); err != nil {
panic(err)
}
} else if !errors.Is(err, os.ErrNotExist) { // error is not that the sockpath does not exist
panic(err)
}
// creates and starts listening to a unix socket
ear, err := net.Listen("unix", sockpath)
if err != nil {
panic(err)
}
logger.OutLog("Listening to unix socket at " + sockpath)
go func() {
for {
// Note: cannot use select with ear.Accept() for some reason (it does block until connection is made)
conn, err := ear.Accept()
if err != nil {
// attempting to accept on a closed net.Listener will return a non-temporary error
if netErr, isTemp := err.(net.Error); isTemp && netErr.Temporary() {
logger.ErrLog("Accepted socket error: " + err.Error())
continue
}
logger.ErrLog("Unhandlable socket error: " + err.Error())
return
}
connChan <- conn
}
}()
// wait for stopLisChan to close or signal to close the socket listener
<-stopLisChan
ear.Close()
}