tailscale/logpolicy/logpolicy.go
David Anderson 9cd4e65191 smallzstd: new package that constructs zstd small encoders/decoders.
It's just a config wrapper that passes "use less memory at the
expense of compression" parameters by default, so that we don't
accidentally construct resource-hungry (de)compressors.

Also includes a benchmark that measures the memory cost of the
small variants vs. the stock variants. The savings are significant
on both compressors (~8x less memory) and decompressors (~1.4x less,
not including the savings from the significantly smaller
window on the compression side - with those savings included it's
more like ~140x smaller).

BenchmarkSmallEncoder-8            	   56174	     19354 ns/op	      31 B/op	       0 allocs/op
BenchmarkSmallEncoderWithBuild-8   	    2900	    382940 ns/op	 1746547 B/op	      36 allocs/op
BenchmarkStockEncoder-8            	   48921	     25761 ns/op	     286 B/op	       0 allocs/op
BenchmarkStockEncoderWithBuild-8   	     426	   2630241 ns/op	13843842 B/op	     124 allocs/op
BenchmarkSmallDecoder-8            	  123814	      9344 ns/op	       0 B/op	       0 allocs/op
BenchmarkSmallDecoderWithBuild-8   	   41547	     27455 ns/op	   27694 B/op	      31 allocs/op
BenchmarkStockDecoder-8            	  129832	      9417 ns/op	       1 B/op	       0 allocs/op
BenchmarkStockDecoderWithBuild-8   	   25561	     51751 ns/op	   39607 B/op	      92 allocs/op

Signed-off-by: David Anderson <danderson@tailscale.com>
2020-07-02 16:13:06 -07:00

449 lines
12 KiB
Go

// Copyright (c) 2020 Tailscale Inc & AUTHORS All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package logpolicy manages the creation or reuse of logtail loggers,
// caching collection instance state on disk for use on future runs of
// programs on the same machine.
package logpolicy
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"
"golang.org/x/crypto/ssh/terminal"
"tailscale.com/atomicfile"
"tailscale.com/logtail"
"tailscale.com/logtail/filch"
"tailscale.com/net/netns"
"tailscale.com/net/tlsdial"
"tailscale.com/smallzstd"
"tailscale.com/version"
)
// Config represents an instance of logs in a collection. It is the
// state that save writes to disk as JSON and that ConfigFromBytes
// parses back.
type Config struct {
// Collection is the name of the log collection this instance
// belongs to. New regenerates the IDs when the stored value does
// not match the collection it was asked for.
Collection string
// PrivateID is the instance's private log ID. New generates a
// fresh one when this is the zero value.
PrivateID logtail.PrivateID
// PublicID is derived from PrivateID via PrivateID.Public(); both
// save and New recompute it.
PublicID logtail.PublicID
}
// Policy is a logger and its public ID, as returned by New.
type Policy struct {
// Logtail is the logger. Shutdown is a no-op when it is nil.
Logtail logtail.Logger
// PublicID is the logger's instance identifier.
PublicID logtail.PublicID
}
// ToBytes returns c encoded as tab-indented JSON.
//
// A marshaling failure is treated as fatal: it is reported through
// log.Fatalf, which terminates the process.
func (c *Config) ToBytes() []byte {
	encoded, err := json.MarshalIndent(c, "", "\t")
	if err != nil {
		log.Fatalf("logpolicy.Config marshal: %v\n", err)
	}
	return encoded
}
// save refreshes c.PublicID from c.PrivateID and then writes the JSON
// representation of c to stateFile.
//
// The parent directory of stateFile is created (mode 0750) if it does
// not exist yet, and the file itself is written through atomicfile
// with mode 0600. The first error encountered is returned.
func (c *Config) save(stateFile string) error {
	c.PublicID = c.PrivateID.Public()

	parent := filepath.Dir(stateFile)
	if err := os.MkdirAll(parent, 0750); err != nil {
		return err
	}
	return atomicfile.WriteFile(stateFile, c.ToBytes(), 0600)
}
// ConfigFromBytes parses a Config from its JSON encoding.
//
// On a decoding error it returns a nil Config together with the error
// reported by encoding/json.
func ConfigFromBytes(jsonEnc []byte) (*Config, error) {
	var cfg Config
	if err := json.Unmarshal(jsonEnc, &cfg); err != nil {
		return nil, err
	}
	return &cfg, nil
}
// stderrWriter is an io.Writer that looks up os.Stderr on every call,
// so writes always reach the current os.Stderr even if that variable
// is reassigned during the lifetime of the stderrWriter value.
type stderrWriter struct{}

// Write forwards p to whatever os.Stderr refers to right now and
// returns that file's result unchanged.
func (stderrWriter) Write(p []byte) (int, error) {
	current := os.Stderr
	return current.Write(p)
}
// logWriter adapts a *log.Logger to the io.Writer interface.
type logWriter struct {
	logger *log.Logger
}

// Write emits p as a single log entry on the wrapped logger. It always
// reports the whole buffer as written and never returns an error.
func (w logWriter) Write(p []byte) (int, error) {
	written := len(p)
	w.logger.Printf("%s", p)
	return written, nil
}
// logsDir returns the directory to use for log configuration and
// buffer storage.
//
// Candidates are tried in order: $STATE_DIRECTORY, a "Tailscale"
// directory under the user cache dir, the current working directory
// (unless it is "/"), and finally a freshly created temp directory.
// It panics if none of these is available.
func logsDir() string {
	// The original code names this value systemdStateDir; presumably
	// it is the state directory systemd assigns to the unit.
	if stateDir := os.Getenv("STATE_DIRECTORY"); stateDir != "" {
		return stateDir
	}

	if userCache, err := os.UserCacheDir(); err == nil {
		return filepath.Join(userCache, "Tailscale")
	}

	// Fall back to the current working directory, except when a
	// service manager has set it to /.
	if cwd, err := os.Getwd(); err == nil && cwd != "/" {
		return cwd
	}

	// Nowhere sensible is left. A temp dir means we might lose some
	// logs and rotate through log IDs, but it's better than nothing.
	fallback, err := ioutil.TempDir("", "tailscaled-log-*")
	if err != nil {
		panic("no safe place found to store log state")
	}
	return fallback
}
// runningUnderSystemd reports whether we're running under systemd:
// that is, we are on Linux, our parent is PID 1, and /proc/1/stat
// identifies PID 1 as "systemd".
func runningUnderSystemd() bool {
	if runtime.GOOS != "linux" || os.Getppid() != 1 {
		return false
	}
	// A read failure leaves stat empty, which simply fails the prefix
	// check below.
	stat, _ := ioutil.ReadFile("/proc/1/stat")
	return bytes.HasPrefix(stat, []byte("1 (systemd) "))
}
// tryFixLogStateLocation is a temporary fixup for
// https://github.com/tailscale/tailscale/issues/247 . We accidentally
// wrote logging state files to /, and then later to $CACHE_DIRECTORY
// (which is incorrect because the log ID is not reconstructible if
// deleted - it's state, not cache data).
//
// If log state for cmdname exists in / or $CACHE_DIRECTORY, and no
// log state for that command exists in dir, then the log state is
// moved from wherever it does exist, into dir. Leftover logs state
// in / and $CACHE_DIRECTORY is deleted.
//
// The function is best-effort: every failure is logged and ends the
// fixup early, nothing is returned to the caller. It does nothing
// unless running as root on linux, freebsd or openbsd.
func tryFixLogStateLocation(dir, cmdname string) {
	if cmdname == "" {
		log.Printf("[unexpected] no cmdname given to tryFixLogStateLocation, please file a bug at https://github.com/tailscale/tailscale")
		return
	}
	if dir == "/" {
		// Trying to store things in / still. That's a bug, but don't
		// abort hard.
		log.Printf("[unexpected] storing logging config in /, please file a bug at https://github.com/tailscale/tailscale")
		return
	}
	if os.Getuid() != 0 {
		// Only root could have written log configs to weird places.
		return
	}
	switch runtime.GOOS {
	case "linux", "freebsd", "openbsd":
		// These are the OSes where we might have written stuff into
		// root. Others use different logic to find the logs storage
		// dir.
	default:
		return
	}

	// We stored logs in 2 incorrect places: either /, or CACHE_DIR
	// (aka /var/cache/tailscale). We want to move files into the
	// provided dir, preferring those in CACHE_DIR over those in / if
	// both exist. If files already exist in dir, don't
	// overwrite. Finally, once we've maybe moved files around, we
	// want to delete leftovers in / and CACHE_DIR, to clean up after
	// our past selves.

	files := []string{
		fmt.Sprintf("%s.log.conf", cmdname),
		fmt.Sprintf("%s.log1.txt", cmdname),
		fmt.Sprintf("%s.log2.txt", cmdname),
	}

	// checks if any of the files above exist in d.
	checkExists := func(d string) (bool, error) {
		for _, file := range files {
			p := filepath.Join(d, file)
			_, err := os.Stat(p)
			if os.IsNotExist(err) {
				continue
			} else if err != nil {
				return false, fmt.Errorf("stat %q: %w", p, err)
			}
			return true, nil
		}
		return false, nil
	}

	// move files from d into dir, if they exist.
	moveFiles := func(d string) error {
		for _, file := range files {
			src := filepath.Join(d, file)
			_, err := os.Stat(src)
			if os.IsNotExist(err) {
				continue
			} else if err != nil {
				return fmt.Errorf("stat %q: %w", src, err)
			}
			dst := filepath.Join(dir, file)
			// Shell out to mv rather than os.Rename so that moves
			// between different filesystems work.
			bs, err := exec.Command("mv", src, dst).CombinedOutput()
			if err != nil {
				return fmt.Errorf("mv %q %q: %v (%s)", src, dst, err, bs)
			}
		}
		return nil
	}

	existsInRoot, err := checkExists("/")
	if err != nil {
		log.Printf("checking for configs in /: %v", err)
		return
	}

	existsInCache := false
	cacheDir := os.Getenv("CACHE_DIRECTORY")
	// Look in the directory we will actually move from and clean up
	// (cacheDir), not in a hard-coded path. Skip it entirely when it is
	// the destination itself: the cleanup below would otherwise delete
	// the canonical files as "leftovers".
	if cacheDir != "" && filepath.Clean(cacheDir) != filepath.Clean(dir) {
		existsInCache, err = checkExists(cacheDir)
		if err != nil {
			// Not fatal: carry on as if the cache dir held nothing.
			log.Printf("checking for configs in %s: %v", cacheDir, err)
		}
	}

	existsInDest, err := checkExists(dir)
	if err != nil {
		log.Printf("checking for configs in %s: %v", dir, err)
		return
	}

	switch {
	case !existsInRoot && !existsInCache:
		// No leftover files, nothing to do.
		return
	case existsInDest:
		// Already have "canonical" configs, just delete any remnants
		// (below).
	case existsInCache:
		// CACHE_DIRECTORY takes precedence over /, move files from
		// there.
		if err := moveFiles(cacheDir); err != nil {
			log.Print(err)
			return
		}
	case existsInRoot:
		// Files from root is better than nothing.
		if err := moveFiles("/"); err != nil {
			log.Print(err)
			return
		}
	}

	// If moving succeeded, or we didn't need to move files, try to
	// delete any leftover files, but it's okay if we can't delete
	// them for some reason.
	var dirs []string
	if existsInCache {
		dirs = append(dirs, cacheDir)
	}
	if existsInRoot {
		dirs = append(dirs, "/")
	}
	for _, d := range dirs {
		for _, file := range files {
			p := filepath.Join(d, file)
			_, err := os.Stat(p)
			if os.IsNotExist(err) {
				continue
			} else if err != nil {
				log.Printf("stat %q: %v", p, err)
				return
			}
			if err := os.Remove(p); err != nil {
				log.Printf("rm %q: %v", p, err)
			}
		}
	}
}
// New returns a new log policy (a logger and its instance ID) for a
// given collection name.
//
// It loads the per-command config file "<cmdname>.log.conf" from
// logsDir, reusing the stored private ID when the file is readable and
// belongs to the same collection, and generating (and saving) a new
// one otherwise. Failures to read, parse or save the config are logged
// and do not stop New.
//
// As a side effect, New redirects the standard log package to the
// returned logtail logger (log.SetOutput) and clears its flags.
func New(collection string) *Policy {
	// Timestamps are only added to console output when stderr is not
	// a terminal, we're not on Windows, and we're not under systemd.
	var lflags int
	if terminal.IsTerminal(2) || runtime.GOOS == "windows" {
		lflags = 0
	} else {
		lflags = log.LstdFlags
	}
	if runningUnderSystemd() {
		// If journalctl is going to prepend its own timestamp
		// anyway, no need to add one.
		lflags = 0
	}
	console := log.New(stderrWriter{}, "", lflags)

	dir := logsDir()

	if runtime.GOOS != "windows" { // version.CmdName call was blowing some Windows stack limit via goversion DLL loading
		tryFixLogStateLocation(dir, version.CmdName())
	}

	cfgPath := filepath.Join(dir, fmt.Sprintf("%s.log.conf", version.CmdName()))
	var oldc *Config
	data, err := ioutil.ReadFile(cfgPath)
	if err != nil {
		log.Printf("logpolicy.Read %v: %v\n", cfgPath, err)
		oldc = &Config{}
		oldc.Collection = collection
	} else {
		oldc, err = ConfigFromBytes(data)
		if err != nil {
			log.Printf("logpolicy.Config unmarshal: %v\n", err)
			oldc = &Config{}
		}
	}

	// Work on a copy so that we can tell below whether anything
	// changed and the config needs to be written back.
	newc := *oldc
	if newc.Collection != collection {
		log.Printf("logpolicy.Config: config collection %q does not match %q", newc.Collection, collection)
		// We picked up an incompatible config file.
		// Regenerate the private ID.
		newc.PrivateID = logtail.PrivateID{}
		newc.Collection = collection
	}
	if newc.PrivateID == (logtail.PrivateID{}) {
		newc.PrivateID, err = logtail.NewPrivateID()
		if err != nil {
			// Include the cause: without it the fatal message gives
			// no hint as to why ID generation failed.
			log.Fatalf("logpolicy: NewPrivateID() should never fail: %v", err)
		}
	}
	newc.PublicID = newc.PrivateID.Public()
	if newc != *oldc {
		if err := newc.save(cfgPath); err != nil {
			log.Printf("logpolicy.Config.Save: %v\n", err)
		}
	}

	c := logtail.Config{
		Collection: newc.Collection,
		PrivateID:  newc.PrivateID,
		Stderr:     logWriter{console},
		NewZstdEncoder: func() logtail.Encoder {
			w, err := smallzstd.NewEncoder(nil)
			if err != nil {
				panic(err)
			}
			return w
		},
		HTTPC: &http.Client{Transport: newLogtailTransport(logtail.DefaultHost)},
	}

	// The filch buffer is optional: when it cannot be created,
	// c.Buffer is left unset and the error is reported further down,
	// once logging has been set up.
	filchBuf, filchErr := filch.New(filepath.Join(dir, version.CmdName()), filch.Options{})
	if filchBuf != nil {
		c.Buffer = filchBuf
	}
	lw := logtail.Log(c, log.Printf)
	log.SetFlags(0) // other logflags are set on console, not here
	log.SetOutput(lw)

	log.Printf("Program starting: v%v, Go %v: %#v\n",
		version.LONG,
		strings.TrimPrefix(runtime.Version(), "go"),
		os.Args)
	log.Printf("LogID: %v\n", newc.PublicID)
	if filchErr != nil {
		log.Printf("filch failed: %v", filchErr)
	}

	return &Policy{
		Logtail:  lw,
		PublicID: newc.PublicID,
	}
}
// Close immediately shuts down the logger.
//
// It calls Shutdown with a context that has already been canceled and
// discards the result.
func (p *Policy) Close() {
	canceled, cancel := context.WithCancel(context.Background())
	cancel()
	_ = p.Shutdown(canceled)
}
// Shutdown gracefully shuts down the logger, finishing any current
// log upload if it can be done before ctx is canceled.
//
// It returns the result of the underlying logger's Shutdown, or nil
// when the policy has no logger.
func (p *Policy) Shutdown(ctx context.Context) error {
	if p.Logtail == nil {
		return nil
	}
	log.Printf("flushing log.\n")
	return p.Logtail.Shutdown(ctx)
}
// newLogtailTransport returns the HTTP Transport we use for uploading
// logs to the given host name. It is a tweaked clone of
// http.DefaultTransport whose TLS config is produced by tlsdial.Config
// for host.
func newLogtailTransport(host string) *http.Transport {
	transport := http.DefaultTransport.(*http.Transport).Clone()

	// Uploads are zstd-compressed by us and responses never carry a
	// payload, so there is no point in sending "Accept-Encoding: gzip";
	// skipping it saves a few bytes on the wire.
	transport.DisableCompression = true

	// Only one hostname is ever contacted, so the default of 100 max
	// idle conns is far more than needed. Even 2 is probably double
	// what we need.
	transport.MaxIdleConns = 2

	// Dial through netns and log every attempt with its duration.
	transport.DialContext = func(ctx context.Context, network, address string) (net.Conn, error) {
		dialer := netns.FromDialer(&net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
		})
		start := time.Now()
		conn, err := dialer.DialContext(ctx, network, address)
		elapsed := time.Since(start).Round(time.Millisecond)
		if err == nil {
			log.Printf("logtail: dialed %q in %v", address, elapsed)
		} else {
			log.Printf("logtail: dial %q failed: %v (in %v)", address, err, elapsed)
		}
		return conn, err
	}

	// Debug knob to force HTTP/1 for log uploads.
	// TODO(bradfitz): remove this debug knob once we've decided
	// to upload via HTTP/1 or HTTP/2 (probably HTTP/1). Or we might just enforce
	// it server-side.
	forceH1, _ := strconv.ParseBool(os.Getenv("TS_DEBUG_FORCE_H1_LOGS"))
	if forceH1 {
		transport.TLSClientConfig = nil // DefaultTransport's was already initialized w/ h2
		transport.ForceAttemptHTTP2 = false
		transport.TLSNextProto = map[string]func(authority string, c *tls.Conn) http.RoundTripper{}
	}

	transport.TLSClientConfig = tlsdial.Config(host, transport.TLSClientConfig)
	return transport
}