Skip to content

Commit

Permalink
Improve object key parsing for boltdb shipper. (#2323)
Browse files Browse the repository at this point in the history
Signed-off-by: Cyril Tovena <[email protected]>
  • Loading branch information
cyriltovena authored Jul 13, 2020
1 parent 2548d69 commit c9f89df
Showing 1 changed file with 23 additions and 6 deletions.
29 changes: 23 additions & 6 deletions pkg/storage/stores/local/downloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ func (fc *FilesCollection) checkStorageForUpdates(ctx context.Context) (toDownlo

fc.mtx.RLock()
for _, object := range objects {
uploader := strings.Split(object.Key, "/")[1]
uploader, err := getUploaderFromObjectKey(object.Key)
if err != nil {
return nil, nil, err
}
listedUploaders[uploader] = struct{}{}

// Checking whether file was updated in the store after we downloaded it, if not, no need to include it in updates
Expand Down Expand Up @@ -81,14 +84,17 @@ func (fc *FilesCollection) Sync(ctx context.Context) error {

// It first downloads file to a temp location so that we close the existing file(if already exists), replace it with new one and then reopen it.
func (fc *FilesCollection) downloadFile(ctx context.Context, storageObject chunk.StorageObject) error {
uploader := strings.Split(storageObject.Key, "/")[1]
uploader, err := getUploaderFromObjectKey(storageObject.Key)
if err != nil {
return err
}
folderPath, _ := fc.getFolderPathForPeriod(false)
filePath := path.Join(folderPath, uploader)

// download the file temporarily with some other name to allow boltdb client to close the existing file first if it exists
tempFilePath := path.Join(folderPath, fmt.Sprintf("%s.%s", uploader, "temp"))

err := fc.getFileFromStorage(ctx, storageObject.Key, tempFilePath)
err = fc.getFileFromStorage(ctx, storageObject.Key, tempFilePath)
if err != nil {
return err
}
Expand Down Expand Up @@ -185,7 +191,11 @@ func (fc *FilesCollection) downloadAllFilesForPeriod(ctx context.Context) (err e
}

for _, object := range objects {
uploader := getUploaderFromObjectKey(object.Key)
var uploader string
uploader, err = getUploaderFromObjectKey(object.Key)
if err != nil {
return
}

filePath := path.Join(folderPath, uploader)
df := downloadedFile{}
Expand Down Expand Up @@ -232,6 +242,13 @@ func (fc *FilesCollection) getFolderPathForPeriod(ensureExists bool) (string, er
return folderPath, nil
}

func getUploaderFromObjectKey(objectKey string) string {
return strings.Split(objectKey, "/")[1]
func getUploaderFromObjectKey(objectKey string) (string, error) {
uploaders := strings.Split(objectKey, "/")
if len(uploaders) != 2 {
return "", fmt.Errorf("invalid object key: %v", objectKey)
}
if uploaders[1] == "" {
return "", fmt.Errorf("empty uploader, object key: %v", objectKey)
}
return uploaders[1], nil
}

0 comments on commit c9f89df

Please sign in to comment.