Merge pull request #675 from Arceliar/buffers

Buffers
Commit 349c6dbad4, authored by Arceliar on 2020-05-02 10:08:30 -05:00, committed by GitHub.
11 changed files with 420 additions and 561 deletions


@ -110,7 +110,8 @@ type Session struct {
 // there is exactly one entry then this node is not connected to any other nodes
 // and is therefore isolated.
 func (c *Core) GetPeers() []Peer {
-	ports := c.peers.ports.Load().(map[switchPort]*peer)
+	var ports map[switchPort]*peer
+	phony.Block(&c.peers, func() { ports = c.peers.ports })
 	var peers []Peer
 	var ps []switchPort
 	for port := range ports {
@ -143,10 +144,14 @@ func (c *Core) GetPeers() []Peer {
 // isolated or not connected to any peers.
 func (c *Core) GetSwitchPeers() []SwitchPeer {
 	var switchpeers []SwitchPeer
-	table := c.switchTable.table.Load().(lookupTable)
-	peers := c.peers.ports.Load().(map[switchPort]*peer)
+	var table *lookupTable
+	var ports map[switchPort]*peer
+	phony.Block(&c.peers, func() {
+		table = c.peers.table
+		ports = c.peers.ports
+	})
 	for _, elem := range table.elems {
-		peer, isIn := peers[elem.port]
+		peer, isIn := ports[elem.port]
 		if !isIn {
 			continue
 		}
@ -194,34 +199,6 @@ func (c *Core) GetDHT() []DHTEntry {
return dhtentries return dhtentries
} }
// GetSwitchQueues returns information about the switch queues that are
// currently in effect. These values can change within an instant.
func (c *Core) GetSwitchQueues() SwitchQueues {
var switchqueues SwitchQueues
switchTable := &c.switchTable
getSwitchQueues := func() {
switchqueues = SwitchQueues{
Count: uint64(len(switchTable.queues.bufs)),
Size: switchTable.queues.size,
HighestCount: uint64(switchTable.queues.maxbufs),
HighestSize: switchTable.queues.maxsize,
MaximumSize: switchTable.queues.totalMaxSize,
}
for k, v := range switchTable.queues.bufs {
nexthop := switchTable.bestPortForCoords([]byte(k))
queue := SwitchQueue{
ID: k,
Size: v.size,
Packets: uint64(len(v.packets)),
Port: uint64(nexthop),
}
switchqueues.Queues = append(switchqueues.Queues, queue)
}
}
phony.Block(&c.switchTable, getSwitchQueues)
return switchqueues
}
// GetSessions returns a list of open sessions from this node to other nodes. // GetSessions returns a list of open sessions from this node to other nodes.
func (c *Core) GetSessions() []Session { func (c *Core) GetSessions() []Session {
var sessions []Session var sessions []Session
@ -324,8 +301,11 @@ func (c *Core) EncryptionPublicKey() string {
 // connected to any other nodes (effectively making you the root of a
 // single-node network).
 func (c *Core) Coords() []uint64 {
-	table := c.switchTable.table.Load().(lookupTable)
-	return wire_coordsBytestoUint64s(table.self.getCoords())
+	var coords []byte
+	phony.Block(&c.router, func() {
+		coords = c.router.table.self.getCoords()
+	})
+	return wire_coordsBytestoUint64s(coords)
 }
// Address gets the IPv6 address of the Yggdrasil node. This is always a /128 // Address gets the IPv6 address of the Yggdrasil node. This is always a /128
@ -489,7 +469,11 @@ func (c *Core) CallPeer(addr string, sintf string) error {
 // DisconnectPeer disconnects a peer once. This should be specified as a port
 // number.
 func (c *Core) DisconnectPeer(port uint64) error {
-	c.peers.removePeer(switchPort(port))
+	c.peers.Act(nil, func() {
+		if p, isIn := c.peers.ports[switchPort(port)]; isIn {
+			p.Act(&c.peers, p._removeSelf)
+		}
+	})
 	return nil
 }
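For context, a minimal sketch of the phony actor pattern that the calls above rely on; the Counter type is illustrative only and not part of this commit. Each actor owns its state, Act enqueues a function on the owner's inbox, and phony.Block enqueues one and waits for it to run, which is how GetPeers, Coords and DisconnectPeer read or change actor-owned state without locks:

package main

import (
	"fmt"

	"github.com/Arceliar/phony"
)

type Counter struct {
	phony.Inbox // serializes every function passed to Act
	count       int
}

// increment asynchronously enqueues work on the counter's inbox.
func (c *Counter) increment(from phony.Actor) {
	c.Act(from, func() { c.count++ })
}

func main() {
	var c Counter
	for i := 0; i < 10; i++ {
		c.increment(nil)
	}
	// Block sends a function to the actor and waits for it to finish,
	// so the read below observes all of the increments queued above.
	var result int
	phony.Block(&c, func() { result = c.count })
	fmt.Println(result) // 10
}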


@ -198,8 +198,10 @@ func (c *Core) _stop() {
 		c.addPeerTimer.Stop()
 	}
 	c.link.stop()
+	/* FIXME this deadlocks, need a waitgroup or something to coordinate shutdown
 	for _, peer := range c.GetPeers() {
 		c.DisconnectPeer(peer.Port)
 	}
+	*/
 	c.log.Infoln("Stopped")
 }


@ -186,11 +186,9 @@ func dht_ordered(first, second, third *crypto.NodeID) bool {
 // Update info about the node that sent the request.
 func (t *dht) handleReq(req *dhtReq) {
 	// Send them what they asked for
-	loc := t.router.core.switchTable.getLocator()
-	coords := loc.getCoords()
 	res := dhtRes{
 		Key:    t.router.core.boxPub,
-		Coords: coords,
+		Coords: t.router.table.self.getCoords(),
 		Dest:   req.Dest,
 		Infos:  t.lookup(&req.Dest, false),
 	}
@ -300,11 +298,9 @@ func (t *dht) ping(info *dhtInfo, target *crypto.NodeID) {
 	if target == nil {
 		target = &t.nodeID
 	}
-	loc := t.router.core.switchTable.getLocator()
-	coords := loc.getCoords()
 	req := dhtReq{
 		Key:    t.router.core.boxPub,
-		Coords: coords,
+		Coords: t.router.table.self.getCoords(),
 		Dest:   *target,
 	}
 	t.sendReq(&req, info)
@ -378,7 +374,7 @@ func (t *dht) getImportant() []*dhtInfo {
 	})
 	// Keep the ones that are no further than the closest seen so far
 	minDist := ^uint64(0)
-	loc := t.router.core.switchTable.getLocator()
+	loc := t.router.table.self
 	important := infos[:0]
 	for _, info := range infos {
 		dist := uint64(loc.dist(info.coords))
@ -416,7 +412,7 @@ func (t *dht) isImportant(ninfo *dhtInfo) bool {
 	}
 	important := t.getImportant()
 	// Check if ninfo is of equal or greater importance to what we already know
-	loc := t.router.core.switchTable.getLocator()
+	loc := t.router.table.self
 	ndist := uint64(loc.dist(ninfo.coords))
 	minDist := ^uint64(0)
 	for _, info := range important {


@ -62,7 +62,7 @@ type linkInterface struct {
 	keepAliveTimer *time.Timer // Fires to send keep-alive traffic
 	stallTimer     *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen
 	closeTimer     *time.Timer // Fires when the link has been idle so long we need to close it
-	inSwitch       bool        // True if the switch is tracking this link
+	isIdle         bool        // True if the peer actor knows the link is idle
 	stalled        bool        // True if we haven't been receiving any response traffic
 	unstalled      bool        // False if an idle notification to the switch hasn't been sent because we stalled (or are first starting up)
 }
@ -217,19 +217,30 @@ func (intf *linkInterface) handler() error {
 	intf.link.mutex.Unlock()
 	// Create peer
 	shared := crypto.GetSharedKey(myLinkPriv, &meta.link)
-	intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() })
+	phony.Block(&intf.link.core.peers, func() {
+		// FIXME don't use phony.Block, it's bad practice, even if it's safe here
+		intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() })
+	})
 	if intf.peer == nil {
 		return errors.New("failed to create peer")
 	}
 	defer func() {
 		// More cleanup can go here
-		intf.link.core.peers.removePeer(intf.peer.port)
+		intf.peer.Act(nil, intf.peer._removeSelf)
 	}()
 	intf.peer.out = func(msgs [][]byte) {
-		intf.writer.sendFrom(intf.peer, msgs, false)
+		// nil to prevent it from blocking if the link is somehow frozen
+		// this is safe because another packet won't be sent until the link notifies
+		// the peer that it's ready for one
+		intf.writer.sendFrom(nil, msgs, false)
 	}
 	intf.peer.linkOut = func(bs []byte) {
-		intf.writer.sendFrom(intf.peer, [][]byte{bs}, true)
+		// nil to prevent it from blocking if the link is somehow frozen
+		// FIXME this is hypothetically not safe, the peer shouldn't be sending
+		// additional packets until this one finishes, otherwise this could leak
+		// memory if writing happens slower than link packets are generated...
+		// that seems unlikely, so it's a lesser evil than deadlocking for now
+		intf.writer.sendFrom(nil, [][]byte{bs}, true)
 	}
themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box))
themAddrString := net.IP(themAddr[:]).String() themAddrString := net.IP(themAddr[:]).String()
@ -275,20 +286,13 @@ const (
 func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) {
 	intf.Act(&intf.writer, func() {
 		if !isLinkTraffic {
-			intf.inSwitch = false
+			intf.isIdle = false
 		}
 		intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend)
 		intf._cancelStallTimer()
 	})
 }
-// called by an AfterFunc if we seem to be blocked in a send syscall for a long time
-func (intf *linkInterface) _notifySyscall() {
-	intf.link.core.switchTable.Act(intf, func() {
-		intf.link.core.switchTable._sendingIn(intf.peer.port)
-	})
-}
// we just sent something, so cancel any pending timer to send keep-alive traffic // we just sent something, so cancel any pending timer to send keep-alive traffic
func (intf *linkInterface) _cancelStallTimer() { func (intf *linkInterface) _cancelStallTimer() {
if intf.stallTimer != nil { if intf.stallTimer != nil {
@ -304,7 +308,7 @@ func (intf *linkInterface) notifyBlockedSend() {
 	intf.Act(nil, func() {
 		if intf.sendTimer != nil {
 			//As far as we know, we're still trying to send, and the timer fired.
-			intf.link.core.switchTable.blockPeer(intf.peer.port)
+			intf.link.core.switchTable.blockPeer(intf, intf.peer.port)
 		}
 	})
 }
@ -315,7 +319,7 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) {
 		intf.sendTimer.Stop()
 		intf.sendTimer = nil
 		if !isLinkTraffic {
-			intf._notifySwitch()
+			intf._notifyIdle()
 		}
 		if size > 0 && intf.stallTimer == nil {
 			intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled)
@ -324,15 +328,13 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) {
 }
 // Notify the switch that we're ready for more traffic, assuming we're not in a stalled state
-func (intf *linkInterface) _notifySwitch() {
-	if !intf.inSwitch {
+func (intf *linkInterface) _notifyIdle() {
+	if !intf.isIdle {
 		if intf.stalled {
 			intf.unstalled = false
 		} else {
-			intf.inSwitch = true
-			intf.link.core.switchTable.Act(intf, func() {
-				intf.link.core.switchTable._idleIn(intf.peer.port)
-			})
+			intf.isIdle = true
+			intf.peer.Act(intf, intf.peer._handleIdle)
 		}
 	}
 }
@ -344,7 +346,7 @@ func (intf *linkInterface) notifyStalled() {
 			intf.stallTimer.Stop()
 			intf.stallTimer = nil
 			intf.stalled = true
-			intf.link.core.switchTable.blockPeer(intf.peer.port)
+			intf.link.core.switchTable.blockPeer(intf, intf.peer.port)
 		}
 	})
 }
@ -368,7 +370,7 @@ func (intf *linkInterface) notifyRead(size int) {
 		}
 		intf.stalled = false
 		if !intf.unstalled {
-			intf._notifySwitch()
+			intf._notifyIdle()
 			intf.unstalled = true
 		}
 		if size > 0 && intf.stallTimer == nil {
@ -402,19 +404,7 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool
 		size += len(bs)
 	}
 	w.intf.notifySending(size, isLinkTraffic)
-	// start a timer that will fire if we get stuck in writeMsgs for an oddly long time
-	var once sync.Once
-	timer := time.AfterFunc(time.Millisecond, func() {
-		// 1 ms is kind of arbitrary
-		// the rationale is that this should be very long compared to a syscall
-		// but it's still short compared to end-to-end latency or human perception
-		once.Do(func() {
-			w.intf.Act(nil, w.intf._notifySyscall)
-		})
-	})
 	w.intf.msgIO.writeMsgs(bss)
-	// Make sure we either stop the timer from doing anything or wait until it's done
-	once.Do(func() { timer.Stop() })
 	w.intf.notifySent(size, isLinkTraffic)
 	// Cleanup
 	for _, bs := range bss {


@ -18,6 +18,7 @@ type nodeinfo struct {
 	myNodeInfo NodeInfoPayload
 	callbacks  map[crypto.BoxPubKey]nodeinfoCallback
 	cache      map[crypto.BoxPubKey]nodeinfoCached
+	table      *lookupTable
 }
type nodeinfoCached struct { type nodeinfoCached struct {
@ -187,9 +188,9 @@ func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse
 }
 func (m *nodeinfo) _sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse bool) {
-	table := m.core.switchTable.table.Load().(lookupTable)
+	loc := m.table.self
 	nodeinfo := nodeinfoReqRes{
-		SendCoords: table.self.getCoords(),
+		SendCoords: loc.getCoords(),
 		IsResponse: isResponse,
 		NodeInfo:   m._getNodeInfo(),
 	}


@ -0,0 +1,126 @@
package yggdrasil

import (
	"time"

	"github.com/yggdrasil-network/yggdrasil-go/src/util"
)

// TODO take max size from config
const MAX_PACKET_QUEUE_SIZE = 4 * 1048576 // 4 MB

type pqStreamID string

type pqPacketInfo struct {
	packet []byte
	time   time.Time
}

type pqStream struct {
	infos []pqPacketInfo
	size  uint64
}

// TODO separate queues per e.g. traffic flow
type packetQueue struct {
	streams map[pqStreamID]pqStream
	size    uint64
}

func (q *packetQueue) cleanup() {
	for q.size > MAX_PACKET_QUEUE_SIZE {
		// TODO? drop from a random stream
		// odds proportional to size? bandwidth?
		// always using the worst is exploitable -> flood 1 packet per random stream
		// find the stream that's using the most bandwidth
		now := time.Now()
		var worst pqStreamID
		for id := range q.streams {
			worst = id
			break // get a random ID to start
		}
		worstStream := q.streams[worst]
		worstSize := float64(worstStream.size)
		worstAge := now.Sub(worstStream.infos[0].time).Seconds()
		for id, stream := range q.streams {
			thisSize := float64(stream.size)
			thisAge := now.Sub(stream.infos[0].time).Seconds()
			// cross multiply to avoid division by zero issues
			if worstSize*thisAge < thisSize*worstAge {
				// worstSize/worstAge < thisSize/thisAge -> this uses more bandwidth
				worst = id
				worstStream = stream
				worstSize = thisSize
				worstAge = thisAge
			}
		}
		// Drop the oldest packet from the worst stream
		packet := worstStream.infos[0].packet
		worstStream.infos = worstStream.infos[1:]
		worstStream.size -= uint64(len(packet))
		q.size -= uint64(len(packet))
		util.PutBytes(packet)
		// save the modified stream to queues
		if len(worstStream.infos) > 0 {
			q.streams[worst] = worstStream
		} else {
			delete(q.streams, worst)
		}
	}
}

func (q *packetQueue) push(packet []byte) {
	if q.streams == nil {
		q.streams = make(map[pqStreamID]pqStream)
	}
	// get stream
	id := pqStreamID(peer_getPacketCoords(packet)) // just coords for now
	stream := q.streams[id]
	// update stream
	stream.infos = append(stream.infos, pqPacketInfo{packet, time.Now()})
	stream.size += uint64(len(packet))
	// save update to queues
	q.streams[id] = stream
	q.size += uint64(len(packet))
	q.cleanup()
}

func (q *packetQueue) pop() ([]byte, bool) {
	if len(q.streams) > 0 {
		// get the stream that uses the least bandwidth
		now := time.Now()
		var best pqStreamID
		for id := range q.streams {
			best = id
			break // get a random ID to start
		}
		bestStream := q.streams[best]
		bestSize := float64(bestStream.size)
		bestAge := now.Sub(bestStream.infos[0].time).Seconds()
		for id, stream := range q.streams {
			thisSize := float64(stream.size)
			thisAge := now.Sub(stream.infos[0].time).Seconds()
			// cross multiply to avoid division by zero issues
			if bestSize*thisAge > thisSize*bestAge {
				// bestSize/bestAge > thisSize/thisAge -> this uses less bandwidth
				best = id
				bestStream = stream
				bestSize = thisSize
				bestAge = thisAge
			}
		}
		// get the oldest packet from the best stream
		packet := bestStream.infos[0].packet
		bestStream.infos = bestStream.infos[1:]
		bestStream.size -= uint64(len(packet))
		q.size -= uint64(len(packet))
		// save the modified stream to queues
		if len(bestStream.infos) > 0 {
			q.streams[best] = bestStream
		} else {
			delete(q.streams, best)
		}
		return packet, true
	}
	return nil, false
}
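The cleanup and pop heuristics above both estimate a stream's bandwidth as queued bytes divided by the age of its oldest packet. A small illustrative example (not from this commit) of how that comparison plays out for two hypothetical streams, using the same cross-multiplication to avoid dividing by a zero age:

package main

import "fmt"

func main() {
	// stream A: 3000 bytes queued, oldest packet 0.5s old  -> roughly 6000 B/s
	// stream B: 1000 bytes queued, oldest packet 2.0s old  -> roughly  500 B/s
	aSize, aAge := 3000.0, 0.5
	bSize, bAge := 1000.0, 2.0
	// aSize/aAge > bSize/bAge  <=>  aSize*bAge > bSize*aAge
	if aSize*bAge > bSize*aAge {
		fmt.Println("cleanup() would drop from stream A (heavier bandwidth user)")
		fmt.Println("pop() would dequeue from stream B (lighter bandwidth user)")
	}
}

Under sustained congestion this drops from the heaviest stream first while still servicing light, latency-sensitive streams promptly.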


@ -6,8 +6,6 @@ package yggdrasil
 import (
 	"encoding/hex"
-	"sync"
-	"sync/atomic"
 	"time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
@ -21,17 +19,17 @@ import (
// In most cases, this involves passing the packet to the handler for outgoing traffic to another peer. // In most cases, this involves passing the packet to the handler for outgoing traffic to another peer.
// In other cases, its link protocol traffic is used to build the spanning tree, in which case this checks signatures and passes the message along to the switch. // In other cases, its link protocol traffic is used to build the spanning tree, in which case this checks signatures and passes the message along to the switch.
 type peers struct {
+	phony.Inbox
 	core  *Core
-	mutex sync.Mutex   // Synchronize writes to atomic
-	ports atomic.Value //map[switchPort]*peer, use CoW semantics
+	ports map[switchPort]*peer // use CoW semantics, share updated version with each peer
+	table *lookupTable         // Sent from switch, share updated version with each peer
 }
 // Initializes the peers struct.
 func (ps *peers) init(c *Core) {
-	ps.mutex.Lock()
-	defer ps.mutex.Unlock()
-	ps.putPorts(make(map[switchPort]*peer))
 	ps.core = c
+	ps.ports = make(map[switchPort]*peer)
+	ps.table = new(lookupTable)
 }
func (ps *peers) reconfigure() { func (ps *peers) reconfigure() {
@ -80,16 +78,6 @@ func (ps *peers) getAllowedEncryptionPublicKeys() []string {
return ps.core.config.Current.AllowedEncryptionPublicKeys return ps.core.config.Current.AllowedEncryptionPublicKeys
} }
// Atomically gets a map[switchPort]*peer of known peers.
func (ps *peers) getPorts() map[switchPort]*peer {
return ps.ports.Load().(map[switchPort]*peer)
}
// Stores a map[switchPort]*peer (note that you should take a mutex before store operations to avoid conflicts with other nodes attempting to read/change/store at the same time).
func (ps *peers) putPorts(ports map[switchPort]*peer) {
ps.ports.Store(ports)
}
// Information known about a peer, including their box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic // Information known about a peer, including their box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic
type peer struct { type peer struct {
phony.Inbox phony.Inbox
@ -110,10 +98,33 @@ type peer struct {
// The below aren't actually useful internally, they're just gathered for getPeers statistics // The below aren't actually useful internally, they're just gathered for getPeers statistics
bytesSent uint64 bytesSent uint64
bytesRecvd uint64 bytesRecvd uint64
ports map[switchPort]*peer
table *lookupTable
queue packetQueue
idle bool
}
func (ps *peers) updateTables(from phony.Actor, table *lookupTable) {
ps.Act(from, func() {
ps.table = table
ps._updatePeers()
})
}
func (ps *peers) _updatePeers() {
ports := ps.ports
table := ps.table
for _, peer := range ps.ports {
p := peer // peer is mutated during iteration
p.Act(ps, func() {
p.ports = ports
p.table = table
})
}
} }
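A rough sketch of the copy-on-write sharing used by updateTables and _updatePeers above: the owner never mutates a table it has already handed out, it builds a fresh one and sends the new pointer to each subscriber's inbox. The publisher and subscriber names here are illustrative, not from this commit:

package main

import (
	"fmt"

	"github.com/Arceliar/phony"
)

type snapshot struct{ version int }

type subscriber struct {
	phony.Inbox
	current *snapshot // only ever touched from this actor's inbox
}

type publisher struct {
	phony.Inbox
	subs []*subscriber
}

// publish hands the new (immutable) snapshot pointer to every subscriber.
// Block is used here only to keep the example deterministic; the commit
// itself uses Act for the equivalent step.
func (p *publisher) publish(s *snapshot) {
	phony.Block(p, func() {
		for _, sub := range p.subs {
			sub := sub
			sub.Act(p, func() { sub.current = s })
		}
	})
}

func main() {
	sub := new(subscriber)
	pub := &publisher{subs: []*subscriber{sub}}
	pub.publish(&snapshot{version: 1})
	var v int
	phony.Block(sub, func() { v = sub.current.version })
	fmt.Println(v) // 1
}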
 // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number.
-func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func()) *peer {
+func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func()) *peer {
 	now := time.Now()
 	p := peer{box: *box,
 		sig: *sig,
@ -125,9 +136,7 @@ func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShare
 		core: ps.core,
 		intf: intf,
 	}
-	ps.mutex.Lock()
-	defer ps.mutex.Unlock()
-	oldPorts := ps.getPorts()
+	oldPorts := ps.ports
 	newPorts := make(map[switchPort]*peer)
 	for k, v := range oldPorts {
 		newPorts[k] = v
@ -139,46 +148,49 @@ func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShare
 			break
 		}
 	}
-	ps.putPorts(newPorts)
+	ps.ports = newPorts
+	ps._updatePeers()
 	return &p
 }
+func (p *peer) _removeSelf() {
+	p.core.peers.Act(p, func() {
+		p.core.peers._removePeer(p)
+	})
+}
+
 // Removes a peer for a given port, if one exists.
-func (ps *peers) removePeer(port switchPort) {
-	if port == 0 {
-		return
-	} // Can't remove self peer
-	phony.Block(&ps.core.router, func() {
-		ps.core.switchTable.forgetPeer(port)
-	})
-	ps.mutex.Lock()
-	oldPorts := ps.getPorts()
-	p, isIn := oldPorts[port]
+func (ps *peers) _removePeer(p *peer) {
+	if q := ps.ports[p.port]; p.port == 0 || q != p {
+		return
+	} // Can't remove self peer or nonexistant peer
+	ps.core.switchTable.forgetPeer(ps, p.port)
+	oldPorts := ps.ports
 	newPorts := make(map[switchPort]*peer)
 	for k, v := range oldPorts {
 		newPorts[k] = v
 	}
-	delete(newPorts, port)
-	ps.putPorts(newPorts)
-	ps.mutex.Unlock()
-	if isIn {
-		if p.close != nil {
-			p.close()
-		}
-		close(p.done)
+	delete(newPorts, p.port)
+	if p.close != nil {
+		p.close()
 	}
+	close(p.done)
+	ps.ports = newPorts
+	ps._updatePeers()
 }
 // If called, sends a notification to each peer that they should send a new switch message.
 // Mainly called by the switch after an update.
 func (ps *peers) sendSwitchMsgs(from phony.Actor) {
-	ports := ps.getPorts()
-	for _, p := range ports {
-		if p.port == 0 {
-			continue
+	ps.Act(from, func() {
+		for _, peer := range ps.ports {
+			p := peer
+			if p.port == 0 {
+				continue
+			}
+			p.Act(ps, p._sendSwitchMsg)
 		}
-		p.Act(from, p._sendSwitchMsg)
-	}
+	})
 }
// This must be launched in a separate goroutine by whatever sets up the peer struct. // This must be launched in a separate goroutine by whatever sets up the peer struct.
@ -233,15 +245,26 @@ func (p *peer) _handlePacket(packet []byte) {
} }
} }
+// Get the coords of a packet without decoding
+func peer_getPacketCoords(packet []byte) []byte {
+	_, pTypeLen := wire_decode_uint64(packet)
+	coords, _ := wire_decode_coords(packet[pTypeLen:])
+	return coords
+}
+
 // Called to handle traffic or protocolTraffic packets.
 // In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node.
 func (p *peer) _handleTraffic(packet []byte) {
-	table := p.core.switchTable.getTable()
-	if _, isIn := table.elems[p.port]; !isIn && p.port != 0 {
+	if _, isIn := p.table.elems[p.port]; !isIn && p.port != 0 {
 		// Drop traffic if the peer isn't in the switch
 		return
 	}
-	p.core.switchTable.packetInFrom(p, packet)
+	coords := peer_getPacketCoords(packet)
+	next := p.table.lookup(coords)
+	if nPeer, isIn := p.ports[next]; isIn {
+		nPeer.sendPacketsFrom(p, [][]byte{packet})
+	}
+	//p.core.switchTable.packetInFrom(p, packet)
 }
func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) { func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) {
@ -250,16 +273,33 @@ func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) {
}) })
} }
-// This just calls p.out(packet) for now.
 func (p *peer) _sendPackets(packets [][]byte) {
-	// Is there ever a case where something more complicated is needed?
-	// What if p.out blocks?
-	var size int
 	for _, packet := range packets {
-		size += len(packet)
+		p.queue.push(packet)
+	}
+	if p.idle {
+		p.idle = false
+		p._handleIdle()
+	}
+}
+
+func (p *peer) _handleIdle() {
+	var packets [][]byte
+	var size uint64
+	for size < 65535 {
+		if packet, success := p.queue.pop(); success {
+			packets = append(packets, packet)
+			size += uint64(len(packet))
+		} else {
+			break
+		}
+	}
+	if len(packets) > 0 {
+		p.bytesSent += uint64(size)
+		p.out(packets)
+	} else {
+		p.idle = true
 	}
-	p.bytesSent += uint64(size)
-	p.out(packets)
 }
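A simplified, single-threaded sketch of the handshake implemented by _sendPackets and _handleIdle above (illustrative only; the real code runs inside the peer and link actors): packets are queued on push, at most roughly 64 KiB is handed to the link per batch, and the queue marks itself idle when it runs dry so the next push restarts it:

package main

import "fmt"

type sender struct {
	queue [][]byte
	idle  bool
	out   func(batch [][]byte) // sends one batch; the link reports back via handleIdle
}

func (s *sender) push(packet []byte) {
	s.queue = append(s.queue, packet)
	if s.idle {
		s.idle = false
		s.handleIdle()
	}
}

func (s *sender) handleIdle() {
	var batch [][]byte
	var size int
	for size < 65535 && len(s.queue) > 0 {
		packet := s.queue[0]
		s.queue = s.queue[1:]
		batch = append(batch, packet)
		size += len(packet)
	}
	if len(batch) > 0 {
		s.out(batch)
	} else {
		s.idle = true // nothing to send; wait for the next push
	}
}

func main() {
	s := &sender{idle: true}
	s.out = func(batch [][]byte) {
		fmt.Println("wrote", len(batch), "packets")
		s.handleIdle() // in the real code the link does this from its own actor
	}
	s.push(make([]byte, 70000))
	s.push(make([]byte, 100))
}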
// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers. // This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.
@ -313,7 +353,7 @@ func (p *peer) _handleLinkTraffic(bs []byte) {
 // Gets a switchMsg from the switch, adds signed next-hop info for this peer, and sends it to them.
 func (p *peer) _sendSwitchMsg() {
-	msg := p.core.switchTable.getMsg()
+	msg := p.table.getMsg()
 	if msg == nil {
 		return
 	}
@ -335,7 +375,8 @@ func (p *peer) _handleSwitchMsg(packet []byte) {
 		return
 	}
 	if len(msg.Hops) < 1 {
-		p.core.peers.removePeer(p.port)
+		p._removeSelf()
+		return
 	}
 	var loc switchLocator
 	prevKey := msg.Root
@ -346,23 +387,31 @@ func (p *peer) _handleSwitchMsg(packet []byte) {
 		loc.coords = append(loc.coords, hop.Port)
 		bs := getBytesForSig(&hop.Next, &sigMsg)
 		if !crypto.Verify(&prevKey, bs, &hop.Sig) {
-			p.core.peers.removePeer(p.port)
+			p._removeSelf()
+			return
 		}
 		prevKey = hop.Next
 	}
-	p.core.switchTable.handleMsg(&msg, p.port)
-	if !p.core.switchTable.checkRoot(&msg) {
-		// Bad switch message
-		p.dinfo = nil
-		return
-	}
-	// Pass a message to the dht informing it that this peer (still) exists
-	loc.coords = loc.coords[:len(loc.coords)-1]
-	p.dinfo = &dhtInfo{
-		key:    p.box,
-		coords: loc.getCoords(),
-	}
-	p._updateDHT()
+	p.core.switchTable.Act(p, func() {
+		if !p.core.switchTable._checkRoot(&msg) {
+			// Bad switch message
+			p.Act(&p.core.switchTable, func() {
+				p.dinfo = nil
+			})
+		} else {
+			// handle the message
+			p.core.switchTable._handleMsg(&msg, p.port, false)
+			p.Act(&p.core.switchTable, func() {
+				// Pass a message to the dht informing it that this peer (still) exists
+				loc.coords = loc.coords[:len(loc.coords)-1]
+				p.dinfo = &dhtInfo{
+					key:    p.box,
+					coords: loc.getCoords(),
+				}
+				p._updateDHT()
+			})
+		}
+	})
 }
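The restructured _handleSwitchMsg above never blocks one actor on another: the peer posts the message to the switch actor, and the switch posts the DHT bookkeeping back onto the peer's inbox. A stripped-down sketch of that hand-off; all names here are illustrative, not from this commit:

package main

import (
	"fmt"

	"github.com/Arceliar/phony"
)

type swtch struct{ phony.Inbox }

type peerActor struct {
	phony.Inbox
	sw *swtch
}

// handleMsg mirrors the shape of _handleSwitchMsg: the peer actor asks the
// switch actor to validate and handle the message, and the switch posts the
// follow-up work back onto the peer's inbox instead of either side blocking.
func (p *peerActor) handleMsg(msg string) {
	p.Act(nil, func() {
		p.sw.Act(p, func() {
			ok := msg != "" // stand-in for _checkRoot / _handleMsg
			p.Act(p.sw, func() {
				fmt.Println("peer follow-up, root ok:", ok) // stand-in for updating p.dinfo
			})
		})
	})
}

func main() {
	p := &peerActor{sw: new(swtch)}
	p.handleMsg("switch message")
	// flush each inbox in turn so the example's output appears before main exits
	phony.Block(p, func() {})
	phony.Block(p.sw, func() {})
	phony.Block(p, func() {})
}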
// This generates the bytes that we sign or check the signature of for a switchMsg. // This generates the bytes that we sign or check the signature of for a switchMsg.


@ -46,6 +46,7 @@ type router struct {
 	nodeinfo nodeinfo
 	searches searches
 	sessions sessions
+	table    *lookupTable // has a copy of our locator
 }
// Initializes the router struct, which includes setting up channels to/from the adapter. // Initializes the router struct, which includes setting up channels to/from the adapter.
@ -61,8 +62,19 @@ func (r *router) init(core *Core) {
linkType: "self", linkType: "self",
}, },
} }
p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) var p *peer
p.out = func(packets [][]byte) { r.handlePackets(p, packets) } phony.Block(&r.core.peers, func() {
// FIXME don't block here!
p = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil)
})
p.out = func(packets [][]byte) {
r.handlePackets(p, packets)
r.Act(p, func() {
// after the router handle the packets, notify the peer that it's ready for more
p.Act(r, p._handleIdle)
})
}
p.Act(r, p._handleIdle)
r.out = func(bs []byte) { p.handlePacketFrom(r, bs) } r.out = func(bs []byte) { p.handlePacketFrom(r, bs) }
r.nodeinfo.init(r.core) r.nodeinfo.init(r.core)
r.core.config.Mutex.RLock() r.core.config.Mutex.RLock()
@ -73,6 +85,21 @@ func (r *router) init(core *Core) {
r.sessions.init(r) r.sessions.init(r)
} }
func (r *router) updateTable(from phony.Actor, table *lookupTable) {
r.Act(from, func() {
r.table = table
r.nodeinfo.Act(r, func() {
r.nodeinfo.table = table
})
for _, ses := range r.sessions.sinfos {
sinfo := ses
sinfo.Act(r, func() {
sinfo.table = table
})
}
})
}
// Reconfigures the router and any child modules. This should only ever be run // Reconfigures the router and any child modules. This should only ever be run
// by the router actor. // by the router actor.
func (r *router) reconfigure() { func (r *router) reconfigure() {
@ -126,7 +153,7 @@ func (r *router) reset(from phony.Actor) {
 func (r *router) doMaintenance() {
 	phony.Block(r, func() {
 		// Any periodic maintenance stuff goes here
-		r.core.switchTable.doMaintenance()
+		r.core.switchTable.doMaintenance(r)
 		r.dht.doMaintenance()
 		r.sessions.cleanup()
 	})


@ -161,11 +161,10 @@ func (sinfo *searchInfo) continueSearch(infos []*dhtInfo) {
 // Initially start a search
 func (sinfo *searchInfo) startSearch() {
-	loc := sinfo.searches.router.core.switchTable.getLocator()
 	var infos []*dhtInfo
 	infos = append(infos, &dhtInfo{
 		key:    sinfo.searches.router.core.boxPub,
-		coords: loc.getCoords(),
+		coords: sinfo.searches.router.table.self.getCoords(),
 	})
 	// Start the search by asking ourself, useful if we're the destination
 	sinfo.continueSearch(infos)


@ -16,9 +16,6 @@ import (
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
) )
// Duration that we keep track of old nonces per session, to allow some out-of-order packet delivery
const nonceWindow = time.Second
// All the information we know about an active session. // All the information we know about an active session.
// This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API. // This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API.
type sessionInfo struct { type sessionInfo struct {
@ -52,6 +49,7 @@ type sessionInfo struct {
 	cancel    util.Cancellation // Used to terminate workers
 	conn      *Conn             // The associated Conn object
 	callbacks []chan func()     // Finished work from crypto workers
+	table     *lookupTable      // table.self is a locator where we get our coords
 }
// Represents a session ping/pong packet, and includes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU. // Represents a session ping/pong packet, and includes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU.
@ -217,6 +215,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.myHandle = *crypto.NewHandle() sinfo.myHandle = *crypto.NewHandle()
sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.table = ss.router.table
ss.sinfos[sinfo.myHandle] = &sinfo ss.sinfos[sinfo.myHandle] = &sinfo
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
return &sinfo return &sinfo
@ -266,8 +265,7 @@ func (ss *sessions) removeSession(sinfo *sessionInfo) {
// Returns a session ping appropriate for the given session info. // Returns a session ping appropriate for the given session info.
 func (sinfo *sessionInfo) _getPing() sessionPing {
-	loc := sinfo.sessions.router.core.switchTable.getLocator()
-	coords := loc.getCoords()
+	coords := sinfo.table.self.getCoords()
ping := sessionPing{ ping := sessionPing{
SendPermPub: sinfo.sessions.router.core.boxPub, SendPermPub: sinfo.sessions.router.core.boxPub,
Handle: sinfo.myHandle, Handle: sinfo.myHandle,
@ -393,14 +391,9 @@ func (sinfo *sessionInfo) _getMTU() MTU {
return sinfo.myMTU return sinfo.myMTU
} }
-// Checks if a packet's nonce is recent enough to fall within the window of allowed packets, and not already received.
+// Checks if a packet's nonce is newer than any previously received
 func (sinfo *sessionInfo) _nonceIsOK(theirNonce *crypto.BoxNonce) bool {
-	// The bitmask is to allow for some non-duplicate out-of-order packets
-	if theirNonce.Minus(&sinfo.theirNonce) > 0 {
-		// This is newer than the newest nonce we've seen
-		return true
-	}
-	return time.Since(sinfo.time) < nonceWindow
+	return theirNonce.Minus(&sinfo.theirNonce) > 0
 }
// Updates the nonce mask by (possibly) shifting the bitmask and setting the bit corresponding to this nonce to 1, and then updating the most recent nonce // Updates the nonce mask by (possibly) shifting the bitmask and setting the bit corresponding to this nonce to 1, and then updating the most recent nonce
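An illustrative sketch of the new acceptance rule (plain integers instead of the crypto.BoxNonce type): a packet is accepted only if its nonce is strictly newer than the last one accepted, so the out-of-order allowance that nonceWindow used to provide is gone:

package main

import "fmt"

type session struct{ lastNonce uint64 }

func (s *session) nonceIsOK(nonce uint64) bool {
	return nonce > s.lastNonce // strictly newer, no time-based window
}

func main() {
	s := &session{lastNonce: 41}
	fmt.Println(s.nonceIsOK(42)) // true: newer than anything seen so far
	fmt.Println(s.nonceIsOK(40)) // false: the old code could still accept this if it arrived within nonceWindow
}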


@ -12,13 +12,9 @@ package yggdrasil
// A little annoying to do with constant changes from backpressure // A little annoying to do with constant changes from backpressure
import ( import (
"math/rand"
"sync"
"sync/atomic"
"time" "time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
) )
@ -150,6 +146,7 @@ type tableElem struct {
 type lookupTable struct {
 	self  switchLocator
 	elems map[switchPort]tableElem
+	_msg  switchMsg
 }
// This is switch information which is mutable and needs to be modified by other goroutines, but is not accessed atomically. // This is switch information which is mutable and needs to be modified by other goroutines, but is not accessed atomically.
@ -167,17 +164,11 @@ type switchData struct {
 type switchTable struct {
 	core    *Core
 	key     crypto.SigPubKey           // Our own key
+	phony.Inbox                        // Owns the below
 	time    time.Time                  // Time when locator.tstamp was last updated
 	drop    map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root
-	mutex   sync.RWMutex               // Lock for reads/writes of switchData
 	parent  switchPort                 // Port of whatever peer is our parent, or self if we're root
 	data    switchData                 //
-	updater atomic.Value               // *sync.Once
-	table   atomic.Value               // lookupTable
-	phony.Inbox                        // Owns the below
-	queues  switch_buffers             // Queues - not atomic so ONLY use through the actor
-	idle    map[switchPort]struct{}    // idle peers - not atomic so ONLY use through the actor
-	sending map[switchPort]struct{}    // peers known to be blocked in a send (somehow)
 }
// Minimum allowed total size of switch queues. // Minimum allowed total size of switch queues.
@ -191,21 +182,8 @@ func (t *switchTable) init(core *Core) {
 	locator := switchLocator{root: t.key, tstamp: now.Unix()}
 	peers := make(map[switchPort]peerInfo)
 	t.data = switchData{locator: locator, peers: peers}
-	t.updater.Store(&sync.Once{})
-	t.table.Store(lookupTable{})
 	t.drop = make(map[crypto.SigPubKey]int64)
-	phony.Block(t, func() {
-		core.config.Mutex.RLock()
-		if core.config.Current.SwitchOptions.MaxTotalQueueSize > SwitchQueueTotalMinSize {
-			t.queues.totalMaxSize = core.config.Current.SwitchOptions.MaxTotalQueueSize
-		} else {
-			t.queues.totalMaxSize = SwitchQueueTotalMinSize
-		}
-		core.config.Mutex.RUnlock()
-		t.queues.bufs = make(map[string]switch_buffer)
-		t.idle = make(map[switchPort]struct{})
-		t.sending = make(map[switchPort]struct{})
-	})
+	phony.Block(t, t._updateTable)
 }
func (t *switchTable) reconfigure() { func (t *switchTable) reconfigure() {
@ -214,24 +192,17 @@ func (t *switchTable) reconfigure() {
t.core.peers.reconfigure() t.core.peers.reconfigure()
} }
-// Safely gets a copy of this node's locator.
-func (t *switchTable) getLocator() switchLocator {
-	t.mutex.RLock()
-	defer t.mutex.RUnlock()
-	return t.data.locator.clone()
-}
-
 // Regular maintenance to possibly timeout/reset the root and similar.
-func (t *switchTable) doMaintenance() {
-	// Periodic maintenance work to keep things internally consistent
-	t.mutex.Lock()         // Write lock
-	defer t.mutex.Unlock() // Release lock when we're done
-	t.cleanRoot()
-	t.cleanDropped()
+func (t *switchTable) doMaintenance(from phony.Actor) {
+	t.Act(from, func() {
+		// Periodic maintenance work to keep things internally consistent
+		t._cleanRoot()
+		t._cleanDropped()
+	})
 }
 // Updates the root periodically if it is ourself, or promotes ourself to root if we're better than the current root or if the current root has timed out.
-func (t *switchTable) cleanRoot() {
+func (t *switchTable) _cleanRoot() {
 	// TODO rethink how this is done?...
 	// Get rid of the root if it looks like its timed out
 	now := time.Now()
@ -256,58 +227,58 @@ func (t *switchTable) cleanRoot() {
 		t.time = now
 		if t.data.locator.root != t.key {
 			t.data.seq++
-			t.updater.Store(&sync.Once{})
-			t.core.router.reset(nil)
+			defer t.core.router.reset(nil)
 		}
 		t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()}
+		t._updateTable() // updates base copy of switch msg in lookupTable
 		t.core.peers.sendSwitchMsgs(t)
 	}
 }
} }
 // Blocks and, if possible, unparents a peer
-func (t *switchTable) blockPeer(port switchPort) {
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
-	peer, isIn := t.data.peers[port]
-	if !isIn {
-		return
-	}
-	peer.blocked = true
-	t.data.peers[port] = peer
-	if port != t.parent {
-		return
-	}
-	t.parent = 0
-	for _, info := range t.data.peers {
-		if info.port == port {
-			continue
+func (t *switchTable) blockPeer(from phony.Actor, port switchPort) {
+	t.Act(from, func() {
+		peer, isIn := t.data.peers[port]
+		if !isIn {
+			return
 		}
-		t.unlockedHandleMsg(&info.msg, info.port, true)
-	}
-	t.unlockedHandleMsg(&peer.msg, peer.port, true)
+		peer.blocked = true
+		t.data.peers[port] = peer
+		if port != t.parent {
+			return
+		}
+		t.parent = 0
+		for _, info := range t.data.peers {
+			if info.port == port {
+				continue
+			}
+			t._handleMsg(&info.msg, info.port, true)
+		}
+		t._handleMsg(&peer.msg, peer.port, true)
+	})
 }
// Removes a peer. // Removes a peer.
// Must be called by the router actor with a lambda that calls this. // Must be called by the router actor with a lambda that calls this.
// If the removed peer was this node's parent, it immediately tries to find a new parent. // If the removed peer was this node's parent, it immediately tries to find a new parent.
-func (t *switchTable) forgetPeer(port switchPort) {
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
-	delete(t.data.peers, port)
-	t.updater.Store(&sync.Once{})
-	if port != t.parent {
-		return
-	}
-	t.parent = 0
-	for _, info := range t.data.peers {
-		t.unlockedHandleMsg(&info.msg, info.port, true)
-	}
+func (t *switchTable) forgetPeer(from phony.Actor, port switchPort) {
+	t.Act(from, func() {
+		delete(t.data.peers, port)
+		defer t._updateTable()
+		if port != t.parent {
+			return
+		}
+		t.parent = 0
+		for _, info := range t.data.peers {
+			t._handleMsg(&info.msg, info.port, true)
+		}
+	})
 }
// Dropped is a list of roots that are better than the current root, but stopped sending new timestamps. // Dropped is a list of roots that are better than the current root, but stopped sending new timestamps.
// If we switch to a new root, and that root is better than an old root that previously timed out, then we can clean up the old dropped root infos. // If we switch to a new root, and that root is better than an old root that previously timed out, then we can clean up the old dropped root infos.
// This function is called periodically to do that cleanup. // This function is called periodically to do that cleanup.
-func (t *switchTable) cleanDropped() {
+func (t *switchTable) _cleanDropped() {
 	// TODO? only call this after root changes, not periodically
 	for root := range t.drop {
 		if !firstIsBetter(&root, &t.data.locator.root) {
@ -333,9 +304,7 @@ type switchMsgHop struct {
} }
 // This returns a *switchMsg to a copy of this node's current switchMsg, which can safely have additional information appended to Hops and sent to a peer.
-func (t *switchTable) getMsg() *switchMsg {
-	t.mutex.RLock()
-	defer t.mutex.RUnlock()
+func (t *switchTable) _getMsg() *switchMsg {
 	if t.parent == 0 {
 		return &switchMsg{Root: t.key, TStamp: t.data.locator.tstamp}
 	} else if parent, isIn := t.data.peers[t.parent]; isIn {
@ -347,14 +316,18 @@ func (t *switchTable) getMsg() *switchMsg {
 	}
 }
 
+func (t *lookupTable) getMsg() *switchMsg {
+	msg := t._msg
+	msg.Hops = append([]switchMsgHop(nil), t._msg.Hops...)
+	return &msg
+}
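The Hops copy in lookupTable.getMsg matters because each peer appends its own signed hop to the slice it receives; without a fresh copy, appends from different peers could write into one shared backing array. A small illustrative demonstration of that aliasing hazard (not from this commit):

package main

import "fmt"

func main() {
	base := make([]int, 2, 4) // spare capacity, like a reused Hops slice
	base[0], base[1] = 1, 2

	alias := base            // no copy: shares the backing array
	alias = append(alias, 3) // writes into base's spare capacity

	safe := append([]int(nil), base...) // fresh copy, like getMsg does
	safe = append(safe, 4)

	fmt.Println(base[:cap(base)][2]) // 3: the aliased append leaked into base's array
	fmt.Println(safe)                // [1 2 4]: the copied slice is independent
}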
 // This function checks that the root information in a switchMsg is OK.
 // In particular, that the root is better, or else the same as the current root but with a good timestamp, and that this root+timestamp haven't been dropped due to timeout.
-func (t *switchTable) checkRoot(msg *switchMsg) bool {
+func (t *switchTable) _checkRoot(msg *switchMsg) bool {
 	// returns false if it's a dropped root, not a better root, or has an older timestamp
 	// returns true otherwise
 	// used elsewhere to keep inserting peers into the dht only if root info is OK
-	t.mutex.RLock()
-	defer t.mutex.RUnlock()
 	dropTstamp, isIn := t.drop[msg.Root]
switch { switch {
case isIn && dropTstamp >= msg.TStamp: case isIn && dropTstamp >= msg.TStamp:
@ -370,20 +343,13 @@ func (t *switchTable) checkRoot(msg *switchMsg) bool {
} }
} }
-// This is a mutexed wrapper to unlockedHandleMsg, and is called by the peer structs in peers.go to pass a switchMsg for that peer into the switch.
-func (t *switchTable) handleMsg(msg *switchMsg, fromPort switchPort) {
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
-	t.unlockedHandleMsg(msg, fromPort, false)
-}
-
 // This updates the switch with information about a peer.
 // Then the tricky part, it decides if it should update our own locator as a result.
 // That happens if this node is already our parent, or is advertising a better root, or is advertising a better path to the same root, etc...
 // There are a lot of very delicate order sensitive checks here, so its' best to just read the code if you need to understand what it's doing.
 // It's very important to not change the order of the statements in the case function unless you're absolutely sure that it's safe, including safe if used alongside nodes that used the previous order.
 // Set the third arg to true if you're reprocessing an old message, e.g. to find a new parent after one disconnects, to avoid updating some timing related things.
-func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, reprocessing bool) {
+func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessing bool) {
// TODO directly use a switchMsg instead of switchMessage + sigs // TODO directly use a switchMsg instead of switchMessage + sigs
now := time.Now() now := time.Now()
// Set up the sender peerInfo // Set up the sender peerInfo
@ -506,10 +472,10 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
 			if peer.port == sender.port {
 				continue
 			}
-			t.unlockedHandleMsg(&peer.msg, peer.port, true)
+			t._handleMsg(&peer.msg, peer.port, true)
 		}
 		// Process the sender last, to avoid keeping them as a parent if at all possible.
-		t.unlockedHandleMsg(&sender.msg, sender.port, true)
+		t._handleMsg(&sender.msg, sender.port, true)
 	case now.Sub(t.time) < switch_throttle:
 		// We've already gotten an update from this root recently, so ignore this one to avoid flooding.
 	case sender.locator.tstamp > t.data.locator.tstamp:
@ -527,10 +493,10 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
 		}
 		t.data.locator = sender.locator
 		t.parent = sender.port
-		t.core.peers.sendSwitchMsgs(t)
+		defer t.core.peers.sendSwitchMsgs(t)
 	}
 	if true || doUpdate {
-		t.updater.Store(&sync.Once{})
+		defer t._updateTable()
 	}
 	return
 }
@ -540,7 +506,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
// The rest of these are related to the switch worker // The rest of these are related to the switch worker
// This is called via a sync.Once to update the atomically readable subset of switch information that gets used for routing decisions. // This is called via a sync.Once to update the atomically readable subset of switch information that gets used for routing decisions.
-func (t *switchTable) updateTable() {
+func (t *switchTable) _updateTable() {
// WARNING this should only be called from within t.data.updater.Do() // WARNING this should only be called from within t.data.updater.Do()
// It relies on the sync.Once for synchronization with messages and lookups // It relies on the sync.Once for synchronization with messages and lookups
// TODO use a pre-computed faster lookup table // TODO use a pre-computed faster lookup table
@ -549,8 +515,6 @@ func (t *switchTable) updateTable() {
// Each struct has stores the best port to forward to, and a next coord map // Each struct has stores the best port to forward to, and a next coord map
// Move to struct, then iterate over coord maps until you dead end // Move to struct, then iterate over coord maps until you dead end
// The last port before the dead end should be the closest // The last port before the dead end should be the closest
-	t.mutex.RLock()
-	defer t.mutex.RUnlock()
 	newTable := lookupTable{
 		self:  t.data.locator.clone(),
 		elems: make(map[switchPort]tableElem, len(t.data.peers)),
@ -568,13 +532,9 @@ func (t *switchTable) updateTable() {
 			time:    pinfo.time,
 		}
 	}
-	t.table.Store(newTable)
-}
-
-// Returns a copy of the atomically-updated table used for switch lookups
-func (t *switchTable) getTable() lookupTable {
-	t.updater.Load().(*sync.Once).Do(t.updateTable)
-	return t.table.Load().(lookupTable)
+	newTable._msg = *t._getMsg()
+	t.core.peers.updateTables(t, &newTable)
+	t.core.router.updateTable(t, &newTable)
 }
// Starts the switch worker // Starts the switch worker
@ -584,307 +544,39 @@ func (t *switchTable) start() error {
return nil return nil
} }
type closerInfo struct { // Find the best port to forward to for a given set of coords
elem tableElem func (t *lookupTable) lookup(coords []byte) switchPort {
dist int var bestPort switchPort
} myDist := t.self.dist(coords)
bestDist := myDist
// Return a map of ports onto distance, keeping only ports closer to the destination than this node var bestElem tableElem
// If the map is empty (or nil), then no peer is closer for _, info := range t.elems {
func (t *switchTable) getCloser(dest []byte) []closerInfo { dist := info.locator.dist(coords)
table := t.getTable() if dist >= myDist {
myDist := table.self.dist(dest)
if myDist == 0 {
// Skip the iteration step if it's impossible to be closer
return nil
}
t.queues.closer = t.queues.closer[:0]
for _, info := range table.elems {
dist := info.locator.dist(dest)
if dist < myDist {
t.queues.closer = append(t.queues.closer, closerInfo{info, dist})
}
}
return t.queues.closer
}
// Returns true if the peer is closer to the destination than ourself
func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool {
table := t.getTable()
if info, isIn := table.elems[port]; isIn {
theirDist := info.locator.dist(dest)
myDist := table.self.dist(dest)
return theirDist < myDist
} else {
return false
}
}
// Get the coords of a packet without decoding
func switch_getPacketCoords(packet []byte) []byte {
_, pTypeLen := wire_decode_uint64(packet)
coords, _ := wire_decode_coords(packet[pTypeLen:])
return coords
}
// Returns a unique string for each stream of traffic
// Equal to coords
// The sender may append arbitrary info to the end of coords (as long as it's begins with a 0x00) to designate separate traffic streams
// Currently, it's the IPv6 next header type and the first 2 uint16 of the next header
// This is equivalent to the TCP/UDP protocol numbers and the source / dest ports
// TODO figure out if something else would make more sense (other transport protocols?)
func switch_getPacketStreamID(packet []byte) string {
return string(switch_getPacketCoords(packet))
}
// Returns the flowlabel from a given set of coords
func switch_getFlowLabelFromCoords(in []byte) []byte {
for i, v := range in {
if v == 0 {
return in[i+1:]
}
}
return []byte{}
}
// Find the best port for a given set of coords
func (t *switchTable) bestPortForCoords(coords []byte) switchPort {
table := t.getTable()
var best switchPort
bestDist := table.self.dist(coords)
for to, elem := range table.elems {
dist := elem.locator.dist(coords)
if !(dist < bestDist) {
continue continue
} }
best = to
bestDist = dist
}
return best
}
// Handle an incoming packet
// Either send it to ourself, or to the first idle peer that's free
// Returns true if the packet has been handled somehow, false if it should be queued
func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}, sending map[switchPort]struct{}) bool {
coords := switch_getPacketCoords(packet)
closer := t.getCloser(coords)
if len(closer) == 0 {
// TODO? call the router directly, and remove the whole concept of a self peer?
self := t.core.peers.getPorts()[0]
self.sendPacketsFrom(t, [][]byte{packet})
return true
}
var best *closerInfo
ports := t.core.peers.getPorts()
for _, cinfo := range closer {
to := ports[cinfo.elem.port]
//_, isIdle := idle[cinfo.elem.port]
_, isSending := sending[cinfo.elem.port]
var update bool var update bool
switch { switch {
case to == nil: case dist < bestDist:
// no port was found, ignore it // Closer to destination
case isSending:
// the port is busy, ignore it
case best == nil:
// this is the first idle port we've found, so select it until we find a
// better candidate port to use instead
update = true update = true
case cinfo.dist < best.dist: case dist > bestDist:
// the port takes a shorter path/is more direct than our current // Further from destination
// candidate, so select that instead case info.locator.tstamp > bestElem.locator.tstamp:
// Newer root update
update = true update = true
case cinfo.dist > best.dist: case info.locator.tstamp < bestElem.locator.tstamp:
// the port takes a longer path/is less direct than our current candidate, // Older root update
// ignore it case info.time.Before(bestElem.time):
case cinfo.elem.locator.tstamp > best.elem.locator.tstamp: // Received root update via this peer sooner
// has a newer tstamp from the root, so presumably a better path
update = true
case cinfo.elem.locator.tstamp < best.elem.locator.tstamp:
// has a n older tstamp, so presumably a worse path
case cinfo.elem.time.Before(best.elem.time):
// same tstamp, but got it earlier, so presumably a better path
//t.core.log.Println("DEBUG new best:", best.elem.time, cinfo.elem.time)
update = true update = true
default: default:
// the search for a port has finished
} }
if update { if update {
b := cinfo // because cinfo gets mutated by the iteration bestPort = info.port
best = &b bestDist = dist
bestElem = info
} }
} }
if best != nil { return bestPort
if _, isIdle := idle[best.elem.port]; isIdle {
delete(idle, best.elem.port)
ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet})
return true
}
}
// Didn't find anyone idle to send it to
return false
}
// Info about a buffered packet
type switch_packetInfo struct {
bytes []byte
time time.Time // Timestamp of when the packet arrived
}
// Used to keep track of buffered packets
type switch_buffer struct {
packets []switch_packetInfo // Currently buffered packets, which may be dropped if it grows too large
size uint64 // Total queue size in bytes
}
type switch_buffers struct {
totalMaxSize uint64
bufs map[string]switch_buffer // Buffers indexed by StreamID
size uint64 // Total size of all buffers, in bytes
maxbufs int
maxsize uint64
closer []closerInfo // Scratch space
}
func (b *switch_buffers) _cleanup(t *switchTable) {
for streamID, buf := range b.bufs {
// Remove queues for which we have no next hop
packet := buf.packets[0]
coords := switch_getPacketCoords(packet.bytes)
if len(t.getCloser(coords)) == 0 {
for _, packet := range buf.packets {
util.PutBytes(packet.bytes)
}
b.size -= buf.size
delete(b.bufs, streamID)
}
}
for b.size > b.totalMaxSize {
// Drop a random queue
target := rand.Uint64() % b.size
var size uint64 // running total
for streamID, buf := range b.bufs {
size += buf.size
if size < target {
continue
}
var packet switch_packetInfo
packet, buf.packets = buf.packets[0], buf.packets[1:]
buf.size -= uint64(len(packet.bytes))
b.size -= uint64(len(packet.bytes))
util.PutBytes(packet.bytes)
if len(buf.packets) == 0 {
delete(b.bufs, streamID)
} else {
// Need to update the map, since buf was retrieved by value
b.bufs[streamID] = buf
}
break
}
}
}
// Handles incoming idle notifications
// Loops over packets and sends the newest one that's OK for this peer to send
// Returns true if the peer is no longer idle, false if it should be added to the idle list
func (t *switchTable) _handleIdle(port switchPort) bool {
// TODO? only send packets for which this is the best next hop that isn't currently blocked sending
to := t.core.peers.getPorts()[port]
if to == nil {
return true
}
var packets [][]byte
var psize int
t.queues._cleanup(t)
now := time.Now()
for psize < 65535 {
var best *string
var bestPriority float64
for streamID, buf := range t.queues.bufs {
// Filter over the streams that this node is closer to
// Keep the one with the smallest queue
packet := buf.packets[0]
coords := switch_getPacketCoords(packet.bytes)
priority := float64(now.Sub(packet.time)) / float64(buf.size)
if priority >= bestPriority && t.portIsCloser(coords, port) {
b := streamID // copy since streamID is mutated in the loop
best = &b
bestPriority = priority
}
}
if best != nil {
buf := t.queues.bufs[*best]
var packet switch_packetInfo
// TODO decide if this should be LIFO or FIFO
packet, buf.packets = buf.packets[0], buf.packets[1:]
buf.size -= uint64(len(packet.bytes))
t.queues.size -= uint64(len(packet.bytes))
if len(buf.packets) == 0 {
delete(t.queues.bufs, *best)
} else {
// Need to update the map, since buf was retrieved by value
t.queues.bufs[*best] = buf
}
packets = append(packets, packet.bytes)
psize += len(packet.bytes)
} else {
// Finished finding packets
break
}
}
if len(packets) > 0 {
to.sendPacketsFrom(t, packets)
return true
}
return false
}
func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) {
t.Act(from, func() {
t._packetIn(bytes)
})
}
func (t *switchTable) _packetIn(bytes []byte) {
// Try to send it somewhere (or drop it if it's corrupt or at a dead end)
if !t._handleIn(bytes, t.idle, t.sending) {
// There's nobody free to take it right now, so queue it for later
packet := switch_packetInfo{bytes, time.Now()}
streamID := switch_getPacketStreamID(packet.bytes)
buf, bufExists := t.queues.bufs[streamID]
buf.packets = append(buf.packets, packet)
buf.size += uint64(len(packet.bytes))
t.queues.size += uint64(len(packet.bytes))
// Keep a track of the max total queue size
if t.queues.size > t.queues.maxsize {
t.queues.maxsize = t.queues.size
}
t.queues.bufs[streamID] = buf
if !bufExists {
// Keep a track of the max total queue count. Only recalculate this
// when the queue is new because otherwise repeating len(dict) might
// cause unnecessary processing overhead
if len(t.queues.bufs) > t.queues.maxbufs {
t.queues.maxbufs = len(t.queues.bufs)
}
}
t.queues._cleanup(t)
}
}
func (t *switchTable) _idleIn(port switchPort) {
// Try to find something to send to this peer
delete(t.sending, port)
if !t._handleIdle(port) {
// Didn't find anything ready to send yet, so stay idle
t.idle[port] = struct{}{}
}
}
func (t *switchTable) _sendingIn(port switchPort) {
if _, isIn := t.idle[port]; !isIn {
t.sending[port] = struct{}{}
}
} }
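The queueing and idle-tracking machinery removed above is replaced by the new lookupTable.lookup method. A small illustrative restatement (plain integers, not the real locator types, and not code from this commit) of the order in which lookup breaks ties: prefer strictly closer peers, then newer root timestamps, then whichever update arrived first:

package main

import "fmt"

type candidate struct {
	port    int
	dist    int   // tree distance from this peer to the destination
	tstamp  int64 // timestamp of the latest root update seen via this peer
	arrived int64 // when that update arrived (smaller = sooner)
}

func pick(myDist int, cands []candidate) int {
	bestPort := 0 // port 0 means "deliver to self"
	bestDist := myDist
	var best candidate
	for _, c := range cands {
		if c.dist >= myDist {
			continue // never forward further from the destination than we already are
		}
		var update bool
		switch {
		case c.dist < bestDist:
			update = true // closer to the destination
		case c.dist > bestDist:
			// further from the destination
		case c.tstamp > best.tstamp:
			update = true // newer root update
		case c.tstamp < best.tstamp:
			// older root update
		case c.arrived < best.arrived:
			update = true // received the root update via this peer sooner
		}
		if update {
			bestPort, bestDist, best = c.port, c.dist, c
		}
	}
	return bestPort
}

func main() {
	cands := []candidate{
		{port: 1, dist: 3, tstamp: 100, arrived: 5},
		{port: 2, dist: 3, tstamp: 200, arrived: 9}, // same distance, newer root info
		{port: 3, dist: 4, tstamp: 300, arrived: 1}, // further away, never wins
	}
	fmt.Println(pick(5, cands)) // 2
}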