From a01a0d103924348fc2f76cd6d2cea8b20f6d71bd Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Tue, 5 Oct 2021 16:24:46 +0000 Subject: [PATCH] Remove unstable update channel, replace with state updates --- app.go | 6 +-- machine.go | 106 -------------------------------------------------- namespaces.go | 19 +++------ 3 files changed, 7 insertions(+), 124 deletions(-) diff --git a/app.go b/app.go index 27b9672d..a6e547f5 100644 --- a/app.go +++ b/app.go @@ -65,9 +65,6 @@ type Headscale struct { aclPolicy *ACLPolicy aclRules *[]tailcfg.FilterRule - clientsUpdateChannels sync.Map - clientsUpdateChannelMutex sync.Mutex - lastStateChange sync.Map } @@ -145,10 +142,9 @@ func (h *Headscale) expireEphemeralNodesWorker() { if err != nil { log.Error().Err(err).Str("machine", m.Name).Msg("🤮 Cannot delete ephemeral machine from the database") } - updateRequestsFromNode.WithLabelValues("ephemeral-node-update").Inc() - h.notifyChangesToPeers(&m) } } + h.setLastStateChangeToNow(ns.Name) } } diff --git a/machine.go b/machine.go index 7a9df2e8..fcc8255c 100644 --- a/machine.go +++ b/machine.go @@ -2,7 +2,6 @@ package headscale import ( "encoding/json" - "errors" "fmt" "sort" "strconv" @@ -214,111 +213,6 @@ func (m *Machine) GetHostInfo() (*tailcfg.Hostinfo, error) { return &hostinfo, nil } -func (h *Headscale) notifyChangesToPeers(m *Machine) { - peers, err := h.getPeers(m) - if err != nil { - log.Error(). - Str("func", "notifyChangesToPeers"). - Str("machine", m.Name). - Msgf("Error getting peers: %s", err) - return - } - for _, peer := range peers { - log.Info(). - Str("func", "notifyChangesToPeers"). - Str("machine", m.Name). - Str("peer", peer.Name). - Str("address", peer.IPAddress). - Msgf("Notifying peer %s (%s)", peer.Name, peer.IPAddress) - err := h.sendRequestOnUpdateChannel(&peer) - if err != nil { - log.Info(). - Str("func", "notifyChangesToPeers"). - Str("machine", m.Name). - Str("peer", peer.Name). - Msgf("Peer %s does not have an open update client, skipping.", peer.Name) - continue - } - log.Trace(). - Str("func", "notifyChangesToPeers"). - Str("machine", m.Name). - Str("peer", peer.Name). - Str("address", peer.IPAddress). - Msgf("Notified peer %s (%s)", peer.Name, peer.IPAddress) - } -} - -func (h *Headscale) getOrOpenUpdateChannel(m *Machine) <-chan struct{} { - var updateChan chan struct{} - if storedChan, ok := h.clientsUpdateChannels.Load(m.ID); ok { - if unwrapped, ok := storedChan.(chan struct{}); ok { - updateChan = unwrapped - } else { - log.Error(). - Str("handler", "openUpdateChannel"). - Str("machine", m.Name). - Msg("Failed to convert update channel to struct{}") - } - } else { - log.Debug(). - Str("handler", "openUpdateChannel"). - Str("machine", m.Name). - Msg("Update channel not found, creating") - - updateChan = make(chan struct{}) - h.clientsUpdateChannels.Store(m.ID, updateChan) - } - return updateChan -} - -func (h *Headscale) closeUpdateChannel(m *Machine) { - h.clientsUpdateChannelMutex.Lock() - defer h.clientsUpdateChannelMutex.Unlock() - - if storedChan, ok := h.clientsUpdateChannels.Load(m.ID); ok { - if unwrapped, ok := storedChan.(chan struct{}); ok { - close(unwrapped) - } - } - h.clientsUpdateChannels.Delete(m.ID) -} - -func (h *Headscale) sendRequestOnUpdateChannel(m *Machine) error { - h.clientsUpdateChannelMutex.Lock() - defer h.clientsUpdateChannelMutex.Unlock() - - pUp, ok := h.clientsUpdateChannels.Load(uint64(m.ID)) - if ok { - log.Info(). - Str("func", "requestUpdate"). - Str("machine", m.Name). - Msgf("Notifying peer %s", m.Name) - - if update, ok := pUp.(chan struct{}); ok { - log.Trace(). - Str("func", "requestUpdate"). - Str("machine", m.Name). - Msgf("Update channel is %#v", update) - - updateRequestsToNode.Inc() - update <- struct{}{} - - log.Trace(). - Str("func", "requestUpdate"). - Str("machine", m.Name). - Msgf("Notified machine %s", m.Name) - } - } else { - err := errors.New("machine does not have an open update channel") - log.Info(). - Str("func", "requestUpdate"). - Str("machine", m.Name). - Msgf("Machine %s does not have an open update channel", m.Name) - return err - } - return nil -} - func (h *Headscale) isOutdated(m *Machine) bool { err := h.UpdateMachine(m) if err != nil { diff --git a/namespaces.go b/namespaces.go index 75b6eab5..2bf62bb3 100644 --- a/namespaces.go +++ b/namespaces.go @@ -176,24 +176,17 @@ func (h *Headscale) checkForNamespacesPendingUpdates() { return } - names := []string{} - err = json.Unmarshal([]byte(v), &names) + namespaces := []string{} + err = json.Unmarshal([]byte(v), &namespaces) if err != nil { return } - for _, name := range names { + for _, namespace := range namespaces { log.Trace(). Str("func", "RequestMapUpdates"). - Str("machine", name). - Msg("Sending updates to nodes in namespace") - machines, err := h.ListMachinesInNamespace(name) - if err != nil { - continue - } - for _, m := range *machines { - updateRequestsFromNode.WithLabelValues("namespace-update").Inc() - h.notifyChangesToPeers(&m) - } + Str("machine", namespace). + Msg("Sending updates to nodes in namespacespace") + h.setLastStateChangeToNow(namespace) } newV, err := h.getValue("namespaces_pending_updates") if err != nil {