From c8e1be0f73255dddbc5ac5845c0ba2c1edc6666a Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sat, 19 Jan 2019 16:37:45 -0600 Subject: [PATCH] link/stream refactoring bugfixes and gofmt --- contrib/ansible/genkeys.go | 20 ++++++++++---------- src/yggdrasil/awdl.go | 10 +++++----- src/yggdrasil/stream.go | 21 +++++++++++---------- src/yggdrasil/tcp.go | 7 ++----- 4 files changed, 28 insertions(+), 30 deletions(-) diff --git a/contrib/ansible/genkeys.go b/contrib/ansible/genkeys.go index 22418a0c..81397386 100644 --- a/contrib/ansible/genkeys.go +++ b/contrib/ansible/genkeys.go @@ -35,22 +35,22 @@ func main() { } var encryptionKeys []keySet - for i := 0; i < *numHosts + 1; i++ { + for i := 0; i < *numHosts+1; i++ { encryptionKeys = append(encryptionKeys, newBoxKey()) } encryptionKeys = sortKeySetArray(encryptionKeys) - for i := 0; i < *keyTries - *numHosts - 1; i++ { - encryptionKeys[0] = newBoxKey(); + for i := 0; i < *keyTries-*numHosts-1; i++ { + encryptionKeys[0] = newBoxKey() encryptionKeys = bubbleUpTo(encryptionKeys, 0) } var signatureKeys []keySet - for i := 0; i < *numHosts + 1; i++ { + for i := 0; i < *numHosts+1; i++ { signatureKeys = append(signatureKeys, newSigKey()) } signatureKeys = sortKeySetArray(signatureKeys) - for i := 0; i < *keyTries - *numHosts - 1; i++ { - signatureKeys[0] = newSigKey(); + for i := 0; i < *keyTries-*numHosts-1; i++ { + signatureKeys[0] = newSigKey() signatureKeys = bubbleUpTo(signatureKeys, 0) } @@ -112,11 +112,11 @@ func sortKeySetArray(sets []keySet) []keySet { } func bubbleUpTo(sets []keySet, num int) []keySet { - for i := 0; i < len(sets) - num - 1; i++ { - if isBetter(sets[i + 1].id, sets[i].id) { + for i := 0; i < len(sets)-num-1; i++ { + if isBetter(sets[i+1].id, sets[i].id) { var tmp = sets[i] - sets[i] = sets[i + 1] - sets[i + 1] = tmp + sets[i] = sets[i+1] + sets[i+1] = tmp } } return sets diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/awdl.go index 4c4d83ca..573b6e72 100644 --- a/src/yggdrasil/awdl.go +++ b/src/yggdrasil/awdl.go @@ -42,7 +42,10 @@ func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte, name string) (*a toAWDL: toAWDL, shutdown: make(chan bool), } - intf.stream.init() + inPacket := func(packet []byte) { + intf.link.fromlink <- packet + } + intf.stream.init(inPacket) go intf.handler() l.mutex.Lock() l.interfaces[name] = &intf @@ -74,9 +77,6 @@ func (l *awdl) shutdown(identity string) error { } func (ai *awdlInterface) handler() { - inPacket := func(packet []byte) { - ai.link.fromlink <- packet - } for { select { case <-ai.shutdown: @@ -84,7 +84,7 @@ func (ai *awdlInterface) handler() { case <-ai.link.shutdown: return case in := <-ai.fromAWDL: - ai.stream.write(in, inPacket) + ai.stream.handleInput(in) case out := <-ai.link.tolink: ai.toAWDL <- out } diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go index a4e84c9e..43eff3ff 100644 --- a/src/yggdrasil/stream.go +++ b/src/yggdrasil/stream.go @@ -8,15 +8,16 @@ import ( ) type stream struct { - buffer []byte + inputBuffer []byte + handlePacket func([]byte) } const streamMsgSize = 2048 + 65535 var streamMsg = [...]byte{0xde, 0xad, 0xb1, 0x75} // "dead bits" -func (s *stream) init() { - s.buffer = make([]byte, 2*streamMsgSize) +func (s *stream) init(in func([]byte)) { + s.handlePacket = in } // This reads from the channel into a []byte buffer for incoming messages. It @@ -24,11 +25,10 @@ func (s *stream) init() { // to the peer struct via the provided `in func([]byte)` argument. Then it // shifts the incomplete fragments of data forward so future reads won't // overwrite it. -func (s *stream) write(bs []byte, in func([]byte)) error { - frag := s.buffer[:0] - if n := len(bs); n > 0 { - frag = append(frag, bs[:n]...) - msg, ok, err2 := stream_chopMsg(&frag) +func (s *stream) handleInput(bs []byte) error { + if len(bs) > 0 { + s.inputBuffer = append(s.inputBuffer, bs...) + msg, ok, err2 := stream_chopMsg(&s.inputBuffer) if err2 != nil { return fmt.Errorf("message error: %v", err2) } @@ -37,8 +37,9 @@ func (s *stream) write(bs []byte, in func([]byte)) error { return nil } newMsg := append(util.GetBytes(), msg...) - in(newMsg) - util.Yield() + s.inputBuffer = append(s.inputBuffer[:0], s.inputBuffer...) + s.handlePacket(newMsg) + util.Yield() // Make sure we give up control to the scheduler } return nil } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 682796ec..1d4ec994 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -378,9 +378,6 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { // E.g. over different interfaces p := iface.core.peers.newPeer(&meta.box, &meta.sig, crypto.GetSharedKey(myLinkPriv, &meta.link), sock.RemoteAddr().String()) p.linkOut = make(chan []byte, 1) - in := func(bs []byte) { - p.handlePacket(bs) - } out := make(chan []byte, 1) defer close(out) go func() { @@ -443,7 +440,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, them) iface.core.log.Printf("Connected: %s, source: %s", themString, us) - iface.stream.init() + iface.stream.init(p.handlePacket) bs := make([]byte, 2*streamMsgSize) var n int for { @@ -455,7 +452,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { break } if n > 0 { - iface.stream.write(bs[:n], in) + iface.stream.handleInput(bs[:n]) } } if err == nil {