freeipa-health-metrics/influx.go

package main

import (
	"bufio"
	"bytes"
	"context"
	"crypto/tls"
	"encoding/json"
	"log"
	"sort"
	"strings"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/line-protocol/v2/lineprotocol"
	io_prometheus_client "github.com/prometheus/client_model/go"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/plain"
)

// InfluxOutput is the influx output controller, used to render metrics as
// InfluxDB line protocol and JSON output, and to publish metrics on a
// schedule.
type InfluxOutput struct {
	kwriter *kafka.Writer
	client  *influxdb2.Client
	config  *InfluxOutputConfig

	// OverrideTimestamp is used for testing with a stable timestamp.
	OverrideTimestamp time.Time
}
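
// For example, a test can pin the timestamp before formatting (a sketch, not
// taken from the project's actual test suite):
//
//	out := NewInfluxOutput()
//	out.OverrideTimestamp = time.Date(2023, 9, 5, 0, 0, 0, 0, time.UTC)
//	line, _ := out.CollectAndLineprotocolFormat()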

// NewInfluxOutput creates a new influx output controller.
func NewInfluxOutput() *InfluxOutput {
	i := new(InfluxOutput)
	// Load the initial config.
	i.Reload()
	return i
}

// Reload re-reads the configuration and rebuilds the Kafka and InfluxDB
// clients accordingly.
func (i *InfluxOutput) Reload() {
	// Update config state.
	i.config = &app.config.Influx
	i.kwriter = nil
	i.client = nil

	// If kafka output is configured, set up the kafka writer.
	if len(i.config.KafkaBrokers) != 0 && i.config.KafkaTopic != "" {
		// Configure the dialer, honoring the configured InsecureSkipVerify.
		dialer := &kafka.Dialer{
			Timeout:   10 * time.Second,
			DualStack: true,
			TLS:       &tls.Config{InsecureSkipVerify: i.config.KafkaInsecureSkipVerify},
		}
		// If authentication is configured, add SASL/PLAIN to the dialer.
		if i.config.KafkaUsername != "" {
			dialer.SASLMechanism = plain.Mechanism{
				Username: i.config.KafkaUsername,
				Password: i.config.KafkaPassword,
			}
		}
		// Make the kafka writer.
		i.kwriter = kafka.NewWriter(kafka.WriterConfig{
			Brokers: i.config.KafkaBrokers,
			Topic:   i.config.KafkaTopic,
			Dialer:  dialer,
		})
	}

	// If influx output is configured, set up the client.
	if i.config.InfluxServer != "" && i.config.Token != "" && i.config.Org != "" && i.config.Bucket != "" {
		c := influxdb2.NewClient(i.config.InfluxServer, i.config.Token)
		// influxdb2.Client is an interface; storing a pointer to it lets us
		// detect whether or not the client is configured via a nil check.
		i.client = &c
	}
}
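
// For reference, a minimal sketch of the influx config section this reads
// (field names inferred from the InfluxOutputConfig usage above; the concrete
// keys and file format are assumptions, since the config loader lives
// elsewhere):
//
//	influx:
//	  kafka_brokers: ["broker-1:9093"]
//	  kafka_topic: "metrics"
//	  influx_server: "https://influxdb.example.com"
//	  token: "..."
//	  org: "example"
//	  bucket: "freeipa"
//	  frequency: 1m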

// CollectAndLineprotocolFormat gathers metrics from the Prometheus registry,
// then renders them in line protocol format.
func (i *InfluxOutput) CollectAndLineprotocolFormat() ([]byte, error) {
	res, err := app.registry.Gather()
	if err != nil {
		return nil, err
	}
	return i.LineprotocolFormat(res)
}

// LineprotocolFormat renders Prometheus metrics in line protocol format.
func (i *InfluxOutput) LineprotocolFormat(res []*io_prometheus_client.MetricFamily) ([]byte, error) {
	var enc lineprotocol.Encoder
	// Prefix to strip when transforming a Prometheus name to an influx field.
	namePrefix := namespace + "_"
	enc.SetPrecision(lineprotocol.Microsecond)
	now := time.Now()
	if !i.OverrideTimestamp.IsZero() {
		now = i.OverrideTimestamp
	}
	// Encode each metric family.
	for _, metric := range res {
		// Get the field name, removing the namespace prefix if present.
		name := strings.TrimPrefix(metric.GetName(), namePrefix)
		mtype := metric.GetType()
		// A family can hold multiple samples with different label sets; emit
		// one influx point per sample.
		for _, m := range metric.GetMetric() {
			// Start a new line.
			enc.StartLine(namespace)
			// Collect tags; the encoder requires tag keys in lexical order,
			// so gather them first and sort before adding.
			tags := make(map[string]string, len(m.Label)+1)
			tags["host"] = app.config.Hostname
			for _, l := range m.Label {
				tags[l.GetName()] = l.GetValue()
			}
			keys := make([]string, 0, len(tags))
			for k := range tags {
				keys = append(keys, k)
			}
			sort.Strings(keys)
			for _, k := range keys {
				enc.AddTag(k, tags[k])
			}
			// Depending on type, add the field. Summaries and histograms are
			// reduced to their sample sums.
			switch mtype {
			case io_prometheus_client.MetricType_COUNTER:
				enc.AddField(name, lineprotocol.MustNewValue(m.Counter.GetValue()))
			case io_prometheus_client.MetricType_GAUGE:
				enc.AddField(name, lineprotocol.MustNewValue(m.Gauge.GetValue()))
			case io_prometheus_client.MetricType_SUMMARY:
				enc.AddField(name, lineprotocol.MustNewValue(m.Summary.GetSampleSum()))
			case io_prometheus_client.MetricType_UNTYPED:
				enc.AddField(name, lineprotocol.MustNewValue(m.Untyped.GetValue()))
			case io_prometheus_client.MetricType_HISTOGRAM:
				enc.AddField(name, lineprotocol.MustNewValue(m.Histogram.GetSampleSum()))
			case io_prometheus_client.MetricType_GAUGE_HISTOGRAM:
				enc.AddField(name, lineprotocol.MustNewValue(m.Histogram.GetSampleSum()))
			}
			// End the line for the next metric.
			enc.EndLine(now)
		}
	}
	// Check for encoding errors.
	if err := enc.Err(); err != nil {
		return nil, err
	}
	return enc.Bytes(), nil
}
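
// As a rough illustration, a single gauge sample renders to one line such as
// the following (hypothetical host and label values, and assuming the
// package-level namespace constant is "freeipa"; the timestamp is in
// microseconds):
//
//	freeipa,host=ipa1.example.com,service=krb5kdc service_up=1 1693929600000000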

// CollectAndJSONFormat gathers metrics from the Prometheus registry, then
// renders them in influx JSON format.
func (i *InfluxOutput) CollectAndJSONFormat() ([]byte, error) {
	res, err := app.registry.Gather()
	if err != nil {
		return nil, err
	}
	return i.JSONFormat(res)
}

// JSONFormat renders Prometheus metrics in influx JSON format, one JSON
// object per line.
func (i *InfluxOutput) JSONFormat(res []*io_prometheus_client.MetricFamily) ([]byte, error) {
	var buff bytes.Buffer
	// Prefix to strip when transforming a Prometheus name to an influx field.
	namePrefix := namespace + "_"
	now := time.Now()
	if !i.OverrideTimestamp.IsZero() {
		now = i.OverrideTimestamp
	}
	// Encode each metric family.
	for _, metric := range res {
		// Get the field name, removing the namespace prefix if present.
		name := strings.TrimPrefix(metric.GetName(), namePrefix)
		mtype := metric.GetType()
		// A family can hold multiple samples with different label sets; emit
		// one influx point per sample.
		for _, m := range metric.GetMetric() {
			// Base map housing the point; named point to avoid shadowing the
			// metric loop variable.
			point := make(map[string]interface{}, 4)
			// Add tags.
			tags := make(map[string]string, len(m.Label)+1)
			tags["host"] = app.config.Hostname
			for _, l := range m.Label {
				tags[l.GetName()] = l.GetValue()
			}
			point["tags"] = tags
			// Depending on type, add the field. Summaries and histograms are
			// reduced to their sample sums.
			fields := make(map[string]interface{}, 1)
			switch mtype {
			case io_prometheus_client.MetricType_COUNTER:
				fields[name] = m.Counter.GetValue()
			case io_prometheus_client.MetricType_GAUGE:
				fields[name] = m.Gauge.GetValue()
			case io_prometheus_client.MetricType_SUMMARY:
				fields[name] = m.Summary.GetSampleSum()
			case io_prometheus_client.MetricType_UNTYPED:
				fields[name] = m.Untyped.GetValue()
			case io_prometheus_client.MetricType_HISTOGRAM:
				fields[name] = m.Histogram.GetSampleSum()
			case io_prometheus_client.MetricType_GAUGE_HISTOGRAM:
				fields[name] = m.Histogram.GetSampleSum()
			}
			point["fields"] = fields
			// Set the metric name and the timestamp (microseconds, matching
			// the line protocol precision).
			point["name"] = namespace
			point["timestamp"] = now.UnixNano() / int64(time.Microsecond)
			// Serialize into JSON.
			serialized, err := json.Marshal(point)
			if err != nil {
				return nil, err
			}
			// Append a newline so consumers can split individual metrics.
			serialized = append(serialized, '\n')
			// Write the serialized metric.
			buff.Write(serialized)
		}
	}
	return buff.Bytes(), nil
}
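
// As a rough illustration, each line of JSON output looks like the following
// (same hypothetical values as the line protocol example above; json.Marshal
// emits map keys in sorted order):
//
//	{"fields":{"service_up":1},"name":"freeipa","tags":{"host":"ipa1.example.com","service":"krb5kdc"},"timestamp":1693929600000000}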

// OutputEnabled reports whether at least one output is configured and a
// publish frequency is set.
func (i *InfluxOutput) OutputEnabled() bool {
	return (i.kwriter != nil || i.client != nil) && i.config.Frequency != 0
}

// Start runs the influx output schedule until the context is canceled.
func (i *InfluxOutput) Start(ctx context.Context) {
	// If no outputs are configured, stop here.
	if !i.OutputEnabled() {
		return
	}
	// Set up the schedule.
	ticker := time.NewTicker(i.config.Frequency)
	defer ticker.Stop()
	for {
		select {
		// On each tick, gather metrics and send output.
		case <-ticker.C:
			res, err := app.registry.Gather()
			if err != nil {
				log.Println("Error collecting metrics for influx output:", err)
				continue
			}
			// If kafka output is enabled, send output to kafka.
			if i.kwriter != nil {
				var data []byte
				// Render the metrics in the configured format.
				if i.config.KafkaOutputFormat == "json" {
					data, err = i.JSONFormat(res)
				} else {
					data, err = i.LineprotocolFormat(res)
				}
				if err != nil {
					// Skip the kafka write rather than sending empty data.
					log.Println("Error formatting metrics for kafka:", err)
				} else {
					// Split the formatted output into one message per line.
					var messages []kafka.Message
					scanner := bufio.NewScanner(bytes.NewReader(data))
					scanner.Split(bufio.ScanLines)
					// Set the routing key to the hostname.
					routingKey := []byte(app.config.Hostname)
					for scanner.Scan() {
						line := scanner.Bytes()
						// Copy the token, as scanner.Bytes is only valid
						// until the next Scan, and add back the newline the
						// Kafka consumer expects.
						b := make([]byte, 0, len(line)+1)
						b = append(b, line...)
						b = append(b, '\n')
						// Add the message.
						messages = append(messages, kafka.Message{
							Key:   routingKey,
							Value: b,
						})
					}
					// Write the messages to Kafka.
					if err := i.kwriter.WriteMessages(ctx, messages...); err != nil {
						log.Println("Unable to write to Kafka:", err)
					}
				}
			}
			// If influx is configured, write metrics to the InfluxDB API.
			if i.client != nil {
				c := *i.client
				writeAPI := c.WriteAPIBlocking(i.config.Org, i.config.Bucket)
				// Render the metrics in line protocol.
				data, err := i.LineprotocolFormat(res)
				if err != nil {
					log.Println("Error formatting metrics for influx output:", err)
					continue
				}
				// Send all metrics to InfluxDB.
				if err := writeAPI.WriteRecord(ctx, string(data)); err != nil {
					log.Println("Unable to write to InfluxDB:", err)
				}
			}
		// If the context is done, close out connections.
		case <-ctx.Done():
			if i.kwriter != nil {
				i.kwriter.Close()
			}
			if i.client != nil {
				(*i.client).Close()
			}
			return
		}
	}
}
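
// A minimal usage sketch (assuming app and its registry are initialized
// elsewhere, as the rest of this file does):
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	out := NewInfluxOutput()
//	go out.Start(ctx)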