diff --git a/pkg/scheduler/health_checker_test.go b/pkg/scheduler/health_checker_test.go
index 02402fc9f..e4017828e 100644
--- a/pkg/scheduler/health_checker_test.go
+++ b/pkg/scheduler/health_checker_test.go
@@ -230,7 +230,7 @@ func TestGetSchedulerHealthStatusContext(t *testing.T) {
 	assert.Assert(t, healthInfo.HealthChecks[8].Succeeded, "The orphan allocation check on the app still fails after removing the app")
 
 	// check that foreign allocation does not interfere with health check
-	falloc := newForeignAllocation("foreign-1", "node")
+	falloc := newForeignAllocation("foreign-1", "node", resources.Zero)
 	node.AddAllocation(falloc)
 	healthInfo = GetSchedulerHealthStatus(schedulerMetrics, schedulerContext)
 	assert.Assert(t, healthInfo.HealthChecks[7].Succeeded, "Foreign allocation was detected as orphan")
diff --git a/pkg/scheduler/objects/foreign_allocations.go b/pkg/scheduler/objects/foreign_allocations.go
new file mode 100644
index 000000000..d4bc841ab
--- /dev/null
+++ b/pkg/scheduler/objects/foreign_allocations.go
@@ -0,0 +1,16 @@
+package objects
+
+// ForeignAllocationsHandler holds foreign (non-Yunikorn) allocations together with their node assignment.
+// NOTE(review): nothing else in this change references the type yet - confirm it is still needed.
+type ForeignAllocationsHandler struct {
+	allocToNode map[string]string      // allocKey-nodeID assignment of non-Yunikorn allocations
+	allocations map[string]*Allocation // presumably keyed by allocation key as well - TODO confirm
+}
+
+// NewForeignAllocationsHandler returns a handler whose maps are initialised and empty.
+func NewForeignAllocationsHandler() *ForeignAllocationsHandler {
+	return &ForeignAllocationsHandler{
+		allocations: make(map[string]*Allocation),
+		allocToNode: make(map[string]string),
+	}
+}
diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go
index db51fa746..3a5c7d999 100644
--- a/pkg/scheduler/objects/node.go
+++ b/pkg/scheduler/objects/node.go
@@ -364,6 +364,41 @@ func (sn *Node) AddAllocation(alloc *Allocation) {
 	_ = sn.addAllocationInternal(alloc, true)
 }
 
+// UpdateForeignAllocation replaces the foreign allocation stored under the allocation key of alloc
+// and re-calculates the available/occupied resources using the difference between the new and the
+// previous allocated resource. An allocation that is not yet known to the node is stored as well and
+// is accounted in full, so the node never tracks an allocation whose usage is missing from the occupied
+// resource. Returns the previous allocation, or nil if alloc is nil or no previous allocation existed.
+func (sn *Node) UpdateForeignAllocation(alloc *Allocation) *Allocation {
+	// same guard as addAllocationInternal: nothing to do for a nil allocation
+	if alloc == nil {
+		return nil
+	}
+	sn.Lock()
+	defer sn.Unlock()
+	key := alloc.GetAllocationKey()
+	existing := sn.allocations[key]
+	sn.allocations[key] = alloc
+
+	// an unknown allocation previously used nothing: the whole new resource is the delta
+	existingResource := resources.Zero
+	if existing == nil {
+		log.Log(log.SchedNode).Debug("unknown allocation to update",
+			zap.String("allocationKey", key))
+	} else {
+		existingResource = existing.GetAllocatedResource().Clone()
+	}
+	newResource := alloc.GetAllocatedResource().Clone()
+	delta := resources.Sub(newResource, existingResource)
+	delta.Prune()
+
+	sn.occupiedResource.AddTo(delta)
+	sn.occupiedResource.Prune()
+	sn.refreshAvailableResource()
+
+	return existing
+}
+
 func (sn *Node) addAllocationInternal(alloc *Allocation, force bool) bool {
 	if alloc == nil {
 		return false
diff --git a/pkg/scheduler/objects/node_test.go b/pkg/scheduler/objects/node_test.go
index 4b38e4360..fb0863715 100644
--- a/pkg/scheduler/objects/node_test.go
+++ b/pkg/scheduler/objects/node_test.go
@@ -963,3 +963,34 @@ func TestGetAllocations(t *testing.T) {
 		assert.Assert(t, m[foreignAlloc2])
 	})
 }
+
+func TestUpdateForeignAllocation(t *testing.T) {
+	node := newNode(nodeID1, map[string]resources.Quantity{"first": 100, "second": 200})
+	allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "second": 20})
+	alloc := newForeignAllocation(foreignAlloc1, nodeID1, allocRes)
+	node.AddAllocation(alloc)
+
+	// update existing allocation
+	updatedRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15, "second": 0})
+	allocUpd := newForeignAllocation(foreignAlloc1, nodeID1, updatedRes)
+	prev := node.UpdateForeignAllocation(allocUpd)
+	expectedOccupied := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15})
+	expectedAvailable := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 85, "second": 200})
+	assert.Assert(t, prev == alloc, "returned previous allocation is different")
+	assert.Assert(t, resources.Equals(node.GetOccupiedResource(), expectedOccupied), "occupied resource has been updated incorrectly")
+	assert.Assert(t, resources.Equals(node.GetAllocatedResource(), resources.Zero), "allocated resource has changed")
+	assert.Assert(t, resources.Equals(node.GetAvailableResource(), expectedAvailable), "available resource has been updated incorrectly")
+
+	// update non-existing allocation: it is stored and its resources are accounted in full
+	alloc2 := newForeignAllocation(foreignAlloc2, nodeID1, allocRes)
+	prev = node.UpdateForeignAllocation(alloc2)
+	expectedOccupied = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 25, "second": 20})
+	expectedAvailable = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 75, "second": 180})
+	assert.Assert(t, prev == nil, "unexpected previous allocation returned")
+	assert.Assert(t, node.GetAllocation(foreignAlloc2) == alloc2, "foreign allocation not found")
+	assert.Assert(t, resources.Equals(node.GetOccupiedResource(), expectedOccupied), "occupied resource has not been updated for the unknown allocation")
+	assert.Assert(t, resources.Equals(node.GetAvailableResource(), expectedAvailable), "available resource has not been updated for the unknown allocation")
+
+	// a nil allocation is ignored
+	assert.Assert(t, node.UpdateForeignAllocation(nil) == nil, "nil allocation must not return a previous allocation")
+}
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index b3d72ba06..b05551650 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -65,7 +65,8 @@ type PartitionContext struct {
 	reservations           int                             // number of reservations
 	placeholderAllocations int                             // number of placeholder allocations
 	preemptionEnabled      bool                            // whether preemption is enabled or not
-	foreignAllocs          map[string]string               // allocKey-nodeID assignment of non-Yunikorn allocations
+	foreignAllocs          map[string]*objects.Allocation  // foreign (non-Yunikorn) allocations
+	foreignNodes           map[string]string               // allocKey-nodeID assignment of non-Yunikorn allocations
 
 	// The partition write lock must not be held while manipulating an application.
 	// Scheduling is running continuously as a lock free background task. Scheduling an application
@@ -95,7 +96,8 @@ func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterC
 		applications:          make(map[string]*objects.Application),
 		completedApplications: make(map[string]*objects.Application),
 		nodes:                 objects.NewNodeCollection(conf.Name),
-		foreignAllocs:         make(map[string]string),
+		foreignAllocs:         make(map[string]*objects.Allocation),
+		foreignNodes:          make(map[string]string),
 	}
 	pc.partitionManager = newPartitionManager(pc, cc)
 	if err := pc.initialPartitionFromConfig(conf); err != nil {
@@ -1310,6 +1312,7 @@ func (pc *PartitionContext) handleForeignAllocation(allocationKey, applicationID
 			zap.String("nodeID", nodeID),
 			zap.String("allocationKey", allocationKey))
 		node.AddAllocation(alloc)
+		pc.foreignAllocs[allocationKey] = alloc
 		return false, true, nil
 	}
 
@@ -1317,7 +1320,12 @@ func (pc *PartitionContext) handleForeignAllocation(allocationKey, applicationID
 		zap.String("partitionName", pc.Name),
 		zap.String("appID", applicationID),
 		zap.String("allocationKey", allocationKey))
-	// this is a placeholder for eventual resource updates; nothing to do yet
+	prev := node.UpdateForeignAllocation(alloc)
+	if prev == nil {
+		log.Log(log.SchedPartition).Warn("BUG: previous allocation not found during update",
+			zap.String("allocationKey", allocationKey))
+	}
+
 	return false, false, nil
 }
 
@@ -1331,11 +1339,11 @@ func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation, forced bool
 func (pc *PartitionContext) getOrSetNodeIDForAlloc(allocKey, nodeID string) string {
 	pc.Lock()
 	defer pc.Unlock()
-	id := pc.foreignAllocs[allocKey]
+	id := pc.foreignNodes[allocKey]
 	if id != "" {
 		return id
 	}
-	pc.foreignAllocs[allocKey] = nodeID
+	pc.foreignNodes[allocKey] = nodeID
 	return ""
 }
 
@@ -1548,14 +1556,22 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
 }
 
 func (pc *PartitionContext) removeForeignAllocation(allocID string) {
-	nodeID := pc.foreignAllocs[allocID]
-	if nodeID == "" {
+	alloc := pc.foreignAllocs[allocID]
+	delete(pc.foreignAllocs, allocID)
+	if alloc == nil {
 		log.Log(log.SchedPartition).Debug("Tried to remove a non-existing foreign allocation",
+			zap.String("allocationID", allocID))
+	}
+
+	nodeID := pc.foreignNodes[allocID]
+	delete(pc.foreignNodes, allocID)
+	if nodeID == "" {
+		log.Log(log.SchedPartition).Debug("Assigned node not found for foreign allocation",
 			zap.String("allocationID", allocID),
 			zap.String("nodeID", nodeID))
 		return
 	}
-	delete(pc.foreignAllocs, allocID)
+
 	node := pc.GetNode(nodeID)
 	if node == nil {
 		log.Log(log.SchedPartition).Debug("Node not found for foreign allocation",
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 8a16d34dc..e8d29519a 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -4705,32 +4705,61 @@ func TestForeignAllocation(t *testing.T) {
 	assert.Assert(t, !reqCreated)
 	assert.Assert(t, !allocCreated)
 	assert.Error(t, err, "trying to add a foreign request (non-allocation) foreign-nonalloc")
+	assert.Equal(t, 0, len(partition.foreignAllocs))
+	assert.Equal(t, 0, len(partition.foreignNodes))
 
 	// error: empty node ID
-	req = newForeignAllocation(foreignAlloc1, "")
+	req = newForeignAllocation(foreignAlloc1, "", resources.Zero)
 	reqCreated, allocCreated, err = partition.UpdateAllocation(req)
 	assert.Assert(t, !reqCreated)
 	assert.Assert(t, !allocCreated)
 	assert.Error(t, err, "node ID is empty for allocation foreign-alloc-1")
+	assert.Equal(t, 0, len(partition.foreignAllocs))
+	assert.Equal(t, 0, len(partition.foreignNodes))
 
 	// error: no node found
-	req = newForeignAllocation(foreignAlloc1, nodeID2)
+	req = newForeignAllocation(foreignAlloc1, nodeID2, resources.Zero)
 	reqCreated, allocCreated, err = partition.UpdateAllocation(req)
 	assert.Assert(t, !reqCreated)
 	assert.Assert(t, !allocCreated)
 	assert.Error(t, err, "failed to find node node-2 for allocation foreign-alloc-1")
 	assert.Equal(t, 0, len(partition.foreignAllocs))
+	assert.Equal(t, 0, len(partition.foreignNodes))
 
 	// add new allocation
-	req = newForeignAllocation(foreignAlloc1, nodeID1)
+	allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
+	req = newForeignAllocation(foreignAlloc1, nodeID1, allocRes)
 	reqCreated, allocCreated, err = partition.UpdateAllocation(req)
 	assert.Assert(t, !reqCreated)
 	assert.Assert(t, allocCreated)
 	assert.NilError(t, err)
 	assert.Equal(t, 1, len(partition.foreignAllocs))
-	assert.Equal(t, nodeID1, partition.foreignAllocs[foreignAlloc1])
+	assert.Equal(t, 1, len(partition.foreignNodes))
+	assert.Equal(t, nodeID1, partition.foreignNodes[foreignAlloc1])
 	assert.Equal(t, 0, len(node1.GetYunikornAllocations()))
 	assert.Assert(t, node1.GetAllocation(foreignAlloc1) != nil)
+	occupied := node1.GetOccupiedResource().Clone()
+	available := node1.GetAvailableResource().Clone()
+	allocated := node1.GetAllocatedResource().Clone()
+	assert.Assert(t, resources.Equals(occupied, allocRes), "occupied resources has been calculated incorrectly")
+	assert.Assert(t, resources.Equals(allocated, resources.Zero), "allocated resources has changed")
+	expectedAvailable := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9})
+	assert.Assert(t, resources.Equals(available, expectedAvailable), "available resources has been calculated incorrectly")
+
+	// update resources
+	updatedRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2})
+	update := newForeignAllocation(foreignAlloc1, nodeID1, updatedRes)
+	reqCreated, allocCreated, err = partition.UpdateAllocation(update)
+	assert.Assert(t, !reqCreated)
+	assert.Assert(t, !allocCreated)
+	assert.NilError(t, err)
+	updatedOccupied := node1.GetOccupiedResource().Clone()
+	updatedAvailable := node1.GetAvailableResource().Clone()
+	updatedAllocated := node1.GetAllocatedResource().Clone()
+	assert.Assert(t, resources.Equals(updatedOccupied, updatedRes), "occupied resources has been updated incorrectly")
+	assert.Assert(t, resources.Equals(updatedAllocated, resources.Zero), "allocated resources has changed")
+	expectedAvailable = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8})
+	assert.Assert(t, resources.Equals(updatedAvailable, expectedAvailable), "available resources has been updated incorrectly")
 
 	// remove allocation
 	released, confirmed := partition.removeAllocation(&si.AllocationRelease{
@@ -4739,6 +4768,7 @@ func TestForeignAllocation(t *testing.T) {
 	assert.Assert(t, released == nil)
 	assert.Assert(t, confirmed == nil)
 	assert.Equal(t, 0, len(partition.foreignAllocs))
+	assert.Equal(t, 0, len(partition.foreignNodes))
 	assert.Equal(t, 0, len(node1.GetYunikornAllocations()))
 	assert.Assert(t, node1.GetAllocation(foreignAlloc1) == nil)
 }
diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go
index 56845fd9e..0e0be6b7c 100644
--- a/pkg/scheduler/utilities_test.go
+++ b/pkg/scheduler/utilities_test.go
@@ -610,7 +610,7 @@ func newForeignRequest(allocKey string) *objects.Allocation {
 	})
 }
 
-func newForeignAllocation(allocKey, nodeID string) *objects.Allocation {
+func newForeignAllocation(allocKey, nodeID string, allocated *resources.Resource) *objects.Allocation {
 	var alloc *objects.Allocation
 	defer func() {
 		if nodeID == "" {
@@ -626,7 +626,8 @@ func newForeignAllocation(allocKey, nodeID string) *objects.Allocation {
 		AllocationTags: map[string]string{
 			siCommon.Foreign: siCommon.AllocTypeDefault,
 		},
-		NodeID: id,
+		NodeID:           id,
+		ResourcePerAlloc: allocated.ToProto(),
 	})
 	return alloc
 }