mirror of
https://github.com/yggdrasil-network/yggdrasil-go.git
synced 2024-12-24 16:57:53 +00:00
only create one interface, but still opens duplicate connections before it catches this, so more work is needed
This commit is contained in:
parent
f95663e923
commit
12c0e019dc
@ -8,13 +8,21 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
|
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
|
||||||
//"github.com/yggdrasil-network/yggdrasil-go/src/util"
|
"github.com/yggdrasil-network/yggdrasil-go/src/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
type link struct {
|
type link struct {
|
||||||
core *Core
|
core *Core
|
||||||
mutex sync.RWMutex // protects interfaces below
|
mutex sync.RWMutex // protects interfaces below
|
||||||
interfaces map[string]*linkInterface
|
interfaces map[linkInfo]*linkInterface
|
||||||
|
}
|
||||||
|
|
||||||
|
type linkInfo struct {
|
||||||
|
box crypto.BoxPubKey // Their encryption key
|
||||||
|
sig crypto.SigPubKey // Their signing key
|
||||||
|
linkType string // Type of link, e.g. TCP, AWDL
|
||||||
|
local string // Local name or address
|
||||||
|
remote string // Remote name or address
|
||||||
}
|
}
|
||||||
|
|
||||||
type linkInterfaceMsgIO interface {
|
type linkInterfaceMsgIO interface {
|
||||||
@ -27,16 +35,18 @@ type linkInterfaceMsgIO interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type linkInterface struct {
|
type linkInterface struct {
|
||||||
name string
|
name string
|
||||||
link *link
|
link *link
|
||||||
peer *peer
|
peer *peer
|
||||||
msgIO linkInterfaceMsgIO
|
msgIO linkInterfaceMsgIO
|
||||||
|
info linkInfo
|
||||||
|
closed chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *link) init(c *Core) error {
|
func (l *link) init(c *Core) error {
|
||||||
l.core = c
|
l.core = c
|
||||||
l.mutex.Lock()
|
l.mutex.Lock()
|
||||||
l.interfaces = make(map[string]*linkInterface)
|
l.interfaces = make(map[linkInfo]*linkInterface)
|
||||||
l.mutex.Unlock()
|
l.mutex.Unlock()
|
||||||
|
|
||||||
if err := l.core.awdl.init(c); err != nil {
|
if err := l.core.awdl.init(c); err != nil {
|
||||||
@ -47,18 +57,19 @@ func (l *link) init(c *Core) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *link) create(msgIO linkInterfaceMsgIO, name string) (*linkInterface, error) {
|
func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string) (*linkInterface, error) {
|
||||||
l.mutex.Lock()
|
// Technically anything unique would work for names, but lets pick something human readable, just for debugging
|
||||||
defer l.mutex.Unlock()
|
|
||||||
if _, ok := l.interfaces[name]; ok {
|
|
||||||
return nil, errors.New("Interface with this name already exists")
|
|
||||||
}
|
|
||||||
intf := linkInterface{
|
intf := linkInterface{
|
||||||
name: name,
|
name: name,
|
||||||
link: l,
|
link: l,
|
||||||
msgIO: msgIO,
|
msgIO: msgIO,
|
||||||
|
info: linkInfo{
|
||||||
|
linkType: linkType,
|
||||||
|
local: local,
|
||||||
|
remote: remote,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
l.interfaces[intf.name] = &intf
|
//l.interfaces[intf.name] = &intf
|
||||||
//go intf.start()
|
//go intf.start()
|
||||||
return &intf, nil
|
return &intf, nil
|
||||||
}
|
}
|
||||||
@ -89,7 +100,25 @@ func (intf *linkInterface) handler() error {
|
|||||||
intf.link.core.log.Println("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer))
|
intf.link.core.log.Println("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer))
|
||||||
return errors.New("failed to connect: wrong version")
|
return errors.New("failed to connect: wrong version")
|
||||||
}
|
}
|
||||||
// FIXME we *must* stop here and check that we don't already have a connection to this peer. Need to figure out a sane way how to do that. Otherwise you'll have things like duplicate connections (one in each direction) for auto-discovered peers.
|
// Check if we already have a link to this node
|
||||||
|
intf.info.box = meta.box
|
||||||
|
intf.info.sig = meta.sig
|
||||||
|
intf.link.mutex.Lock()
|
||||||
|
if oldIntf, isIn := intf.link.interfaces[intf.info]; isIn {
|
||||||
|
intf.link.mutex.Unlock()
|
||||||
|
// FIXME we should really return an error and let the caller block instead
|
||||||
|
// That lets them do things like close connections before blocking
|
||||||
|
intf.link.core.log.Println("DEBUG: found existing interface for", intf.name)
|
||||||
|
<-oldIntf.closed
|
||||||
|
return nil
|
||||||
|
} else {
|
||||||
|
intf.closed = make(chan struct{})
|
||||||
|
intf.link.interfaces[intf.info] = intf
|
||||||
|
defer close(intf.closed)
|
||||||
|
intf.link.core.log.Println("DEBUG: registered interface for", intf.name)
|
||||||
|
}
|
||||||
|
intf.link.mutex.Unlock()
|
||||||
|
// Create peer
|
||||||
shared := crypto.GetSharedKey(myLinkPriv, &meta.link)
|
shared := crypto.GetSharedKey(myLinkPriv, &meta.link)
|
||||||
intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name)
|
intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name)
|
||||||
if intf.peer == nil {
|
if intf.peer == nil {
|
||||||
@ -111,7 +140,6 @@ func (intf *linkInterface) handler() error {
|
|||||||
go intf.peer.linkLoop()
|
go intf.peer.linkLoop()
|
||||||
// Start the writer
|
// Start the writer
|
||||||
go func() {
|
go func() {
|
||||||
// TODO util.PutBytes etc.
|
|
||||||
interval := 4 * time.Second
|
interval := 4 * time.Second
|
||||||
timer := time.NewTimer(interval)
|
timer := time.NewTimer(interval)
|
||||||
clearTimer := func() {
|
clearTimer := func() {
|
||||||
@ -142,6 +170,7 @@ func (intf *linkInterface) handler() error {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
intf.msgIO.writeMsg(msg)
|
intf.msgIO.writeMsg(msg)
|
||||||
|
util.PutBytes(msg)
|
||||||
if true {
|
if true {
|
||||||
// TODO *don't* do this if we're not reading any traffic
|
// TODO *don't* do this if we're not reading any traffic
|
||||||
// In such a case, the reader is responsible for resetting it the next time we read something
|
// In such a case, the reader is responsible for resetting it the next time we read something
|
||||||
|
@ -285,15 +285,17 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
|
|||||||
iface.setExtraOptions(sock)
|
iface.setExtraOptions(sock)
|
||||||
stream := stream{}
|
stream := stream{}
|
||||||
stream.init(sock, nil)
|
stream.init(sock, nil)
|
||||||
name := sock.LocalAddr().String() + sock.RemoteAddr().String()
|
local, _, _ := net.SplitHostPort(sock.LocalAddr().String())
|
||||||
link, err := iface.core.link.create(&stream, name)
|
remote, _, _ := net.SplitHostPort(sock.RemoteAddr().String())
|
||||||
|
name := "tcp://" + sock.RemoteAddr().String()
|
||||||
|
link, err := iface.core.link.create(&stream, name, "tcp", local, remote)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
iface.core.log.Println(err)
|
iface.core.log.Println(err)
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
iface.core.log.Println("DEBUG: starting handler")
|
iface.core.log.Println("DEBUG: starting handler for", name)
|
||||||
link.handler()
|
link.handler()
|
||||||
iface.core.log.Println("DEBUG: stopped handler")
|
iface.core.log.Println("DEBUG: stopped handler for", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// This exchanges/checks connection metadata, sets up the peer struct, sets up the writer goroutine, and then runs the reader within the current goroutine.
|
// This exchanges/checks connection metadata, sets up the peer struct, sets up the writer goroutine, and then runs the reader within the current goroutine.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user