osc-mqtt-bridge/relay.go
2023-04-20 22:33:16 -05:00

473 lines
14 KiB
Go

package main
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net"
"strings"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/hypebeast/go-osc/osc"
)
// LogLevel Definition
type LogLevel int
const (
// ErrorLog Logs only errors.
ErrorLog LogLevel = iota
// ReceiveLog MQTT and OSC receive logging.
ReceiveLog
// SendLog MQTT and OSC send logging.
SendLog
// DebugLog Debug messages.
DebugLog
)
// String: Provides a string value for a log level.
func (l LogLevel) String() string {
return [...]string{"Error", "Receive", "Send", "Debug"}[l]
}
// Relay command definition
type RelayCommand struct {
// Command: The command path to send.
Command string `yaml:"command" json:"command"`
// MqttTopic: Absolute MQTT topic to subscribe.
MqttTopic string `yaml:"mqtt_topic" json:"mqtt_topic"`
// MqttSubTopic: Sub topic off relay MQTT topic to subscribe.
// osc/example/$SUB_TOPIC
MqttSubTopic string `yaml:"mqtt_sub_topic" json:"mqtt_sub_topic"`
// DisallowPayload: Rather or not to disallow payload to be relayed.
DisallowPayload bool `yaml:"disallow_payload" json:"disallow_payload"`
// DefaultPayload: Payload to send if no payload is provided via MQTT or if DisallowPayload is true.
DefaultPayload []interface{} `yaml:"default_payload" json:"default_payload"`
}
// Relay OSC subscription
type RelayOscSubscription struct {
// Command: The command to send every interval.
Command string `yaml:"command" json:"command"`
// Payload: Payload to send.
Payload []interface{} `yaml:"default_payload" json:"default_payload"`
// Interval: How often to call the command.
Interval time.Duration `yaml:"interval" json:"interval"`
}
// Relay configurations
type Relay struct {
// MqttHost: Hostname of the MQTT broker.
MqttHost string `yaml:"mqtt_host" json:"mqtt_host"`
// MqttPort: Port of the MQTT broker.
MqttPort int `yaml:"mqtt_port" json:"mqtt_port"`
// MqttClientId: MQTT client ID of this relay.
MqttClientId string `yaml:"mqtt_client_id" json:"mqtt_client_id"`
// MqttUser: User name used for MQTT authentication.
MqttUser string `yaml:"mqtt_user" json:"mqtt_user"`
// MqttPassword: Password used for MQTT authentication.
MqttPassword string `yaml:"mqtt_password" json:"mqtt_password"`
// MqttTopic: Topic where MQTT messages are pushed and received.
// Set topic to `osc/example` and the following topics will be setup.
// osc/example/cmd/$OSC_CMD - Any commands received on OSC will publish here.
// osc/example/send/$OSC_CMD - Any commands pushed via MQTT will be forwarded to OSC.
// osc/example/bundle - OSC Bundle messages.
// osc/example/bundle/send - Send OSC Bundle messages.
// osc/example/status - Configuration is published on startup.
// osc/example/status/check - Request status.
MqttTopic string `yaml:"mqtt_topic" json:"mqtt_topic"`
// MqttDisableConfigSend: Disables the config send.
MqttDisableConfigSend bool `yaml:"mqtt_disable_config_send" json:"mqtt_disable_config_send"`
// OscHost: Hostname for OSC client connection.
OscHost string `yaml:"osc_host" json:"osc_host"`
// OscPort: Port for OSC client connection.
OscPort int `yaml:"osc_port" json:"osc_port"`
// OscBindAddr: Bind address of the OSC server.
// To have bidirectional mode, you must specify at least this, OscHost, and OscPort defined.
// You must specify the unicast IP address, cannot be 0.0.0.0.
OscBindAddr string `yaml:"osc_bind_addr" json:"osc_bind_addr"`
// OscBindPort: Port of the OSC server. Defaults to OscPort if specified.
OscBindPort int `yaml:"osc_bind_port" json:"osc_bind_port"`
// OscDisallowArbritaryCommand: Disallows pushing to arbritary commands to the cmd topic.
OscDisallowArbritaryCommand bool `yaml:"osc_disallow_arbritary_command" json:"osc_disallow_arbritary_command"`
// RelayCommands: Pre-defined commands to relay.
Commands []RelayCommand `yaml:"relay_commands" json:"commands"`
// RelayOscSubscriptions: OSC Comamnds to send at regular intervals. Useful for OSC servers that offers data subscriptions.
OscSubscriptions []RelayOscSubscription `yaml:"osc_subscriptions" json:"osc_subscriptions"`
// LogLevel: How much logging.
// 0 - Errors
// 1 - MQTT and OSC receive logging.
// 2 - MQTT and OSC send logging.
// 3 - Debug
LogLevel LogLevel `yaml:"log_level" json:"log_level"`
// MqttClient: The client connection to MQTT.
MqttClient mqtt.Client `yaml:"-" json:"-"`
// OscClient: The client connection to OSC.
OscClient *osc.Client `yaml:"-" json:"-"`
// OscServer: OSC Server.
OscServer *osc.Server `yaml:"-" json:"-"`
// OscServerConn: Server connection.
// The OSC software is limited in bidirectional support, so I do my own connection here.
OscServerConn net.PacketConn `yaml:"-" json:"-"`
}
// OscMessage: Used for json encode/decode to/from MQTT for bundles.
type OscMessage struct {
Address string `json:"address"`
Arguments []interface{} `json:"arguments"`
}
// OscBundle: Used for json encode/decode to/from MQTT.
type OscBundle struct {
Timetag time.Time `json:"timetag"`
Messages []*OscMessage `json:"messages"`
Bundles []*OscBundle `json:"bundles"`
}
// OscDispatcher: Handles OSC messages.
type OscDispatcher struct {
r *Relay
}
// makeBundle: Makes an OscBundle from an osc.Bundle.
func (d OscDispatcher) makeBundle(bundle *osc.Bundle) *OscBundle {
b := new(OscBundle)
b.Timetag = bundle.Timetag.Time()
for _, message := range bundle.Messages {
m := new(OscMessage)
m.Address = message.Address
m.Arguments = message.Arguments
b.Messages = append(b.Messages, m)
}
for _, sbundle := range bundle.Bundles {
subBundle := d.makeBundle(sbundle)
b.Bundles = append(b.Bundles, subBundle)
}
return b
}
// Dispatch: Handle OSC packet.
func (d OscDispatcher) Dispatch(packet osc.Packet) {
// Determine packet type and process.
if packet != nil {
switch packet.(type) {
default:
d.r.Log(ErrorLog, "Unknown OSC packet received.")
// Message packets can just go to /cmd/$OSC_CMD and arguments encoded to JSON.
case *osc.Message:
message := packet.(*osc.Message)
d.r.Log(ReceiveLog, "<- [OSC] %s: %s", message.Address, message.Arguments)
topic := d.r.MqttTopic + "/cmd" + message.Address
data, err := json.Marshal(message.Arguments)
if err != nil {
d.r.Log(ErrorLog, "Json Encode: %s", err)
return
}
d.r.MqttClient.Publish(topic, 0, true, data)
d.r.Log(SendLog, "-> [MQTT] %s: %s", topic, data)
// Bundle packets are capable of having multiple messages and bundles embeded in it,
// so I translate to my own bundle structure that is JSON aware.
case *osc.Bundle:
b := d.makeBundle(packet.(*osc.Bundle))
d.r.Log(ReceiveLog, "<- [OSC] Bundle %s", b.Timetag)
topic := d.r.MqttTopic + "/bundle"
data, err := json.Marshal(b)
if err != nil {
d.r.Log(ErrorLog, "Json Encode: %s", err)
return
}
d.r.MqttClient.Publish(topic, 0, true, data)
d.r.Log(SendLog, "-> [MQTT] %s: %s", topic, data)
}
}
}
// OscSend: Sends an OSC packet. I use my own function to allow bidirectional communication.
func (r *Relay) OscSend(packet osc.Packet) error {
// Do not send nil packets.
if packet == nil {
return nil
}
// Log send request.
switch packet.(type) {
default:
case *osc.Message:
message := packet.(*osc.Message)
r.Log(SendLog, "-> [OSC] %s: %s", message.Address, message.Arguments)
case *osc.Bundle:
bundle := packet.(*osc.Bundle)
r.Log(SendLog, "-> [OSC] Bundle %s", bundle.Timetag.Time())
}
// Hosts can be DNS names, or IP addresses, so we need to resolve.
var err error
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", r.OscHost, r.OscPort))
if err != nil {
return err
}
// Convert packet to OSC bytes.
data, err := packet.MarshalBinary()
if err != nil {
return err
}
if r.LogLevel >= DebugLog {
r.Log(DebugLog, "-> [OSC] Binary %s", bytes.ReplaceAll(data, []byte{byte(0)}, []byte("~")))
}
// If we have an OSC Server defined, we use its connection to write the data for bidirectional support.
if r.OscServer != nil {
_, err = r.OscServerConn.WriteTo(data, addr)
} else {
// Otherwise, we dial the address with a unused source port.
// Specifying a manual source port could end up with conflicts.
conn, err := net.DialUDP("udp", nil, addr)
if err != nil {
return err
}
defer conn.Close()
_, err = conn.Write(data)
}
return err
}
// SendStatus: Send config to MQTT status.
func (r *Relay) SendStatus() {
// If disabled, ignore.
if r.MqttDisableConfigSend {
return
}
// Make JSON dump.
config, err := json.Marshal(&r)
if err != nil {
r.Log(ErrorLog, "Json Error: %s", err)
}
// Send config.
r.MqttClient.Publish(r.MqttTopic+"/status", 0, true, config)
}
// MakeOSCBundle: Makes an osc.Bundle. from an OscBundle.
func (r *Relay) MakeOSCBundle(bundle *OscBundle) *osc.Bundle {
b := osc.NewBundle(bundle.Timetag)
// Add attached messages.
for _, message := range bundle.Messages {
m := osc.NewMessage(message.Address)
m.Arguments = message.Arguments
b.Append(m)
}
// Add sub bundles.
for _, sbundle := range bundle.Bundles {
subBundle := r.MakeOSCBundle(sbundle)
b.Append(subBundle)
}
return b
}
// MqttOnEvent: Handle MQTT events.
func (r *Relay) MqttOnEvent(client mqtt.Client, message mqtt.Message) {
r.Log(ReceiveLog, "<- [MQTT] %s: %s\n", message.Topic(), message.Payload())
// Check commands to see if one matches this topic.
for _, cmd := range r.Commands {
if message.Topic() == cmd.MqttTopic ||
(cmd.MqttSubTopic != "" && message.Topic() == r.MqttTopic+"/"+cmd.MqttSubTopic) {
// Configure OSC message.
oscMessage := osc.NewMessage(cmd.Command)
// If arguments allowed and provided, parse, otherwise use default payload.
var arguments []interface{}
if !cmd.DisallowPayload && len(message.Payload()) != 0 {
err := json.Unmarshal(message.Payload(), &arguments)
if err != nil {
r.Log(ErrorLog, "Json Error: %s", err)
return
}
} else if len(cmd.DefaultPayload) != 0 {
arguments = cmd.DefaultPayload
}
oscMessage.Arguments = arguments
// Send OSC message.
err := r.OscSend(oscMessage)
if err != nil {
r.Log(ErrorLog, "Send Error: %s", err)
}
}
}
// If standard send topic.
if strings.HasPrefix(message.Topic(), r.MqttTopic+"/send") {
// Verify arbritary commands can be sent.
if r.OscDisallowArbritaryCommand {
r.Log(ErrorLog, "Arbritary commands are disabled on this relay.")
return
}
// Get the command from topic.
cmd := strings.Replace(message.Topic(), r.MqttTopic+"/send", "", 1)
if cmd == "" {
cmd = "/"
}
// Parse the arguments.
var arguments []interface{}
if len(message.Payload()) != 0 {
err := json.Unmarshal(message.Payload(), &arguments)
if err != nil {
r.Log(ErrorLog, "Json Error: %s", err)
return
}
}
// Create OSC message.
oscMessage := osc.NewMessage(cmd)
oscMessage.Arguments = arguments
// Send OSC message.
err := r.OscSend(oscMessage)
if err != nil {
r.Log(ErrorLog, "Send Error: %s", err)
}
} else if message.Topic() == r.MqttTopic+"/bundle/send" {
// Verify arbritary commands can be sent.
if r.OscDisallowArbritaryCommand {
r.Log(ErrorLog, "Arbritary commands are disabled on this relay.")
return
}
// Create bundle.
bundle := new(OscBundle)
err := json.Unmarshal(message.Payload(), bundle)
if err != nil {
r.Log(ErrorLog, "Json Error: %s", err)
return
}
// Make the OSC bundle based on received bundle.
b := r.MakeOSCBundle(bundle)
// Send OSC bundle.
err = r.OscSend(b)
if err != nil {
r.Log(ErrorLog, "Send Error: %s", err)
}
} else if message.Topic() == r.MqttTopic+"/status/check" {
r.SendStatus()
}
}
// MqttSubscribe: Subscribe to MQTT Topic.
func (r *Relay) MqttSubscribe(topic string) {
r.Log(DebugLog, "Subscribing MQTT: %s", topic)
if t := r.MqttClient.Subscribe(topic, 0, r.MqttOnEvent); t.Wait() && t.Error() != nil {
r.Log(ErrorLog, "MQTT Subscribe Error: %s", t.Error())
}
}
// Log: Logging function to allow log levels.
func (r *Relay) Log(level LogLevel, format string, args ...interface{}) {
if level <= r.LogLevel {
log.Println(fmt.Sprintf(format, args...))
}
}
// Start: Start the relay.
func (r *Relay) Start() {
// Connect to MQTT.
mqtt_opts := mqtt.NewClientOptions()
mqtt_opts.AddBroker(fmt.Sprintf("tcp://%s:%d", r.MqttHost, r.MqttPort))
mqtt_opts.SetClientID(r.MqttClientId)
mqtt_opts.SetUsername(r.MqttUser)
mqtt_opts.SetPassword(r.MqttPassword)
r.MqttClient = mqtt.NewClient(mqtt_opts)
// Connect and failures are fatal exiting service.
r.Log(DebugLog, "Connecting to MQTT")
if t := r.MqttClient.Connect(); t.Wait() && t.Error() != nil {
log.Fatalf("MQTT error: %s", t.Error())
return
}
// Subscribe to MQTT topics.
r.MqttSubscribe(r.MqttTopic + "/send/#")
r.MqttSubscribe(r.MqttTopic + "/bundle/send")
r.MqttSubscribe(r.MqttTopic + "/status/check")
// Subscribe to command topics configured.
for _, cmd := range r.Commands {
if cmd.MqttTopic != "" {
r.MqttSubscribe(cmd.MqttTopic)
}
if cmd.MqttSubTopic != "" {
r.MqttSubscribe(r.MqttTopic + "/" + cmd.MqttSubTopic)
}
}
// If an OSC client configuration is provided, setup client.
if r.OscHost != "" && r.OscPort != 0 {
r.OscClient = osc.NewClient(r.OscHost, r.OscPort)
}
// If OSC server configured, setup server.
if r.OscBindAddr != "" && r.OscBindPort != 0 {
r.OscServer = &osc.Server{Addr: fmt.Sprintf("%s:%d", r.OscBindAddr, r.OscBindPort), Dispatcher: OscDispatcher{r: r}}
// Run server in thread.
go func() {
r.Log(DebugLog, "Starting OSC Server")
var err error
// I setup our own UDP connection to overcome a limit in go-osc
// where bidirectional isn't built in.
r.OscServerConn, err = net.ListenPacket("udp", r.OscServer.Addr)
if err != nil {
log.Fatal(err)
}
// Close connection when function ends.
defer r.OscServerConn.Close()
// Have Go-OSC handle OSC traffic on this connection.
if err = r.OscServer.Serve(r.OscServerConn); err != nil {
log.Fatal(err)
}
}()
}
// Setup subscriptions.
for _, subcription := range r.OscSubscriptions {
// Each subscription runs in its own thread.
go func(subcription RelayOscSubscription) {
r.Log(DebugLog, "Started subscription: %s", subcription.Command)
ticker := time.NewTicker(subcription.Interval)
for range ticker.C {
// Send OSC message as configured.
r.Log(DebugLog, "Running subscription: %s", subcription.Command)
oscMessage := osc.NewMessage(subcription.Command)
oscMessage.Arguments = subcription.Payload
err := r.OscSend(oscMessage)
if err != nil {
r.Log(ErrorLog, "Send Error: %s", err)
}
}
}(subcription)
}
// Send current config to MQTT.
r.SendStatus()
}