diff --git a/README.md b/README.md index 11441e7..8eca4df 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # midi-request-trigger -A service that triggers HTTP requests when MIDI messages are recieved and triggers MIDI messages when HTTP requests are received. +A service that triggers HTTP requests and/or MQTT messages when MIDI messages are recieved and triggers MIDI messages when HTTP requests and/or MQTT messages are received. ## Install @@ -175,4 +175,4 @@ midi_routers: Content-Type: - multipart/form-data; boundary=---------------------------888832887744 debug_request: true -``` \ No newline at end of file +``` diff --git a/go.mod b/go.mod index 4d36dc2..4aaf014 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/GRMrGecko/midi-request-trigger go 1.20 require ( + github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/gorilla/handlers v1.5.1 github.com/gorilla/mux v1.8.0 github.com/kkyr/fig v0.3.2 @@ -11,7 +12,10 @@ require ( require ( github.com/felixge/httpsnoop v1.0.1 // indirect + github.com/gorilla/websocket v1.5.3 // indirect github.com/mitchellh/mapstructure v1.4.1 // indirect github.com/pelletier/go-toml v1.9.3 // indirect + golang.org/x/net v0.27.0 // indirect + golang.org/x/sync v0.7.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 793acb5..1ca0566 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,13 @@ +github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o= +github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH4= github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/kkyr/fig v0.3.2 h1:+vMj52FL6RJUxeKOBB6JXIMyyi1/2j1ERDrZXjoBjzM= github.com/kkyr/fig v0.3.2/go.mod h1:ItUILF8IIzgZOMhx5xpJ1W/bviQsWRKOwKXfE/tqUoA= github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag= @@ -12,6 +16,10 @@ github.com/pelletier/go-toml v1.9.3 h1:zeC5b1GviRUyKYd6OJPvBU/mcVDVoL1OhT17FCt5d github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= gitlab.com/gomidi/midi/v2 v2.0.30 h1:RgRYbQeQSab5ZaP1lqRcCTnTSBQroE3CE6V9HgMmOAc= gitlab.com/gomidi/midi/v2 v2.0.30/go.mod h1:Y6IFFyABN415AYsFMPJb0/43TRIuVYDpGKp2gDYLTLI= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/main.go b/main.go index 9684f06..9fffe56 100644 --- a/main.go +++ b/main.go @@ -14,8 +14,8 @@ import ( const ( serviceName = "midi-request-trigger" - serviceDescription = "Takes trigger MIDI messages by HTTP requests and trigger HTTP requests by MIDI messages" - serviceVersion = "0.1" + serviceDescription = "Takes trigger MIDI messages by HTTP or MQTT requests and trigger HTTP or MQTT requests by MIDI messages" + serviceVersion = "0.2" ) // App is the global application structure for communicating between servers and storing information. diff --git a/midiRouter.go b/midiRouter.go index 7bdc053..61da0e2 100644 --- a/midiRouter.go +++ b/midiRouter.go @@ -2,6 +2,7 @@ package main import ( "crypto/tls" + "encoding/json" "fmt" "io" "log" @@ -11,50 +12,166 @@ import ( "strconv" "strings" + mqtt "github.com/eclipse/paho.mqtt.golang" "gitlab.com/gomidi/midi/v2" "gitlab.com/gomidi/midi/v2/drivers" ) -// Triggers that occur from MIDI messages received. -type NoteTrigger struct { - MatchAllNotes bool `fig:"match_all_notes"` - Channel uint8 `fig:"channel"` - Note uint8 `fig:"note"` - Velocity uint8 `fig:"velocity"` - MatchAllVelocities bool `fig:"match_all_velocities"` - MidiInfoInRequest bool `fig:"midi_info_in_request"` - InsecureSkipVerify bool `fig:"insecure_skip_verify"` - URL string `fig:"url"` - Method string `fig:"method"` - Body string `fig:"body"` - Headers http.Header `fig:"headers"` - DebugRequest bool `fig:"debug_request"` +// LogLevel Definition +type LogLevel int + +const ( + // Logs only errors. + ErrorLog LogLevel = iota + // MQTT, HTTP, and MIDI receive logging. + ReceiveLog + // MQTT, HTTP, and MIDI send logging. + SendLog + // Debug messages. + DebugLog +) + +// Provides a string value for a log level. +func (l LogLevel) String() string { + return [...]string{"Error", "Receive", "Send", "Debug"}[l] } -// Triggers that occur from HTTP messsages received. +// Configurations relating to MQTT connection. +type MQTTConfig struct { + // Hostname of the MQTT broker. + Host string `fig:"host"` + // Port of the MQTT broker. + Port int `fig:"port"` + // MQTT client ID of this relay. + ClientId string `fig:"client_id"` + // User name used for MQTT authentication. + User string `fig:"user"` + // Password used for MQTT authentication. + Password string `fig:"password"` + // Topic where MQTT messages are pushed and received. + // Set topic to `midi/example` and the following topics will be setup. + // midi/example/cmd - Any commands received on MIDI will publish here. + // midi/example/send - Any commands pushed via MQTT will be forwarded to MIDI. + // midi/example/status - Configuration is published on startup. + // midi/example/status/check - Request status. + Topic string `fig:"topic"` + // Disable sending all midi notes. + DisableMidiFirehose bool `fig:"disable_midi_firehose"` + // Disables the config send. + DisableConfigSend bool `fig:"disable_config_send"` +} + +// Payload to decode/encode JSON message. +type MQTTPayload struct { + Channel uint8 `json:"channel"` + Note uint8 `json:"note"` + Velocity uint8 `json:"velocity"` +} + +// Triggers that occur from MIDI messages received. +type NoteTrigger struct { + // If set, every note played will be matched. + MatchAllNotes bool `fig:"match_all_notes"` + // Channel to match. + Channel uint8 `fig:"channel"` + // Note to match. + Note uint8 `fig:"note"` + // Velocity to match. + Velocity uint8 `fig:"velocity"` + // If we should match all velocity values. + MatchAllVelocities bool `fig:"match_all_velocities"` + // Custom MQTT message. Do not set to ignore MQTT. + MqttTopic string `fig:"mqtt_topic"` + // Nil payload will generate a payload with midi info. + MqttPayload []interface{} `fig:"mqtt_payload"` + // If the HTTP request should includ midi info. + MidiInfoInRequest bool `fig:"midi_info_in_request"` + // Should SSL requests require a valid certificate. + InsecureSkipVerify bool `fig:"insecure_skip_verify"` + // The URL to call with the HTTP request. Do not set if you wish to not send HTTP request. + URL string `fig:"url"` + // HTTP method, defaults to GET. + Method string `fig:"method"` + // HTTP body. + Body string `fig:"body"` + // HTTP headers. + Headers http.Header `fig:"headers"` +} + +// Triggers that occur from HTTP or MQTT messsages received. type RequestTrigger struct { - Channel uint8 `fig:"channel"` - Note uint8 `fig:"note"` - Velocity uint8 `fig:"velocity"` - MidiInfoInRequest bool `fig:"midi_info_in_request"` - URI string `fig:"uri"` + Channel uint8 `fig:"channel"` + Note uint8 `fig:"note"` + Velocity uint8 `fig:"velocity"` + // Parse midi notes from HTTP request. + MidiInfoInRequest bool `fig:"midi_info_in_request"` + // Absolute MQTT topic to subscribe. + MqttTopic string `fig:"mqtt_topic"` + // Sub topic off relay MQTT topic to subscribe. + // midi/example/$SUB_TOPIC + MqttSubTopic string `fig:"mqtt_sub_topic"` + // Rather or not to disallow payload to be relayed. + DisallowPayload bool `fig:"disallow_payload"` + // Request URL path to trigger with. + URI string `fig:"uri"` } // A common router for both receiving and sending MIDI messages. type MidiRouter struct { - Name string `fig:"name"` - Device string `fig:"device"` - DebugListener bool `fig:"debug_listener"` - DisableListener bool `fig:"disable_listener"` - NoteTriggers []NoteTrigger `fig:"note_triggers"` + // Used for human readable config. + Name string `fig:"name"` + // Midi device to connect. + Device string `fig:"device"` + // MQTT Connection if you are to integrate with MQTT. + MQTT MQTTConfig `fig:"mqtt"` + // Only connect for sending notes, not receiving. + DisableListener bool `fig:"disable_listener"` + // Listener triggers for notes to send HTTP and or MQTT messages. + NoteTriggers []NoteTrigger `fig:"note_triggers"` + // HTTP and or MQTT triggers to send MIDI notes. RequestTriggers []RequestTrigger `fig:"request_triggers"` - MidiOut drivers.Out `fig:"-"` - ListenerStop func() `fig:"-"` + // How much logging. + // 0 - Errors + // 1 - MQTT and OSC receive logging. + // 2 - MQTT and OSC send logging. + // 3 - Debug + LogLevel LogLevel `yaml:"log_level" json:"log_level"` + + // Connection to MIDI device. + MidiOut drivers.Out `fig:"-"` + // Function to stop listening to MIDI device. + ListenerStop func() `fig:"-"` + // The client connection to MQTT. + MqttClient mqtt.Client `yaml:"-" json:"-"` +} + +// Logging function to allow log levels. +func (r *MidiRouter) Log(level LogLevel, format string, args ...interface{}) { + if level <= r.LogLevel { + log.Println(fmt.Sprintf(format, args...)) + } } // When a MIDI message occurs, send the HTTP request. func (r *MidiRouter) sendRequest(channel, note, velocity uint8) { + // If MQTT firehose not disabled, send to general cmd topic. + if r.MQTT.Host != "" && r.MQTT.Port != 0 && !r.MQTT.DisableMidiFirehose { + payload := MQTTPayload{ + Channel: channel, + Note: note, + Velocity: velocity, + } + data, err := json.Marshal(payload) + if err != nil { + r.Log(ErrorLog, "Json Encode: %s", err) + } else { + topic := r.MQTT.Topic + "/cmd" + r.MqttClient.Publish(topic, 0, true, data) + r.Log(SendLog, "-> [MQTT] %s", topic) + } + } + // Check each trigger to find requests that match this message. for _, trig := range r.NoteTriggers { // If match all notes, process this request. @@ -64,73 +181,93 @@ func (r *MidiRouter) sendRequest(channel, note, velocity uint8) { // For all logging, we want to print the message so setup a common string to print. logInfo := fmt.Sprintf("note %s(%d) on channel %v with velocity %v", midi.Note(note), note, channel, velocity) - // Default method to GET if nothing is defined. - if trig.Method == "" { - trig.Method = "GET" + if trig.MqttTopic != "" { + if trig.MqttPayload != nil { + r.MqttClient.Publish(trig.MqttTopic, 0, true, trig.MqttPayload) + r.Log(SendLog, "-> [MQTT] %s", trig.MqttTopic) + } else { + payload := MQTTPayload{ + Channel: channel, + Note: note, + Velocity: velocity, + } + data, err := json.Marshal(payload) + if err != nil { + r.Log(ErrorLog, "Json Encode: %s", err) + } else { + r.MqttClient.Publish(trig.MqttTopic, 0, true, data) + r.Log(SendLog, "-> [MQTT] %s", trig.MqttTopic) + } + } } - // Parse the URL to make sure its valid. - url, err := url.Parse(trig.URL) - // If not valid, we need to stop processing this request. - if err != nil { - log.Printf("Trigger failed to parse url: %s\n %s\n", err, logInfo) - continue - } + if trig.URL != "" { + // Default method to GET if nothing is defined. + if trig.Method == "" { + trig.Method = "GET" + } - // If MIDI info needs to be added to the request, add it. - if trig.MidiInfoInRequest { - query := url.Query() - query.Add("channel", strconv.Itoa(int(channel))) - query.Add("note", strconv.Itoa(int(note))) - query.Add("velocity", strconv.Itoa(int(velocity))) - url.RawQuery = query.Encode() - } - - // If body provided, setup a reader for it. - var body io.Reader - if trig.Body != "" { - body = strings.NewReader(trig.Body) - } - - // If debugging, log that we're starting a request. - if trig.DebugRequest { - log.Printf("Starting request for trigger: %s %s\n%s\n", trig.Method, url.String(), logInfo) - } - - // Make the request. - req, err := http.NewRequest(trig.Method, url.String(), body) - if err != nil { - log.Printf("Trigger failed to parse url: %s\n %s\n", err, logInfo) - continue - } - - // Add headers to the request. - req.Header = trig.Headers - - // Configure transport with trigger config. - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: trig.InsecureSkipVerify}, - } - client := &http.Client{Transport: tr} - - // Perform the request. - res, err := client.Do(req) - if err != nil { - log.Printf("Trigger failed to request: %s\n %s\n", err, logInfo) - continue - } - - // Close the body at end of request. - defer res.Body.Close() - - // If debug enabled, read the body and log it. - if trig.DebugRequest { - body, err := io.ReadAll(res.Body) + // Parse the URL to make sure its valid. + url, err := url.Parse(trig.URL) + // If not valid, we need to stop processing this request. if err != nil { - log.Printf("Trigger failed to read body: %s\n %s\n", err, logInfo) + r.Log(ErrorLog, "Trigger failed to parse url: %s\n %s", err, logInfo) continue } - log.Printf("Trigger response: %s\n%s\n", logInfo, string(body)) + + // If MIDI info needs to be added to the request, add it. + if trig.MidiInfoInRequest { + query := url.Query() + query.Add("channel", strconv.Itoa(int(channel))) + query.Add("note", strconv.Itoa(int(note))) + query.Add("velocity", strconv.Itoa(int(velocity))) + url.RawQuery = query.Encode() + } + + // If body provided, setup a reader for it. + var body io.Reader + if trig.Body != "" { + body = strings.NewReader(trig.Body) + } + + // If debugging, log that we're starting a request. + r.Log(DebugLog, "Starting request for trigger: %s %s\n%s", trig.Method, url.String(), logInfo) + + // Make the request. + req, err := http.NewRequest(trig.Method, url.String(), body) + if err != nil { + r.Log(ErrorLog, "Trigger failed to parse url: %s\n %s", err, logInfo) + continue + } + + // Add headers to the request. + req.Header = trig.Headers + + // Configure transport with trigger config. + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: trig.InsecureSkipVerify}, + } + client := &http.Client{Transport: tr} + + // Perform the request. + res, err := client.Do(req) + if err != nil { + r.Log(ErrorLog, "Trigger failed to request: %s\n %s", err, logInfo) + continue + } + + // Close the body at end of request. + defer res.Body.Close() + + // If debug enabled, read the body and log it. + if r.LogLevel >= DebugLog { + body, err := io.ReadAll(res.Body) + if err != nil { + r.Log(ErrorLog, "Trigger failed to read body: %s\n %s", err, logInfo) + continue + } + r.Log(DebugLog, "Trigger response: %s\n%s", logInfo, string(body)) + } } } } @@ -141,7 +278,7 @@ func (m *MidiRouter) Handler(w http.ResponseWriter, r *http.Request) { // Check each request trigger for ones that match the request URI. for _, t := range m.RequestTriggers { // If matches request, process MIDI message. - if t.URI == r.URL.RawPath { + if t.URI != "" && t.URI == r.URL.RawPath { // Set default values to those from this trigger. channel, note, velocity := t.Channel, t.Note, t.Velocity // If MIDI info is in the request query, update to request. @@ -179,7 +316,7 @@ func (m *MidiRouter) Handler(w http.ResponseWriter, r *http.Request) { // Get send function for output. send, err := midi.SendTo(m.MidiOut) if err != nil { - log.Printf("Failed to get midi sender for request: %s\n%s\n", t.URI, err) + m.Log(ErrorLog, "Failed to get midi sender for request: %s\n%s", t.URI, err) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return } @@ -193,7 +330,7 @@ func (m *MidiRouter) Handler(w http.ResponseWriter, r *http.Request) { // Send MIDI message. err = send(msg) if err != nil { - log.Printf("Failed to send midi message: %s\n%s\n", t.URI, err) + m.Log(ErrorLog, "Failed to send midi message: %s\n%s", t.URI, err) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return } @@ -204,6 +341,116 @@ func (m *MidiRouter) Handler(w http.ResponseWriter, r *http.Request) { } } +// Send config to MQTT status. +func (r *MidiRouter) SendStatus() { + // If disabled, ignore. + if r.MQTT.DisableConfigSend { + return + } + + // Make JSON dump. + config, err := json.Marshal(&r) + if err != nil { + r.Log(ErrorLog, "Json Error: %s", err) + } + + // Send config. + r.MqttClient.Publish(r.MQTT.Topic+"/status", 0, true, config) +} + +// Handle MQTT events. +func (r *MidiRouter) MqttOnEvent(client mqtt.Client, message mqtt.Message) { + r.Log(ReceiveLog, "<- [MQTT] %s: %s\n", message.Topic(), message.Payload()) + + // Check commands to see if one matches this topic. + for _, t := range r.RequestTriggers { + if (t.MqttTopic != "" && message.Topic() == t.MqttTopic) || + (t.MqttSubTopic != "" && message.Topic() == r.MQTT.Topic+"/"+t.MqttSubTopic) { + // Set default values to those from this trigger. + channel, note, velocity := t.Channel, t.Note, t.Velocity + + // If arguments allowed and provided, parse, otherwise use default payload. + arguments := MQTTPayload{ + Channel: channel, + Note: note, + Velocity: velocity, + } + if !t.DisallowPayload && len(message.Payload()) != 0 { + err := json.Unmarshal(message.Payload(), &arguments) + if err != nil { + r.Log(ErrorLog, "Json Error: %s", err) + return + } + channel = arguments.Channel + note = arguments.Note + velocity = arguments.Velocity + } + + // Get send function for output. + send, err := midi.SendTo(r.MidiOut) + if err != nil { + log.Printf("Failed to get midi sender for request: %s\n%s\n", message.Topic(), err) + return + } + + // Make the MIDI message based on information. + msg := midi.NoteOn(channel, note, velocity) + if velocity == 0 { + msg = midi.NoteOff(channel, note) + } + + // Send MIDI message. + err = send(msg) + if err != nil { + log.Printf("Failed to send midi message: %s\n%s\n", message.Topic(), err) + return + } + } + } + + // If standard send topic. + if strings.HasPrefix(message.Topic(), r.MQTT.Topic+"/send") { + // If arguments allowed and provided, parse, otherwise use default payload. + var arguments MQTTPayload + if len(message.Payload()) != 0 { + err := json.Unmarshal(message.Payload(), &arguments) + if err != nil { + r.Log(ErrorLog, "Json Error: %s", err) + return + } + // Get send function for output. + send, err := midi.SendTo(r.MidiOut) + if err != nil { + log.Printf("Failed to get midi sender for request: %s\n%s\n", message.Topic(), err) + return + } + + // Make the MIDI message based on information. + msg := midi.NoteOn(arguments.Channel, arguments.Note, arguments.Velocity) + if arguments.Velocity == 0 { + msg = midi.NoteOff(arguments.Channel, arguments.Note) + } + + // Send MIDI message. + err = send(msg) + if err != nil { + log.Printf("Failed to send midi message: %s\n%s\n", message.Topic(), err) + return + } + } + } else if message.Topic() == r.MQTT.Topic+"/status/check" { + r.SendStatus() + } +} + +// Subscribe to MQTT Topic. +func (r *MidiRouter) MqttSubscribe(topic string) { + r.Log(DebugLog, "Subscribing MQTT: %s", topic) + if t := r.MqttClient.Subscribe(topic, 0, r.MqttOnEvent); t.Wait() && t.Error() != nil { + r.Log(ErrorLog, "MQTT Subscribe Error: %s", t.Error()) + } +} + // Connect to MIDI devices and start listening. func (r *MidiRouter) Connect() { // If request triggers defined, find the out port. @@ -234,19 +481,13 @@ func (r *MidiRouter) Connect() { switch { // Get notes with an velocity set. case msg.GetNoteStart(&channel, ¬e, &velocity): - // If debug, log. - if r.DebugListener { - log.Printf("starting note %s(%d) on channel %v with velocity %v\n", midi.Note(note), note, channel, velocity) - } + r.Log(ReceiveLog, "starting note %s(%d) on channel %v with velocity %v", midi.Note(note), note, channel, velocity) // Process request. r.sendRequest(channel, note, velocity) // If no velocity is set, an note end message is received. case msg.GetNoteEnd(&channel, ¬e): - // If debug, log. - if r.DebugListener { - log.Printf("ending note %s(%d) on channel %v\n", midi.Note(note), note, channel) - } + r.Log(ReceiveLog, "ending note %s(%d) on channel %v", midi.Note(note), note, channel) // Process request. r.sendRequest(channel, note, 0) default: @@ -254,12 +495,42 @@ func (r *MidiRouter) Connect() { } }) if err != nil { - log.Printf("Error listening to device: %s\n", err) + r.Log(ErrorLog, "Error listening to device: %s", err) return } // Update stop function for disconnects. r.ListenerStop = stop + + if r.MQTT.Host != "" && r.MQTT.Port != 0 { + // Connect to MQTT. + mqtt_opts := mqtt.NewClientOptions() + mqtt_opts.AddBroker(fmt.Sprintf("tcp://%s:%d", r.MQTT.Host, r.MQTT.Port)) + mqtt_opts.SetClientID(r.MQTT.ClientId) + mqtt_opts.SetUsername(r.MQTT.User) + mqtt_opts.SetPassword(r.MQTT.Password) + r.MqttClient = mqtt.NewClient(mqtt_opts) + + // Connect and failures are fatal exiting service. + r.Log(DebugLog, "Connecting to MQTT") + if t := r.MqttClient.Connect(); t.Wait() && t.Error() != nil { + log.Fatalf("MQTT error: %s", t.Error()) + return + } + + // Subscribe to MQTT topics. + r.MqttSubscribe(r.MQTT.Topic + "/send") + r.MqttSubscribe(r.MQTT.Topic + "/status/check") + // Subscribe to command topics configured. + for _, trig := range r.RequestTriggers { + if trig.MqttTopic != "" { + r.MqttSubscribe(trig.MqttTopic) + } + if trig.MqttSubTopic != "" { + r.MqttSubscribe(r.MQTT.Topic + "/" + trig.MqttSubTopic) + } + } + } } // On disconnect, stop and remove output device.