146 lines
3.9 KiB
Go
146 lines
3.9 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"crypto/tls"
|
|
"log"
|
|
"time"
|
|
|
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
|
"github.com/segmentio/kafka-go"
|
|
"github.com/segmentio/kafka-go/sasl/plain"
|
|
)
|
|
|
|
// InfluxOutput pushes drive metrics, as InfluxDB line protocol or JSON, to the
|
|
// InfluxDB v2 API and/or Kafka on a fixed schedule.
|
|
type InfluxOutput struct {
|
|
kwriter *kafka.Writer
|
|
client *influxdb2.Client
|
|
config *InfluxOutputConfig
|
|
}
|
|
|
|
// NewInfluxOutput creates the output and applies the current configuration.
|
|
func NewInfluxOutput() *InfluxOutput {
|
|
i := new(InfluxOutput)
|
|
i.Reload()
|
|
return i
|
|
}
|
|
|
|
// Reload rebuilds the Kafka writer and InfluxDB client from the configuration.
|
|
// A destination is only configured when its required settings are present.
|
|
func (i *InfluxOutput) Reload() {
|
|
i.config = &app.config.Influx
|
|
i.kwriter = nil
|
|
i.client = nil
|
|
|
|
// Kafka output.
|
|
if len(i.config.KafkaBrokers) != 0 && i.config.KafkaTopic != "" {
|
|
dialer := &kafka.Dialer{
|
|
Timeout: 10 * time.Second,
|
|
DualStack: true,
|
|
TLS: &tls.Config{InsecureSkipVerify: i.config.KafkaInsecureSkipVerify},
|
|
}
|
|
if i.config.KafkaUsername != "" {
|
|
dialer.SASLMechanism = plain.Mechanism{
|
|
Username: i.config.KafkaUsername,
|
|
Password: i.config.KafkaPassword,
|
|
}
|
|
}
|
|
i.kwriter = kafka.NewWriter(kafka.WriterConfig{
|
|
Brokers: i.config.KafkaBrokers,
|
|
Topic: i.config.KafkaTopic,
|
|
Dialer: dialer,
|
|
})
|
|
}
|
|
|
|
// InfluxDB v2 API output.
|
|
if i.config.InfluxServer != "" && i.config.Token != "" && i.config.Org != "" && i.config.Bucket != "" {
|
|
c := influxdb2.NewClient(i.config.InfluxServer, i.config.Token)
|
|
i.client = &c
|
|
}
|
|
}
|
|
|
|
// CollectAndLineprotocolFormat discovers the drives and renders them as InfluxDB
|
|
// line protocol.
|
|
func (i *InfluxOutput) CollectAndLineprotocolFormat() []byte {
|
|
drives, tsNs := collect()
|
|
return []byte(recordsToInflux(drives, tsNs))
|
|
}
|
|
|
|
// CollectAndJSONFormat discovers the drives and renders them as InfluxDB JSON.
|
|
func (i *InfluxOutput) CollectAndJSONFormat() []byte {
|
|
drives, tsNs := collect()
|
|
return recordsToInfluxJSON(drives, tsNs)
|
|
}
|
|
|
|
// OutputEnabled reports whether a destination is configured and a push interval
|
|
// is set.
|
|
func (i *InfluxOutput) OutputEnabled() bool {
|
|
return (i.kwriter != nil || i.client != nil) && i.config.Frequency != 0
|
|
}
|
|
|
|
// Start runs the scheduled push loop until the context is cancelled.
|
|
func (i *InfluxOutput) Start(ctx context.Context) {
|
|
if !i.OutputEnabled() {
|
|
return
|
|
}
|
|
|
|
ticker := time.NewTicker(i.config.Frequency)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
i.push(ctx)
|
|
|
|
case <-ctx.Done():
|
|
if i.kwriter != nil {
|
|
i.kwriter.Close()
|
|
}
|
|
if i.client != nil {
|
|
(*i.client).Close()
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// push collects metrics once and writes them to every configured destination.
|
|
func (i *InfluxOutput) push(ctx context.Context) {
|
|
// Kafka receives one message per drive in the configured format.
|
|
if i.kwriter != nil {
|
|
var data []byte
|
|
if i.config.KafkaOutputFormat == "json" {
|
|
data = i.CollectAndJSONFormat()
|
|
} else {
|
|
data = i.CollectAndLineprotocolFormat()
|
|
}
|
|
|
|
var messages []kafka.Message
|
|
routingKey := []byte(app.config.Hostname)
|
|
scanner := bufio.NewScanner(bytes.NewReader(data))
|
|
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
|
|
scanner.Split(bufio.ScanLines)
|
|
for scanner.Scan() {
|
|
b := append(scanner.Bytes(), '\n')
|
|
messages = append(messages, kafka.Message{Key: routingKey, Value: b})
|
|
}
|
|
if len(messages) != 0 {
|
|
if err := i.kwriter.WriteMessages(ctx, messages...); err != nil {
|
|
log.Println("Unable to write to Kafka:", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// InfluxDB API receives the full line-protocol document.
|
|
if i.client != nil {
|
|
data := i.CollectAndLineprotocolFormat()
|
|
if len(data) != 0 {
|
|
writeAPI := (*i.client).WriteAPIBlocking(i.config.Org, i.config.Bucket)
|
|
if err := writeAPI.WriteRecord(ctx, string(data)); err != nil {
|
|
log.Println("Unable to write to InfluxDB:", err)
|
|
}
|
|
}
|
|
}
|
|
}
|