mirror of
https://github.com/yggdrasil-network/yggdrasil-go.git
synced 2024-11-30 13:35:19 +00:00
Merge pull request #220 from Arceliar/switch
More latency-based switch optimizations
This commit is contained in:
commit
58b60af208
@ -504,27 +504,43 @@ func (c *Core) DEBUG_addAllowedEncryptionPublicKey(boxStr string) {
|
|||||||
|
|
||||||
func DEBUG_simLinkPeers(p, q *peer) {
|
func DEBUG_simLinkPeers(p, q *peer) {
|
||||||
// Sets q.out() to point to p and starts p.linkLoop()
|
// Sets q.out() to point to p and starts p.linkLoop()
|
||||||
p.linkOut, q.linkOut = make(chan []byte, 1), make(chan []byte, 1)
|
goWorkers := func(source, dest *peer) {
|
||||||
|
source.linkOut = make(chan []byte, 1)
|
||||||
|
send := make(chan []byte, 1)
|
||||||
|
source.out = func(bs []byte) {
|
||||||
|
send <- bs
|
||||||
|
}
|
||||||
|
go source.linkLoop()
|
||||||
go func() {
|
go func() {
|
||||||
for bs := range p.linkOut {
|
var packets [][]byte
|
||||||
q.handlePacket(bs)
|
for {
|
||||||
|
select {
|
||||||
|
case packet := <-source.linkOut:
|
||||||
|
packets = append(packets, packet)
|
||||||
|
continue
|
||||||
|
case packet := <-send:
|
||||||
|
packets = append(packets, packet)
|
||||||
|
source.core.switchTable.idleIn <- source.port
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
if len(packets) > 0 {
|
||||||
|
dest.handlePacket(packets[0])
|
||||||
|
packets = packets[1:]
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case packet := <-source.linkOut:
|
||||||
|
packets = append(packets, packet)
|
||||||
|
case packet := <-send:
|
||||||
|
packets = append(packets, packet)
|
||||||
|
source.core.switchTable.idleIn <- source.port
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
go func() {
|
|
||||||
for bs := range q.linkOut {
|
|
||||||
p.handlePacket(bs)
|
|
||||||
}
|
}
|
||||||
}()
|
goWorkers(p, q)
|
||||||
p.out = func(bs []byte) {
|
goWorkers(q, p)
|
||||||
p.core.switchTable.idleIn <- p.port
|
|
||||||
go q.handlePacket(bs)
|
|
||||||
}
|
|
||||||
q.out = func(bs []byte) {
|
|
||||||
q.core.switchTable.idleIn <- q.port
|
|
||||||
go p.handlePacket(bs)
|
|
||||||
}
|
|
||||||
go p.linkLoop()
|
|
||||||
go q.linkLoop()
|
|
||||||
p.core.switchTable.idleIn <- p.port
|
p.core.switchTable.idleIn <- p.port
|
||||||
q.core.switchTable.idleIn <- q.port
|
q.core.switchTable.idleIn <- q.port
|
||||||
}
|
}
|
||||||
|
@ -377,6 +377,11 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
|
|||||||
doUpdate := false
|
doUpdate := false
|
||||||
oldSender := t.data.peers[fromPort]
|
oldSender := t.data.peers[fromPort]
|
||||||
if !equiv(&sender.locator, &oldSender.locator) {
|
if !equiv(&sender.locator, &oldSender.locator) {
|
||||||
|
// Reset faster info, we'll start refilling it right after this
|
||||||
|
sender.faster = nil
|
||||||
|
for _, peer := range t.data.peers {
|
||||||
|
delete(peer.faster, sender.port)
|
||||||
|
}
|
||||||
doUpdate = true
|
doUpdate = true
|
||||||
}
|
}
|
||||||
// Update the matrix of peer "faster" thresholds
|
// Update the matrix of peer "faster" thresholds
|
||||||
@ -387,25 +392,20 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
|
|||||||
for port, peer := range t.data.peers {
|
for port, peer := range t.data.peers {
|
||||||
if port == fromPort {
|
if port == fromPort {
|
||||||
continue
|
continue
|
||||||
}
|
} else if sender.locator.root != peer.locator.root || sender.locator.tstamp > peer.locator.tstamp {
|
||||||
switch {
|
|
||||||
case msg.Root != peer.locator.root:
|
|
||||||
// Different roots, blindly guess that the relationships will stay the same?
|
|
||||||
sender.faster[port] = oldSender.faster[peer.port]
|
|
||||||
case sender.locator.tstamp <= peer.locator.tstamp:
|
|
||||||
// Slower than this node, penalize (more than the reward amount)
|
|
||||||
if oldSender.faster[port] > 1 {
|
|
||||||
sender.faster[port] = oldSender.faster[peer.port] - 2
|
|
||||||
} else {
|
|
||||||
sender.faster[port] = 0
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
// We were faster than this node, so increment, as long as we don't overflow because of it
|
// We were faster than this node, so increment, as long as we don't overflow because of it
|
||||||
if oldSender.faster[peer.port] < switch_faster_threshold {
|
if oldSender.faster[peer.port] < switch_faster_threshold {
|
||||||
sender.faster[port] = oldSender.faster[peer.port] + 1
|
sender.faster[port] = oldSender.faster[peer.port] + 1
|
||||||
} else {
|
} else {
|
||||||
sender.faster[port] = switch_faster_threshold
|
sender.faster[port] = switch_faster_threshold
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// Slower than this node, penalize (more than the reward amount)
|
||||||
|
if oldSender.faster[port] > 1 {
|
||||||
|
sender.faster[port] = oldSender.faster[peer.port] - 2
|
||||||
|
} else {
|
||||||
|
sender.faster[port] = 0
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -457,12 +457,10 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
|
|||||||
// First, reset all faster-related info to 0.
|
// First, reset all faster-related info to 0.
|
||||||
// Then, de-parent the node and reprocess all messages to find a new parent.
|
// Then, de-parent the node and reprocess all messages to find a new parent.
|
||||||
t.parent = 0
|
t.parent = 0
|
||||||
sender.faster = nil
|
|
||||||
for _, peer := range t.data.peers {
|
for _, peer := range t.data.peers {
|
||||||
if peer.port == sender.port {
|
if peer.port == sender.port {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
delete(peer.faster, sender.port)
|
|
||||||
t.unlockedHandleMsg(&peer.msg, peer.port, true)
|
t.unlockedHandleMsg(&peer.msg, peer.port, true)
|
||||||
}
|
}
|
||||||
// Process the sender last, to avoid keeping them as a parent if at all possible.
|
// Process the sender last, to avoid keeping them as a parent if at all possible.
|
||||||
|
Loading…
Reference in New Issue
Block a user