Mirror of https://github.com/tailscale/tailscale.git
control/controlclient, ipn: add client audit logging
Updates tailscale/corp#26435

Adds client support for sending audit logs to control via /machine/audit-log. Specifically, implements audit logging for user-initiated disconnections.

Signed-off-by: Jonathan Nobels <jonathan@tailscale.com>
This commit is contained in:
parent e11ff28443
commit a029d5b2ff
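Commit overview: the pieces introduced below are an auditlog.AuditLogger (a persistent, retrying queue), a Transport implemented by the control client, and a /machine/audit-log endpoint on control. A minimal sketch of how they fit together, using only the API added in this commit (the stub transport and profile ID are illustrative placeholders, not the real LocalBackend wiring):

```go
package main

import (
	"context"
	"log"
	"time"

	"tailscale.com/ipn/auditlog"
	"tailscale.com/ipn/store/mem"
	"tailscale.com/tailcfg"
)

// stubTransport is a placeholder Transport; in the daemon, the
// *controlclient.Auto created in LocalBackend.Start plays this role.
type stubTransport struct{}

func (stubTransport) SendAuditLog(ctx context.Context, req tailcfg.AuditLogRequest) (err error, retriable bool) {
	log.Printf("would POST /machine/audit-log: %s %q", req.Action, req.Details)
	return nil, false
}

func main() {
	q := auditlog.NewAuditLogger(auditlog.Opts{
		RetryLimit: 100,
		Store:      auditlog.NewStateStore(&mem.Store{}, log.Printf),
		Logf:       log.Printf,
	})

	// SetTransport starts the logger, restores any logs persisted for this
	// profile ID, and kicks off an initial flush.
	q.SetTransport(stubTransport{}, "profile-id")

	// Queue a user-initiated disconnect; the returned channel reports how
	// many retriable logs remain once the triggered flush completes.
	result, err := q.Enqueue(auditlog.PendingAuditLog{
		Action:    tailcfg.AuditNodeDisconnect,
		Details:   "disconnected by operator: maintenance",
		TimeStamp: time.Now(),
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("retriable logs left in queue: %d", <-result)

	// Flush anything pending and stop; unsent logs are persisted.
	q.FlushAndStop(2 * time.Second)
}
```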
@ -813,6 +813,7 @@ tailscale.com/cmd/k8s-operator dependencies: (generated by github.com/tailscale/
  tailscale.com/hostinfo from tailscale.com/client/web+
  tailscale.com/internal/noiseconn from tailscale.com/control/controlclient
  tailscale.com/ipn from tailscale.com/client/local+
  tailscale.com/ipn/auditlog from tailscale.com/ipn/ipnlocal
  tailscale.com/ipn/conffile from tailscale.com/ipn/ipnlocal+
  💣 tailscale.com/ipn/ipnauth from tailscale.com/ipn/ipnlocal+
  tailscale.com/ipn/ipnlocal from tailscale.com/ipn/localapi+
@ -271,6 +271,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
  tailscale.com/hostinfo from tailscale.com/client/web+
  tailscale.com/internal/noiseconn from tailscale.com/control/controlclient
  tailscale.com/ipn from tailscale.com/client/local+
  tailscale.com/ipn/auditlog from tailscale.com/ipn/ipnlocal
  tailscale.com/ipn/conffile from tailscale.com/cmd/tailscaled+
  💣 tailscale.com/ipn/ipnauth from tailscale.com/ipn/ipnlocal+
  tailscale.com/ipn/ipnlocal from tailscale.com/cmd/tailscaled+
@ -112,13 +112,14 @@ type updateGen int64

// Auto connects to a tailcontrol server for a node.
// It's a concrete implementation of the Client interface.
type Auto struct {
	direct   *Direct // our interface to the server APIs
	clock    tstime.Clock
	logf     logger.Logf
	closed   bool
	updateCh chan struct{} // readable when we should inform the server of a change
	observer Observer      // called to update Client status; always non-nil
	observerQueue execqueue.ExecQueue
	auditLogShutdown func(time.Duration) // or nil

	unregisterHealthWatch func()
@ -181,14 +182,15 @@ func NewNoStart(opts Options) (_ *Auto, err error) {
		opts.Clock = tstime.StdClock{}
	}
	c := &Auto{
		direct:           direct,
		clock:            opts.Clock,
		logf:             opts.Logf,
		updateCh:         make(chan struct{}, 1),
		authDone:         make(chan struct{}),
		mapDone:          make(chan struct{}),
		updateDone:       make(chan struct{}),
		observer:         opts.Observer,
		auditLogShutdown: opts.AuditLogShutdown,
	}
	c.authCtx, c.authCancel = context.WithCancel(context.Background())
	c.authCtx = sockstats.WithSockStats(c.authCtx, sockstats.LabelControlClientAuto, opts.Logf)
@ -756,6 +758,11 @@ func (c *Auto) Shutdown() {
	}
	c.logf("client.Shutdown ...")

	// Shutdown and flush the audit logger
	if c.auditLogShutdown != nil {
		c.auditLogShutdown(2 * time.Second)
	}

	direct := c.direct
	c.closed = true
	c.observerQueue.Shutdown()
@ -156,6 +156,8 @@ type Options struct {
	// If we receive a new DialPlan from the server, this value will be
	// updated.
	DialPlan ControlDialPlanner

	AuditLogShutdown func(timeout time.Duration) // optional func to call when the audit log should be shut down
}

// ControlDialPlanner is the interface optionally supplied when creating a
@ -1695,6 +1697,54 @@ func (c *Direct) SetDeviceAttrs(ctx context.Context, attrs tailcfg.AttrUpdate) e
	return nil
}

// SendAuditLog makes a synchronous call to the control plane to submit an audit log.
//
// Returned errors may be fatal or retriable. If retriable is true, the caller may
// retry the operation at a later time.
func (c *Auto) SendAuditLog(ctx context.Context, auditLog tailcfg.AuditLogRequest) (err error, retriable bool) {
	return c.direct.SendAuditLog(ctx, auditLog)
}

// SendAuditLog makes a synchronous call to the control plane to submit an audit log.
//
// Returned errors may be fatal or retriable. If retriable is true, the caller may
// retry the operation at a later time. The caller should not retry operations where
// retriable is false.
func (c *Direct) SendAuditLog(ctx context.Context, auditLog tailcfg.AuditLogRequest) (err error, retriable bool) {
	nc, err := c.getNoiseClient()
	if err != nil {
		return err, true
	}

	nodeKey, ok := c.GetPersist().PublicNodeKeyOK()
	if !ok {
		return errors.New("no node key"), true
	}

	req := &tailcfg.AuditLogRequest{
		NodeKey: nodeKey,
		Action:  auditLog.Action,
		Details: auditLog.Details,
	}

	if c.panicOnUse {
		panic("tainted client")
	}

	res, err := nc.post(ctx, "/machine/audit-log", nodeKey, req)
	if err != nil {
		return err, true
	}
	defer res.Body.Close()
	if res.StatusCode != 200 {
		all, _ := io.ReadAll(res.Body)
		// Errors reaching the control plane are generally retriable. Errors
		// from the control plane are assumed to be non-retriable.
		return fmt.Errorf("HTTP error from control plane: %v, %s", res.Status, all), false
	}
	return nil, false
}

func addLBHeader(req *http.Request, nodeKey key.NodePublic) {
	if !nodeKey.IsZero() {
		req.Header.Add(tailcfg.LBHeader, nodeKey.String())
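The (err, retriable) convention above leaves retry policy to the caller; the auditlog package below implements one. For illustration only, a hedged sketch of a caller-side loop honoring that contract within this package (sendWithRetry, maxAttempts, and the fixed one-second delay are assumptions, not part of this commit):

```go
// sendWithRetry is a hypothetical helper showing how a caller might honor
// the (err, retriable) contract of (*Direct).SendAuditLog.
func sendWithRetry(ctx context.Context, c *Direct, req tailcfg.AuditLogRequest, maxAttempts int) error {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		var retriable bool
		if err, retriable = c.SendAuditLog(ctx, req); err == nil {
			return nil
		}
		if !retriable {
			return err // e.g. control rejected the action; retrying won't help
		}
		// Fixed delay between attempts; a real caller might back off instead.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second):
		}
	}
	return err
}
```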
570	ipn/auditlog/auditlog.go	Normal file
@ -0,0 +1,570 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

// Package auditlog provides a mechanism for logging client events to the control plane.
package auditlog

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"runtime"
	"sort"
	"sync"
	"time"

	"tailscale.com/ipn"
	"tailscale.com/tailcfg"
	"tailscale.com/types/logger"
	"tailscale.com/version"
)

const (
	// defaultTimeout is the default timeout for a flush operation. This also represents the
	// minimum interval between flush operations that we'll trigger via Start.
	defaultTimeout = time.Second * 5
)

// PendingAuditLog represents an audit log that has not yet been sent to the control plane.
// Users of the audit logger should create an instance of this struct and pass it to the
// Enqueue method. The EventID, ProfileID, and Retries fields are managed by the logger
// and should not be set by the user.
type PendingAuditLog struct {
	// EventID is the unique identifier for the event being logged.
	EventID string `json:",omitempty"`
	// Retries is the number of times we've attempted to submit this log.
	Retries int `json:",omitempty"`

	// Action is the action to be logged. It must correspond to a known action in the control plane.
	Action tailcfg.ClientAuditAction `json:",omitempty"`
	// Details is an opaque string, specific to the action being logged. Empty strings may not
	// be valid depending on the action being logged.
	Details string `json:",omitempty"`
	// TimeStamp is the time at which the audit log was generated on the node.
	TimeStamp time.Time `json:",omitzero"`
}

func (p *PendingAuditLog) Equals(other PendingAuditLog) bool {
	return p.EventID == other.EventID
}

// Transport provides a means for a client to send audit logs
// to a consumer (typically the control plane).
type Transport interface {
	// SendAuditLog sends an audit log to the control plane.
	// If err is non-nil, the log was not sent successfully.
	// If retriable is true, the log may be retried.
	SendAuditLog(ctx context.Context, auditLog tailcfg.AuditLogRequest) (err error, retriable bool)
}

// PersistentStore provides a means for an audit logger to persist logs to disk or memory.
type PersistentStore interface {
	// Persist saves the given data to a persistent store. Persist may discard logs if
	// the store has a fixed size limit. Persist will overwrite existing data for the given key.
	Persist(key string, data []PendingAuditLog) error

	// Restore retrieves the data from a persistent store. This must return
	// an empty slice if no data exists for the given key.
	Restore(key string) ([]PendingAuditLog, error)
}

// Opts contains the configuration options for an AuditLogger.
type Opts struct {
	// RetryLimit is the maximum number of attempts the logger will make to send a log before giving up.
	RetryLimit int
	// Store is the persistent store used to save logs to disk.
	Store PersistentStore

	Logf logger.Logf
}

type State string

const (
	// stopped is the initial state of the logger and the state after Stop() is called. A logger in the stopped state
	// cannot flush logs but may accept new logs to be enqueued.
	stopped State = "stopped"
	// started is the state of a logger after Start() is called. A logger in the started state must have a running
	// flush worker.
	started State = "started"
)

type AuditLogger struct {
	logf       logger.Logf
	retryLimit int
	timeout    time.Duration
	logId      string
	store      PersistentStore

	// mu protects the fields below.
	mu           sync.Mutex
	transport    Transport          // transport used to send logs
	pending      []PendingAuditLog  // pending logs to be sent
	state        State              // state of the logger
	flusher      chan flushOp       // channel used to signal a flush operation
	flushCancel  context.CancelFunc // cancel function for the current flush operation's context
	flushCtx     context.Context    // context for the current flush
	workerCancel context.CancelFunc // cancel function for the flush worker's context

	lastFlush time.Time
}

type retryOp struct {
	delay time.Duration
}

type flushOp struct {
	timeout   time.Duration
	result    chan<- FlushResult
	transport Transport
}

// FlushResult is the number of retriable logs remaining in the queue after a flush.
type FlushResult int

func NewAuditLogger(opts Opts) *AuditLogger {
	logger := logger.WithPrefix(opts.Logf, "auditlog: ")
	q := &AuditLogger{
		retryLimit: opts.RetryLimit,
		pending:    []PendingAuditLog{},
		logf:       logger,
		timeout:    defaultTimeout,
		store:      opts.Store,
		state:      stopped,
		flusher:    make(chan flushOp),
	}
	q.logf("created")
	return q
}

func (q *AuditLogger) FlushAndStop(timeout time.Duration) {
	c := q.Flush(timeout, nil)
	<-c
	q.stop()
}

// SetTransport starts the audit logger, resets the transport to the given value,
// restores any persisted logs, and immediately flushes the queue if it
// was in the stopped state. Returns a read-only channel with a buffer
// size of one indicating the number of retriable transactions that remain in the queue.
//
// If the queue is already in the started state, this will reset the transport and
// immediately flush the queue. This is non-blocking and safe to call at will.
func (q *AuditLogger) SetTransport(t Transport, logId string) <-chan FlushResult {
	q.mu.Lock()

	q.transport = t
	q.logId = logId
	to := q.timeout
	len := len(q.pending)
	last := q.lastFlush

	if q.state == started {
		q.mu.Unlock()
		return q.retryIfNeeded(t, to, len, last)
	}

	var ctx context.Context

	err := q.restoreLocked()
	if err != nil {
		// Continue gracefully.
		q.logf("failed to restore pending logs: %v", err)
	}
	ctx, q.workerCancel = context.WithCancel(context.Background())
	q.logf("started for logID: %v", q.logId)

	q.state = started
	q.mu.Unlock()

	go q.flushWorker(ctx)
	return q.Flush(to, t)
}

func (q *AuditLogger) retryIfNeeded(t Transport, interval time.Duration, pending int, lastAttempt time.Time) <-chan FlushResult {
	if time.Since(lastAttempt) < interval || pending == 0 {
		c := make(chan FlushResult, 1)
		c <- FlushResult(pending)
		return c
	}
	return q.Flush(q.timeout, t)
}

// Enqueue queues an audit log to be sent to the control plane.
//
// Returns a receive-only channel that will be sent a single value indicating the number of
// retriable transactions that remain in the queue once flushed.
func (q *AuditLogger) Enqueue(log PendingAuditLog) (<-chan FlushResult, error) {
	// Generate a unique EventID for the log. This is used to de-duplicate logs
	// persisted to the store.
	bytes := make([]byte, 16)
	if _, err := rand.Read(bytes); err != nil {
		return nil, err
	}
	eventID := fmt.Sprintf("%d", time.Now().Unix()) + hex.EncodeToString(bytes)

	log.EventID = eventID
	return q.enqueue(log, true)
}

// Flush asynchronously sends all pending audit logs to the control plane.
// This will cancel any flush operations that are in-flight and start a new flush operation.
// Calls to Flush are serialized. This returns a 1-buffered channel that will be sent
// a value indicating the number of retriable transactions that remain in the queue
// after the flush operation completes.
//
// The flush operation will be cancelled after the given timeout.
// If t is nil, the logger's current transport (if any) will be used.
func (q *AuditLogger) Flush(timeout time.Duration, t Transport) <-chan FlushResult {
	c := make(chan FlushResult, 1)

	q.mu.Lock()

	// Important to exit early since a stopped logger will not have a flush worker.
	if q.state == stopped {
		c <- FlushResult(len(q.pending))
		q.mu.Unlock()
		return c
	}

	if q.flushCancel != nil {
		q.flushCancel()
	}
	if q.flushCtx != nil {
		<-q.flushCtx.Done()
	}

	f := q.flusher
	if t == nil {
		t = q.transport
	}
	q.mu.Unlock()

	f <- flushOp{timeout, c, t}
	return c
}

func (q *AuditLogger) flushWorker(ctx context.Context) {
	for {
		select {
		case op := <-q.flusher:
			q.flush(op.timeout, op.transport, op.result)
		case <-ctx.Done():
			return
		}
	}
}

// flush sends all pending logs to the control plane.
//
// timeout is the maximum time we will permit for the flush operation to complete.
// result should be a 1-buffered chan that will always be sent a single value indicating
// the number of retriable transactions that remain in the queue once the flush completes.
//
// q.mu must not be held.
func (q *AuditLogger) flush(timeout time.Duration, t Transport, result chan<- FlushResult) {
	q.mu.Lock()
	// Early exit if we're stopped or have no logs to flush.
	if q.state == stopped || len(q.pending) == 0 || t == nil {
		q.persistLocked(q.pending)
		if result != nil {
			result <- FlushResult(len(q.pending))
		}
		q.mu.Unlock()
		return
	}

	pending := q.pending
	// Logs actively being flushed are no longer pending. Retriable failed transactions
	// will be requeued.
	q.pending = []PendingAuditLog{}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	q.flushCancel = cancel
	q.flushCtx = ctx
	q.lastFlush = time.Now()
	defer cancel()
	q.mu.Unlock()

	requeued := q.sendToTransport(pending, t, ctx)

	if result != nil {
		result <- FlushResult(requeued)
	}
}

// sendToTransport sends all pending logs to the control plane. Returns the number of logs that
// were requeued. Persists all pending logs to the store before returning.
//
// This may require multiple round trips to the control plane and can be a long-running transaction.
// q.mu must not be held.
func (q *AuditLogger) sendToTransport(pending []PendingAuditLog, t Transport, ctx context.Context) (requeued int) {
	failed, sent, requeued := 0, 0, 0

	for _, log := range pending {
		var err error
		var retriable = true

		if t != nil {
			req := tailcfg.AuditLogRequest{
				Action:    tailcfg.ClientAuditAction(log.Action),
				Details:   log.Details,
				Timestamp: log.TimeStamp,
			}

			err, retriable = t.SendAuditLog(ctx, req)
			if err == nil {
				sent++
				continue
			}
			log.Retries++
		}

		failed++
		if !retriable {
			q.logf("failed permanently: %v", err)
			continue
		}

		// We permit a maximum number of retries for each log. All retriable
		// errors should be transient and we should be able to send the log eventually, but
		// we don't want logs to be persisted indefinitely.
		if log.Retries < q.retryLimit {
			// Enqueue the log for retry, but do not request an immediate flush.
			q.enqueue(log, false)
			requeued++
		} else {
			q.logf("failed permanently after %d retries: %v", log.Retries, err)
		}
	}
	q.logf("requeued %d, sent %d, failed %d", requeued, sent, failed)
	return requeued
}

// stop synchronously cancels any incomplete flush operations, stops the audit logger,
// and persists any pending logs to the store. You may continue to send logs to the logger in
// the stopped state, and they will be persisted to the store.
//
// Calling Flush and waiting on the result before calling stop is required if you
// want to ensure that a flush is attempted before stopping the logger.
func (q *AuditLogger) stop() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.state == stopped {
		return
	}

	q.state = stopped
	q.transport = nil

	if q.workerCancel != nil {
		q.workerCancel()
	}

	if q.flushCancel != nil {
		q.flushCancel()
	}

	if q.flushCtx != nil {
		<-q.flushCtx.Done()
		q.flushCtx = nil
	}

	err := q.persistLocked(q.pending)
	if err != nil {
		// Continue gracefully.
		q.logf("failed to persist logs: %v", err)
	}
	c := len(q.pending)
	q.pending = []PendingAuditLog{}
	q.logf("stopped for profileID: %v persisted: %d", q.logId, c)
}

// restoreLocked restores logs from the persistent store and
// appends them to q.pending.
//
// q.mu must be held.
func (q *AuditLogger) restoreLocked() error {
	if q.logId == "" {
		return errors.New("no logId set")
	}

	key := string(q.logId)

	logs, err := q.store.Restore(key)
	if err != nil {
		// An error on restoration is not fatal.
		logs = []PendingAuditLog{}
		q.logf("failed to restore logs: %v", err)
	}
	// Logs are back in the queue; remove them from the persistent store.
	err = q.store.Persist(key, nil)
	if err != nil {
		q.logf("failed to clear persisted logs: %v", err)
		return err
	}

	q.logf("restored %d pending logs for profileId %v", len(logs), q.logId)
	q.pending = deduplicateAndSort(append(q.pending, logs...))
	return nil
}

// persistLocked persists logs to the store that are
// not already present in the store.
//
// q.mu must be held.
func (q *AuditLogger) persistLocked(p []PendingAuditLog) error {
	if len(p) == 0 {
		return nil
	}

	if q.logId == "" {
		return errors.New("no logId set")
	}

	key := string(q.logId)
	persisted, _ := q.store.Restore(key)
	logs := append(persisted, p...)
	logs = deduplicateAndSort(logs)

	return q.store.Persist(key, logs)
}

func deduplicateAndSort(logs []PendingAuditLog) []PendingAuditLog {
	seen := make(map[string]struct{})
	deduped := []PendingAuditLog{}
	for _, log := range logs {
		if _, ok := seen[log.EventID]; !ok {
			deduped = append(deduped, log)
			seen[log.EventID] = struct{}{}
		}
	}
	// Sort logs by timestamp, oldest to newest.
	sort.Slice(deduped, func(i, j int) bool {
		return deduped[i].TimeStamp.Before(deduped[j].TimeStamp)
	})
	return deduped
}

func (q *AuditLogger) enqueue(log PendingAuditLog, flush bool) (<-chan FlushResult, error) {
	q.mu.Lock()

	result := make(chan FlushResult, 1)

	if q.state == stopped {
		q.pending = append(q.pending, log)
		q.persistLocked([]PendingAuditLog{log})
		result <- FlushResult(len(q.pending))
		q.mu.Unlock()
		return result, nil
	}

	q.pending = append(q.pending, log)
	timeout := q.timeout
	transport := q.transport
	q.mu.Unlock()
	if !flush {
		return nil, nil
	}

	return q.Flush(timeout, transport), nil
}

var _ PersistentStore = (*StateStore)(nil)

// StateStore is a concrete implementation of PersistentStore
// using ipn.StateStore as the underlying storage.
type StateStore struct {
	mu    sync.Mutex
	store ipn.StateStore
	logf  logger.Logf
}

func NewStateStore(store ipn.StateStore, logf logger.Logf) PersistentStore {
	return &StateStore{
		store: store,
		logf:  logf,
	}
}

// Persist saves the given logs to an ipn.StateStore. This overwrites
// any existing entries for the given key.
func (a *StateStore) Persist(key string, logs []PendingAuditLog) error {
	// Sort logs by timestamp, oldest to newest.
	sort.Slice(logs, func(i, j int) bool {
		return logs[i].TimeStamp.Before(logs[j].TimeStamp)
	})

	// App Store variants have a hard limit of 4KB with their default StateStore implementation.
	// (barnstar) TODO: Plumb in a more generic file-based store without the size limitations using
	// shared storage similar to macsys.
	if runtime.GOOS == "ios" || (runtime.GOOS == "darwin" && version.IsMacAppStore()) || version.IsAppleTV() {
		trimmedLogs, err := a.truncateLogs(logs, 4096)
		if err != nil {
			return err
		}
		logs = trimmedLogs
	}

	data, err := json.Marshal(logs)
	if err != nil {
		return err
	}

	k := ipn.StateKey(key)

	a.mu.Lock()
	defer a.mu.Unlock()
	return a.store.WriteState(k, data)
}

// truncateLogs removes the first entry in the given slice of logs repeatedly until
// the total size of the serialized logs is less than or equal to the given max size.
func (a *StateStore) truncateLogs(logs []PendingAuditLog, maxBytes int) ([]PendingAuditLog, error) {
	if len(logs) == 0 {
		return logs, nil
	}

	data, err := json.Marshal(logs)
	if err != nil {
		return nil, err
	}

	for len(data) > maxBytes && len(logs) > 0 {
		logs = logs[1:]
		data, err = json.Marshal(logs)
		if err != nil {
			return nil, err
		}
	}
	return logs, nil
}

// Restore retrieves the logs from an ipn.StateStore.
func (a *StateStore) Restore(key string) ([]PendingAuditLog, error) {
	a.mu.Lock()
	defer a.mu.Unlock()

	k := ipn.StateKey(key)
	data, err := a.store.ReadState(k)

	switch {
	case errors.Is(err, ipn.ErrStateNotExist):
		return []PendingAuditLog{}, nil
	case err != nil:
		return nil, err
	}

	var logs []PendingAuditLog
	if err := json.Unmarshal(data, &logs); err != nil {
		return nil, err
	}
	return logs, nil
}
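The PersistentStore interface above is deliberately small; the commit ships the ipn.StateStore-backed implementation, but a test double fits in a few lines. A sketch of the contract (memStore is hypothetical, not part of this commit): Persist overwrites per key, Restore returns an empty slice for missing keys.

```go
// memStore is a toy in-memory PersistentStore, useful in tests.
type memStore struct {
	mu   sync.Mutex
	logs map[string][]PendingAuditLog
}

func (m *memStore) Persist(key string, data []PendingAuditLog) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.logs == nil {
		m.logs = make(map[string][]PendingAuditLog)
	}
	// Overwrite semantics: replace whatever was stored for this key.
	m.logs[key] = append([]PendingAuditLog(nil), data...)
	return nil
}

func (m *memStore) Restore(key string) ([]PendingAuditLog, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	// The contract requires an empty slice, not an error, for missing keys.
	return append([]PendingAuditLog{}, m.logs[key]...), nil
}
```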
498	ipn/auditlog/auditlog_test.go	Normal file
@ -0,0 +1,498 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

package auditlog

import (
	"context"
	"errors"
	"fmt"
	"reflect"
	"sync"
	"testing"
	"time"

	"tailscale.com/ipn/store/mem"
	"tailscale.com/tailcfg"
)

// TestEnqueueAndFlush enqueues n logs and flushes them.
// We expect all logs to be flushed.
func TestEnqueueAndFlush(t *testing.T) {
	mockTransport := &mockAuditLogTransport{t: t}
	mockStore := NewStateStore(&mem.Store{}, t.Logf)

	q := NewAuditLogger(Opts{
		RetryLimit: 100,
		Logf:       t.Logf,
		Store:      mockStore,
	})
	q.SetTransport(mockTransport, "test")

	t.Cleanup(func() {
		q.stop()
	})

	wantSent := 5
	wantFailed, gotFailed := FlushResult(0), FlushResult(0)

	for i := 0; i < wantSent; i++ {
		log := PendingAuditLog{
			Details: fmt.Sprintf("log %d", i),
		}
		result, err := q.Enqueue(log)
		if err != nil {
			t.Fatalf("failed to enqueue audit log: %v", err)
		}
		select {
		case gotFailed = <-result:
		case <-time.After(5 * time.Second):
			t.Fatal("timeout waiting for logs to be flushed")
		}
	}

	gotSent := mockTransport.sendCount
	if wantSent != gotSent {
		t.Fatalf("want %d flushed, got %d", wantSent, gotSent)
	}

	if wantFailed != gotFailed {
		t.Fatalf("want %d failed, got %d", wantFailed, gotFailed)
	}
}

// TestFailuresFlushLater enqueues a set of logs, all of which will fail to flush.
// We then set the transport to not-fail, call Flush, and expect all transactions to
// complete successfully.
func TestFailuresFlushLater(t *testing.T) {
	mockTransport := &mockAuditLogTransport{
		t:     t,
		delay: 10 * time.Millisecond,
		retry: true,
		fail:  true,
	}
	mockStore := NewStateStore(&mem.Store{}, t.Logf)

	q := NewAuditLogger(Opts{
		RetryLimit: 100,
		Logf:       t.Logf,
		Store:      mockStore,
	})
	q.SetTransport(mockTransport, "test")

	t.Cleanup(func() {
		q.stop()
	})

	for i := 0; i < 5; i++ {
		log := PendingAuditLog{
			Details: fmt.Sprintf("log %d", i),
		}
		result, err := q.Enqueue(log)
		if err != nil {
			t.Fatalf("failed to enqueue audit log: %v", err)
		}
		<-result
	}

	mockTransport.mu.Lock()
	mockTransport.fail = false
	mockTransport.mu.Unlock()

	result := q.Flush(5*time.Second, nil)

	wantFailed, gotFailed := FlushResult(0), FlushResult(0)

	select {
	case gotFailed = <-result:
	case <-time.After(5 * time.Second):
		t.Fatal("timeout waiting for logs to be flushed")
	}

	wantSent, gotSent := 5, mockTransport.sendCount

	if wantSent != gotSent {
		t.Fatalf("want %d sent, got %d", wantSent, gotSent)
	}

	if wantFailed != gotFailed {
		t.Fatalf("want %d failed, got %d", wantFailed, gotFailed)
	}
}

// TestFailureExhaustion enqueues 3 logs, all of which will fail to flush. We then call Flush
// several times to exhaust the retries and expect permanent failure.
func TestFailureExhaustion(t *testing.T) {
	mockTransport := &mockAuditLogTransport{
		t:     t,
		delay: 10 * time.Millisecond,
		retry: true,
		fail:  true,
	}
	mockStore := NewStateStore(&mem.Store{}, t.Logf)

	q := NewAuditLogger(Opts{
		RetryLimit: 3,
		Logf:       t.Logf,
		Store:      mockStore,
	})
	q.SetTransport(mockTransport, "test")

	t.Cleanup(func() {
		q.stop()
	})

	for i := 0; i < 3; i++ {
		log := PendingAuditLog{
			Details: fmt.Sprintf("log %d", i),
		}
		_, err := q.Enqueue(log)
		if err != nil {
			t.Fatalf("failed to enqueue audit log: %v", err)
		}
	}

	wantFailed, gotFailed := FlushResult(0), FlushResult(0)
	for i := 0; i < 3; i++ {
		flushed := q.Flush(5*time.Second, mockTransport)
		gotFailed = <-flushed
	}

	wantSent, gotSent := 0, mockTransport.sendCount

	if wantSent != gotSent {
		t.Fatalf("want %d sent, got %d", wantSent, gotSent)
	}

	if wantFailed != gotFailed {
		t.Fatalf("want %d failed, got %d", wantFailed, gotFailed)
	}
}

// TestEnqueueAndFail enqueues two logs, both of which will fail and are not
// retriable. We then call Flush and expect all to be unsent.
func TestEnqueueAndFail(t *testing.T) {
	mockTransport := &mockAuditLogTransport{
		t:     t,
		delay: 10 * time.Millisecond,
		fail:  true,
		retry: false,
	}
	mockStore := NewStateStore(&mem.Store{}, t.Logf)

	q := NewAuditLogger(Opts{
		RetryLimit: 100,
		Logf:       t.Logf,
		Store:      mockStore,
	})
	q.SetTransport(mockTransport, "test")

	t.Cleanup(func() {
		q.stop()
	})

	unsentCount := FlushResult(0)
	for i := 0; i < 2; i++ {
		log := PendingAuditLog{
			Details: fmt.Sprintf("log %d", i),
		}
		result, err := q.Enqueue(log)
		if err != nil {
			t.Fatalf("failed to enqueue audit log: %v", err)
		}
		select {
		case unsentCount = <-result:
		case <-time.After(5 * time.Second):
			t.Fatal("timeout waiting for logs to be flushed")
		}
	}

	gotUnsent, wantUnsent := int(unsentCount), 0

	if wantUnsent != gotUnsent {
		t.Fatalf("want %d unsent, got %d", wantUnsent, gotUnsent)
	}
}

// TestEnqueueAndFailTimeout enqueues a set of logs, all of which will fail to flush due to context
// timeouts. With the retry limit set to zero, we expect 0 to be sent to the result
// channel.
func TestEnqueueAndFailTimeout(t *testing.T) {
	mockTransport := &mockAuditLogTransport{
		t:     t,
		delay: 2 * time.Second,
		fail:  true,
		retry: false,
	}
	mockStore := NewStateStore(&mem.Store{}, t.Logf)

	q := NewAuditLogger(Opts{
		RetryLimit: 0,
		Logf:       t.Logf,
		Store:      mockStore,
	})
	q.SetTransport(mockTransport, "test")

	t.Cleanup(func() {
		q.stop()
	})

	q.timeout = time.Millisecond

	unsentCount := FlushResult(0)
	for i := 0; i < 2; i++ {
		log := PendingAuditLog{
			Details: fmt.Sprintf("log %d", i),
		}
		result, err := q.Enqueue(log)
		if err != nil {
			t.Fatalf("failed to enqueue audit log: %v", err)
		}
		select {
		case unsentCount = <-result:
		case <-time.After(5 * time.Second):
			t.Fatal("timeout waiting for logs to be flushed")
		}
	}

	gotFailed, wantUnsent := unsentCount, FlushResult(0)

	if wantUnsent != gotFailed {
		t.Fatalf("want %d unsent, got %d", wantUnsent, gotFailed)
	}
}

// TestStart enqueues a set of logs while the queue is stopped. We then start the queue and expect
// all logs to be flushed.
func TestStart(t *testing.T) {
	mockTransport := &mockAuditLogTransport{t: t}
	mockStore := NewStateStore(&mem.Store{}, t.Logf)

	q := NewAuditLogger(Opts{
		RetryLimit: 100,
		Logf:       t.Logf,
		Store:      mockStore,
	})

	t.Cleanup(func() {
		q.stop()
	})

	log := PendingAuditLog{
		Details: "log",
	}

	// Toss a couple of logs at the stopped queue. These should get
	// persisted.
	for i := 0; i < 2; i++ {
		result, err := q.Enqueue(log)
		if err != nil {
			t.Fatalf("enqueue failed %v", err)
		}

		wantPending, gotPending := FlushResult(i+1), <-result
		if wantPending != gotPending {
			t.Fatalf("want %d pending, got %d", wantPending, gotPending)
		}
	}

	q.SetTransport(mockTransport, "test")

	// Submit another one after starting.
	result, err := q.Enqueue(log)
	if err != nil {
		t.Fatalf("enqueue failed %v", err)
	}

	gotPending, wantPending := <-result, FlushResult(0)
	// 3 total: two while stopped, one after starting.
	wantSent, gotSent := 3, mockTransport.sendCount

	if gotSent != wantSent {
		t.Fatalf("want %d sent, got %d", wantSent, gotSent)
	}

	if wantPending != gotPending {
		t.Fatalf("want %d pending, got %d", wantPending, gotPending)
	}
}

func TestStop(t *testing.T) {
	mockTransport := &mockAuditLogTransport{
		t:     t,
		fail:  true,
		retry: true,
	}
	mockStore := NewStateStore(&mem.Store{}, t.Logf)

	q := NewAuditLogger(Opts{
		RetryLimit: 100,
		Logf:       t.Logf,
		Store:      mockStore,
	})
	q.SetTransport(mockTransport, "test")

	t.Cleanup(func() {
		q.stop()
	})

	log := PendingAuditLog{
		Details: "log",
	}
	result, err := q.Enqueue(log)
	if err != nil {
		t.Fatalf("enqueue failed %v", err)
	}

	wantPending, gotPending := FlushResult(1), <-result
	if wantPending != gotPending {
		t.Fatalf("want %d pending, got %d", wantPending, gotPending)
	}

	q.stop()
	// This second stop should no-op.
	q.stop()

	wantSent, gotSent := 0, mockTransport.sendCount
	if wantSent != gotSent {
		t.Fatalf("want %d sent, got %d", wantSent, gotSent)
	}

	mockTransport.fail = false
	result = q.SetTransport(mockTransport, "")

	gotFailed, wantFailed := <-result, FlushResult(0)
	if wantFailed != gotFailed {
		t.Fatalf("want %d failed, got %d", wantFailed, gotFailed)
	}
}

// TestFlushInAllStates tests that Flush writes a value to the returned channel
// regardless of what state the logger is in.
func TestFlushInAllStates(t *testing.T) {
	mockStore := NewStateStore(&mem.Store{}, t.Logf)

	q1 := NewAuditLogger(Opts{
		RetryLimit: 100,
		Logf:       t.Logf,
		Store:      mockStore,
	})
	q1.SetTransport(&mockAuditLogTransport{t: t}, "test")

	c := q1.Flush(time.Second, nil)
	<-c
	q1.stop()

	q1 = NewAuditLogger(Opts{
		RetryLimit: 100,
		Logf:       t.Logf,
		Store:      mockStore,
	})

	// Flush writes a value to the returned channel even in the
	// case of a stopped queue.
	c = q1.Flush(time.Second, nil)
	<-c
	q1.stop()

	// Nothing to check, but we have to get here...
}

// TestLogSorting tests that audit logs are persisted sorted by timestamp, oldest to newest.
func TestLogSorting(t *testing.T) {
	mockStore := NewStateStore(&mem.Store{}, t.Logf)

	logs := []PendingAuditLog{
		{Details: "log 3", TimeStamp: time.Now().Add(-time.Minute * 1)},
		{Details: "log 2", TimeStamp: time.Now().Add(-time.Minute * 2)},
		{Details: "log 1", TimeStamp: time.Now().Add(-time.Minute * 3)},
	}

	wantLogs := []PendingAuditLog{
		{Details: "log 1"},
		{Details: "log 2"},
		{Details: "log 3"},
	}

	mockStore.Persist("test", logs)

	gotLogs, err := mockStore.Restore("test")
	if err != nil {
		t.Fatalf("failed to restore logs: %v", err)
	}

	for i := range gotLogs {
		if gotLogs[i].Details != wantLogs[i].Details {
			t.Fatalf("want %s, got %s", wantLogs[i].Details, gotLogs[i].Details)
		}
	}
}

// TestLogSizeLimit tests that logs are trimmed to fit within the size limit.
func TestLogSizeLimit(t *testing.T) {
	sizeLimit := 1024
	longDetails := string(repeatedBytes(sizeLimit, t))
	mockStore := NewStateStore(&mem.Store{}, t.Logf)

	logs := []PendingAuditLog{
		{Details: longDetails},
		{Details: "log 2"},
		{Details: "log 3"},
	}

	wantLogs := []PendingAuditLog{
		{Details: "log 2"},
		{Details: "log 3"},
	}

	// NewStateStore returns a PersistentStore; assert to *StateStore to reach truncateLogs.
	store := mockStore.(*StateStore)

	gotLogs, err := store.truncateLogs(logs, sizeLimit)
	if err != nil {
		t.Fatalf("failed to limit log size: %v", err)
	}

	if !reflect.DeepEqual(gotLogs, wantLogs) {
		t.Fatalf("want %v, got %v", wantLogs, gotLogs)
	}
}

func repeatedBytes(n int, t *testing.T) []byte {
	t.Helper()
	b := make([]byte, n)
	for i := range b {
		b[i] = 'a'
	}
	return b
}

// Mock implementations for testing.

type mockAuditLogTransport struct {
	t *testing.T

	mu        sync.Mutex
	sendCount int           // number of logs sent by the transport
	delay     time.Duration // artificial delay before sending
	fail      bool          // true if the transport should fail
	retry     bool          // if true, failures are reported as retriable
}

func (m *mockAuditLogTransport) SendAuditLog(ctx context.Context, _ tailcfg.AuditLogRequest) (err error, retriable bool) {
	m.t.Helper()
	m.mu.Lock()
	defer m.mu.Unlock()
	select {
	case <-ctx.Done():
		return errors.New("context cancelled"), m.retry
	case <-time.After(m.delay):
	}

	if m.fail {
		return errors.New("failed"), m.retry
	}
	m.sendCount++
	return nil, false
}
@ -10,12 +10,11 @@ import (

	"tailscale.com/client/tailscale/apitype"
	"tailscale.com/ipn"
	"tailscale.com/tailcfg"
)

// AuditLogFunc is any function that can be used to log audit actions performed by an [Actor].
-//
-// TODO(nickkhyl,barnstar): define a named string type for the action (in tailcfg?) and use it here.
-type AuditLogFunc func(action, details string)
+type AuditLogFunc func(id ipn.ProfileID, action tailcfg.ClientAuditAction, details string)

// Actor is any actor using the [ipnlocal.LocalBackend].
//

@ -45,7 +44,7 @@ type Actor interface {
	//
	// If the auditLogger is non-nil, it is used to write details about the action
	// to the audit log when required by the policy.
-	CheckProfileAccess(profile ipn.LoginProfileView, requestedAccess ProfileAccess, auditLogger AuditLogFunc) error
+	CheckProfileAccess(profile ipn.LoginProfileView, requestedAccess ProfileAccess, auditLogFn AuditLogFunc) error

	// IsLocalSystem reports whether the actor is the Windows' Local System account.
	//
@ -9,6 +9,7 @@ import (

	"tailscale.com/client/tailscale/apitype"
	"tailscale.com/ipn"
	"tailscale.com/tailcfg"
	"tailscale.com/util/syspolicy"
)

@ -48,7 +49,7 @@ func (a actorWithPolicyChecks) CheckProfileAccess(profile ipn.LoginProfileView,
	//
	// TODO(nickkhyl): unexport it when we move [ipn.Actor] implementations from [ipnserver]
	// and corp to this package.
-func CheckDisconnectPolicy(actor Actor, profile ipn.LoginProfileView, reason string, auditLogger AuditLogFunc) error {
+func CheckDisconnectPolicy(actor Actor, profile ipn.LoginProfileView, reason string, auditFn AuditLogFunc) error {
	if alwaysOn, _ := syspolicy.GetBoolean(syspolicy.AlwaysOn, false); !alwaysOn {
		return nil
	}

@ -58,15 +59,14 @@ func CheckDisconnectPolicy(actor Actor, profile ipn.LoginProfileView, reason str
	if reason == "" {
		return errors.New("disconnect not allowed: reason required")
	}
-	if auditLogger != nil {
+	if auditFn != nil {
		var details string
		if username, _ := actor.Username(); username != "" { // best-effort; we don't have it on all platforms
			details = fmt.Sprintf("%q is being disconnected by %q: %v", profile.Name(), username, reason)
		} else {
			details = fmt.Sprintf("%q is being disconnected: %v", profile.Name(), reason)
		}
-		// TODO(nickkhyl,barnstar): use a const for DISCONNECT_NODE.
-		auditLogger("DISCONNECT_NODE", details)
+		auditFn(profile.ID(), tailcfg.AuditNodeDisconnect, details)
	}
	return nil
}
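To make the new callback shape concrete, a sketch of invoking CheckDisconnectPolicy with a typed AuditLogFunc; actor and profile are assumed to be in scope, and the closure body is illustrative (LocalBackend passes its enqueueAuditLogLocked method instead, as the next file's diff shows):

```go
auditFn := func(id ipn.ProfileID, action tailcfg.ClientAuditAction, details string) {
	// A real implementation enqueues to the audit logger; this just logs.
	log.Printf("audit %s for profile %q: %s", action, id, details)
}
if err := ipnauth.CheckDisconnectPolicy(actor, profile, "user requested stop", auditFn); err != nil {
	// Disconnect disallowed by the always-on policy.
	return err
}
```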
@ -57,6 +57,7 @@ import (
	"tailscale.com/health/healthmsg"
	"tailscale.com/hostinfo"
	"tailscale.com/ipn"
	"tailscale.com/ipn/auditlog"
	"tailscale.com/ipn/conffile"
	"tailscale.com/ipn/ipnauth"
	"tailscale.com/ipn/ipnstate"

@ -394,6 +395,8 @@ type LocalBackend struct {
	// and the user has disconnected with a reason.
	// See tailscale/corp#26146.
	overrideAlwaysOn bool

	auditLogger *auditlog.AuditLogger
}

// HealthTracker returns the health tracker for the backend.

@ -831,7 +834,8 @@ func (b *LocalBackend) pauseOrResumeControlClientLocked() {
		return
	}
	networkUp := b.prevIfState.AnyInterfaceUp()
-	b.cc.SetPaused((b.state == ipn.Stopped && b.netMap != nil) || (!networkUp && !testenv.InTest() && !assumeNetworkUpdateForTest()))
+	pause := ((b.state == ipn.Stopped && b.netMap != nil) || (!networkUp && !testenv.InTest() && !assumeNetworkUpdateForTest()))
+	b.cc.SetPaused(pause)
}

// DisconnectControl shuts down the control client. This can be run before node shutdown to force control to consider this node

@ -2312,6 +2316,13 @@ func (b *LocalBackend) Start(opts ipn.Options) error {
		debugFlags = append([]string{"netstack"}, debugFlags...)
	}

	al := auditlog.NewAuditLogger(auditlog.Opts{
		Logf:       b.logf,
		RetryLimit: 100,
		Store:      auditlog.NewStateStore(b.store, b.logf),
	})
	b.auditLogger = al

	// TODO(apenwarr): The only way to change the ServerURL is to
	// re-run b.Start, because this is the only place we create a
	// new controlclient. EditPrefs allows you to overwrite ServerURL,

@ -2337,6 +2348,7 @@ func (b *LocalBackend) Start(opts ipn.Options) error {
		C2NHandler:       http.HandlerFunc(b.handleC2N),
		DialPlan:         &b.dialPlan, // pointer because it can't be copied
		ControlKnobs:     b.sys.ControlKnobs(),
		AuditLogShutdown: al.FlushAndStop,

		// Don't warn about broken Linux IP forwarding when
		// netstack is being used.

@ -4145,6 +4157,26 @@ func (b *LocalBackend) MaybeClearAppConnector(mp *ipn.MaskedPrefs) error {
	return err
}

// enqueueAuditLogLocked enqueues an audit log entry for the specified action and details.
// b.mu must be held.
func (b *LocalBackend) enqueueAuditLogLocked(id ipn.ProfileID, action tailcfg.ClientAuditAction, details string) {
	log := auditlog.PendingAuditLog{
		Action:    action,
		Details:   details,
		TimeStamp: time.Now(),
	}
	if b.auditLogger != nil {
		_, err := b.auditLogger.Enqueue(log)
		// We don't want to return an error here, because it will get propagated to localAPI and prevent
		// things like disconnections and user switches - but we do want to log it.
		if err != nil {
			b.logf("failed to enqueue audit log %v: %v", log, err)
		}
	} else {
		b.logf("failed to enqueue audit log %v: no audit logger configured", log)
	}
}

// EditPrefs applies the changes in mp to the current prefs,
// acting as the tailscaled itself rather than a specific user.
func (b *LocalBackend) EditPrefs(mp *ipn.MaskedPrefs) (ipn.PrefsView, error) {

@ -4170,9 +4202,7 @@ func (b *LocalBackend) EditPrefsAs(mp *ipn.MaskedPrefs, actor ipnauth.Actor) (ip
	unlock := b.lockAndGetUnlock()
	defer unlock()
	if mp.WantRunningSet && !mp.WantRunning && b.pm.CurrentPrefs().WantRunning() {
-		// TODO(barnstar,nickkhyl): replace loggerFn with the actual audit logger.
-		loggerFn := func(action, details string) { b.logf("[audit]: %s: %s", action, details) }
-		if err := actor.CheckProfileAccess(b.pm.CurrentProfile(), ipnauth.Disconnect, loggerFn); err != nil {
+		if err := actor.CheckProfileAccess(b.pm.CurrentProfile(), ipnauth.Disconnect, b.enqueueAuditLogLocked); err != nil {
			return ipn.PrefsView{}, err
		}

@ -5690,6 +5720,8 @@ func (b *LocalBackend) requestEngineStatusAndWait() {
func (b *LocalBackend) setControlClientLocked(cc controlclient.Client) {
	b.cc = cc
	b.ccAuto, _ = cc.(*controlclient.Auto)
	pid := b.pm.CurrentProfile().ID()
	b.auditLogger.SetTransport(b.ccAuto, string(pid))
}

// resetControlClientLocked sets b.cc to nil and returns the old value. If the

@ -5699,7 +5731,6 @@ func (b *LocalBackend) resetControlClientLocked() controlclient.Client {
	if b.cc == nil {
		return nil
	}

	b.resetAuthURLLocked()

	// When we clear the control client, stop any outstanding netmap expiry
@ -2969,3 +2969,32 @@ const LBHeader = "Ts-Lb"
// correspond to those IPs. Any services that don't correspond to a service
// this client is hosting can be ignored.
type ServiceIPMappings map[ServiceName][]netip.Addr

// ClientAuditAction represents an auditable action that a client can report to the
// control plane. These actions must correspond to the supported actions
// in the control plane.
type ClientAuditAction string

const (
	// AuditNodeDisconnect action must be sent when a node has disconnected
	// from the control plane. The details must include a reason in the Details
	// field, either generated or entered by the user.
	AuditNodeDisconnect = ClientAuditAction("DISCONNECT_NODE")
)

// AuditLogRequest represents an audit log request to be sent to the control plane.
//
// This is JSON-encoded and sent over the control plane connection to:
//
//	POST https://<control-plane>/machine/audit-log
type AuditLogRequest struct {
	// NodeKey is the client's current node key.
	NodeKey key.NodePublic `json:",omitzero"`
	// Action is the action to be logged. It must correspond to a known action in the control plane.
	Action ClientAuditAction `json:",omitempty"`
	// Details is an opaque string, specific to the action being logged. Empty strings may not
	// be valid depending on the action being logged.
	Details string `json:",omitempty"`
	// Timestamp is the time at which the audit log was generated on the node.
	Timestamp time.Time `json:",omitzero"`
}
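For reference, a small program sketching the wire shape of AuditLogRequest as posted to /machine/audit-log. The details string and timestamp are made up; with a zero NodeKey and the `json:",omitzero"` tag, the key is omitted from the output, and the client fills in its real node key before posting:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"

	"tailscale.com/tailcfg"
)

func main() {
	req := tailcfg.AuditLogRequest{
		Action:    tailcfg.AuditNodeDisconnect,
		Details:   `"work-laptop" is being disconnected by "alice": maintenance`,
		Timestamp: time.Date(2025, 2, 20, 11, 58, 0, 0, time.UTC),
	}
	b, err := json.MarshalIndent(req, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
	// Prints something like:
	// {
	//   "Action": "DISCONNECT_NODE",
	//   "Details": "\"work-laptop\" is being disconnected by \"alice\": maintenance",
	//   "Timestamp": "2025-02-20T11:58:00Z"
	// }
}
```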