Browse Source

Add proftpd xferlog exporter

master
Zachary Seguin 5 years ago
parent
commit
ab12f1f445
  1. 8
      mirror_exporter.go
  2. 12
      networks.go
  3. 7
      nginx.go
  4. 186
      proftpd.go

8
mirror_exporter.go

@ -17,6 +17,7 @@ func main() {
metricsPath = flag.String("net.telemetry-path", "/metrics", "Path to expose metrics on")
nginxAccessLogPath = flag.String("nginx.access-log", "/var/log/nginx/access.log", "Path to nginx access log")
proftpdTransferLogPath = flag.String("proftpd.transfer-log", "/var/log/proftpd/xferlog", "Path to proftpd transfer log")
)
flag.Parse()
@ -27,7 +28,14 @@ func main() {
}
prometheus.MustRegister(nginxExporter)
proftpdExporter, err := NewProftpdExporter(*proftpdTransferLogPath)
if err != nil {
log.Fatal(err)
}
prometheus.MustRegister(proftpdExporter)
go nginxExporter.Monitor()
go proftpdExporter.Monitor()
http.Handle(*metricsPath, prometheus.Handler())
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {

12
networks.go

@ -1,6 +1,6 @@
package main
import(
import (
"net"
)
@ -28,3 +28,13 @@ func IdentifyNetwork(ip net.IP) string {
return "other"
}
func IdentifyIPProtocol(ip net.IP) string {
if ip.To4() != nil {
return "ipv4"
} else if ip.To16() != nil {
return "ipv6"
}
return "other"
}

7
nginx.go

@ -114,12 +114,7 @@ func (e *NginxExporter) processLogLine(line string) {
if err == nil {
ip := net.ParseIP(sourceIP)
network = IdentifyNetwork(ip)
if ip.To4() != nil {
protocol = "ipv4"
} else if ip.To16() != nil {
protocol = "ipv6"
}
protocol = IdentifyIPProtocol(ip)
} else {
log.Println(err)
}

186
proftpd.go

@ -0,0 +1,186 @@
package main
import (
"log"
"net"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/hpcloud/tail"
"github.com/prometheus/client_golang/prometheus"
"github.com/satyrius/gonx"
)
type ProftpdExporter struct {
TransferLogPath string
// Request counts
responses *prometheus.CounterVec
error_responses *prometheus.CounterVec
// Bytes count
bytes_sent *prometheus.CounterVec
error_bytes_sent *prometheus.CounterVec
}
func NewProftpdExporter(transferLogPath string) (*ProftpdExporter, error) {
const subsystem = "ftp"
return &ProftpdExporter{
TransferLogPath: transferLogPath,
responses: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: subsystem,
Name: "responses_total",
Help: "Number of FTP responses",
},
[]string{"project", "network", "protocol"},
),
error_responses: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: subsystem,
Name: "error_responses_total",
Help: "Number of FTP error responses",
},
[]string{"project", "network", "protocol"},
),
bytes_sent: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: subsystem,
Name: "responses_sent_bytes",
Help: "Number of bytes sent in FTP responses",
},
[]string{"project", "network", "protocol"},
),
error_bytes_sent: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: subsystem,
Name: "error_responses_sent_bytes",
Help: "Number of bytes sent in error FTP responses",
},
[]string{"project", "network", "protocol"},
),
}, nil
}
// Implements prometheus.Collector
func (e *ProftpdExporter) Describe(ch chan<- *prometheus.Desc) {
e.responses.Describe(ch)
e.error_responses.Describe(ch)
e.bytes_sent.Describe(ch)
e.error_bytes_sent.Describe(ch)
}
// Implements prometheus.Collector
func (e *ProftpdExporter) Collect(ch chan<- prometheus.Metric) {
e.responses.Collect(ch)
e.error_responses.Collect(ch)
e.bytes_sent.Collect(ch)
e.error_bytes_sent.Collect(ch)
}
var proftpdProjectRe = regexp.MustCompile("(?i)^/mirror/root/[^/]+/([^/]+)/")
func (e *ProftpdExporter) processLogLine(line string) {
lineReader := strings.NewReader(line)
reader := gonx.NewReader(
lineReader,
`$current_time_dow $current_time_mon $current_time_day $current_time_time $current_time_year $transfer_time $remote_host $file_size $file_name $transfer_type $specical_action_flag $direction $access_mode $username $service_name $authentication_method $authenticated_user_id $completion_status`)
if reader == nil {
log.Printf("Failed to create reader for line: %s", line)
}
entry, err := reader.Read()
if err != nil {
log.Printf("Failed to get entry on line: %s", line)
log.Print(err)
}
// Process
sourceIP, err := entry.Field("remote_host")
network := "unknown"
protocol := "unknown"
if err == nil {
ip := net.ParseIP(sourceIP)
network = IdentifyNetwork(ip)
protocol = IdentifyIPProtocol(ip)
} else {
log.Println(err)
}
request, err := entry.Field("file_name")
project := "unknown"
if err == nil {
match := proftpdProjectRe.FindStringSubmatch(request)
if len(match) > 1 {
project = match[1]
} else {
project = "none"
}
} else {
log.Println(err)
}
sizeStr, err := entry.Field("file_size")
if err != nil {
sizeStr = "0"
}
size, _ := strconv.ParseFloat(sizeStr, 64)
statusStr, err := entry.Field("completion_status")
success := statusStr == "c"
// Setup labels
labels := prometheus.Labels{
"project": project,
"network": network,
"protocol": protocol,
}
// Increment totals
e.responses.With(labels).Inc()
e.bytes_sent.With(labels).Add(size)
if !success {
e.error_responses.With(labels).Inc()
e.error_bytes_sent.With(labels).Add(size)
}
}
func (e *ProftpdExporter) Monitor() {
tailConfig := tail.Config{
Follow: true,
ReOpen: true,
Poll: true,
Location: &tail.SeekInfo{
Offset: 0,
Whence: os.SEEK_END,
},
}
t, err := tail.TailFile(e.TransferLogPath, tailConfig)
// Continue to retry every 5 seconds
for ; err != nil; {
t, err = tail.TailFile(e.TransferLogPath, tailConfig)
if err != nil {
time.Sleep(time.Second * 5)
log.Println("Error starting proftpd tail (retrying in 5 seconds)", err)
}
}
for line := range t.Lines {
go e.processLogLine(line.Text)
}
}
Loading…
Cancel
Save