mirror of
				https://github.com/tailscale/tailscale.git
				synced 2025-10-25 18:20:07 +00:00 
			
		
		
		
	 3672f29a4e
			
		
	
	3672f29a4e
	
	
	
		
			
			The goal is to move more network state accessors to netmon.Monitor where they can be cheaper/cached. But first (this change and others) we need to make sure the one netmon.Monitor is plumbed everywhere. Some notable bits: * tsdial.NewDialer is added, taking a now-required netmon * because a tsdial.Dialer always has a netmon, anything taking both a Dialer and a NetMon is now redundant; take only the Dialer and get the NetMon from that if/when needed. * netmon.NewStatic is added, primarily for tests Updates tailscale/corp#10910 Updates tailscale/corp#18960 Updates #7967 Updates #3299 Change-Id: I877f9cb87618c4eb037cee098241d18da9c01691 Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
		
			
				
	
	
		
			968 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			968 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright (c) Tailscale Inc & AUTHORS
 | |
| // SPDX-License-Identifier: BSD-3-Clause
 | |
| 
 | |
| // Package logtail sends logs to log.tailscale.io.
 | |
| package logtail
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"context"
 | |
| 	"crypto/rand"
 | |
| 	"encoding/binary"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"log"
 | |
| 	mrand "math/rand"
 | |
| 	"net/http"
 | |
| 	"net/netip"
 | |
| 	"os"
 | |
| 	"regexp"
 | |
| 	"runtime"
 | |
| 	"slices"
 | |
| 	"strconv"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/go-json-experiment/json/jsontext"
 | |
| 	"tailscale.com/envknob"
 | |
| 	"tailscale.com/net/netmon"
 | |
| 	"tailscale.com/net/sockstats"
 | |
| 	"tailscale.com/net/tsaddr"
 | |
| 	"tailscale.com/tstime"
 | |
| 	tslogger "tailscale.com/types/logger"
 | |
| 	"tailscale.com/types/logid"
 | |
| 	"tailscale.com/util/set"
 | |
| 	"tailscale.com/util/truncate"
 | |
| 	"tailscale.com/util/zstdframe"
 | |
| )
 | |
| 
 | |
| // maxSize is the maximum size that a single log entry can be.
 | |
| // It is also the maximum body size that may be uploaded at a time.
 | |
| const maxSize = 256 << 10
 | |
| 
 | |
| // maxTextSize is the maximum size for a text log message.
 | |
| // Note that JSON log messages can be as large as maxSize.
 | |
| const maxTextSize = 16 << 10
 | |
| 
 | |
| // lowMemRatio reduces maxSize and maxTextSize by this ratio in lowMem mode.
 | |
| const lowMemRatio = 4
 | |
| 
 | |
| // bufferSize is the typical buffer size to retain.
 | |
| // It is large enough to handle most log messages,
 | |
| // but not too large to be a notable waste of memory if retained forever.
 | |
| const bufferSize = 4 << 10
 | |
| 
 | |
| // DefaultHost is the default host name to upload logs to when
 | |
| // Config.BaseURL isn't provided.
 | |
| const DefaultHost = "log.tailscale.io"
 | |
| 
 | |
| const defaultFlushDelay = 2 * time.Second
 | |
| 
 | |
| const (
 | |
| 	// CollectionNode is the name of a logtail Config.Collection
 | |
| 	// for tailscaled (or equivalent: IPNExtension, Android app).
 | |
| 	CollectionNode = "tailnode.log.tailscale.io"
 | |
| )
 | |
| 
 | |
| type Config struct {
 | |
| 	Collection     string          // collection name, a domain name
 | |
| 	PrivateID      logid.PrivateID // private ID for the primary log stream
 | |
| 	CopyPrivateID  logid.PrivateID // private ID for a log stream that is a superset of this log stream
 | |
| 	BaseURL        string          // if empty defaults to "https://log.tailscale.io"
 | |
| 	HTTPC          *http.Client    // if empty defaults to http.DefaultClient
 | |
| 	SkipClientTime bool            // if true, client_time is not written to logs
 | |
| 	LowMemory      bool            // if true, logtail minimizes memory use
 | |
| 	Clock          tstime.Clock    // if set, Clock.Now substitutes uses of time.Now
 | |
| 	Stderr         io.Writer       // if set, logs are sent here instead of os.Stderr
 | |
| 	StderrLevel    int             // max verbosity level to write to stderr; 0 means the non-verbose messages only
 | |
| 	Buffer         Buffer          // temp storage, if nil a MemoryBuffer
 | |
| 	CompressLogs   bool            // whether to compress the log uploads
 | |
| 
 | |
| 	// MetricsDelta, if non-nil, is a func that returns an encoding
 | |
| 	// delta in clientmetrics to upload alongside existing logs.
 | |
| 	// It can return either an empty string (for nothing) or a string
 | |
| 	// that's safe to embed in a JSON string literal without further escaping.
 | |
| 	MetricsDelta func() string
 | |
| 
 | |
| 	// FlushDelayFn, if non-nil is a func that returns how long to wait to
 | |
| 	// accumulate logs before uploading them. 0 or negative means to upload
 | |
| 	// immediately.
 | |
| 	//
 | |
| 	// If nil, a default value is used. (currently 2 seconds)
 | |
| 	FlushDelayFn func() time.Duration
 | |
| 
 | |
| 	// IncludeProcID, if true, results in an ephemeral process identifier being
 | |
| 	// included in logs. The ID is random and not guaranteed to be globally
 | |
| 	// unique, but it can be used to distinguish between different instances
 | |
| 	// running with same PrivateID.
 | |
| 	IncludeProcID bool
 | |
| 
 | |
| 	// IncludeProcSequence, if true, results in an ephemeral sequence number
 | |
| 	// being included in the logs. The sequence number is incremented for each
 | |
| 	// log message sent, but is not persisted across process restarts.
 | |
| 	IncludeProcSequence bool
 | |
| }
 | |
| 
 | |
| func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
 | |
| 	if cfg.BaseURL == "" {
 | |
| 		cfg.BaseURL = "https://" + DefaultHost
 | |
| 	}
 | |
| 	if cfg.HTTPC == nil {
 | |
| 		cfg.HTTPC = http.DefaultClient
 | |
| 	}
 | |
| 	if cfg.Clock == nil {
 | |
| 		cfg.Clock = tstime.StdClock{}
 | |
| 	}
 | |
| 	if cfg.Stderr == nil {
 | |
| 		cfg.Stderr = os.Stderr
 | |
| 	}
 | |
| 	if cfg.Buffer == nil {
 | |
| 		pendingSize := 256
 | |
| 		if cfg.LowMemory {
 | |
| 			pendingSize = 64
 | |
| 		}
 | |
| 		cfg.Buffer = NewMemoryBuffer(pendingSize)
 | |
| 	}
 | |
| 	var procID uint32
 | |
| 	if cfg.IncludeProcID {
 | |
| 		keyBytes := make([]byte, 4)
 | |
| 		rand.Read(keyBytes)
 | |
| 		procID = binary.LittleEndian.Uint32(keyBytes)
 | |
| 		if procID == 0 {
 | |
| 			// 0 is the empty/off value, assign a different (non-zero) value to
 | |
| 			// make sure we still include an ID (actual value does not matter).
 | |
| 			procID = 7
 | |
| 		}
 | |
| 	}
 | |
| 	if s := envknob.String("TS_DEBUG_LOGTAIL_FLUSHDELAY"); s != "" {
 | |
| 		if delay, err := time.ParseDuration(s); err == nil {
 | |
| 			cfg.FlushDelayFn = func() time.Duration { return delay }
 | |
| 		} else {
 | |
| 			log.Fatalf("invalid TS_DEBUG_LOGTAIL_FLUSHDELAY: %v", err)
 | |
| 		}
 | |
| 	} else if cfg.FlushDelayFn == nil && envknob.Bool("IN_TS_TEST") {
 | |
| 		cfg.FlushDelayFn = func() time.Duration { return 0 }
 | |
| 	}
 | |
| 
 | |
| 	var urlSuffix string
 | |
| 	if !cfg.CopyPrivateID.IsZero() {
 | |
| 		urlSuffix = "?copyId=" + cfg.CopyPrivateID.String()
 | |
| 	}
 | |
| 	l := &Logger{
 | |
| 		privateID:      cfg.PrivateID,
 | |
| 		stderr:         cfg.Stderr,
 | |
| 		stderrLevel:    int64(cfg.StderrLevel),
 | |
| 		httpc:          cfg.HTTPC,
 | |
| 		url:            cfg.BaseURL + "/c/" + cfg.Collection + "/" + cfg.PrivateID.String() + urlSuffix,
 | |
| 		lowMem:         cfg.LowMemory,
 | |
| 		buffer:         cfg.Buffer,
 | |
| 		skipClientTime: cfg.SkipClientTime,
 | |
| 		drainWake:      make(chan struct{}, 1),
 | |
| 		sentinel:       make(chan int32, 16),
 | |
| 		flushDelayFn:   cfg.FlushDelayFn,
 | |
| 		clock:          cfg.Clock,
 | |
| 		metricsDelta:   cfg.MetricsDelta,
 | |
| 
 | |
| 		procID:              procID,
 | |
| 		includeProcSequence: cfg.IncludeProcSequence,
 | |
| 
 | |
| 		shutdownStart: make(chan struct{}),
 | |
| 		shutdownDone:  make(chan struct{}),
 | |
| 	}
 | |
| 	l.SetSockstatsLabel(sockstats.LabelLogtailLogger)
 | |
| 	l.compressLogs = cfg.CompressLogs
 | |
| 
 | |
| 	ctx, cancel := context.WithCancel(context.Background())
 | |
| 	l.uploadCancel = cancel
 | |
| 
 | |
| 	go l.uploading(ctx)
 | |
| 	l.Write([]byte("logtail started"))
 | |
| 	return l
 | |
| }
 | |
| 
 | |
| // Logger writes logs, splitting them as configured between local
 | |
| // logging facilities and uploading to a log server.
 | |
| type Logger struct {
 | |
| 	stderr         io.Writer
 | |
| 	stderrLevel    int64 // accessed atomically
 | |
| 	httpc          *http.Client
 | |
| 	url            string
 | |
| 	lowMem         bool
 | |
| 	skipClientTime bool
 | |
| 	netMonitor     *netmon.Monitor
 | |
| 	buffer         Buffer
 | |
| 	drainWake      chan struct{}        // signal to speed up drain
 | |
| 	drainBuf       []byte               // owned by drainPending for reuse
 | |
| 	flushDelayFn   func() time.Duration // negative or zero return value to upload aggressively, or >0 to batch at this delay
 | |
| 	flushPending   atomic.Bool
 | |
| 	sentinel       chan int32
 | |
| 	clock          tstime.Clock
 | |
| 	compressLogs   bool
 | |
| 	uploadCancel   func()
 | |
| 	explainedRaw   bool
 | |
| 	metricsDelta   func() string // or nil
 | |
| 	privateID      logid.PrivateID
 | |
| 	httpDoCalls    atomic.Int32
 | |
| 	sockstatsLabel atomicSocktatsLabel
 | |
| 
 | |
| 	procID              uint32
 | |
| 	includeProcSequence bool
 | |
| 
 | |
| 	writeLock    sync.Mutex // guards procSequence, flushTimer, buffer.Write calls
 | |
| 	procSequence uint64
 | |
| 	flushTimer   tstime.TimerController // used when flushDelay is >0
 | |
| 	writeBuf     [bufferSize]byte       // owned by Write for reuse
 | |
| 	jsonDec      jsontext.Decoder       // owned by appendTextOrJSONLocked for reuse
 | |
| 
 | |
| 	shutdownStartMu sync.Mutex    // guards the closing of shutdownStart
 | |
| 	shutdownStart   chan struct{} // closed when shutdown begins
 | |
| 	shutdownDone    chan struct{} // closed when shutdown complete
 | |
| }
 | |
| 
 | |
| type atomicSocktatsLabel struct{ p atomic.Uint32 }
 | |
| 
 | |
| func (p *atomicSocktatsLabel) Load() sockstats.Label       { return sockstats.Label(p.p.Load()) }
 | |
| func (p *atomicSocktatsLabel) Store(label sockstats.Label) { p.p.Store(uint32(label)) }
 | |
| 
 | |
| // SetVerbosityLevel controls the verbosity level that should be
 | |
| // written to stderr. 0 is the default (not verbose). Levels 1 or higher
 | |
| // are increasingly verbose.
 | |
| func (l *Logger) SetVerbosityLevel(level int) {
 | |
| 	atomic.StoreInt64(&l.stderrLevel, int64(level))
 | |
| }
 | |
| 
 | |
| // SetNetMon sets the network monitor.
 | |
| //
 | |
| // It should not be changed concurrently with log writes and should
 | |
| // only be set once.
 | |
| func (l *Logger) SetNetMon(lm *netmon.Monitor) {
 | |
| 	l.netMonitor = lm
 | |
| }
 | |
| 
 | |
| // SetSockstatsLabel sets the label used in sockstat logs to identify network traffic from this logger.
 | |
| func (l *Logger) SetSockstatsLabel(label sockstats.Label) {
 | |
| 	l.sockstatsLabel.Store(label)
 | |
| }
 | |
| 
 | |
| // PrivateID returns the logger's private log ID.
 | |
| //
 | |
| // It exists for internal use only.
 | |
| func (l *Logger) PrivateID() logid.PrivateID { return l.privateID }
 | |
| 
 | |
| // Shutdown gracefully shuts down the logger while completing any
 | |
| // remaining uploads.
 | |
| //
 | |
| // It will block, continuing to try and upload unless the passed
 | |
| // context object interrupts it by being done.
 | |
| // If the shutdown is interrupted, an error is returned.
 | |
| func (l *Logger) Shutdown(ctx context.Context) error {
 | |
| 	done := make(chan struct{})
 | |
| 	go func() {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			l.uploadCancel()
 | |
| 			<-l.shutdownDone
 | |
| 		case <-l.shutdownDone:
 | |
| 		}
 | |
| 		close(done)
 | |
| 	}()
 | |
| 
 | |
| 	l.shutdownStartMu.Lock()
 | |
| 	select {
 | |
| 	case <-l.shutdownStart:
 | |
| 		l.shutdownStartMu.Unlock()
 | |
| 		return nil
 | |
| 	default:
 | |
| 	}
 | |
| 	close(l.shutdownStart)
 | |
| 	l.shutdownStartMu.Unlock()
 | |
| 
 | |
| 	io.WriteString(l, "logger closing down\n")
 | |
| 	<-done
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Close shuts down this logger object, the background log uploader
 | |
| // process, and any associated goroutines.
 | |
| //
 | |
| // Deprecated: use Shutdown
 | |
| func (l *Logger) Close() {
 | |
| 	l.Shutdown(context.Background())
 | |
| }
 | |
| 
 | |
| // drainBlock is called by drainPending when there are no logs to drain.
 | |
| //
 | |
| // In typical operation, every call to the Write method unblocks and triggers a
 | |
| // buffer.TryReadline, so logs are written with very low latency.
 | |
| //
 | |
| // If the caller specified FlushInterface, drainWake is only sent to
 | |
| // periodically.
 | |
| func (l *Logger) drainBlock() (shuttingDown bool) {
 | |
| 	select {
 | |
| 	case <-l.shutdownStart:
 | |
| 		return true
 | |
| 	case <-l.drainWake:
 | |
| 	}
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| // drainPending drains and encodes a batch of logs from the buffer for upload.
 | |
| // If no logs are available, drainPending blocks until logs are available.
 | |
| // The returned buffer is only valid until the next call to drainPending.
 | |
| func (l *Logger) drainPending() (b []byte) {
 | |
| 	b = l.drainBuf[:0]
 | |
| 	b = append(b, '[')
 | |
| 	defer func() {
 | |
| 		b = bytes.TrimRight(b, ",")
 | |
| 		b = append(b, ']')
 | |
| 		l.drainBuf = b
 | |
| 		if len(b) <= len("[]") {
 | |
| 			b = nil
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	maxLen := maxSize
 | |
| 	if l.lowMem {
 | |
| 		// When operating in a low memory environment, it is better to upload
 | |
| 		// in multiple operations than it is to allocate a large body and OOM.
 | |
| 		// Even if maxLen is less than maxSize, we can still upload an entry
 | |
| 		// that is up to maxSize if we happen to encounter one.
 | |
| 		maxLen /= lowMemRatio
 | |
| 	}
 | |
| 	for len(b) < maxLen {
 | |
| 		line, err := l.buffer.TryReadLine()
 | |
| 		switch {
 | |
| 		case err == io.EOF:
 | |
| 			return b
 | |
| 		case err != nil:
 | |
| 			b = append(b, '{')
 | |
| 			b = l.appendMetadata(b, false, true, 0, 0, "reading ringbuffer: "+err.Error(), nil, 0)
 | |
| 			b = bytes.TrimRight(b, ",")
 | |
| 			b = append(b, '}')
 | |
| 			return b
 | |
| 		case line == nil:
 | |
| 			// If we read at least some log entries, return immediately.
 | |
| 			if len(b) > len("[") {
 | |
| 				return b
 | |
| 			}
 | |
| 
 | |
| 			// We're about to block. If we're holding on to too much memory
 | |
| 			// in our buffer from a previous large write, let it go.
 | |
| 			if cap(b) > bufferSize {
 | |
| 				b = bytes.Clone(b)
 | |
| 				l.drainBuf = b
 | |
| 			}
 | |
| 
 | |
| 			if shuttingDown := l.drainBlock(); shuttingDown {
 | |
| 				return b
 | |
| 			}
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		switch {
 | |
| 		case len(line) == 0:
 | |
| 			continue
 | |
| 		case line[0] == '{' && jsontext.Value(line).IsValid():
 | |
| 			// This is already a valid JSON object, so just append it.
 | |
| 			// This may exceed maxLen, but should be no larger than maxSize
 | |
| 			// so long as logic writing into the buffer enforces the limit.
 | |
| 			b = append(b, line...)
 | |
| 		default:
 | |
| 			// This is probably a log added to stderr by filch
 | |
| 			// outside of the logtail logger. Encode it.
 | |
| 			if !l.explainedRaw {
 | |
| 				fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n")
 | |
| 				fmt.Fprintf(l.stderr, "RAW-STDERR: *** Lines prefixed with RAW-STDERR below bypassed logtail and probably come from a previous run of the program\n")
 | |
| 				fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n")
 | |
| 				fmt.Fprintf(l.stderr, "RAW-STDERR:\n")
 | |
| 				l.explainedRaw = true
 | |
| 			}
 | |
| 			fmt.Fprintf(l.stderr, "RAW-STDERR: %s", b)
 | |
| 			// Do not add a client time, as it could be really old.
 | |
| 			// Do not include instance key or ID either,
 | |
| 			// since this came from a different instance.
 | |
| 			b = l.appendText(b, line, true, 0, 0, 0)
 | |
| 		}
 | |
| 		b = append(b, ',')
 | |
| 	}
 | |
| 	return b
 | |
| }
 | |
| 
 | |
| // This is the goroutine that repeatedly uploads logs in the background.
 | |
| func (l *Logger) uploading(ctx context.Context) {
 | |
| 	defer close(l.shutdownDone)
 | |
| 
 | |
| 	for {
 | |
| 		body := l.drainPending()
 | |
| 		origlen := -1 // sentinel value: uncompressed
 | |
| 		// Don't attempt to compress tiny bodies; not worth the CPU cycles.
 | |
| 		if l.compressLogs && len(body) > 256 {
 | |
| 			zbody := zstdframe.AppendEncode(nil, body,
 | |
| 				zstdframe.FastestCompression, zstdframe.LowMemory(true))
 | |
| 
 | |
| 			// Only send it compressed if the bandwidth savings are sufficient.
 | |
| 			// Just the extra headers associated with enabling compression
 | |
| 			// are 50 bytes by themselves.
 | |
| 			if len(body)-len(zbody) > 64 {
 | |
| 				origlen = len(body)
 | |
| 				body = zbody
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		var lastError string
 | |
| 		var numFailures int
 | |
| 		var firstFailure time.Time
 | |
| 		for len(body) > 0 && ctx.Err() == nil {
 | |
| 			retryAfter, err := l.upload(ctx, body, origlen)
 | |
| 			if err != nil {
 | |
| 				numFailures++
 | |
| 				firstFailure = l.clock.Now()
 | |
| 
 | |
| 				if !l.internetUp() {
 | |
| 					fmt.Fprintf(l.stderr, "logtail: internet down; waiting\n")
 | |
| 					l.awaitInternetUp(ctx)
 | |
| 					continue
 | |
| 				}
 | |
| 
 | |
| 				// Only print the same message once.
 | |
| 				if currError := err.Error(); lastError != currError {
 | |
| 					fmt.Fprintf(l.stderr, "logtail: upload: %v\n", err)
 | |
| 					lastError = currError
 | |
| 				}
 | |
| 
 | |
| 				// Sleep for the specified retryAfter period,
 | |
| 				// otherwise default to some random value.
 | |
| 				if retryAfter <= 0 {
 | |
| 					retryAfter = time.Duration(30+mrand.Intn(30)) * time.Second
 | |
| 				}
 | |
| 				tstime.Sleep(ctx, retryAfter)
 | |
| 			} else {
 | |
| 				// Only print a success message after recovery.
 | |
| 				if numFailures > 0 {
 | |
| 					fmt.Fprintf(l.stderr, "logtail: upload succeeded after %d failures and %s\n", numFailures, l.clock.Since(firstFailure).Round(time.Second))
 | |
| 				}
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		select {
 | |
| 		case <-l.shutdownStart:
 | |
| 			return
 | |
| 		default:
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (l *Logger) internetUp() bool {
 | |
| 	if l.netMonitor == nil {
 | |
| 		// No way to tell, so assume it is.
 | |
| 		return true
 | |
| 	}
 | |
| 	return l.netMonitor.InterfaceState().AnyInterfaceUp()
 | |
| }
 | |
| 
 | |
| func (l *Logger) awaitInternetUp(ctx context.Context) {
 | |
| 	upc := make(chan bool, 1)
 | |
| 	defer l.netMonitor.RegisterChangeCallback(func(delta *netmon.ChangeDelta) {
 | |
| 		if delta.New.AnyInterfaceUp() {
 | |
| 			select {
 | |
| 			case upc <- true:
 | |
| 			default:
 | |
| 			}
 | |
| 		}
 | |
| 	})()
 | |
| 	if l.internetUp() {
 | |
| 		return
 | |
| 	}
 | |
| 	select {
 | |
| 	case <-upc:
 | |
| 		fmt.Fprintf(l.stderr, "logtail: internet back up\n")
 | |
| 	case <-ctx.Done():
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // upload uploads body to the log server.
 | |
| // origlen indicates the pre-compression body length.
 | |
| // origlen of -1 indicates that the body is not compressed.
 | |
| func (l *Logger) upload(ctx context.Context, body []byte, origlen int) (retryAfter time.Duration, err error) {
 | |
| 	const maxUploadTime = 45 * time.Second
 | |
| 	ctx = sockstats.WithSockStats(ctx, l.sockstatsLabel.Load(), l.Logf)
 | |
| 	ctx, cancel := context.WithTimeout(ctx, maxUploadTime)
 | |
| 	defer cancel()
 | |
| 
 | |
| 	req, err := http.NewRequestWithContext(ctx, "POST", l.url, bytes.NewReader(body))
 | |
| 	if err != nil {
 | |
| 		// I know of no conditions under which this could fail.
 | |
| 		// Report it very loudly.
 | |
| 		// TODO record logs to disk
 | |
| 		panic("logtail: cannot build http request: " + err.Error())
 | |
| 	}
 | |
| 	if origlen != -1 {
 | |
| 		req.Header.Add("Content-Encoding", "zstd")
 | |
| 		req.Header.Add("Orig-Content-Length", strconv.Itoa(origlen))
 | |
| 	}
 | |
| 	if runtime.GOOS == "js" {
 | |
| 		// We once advertised we'd accept optional client certs (for internal use)
 | |
| 		// on log.tailscale.io but then Tailscale SSH js/wasm clients prompted
 | |
| 		// users (on some browsers?) to pick a client cert. We'll fix the server's
 | |
| 		// TLS ServerHello, but we can also fix it client side for good measure.
 | |
| 		//
 | |
| 		// Corp details: https://github.com/tailscale/corp/issues/18177#issuecomment-2026598715
 | |
| 		// and https://github.com/tailscale/corp/pull/18775#issuecomment-2027505036
 | |
| 		//
 | |
| 		// See https://github.com/golang/go/wiki/WebAssembly#configuring-fetch-options-while-using-nethttp
 | |
| 		// and https://developer.mozilla.org/en-US/docs/Web/API/fetch#credentials
 | |
| 		req.Header.Set("js.fetch:credentials", "omit")
 | |
| 	}
 | |
| 	req.Header["User-Agent"] = nil // not worth writing one; save some bytes
 | |
| 
 | |
| 	compressedNote := "not-compressed"
 | |
| 	if origlen != -1 {
 | |
| 		compressedNote = "compressed"
 | |
| 	}
 | |
| 
 | |
| 	l.httpDoCalls.Add(1)
 | |
| 	resp, err := l.httpc.Do(req)
 | |
| 	if err != nil {
 | |
| 		return 0, fmt.Errorf("log upload of %d bytes %s failed: %v", len(body), compressedNote, err)
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 
 | |
| 	if resp.StatusCode != http.StatusOK {
 | |
| 		n, _ := strconv.Atoi(resp.Header.Get("Retry-After"))
 | |
| 		b, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<10))
 | |
| 		return time.Duration(n) * time.Second, fmt.Errorf("log upload of %d bytes %s failed %d: %s", len(body), compressedNote, resp.StatusCode, bytes.TrimSpace(b))
 | |
| 	}
 | |
| 	return 0, nil
 | |
| }
 | |
| 
 | |
| // Flush uploads all logs to the server. It blocks until complete or there is an
 | |
| // unrecoverable error.
 | |
| //
 | |
| // TODO(bradfitz): this apparently just returns nil, as of tailscale/corp@9c2ec35.
 | |
| // Finish cleaning this up.
 | |
| func (l *Logger) Flush() error {
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // StartFlush starts a log upload, if anything is pending.
 | |
| //
 | |
| // If l is nil, StartFlush is a no-op.
 | |
| func (l *Logger) StartFlush() {
 | |
| 	if l != nil {
 | |
| 		l.tryDrainWake()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // logtailDisabled is whether logtail uploads to logcatcher are disabled.
 | |
| var logtailDisabled atomic.Bool
 | |
| 
 | |
| // Disable disables logtail uploads for the lifetime of the process.
 | |
| func Disable() {
 | |
| 	logtailDisabled.Store(true)
 | |
| }
 | |
| 
 | |
| var debugWakesAndUploads = envknob.RegisterBool("TS_DEBUG_LOGTAIL_WAKES")
 | |
| 
 | |
| // tryDrainWake tries to send to lg.drainWake, to cause an uploading wakeup.
 | |
| // It does not block.
 | |
| func (l *Logger) tryDrainWake() {
 | |
| 	l.flushPending.Store(false)
 | |
| 	if debugWakesAndUploads() {
 | |
| 		// Using println instead of log.Printf here to avoid recursing back into
 | |
| 		// ourselves.
 | |
| 		println("logtail: try drain wake, numHTTP:", l.httpDoCalls.Load())
 | |
| 	}
 | |
| 	select {
 | |
| 	case l.drainWake <- struct{}{}:
 | |
| 	default:
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (l *Logger) sendLocked(jsonBlob []byte) (int, error) {
 | |
| 	tapSend(jsonBlob)
 | |
| 	if logtailDisabled.Load() {
 | |
| 		return len(jsonBlob), nil
 | |
| 	}
 | |
| 
 | |
| 	n, err := l.buffer.Write(jsonBlob)
 | |
| 
 | |
| 	flushDelay := defaultFlushDelay
 | |
| 	if l.flushDelayFn != nil {
 | |
| 		flushDelay = l.flushDelayFn()
 | |
| 	}
 | |
| 	if flushDelay > 0 {
 | |
| 		if l.flushPending.CompareAndSwap(false, true) {
 | |
| 			if l.flushTimer == nil {
 | |
| 				l.flushTimer = l.clock.AfterFunc(flushDelay, l.tryDrainWake)
 | |
| 			} else {
 | |
| 				l.flushTimer.Reset(flushDelay)
 | |
| 			}
 | |
| 		}
 | |
| 	} else {
 | |
| 		l.tryDrainWake()
 | |
| 	}
 | |
| 	return n, err
 | |
| }
 | |
| 
 | |
| // appendMetadata appends optional "logtail", "metrics", and "v" JSON members.
 | |
| // This assumes dst is already within a JSON object.
 | |
| // Each member is comma-terminated.
 | |
| func (l *Logger) appendMetadata(dst []byte, skipClientTime, skipMetrics bool, procID uint32, procSequence uint64, errDetail string, errData jsontext.Value, level int) []byte {
 | |
| 	// Append optional logtail metadata.
 | |
| 	if !skipClientTime || procID != 0 || procSequence != 0 || errDetail != "" || errData != nil {
 | |
| 		dst = append(dst, `"logtail":{`...)
 | |
| 		if !skipClientTime {
 | |
| 			dst = append(dst, `"client_time":"`...)
 | |
| 			dst = l.clock.Now().UTC().AppendFormat(dst, time.RFC3339Nano)
 | |
| 			dst = append(dst, '"', ',')
 | |
| 		}
 | |
| 		if procID != 0 {
 | |
| 			dst = append(dst, `"proc_id":`...)
 | |
| 			dst = strconv.AppendUint(dst, uint64(procID), 10)
 | |
| 			dst = append(dst, ',')
 | |
| 		}
 | |
| 		if procSequence != 0 {
 | |
| 			dst = append(dst, `"proc_seq":`...)
 | |
| 			dst = strconv.AppendUint(dst, procSequence, 10)
 | |
| 			dst = append(dst, ',')
 | |
| 		}
 | |
| 		if errDetail != "" || errData != nil {
 | |
| 			dst = append(dst, `"error":{`...)
 | |
| 			if errDetail != "" {
 | |
| 				dst = append(dst, `"detail":`...)
 | |
| 				dst, _ = jsontext.AppendQuote(dst, errDetail)
 | |
| 				dst = append(dst, ',')
 | |
| 			}
 | |
| 			if errData != nil {
 | |
| 				dst = append(dst, `"bad_data":`...)
 | |
| 				dst = append(dst, errData...)
 | |
| 				dst = append(dst, ',')
 | |
| 			}
 | |
| 			dst = bytes.TrimRight(dst, ",")
 | |
| 			dst = append(dst, '}', ',')
 | |
| 		}
 | |
| 		dst = bytes.TrimRight(dst, ",")
 | |
| 		dst = append(dst, '}', ',')
 | |
| 	}
 | |
| 
 | |
| 	// Append optional metrics metadata.
 | |
| 	if !skipMetrics && l.metricsDelta != nil {
 | |
| 		if d := l.metricsDelta(); d != "" {
 | |
| 			dst = append(dst, `"metrics":"`...)
 | |
| 			dst = append(dst, d...)
 | |
| 			dst = append(dst, '"', ',')
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Add the optional log level, if non-zero.
 | |
| 	// Note that we only use log levels 1 and 2 currently.
 | |
| 	// It's unlikely we'll ever make it past 9.
 | |
| 	if level > 0 && level < 10 {
 | |
| 		dst = append(dst, `"v":`...)
 | |
| 		dst = append(dst, '0'+byte(level))
 | |
| 		dst = append(dst, ',')
 | |
| 	}
 | |
| 
 | |
| 	return dst
 | |
| }
 | |
| 
 | |
| // appendText appends a raw text message in the Tailscale JSON log entry format.
 | |
| func (l *Logger) appendText(dst, src []byte, skipClientTime bool, procID uint32, procSequence uint64, level int) []byte {
 | |
| 	dst = slices.Grow(dst, len(src))
 | |
| 	dst = append(dst, '{')
 | |
| 	dst = l.appendMetadata(dst, skipClientTime, false, procID, procSequence, "", nil, level)
 | |
| 	if len(src) == 0 {
 | |
| 		dst = bytes.TrimRight(dst, ",")
 | |
| 		return append(dst, "}\n"...)
 | |
| 	}
 | |
| 
 | |
| 	// Append the text string, which may be truncated.
 | |
| 	// Invalid UTF-8 will be mangled with the Unicode replacement character.
 | |
| 	max := maxTextSize
 | |
| 	if l.lowMem {
 | |
| 		max /= lowMemRatio
 | |
| 	}
 | |
| 	dst = append(dst, `"text":`...)
 | |
| 	dst = appendTruncatedString(dst, src, max)
 | |
| 	return append(dst, "}\n"...)
 | |
| }
 | |
| 
 | |
| // appendTruncatedString appends a JSON string for src,
 | |
| // truncating the src to be no larger than n.
 | |
| func appendTruncatedString(dst, src []byte, n int) []byte {
 | |
| 	srcLen := len(src)
 | |
| 	src = truncate.String(src, n)
 | |
| 	dst, _ = jsontext.AppendQuote(dst, src) // ignore error; only occurs for invalid UTF-8
 | |
| 	if srcLen > len(src) {
 | |
| 		dst = dst[:len(dst)-len(`"`)] // trim off preceding double-quote
 | |
| 		dst = append(dst, "…+"...)
 | |
| 		dst = strconv.AppendInt(dst, int64(srcLen-len(src)), 10)
 | |
| 		dst = append(dst, '"') // re-append succeeding double-quote
 | |
| 	}
 | |
| 	return dst
 | |
| }
 | |
| 
 | |
| func (l *Logger) AppendTextOrJSONLocked(dst, src []byte) []byte {
 | |
| 	l.clock = tstime.StdClock{}
 | |
| 	return l.appendTextOrJSONLocked(dst, src, 0)
 | |
| }
 | |
| 
 | |
| // appendTextOrJSONLocked appends a raw text message or a raw JSON object
 | |
| // in the Tailscale JSON log format.
 | |
| func (l *Logger) appendTextOrJSONLocked(dst, src []byte, level int) []byte {
 | |
| 	if l.includeProcSequence {
 | |
| 		l.procSequence++
 | |
| 	}
 | |
| 	if len(src) == 0 || src[0] != '{' {
 | |
| 		return l.appendText(dst, src, l.skipClientTime, l.procID, l.procSequence, level)
 | |
| 	}
 | |
| 
 | |
| 	// Check whether the input is a valid JSON object and
 | |
| 	// whether it contains the reserved "logtail" name at the top-level.
 | |
| 	var logtailKeyOffset, logtailValOffset, logtailValLength int
 | |
| 	validJSON := func() bool {
 | |
| 		// TODO(dsnet): Avoid allocation of bytes.Buffer struct.
 | |
| 		dec := &l.jsonDec
 | |
| 		dec.Reset(bytes.NewBuffer(src))
 | |
| 		if tok, err := dec.ReadToken(); tok.Kind() != '{' || err != nil {
 | |
| 			return false
 | |
| 		}
 | |
| 		for dec.PeekKind() != '}' {
 | |
| 			keyOffset := dec.InputOffset()
 | |
| 			tok, err := dec.ReadToken()
 | |
| 			if err != nil {
 | |
| 				return false
 | |
| 			}
 | |
| 			isLogtail := tok.String() == "logtail"
 | |
| 			valOffset := dec.InputOffset()
 | |
| 			if dec.SkipValue() != nil {
 | |
| 				return false
 | |
| 			}
 | |
| 			if isLogtail {
 | |
| 				logtailKeyOffset = int(keyOffset)
 | |
| 				logtailValOffset = int(valOffset)
 | |
| 				logtailValLength = int(dec.InputOffset()) - logtailValOffset
 | |
| 			}
 | |
| 		}
 | |
| 		if tok, err := dec.ReadToken(); tok.Kind() != '}' || err != nil {
 | |
| 			return false
 | |
| 		}
 | |
| 		if _, err := dec.ReadToken(); err != io.EOF {
 | |
| 			return false // trailing junk after JSON object
 | |
| 		}
 | |
| 		return true
 | |
| 	}()
 | |
| 
 | |
| 	// Treat invalid JSON as a raw text message.
 | |
| 	if !validJSON {
 | |
| 		return l.appendText(dst, src, l.skipClientTime, l.procID, l.procSequence, level)
 | |
| 	}
 | |
| 
 | |
| 	// Check whether the JSON payload is too large.
 | |
| 	// Due to logtail metadata, the formatted log entry could exceed maxSize.
 | |
| 	// That's okay as the Tailscale log service limit is actually 2*maxSize.
 | |
| 	// However, so long as logging applications aim to target the maxSize limit,
 | |
| 	// there should be no trouble eventually uploading logs.
 | |
| 	if len(src) > maxSize {
 | |
| 		errDetail := fmt.Sprintf("entry too large: %d bytes", len(src))
 | |
| 		errData := appendTruncatedString(nil, src, maxSize/len(`\uffff`)) // escaping could increase size
 | |
| 
 | |
| 		dst = append(dst, '{')
 | |
| 		dst = l.appendMetadata(dst, l.skipClientTime, true, l.procID, l.procSequence, errDetail, errData, level)
 | |
| 		dst = bytes.TrimRight(dst, ",")
 | |
| 		return append(dst, "}\n"...)
 | |
| 	}
 | |
| 
 | |
| 	// Check whether the reserved logtail member occurs in the log data.
 | |
| 	// If so, it is moved to the the logtail/error member.
 | |
| 	const jsonSeperators = ",:"      // per RFC 8259, section 2
 | |
| 	const jsonWhitespace = " \n\r\t" // per RFC 8259, section 2
 | |
| 	var errDetail string
 | |
| 	var errData jsontext.Value
 | |
| 	if logtailValLength > 0 {
 | |
| 		errDetail = "duplicate logtail member"
 | |
| 		errData = bytes.Trim(src[logtailValOffset:][:logtailValLength], jsonSeperators+jsonWhitespace)
 | |
| 	}
 | |
| 	dst = slices.Grow(dst, len(src))
 | |
| 	dst = append(dst, '{')
 | |
| 	dst = l.appendMetadata(dst, l.skipClientTime, true, l.procID, l.procSequence, errDetail, errData, level)
 | |
| 	if logtailValLength > 0 {
 | |
| 		// Exclude original logtail member from the message.
 | |
| 		dst = appendWithoutNewline(dst, src[len("{"):logtailKeyOffset])
 | |
| 		dst = bytes.TrimRight(dst, jsonSeperators+jsonWhitespace)
 | |
| 		dst = appendWithoutNewline(dst, src[logtailValOffset+logtailValLength:])
 | |
| 	} else {
 | |
| 		dst = appendWithoutNewline(dst, src[len("{"):])
 | |
| 	}
 | |
| 	dst = bytes.TrimRight(dst, jsonWhitespace)
 | |
| 	dst = dst[:len(dst)-len("}")]
 | |
| 	dst = bytes.TrimRight(dst, jsonSeperators+jsonWhitespace)
 | |
| 	return append(dst, "}\n"...)
 | |
| }
 | |
| 
 | |
| // appendWithoutNewline appends src to dst except that it ignores newlines
 | |
| // since newlines are used to frame individual log entries.
 | |
| func appendWithoutNewline(dst, src []byte) []byte {
 | |
| 	for _, c := range src {
 | |
| 		if c != '\n' {
 | |
| 			dst = append(dst, c)
 | |
| 		}
 | |
| 	}
 | |
| 	return dst
 | |
| }
 | |
| 
 | |
| // Logf logs to l using the provided fmt-style format and optional arguments.
 | |
| func (l *Logger) Logf(format string, args ...any) {
 | |
| 	fmt.Fprintf(l, format, args...)
 | |
| }
 | |
| 
 | |
| var obscureIPs = envknob.RegisterBool("TS_OBSCURE_LOGGED_IPS")
 | |
| 
 | |
| // Write logs an encoded JSON blob.
 | |
| //
 | |
| // If the []byte passed to Write is not an encoded JSON blob,
 | |
| // then contents is fit into a JSON blob and written.
 | |
| //
 | |
| // This is intended as an interface for the stdlib "log" package.
 | |
| func (l *Logger) Write(buf []byte) (int, error) {
 | |
| 	if len(buf) == 0 {
 | |
| 		return 0, nil
 | |
| 	}
 | |
| 	inLen := len(buf) // length as provided to us, before modifications to downstream writers
 | |
| 
 | |
| 	level, buf := parseAndRemoveLogLevel(buf)
 | |
| 	if l.stderr != nil && l.stderr != io.Discard && int64(level) <= atomic.LoadInt64(&l.stderrLevel) {
 | |
| 		if buf[len(buf)-1] == '\n' {
 | |
| 			l.stderr.Write(buf)
 | |
| 		} else {
 | |
| 			// The log package always line-terminates logs,
 | |
| 			// so this is an uncommon path.
 | |
| 			withNL := append(buf[:len(buf):len(buf)], '\n')
 | |
| 			l.stderr.Write(withNL)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if obscureIPs() {
 | |
| 		buf = redactIPs(buf)
 | |
| 	}
 | |
| 
 | |
| 	l.writeLock.Lock()
 | |
| 	defer l.writeLock.Unlock()
 | |
| 
 | |
| 	b := l.appendTextOrJSONLocked(l.writeBuf[:0], buf, level)
 | |
| 	_, err := l.sendLocked(b)
 | |
| 	return inLen, err
 | |
| }
 | |
| 
 | |
| var (
 | |
| 	regexMatchesIPv6 = regexp.MustCompile(`([0-9a-fA-F]{1,4}):([0-9a-fA-F]{1,4}):([0-9a-fA-F:]{1,4})*`)
 | |
| 	regexMatchesIPv4 = regexp.MustCompile(`(\d{1,3})\.(\d{1,3})\.\d{1,3}\.\d{1,3}`)
 | |
| )
 | |
| 
 | |
| // redactIPs is a helper function used in Write() to redact IPs (other than tailscale IPs).
 | |
| // This function takes a log line as a byte slice and
 | |
| // uses regex matching to parse and find IP addresses. Based on if the IP address is IPv4 or
 | |
| // IPv6, it parses and replaces the end of the addresses with an "x". This function returns the
 | |
| // log line with the IPs redacted.
 | |
| func redactIPs(buf []byte) []byte {
 | |
| 	out := regexMatchesIPv6.ReplaceAllFunc(buf, func(b []byte) []byte {
 | |
| 		ip, err := netip.ParseAddr(string(b))
 | |
| 		if err != nil || tsaddr.IsTailscaleIP(ip) {
 | |
| 			return b // don't change this one
 | |
| 		}
 | |
| 
 | |
| 		prefix := bytes.Split(b, []byte(":"))
 | |
| 		return bytes.Join(append(prefix[:2], []byte("x")), []byte(":"))
 | |
| 	})
 | |
| 
 | |
| 	out = regexMatchesIPv4.ReplaceAllFunc(out, func(b []byte) []byte {
 | |
| 		ip, err := netip.ParseAddr(string(b))
 | |
| 		if err != nil || tsaddr.IsTailscaleIP(ip) {
 | |
| 			return b // don't change this one
 | |
| 		}
 | |
| 
 | |
| 		prefix := bytes.Split(b, []byte("."))
 | |
| 		return bytes.Join(append(prefix[:2], []byte("x.x")), []byte("."))
 | |
| 	})
 | |
| 
 | |
| 	return []byte(out)
 | |
| }
 | |
| 
 | |
| var (
 | |
| 	openBracketV = []byte("[v")
 | |
| 	v1           = []byte("[v1] ")
 | |
| 	v2           = []byte("[v2] ")
 | |
| 	vJSON        = []byte("[v\x00JSON]") // precedes log level '0'-'9' byte, then JSON value
 | |
| )
 | |
| 
 | |
| // level 0 is normal (or unknown) level; 1+ are increasingly verbose
 | |
| func parseAndRemoveLogLevel(buf []byte) (level int, cleanBuf []byte) {
 | |
| 	if len(buf) == 0 || buf[0] == '{' || !bytes.Contains(buf, openBracketV) {
 | |
| 		return 0, buf
 | |
| 	}
 | |
| 	if bytes.Contains(buf, v1) {
 | |
| 		return 1, bytes.ReplaceAll(buf, v1, nil)
 | |
| 	}
 | |
| 	if bytes.Contains(buf, v2) {
 | |
| 		return 2, bytes.ReplaceAll(buf, v2, nil)
 | |
| 	}
 | |
| 	if i := bytes.Index(buf, vJSON); i != -1 {
 | |
| 		rest := buf[i+len(vJSON):]
 | |
| 		if len(rest) >= 2 {
 | |
| 			v := rest[0]
 | |
| 			if v >= '0' && v <= '9' {
 | |
| 				return int(v - '0'), rest[1:]
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	return 0, buf
 | |
| }
 | |
| 
 | |
| var (
 | |
| 	tapSetSize atomic.Int32
 | |
| 	tapMu      sync.Mutex
 | |
| 	tapSet     set.HandleSet[chan<- string]
 | |
| )
 | |
| 
 | |
| // RegisterLogTap registers dst to get a copy of every log write. The caller
 | |
| // must call unregister when done watching.
 | |
| //
 | |
| // This would ideally be a method on Logger, but Logger isn't really available
 | |
| // in most places; many writes go via stderr which filch redirects to the
 | |
| // singleton Logger set up early. For better or worse, there's basically only
 | |
| // one Logger within the program. This mechanism at least works well for
 | |
| // tailscaled. It works less well for a binary with multiple tsnet.Servers. Oh
 | |
| // well. This then subscribes to all of them.
 | |
| func RegisterLogTap(dst chan<- string) (unregister func()) {
 | |
| 	tapMu.Lock()
 | |
| 	defer tapMu.Unlock()
 | |
| 	h := tapSet.Add(dst)
 | |
| 	tapSetSize.Store(int32(len(tapSet)))
 | |
| 	return func() {
 | |
| 		tapMu.Lock()
 | |
| 		defer tapMu.Unlock()
 | |
| 		delete(tapSet, h)
 | |
| 		tapSetSize.Store(int32(len(tapSet)))
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // tapSend relays the JSON blob to any/all registered local debug log watchers
 | |
| // (somebody running "tailscale debug daemon-logs").
 | |
| func tapSend(jsonBlob []byte) {
 | |
| 	if tapSetSize.Load() == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 	s := string(jsonBlob)
 | |
| 	tapMu.Lock()
 | |
| 	defer tapMu.Unlock()
 | |
| 	for _, dst := range tapSet {
 | |
| 		select {
 | |
| 		case dst <- s:
 | |
| 		default:
 | |
| 		}
 | |
| 	}
 | |
| }
 |