mirror of
https://github.com/yggdrasil-network/yggdrasil-go.git
synced 2024-11-23 18:15:24 +00:00
refactor sessions to store a pointer to router instead of core
This commit is contained in:
parent
5bb85cf07b
commit
436c84ca33
@ -252,7 +252,7 @@ func (c *Conn) Close() (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) LocalAddr() crypto.NodeID {
|
func (c *Conn) LocalAddr() crypto.NodeID {
|
||||||
return *crypto.GetNodeID(&c.session.core.boxPub)
|
return *crypto.GetNodeID(&c.core.boxPub)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) RemoteAddr() crypto.NodeID {
|
func (c *Conn) RemoteAddr() crypto.NodeID {
|
||||||
|
@ -75,7 +75,7 @@ func (r *router) init(core *Core) {
|
|||||||
r.core.config.Mutex.RUnlock()
|
r.core.config.Mutex.RUnlock()
|
||||||
r.dht.init(r)
|
r.dht.init(r)
|
||||||
r.searches.init(r)
|
r.searches.init(r)
|
||||||
r.sessions.init(r.core)
|
r.sessions.init(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Starts the mainLoop goroutine.
|
// Starts the mainLoop goroutine.
|
||||||
|
@ -40,7 +40,7 @@ func (h nonceHeap) peek() *crypto.BoxNonce { return &h[len(h)-1] }
|
|||||||
// This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API.
|
// This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API.
|
||||||
type sessionInfo struct {
|
type sessionInfo struct {
|
||||||
phony.Actor // Protects all of the below, use it any time you read/change the contents of a session
|
phony.Actor // Protects all of the below, use it any time you read/change the contents of a session
|
||||||
core *Core //
|
sessions *sessions //
|
||||||
reconfigure chan chan error //
|
reconfigure chan chan error //
|
||||||
theirAddr address.Address //
|
theirAddr address.Address //
|
||||||
theirSubnet address.Subnet //
|
theirSubnet address.Subnet //
|
||||||
@ -136,7 +136,7 @@ func (s *sessionInfo) _update(p *sessionPing) bool {
|
|||||||
// Sessions are indexed by handle.
|
// Sessions are indexed by handle.
|
||||||
// Additionally, stores maps of address/subnet onto keys, and keys onto handles.
|
// Additionally, stores maps of address/subnet onto keys, and keys onto handles.
|
||||||
type sessions struct {
|
type sessions struct {
|
||||||
core *Core
|
router *router
|
||||||
listener *Listener
|
listener *Listener
|
||||||
listenerMutex sync.Mutex
|
listenerMutex sync.Mutex
|
||||||
reconfigure chan chan error
|
reconfigure chan chan error
|
||||||
@ -149,8 +149,8 @@ type sessions struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Initializes the session struct.
|
// Initializes the session struct.
|
||||||
func (ss *sessions) init(core *Core) {
|
func (ss *sessions) init(r *router) {
|
||||||
ss.core = core
|
ss.router = r
|
||||||
ss.reconfigure = make(chan chan error, 1)
|
ss.reconfigure = make(chan chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
@ -213,18 +213,18 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
sinfo := sessionInfo{}
|
sinfo := sessionInfo{}
|
||||||
sinfo.core = ss.core
|
sinfo.sessions = ss
|
||||||
sinfo.reconfigure = make(chan chan error, 1)
|
sinfo.reconfigure = make(chan chan error, 1)
|
||||||
sinfo.theirPermPub = *theirPermKey
|
sinfo.theirPermPub = *theirPermKey
|
||||||
sinfo.sharedPermKey = *ss.getSharedKey(&ss.core.boxPriv, &sinfo.theirPermPub)
|
sinfo.sharedPermKey = *ss.getSharedKey(&ss.router.core.boxPriv, &sinfo.theirPermPub)
|
||||||
pub, priv := crypto.NewBoxKeys()
|
pub, priv := crypto.NewBoxKeys()
|
||||||
sinfo.mySesPub = *pub
|
sinfo.mySesPub = *pub
|
||||||
sinfo.mySesPriv = *priv
|
sinfo.mySesPriv = *priv
|
||||||
sinfo.myNonce = *crypto.NewBoxNonce()
|
sinfo.myNonce = *crypto.NewBoxNonce()
|
||||||
sinfo.theirMTU = 1280
|
sinfo.theirMTU = 1280
|
||||||
ss.core.config.Mutex.RLock()
|
ss.router.core.config.Mutex.RLock()
|
||||||
sinfo.myMTU = uint16(ss.core.config.Current.IfMTU)
|
sinfo.myMTU = uint16(ss.router.core.config.Current.IfMTU)
|
||||||
ss.core.config.Mutex.RUnlock()
|
ss.router.core.config.Mutex.RUnlock()
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
sinfo.timeOpened = now
|
sinfo.timeOpened = now
|
||||||
sinfo.time = now
|
sinfo.time = now
|
||||||
@ -234,11 +234,11 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
|
|||||||
sinfo.init = make(chan struct{})
|
sinfo.init = make(chan struct{})
|
||||||
sinfo.cancel = util.NewCancellation()
|
sinfo.cancel = util.NewCancellation()
|
||||||
higher := false
|
higher := false
|
||||||
for idx := range ss.core.boxPub {
|
for idx := range ss.router.core.boxPub {
|
||||||
if ss.core.boxPub[idx] > sinfo.theirPermPub[idx] {
|
if ss.router.core.boxPub[idx] > sinfo.theirPermPub[idx] {
|
||||||
higher = true
|
higher = true
|
||||||
break
|
break
|
||||||
} else if ss.core.boxPub[idx] < sinfo.theirPermPub[idx] {
|
} else if ss.router.core.boxPub[idx] < sinfo.theirPermPub[idx] {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -260,8 +260,8 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
|
|||||||
go func() {
|
go func() {
|
||||||
// Run cleanup when the session is canceled
|
// Run cleanup when the session is canceled
|
||||||
<-sinfo.cancel.Finished()
|
<-sinfo.cancel.Finished()
|
||||||
sinfo.core.router.doAdmin(func() {
|
sinfo.sessions.router.doAdmin(func() {
|
||||||
sinfo.core.router.sessions.removeSession(&sinfo)
|
sinfo.sessions.removeSession(&sinfo)
|
||||||
})
|
})
|
||||||
}()
|
}()
|
||||||
go sinfo.startWorkers()
|
go sinfo.startWorkers()
|
||||||
@ -298,18 +298,18 @@ func (ss *sessions) cleanup() {
|
|||||||
|
|
||||||
// Closes a session, removing it from sessions maps.
|
// Closes a session, removing it from sessions maps.
|
||||||
func (ss *sessions) removeSession(sinfo *sessionInfo) {
|
func (ss *sessions) removeSession(sinfo *sessionInfo) {
|
||||||
if s := sinfo.core.router.sessions.sinfos[sinfo.myHandle]; s == sinfo {
|
if s := sinfo.sessions.sinfos[sinfo.myHandle]; s == sinfo {
|
||||||
delete(sinfo.core.router.sessions.sinfos, sinfo.myHandle)
|
delete(sinfo.sessions.sinfos, sinfo.myHandle)
|
||||||
delete(sinfo.core.router.sessions.byTheirPerm, sinfo.theirPermPub)
|
delete(sinfo.sessions.byTheirPerm, sinfo.theirPermPub)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns a session ping appropriate for the given session info.
|
// Returns a session ping appropriate for the given session info.
|
||||||
func (sinfo *sessionInfo) _getPing() sessionPing {
|
func (sinfo *sessionInfo) _getPing() sessionPing {
|
||||||
loc := sinfo.core.switchTable.getLocator()
|
loc := sinfo.sessions.router.core.switchTable.getLocator()
|
||||||
coords := loc.getCoords()
|
coords := loc.getCoords()
|
||||||
ping := sessionPing{
|
ping := sessionPing{
|
||||||
SendPermPub: sinfo.core.boxPub,
|
SendPermPub: sinfo.sessions.router.core.boxPub,
|
||||||
Handle: sinfo.myHandle,
|
Handle: sinfo.myHandle,
|
||||||
SendSesPub: sinfo.mySesPub,
|
SendSesPub: sinfo.mySesPub,
|
||||||
Tstamp: time.Now().Unix(),
|
Tstamp: time.Now().Unix(),
|
||||||
@ -360,13 +360,13 @@ func (sinfo *sessionInfo) _sendPingPong(isPong bool) {
|
|||||||
p := wire_protoTrafficPacket{
|
p := wire_protoTrafficPacket{
|
||||||
Coords: sinfo.coords,
|
Coords: sinfo.coords,
|
||||||
ToKey: sinfo.theirPermPub,
|
ToKey: sinfo.theirPermPub,
|
||||||
FromKey: sinfo.core.boxPub,
|
FromKey: sinfo.sessions.router.core.boxPub,
|
||||||
Nonce: *nonce,
|
Nonce: *nonce,
|
||||||
Payload: payload,
|
Payload: payload,
|
||||||
}
|
}
|
||||||
packet := p.encode()
|
packet := p.encode()
|
||||||
// TODO rewrite the below if/when the peer struct becomes an actor, to not go through the router first
|
// TODO rewrite the below if/when the peer struct becomes an actor, to not go through the router first
|
||||||
sinfo.core.router.EnqueueFrom(sinfo, func() { sinfo.core.router.out(packet) })
|
sinfo.sessions.router.EnqueueFrom(sinfo, func() { sinfo.sessions.router.out(packet) })
|
||||||
if sinfo.pingTime.Before(sinfo.time) {
|
if sinfo.pingTime.Before(sinfo.time) {
|
||||||
sinfo.pingTime = time.Now()
|
sinfo.pingTime = time.Now()
|
||||||
}
|
}
|
||||||
@ -390,7 +390,7 @@ func (ss *sessions) handlePing(ping *sessionPing) {
|
|||||||
if s, _ := ss.getByTheirPerm(&ping.SendPermPub); s != sinfo {
|
if s, _ := ss.getByTheirPerm(&ping.SendPermPub); s != sinfo {
|
||||||
panic("This should not happen")
|
panic("This should not happen")
|
||||||
}
|
}
|
||||||
conn := newConn(ss.core, crypto.GetNodeID(&sinfo.theirPermPub), &crypto.NodeID{}, sinfo)
|
conn := newConn(ss.router.core, crypto.GetNodeID(&sinfo.theirPermPub), &crypto.NodeID{}, sinfo)
|
||||||
for i := range conn.nodeMask {
|
for i := range conn.nodeMask {
|
||||||
conn.nodeMask[i] = 0xFF
|
conn.nodeMask[i] = 0xFF
|
||||||
}
|
}
|
||||||
@ -400,7 +400,7 @@ func (ss *sessions) handlePing(ping *sessionPing) {
|
|||||||
ss.listenerMutex.Unlock()
|
ss.listenerMutex.Unlock()
|
||||||
}
|
}
|
||||||
if sinfo != nil {
|
if sinfo != nil {
|
||||||
sinfo.EnqueueFrom(&ss.core.router, func() {
|
sinfo.EnqueueFrom(ss.router, func() {
|
||||||
// Update the session
|
// Update the session
|
||||||
if !sinfo._update(ping) { /*panic("Should not happen in testing")*/
|
if !sinfo._update(ping) { /*panic("Should not happen in testing")*/
|
||||||
return
|
return
|
||||||
@ -468,7 +468,7 @@ func (sinfo *sessionInfo) _updateNonce(theirNonce *crypto.BoxNonce) {
|
|||||||
// Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change.
|
// Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change.
|
||||||
func (ss *sessions) reset() {
|
func (ss *sessions) reset() {
|
||||||
for _, sinfo := range ss.sinfos {
|
for _, sinfo := range ss.sinfos {
|
||||||
sinfo.EnqueueFrom(&ss.core.router, func() {
|
sinfo.EnqueueFrom(ss.router, func() {
|
||||||
sinfo.reset = true
|
sinfo.reset = true
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -639,7 +639,10 @@ func (sinfo *sessionInfo) sendWorker() {
|
|||||||
util.PutBytes(msg.Message)
|
util.PutBytes(msg.Message)
|
||||||
util.PutBytes(p.Payload)
|
util.PutBytes(p.Payload)
|
||||||
// Send the packet
|
// Send the packet
|
||||||
sinfo.core.router.out(packet)
|
// TODO replace this with a send to the peer struct if that becomes an actor
|
||||||
|
sinfo.sessions.router.EnqueueFrom(sinfo, func() {
|
||||||
|
sinfo.sessions.router.out(packet)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
ch <- callback
|
ch <- callback
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user