Skip to content

Commit

Permalink
feat: change log retention period to one week (#2403)
Browse files Browse the repository at this point in the history
## Description
Decreases log retention period to a week.


## Is this change user facing?
YES
  • Loading branch information
tedim52 authored Apr 30, 2024
1 parent 6d14a38 commit 3f68795
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,20 @@ type LogFileManager struct {
filesystem volume_filesystem.VolumeFilesystem

time logs_clock.LogsClock

logRetentionPeriodInWeeks int
}

func NewLogFileManager(
kurtosisBackend backend_interface.KurtosisBackend,
filesystem volume_filesystem.VolumeFilesystem,
time logs_clock.LogsClock) *LogFileManager {
time logs_clock.LogsClock,
logRetentionPeriodInWeeks int) *LogFileManager {
return &LogFileManager{
kurtosisBackend: kurtosisBackend,
filesystem: filesystem,
time: time,
kurtosisBackend: kurtosisBackend,
filesystem: filesystem,
time: time,
logRetentionPeriodInWeeks: logRetentionPeriodInWeeks,
}
}

Expand Down Expand Up @@ -125,7 +129,7 @@ func (manager *LogFileManager) CreateLogFiles(ctx context.Context) error {
// RemoveLogsBeyondRetentionPeriod implements the Job cron interface. It removes logs a week older than the log retention period.
func (manager *LogFileManager) RemoveLogsBeyondRetentionPeriod() {
// compute the next oldest week
year, weekToRemove := manager.time.Now().Add(time.Duration(-volume_consts.LogRetentionPeriodInWeeks) * oneWeek).ISOWeek()
year, weekToRemove := manager.time.Now().Add(time.Duration(-manager.logRetentionPeriodInWeeks) * oneWeek).ISOWeek()

// remove directory for that week
oldLogsDirPath := getLogsDirPathForWeek(year, weekToRemove)
Expand All @@ -146,7 +150,7 @@ func (manager *LogFileManager) RemoveAllLogs() error {

func (manager *LogFileManager) RemoveEnclaveLogs(enclaveUuid string) error {
currentTime := manager.time.Now()
for i := 0; i < volume_consts.LogRetentionPeriodInWeeks; i++ {
for i := 0; i < manager.logRetentionPeriodInWeeks; i++ {
year, week := currentTime.Add(time.Duration(-i) * oneWeek).ISOWeek()
enclaveLogsDirPathForWeek := getEnclaveLogsDirPath(year, week, enclaveUuid)
if err := manager.filesystem.RemoveAll(enclaveLogsDirPathForWeek); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestRemoveLogsBeyondRetentionPeriod(t *testing.T) {
_, _ = mockFs.Create(week1filepath)
_, _ = mockFs.Create(week2filepath)

logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, mockTime)
logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, mockTime, 5)
logFileManager.RemoveLogsBeyondRetentionPeriod() // should remove week 49 logs

_, err := mockFs.Stat(week49filepath)
Expand All @@ -69,7 +69,7 @@ func TestRemoveEnclaveLogs(t *testing.T) {
_, _ = mockFs.Create(week52filepath)
_, _ = mockFs.Create(week52filepathDiffService)

logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, mockTime)
logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, mockTime, 5)
err := logFileManager.RemoveEnclaveLogs(testEnclaveUuid) // should remove only all log files for enclave one

require.NoError(t, err)
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestRemoveAllLogs(t *testing.T) {
_, _ = mockFs.Create(week52filepath)
_, _ = mockFs.Create(week52filepathDiffService)

logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, mockTime)
logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, mockTime, 5)
err := logFileManager.RemoveAllLogs()

require.NoError(t, err)
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestCreateLogFiles(t *testing.T) {
expectedServiceNameFilePath := getFilepathStr(2022, 52, testEnclaveUuid, testUserService1Name)
expectedServiceShortUuidFilePath := getFilepathStr(2022, 52, testEnclaveUuid, uuid_generator.ShortenedUUIDString(testUserService1Uuid))

logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, mockTime)
logFileManager := NewLogFileManager(mockKurtosisBackend, mockFs, mockTime, 5)
err := logFileManager.CreateLogFiles(ctx)
require.NoError(t, err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
testUserService2Uuid = "test-user-service-2"
testUserService3Uuid = "test-user-service-3"

retentionPeriodInWeeksForTesting = 5

utcFormat = time.RFC3339
defaultUTCTimestampStr = "2023-09-06T00:35:15-04:00"
logLine1 = "{\"log\":\"Starting feature 'centralized logs'\", \"timestamp\":\"2023-09-06T00:35:15-04:00\"}"
Expand All @@ -43,7 +45,7 @@ const (
notFoundedFilterText = "it shouldn't be found in the log lines"
firstMatchRegexFilterStr = "Starting.*idempotently'"

testTimeOut = 2 * time.Second
testTimeOut = 2 * time.Minute
doNotFollowLogs = false

defaultYear = 2023
Expand Down Expand Up @@ -128,7 +130,7 @@ func TestStreamUserServiceLogsPerWeek_WithFilters(t *testing.T) {

underlyingFs := createFilledPerWeekFilesystem(startingWeek)
mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)

receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines(
t,
Expand Down Expand Up @@ -211,7 +213,7 @@ func TestStreamUserServiceLogsPerWeek_NoLogsFromPersistentVolume(t *testing.T) {

underlyingFs := createEmptyPerWeekFilesystem(startingWeek)
mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)

receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines(
t,
Expand Down Expand Up @@ -316,7 +318,7 @@ func TestStreamUserServiceLogsPerWeek_ThousandsOfLogLinesSuccessfulExecution(t *
require.NoError(t, err)

mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)

receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines(
t,
Expand Down Expand Up @@ -403,7 +405,7 @@ func TestStreamUserServiceLogsPerWeek_EmptyLogLines(t *testing.T) {
require.NoError(t, err)

mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)

receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines(
t,
Expand Down Expand Up @@ -470,7 +472,7 @@ func TestStreamUserServiceLogsPerWeek_WithLogsAcrossWeeks(t *testing.T) {
require.NoError(t, err)

mockTime := logs_clock.NewMockLogsClock(defaultYear, 4, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)

receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines(
t,
Expand Down Expand Up @@ -539,7 +541,7 @@ func TestStreamUserServiceLogsPerWeek_WithLogLineAcrossWeeks(t *testing.T) {
require.NoError(t, err)

mockTime := logs_clock.NewMockLogsClock(defaultYear, 4, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)

receivedUserServiceLogsByUuid, testEvaluationErr := executeStreamCallAndGetReceivedServiceLogLines(
t,
Expand Down Expand Up @@ -590,7 +592,7 @@ func TestStreamUserServiceLogsPerWeekReturnsTimestampedLogLines(t *testing.T) {
require.NoError(t, err)

mockTime := logs_clock.NewMockLogsClock(defaultYear, startingWeek, defaultDay)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime)
perWeekStreamStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)

expectedTime, err := time.Parse(utcFormat, defaultUTCTimestampStr)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ const (
// [.../28/d3e8832d671f/61830789f03a.json] is the file containing logs from service with uuid 61830789f03a, in enclave with uuid d3e8832d671f,
// in the 28th week of the current year
type PerWeekStreamLogsStrategy struct {
time logs_clock.LogsClock
time logs_clock.LogsClock
logRetentionPeriodInWeeks int
}

func NewPerWeekStreamLogsStrategy(time logs_clock.LogsClock) *PerWeekStreamLogsStrategy {
func NewPerWeekStreamLogsStrategy(time logs_clock.LogsClock, logRetentionPeriodInWeeks int) *PerWeekStreamLogsStrategy {
return &PerWeekStreamLogsStrategy{
time: time,
time: time,
logRetentionPeriodInWeeks: logRetentionPeriodInWeeks,
}
}

Expand All @@ -55,7 +57,7 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs(
shouldReturnAllLogs bool,
numLogLines uint32,
) {
paths, err := strategy.getLogFilePaths(fs, volume_consts.LogRetentionPeriodInWeeks, string(enclaveUuid), string(serviceUuid))
paths, err := strategy.getLogFilePaths(fs, strategy.logRetentionPeriodInWeeks, string(enclaveUuid), string(serviceUuid))
if err != nil {
streamErrChan <- stacktrace.Propagate(err, "An error occurred retrieving log file paths for service '%v' in enclave '%v'.", serviceUuid, enclaveUuid)
return
Expand All @@ -68,11 +70,11 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs(
serviceUuid, enclaveUuid)
return
}
if len(paths) > volume_consts.LogRetentionPeriodInWeeks {
if len(paths) > strategy.logRetentionPeriodInWeeks {
logrus.Warnf(
`We expected to retrieve logs going back '%v' weeks, but instead retrieved logs going back '%v' weeks.
This means logs past the retention period are being returned, likely a bug in Kurtosis.`,
volume_consts.LogRetentionPeriodInWeeks, len(paths))
strategy.logRetentionPeriodInWeeks, len(paths))
}

logsReader, files, err := getLogsReader(fs, paths)
Expand Down Expand Up @@ -347,7 +349,7 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine(

// Returns true if [logLine] has no timestamp
func (strategy *PerWeekStreamLogsStrategy) isWithinRetentionPeriod(logLine *logline.LogLine) (bool, error) {
retentionPeriod := strategy.time.Now().Add(time.Duration(-volume_consts.LogRetentionPeriodInWeeks) * oneWeek)
retentionPeriod := strategy.time.Now().Add(time.Duration(-strategy.logRetentionPeriodInWeeks) * oneWeek)
timestamp := logLine.GetTimestamp()
return timestamp.After(retentionPeriod), nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
testEnclaveUuid = "test-enclave"
testUserService1Uuid = "test-user-service-1"

defaultRetentionPeriodInWeeks = volume_consts.LogRetentionPeriodInWeeks
retentionPeriodInWeeksForTesting = 5

defaultYear = 2023
defaultDay = 0 // sunday
Expand Down Expand Up @@ -54,8 +54,8 @@ func TestGetLogFilePaths(t *testing.T) {
}

mockTime := logs_clock.NewMockLogsClock(defaultYear, currentWeek, defaultDay)
strategy := NewPerWeekStreamLogsStrategy(mockTime)
logFilePaths, err := strategy.getLogFilePaths(filesystem, defaultRetentionPeriodInWeeks, testEnclaveUuid, testUserService1Uuid)
strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriodInWeeksForTesting, testEnclaveUuid, testUserService1Uuid)

require.NoError(t, err)
require.Equal(t, len(expectedLogFilePaths), len(logFilePaths))
Expand Down Expand Up @@ -91,8 +91,8 @@ func TestGetLogFilePathsAcrossNewYear(t *testing.T) {
}

mockTime := logs_clock.NewMockLogsClock(defaultYear, currentWeek, defaultDay)
strategy := NewPerWeekStreamLogsStrategy(mockTime)
logFilePaths, err := strategy.getLogFilePaths(filesystem, defaultRetentionPeriodInWeeks, testEnclaveUuid, testUserService1Uuid)
strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriodInWeeksForTesting, testEnclaveUuid, testUserService1Uuid)

require.NoError(t, err)
require.Equal(t, len(expectedLogFilePaths), len(logFilePaths))
Expand Down Expand Up @@ -128,8 +128,8 @@ func TestGetLogFilePathsAcrossNewYearWith53Weeks(t *testing.T) {
}

mockTime := logs_clock.NewMockLogsClock(2016, currentWeek, 1)
strategy := NewPerWeekStreamLogsStrategy(mockTime)
logFilePaths, err := strategy.getLogFilePaths(filesystem, defaultRetentionPeriodInWeeks, testEnclaveUuid, testUserService1Uuid)
strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriodInWeeksForTesting, testEnclaveUuid, testUserService1Uuid)

require.NoError(t, err)
require.Equal(t, len(expectedLogFilePaths), len(logFilePaths))
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestGetLogFilePathsWithDiffRetentionPeriod(t *testing.T) {
}

mockTime := logs_clock.NewMockLogsClock(defaultYear, currentWeek, defaultDay)
strategy := NewPerWeekStreamLogsStrategy(mockTime)
strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriod, testEnclaveUuid, testUserService1Uuid)

require.NoError(t, err)
Expand Down Expand Up @@ -192,11 +192,11 @@ func TestGetLogFilePathsReturnsAllAvailableWeeks(t *testing.T) {
currentWeek := 2

mockTime := logs_clock.NewMockLogsClock(defaultYear, currentWeek, defaultDay)
strategy := NewPerWeekStreamLogsStrategy(mockTime)
logFilePaths, err := strategy.getLogFilePaths(filesystem, defaultRetentionPeriodInWeeks, testEnclaveUuid, testUserService1Uuid)
strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriodInWeeksForTesting, testEnclaveUuid, testUserService1Uuid)

require.NoError(t, err)
require.Less(t, len(logFilePaths), defaultRetentionPeriodInWeeks)
require.Less(t, len(logFilePaths), retentionPeriodInWeeksForTesting)
for i, filePath := range expectedLogFilePaths {
require.Equal(t, filePath, logFilePaths[i])
}
Expand All @@ -217,8 +217,8 @@ func TestGetLogFilePathsReturnsCorrectPathsIfWeeksMissingInBetween(t *testing.T)
currentWeek := 3

mockTime := logs_clock.NewMockLogsClock(defaultYear, currentWeek, defaultDay)
strategy := NewPerWeekStreamLogsStrategy(mockTime)
logFilePaths, err := strategy.getLogFilePaths(filesystem, defaultRetentionPeriodInWeeks, testEnclaveUuid, testUserService1Uuid)
strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriodInWeeksForTesting, testEnclaveUuid, testUserService1Uuid)

require.NoError(t, err)
require.Len(t, logFilePaths, 1)
Expand Down Expand Up @@ -246,8 +246,8 @@ func TestGetLogFilePathsReturnsCorrectPathsIfCurrentWeekHasNoLogsYet(t *testing.
week2filepath,
}

strategy := NewPerWeekStreamLogsStrategy(mockTime)
logFilePaths, err := strategy.getLogFilePaths(filesystem, defaultRetentionPeriodInWeeks, testEnclaveUuid, testUserService1Uuid)
strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)
logFilePaths, err := strategy.getLogFilePaths(filesystem, retentionPeriodInWeeksForTesting, testEnclaveUuid, testUserService1Uuid)

require.NoError(t, err)
require.Equal(t, len(expectedLogFilePaths), len(logFilePaths))
Expand All @@ -264,7 +264,7 @@ func TestIsWithinRetentionPeriod(t *testing.T) {

// week 41 would put the log line outside the retention period
mockTime := logs_clock.NewMockLogsClock(2023, 41, 0)
strategy := NewPerWeekStreamLogsStrategy(mockTime)
strategy := NewPerWeekStreamLogsStrategy(mockTime, retentionPeriodInWeeksForTesting)

timestamp, err := parseTimestampFromJsonLogLine(jsonLogLine)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ const (

EndOfJsonLine = "}"

// promise to keep 4 weeks of logs for users, but store an additional week for safety
LogRetentionPeriodInWeeks = 5
// promise to keep 1 weeks of logs for users
LogRetentionPeriodInWeeks = 1

RemoveLogsWaitHours = 6 * time.Hour

Expand Down
5 changes: 3 additions & 2 deletions engine/server/engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package main
import (
"context"
"fmt"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/volume_consts"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -174,7 +175,7 @@ func runMain() error {
// TODO: Move log file management into LogsDatabaseClient
osFs := volume_filesystem.NewOsVolumeFilesystem()
realTime := logs_clock.NewRealClock()
logFileManager := log_file_manager.NewLogFileManager(kurtosisBackend, osFs, realTime)
logFileManager := log_file_manager.NewLogFileManager(kurtosisBackend, osFs, realTime, volume_consts.LogRetentionPeriodInWeeks)
logFileManager.StartLogFileManagement(ctx)

enclaveManager, err := getEnclaveManager(
Expand Down Expand Up @@ -389,7 +390,7 @@ func getLogsDatabaseClient(kurtosisBackendType args.KurtosisBackendType, kurtosi
case args.KurtosisBackendType_Docker:
osFs := volume_filesystem.NewOsVolumeFilesystem()
realTime := logs_clock.NewRealClock()
perWeekStreamLogsStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(realTime)
perWeekStreamLogsStrategy := stream_logs_strategy.NewPerWeekStreamLogsStrategy(realTime, volume_consts.LogRetentionPeriodInWeeks)
logsDatabaseClient = persistent_volume.NewPersistentVolumeLogsDatabaseClient(kurtosisBackend, osFs, perWeekStreamLogsStrategy)
case args.KurtosisBackendType_Kubernetes:
logsDatabaseClient = kurtosis_backend.NewKurtosisBackendLogsDatabaseClient(kurtosisBackend)
Expand Down

0 comments on commit 3f68795

Please sign in to comment.