Email archive server with SysLog support and web interface.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

252 lines
8.8 KiB

4 years ago
  1. package main
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "os"
  9. "path"
  10. "regexp"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/DusanKasan/parsemail"
  15. "github.com/google/uuid"
  16. )
  17. // When a new message is received, this function is called to store it.
  18. func MailSaveMessage(remoteAddr string, from string, to string, r io.Reader) error {
  19. // We need the message body in bytes to save.
  20. b, err := ioutil.ReadAll(r)
  21. if err != nil { // If we can't read, we have an issue.
  22. return err
  23. }
  24. // The email parser expects an io.Reader, but as we already read the reader passed. We must make a new one.
  25. reader := bytes.NewReader(b)
  26. // Parse the email
  27. email, err := parsemail.Parse(reader)
  28. if err != nil {
  29. return err
  30. }
  31. // Generate a UUID for this message.
  32. UUID := uuid.New().String()
  33. // Save message body using database or file if configured.
  34. if app.config.MailPath == "db" {
  35. // Save to database.
  36. message := Messages{}
  37. message.UUID = UUID
  38. message.Message = b
  39. app.db.Create(&message)
  40. } else {
  41. // If the directory configured in the config does not exist... We must fail.
  42. if _, err := os.Stat(app.config.MailPath); err != nil {
  43. return fmt.Errorf("Mail directory does not exist: %s", app.config.MailPath)
  44. }
  45. // Create the file.
  46. messagePath := path.Join(app.config.MailPath, UUID)
  47. fp, err := os.Create(messagePath)
  48. if err != nil {
  49. return err
  50. }
  51. // Write to the file
  52. fp.Write(b)
  53. fp.Close()
  54. }
  55. // Create a message log entry with parsed email.
  56. messageEntry := MessageLog{}
  57. messageEntry.UUID = UUID
  58. messageEntry.MessageID = email.MessageID
  59. if len(email.From) <= 0 {
  60. messageEntry.From = from
  61. } else {
  62. messageEntry.From = email.From[0].Address
  63. }
  64. if len(email.To) <= 0 {
  65. messageEntry.To = to
  66. } else {
  67. messageEntry.To = email.To[0].Address
  68. }
  69. messageEntry.Subject = email.Subject
  70. messageEntry.PlainText = email.TextBody != ""
  71. messageEntry.HTML = email.HTMLBody != ""
  72. messageEntry.Attachments = len(email.Attachments) > 0
  73. // If a spam level header exists, parse the score.
  74. spamScore := email.Header.Get("X-SPAM-LEVEL")
  75. rxScore := regexp.MustCompile("Spam detection results:\\s+([0-9]+)")
  76. matches := rxScore.FindStringSubmatch(spamScore)
  77. if len(matches) == 2 {
  78. spamScoreI, err := strconv.Atoi(matches[1])
  79. if err != nil {
  80. spamScoreI = 0
  81. }
  82. messageEntry.SpamScore = spamScoreI
  83. }
  84. // Get the source IP from the earliest received header. Default to remote address which is sending the message.
  85. // As messages are likely forwarded to this server, the earliest received header is what we want here.
  86. messageEntry.SourceIP = remoteAddr
  87. // Regex to parse the received header with hostname/ip address.
  88. rxAddr := regexp.MustCompile("from ([A-Za-z0-9-.]+) \\(.* \\[([0-9a-fA-F.:]+)\\]\\)")
  89. // Loop through all entires of received headers.
  90. for _, header := range email.Header["Received"] {
  91. // Parse the header.
  92. matches := rxAddr.FindStringSubmatch(header)
  93. if len(matches) == 3 {
  94. // If we got an source IP from the header, save it.
  95. messageEntry.SourceIP = matches[1] + " (" + matches[2] + ")"
  96. }
  97. }
  98. messageEntry.Size = len(b)
  99. messageEntry.Received = time.Now()
  100. messageEntry.Status = "unknown" // We start as unknown and the status is updated by syslog.
  101. // Save the message entry.
  102. app.db.Create(&messageEntry)
  103. log.Printf("SMTP: Received message from %s (%d bytes)", messageEntry.From, messageEntry.Size)
  104. // Notify websocket subscribers of new message.
  105. app.httpServer.wsInterface.sendMessage("receivedNewMessage", messageEntry)
  106. // Update message count.
  107. app.messageCount++
  108. app.httpServer.wsInterface.sendMessage("updateMessageCount", app.messageCount)
  109. return nil
  110. }
  111. // Finds and outputs a reader for the message body based on UUID.
  112. func MailGetMessageData(UUID string) (r io.Reader, err error) {
  113. // If we are configured to use the database for storage, then we should check if the UUID is in the database.
  114. // Otherwise, we check the path set to see if a file exists with the UUID.
  115. if app.config.MailPath == "db" {
  116. // Search database for message body by UUID.
  117. var message Messages
  118. app.db.Where("uuid = ?", UUID).First(&message)
  119. // If not found, we provide an error.
  120. if message.UUID == "" {
  121. err = fmt.Errorf(APINoMessage)
  122. return
  123. }
  124. // Create a reader for the message data.
  125. r = bytes.NewReader(message.Message)
  126. } else {
  127. // Verify that the UUID exists in the file system.
  128. if _, err = os.Stat(path.Join(app.config.MailPath, UUID)); err != nil {
  129. return
  130. }
  131. // If the file exists, we open it to return.
  132. r, err = os.Open(path.Join(app.config.MailPath, UUID))
  133. }
  134. // Return reader.
  135. return
  136. }
  137. // To try and make the syslog code light weight, this function was created
  138. // to update the status of messages to what was parsed in the syslog.
  139. // This function will read an update queue map of syslog ids with updated statuses.
  140. func RunSysLogMailUpdateQueue() {
  141. ticker := time.NewTicker(5 * time.Second)
  142. for _ = range ticker.C { // Every 5 seconds.
  143. // We want to keep track as to rather statuses were updated to notify subscribers.
  144. updated := false
  145. // Copy the update queue so that we can empty the main update queue.
  146. updateQueue := make(map[string]bool)
  147. for key, val := range app.sysLogMailUpdateQueue {
  148. updateQueue[key] = val
  149. }
  150. // We empty the main update queue as the syslog may have status changes during this run.
  151. // We want to ensure that those changes do not get lost.
  152. app.sysLogMailUpdateQueue = make(map[string]bool)
  153. // Loop the update queue.
  154. for sidHostname, _ := range updateQueue {
  155. // Update queue should contain syslog id and hostname separated by a colon.
  156. // We pair the syslog id with the hostname just incase different hosts use the same syslog id.
  157. s := strings.Split(sidHostname, ":")
  158. // If there is more or less than 2 parts, this is invalid.
  159. if len(s) != 2 {
  160. continue
  161. }
  162. sid := s[0]
  163. hostname := s[1]
  164. // Pull the syslog id information from the database.
  165. var match SysLogIDInfo
  166. app.db.Where("s_id = ? AND hostname = ?", sid, hostname).First(&match)
  167. // If nothing was returned, or this one is set to be ignored... We will stop here.
  168. // When a syslog id is ignored, it is likely due to it being either the main message received before sending out,
  169. // or it is the message forwarded to Mail Archive which is not the main mail delivery status.
  170. if match.SID == "" || match.Ignore {
  171. continue
  172. }
  173. // Pull the message log entry matching the message id associated with the syslog id.
  174. var messageEntry MessageLog
  175. app.db.Where("message_id = ?", match.MessageID).First(&messageEntry)
  176. // If we found the message log entry, we can update the status to match our syslog id delivery status.
  177. if messageEntry.UUID != "" {
  178. messageEntry.Status = match.Status
  179. app.db.Save(&messageEntry)
  180. // As we updated a message log entry, we want to inform the subscribers that an update occurred.
  181. updated = true
  182. }
  183. }
  184. // If we updated the status of a message log entry, we need to inform subscribers connected to websocket.
  185. if updated {
  186. app.httpServer.wsInterface.sendMessage("messageStatusesUpdated", true)
  187. }
  188. }
  189. }
  190. // This function will run a database cleanup of old messages every 30 minutes.
  191. func RunDatabaseCleanup() {
  192. ticker := time.NewTicker(30 * time.Minute)
  193. for _ = range ticker.C {
  194. // Get the oldest date we will allow at this point in time based on the configured maximum age.
  195. maxAge := time.Now().Add(app.config.MaxAge * time.Second * -1)
  196. // We want to just pull UUID and message id of the old messages to be cleaned up.
  197. type MessageIDs struct {
  198. UUID string
  199. MessageID string
  200. }
  201. var messageIDs []MessageIDs
  202. app.db.Table("message_logs").Select("uuid,message_id").Where("received <= ?", maxAge).Scan(&messageIDs)
  203. // Loop through all found old messages to clean up the database.
  204. for _, message := range messageIDs {
  205. // Find syslog id information entries matching this message.
  206. var matches []SysLogIDInfo
  207. app.db.Where("message_id = ?", message.MessageID).Find(&matches)
  208. // With each found syslog id, we need to delete the syslog messages and the syslog id information.
  209. for _, match := range matches {
  210. app.db.Where("s_id = ? AND hostname = ?", match.SID, match.Hostname).Delete(SysLogMessage{})
  211. app.db.Delete(&match)
  212. }
  213. // Delete the message log entry for this message.
  214. app.db.Where("uuid = ?", message.UUID).Delete(MessageLog{})
  215. // Delete message data matching the UUID for the message.
  216. app.db.Where("uuid = ?", message.UUID).Delete(Messages{})
  217. // If the configured mail storage path is not the database, remove it from the file system.
  218. if app.config.MailPath != "db" {
  219. if _, err := os.Stat(path.Join(app.config.MailPath, message.UUID)); err == nil {
  220. os.Remove(path.Join(app.config.MailPath, message.UUID))
  221. }
  222. }
  223. // Update message count.
  224. app.messageCount--
  225. }
  226. // Send updated message count.
  227. app.httpServer.wsInterface.sendMessage("updateMessageCount", app.messageCount)
  228. }
  229. }