Merge pull request #702 from Arceliar/switch

Precompute more for the switch lookup table
This commit is contained in:
Arceliar 2020-05-30 18:39:43 -05:00 committed by GitHub
commit aec82d7a39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -93,6 +93,20 @@ func (l *switchLocator) dist(dest []byte) int {
return dist return dist
} }
func (l *switchLocator) ldist(sl *switchLocator) int {
lca := -1
for idx := 0; idx < len(l.coords); idx++ {
if idx >= len(sl.coords) {
break
}
if l.coords[idx] != sl.coords[idx] {
break
}
lca = idx
}
return len(l.coords) + len(sl.coords) - 2*(lca+1)
}
// Gets coords in wire encoded format, with *no* length prefix. // Gets coords in wire encoded format, with *no* length prefix.
func (l *switchLocator) getCoords() []byte { func (l *switchLocator) getCoords() []byte {
bs := make([]byte, 0, len(l.coords)) bs := make([]byte, 0, len(l.coords))
@ -140,13 +154,15 @@ type tableElem struct {
port switchPort port switchPort
locator switchLocator locator switchLocator
time time.Time time time.Time
next map[switchPort]*tableElem
} }
// This is the subset of the information about all peers needed to make routing decisions, and it stored separately in an atomically accessed table, which gets hammered in the "hot loop" of the routing logic (see: peer.handleTraffic in peers.go). // This is the subset of the information about all peers needed to make routing decisions, and it stored separately in an atomically accessed table, which gets hammered in the "hot loop" of the routing logic (see: peer.handleTraffic in peers.go).
type lookupTable struct { type lookupTable struct {
self switchLocator self switchLocator
elems map[switchPort]tableElem elems map[switchPort]tableElem // all switch peers, just for sanity checks + API/debugging
_msg switchMsg _start tableElem // used for lookups
_msg switchMsg
} }
// This is switch information which is mutable and needs to be modified by other goroutines, but is not accessed atomically. // This is switch information which is mutable and needs to be modified by other goroutines, but is not accessed atomically.
@ -155,7 +171,6 @@ type switchData struct {
// All data that's mutable and used by exported Table methods // All data that's mutable and used by exported Table methods
// To be read/written with atomic.Value Store/Load calls // To be read/written with atomic.Value Store/Load calls
locator switchLocator locator switchLocator
seq uint64 // Sequence number, reported to peers, so they know about changes
peers map[switchPort]peerInfo peers map[switchPort]peerInfo
msg *switchMsg msg *switchMsg
} }
@ -226,7 +241,6 @@ func (t *switchTable) _cleanRoot() {
t.parent = switchPort(0) t.parent = switchPort(0)
t.time = now t.time = now
if t.data.locator.root != t.key { if t.data.locator.root != t.key {
t.data.seq++
defer t.core.router.reset(nil) defer t.core.router.reset(nil)
} }
t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()} t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()}
@ -376,6 +390,9 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi
sender.key = prevKey sender.key = prevKey
prevKey = hop.Next prevKey = hop.Next
} }
if sender.key == t.key {
return // Don't peer with ourself via different interfaces
}
sender.msg = *msg sender.msg = *msg
sender.port = fromPort sender.port = fromPort
sender.time = now sender.time = now
@ -428,6 +445,9 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi
} }
} }
} }
if sender.blocked != oldSender.blocked {
doUpdate = true
}
// Update sender // Update sender
t.data.peers[fromPort] = sender t.data.peers[fromPort] = sender
// Decide if we should also update our root info to make the sender our parent // Decide if we should also update our root info to make the sender our parent
@ -497,9 +517,8 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi
} }
// Note that we depend on the LIFO order of the stack of defers here... // Note that we depend on the LIFO order of the stack of defers here...
if updateRoot { if updateRoot {
doUpdate = true
if !equiv(&sender.locator, &t.data.locator) { if !equiv(&sender.locator, &t.data.locator) {
doUpdate = true
t.data.seq++
defer t.core.router.reset(t) defer t.core.router.reset(t)
} }
if t.data.locator.tstamp != sender.locator.tstamp { if t.data.locator.tstamp != sender.locator.tstamp {
@ -510,47 +529,88 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi
defer t.core.peers.sendSwitchMsgs(t) defer t.core.peers.sendSwitchMsgs(t)
} }
if doUpdate { if doUpdate {
defer t._updateTable() t._updateTable()
} }
return return
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// The rest of these are related to the switch worker // The rest of these are related to the switch lookup table
// This is called via a sync.Once to update the atomically readable subset of switch information that gets used for routing decisions.
func (t *switchTable) _updateTable() { func (t *switchTable) _updateTable() {
// WARNING this should only be called from within t.data.updater.Do()
// It relies on the sync.Once for synchronization with messages and lookups
// TODO use a pre-computed faster lookup table
// Instead of checking distance for every destination every time
// Array of structs, indexed by first coord that differs from self
// Each struct has stores the best port to forward to, and a next coord map
// Move to struct, then iterate over coord maps until you dead end
// The last port before the dead end should be the closest
newTable := lookupTable{ newTable := lookupTable{
self: t.data.locator.clone(), self: t.data.locator.clone(),
elems: make(map[switchPort]tableElem, len(t.data.peers)), elems: make(map[switchPort]tableElem, len(t.data.peers)),
_msg: *t._getMsg(),
} }
newTable._init()
for _, pinfo := range t.data.peers { for _, pinfo := range t.data.peers {
//if !pinfo.forward { continue } if pinfo.blocked || pinfo.locator.root != newTable.self.root {
if pinfo.locator.root != newTable.self.root {
continue continue
} }
loc := pinfo.locator.clone() loc := pinfo.locator.clone()
loc.coords = loc.coords[:len(loc.coords)-1] // Remove the them->self link loc.coords = loc.coords[:len(loc.coords)-1] // Remove the them->self link
newTable.elems[pinfo.port] = tableElem{ elem := tableElem{
locator: loc, locator: loc,
port: pinfo.port, port: pinfo.port,
time: pinfo.time, time: pinfo.time,
} }
newTable._insert(&elem)
newTable.elems[pinfo.port] = elem
} }
newTable._msg = *t._getMsg()
t.core.peers.updateTables(t, &newTable) t.core.peers.updateTables(t, &newTable)
t.core.router.updateTable(t, &newTable) t.core.router.updateTable(t, &newTable)
} }
func (t *lookupTable) _init() {
// WARNING: this relies on the convention that the self port is 0
self := tableElem{locator: t.self} // create self elem
t._start = self // initialize _start to self
t._insert(&self) // insert self into table
}
func (t *lookupTable) _insert(elem *tableElem) {
// This is a helper that should only be run during _updateTable
here := &t._start
for idx := 0; idx <= len(elem.locator.coords); idx++ {
refLoc := here.locator
refLoc.coords = refLoc.coords[:idx] // Note that this is length idx (starts at length 0)
oldDist := refLoc.ldist(&here.locator)
newDist := refLoc.ldist(&elem.locator)
var update bool
switch {
case newDist < oldDist: // new elem is closer to this point in the tree
update = true
case newDist > oldDist: // new elem is too far
case elem.locator.tstamp > refLoc.tstamp: // new elem has a closer timestamp
update = true
case elem.locator.tstamp < refLoc.tstamp: // new elem's timestamp is too old
case elem.time.Before(here.time): // same dist+timestamp, but new elem delivered it faster
update = true
}
if update {
here.port = elem.port
here.locator = elem.locator
here.time = elem.time
// Problem: here is a value, so this doesn't actually update anything...
}
if idx < len(elem.locator.coords) {
if here.next == nil {
here.next = make(map[switchPort]*tableElem)
}
var next *tableElem
var ok bool
if next, ok = here.next[elem.locator.coords[idx]]; !ok {
nextVal := *elem
next = &nextVal
here.next[next.locator.coords[idx]] = next
}
here = next
}
}
}
// Starts the switch worker // Starts the switch worker
func (t *switchTable) start() error { func (t *switchTable) start() error {
t.core.log.Infoln("Starting switch") t.core.log.Infoln("Starting switch")
@ -558,39 +618,17 @@ func (t *switchTable) start() error {
return nil return nil
} }
// Find the best port to forward to for a given set of coords
func (t *lookupTable) lookup(coords []byte) switchPort { func (t *lookupTable) lookup(coords []byte) switchPort {
var bestPort switchPort var offset int
myDist := t.self.dist(coords) here := &t._start
bestDist := myDist for offset < len(coords) {
var bestElem tableElem port, l := wire_decode_uint64(coords[offset:])
for _, info := range t.elems { offset += l
dist := info.locator.dist(coords) if next, ok := here.next[switchPort(port)]; ok {
if dist >= myDist { here = next
continue } else {
} break
var update bool
switch {
case dist < bestDist:
// Closer to destination
update = true
case dist > bestDist:
// Further from destination
case info.locator.tstamp > bestElem.locator.tstamp:
// Newer root update
update = true
case info.locator.tstamp < bestElem.locator.tstamp:
// Older root update
case info.time.Before(bestElem.time):
// Received root update via this peer sooner
update = true
default:
}
if update {
bestPort = info.port
bestDist = dist
bestElem = info
} }
} }
return bestPort return here.port
} }