package ippool

import (
	"encoding/json"
	"io"
	"net/netip"

	"github.com/gaissmai/bart"
	"github.com/hashicorp/raft"
	"go4.org/netipx"
	"tailscale.com/syncs"
	"tailscale.com/tailcfg"
)

// Snapshot and Restore enable the raft lib to do log compaction.
// https://pkg.go.dev/github.com/hashicorp/raft#FSM
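//
// A minimal sketch (not part of this file) of how an FSM like this one is
// typically handed to the raft library; logStore, stableStore, snapStore, and
// transport are hypothetical placeholders for the stores and transport the
// caller provides:
//
//	r, err := raft.NewRaft(raft.DefaultConfig(), ipp, logStore, stableStore, snapStore, transport)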

// Snapshot is part of the raft.FSM interface.
// According to the docs it:
//   - should return quickly
//   - will not be called concurrently with Apply
//   - the snapshot returned will have Persist called on it concurrently with
//     Apply (so it should not contain pointers to the original data that's
//     being mutated)
func (ipp *ConsensusIPPool) Snapshot() (raft.FSMSnapshot, error) {
	// Everything here is safe for concurrent reads, and Snapshot is not called
	// concurrently with Apply, which is the only thing that writes, so we do
	// not need to lock.
	return ipp.getPersistable(), nil
}
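
// persistableIPSet is the JSON-marshalable form of a netipx.IPSet; it shares
// no pointers with the pool's live IPSet.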
type persistableIPSet struct {
	Ranges []persistableIPRange
}
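
// getPersistableIPSet copies the ranges of i into a persistableIPSet.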
func getPersistableIPSet(i *netipx.IPSet) persistableIPSet {
	rs := []persistableIPRange{}
	for _, r := range i.Ranges() {
		rs = append(rs, getPersistableIPRange(r))
	}
	return persistableIPSet{Ranges: rs}
}
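
// toIPSet rebuilds a netipx.IPSet from the stored ranges.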
func (mips *persistableIPSet) toIPSet() (*netipx.IPSet, error) {
	b := netipx.IPSetBuilder{}
	for _, r := range mips.Ranges {
		b.AddRange(r.toIPRange())
	}
	return b.IPSet()
}
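
// persistableIPRange is the JSON-marshalable form of a netipx.IPRange.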
type persistableIPRange struct {
	From netip.Addr
	To   netip.Addr
}
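
// getPersistableIPRange copies the endpoints of r.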
func getPersistableIPRange(r netipx.IPRange) persistableIPRange {
	return persistableIPRange{
		From: r.From(),
		To:   r.To(),
	}
}
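
// toIPRange rebuilds the netipx.IPRange from its endpoints.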
func (mipr *persistableIPRange) toIPRange() netipx.IPRange {
	return netipx.IPRangeFrom(mipr.From, mipr.To)
}

// Restore is part of the raft.FSM interface.
// According to the docs it:
//   - will not be called concurrently with any other command
//   - the FSM must discard all previous state before restoring
func (ipp *ConsensusIPPool) Restore(rc io.ReadCloser) error {
	var snap fsmSnapshot
	if err := json.NewDecoder(rc).Decode(&snap); err != nil {
		return err
	}
	ipset, ppm, err := snap.getData()
	if err != nil {
		return err
	}
	// Overwrite (rather than merge with) the previous state, as the raft docs
	// require.
	ipp.IPSet = ipset
	ipp.perPeerMap = ppm
	return nil
}
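
// fsmSnapshot is the JSON-marshalable copy of the pool's state returned by
// Snapshot; Persist writes it to the given sink.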
type fsmSnapshot struct {
	IPSet      persistableIPSet
	PerPeerMap map[tailcfg.NodeID]persistablePPS
}

// Persist is part of the raft.FSMSnapshot interface.
// According to the docs, Persist may be called concurrently with Apply.
func (f fsmSnapshot) Persist(sink raft.SnapshotSink) error {
	b, err := json.Marshal(f)
	if err != nil {
		sink.Cancel()
		return err
	}
	if _, err := sink.Write(b); err != nil {
		sink.Cancel()
		return err
	}
	return sink.Close()
}

// Release is part of the raft.FSMSnapshot interface.
func (f fsmSnapshot) Release() {}

// getPersistable returns an object that:
//   - contains all the data in ConsensusIPPool
//   - doesn't share any pointers with it
//   - can be marshalled to JSON
//
// As part of raft snapshotting, getPersistable is called during Snapshot and
// the result is used during Persist (concurrently with Apply).
func (ipp *ConsensusIPPool) getPersistable() fsmSnapshot {
	ppm := map[tailcfg.NodeID]persistablePPS{}
	for k, v := range ipp.perPeerMap.All() {
		ppm[k] = v.getPersistable()
	}
	return fsmSnapshot{
		IPSet:      getPersistableIPSet(ipp.IPSet),
		PerPeerMap: ppm,
	}
}
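
// getData is the inverse of getPersistable: it unpacks a decoded fsmSnapshot
// into the pool's runtime representation.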
func (s fsmSnapshot) getData() (*netipx.IPSet, syncs.Map[tailcfg.NodeID, *consensusPerPeerState], error) {
	ppm := syncs.Map[tailcfg.NodeID, *consensusPerPeerState]{}
	for k, v := range s.PerPeerMap {
		ppm.Store(k, v.toPerPeerState())
	}
	ipset, err := s.IPSet.toIPSet()
	if err != nil {
		return nil, ppm, err
	}
	return ipset, ppm, nil
}

// getPersistable returns an object that:
//   - contains all the data in consensusPerPeerState
//   - doesn't share any pointers with it
//   - can be marshalled to JSON
//
// As part of raft snapshotting, getPersistable is called during Snapshot and
// the result is used during Persist (concurrently with Apply).
func (ps *consensusPerPeerState) getPersistable() persistablePPS {
	dtaCopy := map[string]netip.Addr{}
	for k, v := range ps.domainToAddr {
		dtaCopy[k] = v
	}
	atd := map[netip.Prefix]whereWhen{}
	for pfx, ww := range ps.addrToDomain.All() {
		atd[pfx] = ww
	}
	return persistablePPS{
		AddrToDomain: atd,
		DomainToAddr: dtaCopy,
	}
}
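
// persistablePPS is the JSON-marshalable form of consensusPerPeerState; the
// bart.Table is flattened into a plain map keyed by prefix.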
type persistablePPS struct {
	DomainToAddr map[string]netip.Addr
	AddrToDomain map[netip.Prefix]whereWhen
}
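
// toPerPeerState rebuilds the consensusPerPeerState, reinserting each prefix
// into a fresh bart.Table.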
func (p persistablePPS) toPerPeerState() *consensusPerPeerState {
	atd := &bart.Table[whereWhen]{}
	for k, v := range p.AddrToDomain {
		atd.Insert(k, v)
	}
	return &consensusPerPeerState{
		domainToAddr: p.DomainToAddr,
		addrToDomain: atd,
	}
}