From 8c7387c86e24a11c9830699485c4bdd9f4d1013d Mon Sep 17 00:00:00 2001 From: Ricardo Maraschini Date: Wed, 4 Jan 2023 14:56:34 +0100 Subject: [PATCH 1/3] feat: bails out if rsync pod is silent if rsync pod does not log anything for 30 minutes we signalize a timeout and bail out the migration. --- pkg/migrate/migrate.go | 30 ++++++++++++++- pkg/migrate/migrate_test.go | 76 +++++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 1 deletion(-) diff --git a/pkg/migrate/migrate.go b/pkg/migrate/migrate.go index 9cdf662..375379b 100644 --- a/pkg/migrate/migrate.go +++ b/pkg/migrate/migrate.go @@ -273,7 +273,7 @@ func copyOnePVC(ctx context.Context, w *log.Logger, clientset k8sclient.Interfac if bufPodLogs == nil { continue } - line, _, err := bufPodLogs.ReadLine() + line, err := readLineWithTimeout(bufPodLogs, 30*time.Minute) if err != nil { if errors.Is(err, io.EOF) { break @@ -1115,3 +1115,31 @@ func resetReclaimPolicy(ctx context.Context, w *log.Logger, clientset k8sclient. return nil } + +// LineReader is a helper so we can easily implement tests using concrete implementations. This is +// implemented by bufio.Reader type and that is the type we are targeting here. +type LineReader interface { + ReadLine() ([]byte, bool, error) +} + +// readLineWithTimeout attempts to read a line from provided Reader respecting the provided timeout. +func readLineWithTimeout(reader LineReader, timeout time.Duration) ([]byte, error) { + type readerMessage struct { + line []byte + err error + } + + messages := make(chan readerMessage, 1) + go func() { + line, _, err := reader.ReadLine() + messages <- readerMessage{line, err} + close(messages) + }() + + select { + case <-time.NewTimer(timeout).C: + return nil, fmt.Errorf("timeout reading output") + case message := <-messages: + return message.line, message.err + } +} diff --git a/pkg/migrate/migrate_test.go b/pkg/migrate/migrate_test.go index 5c0d8f1..a957099 100644 --- a/pkg/migrate/migrate_test.go +++ b/pkg/migrate/migrate_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "reflect" "strings" "testing" "time" @@ -3065,3 +3066,78 @@ func Test_copyAllPVCs(t *testing.T) { }) } } + +type mockReader struct { + fn func() ([]byte, bool, error) +} + +func (m mockReader) ReadLine() ([]byte, bool, error) { + return m.fn() +} + +func Test_readLineWithTimeout(t *testing.T) { + for _, tt := range []struct { + name string + timeout time.Duration + err string + output []byte + fn func() ([]byte, bool, error) + }{ + { + name: "immediatly return should work", + timeout: time.Second, + output: []byte(`testing`), + fn: func() ([]byte, bool, error) { + return []byte(`testing`), false, nil + }, + }, + { + name: "taking to long to read should fail with timeout", + timeout: 500 * time.Millisecond, + err: "timeout reading output", + fn: func() ([]byte, bool, error) { + time.Sleep(time.Second) + return []byte(`testing`), false, nil + }, + }, + { + name: "returned error from the reader should bubble up to the caller", + timeout: time.Second, + err: "this is a custom error", + fn: func() ([]byte, bool, error) { + return nil, false, fmt.Errorf("this is a custom error") + }, + }, + { + name: "slow read but with a bigger timeout should work", + timeout: 3 * time.Second, + output: []byte(`this is the returned message`), + fn: func() ([]byte, bool, error) { + time.Sleep(2 * time.Second) + return []byte(`this is the returned message`), false, nil + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + var reader = &mockReader{fn: tt.fn} + line, err := readLineWithTimeout(reader, tt.timeout) + if err != nil { + if len(tt.err) == 0 { + t.Errorf("unexpected error: %s", err) + } else if !strings.Contains(err.Error(), tt.err) { + t.Errorf("expecting %q, %q received instead", tt.err, err) + } + return + } + + if len(tt.err) > 0 { + t.Errorf("expecting error %q, nil received instead", tt.err) + return + } + + if !reflect.DeepEqual(line, tt.output) { + t.Errorf("expected %s, received %s", string(tt.output), string(line)) + } + }) + } +} From 361a546f850311565563998826015d9e48eab7c6 Mon Sep 17 00:00:00 2001 From: Ricardo Maraschini Date: Wed, 4 Jan 2023 16:47:51 +0100 Subject: [PATCH 2/3] chore: using testify instead of reflect --- pkg/migrate/migrate_test.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/migrate/migrate_test.go b/pkg/migrate/migrate_test.go index a957099..da22d4e 100644 --- a/pkg/migrate/migrate_test.go +++ b/pkg/migrate/migrate_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "log" - "reflect" "strings" "testing" "time" @@ -3135,9 +3134,8 @@ func Test_readLineWithTimeout(t *testing.T) { return } - if !reflect.DeepEqual(line, tt.output) { - t.Errorf("expected %s, received %s", string(tt.output), string(line)) - } + req := require.New(t) + req.Equal(line, tt.output, "expected %q, received %q", string(tt.output), string(line)) }) } } From 13965b1d8e6a5ea1b36a9eb4fd25f767996c80c6 Mon Sep 17 00:00:00 2001 From: Ricardo Maraschini Date: Wed, 4 Jan 2023 17:37:18 +0100 Subject: [PATCH 3/3] chore: using testify testify is the default here. --- pkg/migrate/migrate_test.go | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/pkg/migrate/migrate_test.go b/pkg/migrate/migrate_test.go index da22d4e..886ee31 100644 --- a/pkg/migrate/migrate_test.go +++ b/pkg/migrate/migrate_test.go @@ -3118,23 +3118,14 @@ func Test_readLineWithTimeout(t *testing.T) { }, } { t.Run(tt.name, func(t *testing.T) { + req := require.New(t) var reader = &mockReader{fn: tt.fn} line, err := readLineWithTimeout(reader, tt.timeout) - if err != nil { - if len(tt.err) == 0 { - t.Errorf("unexpected error: %s", err) - } else if !strings.Contains(err.Error(), tt.err) { - t.Errorf("expecting %q, %q received instead", tt.err, err) - } - return - } - - if len(tt.err) > 0 { - t.Errorf("expecting error %q, nil received instead", tt.err) - return + if len(tt.err) == 0 { + req.NoError(err, "unexpected error %v", err) + } else { + req.ErrorContains(err, tt.err) } - - req := require.New(t) req.Equal(line, tt.output, "expected %q, received %q", string(tt.output), string(line)) }) }