ugly work-in-progress to migrate link to the actor model

This commit is contained in:
Arceliar 2019-08-25 22:19:20 -05:00
parent dffd70119d
commit b5b179904b

View File

@ -54,6 +54,15 @@ type linkInterface struct {
incoming bool incoming bool
force bool force bool
closed chan struct{} closed chan struct{}
reader linkReader // Reads packets, notifies this linkInterface, passes packets to switch
writer linkWriter // Writes packets, notifies this linkInterface
phony.Inbox // Protects the below
sendTimer *time.Timer // Fires to signal that sending is blocked
stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen
recvTimer *time.Timer // Fires to send keep-alive traffic
closeTimer *time.Timer // Fires when the link has been idle so long we need to close it
inSwitch bool // True if the switch is tracking this link
stalled bool // True if we haven't been receiving any response traffic
} }
func (l *link) init(c *Core) error { func (l *link) init(c *Core) error {
@ -124,6 +133,9 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st
incoming: incoming, incoming: incoming,
force: force, force: force,
} }
intf.writer.intf = &intf
intf.reader.intf = &intf
intf.reader.err = make(chan error)
return &intf, nil return &intf, nil
} }
@ -187,7 +199,7 @@ func (intf *linkInterface) handler() error {
intf.link.mutex.Lock() intf.link.mutex.Lock()
delete(intf.link.interfaces, intf.info) delete(intf.link.interfaces, intf.info)
intf.link.mutex.Unlock() intf.link.mutex.Unlock()
close(intf.closed) //close(intf.closed)
}() }()
intf.link.core.log.Debugln("DEBUG: registered interface for", intf.name) intf.link.core.log.Debugln("DEBUG: registered interface for", intf.name)
} }
@ -203,13 +215,24 @@ func (intf *linkInterface) handler() error {
intf.link.core.peers.removePeer(intf.peer.port) intf.link.core.peers.removePeer(intf.peer.port)
}() }()
// Finish setting up the peer struct // Finish setting up the peer struct
/*
out := make(chan [][]byte, 1) out := make(chan [][]byte, 1)
defer close(out) defer close(out)
intf.peer.out = func(msgs [][]byte) { intf.peer.out = func(msgs [][]byte) {
defer func() { recover() }() defer func() { recover() }()
out <- msgs out <- msgs
} }
*/
intf.peer.out = func(msgs [][]byte) {
intf.writer.sendFrom(intf.peer, msgs, false)
}
intf.peer.linkOut = make(chan []byte, 1) intf.peer.linkOut = make(chan []byte, 1)
go func() {
// TODO fix this
for bs := range intf.peer.linkOut {
intf.writer.sendFrom(intf.peer, [][]byte{bs}, true)
}
}()
themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box))
themAddrString := net.IP(themAddr[:]).String() themAddrString := net.IP(themAddr[:]).String()
themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote)
@ -218,6 +241,7 @@ func (intf *linkInterface) handler() error {
// Start the link loop // Start the link loop
go intf.peer.start() go intf.peer.start()
// Start the writer // Start the writer
/*
signalReady := make(chan struct{}, 1) signalReady := make(chan struct{}, 1)
signalSent := make(chan bool, 1) signalSent := make(chan bool, 1)
sendAck := make(chan struct{}, 1) sendAck := make(chan struct{}, 1)
@ -413,6 +437,7 @@ func (intf *linkInterface) handler() error {
// Now try to read again // Now try to read again
helper.RecvFrom(nil, helperFunc) helper.RecvFrom(nil, helperFunc)
} }
// Start the read loop // Start the read loop
helper.RecvFrom(nil, helperFunc) helper.RecvFrom(nil, helperFunc)
<-done // Wait for the helper to exit <-done // Wait for the helper to exit
@ -427,5 +452,189 @@ func (intf *linkInterface) handler() error {
intf.link.core.log.Infof("Disconnected %s: %s, source %s", intf.link.core.log.Infof("Disconnected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local) strings.ToUpper(intf.info.linkType), themString, intf.info.local)
} }
*/
// Start the reader
intf.reader.RecvFrom(nil, intf.reader._read)
// Wait for the reader to finish
err = <- intf.reader.err
if err != nil {
intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local, err)
} else {
intf.link.core.log.Infof("Disconnected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
}
return err return err
} }
////////////////////////////////////////////////////////////////////////////////
/*
phony.Inbox // Protects the below
sendTimer *time.Timer // Fires to signal that sending is blocked
stallTimer *time.Time // Fires to signal that no incoming traffic (including keep-alive) has been seen
recvTimer *time.Timer // Fires to send keep-alive traffic
closeTimer *time.Timer // Fires when the link has been idle so long we need to close it
inSwitch bool // True if the switch is tracking this link
stalled bool // True if we haven't been receiving any response traffic
*/
const (
sendBlockedTime = time.Second // How long to wait before deciding a send is blocked
keepAliveTime = 2*time.Second // How long to wait before sending a keep-alive response if we have no real traffic to send
stallTime = 6*time.Second // How long to wait for response traffic before deciding the connection has stalled
closeTime = 2*switch_timeout // How long to wait before closing the link
)
// notify the intf that we're currently sending
func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) {
intf.RecvFrom(nil, func() {
if !isLinkTraffic && size > 0 {
intf.inSwitch = false
}
intf.sendTimer = time.AfterFunc(sendBlockedTime, intf.notifyBlockedSend)
intf._cancelRecvTimer()
})
}
// we just sent something, so cancel any pending timer to send keep-alive traffic
func (intf *linkInterface) _cancelRecvTimer() {
intf.RecvFrom(nil, func() {
if intf.recvTimer != nil {
intf.recvTimer.Stop()
intf.recvTimer = nil
}
})
}
// called by an AfterFunc if we appear to have timed out
func (intf *linkInterface) notifyBlockedSend() {
intf.RecvFrom(nil, func() {
if intf.sendTimer != nil {
//As far as we know, we're still trying to send, and the timer fired.
intf.link.core.switchTable.blockPeer(intf.peer.port)
}
})
}
// notify the intf that we've finished sending, returning the peer to the switch
func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) {
intf.RecvFrom(nil, func() {
intf.sendTimer.Stop()
intf.sendTimer = nil
if !isLinkTraffic {
intf._notifySwitch()
}
if size > 0 && intf.stallTimer == nil {
intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled)
}
})
}
// Notify the switch that we're ready for more traffic, assuming we're not in a stalled state
func (intf *linkInterface) _notifySwitch() {
if !intf.inSwitch && !intf.stalled {
intf.inSwitch = true
intf.link.core.switchTable.RecvFrom(intf, func() {
intf.link.core.switchTable._idleIn(intf.peer.port)
})
}
}
// Set the peer as stalled, to prevent them from returning to the switch until a read succeeds
func (intf *linkInterface) notifyStalled() {
intf.RecvFrom(nil, func() {
if intf.stallTimer != nil {
intf.stallTimer = nil
intf.stalled = true
intf.link.core.switchTable.blockPeer(intf.peer.port)
}
})
}
// reset the close timer
func (intf *linkInterface) notifyReading(from phony.Actor) {
intf.RecvFrom(from, func() {
if intf.closeTimer != nil {
intf.closeTimer.Stop()
}
intf.closeTimer = time.AfterFunc(closeTime, func() { intf.msgIO.close() })
})
}
// wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic
func (intf *linkInterface) notifyReadFrom(from phony.Actor, size int) {
intf.RecvFrom(from, func() {
intf.link.core.log.Printf("DEBUG notifyReadFrom: inSwitch %v, stalled %v\n", intf.inSwitch, intf.stalled)
if intf.stallTimer != nil {
intf.stallTimer.Stop()
intf.stallTimer = nil
}
intf.stalled = false
intf._notifySwitch()
if size > 0 && intf.recvTimer == nil {
intf.recvTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive)
}
})
}
// We need to send keep-alive traffic now
func (intf *linkInterface) notifyDoKeepAlive() {
intf.RecvFrom(nil, func() {
if intf.recvTimer != nil {
intf.recvTimer.Stop()
intf.recvTimer = nil
intf.writer.sendFrom(nil, [][]byte{nil}, true) // Empty keep-alive traffic
}
})
}
////////////////////////////////////////////////////////////////////////////////
type linkWriter struct {
phony.Inbox
intf *linkInterface
}
func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) {
w.RecvFrom(from, func() {
var size int
for _, bs := range bss {
size += len(bs)
}
w.intf.notifySending(size, isLinkTraffic)
w.intf.msgIO.writeMsgs(bss)
w.intf.notifySent(size, isLinkTraffic)
w.intf.link.core.log.Println("DEBUG: wrote something, size:", size, "isLinkTraffic:", isLinkTraffic)
})
}
////////////////////////////////////////////////////////////////////////////////
type linkReader struct {
phony.Inbox
intf *linkInterface
err chan error
}
func (r *linkReader) _read() {
r.intf.notifyReading(r)
msg, err := r.intf.msgIO.readMsg()
r.intf.link.core.log.Println("DEBUG read something")
r.intf.notifyReadFrom(r, len(msg))
if len(msg) > 0 {
r.intf.peer.handlePacketFrom(r, msg)
}
if err != nil {
if err != io.EOF {
r.err<-err
}
close(r.err)
return
}
// Now try to read again
r.RecvFrom(nil, r._read)
}