diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/awdl.go index 43db934a..4c4d83ca 100644 --- a/src/yggdrasil/awdl.go +++ b/src/yggdrasil/awdl.go @@ -18,6 +18,7 @@ type awdlInterface struct { shutdown chan bool peer *peer link *linkInterface + stream stream } func (l *awdl) init(c *Core) error { @@ -41,6 +42,8 @@ func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte, name string) (*a toAWDL: toAWDL, shutdown: make(chan bool), } + intf.stream.init() + go intf.handler() l.mutex.Lock() l.interfaces[name] = &intf l.mutex.Unlock() @@ -71,6 +74,9 @@ func (l *awdl) shutdown(identity string) error { } func (ai *awdlInterface) handler() { + inPacket := func(packet []byte) { + ai.link.fromlink <- packet + } for { select { case <-ai.shutdown: @@ -78,7 +84,7 @@ func (ai *awdlInterface) handler() { case <-ai.link.shutdown: return case in := <-ai.fromAWDL: - ai.link.fromlink <- in + ai.stream.write(in, inPacket) case out := <-ai.link.tolink: ai.toAWDL <- out } diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 1f26b69a..70905619 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -199,8 +199,8 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { return err } - if err := c.awdl.init(c); err != nil { - c.log.Println("Failed to start AWDL interface") + if err := c.link.init(c); err != nil { + c.log.Println("Failed to start link interfaces") return err } diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 386fef88..32b5ea70 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -18,6 +18,7 @@ type link struct { } type linkInterface struct { + name string link *link fromlink chan []byte tolink chan []byte @@ -32,55 +33,75 @@ func (l *link) init(c *Core) error { l.interfaces = make(map[string]*linkInterface) l.mutex.Unlock() + if err := l.core.awdl.init(c); err != nil { + l.core.log.Println("Failed to start AWDL interface") + return err + } + return nil } func (l *link) create(fromlink chan []byte, tolink chan []byte, name string) (*linkInterface, error) { + l.mutex.Lock() + defer l.mutex.Unlock() + if _, ok := l.interfaces[name]; ok { + return nil, errors.New("Interface with this name already exists") + } intf := linkInterface{ + name: name, link: l, fromlink: fromlink, tolink: tolink, shutdown: make(chan bool), } - l.mutex.Lock() - l.interfaces[name] = &intf - l.mutex.Unlock() + l.interfaces[intf.name] = &intf + go intf.start() + return &intf, nil +} + +func (intf *linkInterface) start() { myLinkPub, myLinkPriv := crypto.NewBoxKeys() meta := version_getBaseMetadata() - meta.box = l.core.boxPub - meta.sig = l.core.sigPub + meta.box = intf.link.core.boxPub + meta.sig = intf.link.core.sigPub meta.link = *myLinkPub metaBytes := meta.encode() - tolink <- metaBytes - metaBytes = <-fromlink + //intf.link.core.log.Println("start: intf.tolink <- metaBytes") + intf.tolink <- metaBytes + //intf.link.core.log.Println("finish: intf.tolink <- metaBytes") + //intf.link.core.log.Println("start: metaBytes = <-intf.fromlink") + metaBytes = <-intf.fromlink + //intf.link.core.log.Println("finish: metaBytes = <-intf.fromlink") meta = version_metadata{} if !meta.decode(metaBytes) || !meta.check() { - return nil, errors.New("Metadata decode failure") + intf.link.core.log.Println("Metadata decode failure") + return } base := version_getBaseMetadata() if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer { - return nil, errors.New("Failed to connect to node: " + name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) + intf.link.core.log.Println("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) + return } shared := crypto.GetSharedKey(myLinkPriv, &meta.link) - //shared := crypto.GetSharedKey(&l.core.boxPriv, boxPubKey) - intf.peer = l.core.peers.newPeer(&meta.box, &meta.sig, shared, name) - if intf.peer != nil { - intf.peer.linkOut = make(chan []byte, 1) // protocol traffic - intf.peer.out = func(msg []byte) { - defer func() { recover() }() - intf.tolink <- msg - } // called by peer.sendPacket() - l.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle - intf.peer.close = func() { - close(intf.fromlink) - close(intf.tolink) - } - go intf.handler() - go intf.peer.linkLoop() - return &intf, nil + intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name) + if intf.peer == nil { + intf.link.mutex.Lock() + delete(intf.link.interfaces, intf.name) + intf.link.mutex.Unlock() + return } - delete(l.interfaces, name) - return nil, errors.New("l.core.peers.newPeer failed") + intf.peer.linkOut = make(chan []byte, 1) // protocol traffic + intf.peer.out = func(msg []byte) { + defer func() { recover() }() + intf.tolink <- msg + } // called by peer.sendPacket() + intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle + intf.peer.close = func() { + close(intf.fromlink) + close(intf.tolink) + } + go intf.handler() + go intf.peer.linkLoop() } func (l *link) getInterface(identity string) *linkInterface { diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go index 5d67aceb..a4e84c9e 100644 --- a/src/yggdrasil/stream.go +++ b/src/yggdrasil/stream.go @@ -9,7 +9,6 @@ import ( type stream struct { buffer []byte - cursor int } const streamMsgSize = 2048 + 65535 @@ -18,7 +17,6 @@ var streamMsg = [...]byte{0xde, 0xad, 0xb1, 0x75} // "dead bits" func (s *stream) init() { s.buffer = make([]byte, 2*streamMsgSize) - s.cursor = 0 } // This reads from the channel into a []byte buffer for incoming messages. It @@ -76,36 +74,3 @@ func stream_chopMsg(bs *[]byte) ([]byte, bool, error) { (*bs) = (*bs)[msgEnd:] return msg, true, nil } - -/* -func (s *stream) chopMsg() ([]byte, bool, error) { - // Returns msg, ok, err - if len(s.buffer) < len(streamMsg) { - fmt.Println("*** too short") - return nil, false, nil - } - for idx := range streamMsg { - if s.buffer[idx] != streamMsg[idx] { - fmt.Println("*** bad message") - return nil, false, errors.New("bad message") - } - } - msgLen, msgLenLen := wire_decode_uint64((s.buffer)[len(streamMsg):]) - if msgLen > streamMsgSize { - fmt.Println("*** oversized message") - return nil, false, errors.New("oversized message") - } - msgBegin := len(streamMsg) + msgLenLen - msgEnd := msgBegin + int(msgLen) - if msgLenLen == 0 || len(s.buffer) < msgEnd { - // We don't have the full message - // Need to buffer this and wait for the rest to come in - fmt.Println("*** still waiting") - return nil, false, nil - } - msg := s.buffer[msgBegin:msgEnd] - s.buffer = s.buffer[msgEnd:] - fmt.Println("*** done") - return msg, true, nil -} -*/