diff --git a/pkg/migrate/migrate.go b/pkg/migrate/migrate.go index 9cdf662..375379b 100644 --- a/pkg/migrate/migrate.go +++ b/pkg/migrate/migrate.go @@ -273,7 +273,7 @@ func copyOnePVC(ctx context.Context, w *log.Logger, clientset k8sclient.Interfac if bufPodLogs == nil { continue } - line, _, err := bufPodLogs.ReadLine() + line, err := readLineWithTimeout(bufPodLogs, 30*time.Minute) if err != nil { if errors.Is(err, io.EOF) { break @@ -1115,3 +1115,31 @@ func resetReclaimPolicy(ctx context.Context, w *log.Logger, clientset k8sclient. return nil } + +// LineReader is a helper so we can easily implement tests using concrete implementations. This is +// implemented by bufio.Reader type and that is the type we are targeting here. +type LineReader interface { + ReadLine() ([]byte, bool, error) +} + +// readLineWithTimeout attempts to read a line from provided Reader respecting the provided timeout. +func readLineWithTimeout(reader LineReader, timeout time.Duration) ([]byte, error) { + type readerMessage struct { + line []byte + err error + } + + messages := make(chan readerMessage, 1) + go func() { + line, _, err := reader.ReadLine() + messages <- readerMessage{line, err} + close(messages) + }() + + select { + case <-time.NewTimer(timeout).C: + return nil, fmt.Errorf("timeout reading output") + case message := <-messages: + return message.line, message.err + } +} diff --git a/pkg/migrate/migrate_test.go b/pkg/migrate/migrate_test.go index 5c0d8f1..886ee31 100644 --- a/pkg/migrate/migrate_test.go +++ b/pkg/migrate/migrate_test.go @@ -3065,3 +3065,68 @@ func Test_copyAllPVCs(t *testing.T) { }) } } + +type mockReader struct { + fn func() ([]byte, bool, error) +} + +func (m mockReader) ReadLine() ([]byte, bool, error) { + return m.fn() +} + +func Test_readLineWithTimeout(t *testing.T) { + for _, tt := range []struct { + name string + timeout time.Duration + err string + output []byte + fn func() ([]byte, bool, error) + }{ + { + name: "immediatly return should work", + timeout: time.Second, + output: []byte(`testing`), + fn: func() ([]byte, bool, error) { + return []byte(`testing`), false, nil + }, + }, + { + name: "taking to long to read should fail with timeout", + timeout: 500 * time.Millisecond, + err: "timeout reading output", + fn: func() ([]byte, bool, error) { + time.Sleep(time.Second) + return []byte(`testing`), false, nil + }, + }, + { + name: "returned error from the reader should bubble up to the caller", + timeout: time.Second, + err: "this is a custom error", + fn: func() ([]byte, bool, error) { + return nil, false, fmt.Errorf("this is a custom error") + }, + }, + { + name: "slow read but with a bigger timeout should work", + timeout: 3 * time.Second, + output: []byte(`this is the returned message`), + fn: func() ([]byte, bool, error) { + time.Sleep(2 * time.Second) + return []byte(`this is the returned message`), false, nil + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + req := require.New(t) + var reader = &mockReader{fn: tt.fn} + line, err := readLineWithTimeout(reader, tt.timeout) + if len(tt.err) == 0 { + req.NoError(err, "unexpected error %v", err) + } else { + req.ErrorContains(err, tt.err) + } + req.Equal(line, tt.output, "expected %q, received %q", string(tt.output), string(line)) + }) + } +}