Use larger UDP chunks for link-local IP and let the OS fragment it. Switch to UDP for link-local peers. Minor code cleanup for TCP.

This commit is contained in:
Arceliar 2018-02-19 19:34:51 -06:00
parent 8c7d514032
commit 4045597516
4 changed files with 53 additions and 47 deletions

View File

@ -158,6 +158,23 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) {
} }
out := make(chan []byte, 32) // TODO? what size makes sense out := make(chan []byte, 32) // TODO? what size makes sense
defer close(out) defer close(out)
send := func(msg []byte) {
buf := net.Buffers{tcp_msg[:],
wire_encode_uint64(uint64(len(msg))),
msg}
size := 0
for _, bs := range buf {
size += len(bs)
}
start := time.Now()
buf.WriteTo(sock)
timed := time.Since(start)
pType, _ := wire_decode_uint64(msg)
if pType == wire_LinkProtocolTraffic {
p.updateBandwidth(size, timed)
}
util_putBytes(msg)
}
go func() { go func() {
var stack [][]byte var stack [][]byte
put := func(msg []byte) { put := func(msg []byte) {
@ -167,25 +184,6 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) {
stack = stack[1:] stack = stack[1:]
} }
} }
send := func() {
msg := stack[len(stack)-1]
stack = stack[:len(stack)-1]
buf := net.Buffers{tcp_msg[:],
wire_encode_uint64(uint64(len(msg))),
msg}
size := 0
for _, bs := range buf {
size += len(bs)
}
start := time.Now()
buf.WriteTo(sock)
timed := time.Since(start)
pType, _ := wire_decode_uint64(msg)
if pType == wire_LinkProtocolTraffic {
p.updateBandwidth(size, timed)
}
util_putBytes(msg)
}
for msg := range out { for msg := range out {
put(msg) put(msg)
for len(stack) > 0 { for len(stack) > 0 {
@ -197,7 +195,9 @@ func (iface *tcpInterface) handler(sock *net.TCPConn) {
} }
put(msg) put(msg)
default: default:
send() msg := stack[len(stack)-1]
stack = stack[:len(stack)-1]
send(msg)
} }
} }
} }

View File

@ -46,16 +46,17 @@ func (c *connAddr) toUDPAddr() *net.UDPAddr {
} }
type connInfo struct { type connInfo struct {
name string name string
addr connAddr addr connAddr
peer *peer peer *peer
linkIn chan []byte linkIn chan []byte
keysIn chan *udpKeys keysIn chan *udpKeys
timeout int // count of how many heartbeats have been missed timeout int // count of how many heartbeats have been missed
in func([]byte) in func([]byte)
out chan []byte out chan []byte
countIn uint8 countIn uint8
countOut uint8 countOut uint8
chunkSize uint16
} }
type udpKeys struct { type udpKeys struct {
@ -73,6 +74,8 @@ func (iface *udpInterface) init(core *Core, addr string) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
//iface.sock.SetReadBuffer(1048576)
//iface.sock.SetWriteBuffer(1048576)
iface.conns = make(map[connAddr]*connInfo) iface.conns = make(map[connAddr]*connInfo)
go iface.reader() go iface.reader()
} }
@ -162,12 +165,16 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) {
themAddrString := net.IP(themAddr[:]).String() themAddrString := net.IP(themAddr[:]).String()
themString := fmt.Sprintf("%s@%s", themAddrString, udpAddr.String()) themString := fmt.Sprintf("%s@%s", themAddrString, udpAddr.String())
conn = &connInfo{ conn = &connInfo{
name: themString, name: themString,
addr: connAddr(addr), addr: connAddr(addr),
peer: iface.core.peers.newPeer(&ks.box, &ks.sig), peer: iface.core.peers.newPeer(&ks.box, &ks.sig),
linkIn: make(chan []byte, 1), linkIn: make(chan []byte, 1),
keysIn: make(chan *udpKeys, 1), keysIn: make(chan *udpKeys, 1),
out: make(chan []byte, 32), out: make(chan []byte, 32),
chunkSize: 576 - 60 - 8 - 3, // max save - max ip - udp header - chunk overhead
}
if udpAddr.IP.IsLinkLocalUnicast() {
conn.chunkSize = 65535 - 8 - 3
} }
/* /*
conn.in = func (msg []byte) { conn.peer.handlePacket(msg, conn.linkIn) } conn.in = func (msg []byte) { conn.peer.handlePacket(msg, conn.linkIn) }
@ -236,8 +243,8 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) {
for msg := range conn.out { for msg := range conn.out {
chunks = chunks[:0] chunks = chunks[:0]
bs := msg bs := msg
for len(bs) > udp_chunkSize { for len(bs) > int(conn.chunkSize) {
chunks, bs = append(chunks, bs[:udp_chunkSize]), bs[udp_chunkSize:] chunks, bs = append(chunks, bs[:conn.chunkSize]), bs[conn.chunkSize:]
} }
chunks = append(chunks, bs) chunks = append(chunks, bs)
//iface.core.log.Println("DEBUG: out chunks:", len(chunks), len(msg)) //iface.core.log.Println("DEBUG: out chunks:", len(chunks), len(msg))
@ -284,7 +291,7 @@ func (iface *udpInterface) handlePacket(msg []byte, addr connAddr) {
} }
func (iface *udpInterface) reader() { func (iface *udpInterface) reader() {
bs := make([]byte, 2048) // This needs to be large enough for everything... bs := make([]byte, 65536) // This needs to be large enough for everything...
for { for {
//iface.core.log.Println("Starting read") //iface.core.log.Println("Starting read")
n, udpAddr, err := iface.sock.ReadFromUDP(bs) n, udpAddr, err := iface.sock.ReadFromUDP(bs)
@ -293,9 +300,7 @@ func (iface *udpInterface) reader() {
panic(err) panic(err)
break break
} }
if n > 1500 { //iface.core.log.Println("DEBUG: recv len:", n)
panic(n)
}
//msg := append(util_getBytes(), bs[:n]...) //msg := append(util_getBytes(), bs[:n]...)
msg := bs[:n] msg := bs[:n]
var addr connAddr var addr connAddr
@ -319,7 +324,8 @@ func (iface *udpInterface) reader() {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
const udp_chunkSize = 508 // Apparently the maximum guaranteed safe IPv4 size //const udp_chunkSize = 508 // Apparently the maximum guaranteed safe IPv4 size
//const udp_chunkSize = 65535 - 3 - 8
func udp_decode(bs []byte) (chunks, chunk, count uint8, payload []byte) { func udp_decode(bs []byte) (chunks, chunk, count uint8, payload []byte) {
if len(bs) >= 3 { if len(bs) >= 3 {

View File

@ -21,7 +21,7 @@ const (
wire_DHTLookupResponse // inside protocol traffic header wire_DHTLookupResponse // inside protocol traffic header
wire_SearchRequest // inside protocol traffic header wire_SearchRequest // inside protocol traffic header
wire_SearchResponse // inside protocol traffic header wire_SearchResponse // inside protocol traffic header
//wire_Keys // udp key packet (boxPub, sigPub) wire_Keys // udp key packet (boxPub, sigPub)
) )
// Encode uint64 using a variable length scheme // Encode uint64 using a variable length scheme

View File

@ -177,7 +177,7 @@ func (n *node) listen() {
saddr := addr.String() saddr := addr.String()
//if _, isIn := n.peers[saddr]; isIn { continue } //if _, isIn := n.peers[saddr]; isIn { continue }
//n.peers[saddr] = struct{}{} //n.peers[saddr] = struct{}{}
n.core.DEBUG_addTCPConn(saddr) // FIXME? can result in 2 connections per peer n.core.DEBUG_maybeSendUDPKeys(saddr) // FIXME? can result in 2 connections per peer
//fmt.Println("DEBUG:", "added multicast peer:", saddr) //fmt.Println("DEBUG:", "added multicast peer:", saddr)
} }
} }
@ -188,8 +188,8 @@ func (n *node) announce() {
panic(err) panic(err)
} }
var anAddr net.TCPAddr var anAddr net.TCPAddr
tcpAddr := n.core.DEBUG_getGlobalTCPAddr() myAddr := n.core.DEBUG_getGlobalUDPAddr()
anAddr.Port = tcpAddr.Port anAddr.Port = myAddr.Port
destAddr, err := net.ResolveUDPAddr("udp6", multicastAddr) destAddr, err := net.ResolveUDPAddr("udp6", multicastAddr)
if err != nil { if err != nil {
panic(err) panic(err)