mirror of
https://github.com/tailscale/tailscale.git
synced 2025-01-10 18:13:41 +00:00
bac3af06f5
Re-use a pre-allocated bytes.Buffer struct and shallow the copy the result of bytes.NewBuffer into it to avoid allocating the struct. Note that we're only reusing the bytes.Buffer struct itself and not the underling []byte temporarily stored within it. Updates #cleanup Updates tailscale/corp#18514 Updates golang/go#67004 Signed-off-by: Joe Tsai <joetsai@digital-static.net>
977 lines
30 KiB
Go
977 lines
30 KiB
Go
// Copyright (c) Tailscale Inc & AUTHORS
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
// Package logtail sends logs to log.tailscale.io.
|
|
package logtail
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/rand"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
mrand "math/rand/v2"
|
|
"net/http"
|
|
"net/netip"
|
|
"os"
|
|
"regexp"
|
|
"runtime"
|
|
"slices"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/go-json-experiment/json/jsontext"
|
|
"tailscale.com/envknob"
|
|
"tailscale.com/net/netmon"
|
|
"tailscale.com/net/sockstats"
|
|
"tailscale.com/net/tsaddr"
|
|
"tailscale.com/tstime"
|
|
tslogger "tailscale.com/types/logger"
|
|
"tailscale.com/types/logid"
|
|
"tailscale.com/util/set"
|
|
"tailscale.com/util/truncate"
|
|
"tailscale.com/util/zstdframe"
|
|
)
|
|
|
|
// maxSize is the maximum size that a single log entry can be.
|
|
// It is also the maximum body size that may be uploaded at a time.
|
|
const maxSize = 256 << 10
|
|
|
|
// maxTextSize is the maximum size for a text log message.
|
|
// Note that JSON log messages can be as large as maxSize.
|
|
const maxTextSize = 16 << 10
|
|
|
|
// lowMemRatio reduces maxSize and maxTextSize by this ratio in lowMem mode.
|
|
const lowMemRatio = 4
|
|
|
|
// bufferSize is the typical buffer size to retain.
|
|
// It is large enough to handle most log messages,
|
|
// but not too large to be a notable waste of memory if retained forever.
|
|
const bufferSize = 4 << 10
|
|
|
|
// DefaultHost is the default host name to upload logs to when
|
|
// Config.BaseURL isn't provided.
|
|
const DefaultHost = "log.tailscale.io"
|
|
|
|
const defaultFlushDelay = 2 * time.Second
|
|
|
|
const (
|
|
// CollectionNode is the name of a logtail Config.Collection
|
|
// for tailscaled (or equivalent: IPNExtension, Android app).
|
|
CollectionNode = "tailnode.log.tailscale.io"
|
|
)
|
|
|
|
type Config struct {
|
|
Collection string // collection name, a domain name
|
|
PrivateID logid.PrivateID // private ID for the primary log stream
|
|
CopyPrivateID logid.PrivateID // private ID for a log stream that is a superset of this log stream
|
|
BaseURL string // if empty defaults to "https://log.tailscale.io"
|
|
HTTPC *http.Client // if empty defaults to http.DefaultClient
|
|
SkipClientTime bool // if true, client_time is not written to logs
|
|
LowMemory bool // if true, logtail minimizes memory use
|
|
Clock tstime.Clock // if set, Clock.Now substitutes uses of time.Now
|
|
Stderr io.Writer // if set, logs are sent here instead of os.Stderr
|
|
StderrLevel int // max verbosity level to write to stderr; 0 means the non-verbose messages only
|
|
Buffer Buffer // temp storage, if nil a MemoryBuffer
|
|
CompressLogs bool // whether to compress the log uploads
|
|
|
|
// MetricsDelta, if non-nil, is a func that returns an encoding
|
|
// delta in clientmetrics to upload alongside existing logs.
|
|
// It can return either an empty string (for nothing) or a string
|
|
// that's safe to embed in a JSON string literal without further escaping.
|
|
MetricsDelta func() string
|
|
|
|
// FlushDelayFn, if non-nil is a func that returns how long to wait to
|
|
// accumulate logs before uploading them. 0 or negative means to upload
|
|
// immediately.
|
|
//
|
|
// If nil, a default value is used. (currently 2 seconds)
|
|
FlushDelayFn func() time.Duration
|
|
|
|
// IncludeProcID, if true, results in an ephemeral process identifier being
|
|
// included in logs. The ID is random and not guaranteed to be globally
|
|
// unique, but it can be used to distinguish between different instances
|
|
// running with same PrivateID.
|
|
IncludeProcID bool
|
|
|
|
// IncludeProcSequence, if true, results in an ephemeral sequence number
|
|
// being included in the logs. The sequence number is incremented for each
|
|
// log message sent, but is not persisted across process restarts.
|
|
IncludeProcSequence bool
|
|
}
|
|
|
|
func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
|
|
if cfg.BaseURL == "" {
|
|
cfg.BaseURL = "https://" + DefaultHost
|
|
}
|
|
if cfg.HTTPC == nil {
|
|
cfg.HTTPC = http.DefaultClient
|
|
}
|
|
if cfg.Clock == nil {
|
|
cfg.Clock = tstime.StdClock{}
|
|
}
|
|
if cfg.Stderr == nil {
|
|
cfg.Stderr = os.Stderr
|
|
}
|
|
if cfg.Buffer == nil {
|
|
pendingSize := 256
|
|
if cfg.LowMemory {
|
|
pendingSize = 64
|
|
}
|
|
cfg.Buffer = NewMemoryBuffer(pendingSize)
|
|
}
|
|
var procID uint32
|
|
if cfg.IncludeProcID {
|
|
keyBytes := make([]byte, 4)
|
|
rand.Read(keyBytes)
|
|
procID = binary.LittleEndian.Uint32(keyBytes)
|
|
if procID == 0 {
|
|
// 0 is the empty/off value, assign a different (non-zero) value to
|
|
// make sure we still include an ID (actual value does not matter).
|
|
procID = 7
|
|
}
|
|
}
|
|
if s := envknob.String("TS_DEBUG_LOGTAIL_FLUSHDELAY"); s != "" {
|
|
if delay, err := time.ParseDuration(s); err == nil {
|
|
cfg.FlushDelayFn = func() time.Duration { return delay }
|
|
} else {
|
|
log.Fatalf("invalid TS_DEBUG_LOGTAIL_FLUSHDELAY: %v", err)
|
|
}
|
|
} else if cfg.FlushDelayFn == nil && envknob.Bool("IN_TS_TEST") {
|
|
cfg.FlushDelayFn = func() time.Duration { return 0 }
|
|
}
|
|
|
|
var urlSuffix string
|
|
if !cfg.CopyPrivateID.IsZero() {
|
|
urlSuffix = "?copyId=" + cfg.CopyPrivateID.String()
|
|
}
|
|
l := &Logger{
|
|
privateID: cfg.PrivateID,
|
|
stderr: cfg.Stderr,
|
|
stderrLevel: int64(cfg.StderrLevel),
|
|
httpc: cfg.HTTPC,
|
|
url: cfg.BaseURL + "/c/" + cfg.Collection + "/" + cfg.PrivateID.String() + urlSuffix,
|
|
lowMem: cfg.LowMemory,
|
|
buffer: cfg.Buffer,
|
|
skipClientTime: cfg.SkipClientTime,
|
|
drainWake: make(chan struct{}, 1),
|
|
sentinel: make(chan int32, 16),
|
|
flushDelayFn: cfg.FlushDelayFn,
|
|
clock: cfg.Clock,
|
|
metricsDelta: cfg.MetricsDelta,
|
|
|
|
procID: procID,
|
|
includeProcSequence: cfg.IncludeProcSequence,
|
|
|
|
shutdownStart: make(chan struct{}),
|
|
shutdownDone: make(chan struct{}),
|
|
}
|
|
l.SetSockstatsLabel(sockstats.LabelLogtailLogger)
|
|
l.compressLogs = cfg.CompressLogs
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
l.uploadCancel = cancel
|
|
|
|
go l.uploading(ctx)
|
|
l.Write([]byte("logtail started"))
|
|
return l
|
|
}
|
|
|
|
// Logger writes logs, splitting them as configured between local
|
|
// logging facilities and uploading to a log server.
|
|
type Logger struct {
|
|
stderr io.Writer
|
|
stderrLevel int64 // accessed atomically
|
|
httpc *http.Client
|
|
url string
|
|
lowMem bool
|
|
skipClientTime bool
|
|
netMonitor *netmon.Monitor
|
|
buffer Buffer
|
|
drainWake chan struct{} // signal to speed up drain
|
|
drainBuf []byte // owned by drainPending for reuse
|
|
flushDelayFn func() time.Duration // negative or zero return value to upload aggressively, or >0 to batch at this delay
|
|
flushPending atomic.Bool
|
|
sentinel chan int32
|
|
clock tstime.Clock
|
|
compressLogs bool
|
|
uploadCancel func()
|
|
explainedRaw bool
|
|
metricsDelta func() string // or nil
|
|
privateID logid.PrivateID
|
|
httpDoCalls atomic.Int32
|
|
sockstatsLabel atomicSocktatsLabel
|
|
|
|
procID uint32
|
|
includeProcSequence bool
|
|
|
|
writeLock sync.Mutex // guards procSequence, flushTimer, buffer.Write calls
|
|
procSequence uint64
|
|
flushTimer tstime.TimerController // used when flushDelay is >0
|
|
writeBuf [bufferSize]byte // owned by Write for reuse
|
|
bytesBuf bytes.Buffer // owned by appendTextOrJSONLocked for reuse
|
|
jsonDec jsontext.Decoder // owned by appendTextOrJSONLocked for reuse
|
|
|
|
shutdownStartMu sync.Mutex // guards the closing of shutdownStart
|
|
shutdownStart chan struct{} // closed when shutdown begins
|
|
shutdownDone chan struct{} // closed when shutdown complete
|
|
}
|
|
|
|
type atomicSocktatsLabel struct{ p atomic.Uint32 }
|
|
|
|
func (p *atomicSocktatsLabel) Load() sockstats.Label { return sockstats.Label(p.p.Load()) }
|
|
func (p *atomicSocktatsLabel) Store(label sockstats.Label) { p.p.Store(uint32(label)) }
|
|
|
|
// SetVerbosityLevel controls the verbosity level that should be
|
|
// written to stderr. 0 is the default (not verbose). Levels 1 or higher
|
|
// are increasingly verbose.
|
|
func (l *Logger) SetVerbosityLevel(level int) {
|
|
atomic.StoreInt64(&l.stderrLevel, int64(level))
|
|
}
|
|
|
|
// SetNetMon sets the network monitor.
|
|
//
|
|
// It should not be changed concurrently with log writes and should
|
|
// only be set once.
|
|
func (l *Logger) SetNetMon(lm *netmon.Monitor) {
|
|
l.netMonitor = lm
|
|
}
|
|
|
|
// SetSockstatsLabel sets the label used in sockstat logs to identify network traffic from this logger.
|
|
func (l *Logger) SetSockstatsLabel(label sockstats.Label) {
|
|
l.sockstatsLabel.Store(label)
|
|
}
|
|
|
|
// PrivateID returns the logger's private log ID.
|
|
//
|
|
// It exists for internal use only.
|
|
func (l *Logger) PrivateID() logid.PrivateID { return l.privateID }
|
|
|
|
// Shutdown gracefully shuts down the logger while completing any
|
|
// remaining uploads.
|
|
//
|
|
// It will block, continuing to try and upload unless the passed
|
|
// context object interrupts it by being done.
|
|
// If the shutdown is interrupted, an error is returned.
|
|
func (l *Logger) Shutdown(ctx context.Context) error {
|
|
done := make(chan struct{})
|
|
go func() {
|
|
select {
|
|
case <-ctx.Done():
|
|
l.uploadCancel()
|
|
<-l.shutdownDone
|
|
case <-l.shutdownDone:
|
|
}
|
|
close(done)
|
|
l.httpc.CloseIdleConnections()
|
|
}()
|
|
|
|
l.shutdownStartMu.Lock()
|
|
select {
|
|
case <-l.shutdownStart:
|
|
l.shutdownStartMu.Unlock()
|
|
return nil
|
|
default:
|
|
}
|
|
close(l.shutdownStart)
|
|
l.shutdownStartMu.Unlock()
|
|
|
|
io.WriteString(l, "logger closing down\n")
|
|
<-done
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close shuts down this logger object, the background log uploader
|
|
// process, and any associated goroutines.
|
|
//
|
|
// Deprecated: use Shutdown
|
|
func (l *Logger) Close() {
|
|
l.Shutdown(context.Background())
|
|
}
|
|
|
|
// drainBlock is called by drainPending when there are no logs to drain.
|
|
//
|
|
// In typical operation, every call to the Write method unblocks and triggers a
|
|
// buffer.TryReadline, so logs are written with very low latency.
|
|
//
|
|
// If the caller specified FlushInterface, drainWake is only sent to
|
|
// periodically.
|
|
func (l *Logger) drainBlock() (shuttingDown bool) {
|
|
select {
|
|
case <-l.shutdownStart:
|
|
return true
|
|
case <-l.drainWake:
|
|
}
|
|
return false
|
|
}
|
|
|
|
// drainPending drains and encodes a batch of logs from the buffer for upload.
|
|
// If no logs are available, drainPending blocks until logs are available.
|
|
// The returned buffer is only valid until the next call to drainPending.
|
|
func (l *Logger) drainPending() (b []byte) {
|
|
b = l.drainBuf[:0]
|
|
b = append(b, '[')
|
|
defer func() {
|
|
b = bytes.TrimRight(b, ",")
|
|
b = append(b, ']')
|
|
l.drainBuf = b
|
|
if len(b) <= len("[]") {
|
|
b = nil
|
|
}
|
|
}()
|
|
|
|
maxLen := maxSize
|
|
if l.lowMem {
|
|
// When operating in a low memory environment, it is better to upload
|
|
// in multiple operations than it is to allocate a large body and OOM.
|
|
// Even if maxLen is less than maxSize, we can still upload an entry
|
|
// that is up to maxSize if we happen to encounter one.
|
|
maxLen /= lowMemRatio
|
|
}
|
|
for len(b) < maxLen {
|
|
line, err := l.buffer.TryReadLine()
|
|
switch {
|
|
case err == io.EOF:
|
|
return b
|
|
case err != nil:
|
|
b = append(b, '{')
|
|
b = l.appendMetadata(b, false, true, 0, 0, "reading ringbuffer: "+err.Error(), nil, 0)
|
|
b = bytes.TrimRight(b, ",")
|
|
b = append(b, '}')
|
|
return b
|
|
case line == nil:
|
|
// If we read at least some log entries, return immediately.
|
|
if len(b) > len("[") {
|
|
return b
|
|
}
|
|
|
|
// We're about to block. If we're holding on to too much memory
|
|
// in our buffer from a previous large write, let it go.
|
|
if cap(b) > bufferSize {
|
|
b = bytes.Clone(b)
|
|
l.drainBuf = b
|
|
}
|
|
|
|
if shuttingDown := l.drainBlock(); shuttingDown {
|
|
return b
|
|
}
|
|
continue
|
|
}
|
|
|
|
switch {
|
|
case len(line) == 0:
|
|
continue
|
|
case line[0] == '{' && jsontext.Value(line).IsValid():
|
|
// This is already a valid JSON object, so just append it.
|
|
// This may exceed maxLen, but should be no larger than maxSize
|
|
// so long as logic writing into the buffer enforces the limit.
|
|
b = append(b, line...)
|
|
default:
|
|
// This is probably a log added to stderr by filch
|
|
// outside of the logtail logger. Encode it.
|
|
if !l.explainedRaw {
|
|
fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n")
|
|
fmt.Fprintf(l.stderr, "RAW-STDERR: *** Lines prefixed with RAW-STDERR below bypassed logtail and probably come from a previous run of the program\n")
|
|
fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n")
|
|
fmt.Fprintf(l.stderr, "RAW-STDERR:\n")
|
|
l.explainedRaw = true
|
|
}
|
|
fmt.Fprintf(l.stderr, "RAW-STDERR: %s", b)
|
|
// Do not add a client time, as it could be really old.
|
|
// Do not include instance key or ID either,
|
|
// since this came from a different instance.
|
|
b = l.appendText(b, line, true, 0, 0, 0)
|
|
}
|
|
b = append(b, ',')
|
|
}
|
|
return b
|
|
}
|
|
|
|
// This is the goroutine that repeatedly uploads logs in the background.
|
|
func (l *Logger) uploading(ctx context.Context) {
|
|
defer close(l.shutdownDone)
|
|
|
|
for {
|
|
body := l.drainPending()
|
|
origlen := -1 // sentinel value: uncompressed
|
|
// Don't attempt to compress tiny bodies; not worth the CPU cycles.
|
|
if l.compressLogs && len(body) > 256 {
|
|
zbody := zstdframe.AppendEncode(nil, body,
|
|
zstdframe.FastestCompression, zstdframe.LowMemory(true))
|
|
|
|
// Only send it compressed if the bandwidth savings are sufficient.
|
|
// Just the extra headers associated with enabling compression
|
|
// are 50 bytes by themselves.
|
|
if len(body)-len(zbody) > 64 {
|
|
origlen = len(body)
|
|
body = zbody
|
|
}
|
|
}
|
|
|
|
var lastError string
|
|
var numFailures int
|
|
var firstFailure time.Time
|
|
for len(body) > 0 && ctx.Err() == nil {
|
|
retryAfter, err := l.upload(ctx, body, origlen)
|
|
if err != nil {
|
|
numFailures++
|
|
firstFailure = l.clock.Now()
|
|
|
|
if !l.internetUp() {
|
|
fmt.Fprintf(l.stderr, "logtail: internet down; waiting\n")
|
|
l.awaitInternetUp(ctx)
|
|
continue
|
|
}
|
|
|
|
// Only print the same message once.
|
|
if currError := err.Error(); lastError != currError {
|
|
fmt.Fprintf(l.stderr, "logtail: upload: %v\n", err)
|
|
lastError = currError
|
|
}
|
|
|
|
// Sleep for the specified retryAfter period,
|
|
// otherwise default to some random value.
|
|
if retryAfter <= 0 {
|
|
retryAfter = mrand.N(30*time.Second) + 30*time.Second
|
|
}
|
|
tstime.Sleep(ctx, retryAfter)
|
|
} else {
|
|
// Only print a success message after recovery.
|
|
if numFailures > 0 {
|
|
fmt.Fprintf(l.stderr, "logtail: upload succeeded after %d failures and %s\n", numFailures, l.clock.Since(firstFailure).Round(time.Second))
|
|
}
|
|
break
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-l.shutdownStart:
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (l *Logger) internetUp() bool {
|
|
if l.netMonitor == nil {
|
|
// No way to tell, so assume it is.
|
|
return true
|
|
}
|
|
return l.netMonitor.InterfaceState().AnyInterfaceUp()
|
|
}
|
|
|
|
func (l *Logger) awaitInternetUp(ctx context.Context) {
|
|
upc := make(chan bool, 1)
|
|
defer l.netMonitor.RegisterChangeCallback(func(delta *netmon.ChangeDelta) {
|
|
if delta.New.AnyInterfaceUp() {
|
|
select {
|
|
case upc <- true:
|
|
default:
|
|
}
|
|
}
|
|
})()
|
|
if l.internetUp() {
|
|
return
|
|
}
|
|
select {
|
|
case <-upc:
|
|
fmt.Fprintf(l.stderr, "logtail: internet back up\n")
|
|
case <-ctx.Done():
|
|
}
|
|
}
|
|
|
|
// upload uploads body to the log server.
|
|
// origlen indicates the pre-compression body length.
|
|
// origlen of -1 indicates that the body is not compressed.
|
|
func (l *Logger) upload(ctx context.Context, body []byte, origlen int) (retryAfter time.Duration, err error) {
|
|
const maxUploadTime = 45 * time.Second
|
|
ctx = sockstats.WithSockStats(ctx, l.sockstatsLabel.Load(), l.Logf)
|
|
ctx, cancel := context.WithTimeout(ctx, maxUploadTime)
|
|
defer cancel()
|
|
|
|
req, err := http.NewRequestWithContext(ctx, "POST", l.url, bytes.NewReader(body))
|
|
if err != nil {
|
|
// I know of no conditions under which this could fail.
|
|
// Report it very loudly.
|
|
// TODO record logs to disk
|
|
panic("logtail: cannot build http request: " + err.Error())
|
|
}
|
|
if origlen != -1 {
|
|
req.Header.Add("Content-Encoding", "zstd")
|
|
req.Header.Add("Orig-Content-Length", strconv.Itoa(origlen))
|
|
}
|
|
if runtime.GOOS == "js" {
|
|
// We once advertised we'd accept optional client certs (for internal use)
|
|
// on log.tailscale.io but then Tailscale SSH js/wasm clients prompted
|
|
// users (on some browsers?) to pick a client cert. We'll fix the server's
|
|
// TLS ServerHello, but we can also fix it client side for good measure.
|
|
//
|
|
// Corp details: https://github.com/tailscale/corp/issues/18177#issuecomment-2026598715
|
|
// and https://github.com/tailscale/corp/pull/18775#issuecomment-2027505036
|
|
//
|
|
// See https://github.com/golang/go/wiki/WebAssembly#configuring-fetch-options-while-using-nethttp
|
|
// and https://developer.mozilla.org/en-US/docs/Web/API/fetch#credentials
|
|
req.Header.Set("js.fetch:credentials", "omit")
|
|
}
|
|
req.Header["User-Agent"] = nil // not worth writing one; save some bytes
|
|
|
|
compressedNote := "not-compressed"
|
|
if origlen != -1 {
|
|
compressedNote = "compressed"
|
|
}
|
|
|
|
l.httpDoCalls.Add(1)
|
|
resp, err := l.httpc.Do(req)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("log upload of %d bytes %s failed: %v", len(body), compressedNote, err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
n, _ := strconv.Atoi(resp.Header.Get("Retry-After"))
|
|
b, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<10))
|
|
return time.Duration(n) * time.Second, fmt.Errorf("log upload of %d bytes %s failed %d: %s", len(body), compressedNote, resp.StatusCode, bytes.TrimSpace(b))
|
|
}
|
|
return 0, nil
|
|
}
|
|
|
|
// Flush uploads all logs to the server. It blocks until complete or there is an
|
|
// unrecoverable error.
|
|
//
|
|
// TODO(bradfitz): this apparently just returns nil, as of tailscale/corp@9c2ec35.
|
|
// Finish cleaning this up.
|
|
func (l *Logger) Flush() error {
|
|
return nil
|
|
}
|
|
|
|
// StartFlush starts a log upload, if anything is pending.
|
|
//
|
|
// If l is nil, StartFlush is a no-op.
|
|
func (l *Logger) StartFlush() {
|
|
if l != nil {
|
|
l.tryDrainWake()
|
|
}
|
|
}
|
|
|
|
// logtailDisabled is whether logtail uploads to logcatcher are disabled.
|
|
var logtailDisabled atomic.Bool
|
|
|
|
// Disable disables logtail uploads for the lifetime of the process.
|
|
func Disable() {
|
|
logtailDisabled.Store(true)
|
|
}
|
|
|
|
var debugWakesAndUploads = envknob.RegisterBool("TS_DEBUG_LOGTAIL_WAKES")
|
|
|
|
// tryDrainWake tries to send to lg.drainWake, to cause an uploading wakeup.
|
|
// It does not block.
|
|
func (l *Logger) tryDrainWake() {
|
|
l.flushPending.Store(false)
|
|
if debugWakesAndUploads() {
|
|
// Using println instead of log.Printf here to avoid recursing back into
|
|
// ourselves.
|
|
println("logtail: try drain wake, numHTTP:", l.httpDoCalls.Load())
|
|
}
|
|
select {
|
|
case l.drainWake <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (l *Logger) sendLocked(jsonBlob []byte) (int, error) {
|
|
tapSend(jsonBlob)
|
|
if logtailDisabled.Load() {
|
|
return len(jsonBlob), nil
|
|
}
|
|
|
|
n, err := l.buffer.Write(jsonBlob)
|
|
|
|
flushDelay := defaultFlushDelay
|
|
if l.flushDelayFn != nil {
|
|
flushDelay = l.flushDelayFn()
|
|
}
|
|
if flushDelay > 0 {
|
|
if l.flushPending.CompareAndSwap(false, true) {
|
|
if l.flushTimer == nil {
|
|
l.flushTimer = l.clock.AfterFunc(flushDelay, l.tryDrainWake)
|
|
} else {
|
|
l.flushTimer.Reset(flushDelay)
|
|
}
|
|
}
|
|
} else {
|
|
l.tryDrainWake()
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
// appendMetadata appends optional "logtail", "metrics", and "v" JSON members.
|
|
// This assumes dst is already within a JSON object.
|
|
// Each member is comma-terminated.
|
|
func (l *Logger) appendMetadata(dst []byte, skipClientTime, skipMetrics bool, procID uint32, procSequence uint64, errDetail string, errData jsontext.Value, level int) []byte {
|
|
// Append optional logtail metadata.
|
|
if !skipClientTime || procID != 0 || procSequence != 0 || errDetail != "" || errData != nil {
|
|
dst = append(dst, `"logtail":{`...)
|
|
if !skipClientTime {
|
|
dst = append(dst, `"client_time":"`...)
|
|
dst = l.clock.Now().UTC().AppendFormat(dst, time.RFC3339Nano)
|
|
dst = append(dst, '"', ',')
|
|
}
|
|
if procID != 0 {
|
|
dst = append(dst, `"proc_id":`...)
|
|
dst = strconv.AppendUint(dst, uint64(procID), 10)
|
|
dst = append(dst, ',')
|
|
}
|
|
if procSequence != 0 {
|
|
dst = append(dst, `"proc_seq":`...)
|
|
dst = strconv.AppendUint(dst, procSequence, 10)
|
|
dst = append(dst, ',')
|
|
}
|
|
if errDetail != "" || errData != nil {
|
|
dst = append(dst, `"error":{`...)
|
|
if errDetail != "" {
|
|
dst = append(dst, `"detail":`...)
|
|
dst, _ = jsontext.AppendQuote(dst, errDetail)
|
|
dst = append(dst, ',')
|
|
}
|
|
if errData != nil {
|
|
dst = append(dst, `"bad_data":`...)
|
|
dst = append(dst, errData...)
|
|
dst = append(dst, ',')
|
|
}
|
|
dst = bytes.TrimRight(dst, ",")
|
|
dst = append(dst, '}', ',')
|
|
}
|
|
dst = bytes.TrimRight(dst, ",")
|
|
dst = append(dst, '}', ',')
|
|
}
|
|
|
|
// Append optional metrics metadata.
|
|
if !skipMetrics && l.metricsDelta != nil {
|
|
if d := l.metricsDelta(); d != "" {
|
|
dst = append(dst, `"metrics":"`...)
|
|
dst = append(dst, d...)
|
|
dst = append(dst, '"', ',')
|
|
}
|
|
}
|
|
|
|
// Add the optional log level, if non-zero.
|
|
// Note that we only use log levels 1 and 2 currently.
|
|
// It's unlikely we'll ever make it past 9.
|
|
if level > 0 && level < 10 {
|
|
dst = append(dst, `"v":`...)
|
|
dst = append(dst, '0'+byte(level))
|
|
dst = append(dst, ',')
|
|
}
|
|
|
|
return dst
|
|
}
|
|
|
|
// appendText appends a raw text message in the Tailscale JSON log entry format.
|
|
func (l *Logger) appendText(dst, src []byte, skipClientTime bool, procID uint32, procSequence uint64, level int) []byte {
|
|
dst = slices.Grow(dst, len(src))
|
|
dst = append(dst, '{')
|
|
dst = l.appendMetadata(dst, skipClientTime, false, procID, procSequence, "", nil, level)
|
|
if len(src) == 0 {
|
|
dst = bytes.TrimRight(dst, ",")
|
|
return append(dst, "}\n"...)
|
|
}
|
|
|
|
// Append the text string, which may be truncated.
|
|
// Invalid UTF-8 will be mangled with the Unicode replacement character.
|
|
max := maxTextSize
|
|
if l.lowMem {
|
|
max /= lowMemRatio
|
|
}
|
|
dst = append(dst, `"text":`...)
|
|
dst = appendTruncatedString(dst, src, max)
|
|
return append(dst, "}\n"...)
|
|
}
|
|
|
|
// appendTruncatedString appends a JSON string for src,
|
|
// truncating the src to be no larger than n.
|
|
func appendTruncatedString(dst, src []byte, n int) []byte {
|
|
srcLen := len(src)
|
|
src = truncate.String(src, n)
|
|
dst, _ = jsontext.AppendQuote(dst, src) // ignore error; only occurs for invalid UTF-8
|
|
if srcLen > len(src) {
|
|
dst = dst[:len(dst)-len(`"`)] // trim off preceding double-quote
|
|
dst = append(dst, "…+"...)
|
|
dst = strconv.AppendInt(dst, int64(srcLen-len(src)), 10)
|
|
dst = append(dst, '"') // re-append succeeding double-quote
|
|
}
|
|
return dst
|
|
}
|
|
|
|
func (l *Logger) AppendTextOrJSONLocked(dst, src []byte) []byte {
|
|
l.clock = tstime.StdClock{}
|
|
return l.appendTextOrJSONLocked(dst, src, 0)
|
|
}
|
|
|
|
// appendTextOrJSONLocked appends a raw text message or a raw JSON object
|
|
// in the Tailscale JSON log format.
|
|
func (l *Logger) appendTextOrJSONLocked(dst, src []byte, level int) []byte {
|
|
if l.includeProcSequence {
|
|
l.procSequence++
|
|
}
|
|
if len(src) == 0 || src[0] != '{' {
|
|
return l.appendText(dst, src, l.skipClientTime, l.procID, l.procSequence, level)
|
|
}
|
|
|
|
// Check whether the input is a valid JSON object and
|
|
// whether it contains the reserved "logtail" name at the top-level.
|
|
var logtailKeyOffset, logtailValOffset, logtailValLength int
|
|
validJSON := func() bool {
|
|
// The jsontext.NewDecoder API operates on an io.Reader, for which
|
|
// bytes.Buffer provides a means to convert a []byte into an io.Reader.
|
|
// However, bytes.NewBuffer normally allocates unless
|
|
// we immediately shallow copy it into a pre-allocated Buffer struct.
|
|
// See https://go.dev/issue/67004.
|
|
l.bytesBuf = *bytes.NewBuffer(src)
|
|
defer func() { l.bytesBuf = bytes.Buffer{} }() // avoid pinning src
|
|
|
|
dec := &l.jsonDec
|
|
dec.Reset(&l.bytesBuf)
|
|
if tok, err := dec.ReadToken(); tok.Kind() != '{' || err != nil {
|
|
return false
|
|
}
|
|
for dec.PeekKind() != '}' {
|
|
keyOffset := dec.InputOffset()
|
|
tok, err := dec.ReadToken()
|
|
if err != nil {
|
|
return false
|
|
}
|
|
isLogtail := tok.String() == "logtail"
|
|
valOffset := dec.InputOffset()
|
|
if dec.SkipValue() != nil {
|
|
return false
|
|
}
|
|
if isLogtail {
|
|
logtailKeyOffset = int(keyOffset)
|
|
logtailValOffset = int(valOffset)
|
|
logtailValLength = int(dec.InputOffset()) - logtailValOffset
|
|
}
|
|
}
|
|
if tok, err := dec.ReadToken(); tok.Kind() != '}' || err != nil {
|
|
return false
|
|
}
|
|
if _, err := dec.ReadToken(); err != io.EOF {
|
|
return false // trailing junk after JSON object
|
|
}
|
|
return true
|
|
}()
|
|
|
|
// Treat invalid JSON as a raw text message.
|
|
if !validJSON {
|
|
return l.appendText(dst, src, l.skipClientTime, l.procID, l.procSequence, level)
|
|
}
|
|
|
|
// Check whether the JSON payload is too large.
|
|
// Due to logtail metadata, the formatted log entry could exceed maxSize.
|
|
// That's okay as the Tailscale log service limit is actually 2*maxSize.
|
|
// However, so long as logging applications aim to target the maxSize limit,
|
|
// there should be no trouble eventually uploading logs.
|
|
if len(src) > maxSize {
|
|
errDetail := fmt.Sprintf("entry too large: %d bytes", len(src))
|
|
errData := appendTruncatedString(nil, src, maxSize/len(`\uffff`)) // escaping could increase size
|
|
|
|
dst = append(dst, '{')
|
|
dst = l.appendMetadata(dst, l.skipClientTime, true, l.procID, l.procSequence, errDetail, errData, level)
|
|
dst = bytes.TrimRight(dst, ",")
|
|
return append(dst, "}\n"...)
|
|
}
|
|
|
|
// Check whether the reserved logtail member occurs in the log data.
|
|
// If so, it is moved to the the logtail/error member.
|
|
const jsonSeperators = ",:" // per RFC 8259, section 2
|
|
const jsonWhitespace = " \n\r\t" // per RFC 8259, section 2
|
|
var errDetail string
|
|
var errData jsontext.Value
|
|
if logtailValLength > 0 {
|
|
errDetail = "duplicate logtail member"
|
|
errData = bytes.Trim(src[logtailValOffset:][:logtailValLength], jsonSeperators+jsonWhitespace)
|
|
}
|
|
dst = slices.Grow(dst, len(src))
|
|
dst = append(dst, '{')
|
|
dst = l.appendMetadata(dst, l.skipClientTime, true, l.procID, l.procSequence, errDetail, errData, level)
|
|
if logtailValLength > 0 {
|
|
// Exclude original logtail member from the message.
|
|
dst = appendWithoutNewline(dst, src[len("{"):logtailKeyOffset])
|
|
dst = bytes.TrimRight(dst, jsonSeperators+jsonWhitespace)
|
|
dst = appendWithoutNewline(dst, src[logtailValOffset+logtailValLength:])
|
|
} else {
|
|
dst = appendWithoutNewline(dst, src[len("{"):])
|
|
}
|
|
dst = bytes.TrimRight(dst, jsonWhitespace)
|
|
dst = dst[:len(dst)-len("}")]
|
|
dst = bytes.TrimRight(dst, jsonSeperators+jsonWhitespace)
|
|
return append(dst, "}\n"...)
|
|
}
|
|
|
|
// appendWithoutNewline appends src to dst except that it ignores newlines
|
|
// since newlines are used to frame individual log entries.
|
|
func appendWithoutNewline(dst, src []byte) []byte {
|
|
for _, c := range src {
|
|
if c != '\n' {
|
|
dst = append(dst, c)
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// Logf logs to l using the provided fmt-style format and optional arguments.
|
|
func (l *Logger) Logf(format string, args ...any) {
|
|
fmt.Fprintf(l, format, args...)
|
|
}
|
|
|
|
var obscureIPs = envknob.RegisterBool("TS_OBSCURE_LOGGED_IPS")
|
|
|
|
// Write logs an encoded JSON blob.
|
|
//
|
|
// If the []byte passed to Write is not an encoded JSON blob,
|
|
// then contents is fit into a JSON blob and written.
|
|
//
|
|
// This is intended as an interface for the stdlib "log" package.
|
|
func (l *Logger) Write(buf []byte) (int, error) {
|
|
if len(buf) == 0 {
|
|
return 0, nil
|
|
}
|
|
inLen := len(buf) // length as provided to us, before modifications to downstream writers
|
|
|
|
level, buf := parseAndRemoveLogLevel(buf)
|
|
if l.stderr != nil && l.stderr != io.Discard && int64(level) <= atomic.LoadInt64(&l.stderrLevel) {
|
|
if buf[len(buf)-1] == '\n' {
|
|
l.stderr.Write(buf)
|
|
} else {
|
|
// The log package always line-terminates logs,
|
|
// so this is an uncommon path.
|
|
withNL := append(buf[:len(buf):len(buf)], '\n')
|
|
l.stderr.Write(withNL)
|
|
}
|
|
}
|
|
|
|
if obscureIPs() {
|
|
buf = redactIPs(buf)
|
|
}
|
|
|
|
l.writeLock.Lock()
|
|
defer l.writeLock.Unlock()
|
|
|
|
b := l.appendTextOrJSONLocked(l.writeBuf[:0], buf, level)
|
|
_, err := l.sendLocked(b)
|
|
return inLen, err
|
|
}
|
|
|
|
var (
|
|
regexMatchesIPv6 = regexp.MustCompile(`([0-9a-fA-F]{1,4}):([0-9a-fA-F]{1,4}):([0-9a-fA-F:]{1,4})*`)
|
|
regexMatchesIPv4 = regexp.MustCompile(`(\d{1,3})\.(\d{1,3})\.\d{1,3}\.\d{1,3}`)
|
|
)
|
|
|
|
// redactIPs is a helper function used in Write() to redact IPs (other than tailscale IPs).
|
|
// This function takes a log line as a byte slice and
|
|
// uses regex matching to parse and find IP addresses. Based on if the IP address is IPv4 or
|
|
// IPv6, it parses and replaces the end of the addresses with an "x". This function returns the
|
|
// log line with the IPs redacted.
|
|
func redactIPs(buf []byte) []byte {
|
|
out := regexMatchesIPv6.ReplaceAllFunc(buf, func(b []byte) []byte {
|
|
ip, err := netip.ParseAddr(string(b))
|
|
if err != nil || tsaddr.IsTailscaleIP(ip) {
|
|
return b // don't change this one
|
|
}
|
|
|
|
prefix := bytes.Split(b, []byte(":"))
|
|
return bytes.Join(append(prefix[:2], []byte("x")), []byte(":"))
|
|
})
|
|
|
|
out = regexMatchesIPv4.ReplaceAllFunc(out, func(b []byte) []byte {
|
|
ip, err := netip.ParseAddr(string(b))
|
|
if err != nil || tsaddr.IsTailscaleIP(ip) {
|
|
return b // don't change this one
|
|
}
|
|
|
|
prefix := bytes.Split(b, []byte("."))
|
|
return bytes.Join(append(prefix[:2], []byte("x.x")), []byte("."))
|
|
})
|
|
|
|
return []byte(out)
|
|
}
|
|
|
|
var (
|
|
openBracketV = []byte("[v")
|
|
v1 = []byte("[v1] ")
|
|
v2 = []byte("[v2] ")
|
|
vJSON = []byte("[v\x00JSON]") // precedes log level '0'-'9' byte, then JSON value
|
|
)
|
|
|
|
// level 0 is normal (or unknown) level; 1+ are increasingly verbose
|
|
func parseAndRemoveLogLevel(buf []byte) (level int, cleanBuf []byte) {
|
|
if len(buf) == 0 || buf[0] == '{' || !bytes.Contains(buf, openBracketV) {
|
|
return 0, buf
|
|
}
|
|
if bytes.Contains(buf, v1) {
|
|
return 1, bytes.ReplaceAll(buf, v1, nil)
|
|
}
|
|
if bytes.Contains(buf, v2) {
|
|
return 2, bytes.ReplaceAll(buf, v2, nil)
|
|
}
|
|
if i := bytes.Index(buf, vJSON); i != -1 {
|
|
rest := buf[i+len(vJSON):]
|
|
if len(rest) >= 2 {
|
|
v := rest[0]
|
|
if v >= '0' && v <= '9' {
|
|
return int(v - '0'), rest[1:]
|
|
}
|
|
}
|
|
}
|
|
return 0, buf
|
|
}
|
|
|
|
var (
|
|
tapSetSize atomic.Int32
|
|
tapMu sync.Mutex
|
|
tapSet set.HandleSet[chan<- string]
|
|
)
|
|
|
|
// RegisterLogTap registers dst to get a copy of every log write. The caller
|
|
// must call unregister when done watching.
|
|
//
|
|
// This would ideally be a method on Logger, but Logger isn't really available
|
|
// in most places; many writes go via stderr which filch redirects to the
|
|
// singleton Logger set up early. For better or worse, there's basically only
|
|
// one Logger within the program. This mechanism at least works well for
|
|
// tailscaled. It works less well for a binary with multiple tsnet.Servers. Oh
|
|
// well. This then subscribes to all of them.
|
|
func RegisterLogTap(dst chan<- string) (unregister func()) {
|
|
tapMu.Lock()
|
|
defer tapMu.Unlock()
|
|
h := tapSet.Add(dst)
|
|
tapSetSize.Store(int32(len(tapSet)))
|
|
return func() {
|
|
tapMu.Lock()
|
|
defer tapMu.Unlock()
|
|
delete(tapSet, h)
|
|
tapSetSize.Store(int32(len(tapSet)))
|
|
}
|
|
}
|
|
|
|
// tapSend relays the JSON blob to any/all registered local debug log watchers
|
|
// (somebody running "tailscale debug daemon-logs").
|
|
func tapSend(jsonBlob []byte) {
|
|
if tapSetSize.Load() == 0 {
|
|
return
|
|
}
|
|
s := string(jsonBlob)
|
|
tapMu.Lock()
|
|
defer tapMu.Unlock()
|
|
for _, dst := range tapSet {
|
|
select {
|
|
case dst <- s:
|
|
default:
|
|
}
|
|
}
|
|
}
|