package main import ( "bufio" "bytes" "context" "crypto/tls" "log" "time" influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go/sasl/plain" ) // InfluxOutput pushes drive metrics, as InfluxDB line protocol or JSON, to the // InfluxDB v2 API and/or Kafka on a fixed schedule. type InfluxOutput struct { kwriter *kafka.Writer client *influxdb2.Client config *InfluxOutputConfig } // NewInfluxOutput creates the output and applies the current configuration. func NewInfluxOutput() *InfluxOutput { i := new(InfluxOutput) i.Reload() return i } // Reload rebuilds the Kafka writer and InfluxDB client from the configuration. // A destination is only configured when its required settings are present. func (i *InfluxOutput) Reload() { i.config = &app.config.Influx i.kwriter = nil i.client = nil // Kafka output. if len(i.config.KafkaBrokers) != 0 && i.config.KafkaTopic != "" { dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: true, TLS: &tls.Config{InsecureSkipVerify: i.config.KafkaInsecureSkipVerify}, } if i.config.KafkaUsername != "" { dialer.SASLMechanism = plain.Mechanism{ Username: i.config.KafkaUsername, Password: i.config.KafkaPassword, } } i.kwriter = kafka.NewWriter(kafka.WriterConfig{ Brokers: i.config.KafkaBrokers, Topic: i.config.KafkaTopic, Dialer: dialer, }) } // InfluxDB v2 API output. if i.config.InfluxServer != "" && i.config.Token != "" && i.config.Org != "" && i.config.Bucket != "" { c := influxdb2.NewClient(i.config.InfluxServer, i.config.Token) i.client = &c } } // CollectAndLineprotocolFormat discovers the drives and renders them as InfluxDB // line protocol. func (i *InfluxOutput) CollectAndLineprotocolFormat() []byte { drives, tsNs := collect() return []byte(recordsToInflux(drives, tsNs)) } // CollectAndJSONFormat discovers the drives and renders them as InfluxDB JSON. func (i *InfluxOutput) CollectAndJSONFormat() []byte { drives, tsNs := collect() return recordsToInfluxJSON(drives, tsNs) } // OutputEnabled reports whether a destination is configured and a push interval // is set. func (i *InfluxOutput) OutputEnabled() bool { return (i.kwriter != nil || i.client != nil) && i.config.Frequency != 0 } // Start runs the scheduled push loop until the context is cancelled. func (i *InfluxOutput) Start(ctx context.Context) { if !i.OutputEnabled() { return } ticker := time.NewTicker(i.config.Frequency) defer ticker.Stop() for { select { case <-ticker.C: i.push(ctx) case <-ctx.Done(): if i.kwriter != nil { i.kwriter.Close() } if i.client != nil { (*i.client).Close() } return } } } // push collects metrics once and writes them to every configured destination. func (i *InfluxOutput) push(ctx context.Context) { // Kafka receives one message per drive in the configured format. if i.kwriter != nil { var data []byte if i.config.KafkaOutputFormat == "json" { data = i.CollectAndJSONFormat() } else { data = i.CollectAndLineprotocolFormat() } var messages []kafka.Message routingKey := []byte(app.config.Hostname) scanner := bufio.NewScanner(bytes.NewReader(data)) scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) scanner.Split(bufio.ScanLines) for scanner.Scan() { b := append(scanner.Bytes(), '\n') messages = append(messages, kafka.Message{Key: routingKey, Value: b}) } if len(messages) != 0 { if err := i.kwriter.WriteMessages(ctx, messages...); err != nil { log.Println("Unable to write to Kafka:", err) } } } // InfluxDB API receives the full line-protocol document. if i.client != nil { data := i.CollectAndLineprotocolFormat() if len(data) != 0 { writeAPI := (*i.client).WriteAPIBlocking(i.config.Org, i.config.Bucket) if err := writeAPI.WriteRecord(ctx, string(data)); err != nil { log.Println("Unable to write to InfluxDB:", err) } } } }