virtual-vxlan/listener.go
2025-01-05 22:22:24 -06:00

258 lines
6.6 KiB
Go

package main
import (
"errors"
"fmt"
"net"
"sync"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
log "github.com/sirupsen/logrus"
)
// The base of a vxlan connection is the port listener.
// The port listener listens for vxlan packets on a port,
// and it passes matching VNI values with interfaces.
// Interfaces are added to the listener, which processes
// vxlan packets.
type Listener struct {
net struct {
stopping sync.WaitGroup
addr *net.UDPAddr
is4 bool
maxMessageSize int
maxMessageSizeC chan int
conn *net.UDPConn
promisc *Promiscuous
interfaces []*Interface
sync.RWMutex
}
Name string
Permanent bool
closed chan struct{}
log *log.Entry
}
// Check if IP address is all zero.
func isZeroAddr(ip net.IP) bool {
for _, b := range ip {
if b != 0x0 {
return false
}
}
return true
}
// Make a new listener on the specified address. This
// listener is added to the app listener list, and errors
// on existing listeners for the specified address.
func NewListener(name, address string, maxMessageSize int, perm bool) (l *Listener, err error) {
// Verify the specified address is valid.
addr, err := net.ResolveUDPAddr("udp", address)
if err != nil {
return
}
is4 := addr.IP.To4() != nil
// Verify no listeners exist with this address.
app.Net.Lock()
defer app.Net.Unlock()
for _, listener := range app.Net.Listeners {
if listener.PrettyName() == addr.String() {
return nil, fmt.Errorf("listener already exists with address %s", addr.String())
}
if listener.Name == name {
return nil, fmt.Errorf("listener already exists with name %s", name)
}
}
// On Windows, there is no public way to configure hardware vxlan offloading.
// This in term filters packets that are destined to non-broadcast MAC addresses
// in vxlan packets. Which prevents us from receiving the packets, so we set the
// interface to promiscuous mode to allow us to receive packets. We can only do
// this if the IP address provided is an absolute address.
var promisc *Promiscuous
if !isZeroAddr(addr.IP) {
promisc, err = SetInterfacePromiscuous(addr.IP)
if err != nil {
return
}
}
// Start listening on the specified address.
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return
}
// Save listener details.
l = new(Listener)
l.Name = name
l.net.addr = addr
l.net.is4 = is4
l.net.maxMessageSize = maxMessageSize
l.net.maxMessageSizeC = make(chan int)
l.net.conn = conn
l.net.promisc = promisc
l.log = log.WithFields(log.Fields{
"listener": addr.String(),
})
l.closed = make(chan struct{})
l.Permanent = perm
app.Net.Listeners = append(app.Net.Listeners, l)
// Start reading packets on the listener.
go l.packetReader()
// Inform that we started a listern.
l.log.Print("Listener started.")
return
}
// Get the current maximum message size.
func (l *Listener) MaxMessageSize() int {
l.net.RLock()
defer l.net.RUnlock()
return l.net.maxMessageSize
}
// Set the maximum message size to a new value.
func (l *Listener) SetMaxMessageSize(size int) {
if size <= 1 {
return
}
l.net.Lock()
l.net.maxMessageSize = size
l.net.Unlock()
go func() {
l.net.maxMessageSizeC <- size
}()
}
// Close the listener, and its interfaces.
func (l *Listener) Close() (err error) {
// Ensure proper multi tasking.
l.net.Lock()
l.log.Debug("Listener is closing.")
// Close the connection.
err = l.net.conn.Close()
if l.net.promisc != nil {
l.net.promisc.Close()
}
close(l.closed)
// Wait for packet readers to stop.
l.net.stopping.Wait()
// Remove self from app.
app.Net.Lock()
defer app.Net.Unlock()
for i, listener := range app.Net.Listeners {
if listener == l {
app.Net.Listeners = append(app.Net.Listeners[:i], app.Net.Listeners[i+1:]...)
break
}
}
// The interfaces will be acquiring lock here while closing.
l.net.Unlock()
for len(l.net.interfaces) >= 1 {
l.net.interfaces[0].Close()
}
l.log.Print("Listener closed.")
return
}
// Get listener name.
func (l *Listener) PrettyName() string {
return l.net.addr.String()
}
// Get listener address.
func (l *Listener) Addr() net.Addr {
return l.net.addr
}
// Read packets and parse.
func (l *Listener) packetReader() {
// When stopping, we need to wait for this packet reader to finish.
l.net.stopping.Add(1)
defer func() {
l.log.Debug("packet reader - stopped")
l.net.stopping.Done()
}()
// Setup packet decoder.
var vxlan layers.VXLAN
var eth layers.Ethernet
var ip4 layers.IPv4
var ip6 layers.IPv6
var arp layers.ARP
parser := gopacket.NewDecodingLayerParser(layers.LayerTypeVXLAN, &vxlan, &eth, &ip4, &ip6, &arp)
parser.IgnoreUnsupported = true
var decoded []gopacket.LayerType
// Start reading packets, with current buffer size.
l.log.Debug("packet reader - started")
buf := make([]byte, l.net.maxMessageSize)
for {
// Read packet.
n, err := l.net.conn.Read(buf)
if err != nil {
if errors.Is(err, net.ErrClosed) {
break
}
l.log.Errorf("received error reading from listener: %v", err)
}
// Only process packets larger than a vxlan header.
if n > 38 {
// Attempt to parse vxlan and its layers, up to IP and ARP.
// Parsing any further is a waste of processing power.
decoded = nil
packet := buf[:n]
err = parser.DecodeLayers(packet, &decoded)
// If we successfully parsed a packet, route it accordingly.
if err == nil {
// To ensure the interfaces do not change as we're handling the packet, lock it.
l.net.RLock()
for _, ifce := range l.net.interfaces {
// If this interface has the decoded VNI, pass the packet to it.
if ifce.vni == vxlan.VNI {
// Depending on what layer type was decoded, pass to the interface.
for _, layerType := range decoded {
if layerType == layers.LayerTypeARP {
ifce.HandleARP(arp)
} else if layerType == layers.LayerTypeIPv4 {
ifce.HandleIPv4(eth, ip4)
} else if layerType == layers.LayerTypeIPv6 {
ifce.HandleIPv6(eth, ip6)
}
}
}
}
l.net.RUnlock()
}
}
// If the max message size has a change request, make a new buffer and save change.
select {
case newSize, ok := <-l.net.maxMessageSizeC:
if ok {
buf = make([]byte, newSize)
}
default:
}
}
}
// Write data to destination.
func (l *Listener) WriteTo(b []byte, addr *net.UDPAddr) (int, error) {
return l.net.conn.WriteTo(b, addr)
}