cleaner startup/shutdown of the link writer's worker

This commit is contained in:
Arceliar 2020-05-17 08:34:22 -05:00
parent 15ac2595aa
commit 0dcc555eab

View File

@ -136,6 +136,7 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st
force: force, force: force,
} }
intf.writer.intf = &intf intf.writer.intf = &intf
intf.writer.worker = make(chan [][]byte, 1)
intf.reader.intf = &intf intf.reader.intf = &intf
intf.reader.err = make(chan error) intf.reader.err = make(chan error)
return &intf, nil return &intf, nil
@ -151,6 +152,15 @@ func (l *link) stop() error {
func (intf *linkInterface) handler() error { func (intf *linkInterface) handler() error {
// TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later // TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later
go func() {
for bss := range intf.writer.worker {
intf.msgIO.writeMsgs(bss)
}
}()
defer intf.writer.Act(nil, func() {
intf.writer.closed = true
close(intf.writer.worker)
})
myLinkPub, myLinkPriv := crypto.NewBoxKeys() myLinkPub, myLinkPriv := crypto.NewBoxKeys()
meta := version_getBaseMetadata() meta := version_getBaseMetadata()
meta.box = intf.link.core.boxPub meta.box = intf.link.core.boxPub
@ -256,11 +266,6 @@ func (intf *linkInterface) handler() error {
intf.link.core.log.Infof("Disconnected %s: %s, source %s", intf.link.core.log.Infof("Disconnected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local) strings.ToUpper(intf.info.linkType), themString, intf.info.local)
} }
intf.writer.Act(nil, func() {
if intf.writer.worker != nil {
close(intf.writer.worker)
}
})
return err return err
} }
@ -435,27 +440,20 @@ type linkWriter struct {
phony.Inbox phony.Inbox
intf *linkInterface intf *linkInterface
worker chan [][]byte worker chan [][]byte
closed bool
} }
func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) { func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) {
w.Act(from, func() { w.Act(from, func() {
if w.closed {
return
}
var size int var size int
for _, bs := range bss { for _, bs := range bss {
size += len(bs) size += len(bs)
} }
if w.worker == nil {
w.worker = make(chan [][]byte, 1)
go func() {
for bss := range w.worker {
w.intf.msgIO.writeMsgs(bss)
}
}()
}
w.intf.notifySending(size, isLinkTraffic) w.intf.notifySending(size, isLinkTraffic)
func() {
defer func() { recover() }()
w.worker <- bss w.worker <- bss
}()
w.intf.notifySent(size, isLinkTraffic) w.intf.notifySent(size, isLinkTraffic)
}) })
} }