diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 977c7421..7898a128 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "sync" - "sync/atomic" "time" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" @@ -15,10 +14,11 @@ type Conn struct { core *Core nodeID *crypto.NodeID nodeMask *crypto.NodeID - session *sessionInfo + recv chan *wire_trafficPacket // Eventually gets attached to session.recv mutex *sync.RWMutex - readDeadline time.Time - writeDeadline time.Time + session *sessionInfo + readDeadline time.Time // TODO timer + writeDeadline time.Time // TODO timer expired bool } @@ -39,6 +39,7 @@ func (c *Conn) startSearch() { if sinfo != nil { c.mutex.Lock() c.session = sinfo + c.session.recv = c.recv c.nodeID, c.nodeMask = sinfo.theirAddr.GetNodeIDandMask() c.mutex.Unlock() } @@ -50,113 +51,124 @@ func (c *Conn) startSearch() { } c.core.searches.continueSearch(sinfo) } - switch { - case c.session == nil || !c.session.init.Load().(bool): + c.mutex.RLock() + defer c.mutex.RUnlock() + if c.session == nil { doSearch() - case time.Since(c.session.time.Load().(time.Time)) > 6*time.Second: - sTime := c.session.time.Load().(time.Time) - pingTime := c.session.pingTime.Load().(time.Time) - if sTime.Before(pingTime) && time.Since(pingTime) > 6*time.Second { - doSearch() - } else { - pingSend := c.session.pingSend.Load().(time.Time) - now := time.Now() - if !sTime.Before(pingTime) { - c.session.pingTime.Store(now) - } - if time.Since(pingSend) > time.Second { - c.session.pingSend.Store(now) - c.core.sessions.sendPingPong(c.session, false) + } else { + sinfo := c.session // In case c.session is somehow changed meanwhile + sinfo.worker <- func() { + switch { + case !sinfo.init: + doSearch() + case time.Since(sinfo.time) > 6*time.Second: + if sinfo.time.Before(sinfo.pingTime) && time.Since(sinfo.pingTime) > 6*time.Second { + // TODO double check that the above condition is correct + doSearch() + } else { + c.core.sessions.ping(sinfo) + } + default: // Don't do anything, to keep traffic throttled } } } } func (c *Conn) Read(b []byte) (int, error) { - c.mutex.RLock() - defer c.mutex.RUnlock() - if c.expired { - return 0, errors.New("session is closed") - } - if c.session == nil { - return 0, errors.New("searching for remote side") - } - if init, ok := c.session.init.Load().(bool); !ok || (ok && !init) { - return 0, errors.New("waiting for remote side to accept " + c.String()) + err := func() error { + c.mutex.RLock() + defer c.mutex.RUnlock() + if c.expired { + return errors.New("session is closed") + } + return nil + }() + if err != nil { + return 0, err } select { - case p, ok := <-c.session.recv: + // TODO... + case p, ok := <-c.recv: if !ok { + c.mutex.Lock() c.expired = true + c.mutex.Unlock() return 0, errors.New("session is closed") } defer util.PutBytes(p.Payload) - err := func() error { - c.session.theirNonceMutex.Lock() - defer c.session.theirNonceMutex.Unlock() - if !c.session.nonceIsOK(&p.Nonce) { - return errors.New("packet dropped due to invalid nonce") + c.mutex.RLock() + sinfo := c.session + c.mutex.RUnlock() + var err error + sinfo.doWorker(func() { + if !sinfo.nonceIsOK(&p.Nonce) { + err = errors.New("packet dropped due to invalid nonce") + return } - bs, isOK := crypto.BoxOpen(&c.session.sharedSesKey, p.Payload, &p.Nonce) + bs, isOK := crypto.BoxOpen(&sinfo.sharedSesKey, p.Payload, &p.Nonce) if !isOK { util.PutBytes(bs) - return errors.New("packet dropped due to decryption failure") + err = errors.New("packet dropped due to decryption failure") + return } copy(b, bs) if len(bs) < len(b) { b = b[:len(bs)] } - c.session.updateNonce(&p.Nonce) - c.session.time.Store(time.Now()) - return nil - }() + sinfo.updateNonce(&p.Nonce) + sinfo.time = time.Now() + sinfo.bytesRecvd += uint64(len(b)) + }) if err != nil { return 0, err } - atomic.AddUint64(&c.session.bytesRecvd, uint64(len(b))) return len(b), nil - case <-c.session.closed: - c.expired = true - return len(b), errors.New("session is closed") + //case <-c.recvTimeout: + //case <-c.session.closed: + // c.expired = true + // return len(b), errors.New("session is closed") } } func (c *Conn) Write(b []byte) (bytesWritten int, err error) { - c.mutex.RLock() - defer c.mutex.RUnlock() - if c.expired { - return 0, errors.New("session is closed") + var sinfo *sessionInfo + err = func() error { + c.mutex.RLock() + defer c.mutex.RUnlock() + if c.expired { + return errors.New("session is closed") + } + sinfo = c.session + return nil + }() + if err != nil { + return 0, err } - if c.session == nil { + if sinfo == nil { c.core.router.doAdmin(func() { c.startSearch() }) return 0, errors.New("searching for remote side") } - defer util.PutBytes(b) - if init, ok := c.session.init.Load().(bool); !ok || (ok && !init) { - return 0, errors.New("waiting for remote side to accept " + c.String()) - } - coords := c.session.coords - c.session.myNonceMutex.Lock() - payload, nonce := crypto.BoxSeal(&c.session.sharedSesKey, b, &c.session.myNonce) - defer util.PutBytes(payload) - p := wire_trafficPacket{ - Coords: coords, - Handle: c.session.theirHandle, - Nonce: *nonce, - Payload: payload, - } - packet := p.encode() - c.session.myNonceMutex.Unlock() - atomic.AddUint64(&c.session.bytesSent, uint64(len(b))) - select { - case c.session.send <- packet: - case <-c.session.closed: - c.expired = true - return len(b), errors.New("session is closed") - } - c.session.core.router.out(packet) + //defer util.PutBytes(b) + var packet []byte + sinfo.doWorker(func() { + if !sinfo.init { + err = errors.New("waiting for remote side to accept " + c.String()) + return + } + payload, nonce := crypto.BoxSeal(&sinfo.sharedSesKey, b, &sinfo.myNonce) + defer util.PutBytes(payload) + p := wire_trafficPacket{ + Coords: sinfo.coords, + Handle: sinfo.theirHandle, + Nonce: *nonce, + Payload: payload, + } + packet = p.encode() + sinfo.bytesSent += uint64(len(b)) + }) + sinfo.core.router.out(packet) return len(b), nil } diff --git a/src/yggdrasil/dialer.go b/src/yggdrasil/dialer.go index 4a3d8167..49ce0a98 100644 --- a/src/yggdrasil/dialer.go +++ b/src/yggdrasil/dialer.go @@ -63,6 +63,7 @@ func (d *Dialer) DialByNodeIDandMask(nodeID, nodeMask *crypto.NodeID) (Conn, err mutex: &sync.RWMutex{}, nodeID: nodeID, nodeMask: nodeMask, + recv: make(chan *wire_trafficPacket, 32), } conn.core.router.doAdmin(func() { conn.startSearch() diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 7da5162f..348a1ed6 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -23,7 +23,7 @@ package yggdrasil // The router then runs some sanity checks before passing it to the adapter import ( - "bytes" + //"bytes" "time" "github.com/yggdrasil-network/yggdrasil-go/src/address" @@ -42,12 +42,12 @@ type router struct { out func([]byte) // packets we're sending to the network, link to peer's "in" toRecv chan router_recvPacket // packets to handle via recvPacket() recv chan<- []byte // place where the adapter pulls received packets from - send <-chan []byte // place where the adapter puts outgoing packets - reject chan<- RejectedPacket // place where we send error packets back to adapter - reset chan struct{} // signal that coords changed (re-init sessions/dht) - admin chan func() // pass a lambda for the admin socket to query stuff - cryptokey cryptokey - nodeinfo nodeinfo + //send <-chan []byte // place where the adapter puts outgoing packets + reject chan<- RejectedPacket // place where we send error packets back to adapter + reset chan struct{} // signal that coords changed (re-init sessions/dht) + admin chan func() // pass a lambda for the admin socket to query stuff + cryptokey cryptokey + nodeinfo nodeinfo } // Packet and session info, used to check that the packet matches a valid IP range or CKR prefix before sending to the adapter. @@ -122,11 +122,11 @@ func (r *router) init(core *Core) { }() r.out = func(packet []byte) { out2 <- packet } r.toRecv = make(chan router_recvPacket, 32) - recv := make(chan []byte, 32) - send := make(chan []byte, 32) + //recv := make(chan []byte, 32) + //send := make(chan []byte, 32) reject := make(chan RejectedPacket, 32) - r.recv = recv - r.send = send + //r.recv = recv + //r.send = send r.reject = reject r.reset = make(chan struct{}, 1) r.admin = make(chan func(), 32) @@ -157,8 +157,8 @@ func (r *router) mainLoop() { r.recvPacket(rp.bs, rp.sinfo) case p := <-r.in: r.handleIn(p) - case p := <-r.send: - r.sendPacket(p) + //case p := <-r.send: + // r.sendPacket(p) case info := <-r.core.dht.peers: r.core.dht.insertPeer(info) case <-r.reset: @@ -181,6 +181,7 @@ func (r *router) mainLoop() { } } +/* // Checks a packet's to/from address to make sure it's in the allowed range. // If a session to the destination exists, gets the session and passes the packet to it. // If no session exists, it triggers (or continues) a search. @@ -353,6 +354,7 @@ func (r *router) sendPacket(bs []byte) { sinfo.send <- bs } } +*/ // Called for incoming traffic by the session worker for that connection. // Checks that the IP address is correct (matches the session) and passes the packet to the adapter. @@ -429,7 +431,11 @@ func (r *router) handleTraffic(packet []byte) { if !isIn { return } - sinfo.recv <- &p + select { + case sinfo.recv <- &p: // FIXME ideally this should be FIFO + default: + util.PutBytes(p.Payload) + } } // Handles protocol traffic by decrypting it, checking its type, and passing it to the appropriate handler for that traffic type. diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index b0bba2d0..15d346a1 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -18,38 +18,50 @@ import ( // All the information we know about an active session. // This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API. type sessionInfo struct { - core *Core // - reconfigure chan chan error // - theirAddr address.Address // - theirSubnet address.Subnet // - theirPermPub crypto.BoxPubKey // - theirSesPub crypto.BoxPubKey // - mySesPub crypto.BoxPubKey // - mySesPriv crypto.BoxPrivKey // - sharedSesKey crypto.BoxSharedKey // derived from session keys - theirHandle crypto.Handle // - myHandle crypto.Handle // - theirNonce crypto.BoxNonce // - theirNonceMask uint64 // - theirNonceMutex sync.Mutex // protects the above - myNonce crypto.BoxNonce // - myNonceMutex sync.Mutex // protects the above - theirMTU uint16 // - myMTU uint16 // - wasMTUFixed bool // Was the MTU fixed by a receive error? - time atomic.Value // time.Time // Time we last received a packet - mtuTime atomic.Value // time.Time // time myMTU was last changed - pingTime atomic.Value // time.Time // time the first ping was sent since the last received packet - pingSend atomic.Value // time.Time // time the last ping was sent - coords []byte // coords of destination - packet []byte // a buffered packet, sent immediately on ping/pong - init atomic.Value // bool // Reset if coords change - send chan []byte // - recv chan *wire_trafficPacket // - closed chan interface{} // - tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation - bytesSent uint64 // Bytes of real traffic sent in this session - bytesRecvd uint64 // Bytes of real traffic received in this session + core *Core // + reconfigure chan chan error // + theirAddr address.Address // + theirSubnet address.Subnet // + theirPermPub crypto.BoxPubKey // + theirSesPub crypto.BoxPubKey // + mySesPub crypto.BoxPubKey // + mySesPriv crypto.BoxPrivKey // + sharedSesKey crypto.BoxSharedKey // derived from session keys + theirHandle crypto.Handle // + myHandle crypto.Handle // + theirNonce crypto.BoxNonce // + theirNonceMask uint64 // + myNonce crypto.BoxNonce // + theirMTU uint16 // + myMTU uint16 // + wasMTUFixed bool // Was the MTU fixed by a receive error? + time time.Time // Time we last received a packet + mtuTime time.Time // time myMTU was last changed + pingTime time.Time // time the first ping was sent since the last received packet + pingSend time.Time // time the last ping was sent + coords []byte // coords of destination + packet []byte // a buffered packet, sent immediately on ping/pong + init bool // Reset if coords change + tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation + bytesSent uint64 // Bytes of real traffic sent in this session + bytesRecvd uint64 // Bytes of real traffic received in this session + worker chan func() // Channel to send work to the session worker + recv chan *wire_trafficPacket // Received packets go here, picked up by the associated Conn +} + +func (sinfo *sessionInfo) doWorker(f func()) { + done := make(chan struct{}) + sinfo.worker <- func() { + f() + close(done) + } + <-done +} + +func (sinfo *sessionInfo) workerMain() { + for f := range sinfo.worker { + f() + } } // Represents a session ping/pong packet, andincludes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU. @@ -89,16 +101,19 @@ func (s *sessionInfo) update(p *sessionPing) bool { // allocate enough space for additional coords s.coords = append(make([]byte, 0, len(p.Coords)+11), p.Coords...) } - now := time.Now() - s.time.Store(now) - atomic.StoreInt64(&s.tstamp, p.Tstamp) - s.init.Store(true) + s.time = time.Now() + s.tstamp = p.Tstamp + s.init = true return true } // Returns true if the session has been idle for longer than the allowed timeout. func (s *sessionInfo) timedout() bool { - return time.Since(s.time.Load().(time.Time)) > time.Minute + var timedout bool + s.doWorker(func() { + timedout = time.Since(s.time) > time.Minute + }) + return timedout } // Struct of all active sessions. @@ -282,10 +297,10 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.theirMTU = 1280 sinfo.myMTU = 1280 now := time.Now() - sinfo.time.Store(now) - sinfo.mtuTime.Store(now) - sinfo.pingTime.Store(now) - sinfo.pingSend.Store(now) + sinfo.time = now + sinfo.mtuTime = now + sinfo.pingTime = now + sinfo.pingSend = now higher := false for idx := range ss.core.boxPub { if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] { @@ -305,14 +320,13 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.myHandle = *crypto.NewHandle() sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) - sinfo.send = make(chan []byte, 32) - sinfo.recv = make(chan *wire_trafficPacket, 32) - sinfo.closed = make(chan interface{}) + sinfo.worker = make(chan func(), 1) ss.sinfos[sinfo.myHandle] = &sinfo ss.byMySes[sinfo.mySesPub] = &sinfo.myHandle ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle ss.addrToPerm[sinfo.theirAddr] = &sinfo.theirPermPub ss.subnetToPerm[sinfo.theirSubnet] = &sinfo.theirPermPub + go sinfo.workerMain() return &sinfo } @@ -366,14 +380,12 @@ func (ss *sessions) cleanup() { // Closes a session, removing it from sessions maps and killing the worker goroutine. func (sinfo *sessionInfo) close() { - close(sinfo.closed) delete(sinfo.core.sessions.sinfos, sinfo.myHandle) delete(sinfo.core.sessions.byMySes, sinfo.mySesPub) delete(sinfo.core.sessions.byTheirPerm, sinfo.theirPermPub) delete(sinfo.core.sessions.addrToPerm, sinfo.theirAddr) delete(sinfo.core.sessions.subnetToPerm, sinfo.theirSubnet) - close(sinfo.send) - close(sinfo.recv) + close(sinfo.worker) } // Returns a session ping appropriate for the given session info. @@ -436,7 +448,7 @@ func (ss *sessions) sendPingPong(sinfo *sessionInfo, isPong bool) { packet := p.encode() ss.core.router.out(packet) if !isPong { - sinfo.pingSend.Store(time.Now()) + sinfo.pingSend = time.Now() } } @@ -468,29 +480,36 @@ func (ss *sessions) handlePing(ping *sessionPing) { mutex: &sync.RWMutex{}, nodeID: crypto.GetNodeID(&sinfo.theirPermPub), nodeMask: &crypto.NodeID{}, + recv: make(chan *wire_trafficPacket, 32), } for i := range conn.nodeMask { conn.nodeMask[i] = 0xFF } + sinfo.recv = conn.recv ss.listener.conn <- conn } else { ss.core.log.Debugln("Received new session but there is no listener, ignoring") } ss.listenerMutex.Unlock() } - // Update the session - if !sinfo.update(ping) { /*panic("Should not happen in testing")*/ - return - } - if !ping.IsPong { - ss.sendPingPong(sinfo, true) - } - if sinfo.packet != nil { - // send - var bs []byte - bs, sinfo.packet = sinfo.packet, nil - ss.core.router.sendPacket(bs) - } + sinfo.doWorker(func() { + // Update the session + if !sinfo.update(ping) { /*panic("Should not happen in testing")*/ + return + } + if !ping.IsPong { + ss.sendPingPong(sinfo, true) + } + if sinfo.packet != nil { + /* FIXME this needs to live in the net.Conn or something, needs work in Write + // send + var bs []byte + bs, sinfo.packet = sinfo.packet, nil + ss.core.router.sendPacket(bs) // FIXME this needs to live in the net.Conn or something, needs work in Write + */ + sinfo.packet = nil + } + }) } // Get the MTU of the session. @@ -536,6 +555,8 @@ func (sinfo *sessionInfo) updateNonce(theirNonce *crypto.BoxNonce) { // Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change. func (ss *sessions) resetInits() { for _, sinfo := range ss.sinfos { - sinfo.init.Store(false) + sinfo.doWorker(func() { + sinfo.init = false + }) } }