From 86a1aabd55f9fe878f2ea8a3abd40b7ae83ea198 Mon Sep 17 00:00:00 2001 From: James Coleman Date: Thu, 20 Apr 2023 22:28:28 -0500 Subject: [PATCH] First commit --- .gitignore | 2 + License.txt | 19 +++ README.md | 93 +++++++++++ config.go | 87 ++++++++++ flags.go | 35 ++++ go.mod | 15 ++ go.sum | 18 ++ main.go | 38 +++++ relay.go | 472 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 779 insertions(+) create mode 100644 .gitignore create mode 100644 License.txt create mode 100644 README.md create mode 100644 config.go create mode 100644 flags.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go create mode 100644 relay.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..76b0d9f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +config.yaml +osc-mqtt-bridge diff --git a/License.txt b/License.txt new file mode 100644 index 0000000..3c9e3c8 --- /dev/null +++ b/License.txt @@ -0,0 +1,19 @@ +Copyright (c) 2023 Mr. Gecko's Media (James Coleman). http://mrgeckosmedia.com/ + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..5b5e770 --- /dev/null +++ b/README.md @@ -0,0 +1,93 @@ +# osc-mqtt-bridge +A bridge between [Open Sound Control](https://en.wikipedia.org/wiki/Open_Sound_Control) (OSC) and MQTT, allowind bidirectional communication. The main purpose of this tool is to provide a way to talk to devices that support OSC via MQTT messages for automation. + +## Example configuration +```yaml +relays: + - mqtt_host: 10.0.0.2 + mqtt_port: 1883 + mqtt_client_id: osc_mqtt_bridge + mqtt_user: mqtt + mqtt_password: PASSWORD + mqtt_topic: osc/behringer_wing + osc_host: 10.0.0.3 + osc_port: 2223 + osc_bind_addr: 10.0.0.4 # Change to this machine's IP address. Expected to be a static IP. + log_level: 2 +``` + +## Configuration specification + +### Relay +- `mqtt_host`: Hostname of the MQTT broker. +- `mqtt_port`: Port of the MQTT broker. +- `mqtt_client_id`: MQTT client ID of this relay. +- `mqtt_user`: User name used for MQTT authentication. +- `mqtt_password`: Password used for MQTT authentication. +- `mqtt_topic`: Topic where MQTT messages are pushed and received. + + Set topic to `osc/example` and the following topics will be setup. + - `osc/example/cmd/$OSC_CMD` - Any commands received on OSC will publish here. + - `osc/example/send/$OSC_CMD` - Any commands pushed via MQTT will be forwarded to OSC. + - `osc/example/bundle` - OSC Bundle messages. + - `osc/example/bundle/send` - Send OSC Bundle messages. + - `osc/example/status` - Configuration is published on startup. + - `osc/example/status/check` - Request status. +

+ +- `mqtt_disable_config_send`: Disables the config send. +- `osc_host`: Hostname for OSC client connection. +- `osc_port`: Port for OSC client connection. +- `osc_bind_addr`: Bind address of the OSC server. + + To have bidirectional mode, you must specify at least this, OscHost, and OscPort defined. You must specify the unicast IP address, cannot be `0.0.0.0`. + +- `osc_bind_port`: Port of the OSC server. Defaults to OscPort if specified. +- `osc_disallow_arbritary_command`: Disallows pushing to arbritary commands to the cmd topic. +- `commands`: Pre-defined commands to relay. + + This is an array with the following variables. + + - `command`: The command path to send. + - `mqtt_topic`: Absolute MQTT topic to subscribe. + - `mqtt_sub_topic`: Sub topic off relay MQTT topic to subscribe. + osc/example/$SUB_TOPIC + - `disallow_payload`: Rather or not to disallow payload to be relayed. + - `default_payload`: Payload to send if no payload is provided via MQTT or if DisallowPayload is true. This is an array of strings/integers/timestamps/bools. +

+ +- `osc_subscriptions`: OSC Comamnds to send at regular intervals. Useful for OSC servers that offers data subscriptions. + + This is an array with the following variables. + + - `command`: The command to send every interval. + - `payload`: Payload to send. This is an array of strings/integers/timestamps/bools. + - `interval`: How often to call the command. +

+ +- `log_level`: How much logging. + + - 0 - Errors + - 1 - MQTT and OSC receive logging. + - 2 - MQTT and OSC send logging. + - 3 - Debug + +## MQTT Message Example + +**Mute Behringer Wing channel 1**
+Topic: osc/behringer_wing/send/ch/1/mute
+Payload: `["1"]` + +**Behringer Wing get info**
+Topic: osc/behringer_wing/send/?
+Payload: + +## Build + +```bash +go build +``` + +## Config file location + +Same directory as the binary, in your home directory at `~/.config/mqtt-osc-bridge/config.yaml`, or under etc at `/etc/mqtt-osc-bridge/config.yaml`. diff --git a/config.go b/config.go new file mode 100644 index 0000000..b5a74a6 --- /dev/null +++ b/config.go @@ -0,0 +1,87 @@ +package main + +import ( + "log" + "os" + "os/user" + "path/filepath" + + "gopkg.in/yaml.v2" +) + +// Configuration Structure +type Config struct { + // Relays: Different relays available. + Relays []*Relay `yaml:"relays"` +} + +// ReadConfig Read the configuration file +func (a *App) ReadConfig() { + usr, err := user.Current() + if err != nil { + log.Fatal(err) + } + + // Configuration paths. + localConfig, _ := filepath.Abs("./config.yaml") + homeDirConfig := usr.HomeDir + "/.config/mqtt-osc-bridge/config.yaml" + etcConfig := "/etc/mqtt-osc-bridge/config.yaml" + + // Determine which configuration to use. + var configFile string + if _, err := os.Stat(app.flags.ConfigPath); err == nil && app.flags.ConfigPath != "" { + configFile = app.flags.ConfigPath + } else if _, err := os.Stat(localConfig); err == nil { + configFile = localConfig + } else if _, err := os.Stat(homeDirConfig); err == nil { + configFile = homeDirConfig + } else if _, err := os.Stat(etcConfig); err == nil { + configFile = etcConfig + } else { + log.Fatal("Unable to find a configuration file.") + } + + app.config = new(Config) + + yamlFile, err := os.ReadFile(configFile) + if err != nil { + log.Fatalf("Error reading YAML file: %s\n", err) + } + + err = yaml.Unmarshal(yamlFile, &app.config) + if err != nil { + log.Fatalf("Error parsing YAML file: %s\n", err) + } + + if len(app.config.Relays) == 0 { + log.Fatal("No relays defined in the configuration file.") + } + + for _, relay := range app.config.Relays { + if relay.OscBindAddr != "" && relay.OscBindPort == 0 { + relay.OscBindPort = relay.OscPort + } + } + + for i, relay := range app.config.Relays { + if relay.MqttHost == "" || relay.MqttPort == 0 { + log.Fatalf("Relay %d: MQTT host and port are required configurations.", i) + } + if relay.MqttTopic == "" { + log.Fatalf("Relay %d: MQTT topic is a required configuration.", i) + } + if relay.OscBindAddr == "" && relay.OscHost == "" { + log.Fatalf("Relay %d: You must define either a bind address or an OSC host in the configuration.", i) + } + for b, relay2 := range app.config.Relays { + if b != i { + if relay.MqttTopic == relay2.MqttTopic { + log.Fatalf("Relay %d: MQTT topic cannot exist on 2 different relays.", i) + } + if relay.OscBindPort == relay2.OscBindPort { + log.Fatalf("Relay %d: Cannot use the same OSC bind port on 2 different relays.", i) + } + } + } + } +} diff --git a/flags.go b/flags.go new file mode 100644 index 0000000..6db33a9 --- /dev/null +++ b/flags.go @@ -0,0 +1,35 @@ +package main + +import ( + "flag" + "fmt" + "os" +) + +// Flags Configuration options for cli execution +type Flags struct { + ConfigPath string +} + +// InitFlags Parses configuration options +func (a *App) InitFlags() { + app.flags = new(Flags) + flag.Usage = func() { + fmt.Printf(serviceName + ": " + serviceDescription + ".\n\nUsage:\n") + flag.PrintDefaults() + } + + var printVersion bool + flag.BoolVar(&printVersion, "v", false, "Print version") + + usage := "Load configuration from `FILE`" + flag.StringVar(&app.flags.ConfigPath, "config", "", usage) + flag.StringVar(&app.flags.ConfigPath, "c", "", usage+" (shorthand)") + + flag.Parse() + + if printVersion { + fmt.Println(serviceName + ": " + serviceVersion) + os.Exit(0) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..3a7a5bd --- /dev/null +++ b/go.mod @@ -0,0 +1,15 @@ +module github.com/GRMrGecko/osc-mqtt-bridge + +go 1.19 + +require ( + github.com/eclipse/paho.mqtt.golang v1.4.2 + github.com/hypebeast/go-osc v0.0.0-20220308234300-cec5a8a1e5f5 + gopkg.in/yaml.v2 v2.4.0 +) + +require ( + github.com/gorilla/websocket v1.4.2 // indirect + golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..0276101 --- /dev/null +++ b/go.sum @@ -0,0 +1,18 @@ +github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4= +github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hypebeast/go-osc v0.0.0-20220308234300-cec5a8a1e5f5 h1:fqwINudmUrvGCuw+e3tedZ2UJ0hklSw6t8UPomctKyQ= +github.com/hypebeast/go-osc v0.0.0-20220308234300-cec5a8a1e5f5/go.mod h1:lqMjoCs0y0GoRRujSPZRBaGb4c5ER6TfkFKSClxkMbY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/main.go b/main.go new file mode 100644 index 0000000..8e432bc --- /dev/null +++ b/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "log" + "os" + "path/filepath" +) + +const serviceName = "osc-mqtt-bridge" +const serviceDescription = "Bridges MQTT messages to OSC" +const serviceVersion = "0.1" + +// App is the global application structure for communicating between servers and storing information. +type App struct { + flags *Flags + config *Config +} + +var app *App + +func main() { + thisPath, err := os.Executable() + if err != nil { + log.Panic(err) + } + os.Chdir(filepath.Dir(thisPath)) + + app = new(App) + app.InitFlags() + app.ReadConfig() + + for _, relay := range app.config.Relays { + relay.Start() + } + + for { + } +} diff --git a/relay.go b/relay.go new file mode 100644 index 0000000..7f2c1aa --- /dev/null +++ b/relay.go @@ -0,0 +1,472 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "net" + "strings" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/hypebeast/go-osc/osc" +) + +// LogLevel Definition +type LogLevel int + +const ( + // ErrorLog Logs only errors. + ErrorLog LogLevel = iota + // ReceiveLog MQTT and OSC receive logging. + ReceiveLog + // SendLog MQTT and OSC send logging. + SendLog + // DebugLog Debug messages. + DebugLog +) + +// String: Provides a string value for a log level. +func (l LogLevel) String() string { + return [...]string{"Error", "Receive", "Send", "Debug"}[l] +} + +// Relay command definition +type RelayCommand struct { + // Command: The command path to send. + Command string `yaml:"command" json:"command"` + // MqttTopic: Absolute MQTT topic to subscribe. + MqttTopic string `yaml:"mqtt_topic" json:"mqtt_topic"` + // MqttSubTopic: Sub topic off relay MQTT topic to subscribe. + // osc/example/$SUB_TOPIC + MqttSubTopic string `yaml:"mqtt_sub_topic" json:"mqtt_sub_topic"` + // DisallowPayload: Rather or not to disallow payload to be relayed. + DisallowPayload bool `yaml:"disallow_payload" json:"disallow_payload"` + // DefaultPayload: Payload to send if no payload is provided via MQTT or if DisallowPayload is true. + DefaultPayload []interface{} `yaml:"default_payload" json:"default_payload"` +} + +// Relay OSC subscription +type RelayOscSubscription struct { + // Command: The command to send every interval. + Command string `yaml:"command" json:"command"` + // Payload: Payload to send. + Payload []interface{} `yaml:"payload" json:"payload"` + // Interval: How often to call the command. + Interval time.Duration `yaml:"interval" json:"interval"` +} + +// Relay configurations +type Relay struct { + // MqttHost: Hostname of the MQTT broker. + MqttHost string `yaml:"mqtt_host" json:"mqtt_host"` + // MqttPort: Port of the MQTT broker. + MqttPort int `yaml:"mqtt_port" json:"mqtt_port"` + // MqttClientId: MQTT client ID of this relay. + MqttClientId string `yaml:"mqtt_client_id" json:"mqtt_client_id"` + // MqttUser: User name used for MQTT authentication. + MqttUser string `yaml:"mqtt_user" json:"mqtt_user"` + // MqttPassword: Password used for MQTT authentication. + MqttPassword string `yaml:"mqtt_password" json:"mqtt_password"` + // MqttTopic: Topic where MQTT messages are pushed and received. + // Set topic to `osc/example` and the following topics will be setup. + // osc/example/cmd/$OSC_CMD - Any commands received on OSC will publish here. + // osc/example/send/$OSC_CMD - Any commands pushed via MQTT will be forwarded to OSC. + // osc/example/bundle - OSC Bundle messages. + // osc/example/bundle/send - Send OSC Bundle messages. + // osc/example/status - Configuration is published on startup. + // osc/example/status/check - Request status. + MqttTopic string `yaml:"mqtt_topic" json:"mqtt_topic"` + // MqttDisableConfigSend: Disables the config send. + MqttDisableConfigSend bool `yaml:"mqtt_disable_config_send" json:"mqtt_disable_config_send"` + + // OscHost: Hostname for OSC client connection. + OscHost string `yaml:"osc_host" json:"osc_host"` + // OscPort: Port for OSC client connection. + OscPort int `yaml:"osc_port" json:"osc_port"` + // OscBindAddr: Bind address of the OSC server. + // To have bidirectional mode, you must specify at least this, OscHost, and OscPort defined. + // You must specify the unicast IP address, cannot be 0.0.0.0. + OscBindAddr string `yaml:"osc_bind_addr" json:"osc_bind_addr"` + // OscBindPort: Port of the OSC server. Defaults to OscPort if specified. + OscBindPort int `yaml:"osc_bind_port" json:"osc_bind_port"` + // OscDisallowArbritaryCommand: Disallows pushing to arbritary commands to the cmd topic. + OscDisallowArbritaryCommand bool `yaml:"osc_disallow_arbritary_command" json:"osc_disallow_arbritary_command"` + + // RelayCommands: Pre-defined commands to relay. + Commands []RelayCommand `yaml:"relay_commands" json:"commands"` + // RelayOscSubscriptions: OSC Comamnds to send at regular intervals. Useful for OSC servers that offers data subscriptions. + OscSubscriptions []RelayOscSubscription `yaml:"osc_subscriptions" json:"osc_subscriptions"` + + // LogLevel: How much logging. + // 0 - Errors + // 1 - MQTT and OSC receive logging. + // 2 - MQTT and OSC send logging. + // 3 - Debug + LogLevel LogLevel `yaml:"log_level" json:"log_level"` + + // MqttClient: The client connection to MQTT. + MqttClient mqtt.Client `yaml:"-" json:"-"` + // OscClient: The client connection to OSC. + OscClient *osc.Client `yaml:"-" json:"-"` + // OscServer: OSC Server. + OscServer *osc.Server `yaml:"-" json:"-"` + // OscServerConn: Server connection. + // The OSC software is limited in bidirectional support, so I do my own connection here. + OscServerConn net.PacketConn `yaml:"-" json:"-"` +} + +// OscMessage: Used for json encode/decode to/from MQTT for bundles. +type OscMessage struct { + Address string `json:"address"` + Arguments []interface{} `json:"arguments"` +} + +// OscBundle: Used for json encode/decode to/from MQTT. +type OscBundle struct { + Timetag time.Time `json:"timetag"` + Messages []*OscMessage `json:"messages"` + Bundles []*OscBundle `json:"bundles"` +} + +// OscDispatcher: Handles OSC messages. +type OscDispatcher struct { + r *Relay +} + +// makeBundle: Makes an OscBundle from an osc.Bundle. +func (d OscDispatcher) makeBundle(bundle *osc.Bundle) *OscBundle { + b := new(OscBundle) + b.Timetag = bundle.Timetag.Time() + for _, message := range bundle.Messages { + m := new(OscMessage) + m.Address = message.Address + m.Arguments = message.Arguments + b.Messages = append(b.Messages, m) + } + + for _, sbundle := range bundle.Bundles { + subBundle := d.makeBundle(sbundle) + b.Bundles = append(b.Bundles, subBundle) + } + return b +} + +// Dispatch: Handle OSC packet. +func (d OscDispatcher) Dispatch(packet osc.Packet) { + // Determine packet type and process. + if packet != nil { + switch packet.(type) { + default: + d.r.Log(ErrorLog, "Unknown OSC packet received.") + + // Message packets can just go to /cmd/$OSC_CMD and arguments encoded to JSON. + case *osc.Message: + message := packet.(*osc.Message) + d.r.Log(ReceiveLog, "<- [OSC] %s: %s", message.Address, message.Arguments) + topic := d.r.MqttTopic + "/cmd" + message.Address + data, err := json.Marshal(message.Arguments) + if err != nil { + d.r.Log(ErrorLog, "Json Encode: %s", err) + return + } + d.r.MqttClient.Publish(topic, 0, true, data) + d.r.Log(SendLog, "-> [MQTT] %s: %s", topic, data) + + // Bundle packets are capable of having multiple messages and bundles embeded in it, + // so I translate to my own bundle structure that is JSON aware. + case *osc.Bundle: + b := d.makeBundle(packet.(*osc.Bundle)) + d.r.Log(ReceiveLog, "<- [OSC] Bundle %s", b.Timetag) + topic := d.r.MqttTopic + "/bundle" + data, err := json.Marshal(b) + if err != nil { + d.r.Log(ErrorLog, "Json Encode: %s", err) + return + } + d.r.MqttClient.Publish(topic, 0, true, data) + d.r.Log(SendLog, "-> [MQTT] %s: %s", topic, data) + } + } +} + +// OscSend: Sends an OSC packet. I use my own function to allow bidirectional communication. +func (r *Relay) OscSend(packet osc.Packet) error { + // Do not send nil packets. + if packet == nil { + return nil + } + + // Log send request. + switch packet.(type) { + default: + case *osc.Message: + message := packet.(*osc.Message) + r.Log(SendLog, "-> [OSC] %s: %s", message.Address, message.Arguments) + case *osc.Bundle: + bundle := packet.(*osc.Bundle) + r.Log(SendLog, "-> [OSC] Bundle %s", bundle.Timetag.Time()) + } + + // Hosts can be DNS names, or IP addresses, so we need to resolve. + var err error + addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", r.OscHost, r.OscPort)) + if err != nil { + return err + } + + // Convert packet to OSC bytes. + data, err := packet.MarshalBinary() + if err != nil { + return err + } + if r.LogLevel >= DebugLog { + r.Log(DebugLog, "-> [OSC] Binary %s", bytes.ReplaceAll(data, []byte{byte(0)}, []byte("~"))) + } + + // If we have an OSC Server defined, we use its connection to write the data for bidirectional support. + if r.OscServer != nil { + _, err = r.OscServerConn.WriteTo(data, addr) + } else { + // Otherwise, we dial the address with a unused source port. + // Specifying a manual source port could end up with conflicts. + conn, err := net.DialUDP("udp", nil, addr) + if err != nil { + return err + } + defer conn.Close() + + _, err = conn.Write(data) + } + + return err +} + +// SendStatus: Send config to MQTT status. +func (r *Relay) SendStatus() { + // If disabled, ignore. + if r.MqttDisableConfigSend { + return + } + + // Make JSON dump. + config, err := json.Marshal(&r) + if err != nil { + r.Log(ErrorLog, "Json Error: %s", err) + } + + // Send config. + r.MqttClient.Publish(r.MqttTopic+"/status", 0, true, config) +} + +// MakeOSCBundle: Makes an osc.Bundle. from an OscBundle. +func (r *Relay) MakeOSCBundle(bundle *OscBundle) *osc.Bundle { + b := osc.NewBundle(bundle.Timetag) + + // Add attached messages. + for _, message := range bundle.Messages { + m := osc.NewMessage(message.Address) + m.Arguments = message.Arguments + b.Append(m) + } + + // Add sub bundles. + for _, sbundle := range bundle.Bundles { + subBundle := r.MakeOSCBundle(sbundle) + b.Append(subBundle) + } + return b +} + +// MqttOnEvent: Handle MQTT events. +func (r *Relay) MqttOnEvent(client mqtt.Client, message mqtt.Message) { + r.Log(ReceiveLog, "<- [MQTT] %s: %s\n", message.Topic(), message.Payload()) + + // Check commands to see if one matches this topic. + for _, cmd := range r.Commands { + if message.Topic() == cmd.MqttTopic || + (cmd.MqttSubTopic != "" && message.Topic() == r.MqttTopic+"/"+cmd.MqttSubTopic) { + // Configure OSC message. + oscMessage := osc.NewMessage(cmd.Command) + + // If arguments allowed and provided, parse, otherwise use default payload. + var arguments []interface{} + if !cmd.DisallowPayload && len(message.Payload()) != 0 { + err := json.Unmarshal(message.Payload(), &arguments) + if err != nil { + r.Log(ErrorLog, "Json Error: %s", err) + return + } + } else if len(cmd.DefaultPayload) != 0 { + arguments = cmd.DefaultPayload + } + oscMessage.Arguments = arguments + + // Send OSC message. + err := r.OscSend(oscMessage) + if err != nil { + r.Log(ErrorLog, "Send Error: %s", err) + } + } + } + + // If standard send topic. + if strings.HasPrefix(message.Topic(), r.MqttTopic+"/send") { + // Verify arbritary commands can be sent. + if r.OscDisallowArbritaryCommand { + r.Log(ErrorLog, "Arbritary commands are disabled on this relay.") + return + } + + // Get the command from topic. + cmd := strings.Replace(message.Topic(), r.MqttTopic+"/send", "", 1) + if cmd == "" { + cmd = "/" + } + + // Parse the arguments. + var arguments []interface{} + if len(message.Payload()) != 0 { + err := json.Unmarshal(message.Payload(), &arguments) + if err != nil { + r.Log(ErrorLog, "Json Error: %s", err) + return + } + } + + // Create OSC message. + oscMessage := osc.NewMessage(cmd) + oscMessage.Arguments = arguments + + // Send OSC message. + err := r.OscSend(oscMessage) + if err != nil { + r.Log(ErrorLog, "Send Error: %s", err) + } + } else if message.Topic() == r.MqttTopic+"/bundle/send" { + // Verify arbritary commands can be sent. + if r.OscDisallowArbritaryCommand { + r.Log(ErrorLog, "Arbritary commands are disabled on this relay.") + return + } + + // Create bundle. + bundle := new(OscBundle) + err := json.Unmarshal(message.Payload(), bundle) + if err != nil { + r.Log(ErrorLog, "Json Error: %s", err) + return + } + + // Make the OSC bundle based on received bundle. + b := r.MakeOSCBundle(bundle) + + // Send OSC bundle. + err = r.OscSend(b) + if err != nil { + r.Log(ErrorLog, "Send Error: %s", err) + } + } else if message.Topic() == r.MqttTopic+"/status/check" { + r.SendStatus() + } +} + +// MqttSubscribe: Subscribe to MQTT Topic. +func (r *Relay) MqttSubscribe(topic string) { + r.Log(DebugLog, "Subscribing MQTT: %s", topic) + if t := r.MqttClient.Subscribe(topic, 0, r.MqttOnEvent); t.Wait() && t.Error() != nil { + r.Log(ErrorLog, "MQTT Subscribe Error: %s", t.Error()) + } +} + +// Log: Logging function to allow log levels. +func (r *Relay) Log(level LogLevel, format string, args ...interface{}) { + if level <= r.LogLevel { + log.Println(fmt.Sprintf(format, args...)) + } +} + +// Start: Start the relay. +func (r *Relay) Start() { + // Connect to MQTT. + mqtt_opts := mqtt.NewClientOptions() + mqtt_opts.AddBroker(fmt.Sprintf("tcp://%s:%d", r.MqttHost, r.MqttPort)) + mqtt_opts.SetClientID(r.MqttClientId) + mqtt_opts.SetUsername(r.MqttUser) + mqtt_opts.SetPassword(r.MqttPassword) + r.MqttClient = mqtt.NewClient(mqtt_opts) + + // Connect and failures are fatal exiting service. + r.Log(DebugLog, "Connecting to MQTT") + if t := r.MqttClient.Connect(); t.Wait() && t.Error() != nil { + log.Fatalf("MQTT error: %s", t.Error()) + return + } + + // Subscribe to MQTT topics. + r.MqttSubscribe(r.MqttTopic + "/send/#") + r.MqttSubscribe(r.MqttTopic + "/bundle/send") + r.MqttSubscribe(r.MqttTopic + "/status/check") + // Subscribe to command topics configured. + for _, cmd := range r.Commands { + if cmd.MqttTopic != "" { + r.MqttSubscribe(cmd.MqttTopic) + } + if cmd.MqttSubTopic != "" { + r.MqttSubscribe(r.MqttTopic + "/" + cmd.MqttSubTopic) + } + } + + // If an OSC client configuration is provided, setup client. + if r.OscHost != "" && r.OscPort != 0 { + r.OscClient = osc.NewClient(r.OscHost, r.OscPort) + } + + // If OSC server configured, setup server. + if r.OscBindAddr != "" && r.OscBindPort != 0 { + r.OscServer = &osc.Server{Addr: fmt.Sprintf("%s:%d", r.OscBindAddr, r.OscBindPort), Dispatcher: OscDispatcher{r: r}} + + // Run server in thread. + go func() { + r.Log(DebugLog, "Starting OSC Server") + var err error + // I setup our own UDP connection to overcome a limit in go-osc + // where bidirectional isn't built in. + r.OscServerConn, err = net.ListenPacket("udp", r.OscServer.Addr) + if err != nil { + log.Fatal(err) + } + + // Close connection when function ends. + defer r.OscServerConn.Close() + + // Have Go-OSC handle OSC traffic on this connection. + if err = r.OscServer.Serve(r.OscServerConn); err != nil { + log.Fatal(err) + } + }() + } + + // Setup subscriptions. + for _, subcription := range r.OscSubscriptions { + // Each subscription runs in its own thread. + go func(subcription RelayOscSubscription) { + r.Log(DebugLog, "Started subscription: %s", subcription.Command) + ticker := time.NewTicker(subcription.Interval) + for range ticker.C { + // Send OSC message as configured. + r.Log(DebugLog, "Running subscription: %s", subcription.Command) + oscMessage := osc.NewMessage(subcription.Command) + oscMessage.Arguments = subcription.Payload + err := r.OscSend(oscMessage) + if err != nil { + r.Log(ErrorLog, "Send Error: %s", err) + } + } + }(subcription) + } + + // Send current config to MQTT. + r.SendStatus() +}