From cd0dcdb0e70486b1a26667f2bdb97c385435d8c0 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Fri, 19 Apr 2024 16:11:38 +0200 Subject: [PATCH] use concurrency save connectedmap, more metrics Fixes #1862 Signed-off-by: Kristoffer Dalby --- hscontrol/app.go | 2 +- hscontrol/auth.go | 2 +- hscontrol/db/node.go | 9 +++--- hscontrol/db/node_test.go | 3 +- hscontrol/db/routes.go | 37 ++++++++++++----------- hscontrol/db/routes_test.go | 55 ++++++++++++++++++++-------------- hscontrol/grpcv1.go | 16 ++++++---- hscontrol/mapper/mapper.go | 21 ++++++------- hscontrol/metrics.go | 10 +++++++ hscontrol/noise.go | 3 ++ hscontrol/notifier/metrics.go | 5 ++++ hscontrol/notifier/notifier.go | 29 +++++++++++------- hscontrol/poll.go | 4 +-- hscontrol/types/node.go | 3 +- 14 files changed, 124 insertions(+), 75 deletions(-) diff --git a/hscontrol/app.go b/hscontrol/app.go index cd80bc9978..acc942293c 100644 --- a/hscontrol/app.go +++ b/hscontrol/app.go @@ -509,7 +509,7 @@ func (h *Headscale) Serve() error { // Fetch an initial DERP Map before we start serving h.DERPMap = derp.GetDERPMap(h.cfg.DERP) - h.mapper = mapper.NewMapper(h.db, h.cfg, h.DERPMap, h.nodeNotifier.ConnectedMap()) + h.mapper = mapper.NewMapper(h.db, h.cfg, h.DERPMap, h.nodeNotifier) if h.cfg.DERP.ServerEnabled { // When embedded DERP is enabled we always need a STUN server diff --git a/hscontrol/auth.go b/hscontrol/auth.go index 1f47593374..0679d72ee3 100644 --- a/hscontrol/auth.go +++ b/hscontrol/auth.go @@ -546,7 +546,7 @@ func (h *Headscale) handleNodeLogOut( } if node.IsEphemeral() { - changedNodes, err := h.db.DeleteNode(&node, h.nodeNotifier.ConnectedMap()) + changedNodes, err := h.db.DeleteNode(&node, h.nodeNotifier.LikelyConnectedMap()) if err != nil { log.Error(). Err(err). diff --git a/hscontrol/db/node.go b/hscontrol/db/node.go index 109fd610a9..91bf0cb354 100644 --- a/hscontrol/db/node.go +++ b/hscontrol/db/node.go @@ -10,6 +10,7 @@ import ( "github.com/juanfont/headscale/hscontrol/types" "github.com/juanfont/headscale/hscontrol/util" "github.com/patrickmn/go-cache" + "github.com/puzpuzpuz/xsync/v3" "github.com/rs/zerolog/log" "gorm.io/gorm" "tailscale.com/tailcfg" @@ -260,9 +261,9 @@ func NodeSetExpiry(tx *gorm.DB, return tx.Model(&types.Node{}).Where("id = ?", nodeID).Update("expiry", expiry).Error } -func (hsdb *HSDatabase) DeleteNode(node *types.Node, isConnected types.NodeConnectedMap) ([]types.NodeID, error) { +func (hsdb *HSDatabase) DeleteNode(node *types.Node, isLikelyConnected *xsync.MapOf[types.NodeID, bool]) ([]types.NodeID, error) { return Write(hsdb.DB, func(tx *gorm.DB) ([]types.NodeID, error) { - return DeleteNode(tx, node, isConnected) + return DeleteNode(tx, node, isLikelyConnected) }) } @@ -270,9 +271,9 @@ func (hsdb *HSDatabase) DeleteNode(node *types.Node, isConnected types.NodeConne // Caller is responsible for notifying all of change. func DeleteNode(tx *gorm.DB, node *types.Node, - isConnected types.NodeConnectedMap, + isLikelyConnected *xsync.MapOf[types.NodeID, bool], ) ([]types.NodeID, error) { - changed, err := deleteNodeRoutes(tx, node, isConnected) + changed, err := deleteNodeRoutes(tx, node, isLikelyConnected) if err != nil { return changed, err } diff --git a/hscontrol/db/node_test.go b/hscontrol/db/node_test.go index 9ff022871a..ce2ada3372 100644 --- a/hscontrol/db/node_test.go +++ b/hscontrol/db/node_test.go @@ -11,6 +11,7 @@ import ( "github.com/juanfont/headscale/hscontrol/policy" "github.com/juanfont/headscale/hscontrol/types" "github.com/juanfont/headscale/hscontrol/util" + "github.com/puzpuzpuz/xsync/v3" "gopkg.in/check.v1" "tailscale.com/tailcfg" "tailscale.com/types/key" @@ -120,7 +121,7 @@ func (s *Suite) TestHardDeleteNode(c *check.C) { } db.DB.Save(&node) - _, err = db.DeleteNode(&node, types.NodeConnectedMap{}) + _, err = db.DeleteNode(&node, xsync.NewMapOf[types.NodeID, bool]()) c.Assert(err, check.IsNil) _, err = db.getNode(user.Name, "testnode3") diff --git a/hscontrol/db/routes.go b/hscontrol/db/routes.go index bc3f88a5bc..74b2b4b71b 100644 --- a/hscontrol/db/routes.go +++ b/hscontrol/db/routes.go @@ -8,6 +8,7 @@ import ( "github.com/juanfont/headscale/hscontrol/policy" "github.com/juanfont/headscale/hscontrol/types" + "github.com/puzpuzpuz/xsync/v3" "github.com/rs/zerolog/log" "gorm.io/gorm" "tailscale.com/util/set" @@ -126,7 +127,7 @@ func EnableRoute(tx *gorm.DB, id uint64) (*types.StateUpdate, error) { func DisableRoute(tx *gorm.DB, id uint64, - isConnected types.NodeConnectedMap, + isLikelyConnected *xsync.MapOf[types.NodeID, bool], ) ([]types.NodeID, error) { route, err := GetRoute(tx, id) if err != nil { @@ -147,7 +148,7 @@ func DisableRoute(tx *gorm.DB, return nil, err } - update, err = failoverRouteTx(tx, isConnected, route) + update, err = failoverRouteTx(tx, isLikelyConnected, route) if err != nil { return nil, err } @@ -182,17 +183,17 @@ func DisableRoute(tx *gorm.DB, func (hsdb *HSDatabase) DeleteRoute( id uint64, - isConnected types.NodeConnectedMap, + isLikelyConnected *xsync.MapOf[types.NodeID, bool], ) ([]types.NodeID, error) { return Write(hsdb.DB, func(tx *gorm.DB) ([]types.NodeID, error) { - return DeleteRoute(tx, id, isConnected) + return DeleteRoute(tx, id, isLikelyConnected) }) } func DeleteRoute( tx *gorm.DB, id uint64, - isConnected types.NodeConnectedMap, + isLikelyConnected *xsync.MapOf[types.NodeID, bool], ) ([]types.NodeID, error) { route, err := GetRoute(tx, id) if err != nil { @@ -207,7 +208,7 @@ func DeleteRoute( // https://github.com/juanfont/headscale/issues/804#issuecomment-1399314002 var update []types.NodeID if !route.IsExitRoute() { - update, err = failoverRouteTx(tx, isConnected, route) + update, err = failoverRouteTx(tx, isLikelyConnected, route) if err != nil { return nil, nil } @@ -252,7 +253,7 @@ func DeleteRoute( return update, nil } -func deleteNodeRoutes(tx *gorm.DB, node *types.Node, isConnected types.NodeConnectedMap) ([]types.NodeID, error) { +func deleteNodeRoutes(tx *gorm.DB, node *types.Node, isLikelyConnected *xsync.MapOf[types.NodeID, bool]) ([]types.NodeID, error) { routes, err := GetNodeRoutes(tx, node) if err != nil { return nil, fmt.Errorf("getting node routes: %w", err) @@ -266,7 +267,7 @@ func deleteNodeRoutes(tx *gorm.DB, node *types.Node, isConnected types.NodeConne // TODO(kradalby): This is a bit too aggressive, we could probably // figure out which routes needs to be failed over rather than all. - chn, err := failoverRouteTx(tx, isConnected, &routes[i]) + chn, err := failoverRouteTx(tx, isLikelyConnected, &routes[i]) if err != nil { return changed, fmt.Errorf("failing over route after delete: %w", err) } @@ -409,7 +410,7 @@ func SaveNodeRoutes(tx *gorm.DB, node *types.Node) (bool, error) { // If needed, the failover will be attempted. func FailoverNodeRoutesIfNeccessary( tx *gorm.DB, - isConnected types.NodeConnectedMap, + isLikelyConnected *xsync.MapOf[types.NodeID, bool], node *types.Node, ) (*types.StateUpdate, error) { nodeRoutes, err := GetNodeRoutes(tx, node) @@ -430,12 +431,12 @@ nodeRouteLoop: if route.IsPrimary { // if we have a primary route, and the node is connected // nothing needs to be done. - if conn, ok := isConnected[route.Node.ID]; conn && ok { + if val, ok := isLikelyConnected.Load(route.Node.ID); ok && val { continue nodeRouteLoop } // if not, we need to failover the route - failover := failoverRoute(isConnected, &route, routes) + failover := failoverRoute(isLikelyConnected, &route, routes) if failover != nil { err := failover.save(tx) if err != nil { @@ -477,7 +478,7 @@ nodeRouteLoop: // If the given route was not primary, it returns early. func failoverRouteTx( tx *gorm.DB, - isConnected types.NodeConnectedMap, + isLikelyConnected *xsync.MapOf[types.NodeID, bool], r *types.Route, ) ([]types.NodeID, error) { if r == nil { @@ -500,7 +501,7 @@ func failoverRouteTx( return nil, fmt.Errorf("getting routes by prefix: %w", err) } - fo := failoverRoute(isConnected, r, routes) + fo := failoverRoute(isLikelyConnected, r, routes) if fo == nil { return nil, nil } @@ -538,7 +539,7 @@ func (f *failover) save(tx *gorm.DB) error { } func failoverRoute( - isConnected types.NodeConnectedMap, + isLikelyConnected *xsync.MapOf[types.NodeID, bool], routeToReplace *types.Route, altRoutes types.Routes, @@ -570,9 +571,11 @@ func failoverRoute( continue } - if isConnected != nil && isConnected[route.Node.ID] { - newPrimary = &altRoutes[idx] - break + if isLikelyConnected != nil { + if val, ok := isLikelyConnected.Load(route.Node.ID); ok && val { + newPrimary = &altRoutes[idx] + break + } } } diff --git a/hscontrol/db/routes_test.go b/hscontrol/db/routes_test.go index 453a7503d9..02342ca2fd 100644 --- a/hscontrol/db/routes_test.go +++ b/hscontrol/db/routes_test.go @@ -10,11 +10,22 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/juanfont/headscale/hscontrol/types" "github.com/juanfont/headscale/hscontrol/util" + "github.com/puzpuzpuz/xsync/v3" "gopkg.in/check.v1" "gorm.io/gorm" "tailscale.com/tailcfg" ) +var smap = func(m map[types.NodeID]bool) *xsync.MapOf[types.NodeID, bool] { + s := xsync.NewMapOf[types.NodeID, bool]() + + for k, v := range m { + s.Store(k, v) + } + + return s +} + func (s *Suite) TestGetRoutes(c *check.C) { user, err := db.CreateUser("test") c.Assert(err, check.IsNil) @@ -331,7 +342,7 @@ func TestFailoverNodeRoutesIfNeccessary(t *testing.T) { name string nodes types.Nodes routes types.Routes - isConnected []types.NodeConnectedMap + isConnected []map[types.NodeID]bool want []*types.StateUpdate wantErr bool }{ @@ -346,7 +357,7 @@ func TestFailoverNodeRoutesIfNeccessary(t *testing.T) { r(1, 1, ipp("10.0.0.0/24"), true, true), r(2, 2, ipp("10.0.0.0/24"), true, false), }, - isConnected: []types.NodeConnectedMap{ + isConnected: []map[types.NodeID]bool{ // n1 goes down { 1: false, @@ -384,7 +395,7 @@ func TestFailoverNodeRoutesIfNeccessary(t *testing.T) { r(1, 1, ipp("10.0.0.0/24"), true, true), r(2, 2, ipp("10.0.0.0/24"), true, false), }, - isConnected: []types.NodeConnectedMap{ + isConnected: []map[types.NodeID]bool{ // n1 up recon = noop { 1: true, @@ -428,7 +439,7 @@ func TestFailoverNodeRoutesIfNeccessary(t *testing.T) { r(2, 2, ipp("10.0.0.0/24"), true, false), r(3, 3, ipp("10.0.0.0/24"), true, false), }, - isConnected: []types.NodeConnectedMap{ + isConnected: []map[types.NodeID]bool{ // n1 goes down { 1: false, @@ -486,7 +497,7 @@ func TestFailoverNodeRoutesIfNeccessary(t *testing.T) { r(2, 2, ipp("10.0.0.0/24"), false, false), r(3, 3, ipp("10.0.0.0/24"), true, false), }, - isConnected: []types.NodeConnectedMap{ + isConnected: []map[types.NodeID]bool{ // n1 goes down { 1: false, @@ -516,7 +527,7 @@ func TestFailoverNodeRoutesIfNeccessary(t *testing.T) { r(2, 2, ipp("10.0.0.0/24"), true, false), r(3, 3, ipp("10.1.0.0/24"), true, false), }, - isConnected: []types.NodeConnectedMap{ + isConnected: []map[types.NodeID]bool{ // n1 goes down { 1: false, @@ -539,7 +550,7 @@ func TestFailoverNodeRoutesIfNeccessary(t *testing.T) { r(2, 2, ipp("10.0.0.0/24"), true, false), r(3, 3, ipp("10.1.0.0/24"), false, false), }, - isConnected: []types.NodeConnectedMap{ + isConnected: []map[types.NodeID]bool{ // n1 goes down { 1: false, @@ -562,7 +573,7 @@ func TestFailoverNodeRoutesIfNeccessary(t *testing.T) { r(2, 2, ipp("10.0.0.0/24"), true, false), r(3, 3, ipp("10.1.0.0/24"), true, false), }, - isConnected: []types.NodeConnectedMap{ + isConnected: []map[types.NodeID]bool{ // n1 goes down { 1: false, @@ -585,7 +596,7 @@ func TestFailoverNodeRoutesIfNeccessary(t *testing.T) { r(2, 2, ipp("10.0.0.0/24"), true, true), r(3, 3, ipp("10.1.0.0/24"), true, false), }, - isConnected: []types.NodeConnectedMap{ + isConnected: []map[types.NodeID]bool{ // n1 goes down { 1: true, @@ -618,7 +629,7 @@ func TestFailoverNodeRoutesIfNeccessary(t *testing.T) { want := tt.want[step] got, err := Write(db.DB, func(tx *gorm.DB) (*types.StateUpdate, error) { - return FailoverNodeRoutesIfNeccessary(tx, isConnected, node) + return FailoverNodeRoutesIfNeccessary(tx, smap(isConnected), node) }) if (err != nil) != tt.wantErr { @@ -640,7 +651,7 @@ func TestFailoverRouteTx(t *testing.T) { name string failingRoute types.Route routes types.Routes - isConnected types.NodeConnectedMap + isConnected map[types.NodeID]bool want []types.NodeID wantErr bool }{ @@ -743,7 +754,7 @@ func TestFailoverRouteTx(t *testing.T) { Enabled: true, }, }, - isConnected: types.NodeConnectedMap{ + isConnected: map[types.NodeID]bool{ 1: false, 2: true, }, @@ -841,7 +852,7 @@ func TestFailoverRouteTx(t *testing.T) { Enabled: true, }, }, - isConnected: types.NodeConnectedMap{ + isConnected: map[types.NodeID]bool{ 1: true, 2: true, 3: true, @@ -889,7 +900,7 @@ func TestFailoverRouteTx(t *testing.T) { Enabled: true, }, }, - isConnected: types.NodeConnectedMap{ + isConnected: map[types.NodeID]bool{ 1: true, 4: false, }, @@ -945,7 +956,7 @@ func TestFailoverRouteTx(t *testing.T) { Enabled: true, }, }, - isConnected: types.NodeConnectedMap{ + isConnected: map[types.NodeID]bool{ 1: false, 2: true, 4: false, @@ -1010,7 +1021,7 @@ func TestFailoverRouteTx(t *testing.T) { } got, err := Write(db.DB, func(tx *gorm.DB) ([]types.NodeID, error) { - return failoverRouteTx(tx, tt.isConnected, &tt.failingRoute) + return failoverRouteTx(tx, smap(tt.isConnected), &tt.failingRoute) }) if (err != nil) != tt.wantErr { @@ -1048,7 +1059,7 @@ func TestFailoverRoute(t *testing.T) { name string failingRoute types.Route routes types.Routes - isConnected types.NodeConnectedMap + isConnected map[types.NodeID]bool want *failover }{ { @@ -1085,7 +1096,7 @@ func TestFailoverRoute(t *testing.T) { r(1, 1, ipp("10.0.0.0/24"), true, true), r(2, 2, ipp("10.0.0.0/24"), true, false), }, - isConnected: types.NodeConnectedMap{ + isConnected: map[types.NodeID]bool{ 1: false, 2: true, }, @@ -1111,7 +1122,7 @@ func TestFailoverRoute(t *testing.T) { r(2, 2, ipp("10.0.0.0/24"), true, true), r(3, 3, ipp("10.0.0.0/24"), true, false), }, - isConnected: types.NodeConnectedMap{ + isConnected: map[types.NodeID]bool{ 1: true, 2: true, 3: true, @@ -1128,7 +1139,7 @@ func TestFailoverRoute(t *testing.T) { r(1, 1, ipp("10.0.0.0/24"), true, true), r(2, 4, ipp("10.0.0.0/24"), true, false), }, - isConnected: types.NodeConnectedMap{ + isConnected: map[types.NodeID]bool{ 1: true, 4: false, }, @@ -1142,7 +1153,7 @@ func TestFailoverRoute(t *testing.T) { r(2, 4, ipp("10.0.0.0/24"), true, false), r(3, 2, ipp("10.0.0.0/24"), true, false), }, - isConnected: types.NodeConnectedMap{ + isConnected: map[types.NodeID]bool{ 1: false, 2: true, 4: false, @@ -1172,7 +1183,7 @@ func TestFailoverRoute(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotf := failoverRoute(tt.isConnected, &tt.failingRoute, tt.routes) + gotf := failoverRoute(smap(tt.isConnected), &tt.failingRoute, tt.routes) if tt.want == nil && gotf != nil { t.Fatalf("expected nil, got %+v", gotf) diff --git a/hscontrol/grpcv1.go b/hscontrol/grpcv1.go index 4310a617a8..41be5e9dcd 100644 --- a/hscontrol/grpcv1.go +++ b/hscontrol/grpcv1.go @@ -301,7 +301,7 @@ func (api headscaleV1APIServer) DeleteNode( changedNodes, err := api.h.db.DeleteNode( node, - api.h.nodeNotifier.ConnectedMap(), + api.h.nodeNotifier.LikelyConnectedMap(), ) if err != nil { return nil, err @@ -401,7 +401,7 @@ func (api headscaleV1APIServer) ListNodes( ctx context.Context, request *v1.ListNodesRequest, ) (*v1.ListNodesResponse, error) { - isConnected := api.h.nodeNotifier.ConnectedMap() + isLikelyConnected := api.h.nodeNotifier.LikelyConnectedMap() if request.GetUser() != "" { nodes, err := db.Read(api.h.db.DB, func(rx *gorm.DB) (types.Nodes, error) { return db.ListNodesByUser(rx, request.GetUser()) @@ -416,7 +416,9 @@ func (api headscaleV1APIServer) ListNodes( // Populate the online field based on // currently connected nodes. - resp.Online = isConnected[node.ID] + if val, ok := isLikelyConnected.Load(node.ID); ok && val { + resp.Online = true + } response[index] = resp } @@ -439,7 +441,9 @@ func (api headscaleV1APIServer) ListNodes( // Populate the online field based on // currently connected nodes. - resp.Online = isConnected[node.ID] + if val, ok := isLikelyConnected.Load(node.ID); ok && val { + resp.Online = true + } validTags, invalidTags := api.h.ACLPolicy.TagsOfNode( node, @@ -528,7 +532,7 @@ func (api headscaleV1APIServer) DisableRoute( request *v1.DisableRouteRequest, ) (*v1.DisableRouteResponse, error) { update, err := db.Write(api.h.db.DB, func(tx *gorm.DB) ([]types.NodeID, error) { - return db.DisableRoute(tx, request.GetRouteId(), api.h.nodeNotifier.ConnectedMap()) + return db.DisableRoute(tx, request.GetRouteId(), api.h.nodeNotifier.LikelyConnectedMap()) }) if err != nil { return nil, err @@ -568,7 +572,7 @@ func (api headscaleV1APIServer) DeleteRoute( ctx context.Context, request *v1.DeleteRouteRequest, ) (*v1.DeleteRouteResponse, error) { - isConnected := api.h.nodeNotifier.ConnectedMap() + isConnected := api.h.nodeNotifier.LikelyConnectedMap() update, err := db.Write(api.h.db.DB, func(tx *gorm.DB) ([]types.NodeID, error) { return db.DeleteRoute(tx, request.GetRouteId(), isConnected) }) diff --git a/hscontrol/mapper/mapper.go b/hscontrol/mapper/mapper.go index fe8af4d363..d4f4392a2e 100644 --- a/hscontrol/mapper/mapper.go +++ b/hscontrol/mapper/mapper.go @@ -17,6 +17,7 @@ import ( mapset "github.com/deckarep/golang-set/v2" "github.com/juanfont/headscale/hscontrol/db" + "github.com/juanfont/headscale/hscontrol/notifier" "github.com/juanfont/headscale/hscontrol/policy" "github.com/juanfont/headscale/hscontrol/types" "github.com/juanfont/headscale/hscontrol/util" @@ -51,10 +52,10 @@ var debugDumpMapResponsePath = envknob.String("HEADSCALE_DEBUG_DUMP_MAPRESPONSE_ type Mapper struct { // Configuration // TODO(kradalby): figure out if this is the format we want this in - db *db.HSDatabase - cfg *types.Config - derpMap *tailcfg.DERPMap - isLikelyConnected types.NodeConnectedMap + db *db.HSDatabase + cfg *types.Config + derpMap *tailcfg.DERPMap + notif *notifier.Notifier uid string created time.Time @@ -70,15 +71,15 @@ func NewMapper( db *db.HSDatabase, cfg *types.Config, derpMap *tailcfg.DERPMap, - isLikelyConnected types.NodeConnectedMap, + notif *notifier.Notifier, ) *Mapper { uid, _ := util.GenerateRandomStringDNSSafe(mapperIDLength) return &Mapper{ - db: db, - cfg: cfg, - derpMap: derpMap, - isLikelyConnected: isLikelyConnected, + db: db, + cfg: cfg, + derpMap: derpMap, + notif: notif, uid: uid, created: time.Now(), @@ -517,7 +518,7 @@ func (m *Mapper) ListPeers(nodeID types.NodeID) (types.Nodes, error) { } for _, peer := range peers { - online := m.isLikelyConnected[peer.ID] + online := m.notif.IsLikelyConnected(peer.ID) peer.IsOnline = &online } diff --git a/hscontrol/metrics.go b/hscontrol/metrics.go index 5547bca262..c00b8e4485 100644 --- a/hscontrol/metrics.go +++ b/hscontrol/metrics.go @@ -37,6 +37,16 @@ var ( Name: "mapresponse_readonly_requests_total", Help: "total count of readonly requests received", }, []string{"status"}) + mapResponseSessions = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: prometheusNamespace, + Name: "mapresponse_current_sessions_total", + Help: "total count open map response sessions", + }) + mapResponseRejected = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: prometheusNamespace, + Name: "mapresponse_rejected_new_sessions_total", + Help: "total count of new mapsessions rejected", + }, []string{"reason"}) httpDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ Namespace: prometheusNamespace, Name: "http_duration_seconds", diff --git a/hscontrol/noise.go b/hscontrol/noise.go index e4e54e7ae2..9ddf2c859e 100644 --- a/hscontrol/noise.go +++ b/hscontrol/noise.go @@ -268,10 +268,12 @@ func (ns *noiseServer) NoisePollNetMapHandler( defer ns.headscale.mapSessionMu.Unlock() sess.infof("node has an open stream(%p), rejecting new stream", sess) + mapResponseRejected.WithLabelValues("exists").Inc() return } ns.headscale.mapSessions[node.ID] = sess + mapResponseSessions.Inc() ns.headscale.mapSessionMu.Unlock() sess.tracef("releasing lock to check stream") } @@ -284,6 +286,7 @@ func (ns *noiseServer) NoisePollNetMapHandler( defer ns.headscale.mapSessionMu.Unlock() delete(ns.headscale.mapSessions, node.ID) + mapResponseSessions.Dec() sess.tracef("releasing lock to remove stream") } diff --git a/hscontrol/notifier/metrics.go b/hscontrol/notifier/metrics.go index 12952dbb0b..c461d3799c 100644 --- a/hscontrol/notifier/metrics.go +++ b/hscontrol/notifier/metrics.go @@ -19,4 +19,9 @@ var ( Name: "notifier_update_sent_total", Help: "total count of update sent on nodes channel", }, []string{"status", "type"}) + notifierNodeUpdateChans = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: prometheusNamespace, + Name: "notifier_open_channels_total", + Help: "total count open channels in notifier", + }) ) diff --git a/hscontrol/notifier/notifier.go b/hscontrol/notifier/notifier.go index c2ffb10e6b..4ad5872363 100644 --- a/hscontrol/notifier/notifier.go +++ b/hscontrol/notifier/notifier.go @@ -9,19 +9,20 @@ import ( "time" "github.com/juanfont/headscale/hscontrol/types" + "github.com/puzpuzpuz/xsync/v3" "github.com/rs/zerolog/log" ) type Notifier struct { l sync.RWMutex nodes map[types.NodeID]chan<- types.StateUpdate - connected types.NodeConnectedMap + connected *xsync.MapOf[types.NodeID, bool] } func NewNotifier() *Notifier { return &Notifier{ nodes: make(map[types.NodeID]chan<- types.StateUpdate), - connected: make(types.NodeConnectedMap), + connected: xsync.NewMapOf[types.NodeID, bool](), } } @@ -38,12 +39,13 @@ func (n *Notifier) AddNode(nodeID types.NodeID, c chan<- types.StateUpdate) { notifierWaitForLock.WithLabelValues("add").Observe(time.Since(start).Seconds()) n.nodes[nodeID] = c - n.connected[nodeID] = true + n.connected.Store(nodeID, true) log.Trace(). Uint64("node.id", nodeID.Uint64()). Int("open_chans", len(n.nodes)). Msg("Added new channel") + notifierNodeUpdateChans.Inc() } func (n *Notifier) RemoveNode(nodeID types.NodeID) { @@ -63,12 +65,13 @@ func (n *Notifier) RemoveNode(nodeID types.NodeID) { } delete(n.nodes, nodeID) - n.connected[nodeID] = false + n.connected.Store(nodeID, false) log.Trace(). Uint64("node.id", nodeID.Uint64()). Int("open_chans", len(n.nodes)). Msg("Removed channel") + notifierNodeUpdateChans.Dec() } // IsConnected reports if a node is connected to headscale and has a @@ -77,17 +80,22 @@ func (n *Notifier) IsConnected(nodeID types.NodeID) bool { n.l.RLock() defer n.l.RUnlock() - return n.connected[nodeID] + if val, ok := n.connected.Load(nodeID); ok { + return val + } + return false } // IsLikelyConnected reports if a node is connected to headscale and has a // poll session open, but doesnt lock, so might be wrong. func (n *Notifier) IsLikelyConnected(nodeID types.NodeID) bool { - return n.connected[nodeID] + if val, ok := n.connected.Load(nodeID); ok { + return val + } + return false } -// TODO(kradalby): This returns a pointer and can be dangerous. -func (n *Notifier) ConnectedMap() types.NodeConnectedMap { +func (n *Notifier) LikelyConnectedMap() *xsync.MapOf[types.NodeID, bool] { return n.connected } @@ -162,9 +170,10 @@ func (n *Notifier) String() string { b.WriteString("\n") b.WriteString("connected:\n") - for k, v := range n.connected { + n.connected.Range(func(k types.NodeID, v bool) bool { fmt.Fprintf(&b, "\t%d: %t\n", k, v) - } + return true + }) return b.String() } diff --git a/hscontrol/poll.go b/hscontrol/poll.go index 4a1d13ad66..b903f122f1 100644 --- a/hscontrol/poll.go +++ b/hscontrol/poll.go @@ -64,7 +64,7 @@ func (h *Headscale) newMapSession( w http.ResponseWriter, node *types.Node, ) *mapSession { - warnf, tracef, infof, errf := logPollFunc(req, node) + warnf, infof, tracef, errf := logPollFunc(req, node) // Use a buffered channel in case a node is not fully ready // to receive a message to make sure we dont block the entire @@ -437,7 +437,7 @@ func (m *mapSession) serve() { func (m *mapSession) pollFailoverRoutes(where string, node *types.Node) { update, err := db.Write(m.h.db.DB, func(tx *gorm.DB) (*types.StateUpdate, error) { - return db.FailoverNodeRoutesIfNeccessary(tx, m.h.nodeNotifier.ConnectedMap(), node) + return db.FailoverNodeRoutesIfNeccessary(tx, m.h.nodeNotifier.LikelyConnectedMap(), node) }) if err != nil { m.errf(err, fmt.Sprintf("failed to ensure failover routes, %s", where)) diff --git a/hscontrol/types/node.go b/hscontrol/types/node.go index 0e30bd9e81..7f28592428 100644 --- a/hscontrol/types/node.go +++ b/hscontrol/types/node.go @@ -28,7 +28,8 @@ var ( ) type NodeID uint64 -type NodeConnectedMap map[NodeID]bool + +// type NodeConnectedMap *xsync.MapOf[NodeID, bool] func (id NodeID) StableID() tailcfg.StableNodeID { return tailcfg.StableNodeID(strconv.FormatUint(uint64(id), util.Base10))