mirror-checker/checkers/checker.go

247 lines
5.8 KiB
Go

package checkers
import (
"bufio"
"errors"
"fmt"
"os"
"strconv"
"sync"
"time"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
// TODO: uuid for each job
// TODO: job history
type JobDescription struct {
Group *JobGroup
Checker *ProjectChecker
Callback CheckerResultCallback
}
type JobGroup struct {
Projects []string
Name string
Jobs []*JobDescription
Results *(chan CheckerResult)
FinalStatus *(chan CheckerStatus)
Wg *sync.WaitGroup
WorkerID int
}
////////////////////////////////////////////////////////////////////////////////
// TODO: run check without callback
// TODO: create "custom" job groups??
// Given a project, all checks are initiated.
func (p *Project) RunChecks(callback CheckerResultCallback) (*JobGroup, error) {
checks := p.Checkers
n := len(checks)
if n == 0 {
return nil, errors.New("No checkers found for project.")
}
if n != p.NumOfCheckers {
return nil, errors.New("Number of checkers does not match project config.")
}
// TODO: assert job group properties?
log.Debug().Msgf("Running %d checks for project %s", n, p.Name)
var jg JobGroup
statusResult := make(chan CheckerStatus)
jobs := make([]*JobDescription, n)
for i, c := range p.Checkers {
jobs[i] = &JobDescription{
Group: &jg,
Checker: c,
Callback: callback,
}
}
results := make(chan CheckerResult, n)
jg = JobGroup{
Projects: []string{p.Name},
Name: fmt.Sprintf("check_%s_project", p.Name),
Jobs: jobs,
Results: &results,
FinalStatus: &statusResult,
Wg: &sync.WaitGroup{},
}
for i, jd := range jobs {
log.Info().Str("project", p.Name).
Str("check_name", jd.Checker.Name).
Msgf("Running check %d", i)
jd.QueueJob(callback)
}
go func() {
// Wait for all checks to complete
log.Debug().
Str("project", p.Name).
Str("job_group", jg.Name).
Msgf("Waiting for %d checks to complete.", n)
for i := 0; i < n; i++ {
res := <-results
log.Debug().Str("project", p.Name).
Str("job_group", jg.Name).
Str("worker_id", strconv.Itoa(jg.WorkerID)).
Str("checker_name", res.CheckerName).
Msg("Received checker result.")
if res.Status == CHECKER_ERROR {
// TODO: log checker UUID
log.Error().Err(res.Error).Msg("Checker returned error.")
statusResult <- CHECKER_ERROR
// TODO: stop exisiting jobs
return
}
if res.Status == CHECKER_FAIL {
// TODO: log checker UUID
log.Debug().Msg("Checker failed.")
statusResult <- CHECKER_FAIL
return
}
}
close(results)
statusResult <- CHECKER_SUCCESS
}()
return &jg, nil
}
func (jd *JobDescription) addLogProps(evt *zerolog.Event) *zerolog.Event {
return evt.
Str("project", jd.Group.Projects[0]).
Str("job_group", jd.Group.Name).
Str("worker_id", strconv.Itoa(jd.Group.WorkerID)).
Str("checker_name", jd.Checker.Name)
}
func (jd *JobDescription) LogInfo() *zerolog.Event {
return jd.addLogProps(log.Info())
}
func (jd *JobDescription) LogError() *zerolog.Event {
return jd.addLogProps(log.Error())
}
func ReadEnabledFromStdin() error {
scanner := bufio.NewScanner(os.Stdin)
scanner.Split(bufio.ScanWords)
for scanner.Scan() {
token := scanner.Text()
_, err := LoadProject(token)
if err != nil {
return err
}
}
return scanner.Err()
}
////////////////////////////////////////////////////////////////////////////////
const WORKER_COUNT = 5
func StartWorkers() {
log.Debug().Msgf("Starting %d workers.", WORKER_COUNT)
for i := 0; i < WORKER_COUNT; i++ {
go worker(i)
}
}
// TODO: available worker?
// var workerAvailable sync.WaitGroup
var jobs = make(chan JobDescription, WORKER_COUNT)
func (job JobDescription) QueueJob(callback CheckerResultCallback) {
if job.Group.Wg != nil {
job.Group.Wg.Add(1)
}
// TODO: workerAvailable.Wait() // wait for a worker to be available
jobs <- job
log.Debug().Str("checker_name", job.Checker.Name).Msg("Queued checker.")
}
func StopWorkers() {
log.Debug().Msg("Stopping workers.")
close(jobs)
// TODO: stop in process jobs???
}
// TODO: id to uuid?
func worker(id int) {
for j := range jobs {
var res CheckerResult = DefaultCheckerResult
res.CheckerName = j.Checker.Name
res.ProjectName = j.Group.Name
res.Time = time.Now()
log.Debug().Str("project", j.Checker.Name).
Str("worker_id", strconv.FormatInt(int64(id), 10)).
Msgf("Running check.")
success, err := j.Checker.CheckProject()
res.EndTime = time.Now()
if err != nil {
res.Status = CHECKER_ERROR
res.Error = err
} else if success {
res.Status = CHECKER_SUCCESS
} else {
res.Status = CHECKER_FAIL
}
*j.Group.Results <- res
if j.Group.Wg != nil {
j.Group.Wg.Done() // called before callback to prevent unnecessary waiting
}
if j.Callback != nil {
j.Callback(res)
} else {
log.Debug().Str("project", j.Checker.Name).
Str("worker_id", strconv.FormatInt(int64(id), 10)).
Msgf("No callback registered.")
}
}
}
////////////////////////////////////////////////////////////////////////////////
// Looks up a project by name, and returns only if the project is enabled.
func GetProject(name string) (*Project, error) {
res, exists := EnabledProjects[name]
if !exists {
return res, errors.New("requested project not found")
}
return res, nil
}
// Loads a project by name, and returns if the project is found.
func LoadProject(name string) (*Project, error) {
res, exists := SupportedProjects[name]
if !exists {
return res, errors.New("requested project not found")
}
if res.Properties.CSC == "unknown" {
log.Debug().Str("csc", res.Properties.CSC).
Msgf("Requested project %s has default properties.", name)
return res, errors.New("requested project has invalid properties (check the project config)")
}
log.Debug().Msgf("Loading Project %s", name)
EnabledProjects[name] = res
return res, nil
}