mirror of
https://github.com/yggdrasil-network/yggdrasil-go.git
synced 2024-11-24 02:25:21 +00:00
commit
90b7d9ef97
61
cmd/yggdrasilsim/dial.go
Normal file
61
cmd/yggdrasilsim/dial.go
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sort"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
|
||||||
|
)
|
||||||
|
|
||||||
|
func doListen(recvNode *simNode) {
|
||||||
|
// TODO be able to stop the listeners somehow so they don't leak across different tests
|
||||||
|
for {
|
||||||
|
c, err := recvNode.listener.Accept()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
c.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func dialTest(sendNode, recvNode *simNode) {
|
||||||
|
if sendNode.id == recvNode.id {
|
||||||
|
fmt.Println("Skipping dial to self")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var mask crypto.NodeID
|
||||||
|
for idx := range mask {
|
||||||
|
mask[idx] = 0xff
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
c, err := sendNode.dialer.DialByNodeIDandMask(nil, &recvNode.nodeID, &mask)
|
||||||
|
if c != nil {
|
||||||
|
c.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("Dial failed:", err)
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func dialStore(store nodeStore) {
|
||||||
|
var nodeIdxs []int
|
||||||
|
for idx, n := range store {
|
||||||
|
nodeIdxs = append(nodeIdxs, idx)
|
||||||
|
go doListen(n)
|
||||||
|
}
|
||||||
|
sort.Slice(nodeIdxs, func(i, j int) bool {
|
||||||
|
return nodeIdxs[i] < nodeIdxs[j]
|
||||||
|
})
|
||||||
|
for _, idx := range nodeIdxs {
|
||||||
|
sendNode := store[idx]
|
||||||
|
for _, jdx := range nodeIdxs {
|
||||||
|
recvNode := store[jdx]
|
||||||
|
fmt.Printf("Dialing from node %d to node %d / %d...\n", idx, jdx, len(store))
|
||||||
|
dialTest(sendNode, recvNode)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
6
cmd/yggdrasilsim/main.go
Normal file
6
cmd/yggdrasilsim/main.go
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
store := makeStoreSquareGrid(4)
|
||||||
|
dialStore(store)
|
||||||
|
}
|
28
cmd/yggdrasilsim/node.go
Normal file
28
cmd/yggdrasilsim/node.go
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io/ioutil"
|
||||||
|
|
||||||
|
"github.com/gologme/log"
|
||||||
|
|
||||||
|
"github.com/yggdrasil-network/yggdrasil-go/src/config"
|
||||||
|
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
|
||||||
|
"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
|
||||||
|
)
|
||||||
|
|
||||||
|
type simNode struct {
|
||||||
|
core yggdrasil.Core
|
||||||
|
id int
|
||||||
|
nodeID crypto.NodeID
|
||||||
|
dialer *yggdrasil.Dialer
|
||||||
|
listener *yggdrasil.Listener
|
||||||
|
}
|
||||||
|
|
||||||
|
func newNode(id int) *simNode {
|
||||||
|
n := simNode{id: id}
|
||||||
|
n.core.Start(config.GenerateConfig(), log.New(ioutil.Discard, "", 0))
|
||||||
|
n.nodeID = *n.core.NodeID()
|
||||||
|
n.dialer, _ = n.core.ConnDialer()
|
||||||
|
n.listener, _ = n.core.ConnListen()
|
||||||
|
return &n
|
||||||
|
}
|
41
cmd/yggdrasilsim/store.go
Normal file
41
cmd/yggdrasilsim/store.go
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
type nodeStore map[int]*simNode
|
||||||
|
|
||||||
|
func makeStoreSingle() nodeStore {
|
||||||
|
s := make(nodeStore)
|
||||||
|
s[0] = newNode(0)
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func linkNodes(a *simNode, b *simNode) {
|
||||||
|
la := a.core.NewSimlink()
|
||||||
|
lb := b.core.NewSimlink()
|
||||||
|
la.SetDestination(lb)
|
||||||
|
lb.SetDestination(la)
|
||||||
|
la.Start()
|
||||||
|
lb.Start()
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeStoreSquareGrid(sideLength int) nodeStore {
|
||||||
|
store := make(nodeStore)
|
||||||
|
nNodes := sideLength * sideLength
|
||||||
|
idxs := make([]int, 0, nNodes)
|
||||||
|
// TODO shuffle nodeIDs
|
||||||
|
for idx := 1; idx <= nNodes; idx++ {
|
||||||
|
idxs = append(idxs, idx)
|
||||||
|
}
|
||||||
|
for _, idx := range idxs {
|
||||||
|
n := newNode(idx)
|
||||||
|
store[idx] = n
|
||||||
|
}
|
||||||
|
for idx := 0; idx < nNodes; idx++ {
|
||||||
|
if (idx % sideLength) != 0 {
|
||||||
|
linkNodes(store[idxs[idx]], store[idxs[idx-1]])
|
||||||
|
}
|
||||||
|
if idx >= sideLength {
|
||||||
|
linkNodes(store[idxs[idx]], store[idxs[idx-sideLength]])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return store
|
||||||
|
}
|
@ -217,9 +217,23 @@ func (intf *linkInterface) handler() error {
|
|||||||
intf.link.mutex.Unlock()
|
intf.link.mutex.Unlock()
|
||||||
// Create peer
|
// Create peer
|
||||||
shared := crypto.GetSharedKey(myLinkPriv, &meta.link)
|
shared := crypto.GetSharedKey(myLinkPriv, &meta.link)
|
||||||
|
out := func(msgs [][]byte) {
|
||||||
|
// nil to prevent it from blocking if the link is somehow frozen
|
||||||
|
// this is safe because another packet won't be sent until the link notifies
|
||||||
|
// the peer that it's ready for one
|
||||||
|
intf.writer.sendFrom(nil, msgs, false)
|
||||||
|
}
|
||||||
|
linkOut := func(bs []byte) {
|
||||||
|
// nil to prevent it from blocking if the link is somehow frozen
|
||||||
|
// FIXME this is hypothetically not safe, the peer shouldn't be sending
|
||||||
|
// additional packets until this one finishes, otherwise this could leak
|
||||||
|
// memory if writing happens slower than link packets are generated...
|
||||||
|
// that seems unlikely, so it's a lesser evil than deadlocking for now
|
||||||
|
intf.writer.sendFrom(nil, [][]byte{bs}, true)
|
||||||
|
}
|
||||||
phony.Block(&intf.link.core.peers, func() {
|
phony.Block(&intf.link.core.peers, func() {
|
||||||
// FIXME don't use phony.Block, it's bad practice, even if it's safe here
|
// FIXME don't use phony.Block, it's bad practice, even if it's safe here
|
||||||
intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() })
|
intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() }, out, linkOut)
|
||||||
})
|
})
|
||||||
if intf.peer == nil {
|
if intf.peer == nil {
|
||||||
return errors.New("failed to create peer")
|
return errors.New("failed to create peer")
|
||||||
@ -228,20 +242,6 @@ func (intf *linkInterface) handler() error {
|
|||||||
// More cleanup can go here
|
// More cleanup can go here
|
||||||
intf.peer.Act(nil, intf.peer._removeSelf)
|
intf.peer.Act(nil, intf.peer._removeSelf)
|
||||||
}()
|
}()
|
||||||
intf.peer.out = func(msgs [][]byte) {
|
|
||||||
// nil to prevent it from blocking if the link is somehow frozen
|
|
||||||
// this is safe because another packet won't be sent until the link notifies
|
|
||||||
// the peer that it's ready for one
|
|
||||||
intf.writer.sendFrom(nil, msgs, false)
|
|
||||||
}
|
|
||||||
intf.peer.linkOut = func(bs []byte) {
|
|
||||||
// nil to prevent it from blocking if the link is somehow frozen
|
|
||||||
// FIXME this is hypothetically not safe, the peer shouldn't be sending
|
|
||||||
// additional packets until this one finishes, otherwise this could leak
|
|
||||||
// memory if writing happens slower than link packets are generated...
|
|
||||||
// that seems unlikely, so it's a lesser evil than deadlocking for now
|
|
||||||
intf.writer.sendFrom(nil, [][]byte{bs}, true)
|
|
||||||
}
|
|
||||||
themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box))
|
themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box))
|
||||||
themAddrString := net.IP(themAddr[:]).String()
|
themAddrString := net.IP(themAddr[:]).String()
|
||||||
themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote)
|
themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote)
|
||||||
|
@ -123,7 +123,7 @@ func (ps *peers) _updatePeers() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number.
|
// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number.
|
||||||
func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func()) *peer {
|
func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func(), out func([][]byte), linkOut func([]byte)) *peer {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
p := peer{box: *box,
|
p := peer{box: *box,
|
||||||
sig: *sig,
|
sig: *sig,
|
||||||
@ -134,6 +134,8 @@ func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShar
|
|||||||
close: closer,
|
close: closer,
|
||||||
core: ps.core,
|
core: ps.core,
|
||||||
intf: intf,
|
intf: intf,
|
||||||
|
out: out,
|
||||||
|
linkOut: linkOut,
|
||||||
}
|
}
|
||||||
oldPorts := ps.ports
|
oldPorts := ps.ports
|
||||||
newPorts := make(map[switchPort]*peer)
|
newPorts := make(map[switchPort]*peer)
|
||||||
|
@ -62,17 +62,17 @@ func (r *router) init(core *Core) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
var p *peer
|
var p *peer
|
||||||
phony.Block(&r.core.peers, func() {
|
peerOut := func(packets [][]byte) {
|
||||||
// FIXME don't block here!
|
|
||||||
p = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil)
|
|
||||||
})
|
|
||||||
p.out = func(packets [][]byte) {
|
|
||||||
r.handlePackets(p, packets)
|
r.handlePackets(p, packets)
|
||||||
r.Act(p, func() {
|
r.Act(p, func() {
|
||||||
// after the router handle the packets, notify the peer that it's ready for more
|
// after the router handle the packets, notify the peer that it's ready for more
|
||||||
p.Act(r, p._handleIdle)
|
p.Act(r, p._handleIdle)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
phony.Block(&r.core.peers, func() {
|
||||||
|
// FIXME don't block here!
|
||||||
|
p = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil, peerOut, nil)
|
||||||
|
})
|
||||||
p.Act(r, p._handleIdle)
|
p.Act(r, p._handleIdle)
|
||||||
r.out = func(bs []byte) { p.handlePacketFrom(r, bs) }
|
r.out = func(bs []byte) { p.handlePacketFrom(r, bs) }
|
||||||
r.nodeinfo.init(r.core)
|
r.nodeinfo.init(r.core)
|
||||||
|
91
src/yggdrasil/simlink.go
Normal file
91
src/yggdrasil/simlink.go
Normal file
@ -0,0 +1,91 @@
|
|||||||
|
package yggdrasil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"github.com/Arceliar/phony"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Simlink struct {
|
||||||
|
phony.Inbox
|
||||||
|
rch chan []byte
|
||||||
|
dest *Simlink
|
||||||
|
link *linkInterface
|
||||||
|
started bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Simlink) readMsg() ([]byte, error) {
|
||||||
|
bs, ok := <-s.rch
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("read from closed Simlink")
|
||||||
|
}
|
||||||
|
return bs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Simlink) _recvMetaBytes() ([]byte, error) {
|
||||||
|
return s.readMsg()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Simlink) _sendMetaBytes(bs []byte) error {
|
||||||
|
_, err := s.writeMsgs([][]byte{bs})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Simlink) close() error {
|
||||||
|
defer func() { recover() }()
|
||||||
|
close(s.rch)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Simlink) writeMsgs(msgs [][]byte) (int, error) {
|
||||||
|
if s.dest == nil {
|
||||||
|
return 0, errors.New("write to unpaired Simlink")
|
||||||
|
}
|
||||||
|
var size int
|
||||||
|
for _, msg := range msgs {
|
||||||
|
size += len(msg)
|
||||||
|
bs := append([]byte(nil), msg...)
|
||||||
|
phony.Block(s, func() {
|
||||||
|
s.dest.Act(s, func() {
|
||||||
|
defer func() { recover() }()
|
||||||
|
s.dest.rch <- bs
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return size, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Core) NewSimlink() *Simlink {
|
||||||
|
s := &Simlink{rch: make(chan []byte, 1)}
|
||||||
|
n := "Simlink"
|
||||||
|
var err error
|
||||||
|
s.link, err = c.link.create(s, n, n, n, n, false, true)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Simlink) SetDestination(dest *Simlink) error {
|
||||||
|
var err error
|
||||||
|
phony.Block(s, func() {
|
||||||
|
if s.dest != nil {
|
||||||
|
err = errors.New("destination already set")
|
||||||
|
} else {
|
||||||
|
s.dest = dest
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Simlink) Start() error {
|
||||||
|
var err error
|
||||||
|
phony.Block(s, func() {
|
||||||
|
if s.started {
|
||||||
|
err = errors.New("already started")
|
||||||
|
} else {
|
||||||
|
s.started = true
|
||||||
|
go s.link.handler()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user