tka: make storage a parameter rather than an Authority struct member

Updates #5435

Based on the discussion in #5435, we can better support transactional data models
by making the underlying storage layer a parameter (which can be specialized for
the request) rather than a long-lived member of Authority.

Now that Authority is just an instantaneous snapshot of state, we can do things
like provide idempotent methods and make it cloneable, too.

Signed-off-by: Tom DNetto <tom@tailscale.com>
This commit is contained in:
Tom DNetto
2022-08-26 09:45:16 -07:00
committed by Tom
parent 7d1357162e
commit 79905a1162
9 changed files with 158 additions and 114 deletions

View File

@@ -39,8 +39,15 @@ type Authority struct {
head AUM
oldestAncestor AUM
state State
}
storage Chonk
// Clone duplicates the Authority structure.
func (a *Authority) Clone() *Authority {
return &Authority{
head: a.head,
oldestAncestor: a.oldestAncestor,
state: a.state.Clone(),
}
}
// A chain describes a linear sequence of updates from Oldest to Head,
@@ -477,7 +484,6 @@ func Open(storage Chonk) (*Authority, error) {
return &Authority{
head: c.Head,
oldestAncestor: c.Oldest,
storage: storage,
state: c.state,
}, nil
}
@@ -557,12 +563,18 @@ func (a *Authority) ValidDisablement(secret []byte) bool {
return a.state.checkDisablement(secret)
}
// Inform is called to tell the authority about new updates. Updates
// should be ordered oldest to newest. An error is returned if any
// of the updates could not be processed.
func (a *Authority) Inform(updates []AUM) error {
// InformIdempotent returns a new Authority based on applying the given
// updates, with the given updates committed to storage.
//
// If any of the updates could not be applied:
// - An error is returned
// - No changes to storage are made.
//
// MissingAUMs() should be used to get a list of updates appropriate for
// this function. In any case, updates should be ordered oldest to newest.
func (a *Authority) InformIdempotent(storage Chonk, updates []AUM) (Authority, error) {
if len(updates) == 0 {
return errors.New("inform called with empty slice")
return Authority{}, errors.New("inform called with empty slice")
}
stateAt := make(map[AUMHash]State, len(updates)+1)
toCommit := make([]AUM, 0, len(updates))
@@ -584,30 +596,30 @@ func (a *Authority) Inform(updates []AUM) error {
for i, update := range updates {
hash := update.Hash()
// Check if we already have this AUM thus don't need to process it.
if _, err := a.storage.AUM(hash); err == nil {
if _, err := storage.AUM(hash); err == nil {
isHeadChain = false // Disable the head-chain optimization.
continue
}
parent, hasParent := update.Parent()
if !hasParent {
return fmt.Errorf("update %d: missing parent", i)
return Authority{}, fmt.Errorf("update %d: missing parent", i)
}
state, hasState := stateAt[parent]
var err error
if !hasState {
if state, err = computeStateAt(a.storage, 2000, parent); err != nil {
return fmt.Errorf("update %d computing state: %v", i, err)
if state, err = computeStateAt(storage, 2000, parent); err != nil {
return Authority{}, fmt.Errorf("update %d computing state: %v", i, err)
}
stateAt[parent] = state
}
if err := aumVerify(update, state, false); err != nil {
return fmt.Errorf("update %d invalid: %v", i, err)
return Authority{}, fmt.Errorf("update %d invalid: %v", i, err)
}
if stateAt[hash], err = state.applyVerifiedAUM(update); err != nil {
return fmt.Errorf("update %d cannot be applied: %v", i, err)
return Authority{}, fmt.Errorf("update %d cannot be applied: %v", i, err)
}
if isHeadChain && parent != prevHash {
@@ -617,26 +629,40 @@ func (a *Authority) Inform(updates []AUM) error {
toCommit = append(toCommit, update)
}
if err := a.storage.CommitVerifiedAUMs(toCommit); err != nil {
return fmt.Errorf("commit: %v", err)
if err := storage.CommitVerifiedAUMs(toCommit); err != nil {
return Authority{}, fmt.Errorf("commit: %v", err)
}
if isHeadChain {
// Head-chain fastpath: We can use the state we computed
// in the last iteration.
a.head = updates[len(updates)-1]
a.state = stateAt[prevHash]
} else {
oldestAncestor := a.oldestAncestor.Hash()
c, err := computeActiveChain(a.storage, &oldestAncestor, 2000)
if err != nil {
return fmt.Errorf("recomputing active chain: %v", err)
}
a.head = c.Head
a.oldestAncestor = c.Oldest
a.state = c.state
return Authority{
head: updates[len(updates)-1],
oldestAncestor: a.oldestAncestor,
state: stateAt[prevHash],
}, nil
}
oldestAncestor := a.oldestAncestor.Hash()
c, err := computeActiveChain(storage, &oldestAncestor, 2000)
if err != nil {
return Authority{}, fmt.Errorf("recomputing active chain: %v", err)
}
return Authority{
head: c.Head,
oldestAncestor: c.Oldest,
state: c.state,
}, nil
}
// Inform is the same as InformIdempotent, except the state of the Authority
// is updated in-place.
func (a *Authority) Inform(storage Chonk, updates []AUM) error {
newAuthority, err := a.InformIdempotent(storage, updates)
if err != nil {
return err
}
*a = newAuthority
return nil
}