Prometheus collector to collect stats from mirror.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

157 lines
3.9 KiB

package main
import (
"log"
"net"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/hpcloud/tail"
"github.com/prometheus/client_golang/prometheus"
"github.com/satyrius/gonx"
)
// ProftpdExporter is a Prometheus collector whose metrics are fed from a
// ProFTPD transfer log.
type ProftpdExporter struct {
// TransferLogPath is the path of the transfer log that Monitor tails.
TransferLogPath string
// responses observes the size of every processed transfer.
responses *prometheus.SummaryVec
// error_responses observes the size of transfers whose completion status
// is anything other than "c".
error_responses *prometheus.SummaryVec
}
// NewProftpdExporter returns a ProftpdExporter that will read the transfer log
// at transferLogPath. Both summaries live under the package Namespace in the
// "ftp" subsystem and are partitioned by project, network and protocol.
// The returned error is always nil.
func NewProftpdExporter(transferLogPath string) (*ProftpdExporter, error) {
	const subsystem = "ftp"

	// newSummary builds one summary vector; each call gets its own label
	// slice so the two vectors never share backing storage.
	newSummary := func(name, help string) *prometheus.SummaryVec {
		opts := prometheus.SummaryOpts{
			Namespace: Namespace,
			Subsystem: subsystem,
			Name:      name,
			Help:      help,
		}
		return prometheus.NewSummaryVec(opts, []string{"project", "network", "protocol"})
	}

	exporter := &ProftpdExporter{
		TransferLogPath: transferLogPath,
		responses:       newSummary("responses_sent_bytes", "Summary of FTP responses"),
		error_responses: newSummary("error_responses_sent_bytes", "Summary of error FTP responses"),
	}
	return exporter, nil
}
// Describe implements prometheus.Collector by forwarding the descriptors of
// the responses and error_responses summaries, in that order, to ch.
func (e *ProftpdExporter) Describe(ch chan<- *prometheus.Desc) {
	for _, vec := range []*prometheus.SummaryVec{e.responses, e.error_responses} {
		vec.Describe(ch)
	}
}
// Collect implements prometheus.Collector by forwarding the current metrics of
// the responses and error_responses summaries, in that order, to ch.
func (e *ProftpdExporter) Collect(ch chan<- prometheus.Metric) {
	for _, vec := range []*prometheus.SummaryVec{e.responses, e.error_responses} {
		vec.Collect(ch)
	}
}
// proftpdProjectRe captures the project name from a transferred file path: the
// second path component below /mirror/root/ (matched case-insensitively),
// which must itself be followed by a slash.
var proftpdProjectRe = regexp.MustCompile(`(?i)^/mirror/root/[^/]+/([^/]+)/`)
// processLogLine parses one line of the transfer log and records the
// transferred size in the exporter's summaries.
//
// Labels are derived as follows:
//   - network and protocol come from the remote_host field via IdentifyNetwork
//     and IdentifyIPProtocol; both stay "unknown" when the field is missing.
//   - project is the name captured by proftpdProjectRe from the file_name
//     field when IsMirroredProject accepts it, "none" when it does not, and
//     "unknown" when the field is missing.
//
// Every parsed line is observed in responses. Lines whose completion_status
// is not "c" are additionally observed in error_responses. Lines that cannot
// be parsed are logged and skipped.
func (e *ProftpdExporter) processLogLine(line string) {
	reader := gonx.NewReader(
		strings.NewReader(line),
		`$current_time_dow $current_time_mon $current_time_day $current_time_time $current_time_year $transfer_time $remote_host $file_size $file_name $transfer_type $specical_action_flag $direction $access_mode $username $service_name $authentication_method $authenticated_user_id $completion_status`)
	if reader == nil {
		log.Printf("Failed to create reader for line: %s", line)
		return
	}

	entry, err := reader.Read()
	if err != nil {
		log.Printf("Failed to get entry on line: %s", line)
		log.Print(err)
		// The entry is not usable when Read fails; carrying on would
		// dereference it and, because this runs in its own goroutine,
		// a panic here would bring down the whole process.
		return
	}

	// Classify the client address.
	network := "unknown"
	protocol := "unknown"
	if sourceIP, err := entry.Field("remote_host"); err != nil {
		log.Println(err)
	} else {
		ip := net.ParseIP(sourceIP)
		network = IdentifyNetwork(ip)
		protocol = IdentifyIPProtocol(ip)
	}

	// Derive the project from the transferred file's path.
	project := "unknown"
	if request, err := entry.Field("file_name"); err != nil {
		log.Println(err)
	} else if match := proftpdProjectRe.FindStringSubmatch(request); len(match) > 1 && IsMirroredProject(match[1]) {
		project = match[1]
	} else {
		project = "none"
	}

	// A missing or unparsable size is recorded as 0 rather than letting a
	// bogus value (e.g. ±Inf from a range error) reach the summary.
	var size float64
	if sizeStr, err := entry.Field("file_size"); err == nil {
		parsed, parseErr := strconv.ParseFloat(sizeStr, 64)
		if parseErr != nil {
			log.Printf("Invalid file_size %q: %v", sizeStr, parseErr)
		} else {
			size = parsed
		}
	}

	// A missing status leaves statusStr empty, so the transfer is counted
	// as unsuccessful.
	statusStr, err := entry.Field("completion_status")
	if err != nil {
		log.Println(err)
	}
	success := statusStr == "c"

	// Setup labels
	labels := prometheus.Labels{
		"project":  project,
		"network":  network,
		"protocol": protocol,
	}

	// Increment totals
	e.responses.With(labels).Observe(size)

	// Fetch the error observer unconditionally so the error series exists
	// for this label set even when the transfer succeeded.
	errorObserver := e.error_responses.With(labels)
	if !success {
		errorObserver.Observe(size)
	}
}
// Monitor tails the transfer log at e.TransferLogPath, starting from the end
// of the file, and hands every new line to processLogLine in its own
// goroutine. If the tail cannot be started it retries every 5 seconds.
// Monitor blocks until the tail's Lines channel is closed.
func (e *ProftpdExporter) Monitor() {
	tailConfig := tail.Config{
		Follow: true,
		ReOpen: true,
		Poll:   true,
		Location: &tail.SeekInfo{
			Offset: 0,
			Whence: os.SEEK_END,
		},
	}

	// Keep retrying until the tail starts. Every failure, including the
	// first, is logged before waiting so the message matches what happens.
	t, err := tail.TailFile(e.TransferLogPath, tailConfig)
	for err != nil {
		log.Println("Error starting proftpd tail (retrying in 5 seconds)", err)
		time.Sleep(time.Second * 5)
		t, err = tail.TailFile(e.TransferLogPath, tailConfig)
	}

	for line := range t.Lines {
		// Lines carrying an error come from the tailer itself, not from
		// the log file, so they must not be parsed as transfer records.
		if line.Err != nil {
			log.Println("Error reading proftpd transfer log:", line.Err)
			continue
		}
		go e.processLogLine(line.Text)
	}
}