package main import ( "log" "net" "os" "regexp" "strconv" "strings" "time" "github.com/hpcloud/tail" "github.com/prometheus/client_golang/prometheus" "github.com/satyrius/gonx" ) type ProftpdExporter struct { TransferLogPath string responses *prometheus.SummaryVec error_responses *prometheus.SummaryVec } func NewProftpdExporter(transferLogPath string) (*ProftpdExporter, error) { const subsystem = "ftp" return &ProftpdExporter{ TransferLogPath: transferLogPath, responses: prometheus.NewSummaryVec( prometheus.SummaryOpts{ Namespace: Namespace, Subsystem: subsystem, Name: "responses_sent_bytes", Help: "Summary of FTP responses", }, []string{"project", "network", "protocol"}), error_responses: prometheus.NewSummaryVec( prometheus.SummaryOpts{ Namespace: Namespace, Subsystem: subsystem, Name: "error_responses_sent_bytes", Help: "Summary of error FTP responses", }, []string{"project", "network", "protocol"}), }, nil } // Implements prometheus.Collector func (e *ProftpdExporter) Describe(ch chan<- *prometheus.Desc) { e.responses.Describe(ch) e.error_responses.Describe(ch) } // Implements prometheus.Collector func (e *ProftpdExporter) Collect(ch chan<- prometheus.Metric) { e.responses.Collect(ch) e.error_responses.Collect(ch) } var proftpdProjectRe = regexp.MustCompile("(?i)^/mirror/root/[^/]+/([^/]+)/") func (e *ProftpdExporter) processLogLine(line string) { lineReader := strings.NewReader(line) reader := gonx.NewReader( lineReader, `$current_time_dow $current_time_mon $current_time_day $current_time_time $current_time_year $transfer_time $remote_host $file_size $file_name $transfer_type $specical_action_flag $direction $access_mode $username $service_name $authentication_method $authenticated_user_id $completion_status`) if reader == nil { log.Printf("Failed to create reader for line: %s", line) } entry, err := reader.Read() if err != nil { log.Printf("Failed to get entry on line: %s", line) log.Print(err) } // Process sourceIP, err := entry.Field("remote_host") network := "unknown" protocol := "unknown" if err == nil { ip := net.ParseIP(sourceIP) network = IdentifyNetwork(ip) protocol = IdentifyIPProtocol(ip) } else { log.Println(err) } request, err := entry.Field("file_name") project := "unknown" if err == nil { match := proftpdProjectRe.FindStringSubmatch(request) if len(match) > 1 && IsMirroredProject(match[1]) { project = match[1] } else { project = "none" } } else { log.Println(err) } sizeStr, err := entry.Field("file_size") if err != nil { sizeStr = "0" } size, _ := strconv.ParseFloat(sizeStr, 64) statusStr, err := entry.Field("completion_status") success := statusStr == "c" // Setup labels labels := prometheus.Labels{ "project": project, "network": network, "protocol": protocol, } // Increment totals e.responses.With(labels).Observe(size) // Ensure an error version is created errorObserver := e.error_responses.With(labels) if !success { errorObserver.Observe(size) } } func (e *ProftpdExporter) Monitor() { tailConfig := tail.Config{ Follow: true, ReOpen: true, Poll: true, Location: &tail.SeekInfo{ Offset: 0, Whence: os.SEEK_END, }, } t, err := tail.TailFile(e.TransferLogPath, tailConfig) // Continue to retry every 5 seconds for ; err != nil; { t, err = tail.TailFile(e.TransferLogPath, tailConfig) if err != nil { time.Sleep(time.Second * 5) log.Println("Error starting proftpd tail (retrying in 5 seconds)", err) } } for line := range t.Lines { go e.processLogLine(line.Text) } }