Skip to content

Commit

Permalink
Merge pull request #54 from nyaruka/simplify
Browse files Browse the repository at this point in the history
Simplify
  • Loading branch information
rowanseymour authored Nov 30, 2021
2 parents 6317b1f + 80dc347 commit b45faeb
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 56 deletions.
45 changes: 16 additions & 29 deletions archives/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,75 +189,64 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3
}
rows.Close()

log.WithFields(logrus.Fields{
"msg_count": len(msgIDs),
}).Debug("found messages")
log.WithField("msg_count", len(msgIDs)).Debug("found messages")

// verify we don't see more messages than there are in our archive (fewer is ok)
if visibleCount > archive.RecordCount {
return fmt.Errorf("more messages in the database: %d than in archive: %d", visibleCount, archive.RecordCount)
}

// ok, delete our messages in batches, we do this in transactions as it spans a few different queries
for startIdx := 0; startIdx < len(msgIDs); startIdx += deleteTransactionSize {
for _, idBatch := range chunkIDs(msgIDs, deleteTransactionSize) {
// no single batch should take more than a few minutes
ctx, cancel := context.WithTimeout(ctx, time.Minute*15)
defer cancel()

start := time.Now()

endIdx := startIdx + deleteTransactionSize
if endIdx > len(msgIDs) {
endIdx = len(msgIDs)
}
batchIDs := msgIDs[startIdx:endIdx]

// start our transaction
tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return err
}

// first update our delete_reason
err = executeInQuery(ctx, tx, setMessageDeleteReason, batchIDs)
err = executeInQuery(ctx, tx, setMessageDeleteReason, idBatch)
if err != nil {
return fmt.Errorf("error updating delete reason: %s", err.Error())
return errors.Wrap(err, "error updating delete reason")
}

// now delete any channel logs
err = executeInQuery(ctx, tx, deleteMessageLogs, batchIDs)
err = executeInQuery(ctx, tx, deleteMessageLogs, idBatch)
if err != nil {
return fmt.Errorf("error removing channel logs: %s", err.Error())
return errors.Wrap(err, "error removing channel logs")
}

// then any labels
err = executeInQuery(ctx, tx, deleteMessageLabels, batchIDs)
err = executeInQuery(ctx, tx, deleteMessageLabels, idBatch)
if err != nil {
return fmt.Errorf("error removing message labels: %s", err.Error())
return errors.Wrap(err, "error removing message labels")
}

// unlink any responses
err = executeInQuery(ctx, tx, unlinkResponses, batchIDs)
err = executeInQuery(ctx, tx, unlinkResponses, idBatch)
if err != nil {
return fmt.Errorf("error unlinking responses: %s", err.Error())
return errors.Wrap(err, "error unlinking responses")
}

// finally, delete our messages
err = executeInQuery(ctx, tx, deleteMessages, batchIDs)
err = executeInQuery(ctx, tx, deleteMessages, idBatch)
if err != nil {
return fmt.Errorf("error deleting messages: %s", err.Error())
return errors.Wrap(err, "error deleting messages")
}

// commit our transaction
err = tx.Commit()
if err != nil {
return fmt.Errorf("error committing message delete transaction: %s", err.Error())
return errors.Wrap(err, "error committing message delete transaction")
}

log.WithFields(logrus.Fields{
"elapsed": time.Since(start),
"count": len(batchIDs),
}).Debug("deleted batch of messages")
log.WithField("elapsed", time.Since(start)).WithField("count", len(idBatch)).Debug("deleted batch of messages")

cancel()
}
Expand All @@ -270,14 +259,12 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3
// all went well! mark our archive as no longer needing deletion
_, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn)
if err != nil {
return fmt.Errorf("error setting archive as deleted: %s", err.Error())
return errors.Wrap(err, "error setting archive as deleted")
}
archive.NeedsDeletion = false
archive.DeletedOn = &deletedOn

logrus.WithFields(logrus.Fields{
"elapsed": time.Since(start),
}).Info("completed deleting messages")
logrus.WithField("elapsed", time.Since(start)).Info("completed deleting messages")

return nil
}
Expand Down
41 changes: 14 additions & 27 deletions archives/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,69 +180,58 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie
}
rows.Close()

log.WithFields(logrus.Fields{
"run_count": len(runIDs),
}).Debug("found runs")
log.WithField("run_count", len(runIDs)).Debug("found runs")

// verify we don't see more runs than there are in our archive (fewer is ok)
if runCount > archive.RecordCount {
return fmt.Errorf("more runs in the database: %d than in archive: %d", runCount, archive.RecordCount)
}

// ok, delete our runs in batches, we do this in transactions as it spans a few different queries
for startIdx := 0; startIdx < len(runIDs); startIdx += deleteTransactionSize {
for _, idBatch := range chunkIDs(runIDs, deleteTransactionSize) {
// no single batch should take more than a few minutes
ctx, cancel := context.WithTimeout(ctx, time.Minute*15)
defer cancel()

start := time.Now()

endIdx := startIdx + deleteTransactionSize
if endIdx > len(runIDs) {
endIdx = len(runIDs)
}
batchIDs := runIDs[startIdx:endIdx]

// start our transaction
tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return err
}

// first update our delete_reason
err = executeInQuery(ctx, tx, setRunDeleteReason, batchIDs)
err = executeInQuery(ctx, tx, setRunDeleteReason, idBatch)
if err != nil {
return fmt.Errorf("error updating delete reason: %s", err.Error())
return errors.Wrap(err, "error updating delete reason")
}

// any recent runs
err = executeInQuery(ctx, tx, deleteRecentRuns, batchIDs)
err = executeInQuery(ctx, tx, deleteRecentRuns, idBatch)
if err != nil {
return fmt.Errorf("error deleting recent runs: %s", err.Error())
return errors.Wrap(err, "error deleting recent runs")
}

// unlink any parents
err = executeInQuery(ctx, tx, unlinkParents, batchIDs)
err = executeInQuery(ctx, tx, unlinkParents, idBatch)
if err != nil {
return fmt.Errorf("error unliking parent runs: %s", err.Error())
return errors.Wrap(err, "error unliking parent runs")
}

// finally, delete our runs
err = executeInQuery(ctx, tx, deleteRuns, batchIDs)
err = executeInQuery(ctx, tx, deleteRuns, idBatch)
if err != nil {
return fmt.Errorf("error deleting runs: %s", err.Error())
return errors.Wrap(err, "error deleting runs")
}

// commit our transaction
err = tx.Commit()
if err != nil {
return fmt.Errorf("error committing run delete transaction: %s", err.Error())
return errors.Wrap(err, "error committing run delete transaction")
}

log.WithFields(logrus.Fields{
"elapsed": time.Since(start),
"count": len(batchIDs),
}).Debug("deleted batch of runs")
log.WithField("elapsed", time.Since(start)).WithField("count", len(idBatch)).Debug("deleted batch of runs")

cancel()
}
Expand All @@ -255,14 +244,12 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie
// all went well! mark our archive as no longer needing deletion
_, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn)
if err != nil {
return fmt.Errorf("error setting archive as deleted: %s", err.Error())
return errors.Wrap(err, "error setting archive as deleted")
}
archive.NeedsDeletion = false
archive.DeletedOn = &deletedOn

logrus.WithFields(logrus.Fields{
"elapsed": time.Since(start),
}).Info("completed deleting runs")
logrus.WithField("elapsed", time.Since(start)).Info("completed deleting runs")

return nil
}
15 changes: 15 additions & 0 deletions archives/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package archives

// chunkIDs splits ids into consecutive batches of at most size elements,
// preserving order. The final batch holds the remainder and may be shorter.
//
// The returned batches are views into ids (no elements are copied), but each
// one has its capacity clipped to its length, so appending to a batch
// reallocates rather than overwriting the IDs of the batch that follows it.
//
// An empty ids yields an empty, non-nil result. A size that is zero or
// negative cannot describe a batch, so instead of panicking (division by zero
// or a negative capacity) everything is returned as one single batch.
func chunkIDs(ids []int64, size int) [][]int64 {
	total := len(ids)
	if total == 0 {
		return [][]int64{}
	}
	if size <= 0 {
		return [][]int64{ids[:total:total]}
	}

	// exact number of batches; computed with div/mod rather than
	// (total+size-1)/size so a very large size can't overflow
	numChunks := total / size
	if total%size != 0 {
		numChunks++
	}
	chunks := make([][]int64, 0, numChunks)

	for i := 0; i < total; i += size {
		end := total
		// compare against the remaining count so i+size is never computed
		// when it could overflow
		if size < total-i {
			end = i + size
		}
		// three-index slice clips capacity at end to prevent aliasing on append
		chunks = append(chunks, ids[i:end:end])
		if end == total {
			break
		}
	}
	return chunks
}

0 comments on commit b45faeb

Please sign in to comment.