178 lines
4.6 KiB
Go
178 lines
4.6 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
)
|
|
|
|
// influxMeasurement is the InfluxDB measurement name; it matches the Prometheus
|
|
// namespace so both outputs describe the same series.
|
|
const influxMeasurement = namespace // written as the line-protocol measurement and as the JSON "name" value
|
|
|
|
// enclosureSlot formats the physical location as "<enclosure>:<slot>", falling
|
|
// back to whichever single value is known.
|
|
func (d *Drive) enclosureSlot() string {
|
|
switch {
|
|
case d.Enclosure != "" && d.Slot != "":
|
|
return d.Enclosure + ":" + d.Slot
|
|
case d.Slot != "":
|
|
return d.Slot
|
|
default:
|
|
return d.Enclosure
|
|
}
|
|
}
|
|
|
|
// csvEscape returns s encoded as a single CSV field per RFC 4180. A value that
// contains a comma, a double quote, a line feed, or a carriage return is wrapped
// in double quotes with every embedded quote doubled; any other value is
// returned unchanged.
func csvEscape(s string) string {
	// CR is part of the RFC 4180 line break, so a bare "\r" inside a field must
	// force quoting just like "\n" does; otherwise a reader may split the row.
	if !strings.ContainsAny(s, ",\"\r\n") {
		return s
	}
	return `"` + strings.ReplaceAll(s, `"`, `""`) + `"`
}
|
|
|
|
// recordsToCSV renders the drives as a CSV document: the header row followed by
|
|
// one escaped row per drive, in schema (columns) order.
|
|
func recordsToCSV(drives []*Drive) string {
|
|
var b strings.Builder
|
|
names := make([]string, len(columns))
|
|
for i, c := range columns {
|
|
names[i] = c.name
|
|
}
|
|
b.WriteString(strings.Join(names, ","))
|
|
for _, d := range drives {
|
|
b.WriteByte('\n')
|
|
cells := make([]string, len(columns))
|
|
for i, c := range columns {
|
|
cells[i] = csvEscape(format(c.raw(d)))
|
|
}
|
|
b.WriteString(strings.Join(cells, ","))
|
|
}
|
|
return b.String()
|
|
}
|
|
|
|
// influxTagEscape makes s usable as an InfluxDB line-protocol tag value by
// putting a backslash in front of every space, comma, and equals sign, the
// characters the protocol reads as delimiters. All other bytes pass through
// unchanged.
func influxTagEscape(s string) string {
	var out strings.Builder
	out.Grow(len(s))
	for i := 0; i < len(s); i++ {
		// The three delimiters are single ASCII bytes, so a byte-wise scan is
		// safe for multi-byte UTF-8 input as well.
		if ch := s[i]; ch == ' ' || ch == ',' || ch == '=' {
			out.WriteByte('\\')
		}
		out.WriteByte(s[i])
	}
	return out.String()
}
|
|
|
|
// recordsToInflux renders the drives as InfluxDB line protocol, one line per
|
|
// drive: tag columns become tags, the remaining (non-csvOnly) columns become
|
|
// typed fields (int "i" suffix, float, bool, or quoted string), all sharing the
|
|
// collection timestamp. A drive with no usable fields is skipped.
|
|
func recordsToInflux(drives []*Drive, tsNs int64) string {
|
|
var lines []string
|
|
for _, d := range drives {
|
|
// Tags.
|
|
tags := map[string]string{}
|
|
for _, c := range columns {
|
|
if !c.influxTag {
|
|
continue
|
|
}
|
|
if v := format(c.raw(d)); v != "" {
|
|
tags[c.name] = influxTagEscape(v)
|
|
}
|
|
}
|
|
// Fields: every non-csvOnly, non-tag column with a value.
|
|
fields := map[string]string{}
|
|
for _, c := range columns {
|
|
if c.csvOnly || c.influxTag {
|
|
continue
|
|
}
|
|
r := c.raw(d)
|
|
if r == nil {
|
|
continue
|
|
}
|
|
switch c.kind {
|
|
case kindInt:
|
|
fields[c.name] = strconv.Itoa(r.(int)) + "i"
|
|
case kindFloat:
|
|
fields[c.name] = format(r)
|
|
case kindBool:
|
|
fields[c.name] = format(r) // "true"/"false".
|
|
default:
|
|
fields[c.name] = "\"" + strings.ReplaceAll(format(r), "\"", "\\\"") + "\""
|
|
}
|
|
}
|
|
if len(fields) == 0 {
|
|
continue
|
|
}
|
|
tagStr := joinSorted(tags)
|
|
fieldStr := joinSorted(fields)
|
|
if tagStr != "" {
|
|
lines = append(lines, fmt.Sprintf("%s,%s %s %d", influxMeasurement, tagStr, fieldStr, tsNs))
|
|
} else {
|
|
lines = append(lines, fmt.Sprintf("%s %s %d", influxMeasurement, fieldStr, tsNs))
|
|
}
|
|
}
|
|
return strings.Join(lines, "\n")
|
|
}
|
|
|
|
// recordsToInfluxJSON renders the drives as newline-delimited InfluxDB JSON, one
|
|
// object per drive ({name, tags, fields, timestamp}). It applies the same
|
|
// tag/field split and typing as recordsToInflux; the timestamp is microseconds.
|
|
// A drive with no usable fields is skipped.
|
|
func recordsToInfluxJSON(drives []*Drive, tsNs int64) []byte {
|
|
var buff bytes.Buffer
|
|
tsMicro := tsNs / 1000
|
|
for _, d := range drives {
|
|
// Tags.
|
|
tags := map[string]string{}
|
|
for _, c := range columns {
|
|
if !c.influxTag {
|
|
continue
|
|
}
|
|
if v := format(c.raw(d)); v != "" {
|
|
tags[c.name] = v
|
|
}
|
|
}
|
|
// Typed fields.
|
|
fields := map[string]interface{}{}
|
|
for _, c := range columns {
|
|
if c.csvOnly || c.influxTag {
|
|
continue
|
|
}
|
|
r := c.raw(d)
|
|
if r == nil {
|
|
continue
|
|
}
|
|
fields[c.name] = r // int, float64, bool, or string — already typed.
|
|
}
|
|
if len(fields) == 0 {
|
|
continue
|
|
}
|
|
serialized, err := json.Marshal(map[string]interface{}{
|
|
"name": influxMeasurement,
|
|
"tags": tags,
|
|
"fields": fields,
|
|
"timestamp": tsMicro,
|
|
})
|
|
if err != nil {
|
|
continue
|
|
}
|
|
buff.Write(serialized)
|
|
buff.WriteByte('\n')
|
|
}
|
|
return buff.Bytes()
|
|
}
|
|
|
|
// joinSorted renders m as comma-separated "key=value" pairs ordered by key, so
// the same map always produces the same text no matter how Go happens to
// iterate it. An empty or nil map yields "".
func joinSorted(m map[string]string) string {
	ordered := make([]string, 0, len(m))
	for key := range m {
		ordered = append(ordered, key)
	}
	sort.Strings(ordered)

	var out strings.Builder
	for idx, key := range ordered {
		if idx > 0 {
			out.WriteByte(',')
		}
		out.WriteString(key)
		out.WriteByte('=')
		out.WriteString(m[key])
	}
	return out.String()
}
|