531 lines
13 KiB
Go

package yggdrasil
// TODO cleanup, this file is kind of a mess
// Commented code should be removed
// Live code should be better commented
// FIXME (!) this part may be at least sligtly vulnerable to replay attacks
// The switch message part should catch / drop old tstamps
// So the damage is limited
// But you could still mess up msgAnc / msgHops and break some things there
// It needs to ignore messages with a lower seq
// Probably best to start setting seq to a timestamp in that case...
// FIXME (!?) if it takes too long to communicate all the msgHops, then things hit a horizon
// That could happen with a peer over a high-latency link, with many msgHops
// Possible workarounds:
// 1. Pre-emptively send all hops when one is requested, or after any change
// Maybe requires changing how the throttle works and msgHops are saved
// In case some arrive out of order or are dropped
// This is relatively easy to implement, but could be wasteful
// 2. Save your old locator, sigs, etc, so you can respond to older ancs
// And finish requesting an old anc before updating to a new one
// But that may lead to other issues if not done carefully...
import "time"
import "sync"
import "sync/atomic"
import "math"
//import "fmt"
type peers struct {
core *Core
mutex sync.Mutex // Synchronize writes to atomic
ports atomic.Value //map[Port]*peer, use CoW semantics
//ports map[Port]*peer
authMutex sync.RWMutex
allowedBoxPubs map[boxPubKey]struct{}
}
func (ps *peers) init(c *Core) {
ps.mutex.Lock()
defer ps.mutex.Unlock()
ps.putPorts(make(map[switchPort]*peer))
ps.core = c
ps.allowedBoxPubs = make(map[boxPubKey]struct{})
}
func (ps *peers) isAllowedBoxPub(box *boxPubKey) bool {
ps.authMutex.RLock()
defer ps.authMutex.RUnlock()
_, isIn := ps.allowedBoxPubs[*box]
return isIn || len(ps.allowedBoxPubs) == 0
}
func (ps *peers) addAllowedBoxPub(box *boxPubKey) {
ps.authMutex.Lock()
defer ps.authMutex.Unlock()
ps.allowedBoxPubs[*box] = struct{}{}
}
func (ps *peers) removeAllowedBoxPub(box *boxPubKey) {
ps.authMutex.Lock()
defer ps.authMutex.Unlock()
delete(ps.allowedBoxPubs, *box)
}
func (ps *peers) getAllowedBoxPubs() []boxPubKey {
ps.authMutex.RLock()
defer ps.authMutex.RUnlock()
keys := make([]boxPubKey, 0, len(ps.allowedBoxPubs))
for key := range ps.allowedBoxPubs {
keys = append(keys, key)
}
return keys
}
func (ps *peers) getPorts() map[switchPort]*peer {
return ps.ports.Load().(map[switchPort]*peer)
}
func (ps *peers) putPorts(ports map[switchPort]*peer) {
ps.ports.Store(ports)
}
type peer struct {
// Rolling approximation of bandwidth, in bps, used by switch, updated by packet sends
// use get/update methods only! (atomic accessors as float64)
bandwidth uint64
bytesSent uint64 // To track bandwidth usage for getPeers
bytesRecvd uint64 // To track bandwidth usage for getPeers
// BUG: sync/atomic, 32 bit platforms need the above to be the first element
firstSeen time.Time // To track uptime for getPeers
box boxPubKey
sig sigPubKey
shared boxSharedKey
//in <-chan []byte
//out chan<- []byte
//in func([]byte)
out func([]byte)
core *Core
port switchPort
msgAnc *msgAnnounce
msgHops []*msgHop
myMsg *switchMessage
mySigs []sigInfo
// This is used to limit how often we perform expensive operations
// Specifically, processing switch messages, signing, and verifying sigs
// Resets at the start of each tick
throttle uint8
// Called when a peer is removed, to close the underlying connection, or via admin api
close func()
// To allow the peer to call close if idle for too long
lastAnc time.Time
}
const peer_Throttle = 1
func (p *peer) getBandwidth() float64 {
bits := atomic.LoadUint64(&p.bandwidth)
return math.Float64frombits(bits)
}
func (p *peer) updateBandwidth(bytes int, duration time.Duration) {
if p == nil {
return
}
for ok := false; !ok; {
oldBits := atomic.LoadUint64(&p.bandwidth)
oldBandwidth := math.Float64frombits(oldBits)
bandwidth := oldBandwidth*7/8 + float64(bytes)/duration.Seconds()
bits := math.Float64bits(bandwidth)
ok = atomic.CompareAndSwapUint64(&p.bandwidth, oldBits, bits)
}
}
func (ps *peers) newPeer(box *boxPubKey,
sig *sigPubKey) *peer {
now := time.Now()
p := peer{box: *box,
sig: *sig,
shared: *getSharedKey(&ps.core.boxPriv, box),
lastAnc: now,
firstSeen: now,
core: ps.core}
ps.mutex.Lock()
defer ps.mutex.Unlock()
oldPorts := ps.getPorts()
newPorts := make(map[switchPort]*peer)
for k, v := range oldPorts {
newPorts[k] = v
}
for idx := switchPort(0); true; idx++ {
if _, isIn := newPorts[idx]; !isIn {
p.port = switchPort(idx)
newPorts[p.port] = &p
break
}
}
ps.putPorts(newPorts)
return &p
}
func (ps *peers) removePeer(port switchPort) {
// TODO? store linkIn in the peer struct, close it here? (once)
if port == 0 {
return
} // Can't remove self peer
ps.mutex.Lock()
oldPorts := ps.getPorts()
p, isIn := oldPorts[port]
newPorts := make(map[switchPort]*peer)
for k, v := range oldPorts {
newPorts[k] = v
}
delete(newPorts, port)
ps.putPorts(newPorts)
ps.mutex.Unlock()
if isIn && p.close != nil {
p.close()
}
}
func (p *peer) linkLoop(in <-chan []byte) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var counter uint8
var lastRSeq uint64
for {
select {
case packet, ok := <-in:
if !ok {
return
}
p.handleLinkTraffic(packet)
case <-ticker.C:
if time.Since(p.lastAnc) > 16*time.Second && p.close != nil {
// Seems to have timed out, try to trigger a close
p.close()
}
p.throttle = 0
if p.port == 0 {
continue
} // Don't send announces on selfInterface
p.myMsg, p.mySigs = p.core.switchTable.createMessage(p.port)
var update bool
switch {
case p.msgAnc == nil:
update = true
case lastRSeq != p.msgAnc.seq:
update = true
case p.msgAnc.rseq != p.myMsg.seq:
update = true
case counter%4 == 0:
update = true
}
if update {
if p.msgAnc != nil {
lastRSeq = p.msgAnc.seq
}
p.sendSwitchAnnounce()
}
counter = (counter + 1) % 4
}
}
}
func (p *peer) handlePacket(packet []byte, linkIn chan<- []byte) {
// TODO See comment in sendPacket about atomics technically being done wrong
atomic.AddUint64(&p.bytesRecvd, uint64(len(packet)))
pType, pTypeLen := wire_decode_uint64(packet)
if pTypeLen == 0 {
return
}
switch pType {
case wire_Traffic:
p.handleTraffic(packet, pTypeLen)
case wire_ProtocolTraffic:
p.handleTraffic(packet, pTypeLen)
case wire_LinkProtocolTraffic:
{
select {
case linkIn <- packet:
default:
}
}
default: /*panic(pType) ;*/
return
}
}
func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
if p.port != 0 && p.msgAnc == nil {
// Drop traffic until the peer manages to send us at least one anc
return
}
ttl, ttlLen := wire_decode_uint64(packet[pTypeLen:])
ttlBegin := pTypeLen
ttlEnd := pTypeLen + ttlLen
coords, coordLen := wire_decode_coords(packet[ttlEnd:])
coordEnd := ttlEnd + coordLen
if coordEnd == len(packet) {
return
} // No payload
toPort, newTTL := p.core.switchTable.lookup(coords, ttl)
if toPort == p.port {
return
}
to := p.core.peers.getPorts()[toPort]
if to == nil {
return
}
// This mutates the packet in-place if the length of the TTL changes!
ttlSlice := wire_encode_uint64(newTTL)
newTTLLen := len(ttlSlice)
shift := ttlLen - newTTLLen
copy(packet[shift:], packet[:pTypeLen])
copy(packet[ttlBegin+shift:], ttlSlice)
packet = packet[shift:]
to.sendPacket(packet)
}
func (p *peer) sendPacket(packet []byte) {
// Is there ever a case where something more complicated is needed?
// What if p.out blocks?
p.out(packet)
// TODO this should really happen at the interface, to account for LIFO packet drops and additional per-packet/per-message overhead, but this should be pretty close... better to move it to the tcp/udp stuff *after* rewriting both to give a common interface
atomic.AddUint64(&p.bytesSent, uint64(len(packet)))
}
func (p *peer) sendLinkPacket(packet []byte) {
bs, nonce := boxSeal(&p.shared, packet, nil)
linkPacket := wire_linkProtoTrafficPacket{
nonce: *nonce,
payload: bs,
}
packet = linkPacket.encode()
p.sendPacket(packet)
}
func (p *peer) handleLinkTraffic(bs []byte) {
packet := wire_linkProtoTrafficPacket{}
if !packet.decode(bs) {
return
}
payload, isOK := boxOpen(&p.shared, packet.payload, &packet.nonce)
if !isOK {
return
}
pType, pTypeLen := wire_decode_uint64(payload)
if pTypeLen == 0 {
return
}
switch pType {
case wire_SwitchAnnounce:
p.handleSwitchAnnounce(payload)
case wire_SwitchHopRequest:
p.handleSwitchHopRequest(payload)
case wire_SwitchHop:
p.handleSwitchHop(payload)
}
}
func (p *peer) handleSwitchAnnounce(packet []byte) {
//p.core.log.Println("DEBUG: handleSwitchAnnounce")
anc := msgAnnounce{}
//err := wire_decode_struct(packet, &anc)
//if err != nil { return }
if !anc.decode(packet) {
return
}
//if p.msgAnc != nil && anc.Seq != p.msgAnc.Seq { p.msgHops = nil }
if p.msgAnc == nil ||
anc.root != p.msgAnc.root ||
anc.tstamp != p.msgAnc.tstamp ||
anc.seq != p.msgAnc.seq {
p.msgHops = nil
}
p.msgAnc = &anc
p.processSwitchMessage()
p.lastAnc = time.Now()
}
func (p *peer) requestHop(hop uint64) {
//p.core.log.Println("DEBUG requestHop")
req := msgHopReq{}
req.root = p.msgAnc.root
req.tstamp = p.msgAnc.tstamp
req.seq = p.msgAnc.seq
req.hop = hop
packet := req.encode()
p.sendLinkPacket(packet)
}
func (p *peer) handleSwitchHopRequest(packet []byte) {
//p.core.log.Println("DEBUG: handleSwitchHopRequest")
if p.throttle > peer_Throttle {
return
}
if p.myMsg == nil {
return
}
req := msgHopReq{}
if !req.decode(packet) {
return
}
if req.root != p.myMsg.locator.root {
return
}
if req.tstamp != p.myMsg.locator.tstamp {
return
}
if req.seq != p.myMsg.seq {
return
}
if uint64(len(p.myMsg.locator.coords)) <= req.hop {
return
}
res := msgHop{}
res.root = p.myMsg.locator.root
res.tstamp = p.myMsg.locator.tstamp
res.seq = p.myMsg.seq
res.hop = req.hop
res.port = p.myMsg.locator.coords[res.hop]
sinfo := p.getSig(res.hop)
//p.core.log.Println("DEBUG sig:", sinfo)
res.next = sinfo.next
res.sig = sinfo.sig
packet = res.encode()
p.sendLinkPacket(packet)
}
func (p *peer) handleSwitchHop(packet []byte) {
//p.core.log.Println("DEBUG: handleSwitchHop")
if p.throttle > peer_Throttle {
return
}
if p.msgAnc == nil {
return
}
res := msgHop{}
if !res.decode(packet) {
return
}
if res.root != p.msgAnc.root {
return
}
if res.tstamp != p.msgAnc.tstamp {
return
}
if res.seq != p.msgAnc.seq {
return
}
if res.hop != uint64(len(p.msgHops)) {
return
} // always process in order
loc := switchLocator{coords: make([]switchPort, 0, len(p.msgHops)+1)}
loc.root = res.root
loc.tstamp = res.tstamp
for _, hop := range p.msgHops {
loc.coords = append(loc.coords, hop.port)
}
loc.coords = append(loc.coords, res.port)
thisHopKey := &res.root
if res.hop != 0 {
thisHopKey = &p.msgHops[res.hop-1].next
}
bs := getBytesForSig(&res.next, &loc)
if p.core.sigs.check(thisHopKey, &res.sig, bs) {
p.msgHops = append(p.msgHops, &res)
p.processSwitchMessage()
} else {
p.throttle++
}
}
func (p *peer) processSwitchMessage() {
//p.core.log.Println("DEBUG: processSwitchMessage")
if p.throttle > peer_Throttle {
return
}
if p.msgAnc == nil {
return
}
if uint64(len(p.msgHops)) < p.msgAnc.len {
p.requestHop(uint64(len(p.msgHops)))
return
}
p.throttle++
if p.msgAnc.len != uint64(len(p.msgHops)) {
return
}
msg := switchMessage{}
coords := make([]switchPort, 0, len(p.msgHops))
sigs := make([]sigInfo, 0, len(p.msgHops))
for idx, hop := range p.msgHops {
// Consistency checks, should be redundant (already checked these...)
if hop.root != p.msgAnc.root {
return
}
if hop.tstamp != p.msgAnc.tstamp {
return
}
if hop.seq != p.msgAnc.seq {
return
}
if hop.hop != uint64(idx) {
return
}
coords = append(coords, hop.port)
sigs = append(sigs, sigInfo{next: hop.next, sig: hop.sig})
}
msg.from = p.sig
msg.locator.root = p.msgAnc.root
msg.locator.tstamp = p.msgAnc.tstamp
msg.locator.coords = coords
msg.seq = p.msgAnc.seq
//msg.RSeq = p.msgAnc.RSeq
//msg.Degree = p.msgAnc.Deg
p.core.switchTable.handleMessage(&msg, p.port, sigs)
if len(coords) == 0 {
return
}
// Reuse locator, set the coords to the peer's coords, to use in dht
msg.locator.coords = coords[:len(coords)-1]
// Pass a mesage to the dht informing it that this peer (still) exists
dinfo := dhtInfo{
key: p.box,
coords: msg.locator.getCoords(),
}
p.core.dht.peers <- &dinfo
}
func (p *peer) sendSwitchAnnounce() {
anc := msgAnnounce{}
anc.root = p.myMsg.locator.root
anc.tstamp = p.myMsg.locator.tstamp
anc.seq = p.myMsg.seq
anc.len = uint64(len(p.myMsg.locator.coords))
//anc.Deg = p.myMsg.Degree
if p.msgAnc != nil {
anc.rseq = p.msgAnc.seq
}
packet := anc.encode()
p.sendLinkPacket(packet)
}
func (p *peer) getSig(hop uint64) sigInfo {
//p.core.log.Println("DEBUG getSig:", len(p.mySigs), hop)
if hop < uint64(len(p.mySigs)) {
return p.mySigs[hop]
}
bs := getBytesForSig(&p.sig, &p.myMsg.locator)
sig := sigInfo{}
sig.next = p.sig
sig.sig = *sign(&p.core.sigPriv, bs)
p.mySigs = append(p.mySigs, sig)
//p.core.log.Println("DEBUG sig bs:", bs)
return sig
}
func getBytesForSig(next *sigPubKey, loc *switchLocator) []byte {
//bs, err := wire_encode_locator(loc)
//if err != nil { panic(err) }
bs := append([]byte(nil), next[:]...)
bs = append(bs, wire_encode_locator(loc)...)
//bs := wire_encode_locator(loc)
//bs = append(next[:], bs...)
return bs
}