From 691192ff5ae5a3f012f5f616327645b01e4c1462 Mon Sep 17 00:00:00 2001
From: Arceliar
Date: Sat, 21 Sep 2019 14:33:45 -0500
Subject: [PATCH 1/4] weird scheduler hack, seems to make things more stable
 without actually locking streams to any particular link

---
 src/yggdrasil/switch.go | 11 ++++++++---
 src/yggdrasil/tcp.go    |  5 +++--
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go
index f4df60d5..ece2fa26 100644
--- a/src/yggdrasil/switch.go
+++ b/src/yggdrasil/switch.go
@@ -712,10 +712,15 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) boo
 		}
 	}
 	if best != nil {
-		// Send to the best idle next hop
 		delete(idle, best.elem.port)
-		ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet})
-		return true
+		// Tell ourselves to send to this node later
+		// If another (e.g. even better) hop becomes idle in the meantime, it'll take the packet instead
+		// FIXME this is just a hack, but seems to help with stability...
+		go t.Act(nil, func() {
+			t._idleIn(best.elem.port)
+		})
+		//ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet})
+		//return true
 	}
 	// Didn't find anyone idle to send it to
 	return false
diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go
index ed8f7b9b..66f708c2 100644
--- a/src/yggdrasil/tcp.go
+++ b/src/yggdrasil/tcp.go
@@ -233,8 +233,9 @@ func (t *tcp) call(saddr string, options interface{}, sintf string) {
 	}
 	defer func() {
 		// Block new calls for a little while, to mitigate livelock scenarios
-		time.Sleep(default_timeout)
-		time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
+		rand.Seed(time.Now().UnixNano())
+		delay := default_timeout + time.Duration(rand.Intn(10000))*time.Millisecond
+		time.Sleep(delay)
 		t.mutex.Lock()
 		delete(t.calls, callname)
 		t.mutex.Unlock()
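
The heart of patch 1 is an actor idiom: instead of committing the packet to the best hop that is idle right now, the switch re-enqueues an idle notification to itself, so a hop that becomes idle in the meantime can claim the packet first. Below is a minimal sketch of that send-to-self pattern using the same phony actor library the codebase is built on; toySwitch and its int ports are invented for illustration and are not the real switchTable:

package main

import (
	"fmt"
	"time"

	"github.com/Arceliar/phony"
)

// toySwitch stands in for switchTable: an actor that owns an idle-port set.
type toySwitch struct {
	phony.Inbox
	idle map[int]struct{}
}

// _idleIn marks a port as idle again; only ever runs inside the actor.
func (t *toySwitch) _idleIn(port int) {
	t.idle[port] = struct{}{}
	fmt.Println("port idle again:", port)
}

// handlePacket is the hack in miniature: instead of committing to `best`
// right now, re-enqueue an idle notification to ourselves, so anything
// already queued (e.g. an even better hop going idle) is handled first.
func (t *toySwitch) handlePacket(best int) {
	go t.Act(nil, func() {
		t._idleIn(best)
	})
}

func main() {
	t := &toySwitch{idle: make(map[int]struct{})}
	phony.Block(t, func() { t.handlePacket(1) })
	time.Sleep(10 * time.Millisecond) // demo only: let the self-send land
}

Wrapping Act in go leaves the exact enqueue point up to the Go scheduler; that nondeterminism is the "weird" part of the hack, and it is also what gives an even better hop the chance to take the packet instead.
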
From 8c64e6fa093a06f166fd654450a8958191b9c6cd Mon Sep 17 00:00:00 2001
From: Arceliar
Date: Tue, 24 Sep 2019 18:01:35 -0500
Subject: [PATCH 2/4] explicitly notify the switch when a link appears to be
 blocked in a send, instead of assuming this is the case for all idle links

How we decide when it's really blocked still needs testing/optimizing.
---
 src/yggdrasil/link.go   | 17 +++++++++++++++++
 src/yggdrasil/switch.go | 34 +++++++++++++++++++++++++---------
 2 files changed, 42 insertions(+), 9 deletions(-)

diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go
index a4a41e7f..ece69caf 100644
--- a/src/yggdrasil/link.go
+++ b/src/yggdrasil/link.go
@@ -64,6 +64,8 @@ type linkInterface struct {
 	closeTimer  *time.Timer // Fires when the link has been idle so long we need to close it
 	inSwitch    bool        // True if the switch is tracking this link
 	stalled     bool        // True if we haven't been receiving any response traffic
+	sendSeqSent uint        // Incremented each time we start sending
+	sendSeqRecv uint        // Incremented each time we finish sending
 }
 
 func (l *link) init(c *Core) error {
@@ -273,9 +275,23 @@ func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) {
 		}
 		intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend)
 		intf._cancelStallTimer()
+		intf.sendSeqSent++
+		seq := intf.sendSeqSent
+		intf.Act(nil, func() {
+			intf._checkSending(seq)
+		})
 	})
 }
 
+// Check if we're still sending
+func (intf *linkInterface) _checkSending(seq uint) {
+	if intf.sendSeqRecv != seq {
+		intf.link.core.switchTable.Act(intf, func() {
+			intf.link.core.switchTable._sendingIn(intf.peer.port)
+		})
+	}
+}
+
 // we just sent something, so cancel any pending timer to send keep-alive traffic
 func (intf *linkInterface) _cancelStallTimer() {
 	if intf.stallTimer != nil {
@@ -305,6 +321,7 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) {
 		if size > 0 && intf.stallTimer == nil {
 			intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled)
 		}
+		intf.sendSeqRecv++
 	})
 }
 
diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go
index ece2fa26..0150e173 100644
--- a/src/yggdrasil/switch.go
+++ b/src/yggdrasil/switch.go
@@ -177,6 +177,7 @@ type switchTable struct {
 	phony.Inbox                         // Owns the below
 	queues  switch_buffers              // Queues - not atomic so ONLY use through the actor
 	idle    map[switchPort]struct{}     // idle peers - not atomic so ONLY use through the actor
+	sending map[switchPort]struct{}     // peers known to be blocked in a send (somehow)
 }
 
 // Minimum allowed total size of switch queues.
@@ -203,6 +204,7 @@ func (t *switchTable) init(core *Core) {
 		core.config.Mutex.RUnlock()
 		t.queues.bufs = make(map[string]switch_buffer)
 		t.idle = make(map[switchPort]struct{})
+		t.sending = make(map[switchPort]struct{})
 	})
 }
 
@@ -527,7 +529,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
 		t.parent = sender.port
 		t.core.peers.sendSwitchMsgs(t)
 	}
-	if doUpdate {
+	if true || doUpdate {
 		t.updater.Store(&sync.Once{})
 	}
 	return
@@ -664,7 +666,7 @@ func (t *switchTable) bestPortForCoords(coords []byte) switchPort {
 // Handle an incoming packet
 // Either send it to ourself, or to the first idle peer that's free
 // Returns true if the packet has been handled somehow, false if it should be queued
-func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) bool {
+func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}, sending map[switchPort]struct{}) bool {
 	coords := switch_getPacketCoords(packet)
 	closer := t.getCloser(coords)
 	if len(closer) == 0 {
@@ -677,12 +679,13 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) boo
 	ports := t.core.peers.getPorts()
 	for _, cinfo := range closer {
 		to := ports[cinfo.elem.port]
-		_, isIdle := idle[cinfo.elem.port]
+		//_, isIdle := idle[cinfo.elem.port]
+		_, isSending := sending[cinfo.elem.port]
 		var update bool
 		switch {
 		case to == nil:
 			// no port was found, ignore it
-		case !isIdle:
+		case isSending:
 			// the port is busy, ignore it
 		case best == nil:
 			// this is the first idle port we've found, so select it until we find a
@@ -702,6 +705,7 @@
 			// has an older tstamp, so presumably a worse path
 		case cinfo.elem.time.Before(best.elem.time):
 			// same tstamp, but got it earlier, so presumably a better path
+			//t.core.log.Println("DEBUG new best:", best.elem.time, cinfo.elem.time)
 			update = true
 		default:
 			// the search for a port has finished
@@ -712,13 +716,18 @@
 		}
 	}
 	if best != nil {
-		delete(idle, best.elem.port)
+		if _, isIdle := idle[best.elem.port]; isIdle {
+			delete(idle, best.elem.port)
+			ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet})
+			return true
+		}
+		//delete(idle, best.elem.port)
 		// Tell ourselves to send to this node later
 		// If another (e.g. even better) hop becomes idle in the meantime, it'll take the packet instead
 		// FIXME this is just a hack, but seems to help with stability...
-		go t.Act(nil, func() {
-			t._idleIn(best.elem.port)
-		})
+		//go t.Act(nil, func() {
+		//	t._idleIn(best.elem.port)
+		//})
 		//ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet})
 		//return true
 	}
@@ -847,7 +856,7 @@ func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) {
 
 func (t *switchTable) _packetIn(bytes []byte) {
 	// Try to send it somewhere (or drop it if it's corrupt or at a dead end)
-	if !t._handleIn(bytes, t.idle) {
+	if !t._handleIn(bytes, t.idle, t.sending) {
 		// There's nobody free to take it right now, so queue it for later
 		packet := switch_packetInfo{bytes, time.Now()}
 		streamID := switch_getPacketStreamID(packet.bytes)
@@ -874,8 +883,15 @@
 func (t *switchTable) _idleIn(port switchPort) {
 	// Try to find something to send to this peer
+	delete(t.sending, port)
 	if !t._handleIdle(port) {
 		// Didn't find anything ready to send yet, so stay idle
 		t.idle[port] = struct{}{}
 	}
 }
+
+func (t *switchTable) _sendingIn(port switchPort) {
+	if _, isIn := t.idle[port]; !isIn {
+		t.sending[port] = struct{}{}
+	}
+}
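
Stripped of the switch plumbing, patch 2's blocked-send detection is: every send bumps sendSeqSent and schedules a check, every completed send bumps sendSeqRecv, and a check that finds the counters out of step concludes the link is stuck in a write and tells the switch. A rough stdlib-only sketch; the field names mirror the patch, but the mutex (standing in for the phony actor's serialization) and the durations are invented:

package main

import (
	"fmt"
	"sync"
	"time"
)

// iface mimics linkInterface's bookkeeping; a mutex replaces the phony
// actor that serializes access to these fields in the real code.
type iface struct {
	mu          sync.Mutex
	sendSeqSent uint // incremented each time we start sending
	sendSeqRecv uint // incremented each time we finish sending
}

// notifySending records the start of a send and schedules a later check.
func (i *iface) notifySending() {
	i.mu.Lock()
	i.sendSeqSent++
	seq := i.sendSeqSent
	i.mu.Unlock()
	time.AfterFunc(time.Millisecond, func() { i.checkSending(seq) })
}

// notifySent records that the most recent send finished.
func (i *iface) notifySent() {
	i.mu.Lock()
	i.sendSeqRecv++
	i.mu.Unlock()
}

// checkSending fires after the grace period: if the matching notifySent
// hasn't happened yet, the writer is presumably blocked in a syscall,
// and the real code would call switchTable._sendingIn here.
func (i *iface) checkSending(seq uint) {
	i.mu.Lock()
	defer i.mu.Unlock()
	if i.sendSeqRecv != seq {
		fmt.Printf("send %d still in flight; mark the port as sending\n", seq)
	}
}

func main() {
	i := &iface{}
	i.notifySending()                // a send starts...
	time.Sleep(5 * time.Millisecond) // ...and the write syscall is slow
	i.notifySent()                   // the write finally returns
}
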
From b9e74f34ec2663b043374d1e0163aa55338792f5 Mon Sep 17 00:00:00 2001
From: Arceliar
Date: Tue, 24 Sep 2019 18:28:13 -0500
Subject: [PATCH 3/4] replace the send-to-self with a timer and an arbitrary
 timeout; I don't really like this but it seems to work better (1 ms is fast
 by human standards but an eternity for a syscall or the scheduler, so I
 think that's reasonable)

---
 src/yggdrasil/link.go | 28 +++++++++++++---------------
 1 file changed, 13 insertions(+), 15 deletions(-)

diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go
index ece69caf..875fde5e 100644
--- a/src/yggdrasil/link.go
+++ b/src/yggdrasil/link.go
@@ -64,8 +64,6 @@ type linkInterface struct {
 	closeTimer  *time.Timer // Fires when the link has been idle so long we need to close it
 	inSwitch    bool        // True if the switch is tracking this link
 	stalled     bool        // True if we haven't been receiving any response traffic
-	sendSeqSent uint        // Incremented each time we start sending
-	sendSeqRecv uint        // Incremented each time we finish sending
 }
 
 func (l *link) init(c *Core) error {
@@ -275,21 +273,14 @@ func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) {
 		}
 		intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend)
 		intf._cancelStallTimer()
-		intf.sendSeqSent++
-		seq := intf.sendSeqSent
-		intf.Act(nil, func() {
-			intf._checkSending(seq)
-		})
 	})
 }
 
-// Check if we're still sending
-func (intf *linkInterface) _checkSending(seq uint) {
-	if intf.sendSeqRecv != seq {
-		intf.link.core.switchTable.Act(intf, func() {
-			intf.link.core.switchTable._sendingIn(intf.peer.port)
-		})
-	}
+// called by an AfterFunc if we seem to be blocked in a send syscall for a long time
+func (intf *linkInterface) _notifySyscall() {
+	intf.link.core.switchTable.Act(intf, func() {
+		intf.link.core.switchTable._sendingIn(intf.peer.port)
+	})
 }
 
 // we just sent something, so cancel any pending timer to send keep-alive traffic
@@ -321,7 +312,6 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) {
 		if size > 0 && intf.stallTimer == nil {
 			intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled)
 		}
-		intf.sendSeqRecv++
 	})
 }
 
@@ -397,7 +387,15 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool
 		size += len(bs)
 	}
 	w.intf.notifySending(size, isLinkTraffic)
+	var once sync.Once
+	timer := time.AfterFunc(time.Millisecond, func() {
+		once.Do(func() {
+			w.intf.Act(nil, w.intf._notifySyscall)
+		})
+	})
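
The timer-plus-sync.Once construction in sendFrom is worth isolating: exactly one of the stuck-notification and the cancellation ever runs, so a write that returns just as the timer fires cannot produce a stale notification. A sketch under hypothetical names (watchWrite and the durations are not from the codebase):

package main

import (
	"fmt"
	"sync"
	"time"
)

// watchWrite runs write() and calls onStuck if it hasn't returned within d.
// The sync.Once guarantees that onStuck and the cancel path can't both win,
// mirroring the sendFrom change in the patch.
func watchWrite(d time.Duration, write, onStuck func()) {
	var once sync.Once
	timer := time.AfterFunc(d, func() {
		once.Do(onStuck)
	})
	write()
	// Either stop the timer before it runs, or lose the race and do nothing.
	once.Do(func() { timer.Stop() })
}

func main() {
	watchWrite(time.Millisecond,
		func() { time.Sleep(5 * time.Millisecond) }, // stand-in for a stuck writeMsgs
		func() { fmt.Println("write looks blocked; notify the switch") },
	)
	time.Sleep(10 * time.Millisecond) // demo only: let the AfterFunc goroutine print
}

Note that if the timer's callback is already executing, once.Do on the cancel path blocks until it finishes; that is the "or wait until it's done" in the patch comment.
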
 	w.intf.msgIO.writeMsgs(bss)
+	// Make sure we either stop the timer from doing anything or wait until it's done
+	once.Do(func() { timer.Stop() })
 	w.intf.notifySent(size, isLinkTraffic)
 	// Cleanup
 	for _, bs := range bss {

From ac58c3586eacda562b3e71477a0f8353de460be5 Mon Sep 17 00:00:00 2001
From: Arceliar
Date: Wed, 25 Sep 2019 17:53:25 -0500
Subject: [PATCH 4/4] cleanup/comments

---
 src/yggdrasil/link.go   | 10 ++++++++--
 src/yggdrasil/switch.go | 10 +---------
 2 files changed, 9 insertions(+), 11 deletions(-)

diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go
index 875fde5e..98c080c7 100644
--- a/src/yggdrasil/link.go
+++ b/src/yggdrasil/link.go
@@ -291,9 +291,11 @@ func (intf *linkInterface) _cancelStallTimer() {
 	}
 }
 
-// called by an AfterFunc if we appear to have timed out
+// This gets called from a time.AfterFunc, and notifies the switch that we appear
+// to have gotten blocked on a write, so the switch should start routing traffic
+// through other links, if alternatives exist
 func (intf *linkInterface) notifyBlockedSend() {
-	intf.Act(nil, func() { // Sent from a time.AfterFunc
+	intf.Act(nil, func() {
 		if intf.sendTimer != nil {
 			//As far as we know, we're still trying to send, and the timer fired.
 			intf.link.core.switchTable.blockPeer(intf.peer.port)
@@ -387,8 +389,12 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool
 		size += len(bs)
 	}
 	w.intf.notifySending(size, isLinkTraffic)
+	// start a timer that will fire if we get stuck in writeMsgs for an oddly long time
 	var once sync.Once
 	timer := time.AfterFunc(time.Millisecond, func() {
+		// 1 ms is kind of arbitrary
+		// the rationale is that this should be very long compared to a syscall
+		// but it's still short compared to end-to-end latency or human perception
 		once.Do(func() {
 			w.intf.Act(nil, w.intf._notifySyscall)
 		})
diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go
index 0150e173..ba30758c 100644
--- a/src/yggdrasil/switch.go
+++ b/src/yggdrasil/switch.go
@@ -721,15 +721,6 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}, sen
 			ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet})
 			return true
 		}
-		//delete(idle, best.elem.port)
-		// Tell ourselves to send to this node later
-		// If another (e.g. even better) hop becomes idle in the meantime, it'll take the packet instead
-		// FIXME this is just a hack, but seems to help with stability...
-		//go t.Act(nil, func() {
-		//	t._idleIn(best.elem.port)
-		//})
-		//ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet})
-		//return true
 	}
 	// Didn't find anyone idle to send it to
 	return false
@@ -799,6 +790,7 @@ func (b *switch_buffers) _cleanup(t *switchTable) {
 // Loops over packets and sends the newest one that's OK for this peer to send
 // Returns true if the peer is no longer idle, false if it should be added to the idle list
 func (t *switchTable) _handleIdle(port switchPort) bool {
+	// TODO? only send packets for which this is the best next hop that isn't currently blocked sending
 	to := t.core.peers.getPorts()[port]
 	if to == nil {
 		return true
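
Taken together, the four patches leave the switch with a small per-port state machine, compact enough to sketch in full: a port is marked sending only while a send looks blocked and it isn't known idle, and an idle notification clears the flag again. A toy version with switchPort reduced to int and the queue logic omitted:

package main

import "fmt"

// table is a toy switchTable: just the idle/sending sets, assumed to be
// touched only from a single actor goroutine, as in the real code.
type table struct {
	idle    map[int]struct{}
	sending map[int]struct{}
}

// _sendingIn marks a port as blocked in a send, unless it's known idle.
func (t *table) _sendingIn(port int) {
	if _, isIdle := t.idle[port]; !isIdle {
		t.sending[port] = struct{}{}
	}
}

// _idleIn clears the blocked flag and (absent queued work) marks the port idle;
// the real code first tries _handleIdle(port) to drain queued packets.
func (t *table) _idleIn(port int) {
	delete(t.sending, port)
	t.idle[port] = struct{}{}
}

func main() {
	t := &table{idle: map[int]struct{}{}, sending: map[int]struct{}{}}
	t._sendingIn(1) // the write watchdog fired: port 1 looks blocked
	fmt.Println("sending:", t.sending)
	t._idleIn(1) // the send finished and nothing was queued
	fmt.Println("idle:", t.idle, "sending:", t.sending)
}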