Work in progress: establishes TCP connections and gets through the metadata handshake using the link code, but doesn't seem to send traffic yet (no switch peers are created)

Arceliar 2019-01-21 23:08:50 -06:00
parent 5a4d6481dd
commit 137212d8cf
4 changed files with 137 additions and 25 deletions

src/yggdrasil/awdl.go (View File)

@@ -1,7 +1,7 @@
package yggdrasil
import (
"fmt"
//"fmt"
"sync"
)
@@ -30,6 +30,7 @@ func (l *awdl) init(c *Core) error {
return nil
}
/* temporarily disabled while getting the TCP side to work
func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte, name string) (*awdlInterface, error) {
link, err := l.core.link.create(fromAWDL, toAWDL, name)
if err != nil {
@@ -90,3 +91,4 @@ func (ai *awdlInterface) handler() {
}
}
}
*/

src/yggdrasil/link.go (View File)

@@ -4,11 +4,11 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
//"sync/atomic"
"time"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
//"github.com/yggdrasil-network/yggdrasil-go/src/util"
)
type link struct {
@@ -29,11 +29,8 @@ type linkInterfaceMsgIO interface {
type linkInterface struct {
name string
link *link
fromlink chan []byte
tolink chan []byte
shutdown chan bool
peer *peer
stream stream
msgIO linkInterfaceMsgIO
}
func (l *link) init(c *Core) error {
@@ -50,7 +47,7 @@ func (l *link) init(c *Core) error {
return nil
}
func (l *link) create(fromlink chan []byte, tolink chan []byte, name string) (*linkInterface, error) {
func (l *link) create(msgIO linkInterfaceMsgIO, name string) (*linkInterface, error) {
l.mutex.Lock()
defer l.mutex.Unlock()
if _, ok := l.interfaces[name]; ok {
@@ -59,15 +56,114 @@ func (l *link) create(fromlink chan []byte, tolink chan []byte, name string) (*l
intf := linkInterface{
name: name,
link: l,
fromlink: fromlink,
tolink: tolink,
shutdown: make(chan bool),
msgIO: msgIO,
}
l.interfaces[intf.name] = &intf
go intf.start()
//go intf.start()
return &intf, nil
}
func (intf *linkInterface) handler() error {
// TODO split some of this into shorter functions, both to make it easier to read and to help with the FIXME duplicate-peer issue mentioned later
myLinkPub, myLinkPriv := crypto.NewBoxKeys()
meta := version_getBaseMetadata()
meta.box = intf.link.core.boxPub
meta.sig = intf.link.core.sigPub
meta.link = *myLinkPub
metaBytes := meta.encode()
// TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer); see the timeout sketch after this function
err := intf.msgIO._sendMetaBytes(metaBytes)
if err != nil {
return err
}
metaBytes, err = intf.msgIO._recvMetaBytes()
if err != nil {
return err
}
meta = version_metadata{}
if !meta.decode(metaBytes) || !meta.check() {
return errors.New("failed to decode metadata")
}
base := version_getBaseMetadata()
if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer {
intf.link.core.log.Println("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer))
return errors.New("failed to connect: wrong version")
}
// FIXME we *must* stop here and check that we don't already have a connection to this peer. Need to figure out a sane way to do that. Otherwise you'll have things like duplicate connections (one in each direction) for auto-discovered peers. See the duplicate-check sketch below for one possible shape.
shared := crypto.GetSharedKey(myLinkPriv, &meta.link)
intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name)
if intf.peer == nil {
return errors.New("failed to create peer")
}
defer func() {
// More cleanup can go here
intf.link.core.peers.removePeer(intf.peer.port)
}()
// Finish setting up the peer struct
out := make(chan []byte, 1)
defer close(out)
intf.peer.out = func(msg []byte) {
defer func() { recover() }()
out <- msg
}
intf.peer.close = func() { intf.msgIO.close() }
go intf.peer.linkLoop()
// Start the writer
go func() {
interval := 4 * time.Second
timer := time.NewTimer(interval)
clearTimer := func() {
if !timer.Stop() {
<-timer.C
}
}
defer clearTimer()
for {
// First try to send any link protocol traffic
select {
case msg := <-intf.peer.linkOut:
intf.msgIO.writeMsg(msg)
continue
default:
}
// No protocol traffic to send, so reset the timer
clearTimer()
timer.Reset(interval)
// Now block until something is ready or the timer triggers keepalive traffic
select {
case <-timer.C:
intf.msgIO.writeMsg(nil)
case msg := <-intf.peer.linkOut:
intf.msgIO.writeMsg(msg)
case msg, ok := <-out:
if !ok {
return
}
intf.msgIO.writeMsg(msg)
if true {
// TODO *don't* do this if we're not reading any traffic
// In such a case, the reader is responsible for resetting it the next time we read something
intf.link.core.switchTable.idleIn <- intf.peer.port
}
}
}
}()
intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle
// Run reader loop
for {
msg, err := intf.msgIO.readMsg()
if len(msg) > 0 {
intf.peer.handlePacket(msg)
}
if err != nil {
return err
}
}
////////////////////////////////////////////////////////////////////////////////
return nil
}
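A minimal sketch of the timeout idea from the TODO in this handler: run each blocking metadata send/recv in a goroutine and select on its result channel or a timer. The withTimeout helper below is hypothetical and not part of this commit; it only relies on the time and errors packages already imported in this file.

// Hypothetical helper, not part of this commit: run a blocking operation in a
// goroutine and give up if it takes longer than the given timeout. On timeout
// the goroutine (and the underlying send/recv) is abandoned rather than
// cancelled, so the caller should close the connection afterwards.
func withTimeout(op func() error, timeout time.Duration) error {
	done := make(chan error, 1) // buffered so the goroutine never blocks
	go func() { done <- op() }()
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case err := <-done:
		return err
	case <-timer.C:
		return errors.New("timed out")
	}
}

The metadata exchange above could then be wrapped as, for example, withTimeout(func() error { return intf.msgIO._sendMetaBytes(metaBytes) }, 6*time.Second), and likewise for _recvMetaBytes; the 6-second value is only a placeholder.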
/*
func (intf *linkInterface) start() {
myLinkPub, myLinkPriv := crypto.NewBoxKeys()
meta := version_getBaseMetadata()
@@ -171,3 +267,4 @@ func (ai *linkInterface) handler() {
}
}
}
*/
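One possible shape for the duplicate-connection check flagged by the FIXME in the handler: before creating the peer, compare the remote box key from the decoded metadata against the peers already attached to existing link interfaces. This is an illustrative sketch only, not part of the commit; it assumes the peer struct exposes its box key as peer.box (an assumption, since that field isn't shown in this diff) and reuses the mutex and interfaces map from create above.

// Hypothetical sketch, not part of this commit: report whether some existing
// link interface already has a peer with the given box key.
func (l *link) isAlreadyConnected(box *crypto.BoxPubKey) bool {
	l.mutex.Lock()
	defer l.mutex.Unlock()
	for _, intf := range l.interfaces {
		// intf.peer is nil until that interface finishes its own handshake
		if intf.peer != nil && intf.peer.box == *box {
			return true
		}
	}
	return false
}

The handler could call intf.link.isAlreadyConnected(&meta.box) right after decoding the metadata and return an error instead of calling newPeer; that would at least drop a second, reverse-direction connection once the first one has finished its handshake.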

src/yggdrasil/stream.go (View File)

@@ -14,8 +14,6 @@ var _ = linkInterfaceMsgIO(&stream{})
type stream struct {
rwc io.ReadWriteCloser
inputBuffer []byte // Incoming packet stream
didFirstSend bool // Used for metadata exchange
didFirstRecv bool // Used for metadata exchange
// TODO remove the rest, it shouldn't matter in the long run
handlePacket func([]byte)
}

src/yggdrasil/tcp.go (View File)

@@ -44,7 +44,6 @@ type tcpInterface struct {
mutex sync.Mutex // Protecting the below
calls map[string]struct{}
conns map[tcpInfo](chan struct{})
stream stream
}
// This is used as the key to a map that tracks existing connections, to prevent multiple connections to the same keys and local/remote address pair from occurring.
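For reference, the key type described in the comment above is a plain value struct combining both public keys with the local and remote address strings, which makes it directly usable as a Go map key. A rough sketch follows; the exact field names are not shown in this diff and are only illustrative.

// Rough sketch of the connection-tracking key described above; field names
// are illustrative rather than copied from the actual source.
type tcpInfo struct {
	box        crypto.BoxPubKey
	sig        crypto.SigPubKey
	localAddr  string
	remoteAddr string
}

Because every field is comparable, checking for an existing connection is a plain map lookup, e.g. if _, isIn := iface.conns[info]; isIn { ... }, with the entry deleted again when the handler returns.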
@@ -281,9 +280,25 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) {
}()
}
func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
defer sock.Close()
iface.setExtraOptions(sock)
stream := stream{}
stream.init(sock, nil)
name := sock.LocalAddr().String() + sock.RemoteAddr().String()
link, err := iface.core.link.create(&stream, name)
if err != nil {
iface.core.log.Println(err)
panic(err)
}
iface.core.log.Println("DEBUG: starting handler")
link.handler()
iface.core.log.Println("DEBUG: stopped handler")
}
// This exchanges/checks connection metadata, sets up the peer struct, sets up the writer goroutine, and then runs the reader within the current goroutine.
// It defers a bunch of cleanup stuff to tear down all of these things when the reader exits (e.g. due to a closed connection or a timeout).
func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
func (iface *tcpInterface) handler_old(sock net.Conn, incoming bool) {
defer sock.Close()
iface.setExtraOptions(sock)
// Get our keys
@@ -440,7 +455,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
themAddrString := net.IP(themAddr[:]).String()
themString := fmt.Sprintf("%s@%s", themAddrString, them)
iface.core.log.Printf("Connected: %s, source: %s", themString, us)
iface.stream.init(sock, p.handlePacket)
//iface.stream.init(sock, p.handlePacket)
bs := make([]byte, 2*streamMsgSize)
var n int
for {
@@ -452,7 +467,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
break
}
if n > 0 {
iface.stream.handleInput(bs[:n])
//iface.stream.handleInput(bs[:n])
}
}
if err == nil {