commit 86a1aabd55f9fe878f2ea8a3abd40b7ae83ea198 Author: James Coleman Date: Thu Apr 20 22:28:28 2023 -0500 First commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..76b0d9f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +config.yaml +osc-mqtt-bridge diff --git a/License.txt b/License.txt new file mode 100644 index 0000000..3c9e3c8 --- /dev/null +++ b/License.txt @@ -0,0 +1,19 @@ +Copyright (c) 2023 Mr. Gecko's Media (James Coleman). http://mrgeckosmedia.com/ + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..5b5e770 --- /dev/null +++ b/README.md @@ -0,0 +1,93 @@ +# osc-mqtt-bridge +A bridge between [Open Sound Control](https://en.wikipedia.org/wiki/Open_Sound_Control) (OSC) and MQTT, allowind bidirectional communication. The main purpose of this tool is to provide a way to talk to devices that support OSC via MQTT messages for automation. + +## Example configuration +```yaml +relays: + - mqtt_host: 10.0.0.2 + mqtt_port: 1883 + mqtt_client_id: osc_mqtt_bridge + mqtt_user: mqtt + mqtt_password: PASSWORD + mqtt_topic: osc/behringer_wing + osc_host: 10.0.0.3 + osc_port: 2223 + osc_bind_addr: 10.0.0.4 # Change to this machine's IP address. Expected to be a static IP. + log_level: 2 +``` + +## Configuration specification + +### Relay +- `mqtt_host`: Hostname of the MQTT broker. +- `mqtt_port`: Port of the MQTT broker. +- `mqtt_client_id`: MQTT client ID of this relay. +- `mqtt_user`: User name used for MQTT authentication. +- `mqtt_password`: Password used for MQTT authentication. +- `mqtt_topic`: Topic where MQTT messages are pushed and received. + + Set topic to `osc/example` and the following topics will be setup. + - `osc/example/cmd/$OSC_CMD` - Any commands received on OSC will publish here. + - `osc/example/send/$OSC_CMD` - Any commands pushed via MQTT will be forwarded to OSC. + - `osc/example/bundle` - OSC Bundle messages. + - `osc/example/bundle/send` - Send OSC Bundle messages. + - `osc/example/status` - Configuration is published on startup. + - `osc/example/status/check` - Request status. +

+ +- `mqtt_disable_config_send`: Disables the config send. +- `osc_host`: Hostname for OSC client connection. +- `osc_port`: Port for OSC client connection. +- `osc_bind_addr`: Bind address of the OSC server. + + To have bidirectional mode, you must specify at least this, OscHost, and OscPort defined. You must specify the unicast IP address, cannot be `0.0.0.0`. + +- `osc_bind_port`: Port of the OSC server. Defaults to OscPort if specified. +- `osc_disallow_arbritary_command`: Disallows pushing to arbritary commands to the cmd topic. +- `commands`: Pre-defined commands to relay. + + This is an array with the following variables. + + - `command`: The command path to send. + - `mqtt_topic`: Absolute MQTT topic to subscribe. + - `mqtt_sub_topic`: Sub topic off relay MQTT topic to subscribe. + osc/example/$SUB_TOPIC + - `disallow_payload`: Rather or not to disallow payload to be relayed. + - `default_payload`: Payload to send if no payload is provided via MQTT or if DisallowPayload is true. This is an array of strings/integers/timestamps/bools. +

+ +- `osc_subscriptions`: OSC Comamnds to send at regular intervals. Useful for OSC servers that offers data subscriptions. + + This is an array with the following variables. + + - `command`: The command to send every interval. + - `payload`: Payload to send. This is an array of strings/integers/timestamps/bools. + - `interval`: How often to call the command. +

+ +- `log_level`: How much logging. + + - 0 - Errors + - 1 - MQTT and OSC receive logging. + - 2 - MQTT and OSC send logging. + - 3 - Debug + +## MQTT Message Example + +**Mute Behringer Wing channel 1**
+Topic: osc/behringer_wing/send/ch/1/mute
+Payload: `["1"]` + +**Behringer Wing get info**
+Topic: osc/behringer_wing/send/?
+Payload: + +## Build + +```bash +go build +``` + +## Config file location + +Same directory as the binary, in your home directory at `~/.config/mqtt-osc-bridge/config.yaml`, or under etc at `/etc/mqtt-osc-bridge/config.yaml`. diff --git a/config.go b/config.go new file mode 100644 index 0000000..b5a74a6 --- /dev/null +++ b/config.go @@ -0,0 +1,87 @@ +package main + +import ( + "log" + "os" + "os/user" + "path/filepath" + + "gopkg.in/yaml.v2" +) + +// Configuration Structure +type Config struct { + // Relays: Different relays available. + Relays []*Relay `yaml:"relays"` +} + +// ReadConfig Read the configuration file +func (a *App) ReadConfig() { + usr, err := user.Current() + if err != nil { + log.Fatal(err) + } + + // Configuration paths. + localConfig, _ := filepath.Abs("./config.yaml") + homeDirConfig := usr.HomeDir + "/.config/mqtt-osc-bridge/config.yaml" + etcConfig := "/etc/mqtt-osc-bridge/config.yaml" + + // Determine which configuration to use. + var configFile string + if _, err := os.Stat(app.flags.ConfigPath); err == nil && app.flags.ConfigPath != "" { + configFile = app.flags.ConfigPath + } else if _, err := os.Stat(localConfig); err == nil { + configFile = localConfig + } else if _, err := os.Stat(homeDirConfig); err == nil { + configFile = homeDirConfig + } else if _, err := os.Stat(etcConfig); err == nil { + configFile = etcConfig + } else { + log.Fatal("Unable to find a configuration file.") + } + + app.config = new(Config) + + yamlFile, err := os.ReadFile(configFile) + if err != nil { + log.Fatalf("Error reading YAML file: %s\n", err) + } + + err = yaml.Unmarshal(yamlFile, &app.config) + if err != nil { + log.Fatalf("Error parsing YAML file: %s\n", err) + } + + if len(app.config.Relays) == 0 { + log.Fatal("No relays defined in the configuration file.") + } + + for _, relay := range app.config.Relays { + if relay.OscBindAddr != "" && relay.OscBindPort == 0 { + relay.OscBindPort = relay.OscPort + } + } + + for i, relay := range app.config.Relays { + if relay.MqttHost == "" || relay.MqttPort == 0 { + log.Fatalf("Relay %d: MQTT host and port are required configurations.", i) + } + if relay.MqttTopic == "" { + log.Fatalf("Relay %d: MQTT topic is a required configuration.", i) + } + if relay.OscBindAddr == "" && relay.OscHost == "" { + log.Fatalf("Relay %d: You must define either a bind address or an OSC host in the configuration.", i) + } + for b, relay2 := range app.config.Relays { + if b != i { + if relay.MqttTopic == relay2.MqttTopic { + log.Fatalf("Relay %d: MQTT topic cannot exist on 2 different relays.", i) + } + if relay.OscBindPort == relay2.OscBindPort { + log.Fatalf("Relay %d: Cannot use the same OSC bind port on 2 different relays.", i) + } + } + } + } +} diff --git a/flags.go b/flags.go new file mode 100644 index 0000000..6db33a9 --- /dev/null +++ b/flags.go @@ -0,0 +1,35 @@ +package main + +import ( + "flag" + "fmt" + "os" +) + +// Flags Configuration options for cli execution +type Flags struct { + ConfigPath string +} + +// InitFlags Parses configuration options +func (a *App) InitFlags() { + app.flags = new(Flags) + flag.Usage = func() { + fmt.Printf(serviceName + ": " + serviceDescription + ".\n\nUsage:\n") + flag.PrintDefaults() + } + + var printVersion bool + flag.BoolVar(&printVersion, "v", false, "Print version") + + usage := "Load configuration from `FILE`" + flag.StringVar(&app.flags.ConfigPath, "config", "", usage) + flag.StringVar(&app.flags.ConfigPath, "c", "", usage+" (shorthand)") + + flag.Parse() + + if printVersion { + fmt.Println(serviceName + ": " + serviceVersion) + os.Exit(0) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..3a7a5bd --- /dev/null +++ b/go.mod @@ -0,0 +1,15 @@ +module github.com/GRMrGecko/osc-mqtt-bridge + +go 1.19 + +require ( + github.com/eclipse/paho.mqtt.golang v1.4.2 + github.com/hypebeast/go-osc v0.0.0-20220308234300-cec5a8a1e5f5 + gopkg.in/yaml.v2 v2.4.0 +) + +require ( + github.com/gorilla/websocket v1.4.2 // indirect + golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..0276101 --- /dev/null +++ b/go.sum @@ -0,0 +1,18 @@ +github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4= +github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hypebeast/go-osc v0.0.0-20220308234300-cec5a8a1e5f5 h1:fqwINudmUrvGCuw+e3tedZ2UJ0hklSw6t8UPomctKyQ= +github.com/hypebeast/go-osc v0.0.0-20220308234300-cec5a8a1e5f5/go.mod h1:lqMjoCs0y0GoRRujSPZRBaGb4c5ER6TfkFKSClxkMbY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/main.go b/main.go new file mode 100644 index 0000000..8e432bc --- /dev/null +++ b/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "log" + "os" + "path/filepath" +) + +const serviceName = "osc-mqtt-bridge" +const serviceDescription = "Bridges MQTT messages to OSC" +const serviceVersion = "0.1" + +// App is the global application structure for communicating between servers and storing information. +type App struct { + flags *Flags + config *Config +} + +var app *App + +func main() { + thisPath, err := os.Executable() + if err != nil { + log.Panic(err) + } + os.Chdir(filepath.Dir(thisPath)) + + app = new(App) + app.InitFlags() + app.ReadConfig() + + for _, relay := range app.config.Relays { + relay.Start() + } + + for { + } +} diff --git a/relay.go b/relay.go new file mode 100644 index 0000000..7f2c1aa --- /dev/null +++ b/relay.go @@ -0,0 +1,472 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "net" + "strings" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/hypebeast/go-osc/osc" +) + +// LogLevel Definition +type LogLevel int + +const ( + // ErrorLog Logs only errors. + ErrorLog LogLevel = iota + // ReceiveLog MQTT and OSC receive logging. + ReceiveLog + // SendLog MQTT and OSC send logging. + SendLog + // DebugLog Debug messages. + DebugLog +) + +// String: Provides a string value for a log level. +func (l LogLevel) String() string { + return [...]string{"Error", "Receive", "Send", "Debug"}[l] +} + +// Relay command definition +type RelayCommand struct { + // Command: The command path to send. + Command string `yaml:"command" json:"command"` + // MqttTopic: Absolute MQTT topic to subscribe. + MqttTopic string `yaml:"mqtt_topic" json:"mqtt_topic"` + // MqttSubTopic: Sub topic off relay MQTT topic to subscribe. + // osc/example/$SUB_TOPIC + MqttSubTopic string `yaml:"mqtt_sub_topic" json:"mqtt_sub_topic"` + // DisallowPayload: Rather or not to disallow payload to be relayed. + DisallowPayload bool `yaml:"disallow_payload" json:"disallow_payload"` + // DefaultPayload: Payload to send if no payload is provided via MQTT or if DisallowPayload is true. + DefaultPayload []interface{} `yaml:"default_payload" json:"default_payload"` +} + +// Relay OSC subscription +type RelayOscSubscription struct { + // Command: The command to send every interval. + Command string `yaml:"command" json:"command"` + // Payload: Payload to send. + Payload []interface{} `yaml:"payload" json:"payload"` + // Interval: How often to call the command. + Interval time.Duration `yaml:"interval" json:"interval"` +} + +// Relay configurations +type Relay struct { + // MqttHost: Hostname of the MQTT broker. + MqttHost string `yaml:"mqtt_host" json:"mqtt_host"` + // MqttPort: Port of the MQTT broker. + MqttPort int `yaml:"mqtt_port" json:"mqtt_port"` + // MqttClientId: MQTT client ID of this relay. + MqttClientId string `yaml:"mqtt_client_id" json:"mqtt_client_id"` + // MqttUser: User name used for MQTT authentication. + MqttUser string `yaml:"mqtt_user" json:"mqtt_user"` + // MqttPassword: Password used for MQTT authentication. + MqttPassword string `yaml:"mqtt_password" json:"mqtt_password"` + // MqttTopic: Topic where MQTT messages are pushed and received. + // Set topic to `osc/example` and the following topics will be setup. + // osc/example/cmd/$OSC_CMD - Any commands received on OSC will publish here. + // osc/example/send/$OSC_CMD - Any commands pushed via MQTT will be forwarded to OSC. + // osc/example/bundle - OSC Bundle messages. + // osc/example/bundle/send - Send OSC Bundle messages. + // osc/example/status - Configuration is published on startup. + // osc/example/status/check - Request status. + MqttTopic string `yaml:"mqtt_topic" json:"mqtt_topic"` + // MqttDisableConfigSend: Disables the config send. + MqttDisableConfigSend bool `yaml:"mqtt_disable_config_send" json:"mqtt_disable_config_send"` + + // OscHost: Hostname for OSC client connection. + OscHost string `yaml:"osc_host" json:"osc_host"` + // OscPort: Port for OSC client connection. + OscPort int `yaml:"osc_port" json:"osc_port"` + // OscBindAddr: Bind address of the OSC server. + // To have bidirectional mode, you must specify at least this, OscHost, and OscPort defined. + // You must specify the unicast IP address, cannot be 0.0.0.0. + OscBindAddr string `yaml:"osc_bind_addr" json:"osc_bind_addr"` + // OscBindPort: Port of the OSC server. Defaults to OscPort if specified. + OscBindPort int `yaml:"osc_bind_port" json:"osc_bind_port"` + // OscDisallowArbritaryCommand: Disallows pushing to arbritary commands to the cmd topic. + OscDisallowArbritaryCommand bool `yaml:"osc_disallow_arbritary_command" json:"osc_disallow_arbritary_command"` + + // RelayCommands: Pre-defined commands to relay. + Commands []RelayCommand `yaml:"relay_commands" json:"commands"` + // RelayOscSubscriptions: OSC Comamnds to send at regular intervals. Useful for OSC servers that offers data subscriptions. + OscSubscriptions []RelayOscSubscription `yaml:"osc_subscriptions" json:"osc_subscriptions"` + + // LogLevel: How much logging. + // 0 - Errors + // 1 - MQTT and OSC receive logging. + // 2 - MQTT and OSC send logging. + // 3 - Debug + LogLevel LogLevel `yaml:"log_level" json:"log_level"` + + // MqttClient: The client connection to MQTT. + MqttClient mqtt.Client `yaml:"-" json:"-"` + // OscClient: The client connection to OSC. + OscClient *osc.Client `yaml:"-" json:"-"` + // OscServer: OSC Server. + OscServer *osc.Server `yaml:"-" json:"-"` + // OscServerConn: Server connection. + // The OSC software is limited in bidirectional support, so I do my own connection here. + OscServerConn net.PacketConn `yaml:"-" json:"-"` +} + +// OscMessage: Used for json encode/decode to/from MQTT for bundles. +type OscMessage struct { + Address string `json:"address"` + Arguments []interface{} `json:"arguments"` +} + +// OscBundle: Used for json encode/decode to/from MQTT. +type OscBundle struct { + Timetag time.Time `json:"timetag"` + Messages []*OscMessage `json:"messages"` + Bundles []*OscBundle `json:"bundles"` +} + +// OscDispatcher: Handles OSC messages. +type OscDispatcher struct { + r *Relay +} + +// makeBundle: Makes an OscBundle from an osc.Bundle. +func (d OscDispatcher) makeBundle(bundle *osc.Bundle) *OscBundle { + b := new(OscBundle) + b.Timetag = bundle.Timetag.Time() + for _, message := range bundle.Messages { + m := new(OscMessage) + m.Address = message.Address + m.Arguments = message.Arguments + b.Messages = append(b.Messages, m) + } + + for _, sbundle := range bundle.Bundles { + subBundle := d.makeBundle(sbundle) + b.Bundles = append(b.Bundles, subBundle) + } + return b +} + +// Dispatch: Handle OSC packet. +func (d OscDispatcher) Dispatch(packet osc.Packet) { + // Determine packet type and process. + if packet != nil { + switch packet.(type) { + default: + d.r.Log(ErrorLog, "Unknown OSC packet received.") + + // Message packets can just go to /cmd/$OSC_CMD and arguments encoded to JSON. + case *osc.Message: + message := packet.(*osc.Message) + d.r.Log(ReceiveLog, "<- [OSC] %s: %s", message.Address, message.Arguments) + topic := d.r.MqttTopic + "/cmd" + message.Address + data, err := json.Marshal(message.Arguments) + if err != nil { + d.r.Log(ErrorLog, "Json Encode: %s", err) + return + } + d.r.MqttClient.Publish(topic, 0, true, data) + d.r.Log(SendLog, "-> [MQTT] %s: %s", topic, data) + + // Bundle packets are capable of having multiple messages and bundles embeded in it, + // so I translate to my own bundle structure that is JSON aware. + case *osc.Bundle: + b := d.makeBundle(packet.(*osc.Bundle)) + d.r.Log(ReceiveLog, "<- [OSC] Bundle %s", b.Timetag) + topic := d.r.MqttTopic + "/bundle" + data, err := json.Marshal(b) + if err != nil { + d.r.Log(ErrorLog, "Json Encode: %s", err) + return + } + d.r.MqttClient.Publish(topic, 0, true, data) + d.r.Log(SendLog, "-> [MQTT] %s: %s", topic, data) + } + } +} + +// OscSend: Sends an OSC packet. I use my own function to allow bidirectional communication. +func (r *Relay) OscSend(packet osc.Packet) error { + // Do not send nil packets. + if packet == nil { + return nil + } + + // Log send request. + switch packet.(type) { + default: + case *osc.Message: + message := packet.(*osc.Message) + r.Log(SendLog, "-> [OSC] %s: %s", message.Address, message.Arguments) + case *osc.Bundle: + bundle := packet.(*osc.Bundle) + r.Log(SendLog, "-> [OSC] Bundle %s", bundle.Timetag.Time()) + } + + // Hosts can be DNS names, or IP addresses, so we need to resolve. + var err error + addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", r.OscHost, r.OscPort)) + if err != nil { + return err + } + + // Convert packet to OSC bytes. + data, err := packet.MarshalBinary() + if err != nil { + return err + } + if r.LogLevel >= DebugLog { + r.Log(DebugLog, "-> [OSC] Binary %s", bytes.ReplaceAll(data, []byte{byte(0)}, []byte("~"))) + } + + // If we have an OSC Server defined, we use its connection to write the data for bidirectional support. + if r.OscServer != nil { + _, err = r.OscServerConn.WriteTo(data, addr) + } else { + // Otherwise, we dial the address with a unused source port. + // Specifying a manual source port could end up with conflicts. + conn, err := net.DialUDP("udp", nil, addr) + if err != nil { + return err + } + defer conn.Close() + + _, err = conn.Write(data) + } + + return err +} + +// SendStatus: Send config to MQTT status. +func (r *Relay) SendStatus() { + // If disabled, ignore. + if r.MqttDisableConfigSend { + return + } + + // Make JSON dump. + config, err := json.Marshal(&r) + if err != nil { + r.Log(ErrorLog, "Json Error: %s", err) + } + + // Send config. + r.MqttClient.Publish(r.MqttTopic+"/status", 0, true, config) +} + +// MakeOSCBundle: Makes an osc.Bundle. from an OscBundle. +func (r *Relay) MakeOSCBundle(bundle *OscBundle) *osc.Bundle { + b := osc.NewBundle(bundle.Timetag) + + // Add attached messages. + for _, message := range bundle.Messages { + m := osc.NewMessage(message.Address) + m.Arguments = message.Arguments + b.Append(m) + } + + // Add sub bundles. + for _, sbundle := range bundle.Bundles { + subBundle := r.MakeOSCBundle(sbundle) + b.Append(subBundle) + } + return b +} + +// MqttOnEvent: Handle MQTT events. +func (r *Relay) MqttOnEvent(client mqtt.Client, message mqtt.Message) { + r.Log(ReceiveLog, "<- [MQTT] %s: %s\n", message.Topic(), message.Payload()) + + // Check commands to see if one matches this topic. + for _, cmd := range r.Commands { + if message.Topic() == cmd.MqttTopic || + (cmd.MqttSubTopic != "" && message.Topic() == r.MqttTopic+"/"+cmd.MqttSubTopic) { + // Configure OSC message. + oscMessage := osc.NewMessage(cmd.Command) + + // If arguments allowed and provided, parse, otherwise use default payload. + var arguments []interface{} + if !cmd.DisallowPayload && len(message.Payload()) != 0 { + err := json.Unmarshal(message.Payload(), &arguments) + if err != nil { + r.Log(ErrorLog, "Json Error: %s", err) + return + } + } else if len(cmd.DefaultPayload) != 0 { + arguments = cmd.DefaultPayload + } + oscMessage.Arguments = arguments + + // Send OSC message. + err := r.OscSend(oscMessage) + if err != nil { + r.Log(ErrorLog, "Send Error: %s", err) + } + } + } + + // If standard send topic. + if strings.HasPrefix(message.Topic(), r.MqttTopic+"/send") { + // Verify arbritary commands can be sent. + if r.OscDisallowArbritaryCommand { + r.Log(ErrorLog, "Arbritary commands are disabled on this relay.") + return + } + + // Get the command from topic. + cmd := strings.Replace(message.Topic(), r.MqttTopic+"/send", "", 1) + if cmd == "" { + cmd = "/" + } + + // Parse the arguments. + var arguments []interface{} + if len(message.Payload()) != 0 { + err := json.Unmarshal(message.Payload(), &arguments) + if err != nil { + r.Log(ErrorLog, "Json Error: %s", err) + return + } + } + + // Create OSC message. + oscMessage := osc.NewMessage(cmd) + oscMessage.Arguments = arguments + + // Send OSC message. + err := r.OscSend(oscMessage) + if err != nil { + r.Log(ErrorLog, "Send Error: %s", err) + } + } else if message.Topic() == r.MqttTopic+"/bundle/send" { + // Verify arbritary commands can be sent. + if r.OscDisallowArbritaryCommand { + r.Log(ErrorLog, "Arbritary commands are disabled on this relay.") + return + } + + // Create bundle. + bundle := new(OscBundle) + err := json.Unmarshal(message.Payload(), bundle) + if err != nil { + r.Log(ErrorLog, "Json Error: %s", err) + return + } + + // Make the OSC bundle based on received bundle. + b := r.MakeOSCBundle(bundle) + + // Send OSC bundle. + err = r.OscSend(b) + if err != nil { + r.Log(ErrorLog, "Send Error: %s", err) + } + } else if message.Topic() == r.MqttTopic+"/status/check" { + r.SendStatus() + } +} + +// MqttSubscribe: Subscribe to MQTT Topic. +func (r *Relay) MqttSubscribe(topic string) { + r.Log(DebugLog, "Subscribing MQTT: %s", topic) + if t := r.MqttClient.Subscribe(topic, 0, r.MqttOnEvent); t.Wait() && t.Error() != nil { + r.Log(ErrorLog, "MQTT Subscribe Error: %s", t.Error()) + } +} + +// Log: Logging function to allow log levels. +func (r *Relay) Log(level LogLevel, format string, args ...interface{}) { + if level <= r.LogLevel { + log.Println(fmt.Sprintf(format, args...)) + } +} + +// Start: Start the relay. +func (r *Relay) Start() { + // Connect to MQTT. + mqtt_opts := mqtt.NewClientOptions() + mqtt_opts.AddBroker(fmt.Sprintf("tcp://%s:%d", r.MqttHost, r.MqttPort)) + mqtt_opts.SetClientID(r.MqttClientId) + mqtt_opts.SetUsername(r.MqttUser) + mqtt_opts.SetPassword(r.MqttPassword) + r.MqttClient = mqtt.NewClient(mqtt_opts) + + // Connect and failures are fatal exiting service. + r.Log(DebugLog, "Connecting to MQTT") + if t := r.MqttClient.Connect(); t.Wait() && t.Error() != nil { + log.Fatalf("MQTT error: %s", t.Error()) + return + } + + // Subscribe to MQTT topics. + r.MqttSubscribe(r.MqttTopic + "/send/#") + r.MqttSubscribe(r.MqttTopic + "/bundle/send") + r.MqttSubscribe(r.MqttTopic + "/status/check") + // Subscribe to command topics configured. + for _, cmd := range r.Commands { + if cmd.MqttTopic != "" { + r.MqttSubscribe(cmd.MqttTopic) + } + if cmd.MqttSubTopic != "" { + r.MqttSubscribe(r.MqttTopic + "/" + cmd.MqttSubTopic) + } + } + + // If an OSC client configuration is provided, setup client. + if r.OscHost != "" && r.OscPort != 0 { + r.OscClient = osc.NewClient(r.OscHost, r.OscPort) + } + + // If OSC server configured, setup server. + if r.OscBindAddr != "" && r.OscBindPort != 0 { + r.OscServer = &osc.Server{Addr: fmt.Sprintf("%s:%d", r.OscBindAddr, r.OscBindPort), Dispatcher: OscDispatcher{r: r}} + + // Run server in thread. + go func() { + r.Log(DebugLog, "Starting OSC Server") + var err error + // I setup our own UDP connection to overcome a limit in go-osc + // where bidirectional isn't built in. + r.OscServerConn, err = net.ListenPacket("udp", r.OscServer.Addr) + if err != nil { + log.Fatal(err) + } + + // Close connection when function ends. + defer r.OscServerConn.Close() + + // Have Go-OSC handle OSC traffic on this connection. + if err = r.OscServer.Serve(r.OscServerConn); err != nil { + log.Fatal(err) + } + }() + } + + // Setup subscriptions. + for _, subcription := range r.OscSubscriptions { + // Each subscription runs in its own thread. + go func(subcription RelayOscSubscription) { + r.Log(DebugLog, "Started subscription: %s", subcription.Command) + ticker := time.NewTicker(subcription.Interval) + for range ticker.C { + // Send OSC message as configured. + r.Log(DebugLog, "Running subscription: %s", subcription.Command) + oscMessage := osc.NewMessage(subcription.Command) + oscMessage.Arguments = subcription.Payload + err := r.OscSend(oscMessage) + if err != nil { + r.Log(ErrorLog, "Send Error: %s", err) + } + } + }(subcription) + } + + // Send current config to MQTT. + r.SendStatus() +}