start converting the peer struct into an actor

This commit is contained in:
Arceliar 2019-08-24 12:46:24 -05:00
parent ef15a6bd79
commit 775fb535dc
2 changed files with 26 additions and 16 deletions

View File

@ -12,6 +12,8 @@ import (
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util" "github.com/yggdrasil-network/yggdrasil-go/src/util"
"github.com/Arceliar/phony"
) )
// The peers struct represents peers with an active connection. // The peers struct represents peers with an active connection.
@ -97,6 +99,7 @@ type peer struct {
bytesSent uint64 // To track bandwidth usage for getPeers bytesSent uint64 // To track bandwidth usage for getPeers
bytesRecvd uint64 // To track bandwidth usage for getPeers bytesRecvd uint64 // To track bandwidth usage for getPeers
// BUG: sync/atomic, 32 bit platforms need the above to be the first element // BUG: sync/atomic, 32 bit platforms need the above to be the first element
phony.Actor
core *Core core *Core
intf *linkInterface intf *linkInterface
port switchPort port switchPort
@ -206,7 +209,7 @@ func (p *peer) linkLoop() {
if !ok { if !ok {
return return
} }
p.sendSwitchMsg() <-p.SyncExec(p._sendSwitchMsg)
case dinfo = <-p.dinfo: case dinfo = <-p.dinfo:
case _ = <-tick.C: case _ = <-tick.C:
if dinfo != nil { if dinfo != nil {
@ -227,20 +230,19 @@ func (p *peer) handlePacket(packet []byte) {
} }
switch pType { switch pType {
case wire_Traffic: case wire_Traffic:
p.handleTraffic(packet, pTypeLen) p._handleTraffic(packet, pTypeLen)
case wire_ProtocolTraffic: case wire_ProtocolTraffic:
p.handleTraffic(packet, pTypeLen) p._handleTraffic(packet, pTypeLen)
case wire_LinkProtocolTraffic: case wire_LinkProtocolTraffic:
p.handleLinkTraffic(packet) p._handleLinkTraffic(packet)
default: default:
util.PutBytes(packet) util.PutBytes(packet)
} }
return
} }
// Called to handle traffic or protocolTraffic packets. // Called to handle traffic or protocolTraffic packets.
// In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node. // In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node.
func (p *peer) handleTraffic(packet []byte, pTypeLen int) { func (p *peer) _handleTraffic(packet []byte, pTypeLen int) {
table := p.core.switchTable.getTable() table := p.core.switchTable.getTable()
if _, isIn := table.elems[p.port]; !isIn && p.port != 0 { if _, isIn := table.elems[p.port]; !isIn && p.port != 0 {
// Drop traffic if the peer isn't in the switch // Drop traffic if the peer isn't in the switch
@ -249,8 +251,14 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
p.core.switchTable.packetIn <- packet p.core.switchTable.packetIn <- packet
} }
func (p *peer) sendPacketsFrom(from phony.IActor, packets [][]byte) {
p.EnqueueFrom(from, func() {
p._sendPackets(packets)
})
}
// This just calls p.out(packet) for now. // This just calls p.out(packet) for now.
func (p *peer) sendPackets(packets [][]byte) { func (p *peer) _sendPackets(packets [][]byte) {
// Is there ever a case where something more complicated is needed? // Is there ever a case where something more complicated is needed?
// What if p.out blocks? // What if p.out blocks?
var size int var size int
@ -263,7 +271,7 @@ func (p *peer) sendPackets(packets [][]byte) {
// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers. // This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.
// It sends it to p.linkOut, which bypasses the usual packet queues. // It sends it to p.linkOut, which bypasses the usual packet queues.
func (p *peer) sendLinkPacket(packet []byte) { func (p *peer) _sendLinkPacket(packet []byte) {
innerPayload, innerNonce := crypto.BoxSeal(&p.linkShared, packet, nil) innerPayload, innerNonce := crypto.BoxSeal(&p.linkShared, packet, nil)
innerLinkPacket := wire_linkProtoTrafficPacket{ innerLinkPacket := wire_linkProtoTrafficPacket{
Nonce: *innerNonce, Nonce: *innerNonce,
@ -281,7 +289,7 @@ func (p *peer) sendLinkPacket(packet []byte) {
// Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic. // Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic.
// Identifies the link traffic type and calls the appropriate handler. // Identifies the link traffic type and calls the appropriate handler.
func (p *peer) handleLinkTraffic(bs []byte) { func (p *peer) _handleLinkTraffic(bs []byte) {
packet := wire_linkProtoTrafficPacket{} packet := wire_linkProtoTrafficPacket{}
if !packet.decode(bs) { if !packet.decode(bs) {
return return
@ -304,14 +312,14 @@ func (p *peer) handleLinkTraffic(bs []byte) {
} }
switch pType { switch pType {
case wire_SwitchMsg: case wire_SwitchMsg:
p.handleSwitchMsg(payload) p._handleSwitchMsg(payload)
default: default:
util.PutBytes(bs) util.PutBytes(bs)
} }
} }
// Gets a switchMsg from the switch, adds signed next-hop info for this peer, and sends it to them. // Gets a switchMsg from the switch, adds signed next-hop info for this peer, and sends it to them.
func (p *peer) sendSwitchMsg() { func (p *peer) _sendSwitchMsg() {
msg := p.core.switchTable.getMsg() msg := p.core.switchTable.getMsg()
if msg == nil { if msg == nil {
return return
@ -323,12 +331,12 @@ func (p *peer) sendSwitchMsg() {
Sig: *crypto.Sign(&p.core.sigPriv, bs), Sig: *crypto.Sign(&p.core.sigPriv, bs),
}) })
packet := msg.encode() packet := msg.encode()
p.sendLinkPacket(packet) p._sendLinkPacket(packet)
} }
// Handles a switchMsg from the peer, checking signatures and passing good messages to the switch. // Handles a switchMsg from the peer, checking signatures and passing good messages to the switch.
// Also creates a dhtInfo struct and arranges for it to be added to the dht (this is how dht bootstrapping begins). // Also creates a dhtInfo struct and arranges for it to be added to the dht (this is how dht bootstrapping begins).
func (p *peer) handleSwitchMsg(packet []byte) { func (p *peer) _handleSwitchMsg(packet []byte) {
var msg switchMsg var msg switchMsg
if !msg.decode(packet) { if !msg.decode(packet) {
return return

View File

@ -703,7 +703,7 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]time.Time) boo
if best != nil { if best != nil {
// Send to the best idle next hop // Send to the best idle next hop
delete(idle, best.port) delete(idle, best.port)
best.sendPackets([][]byte{packet}) best.sendPacketsFrom(nil, [][]byte{packet})
return true return true
} }
// Didn't find anyone idle to send it to // Didn't find anyone idle to send it to
@ -817,7 +817,8 @@ func (t *switchTable) handleIdle(port switchPort) bool {
} }
} }
if len(packets) > 0 { if len(packets) > 0 {
to.sendPackets(packets) // TODO rewrite if/when the switch becomes an actor
to.sendPacketsFrom(nil, packets)
return true return true
} }
return false return false
@ -830,7 +831,8 @@ func (t *switchTable) doWorker() {
// Keep sending packets to the router // Keep sending packets to the router
self := t.core.peers.getPorts()[0] self := t.core.peers.getPorts()[0]
for bs := range sendingToRouter { for bs := range sendingToRouter {
self.sendPackets([][]byte{bs}) // TODO remove this ugly mess of goroutines if/when the switch becomes an actor
<-self.SyncExec(func() { self._sendPackets([][]byte{bs}) })
} }
}() }()
go func() { go func() {