more cleanup

This commit is contained in:
Arceliar 2020-05-23 10:28:57 -05:00
parent ef1e506a0c
commit 59896f17fd
3 changed files with 50 additions and 50 deletions

View File

@ -21,11 +21,11 @@ import (
) )
type links struct { type links struct {
core *Core core *Core
mutex sync.RWMutex // protects interfaces below mutex sync.RWMutex // protects links below
interfaces map[linkInfo]*linkInterface links map[linkInfo]*link
tcp tcp // TCP interface support tcp tcp // TCP interface support
stopped chan struct{} stopped chan struct{}
// TODO timeout (to remove from switch), read from config.ReadTimeout // TODO timeout (to remove from switch), read from config.ReadTimeout
} }
@ -46,7 +46,7 @@ type linkMsgIO interface {
_recvMetaBytes() ([]byte, error) _recvMetaBytes() ([]byte, error)
} }
type linkInterface struct { type link struct {
lname string lname string
links *links links *links
peer *peer peer *peer
@ -55,8 +55,8 @@ type linkInterface struct {
incoming bool incoming bool
force bool force bool
closed chan struct{} closed chan struct{}
reader linkReader // Reads packets, notifies this linkInterface, passes packets to switch reader linkReader // Reads packets, notifies this link, passes packets to switch
writer linkWriter // Writes packets, notifies this linkInterface writer linkWriter // Writes packets, notifies this link
phony.Inbox // Protects the below phony.Inbox // Protects the below
sendTimer *time.Timer // Fires to signal that sending is blocked sendTimer *time.Timer // Fires to signal that sending is blocked
keepAliveTimer *time.Timer // Fires to send keep-alive traffic keepAliveTimer *time.Timer // Fires to send keep-alive traffic
@ -69,7 +69,7 @@ type linkInterface struct {
func (l *links) init(c *Core) error { func (l *links) init(c *Core) error {
l.core = c l.core = c
l.mutex.Lock() l.mutex.Lock()
l.interfaces = make(map[linkInfo]*linkInterface) l.links = make(map[linkInfo]*link)
l.mutex.Unlock() l.mutex.Unlock()
l.stopped = make(chan struct{}) l.stopped = make(chan struct{})
@ -121,9 +121,9 @@ func (l *links) listen(uri string) error {
} }
} }
func (l *links) create(msgIO linkMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) { func (l *links) create(msgIO linkMsgIO, name, linkType, local, remote string, incoming, force bool) (*link, error) {
// Technically anything unique would work for names, but let's pick something human readable, just for debugging // Technically anything unique would work for names, but let's pick something human readable, just for debugging
intf := linkInterface{ intf := link{
lname: name, lname: name,
links: l, links: l,
msgIO: msgIO, msgIO: msgIO,
@ -150,7 +150,7 @@ func (l *links) stop() error {
return nil return nil
} }
func (intf *linkInterface) handler() error { func (intf *link) handler() error {
// TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later // TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later
go func() { go func() {
for bss := range intf.writer.worker { for bss := range intf.writer.worker {
@ -201,7 +201,7 @@ func (intf *linkInterface) handler() error {
intf.info.box = meta.box intf.info.box = meta.box
intf.info.sig = meta.sig intf.info.sig = meta.sig
intf.links.mutex.Lock() intf.links.mutex.Lock()
if oldIntf, isIn := intf.links.interfaces[intf.info]; isIn { if oldIntf, isIn := intf.links.links[intf.info]; isIn {
intf.links.mutex.Unlock() intf.links.mutex.Unlock()
// FIXME we should really return an error and let the caller block instead // FIXME we should really return an error and let the caller block instead
// That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc. // That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc.
@ -214,10 +214,10 @@ func (intf *linkInterface) handler() error {
return nil return nil
} else { } else {
intf.closed = make(chan struct{}) intf.closed = make(chan struct{})
intf.links.interfaces[intf.info] = intf intf.links.links[intf.info] = intf
defer func() { defer func() {
intf.links.mutex.Lock() intf.links.mutex.Lock()
delete(intf.links.interfaces, intf.info) delete(intf.links.links, intf.info)
intf.links.mutex.Unlock() intf.links.mutex.Unlock()
close(intf.closed) close(intf.closed)
}() }()
@ -271,9 +271,21 @@ func (intf *linkInterface) handler() error {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// linkInterface needs to match the peerInterface type needed by the peers // link needs to match the linkInterface type needed by the peers
func (intf *linkInterface) out(bss [][]byte) { type linkInterface interface {
out([][]byte)
linkOut([]byte)
notifyQueued(uint64)
close()
// These next ones are only used by the API
name() string
local() string
remote() string
interfaceType() string
}
func (intf *link) out(bss [][]byte) {
intf.Act(nil, func() { intf.Act(nil, func() {
// nil to prevent it from blocking if the link is somehow frozen // nil to prevent it from blocking if the link is somehow frozen
// this is safe because another packet won't be sent until the link notifies // this is safe because another packet won't be sent until the link notifies
@ -282,7 +294,7 @@ func (intf *linkInterface) out(bss [][]byte) {
}) })
} }
func (intf *linkInterface) linkOut(bs []byte) { func (intf *link) linkOut(bs []byte) {
intf.Act(nil, func() { intf.Act(nil, func() {
// nil to prevent it from blocking if the link is somehow frozen // nil to prevent it from blocking if the link is somehow frozen
// FIXME this is hypothetically not safe, the peer shouldn't be sending // FIXME this is hypothetically not safe, the peer shouldn't be sending
@ -293,7 +305,7 @@ func (intf *linkInterface) linkOut(bs []byte) {
}) })
} }
func (intf *linkInterface) notifyQueued(seq uint64) { func (intf *link) notifyQueued(seq uint64) {
// This is the part where we want non-nil 'from' fields // This is the part where we want non-nil 'from' fields
intf.Act(intf.peer, func() { intf.Act(intf.peer, func() {
if intf.isSending { if intf.isSending {
@ -302,23 +314,23 @@ func (intf *linkInterface) notifyQueued(seq uint64) {
}) })
} }
func (intf *linkInterface) close() { func (intf *link) close() {
intf.Act(nil, func() { intf.msgIO.close() }) intf.Act(nil, func() { intf.msgIO.close() })
} }
func (intf *linkInterface) name() string { func (intf *link) name() string {
return intf.lname return intf.lname
} }
func (intf *linkInterface) local() string { func (intf *link) local() string {
return intf.info.local return intf.info.local
} }
func (intf *linkInterface) remote() string { func (intf *link) remote() string {
return intf.info.remote return intf.info.remote
} }
func (intf *linkInterface) interfaceType() string { func (intf *link) interfaceType() string {
return intf.info.linkType return intf.info.linkType
} }
@ -331,7 +343,7 @@ const (
) )
// notify the intf that we're currently sending // notify the intf that we're currently sending
func (intf *linkInterface) notifySending(size int) { func (intf *link) notifySending(size int) {
intf.Act(&intf.writer, func() { intf.Act(&intf.writer, func() {
intf.isSending = true intf.isSending = true
intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend) intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend)
@ -340,7 +352,7 @@ func (intf *linkInterface) notifySending(size int) {
} }
// we just sent something, so cancel any pending timer to send keep-alive traffic // we just sent something, so cancel any pending timer to send keep-alive traffic
func (intf *linkInterface) _cancelStallTimer() { func (intf *link) _cancelStallTimer() {
if intf.stallTimer != nil { if intf.stallTimer != nil {
intf.stallTimer.Stop() intf.stallTimer.Stop()
intf.stallTimer = nil intf.stallTimer = nil
@ -350,7 +362,7 @@ func (intf *linkInterface) _cancelStallTimer() {
// This gets called from a time.AfterFunc, and notifies the switch that we appear // This gets called from a time.AfterFunc, and notifies the switch that we appear
// to have gotten blocked on a write, so the switch should start routing traffic // to have gotten blocked on a write, so the switch should start routing traffic
// through other links, if alternatives exist // through other links, if alternatives exist
func (intf *linkInterface) notifyBlockedSend() { func (intf *link) notifyBlockedSend() {
intf.Act(nil, func() { intf.Act(nil, func() {
if intf.sendTimer != nil && !intf.blocked { if intf.sendTimer != nil && !intf.blocked {
//As far as we know, we're still trying to send, and the timer fired. //As far as we know, we're still trying to send, and the timer fired.
@ -361,7 +373,7 @@ func (intf *linkInterface) notifyBlockedSend() {
} }
// notify the intf that we've finished sending, returning the peer to the switch // notify the intf that we've finished sending, returning the peer to the switch
func (intf *linkInterface) notifySent(size int) { func (intf *link) notifySent(size int) {
intf.Act(&intf.writer, func() { intf.Act(&intf.writer, func() {
if intf.sendTimer != nil { if intf.sendTimer != nil {
intf.sendTimer.Stop() intf.sendTimer.Stop()
@ -381,12 +393,12 @@ func (intf *linkInterface) notifySent(size int) {
} }
// Notify the peer that we're ready for more traffic // Notify the peer that we're ready for more traffic
func (intf *linkInterface) _notifyIdle() { func (intf *link) _notifyIdle() {
intf.peer.Act(intf, intf.peer._handleIdle) intf.peer.Act(intf, intf.peer._handleIdle)
} }
// Set the peer as stalled, to prevent them from returning to the switch until a read succeeds // Set the peer as stalled, to prevent them from returning to the switch until a read succeeds
func (intf *linkInterface) notifyStalled() { func (intf *link) notifyStalled() {
intf.Act(nil, func() { // Sent from a time.AfterFunc intf.Act(nil, func() { // Sent from a time.AfterFunc
if intf.stallTimer != nil && !intf.blocked { if intf.stallTimer != nil && !intf.blocked {
intf.stallTimer.Stop() intf.stallTimer.Stop()
@ -398,7 +410,7 @@ func (intf *linkInterface) notifyStalled() {
} }
// reset the close timer // reset the close timer
func (intf *linkInterface) notifyReading() { func (intf *link) notifyReading() {
intf.Act(&intf.reader, func() { intf.Act(&intf.reader, func() {
if intf.closeTimer != nil { if intf.closeTimer != nil {
intf.closeTimer.Stop() intf.closeTimer.Stop()
@ -408,7 +420,7 @@ func (intf *linkInterface) notifyReading() {
} }
// wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic // wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic
func (intf *linkInterface) notifyRead(size int) { func (intf *link) notifyRead(size int) {
intf.Act(&intf.reader, func() { intf.Act(&intf.reader, func() {
if intf.stallTimer != nil { if intf.stallTimer != nil {
intf.stallTimer.Stop() intf.stallTimer.Stop()
@ -425,7 +437,7 @@ func (intf *linkInterface) notifyRead(size int) {
} }
// We need to send keep-alive traffic now // We need to send keep-alive traffic now
func (intf *linkInterface) notifyDoKeepAlive() { func (intf *link) notifyDoKeepAlive() {
intf.Act(nil, func() { // Sent from a time.AfterFunc intf.Act(nil, func() { // Sent from a time.AfterFunc
if intf.stallTimer != nil { if intf.stallTimer != nil {
intf.stallTimer.Stop() intf.stallTimer.Stop()
@ -439,7 +451,7 @@ func (intf *linkInterface) notifyDoKeepAlive() {
type linkWriter struct { type linkWriter struct {
phony.Inbox phony.Inbox
intf *linkInterface intf *link
worker chan [][]byte worker chan [][]byte
closed bool closed bool
} }
@ -463,7 +475,7 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte) {
type linkReader struct { type linkReader struct {
phony.Inbox phony.Inbox
intf *linkInterface intf *link
err chan error err chan error
} }

View File

@ -77,23 +77,11 @@ func (ps *peers) getAllowedEncryptionPublicKeys() []string {
return ps.core.config.Current.AllowedEncryptionPublicKeys return ps.core.config.Current.AllowedEncryptionPublicKeys
} }
type peerInterface interface {
out([][]byte)
linkOut([]byte)
notifyQueued(uint64)
close()
// These next ones are only used by the API
name() string
local() string
remote() string
interfaceType() string
}
// Information known about a peer, including their box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic // Information known about a peer, including their box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic
type peer struct { type peer struct {
phony.Inbox phony.Inbox
core *Core core *Core
intf peerInterface intf linkInterface
port switchPort port switchPort
box crypto.BoxPubKey box crypto.BoxPubKey
sig crypto.SigPubKey sig crypto.SigPubKey
@ -134,7 +122,7 @@ func (ps *peers) _updatePeers() {
} }
// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number.
func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf peerInterface) *peer { func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf linkInterface) *peer {
now := time.Now() now := time.Now()
p := peer{box: *box, p := peer{box: *box,
core: ps.core, core: ps.core,

View File

@ -9,7 +9,7 @@ type Simlink struct {
phony.Inbox phony.Inbox
rch chan []byte rch chan []byte
dest *Simlink dest *Simlink
link *linkInterface link *link
started bool started bool
} }