rewrite framework and new job queuing system

Signed-off-by: Nathan13888 <29968201+Nathan13888@users.noreply.github.com>
This commit is contained in:
Nathan Chung 2023-06-25 00:49:05 -04:00
parent 4502ee4171
commit f62168a081
Signed by: n4chung
SSH Key Fingerprint: SHA256:/+NqYA5MfQIjjfwa4z16mw3Y+lxgY/Ml8wCeGnh6qBU
3 changed files with 166 additions and 22 deletions

View File

@ -2,6 +2,9 @@ package checkers
import (
"errors"
"fmt"
"strconv"
"sync"
"time"
"github.com/rs/zerolog/log"
@ -31,7 +34,9 @@ type ProjectProperties struct {
}
type ProjectChecker struct {
CheckProject func() CheckerResult
Name string
CheckProject func() (bool, error)
// TODO: other severity levels?
}
// //////////////////////////////////////////////////////////////////////////////
@ -46,43 +51,167 @@ var CHECKER_FAIL CheckerStatus = "fail"
type CheckerResult struct {
ProjectName string `json:"project_name"`
CheckerName string `json:"checker_name"`
Time time.Time `json:"time"`
Time time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Status CheckerStatus `json:"status"`
Error error `json:"error"`
}
type CheckerResultCallback func(CheckerResult)
// TODO: uuid for each job
// TODO: job history
type JobDescription struct {
Group *JobGroup
Checker *ProjectChecker
Callback CheckerResultCallback
}
type JobGroup struct {
Projects []string
Name string
Jobs []*JobDescription
Results *(chan CheckerResult)
FinalStatus *(chan CheckerStatus)
Wg *sync.WaitGroup
WorkerID int
}
////////////////////////////////////////////////////////////////////////////////
// TODO: run check without callback
// TODO: create "custom" job groups??
// Given a project, all checks are initiated.
func (p Project) RunChecksAsync(callback CheckerResultCallback) error {
func (p Project) RunChecks(callback CheckerResultCallback) (*JobGroup, error) {
checks := p.Checkers
n := len(checks)
// TODO: assert other checkers?
// assert if the number of checkers match the expected number of checkers
if n == 0 {
return nil, errors.New("No checkers found for project.")
}
if n != p.NumOfCheckers {
return errors.New("Number of checkers does not match the expected number of checkers.")
return nil, errors.New("Number of checkers does not match project config.")
}
// TODO: assert job group properties?
log.Debug().Msgf("Running %d checks for project %s", n, p.Name)
var jg JobGroup
statusResult := make(chan CheckerStatus)
jobs := make([]*JobDescription, n)
for i, c := range p.Checkers {
jobs[i] = &JobDescription{
Group: &jg,
Checker: c,
Callback: callback,
}
}
results := make(chan CheckerResult, n)
jg = JobGroup{
Projects: []string{p.Name},
Name: fmt.Sprintf("check_%s_project", p.Name),
Jobs: jobs,
Results: &results,
FinalStatus: &statusResult,
Wg: &sync.WaitGroup{},
}
log.Info().Msgf("Running %d checks for project %s", n, p.Name)
for i := 0; i < n; i++ {
check := checks[i]
log.Info().Str("project", p.Name).Msgf("Running check %d", i)
check.RunCheckAsync(callback)
for i, jd := range jobs {
log.Info().Str("project", p.Name).
Str("check_name", jd.Checker.Name).
Msgf("Running check %d", i)
jd.QueueJob(callback)
}
return nil
go func() {
// Wait for all checks to complete
log.Debug().
Str("project", p.Name).
Msgf("Waiting for %d checks to complete.", n)
for i := 0; i < n; i++ {
res := <-results
log.Debug().Str("project", p.Name).
Str("checker_name", res.CheckerName).
Msg("Received checker result.")
if res.Status == CHECKER_ERROR {
log.Error().Err(res.Error).Msg("Checker returned error.")
}
}
close(results)
statusResult <- CHECKER_SUCCESS
}()
return &jg, nil
}
// Given a check, run the check asynchronously and call the callback function.
func (c ProjectChecker) RunCheckAsync(callback CheckerResultCallback) error {
go func() {
res := c.CheckProject()
callback(res)
}()
return nil
////////////////////////////////////////////////////////////////////////////////
const WORKER_COUNT = 5
func StartWorkers() {
log.Debug().Msgf("Starting %d workers.", WORKER_COUNT)
for i := 0; i < WORKER_COUNT; i++ {
go worker(i)
}
}
// TODO: available worker?
// var workerAvailable sync.WaitGroup
var jobs = make(chan JobDescription, WORKER_COUNT)
func (job JobDescription) QueueJob(callback CheckerResultCallback) {
if job.Group.Wg != nil {
job.Group.Wg.Add(1)
}
// TODO: workerAvailable.Wait() // wait for a worker to be available
jobs <- job
log.Debug().Str("checker_name", job.Checker.Name).Msg("Queued checker.")
}
func StopWorkers() {
log.Debug().Msg("Stopping workers.")
close(jobs)
// TODO: stop in process jobs???
}
// TODO: id to uuid?
func worker(id int) {
for j := range jobs {
var res CheckerResult = DefaultCheckerResult
res.CheckerName = j.Checker.Name
res.ProjectName = j.Group.Name
res.Time = time.Now()
log.Debug().Str("project", j.Checker.Name).
Str("worker_id", strconv.FormatInt(int64(id), 10)).
Msgf("Running check.")
success, err := j.Checker.CheckProject()
res.EndTime = time.Now()
if err != nil {
res.Status = CHECKER_ERROR
res.Error = err
} else if success {
res.Status = CHECKER_SUCCESS
} else {
res.Status = CHECKER_FAIL
}
*j.Group.Results <- res
if j.Group.Wg != nil {
j.Group.Wg.Done() // called before callback to prevent unnecessary waiting
}
j.Callback(res)
}
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -1,6 +1,7 @@
package checkers
import (
"fmt"
"time"
"github.com/rs/zerolog/log"
@ -40,9 +41,10 @@ func LoadDefaultProjects() {
func GetDefaultChecker(name string) *ProjectChecker {
return &ProjectChecker{
CheckProject: func() CheckerResult {
// TODO: implement
return DefaultCheckerResult
Name: fmt.Sprintf("default_checker_%s", name),
CheckProject: func() (bool, error) {
fmt.Println("Default Checker: ", name)
return true, nil
},
}
}

15
main.go
View File

@ -79,6 +79,8 @@ func main() {
log.Fatal().Msg("No projects specified.")
}
checkers.StartWorkers()
log.Debug().Msgf("Checking all specified projects.")
for _, arg := range projects {
@ -91,7 +93,7 @@ func main() {
return err
}
err = proj.RunChecksAsync(func(res checkers.CheckerResult) {
res, err := proj.RunChecks(func(res checkers.CheckerResult) {
if res.Error != nil {
log.Error().Err(res.Error).
Time("time", res.Time).
@ -103,11 +105,22 @@ func main() {
Msgf("Completed check %s for project %s", res.CheckerName, res.ProjectName)
}
})
if err != nil {
log.Fatal().Err(err).Msg("Failed to check project.")
}
status := <-*res.FinalStatus
if status == checkers.CHECKER_ERROR {
log.Fatal().Err(err).Msg("Failed to check project.")
}
log.Info().Str("final_status", string(status)).
Msgf("Completed all checks for project `%s`", arg)
}
checkers.StopWorkers()
return nil
},
// TODO: auto complete available mirrors, https://cli.urfave.org/v2/examples/combining-short-options/