Skip to content

Commit

Permalink
fix: no timezone while parsing (#2671)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth authored Nov 28, 2022
1 parent 0941cc0 commit 9f006e5
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 4 deletions.
8 changes: 6 additions & 2 deletions enterprise/replay/dumpsloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,12 @@ func getMinMaxCreatedAt(key string) (int64, int64, error) {
return minJobCreatedAt, maxJobCreatedAt, fmt.Errorf("%s 's parse with _ gave tokens more than 3. Expected 3", key)
}
keyTokens = strings.Split(keyTokens[2], ".")
if len(keyTokens) != 6 {
return minJobCreatedAt, maxJobCreatedAt, fmt.Errorf("%s 's parse with . gave tokens more than 6. Expected 6", keyTokens[2])
if len(keyTokens) > 7 {
return minJobCreatedAt, maxJobCreatedAt, fmt.Errorf("%s 's parse with . gave tokens more than 7. Expected 6 or 7", keyTokens[2])
}

if len(keyTokens) < 6 { // for backward compatibility TODO: remove this check after some time
return minJobCreatedAt, maxJobCreatedAt, fmt.Errorf("%s 's parse with . gave tokens less than 6. Expected 6 or 7", keyTokens[2])
}
minJobCreatedAt, err = strconv.ParseInt(keyTokens[3], 10, 64)
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions enterprise/replay/sourceWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ func (worker *SourceWorkerT) replayJobsInFile(ctx context.Context, filePath stri
copy(copyLineBytes, lineBytes)

if transformationVersionID == "" {
createdAt, err := time.Parse(misc.POSTGRESTIMEFORMATPARSE, gjson.GetBytes(copyLineBytes, worker.getFieldIdentifier(createdAt)).String())
timeStamp := gjson.GetBytes(copyLineBytes, worker.getFieldIdentifier(createdAt)).String()
createdAt, err := time.Parse(misc.NOTIMEZONEFORMATPARSE, getFormattedTimeStamp(timeStamp))
if err != nil {
worker.log.Errorf("failed to parse created at: %s", err)
continue
Expand Down Expand Up @@ -178,7 +179,7 @@ func (worker *SourceWorkerT) replayJobsInFile(ctx context.Context, filePath stri
worker.log.Errorf("Error getting created at from transformer output: %v", err)
continue
}
createdAt, err := time.Parse(misc.POSTGRESTIMEFORMATPARSE, createdAtString)
createdAt, err := time.Parse(misc.NOTIMEZONEFORMATPARSE, getFormattedTimeStamp(createdAtString))
if err != nil {
worker.log.Errorf("failed to parse created at: %s", err)
continue
Expand All @@ -197,6 +198,7 @@ func (worker *SourceWorkerT) replayJobsInFile(ctx context.Context, filePath stri
Parameters: params,
CustomVal: ev.Output[worker.getFieldIdentifier(customVal)].(string),
EventPayload: destEventJSON,
WorkspaceId: ev.Output[worker.getFieldIdentifier(workspaceID)].(string),
}
jobs = append(jobs, &job)
}
Expand Down Expand Up @@ -256,9 +258,15 @@ func (worker *SourceWorkerT) getFieldIdentifier(field string) string {
return "CustomVal"
case eventPayload:
return "EventPayload"
case workspaceID:
return "WorkspaceID"
case createdAt:
return "CreatedAt"
default:
return ""
}
}

func getFormattedTimeStamp(timeStamp string) string {
return strings.Split(strings.TrimSuffix(timeStamp, "Z"), ".")[0]
}
1 change: 1 addition & 0 deletions utils/misc/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const (
// RFC3339Milli with milli sec precision
RFC3339Milli = "2006-01-02T15:04:05.000Z07:00"
POSTGRESTIMEFORMATPARSE = "2006-01-02T15:04:05Z"
NOTIMEZONEFORMATPARSE = "2006-01-02T15:04:05"
)

const (
Expand Down

0 comments on commit 9f006e5

Please sign in to comment.