Skip to content

Commit

Permalink
Implement CacheReader and enhance asset handling with tests (#597)
Browse files Browse the repository at this point in the history
* Implement CacheReader for efficient multiple reads and add unit tests

* Refactor asset file handling to use OpenFile method and implement CacheReader for efficient data caching

* Merge branch 'next' into refactor-temporary-files

* Merge branch 'next' into refactor-temporary-files

* Add end-to-end tests for archiving from Google Photos and Immich
  • Loading branch information
simulot authored Jan 2, 2025
1 parent 096b36d commit 234b3e3
Show file tree
Hide file tree
Showing 11 changed files with 349 additions and 54 deletions.
7 changes: 4 additions & 3 deletions adapters/folder/readFolder.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,10 @@ func (la *LocalAssetBrowser) parseDir(ctx context.Context, fsys fs.FS, dir strin
// Read metadata from the file only id needed (date range or take date from filename)
if la.requiresDateInformation {
if a.CaptureDate.IsZero() {
// no date in XMp, JSON, try reading the metadata
f, name, err := a.PartialSourceReader()
// no date in XMP, JSON, try reading the metadata
f, err := a.OpenFile()
if err == nil {
md, err := exif.GetMetaData(f, name, la.flags.TZ)
md, err := exif.GetMetaData(f, a.Ext, la.flags.TZ)
if err != nil {
la.log.Record(ctx, fileevent.INFO, a.File, "warning", err.Error())
} else {
Expand All @@ -298,6 +298,7 @@ func (la *LocalAssetBrowser) parseDir(ctx context.Context, fsys fs.FS, dir strin
}
a.CaptureDate = a.FromApplication.DateTaken
}
f.Close()
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion adapters/folder/writeFolder.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (w *LocalAssetWriter) WriteAsset(ctx context.Context, a *assets.Asset) erro
case <-ctx.Done():
return ctx.Err()
default:
r, err := a.Open()
r, err := a.OpenFile()
if err != nil {
return err
}
Expand Down Expand Up @@ -106,6 +106,7 @@ func (w *LocalAssetWriter) WriteAsset(ctx context.Context, a *assets.Asset) erro

// write the asset
err = fshelper.WriteFile(w.WriteToFS, path.Join(dir, base), r)
r.Close()
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion app/cmd/upload/advice.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (ai *AssetIndex) ShouldUpload(la *assets.Asset) (*Advice, error) {

if len(l) > 0 {
dateTaken := la.CaptureDate
size := la.Size()
size := int64(la.FileSize)

for _, sa = range l {
compareDate := compareDate(dateTaken, sa.ExifInfo.DateTimeOriginal.Time)
Expand Down
2 changes: 1 addition & 1 deletion app/cmd/upload/assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (ai *AssetIndex) AddLocalAsset(la *assets.Asset, immichID string) {
DeviceAssetID: la.DeviceAssetID(),
OriginalFileName: strings.TrimSuffix(path.Base(la.OriginalFileName), path.Ext(la.OriginalFileName)),
ExifInfo: immich.ExifInfo{
FileSizeInByte: la.Size(),
FileSizeInByte: int64(la.FileSize),
DateTimeOriginal: immich.ImmichTime{Time: la.CaptureDate},
Latitude: la.Latitude,
Longitude: la.Longitude,
Expand Down
10 changes: 5 additions & 5 deletions immich/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (ic *ImmichClient) AssetUpload(ctx context.Context, la *assets.Asset) (Asse
return ar, fmt.Errorf("type file not supported: %s", path.Ext(la.OriginalFileName))
}

f, err := la.Open()
f, err := la.OpenFile()
if err != nil {
return ar, (err)
}
Expand All @@ -85,6 +85,7 @@ func (ic *ImmichClient) AssetUpload(ctx context.Context, la *assets.Asset) (Asse
defer func() {
m.Close()
pw.Close()
f.Close()
}()
var s fs.FileInfo
s, err = f.Stat()
Expand Down Expand Up @@ -166,13 +167,12 @@ func (ic *ImmichClient) AssetUpload(ctx context.Context, la *assets.Asset) (Asse
if err != nil {
return
}
defer f.Close()
f, err = la.FromSideCar.File.Open()
scf, err := la.FromSideCar.File.Open()
if err != nil {
return
}

_, err = io.Copy(part, f)
defer scf.Close()
_, err = io.Copy(part, scf)
if err != nil {
return
}
Expand Down
9 changes: 2 additions & 7 deletions internal/assets/asset.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package assets

import (
"io"
"io/fs"
"log/slog"
"os"
"time"

"github.com/simulot/immich-go/internal/fshelper"
"github.com/simulot/immich-go/internal/fshelper/cachereader"
)

/*
Expand Down Expand Up @@ -57,10 +55,7 @@ type Asset struct {
Longitude float64 // GPS longitude

// buffer management
sourceFile fs.File // the opened source file
tempFile *os.File // buffer that keep partial reads available for the full file reading
teeReader io.Reader // write each read from it into the tempWriter
reader io.Reader // the reader that combines the partial read and original file for full file reading
cacheReader *cachereader.CacheReader
}

// Kind is the probable type of the image
Expand Down
58 changes: 23 additions & 35 deletions internal/assets/assetFile.go
Original file line number Diff line number Diff line change
@@ -1,59 +1,46 @@
package assets

import (
"errors"
"fmt"
"io"
"io/fs"
"os"
"time"

"github.com/simulot/immich-go/internal/fshelper"
"github.com/simulot/immich-go/internal/fshelper/cachereader"
)

// Remove the temporary file
func (a *Asset) Remove() error {
if fsys, ok := a.File.FS().(fshelper.FSCanRemove); ok {
return fsys.Remove(a.File.Name())
}
return nil
}

func (a *Asset) DeviceAssetID() string {
return fmt.Sprintf("%s-%d", a.OriginalFileName, a.FileSize)
}

// PartialSourceReader open a reader on the current asset.
// each byte read from it is saved into a temporary file.
//
// It returns a TeeReader that writes each read byte from the sou²rce into the temporary file.
// The temporary file is discarded when the LocalAssetFile is closed
// TODO: possible optimization: when the file is a plain file, do not copy it into a temporary file
// TODO: use user temp folder
// OpenFile return an os.File whatever the type of source reader is.
// It can be called several times for the same asset.

func (a *Asset) PartialSourceReader() (reader io.Reader, tmpName string, err error) {
if a.sourceFile == nil {
a.sourceFile, err = a.File.Open()
func (a *Asset) OpenFile() (*os.File, error) {
if a.cacheReader == nil {
// get a FS.File from of the asset
f, err := a.File.Open()
if err != nil {
return nil, "", err
return nil, err
}
}
if a.tempFile == nil {
a.tempFile, err = os.CreateTemp("", "immich-go_*"+a.NameInfo.Ext)
// Create a cache reader from the FS.File
cr, err := cachereader.NewCacheReader(f)
if err != nil {
return nil, "", err
}
if a.teeReader == nil {
a.teeReader = io.TeeReader(a.sourceFile, a.tempFile)
return nil, err
}
a.cacheReader = cr
}
_, err = a.tempFile.Seek(0, 0)
if err != nil {
return nil, "", err
return a.cacheReader.OpenFile()
}

// Close close the temporary file and close the source
func (a *Asset) Close() error {
if a.cacheReader == nil {
return nil
}
return io.MultiReader(a.tempFile, a.teeReader), a.tempFile.Name(), nil
return a.cacheReader.Close()
}

/*
// Open return fs.File that reads previously read bytes followed by the actual file content.
func (a *Asset) Open() (fs.File, error) {
var err error
Expand Down Expand Up @@ -124,3 +111,4 @@ func (a *Asset) ModTime() time.Time {
// Sys implements the fs.FILE interface
func (a *Asset) Sys() any { return nil }
*/
99 changes: 98 additions & 1 deletion internal/e2eTests/archive/e2e_archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestArchiveFromGooglePhotos(t *testing.T) {
ctx := context.Background()

tmpDir := os.TempDir()
tmpDir, err := os.MkdirTemp(tmpDir, "upload_test_folder")
tmpDir, err := os.MkdirTemp(tmpDir, "archive_test_folder")
if err != nil {
t.Fatalf("os.MkdirTemp() error = %v", err)
return
Expand All @@ -38,3 +38,100 @@ func TestArchiveFromGooglePhotos(t *testing.T) {
a.Log().Error(err.Error())
}
}

func TestArchiveFromFolder(t *testing.T) {
e2e.InitMyEnv()
e2e.ResetImmich(t)

ctx := context.Background()

tmpDir := os.TempDir()
tmpDir, err := os.MkdirTemp(tmpDir, "archive_test_folder")
if err != nil {
t.Fatalf("os.MkdirTemp() error = %v", err)
return
}
t.Cleanup(func() {
os.RemoveAll(tmpDir)
})

c, a := cmd.RootImmichGoCommand(ctx)
c.SetArgs([]string{
"upload", "from-folder",
"--server=" + e2e.MyEnv("IMMICHGO_SERVER"),
"--api-key=" + e2e.MyEnv("IMMICHGO_APIKEY"),
"--no-ui",
"--into-album=ALBUM",
"--manage-raw-jpeg=KeepRaw",
"--manage-burst=stack",
e2e.MyEnv("IMMICHGO_TESTFILES") + "/burst/storm",
})

// let's start
err = c.ExecuteContext(ctx)
if err != nil && a.Log().GetSLog() != nil {
a.Log().Error(err.Error())
return
}

c, a = cmd.RootImmichGoCommand(ctx)
c.SetArgs([]string{
"archive", "from-imich",
"--write-to-folder=" + tmpDir,
e2e.MyEnv("IMMICHGO_TESTFILES") + "/burst/Reflex",
})

// let's start
err = c.ExecuteContext(ctx)
if err != nil && a.Log().GetSLog() != nil {
a.Log().Error(err.Error())
}
}

func TestArchiveFromImmich(t *testing.T) {
e2e.InitMyEnv()
e2e.ResetImmich(t)

ctx := context.Background()

tmpDir := os.TempDir()
tmpDir, err := os.MkdirTemp(tmpDir, "archive_test_folder")
if err != nil {
t.Fatalf("os.MkdirTemp() error = %v", err)
return
}
t.Cleanup(func() {
os.RemoveAll(tmpDir)
})

c, a := cmd.RootImmichGoCommand(ctx)
c.SetArgs([]string{
"upload", "from-folder",
"--server=" + e2e.MyEnv("IMMICHGO_SERVER"),
"--api-key=" + e2e.MyEnv("IMMICHGO_APIKEY"),
"--no-ui",
"--into-album=ALBUM",
"--manage-raw-jpeg=KeepRaw",
"--manage-burst=stack",
e2e.MyEnv("IMMICHGO_TESTFILES") + "/burst/storm",
})

// let's start
err = c.ExecuteContext(ctx)
if err != nil && a.Log().GetSLog() != nil {
a.Log().Error(err.Error())
}
c, a = cmd.RootImmichGoCommand(ctx)
c.SetArgs([]string{
"archive", "from-immich",
"--from-server=" + e2e.MyEnv("IMMICHGO_SERVER"),
"--from-api-key=" + e2e.MyEnv("IMMICHGO_APIKEY"),
"--write-to-folder=" + tmpDir,
})

// let's start
err = c.ExecuteContext(ctx)
if err != nil && a.Log().GetSLog() != nil {
a.Log().Error(err.Error())
}
}
22 changes: 22 additions & 0 deletions internal/e2eTests/upload/e2e_from_folder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,28 @@ func TestUploadFromGooglePhotos(t *testing.T) {
}
}

func TestUploadFromGooglePhotosZipped(t *testing.T) {
e2e.InitMyEnv()
e2e.ResetImmich(t)

ctx := context.Background()

c, a := cmd.RootImmichGoCommand(ctx)
c.SetArgs([]string{
"upload", "from-google-photos",
"--server=" + e2e.MyEnv("IMMICHGO_SERVER"),
"--api-key=" + e2e.MyEnv("IMMICHGO_APIKEY"),
// "--no-ui",
e2e.MyEnv("IMMICHGO_TESTFILES") + "/demo takeout/Takeout.zip",
})

// let's start
err := c.ExecuteContext(ctx)
if err != nil && a.Log().GetSLog() != nil {
a.Log().Error(err.Error())
}
}

func TestUploadFromFolder(t *testing.T) {
e2e.InitMyEnv()
e2e.ResetImmich(t)
Expand Down
52 changes: 52 additions & 0 deletions internal/fshelper/cachereader/cachereader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package cachereader

import (
"io"
"os"
)

// CacheReader is a reader that caches the data in a temporary file to allow multiple reads
type CacheReader struct {
tmpFile *os.File // tmpFile is the temporary file or the original file
shouldRemove bool
}

// NewCacheReader creates a new CacheReader from an io.ReadCloser
// When the reader is an os.File, it will be used directly
// Otherwise, the content will be copied into a temporary file, and the original reader will be closed
func NewCacheReader(rc io.ReadCloser) (*CacheReader, error) {
var err error
c := &CacheReader{}
if f, ok := rc.(*os.File); ok {
c.tmpFile = f
} else {
c.tmpFile, err = os.CreateTemp("", "immich-go_*")
if err != nil {
return nil, err
}
// be sure to copy the reader content into the temporary file
_, err = io.Copy(c.tmpFile, rc)
rc.Close()
c.shouldRemove = true
}
return c, err
}

// OpenFile creates a new file based on the temporary file
func (cr *CacheReader) OpenFile() (*os.File, error) {
f, err := os.Open(cr.tmpFile.Name())
if err != nil {
return nil, err
}
return f, nil
}

// Close closes the temporary file only if it was created by NewCacheReader
func (cr *CacheReader) Close() error {
if cr.shouldRemove {
// the source is already closed
return os.Remove(cr.tmpFile.Name())
} else {
return cr.tmpFile.Close()
}
}
Loading

0 comments on commit 234b3e3

Please sign in to comment.