tailscale/logtail/logtail.go
David Crawshaw 6ddbcab71e logtail: rename the unused CheckLogs to DrainLogs
Its semantics have changed slightly; this will let us use it to
drive batched logging in special circumstances.

Signed-off-by: David Crawshaw <crawshaw@tailscale.com>
2020-03-20 13:26:33 +11:00

436 lines
10 KiB
Go

// Copyright (c) 2020 Tailscale Inc & AUTHORS All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package logtail sends logs to log.tailscale.io.
package logtail
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"time"
"tailscale.com/logtail/backoff"
)
// Logger is the log-uploading handle returned by Log.
type Logger interface {
	// Write logs an encoded JSON blob.
	//
	// If the []byte passed to Write is not an encoded JSON blob,
	// then its contents are wrapped in a JSON blob and written.
	//
	// This is intended as an interface for the stdlib "log" package.
	Write([]byte) (int, error)
	// Flush uploads all logs to the server.
	// It blocks until complete or there is an unrecoverable error.
	Flush() error
	// Shutdown gracefully shuts down the logger while completing any
	// remaining uploads.
	//
	// It will block, continuing to try to upload unless the passed
	// context object interrupts it by being done.
	// If the shutdown is interrupted, an error is returned.
	Shutdown(context.Context) error
	// Close shuts down this logger object, the background log uploader
	// process, and any associated goroutines.
	//
	// Deprecated: use Shutdown.
	Close()
}
// Encoder compresses a batch of logs before it is uploaded.
// One is obtained from Config.NewZstdEncoder when that field is set.
type Encoder interface {
	// EncodeAll is called by the uploader as EncodeAll(batch, nil); the
	// bytes it returns are uploaded in place of the batch.
	// NOTE(review): dst is presumably a buffer the result is appended to,
	// as in common zstd encoder APIs — confirm against the implementation.
	EncodeAll(src, dst []byte) []byte
	// Close is called by Shutdown once the uploader has stopped.
	Close() error
}
// Config holds the settings passed to Log.
// Log fills in defaults for BaseURL, HTTPC, TimeNow, Stderr and Buffer
// when they are left unset.
type Config struct {
	Collection string // collection name, a domain name
	PrivateID PrivateID // machine-specific private identifier
	BaseURL string // if empty, defaults to "https://log.tailscale.io"
	HTTPC *http.Client // if nil, defaults to http.DefaultClient
	SkipClientTime bool // if true, client_time is not written to logs
	LowMemory bool // if true, logtail minimizes memory use
	TimeNow func() time.Time // if set, substitutes for uses of time.Now
	Stderr io.Writer // if set, logs are sent here instead of os.Stderr
	Buffer Buffer // temp storage; if nil, a MemoryBuffer (size 256, or 64 with LowMemory)
	NewZstdEncoder func() Encoder // if set, used to compress logs for transmission
	// DrainLogs, if non-nil, disables automatic uploading of new logs,
	// so that logs are only uploaded when a token is sent to DrainLogs.
	DrainLogs <-chan struct{}
}
// Log builds a Logger from cfg, starts its background upload goroutine,
// and records a first "logtail started" line.
//
// Unset fields of cfg fall back to defaults: BaseURL to
// "https://log.tailscale.io", HTTPC to http.DefaultClient, TimeNow to
// time.Now, Stderr to os.Stderr, and Buffer to a MemoryBuffer of size 256
// (64 when LowMemory is set).
func Log(cfg Config) Logger {
	baseURL := cfg.BaseURL
	if baseURL == "" {
		baseURL = "https://log.tailscale.io"
	}

	client := cfg.HTTPC
	if client == nil {
		client = http.DefaultClient
	}

	clock := cfg.TimeNow
	if clock == nil {
		clock = time.Now
	}

	errOut := cfg.Stderr
	if errOut == nil {
		errOut = os.Stderr
	}

	store := cfg.Buffer
	if store == nil {
		capacity := 256
		if cfg.LowMemory {
			capacity = 64
		}
		store = NewMemoryBuffer(capacity)
	}

	// The uploader runs until this context is cancelled or shutdown begins.
	uploadCtx, cancelUpload := context.WithCancel(context.Background())

	lg := &logger{
		stderr:         errOut,
		httpc:          client,
		url:            baseURL + "/c/" + cfg.Collection + "/" + cfg.PrivateID.String(),
		lowMem:         cfg.LowMemory,
		buffer:         store,
		skipClientTime: cfg.SkipClientTime,
		sent:           make(chan struct{}, 1),
		sentinel:       make(chan int32, 16),
		drainLogs:      cfg.DrainLogs,
		timeNow:        clock,
		bo:             backoff.Backoff{Name: "logtail"},
		uploadCancel:   cancelUpload,
		shutdownStart:  make(chan struct{}),
		shutdownDone:   make(chan struct{}),
	}
	if newEncoder := cfg.NewZstdEncoder; newEncoder != nil {
		lg.zstdEncoder = newEncoder()
	}

	go lg.uploading(uploadCtx)
	lg.Write([]byte("logtail started"))
	return lg
}
// logger is the Logger implementation returned by Log.
type logger struct {
	// stderr receives a copy of every line passed to Write (unless it is
	// nil or ioutil.Discard) as well as upload and re-encoding errors.
	stderr io.Writer
	httpc *http.Client // client used to POST batches
	url string // upload endpoint: BaseURL + "/c/" + Collection + "/" + PrivateID
	lowMem bool // if true, long text is truncated while encoding
	skipClientTime bool // if true, encode does not add a client_time
	buffer Buffer // pending log lines: send writes them, drainPending reads them
	sent chan struct{} // signal to speed up drain
	drainLogs <-chan struct{} // if non-nil, external signal to attempt a drain
	// NOTE(review): sentinel is allocated by Log but is not referenced by
	// any of the code visible here — confirm whether it is still needed.
	sentinel chan int32
	timeNow func() time.Time // clock used for client_time stamps
	bo backoff.Backoff // consulted after each failed upload attempt
	zstdEncoder Encoder // if non-nil, compresses each batch before upload
	uploadCancel func() // cancels the context given to the uploading goroutine
	shutdownStart chan struct{} // closed when shutdown begins
	shutdownDone chan struct{} // closed when shutdown complete
}
// Shutdown stops the background uploader and waits for it to exit.
//
// It closes shutdownStart, records a final "logger closing down" line and
// then blocks until the uploading goroutine has closed shutdownDone. If ctx
// is done first, the in-flight upload is cancelled via uploadCancel and
// Shutdown still waits for the goroutine to exit.
//
// The returned error is the result of closing the zstd encoder, if one is
// configured, and nil otherwise.
//
// Calling Shutdown again after shutdown has begun (including through Close)
// is a no-op that returns nil; previously it panicked by closing
// shutdownStart a second time. The check is not synchronized, so
// simultaneous first calls from several goroutines remain unsupported.
func (l *logger) Shutdown(ctx context.Context) error {
	select {
	case <-l.shutdownStart:
		// Shutdown already began; closing the channel again would panic.
		return nil
	default:
	}

	done := make(chan struct{})
	go func() {
		// Watch ctx while the caller is busy below, so that an expired
		// context interrupts the uploader promptly.
		select {
		case <-ctx.Done():
			l.uploadCancel()
			<-l.shutdownDone
		case <-l.shutdownDone:
		}
		close(done)
	}()

	close(l.shutdownStart)
	// Best effort: the uploader may already have exited by the time this
	// line reaches the buffer.
	io.WriteString(l, "logger closing down\n")
	<-done

	if l.zstdEncoder != nil {
		return l.zstdEncoder.Close()
	}
	return nil
}
// Close shuts the logger down by calling Shutdown with a context that is
// never done, so it waits for the uploader for as long as it takes.
// The error returned by Shutdown is discarded.
//
// Deprecated: use Shutdown.
func (l *logger) Close() {
	_ = l.Shutdown(context.Background())
}
// drainBlock parks the uploader until there is a reason to look at the
// buffer again. drainPending calls it when the buffer has nothing to read.
//
// By default the wake-up comes from the sent channel, which every Write
// signals, so new logs are picked up with very low latency.
//
// When the caller supplied a DrainLogs channel, Write no longer wakes the
// uploader; a token received on DrainLogs does instead.
//
// It reports true if shutdown has begun, and false if it was woken to drain.
func (l *logger) drainBlock() (shuttingDown bool) {
	// Pick the single wake-up source that applies in this mode.
	wake := l.drainLogs
	if wake == nil {
		wake = l.sent
	}

	select {
	case <-l.shutdownStart:
		return true
	case <-wake:
		return false
	}
}
// drainPending collects the next batch of buffered logs and returns it
// ready for upload, or nil if there is nothing to send.
//
// A batch with one entry is that entry as-is; a batch with several entries
// is a JSON array of them. Reading stops once the batch reaches maxLen
// bytes, the buffer reports io.EOF or another error, or the buffer runs dry
// after at least one entry was collected. With no entries and an empty
// buffer it waits in drainBlock, giving up when shutdown begins.
func (l *logger) drainPending() (res []byte) {
	const maxLen = 256 << 10

	var (
		batch bytes.Buffer
		count int
		stop  bool
	)

collect:
	for !stop && batch.Len() < maxLen {
		line, err := l.buffer.TryReadLine()
		switch {
		case err == io.EOF:
			break collect
		case err != nil:
			// Report the read failure as a log entry of its own and
			// make it the last entry of this batch.
			line = []byte(fmt.Sprintf("reading ringbuffer: %v", err))
			stop = true
		case line == nil:
			if count > 0 {
				break collect
			}
			stop = l.drainBlock()
			continue
		}

		if len(line) == 0 {
			continue
		}
		if line[0] != '{' || !json.Valid(line) {
			// Most likely text that filch captured from stderr without
			// going through the logtail logger, so wrap it. It may be
			// old, so it is not given a client time.
			line = l.encodeText(line, true)
		}

		switch count {
		case 0:
			batch.Write(line)
		case 1:
			// A second entry turns the batch into an array: re-emit the
			// first entry behind an opening bracket.
			first := append([]byte(nil), batch.Bytes()...)
			batch.Reset()
			batch.WriteByte('[')
			batch.Write(first)
			batch.WriteByte(',')
			batch.Write(line)
		default:
			batch.WriteByte(',')
			batch.Write(line)
		}
		count++
	}

	if count > 1 {
		batch.WriteByte(']')
	}
	if batch.Len() == 0 {
		return nil
	}
	return batch.Bytes()
}
// uploading is the background goroutine that repeatedly drains a batch of
// logs and uploads it.
//
// Each batch is retried, with a backoff between attempts, until upload
// reports that the server has it. The goroutine exits when ctx is cancelled
// (checked before each attempt) or, after finishing the current batch, once
// shutdown has begun. shutdownDone is closed on the way out.
func (l *logger) uploading(ctx context.Context) {
	defer close(l.shutdownDone)

	for {
		body := l.drainPending()
		// drainPending returns nil when it is woken with nothing to send
		// (for example at shutdown). Do not hand that to the encoder: a
		// compressor may emit a non-empty frame for empty input, which
		// would then be uploaded as a batch with no logs in it.
		if len(body) > 0 && l.zstdEncoder != nil {
			body = l.zstdEncoder.EncodeAll(body, nil)
		}

		for len(body) > 0 {
			select {
			case <-ctx.Done():
				return
			default:
			}
			uploaded, err := l.upload(ctx, body)
			if err != nil {
				fmt.Fprintf(l.stderr, "logtail: upload: %v\n", err)
			}
			if uploaded {
				break
			}
			l.bo.BackOff(ctx, err)
		}

		select {
		case <-l.shutdownStart:
			return
		default:
		}
	}
}
// upload POSTs one batch to the log server.
//
// body is sent as-is; when a zstd encoder is configured the request is
// marked with "Content-Encoding: zstd". The attempt is limited to 45
// seconds on top of whatever deadline ctx already carries.
//
// uploaded reports whether the server has the logs and the batch must not
// be retried: true for status 200, and also for status 400, where an error
// describing the response is returned as well. For any other status, or a
// transport failure, uploaded is false and err is non-nil.
func (l *logger) upload(ctx context.Context, body []byte) (uploaded bool, err error) {
	req, err := http.NewRequest("POST", l.url, bytes.NewReader(body))
	if err != nil {
		// I know of no conditions under which this could fail.
		// Report it very loudly.
		// TODO record logs to disk
		panic("logtail: cannot build http request: " + err.Error())
	}

	compressedNote := "not-compressed"
	if l.zstdEncoder != nil {
		req.Header.Add("Content-Encoding", "zstd")
		compressedNote = "compressed"
	}

	maxUploadTime := 45 * time.Second
	ctx, cancel := context.WithTimeout(ctx, maxUploadTime)
	defer cancel()
	req = req.WithContext(ctx)

	resp, err := l.httpc.Do(req)
	if err != nil {
		return false, fmt.Errorf("log upload of %d bytes %s failed: %v", len(body), compressedNote, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != 200 {
		uploaded = resp.StatusCode == 400 // the server saved the logs anyway
		b, _ := ioutil.ReadAll(resp.Body)
		return uploaded, fmt.Errorf("log upload of %d bytes %s failed %d: %q", len(body), compressedNote, resp.StatusCode, b)
	}

	// Read the (normally tiny) success body to EOF so the transport can
	// keep the connection alive for the next batch. The read is bounded,
	// and its result does not matter: the upload has already succeeded.
	const maxDrain = 64 << 10
	_, _ = io.Copy(ioutil.Discard, io.LimitReader(resp.Body, maxDrain))
	return true, nil
}
// Flush is the logger's implementation of Logger.Flush.
// It currently does nothing and always returns nil; uploading is left to
// the background goroutine.
func (l *logger) Flush() error {
	return nil
}
// send appends an encoded log entry to the buffer and returns the buffer's
// Write result unchanged.
//
// Unless draining is driven externally through drainLogs, it also nudges
// the uploader by placing a token on sent. The nudge never blocks: if a
// token is already pending, it is dropped.
func (l *logger) send(jsonBlob []byte) (int, error) {
	n, err := l.buffer.Write(jsonBlob)
	if l.drainLogs != nil {
		// The caller decides when to drain; do not wake the uploader.
		return n, err
	}

	select {
	case l.sent <- struct{}{}:
	default:
	}
	return n, err
}
// encodeText wraps a plain-text log line in a JSON object of the form
//
//	{"logtail": {"client_time": "..."}, "text": "..."}
//
// followed by a newline. The "logtail" member is left out when
// skipClientTime is true.
//
// The text is escaped byte by byte. Quotes, backslashes and the control
// characters with short JSON escapes use those escapes; every other byte
// below 0x20 is written as \u00XX, because JSON does not allow raw control
// characters inside a string. All remaining bytes are copied unchanged, so
// input that is not valid UTF-8 stays that way.
//
// In low-memory mode the text is cut off after 256 input bytes and "…" is
// appended. NOTE(review): the cut is made on a byte index and can fall
// inside a multi-byte UTF-8 sequence.
func (l *logger) encodeText(buf []byte, skipClientTime bool) []byte {
	const hexDigits = "0123456789abcdef"

	now := l.timeNow()

	b := make([]byte, 0, len(buf)+16)
	b = append(b, '{')

	if !skipClientTime {
		b = append(b, `"logtail": {"client_time": "`...)
		b = now.AppendFormat(b, time.RFC3339Nano)
		b = append(b, "\"}, "...)
	}

	b = append(b, "\"text\": \""...)
	for i, c := range buf {
		switch c {
		case '\b':
			b = append(b, '\\', 'b')
		case '\f':
			b = append(b, '\\', 'f')
		case '\n':
			b = append(b, '\\', 'n')
		case '\r':
			b = append(b, '\\', 'r')
		case '\t':
			b = append(b, '\\', 't')
		case '"':
			b = append(b, '\\', '"')
		case '\\':
			b = append(b, '\\', '\\')
		default:
			if c < 0x20 {
				// Control character without a short escape.
				b = append(b, '\\', 'u', '0', '0', hexDigits[c>>4], hexDigits[c&0xf])
			} else {
				b = append(b, c)
			}
		}
		if l.lowMem && i > 254 {
			b = append(b, "…"...)
			break
		}
	}
	b = append(b, "\"}\n"...)
	return b
}
// encode turns one log line into the newline-terminated JSON object that is
// stored in the buffer. buf must not be empty.
//
// Input that does not start with '{' goes through encodeText. Otherwise the
// input is decoded as a JSON object; if decoding fails, the raw input
// becomes the object's "text" member. A "logtail" member supplied by the
// caller is moved to "error_has_logtail", and, unless skipClientTime is
// set, "logtail" is replaced with the client timestamp.
//
// In low-memory mode a "text" string longer than 254 bytes is cut to at
// most 254 bytes plus "…". The cut backs up to the start of a UTF-8
// sequence so that a multi-byte character is never split in half.
func (l *logger) encode(buf []byte) []byte {
	if buf[0] != '{' {
		return l.encodeText(buf, l.skipClientTime) // text fast-path
	}

	now := l.timeNow()

	obj := make(map[string]interface{})
	if err := json.Unmarshal(buf, &obj); err != nil {
		// Discard anything decoded before the error and log the raw input.
		for k := range obj {
			delete(obj, k)
		}
		obj["text"] = string(buf)
	}

	if txt, isStr := obj["text"].(string); l.lowMem && isStr && len(txt) > 254 {
		// txt[cut] is the first byte dropped. While it is a UTF-8
		// continuation byte (10xxxxxx) the cut is inside a character, so
		// move back. A character has at most three continuation bytes,
		// which bounds the walk even for malformed input.
		cut := 254
		for cut > 251 && txt[cut]&0xC0 == 0x80 {
			cut--
		}
		obj["text"] = txt[:cut] + "…"
	}

	hasLogtail := obj["logtail"] != nil
	if hasLogtail {
		obj["error_has_logtail"] = obj["logtail"]
		obj["logtail"] = nil
	}
	if !l.skipClientTime {
		obj["logtail"] = map[string]string{
			"client_time": now.Format(time.RFC3339Nano),
		}
	}

	b, err := json.Marshal(obj)
	if err != nil {
		fmt.Fprintf(l.stderr, "logtail: re-encoding JSON failed: %v\n", err)
		// I know of no conditions under which this could fail.
		// Report it very loudly.
		panic("logtail: re-encoding JSON failed: " + err.Error())
	}
	b = append(b, '\n')
	return b
}
// Write logs one line. It implements io.Writer so that the logger can be
// used as the output of the stdlib "log" package.
//
// The line is first echoed to stderr (unless stderr is nil or
// ioutil.Discard), with a newline added if it lacks one, and is then
// encoded as JSON and queued for upload.
//
// On success Write returns len(buf), the number of input bytes consumed, as
// the io.Writer contract requires. The encoded entry is longer than the
// input, so returning the buffer's own count would report more bytes than
// the caller supplied. If the entry cannot be queued, Write returns 0 and
// the error. An empty buf is ignored.
func (l *logger) Write(buf []byte) (int, error) {
	if len(buf) == 0 {
		return 0, nil
	}

	if l.stderr != nil && l.stderr != ioutil.Discard {
		// The echo is best effort; a failing stderr must not stop logs
		// from being uploaded, so write errors are ignored here.
		if buf[len(buf)-1] == '\n' {
			l.stderr.Write(buf)
		} else {
			// The log package always line-terminates logs,
			// so this is an uncommon path.
			// The capacity-limited slice forces append to copy rather
			// than write into the caller's backing array.
			withNL := append(buf[:len(buf):len(buf)], '\n')
			l.stderr.Write(withNL)
		}
	}

	if _, err := l.send(l.encode(buf)); err != nil {
		return 0, err
	}
	return len(buf), nil
}