mirror of
https://github.com/tailscale/tailscale.git
synced 2025-12-23 09:06:24 +00:00
logtail: avoid racing eventbus subscriptions with Shutdown (#17639)
When the eventbus is enabled, set up the subscription for change deltas at the beginning when the client is created, rather than waiting for the first awaitInternetUp check. Otherwise, it is possible for a check to race with the client close in Shutdown, which triggers a panic. Updates #17638 Change-Id: I461c07939eca46699072b14b1814ecf28eec750c Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
This commit is contained in:
@@ -124,6 +124,7 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
|
|||||||
|
|
||||||
if cfg.Bus != nil {
|
if cfg.Bus != nil {
|
||||||
l.eventClient = cfg.Bus.Client("logtail.Logger")
|
l.eventClient = cfg.Bus.Client("logtail.Logger")
|
||||||
|
l.changeDeltaSub = eventbus.Subscribe[netmon.ChangeDelta](l.eventClient)
|
||||||
}
|
}
|
||||||
l.SetSockstatsLabel(sockstats.LabelLogtailLogger)
|
l.SetSockstatsLabel(sockstats.LabelLogtailLogger)
|
||||||
l.compressLogs = cfg.CompressLogs
|
l.compressLogs = cfg.CompressLogs
|
||||||
@@ -162,6 +163,7 @@ type Logger struct {
|
|||||||
httpDoCalls atomic.Int32
|
httpDoCalls atomic.Int32
|
||||||
sockstatsLabel atomicSocktatsLabel
|
sockstatsLabel atomicSocktatsLabel
|
||||||
eventClient *eventbus.Client
|
eventClient *eventbus.Client
|
||||||
|
changeDeltaSub *eventbus.Subscriber[netmon.ChangeDelta]
|
||||||
|
|
||||||
procID uint32
|
procID uint32
|
||||||
includeProcSequence bool
|
includeProcSequence bool
|
||||||
@@ -427,9 +429,24 @@ func (l *Logger) internetUp() bool {
|
|||||||
|
|
||||||
func (l *Logger) awaitInternetUp(ctx context.Context) {
|
func (l *Logger) awaitInternetUp(ctx context.Context) {
|
||||||
if l.eventClient != nil {
|
if l.eventClient != nil {
|
||||||
l.awaitInternetUpBus(ctx)
|
for {
|
||||||
|
if l.internetUp() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return // give up
|
||||||
|
case <-l.changeDeltaSub.Done():
|
||||||
|
return // give up (closing down)
|
||||||
|
case delta := <-l.changeDeltaSub.Events():
|
||||||
|
if delta.New.AnyInterfaceUp() || l.internetUp() {
|
||||||
|
fmt.Fprintf(l.stderr, "logtail: internet back up\n")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fmt.Fprintf(l.stderr, "logtail: network changed, but is not up")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
upc := make(chan bool, 1)
|
upc := make(chan bool, 1)
|
||||||
defer l.netMonitor.RegisterChangeCallback(func(delta *netmon.ChangeDelta) {
|
defer l.netMonitor.RegisterChangeCallback(func(delta *netmon.ChangeDelta) {
|
||||||
if delta.New.AnyInterfaceUp() {
|
if delta.New.AnyInterfaceUp() {
|
||||||
@@ -449,24 +466,6 @@ func (l *Logger) awaitInternetUp(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Logger) awaitInternetUpBus(ctx context.Context) {
|
|
||||||
if l.internetUp() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sub := eventbus.Subscribe[netmon.ChangeDelta](l.eventClient)
|
|
||||||
defer sub.Close()
|
|
||||||
select {
|
|
||||||
case delta := <-sub.Events():
|
|
||||||
if delta.New.AnyInterfaceUp() {
|
|
||||||
fmt.Fprintf(l.stderr, "logtail: internet back up\n")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
fmt.Fprintf(l.stderr, "logtail: network changed, but is not up")
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// upload uploads body to the log server.
|
// upload uploads body to the log server.
|
||||||
// origlen indicates the pre-compression body length.
|
// origlen indicates the pre-compression body length.
|
||||||
// origlen of -1 indicates that the body is not compressed.
|
// origlen of -1 indicates that the body is not compressed.
|
||||||
|
|||||||
Reference in New Issue
Block a user