Merge pull request #21 from Arceliar/testing

Breaking wire format cleanup and other updates
This commit is contained in:
Arceliar 2018-02-23 17:10:45 -06:00 committed by GitHub
commit 0fae932512
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 155 additions and 148 deletions

View File

@ -126,6 +126,8 @@ func (ps *peers) newPeer(box *boxPubKey,
func (p *peer) linkLoop(in <-chan []byte) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var counter uint8
var lastRSeq uint64
for {
select {
case packet, ok := <-in:
@ -139,10 +141,26 @@ func (p *peer) linkLoop(in <-chan []byte) {
if p.port == 0 {
continue
} // Don't send announces on selfInterface
// Maybe we shouldn't time out, and instead wait for a kill signal?
p.myMsg, p.mySigs = p.core.switchTable.createMessage(p.port)
var update bool
switch {
case p.msgAnc == nil:
update = true
case lastRSeq != p.msgAnc.seq:
update = true
case p.msgAnc.rseq != p.myMsg.seq:
update = true
case counter%4 == 0:
update = true
}
if update {
if p.msgAnc != nil {
lastRSeq = p.msgAnc.seq
}
p.sendSwitchAnnounce()
}
counter = (counter + 1) % 4
}
}
}
}
@ -186,11 +204,12 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
if to == nil {
return
}
newTTLLen := wire_uint64_len(newTTL)
// This mutates the packet in-place if the length of the TTL changes!
ttlSlice := wire_encode_uint64(newTTL)
newTTLLen := len(ttlSlice)
shift := ttlLen - newTTLLen
wire_put_uint64(newTTL, packet[ttlBegin+shift:])
copy(packet[shift:], packet[:pTypeLen])
copy(packet[ttlBegin+shift:], ttlSlice)
packet = packet[shift:]
to.sendPacket(packet)
}
@ -204,8 +223,6 @@ func (p *peer) sendPacket(packet []byte) {
func (p *peer) sendLinkPacket(packet []byte) {
bs, nonce := boxSeal(&p.shared, packet, nil)
linkPacket := wire_linkProtoTrafficPacket{
toKey: p.box,
fromKey: p.core.boxPub,
nonce: *nonce,
payload: bs,
}
@ -218,12 +235,6 @@ func (p *peer) handleLinkTraffic(bs []byte) {
if !packet.decode(bs) {
return
}
if packet.toKey != p.core.boxPub {
return
}
if packet.fromKey != p.box {
return
}
payload, isOK := boxOpen(&p.shared, packet.payload, &packet.nonce)
if !isOK {
return
@ -418,7 +429,9 @@ func (p *peer) sendSwitchAnnounce() {
anc.seq = p.myMsg.seq
anc.len = uint64(len(p.myMsg.locator.coords))
//anc.Deg = p.myMsg.Degree
//anc.RSeq = p.myMsg.RSeq
if p.msgAnc != nil {
anc.rseq = p.msgAnc.seq
}
packet := anc.encode()
p.sendLinkPacket(packet)
}

View File

@ -295,9 +295,7 @@ func (r *router) handleDHTReq(bs []byte, fromKey *boxPubKey) {
if !req.decode(bs) {
return
}
if req.key != *fromKey {
return
}
req.key = *fromKey
r.core.dht.handleReq(&req)
}
@ -306,9 +304,7 @@ func (r *router) handleDHTRes(bs []byte, fromKey *boxPubKey) {
if !res.decode(bs) {
return
}
if res.key != *fromKey {
return
}
res.key = *fromKey
r.core.dht.handleRes(&res)
}

View File

@ -152,6 +152,15 @@ func (ss *sessions) createSession(theirPermKey *boxPubKey) *sessionInfo {
sinfo.myNonce = *newBoxNonce()
sinfo.theirMTU = 1280
sinfo.myMTU = uint16(ss.core.tun.mtu)
if sinfo.myMTU > 2048 {
// FIXME this is a temporary workaround to an issue with UDP peers
// UDP links need to fragment packets (within ygg) to get them over the wire
// For some reason, TCP streams over UDP peers can get stuck in a bad state
// When this happens, TCP throttles back, and each TCP retransmission loses fragments
// On my wifi network, it seems to happen around the 22nd-23rd fragment of a large packet
// By setting the path MTU to something small, this should (hopefully) mitigate the issue
sinfo.myMTU = 2048
}
higher := false
for idx := range ss.core.boxPub {
if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] {

View File

@ -11,11 +11,6 @@ package yggdrasil
// TODO? use a pre-computed lookup table (python version had this)
// A little annoying to do with constant changes from bandwidth estimates
// FIXME (!) throttle how often root updates are accepted
// If the root starts spaming with new timestamps, it should only affect their neighbors
// The rest of the network should see announcements at a somewhat reasonable rate
// Maybe no faster than 2x the usual update interval
import "time"
import "sync"
import "sync/atomic"
@ -23,6 +18,8 @@ import "sync/atomic"
//import "fmt"
const switch_timeout = time.Minute
const switch_updateInterval = switch_timeout / 2
const switch_throttle = switch_updateInterval / 2
// You should be able to provide crypto signatures for this
// 1 signature per coord, from the *sender* to that coord
@ -219,7 +216,7 @@ func (t *switchTable) cleanRoot() {
}
// Or, if we are the root, possibly update our timestamp
if t.data.locator.root == t.key &&
now.Sub(t.time) > switch_timeout/2 {
now.Sub(t.time) > switch_updateInterval {
//fmt.Println("root is self and old, updating", t.data.locator.Root)
doUpdate = true
}
@ -343,9 +340,11 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig
updateRoot = true
case cost < pCost:
updateRoot = true
case sender.port == t.parent &&
(msg.locator.tstamp > t.data.locator.tstamp ||
!equiv(&msg.locator, &t.data.locator)):
case sender.port != t.parent: // do nothing
case !equiv(&msg.locator, &t.data.locator):
updateRoot = true
case now.Sub(t.time) < switch_throttle: // do nothing
case msg.locator.tstamp > t.data.locator.tstamp:
updateRoot = true
}
if updateRoot {
@ -390,6 +389,9 @@ func (t *switchTable) updateTable() {
}
for _, pinfo := range t.data.peers {
//if !pinfo.forward { continue }
if pinfo.locator.root != newTable.self.root {
continue
}
loc := pinfo.locator.clone()
loc.coords = loc.coords[:len(loc.coords)-1] // Remove the them->self link
newTable.elems = append(newTable.elems, tableElem{
@ -422,9 +424,6 @@ func (t *switchTable) lookup(dest []byte, ttl uint64) (switchPort, uint64) {
// score is in units of bandwidth / distance
bestScore := float64(-1)
for _, info := range table.elems {
if info.locator.root != table.self.root {
continue
}
dist := info.locator.dist(dest) //getDist(info.locator.coords)
if !(dist < myDist) {
continue

View File

@ -15,6 +15,7 @@ import "time"
import "errors"
import "sync"
import "fmt"
import "bufio"
const tcp_msgSize = 2048 + 65535 // TODO figure out what makes sense
@ -23,11 +24,14 @@ type tcpInterface struct {
serv *net.TCPListener
mutex sync.Mutex // Protecting the below
calls map[string]struct{}
conns map[tcpInfo](chan struct{})
}
type tcpKeys struct {
type tcpInfo struct {
box boxPubKey
sig sigPubKey
localAddr string // net.IPAddr.String(), not TCPAddr, don't care about port
remoteAddr string
}
func (iface *tcpInterface) init(core *Core, addr string) {
@ -41,6 +45,7 @@ func (iface *tcpInterface) init(core *Core, addr string) {
panic(err)
}
iface.calls = make(map[string]struct{})
iface.conns = make(map[tcpInfo](chan struct{}))
go iface.listener()
}
@ -102,8 +107,8 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) {
if n < len(keys) { /*panic("Partial key packet?") ;*/
return
}
ks := tcpKeys{}
if !tcp_chop_keys(&ks.box, &ks.sig, &keys) { /*panic("Invalid key packet?") ;*/
info := tcpInfo{}
if !tcp_chop_keys(&info.box, &info.sig, &keys) { /*panic("Invalid key packet?") ;*/
return
}
// Quit the parent call if this is a connection to ourself
@ -115,21 +120,73 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) {
}
return true
}
if equiv(ks.box[:], iface.core.boxPub[:]) {
if equiv(info.box[:], iface.core.boxPub[:]) {
return
} // testing
if equiv(ks.sig[:], iface.core.sigPub[:]) {
if equiv(info.sig[:], iface.core.sigPub[:]) {
return
}
// Check if we already have a connection to this node, close and block if yes
local := sock.LocalAddr().(*net.TCPAddr)
laddr := net.IPAddr{
IP: local.IP,
Zone: local.Zone,
}
info.localAddr = laddr.String()
remote := sock.RemoteAddr().(*net.TCPAddr)
raddr := net.IPAddr{
IP: remote.IP,
Zone: remote.Zone,
}
info.remoteAddr = raddr.String()
iface.mutex.Lock()
if blockChan, isIn := iface.conns[info]; isIn {
iface.mutex.Unlock()
sock.Close()
<-blockChan
return
}
blockChan := make(chan struct{})
iface.conns[info] = blockChan
iface.mutex.Unlock()
defer func() {
iface.mutex.Lock()
delete(iface.conns, info)
iface.mutex.Unlock()
close(blockChan)
}()
// Note that multiple connections to the same node are allowed
// E.g. over different interfaces
linkIn := make(chan []byte, 1)
p := iface.core.peers.newPeer(&ks.box, &ks.sig) //, in, out)
p := iface.core.peers.newPeer(&info.box, &info.sig) //, in, out)
in := func(bs []byte) {
p.handlePacket(bs, linkIn)
}
out := make(chan []byte, 32) // TODO? what size makes sense
defer close(out)
buf := bufio.NewWriterSize(sock, tcp_msgSize)
send := func(msg []byte) {
msgLen := wire_encode_uint64(uint64(len(msg)))
before := buf.Buffered()
start := time.Now()
buf.Write(tcp_msg[:])
buf.Write(msgLen)
buf.Write(msg)
timed := time.Since(start)
after := buf.Buffered()
written := (before + len(tcp_msg) + len(msgLen) + len(msg)) - after
if written > 0 {
p.updateBandwidth(written, timed)
}
util_putBytes(msg)
}
flush := func() {
size := buf.Buffered()
start := time.Now()
buf.Flush()
timed := time.Since(start)
p.updateBandwidth(size, timed)
}
go func() {
var stack [][]byte
put := func(msg []byte) {
@ -139,25 +196,6 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) {
stack = stack[1:]
}
}
send := func() {
msg := stack[len(stack)-1]
stack = stack[:len(stack)-1]
buf := net.Buffers{tcp_msg[:],
wire_encode_uint64(uint64(len(msg))),
msg}
size := 0
for _, bs := range buf {
size += len(bs)
}
start := time.Now()
buf.WriteTo(sock)
timed := time.Since(start)
pType, _ := wire_decode_uint64(msg)
if pType == wire_LinkProtocolTraffic {
p.updateBandwidth(size, timed)
}
util_putBytes(msg)
}
for msg := range out {
put(msg)
for len(stack) > 0 {
@ -165,13 +203,17 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) {
select {
case msg, ok := <-out:
if !ok {
flush()
return
}
put(msg)
default:
send()
msg := stack[len(stack)-1]
stack = stack[:len(stack)-1]
send(msg)
}
}
flush()
}
}()
p.out = func(msg []byte) {
@ -197,8 +239,8 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) {
p.core.peers.mutex.Unlock()
close(linkIn)
}()
them := sock.RemoteAddr()
themNodeID := getNodeID(&ks.box)
them := sock.RemoteAddr().(*net.TCPAddr)
themNodeID := getNodeID(&info.box)
themAddr := address_addrForNodeID(themNodeID)
themAddrString := net.IP(themAddr[:]).String()
themString := fmt.Sprintf("%s@%s", themAddrString, them)

View File

@ -56,6 +56,7 @@ type connInfo struct {
out chan []byte
countIn uint8
countOut uint8
chunkSize uint16
}
type udpKeys struct {
@ -96,7 +97,6 @@ func (iface *udpInterface) startConn(info *connInfo) {
defer ticker.Stop()
defer func() {
// Cleanup
// FIXME this still leaks a peer struct
iface.mutex.Lock()
delete(iface.conns, info.addr)
iface.mutex.Unlock()
@ -168,50 +168,25 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) {
linkIn: make(chan []byte, 1),
keysIn: make(chan *udpKeys, 1),
out: make(chan []byte, 32),
chunkSize: 576 - 60 - 8 - 3, // max safe - max ip - udp header - chunk overhead
}
/*
conn.in = func (msg []byte) { conn.peer.handlePacket(msg, conn.linkIn) }
conn.peer.out = func (msg []byte) {
start := time.Now()
iface.sock.WriteToUDP(msg, udpAddr)
timed := time.Since(start)
conn.peer.updateBandwidth(len(msg), timed)
util_putBytes(msg)
} // Old version, always one syscall per packet
//*/
/*
conn.peer.out = func (msg []byte) {
defer func() { recover() }()
select {
case conn.out<-msg:
default: util_putBytes(msg)
if udpAddr.IP.IsLinkLocalUnicast() {
ifce, err := net.InterfaceByName(udpAddr.Zone)
if ifce != nil && err == nil {
conn.chunkSize = uint16(ifce.MTU) - 60 - 8 - 3
}
}
go func () {
for msg := range conn.out {
start := time.Now()
iface.sock.WriteToUDP(msg, udpAddr)
timed := time.Since(start)
conn.peer.updateBandwidth(len(msg), timed)
util_putBytes(msg)
}
}()
//*/
//*
var inChunks uint8
var inBuf []byte
conn.in = func(bs []byte) {
//defer util_putBytes(bs)
chunks, chunk, count, payload := udp_decode(bs)
//iface.core.log.Println("DEBUG:", addr, chunks, chunk, count, len(payload))
//iface.core.log.Println("DEBUG: payload:", payload)
if count != conn.countIn {
inChunks = 0
inBuf = inBuf[:0]
conn.countIn = count
}
if chunk <= chunks && chunk == inChunks+1 {
//iface.core.log.Println("GOING:", addr, chunks, chunk, count, len(payload))
inChunks += 1
inBuf = append(inBuf, payload...)
if chunks != chunk {
@ -219,7 +194,6 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) {
}
msg := append(util_getBytes(), inBuf...)
conn.peer.handlePacket(msg, conn.linkIn)
//iface.core.log.Println("DONE:", addr, chunks, chunk, count, len(payload))
}
}
conn.peer.out = func(msg []byte) {
@ -236,11 +210,10 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) {
for msg := range conn.out {
chunks = chunks[:0]
bs := msg
for len(bs) > udp_chunkSize {
chunks, bs = append(chunks, bs[:udp_chunkSize]), bs[udp_chunkSize:]
for len(bs) > int(conn.chunkSize) {
chunks, bs = append(chunks, bs[:conn.chunkSize]), bs[conn.chunkSize:]
}
chunks = append(chunks, bs)
//iface.core.log.Println("DEBUG: out chunks:", len(chunks), len(msg))
if len(chunks) > 255 {
continue
}
@ -284,19 +257,14 @@ func (iface *udpInterface) handlePacket(msg []byte, addr connAddr) {
}
func (iface *udpInterface) reader() {
bs := make([]byte, 2048) // This needs to be large enough for everything...
bs := make([]byte, 65536) // This needs to be large enough for everything...
for {
//iface.core.log.Println("Starting read")
n, udpAddr, err := iface.sock.ReadFromUDP(bs)
//iface.core.log.Println("Read", n, udpAddr.String(), err)
//iface.core.log.Println("DEBUG: read:", bs[0], bs[1], bs[2], n)
if err != nil {
panic(err)
break
}
if n > 1500 {
panic(n)
}
//msg := append(util_getBytes(), bs[:n]...)
msg := bs[:n]
var addr connAddr
addr.fromUDPAddr(udpAddr)
@ -319,8 +287,6 @@ func (iface *udpInterface) reader() {
////////////////////////////////////////////////////////////////////////////////
const udp_chunkSize = 508 // Apparently the maximum guaranteed safe IPv4 size
func udp_decode(bs []byte) (chunks, chunk, count uint8, payload []byte) {
if len(bs) >= 3 {
chunks, chunk, count, payload = bs[0], bs[1], bs[2], bs[3:]

View File

@ -21,7 +21,6 @@ const (
wire_DHTLookupResponse // inside protocol traffic header
wire_SearchRequest // inside protocol traffic header
wire_SearchResponse // inside protocol traffic header
//wire_Keys // udp key packet (boxPub, sigPub)
)
// Encode uint64 using a variable length scheme
@ -112,8 +111,6 @@ func wire_put_coords(coords []byte, bs []byte) []byte {
func wire_decode_coords(packet []byte) ([]byte, int) {
coordLen, coordBegin := wire_decode_uint64(packet)
coordEnd := coordBegin + int(coordLen)
//if coordBegin == 0 { panic("No coords found") } // Testing
//if coordEnd > len(packet) { panic("Packet too short") } // Testing
if coordBegin == 0 || coordEnd > len(packet) {
return nil, 0
}
@ -129,7 +126,7 @@ type msgAnnounce struct {
seq uint64
len uint64
//Deg uint64
//RSeq uint64
rseq uint64
}
func (m *msgAnnounce) encode() []byte {
@ -138,8 +135,7 @@ func (m *msgAnnounce) encode() []byte {
bs = append(bs, wire_encode_uint64(wire_intToUint(m.tstamp))...)
bs = append(bs, wire_encode_uint64(m.seq)...)
bs = append(bs, wire_encode_uint64(m.len)...)
//bs = append(bs, wire_encode_uint64(m.Deg)...)
//bs = append(bs, wire_encode_uint64(m.RSeq)...)
bs = append(bs, wire_encode_uint64(m.rseq)...)
return bs
}
@ -159,8 +155,8 @@ func (m *msgAnnounce) decode(bs []byte) bool {
return false
case !wire_chop_uint64(&m.len, &bs):
return false
//case !wire_chop_uint64(&m.Deg, &bs): return false
//case !wire_chop_uint64(&m.RSeq, &bs): return false
case !wire_chop_uint64(&m.rseq, &bs):
return false
}
m.tstamp = wire_intFromUint(tstamp)
return true
@ -380,16 +376,12 @@ func (p *wire_protoTrafficPacket) decode(bs []byte) bool {
}
type wire_linkProtoTrafficPacket struct {
toKey boxPubKey
fromKey boxPubKey
nonce boxNonce
payload []byte
}
func (p *wire_linkProtoTrafficPacket) encode() []byte {
bs := wire_encode_uint64(wire_LinkProtocolTraffic)
bs = append(bs, p.toKey[:]...)
bs = append(bs, p.fromKey[:]...)
bs = append(bs, p.nonce[:]...)
bs = append(bs, p.payload...)
return bs
@ -402,10 +394,6 @@ func (p *wire_linkProtoTrafficPacket) decode(bs []byte) bool {
return false
case pType != wire_LinkProtocolTraffic:
return false
case !wire_chop_slice(p.toKey[:], &bs):
return false
case !wire_chop_slice(p.fromKey[:], &bs):
return false
case !wire_chop_slice(p.nonce[:], &bs):
return false
}
@ -467,7 +455,6 @@ func (p *sessionPing) decode(bs []byte) bool {
func (r *dhtReq) encode() []byte {
coords := wire_encode_coords(r.coords)
bs := wire_encode_uint64(wire_DHTLookupRequest)
bs = append(bs, r.key[:]...)
bs = append(bs, coords...)
bs = append(bs, r.dest[:]...)
return bs
@ -480,8 +467,6 @@ func (r *dhtReq) decode(bs []byte) bool {
return false
case pType != wire_DHTLookupRequest:
return false
case !wire_chop_slice(r.key[:], &bs):
return false
case !wire_chop_coords(&r.coords, &bs):
return false
case !wire_chop_slice(r.dest[:], &bs):
@ -494,7 +479,6 @@ func (r *dhtReq) decode(bs []byte) bool {
func (r *dhtRes) encode() []byte {
coords := wire_encode_coords(r.coords)
bs := wire_encode_uint64(wire_DHTLookupResponse)
bs = append(bs, r.key[:]...)
bs = append(bs, coords...)
bs = append(bs, r.dest[:]...)
for _, info := range r.infos {
@ -512,8 +496,6 @@ func (r *dhtRes) decode(bs []byte) bool {
return false
case pType != wire_DHTLookupResponse:
return false
case !wire_chop_slice(r.key[:], &bs):
return false
case !wire_chop_coords(&r.coords, &bs):
return false
case !wire_chop_slice(r.dest[:], &bs):

View File

@ -118,7 +118,7 @@ func generateConfig() *nodeConfig {
cfg.Multicast = true
cfg.LinkLocal = ""
cfg.IfName = "auto"
cfg.IfMTU = 65535
cfg.IfMTU = 1280
if runtime.GOOS == "windows" {
cfg.IfTAPMode = true
} else {
@ -188,8 +188,8 @@ func (n *node) announce() {
panic(err)
}
var anAddr net.TCPAddr
tcpAddr := n.core.DEBUG_getGlobalTCPAddr()
anAddr.Port = tcpAddr.Port
myAddr := n.core.DEBUG_getGlobalTCPAddr()
anAddr.Port = myAddr.Port
destAddr, err := net.ResolveUDPAddr("udp6", multicastAddr)
if err != nil {
panic(err)