drive-health-metrics/influx.go
James Coleman ddafa90a02
Some checks failed
Go package / build (push) Has been cancelled
first commit
2026-06-22 17:16:34 -05:00

146 lines
3.9 KiB
Go

package main
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"log"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
)
// InfluxOutput pushes drive metrics, as InfluxDB line protocol or JSON, to the
// InfluxDB v2 API and/or Kafka on a fixed schedule.
type InfluxOutput struct {
kwriter *kafka.Writer
client *influxdb2.Client
config *InfluxOutputConfig
}
// NewInfluxOutput creates the output and applies the current configuration.
func NewInfluxOutput() *InfluxOutput {
i := new(InfluxOutput)
i.Reload()
return i
}
// Reload rebuilds the Kafka writer and InfluxDB client from the configuration.
// A destination is only configured when its required settings are present.
func (i *InfluxOutput) Reload() {
i.config = &app.config.Influx
i.kwriter = nil
i.client = nil
// Kafka output.
if len(i.config.KafkaBrokers) != 0 && i.config.KafkaTopic != "" {
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: &tls.Config{InsecureSkipVerify: i.config.KafkaInsecureSkipVerify},
}
if i.config.KafkaUsername != "" {
dialer.SASLMechanism = plain.Mechanism{
Username: i.config.KafkaUsername,
Password: i.config.KafkaPassword,
}
}
i.kwriter = kafka.NewWriter(kafka.WriterConfig{
Brokers: i.config.KafkaBrokers,
Topic: i.config.KafkaTopic,
Dialer: dialer,
})
}
// InfluxDB v2 API output.
if i.config.InfluxServer != "" && i.config.Token != "" && i.config.Org != "" && i.config.Bucket != "" {
c := influxdb2.NewClient(i.config.InfluxServer, i.config.Token)
i.client = &c
}
}
// CollectAndLineprotocolFormat discovers the drives and renders them as InfluxDB
// line protocol.
func (i *InfluxOutput) CollectAndLineprotocolFormat() []byte {
drives, tsNs := collect()
return []byte(recordsToInflux(drives, tsNs))
}
// CollectAndJSONFormat discovers the drives and renders them as InfluxDB JSON.
func (i *InfluxOutput) CollectAndJSONFormat() []byte {
drives, tsNs := collect()
return recordsToInfluxJSON(drives, tsNs)
}
// OutputEnabled reports whether a destination is configured and a push interval
// is set.
func (i *InfluxOutput) OutputEnabled() bool {
return (i.kwriter != nil || i.client != nil) && i.config.Frequency != 0
}
// Start runs the scheduled push loop until the context is cancelled.
func (i *InfluxOutput) Start(ctx context.Context) {
if !i.OutputEnabled() {
return
}
ticker := time.NewTicker(i.config.Frequency)
defer ticker.Stop()
for {
select {
case <-ticker.C:
i.push(ctx)
case <-ctx.Done():
if i.kwriter != nil {
i.kwriter.Close()
}
if i.client != nil {
(*i.client).Close()
}
return
}
}
}
// push collects metrics once and writes them to every configured destination.
func (i *InfluxOutput) push(ctx context.Context) {
// Kafka receives one message per drive in the configured format.
if i.kwriter != nil {
var data []byte
if i.config.KafkaOutputFormat == "json" {
data = i.CollectAndJSONFormat()
} else {
data = i.CollectAndLineprotocolFormat()
}
var messages []kafka.Message
routingKey := []byte(app.config.Hostname)
scanner := bufio.NewScanner(bytes.NewReader(data))
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
b := append(scanner.Bytes(), '\n')
messages = append(messages, kafka.Message{Key: routingKey, Value: b})
}
if len(messages) != 0 {
if err := i.kwriter.WriteMessages(ctx, messages...); err != nil {
log.Println("Unable to write to Kafka:", err)
}
}
}
// InfluxDB API receives the full line-protocol document.
if i.client != nil {
data := i.CollectAndLineprotocolFormat()
if len(data) != 0 {
writeAPI := (*i.client).WriteAPIBlocking(i.config.Org, i.config.Bucket)
if err := writeAPI.WriteRecord(ctx, string(data)); err != nil {
log.Println("Unable to write to InfluxDB:", err)
}
}
}
}