mirror of
https://github.com/tailscale/tailscale.git
synced 2025-08-11 13:18:53 +00:00
control/controlclient, ipn: add client audit logging (#14950)
updates tailscale/corp#26435 Adds client support for sending audit logs to control via /machine/audit-log. Specifically implements audit logging for user initiated disconnections. This will require further work to optimize the peristant storage and exclusion via build tags for mobile: tailscale/corp#27011 tailscale/corp#27012 Signed-off-by: Jonathan Nobels <jonathan@tailscale.com>
This commit is contained in:
466
ipn/auditlog/auditlog.go
Normal file
466
ipn/auditlog/auditlog.go
Normal file
@@ -0,0 +1,466 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
// Package auditlog provides a mechanism for logging audit events.
|
||||
package auditlog
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/types/logger"
|
||||
"tailscale.com/util/rands"
|
||||
"tailscale.com/util/set"
|
||||
)
|
||||
|
||||
// transaction represents an audit log that has not yet been sent to the control plane.
|
||||
type transaction struct {
|
||||
// EventID is the unique identifier for the event being logged.
|
||||
// This is used on the client side only and is not sent to control.
|
||||
EventID string `json:",omitempty"`
|
||||
// Retries is the number of times the logger has attempted to send this log.
|
||||
// This is used on the client side only and is not sent to control.
|
||||
Retries int `json:",omitempty"`
|
||||
|
||||
// Action is the action to be logged. It must correspond to a known action in the control plane.
|
||||
Action tailcfg.ClientAuditAction `json:",omitempty"`
|
||||
// Details is an opaque string specific to the action being logged. Empty strings may not
|
||||
// be valid depending on the action being logged.
|
||||
Details string `json:",omitempty"`
|
||||
// TimeStamp is the time at which the audit log was generated on the node.
|
||||
TimeStamp time.Time `json:",omitzero"`
|
||||
}
|
||||
|
||||
// Transport provides a means for a client to send audit logs to a consumer (typically the control plane).
|
||||
type Transport interface {
|
||||
// SendAuditLog sends an audit log to a consumer of audit logs.
|
||||
// Errors should be checked with [IsRetryableError] for retryability.
|
||||
SendAuditLog(context.Context, tailcfg.AuditLogRequest) error
|
||||
}
|
||||
|
||||
// LogStore provides a means for a [Logger] to persist logs to disk or memory.
|
||||
type LogStore interface {
|
||||
// Save saves the given data to a persistent store. Save will overwrite existing data
|
||||
// for the given key.
|
||||
save(key ipn.ProfileID, txns []*transaction) error
|
||||
|
||||
// Load retrieves the data from a persistent store. Returns a nil slice and
|
||||
// no error if no data exists for the given key.
|
||||
load(key ipn.ProfileID) ([]*transaction, error)
|
||||
}
|
||||
|
||||
// Opts contains the configuration options for a [Logger].
|
||||
type Opts struct {
|
||||
// RetryLimit is the maximum number of attempts the logger will make to send a log before giving up.
|
||||
RetryLimit int
|
||||
// Store is the persistent store used to save logs to disk. Must be non-nil.
|
||||
Store LogStore
|
||||
// Logf is the logger used to log messages from the audit logger. Must be non-nil.
|
||||
Logf logger.Logf
|
||||
}
|
||||
|
||||
// IsRetryableError returns true if the given error is retryable
|
||||
// See [controlclient.apiResponseError]. Potentially retryable errors implement the Retryable() method.
|
||||
func IsRetryableError(err error) bool {
|
||||
var retryable interface{ Retryable() bool }
|
||||
return errors.As(err, &retryable) && retryable.Retryable()
|
||||
}
|
||||
|
||||
type backoffOpts struct {
|
||||
min, max time.Duration
|
||||
multiplier float64
|
||||
}
|
||||
|
||||
// .5, 1, 2, 4, 8, 10, 10, 10, 10, 10...
|
||||
var defaultBackoffOpts = backoffOpts{
|
||||
min: time.Millisecond * 500,
|
||||
max: 10 * time.Second,
|
||||
multiplier: 2,
|
||||
}
|
||||
|
||||
// Logger provides a queue-based mechanism for submitting audit logs to the control plane - or
|
||||
// another suitable consumer. Logs are stored to disk and retried until they are successfully sent,
|
||||
// or until they permanently fail.
|
||||
//
|
||||
// Each individual profile/controlclient tuple should construct and manage a unique [Logger] instance.
|
||||
type Logger struct {
|
||||
logf logger.Logf
|
||||
retryLimit int // the maximum number of attempts to send a log before giving up.
|
||||
flusher chan struct{} // channel used to signal a flush operation.
|
||||
done chan struct{} // closed when the flush worker exits.
|
||||
ctx context.Context // canceled when the logger is stopped.
|
||||
ctxCancel context.CancelFunc // cancels ctx.
|
||||
backoffOpts // backoff settings for retry operations.
|
||||
|
||||
// mu protects the fields below.
|
||||
mu sync.Mutex
|
||||
store LogStore // persistent storage for unsent logs.
|
||||
profileID ipn.ProfileID // empty if [Logger.SetProfileID] has not been called.
|
||||
transport Transport // nil until [Logger.Start] is called.
|
||||
}
|
||||
|
||||
// NewLogger creates a new [Logger] with the given options.
|
||||
func NewLogger(opts Opts) *Logger {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
al := &Logger{
|
||||
retryLimit: opts.RetryLimit,
|
||||
logf: logger.WithPrefix(opts.Logf, "auditlog: "),
|
||||
store: opts.Store,
|
||||
flusher: make(chan struct{}, 1),
|
||||
done: make(chan struct{}),
|
||||
ctx: ctx,
|
||||
ctxCancel: cancel,
|
||||
backoffOpts: defaultBackoffOpts,
|
||||
}
|
||||
al.logf("created")
|
||||
return al
|
||||
}
|
||||
|
||||
// FlushAndStop synchronously flushes all pending logs and stops the audit logger.
|
||||
// This will block until a final flush operation completes or context is done.
|
||||
// If the logger is already stopped, this will return immediately. All unsent
|
||||
// logs will be persisted to the store.
|
||||
func (al *Logger) FlushAndStop(ctx context.Context) {
|
||||
al.stop()
|
||||
al.flush(ctx)
|
||||
}
|
||||
|
||||
// SetProfileID sets the profileID for the logger. This must be called before any logs can be enqueued.
|
||||
// The profileID of a logger cannot be changed once set.
|
||||
func (al *Logger) SetProfileID(profileID ipn.ProfileID) error {
|
||||
al.mu.Lock()
|
||||
defer al.mu.Unlock()
|
||||
if al.profileID != "" {
|
||||
return errors.New("profileID already set")
|
||||
}
|
||||
|
||||
al.profileID = profileID
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start starts the audit logger with the given transport.
|
||||
// It returns an error if the logger is already started.
|
||||
func (al *Logger) Start(t Transport) error {
|
||||
al.mu.Lock()
|
||||
defer al.mu.Unlock()
|
||||
|
||||
if al.transport != nil {
|
||||
return errors.New("already started")
|
||||
}
|
||||
|
||||
al.transport = t
|
||||
pending, err := al.storedCountLocked()
|
||||
if err != nil {
|
||||
al.logf("[unexpected] failed to restore logs: %v", err)
|
||||
}
|
||||
go al.flushWorker()
|
||||
if pending > 0 {
|
||||
al.flushAsync()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ErrAuditLogStorageFailure is returned when the logger fails to persist logs to the store.
|
||||
var ErrAuditLogStorageFailure = errors.New("audit log storage failure")
|
||||
|
||||
// Enqueue queues an audit log to be sent to the control plane (or another suitable consumer/transport).
|
||||
// This will return an error if the underlying store fails to save the log or we fail to generate a unique
|
||||
// eventID for the log.
|
||||
func (al *Logger) Enqueue(action tailcfg.ClientAuditAction, details string) error {
|
||||
txn := &transaction{
|
||||
Action: action,
|
||||
Details: details,
|
||||
TimeStamp: time.Now(),
|
||||
}
|
||||
// Generate a suitably random eventID for the transaction.
|
||||
txn.EventID = fmt.Sprint(txn.TimeStamp, rands.HexString(16))
|
||||
return al.enqueue(txn)
|
||||
}
|
||||
|
||||
// flushAsync requests an asynchronous flush.
|
||||
// It is a no-op if a flush is already pending.
|
||||
func (al *Logger) flushAsync() {
|
||||
select {
|
||||
case al.flusher <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (al *Logger) flushWorker() {
|
||||
defer close(al.done)
|
||||
|
||||
var retryDelay time.Duration
|
||||
retry := time.NewTimer(0)
|
||||
retry.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-al.ctx.Done():
|
||||
return
|
||||
case <-al.flusher:
|
||||
err := al.flush(al.ctx)
|
||||
switch {
|
||||
case errors.Is(err, context.Canceled):
|
||||
// The logger was stopped, no need to retry.
|
||||
return
|
||||
case err != nil:
|
||||
retryDelay = max(al.backoffOpts.min, min(retryDelay*time.Duration(al.backoffOpts.multiplier), al.backoffOpts.max))
|
||||
al.logf("retrying after %v, %v", retryDelay, err)
|
||||
retry.Reset(retryDelay)
|
||||
default:
|
||||
retryDelay = 0
|
||||
retry.Stop()
|
||||
}
|
||||
case <-retry.C:
|
||||
al.flushAsync()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// flush attempts to send all pending logs to the control plane.
|
||||
// l.mu must not be held.
|
||||
func (al *Logger) flush(ctx context.Context) error {
|
||||
al.mu.Lock()
|
||||
pending, err := al.store.load(al.profileID)
|
||||
t := al.transport
|
||||
al.mu.Unlock()
|
||||
|
||||
if err != nil {
|
||||
// This will catch nil profileIDs
|
||||
return fmt.Errorf("failed to restore pending logs: %w", err)
|
||||
}
|
||||
if len(pending) == 0 {
|
||||
return nil
|
||||
}
|
||||
if t == nil {
|
||||
return errors.New("no transport")
|
||||
}
|
||||
|
||||
complete, unsent := al.sendToTransport(ctx, pending, t)
|
||||
al.markTransactionsDone(complete)
|
||||
|
||||
al.mu.Lock()
|
||||
defer al.mu.Unlock()
|
||||
if err = al.appendToStoreLocked(unsent); err != nil {
|
||||
al.logf("[unexpected] failed to persist logs: %v", err)
|
||||
}
|
||||
|
||||
if len(unsent) != 0 {
|
||||
return fmt.Errorf("failed to send %d logs", len(unsent))
|
||||
}
|
||||
|
||||
if len(complete) != 0 {
|
||||
al.logf("complete %d audit log transactions", len(complete))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendToTransport sends all pending logs to the control plane. Returns a pair of slices
|
||||
// containing the logs that were successfully sent (or failed permanently) and those that were not.
|
||||
//
|
||||
// This may require multiple round trips to the control plane and can be a long running transaction.
|
||||
func (al *Logger) sendToTransport(ctx context.Context, pending []*transaction, t Transport) (complete []*transaction, unsent []*transaction) {
|
||||
for i, txn := range pending {
|
||||
req := tailcfg.AuditLogRequest{
|
||||
Action: tailcfg.ClientAuditAction(txn.Action),
|
||||
Details: txn.Details,
|
||||
Timestamp: txn.TimeStamp,
|
||||
}
|
||||
|
||||
if err := t.SendAuditLog(ctx, req); err != nil {
|
||||
switch {
|
||||
case errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded):
|
||||
// The contex is done. All further attempts will fail.
|
||||
unsent = append(unsent, pending[i:]...)
|
||||
return complete, unsent
|
||||
case IsRetryableError(err) && txn.Retries+1 < al.retryLimit:
|
||||
// We permit a maximum number of retries for each log. All retriable
|
||||
// errors should be transient and we should be able to send the log eventually, but
|
||||
// we don't want logs to be persisted indefinitely.
|
||||
txn.Retries++
|
||||
unsent = append(unsent, txn)
|
||||
default:
|
||||
complete = append(complete, txn)
|
||||
al.logf("failed permanently: %v", err)
|
||||
}
|
||||
} else {
|
||||
// No error - we're done.
|
||||
complete = append(complete, txn)
|
||||
}
|
||||
}
|
||||
|
||||
return complete, unsent
|
||||
}
|
||||
|
||||
func (al *Logger) stop() {
|
||||
al.mu.Lock()
|
||||
t := al.transport
|
||||
al.mu.Unlock()
|
||||
|
||||
if t == nil {
|
||||
// No transport means no worker goroutine and done will not be
|
||||
// closed if we cancel the context.
|
||||
return
|
||||
}
|
||||
|
||||
al.ctxCancel()
|
||||
<-al.done
|
||||
al.logf("stopped for profileID: %v", al.profileID)
|
||||
}
|
||||
|
||||
// appendToStoreLocked persists logs to the store. This will deduplicate
|
||||
// logs so it is safe to call this with the same logs multiple time, to
|
||||
// requeue failed transactions for example.
|
||||
//
|
||||
// l.mu must be held.
|
||||
func (al *Logger) appendToStoreLocked(txns []*transaction) error {
|
||||
if len(txns) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if al.profileID == "" {
|
||||
return errors.New("no logId set")
|
||||
}
|
||||
|
||||
persisted, err := al.store.load(al.profileID)
|
||||
if err != nil {
|
||||
al.logf("[unexpected] append failed to restore logs: %v", err)
|
||||
}
|
||||
|
||||
// The order is important here. We want the latest transactions first, which will
|
||||
// ensure when we dedup, the new transactions are seen and the older transactions
|
||||
// are discarded.
|
||||
txnsOut := append(txns, persisted...)
|
||||
txnsOut = deduplicateAndSort(txnsOut)
|
||||
|
||||
return al.store.save(al.profileID, txnsOut)
|
||||
}
|
||||
|
||||
// storedCountLocked returns the number of logs persisted to the store.
|
||||
// al.mu must be held.
|
||||
func (al *Logger) storedCountLocked() (int, error) {
|
||||
persisted, err := al.store.load(al.profileID)
|
||||
return len(persisted), err
|
||||
}
|
||||
|
||||
// markTransactionsDone removes logs from the store that are complete (sent or failed permanently).
|
||||
// al.mu must not be held.
|
||||
func (al *Logger) markTransactionsDone(sent []*transaction) {
|
||||
al.mu.Lock()
|
||||
defer al.mu.Unlock()
|
||||
|
||||
ids := set.Set[string]{}
|
||||
for _, txn := range sent {
|
||||
ids.Add(txn.EventID)
|
||||
}
|
||||
|
||||
persisted, err := al.store.load(al.profileID)
|
||||
if err != nil {
|
||||
al.logf("[unexpected] markTransactionsDone failed to restore logs: %v", err)
|
||||
}
|
||||
var unsent []*transaction
|
||||
for _, txn := range persisted {
|
||||
if !ids.Contains(txn.EventID) {
|
||||
unsent = append(unsent, txn)
|
||||
}
|
||||
}
|
||||
al.store.save(al.profileID, unsent)
|
||||
}
|
||||
|
||||
// deduplicateAndSort removes duplicate logs from the given slice and sorts them by timestamp.
|
||||
// The first log entry in the slice will be retained, subsequent logs with the same EventID will be discarded.
|
||||
func deduplicateAndSort(txns []*transaction) []*transaction {
|
||||
seen := set.Set[string]{}
|
||||
deduped := make([]*transaction, 0, len(txns))
|
||||
for _, txn := range txns {
|
||||
if !seen.Contains(txn.EventID) {
|
||||
deduped = append(deduped, txn)
|
||||
seen.Add(txn.EventID)
|
||||
}
|
||||
}
|
||||
// Sort logs by timestamp - oldest to newest. This will put the oldest logs at
|
||||
// the front of the queue.
|
||||
sort.Slice(deduped, func(i, j int) bool {
|
||||
return deduped[i].TimeStamp.Before(deduped[j].TimeStamp)
|
||||
})
|
||||
return deduped
|
||||
}
|
||||
|
||||
func (al *Logger) enqueue(txn *transaction) error {
|
||||
al.mu.Lock()
|
||||
defer al.mu.Unlock()
|
||||
|
||||
if err := al.appendToStoreLocked([]*transaction{txn}); err != nil {
|
||||
return fmt.Errorf("%w: %w", ErrAuditLogStorageFailure, err)
|
||||
}
|
||||
|
||||
// If a.transport is nil if the logger is stopped.
|
||||
if al.transport != nil {
|
||||
al.flushAsync()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ LogStore = (*logStateStore)(nil)
|
||||
|
||||
// logStateStore is a concrete implementation of [LogStore]
|
||||
// using [ipn.StateStore] as the underlying storage.
|
||||
type logStateStore struct {
|
||||
store ipn.StateStore
|
||||
}
|
||||
|
||||
// NewLogStore creates a new LogStateStore with the given [ipn.StateStore].
|
||||
func NewLogStore(store ipn.StateStore) LogStore {
|
||||
return &logStateStore{
|
||||
store: store,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *logStateStore) generateKey(key ipn.ProfileID) string {
|
||||
return "auditlog-" + string(key)
|
||||
}
|
||||
|
||||
// Save saves the given logs to an [ipn.StateStore]. This overwrites
|
||||
// any existing entries for the given key.
|
||||
func (s *logStateStore) save(key ipn.ProfileID, txns []*transaction) error {
|
||||
if key == "" {
|
||||
return errors.New("empty key")
|
||||
}
|
||||
|
||||
data, err := json.Marshal(txns)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
k := ipn.StateKey(s.generateKey(key))
|
||||
return s.store.WriteState(k, data)
|
||||
}
|
||||
|
||||
// Load retrieves the logs from an [ipn.StateStore].
|
||||
func (s *logStateStore) load(key ipn.ProfileID) ([]*transaction, error) {
|
||||
if key == "" {
|
||||
return nil, errors.New("empty key")
|
||||
}
|
||||
|
||||
k := ipn.StateKey(s.generateKey(key))
|
||||
data, err := s.store.ReadState(k)
|
||||
|
||||
switch {
|
||||
case errors.Is(err, ipn.ErrStateNotExist):
|
||||
return nil, nil
|
||||
case err != nil:
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var txns []*transaction
|
||||
err = json.Unmarshal(data, &txns)
|
||||
return txns, err
|
||||
}
|
481
ipn/auditlog/auditlog_test.go
Normal file
481
ipn/auditlog/auditlog_test.go
Normal file
@@ -0,0 +1,481 @@
|
||||
// Copyright (c) Tailscale Inc & AUTHORS
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
|
||||
package auditlog
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
qt "github.com/frankban/quicktest"
|
||||
"tailscale.com/ipn/store/mem"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/tstest"
|
||||
)
|
||||
|
||||
// loggerForTest creates an auditLogger for you and cleans it up
|
||||
// (and ensures no goroutines are leaked) when the test is done.
|
||||
func loggerForTest(t *testing.T, opts Opts) *Logger {
|
||||
t.Helper()
|
||||
tstest.ResourceCheck(t)
|
||||
|
||||
if opts.Logf == nil {
|
||||
opts.Logf = t.Logf
|
||||
}
|
||||
|
||||
if opts.Store == nil {
|
||||
t.Fatalf("opts.Store must be set")
|
||||
}
|
||||
|
||||
a := NewLogger(opts)
|
||||
|
||||
t.Cleanup(func() {
|
||||
a.FlushAndStop(context.Background())
|
||||
})
|
||||
return a
|
||||
}
|
||||
|
||||
func TestNonRetryableErrors(t *testing.T) {
|
||||
errorTests := []struct {
|
||||
desc string
|
||||
err error
|
||||
want bool
|
||||
}{
|
||||
{"DeadlineExceeded", context.DeadlineExceeded, false},
|
||||
{"Canceled", context.Canceled, false},
|
||||
{"Canceled wrapped", fmt.Errorf("%w: %w", context.Canceled, errors.New("ctx cancelled")), false},
|
||||
{"Random error", errors.New("random error"), false},
|
||||
}
|
||||
|
||||
for _, tt := range errorTests {
|
||||
t.Run(tt.desc, func(t *testing.T) {
|
||||
if IsRetryableError(tt.err) != tt.want {
|
||||
t.Fatalf("retriable: got %v, want %v", !tt.want, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnqueueAndFlush enqueues n logs and flushes them.
|
||||
// We expect all logs to be flushed and for no
|
||||
// logs to remain in the store once FlushAndStop returns.
|
||||
func TestEnqueueAndFlush(t *testing.T) {
|
||||
c := qt.New(t)
|
||||
mockTransport := newMockTransport(nil)
|
||||
al := loggerForTest(t, Opts{
|
||||
RetryLimit: 200,
|
||||
Logf: t.Logf,
|
||||
Store: NewLogStore(&mem.Store{}),
|
||||
})
|
||||
|
||||
c.Assert(al.SetProfileID("test"), qt.IsNil)
|
||||
c.Assert(al.Start(mockTransport), qt.IsNil)
|
||||
|
||||
wantSent := 10
|
||||
|
||||
for i := range wantSent {
|
||||
err := al.Enqueue(tailcfg.AuditNodeDisconnect, fmt.Sprintf("log %d", i))
|
||||
c.Assert(err, qt.IsNil)
|
||||
}
|
||||
|
||||
al.FlushAndStop(context.Background())
|
||||
|
||||
al.mu.Lock()
|
||||
defer al.mu.Unlock()
|
||||
gotStored, err := al.storedCountLocked()
|
||||
c.Assert(err, qt.IsNil)
|
||||
|
||||
if wantStored := 0; gotStored != wantStored {
|
||||
t.Fatalf("stored: got %d, want %d", gotStored, wantStored)
|
||||
}
|
||||
|
||||
if gotSent := mockTransport.sentCount(); gotSent != wantSent {
|
||||
t.Fatalf("sent: got %d, want %d", gotSent, wantSent)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnqueueAndFlushWithFlushCancel calls FlushAndCancel with a cancelled
|
||||
// context. We expect nothing to be sent and all logs to be stored.
|
||||
func TestEnqueueAndFlushWithFlushCancel(t *testing.T) {
|
||||
c := qt.New(t)
|
||||
mockTransport := newMockTransport(&retriableError)
|
||||
al := loggerForTest(t, Opts{
|
||||
RetryLimit: 200,
|
||||
Logf: t.Logf,
|
||||
Store: NewLogStore(&mem.Store{}),
|
||||
})
|
||||
|
||||
c.Assert(al.SetProfileID("test"), qt.IsNil)
|
||||
c.Assert(al.Start(mockTransport), qt.IsNil)
|
||||
|
||||
for i := range 10 {
|
||||
err := al.Enqueue(tailcfg.AuditNodeDisconnect, fmt.Sprintf("log %d", i))
|
||||
c.Assert(err, qt.IsNil)
|
||||
}
|
||||
|
||||
// Cancel the context before calling FlushAndStop - nothing should get sent.
|
||||
// This mimics a timeout before flush() has a chance to execute.
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
al.FlushAndStop(ctx)
|
||||
|
||||
al.mu.Lock()
|
||||
defer al.mu.Unlock()
|
||||
gotStored, err := al.storedCountLocked()
|
||||
c.Assert(err, qt.IsNil)
|
||||
|
||||
if wantStored := 10; gotStored != wantStored {
|
||||
t.Fatalf("stored: got %d, want %d", gotStored, wantStored)
|
||||
}
|
||||
|
||||
if gotSent, wantSent := mockTransport.sentCount(), 0; gotSent != wantSent {
|
||||
t.Fatalf("sent: got %d, want %d", gotSent, wantSent)
|
||||
}
|
||||
}
|
||||
|
||||
// TestDeduplicateAndSort tests that the most recent log is kept when deduplicating logs
|
||||
func TestDeduplicateAndSort(t *testing.T) {
|
||||
c := qt.New(t)
|
||||
al := loggerForTest(t, Opts{
|
||||
RetryLimit: 100,
|
||||
Logf: t.Logf,
|
||||
Store: NewLogStore(&mem.Store{}),
|
||||
})
|
||||
|
||||
c.Assert(al.SetProfileID("test"), qt.IsNil)
|
||||
|
||||
logs := []*transaction{
|
||||
{EventID: "1", Details: "log 1", TimeStamp: time.Now().Add(-time.Minute * 1), Retries: 1},
|
||||
}
|
||||
|
||||
al.mu.Lock()
|
||||
defer al.mu.Unlock()
|
||||
al.appendToStoreLocked(logs)
|
||||
|
||||
// Update the transaction and re-append it
|
||||
logs[0].Retries = 2
|
||||
al.appendToStoreLocked(logs)
|
||||
|
||||
fromStore, err := al.store.load("test")
|
||||
c.Assert(err, qt.IsNil)
|
||||
|
||||
// We should see only one transaction
|
||||
if wantStored, gotStored := len(logs), len(fromStore); gotStored != wantStored {
|
||||
t.Fatalf("stored: got %d, want %d", gotStored, wantStored)
|
||||
}
|
||||
|
||||
// We should see the latest transaction
|
||||
if wantRetryCount, gotRetryCount := 2, fromStore[0].Retries; gotRetryCount != wantRetryCount {
|
||||
t.Fatalf("reties: got %d, want %d", gotRetryCount, wantRetryCount)
|
||||
}
|
||||
}
|
||||
|
||||
func TestChangeProfileId(t *testing.T) {
|
||||
c := qt.New(t)
|
||||
al := loggerForTest(t, Opts{
|
||||
RetryLimit: 100,
|
||||
Logf: t.Logf,
|
||||
Store: NewLogStore(&mem.Store{}),
|
||||
})
|
||||
c.Assert(al.SetProfileID("test"), qt.IsNil)
|
||||
|
||||
// Changing a profile ID must fail
|
||||
c.Assert(al.SetProfileID("test"), qt.IsNotNil)
|
||||
}
|
||||
|
||||
// TestSendOnRestore pushes a n logs to the persistent store, and ensures they
|
||||
// are sent as soon as Start is called then checks to ensure the sent logs no
|
||||
// longer exist in the store.
|
||||
func TestSendOnRestore(t *testing.T) {
|
||||
c := qt.New(t)
|
||||
mockTransport := newMockTransport(nil)
|
||||
al := loggerForTest(t, Opts{
|
||||
RetryLimit: 100,
|
||||
Logf: t.Logf,
|
||||
Store: NewLogStore(&mem.Store{}),
|
||||
})
|
||||
al.SetProfileID("test")
|
||||
|
||||
wantTotal := 10
|
||||
|
||||
for range 10 {
|
||||
al.Enqueue(tailcfg.AuditNodeDisconnect, "log")
|
||||
}
|
||||
|
||||
c.Assert(al.Start(mockTransport), qt.IsNil)
|
||||
|
||||
al.FlushAndStop(context.Background())
|
||||
|
||||
al.mu.Lock()
|
||||
defer al.mu.Unlock()
|
||||
gotStored, err := al.storedCountLocked()
|
||||
c.Assert(err, qt.IsNil)
|
||||
|
||||
if wantStored := 0; gotStored != wantStored {
|
||||
t.Fatalf("stored: got %d, want %d", gotStored, wantStored)
|
||||
}
|
||||
|
||||
if gotSent, wantSent := mockTransport.sentCount(), wantTotal; gotSent != wantSent {
|
||||
t.Fatalf("sent: got %d, want %d", gotSent, wantSent)
|
||||
}
|
||||
}
|
||||
|
||||
// TestFailureExhaustion enqueues n logs, with the transport in a failable state.
|
||||
// We then set it to a non-failing state, call FlushAndStop and expect all logs to be sent.
|
||||
func TestFailureExhaustion(t *testing.T) {
|
||||
c := qt.New(t)
|
||||
mockTransport := newMockTransport(&retriableError)
|
||||
|
||||
al := loggerForTest(t, Opts{
|
||||
RetryLimit: 1,
|
||||
Logf: t.Logf,
|
||||
Store: NewLogStore(&mem.Store{}),
|
||||
})
|
||||
|
||||
c.Assert(al.SetProfileID("test"), qt.IsNil)
|
||||
c.Assert(al.Start(mockTransport), qt.IsNil)
|
||||
|
||||
for range 10 {
|
||||
err := al.Enqueue(tailcfg.AuditNodeDisconnect, "log")
|
||||
c.Assert(err, qt.IsNil)
|
||||
}
|
||||
|
||||
al.FlushAndStop(context.Background())
|
||||
al.mu.Lock()
|
||||
defer al.mu.Unlock()
|
||||
gotStored, err := al.storedCountLocked()
|
||||
c.Assert(err, qt.IsNil)
|
||||
|
||||
if wantStored := 0; gotStored != wantStored {
|
||||
t.Fatalf("stored: got %d, want %d", gotStored, wantStored)
|
||||
}
|
||||
|
||||
if gotSent, wantSent := mockTransport.sentCount(), 0; gotSent != wantSent {
|
||||
t.Fatalf("sent: got %d, want %d", gotSent, wantSent)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnqueueAndFailNoRetry enqueues a set of logs, all of which will fail and are not
|
||||
// retriable. We then call FlushAndStop and expect all to be unsent.
|
||||
func TestEnqueueAndFailNoRetry(t *testing.T) {
|
||||
c := qt.New(t)
|
||||
mockTransport := newMockTransport(&nonRetriableError)
|
||||
|
||||
al := loggerForTest(t, Opts{
|
||||
RetryLimit: 100,
|
||||
Logf: t.Logf,
|
||||
Store: NewLogStore(&mem.Store{}),
|
||||
})
|
||||
|
||||
c.Assert(al.SetProfileID("test"), qt.IsNil)
|
||||
c.Assert(al.Start(mockTransport), qt.IsNil)
|
||||
|
||||
for i := range 10 {
|
||||
err := al.Enqueue(tailcfg.AuditNodeDisconnect, fmt.Sprintf("log %d", i))
|
||||
c.Assert(err, qt.IsNil)
|
||||
}
|
||||
|
||||
al.FlushAndStop(context.Background())
|
||||
al.mu.Lock()
|
||||
defer al.mu.Unlock()
|
||||
gotStored, err := al.storedCountLocked()
|
||||
c.Assert(err, qt.IsNil)
|
||||
|
||||
if wantStored := 0; gotStored != wantStored {
|
||||
t.Fatalf("stored: got %d, want %d", gotStored, wantStored)
|
||||
}
|
||||
|
||||
if gotSent, wantSent := mockTransport.sentCount(), 0; gotSent != wantSent {
|
||||
t.Fatalf("sent: got %d, want %d", gotSent, wantSent)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnqueueAndRetry enqueues a set of logs, all of which will fail and are retriable.
|
||||
// Mid-test, we set the transport to not-fail and expect the queue to flush properly
|
||||
// We set the backoff parameters to 0 seconds so retries are immediate.
|
||||
func TestEnqueueAndRetry(t *testing.T) {
|
||||
c := qt.New(t)
|
||||
mockTransport := newMockTransport(&retriableError)
|
||||
|
||||
al := loggerForTest(t, Opts{
|
||||
RetryLimit: 100,
|
||||
Logf: t.Logf,
|
||||
Store: NewLogStore(&mem.Store{}),
|
||||
})
|
||||
|
||||
al.backoffOpts = backoffOpts{
|
||||
min: 1 * time.Millisecond,
|
||||
max: 4 * time.Millisecond,
|
||||
multiplier: 2.0,
|
||||
}
|
||||
|
||||
c.Assert(al.SetProfileID("test"), qt.IsNil)
|
||||
c.Assert(al.Start(mockTransport), qt.IsNil)
|
||||
|
||||
err := al.Enqueue(tailcfg.AuditNodeDisconnect, fmt.Sprintf("log 1"))
|
||||
c.Assert(err, qt.IsNil)
|
||||
|
||||
// This will wait for at least 2 retries
|
||||
gotRetried, wantRetried := mockTransport.waitForSendAttemptsToReach(3), true
|
||||
if gotRetried != wantRetried {
|
||||
t.Fatalf("retried: got %v, want %v", gotRetried, wantRetried)
|
||||
}
|
||||
|
||||
mockTransport.setErrorCondition(nil)
|
||||
|
||||
al.FlushAndStop(context.Background())
|
||||
al.mu.Lock()
|
||||
defer al.mu.Unlock()
|
||||
|
||||
gotStored, err := al.storedCountLocked()
|
||||
c.Assert(err, qt.IsNil)
|
||||
|
||||
if wantStored := 0; gotStored != wantStored {
|
||||
t.Fatalf("stored: got %d, want %d", gotStored, wantStored)
|
||||
}
|
||||
|
||||
if gotSent, wantSent := mockTransport.sentCount(), 1; gotSent != wantSent {
|
||||
t.Fatalf("sent: got %d, want %d", gotSent, wantSent)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnqueueBeforeSetProfileID tests that logs enqueued before SetProfileId are not sent
|
||||
func TestEnqueueBeforeSetProfileID(t *testing.T) {
|
||||
c := qt.New(t)
|
||||
al := loggerForTest(t, Opts{
|
||||
RetryLimit: 100,
|
||||
Logf: t.Logf,
|
||||
Store: NewLogStore(&mem.Store{}),
|
||||
})
|
||||
|
||||
err := al.Enqueue(tailcfg.AuditNodeDisconnect, "log")
|
||||
c.Assert(err, qt.IsNotNil)
|
||||
al.FlushAndStop(context.Background())
|
||||
|
||||
al.mu.Lock()
|
||||
defer al.mu.Unlock()
|
||||
gotStored, err := al.storedCountLocked()
|
||||
c.Assert(err, qt.IsNotNil)
|
||||
|
||||
if wantStored := 0; gotStored != wantStored {
|
||||
t.Fatalf("stored: got %d, want %d", gotStored, wantStored)
|
||||
}
|
||||
}
|
||||
|
||||
// TestLogStoring tests that audit logs are persisted sorted by timestamp, oldest to newest
|
||||
func TestLogSorting(t *testing.T) {
|
||||
c := qt.New(t)
|
||||
mockStore := NewLogStore(&mem.Store{})
|
||||
|
||||
logs := []*transaction{
|
||||
{EventID: "1", Details: "log 3", TimeStamp: time.Now().Add(-time.Minute * 1)},
|
||||
{EventID: "1", Details: "log 3", TimeStamp: time.Now().Add(-time.Minute * 2)},
|
||||
{EventID: "2", Details: "log 2", TimeStamp: time.Now().Add(-time.Minute * 3)},
|
||||
{EventID: "3", Details: "log 1", TimeStamp: time.Now().Add(-time.Minute * 4)},
|
||||
}
|
||||
|
||||
wantLogs := []transaction{
|
||||
{Details: "log 1"},
|
||||
{Details: "log 2"},
|
||||
{Details: "log 3"},
|
||||
}
|
||||
|
||||
mockStore.save("test", logs)
|
||||
|
||||
gotLogs, err := mockStore.load("test")
|
||||
c.Assert(err, qt.IsNil)
|
||||
gotLogs = deduplicateAndSort(gotLogs)
|
||||
|
||||
for i := range gotLogs {
|
||||
if want, got := wantLogs[i].Details, gotLogs[i].Details; want != got {
|
||||
t.Fatalf("Details: got %v, want %v", got, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// mock implementations for testing
|
||||
|
||||
// newMockTransport returns a mock transport for testing
|
||||
// If err is no nil, SendAuditLog will return this error if the send is attempted
|
||||
// before the context is cancelled.
|
||||
func newMockTransport(err error) *mockAuditLogTransport {
|
||||
return &mockAuditLogTransport{
|
||||
err: err,
|
||||
attempts: make(chan int, 1),
|
||||
}
|
||||
}
|
||||
|
||||
type mockAuditLogTransport struct {
|
||||
attempts chan int // channel to notify of send attempts
|
||||
|
||||
mu sync.Mutex
|
||||
sendAttmpts int // number of attempts to send logs
|
||||
sendCount int // number of logs sent by the transport
|
||||
err error // error to return when sending logs
|
||||
}
|
||||
|
||||
// waitForSendAttemptsToReach blocks until the number of send attempts reaches n
|
||||
// This should be use only in tests where the transport is expected to retry sending logs
|
||||
func (t *mockAuditLogTransport) waitForSendAttemptsToReach(n int) bool {
|
||||
for attempts := range t.attempts {
|
||||
if attempts >= n {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (t *mockAuditLogTransport) setErrorCondition(err error) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.err = err
|
||||
}
|
||||
|
||||
func (t *mockAuditLogTransport) sentCount() int {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
return t.sendCount
|
||||
}
|
||||
|
||||
func (t *mockAuditLogTransport) SendAuditLog(ctx context.Context, _ tailcfg.AuditLogRequest) (err error) {
|
||||
t.mu.Lock()
|
||||
t.sendAttmpts += 1
|
||||
defer func() {
|
||||
a := t.sendAttmpts
|
||||
t.mu.Unlock()
|
||||
select {
|
||||
case t.attempts <- a:
|
||||
default:
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if t.err != nil {
|
||||
return t.err
|
||||
}
|
||||
t.sendCount += 1
|
||||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
retriableError = mockError{errors.New("retriable error")}
|
||||
nonRetriableError = mockError{errors.New("permanent failure error")}
|
||||
)
|
||||
|
||||
type mockError struct {
|
||||
error
|
||||
}
|
||||
|
||||
func (e mockError) Retryable() bool {
|
||||
return e == retriableError
|
||||
}
|
Reference in New Issue
Block a user