Skip to content

Commit

Permalink
delete broadcasts which no longer have any active messages
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed Apr 26, 2019
1 parent 0f164ff commit 34f090e
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 7 deletions.
119 changes: 119 additions & 0 deletions archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -1280,6 +1281,120 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3
return nil
}

// DeleteBroadcasts deletes all broadcasts older than 90 days for the passed in org which have no active messages on them
func DeleteBroadcasts(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, org Org) error {
start := time.Now()
threshhold := now.AddDate(0, 0, -org.ActiveDays)

rows, err := db.QueryxContext(ctx, selectOldOrgBroadcasts, org.ID, threshhold)
if err != nil {
return err
}
defer rows.Close()

count := 0
for rows.Next() {
if count == 0 {
logrus.WithField("org_id", org.ID).Info("deleting broadcasts")
}

// been deleting this org more than an hour? thats enough for today, exit out
if time.Since(start) > time.Hour {
break
}

var broadcastID int64
err := rows.Scan(&broadcastID)
if err != nil {
return errors.Wrap(err, "unable to get broadcast id")
}

// make sure we have no active messages
var msgCount int64
err = db.Get(&msgCount, `SELECT count(*) FROM msgs_msg WHERE broadcast_id = $1`, broadcastID)
if err != nil {
return errors.Wrapf(err, "unable to select number of msgs for broadcast: %d", broadcastID)
}

if msgCount != 0 {
logrus.WithField("broadcast_id", broadcastID).WithField("org_id", org.ID).WithField("msg_count", msgCount).Warn("unable to delete broadcast, has messages still")
continue
}

// we delete broadcasts in a transaction per broadcast
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return errors.Wrapf(err, "error starting transaction while deleting broadcast: %d", broadcastID)
}

// delete contacts M2M
_, err = tx.Exec(`DELETE from msgs_broadcast_contacts WHERE broadcast_id = $1`, broadcastID)
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error deleting related contacts for broadcast: %d", broadcastID)
}

// delete groups M2M
_, err = tx.Exec(`DELETE from msgs_broadcast_groups WHERE broadcast_id = $1`, broadcastID)
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error deleting related groups for broadcast: %d", broadcastID)
}

// delete URNs M2M
_, err = tx.Exec(`DELETE from msgs_broadcast_urns WHERE broadcast_id = $1`, broadcastID)
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error deleting related urns for broadcast: %d", broadcastID)
}

// delete counts associated with this broadcast
_, err = tx.Exec(`DELETE from msgs_broadcastmsgcount WHERE broadcast_id = $1`, broadcastID)
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error deleting counts for broadcast: %d", broadcastID)
}

// finally, delete our broadcast
_, err = tx.Exec(`DELETE from msgs_broadcast WHERE id = $1`, broadcastID)
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error deleting broadcast: %d", broadcastID)
}

err = tx.Commit()
if err != nil {
return errors.Wrapf(err, "error deleting broadcast: %d", broadcastID)
}

count++
}

if count > 0 {
logrus.WithFields(logrus.Fields{
"elapsed": time.Since(start),
"count": count,
"org_id": org.ID,
}).Info("completed deleting broadcasts")
}

return nil
}

const selectOldOrgBroadcasts = `
SELECT
id
FROM
msgs_broadcast
WHERE
org_id = $1 AND
created_on < $2 AND
schedule_id IS NULL
ORDER BY
created_on ASC,
id ASC
`

const selectOrgRunsInRange = `
SELECT fr.id, fr.is_active, cc.is_test
FROM flows_flowrun fr
Expand Down Expand Up @@ -1527,6 +1642,10 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config
switch a.ArchiveType {
case MessageType:
err = DeleteArchivedMessages(ctx, config, db, s3Client, a)
if err == nil {
err = DeleteBroadcasts(ctx, now, config, db, org)
}

case RunType:
err = DeleteArchivedRuns(ctx, config, db, s3Client, a)
default:
Expand Down
12 changes: 12 additions & 0 deletions archiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ func TestArchiveOrgMessages(t *testing.T) {
s3Client, err := NewS3Client(config)
assert.NoError(t, err)

assertCount(t, db, 4, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2)

created, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], MessageType)
assert.NoError(t, err)

Expand Down Expand Up @@ -397,6 +399,9 @@ func TestArchiveOrgMessages(t *testing.T) {
)
assert.NoError(t, err)
assert.Equal(t, 1, count)

// one broadcast still exists because it has a schedule, the other because it still has msgs, the last because it is new
assertCount(t, db, 3, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2)
}
}

Expand All @@ -406,6 +411,13 @@ FROM flows_flowrun
WHERE org_id = $1 and modified_on >= $2 and modified_on < $3
`

func assertCount(t *testing.T, db *sqlx.DB, expected int, query string, args ...interface{}) {
var count int
err := db.Get(&count, query, args...)
assert.NoError(t, err, "error executing query: %s", query)
assert.Equal(t, expected, count, "counts mismatch for query %s", query)
}

func TestArchiveOrgRuns(t *testing.T) {
db := setup(t)
ctx := context.Background()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/naoina/go-stringutil v0.1.0
github.com/naoina/toml v0.1.1
github.com/nyaruka/ezconf v0.2.1
github.com/pkg/errors v0.8.0
github.com/pkg/errors v0.8.1
github.com/pmezard/go-difflib v1.0.0
github.com/sirupsen/logrus v1.0.5
github.com/stretchr/testify v1.2.1
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ github.com/getsentry/raven-go v0.0.0-20180430182053-263040ce1a36 h1:i93kN7TI/4T5
github.com/getsentry/raven-go v0.0.0-20180430182053-263040ce1a36/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
github.com/go-ini/ini v1.36.0 h1:63En8accP8FKkFZ77ztSfvQf9kGRJN3qBIdItP46RRk=
github.com/go-ini/ini v1.36.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-sql-driver/mysql v1.4.0 h1:7LxgVwFb2hIQtMm87NdgAVfXjnt4OePseqT1tKx+opk=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 h1:12VvqtR6Aowv3l/EQUlocDHW2Cp4G9WJVH7uyH8QFJE=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
Expand All @@ -23,6 +24,7 @@ github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2 h1:hRGSmZu7j271trc9sneMrpOW
github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/mattn/go-sqlite3 v1.9.0 h1:pDRiWfl+++eC2FEFRy6jXmQlvp4Yh3z1MJKg4UeYM/4=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/naoina/go-stringutil v0.1.0 h1:rCUeRUHjBjGTSHl0VC00jUPLz8/F9dDzYI70Hzifhks=
github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0=
Expand All @@ -32,6 +34,8 @@ github.com/nyaruka/ezconf v0.2.1 h1:TDXWoqjqYya1uhou1mAJZg7rgFYL98EB0Tb3+BWtUh0=
github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDYYxgnQw=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.0.5 h1:8c8b5uO0zS4X6RPl/sd1ENwSkIc0/H2PaHxE3udaE8I=
Expand Down
42 changes: 36 additions & 6 deletions testdb.sql
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,38 @@ CREATE TABLE msgs_broadcast (
"text" hstore NOT NULL,
purged BOOLEAN NOT NULL,
created_on timestamp with time zone NOT NULL,
schedule_id int NULL,
org_id integer NOT NULL references orgs_org(id) on delete cascade
);

DROP TABLE IF EXISTS msgs_broadcast_contacts;
CREATE TABLE msgs_broadcast_contacts (
id serial primary key,
broadcast_id integer NOT NULL,
contact_id integer NOT NULL
);

DROP TABLE IF EXISTS msgs_broadcast_groups;
CREATE TABLE msgs_broadcast_groups (
id serial primary key,
broadcast_id integer NOT NULL,
group_id integer NOT NULL
);

DROP TABLE IF EXISTS msgs_broadcast_urns;
CREATE TABLE msgs_broadcast_urns (
id serial primary key,
broadcast_id integer NOT NULL,
contacturn_id integer NOT NULL
);

DROP TABLE IF EXISTS msgs_broadcastmsgcount;
CREATE TABLE msgs_broadcastmsgcount (
id serial primary key,
"count" integer NOT NULL,
broadcast_id integer NOT NULL
);

DROP TABLE IF EXISTS msgs_label CASCADE;
CREATE TABLE msgs_label (
id serial primary key,
Expand Down Expand Up @@ -266,13 +295,19 @@ INSERT INTO contacts_contactgroup_contacts(id, contact_id, contactgroup_id) VALU
(3, 1, 4),
(4, 3, 4);

INSERT INTO msgs_broadcast(id, text, created_on, purged, org_id, schedule_id) VALUES
(1, 'eng=>"hello",fre=>"bonjour"'::hstore, '2017-08-12 22:11:59.890662+02:00', TRUE, 2, 1),
(2, 'base=>"hola"'::hstore, '2017-08-12 22:11:59.890662+02:00', TRUE, 2, NULL),
(3, 'base=>"not purged"'::hstore, '2017-08-12 19:11:59.890662+02:00', FALSE, 2, NULL),
(4, 'base=>"new"'::hstore, '2019-08-12 19:11:59.890662+02:00', FALSE, 2, NULL);

INSERT INTO msgs_msg(id, broadcast_id, uuid, text, created_on, sent_on, modified_on, direction, status, visibility, msg_type, attachments, channel_id, contact_id, contact_urn_id, org_id, msg_count, error_count, next_attempt, response_to_id) VALUES
(1, NULL, '2f969340-704a-4aa2-a1bd-2f832a21d257', 'message 1', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL),
(2, NULL, 'abe87ac1-015c-4803-be29-1e89509fe682', 'message 2', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'D', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL),
(3, NULL, 'a7e83a22-a6ff-4e18-82d0-19545640ccba', 'message 3', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'H', 'V', 'I', '{"image/png:https://foo.bar/image1.png", "image/png:https://foo.bar/image2.png"}', NULL, 6, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL),
(4, NULL, '1cad36af-5581-4c8a-81cd-83708398f61e', 'message 4', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', '2017-08-13 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-08-13 21:11:59.890662+00', NULL),
(5, NULL, 'f557972e-2eb5-42fa-9b87-902116d18787', 'message 5', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', '2017-08-11 21:11:59.890662+02:00', 'I', 'H', 'V', 'I', NULL, 3, 7, 8, 3, 1, 0, '2017-08-11 21:11:59.890662+02:00', NULL),
(6, NULL, '579d148c-0ab1-4afb-832f-afb1fe0e19b7', 'message 6', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-10-08 21:11:59.890662+00', NULL),
(6, 2, '579d148c-0ab1-4afb-832f-afb1fe0e19b7', 'message 6', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', '2017-10-08 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2017-10-08 21:11:59.890662+00', NULL),
(7, NULL, '7aeca469-2593-444e-afe4-4702317534c9', 'message 7', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', '2018-01-02 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 6, 7, 2, 1, 0, '2018-01-02 21:11:59.890662+00', 2),
(8, NULL, '48fab92b-6d75-45c8-9121-81176d97bdbf', 'message 8', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'I', 'H', 'V', 'I', NULL, 2, 11, 7, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL),
(9, NULL, 'e14ab466-0d3b-436d-a0f7-5851fd7d9b7d', 'message 9', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', '2017-08-12 21:11:59.890662+00', 'O', 'S', 'V', 'F', NULL, NULL, 6, NULL, 2, 1, 0, '2017-08-12 21:11:59.890662+00', NULL);
Expand All @@ -296,11 +331,6 @@ INSERT INTO channels_channellog(id, msg_id) VALUES
(5, 5),
(6, 6);

INSERT INTO msgs_broadcast(id, text, created_on, purged, org_id) VALUES
(1, 'eng=>"hello",fre=>"bonjour"'::hstore, '2017-08-12 22:11:59.890662+02:00', TRUE, 2),
(2, 'base=>"hola"'::hstore, '2017-08-12 22:11:59.890662+02:00', TRUE, 2),
(3, 'base=>"not purged"'::hstore, '2017-08-12 19:11:59.890662+02:00', FALSE, 2);

INSERT INTO flows_flow(id, uuid, name) VALUES
(1, '6639286a-9120-45d4-aa39-03ae3942a4a6', 'Flow 1'),
(2, '629db399-a5fb-4fa0-88e6-f479957b63d2', 'Flow 2'),
Expand Down

0 comments on commit 34f090e

Please sign in to comment.