logtail, logpolicy: remove an unidiomatic use of an interface

This commit is contained in:
Brad Fitzpatrick 2020-12-21 09:03:39 -08:00
parent 83f45ae2dd
commit d97ee12179
4 changed files with 40 additions and 48 deletions

View File

@ -49,7 +49,7 @@ type Config struct {
// Policy is a logger and its public ID. // Policy is a logger and its public ID.
type Policy struct { type Policy struct {
// Logtail is the logger. // Logtail is the logger.
Logtail logtail.Logger Logtail *logtail.Logger
// PublicID is the logger's instance identifier. // PublicID is the logger's instance identifier.
PublicID logtail.PublicID PublicID logtail.PublicID
} }
@ -391,7 +391,7 @@ func New(collection string) *Policy {
if filchBuf != nil { if filchBuf != nil {
c.Buffer = filchBuf c.Buffer = filchBuf
} }
lw := logtail.Log(c, log.Printf) lw := logtail.NewLogger(c, log.Printf)
log.SetFlags(0) // other logflags are set on console, not here log.SetFlags(0) // other logflags are set on console, not here
log.SetOutput(lw) log.SetOutput(lw)

View File

@ -31,7 +31,7 @@ func main() {
log.Fatalf("logtail: bad -privateid: %v", err) log.Fatalf("logtail: bad -privateid: %v", err)
} }
logger := logtail.Log(logtail.Config{ logger := logtail.NewLogger(logtail.Config{
Collection: *collection, Collection: *collection,
PrivateID: id, PrivateID: id,
}, log.Printf) }, log.Printf)

View File

@ -25,34 +25,6 @@
// Config.BaseURL isn't provided. // Config.BaseURL isn't provided.
const DefaultHost = "log.tailscale.io" const DefaultHost = "log.tailscale.io"
type Logger interface {
// Write logs an encoded JSON blob.
//
// If the []byte passed to Write is not an encoded JSON blob,
// then contents is fit into a JSON blob and written.
//
// This is intended as an interface for the stdlib "log" package.
Write([]byte) (int, error)
// Flush uploads all logs to the server.
// It blocks until complete or there is an unrecoverable error.
Flush() error
// Shutdown gracefully shuts down the logger while completing any
// remaining uploads.
//
// It will block, continuing to try and upload unless the passed
// context object interrupts it by being done.
// If the shutdown is interrupted, an error is returned.
Shutdown(context.Context) error
// Close shuts down this logger object, the background log uploader
// process, and any associated goroutines.
//
// DEPRECATED: use Shutdown
Close()
}
type Encoder interface { type Encoder interface {
EncodeAll(src, dst []byte) []byte EncodeAll(src, dst []byte) []byte
Close() error Close() error
@ -75,7 +47,7 @@ type Config struct {
DrainLogs <-chan struct{} DrainLogs <-chan struct{}
} }
func Log(cfg Config, logf tslogger.Logf) Logger { func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
if cfg.BaseURL == "" { if cfg.BaseURL == "" {
cfg.BaseURL = "https://" + DefaultHost cfg.BaseURL = "https://" + DefaultHost
} }
@ -95,7 +67,7 @@ func Log(cfg Config, logf tslogger.Logf) Logger {
} }
cfg.Buffer = NewMemoryBuffer(pendingSize) cfg.Buffer = NewMemoryBuffer(pendingSize)
} }
l := &logger{ l := &Logger{
stderr: cfg.Stderr, stderr: cfg.Stderr,
httpc: cfg.HTTPC, httpc: cfg.HTTPC,
url: cfg.BaseURL + "/c/" + cfg.Collection + "/" + cfg.PrivateID.String(), url: cfg.BaseURL + "/c/" + cfg.Collection + "/" + cfg.PrivateID.String(),
@ -123,7 +95,9 @@ func Log(cfg Config, logf tslogger.Logf) Logger {
return l return l
} }
type logger struct { // Logger writes logs, splitting them as configured between local
// logging facilities and uploading to a log server.
type Logger struct {
stderr io.Writer stderr io.Writer
httpc *http.Client httpc *http.Client
url string url string
@ -142,7 +116,13 @@ type logger struct {
shutdownDone chan struct{} // closd when shutdown complete shutdownDone chan struct{} // closd when shutdown complete
} }
func (l *logger) Shutdown(ctx context.Context) error { // Shutdown gracefully shuts down the logger while completing any
// remaining uploads.
//
// It will block, continuing to try and upload unless the passed
// context object interrupts it by being done.
// If the shutdown is interrupted, an error is returned.
func (l *Logger) Shutdown(ctx context.Context) error {
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
select { select {
@ -164,7 +144,11 @@ func (l *logger) Shutdown(ctx context.Context) error {
return nil return nil
} }
func (l *logger) Close() { // Close shuts down this logger object, the background log uploader
// process, and any associated goroutines.
//
// Deprecated: use Shutdown
func (l *Logger) Close() {
l.Shutdown(context.Background()) l.Shutdown(context.Background())
} }
@ -175,7 +159,7 @@ func (l *logger) Close() {
// //
// If the caller provides a DrainLogs channel, then unblock-drain-on-Write // If the caller provides a DrainLogs channel, then unblock-drain-on-Write
// is disabled, and it is up to the caller to trigger unblock the drain. // is disabled, and it is up to the caller to trigger unblock the drain.
func (l *logger) drainBlock() (shuttingDown bool) { func (l *Logger) drainBlock() (shuttingDown bool) {
if l.drainLogs == nil { if l.drainLogs == nil {
select { select {
case <-l.shutdownStart: case <-l.shutdownStart:
@ -194,7 +178,7 @@ func (l *logger) drainBlock() (shuttingDown bool) {
// drainPending drains and encodes a batch of logs from the buffer for upload. // drainPending drains and encodes a batch of logs from the buffer for upload.
// If no logs are available, drainPending blocks until logs are available. // If no logs are available, drainPending blocks until logs are available.
func (l *logger) drainPending() (res []byte) { func (l *Logger) drainPending() (res []byte) {
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
entries := 0 entries := 0
@ -255,7 +239,7 @@ func (l *logger) drainPending() (res []byte) {
} }
// This is the goroutine that repeatedly uploads logs in the background. // This is the goroutine that repeatedly uploads logs in the background.
func (l *logger) uploading(ctx context.Context) { func (l *Logger) uploading(ctx context.Context) {
defer close(l.shutdownDone) defer close(l.shutdownDone)
for { for {
@ -300,7 +284,7 @@ func (l *logger) uploading(ctx context.Context) {
// upload uploads body to the log server. // upload uploads body to the log server.
// origlen indicates the pre-compression body length. // origlen indicates the pre-compression body length.
// origlen of -1 indicates that the body is not compressed. // origlen of -1 indicates that the body is not compressed.
func (l *logger) upload(ctx context.Context, body []byte, origlen int) (uploaded bool, err error) { func (l *Logger) upload(ctx context.Context, body []byte, origlen int) (uploaded bool, err error) {
req, err := http.NewRequest("POST", l.url, bytes.NewReader(body)) req, err := http.NewRequest("POST", l.url, bytes.NewReader(body))
if err != nil { if err != nil {
// I know of no conditions under which this could fail. // I know of no conditions under which this could fail.
@ -347,11 +331,13 @@ func (l *logger) upload(ctx context.Context, body []byte, origlen int) (uploaded
return true, nil return true, nil
} }
func (l *logger) Flush() error { // Flush uploads all logs to the server.
// It blocks until complete or there is an unrecoverable error.
func (l *Logger) Flush() error {
return nil return nil
} }
func (l *logger) send(jsonBlob []byte) (int, error) { func (l *Logger) send(jsonBlob []byte) (int, error) {
n, err := l.buffer.Write(jsonBlob) n, err := l.buffer.Write(jsonBlob)
if l.drainLogs == nil { if l.drainLogs == nil {
select { select {
@ -364,7 +350,7 @@ func (l *logger) send(jsonBlob []byte) (int, error) {
// TODO: instead of allocating, this should probably just append // TODO: instead of allocating, this should probably just append
// directly into the output log buffer. // directly into the output log buffer.
func (l *logger) encodeText(buf []byte, skipClientTime bool) []byte { func (l *Logger) encodeText(buf []byte, skipClientTime bool) []byte {
now := l.timeNow() now := l.timeNow()
// Factor in JSON encoding overhead to try to only do one alloc // Factor in JSON encoding overhead to try to only do one alloc
@ -420,7 +406,7 @@ func (l *logger) encodeText(buf []byte, skipClientTime bool) []byte {
return b return b
} }
func (l *logger) encode(buf []byte) []byte { func (l *Logger) encode(buf []byte) []byte {
if buf[0] != '{' { if buf[0] != '{' {
return l.encodeText(buf, l.skipClientTime) // text fast-path return l.encodeText(buf, l.skipClientTime) // text fast-path
} }
@ -461,7 +447,13 @@ func (l *logger) encode(buf []byte) []byte {
return b return b
} }
func (l *logger) Write(buf []byte) (int, error) { // Write logs an encoded JSON blob.
//
// If the []byte passed to Write is not an encoded JSON blob,
// then contents is fit into a JSON blob and written.
//
// This is intended as an interface for the stdlib "log" package.
func (l *Logger) Write(buf []byte) (int, error) {
if len(buf) == 0 { if len(buf) == 0 {
return 0, nil return 0, nil
} }

View File

@ -14,7 +14,7 @@ func TestFastShutdown(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
cancel() cancel()
l := Log(Config{ l := NewLogger(Config{
BaseURL: "http://localhost:1234", BaseURL: "http://localhost:1234",
}, t.Logf) }, t.Logf)
l.Shutdown(ctx) l.Shutdown(ctx)
@ -23,7 +23,7 @@ func TestFastShutdown(t *testing.T) {
var sink []byte var sink []byte
func TestLoggerEncodeTextAllocs(t *testing.T) { func TestLoggerEncodeTextAllocs(t *testing.T) {
lg := &logger{timeNow: time.Now} lg := &Logger{timeNow: time.Now}
inBuf := []byte("some text to encode") inBuf := []byte("some text to encode")
n := testing.AllocsPerRun(1000, func() { n := testing.AllocsPerRun(1000, func() {
sink = lg.encodeText(inBuf, false) sink = lg.encodeText(inBuf, false)
@ -34,7 +34,7 @@ func TestLoggerEncodeTextAllocs(t *testing.T) {
} }
func TestLoggerWriteLength(t *testing.T) { func TestLoggerWriteLength(t *testing.T) {
lg := &logger{ lg := &Logger{
timeNow: time.Now, timeNow: time.Now,
buffer: NewMemoryBuffer(1024), buffer: NewMemoryBuffer(1024),
} }