Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky parallel dirOps e2e tests #2852

Merged
merged 8 commits into from
Jan 3, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 66 additions & 129 deletions tools/integration_tests/operations/parallel_dirops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,27 +67,26 @@ func createDirectoryStructureForParallelDiropsTest(t *testing.T) string {
return testDir
}

// lookUpFileStat performs a lookup for the given file path and returns the FileInfo and error.
func lookUpFileStat(wg *sync.WaitGroup, filePath string, result *os.FileInfo, err *error) {
defer wg.Done()
fileInfo, lookupErr := os.Stat(filePath)
*result = fileInfo
*err = lookupErr
}

func TestParallelLookUpsForSameFile(t *testing.T) {
// Create directory structure for testing.
testDir := createDirectoryStructureForParallelDiropsTest(t)
lookUpFunc := func(wg *sync.WaitGroup, filePath string) (os.FileInfo, error) {
defer wg.Done()
fileInfo, err := os.Stat(filePath)
return fileInfo, err
}
var stat1, stat2 os.FileInfo
var err1, err2 error

// Parallel lookups of file just under mount.
filePath := path.Join(testDir, "file1.txt")
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
stat1, err1 = lookUpFunc(&wg, filePath)
}()
go func() {
stat2, err2 = lookUpFunc(&wg, filePath)
}()
go lookUpFileStat(&wg, filePath, &stat1, &err1)
go lookUpFileStat(&wg, filePath, &stat2, &err2)
wg.Wait()

// Assert both stats passed and give correct information
Expand All @@ -101,12 +100,8 @@ func TestParallelLookUpsForSameFile(t *testing.T) {
// Parallel lookups of file under a directory in mount.
filePath = path.Join(testDir, "explicitDir1/file2.txt")
wg.Add(2)
go func() {
stat1, err1 = lookUpFunc(&wg, filePath)
}()
go func() {
stat2, err2 = lookUpFunc(&wg, filePath)
}()
go lookUpFileStat(&wg, filePath, &stat1, &err1)
go lookUpFileStat(&wg, filePath, &stat2, &err2)
wg.Wait()

// Assert both stats passed and give correct information
Expand All @@ -121,10 +116,9 @@ func TestParallelLookUpsForSameFile(t *testing.T) {
func TestParallelReadDirs(t *testing.T) {
// Create directory structure for testing.
testDir := createDirectoryStructureForParallelDiropsTest(t)
readDirFunc := func(wg *sync.WaitGroup, dirPath string) ([]os.DirEntry, error) {
readDirFunc := func(wg *sync.WaitGroup, dirPath string, dirEntries *[]os.DirEntry, err *error) {
defer wg.Done()
dirEntries, err := os.ReadDir(dirPath)
return dirEntries, err
*dirEntries, *err = os.ReadDir(dirPath)
}
var dirEntries1, dirEntries2 []os.DirEntry
var err1, err2 error
Expand All @@ -133,12 +127,9 @@ func TestParallelReadDirs(t *testing.T) {
dirPath := path.Join(testDir, "explicitDir1")
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
dirEntries1, err1 = readDirFunc(&wg, dirPath)
}()
go func() {
dirEntries2, err2 = readDirFunc(&wg, dirPath)
}()
go readDirFunc(&wg, dirPath, &dirEntries1, &err1)
go readDirFunc(&wg, dirPath, &dirEntries2, &err2)

wg.Wait()

// Assert both readDirs passed and give correct information
Expand All @@ -156,12 +147,8 @@ func TestParallelReadDirs(t *testing.T) {
parentDirPath := testDir
wg = sync.WaitGroup{}
wg.Add(2)
go func() {
dirEntries1, err1 = readDirFunc(&wg, dirPath)
}()
go func() {
dirEntries2, err2 = readDirFunc(&wg, parentDirPath)
}()
go readDirFunc(&wg, dirPath, &dirEntries1, &err1)
go readDirFunc(&wg, parentDirPath, &dirEntries2, &err2)
wg.Wait()

// Assert both readDirs passed and give correct information
Expand All @@ -180,15 +167,9 @@ func TestParallelReadDirs(t *testing.T) {
func TestParallelLookUpAndDeleteSameDir(t *testing.T) {
// Create directory structure for testing.
testDir := createDirectoryStructureForParallelDiropsTest(t)
lookUpFunc := func(wg *sync.WaitGroup, dirPath string) (os.FileInfo, error) {
defer wg.Done()
fileInfo, err := os.Stat(dirPath)
return fileInfo, err
}
deleteFunc := func(wg *sync.WaitGroup, dirPath string) error {
deleteFunc := func(wg *sync.WaitGroup, dirPath string, err *error) {
defer wg.Done()
err := os.RemoveAll(dirPath)
return err
*err = os.RemoveAll(dirPath)
}
var statInfo os.FileInfo
var lookUpErr, deleteErr error
Expand All @@ -197,21 +178,18 @@ func TestParallelLookUpAndDeleteSameDir(t *testing.T) {
dirPath := path.Join(testDir, "explicitDir1")
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
statInfo, lookUpErr = lookUpFunc(&wg, dirPath)
}()
go func() {
deleteErr = deleteFunc(&wg, dirPath)
}()
go lookUpFileStat(&wg, dirPath, &statInfo, &lookUpErr)
go deleteFunc(&wg, dirPath, &deleteErr)
wg.Wait()

assert.NoError(t, deleteErr)
_, err := os.Stat(dirPath)
assert.True(t, os.IsNotExist(err))
// Assert either dir is looked up first or deleted first
if lookUpErr == nil {
assert.NotNil(t, statInfo, "statInfo should not be nil when lookUpErr is nil")
assert.Contains(t, statInfo.Name(), "explicitDir1")
assert.True(t, statInfo.IsDir())
assert.True(t, statInfo.IsDir(), "The created path should be a directory")
} else {
assert.True(t, os.IsNotExist(lookUpErr))
}
Expand All @@ -220,11 +198,6 @@ func TestParallelLookUpAndDeleteSameDir(t *testing.T) {
func TestParallelLookUpsForDifferentFiles(t *testing.T) {
// Create directory structure for testing.
testDir := createDirectoryStructureForParallelDiropsTest(t)
lookUpFunc := func(wg *sync.WaitGroup, filePath string) (os.FileInfo, error) {
defer wg.Done()
fileInfo, err := os.Stat(filePath)
return fileInfo, err
}
var stat1, stat2 os.FileInfo
var err1, err2 error

Expand All @@ -233,12 +206,9 @@ func TestParallelLookUpsForDifferentFiles(t *testing.T) {
filePath2 := path.Join(testDir, "file2.txt")
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
stat1, err1 = lookUpFunc(&wg, filePath1)
}()
go func() {
stat2, err2 = lookUpFunc(&wg, filePath2)
}()
go lookUpFileStat(&wg, filePath1, &stat1, &err1)
go lookUpFileStat(&wg, filePath2, &stat2, &err2)

wg.Wait()

// Assert both stats passed and give correct information
Expand All @@ -254,12 +224,8 @@ func TestParallelLookUpsForDifferentFiles(t *testing.T) {
filePath2 = path.Join(testDir, "explicitDir1", "file2.txt")
wg = sync.WaitGroup{}
wg.Add(2)
go func() {
stat1, err1 = lookUpFunc(&wg, filePath1)
}()
go func() {
stat2, err2 = lookUpFunc(&wg, filePath2)
}()
go lookUpFileStat(&wg, filePath1, &stat1, &err1)
go lookUpFileStat(&wg, filePath2, &stat2, &err2)
wg.Wait()

// Assert both stats passed and give correct information
Expand All @@ -274,19 +240,16 @@ func TestParallelLookUpsForDifferentFiles(t *testing.T) {
func TestParallelReadDirAndMkdirInsideSameDir(t *testing.T) {
// Create directory structure for testing.
testDir := createDirectoryStructureForParallelDiropsTest(t)
readDirFunc := func(wg *sync.WaitGroup, dirPath string) ([]os.DirEntry, error) {
readDirFunc := func(wg *sync.WaitGroup, dirPath string, dirEntries *[]os.DirEntry, err *error) {
defer wg.Done()
var dirEntries []os.DirEntry
err := filepath.WalkDir(dirPath, func(path string, d fs.DirEntry, err error) error {
dirEntries = append(dirEntries, d)
*err = filepath.WalkDir(dirPath, func(path string, d fs.DirEntry, err error) error {
*dirEntries = append(*dirEntries, d)
return nil
})
return dirEntries, err
}
mkdirFunc := func(wg *sync.WaitGroup, dirPath string) error {
mkdirFunc := func(wg *sync.WaitGroup, dirPath string, err *error) {
defer wg.Done()
err := os.Mkdir(dirPath, setup.DirPermission_0755)
return err
*err = os.Mkdir(dirPath, setup.DirPermission_0755)
}
var dirEntries []os.DirEntry
var readDirErr, mkdirErr error
Expand All @@ -295,20 +258,16 @@ func TestParallelReadDirAndMkdirInsideSameDir(t *testing.T) {
newDirPath := path.Join(testDir, "newDir")
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
dirEntries, readDirErr = readDirFunc(&wg, testDir)
}()
go func() {
mkdirErr = mkdirFunc(&wg, newDirPath)
}()
go readDirFunc(&wg, testDir, &dirEntries, &readDirErr)
go mkdirFunc(&wg, newDirPath, &mkdirErr)
wg.Wait()

// Assert both listing and mkdir succeeded
assert.NoError(t, readDirErr)
assert.NoError(t, mkdirErr)
dirStatInfo, err := os.Stat(newDirPath)
assert.NoError(t, err)
assert.True(t, dirStatInfo.IsDir())
assert.True(t, dirStatInfo.IsDir(), "The created path should be a directory")
// List should happen either before or after creation of newDir.
assert.GreaterOrEqual(t, len(dirEntries), 8)
assert.LessOrEqual(t, len(dirEntries), 9)
Expand All @@ -320,15 +279,9 @@ func TestParallelReadDirAndMkdirInsideSameDir(t *testing.T) {
func TestParallelLookUpAndDeleteSameFile(t *testing.T) {
// Create directory structure for testing.
testDir := createDirectoryStructureForParallelDiropsTest(t)
lookUpFunc := func(wg *sync.WaitGroup, filePath string) (os.FileInfo, error) {
defer wg.Done()
fileInfo, err := os.Stat(filePath)
return fileInfo, err
}
deleteFileFunc := func(wg *sync.WaitGroup, filePath string) error {
deleteFileFunc := func(wg *sync.WaitGroup, filePath string, err *error) {
defer wg.Done()
err := os.Remove(filePath)
return err
*err = os.Remove(filePath)
}
var fileInfo os.FileInfo
var lookUpErr, deleteErr error
Expand All @@ -337,22 +290,21 @@ func TestParallelLookUpAndDeleteSameFile(t *testing.T) {
filePath := path.Join(testDir, "explicitDir1", "file1.txt")
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
fileInfo, lookUpErr = lookUpFunc(&wg, filePath)
}()
go func() {
deleteErr = deleteFileFunc(&wg, filePath)
}()

go lookUpFileStat(&wg, filePath, &fileInfo, &lookUpErr)
go deleteFileFunc(&wg, filePath, &deleteErr)

wg.Wait()

assert.NoError(t, deleteErr)
_, err := os.Stat(filePath)
assert.True(t, os.IsNotExist(err))
// Assert either file is looked up first or deleted first
if lookUpErr == nil {
assert.NotNil(t, fileInfo, "fileInfo should not be nil when lookUpErr is nil")
assert.Equal(t, int64(5), fileInfo.Size())
assert.Contains(t, fileInfo.Name(), "file1.txt")
assert.False(t, fileInfo.IsDir())
assert.False(t, fileInfo.IsDir(), "The created path should not be a directory")
} else {
assert.True(t, os.IsNotExist(lookUpErr))
}
Expand All @@ -361,15 +313,9 @@ func TestParallelLookUpAndDeleteSameFile(t *testing.T) {
func TestParallelLookUpAndRenameSameFile(t *testing.T) {
// Create directory structure for testing.
testDir := createDirectoryStructureForParallelDiropsTest(t)
lookUpFunc := func(wg *sync.WaitGroup, filePath string) (os.FileInfo, error) {
defer wg.Done()
fileInfo, err := os.Stat(filePath)
return fileInfo, err
}
renameFunc := func(wg *sync.WaitGroup, oldFilePath string, newFilePath string) error {
renameFunc := func(wg *sync.WaitGroup, oldFilePath string, newFilePath string, err *error) {
defer wg.Done()
err := os.Rename(oldFilePath, newFilePath)
return err
*err = os.Rename(oldFilePath, newFilePath)
}
var fileInfo os.FileInfo
var lookUpErr, renameErr error
Expand All @@ -379,12 +325,9 @@ func TestParallelLookUpAndRenameSameFile(t *testing.T) {
newFilePath := path.Join(testDir, "newFile.txt")
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
fileInfo, lookUpErr = lookUpFunc(&wg, filePath)
}()
go func() {
renameErr = renameFunc(&wg, filePath, newFilePath)
}()
go lookUpFileStat(&wg, filePath, &fileInfo, &lookUpErr)
go renameFunc(&wg, filePath, newFilePath, &renameErr)

wg.Wait()

assert.NoError(t, renameErr)
Expand All @@ -395,9 +338,10 @@ func TestParallelLookUpAndRenameSameFile(t *testing.T) {
assert.Equal(t, int64(5), newFileInfo.Size())
// Assert either file is renamed first or looked up first
if lookUpErr == nil {
assert.NotNil(t, fileInfo, "fileInfo should not be nil when lookUpErr is nil")
assert.Equal(t, int64(5), fileInfo.Size())
assert.Contains(t, fileInfo.Name(), "file1.txt")
assert.False(t, fileInfo.IsDir())
assert.False(t, fileInfo.IsDir(), "The created path should not be a directory")
} else {
assert.True(t, os.IsNotExist(lookUpErr))
}
Expand All @@ -406,40 +350,33 @@ func TestParallelLookUpAndRenameSameFile(t *testing.T) {
func TestParallelLookUpAndMkdirSameDir(t *testing.T) {
// Create directory structure for testing.
testDir := createDirectoryStructureForParallelDiropsTest(t)
lookUpFunc := func(wg *sync.WaitGroup, dirPath string) (os.FileInfo, error) {
defer wg.Done()
fileInfo, err := os.Stat(dirPath)
return fileInfo, err
}
mkdirFunc := func(wg *sync.WaitGroup, dirPath string) error {
mkdirFunc := func(wg *sync.WaitGroup, dirPath string, err *error) {
defer wg.Done()
err := os.Mkdir(dirPath, setup.DirPermission_0755)
return err
*err = os.Mkdir(dirPath, setup.DirPermission_0755)
}

var statInfo os.FileInfo
var lookUpErr, mkdirErr error

// Parallel lookup and mkdir of a new directory.
dirPath := path.Join(testDir, "newDir")
wg := sync.WaitGroup{}
var wg sync.WaitGroup
wg.Add(2)
go func() {
statInfo, lookUpErr = lookUpFunc(&wg, dirPath)
}()
go func() {
mkdirErr = mkdirFunc(&wg, dirPath)
}()

go lookUpFileStat(&wg, dirPath, &statInfo, &lookUpErr)
go mkdirFunc(&wg, dirPath, &mkdirErr)
wg.Wait()

assert.NoError(t, mkdirErr)
// Assert either directory is created first or looked up first
assert.NoError(t, mkdirErr, "mkdirFunc should not fail")

if lookUpErr == nil {
assert.NotNil(t, statInfo, "statInfo should not be nil when lookUpErr is nil")
assert.Contains(t, statInfo.Name(), "newDir")
assert.True(t, statInfo.IsDir())
} else {
assert.True(t, os.IsNotExist(lookUpErr))
assert.True(t, os.IsNotExist(lookUpErr), "lookUpErr should indicate directory does not exist")
dirStatInfo, err := os.Stat(dirPath)
assert.NoError(t, err)
assert.True(t, dirStatInfo.IsDir())
assert.NoError(t, err, "os.Stat should succeed after directory creation")
assert.True(t, dirStatInfo.IsDir(), "The created path should be a directory")
}
}
Loading