diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 9776ee50..067c2ecc 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -21,11 +21,11 @@ import ( ) type links struct { - core *Core - mutex sync.RWMutex // protects interfaces below - interfaces map[linkInfo]*linkInterface - tcp tcp // TCP interface support - stopped chan struct{} + core *Core + mutex sync.RWMutex // protects links below + links map[linkInfo]*link + tcp tcp // TCP interface support + stopped chan struct{} // TODO timeout (to remove from switch), read from config.ReadTimeout } @@ -46,7 +46,7 @@ type linkMsgIO interface { _recvMetaBytes() ([]byte, error) } -type linkInterface struct { +type link struct { lname string links *links peer *peer @@ -55,8 +55,8 @@ type linkInterface struct { incoming bool force bool closed chan struct{} - reader linkReader // Reads packets, notifies this linkInterface, passes packets to switch - writer linkWriter // Writes packets, notifies this linkInterface + reader linkReader // Reads packets, notifies this link, passes packets to switch + writer linkWriter // Writes packets, notifies this link phony.Inbox // Protects the below sendTimer *time.Timer // Fires to signal that sending is blocked keepAliveTimer *time.Timer // Fires to send keep-alive traffic @@ -69,7 +69,7 @@ type linkInterface struct { func (l *links) init(c *Core) error { l.core = c l.mutex.Lock() - l.interfaces = make(map[linkInfo]*linkInterface) + l.links = make(map[linkInfo]*link) l.mutex.Unlock() l.stopped = make(chan struct{}) @@ -121,9 +121,9 @@ func (l *links) listen(uri string) error { } } -func (l *links) create(msgIO linkMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) { +func (l *links) create(msgIO linkMsgIO, name, linkType, local, remote string, incoming, force bool) (*link, error) { // Technically anything unique would work for names, but let's pick something human readable, just for debugging - intf := linkInterface{ + intf := link{ lname: name, links: l, msgIO: msgIO, @@ -150,7 +150,7 @@ func (l *links) stop() error { return nil } -func (intf *linkInterface) handler() error { +func (intf *link) handler() error { // TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later go func() { for bss := range intf.writer.worker { @@ -201,7 +201,7 @@ func (intf *linkInterface) handler() error { intf.info.box = meta.box intf.info.sig = meta.sig intf.links.mutex.Lock() - if oldIntf, isIn := intf.links.interfaces[intf.info]; isIn { + if oldIntf, isIn := intf.links.links[intf.info]; isIn { intf.links.mutex.Unlock() // FIXME we should really return an error and let the caller block instead // That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc. @@ -214,10 +214,10 @@ func (intf *linkInterface) handler() error { return nil } else { intf.closed = make(chan struct{}) - intf.links.interfaces[intf.info] = intf + intf.links.links[intf.info] = intf defer func() { intf.links.mutex.Lock() - delete(intf.links.interfaces, intf.info) + delete(intf.links.links, intf.info) intf.links.mutex.Unlock() close(intf.closed) }() @@ -271,9 +271,21 @@ func (intf *linkInterface) handler() error { //////////////////////////////////////////////////////////////////////////////// -// linkInterface needs to match the peerInterface type needed by the peers +// link needs to match the linkInterface type needed by the peers -func (intf *linkInterface) out(bss [][]byte) { +type linkInterface interface { + out([][]byte) + linkOut([]byte) + notifyQueued(uint64) + close() + // These next ones are only used by the API + name() string + local() string + remote() string + interfaceType() string +} + +func (intf *link) out(bss [][]byte) { intf.Act(nil, func() { // nil to prevent it from blocking if the link is somehow frozen // this is safe because another packet won't be sent until the link notifies @@ -282,7 +294,7 @@ func (intf *linkInterface) out(bss [][]byte) { }) } -func (intf *linkInterface) linkOut(bs []byte) { +func (intf *link) linkOut(bs []byte) { intf.Act(nil, func() { // nil to prevent it from blocking if the link is somehow frozen // FIXME this is hypothetically not safe, the peer shouldn't be sending @@ -293,7 +305,7 @@ func (intf *linkInterface) linkOut(bs []byte) { }) } -func (intf *linkInterface) notifyQueued(seq uint64) { +func (intf *link) notifyQueued(seq uint64) { // This is the part where we want non-nil 'from' fields intf.Act(intf.peer, func() { if intf.isSending { @@ -302,23 +314,23 @@ func (intf *linkInterface) notifyQueued(seq uint64) { }) } -func (intf *linkInterface) close() { +func (intf *link) close() { intf.Act(nil, func() { intf.msgIO.close() }) } -func (intf *linkInterface) name() string { +func (intf *link) name() string { return intf.lname } -func (intf *linkInterface) local() string { +func (intf *link) local() string { return intf.info.local } -func (intf *linkInterface) remote() string { +func (intf *link) remote() string { return intf.info.remote } -func (intf *linkInterface) interfaceType() string { +func (intf *link) interfaceType() string { return intf.info.linkType } @@ -331,7 +343,7 @@ const ( ) // notify the intf that we're currently sending -func (intf *linkInterface) notifySending(size int) { +func (intf *link) notifySending(size int) { intf.Act(&intf.writer, func() { intf.isSending = true intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend) @@ -340,7 +352,7 @@ func (intf *linkInterface) notifySending(size int) { } // we just sent something, so cancel any pending timer to send keep-alive traffic -func (intf *linkInterface) _cancelStallTimer() { +func (intf *link) _cancelStallTimer() { if intf.stallTimer != nil { intf.stallTimer.Stop() intf.stallTimer = nil @@ -350,7 +362,7 @@ func (intf *linkInterface) _cancelStallTimer() { // This gets called from a time.AfterFunc, and notifies the switch that we appear // to have gotten blocked on a write, so the switch should start routing traffic // through other links, if alternatives exist -func (intf *linkInterface) notifyBlockedSend() { +func (intf *link) notifyBlockedSend() { intf.Act(nil, func() { if intf.sendTimer != nil && !intf.blocked { //As far as we know, we're still trying to send, and the timer fired. @@ -361,7 +373,7 @@ func (intf *linkInterface) notifyBlockedSend() { } // notify the intf that we've finished sending, returning the peer to the switch -func (intf *linkInterface) notifySent(size int) { +func (intf *link) notifySent(size int) { intf.Act(&intf.writer, func() { if intf.sendTimer != nil { intf.sendTimer.Stop() @@ -381,12 +393,12 @@ func (intf *linkInterface) notifySent(size int) { } // Notify the peer that we're ready for more traffic -func (intf *linkInterface) _notifyIdle() { +func (intf *link) _notifyIdle() { intf.peer.Act(intf, intf.peer._handleIdle) } // Set the peer as stalled, to prevent them from returning to the switch until a read succeeds -func (intf *linkInterface) notifyStalled() { +func (intf *link) notifyStalled() { intf.Act(nil, func() { // Sent from a time.AfterFunc if intf.stallTimer != nil && !intf.blocked { intf.stallTimer.Stop() @@ -398,7 +410,7 @@ func (intf *linkInterface) notifyStalled() { } // reset the close timer -func (intf *linkInterface) notifyReading() { +func (intf *link) notifyReading() { intf.Act(&intf.reader, func() { if intf.closeTimer != nil { intf.closeTimer.Stop() @@ -408,7 +420,7 @@ func (intf *linkInterface) notifyReading() { } // wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic -func (intf *linkInterface) notifyRead(size int) { +func (intf *link) notifyRead(size int) { intf.Act(&intf.reader, func() { if intf.stallTimer != nil { intf.stallTimer.Stop() @@ -425,7 +437,7 @@ func (intf *linkInterface) notifyRead(size int) { } // We need to send keep-alive traffic now -func (intf *linkInterface) notifyDoKeepAlive() { +func (intf *link) notifyDoKeepAlive() { intf.Act(nil, func() { // Sent from a time.AfterFunc if intf.stallTimer != nil { intf.stallTimer.Stop() @@ -439,7 +451,7 @@ func (intf *linkInterface) notifyDoKeepAlive() { type linkWriter struct { phony.Inbox - intf *linkInterface + intf *link worker chan [][]byte closed bool } @@ -463,7 +475,7 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte) { type linkReader struct { phony.Inbox - intf *linkInterface + intf *link err chan error } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 3cfc0b4f..4463bc6d 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -77,23 +77,11 @@ func (ps *peers) getAllowedEncryptionPublicKeys() []string { return ps.core.config.Current.AllowedEncryptionPublicKeys } -type peerInterface interface { - out([][]byte) - linkOut([]byte) - notifyQueued(uint64) - close() - // These next ones are only used by the API - name() string - local() string - remote() string - interfaceType() string -} - // Information known about a peer, including their box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic type peer struct { phony.Inbox core *Core - intf peerInterface + intf linkInterface port switchPort box crypto.BoxPubKey sig crypto.SigPubKey @@ -134,7 +122,7 @@ func (ps *peers) _updatePeers() { } // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. -func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf peerInterface) *peer { +func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf linkInterface) *peer { now := time.Now() p := peer{box: *box, core: ps.core, diff --git a/src/yggdrasil/simlink.go b/src/yggdrasil/simlink.go index 6c04a8c0..6675981a 100644 --- a/src/yggdrasil/simlink.go +++ b/src/yggdrasil/simlink.go @@ -9,7 +9,7 @@ type Simlink struct { phony.Inbox rch chan []byte dest *Simlink - link *linkInterface + link *link started bool }