diff --git a/cmd/yggdrasilsim/dial.go b/cmd/yggdrasilsim/dial.go index 5713fdd7..c7892d40 100644 --- a/cmd/yggdrasilsim/dial.go +++ b/cmd/yggdrasilsim/dial.go @@ -9,6 +9,7 @@ import ( ) func doListen(recvNode *simNode) { + // TODO be able to stop the listeners somehow so they don't leak across different tests for { c, err := recvNode.listener.Accept() if err != nil { diff --git a/cmd/yggdrasilsim/main.go b/cmd/yggdrasilsim/main.go index fcbcfc97..25504c92 100644 --- a/cmd/yggdrasilsim/main.go +++ b/cmd/yggdrasilsim/main.go @@ -1,12 +1,5 @@ package main -import ( -//"github.com/yggdrasil-network/yggdrasil-go/src/address" -//"github.com/yggdrasil-network/yggdrasil-go/src/config" -//"github.com/yggdrasil-network/yggdrasil-go/src/crypto" -//"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" -) - func main() { store := makeStoreSquareGrid(4) dialStore(store) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 733b9ac1..7f6b9b56 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -217,9 +217,23 @@ func (intf *linkInterface) handler() error { intf.link.mutex.Unlock() // Create peer shared := crypto.GetSharedKey(myLinkPriv, &meta.link) + out := func(msgs [][]byte) { + // nil to prevent it from blocking if the link is somehow frozen + // this is safe because another packet won't be sent until the link notifies + // the peer that it's ready for one + intf.writer.sendFrom(nil, msgs, false) + } + linkOut := func(bs []byte) { + // nil to prevent it from blocking if the link is somehow frozen + // FIXME this is hypothetically not safe, the peer shouldn't be sending + // additional packets until this one finishes, otherwise this could leak + // memory if writing happens slower than link packets are generated... + // that seems unlikely, so it's a lesser evil than deadlocking for now + intf.writer.sendFrom(nil, [][]byte{bs}, true) + } phony.Block(&intf.link.core.peers, func() { // FIXME don't use phony.Block, it's bad practice, even if it's safe here - intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() }) + intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() }, out, linkOut) }) if intf.peer == nil { return errors.New("failed to create peer") @@ -228,20 +242,6 @@ func (intf *linkInterface) handler() error { // More cleanup can go here intf.peer.Act(nil, intf.peer._removeSelf) }() - intf.peer.out = func(msgs [][]byte) { - // nil to prevent it from blocking if the link is somehow frozen - // this is safe because another packet won't be sent until the link notifies - // the peer that it's ready for one - intf.writer.sendFrom(nil, msgs, false) - } - intf.peer.linkOut = func(bs []byte) { - // nil to prevent it from blocking if the link is somehow frozen - // FIXME this is hypothetically not safe, the peer shouldn't be sending - // additional packets until this one finishes, otherwise this could leak - // memory if writing happens slower than link packets are generated... - // that seems unlikely, so it's a lesser evil than deadlocking for now - intf.writer.sendFrom(nil, [][]byte{bs}, true) - } themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 7eef9a11..801691a0 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -123,7 +123,7 @@ func (ps *peers) _updatePeers() { } // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. -func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func()) *peer { +func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func(), out func([][]byte), linkOut func([]byte)) *peer { now := time.Now() p := peer{box: *box, sig: *sig, @@ -134,6 +134,8 @@ func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShar close: closer, core: ps.core, intf: intf, + out: out, + linkOut: linkOut, } oldPorts := ps.ports newPorts := make(map[switchPort]*peer) diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 71d92609..1bb14c4c 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -62,17 +62,17 @@ func (r *router) init(core *Core) { }, } var p *peer - phony.Block(&r.core.peers, func() { - // FIXME don't block here! - p = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) - }) - p.out = func(packets [][]byte) { + peerOut := func(packets [][]byte) { r.handlePackets(p, packets) r.Act(p, func() { // after the router handle the packets, notify the peer that it's ready for more p.Act(r, p._handleIdle) }) } + phony.Block(&r.core.peers, func() { + // FIXME don't block here! + p = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil, peerOut, nil) + }) p.Act(r, p._handleIdle) r.out = func(bs []byte) { p.handlePacketFrom(r, bs) } r.nodeinfo.init(r.core) diff --git a/src/yggdrasil/simlink.go b/src/yggdrasil/simlink.go index 736ee633..f830c215 100644 --- a/src/yggdrasil/simlink.go +++ b/src/yggdrasil/simlink.go @@ -57,7 +57,11 @@ func (s *Simlink) writeMsgs(msgs [][]byte) (int, error) { func (c *Core) NewSimlink() *Simlink { s := &Simlink{rch: make(chan []byte, 1)} n := "Simlink" - s.link, _ = c.link.create(s, n, n, n, n, false, true) + var err error + s.link, err = c.link.create(s, n, n, n, n, false, true) + if err != nil { + panic(err) + } return s }