From efe9fb2ebb72a5f2b829e0c3cc6f8a5a120e937e Mon Sep 17 00:00:00 2001 From: Artur Troian Date: Wed, 27 Mar 2024 20:26:07 +0100 Subject: [PATCH] fix(inventory): readjust inventory on startup for existing leases Signed-off-by: Artur Troian --- cluster/inventory.go | 20 +- cluster/inventory_test.go | 19 +- .../operators/clients/inventory/client.go | 9 +- .../operators/clients/inventory/inventory.go | 32 +- .../v1beta3/clients/inventory/inventory.go | 965 +++++++++++++----- .../clients/inventory/resourcetypes_test.go | 52 - cluster/types/v1beta3/types.go | 1 + operator/ip/operator_test.go | 2 +- 8 files changed, 755 insertions(+), 345 deletions(-) delete mode 100644 cluster/types/v1beta3/clients/inventory/resourcetypes_test.go diff --git a/cluster/inventory.go b/cluster/inventory.go index f485b181..57f3ec56 100644 --- a/cluster/inventory.go +++ b/cluster/inventory.go @@ -478,6 +478,10 @@ func (is *inventoryService) run(ctx context.Context, reservationsArg []*reservat } var runch <-chan runner.Result + var currinv ctypes.Inventory + + invupch := make(chan ctypes.Inventory, 1) + invch := is.clients.inventory.ResultChan() var reserveChLocal <-chan inventoryRequest @@ -543,6 +547,13 @@ loop: "order", res.OrderID(), "resource-group", res.Resources().GetName(), "allocated", res.allocated) + + if currinv != nil { + select { + case invupch <- currinv: + default: + } + } } break @@ -606,12 +617,19 @@ loop: res: resp, err: err, } - inventoryRequestsCounter.WithLabelValues("status", "success").Inc() + if err == nil { + inventoryRequestsCounter.WithLabelValues("status", "success").Inc() + } else { + inventoryRequestsCounter.WithLabelValues("status", "error").Inc() + } case inv, open := <-invch: if !open { continue } + invupch <- inv + case inv := <-invupch: + currinv = inv.Dup() state.inventory = inv updateIPs() diff --git a/cluster/inventory_test.go b/cluster/inventory_test.go index c4d70f97..8a3da5ff 100644 --- a/cluster/inventory_test.go +++ b/cluster/inventory_test.go @@ -67,7 +67,7 @@ func TestInventory_reservationAllocatable(t *testing.T) { } } - inv := <-cinventory.NewNull("a", "b").ResultChan() + inv := <-cinventory.NewNull(context.Background(), "a", "b").ResultChan() reservations := []*reservation{ mkres(true, mkrg(750, 0, 3*unit.Gi, 1*unit.Gi, 0, 1)), @@ -109,7 +109,7 @@ func TestInventory_ClusterDeploymentNotDeployed(t *testing.T) { ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc)) ctx = context.WithValue(ctx, fromctx.CtxKeyAkashClientSet, aclient.Interface(ac)) - ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull("nodeA")) + ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull(ctx, "nodeA")) inv, err := newInventoryService( ctx, @@ -205,7 +205,7 @@ func TestInventory_ClusterDeploymentDeployed(t *testing.T) { ctx = context.WithValue(ctx, fromctx.CtxKeyPubSub, tpubsub.New(ctx, 1000)) ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc)) ctx = context.WithValue(ctx, fromctx.CtxKeyAkashClientSet, aclient.Interface(ac)) - ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull("nodeA")) + ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull(ctx, "nodeA")) inv, err := newInventoryService( ctx, @@ -419,7 +419,7 @@ func TestInventory_ReserveIPNoIPOperator(t *testing.T) { ctx = context.WithValue(ctx, fromctx.CtxKeyPubSub, tpubsub.New(ctx, 1000)) ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc)) ctx = context.WithValue(ctx, fromctx.CtxKeyAkashClientSet, aclient.Interface(ac)) - ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull("nodeA")) + ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull(ctx, "nodeA")) inv, err := newInventoryService( ctx, @@ -473,7 +473,7 @@ func TestInventory_ReserveIPUnavailableWithIPOperator(t *testing.T) { ctx = context.WithValue(ctx, fromctx.CtxKeyPubSub, tpubsub.New(ctx, 1000)) ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc)) ctx = context.WithValue(ctx, fromctx.CtxKeyAkashClientSet, aclient.Interface(ac)) - ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull("nodeA")) + ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull(ctx, "nodeA")) ctx = context.WithValue(ctx, cfromctx.CtxKeyClientIP, cip.Client(mockIP)) inv, err := newInventoryService( @@ -548,7 +548,7 @@ func TestInventory_ReserveIPAvailableWithIPOperator(t *testing.T) { ctx = context.WithValue(ctx, fromctx.CtxKeyPubSub, tpubsub.New(ctx, 1000)) ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc)) ctx = context.WithValue(ctx, fromctx.CtxKeyAkashClientSet, aclient.Interface(ac)) - ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull("nodeA", "nodeB")) + ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull(ctx, "nodeA", "nodeB")) ctx = context.WithValue(ctx, cfromctx.CtxKeyClientIP, cip.Client(mockIP)) inv, err := newInventoryService( @@ -596,6 +596,7 @@ func TestInventory_ReserveIPAvailableWithIPOperator(t *testing.T) { mockIP.AssertNumberOfCalls(t, "GetIPAddressStatus", 2) } +// following test needs refactoring it reports incorrect inventory func TestInventory_OverReservations(t *testing.T) { scaffold := makeInventoryScaffold(t, 10) defer scaffold.bus.Close() @@ -659,10 +660,12 @@ func TestInventory_OverReservations(t *testing.T) { ac := afake.NewSimpleClientset() ctx, cancel := context.WithCancel(context.Background()) + nullInv := cinventory.NewNull(ctx, "nodeA") + ctx = context.WithValue(ctx, fromctx.CtxKeyPubSub, tpubsub.New(ctx, 1000)) ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc)) ctx = context.WithValue(ctx, fromctx.CtxKeyAkashClientSet, aclient.Interface(ac)) - ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull("nodeA")) + ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, nullInv) inv, err := newInventoryService( ctx, @@ -685,6 +688,8 @@ func TestInventory_OverReservations(t *testing.T) { require.Error(t, err) require.ErrorIs(t, err, ctypes.ErrInsufficientCapacity) + nullInv.Commit(reservation.Resources()) + // Send the event immediately to indicate it was deployed err = scaffold.bus.Publish(event.ClusterDeployment{ LeaseID: lid0, diff --git a/cluster/kube/operators/clients/inventory/client.go b/cluster/kube/operators/clients/inventory/client.go index 9054e028..33d25a88 100644 --- a/cluster/kube/operators/clients/inventory/client.go +++ b/cluster/kube/operators/clients/inventory/client.go @@ -19,8 +19,6 @@ import ( "k8s.io/client-go/tools/portforward" "k8s.io/client-go/transport/spdy" - "github.com/tendermint/tendermint/libs/log" - inventoryV1 "github.com/akash-network/akash-api/go/inventory/v1" kutil "github.com/akash-network/provider/cluster/kube/util" @@ -48,7 +46,6 @@ type client struct { type inventory struct { inventoryV1.Cluster - log log.Logger } type inventoryState struct { @@ -209,8 +206,6 @@ func (cl *client) subscriber(in <-chan inventoryV1.Cluster, out chan<- ctypes.In var msg ctypes.Inventory var och chan<- ctypes.Inventory - ilog := fromctx.LogcFromCtx(cl.ctx).With("inventory.adjuster") - for { select { case <-cl.ctx.Done(): @@ -218,13 +213,13 @@ func (cl *client) subscriber(in <-chan inventoryV1.Cluster, out chan<- ctypes.In case inv := <-in: pending = append(pending, inv) if och == nil { - msg = newInventory(ilog, pending[0]) + msg = newInventory(pending[0]) och = out } case och <- msg: pending = pending[1:] if len(pending) > 0 { - msg = newInventory(ilog, pending[0]) + msg = newInventory(pending[0]) } else { och = nil msg = nil diff --git a/cluster/kube/operators/clients/inventory/inventory.go b/cluster/kube/operators/clients/inventory/inventory.go index a862eb91..62371f97 100644 --- a/cluster/kube/operators/clients/inventory/inventory.go +++ b/cluster/kube/operators/clients/inventory/inventory.go @@ -1,13 +1,10 @@ package inventory import ( - "encoding/json" "fmt" "reflect" "strings" - "github.com/tendermint/tendermint/libs/log" - inventoryV1 "github.com/akash-network/akash-api/go/inventory/v1" dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3" types "github.com/akash-network/akash-api/go/node/types/v1beta3" @@ -20,10 +17,9 @@ import ( var _ ctypes.Inventory = (*inventory)(nil) -func newInventory(log log.Logger, clState inventoryV1.Cluster) *inventory { +func newInventory(clState inventoryV1.Cluster) *inventory { inv := &inventory{ Cluster: clState, - log: log, } return inv @@ -32,12 +28,17 @@ func newInventory(log log.Logger, clState inventoryV1.Cluster) *inventory { func (inv *inventory) dup() inventory { dup := inventory{ Cluster: *inv.Cluster.Dup(), - log: inv.log, } return dup } +func (inv *inventory) Dup() ctypes.Inventory { + dup := inv.dup() + + return &dup +} + // tryAdjust cluster inventory // It returns two boolean values. First indicates if node-wide resources satisfy (true) requirements // Seconds indicates if cluster-wide resources satisfy (true) requirements @@ -84,11 +85,6 @@ func (inv *inventory) tryAdjust(node int, res *types.Resources) (*crd.SchedulerP return nil, false, true } - // if !nd.tryAdjustVolumesAttached(types.NewResourceValue(1)) { - // return nil, false, true - - // } - storageAdjusted := false for idx := range storageClasses { @@ -272,13 +268,6 @@ nodes: // same adjusted resource units as well as cluster params if adjustedGroup { if !reflect.DeepEqual(adjusted, &adjustedResources[i].Resources) { - jFirstAdjusted, _ := json.Marshal(&adjustedResources[i].Resources) - jCurrAdjusted, _ := json.Marshal(adjusted) - - inv.log.Error(fmt.Sprintf("resource mismatch between replicas within group:\n"+ - "\tfirst adjusted replica: %s\n"+ - "\tcurr adjusted replica: %s", string(jFirstAdjusted), string(jCurrAdjusted))) - err = ctypes.ErrGroupResourceMismatch break nodes } @@ -286,13 +275,6 @@ nodes: // all replicas of the same service are expected to have same node selectors and runtimes // if they don't match then provider cannot bid if !reflect.DeepEqual(sparams, cparams[adjusted.ID]) { - jFirstSparams, _ := json.Marshal(cparams[adjusted.ID]) - jCurrSparams, _ := json.Marshal(sparams) - - inv.log.Error(fmt.Sprintf("scheduler params mismatch between replicas within group:\n"+ - "\tfirst replica: %s\n"+ - "\tcurr replica: %s", string(jFirstSparams), string(jCurrSparams))) - err = ctypes.ErrGroupResourceMismatch break nodes } diff --git a/cluster/types/v1beta3/clients/inventory/inventory.go b/cluster/types/v1beta3/clients/inventory/inventory.go index a5a636eb..4b758d40 100644 --- a/cluster/types/v1beta3/clients/inventory/inventory.go +++ b/cluster/types/v1beta3/clients/inventory/inventory.go @@ -1,7 +1,12 @@ package inventory import ( - sdk "github.com/cosmos/cosmos-sdk/types" + "fmt" + "reflect" + "strings" + + "golang.org/x/net/context" + "golang.org/x/sync/errgroup" "k8s.io/apimachinery/pkg/api/resource" inventoryV1 "github.com/akash-network/akash-api/go/inventory/v1" @@ -9,9 +14,8 @@ import ( "github.com/akash-network/akash-api/go/node/types/unit" types "github.com/akash-network/akash-api/go/node/types/v1beta3" - "github.com/akash-network/node/sdl" - ctypes "github.com/akash-network/provider/cluster/types/v1beta3" + crd "github.com/akash-network/provider/pkg/apis/akash.network/v2beta2" ) const ( @@ -29,256 +33,525 @@ type Client interface { ResultChan() <-chan ctypes.Inventory } -type nullInventory struct { - inv *inventory +type NullClient interface { + Client + Commit(dtypes.ResourceGroup) bool +} + +type commitReq struct { + res dtypes.ResourceGroup + resp chan<- struct{} } -type resourcePair struct { - allocatable sdk.Int - allocated sdk.Int +type nullInventory struct { + ctx context.Context + group *errgroup.Group + subch chan chan<- ctypes.Inventory + cmch chan commitReq + cluster inventoryV1.Cluster } -type storageClassState struct { - resourcePair - isDefault bool +type inventory struct { + inventoryV1.Cluster } -func (rp *resourcePair) dup() resourcePair { - return resourcePair{ - allocatable: rp.allocatable.AddRaw(0), - allocated: rp.allocated.AddRaw(0), +var ( + _ Client = (*nullInventory)(nil) + _ ctypes.Inventory = (*inventory)(nil) +) + +func NewNull(ctx context.Context, nodes ...string) NullClient { + group, ctx := errgroup.WithContext(ctx) + + cluster := inventoryV1.Cluster{} + cluster.Storage = append(cluster.Storage, inventoryV1.Storage{ + Quantity: inventoryV1.NewResourcePair(nullClientStorage, nullClientStorage-(10*unit.Gi), resource.DecimalSI), + Info: inventoryV1.StorageInfo{ + Class: "beta2", + }, + }) + + for _, ndName := range nodes { + nd := inventoryV1.Node{ + Name: ndName, + Resources: inventoryV1.NodeResources{ + CPU: inventoryV1.CPU{ + Quantity: inventoryV1.NewResourcePairMilli(nullClientCPU, 100, resource.DecimalSI), + }, + Memory: inventoryV1.Memory{ + Quantity: inventoryV1.NewResourcePair(nullClientMemory, 1*unit.Gi, resource.DecimalSI), + }, + GPU: inventoryV1.GPU{ + Quantity: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), + }, + EphemeralStorage: inventoryV1.NewResourcePair(nullClientStorage, 10*unit.Gi, resource.DecimalSI), + VolumesAttached: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), + VolumesMounted: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), + }, + Capabilities: inventoryV1.NodeCapabilities{}, + } + + cluster.Nodes = append(cluster.Nodes, nd) } + + if len(cluster.Nodes) == 0 { + cluster.Nodes = append(cluster.Nodes, inventoryV1.Node{ + Name: "solo", + Resources: inventoryV1.NodeResources{ + CPU: inventoryV1.CPU{ + Quantity: inventoryV1.NewResourcePairMilli(nullClientCPU, 100, resource.DecimalSI), + }, + Memory: inventoryV1.Memory{ + Quantity: inventoryV1.NewResourcePair(nullClientMemory, 1*unit.Gi, resource.DecimalSI), + }, + GPU: inventoryV1.GPU{ + Quantity: inventoryV1.NewResourcePair(nullClientGPU, 1, resource.DecimalSI), + }, + EphemeralStorage: inventoryV1.NewResourcePair(nullClientStorage, 10*unit.Gi, resource.DecimalSI), + VolumesAttached: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), + VolumesMounted: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), + }, + Capabilities: inventoryV1.NodeCapabilities{}, + }) + } + + cl := &nullInventory{ + ctx: ctx, + group: group, + subch: make(chan chan<- ctypes.Inventory, 1), + cmch: make(chan commitReq, 1), + cluster: cluster, + } + + group.Go(cl.run) + + return cl } -func (rp *resourcePair) subNLZ(val types.ResourceValue) bool { - avail := rp.available() +// Commit at the moment commit works on single node clusters +func (cl *nullInventory) Commit(res dtypes.ResourceGroup) bool { + ch := make(chan struct{}, 1) - res := avail.Sub(val.Val) - if res.IsNegative() { + select { + case <-cl.ctx.Done(): return false + case cl.cmch <- commitReq{ + res: res, + resp: ch, + }: } - *rp = resourcePair{ - allocatable: rp.allocatable.AddRaw(0), - allocated: rp.allocated.Add(val.Val), + select { + case <-cl.ctx.Done(): + return false + case <-ch: + return true + } +} + +func (cl *nullInventory) ResultChan() <-chan ctypes.Inventory { + ch := make(chan ctypes.Inventory, 1) + + select { + case <-cl.ctx.Done(): + close(ch) + case cl.subch <- ch: } - return true + return ch } -func (rp *resourcePair) available() sdk.Int { - return rp.allocatable.Sub(rp.allocated) +func (cl *nullInventory) run() error { + var subs []chan<- inventoryV1.Cluster + + for { + select { + case <-cl.ctx.Done(): + return cl.ctx.Err() + case cmreq := <-cl.cmch: + ru := cmreq.res.GetResourceUnits() + + ndRes := &cl.cluster.Nodes[0].Resources + + for _, res := range ru { + if res.CPU != nil { + ndRes.CPU.Quantity.SubNLZ(res.CPU.Units) + } + + if res.GPU != nil { + ndRes.GPU.Quantity.SubNLZ(res.GPU.Units) + } + + if res.Memory != nil { + ndRes.Memory.Quantity.SubNLZ(res.Memory.Quantity) + } + + for i, storage := range res.Storage { + attrs, _ := ParseStorageAttributes(storage.Attributes) + + if !attrs.Persistent { + if attrs.Class == "ram" { + ndRes.Memory.Quantity.SubNLZ(storage.Quantity) + } else { + // ephemeral storage + tryAdjustEphemeralStorage(&ndRes.EphemeralStorage, &res.Storage[i]) + } + } else { + for idx := range cl.cluster.Storage { + if cl.cluster.Storage[idx].Info.Class == attrs.Class { + cl.cluster.Storage[idx].Quantity.SubNLZ(storage.Quantity) + break + } + } + } + } + } + + for _, sub := range subs { + sub <- *cl.cluster.Dup() + } + + cmreq.resp <- struct{}{} + case reqch := <-cl.subch: + ch := make(chan inventoryV1.Cluster, 1) + + subs = append(subs, ch) + + cl.group.Go(func() error { + return cl.subscriber(ch, reqch) + }) + + ch <- *cl.cluster.Dup() + } + } } -type node struct { - id string - cpu resourcePair - gpu resourcePair - memory resourcePair - ephemeralStorage resourcePair +func (cl *nullInventory) subscriber(in <-chan inventoryV1.Cluster, out chan<- ctypes.Inventory) error { + defer close(out) + + var pending []inventoryV1.Cluster + var msg ctypes.Inventory + var och chan<- ctypes.Inventory + + for { + select { + case <-cl.ctx.Done(): + return cl.ctx.Err() + case inv := <-in: + pending = append(pending, inv) + if och == nil { + msg = newInventory(pending[0]) + och = out + } + case och <- msg: + pending = pending[1:] + if len(pending) > 0 { + msg = newInventory(pending[0]) + } else { + och = nil + msg = nil + } + } + } } -type clusterStorage map[string]*storageClassState +func newInventory(clState inventoryV1.Cluster) *inventory { + inv := &inventory{ + Cluster: clState, + } -func (cs clusterStorage) dup() clusterStorage { - res := make(clusterStorage) - for k, v := range cs { - res[k] = &storageClassState{ - resourcePair: v.resourcePair.dup(), - isDefault: v.isDefault, - } + return inv +} + +func (inv *inventory) dup() inventory { + dup := inventory{ + Cluster: *inv.Cluster.Dup(), } - return res + return dup } -type inventory struct { - storage clusterStorage - nodes []*node +func (inv *inventory) Dup() ctypes.Inventory { + dup := inv.dup() + + return &dup } -var ( - _ Client = (*nullInventory)(nil) - _ ctypes.Inventory = (*inventory)(nil) -) +// tryAdjust cluster inventory +// It returns two boolean values. First indicates if node-wide resources satisfy (true) requirements +// Seconds indicates if cluster-wide resources satisfy (true) requirements +func (inv *inventory) tryAdjust(node int, res *types.Resources) (*crd.SchedulerParams, bool, bool) { + nd := inv.Nodes[node].Dup() + sparams := &crd.SchedulerParams{} -func NewNull(nodes ...string) Client { - inv := &inventory{ - nodes: make([]*node, 0, len(nodes)), - storage: map[string]*storageClassState{ - "beta2": { - resourcePair: resourcePair{ - allocatable: sdk.NewInt(nullClientStorage), - allocated: sdk.NewInt(nullClientStorage - (10 * unit.Gi)), - }, - isDefault: true, - }, - }, + if !tryAdjustCPU(&nd.Resources.CPU.Quantity, res.CPU) { + return nil, false, true } - for _, ndName := range nodes { - nd := &node{ - id: ndName, - cpu: resourcePair{ - allocatable: sdk.NewInt(nullClientCPU), - allocated: sdk.NewInt(100), - }, - gpu: resourcePair{ - allocatable: sdk.NewInt(nullClientGPU), - allocated: sdk.NewInt(1), - }, - memory: resourcePair{ - allocatable: sdk.NewInt(nullClientMemory), - allocated: sdk.NewInt(1 * unit.Gi), - }, - ephemeralStorage: resourcePair{ - allocatable: sdk.NewInt(nullClientStorage), - allocated: sdk.NewInt(10 * unit.Gi), - }, - } + if !tryAdjustGPU(&nd.Resources.GPU, res.GPU, sparams) { + return nil, false, true + } - inv.nodes = append(inv.nodes, nd) + if !nd.Resources.Memory.Quantity.SubNLZ(res.Memory.Quantity) { + return nil, false, true } - if len(inv.nodes) == 0 { - inv.nodes = append(inv.nodes, &node{ - id: "solo", - cpu: resourcePair{ - allocatable: sdk.NewInt(nullClientCPU), - allocated: sdk.NewInt(nullClientCPU - 100), - }, - gpu: resourcePair{ - allocatable: sdk.NewInt(nullClientGPU), - allocated: sdk.NewInt(1), - }, - memory: resourcePair{ - allocatable: sdk.NewInt(nullClientMemory), - allocated: sdk.NewInt(nullClientMemory - unit.Gi), - }, - ephemeralStorage: resourcePair{ - allocatable: sdk.NewInt(nullClientStorage), - allocated: sdk.NewInt(nullClientStorage - (10 * unit.Gi)), - }, - }) + storageClasses := inv.Storage.Dup() + + for i, storage := range res.Storage { + attrs, err := ParseStorageAttributes(storage.Attributes) + if err != nil { + return nil, false, false + } + + if !attrs.Persistent { + if attrs.Class == "ram" { + if !nd.Resources.Memory.Quantity.SubNLZ(storage.Quantity) { + return nil, false, true + } + } else { + // ephemeral storage + if !tryAdjustEphemeralStorage(&nd.Resources.EphemeralStorage, &res.Storage[i]) { + return nil, false, true + } + } + + continue + } + + if !nd.IsStorageClassSupported(attrs.Class) { + return nil, false, true + } + + storageAdjusted := false + + for idx := range storageClasses { + if storageClasses[idx].Info.Class == attrs.Class { + if !storageClasses[idx].Quantity.SubNLZ(storage.Quantity) { + // cluster storage does not have enough space thus break to error + return nil, false, false + } + storageAdjusted = true + break + } + } + + // requested storage class is not present in the cluster + // there is no point to adjust inventory further + if !storageAdjusted { + return nil, false, false + } } - return &nullInventory{ - inv: inv, + // all requirements for current group have been satisfied + // commit and move on + inv.Nodes[node] = nd + inv.Storage = storageClasses + + if reflect.DeepEqual(sparams, &crd.SchedulerParams{}) { + return nil, true, true } + + return sparams, true, true } -func (inv *nullInventory) ResultChan() <-chan ctypes.Inventory { - ch := make(chan ctypes.Inventory, 1) +func tryAdjustCPU(rp *inventoryV1.ResourcePair, res *types.CPU) bool { + return rp.SubMilliNLZ(res.Units) +} - ch <- inv.inv.dup() +func tryAdjustGPU(rp *inventoryV1.GPU, res *types.GPU, sparams *crd.SchedulerParams) bool { + reqCnt := res.Units.Value() - return ch -} + if reqCnt == 0 { + return true + } -func (inv *inventory) Adjust(reservation ctypes.ReservationGroup, _ ...ctypes.InventoryOption) error { - resources := make(dtypes.ResourceUnits, len(reservation.Resources().GetResourceUnits())) - copy(resources, reservation.Resources().GetResourceUnits()) + if rp.Quantity.Available().Value() == 0 { + return false + } - currInventory := inv.dup() + attrs, err := ParseGPUAttributes(res.Attributes) + if err != nil { + return false + } -nodes: - for nodeName, nd := range currInventory.nodes { - // with persistent storage go through iff there is capacity available - // there is no point to go through any other node without available storage - currResources := resources[:0] + for _, info := range rp.Info { + models, exists := attrs[info.Vendor] + if !exists { + continue + } - for _, res := range resources { - for ; res.Count > 0; res.Count-- { - var adjusted bool + attr, exists := models.ExistsOrWildcard(info.Name) + if !exists { + continue + } - cpu := nd.cpu.dup() - if adjusted = cpu.subNLZ(res.Resources.CPU.Units); !adjusted { - continue nodes - } + if attr != nil { + if attr.RAM != "" && attr.RAM != info.MemorySize { + continue + } - gpu := nd.gpu.dup() - if res.Resources.GPU != nil { - if adjusted = gpu.subNLZ(res.Resources.GPU.Units); !adjusted { - continue nodes - } + if attr.Interface != "" && attr.RAM != info.Interface { + continue + } + } + + reqCnt-- + if reqCnt == 0 { + vendor := strings.ToLower(info.Vendor) + + if !rp.Quantity.SubNLZ(res.Units) { + return false + } + + // sParamsEnsureGPU(sparams) + sparams.Resources.GPU.Vendor = vendor + sparams.Resources.GPU.Model = info.Name + + // switch vendor { + // case builder.GPUVendorNvidia: + // sparams.RuntimeClass = runtimeClassNvidia + // default: + // } + + key := fmt.Sprintf("vendor/%s/model/%s", vendor, info.Name) + if attr != nil { + if attr.RAM != "" { + key = fmt.Sprintf("%s/ram/%s", key, attr.RAM) } - memory := nd.memory.dup() - if adjusted = memory.subNLZ(res.Resources.Memory.Quantity); !adjusted { - continue nodes + if attr.Interface != "" { + key = fmt.Sprintf("%s/interface/%s", key, attr.Interface) } + } - ephemeralStorage := nd.ephemeralStorage.dup() - storageClasses := currInventory.storage.dup() + res.Attributes = types.Attributes{ + { + Key: key, + Value: "true", + }, + } - for idx, storage := range res.Resources.Storage { - attr := storage.Attributes.Find(sdl.StorageAttributePersistent) + return true + } + } - if persistent, _ := attr.AsBool(); !persistent { - if adjusted = ephemeralStorage.subNLZ(storage.Quantity); !adjusted { - continue nodes - } - continue - } + return false +} - attr = storage.Attributes.Find(sdl.StorageAttributeClass) - class, _ := attr.AsString() +func tryAdjustEphemeralStorage(rp *inventoryV1.ResourcePair, res *types.Storage) bool { + return rp.SubNLZ(res.Quantity) +} - if class == sdl.StorageClassDefault { - for name, params := range storageClasses { - if params.isDefault { - class = name +// nolint: unused +func tryAdjustVolumesAttached(rp *inventoryV1.ResourcePair, res types.ResourceValue) bool { + return rp.SubNLZ(res) +} - for i := range storage.Attributes { - if storage.Attributes[i].Key == sdl.StorageAttributeClass { - res.Resources.Storage[idx].Attributes[i].Value = class - break - } - } - break - } - } - } +func (inv *inventory) Adjust(reservation ctypes.ReservationGroup, opts ...ctypes.InventoryOption) error { + cfg := &ctypes.InventoryOptions{} + for _, opt := range opts { + cfg = opt(cfg) + } - cstorage, activeStorageClass := storageClasses[class] - if !activeStorageClass { - continue nodes - } + origResources := reservation.Resources().GetResourceUnits() + resources := make(dtypes.ResourceUnits, 0, len(origResources)) + adjustedResources := make(dtypes.ResourceUnits, 0, len(origResources)) + + for _, res := range origResources { + resources = append(resources, dtypes.ResourceUnit{ + Resources: res.Resources.Dup(), + Count: res.Count, + }) + + adjustedResources = append(adjustedResources, dtypes.ResourceUnit{ + Resources: res.Resources.Dup(), + Count: res.Count, + }) + } + + cparams := make(crd.ReservationClusterSettings) + + currInventory := inv.dup() + + var err error - if adjusted = cstorage.subNLZ(storage.Quantity); !adjusted { - // cluster storage does not have enough space thus break to error +nodes: + for nodeIdx := range currInventory.Nodes { + for i := len(resources) - 1; i >= 0; i-- { + adjustedGroup := false + + var adjusted *types.Resources + if origResources[i].Count == resources[i].Count { + adjusted = &adjustedResources[i].Resources + } else { + adjustedGroup = true + res := adjustedResources[i].Resources.Dup() + adjusted = &res + } + + for ; resources[i].Count > 0; resources[i].Count-- { + sparams, nStatus, cStatus := currInventory.tryAdjust(nodeIdx, adjusted) + if !cStatus { + // cannot satisfy cluster-wide resources, stop lookup + break nodes + } + + if !nStatus { + // cannot satisfy node-wide resources, try with next node + continue nodes + } + + // at this point we expect all replicas of the same service to produce + // same adjusted resource units as well as cluster params + if adjustedGroup { + if !reflect.DeepEqual(adjusted, &adjustedResources[i].Resources) { + err = ctypes.ErrGroupResourceMismatch break nodes } - } - // all requirements for current group have been satisfied - // commit and move on - currInventory.nodes[nodeName] = &node{ - id: nd.id, - cpu: cpu, - gpu: gpu, - memory: memory, - ephemeralStorage: ephemeralStorage, + // all replicas of the same service are expected to have same node selectors and runtimes + // if they don't match then provider cannot bid + if !reflect.DeepEqual(sparams, cparams[adjusted.ID]) { + err = ctypes.ErrGroupResourceMismatch + break nodes + } + } else { + cparams[adjusted.ID] = sparams } } - if res.Count > 0 { - currResources = append(currResources, res) + // all replicas resources are fulfilled when count == 0. + // remove group from the list to prevent double request of the same resources + if resources[i].Count == 0 { + resources = append(resources[:i], resources[i+1:]...) + goto nodes } } - - resources = currResources } if len(resources) == 0 { - *inv = *currInventory + if !cfg.DryRun { + *inv = currInventory + } + + reservation.SetAllocatedResources(adjustedResources) + reservation.SetClusterParams(cparams) return nil } + if err != nil { + return err + } + return ctypes.ErrInsufficientCapacity } +func (inv *inventory) Snapshot() inventoryV1.Cluster { + return *inv.Cluster.Dup() +} + func (inv *inventory) Metrics() inventoryV1.Metrics { cpuTotal := uint64(0) gpuTotal := uint64(0) @@ -293,44 +566,52 @@ func (inv *inventory) Metrics() inventoryV1.Metrics { storageAvailable := make(map[string]int64) ret := inventoryV1.Metrics{ - Nodes: make([]inventoryV1.NodeMetrics, 0, len(inv.nodes)), + Nodes: make([]inventoryV1.NodeMetrics, 0, len(inv.Nodes)), } - for _, nd := range inv.nodes { + for _, nd := range inv.Nodes { invNode := inventoryV1.NodeMetrics{ - Name: nd.id, + Name: nd.Name, Allocatable: inventoryV1.ResourcesMetric{ - CPU: nd.cpu.allocatable.Uint64(), - Memory: nd.memory.allocatable.Uint64(), - StorageEphemeral: nd.ephemeralStorage.allocatable.Uint64(), + CPU: uint64(nd.Resources.CPU.Quantity.Allocatable.MilliValue()), + GPU: uint64(nd.Resources.GPU.Quantity.Allocatable.Value()), + Memory: uint64(nd.Resources.Memory.Quantity.Allocatable.Value()), + StorageEphemeral: uint64(nd.Resources.EphemeralStorage.Allocatable.Value()), }, } - cpuTotal += nd.cpu.allocatable.Uint64() - gpuTotal += nd.gpu.allocatable.Uint64() - - memoryTotal += nd.memory.allocatable.Uint64() - storageEphemeralTotal += nd.ephemeralStorage.allocatable.Uint64() + cpuTotal += uint64(nd.Resources.CPU.Quantity.Allocatable.MilliValue()) + gpuTotal += uint64(nd.Resources.GPU.Quantity.Allocatable.Value()) + memoryTotal += uint64(nd.Resources.Memory.Quantity.Allocatable.Value()) + storageEphemeralTotal += uint64(nd.Resources.EphemeralStorage.Allocatable.Value()) - tmp := nd.cpu.allocatable.Sub(nd.cpu.allocated) - invNode.Available.CPU = tmp.Uint64() + avail := nd.Resources.CPU.Quantity.Available() + invNode.Available.CPU = uint64(avail.MilliValue()) cpuAvailable += invNode.Available.CPU - tmp = nd.gpu.allocatable.Sub(nd.gpu.allocated) - invNode.Available.GPU = tmp.Uint64() + avail = nd.Resources.GPU.Quantity.Available() + invNode.Available.GPU = uint64(avail.Value()) gpuAvailable += invNode.Available.GPU - tmp = nd.memory.allocatable.Sub(nd.memory.allocated) - invNode.Available.Memory = tmp.Uint64() + avail = nd.Resources.Memory.Quantity.Available() + invNode.Available.Memory = uint64(avail.Value()) memoryAvailable += invNode.Available.Memory - tmp = nd.ephemeralStorage.allocatable.Sub(nd.ephemeralStorage.allocated) - invNode.Available.StorageEphemeral = tmp.Uint64() + avail = nd.Resources.EphemeralStorage.Available() + invNode.Available.StorageEphemeral = uint64(avail.Value()) storageEphemeralAvailable += invNode.Available.StorageEphemeral ret.Nodes = append(ret.Nodes, invNode) } + for _, class := range inv.Storage { + tmp := class.Quantity.Allocatable.DeepCopy() + storageTotal[class.Info.Class] = tmp.Value() + + tmp = *class.Quantity.Available() + storageAvailable[class.Info.Class] = tmp.Value() + } + ret.TotalAllocatable = inventoryV1.MetricTotal{ CPU: cpuTotal, GPU: gpuTotal, @@ -350,59 +631,239 @@ func (inv *inventory) Metrics() inventoryV1.Metrics { return ret } -func (inv *inventory) Snapshot() inventoryV1.Cluster { - res := inventoryV1.Cluster{ - Nodes: make(inventoryV1.Nodes, 0, len(inv.nodes)), - Storage: make(inventoryV1.ClusterStorage, 0, len(inv.storage)), - } - - for _, nd := range inv.nodes { - res.Nodes = append(res.Nodes, inventoryV1.Node{ - Name: nd.id, - Resources: inventoryV1.NodeResources{ - CPU: inventoryV1.CPU{ - Quantity: inventoryV1.NewResourcePair(nd.cpu.allocatable.Int64(), nd.cpu.allocated.Int64(), "m"), - }, - Memory: inventoryV1.Memory{ - Quantity: inventoryV1.NewResourcePair(nd.memory.allocatable.Int64(), nd.memory.allocated.Int64(), resource.DecimalSI), - }, - GPU: inventoryV1.GPU{ - Quantity: inventoryV1.NewResourcePair(nd.gpu.allocatable.Int64(), nd.gpu.allocated.Int64(), resource.DecimalSI), - }, - EphemeralStorage: inventoryV1.NewResourcePair(nd.ephemeralStorage.allocatable.Int64(), nd.ephemeralStorage.allocated.Int64(), resource.DecimalSI), - VolumesAttached: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), - VolumesMounted: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), - }, - Capabilities: inventoryV1.NodeCapabilities{}, - }) - } - - for class, storage := range inv.storage { - res.Storage = append(res.Storage, inventoryV1.Storage{ - Quantity: inventoryV1.NewResourcePair(storage.allocatable.Int64(), storage.allocated.Int64(), resource.DecimalSI), - Info: inventoryV1.StorageInfo{ - Class: class, - }, - }) - } - - return res -} - -func (inv *inventory) dup() *inventory { - res := &inventory{ - nodes: make([]*node, 0, len(inv.nodes)), - } - - for _, nd := range inv.nodes { - res.nodes = append(res.nodes, &node{ - id: nd.id, - cpu: nd.cpu.dup(), - gpu: nd.gpu.dup(), - memory: nd.memory.dup(), - ephemeralStorage: nd.ephemeralStorage.dup(), - }) - } - - return res -} +// func (inv *inventory) Adjust(reservation ctypes.ReservationGroup, _ ...ctypes.InventoryOption) error { +// resources := make(dtypes.ResourceUnits, len(reservation.Resources().GetResourceUnits())) +// copy(resources, reservation.Resources().GetResourceUnits()) +// +// currInventory := inv.dup() +// +// nodes: +// for nodeName, nd := range currInventory.nodes { +// // with persistent storage go through iff there is capacity available +// // there is no point to go through any other node without available storage +// currResources := resources[:0] +// +// for _, res := range resources { +// for ; res.Count > 0; res.Count-- { +// var adjusted bool +// +// cpu := nd.cpu.dup() +// if adjusted = cpu.subNLZ(res.Resources.CPU.Units); !adjusted { +// continue nodes +// } +// +// gpu := nd.gpu.dup() +// if res.Resources.GPU != nil { +// if adjusted = gpu.subNLZ(res.Resources.GPU.Units); !adjusted { +// continue nodes +// } +// } +// +// memory := nd.memory.dup() +// if adjusted = memory.subNLZ(res.Resources.Memory.Quantity); !adjusted { +// continue nodes +// } +// +// ephemeralStorage := nd.ephemeralStorage.dup() +// storageClasses := currInventory.storage.dup() +// +// for idx, storage := range res.Resources.Storage { +// attr := storage.Attributes.Find(sdl.StorageAttributePersistent) +// +// if persistent, _ := attr.AsBool(); !persistent { +// if adjusted = ephemeralStorage.subNLZ(storage.Quantity); !adjusted { +// continue nodes +// } +// continue +// } +// +// attr = storage.Attributes.Find(sdl.StorageAttributeClass) +// class, _ := attr.AsString() +// +// if class == sdl.StorageClassDefault { +// for name, params := range storageClasses { +// if params.isDefault { +// class = name +// +// for i := range storage.Attributes { +// if storage.Attributes[i].Key == sdl.StorageAttributeClass { +// res.Resources.Storage[idx].Attributes[i].Value = class +// break +// } +// } +// break +// } +// } +// } +// +// cstorage, activeStorageClass := storageClasses[class] +// if !activeStorageClass { +// continue nodes +// } +// +// if adjusted = cstorage.subNLZ(storage.Quantity); !adjusted { +// // cluster storage does not have enough space thus break to error +// break nodes +// } +// } +// +// // all requirements for current group have been satisfied +// // commit and move on +// currInventory.nodes[nodeName] = &node{ +// id: nd.id, +// cpu: cpu, +// gpu: gpu, +// memory: memory, +// ephemeralStorage: ephemeralStorage, +// } +// } +// +// if res.Count > 0 { +// currResources = append(currResources, res) +// } +// } +// +// resources = currResources +// } +// +// if len(resources) == 0 { +// *inv = *currInventory +// +// return nil +// } +// +// return ctypes.ErrInsufficientCapacity +// } +// +// func (inv *inventory) Metrics() inventoryV1.Metrics { +// cpuTotal := uint64(0) +// gpuTotal := uint64(0) +// memoryTotal := uint64(0) +// storageEphemeralTotal := uint64(0) +// storageTotal := make(map[string]int64) +// +// cpuAvailable := uint64(0) +// gpuAvailable := uint64(0) +// memoryAvailable := uint64(0) +// storageEphemeralAvailable := uint64(0) +// storageAvailable := make(map[string]int64) +// +// ret := inventoryV1.Metrics{ +// Nodes: make([]inventoryV1.NodeMetrics, 0, len(inv.nodes)), +// } +// +// for _, nd := range inv.nodes { +// invNode := inventoryV1.NodeMetrics{ +// Name: nd.id, +// Allocatable: inventoryV1.ResourcesMetric{ +// CPU: nd.cpu.allocatable.Uint64(), +// Memory: nd.memory.allocatable.Uint64(), +// StorageEphemeral: nd.ephemeralStorage.allocatable.Uint64(), +// }, +// } +// +// cpuTotal += nd.cpu.allocatable.Uint64() +// gpuTotal += nd.gpu.allocatable.Uint64() +// +// memoryTotal += nd.memory.allocatable.Uint64() +// storageEphemeralTotal += nd.ephemeralStorage.allocatable.Uint64() +// +// tmp := nd.cpu.allocatable.Sub(nd.cpu.allocated) +// invNode.Available.CPU = tmp.Uint64() +// cpuAvailable += invNode.Available.CPU +// +// tmp = nd.gpu.allocatable.Sub(nd.gpu.allocated) +// invNode.Available.GPU = tmp.Uint64() +// gpuAvailable += invNode.Available.GPU +// +// tmp = nd.memory.allocatable.Sub(nd.memory.allocated) +// invNode.Available.Memory = tmp.Uint64() +// memoryAvailable += invNode.Available.Memory +// +// tmp = nd.ephemeralStorage.allocatable.Sub(nd.ephemeralStorage.allocated) +// invNode.Available.StorageEphemeral = tmp.Uint64() +// storageEphemeralAvailable += invNode.Available.StorageEphemeral +// +// ret.Nodes = append(ret.Nodes, invNode) +// } +// +// ret.TotalAllocatable = inventoryV1.MetricTotal{ +// CPU: cpuTotal, +// GPU: gpuTotal, +// Memory: memoryTotal, +// StorageEphemeral: storageEphemeralTotal, +// Storage: storageTotal, +// } +// +// ret.TotalAvailable = inventoryV1.MetricTotal{ +// CPU: cpuAvailable, +// GPU: gpuAvailable, +// Memory: memoryAvailable, +// StorageEphemeral: storageEphemeralAvailable, +// Storage: storageAvailable, +// } +// +// return ret +// } +// +// func (inv *inventory) Snapshot() inventoryV1.Cluster { +// res := inventoryV1.Cluster{ +// Nodes: make(inventoryV1.Nodes, 0, len(inv.nodes)), +// Storage: make(inventoryV1.ClusterStorage, 0, len(inv.storage)), +// } +// +// for i := range inv.nodes { +// nd := inv.nodes[i] +// res.Nodes = append(res.Nodes, inventoryV1.Node{ +// Name: nd.id, +// Resources: inventoryV1.NodeResources{ +// CPU: inventoryV1.CPU{ +// Quantity: inventoryV1.NewResourcePair(nd.cpu.allocatable.Int64(), nd.cpu.allocated.Int64(), "m"), +// }, +// Memory: inventoryV1.Memory{ +// Quantity: inventoryV1.NewResourcePair(nd.memory.allocatable.Int64(), nd.memory.allocated.Int64(), resource.DecimalSI), +// }, +// GPU: inventoryV1.GPU{ +// Quantity: inventoryV1.NewResourcePair(nd.gpu.allocatable.Int64(), nd.gpu.allocated.Int64(), resource.DecimalSI), +// }, +// EphemeralStorage: inventoryV1.NewResourcePair(nd.ephemeralStorage.allocatable.Int64(), nd.ephemeralStorage.allocated.Int64(), resource.DecimalSI), +// VolumesAttached: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), +// VolumesMounted: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), +// }, +// Capabilities: inventoryV1.NodeCapabilities{}, +// }) +// } +// +// for class, storage := range inv.storage { +// res.Storage = append(res.Storage, inventoryV1.Storage{ +// Quantity: inventoryV1.NewResourcePair(storage.allocatable.Int64(), storage.allocated.Int64(), resource.DecimalSI), +// Info: inventoryV1.StorageInfo{ +// Class: class, +// }, +// }) +// } +// +// return res +// } +// +// func (inv *inventory) dup() *inventory { +// res := &inventory{ +// nodes: make([]*node, 0, len(inv.nodes)), +// } +// +// for _, nd := range inv.nodes { +// res.nodes = append(res.nodes, &node{ +// id: nd.id, +// cpu: nd.cpu.dup(), +// gpu: nd.gpu.dup(), +// memory: nd.memory.dup(), +// ephemeralStorage: nd.ephemeralStorage.dup(), +// }) +// } +// +// return res +// } +// +// func (inv *inventory) Dup() ctypes.Inventory { +// return inv.dup() +// } diff --git a/cluster/types/v1beta3/clients/inventory/resourcetypes_test.go b/cluster/types/v1beta3/clients/inventory/resourcetypes_test.go deleted file mode 100644 index 1172ccd6..00000000 --- a/cluster/types/v1beta3/clients/inventory/resourcetypes_test.go +++ /dev/null @@ -1,52 +0,0 @@ -package inventory - -import ( - "testing" - - sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/stretchr/testify/require" - - types "github.com/akash-network/akash-api/go/node/types/v1beta3" -) - -func TestResourcePairAvailable(t *testing.T) { - rp := &resourcePair{ - allocatable: sdk.NewInt(100), - allocated: sdk.NewInt(0), - } - - avail := rp.available() - - require.Equal(t, int64(100), avail.Int64()) - - rp = &resourcePair{ - allocatable: sdk.NewInt(100), - allocated: sdk.NewInt(100), - } - - avail = rp.available() - - require.Equal(t, int64(0), avail.Int64()) -} - -func TestResourcePairSubNLZ(t *testing.T) { - rp := &resourcePair{ - allocatable: sdk.NewInt(100), - allocated: sdk.NewInt(0), - } - - adjusted := rp.subNLZ(types.NewResourceValue(0)) - require.True(t, adjusted) - - avail := rp.available() - require.Equal(t, int64(100), avail.Int64()) - - adjusted = rp.subNLZ(types.NewResourceValue(9)) - require.True(t, adjusted) - - avail = rp.available() - require.Equal(t, int64(91), avail.Int64()) - - adjusted = rp.subNLZ(types.NewResourceValue(92)) - require.False(t, adjusted) -} diff --git a/cluster/types/v1beta3/types.go b/cluster/types/v1beta3/types.go index cf4b6dc9..bf6d2202 100644 --- a/cluster/types/v1beta3/types.go +++ b/cluster/types/v1beta3/types.go @@ -67,6 +67,7 @@ type Inventory interface { Adjust(ReservationGroup, ...InventoryOption) error Metrics() inventoryV1.Metrics Snapshot() inventoryV1.Cluster + Dup() Inventory } type EventsWatcher interface { diff --git a/operator/ip/operator_test.go b/operator/ip/operator_test.go index 9f32e118..9ed662ce 100644 --- a/operator/ip/operator_test.go +++ b/operator/ip/operator_test.go @@ -49,7 +49,7 @@ func runIPOperator(t *testing.T, run bool, aobj []runtime.Object, prerun, fn fun ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc)) ctx = context.WithValue(ctx, fromctx.CtxKeyAkashClientSet, aclient.Interface(ac)) - ctx = context.WithValue(ctx, clfromctx.CtxKeyClientInventory, cinventory.NewNull()) + ctx = context.WithValue(ctx, clfromctx.CtxKeyClientInventory, cinventory.NewNull(ctx)) providerAddr := testutil.AccAddress(t)