Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle global max blocks semaphore properly #2860

Merged
merged 3 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions internal/block/block_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,26 +69,36 @@ func (bp *BlockPool) Get() (Block, error) {
default:
// No lock is required here since blockPool is per file and all write
// calls to a single file are serialized because of inode.lock().
if bp.totalBlocks < bp.maxBlocks {
freeSlotsAvailable := bp.globalMaxBlocksSem.TryAcquire(1)
// We are allowed to create one block per file irrespective of free slots.
if bp.totalBlocks > 0 && !freeSlotsAvailable {
continue
}

if bp.canAllocateBlock() {
b, err := createBlock(bp.blockSize)
if err != nil {
return nil, err
}

bp.totalBlocks++
return b, nil

}
}
}
}

// canAllocateBlock checks if a new block can be allocated.
func (bp *BlockPool) canAllocateBlock() bool {
// If max blocks limit is reached, then no more blocks can be allocated.
if bp.totalBlocks >= bp.maxBlocks {
return false
}

// Always allow allocation if this is the first block for the file.
if bp.totalBlocks == 0 {
return true
}

// Otherwise, check if we can acquire a semaphore.
semAcquired := bp.globalMaxBlocksSem.TryAcquire(1)
return semAcquired
}

// FreeBlocksChannel returns the freeBlocksCh being used by the block pool.
func (bp *BlockPool) FreeBlocksChannel() chan Block {
return bp.freeBlocksCh
Expand All @@ -109,7 +119,9 @@ func (bp *BlockPool) ClearFreeBlockChannel() error {
return fmt.Errorf("munmap error: %v", err)
}
bp.totalBlocks--
bp.globalMaxBlocksSem.Release(1)
if bp.totalBlocks != 0 {
bp.globalMaxBlocksSem.Release(1)
}
default:
// Return if there are no more blocks on the channel.
return nil
Expand Down
178 changes: 154 additions & 24 deletions internal/block/block_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,30 +132,77 @@ func (t *BlockPoolTest) TestBlockSize() {
func (t *BlockPoolTest) TestClearFreeBlockChannel() {
bp, err := NewBlockPool(1024, 10, semaphore.NewWeighted(3))
require.Nil(t.T(), err)
b1, err := bp.Get()
blocks := make([]Block, 4)
for i := 0; i < 4; i++ {
blocks[i] = t.validateGetBlockIsNotBlocked(bp)
}
// Adding 2 blocks to freeBlocksCh
bp.freeBlocksCh <- blocks[0]
bp.freeBlocksCh <- blocks[1]
require.Equal(t.T(), int64(4), bp.totalBlocks)

err = bp.ClearFreeBlockChannel()

require.Nil(t.T(), err)
require.NotNil(t.T(), b1)
b2, err := bp.Get()
require.Equal(t.T(), int64(2), bp.totalBlocks)
require.Nil(t.T(), blocks[0].(*memoryBlock).buffer)
require.Nil(t.T(), blocks[1].(*memoryBlock).buffer)
require.NotNil(t.T(), blocks[2].(*memoryBlock).buffer)
require.NotNil(t.T(), blocks[3].(*memoryBlock).buffer)
// Check if semaphore is released correctly.
require.True(t.T(), bp.globalMaxBlocksSem.TryAcquire(2))
require.False(t.T(), bp.globalMaxBlocksSem.TryAcquire(1))
}

func (t *BlockPoolTest) TestFirstBlockIsCreatedWithoutAcquiringGlobalSem() {
bp, err := NewBlockPool(1024, 3, semaphore.NewWeighted(0))
require.Nil(t.T(), err)
require.NotNil(t.T(), b2)
b3, err := bp.Get()
b1, err := bp.Get()
require.Nil(t.T(), err)
require.NotNil(t.T(), b3)
// Adding 2 blocks to freeBlocksCh
require.NotNil(t.T(), b1)
// Adding block to freeBlocksCh
bp.freeBlocksCh <- b1
bp.freeBlocksCh <- b2
require.Equal(t.T(), int64(3), bp.totalBlocks)
require.Equal(t.T(), int64(1), bp.totalBlocks)

err = bp.ClearFreeBlockChannel()

require.Nil(t.T(), err)
require.Equal(t.T(), int64(1), bp.totalBlocks)
require.Equal(t.T(), int64(0), bp.totalBlocks)
require.Nil(t.T(), b1.(*memoryBlock).buffer)
}

func (t *BlockPoolTest) TestClearFreeBlockChannelWithMultipleBlockPools() {
globalMaxBlocksSem := semaphore.NewWeighted(1)
bp1, err := NewBlockPool(1024, 3, globalMaxBlocksSem)
require.Nil(t.T(), err)
bp2, err := NewBlockPool(1024, 3, globalMaxBlocksSem)
require.Nil(t.T(), err)
// Create 2 blocks in bp1.
b1 := t.validateGetBlockIsNotBlocked(bp1)
b2 := t.validateGetBlockIsNotBlocked(bp1)
require.Equal(t.T(), int64(2), bp1.totalBlocks)
// Create 1 block in bp2.
b3 := t.validateGetBlockIsNotBlocked(bp2)
require.Equal(t.T(), int64(1), bp2.totalBlocks)
// Freeing up bp1.
bp1.freeBlocksCh <- b1
bp1.freeBlocksCh <- b2
err = bp1.ClearFreeBlockChannel()
require.Nil(t.T(), err)
require.Nil(t.T(), b1.(*memoryBlock).buffer)
require.Nil(t.T(), b2.(*memoryBlock).buffer)
require.NotNil(t.T(), b3.(*memoryBlock).buffer)
// Check if semaphore is released correctly.
require.True(t.T(), bp.globalMaxBlocksSem.TryAcquire(2))
require.False(t.T(), bp.globalMaxBlocksSem.TryAcquire(1))

// After bp1 is freed up, 1 more block can be created in bp2.
b4 := t.validateGetBlockIsNotBlocked(bp2)
require.Equal(t.T(), int64(2), bp2.totalBlocks)

// Freeing up bp2.
bp2.freeBlocksCh <- b3
bp2.freeBlocksCh <- b4
err = bp2.ClearFreeBlockChannel()
require.Nil(t.T(), err)
require.Nil(t.T(), b3.(*memoryBlock).buffer)
require.Nil(t.T(), b4.(*memoryBlock).buffer)
}

func (t *BlockPoolTest) TestGetWhenGlobalMaxBlocksIsZero() {
Expand All @@ -170,19 +217,15 @@ func (t *BlockPoolTest) TestGetWhenGlobalMaxBlocksIsZero() {
t.validateGetBlockIsBlocked(bp)
}

func (t *BlockPoolTest) TestGetWhenTotalBlocksEqualToGlobalBlocks() {
func (t *BlockPoolTest) TestGetWhenLimitedByGlobalBlocks() {
bp, err := NewBlockPool(1024, 10, semaphore.NewWeighted(2))
require.Nil(t.T(), err)

// Create 1st block
b1, err := bp.Get()
require.Nil(t.T(), err)
require.NotNil(t.T(), b1)
// Create 2nd block
b2, err := bp.Get()
require.Nil(t.T(), err)
require.NotNil(t.T(), b2)
require.Equal(t.T(), int64(2), bp.totalBlocks)
// 3 blocks can be created.
for i := 0; i < 3; i++ {
_ = t.validateGetBlockIsNotBlocked(bp)
}
require.Equal(t.T(), int64(3), bp.totalBlocks)

t.validateGetBlockIsBlocked(bp)
}
Expand All @@ -196,6 +239,7 @@ func (t *BlockPoolTest) TestGetWhenTotalBlocksEqualToMaxBlocks() {
}

func (t *BlockPoolTest) validateGetBlockIsBlocked(bp *BlockPool) {
t.T().Helper()
done := make(chan bool, 1)
go func() {
b, err := bp.Get()
Expand All @@ -211,6 +255,25 @@ func (t *BlockPoolTest) validateGetBlockIsBlocked(bp *BlockPool) {
}
}

func (t *BlockPoolTest) validateGetBlockIsNotBlocked(bp *BlockPool) Block {
t.T().Helper()
done := make(chan Block, 1)
go func() {
b, err := bp.Get()
require.Nil(t.T(), err)
require.NotNil(t.T(), b)
done <- b
}()

select {
case block := <-done:
return block
case <-time.After(1 * time.Second):
assert.FailNow(t.T(), "Not able to get/create a block")
return nil
}
}

func (t *BlockPoolTest) TestBlockPool_FreeBlocksChannel() {
freeBlocksCh := make(chan Block)
bp := &BlockPool{
Expand All @@ -222,3 +285,70 @@ func (t *BlockPoolTest) TestBlockPool_FreeBlocksChannel() {
assert.NotNil(t.T(), ch)
assert.Equal(t.T(), freeBlocksCh, ch)
}

func (t *BlockPoolTest) TestBlockPool_canAllocateBlock() {
ashmeenkaur marked this conversation as resolved.
Show resolved Hide resolved
tests := []struct {
name string
maxBlocks int64
totalBlocks int64
globalSem *semaphore.Weighted
expected bool
}{
{
name: "max_blocks_reached",
maxBlocks: 10,
totalBlocks: 10,
globalSem: semaphore.NewWeighted(0),
expected: false,
},
{
name: "first_block",
maxBlocks: 10,
totalBlocks: 0,
globalSem: semaphore.NewWeighted(0),
expected: true,
},
{
name: "semaphore_acquirable",
maxBlocks: 10,
totalBlocks: 5,
globalSem: semaphore.NewWeighted(1),
expected: true,
},
{
name: "semaphore_not_acquirable",
maxBlocks: 10,
totalBlocks: 5,
globalSem: semaphore.NewWeighted(0),
expected: false,
},
{
name: "equal_max_blocks_and_total_blocks_0",
maxBlocks: 0,
totalBlocks: 0,
globalSem: semaphore.NewWeighted(0),
expected: false,
},
{
name: "total_blocks_more_than_max_blocks",
maxBlocks: 0,
totalBlocks: 1,
globalSem: semaphore.NewWeighted(0),
expected: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func() {
bp := &BlockPool{
maxBlocks: tt.maxBlocks,
totalBlocks: tt.totalBlocks,
globalMaxBlocksSem: tt.globalSem,
}

got := bp.canAllocateBlock()

assert.Equal(t.T(), tt.expected, got)
})
}
}
Loading