Skip to content

Commit

Permalink
Rotating logger (#2446)
Browse files Browse the repository at this point in the history
* Rotating logger

* Testcase

* Update common/logger.go

Co-authored-by: Gauri Prasad <[email protected]>

* Fix function spelling and update comment

* Fix CI

* Add header

---------

Co-authored-by: Gauri Prasad <[email protected]>
  • Loading branch information
nakulkar-msft and gapra-msft authored Nov 28, 2023
1 parent d087a6b commit abd185c
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 7 deletions.
15 changes: 8 additions & 7 deletions common/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ package common

import (
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
datalakefile "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"
sharefile "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file"
"io"
"log"
"os"
"path"
"runtime"
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
datalakefile "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"
sharefile "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file"
)

type ILogger interface {
Expand All @@ -53,12 +53,14 @@ type ILoggerResetable interface {

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

const maxLogSize = 500 * 1024 * 1024

type jobLogger struct {
// maximum loglevel represents the maximum severity of log messages which can be logged to Job Log file.
// any message with severity higher than this will be ignored.
jobID JobID
minimumLevelToLog LogLevel // The maximum customer-desired log level for this job
file *os.File // The job's log file
file io.WriteCloser // The job's log file
logFileFolder string // The log file's parent folder, needed for opening the file at the right place
logger *log.Logger // The Job's logger
sanitizer LogSanitizer
Expand All @@ -80,8 +82,7 @@ func (jl *jobLogger) OpenLog() {
return
}

file, err := os.OpenFile(path.Join(jl.logFileFolder, jl.jobID.String()+jl.logFileNameSuffix+".log"),
os.O_RDWR|os.O_CREATE|os.O_APPEND, DEFAULT_FILE_PERM)
file, err := NewRotatingWriter(path.Join(jl.logFileFolder, jl.jobID.String()+jl.logFileNameSuffix), maxLogSize)
PanicIfErr(err)

jl.file = file
Expand Down
115 changes: 115 additions & 0 deletions common/rotatingWriter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright © 2023 microsoft <[email protected]>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package common

import (
"fmt"
"io"
"os"
"sync"
"sync/atomic"
)

type rotatingWriter struct {
filePath string
file *os.File
l sync.RWMutex
currentSuffix int32
currentSize uint64
maxLogSize uint64
}

func NewRotatingWriter(filePath string, size uint64) (io.WriteCloser, error) {
file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_APPEND, DEFAULT_FILE_PERM)
if err != nil {
return nil, err
}

return &rotatingWriter{
file: file,
filePath: filePath,
maxLogSize: size,
}, nil
}
// rotate() takes in a context inform of integer, and rotates log only
// if the context matches current suffix.
// rotate() should be called with a RLock held. It'll return back with
// RLock held.
func (w *rotatingWriter) rotate(suffix int32) error {
w.l.RUnlock()
defer w.l.RLock()

w.l.Lock()
defer w.l.Unlock()

if atomic.LoadInt32(&w.currentSuffix) > suffix {
// This log has already been rotated.
return nil
}

if err := w.file.Close(); err != nil {
return err
}

suffixString := fmt.Sprintf(".%d.log", w.currentSuffix)
if err := os.Rename(w.filePath, w.filePath + suffixString); err != nil {
return err
}

atomic.AddInt32(&w.currentSuffix, 1)
atomic.StoreUint64(&w.currentSize, 0)

// create new one
file, err := os.OpenFile(w.filePath, os.O_RDWR|os.O_CREATE|os.O_APPEND, DEFAULT_FILE_PERM)
if err != nil {
return err
}

w.file = file

return nil
}

func (w *rotatingWriter) Close() error {
return w.file.Close()
}

func (w *rotatingWriter) Write(p []byte) (n int, err error) {
w.l.RLock()
defer w.l.RUnlock()

// We have to save curSuffix here so that if we rotate() the
// same log file we checked here.
currSuffix := atomic.LoadInt32(&w.currentSuffix)
if atomic.AddUint64(&w.currentSize, uint64(len(p))) <= w.maxLogSize {
// we've enough size
return w.file.Write(p)
}

//1. Take out these bytes
atomic.AddUint64(&w.currentSize, -uint64(len(p)))

if err := w.rotate(currSuffix); err != nil {
return 0, err
}

atomic.AddUint64(&w.currentSize, uint64(len(p)))
return w.file.Write(p)
}
118 changes: 118 additions & 0 deletions common/rotatingWriter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright © Microsoft <[email protected]>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package common

import (
"os"
"path"
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

func TestRotatingWriter(t *testing.T) {
convertToMap := func(entries []os.DirEntry) map[string]int {
ret := make(map[string]int)
for _, v := range entries {
ret[v.Name()] += 1
}

return ret
}

a := assert.New(t)
logFileName := "logfile"
data := "This string is one hundred bytes. Also has some junk to make actually make it one hundred bytes. Bye"

tmpDir, err := os.MkdirTemp("", "")
defer os.RemoveAll(tmpDir)

a.Nil(err)
logFile := path.Join(tmpDir, logFileName)

// 1. Create a rotating writer of size 100B
w, err := NewRotatingWriter(logFile, 100)
a.Nil(err)

// write 10 bytes and verify there is only one file in tmpDir
w.Write([]byte(data[:10]))
entries, err := os.ReadDir(tmpDir)
a.Nil(err)
a.Equal(1, len(entries))
a.Equal(logFileName, entries[0].Name())

// write 90 more bytes and verify there is still only one file
w.Write([]byte(data[:90]))
entries, err = os.ReadDir(tmpDir)
a.Nil(err)
a.Equal(1, len(entries))
a.Equal(logFileName, entries[0].Name())

// write 10 more bytes and verify a new log file is created
n, err := w.Write([]byte(data[:10]))
a.Equal(10, n)
a.Nil(err)

entries, err = os.ReadDir(tmpDir)
a.Nil(err)
a.Equal(2, len(entries))
f := convertToMap(entries)
a.Contains(f, logFileName)
a.Contains(f, logFileName + ".0.log")

// Write 80 bytes to prepare for next test
w.Write([]byte(data[:80]))

// parallelly write from 5 writers and verify log is rotated only once.
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
w.Write([]byte(data[:10]))
n, err := w.Write([]byte(data[:10]))
a.Equal(10, n)
a.Nil(err)
wg.Done()
}()
}

wg.Wait()

// verify only one new log file is created.
entries, err = os.ReadDir(tmpDir)
a.Nil(err)
a.Equal(3, len(entries))
f = convertToMap(entries)
a.Contains(f, logFileName)
a.Contains(f, logFileName + ".0.log")
a.Contains(f, logFileName + ".1.log")

// close and verify we've 3 log files
w.Close()
entries, err = os.ReadDir(tmpDir)
a.Nil(err)
a.Equal(3, len(entries))
f = convertToMap(entries)
a.Contains(f, logFileName)
a.Contains(f, logFileName + ".0.log")
a.Contains(f, logFileName + ".1.log")
}

0 comments on commit abd185c

Please sign in to comment.