mirror of
https://github.com/yggdrasil-network/yggdrasil-go.git
synced 2024-12-22 16:07:31 +00:00
remove UDP, to be replaced with a better implementation later
This commit is contained in:
parent
2f8aaa5c20
commit
1b89892610
@ -361,8 +361,6 @@ func (a *admin) addPeer(addr string) error {
|
|||||||
switch strings.ToLower(u.Scheme) {
|
switch strings.ToLower(u.Scheme) {
|
||||||
case "tcp":
|
case "tcp":
|
||||||
a.core.tcp.connect(u.Host)
|
a.core.tcp.connect(u.Host)
|
||||||
case "udp":
|
|
||||||
a.core.udp.connect(u.Host)
|
|
||||||
case "socks":
|
case "socks":
|
||||||
a.core.tcp.connectSOCKS(u.Host, u.Path[1:])
|
a.core.tcp.connectSOCKS(u.Host, u.Path[1:])
|
||||||
default:
|
default:
|
||||||
@ -371,18 +369,12 @@ func (a *admin) addPeer(addr string) error {
|
|||||||
} else {
|
} else {
|
||||||
// no url scheme provided
|
// no url scheme provided
|
||||||
addr = strings.ToLower(addr)
|
addr = strings.ToLower(addr)
|
||||||
if strings.HasPrefix(addr, "udp:") {
|
|
||||||
a.core.udp.connect(addr[4:])
|
|
||||||
return nil
|
|
||||||
} else {
|
|
||||||
if strings.HasPrefix(addr, "tcp:") {
|
if strings.HasPrefix(addr, "tcp:") {
|
||||||
addr = addr[4:]
|
addr = addr[4:]
|
||||||
}
|
}
|
||||||
a.core.tcp.connect(addr)
|
a.core.tcp.connect(addr)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return errors.New("invalid peer: " + addr)
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,7 +27,6 @@ type Core struct {
|
|||||||
searches searches
|
searches searches
|
||||||
multicast multicast
|
multicast multicast
|
||||||
tcp tcpInterface
|
tcp tcpInterface
|
||||||
udp udpInterface
|
|
||||||
log *log.Logger
|
log *log.Logger
|
||||||
ifceExpr []*regexp.Regexp // the zone of link-local IPv6 peers must match this
|
ifceExpr []*regexp.Regexp // the zone of link-local IPv6 peers must match this
|
||||||
}
|
}
|
||||||
@ -99,11 +98,6 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.udp.init(c, nc.Listen); err != nil {
|
|
||||||
c.log.Println("Failed to start UDP interface")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.router.start(); err != nil {
|
if err := c.router.start(); err != nil {
|
||||||
c.log.Println("Failed to start router")
|
c.log.Println("Failed to start router")
|
||||||
return err
|
return err
|
||||||
|
@ -317,6 +317,7 @@ func (c *Core) DEBUG_init(bpub []byte,
|
|||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
/*
|
||||||
func (c *Core) DEBUG_setupAndStartGlobalUDPInterface(addrport string) {
|
func (c *Core) DEBUG_setupAndStartGlobalUDPInterface(addrport string) {
|
||||||
if err := c.udp.init(c, addrport); err != nil {
|
if err := c.udp.init(c, addrport); err != nil {
|
||||||
c.log.Println("Failed to start UDP interface:", err)
|
c.log.Println("Failed to start UDP interface:", err)
|
||||||
@ -342,6 +343,7 @@ func (c *Core) DEBUG_maybeSendUDPKeys(saddr string) {
|
|||||||
c.udp.sendKeys(addr)
|
c.udp.sendKeys(addr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
@ -1,394 +0,0 @@
|
|||||||
package yggdrasil
|
|
||||||
|
|
||||||
// This communicates with peers via UDP
|
|
||||||
// It's not as well tested or debugged as the TCP transport
|
|
||||||
// It's intended to use UDP, so debugging/optimzing this is a high priority
|
|
||||||
// TODO? use golang.org/x/net/ipv6.PacketConn's ReadBatch and WriteBatch?
|
|
||||||
// To send all chunks of a message / recv all available chunks in one syscall
|
|
||||||
// That might be faster on supported platforms, but it needs investigation
|
|
||||||
// Chunks are currently murged, but outgoing messages aren't chunked
|
|
||||||
// This is just to support chunking in the future, if it's needed and debugged
|
|
||||||
// Basically, right now we might send UDP packets that are too large
|
|
||||||
|
|
||||||
// TODO remove old/unused code and better document live code
|
|
||||||
|
|
||||||
import "net"
|
|
||||||
import "time"
|
|
||||||
import "sync"
|
|
||||||
import "fmt"
|
|
||||||
|
|
||||||
type udpInterface struct {
|
|
||||||
core *Core
|
|
||||||
sock *net.UDPConn // Or more general PacketConn?
|
|
||||||
mutex sync.RWMutex // each conn has an owner goroutine
|
|
||||||
conns map[connAddr]*connInfo
|
|
||||||
}
|
|
||||||
|
|
||||||
type connAddr struct {
|
|
||||||
ip [16]byte
|
|
||||||
port int
|
|
||||||
zone string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *connAddr) fromUDPAddr(u *net.UDPAddr) {
|
|
||||||
copy(c.ip[:], u.IP.To16())
|
|
||||||
c.port = u.Port
|
|
||||||
c.zone = u.Zone
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *connAddr) toUDPAddr() *net.UDPAddr {
|
|
||||||
var u net.UDPAddr
|
|
||||||
u.IP = make([]byte, 16)
|
|
||||||
copy(u.IP, c.ip[:])
|
|
||||||
u.Port = c.port
|
|
||||||
u.Zone = c.zone
|
|
||||||
return &u
|
|
||||||
}
|
|
||||||
|
|
||||||
type connInfo struct {
|
|
||||||
name string
|
|
||||||
addr connAddr
|
|
||||||
peer *peer
|
|
||||||
linkIn chan []byte
|
|
||||||
keysIn chan *udpKeys
|
|
||||||
closeIn chan *udpKeys
|
|
||||||
timeout int // count of how many heartbeats have been missed
|
|
||||||
in func([]byte)
|
|
||||||
out chan []byte
|
|
||||||
countIn uint8
|
|
||||||
countOut uint8
|
|
||||||
chunkSize uint16
|
|
||||||
}
|
|
||||||
|
|
||||||
type udpKeys struct {
|
|
||||||
box boxPubKey
|
|
||||||
sig sigPubKey
|
|
||||||
}
|
|
||||||
|
|
||||||
func (iface *udpInterface) getAddr() *net.UDPAddr {
|
|
||||||
return iface.sock.LocalAddr().(*net.UDPAddr)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (iface *udpInterface) connect(saddr string) {
|
|
||||||
udpAddr, err := net.ResolveUDPAddr("udp", saddr)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
var addr connAddr
|
|
||||||
addr.fromUDPAddr(udpAddr)
|
|
||||||
iface.mutex.RLock()
|
|
||||||
_, isIn := iface.conns[addr]
|
|
||||||
iface.mutex.RUnlock()
|
|
||||||
if !isIn {
|
|
||||||
iface.sendKeys(addr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (iface *udpInterface) init(core *Core, addr string) (err error) {
|
|
||||||
iface.core = core
|
|
||||||
udpAddr, err := net.ResolveUDPAddr("udp", addr)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
iface.sock, err = net.ListenUDP("udp", udpAddr)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
iface.conns = make(map[connAddr]*connInfo)
|
|
||||||
go iface.reader()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (iface *udpInterface) sendKeys(addr connAddr) {
|
|
||||||
udpAddr := addr.toUDPAddr()
|
|
||||||
msg := []byte{}
|
|
||||||
msg = udp_encode(msg, 0, 0, 0, nil)
|
|
||||||
msg = append(msg, iface.core.boxPub[:]...)
|
|
||||||
msg = append(msg, iface.core.sigPub[:]...)
|
|
||||||
iface.sock.WriteToUDP(msg, udpAddr)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (iface *udpInterface) sendClose(addr connAddr) {
|
|
||||||
udpAddr := addr.toUDPAddr()
|
|
||||||
msg := []byte{}
|
|
||||||
msg = udp_encode(msg, 0, 1, 0, nil)
|
|
||||||
msg = append(msg, iface.core.boxPub[:]...)
|
|
||||||
msg = append(msg, iface.core.sigPub[:]...)
|
|
||||||
iface.sock.WriteToUDP(msg, udpAddr)
|
|
||||||
}
|
|
||||||
|
|
||||||
func udp_isKeys(msg []byte) bool {
|
|
||||||
keyLen := 3 + boxPubKeyLen + sigPubKeyLen
|
|
||||||
return len(msg) == keyLen && msg[0] == 0x00 && msg[1] == 0x00
|
|
||||||
}
|
|
||||||
|
|
||||||
func udp_isClose(msg []byte) bool {
|
|
||||||
keyLen := 3 + boxPubKeyLen + sigPubKeyLen
|
|
||||||
return len(msg) == keyLen && msg[0] == 0x00 && msg[1] == 0x01
|
|
||||||
}
|
|
||||||
|
|
||||||
func (iface *udpInterface) startConn(info *connInfo) {
|
|
||||||
ticker := time.NewTicker(6 * time.Second)
|
|
||||||
defer ticker.Stop()
|
|
||||||
defer func() {
|
|
||||||
// Cleanup
|
|
||||||
iface.mutex.Lock()
|
|
||||||
delete(iface.conns, info.addr)
|
|
||||||
iface.mutex.Unlock()
|
|
||||||
iface.core.peers.removePeer(info.peer.port)
|
|
||||||
close(info.linkIn)
|
|
||||||
close(info.keysIn)
|
|
||||||
close(info.closeIn)
|
|
||||||
close(info.out)
|
|
||||||
iface.core.log.Println("Removing peer:", info.name)
|
|
||||||
}()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case ks := <-info.closeIn:
|
|
||||||
{
|
|
||||||
if ks.box == info.peer.box && ks.sig == info.peer.sig {
|
|
||||||
// TODO? secure this somehow
|
|
||||||
// Maybe add a signature and sequence number (timestamp) to close and keys?
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case ks := <-info.keysIn:
|
|
||||||
{
|
|
||||||
// FIXME? need signatures/sequence-numbers or something
|
|
||||||
// Spoofers could lock out a peer with fake/bad keys
|
|
||||||
if ks.box == info.peer.box && ks.sig == info.peer.sig {
|
|
||||||
info.timeout = 0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case <-ticker.C:
|
|
||||||
{
|
|
||||||
if info.timeout > 10 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
info.timeout++
|
|
||||||
iface.sendKeys(info.addr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (iface *udpInterface) handleClose(msg []byte, addr connAddr) {
|
|
||||||
//defer util_putBytes(msg)
|
|
||||||
var ks udpKeys
|
|
||||||
_, _, _, bs := udp_decode(msg)
|
|
||||||
switch {
|
|
||||||
case !wire_chop_slice(ks.box[:], &bs):
|
|
||||||
return
|
|
||||||
case !wire_chop_slice(ks.sig[:], &bs):
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if ks.box == iface.core.boxPub {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if ks.sig == iface.core.sigPub {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
iface.mutex.RLock()
|
|
||||||
conn, isIn := iface.conns[addr]
|
|
||||||
iface.mutex.RUnlock()
|
|
||||||
if !isIn {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
func() {
|
|
||||||
defer func() { recover() }()
|
|
||||||
select {
|
|
||||||
case conn.closeIn <- &ks:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) {
|
|
||||||
//defer util_putBytes(msg)
|
|
||||||
var ks udpKeys
|
|
||||||
_, _, _, bs := udp_decode(msg)
|
|
||||||
switch {
|
|
||||||
case !wire_chop_slice(ks.box[:], &bs):
|
|
||||||
return
|
|
||||||
case !wire_chop_slice(ks.sig[:], &bs):
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if ks.box == iface.core.boxPub {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if ks.sig == iface.core.sigPub {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
iface.mutex.RLock()
|
|
||||||
conn, isIn := iface.conns[addr]
|
|
||||||
iface.mutex.RUnlock()
|
|
||||||
if !isIn {
|
|
||||||
udpAddr := addr.toUDPAddr()
|
|
||||||
// Check if we're authorized to connect to this key / IP
|
|
||||||
// TODO monitor and always allow outgoing connections
|
|
||||||
if !iface.core.peers.isAllowedEncryptionPublicKey(&ks.box) {
|
|
||||||
// Allow unauthorized peers if they're link-local
|
|
||||||
if !udpAddr.IP.IsLinkLocalUnicast() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
themNodeID := getNodeID(&ks.box)
|
|
||||||
themAddr := address_addrForNodeID(themNodeID)
|
|
||||||
themAddrString := net.IP(themAddr[:]).String()
|
|
||||||
themString := fmt.Sprintf("%s@%s", themAddrString, udpAddr.String())
|
|
||||||
conn = &connInfo{
|
|
||||||
name: themString,
|
|
||||||
addr: connAddr(addr),
|
|
||||||
peer: iface.core.peers.newPeer(&ks.box, &ks.sig),
|
|
||||||
linkIn: make(chan []byte, 1),
|
|
||||||
keysIn: make(chan *udpKeys, 1),
|
|
||||||
closeIn: make(chan *udpKeys, 1),
|
|
||||||
out: make(chan []byte, 32),
|
|
||||||
chunkSize: 576 - 60 - 8 - 3, // max safe - max ip - udp header - chunk overhead
|
|
||||||
}
|
|
||||||
if udpAddr.IP.IsLinkLocalUnicast() {
|
|
||||||
ifce, err := net.InterfaceByName(udpAddr.Zone)
|
|
||||||
if ifce != nil && err == nil {
|
|
||||||
conn.chunkSize = uint16(ifce.MTU) - 60 - 8 - 3
|
|
||||||
}
|
|
||||||
}
|
|
||||||
var inChunks uint8
|
|
||||||
var inBuf []byte
|
|
||||||
conn.in = func(bs []byte) {
|
|
||||||
//defer util_putBytes(bs)
|
|
||||||
chunks, chunk, count, payload := udp_decode(bs)
|
|
||||||
if count != conn.countIn {
|
|
||||||
if len(inBuf) > 0 {
|
|
||||||
// Something went wrong
|
|
||||||
// Forward whatever we have
|
|
||||||
// Maybe the destination can do something about it
|
|
||||||
msg := append(util_getBytes(), inBuf...)
|
|
||||||
conn.peer.handlePacket(msg, conn.linkIn)
|
|
||||||
}
|
|
||||||
inChunks = 0
|
|
||||||
inBuf = inBuf[:0]
|
|
||||||
conn.countIn = count
|
|
||||||
}
|
|
||||||
if chunk <= chunks && chunk == inChunks+1 {
|
|
||||||
inChunks += 1
|
|
||||||
inBuf = append(inBuf, payload...)
|
|
||||||
if chunks != chunk {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
msg := append(util_getBytes(), inBuf...)
|
|
||||||
conn.peer.handlePacket(msg, conn.linkIn)
|
|
||||||
inBuf = inBuf[:0]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
conn.peer.out = func(msg []byte) {
|
|
||||||
defer func() { recover() }()
|
|
||||||
select {
|
|
||||||
case conn.out <- msg:
|
|
||||||
default:
|
|
||||||
util_putBytes(msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
go func() {
|
|
||||||
var out []byte
|
|
||||||
var chunks [][]byte
|
|
||||||
for msg := range conn.out {
|
|
||||||
chunks = chunks[:0]
|
|
||||||
bs := msg
|
|
||||||
for len(bs) > int(conn.chunkSize) {
|
|
||||||
chunks, bs = append(chunks, bs[:conn.chunkSize]), bs[conn.chunkSize:]
|
|
||||||
}
|
|
||||||
chunks = append(chunks, bs)
|
|
||||||
if len(chunks) > 255 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
start := time.Now()
|
|
||||||
for idx, bs := range chunks {
|
|
||||||
nChunks, nChunk, count := uint8(len(chunks)), uint8(idx)+1, conn.countOut
|
|
||||||
out = udp_encode(out[:0], nChunks, nChunk, count, bs)
|
|
||||||
//iface.core.log.Println("DEBUG out:", nChunks, nChunk, count, len(bs))
|
|
||||||
iface.sock.WriteToUDP(out, udpAddr)
|
|
||||||
}
|
|
||||||
timed := time.Since(start)
|
|
||||||
conn.countOut += 1
|
|
||||||
conn.peer.updateBandwidth(len(msg), timed)
|
|
||||||
util_putBytes(msg)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
//*/
|
|
||||||
conn.peer.close = func() { iface.sendClose(conn.addr) }
|
|
||||||
iface.mutex.Lock()
|
|
||||||
iface.conns[addr] = conn
|
|
||||||
iface.mutex.Unlock()
|
|
||||||
iface.core.log.Println("Adding peer:", conn.name)
|
|
||||||
go iface.startConn(conn)
|
|
||||||
go conn.peer.linkLoop(conn.linkIn)
|
|
||||||
iface.sendKeys(conn.addr)
|
|
||||||
}
|
|
||||||
func() {
|
|
||||||
defer func() { recover() }()
|
|
||||||
select {
|
|
||||||
case conn.keysIn <- &ks:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (iface *udpInterface) handlePacket(msg []byte, addr connAddr) {
|
|
||||||
iface.mutex.RLock()
|
|
||||||
if conn, isIn := iface.conns[addr]; isIn {
|
|
||||||
conn.in(msg)
|
|
||||||
}
|
|
||||||
iface.mutex.RUnlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (iface *udpInterface) reader() {
|
|
||||||
iface.core.log.Println("Listening for UDP on:", iface.sock.LocalAddr().String())
|
|
||||||
bs := make([]byte, 65536) // This needs to be large enough for everything...
|
|
||||||
for {
|
|
||||||
n, udpAddr, err := iface.sock.ReadFromUDP(bs)
|
|
||||||
//iface.core.log.Println("DEBUG: read:", bs[0], bs[1], bs[2], n)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
msg := bs[:n]
|
|
||||||
var addr connAddr
|
|
||||||
addr.fromUDPAddr(udpAddr)
|
|
||||||
switch {
|
|
||||||
case udp_isKeys(msg):
|
|
||||||
var them address
|
|
||||||
copy(them[:], udpAddr.IP.To16())
|
|
||||||
if them.isValid() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if udpAddr.IP.IsLinkLocalUnicast() {
|
|
||||||
if len(iface.core.ifceExpr) == 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
for _, expr := range iface.core.ifceExpr {
|
|
||||||
if expr.MatchString(udpAddr.Zone) {
|
|
||||||
iface.handleKeys(msg, addr)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case udp_isClose(msg):
|
|
||||||
iface.handleClose(msg, addr)
|
|
||||||
default:
|
|
||||||
iface.handlePacket(msg, addr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
func udp_decode(bs []byte) (chunks, chunk, count uint8, payload []byte) {
|
|
||||||
if len(bs) >= 3 {
|
|
||||||
chunks, chunk, count, payload = bs[0], bs[1], bs[2], bs[3:]
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func udp_encode(out []byte, chunks, chunk, count uint8, payload []byte) []byte {
|
|
||||||
return append(append(out, chunks, chunk, count), payload...)
|
|
||||||
}
|
|
Loading…
x
Reference in New Issue
Block a user