Email archive server with SysLog support and web interface.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

187 lines
5.0 KiB

package main
import (
"log"
"net/http"
"time"
"github.com/gorilla/websocket"
)
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer.
maxMessageSize = 1024 * 1024
)
// Setup an upgrader with our configuration.
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
type WSMessageInterface interface {
handleMessage(message []byte, c *WSClient)
}
// The websocket server structure.
type WS struct {
clients map[*WSClient]bool
message chan []byte
register chan *WSClient
unregister chan *WSClient
messageInterface WSMessageInterface
}
// Setup the websocket
func WSInit(messageInterface WSMessageInterface) *WS {
ws := &WS{
message: make(chan []byte),
register: make(chan *WSClient),
unregister: make(chan *WSClient),
clients: make(map[*WSClient]bool),
messageInterface: messageInterface,
}
go ws.run()
return ws
}
// Main websocket channel handler.
func (ws *WS) run() {
for {
select {
case client := <-ws.register:
// If we have a new client, add it to the client map.
ws.clients[client] = true
case client := <-ws.unregister:
// If a client is unregistering and in the client map, remove it.
if _, ok := ws.clients[client]; ok {
delete(ws.clients, client)
close(client.send)
}
case message := <-ws.message:
// A message is being sent to all clients.
for client := range ws.clients {
// Send message to client if possible.
select {
case client.send <- message:
default:
// If we were unable to send, the client is no longer connected.
close(client.send)
delete(ws.clients, client)
}
}
}
}
}
// Websocket client structure.
type WSClient struct {
ws *WS
// The websocket connection.
conn *websocket.Conn
// Buffered channel of outbound messages.
send chan []byte
}
// Read messages from the client.
func (c *WSClient) reader() {
// When we are unable to read from the client, we need to unregister and close the connection.
defer func() {
c.ws.unregister <- c
c.conn.Close()
}()
// Set the max size of a message from the client.
c.conn.SetReadLimit(maxMessageSize)
// The client must send us keep alive messages, otherwise the connection is dead.
c.conn.SetReadDeadline(time.Now().Add(pongWait))
// When we receive a pong from the client, the keep alive timeout is extended.
c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
// Until connection is close, read messages fromt eh client.
for {
_, message, err := c.conn.ReadMessage()
// If we received an error, something is wrong and we need to close the connection.
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
break
}
// We got a connection, so we can pass it to the message interface for handling.
c.ws.messageInterface.handleMessage(message, c)
}
}
// Watches the message send channel for new messages to send to the client.
func (c *WSClient) writer() {
// We need to pink the client every now and then.
ticker := time.NewTicker(pingPeriod)
// If the writer channel closes, we need to stop the ping ticker and close the connection.
defer func() {
ticker.Stop()
c.conn.Close()
}()
// Check for new message or ping ticker, whichever happens first.
for {
select {
case message, ok := <-c.send:
// If a new message was added tot he channel, we need to write it.
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
// If the message received is closing the channel, we need to send a close message.
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
// Get the next writer for a text message.
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
// Write the message to the client.
w.Write(message)
// Close the writer. If an error, we stop the write channel.
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
// Send a ping message.
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
// New connection handler for websockets that creates a client and upgrades the connection.
func (ws *WS) Handler(w http.ResponseWriter, r *http.Request) {
log.Println("New websocket connection from ", r.RemoteAddr)
// Upgade the connection to a websocket connection.
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
// Create a new client and register it.
client := &WSClient{ws: ws, conn: conn, send: make(chan []byte, 256)}
ws.register <- client
// Start the reader and writer.
go client.reader()
go client.writer()
}