diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 1b48f391..5489c015 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -54,6 +54,15 @@ type linkInterface struct { incoming bool force bool closed chan struct{} + reader linkReader // Reads packets, notifies this linkInterface, passes packets to switch + writer linkWriter // Writes packets, notifies this linkInterface + phony.Inbox // Protects the below + sendTimer *time.Timer // Fires to signal that sending is blocked + stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen + recvTimer *time.Timer // Fires to send keep-alive traffic + closeTimer *time.Timer // Fires when the link has been idle so long we need to close it + inSwitch bool // True if the switch is tracking this link + stalled bool // True if we haven't been receiving any response traffic } func (l *link) init(c *Core) error { @@ -124,6 +133,9 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st incoming: incoming, force: force, } + intf.writer.intf = &intf + intf.reader.intf = &intf + intf.reader.err = make(chan error) return &intf, nil } @@ -187,7 +199,7 @@ func (intf *linkInterface) handler() error { intf.link.mutex.Lock() delete(intf.link.interfaces, intf.info) intf.link.mutex.Unlock() - close(intf.closed) + //close(intf.closed) }() intf.link.core.log.Debugln("DEBUG: registered interface for", intf.name) } @@ -203,13 +215,24 @@ func (intf *linkInterface) handler() error { intf.link.core.peers.removePeer(intf.peer.port) }() // Finish setting up the peer struct + /* out := make(chan [][]byte, 1) defer close(out) intf.peer.out = func(msgs [][]byte) { defer func() { recover() }() out <- msgs } + */ + intf.peer.out = func(msgs [][]byte) { + intf.writer.sendFrom(intf.peer, msgs, false) + } intf.peer.linkOut = make(chan []byte, 1) + go func() { + // TODO fix this + for bs := range intf.peer.linkOut { + intf.writer.sendFrom(intf.peer, [][]byte{bs}, true) + } + }() themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) @@ -218,6 +241,7 @@ func (intf *linkInterface) handler() error { // Start the link loop go intf.peer.start() // Start the writer + /* signalReady := make(chan struct{}, 1) signalSent := make(chan bool, 1) sendAck := make(chan struct{}, 1) @@ -413,6 +437,7 @@ func (intf *linkInterface) handler() error { // Now try to read again helper.RecvFrom(nil, helperFunc) } + // Start the read loop helper.RecvFrom(nil, helperFunc) <-done // Wait for the helper to exit @@ -427,5 +452,189 @@ func (intf *linkInterface) handler() error { intf.link.core.log.Infof("Disconnected %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) } + */ + // Start the reader + intf.reader.RecvFrom(nil, intf.reader._read) + // Wait for the reader to finish + err = <- intf.reader.err + if err != nil { + intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local, err) + } else { + intf.link.core.log.Infof("Disconnected %s: %s, source %s", + strings.ToUpper(intf.info.linkType), themString, intf.info.local) + } return err } + +//////////////////////////////////////////////////////////////////////////////// + + +/* + phony.Inbox // Protects the below + sendTimer *time.Timer // Fires to signal that sending is blocked + stallTimer *time.Time // Fires to signal that no incoming traffic (including keep-alive) has been seen + recvTimer *time.Timer // Fires to send keep-alive traffic + closeTimer *time.Timer // Fires when the link has been idle so long we need to close it + inSwitch bool // True if the switch is tracking this link + stalled bool // True if we haven't been receiving any response traffic +*/ + +const ( + sendBlockedTime = time.Second // How long to wait before deciding a send is blocked + keepAliveTime = 2*time.Second // How long to wait before sending a keep-alive response if we have no real traffic to send + stallTime = 6*time.Second // How long to wait for response traffic before deciding the connection has stalled + closeTime = 2*switch_timeout // How long to wait before closing the link +) + +// notify the intf that we're currently sending +func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) { + intf.RecvFrom(nil, func() { + if !isLinkTraffic && size > 0 { + intf.inSwitch = false + } + intf.sendTimer = time.AfterFunc(sendBlockedTime, intf.notifyBlockedSend) + intf._cancelRecvTimer() + }) +} + +// we just sent something, so cancel any pending timer to send keep-alive traffic +func (intf *linkInterface) _cancelRecvTimer() { + intf.RecvFrom(nil, func() { + if intf.recvTimer != nil { + intf.recvTimer.Stop() + intf.recvTimer = nil + } + }) +} + +// called by an AfterFunc if we appear to have timed out +func (intf *linkInterface) notifyBlockedSend() { + intf.RecvFrom(nil, func() { + if intf.sendTimer != nil { + //As far as we know, we're still trying to send, and the timer fired. + intf.link.core.switchTable.blockPeer(intf.peer.port) + } + }) +} + +// notify the intf that we've finished sending, returning the peer to the switch +func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { + intf.RecvFrom(nil, func() { + intf.sendTimer.Stop() + intf.sendTimer = nil + if !isLinkTraffic { + intf._notifySwitch() + } + if size > 0 && intf.stallTimer == nil { + intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled) + } + }) +} + +// Notify the switch that we're ready for more traffic, assuming we're not in a stalled state +func (intf *linkInterface) _notifySwitch() { + if !intf.inSwitch && !intf.stalled { + intf.inSwitch = true + intf.link.core.switchTable.RecvFrom(intf, func() { + intf.link.core.switchTable._idleIn(intf.peer.port) + }) + } +} + +// Set the peer as stalled, to prevent them from returning to the switch until a read succeeds +func (intf *linkInterface) notifyStalled() { + intf.RecvFrom(nil, func() { + if intf.stallTimer != nil { + intf.stallTimer = nil + intf.stalled = true + intf.link.core.switchTable.blockPeer(intf.peer.port) + } + }) +} + + +// reset the close timer +func (intf *linkInterface) notifyReading(from phony.Actor) { + intf.RecvFrom(from, func() { + if intf.closeTimer != nil { + intf.closeTimer.Stop() + } + intf.closeTimer = time.AfterFunc(closeTime, func() { intf.msgIO.close() }) + }) +} + +// wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic +func (intf *linkInterface) notifyReadFrom(from phony.Actor, size int) { + intf.RecvFrom(from, func() { + intf.link.core.log.Printf("DEBUG notifyReadFrom: inSwitch %v, stalled %v\n", intf.inSwitch, intf.stalled) + if intf.stallTimer != nil { + intf.stallTimer.Stop() + intf.stallTimer = nil + } + intf.stalled = false + intf._notifySwitch() + if size > 0 && intf.recvTimer == nil { + intf.recvTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive) + } + }) +} + +// We need to send keep-alive traffic now +func (intf *linkInterface) notifyDoKeepAlive() { + intf.RecvFrom(nil, func() { + if intf.recvTimer != nil { + intf.recvTimer.Stop() + intf.recvTimer = nil + intf.writer.sendFrom(nil, [][]byte{nil}, true) // Empty keep-alive traffic + } + }) +} + +//////////////////////////////////////////////////////////////////////////////// + +type linkWriter struct { + phony.Inbox + intf *linkInterface +} + +func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) { + w.RecvFrom(from, func() { + var size int + for _, bs := range bss { + size += len(bs) + } + w.intf.notifySending(size, isLinkTraffic) + w.intf.msgIO.writeMsgs(bss) + w.intf.notifySent(size, isLinkTraffic) + w.intf.link.core.log.Println("DEBUG: wrote something, size:", size, "isLinkTraffic:", isLinkTraffic) + }) +} + +//////////////////////////////////////////////////////////////////////////////// + +type linkReader struct { + phony.Inbox + intf *linkInterface + err chan error +} + +func (r *linkReader) _read() { + r.intf.notifyReading(r) + msg, err := r.intf.msgIO.readMsg() + r.intf.link.core.log.Println("DEBUG read something") + r.intf.notifyReadFrom(r, len(msg)) + if len(msg) > 0 { + r.intf.peer.handlePacketFrom(r, msg) + } + if err != nil { + if err != io.EOF { + r.err<-err + } + close(r.err) + return + } + // Now try to read again + r.RecvFrom(nil, r._read) +} +